xlog_cuda/provider/launch_safe.rs
1//! First-slice migrated launch path through the v0.6 launch
2//! recorder.
3//!
4//! Adds a single, narrow method
5//! [`CudaKernelProvider::memset_recorded`] that performs an
6//! async memset on a caller-supplied launch stream **and**
7//! records the buffer use against the runtime via the
8//! [`crate::launch::LaunchRecorder`]. This is intentionally the
9//! simplest possible "real launch" path:
10//! * one buffer arg (a write),
11//! * one CUDA call (`cuMemsetD8Async`),
12//! * a launch_stream that can differ from the buffer's
13//! `alloc_stream`,
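//! * a strict `preflight` step that surfaces
//!   `ResourceError::StreamMisuse` from the runtime *before* any
//!   CUDA work is queued if the active resource cannot track
//!   cross-stream uses, followed by an explicit `commit` step
//!   that records the use once the memset has been enqueued.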
17//!
18//! No existing operator is modified by this slice. Existing
19//! kernel launches (filter, compact, dedup, hash join, etc.)
20//! continue to use raw `Vec<*mut c_void>` and remain unsafe by
21//! themselves. They will be migrated in follow-up commits once
22//! this minimal path is certified.
23
24use cudarc::driver::sys;
25use xlog_core::{Result, XlogError};
26
27use crate::device_runtime::StreamId;
28use crate::launch::LaunchRecorder;
29use crate::memory::{CudaColumn, TrackedCudaSlice};
30
31impl super::CudaKernelProvider {
32 /// Async memset of `value` into every byte of `dst` on
33 /// `launch_stream`, then record the use against the
34 /// runtime.
35 ///
36 /// Requires the provider's `GpuMemoryManager` to be built
37 /// via [`crate::GpuMemoryManager::with_runtime`] (so
38 /// `dst.runtime_block()` is `Some` and the runtime is
39 /// reachable). On a legacy/no-runtime manager, returns
40 /// [`XlogError::Kernel`].
41 ///
42 /// # Errors
43 /// * `XlogError::Kernel("memset_recorded requires
44 /// runtime-backed manager")` if the manager has no
45 /// runtime attached.
46 /// * `XlogError::Kernel` from
47 /// `cuMemsetD8Async`/stream-resolution failure.
48 /// * `XlogError::Kernel` wrapping any
49 /// `ResourceError::StreamMisuse` from the recorder's
50 /// commit (notably when the active resource is
51 /// `DirectCudaResource` — the trait default that
52 /// intentionally rejects `record_block_use`).
53 pub fn memset_recorded(
54 &self,
55 dst: &mut TrackedCudaSlice<u8>,
56 value: u8,
57 launch_stream: StreamId,
58 ) -> Result<()> {
59 let runtime = self.memory().runtime().ok_or_else(|| {
60 XlogError::Kernel(
61 "memset_recorded requires a runtime-backed GpuMemoryManager \
62 (constructed via with_runtime)"
63 .to_string(),
64 )
65 })?;
66 let pool = runtime.stream_pool();
67 let cu_stream = pool.resolve(launch_stream).ok_or_else(|| {
68 XlogError::Kernel(format!(
69 "memset_recorded: launch_stream StreamId({}) does not resolve",
70 launch_stream.0
71 ))
72 })?;
73
74 // Capture identity bits before borrowing dst into the
75 // recorder. `cuMemsetD8Async` writes to `dst.device_ptr`
76 // for `dst.len()` bytes (T = u8 so len == byte count).
77 let dst_ptr = dst.device_ptr_value();
78 let dst_len = dst.len();
79
80 // STRICT recorder + PREFLIGHT before queueing CUDA work.
81 // If the active resource cannot track cross-stream uses,
82 // or if dst is not actually runtime-backed, preflight
83 // surfaces the failure here and we never enqueue the
84 // memset. Without preflight, `commit` would discover the
85 // failure only after the memset is in flight, leaving
86 // unprotected work hanging.
87 let mut rec = LaunchRecorder::new_strict(launch_stream);
88 rec.write(dst);
89 rec.preflight(runtime).map_err(|e| {
90 XlogError::Kernel(format!(
91 "memset_recorded: launch recorder preflight failed: {}",
92 e
93 ))
94 })?;
95
96 // SAFETY: dst_ptr is a live device pointer, the buffer
97 // owns `dst_len` bytes (verified by the slice's len),
98 // and cu_stream is a valid CUDA stream the runtime
99 // owns. cuMemsetD8Async is genuinely
100 // stream-asynchronous: it queues on the stream and
101 // returns immediately. We get here only if preflight
102 // succeeded.
103 unsafe {
104 let res = sys::cuMemsetD8Async(dst_ptr, value, dst_len, cu_stream.cu_stream());
105 if res != sys::cudaError_enum::CUDA_SUCCESS {
106 return Err(XlogError::Kernel(format!(
107 "cuMemsetD8Async failed: {:?}",
108 res
109 )));
110 }
111 }
112
113 // Record the use AFTER the launch is queued so the
114 // event fires when the memset completes. Preflight
115 // already validated the path; commit should only fail
116 // on transient driver errors at event-record time.
117 rec.commit(runtime).map_err(|e| {
118 XlogError::Kernel(format!(
119 "memset_recorded: launch recorder commit failed: {}",
120 e
121 ))
122 })?;
123 Ok(())
124 }
125
126 /// Column-level variant of [`Self::memset_recorded`] —
127 /// exercises the `LaunchRecorder::write_column` path. Used
128 /// by tests that prove `CudaColumn::Owned` records its
129 /// runtime block automatically; strict mode rejects
130 /// `CudaColumn::Dlpack` / `CudaColumn::ArrowDevice` at
131 /// preflight (no CUDA work queued).
132 pub fn memset_column_recorded(
133 &self,
134 dst: &mut CudaColumn,
135 value: u8,
136 launch_stream: StreamId,
137 ) -> Result<()> {
138 let runtime = self.memory().runtime().ok_or_else(|| {
139 XlogError::Kernel(
140 "memset_column_recorded requires a runtime-backed GpuMemoryManager".to_string(),
141 )
142 })?;
143 let pool = runtime.stream_pool();
144 let cu_stream = pool.resolve(launch_stream).ok_or_else(|| {
145 XlogError::Kernel(format!(
146 "memset_column_recorded: launch_stream StreamId({}) does not resolve",
147 launch_stream.0
148 ))
149 })?;
150
151 let dst_ptr = *dst.device_ptr();
152 let dst_len = <CudaColumn as cudarc::driver::DeviceSlice<u8>>::len(dst);
153
154 let mut rec = LaunchRecorder::new_strict(launch_stream);
155 rec.write_column(dst);
156 rec.preflight(runtime).map_err(|e| {
157 XlogError::Kernel(format!(
158 "memset_column_recorded: launch recorder preflight failed: {}",
159 e
160 ))
161 })?;
162
163 // SAFETY: dst_ptr is a live device pointer of `dst_len`
164 // bytes (CudaColumn ensures len matches the underlying
165 // memory). cuMemsetD8Async queues on the stream.
166 unsafe {
167 let res = sys::cuMemsetD8Async(dst_ptr, value, dst_len, cu_stream.cu_stream());
168 if res != sys::cudaError_enum::CUDA_SUCCESS {
169 return Err(XlogError::Kernel(format!(
170 "cuMemsetD8Async (column) failed: {:?}",
171 res
172 )));
173 }
174 }
175
176 rec.commit(runtime).map_err(|e| {
177 XlogError::Kernel(format!(
178 "memset_column_recorded: launch recorder commit failed: {}",
179 e
180 ))
181 })?;
182 Ok(())
183 }
184}