Skip to main content

xlog_cuda/provider/
launch_safe.rs

//! First-slice migrated launch path through the v0.6 launch
//! recorder.
//!
//! Adds a narrow method,
//! [`CudaKernelProvider::memset_recorded`], that performs an
//! async memset on a caller-supplied launch stream **and**
//! records the buffer use against the runtime via the
//! [`crate::launch::LaunchRecorder`]. A column-level sibling,
//! [`CudaKernelProvider::memset_column_recorded`], drives the
//! same sequence through `LaunchRecorder::write_column`. This is
//! intentionally the simplest possible "real launch" path:
//!   * one buffer arg (a write),
//!   * one CUDA call (`cuMemsetD8Async`),
//!   * a launch_stream that can differ from the buffer's
//!     `alloc_stream`,
//!   * an explicit `commit` step that surfaces
//!     `ResourceError::StreamMisuse` from the runtime if the
//!     active resource cannot track cross-stream uses.
//!
//! No existing operator is modified by this slice. Existing
//! kernel launches (filter, compact, dedup, hash join, etc.)
//! continue to use raw `Vec<*mut c_void>` and remain unsafe on
//! their own. They will be migrated in follow-up commits once
//! this minimal path is certified.
23
24use cudarc::driver::sys;
25use xlog_core::{Result, XlogError};
26
27use crate::device_runtime::StreamId;
28use crate::launch::LaunchRecorder;
29use crate::memory::{CudaColumn, TrackedCudaSlice};
30
31impl super::CudaKernelProvider {
32    /// Async memset of `value` into every byte of `dst` on
33    /// `launch_stream`, then record the use against the
34    /// runtime.
35    ///
36    /// Requires the provider's `GpuMemoryManager` to be built
37    /// via [`crate::GpuMemoryManager::with_runtime`] (so
38    /// `dst.runtime_block()` is `Some` and the runtime is
39    /// reachable). On a legacy/no-runtime manager, returns
40    /// [`XlogError::Kernel`].
41    ///
42    /// # Errors
43    ///   * `XlogError::Kernel("memset_recorded requires
44    ///     runtime-backed manager")` if the manager has no
45    ///     runtime attached.
46    ///   * `XlogError::Kernel` from
47    ///     `cuMemsetD8Async`/stream-resolution failure.
48    ///   * `XlogError::Kernel` wrapping any
49    ///     `ResourceError::StreamMisuse` from the recorder's
50    ///     commit (notably when the active resource is
51    ///     `DirectCudaResource` — the trait default that
52    ///     intentionally rejects `record_block_use`).
53    pub fn memset_recorded(
54        &self,
55        dst: &mut TrackedCudaSlice<u8>,
56        value: u8,
57        launch_stream: StreamId,
58    ) -> Result<()> {
59        let runtime = self.memory().runtime().ok_or_else(|| {
60            XlogError::Kernel(
61                "memset_recorded requires a runtime-backed GpuMemoryManager \
62                 (constructed via with_runtime)"
63                    .to_string(),
64            )
65        })?;
66        let pool = runtime.stream_pool();
67        let cu_stream = pool.resolve(launch_stream).ok_or_else(|| {
68            XlogError::Kernel(format!(
69                "memset_recorded: launch_stream StreamId({}) does not resolve",
70                launch_stream.0
71            ))
72        })?;
73
74        // Capture identity bits before borrowing dst into the
75        // recorder. `cuMemsetD8Async` writes to `dst.device_ptr`
76        // for `dst.len()` bytes (T = u8 so len == byte count).
77        let dst_ptr = dst.device_ptr_value();
78        let dst_len = dst.len();
79
80        // STRICT recorder + PREFLIGHT before queueing CUDA work.
81        // If the active resource cannot track cross-stream uses,
82        // or if dst is not actually runtime-backed, preflight
83        // surfaces the failure here and we never enqueue the
84        // memset. Without preflight, `commit` would discover the
85        // failure only after the memset is in flight, leaving
86        // unprotected work hanging.
87        let mut rec = LaunchRecorder::new_strict(launch_stream);
88        rec.write(dst);
89        rec.preflight(runtime).map_err(|e| {
90            XlogError::Kernel(format!(
91                "memset_recorded: launch recorder preflight failed: {}",
92                e
93            ))
94        })?;
95
96        // SAFETY: dst_ptr is a live device pointer, the buffer
97        // owns `dst_len` bytes (verified by the slice's len),
98        // and cu_stream is a valid CUDA stream the runtime
99        // owns. cuMemsetD8Async is genuinely
100        // stream-asynchronous: it queues on the stream and
101        // returns immediately. We get here only if preflight
102        // succeeded.
103        unsafe {
104            let res = sys::cuMemsetD8Async(dst_ptr, value, dst_len, cu_stream.cu_stream());
105            if res != sys::cudaError_enum::CUDA_SUCCESS {
106                return Err(XlogError::Kernel(format!(
107                    "cuMemsetD8Async failed: {:?}",
108                    res
109                )));
110            }
111        }
112
113        // Record the use AFTER the launch is queued so the
114        // event fires when the memset completes. Preflight
115        // already validated the path; commit should only fail
116        // on transient driver errors at event-record time.
117        rec.commit(runtime).map_err(|e| {
118            XlogError::Kernel(format!(
119                "memset_recorded: launch recorder commit failed: {}",
120                e
121            ))
122        })?;
123        Ok(())
124    }
125
126    /// Column-level variant of [`Self::memset_recorded`] —
127    /// exercises the `LaunchRecorder::write_column` path. Used
128    /// by tests that prove `CudaColumn::Owned` records its
129    /// runtime block automatically; strict mode rejects
130    /// `CudaColumn::Dlpack` / `CudaColumn::ArrowDevice` at
131    /// preflight (no CUDA work queued).
132    pub fn memset_column_recorded(
133        &self,
134        dst: &mut CudaColumn,
135        value: u8,
136        launch_stream: StreamId,
137    ) -> Result<()> {
138        let runtime = self.memory().runtime().ok_or_else(|| {
139            XlogError::Kernel(
140                "memset_column_recorded requires a runtime-backed GpuMemoryManager".to_string(),
141            )
142        })?;
143        let pool = runtime.stream_pool();
144        let cu_stream = pool.resolve(launch_stream).ok_or_else(|| {
145            XlogError::Kernel(format!(
146                "memset_column_recorded: launch_stream StreamId({}) does not resolve",
147                launch_stream.0
148            ))
149        })?;
150
151        let dst_ptr = *dst.device_ptr();
152        let dst_len = <CudaColumn as cudarc::driver::DeviceSlice<u8>>::len(dst);
153
154        let mut rec = LaunchRecorder::new_strict(launch_stream);
155        rec.write_column(dst);
156        rec.preflight(runtime).map_err(|e| {
157            XlogError::Kernel(format!(
158                "memset_column_recorded: launch recorder preflight failed: {}",
159                e
160            ))
161        })?;
162
163        // SAFETY: dst_ptr is a live device pointer of `dst_len`
164        // bytes (CudaColumn ensures len matches the underlying
165        // memory). cuMemsetD8Async queues on the stream.
166        unsafe {
167            let res = sys::cuMemsetD8Async(dst_ptr, value, dst_len, cu_stream.cu_stream());
168            if res != sys::cudaError_enum::CUDA_SUCCESS {
169                return Err(XlogError::Kernel(format!(
170                    "cuMemsetD8Async (column) failed: {:?}",
171                    res
172                )));
173            }
174        }
175
176        rec.commit(runtime).map_err(|e| {
177            XlogError::Kernel(format!(
178                "memset_column_recorded: launch recorder commit failed: {}",
179                e
180            ))
181        })?;
182        Ok(())
183    }
184}