xlog_cuda/launch.rs
1//! Launch / use recorder for runtime-backed buffers.
2//!
//! Closes the production side of the cross-stream lifetime gap
4//! identified by A4 *and* the use-after-prior-write hazard
5//! discovered by the multi-threaded sort+hash-join regression.
6//! Code that enqueues kernels or copies on a `launch_stream`
7//! other than the buffer's `alloc_stream` MUST tell the runtime
8//! about the use BEFORE the launch (so prior cross-stream waits
9//! can be queued ahead of the work) AND AFTER the launch (so a
10//! use-event is recorded for future readers / writers and for
11//! the eventual deallocate).
12//!
13//! Without preflight, the CUDA mempool is free to reuse the
14//! address while the cross-stream work is still in flight, AND
15//! prior writes / reads on a different stream remain
16//! invisible to the new work — kernels read torn state.
17//!
18//! # Modes
19//!
20//! Two construction modes:
21//!
22//! * [`LaunchRecorder::new_permissive`] — silently skips
23//! buffers that have no runtime-side identity (legacy
24//! cudarc-backed `TrackedCudaSlice`, external `Dlpack` /
25//! `ArrowDevice` columns). Intended for low-level helpers
26//! during the migration window where mixed legacy/runtime
27//! calls are unavoidable. **Not safe for production
28//! migrated paths** — silent skips are silent gaps.
29//!
30//! * [`LaunchRecorder::new_strict`] — rejects any buffer that
31//! cannot be tracked. Intended for production migrated
32//! launch paths: any buffer the recorder cannot attach an
33//! event to is a structural problem the caller must fix
34//! (route the allocation through the runtime, or refuse
35//! external memory in this code path).
36//!
37//! # Preflight + commit
38//!
39//! Production callers split the recorder into TWO phases around
40//! the actual CUDA call:
41//!
42//! 1. Build the recorder, register every buffer the launch
43//! will touch via `read` / `write` / `read_write` /
44//! `read_column` / `write_column` *before* enqueueing any
45//! CUDA work. Fresh output buffers go through the same
46//! `write` / `write_column` API — there is no separate
47//! post-launch path. The recorder snapshots the block id
48//! at record time and immediately drops the slice borrow,
49//! so callers can take `&mut` afterwards.
50//! 2. Call [`LaunchRecorder::preflight`] BEFORE enqueueing
51//! any CUDA work. Preflight verifies the active resource
52//! supports cross-stream tracking and (in strict mode)
53//! that every recorded buffer has a runtime block, then
54//! queues the cross-stream waits required by each
55//! recorded access kind via
56//! [`crate::device_runtime::XlogDeviceRuntime::prepare_block_use`].
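//! On failure the launch itself has not been enqueued yet
//! (cross-stream waits queued for earlier uses may already sit
//! on `launch_stream`; they only order later work on it).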
58//! 3. Enqueue the CUDA call on `launch_stream`.
59//! 4. Call [`LaunchRecorder::commit`] AFTER the launch is
60//! enqueued. Commit calls `finish_block_use` on each
61//! tracked block — the runtime records its event on
62//! `launch_stream` at this point, and that event becomes
63//! part of the block's dependency state for future
64//! readers / writers and the eventual deallocate.
65//!
66//! # Why preflight queues waits, not just validates
67//!
68//! Earlier revisions only validated the resource stack at
69//! preflight and queued waits implicitly via deallocate.
70//! That protected free-after-use but NOT use-after-prior-write
71//! across streams: if sort writes column A on stream X and
72//! join reads column A on stream Y, the join's read kernel
73//! could observe sort's pre-write contents because no event
74//! fenced X→Y. This recorder closes that gap by queuing
75//! `cuStreamWaitEvent` calls in preflight, before the join
76//! kernel is enqueued on Y, against sort's recorded write
77//! event on X.
78//!
79//! # External memory (DLPack, Arrow device)
80//!
81//! Strict mode rejects [`crate::memory::CudaColumn::Dlpack`]
82//! and [`crate::memory::CudaColumn::ArrowDevice`] columns
83//! outright. External memory has no xlog-side runtime identity
84//! — the prepare/finish APIs cannot attach events to a buffer
85//! the runtime did not allocate. Callers that need to consume
86//! external columns must either:
87//! * use a permissive recorder (and accept that no
88//! cross-stream safety applies to those buffers), or
89//! * synchronize externally (e.g., wait on the producing
90//! framework's stream / event before queueing xlog work).
91//!
92//! Permissive mode skips external columns silently, matching
93//! the legacy-buffer policy.
94
95use std::collections::HashMap;
96
97use crate::device_runtime::{
98 Access, BlockId, DeviceBlock, Generation, ResourceError, ResourceResult, StreamId,
99 XlogDeviceRuntime,
100};
101
102/// Recorder construction mode.
103#[derive(Clone, Copy, Debug, Eq, PartialEq)]
104pub enum RecorderMode {
105 /// Silently skip untracked buffers. Acceptable for low-level
106 /// helpers during the migration window; **not** safe for
107 /// production migrated paths.
108 Permissive,
109 /// Reject untracked buffers. Production migrated paths use
110 /// this so silent skips become loud failures.
111 Strict,
112}
113
114/// Records buffer uses for a single launch / copy on
115/// `launch_stream`. Drop without `commit` is a programmer error;
116/// the recorder logs (debug builds only) and never panics.
117///
118/// # Lifetime model
119///
120/// The recorder snapshots each registered block's identity
121/// ([`BlockId`]) at record time and immediately drops the source
122/// slice borrow. The recorder type itself carries no lifetime
123/// parameter, so callers can interleave `rec.read(&buf)` calls
124/// with later `&mut buf` kernel-param borrows freely. The
125/// runtime's generation guard catches misuse where the snapshot
126/// outlives the underlying allocation.
127///
128/// # Required call order for non-empty recorders
129///
130/// `preflight(&runtime)` MUST be called and return `Ok(())`
131/// BEFORE any CUDA work is enqueued, AND BEFORE `commit`.
132/// Preflight queues the cross-stream waits each recorded access
133/// kind requires (read waits on prior writes; write waits on
134/// prior writes AND prior reads), so the launch sees a
135/// well-fenced view of every input. Commit then records the
136/// new event on `launch_stream` so future ops can wait on it.
137///
138/// Empty recorders (no `read`/`write`/... calls) are a no-op
139/// and bypass the preflight requirement: there are no waits
140/// to queue, no events to record.
141pub struct LaunchRecorder {
142 launch_stream: StreamId,
143 mode: RecorderMode,
144 /// Recorded uses, snapshotted from source blocks at record
145 /// time. The recorder holds no slice borrows after the
146 /// record call returns — `&mut` kernel params are free.
147 uses: Vec<RecordedUse>,
148 /// First strict-mode rejection encountered while recording.
149 /// Surfaced from `preflight`; the recorder's record methods
150 /// return `&mut Self` so callers can chain naturally.
151 strict_reject: Option<ResourceError>,
152 /// `true` after a successful `preflight(&runtime)` returns
153 /// `Ok(())`. `commit` rejects non-empty recorders that
154 /// were not preflighted.
155 preflighted: bool,
156 committed: bool,
157}
158
159#[derive(Clone, Copy)]
160struct RecordedUse {
161 block: BlockId,
162 access: Access,
163 /// Site label (e.g., `"read"`, `"write"`, `"read_column"`)
164 /// for diagnostics. Not used at runtime beyond error
165 /// messages.
166 #[allow(dead_code)]
167 label: &'static str,
168}
169
170impl LaunchRecorder {
171 /// Permissive recorder: silently skips untracked buffers.
172 pub fn new_permissive(launch_stream: StreamId) -> Self {
173 Self::new(launch_stream, RecorderMode::Permissive)
174 }
175
176 /// Strict recorder: rejects any untracked buffer.
177 /// Production migrated launch paths use this.
178 pub fn new_strict(launch_stream: StreamId) -> Self {
179 Self::new(launch_stream, RecorderMode::Strict)
180 }
181
182 fn new(launch_stream: StreamId, mode: RecorderMode) -> Self {
183 Self {
184 launch_stream,
185 mode,
186 uses: Vec::new(),
187 strict_reject: None,
188 preflighted: false,
189 committed: false,
190 }
191 }
192
193 /// Configured launch stream.
194 pub fn launch_stream(&self) -> StreamId {
195 self.launch_stream
196 }
197
198 /// Configured mode.
199 pub fn mode(&self) -> RecorderMode {
200 self.mode
201 }
202
203 /// Snapshot a block reference into a recorded use. Reject
204 /// post-preflight additions so the validity check at
205 /// preflight time stays the source of truth.
206 fn note(
207 &mut self,
208 label: &'static str,
209 block: Option<&DeviceBlock>,
210 access: Access,
211 external: bool,
212 ) -> &mut Self {
213 if self.preflighted && self.strict_reject.is_none() {
214 self.strict_reject = Some(ResourceError::StreamMisuse(format!(
215 "LaunchRecorder::{}: recorded after preflight — once preflight \
216 succeeds, the set of uses is frozen so commit-time discoveries \
217 cannot leave unprotected work in flight. Record this use BEFORE \
218 preflight (the recorder is lifetime-free; snapshots release the \
219 source borrow immediately, so kernel-param &mut borrows still \
220 work)",
221 label,
222 )));
223 return self;
224 }
225 if let Some(b) = block {
226 self.uses.push(RecordedUse {
227 block: BlockId::from_block(b),
228 access,
229 label,
230 });
231 return self;
232 }
233 if self.mode == RecorderMode::Strict && self.strict_reject.is_none() {
234 let why = if external {
235 "external (DLPack / ArrowDevice) memory has no runtime identity; \
236 strict launch recorders cannot attach a cross-stream use to it. \
237 Use a permissive recorder OR coordinate the cross-stream \
238 synchronization explicitly outside xlog"
239 } else {
240 "buffer is legacy cudarc-backed (no runtime block); strict launch \
241 recorders require the allocation to be routed through \
242 GpuMemoryManager::with_runtime so a DeviceBlock is available"
243 };
244 self.strict_reject = Some(ResourceError::StreamMisuse(format!(
245 "LaunchRecorder::{}: untracked buffer rejected — {}",
246 label, why
247 )));
248 }
249 self
250 }
251
252 /// Record a runtime-backed [`crate::memory::TrackedCudaSlice`]
253 /// the launch will read.
254 pub fn read<T: cudarc::driver::DeviceRepr>(
255 &mut self,
256 slice: &crate::memory::TrackedCudaSlice<T>,
257 ) -> &mut Self {
258 self.note("read", slice.runtime_block(), Access::Read, false)
259 }
260
261 /// Record a runtime-backed slice the launch will write.
262 /// Use this for both pre-existing buffers being overwritten
263 /// AND for fresh runtime-backed allocations whose lifetime
264 /// began in the same operator. The recorder snapshots block
265 /// identity at record time and drops the borrow, so kernel
266 /// `&mut slice` borrows after preflight are unaffected.
267 pub fn write<T: cudarc::driver::DeviceRepr>(
268 &mut self,
269 slice: &crate::memory::TrackedCudaSlice<T>,
270 ) -> &mut Self {
271 self.note("write", slice.runtime_block(), Access::Write, false)
272 }
273
274 /// Record a runtime-backed slice the launch will both read
275 /// and write.
276 pub fn read_write<T: cudarc::driver::DeviceRepr>(
277 &mut self,
278 slice: &crate::memory::TrackedCudaSlice<T>,
279 ) -> &mut Self {
280 self.note(
281 "read_write",
282 slice.runtime_block(),
283 Access::ReadWrite,
284 false,
285 )
286 }
287
288 /// Record a [`crate::memory::CudaColumn`] the launch will
289 /// read. Owned columns surface their runtime block; external
290 /// (`Dlpack` / `ArrowDevice`) columns are rejected in strict
291 /// mode and silently skipped in permissive mode.
292 pub fn read_column(&mut self, col: &crate::memory::CudaColumn) -> &mut Self {
293 self.note(
294 "read_column",
295 col.runtime_block(),
296 Access::Read,
297 col.is_external(),
298 )
299 }
300
301 /// Record a [`crate::memory::CudaColumn`] the launch will
302 /// write.
303 pub fn write_column(&mut self, col: &crate::memory::CudaColumn) -> &mut Self {
304 self.note(
305 "write_column",
306 col.runtime_block(),
307 Access::Write,
308 col.is_external(),
309 )
310 }
311
312 /// Record a [`crate::provider::RawCudaView`]-style view that
313 /// borrows a region of a runtime-backed allocation. The
314 /// view must carry its source block via `runtime_block()`;
315 /// strict mode rejects views built from legacy / external
316 /// paths.
317 ///
318 /// Public API placeholder for the upcoming filter-class
319 /// migration; no production caller exists yet.
320 #[allow(dead_code)]
321 pub(crate) fn read_view_runtime(&mut self, block: Option<&DeviceBlock>) -> &mut Self {
322 self.note("read_view", block, Access::Read, false)
323 }
324
325 /// Number of recorded runtime-backed uses. Diagnostic.
326 pub fn recorded_count(&self) -> usize {
327 self.uses.len()
328 }
329
330 /// Preflight: validate the recorder is ready to commit
331 /// against `runtime` AND queue every cross-stream wait the
332 /// recorded access kinds require. **Stateful** — sets a flag
333 /// that `commit` checks. MUST be called BEFORE enqueueing
334 /// the CUDA launch / copy. On failure no CUDA work has been
335 /// queued yet, the flag remains unset, and the caller can
336 /// either fix the recorder or abandon the launch.
337 ///
338 /// Verifies (in order):
339 /// * No strict-mode rejection accumulated during recording
340 /// (untracked / external buffer in strict mode, or
341 /// post-preflight `note` attempt).
342 /// * The active resource stack supports cross-stream
343 /// tracking (`runtime.supports_block_use_tracking()`)
344 /// OR the recorder has zero tracked uses (no events to
345 /// record).
346 ///
347 /// Then for each recorded use, calls
348 /// [`XlogDeviceRuntime::prepare_block_use`] which queues
349 /// `cuStreamWaitEvent` calls on `launch_stream` for any
350 /// prior write (read access) or any prior write + prior
351 /// reads (write / read-write access) on a different stream.
352 /// Same-stream events are skipped — already ordered.
353 ///
354 /// Repeated registrations of the same block in the same
355 /// recorder are deduplicated to a single prepare call (the
356 /// strongest access kind wins): `read` + `write` of the
357 /// same block becomes one `Access::ReadWrite` prepare.
358 pub fn preflight(&mut self, runtime: &XlogDeviceRuntime) -> ResourceResult<()> {
359 if let Some(err) = &self.strict_reject {
360 // Surface the captured strict-mode rejection
361 // verbatim. Do NOT mark preflighted.
362 return Err(ResourceError::StreamMisuse(format!("{}", err)));
363 }
364 if !self.uses.is_empty() && !runtime.supports_block_use_tracking() {
365 return Err(ResourceError::StreamMisuse(
366 "LaunchRecorder::preflight: active resource does not support \
367 cross-stream use tracking. Build the runtime around \
368 AsyncCudaResource (or a decorator stack over it) for \
369 stream-lifetime-safe launches"
370 .to_string(),
371 ));
372 }
373
374 let deduped = dedup_uses(&self.uses);
375 for use_ in &deduped {
376 runtime.prepare_block_use(use_.block, self.launch_stream, use_.access)?;
377 }
378
379 self.preflighted = true;
380 Ok(())
381 }
382
383 /// Commit the recorded uses to the runtime. MUST be called
384 /// AFTER preflight succeeded AND the CUDA launch has been
385 /// enqueued on `launch_stream`.
386 ///
387 /// **Non-empty recorders that were not preflighted are
388 /// rejected** with `StreamMisuse`. This closes the footgun
389 /// where a caller could enqueue CUDA work, then call
390 /// commit, then discover at commit-time that the active
391 /// resource is unsupported — leaving unprotected work in
392 /// flight. Production migrated launch paths must therefore
393 /// always preflight BEFORE the CUDA call.
394 ///
395 /// Empty recorders (no recorded uses) bypass the check:
396 /// nothing to record, no events to fire, no contract to
397 /// honor.
398 ///
399 /// For each recorded use, calls
400 /// [`XlogDeviceRuntime::finish_block_use`] which records an
401 /// event on `launch_stream` and folds it into the block's
402 /// dependency state (writers replace `last_write` and clear
403 /// `outstanding_reads`; readers append to
404 /// `outstanding_reads`). Repeated registrations of the same
405 /// block are deduplicated identically to preflight.
406 pub fn commit(mut self, runtime: &XlogDeviceRuntime) -> ResourceResult<()> {
407 // Re-check any strict reject that may have accumulated
408 // — preflight may not have been called, or may not have
409 // surfaced this particular path. (Same string as
410 // preflight would produce.)
411 if let Some(err) = self.strict_reject.take() {
412 return Err(err);
413 }
414 if !self.uses.is_empty() && !self.preflighted {
415 return Err(ResourceError::StreamMisuse(
416 "LaunchRecorder::commit: non-empty recorder reached commit without \
417 a successful preflight. The caller MUST call preflight(&runtime) \
418 BEFORE enqueueing CUDA work; otherwise commit-time failures leave \
419 unprotected work in flight. See the preflight + commit contract \
420 in the LaunchRecorder doc"
421 .to_string(),
422 ));
423 }
424
425 let deduped = dedup_uses(&self.uses);
426 for use_ in &deduped {
427 runtime.finish_block_use(use_.block, self.launch_stream, use_.access)?;
428 }
429 self.committed = true;
430 Ok(())
431 }
432}
433
434/// Collapse multiple registrations of the same block into one
435/// use with the strongest access.
436///
437/// The dedup key is `(ptr, generation, device_ordinal)` — NOT
438/// `ptr` alone. ABA reuse inside a single recorder is rare but
439/// possible (record use of buffer X, drop X, allocate a new
440/// block reusing X's address, record THAT) and a ptr-only key
441/// would incorrectly merge those two distinct uses. Keying by
442/// the full identity tuple lets the prepare/finish path see
443/// each generation's events independently; the runtime's
444/// generation guard then catches any stale id at the resource
445/// boundary.
446///
447/// Access combine: Read+Write → ReadWrite; otherwise the
448/// strongest of the two operands wins.
449fn dedup_uses(uses: &[RecordedUse]) -> Vec<RecordedUse> {
450 let mut by_id: HashMap<(u64, Generation, u32), usize> = HashMap::with_capacity(uses.len());
451 let mut deduped: Vec<RecordedUse> = Vec::with_capacity(uses.len());
452 for use_ in uses {
453 let key = (
454 use_.block.ptr,
455 use_.block.generation,
456 use_.block.device_ordinal,
457 );
458 match by_id.get(&key) {
459 Some(&idx) => {
460 deduped[idx].access = combine_access(deduped[idx].access, use_.access);
461 }
462 None => {
463 by_id.insert(key, deduped.len());
464 deduped.push(*use_);
465 }
466 }
467 }
468 deduped
469}
470
471/// Strongest-access lattice: ReadWrite >= Write/Read; Write+Read = ReadWrite.
472fn combine_access(a: Access, b: Access) -> Access {
473 match (a, b) {
474 (Access::ReadWrite, _) | (_, Access::ReadWrite) => Access::ReadWrite,
475 (Access::Read, Access::Write) | (Access::Write, Access::Read) => Access::ReadWrite,
476 (Access::Read, Access::Read) => Access::Read,
477 (Access::Write, Access::Write) => Access::Write,
478 }
479}
480
481impl Drop for LaunchRecorder {
482 fn drop(&mut self) {
483 if !self.committed && !self.uses.is_empty() {
484 #[cfg(debug_assertions)]
485 eprintln!(
486 "[xlog_cuda::launch] LaunchRecorder dropped without commit: \
487 {} uses on launch_stream={} (mode={:?}) were NOT recorded; \
488 cross-stream lifetime safety lost for this launch",
489 self.uses.len(),
490 self.launch_stream.0,
491 self.mode,
492 );
493 }
494 }
495}
496
497#[cfg(test)]
498mod tests {
499 use super::*;
500 use crate::device_runtime::{
501 AsyncCudaResource, DeviceMemoryResource, DirectCudaResource, StreamPool,
502 };
503 use crate::CudaDevice;
504 use std::sync::Arc;
505 use xlog_core::MemoryBudget;
506
507 fn try_async_runtime() -> Option<(Arc<CudaDevice>, Arc<XlogDeviceRuntime>, StreamId)> {
508 let device = Arc::new(CudaDevice::new(0).ok()?);
509 let pool = Arc::new(StreamPool::with_defaults(Arc::clone(&device)));
510 let async_resource: Box<dyn DeviceMemoryResource + Send + Sync> = Box::new(
511 AsyncCudaResource::new(Arc::clone(&device), 0, Arc::clone(&pool)),
512 );
513 let runtime = Arc::new(XlogDeviceRuntime::with_resource(
514 Arc::clone(&device),
515 0,
516 Arc::clone(&pool),
517 async_resource,
518 ));
519 let launch_stream = pool.acquire().ok()?;
520 Some((device, runtime, launch_stream))
521 }
522
523 fn try_direct_runtime() -> Option<(Arc<CudaDevice>, Arc<XlogDeviceRuntime>, StreamId)> {
524 let device = Arc::new(CudaDevice::new(0).ok()?);
525 let pool = Arc::new(StreamPool::with_defaults(Arc::clone(&device)));
526 let direct: Box<dyn DeviceMemoryResource + Send + Sync> =
527 Box::new(DirectCudaResource::new(Arc::clone(&device), 0));
528 let runtime = Arc::new(XlogDeviceRuntime::with_resource(
529 Arc::clone(&device),
530 0,
531 Arc::clone(&pool),
532 direct,
533 ));
534 Some((device, runtime, StreamId::DEFAULT))
535 }
536
537 #[test]
538 fn empty_commit_is_ok_in_both_modes() {
539 let Some((_d, rt, ls)) = try_async_runtime() else {
540 return;
541 };
542 LaunchRecorder::new_permissive(ls)
543 .commit(&rt)
544 .expect("permissive empty");
545 LaunchRecorder::new_strict(ls)
546 .commit(&rt)
547 .expect("strict empty");
548 }
549
550 #[test]
551 fn permissive_skips_legacy_silently() {
552 let Some(device) = CudaDevice::new(0).ok().map(Arc::new) else {
553 return;
554 };
555 let pool = Arc::new(StreamPool::with_defaults(Arc::clone(&device)));
556 let async_resource: Box<dyn DeviceMemoryResource + Send + Sync> = Box::new(
557 AsyncCudaResource::new(Arc::clone(&device), 0, Arc::clone(&pool)),
558 );
559 let runtime = Arc::new(XlogDeviceRuntime::with_resource(
560 Arc::clone(&device),
561 0,
562 Arc::clone(&pool),
563 async_resource,
564 ));
565 let launch_stream = pool.acquire().expect("acquire");
566
567 // Legacy manager — no runtime — produces None block.
568 let manager = Arc::new(crate::GpuMemoryManager::new(
569 Arc::clone(&device),
570 MemoryBudget::with_limit(1024 * 1024),
571 ));
572 let legacy = manager.alloc::<u8>(64).expect("legacy alloc");
573 assert!(legacy.runtime_block().is_none());
574
575 let mut rec = LaunchRecorder::new_permissive(launch_stream);
576 rec.read(&legacy);
577 assert_eq!(rec.recorded_count(), 0);
578 rec.preflight(&runtime).expect("permissive preflight");
579 rec.commit(&runtime).expect("permissive commit");
580 }
581
582 #[test]
583 fn strict_rejects_legacy_at_preflight() {
584 let Some((device, runtime, launch_stream)) = try_async_runtime() else {
585 return;
586 };
587 let manager = Arc::new(crate::GpuMemoryManager::new(
588 Arc::clone(&device),
589 MemoryBudget::with_limit(1024 * 1024),
590 ));
591 let legacy = manager.alloc::<u8>(64).expect("legacy alloc");
592
593 let mut rec = LaunchRecorder::new_strict(launch_stream);
594 rec.read(&legacy);
595 let err = rec.preflight(&runtime);
596 match err {
597 Err(ResourceError::StreamMisuse(msg)) => {
598 assert!(msg.contains("untracked buffer rejected"), "msg: {}", msg);
599 }
600 other => panic!(
601 "strict mode must reject untracked buffer at preflight; got {:?}",
602 other
603 ),
604 }
605 }
606
607 #[test]
608 fn preflight_rejects_direct_runtime_before_enqueue() {
609 let Some((device, runtime, launch_stream)) = try_direct_runtime() else {
610 return;
611 };
612 let manager = Arc::new(crate::GpuMemoryManager::with_runtime(
613 Arc::clone(&device),
614 MemoryBudget::with_limit(1024 * 1024),
615 Arc::clone(&runtime),
616 ));
617 let buf = manager.alloc::<u8>(64).expect("alloc");
618 assert!(buf.runtime_block().is_some());
619
620 let mut rec = LaunchRecorder::new_strict(launch_stream);
621 rec.read(&buf);
622 let err = rec.preflight(&runtime);
623 match err {
624 Err(ResourceError::StreamMisuse(msg)) => {
625 assert!(
626 msg.contains("does not support cross-stream use tracking"),
627 "msg: {}",
628 msg
629 );
630 }
631 other => panic!(
632 "preflight must reject Direct-backed runtime before enqueue; got {:?}",
633 other
634 ),
635 }
636 }
637
638 #[test]
639 fn preflight_then_commit_async_runtime() {
640 let Some((device, runtime, launch_stream)) = try_async_runtime() else {
641 return;
642 };
643 let manager = Arc::new(crate::GpuMemoryManager::with_runtime(
644 Arc::clone(&device),
645 MemoryBudget::with_limit(1024 * 1024),
646 Arc::clone(&runtime),
647 ));
648 let buf = manager.alloc::<u8>(64).expect("alloc");
649
650 let mut rec = LaunchRecorder::new_strict(launch_stream);
651 rec.read(&buf);
652 rec.preflight(&runtime).expect("preflight ok");
653 // (in production: enqueue CUDA launch here)
654 rec.commit(&runtime).expect("commit ok");
655 }
656
657 #[test]
658 fn commit_rejects_un_preflighted_strict_recorder() {
659 let Some((device, runtime, launch_stream)) = try_async_runtime() else {
660 return;
661 };
662 let manager = Arc::new(crate::GpuMemoryManager::with_runtime(
663 Arc::clone(&device),
664 MemoryBudget::with_limit(1024 * 1024),
665 Arc::clone(&runtime),
666 ));
667 let buf = manager.alloc::<u8>(64).expect("alloc");
668
669 let mut rec = LaunchRecorder::new_strict(launch_stream);
670 rec.read(&buf);
671 let err = rec.commit(&runtime);
672 match err {
673 Err(ResourceError::StreamMisuse(msg)) => {
674 assert!(
675 msg.contains("without a successful preflight"),
676 "msg: {}",
677 msg
678 );
679 }
680 other => panic!(
681 "non-empty un-preflighted commit must return StreamMisuse, got {:?}",
682 other
683 ),
684 }
685 }
686
687 #[test]
688 fn empty_recorder_commit_without_preflight_is_ok() {
689 let Some((_d, rt, ls)) = try_async_runtime() else {
690 return;
691 };
692 LaunchRecorder::new_strict(ls)
693 .commit(&rt)
694 .expect("empty strict commit without preflight");
695 }
696
697 #[test]
698 fn note_after_preflight_via_standard_method_is_rejected() {
699 let Some((device, runtime, launch_stream)) = try_async_runtime() else {
700 return;
701 };
702 let manager = Arc::new(crate::GpuMemoryManager::with_runtime(
703 Arc::clone(&device),
704 MemoryBudget::with_limit(1024 * 1024),
705 Arc::clone(&runtime),
706 ));
707 let buf_a = manager.alloc::<u8>(64).expect("alloc a");
708 let buf_b = manager.alloc::<u8>(64).expect("alloc b");
709
710 let mut rec = LaunchRecorder::new_strict(launch_stream);
711 rec.read(&buf_a);
712 rec.preflight(&runtime).expect("preflight ok");
713 rec.read(&buf_b);
714 let err = rec.commit(&runtime);
715 match err {
716 Err(ResourceError::StreamMisuse(msg)) => {
717 assert!(msg.contains("recorded after preflight"), "msg: {}", msg);
718 }
719 other => panic!(
720 "post-preflight standard-method record must be rejected; got {:?}",
721 other
722 ),
723 }
724 }
725
726 /// Pre-launch fresh-write path: fresh outputs are recorded
727 /// BEFORE preflight via the regular `write` API. Snapshot
728 /// drops the source borrow, so kernel `&mut` borrows after
729 /// preflight remain valid.
730 #[test]
731 fn pre_preflight_fresh_write_is_accepted() {
732 let Some((device, runtime, launch_stream)) = try_async_runtime() else {
733 return;
734 };
735 let manager = Arc::new(crate::GpuMemoryManager::with_runtime(
736 Arc::clone(&device),
737 MemoryBudget::with_limit(1024 * 1024),
738 Arc::clone(&runtime),
739 ));
740 let buf_a = manager.alloc::<u8>(64).expect("alloc a");
741 let mut buf_fresh = manager.alloc::<u8>(64).expect("alloc fresh");
742
743 let mut rec = LaunchRecorder::new_strict(launch_stream);
744 rec.read(&buf_a);
745 rec.write(&buf_fresh);
746 rec.preflight(&runtime).expect("preflight ok");
747 // Borrows are released; kernel-style &mut works here.
748 let _kernel_param = &mut buf_fresh;
749 rec.commit(&runtime).expect("commit ok");
750 }
751
752 /// Read+write of the same block in a single recorder
753 /// dedupes to a single ReadWrite prepare/finish call.
754 #[test]
755 fn read_then_write_same_block_dedupes_to_read_write() {
756 let Some((device, runtime, launch_stream)) = try_async_runtime() else {
757 return;
758 };
759 let manager = Arc::new(crate::GpuMemoryManager::with_runtime(
760 Arc::clone(&device),
761 MemoryBudget::with_limit(1024 * 1024),
762 Arc::clone(&runtime),
763 ));
764 let buf = manager.alloc::<u8>(64).expect("alloc");
765
766 let mut rec = LaunchRecorder::new_strict(launch_stream);
767 rec.read(&buf);
768 rec.write(&buf);
769 rec.preflight(&runtime).expect("preflight");
770 rec.commit(&runtime).expect("commit");
771 }
772
773 /// Locks the dedup key: `(ptr, generation, device_ordinal)`,
774 /// not `ptr` alone. Two `RecordedUse`s sharing a ptr but
775 /// differing in generation MUST be treated as distinct
776 /// entries — otherwise an ABA reuse inside a single recorder
777 /// would silently collapse an event for the new allocation
778 /// onto the old block's prepare/finish chain.
779 #[test]
780 fn dedup_keys_on_full_block_id_not_ptr_alone() {
781 // Construct two RecordedUses with the same ptr but
782 // distinct generations — directly drive `dedup_uses` so
783 // the test is deterministic and does not require ABA to
784 // actually occur on real CUDA.
785 let block_a = BlockId {
786 ptr: 0xdead_beef,
787 generation: Generation(1),
788 alloc_stream: StreamId::DEFAULT,
789 device_ordinal: 0,
790 };
791 let block_b = BlockId {
792 ptr: 0xdead_beef,
793 generation: Generation(2),
794 alloc_stream: StreamId::DEFAULT,
795 device_ordinal: 0,
796 };
797 let uses = vec![
798 RecordedUse {
799 block: block_a,
800 access: Access::Read,
801 label: "read",
802 },
803 RecordedUse {
804 block: block_b,
805 access: Access::Write,
806 label: "write",
807 },
808 ];
809 let deduped = dedup_uses(&uses);
810 assert_eq!(deduped.len(), 2, "ABA generations must NOT collapse");
811 assert_eq!(deduped[0].block.generation, Generation(1));
812 assert_eq!(deduped[0].access, Access::Read);
813 assert_eq!(deduped[1].block.generation, Generation(2));
814 assert_eq!(deduped[1].access, Access::Write);
815
816 // Same ptr + same generation + duplicate access must
817 // collapse into one entry with combined access.
818 let same_id = vec![
819 RecordedUse {
820 block: block_a,
821 access: Access::Read,
822 label: "read",
823 },
824 RecordedUse {
825 block: block_a,
826 access: Access::Write,
827 label: "write",
828 },
829 ];
830 let collapsed = dedup_uses(&same_id);
831 assert_eq!(collapsed.len(), 1);
832 assert_eq!(collapsed[0].access, Access::ReadWrite);
833 }
834
835 #[test]
836 fn read_column_owned_runtime_backed() {
837 use crate::memory::CudaColumn;
838 let Some((device, runtime, launch_stream)) = try_async_runtime() else {
839 return;
840 };
841 let manager = Arc::new(crate::GpuMemoryManager::with_runtime(
842 Arc::clone(&device),
843 MemoryBudget::with_limit(1024 * 1024),
844 Arc::clone(&runtime),
845 ));
846 let slice = manager.alloc::<u8>(64).expect("alloc");
847 let col = CudaColumn::owned(slice);
848 assert!(col.runtime_block().is_some());
849
850 let mut rec = LaunchRecorder::new_strict(launch_stream);
851 rec.read_column(&col);
852 assert_eq!(rec.recorded_count(), 1);
853 rec.preflight(&runtime).expect("preflight");
854 rec.commit(&runtime).expect("commit");
855 }
856}