Skip to main content

xlog_cuda/
launch.rs

1//! Launch / use recorder for runtime-backed buffers.
2//!
3//! Closes the production-side of the cross-stream lifetime gap
4//! identified by A4 *and* the use-after-prior-write hazard
5//! discovered by the multi-threaded sort+hash-join regression.
6//! Code that enqueues kernels or copies on a `launch_stream`
7//! other than the buffer's `alloc_stream` MUST tell the runtime
8//! about the use BEFORE the launch (so prior cross-stream waits
9//! can be queued ahead of the work) AND AFTER the launch (so a
10//! use-event is recorded for future readers / writers and for
11//! the eventual deallocate).
12//!
13//! Without preflight, the CUDA mempool is free to reuse the
14//! address while the cross-stream work is still in flight, AND
15//! prior writes / reads on a different stream remain
16//! invisible to the new work — kernels read torn state.
17//!
18//! # Modes
19//!
20//! Two construction modes:
21//!
22//!   * [`LaunchRecorder::new_permissive`] — silently skips
23//!     buffers that have no runtime-side identity (legacy
24//!     cudarc-backed `TrackedCudaSlice`, external `Dlpack` /
25//!     `ArrowDevice` columns). Intended for low-level helpers
26//!     during the migration window where mixed legacy/runtime
27//!     calls are unavoidable. **Not safe for production
28//!     migrated paths** — silent skips are silent gaps.
29//!
30//!   * [`LaunchRecorder::new_strict`] — rejects any buffer that
31//!     cannot be tracked. Intended for production migrated
32//!     launch paths: any buffer the recorder cannot attach an
33//!     event to is a structural problem the caller must fix
34//!     (route the allocation through the runtime, or refuse
35//!     external memory in this code path).
36//!
37//! # Preflight + commit
38//!
39//! Production callers split the recorder into TWO phases around
40//! the actual CUDA call:
41//!
42//!   1. Build the recorder, register every buffer the launch
43//!      will touch via `read` / `write` / `read_write` /
44//!      `read_column` / `write_column` *before* enqueueing any
45//!      CUDA work. Fresh output buffers go through the same
46//!      `write` / `write_column` API — there is no separate
47//!      post-launch path. The recorder snapshots the block id
48//!      at record time and immediately drops the slice borrow,
49//!      so callers can take `&mut` afterwards.
50//!   2. Call [`LaunchRecorder::preflight`] BEFORE enqueueing
51//!      any CUDA work. Preflight verifies the active resource
52//!      supports cross-stream tracking and (in strict mode)
53//!      that every recorded buffer has a runtime block, then
54//!      queues the cross-stream waits required by each
55//!      recorded access kind via
56//!      [`crate::device_runtime::XlogDeviceRuntime::prepare_block_use`].
57//!      On failure no CUDA work has been queued yet.
58//!   3. Enqueue the CUDA call on `launch_stream`.
59//!   4. Call [`LaunchRecorder::commit`] AFTER the launch is
60//!      enqueued. Commit calls `finish_block_use` on each
61//!      tracked block — the runtime records its event on
62//!      `launch_stream` at this point, and that event becomes
63//!      part of the block's dependency state for future
64//!      readers / writers and the eventual deallocate.
65//!
66//! # Why preflight queues waits, not just validates
67//!
68//! Earlier revisions only validated the resource stack at
69//! preflight and queued waits implicitly via deallocate.
70//! That protected free-after-use but NOT use-after-prior-write
71//! across streams: if sort writes column A on stream X and
72//! join reads column A on stream Y, the join's read kernel
73//! could observe sort's pre-write contents because no event
74//! fenced X→Y. This recorder closes that gap by queuing
75//! `cuStreamWaitEvent` calls in preflight, before the join
76//! kernel is enqueued on Y, against sort's recorded write
77//! event on X.
78//!
79//! # External memory (DLPack, Arrow device)
80//!
81//! Strict mode rejects [`crate::memory::CudaColumn::Dlpack`]
82//! and [`crate::memory::CudaColumn::ArrowDevice`] columns
83//! outright. External memory has no xlog-side runtime identity
84//! — the prepare/finish APIs cannot attach events to a buffer
85//! the runtime did not allocate. Callers that need to consume
86//! external columns must either:
87//!   * use a permissive recorder (and accept that no
88//!     cross-stream safety applies to those buffers), or
89//!   * synchronize externally (e.g., wait on the producing
90//!     framework's stream / event before queueing xlog work).
91//!
92//! Permissive mode skips external columns silently, matching
93//! the legacy-buffer policy.
94
95use std::collections::HashMap;
96
97use crate::device_runtime::{
98    Access, BlockId, DeviceBlock, Generation, ResourceError, ResourceResult, StreamId,
99    XlogDeviceRuntime,
100};
101
/// Policy a [`LaunchRecorder`] applies to buffers it cannot track.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecorderMode {
    /// Untracked buffers are skipped without raising an error. Tolerable
    /// for low-level helpers while the migration is in progress; **not**
    /// safe for production migrated paths.
    Permissive,
    /// Untracked buffers are rejected, turning what would be a silent
    /// skip into a loud failure. Production migrated paths use this.
    Strict,
}
113
114/// Records buffer uses for a single launch / copy on
115/// `launch_stream`. Drop without `commit` is a programmer error;
116/// the recorder logs (debug builds only) and never panics.
117///
118/// # Lifetime model
119///
120/// The recorder snapshots each registered block's identity
121/// ([`BlockId`]) at record time and immediately drops the source
122/// slice borrow. The recorder type itself carries no lifetime
123/// parameter, so callers can interleave `rec.read(&buf)` calls
124/// with later `&mut buf` kernel-param borrows freely. The
125/// runtime's generation guard catches misuse where the snapshot
126/// outlives the underlying allocation.
127///
128/// # Required call order for non-empty recorders
129///
130/// `preflight(&runtime)` MUST be called and return `Ok(())`
131/// BEFORE any CUDA work is enqueued, AND BEFORE `commit`.
132/// Preflight queues the cross-stream waits each recorded access
133/// kind requires (read waits on prior writes; write waits on
134/// prior writes AND prior reads), so the launch sees a
135/// well-fenced view of every input. Commit then records the
136/// new event on `launch_stream` so future ops can wait on it.
137///
138/// Empty recorders (no `read`/`write`/... calls) are a no-op
139/// and bypass the preflight requirement: there are no waits
140/// to queue, no events to record.
141pub struct LaunchRecorder {
142    launch_stream: StreamId,
143    mode: RecorderMode,
144    /// Recorded uses, snapshotted from source blocks at record
145    /// time. The recorder holds no slice borrows after the
146    /// record call returns — `&mut` kernel params are free.
147    uses: Vec<RecordedUse>,
148    /// First strict-mode rejection encountered while recording.
149    /// Surfaced from `preflight`; the recorder's record methods
150    /// return `&mut Self` so callers can chain naturally.
151    strict_reject: Option<ResourceError>,
152    /// `true` after a successful `preflight(&runtime)` returns
153    /// `Ok(())`. `commit` rejects non-empty recorders that
154    /// were not preflighted.
155    preflighted: bool,
156    committed: bool,
157}
158
159#[derive(Clone, Copy)]
160struct RecordedUse {
161    block: BlockId,
162    access: Access,
163    /// Site label (e.g., `"read"`, `"write"`, `"read_column"`)
164    /// for diagnostics. Not used at runtime beyond error
165    /// messages.
166    #[allow(dead_code)]
167    label: &'static str,
168}
169
170impl LaunchRecorder {
171    /// Permissive recorder: silently skips untracked buffers.
172    pub fn new_permissive(launch_stream: StreamId) -> Self {
173        Self::new(launch_stream, RecorderMode::Permissive)
174    }
175
176    /// Strict recorder: rejects any untracked buffer.
177    /// Production migrated launch paths use this.
178    pub fn new_strict(launch_stream: StreamId) -> Self {
179        Self::new(launch_stream, RecorderMode::Strict)
180    }
181
182    fn new(launch_stream: StreamId, mode: RecorderMode) -> Self {
183        Self {
184            launch_stream,
185            mode,
186            uses: Vec::new(),
187            strict_reject: None,
188            preflighted: false,
189            committed: false,
190        }
191    }
192
193    /// Configured launch stream.
194    pub fn launch_stream(&self) -> StreamId {
195        self.launch_stream
196    }
197
198    /// Configured mode.
199    pub fn mode(&self) -> RecorderMode {
200        self.mode
201    }
202
203    /// Snapshot a block reference into a recorded use. Reject
204    /// post-preflight additions so the validity check at
205    /// preflight time stays the source of truth.
206    fn note(
207        &mut self,
208        label: &'static str,
209        block: Option<&DeviceBlock>,
210        access: Access,
211        external: bool,
212    ) -> &mut Self {
213        if self.preflighted && self.strict_reject.is_none() {
214            self.strict_reject = Some(ResourceError::StreamMisuse(format!(
215                "LaunchRecorder::{}: recorded after preflight — once preflight \
216                 succeeds, the set of uses is frozen so commit-time discoveries \
217                 cannot leave unprotected work in flight. Record this use BEFORE \
218                 preflight (the recorder is lifetime-free; snapshots release the \
219                 source borrow immediately, so kernel-param &mut borrows still \
220                 work)",
221                label,
222            )));
223            return self;
224        }
225        if let Some(b) = block {
226            self.uses.push(RecordedUse {
227                block: BlockId::from_block(b),
228                access,
229                label,
230            });
231            return self;
232        }
233        if self.mode == RecorderMode::Strict && self.strict_reject.is_none() {
234            let why = if external {
235                "external (DLPack / ArrowDevice) memory has no runtime identity; \
236                 strict launch recorders cannot attach a cross-stream use to it. \
237                 Use a permissive recorder OR coordinate the cross-stream \
238                 synchronization explicitly outside xlog"
239            } else {
240                "buffer is legacy cudarc-backed (no runtime block); strict launch \
241                 recorders require the allocation to be routed through \
242                 GpuMemoryManager::with_runtime so a DeviceBlock is available"
243            };
244            self.strict_reject = Some(ResourceError::StreamMisuse(format!(
245                "LaunchRecorder::{}: untracked buffer rejected — {}",
246                label, why
247            )));
248        }
249        self
250    }
251
252    /// Record a runtime-backed [`crate::memory::TrackedCudaSlice`]
253    /// the launch will read.
254    pub fn read<T: cudarc::driver::DeviceRepr>(
255        &mut self,
256        slice: &crate::memory::TrackedCudaSlice<T>,
257    ) -> &mut Self {
258        self.note("read", slice.runtime_block(), Access::Read, false)
259    }
260
261    /// Record a runtime-backed slice the launch will write.
262    /// Use this for both pre-existing buffers being overwritten
263    /// AND for fresh runtime-backed allocations whose lifetime
264    /// began in the same operator. The recorder snapshots block
265    /// identity at record time and drops the borrow, so kernel
266    /// `&mut slice` borrows after preflight are unaffected.
267    pub fn write<T: cudarc::driver::DeviceRepr>(
268        &mut self,
269        slice: &crate::memory::TrackedCudaSlice<T>,
270    ) -> &mut Self {
271        self.note("write", slice.runtime_block(), Access::Write, false)
272    }
273
274    /// Record a runtime-backed slice the launch will both read
275    /// and write.
276    pub fn read_write<T: cudarc::driver::DeviceRepr>(
277        &mut self,
278        slice: &crate::memory::TrackedCudaSlice<T>,
279    ) -> &mut Self {
280        self.note(
281            "read_write",
282            slice.runtime_block(),
283            Access::ReadWrite,
284            false,
285        )
286    }
287
288    /// Record a [`crate::memory::CudaColumn`] the launch will
289    /// read. Owned columns surface their runtime block; external
290    /// (`Dlpack` / `ArrowDevice`) columns are rejected in strict
291    /// mode and silently skipped in permissive mode.
292    pub fn read_column(&mut self, col: &crate::memory::CudaColumn) -> &mut Self {
293        self.note(
294            "read_column",
295            col.runtime_block(),
296            Access::Read,
297            col.is_external(),
298        )
299    }
300
301    /// Record a [`crate::memory::CudaColumn`] the launch will
302    /// write.
303    pub fn write_column(&mut self, col: &crate::memory::CudaColumn) -> &mut Self {
304        self.note(
305            "write_column",
306            col.runtime_block(),
307            Access::Write,
308            col.is_external(),
309        )
310    }
311
312    /// Record a [`crate::provider::RawCudaView`]-style view that
313    /// borrows a region of a runtime-backed allocation. The
314    /// view must carry its source block via `runtime_block()`;
315    /// strict mode rejects views built from legacy / external
316    /// paths.
317    ///
318    /// Public API placeholder for the upcoming filter-class
319    /// migration; no production caller exists yet.
320    #[allow(dead_code)]
321    pub(crate) fn read_view_runtime(&mut self, block: Option<&DeviceBlock>) -> &mut Self {
322        self.note("read_view", block, Access::Read, false)
323    }
324
325    /// Number of recorded runtime-backed uses. Diagnostic.
326    pub fn recorded_count(&self) -> usize {
327        self.uses.len()
328    }
329
330    /// Preflight: validate the recorder is ready to commit
331    /// against `runtime` AND queue every cross-stream wait the
332    /// recorded access kinds require. **Stateful** — sets a flag
333    /// that `commit` checks. MUST be called BEFORE enqueueing
334    /// the CUDA launch / copy. On failure no CUDA work has been
335    /// queued yet, the flag remains unset, and the caller can
336    /// either fix the recorder or abandon the launch.
337    ///
338    /// Verifies (in order):
339    ///   * No strict-mode rejection accumulated during recording
340    ///     (untracked / external buffer in strict mode, or
341    ///     post-preflight `note` attempt).
342    ///   * The active resource stack supports cross-stream
343    ///     tracking (`runtime.supports_block_use_tracking()`)
344    ///     OR the recorder has zero tracked uses (no events to
345    ///     record).
346    ///
347    /// Then for each recorded use, calls
348    /// [`XlogDeviceRuntime::prepare_block_use`] which queues
349    /// `cuStreamWaitEvent` calls on `launch_stream` for any
350    /// prior write (read access) or any prior write + prior
351    /// reads (write / read-write access) on a different stream.
352    /// Same-stream events are skipped — already ordered.
353    ///
354    /// Repeated registrations of the same block in the same
355    /// recorder are deduplicated to a single prepare call (the
356    /// strongest access kind wins): `read` + `write` of the
357    /// same block becomes one `Access::ReadWrite` prepare.
358    pub fn preflight(&mut self, runtime: &XlogDeviceRuntime) -> ResourceResult<()> {
359        if let Some(err) = &self.strict_reject {
360            // Surface the captured strict-mode rejection
361            // verbatim. Do NOT mark preflighted.
362            return Err(ResourceError::StreamMisuse(format!("{}", err)));
363        }
364        if !self.uses.is_empty() && !runtime.supports_block_use_tracking() {
365            return Err(ResourceError::StreamMisuse(
366                "LaunchRecorder::preflight: active resource does not support \
367                 cross-stream use tracking. Build the runtime around \
368                 AsyncCudaResource (or a decorator stack over it) for \
369                 stream-lifetime-safe launches"
370                    .to_string(),
371            ));
372        }
373
374        let deduped = dedup_uses(&self.uses);
375        for use_ in &deduped {
376            runtime.prepare_block_use(use_.block, self.launch_stream, use_.access)?;
377        }
378
379        self.preflighted = true;
380        Ok(())
381    }
382
383    /// Commit the recorded uses to the runtime. MUST be called
384    /// AFTER preflight succeeded AND the CUDA launch has been
385    /// enqueued on `launch_stream`.
386    ///
387    /// **Non-empty recorders that were not preflighted are
388    /// rejected** with `StreamMisuse`. This closes the footgun
389    /// where a caller could enqueue CUDA work, then call
390    /// commit, then discover at commit-time that the active
391    /// resource is unsupported — leaving unprotected work in
392    /// flight. Production migrated launch paths must therefore
393    /// always preflight BEFORE the CUDA call.
394    ///
395    /// Empty recorders (no recorded uses) bypass the check:
396    /// nothing to record, no events to fire, no contract to
397    /// honor.
398    ///
399    /// For each recorded use, calls
400    /// [`XlogDeviceRuntime::finish_block_use`] which records an
401    /// event on `launch_stream` and folds it into the block's
402    /// dependency state (writers replace `last_write` and clear
403    /// `outstanding_reads`; readers append to
404    /// `outstanding_reads`). Repeated registrations of the same
405    /// block are deduplicated identically to preflight.
406    pub fn commit(mut self, runtime: &XlogDeviceRuntime) -> ResourceResult<()> {
407        // Re-check any strict reject that may have accumulated
408        // — preflight may not have been called, or may not have
409        // surfaced this particular path. (Same string as
410        // preflight would produce.)
411        if let Some(err) = self.strict_reject.take() {
412            return Err(err);
413        }
414        if !self.uses.is_empty() && !self.preflighted {
415            return Err(ResourceError::StreamMisuse(
416                "LaunchRecorder::commit: non-empty recorder reached commit without \
417                 a successful preflight. The caller MUST call preflight(&runtime) \
418                 BEFORE enqueueing CUDA work; otherwise commit-time failures leave \
419                 unprotected work in flight. See the preflight + commit contract \
420                 in the LaunchRecorder doc"
421                    .to_string(),
422            ));
423        }
424
425        let deduped = dedup_uses(&self.uses);
426        for use_ in &deduped {
427            runtime.finish_block_use(use_.block, self.launch_stream, use_.access)?;
428        }
429        self.committed = true;
430        Ok(())
431    }
432}
433
434/// Collapse multiple registrations of the same block into one
435/// use with the strongest access.
436///
437/// The dedup key is `(ptr, generation, device_ordinal)` — NOT
438/// `ptr` alone. ABA reuse inside a single recorder is rare but
439/// possible (record use of buffer X, drop X, allocate a new
440/// block reusing X's address, record THAT) and a ptr-only key
441/// would incorrectly merge those two distinct uses. Keying by
442/// the full identity tuple lets the prepare/finish path see
443/// each generation's events independently; the runtime's
444/// generation guard then catches any stale id at the resource
445/// boundary.
446///
447/// Access combine: Read+Write → ReadWrite; otherwise the
448/// strongest of the two operands wins.
449fn dedup_uses(uses: &[RecordedUse]) -> Vec<RecordedUse> {
450    let mut by_id: HashMap<(u64, Generation, u32), usize> = HashMap::with_capacity(uses.len());
451    let mut deduped: Vec<RecordedUse> = Vec::with_capacity(uses.len());
452    for use_ in uses {
453        let key = (
454            use_.block.ptr,
455            use_.block.generation,
456            use_.block.device_ordinal,
457        );
458        match by_id.get(&key) {
459            Some(&idx) => {
460                deduped[idx].access = combine_access(deduped[idx].access, use_.access);
461            }
462            None => {
463                by_id.insert(key, deduped.len());
464                deduped.push(*use_);
465            }
466        }
467    }
468    deduped
469}
470
471/// Strongest-access lattice: ReadWrite >= Write/Read; Write+Read = ReadWrite.
472fn combine_access(a: Access, b: Access) -> Access {
473    match (a, b) {
474        (Access::ReadWrite, _) | (_, Access::ReadWrite) => Access::ReadWrite,
475        (Access::Read, Access::Write) | (Access::Write, Access::Read) => Access::ReadWrite,
476        (Access::Read, Access::Read) => Access::Read,
477        (Access::Write, Access::Write) => Access::Write,
478    }
479}
480
481impl Drop for LaunchRecorder {
482    fn drop(&mut self) {
483        if !self.committed && !self.uses.is_empty() {
484            #[cfg(debug_assertions)]
485            eprintln!(
486                "[xlog_cuda::launch] LaunchRecorder dropped without commit: \
487                 {} uses on launch_stream={} (mode={:?}) were NOT recorded; \
488                 cross-stream lifetime safety lost for this launch",
489                self.uses.len(),
490                self.launch_stream.0,
491                self.mode,
492            );
493        }
494    }
495}
496
497#[cfg(test)]
498mod tests {
499    use super::*;
500    use crate::device_runtime::{
501        AsyncCudaResource, DeviceMemoryResource, DirectCudaResource, StreamPool,
502    };
503    use crate::CudaDevice;
504    use std::sync::Arc;
505    use xlog_core::MemoryBudget;
506
507    fn try_async_runtime() -> Option<(Arc<CudaDevice>, Arc<XlogDeviceRuntime>, StreamId)> {
508        let device = Arc::new(CudaDevice::new(0).ok()?);
509        let pool = Arc::new(StreamPool::with_defaults(Arc::clone(&device)));
510        let async_resource: Box<dyn DeviceMemoryResource + Send + Sync> = Box::new(
511            AsyncCudaResource::new(Arc::clone(&device), 0, Arc::clone(&pool)),
512        );
513        let runtime = Arc::new(XlogDeviceRuntime::with_resource(
514            Arc::clone(&device),
515            0,
516            Arc::clone(&pool),
517            async_resource,
518        ));
519        let launch_stream = pool.acquire().ok()?;
520        Some((device, runtime, launch_stream))
521    }
522
523    fn try_direct_runtime() -> Option<(Arc<CudaDevice>, Arc<XlogDeviceRuntime>, StreamId)> {
524        let device = Arc::new(CudaDevice::new(0).ok()?);
525        let pool = Arc::new(StreamPool::with_defaults(Arc::clone(&device)));
526        let direct: Box<dyn DeviceMemoryResource + Send + Sync> =
527            Box::new(DirectCudaResource::new(Arc::clone(&device), 0));
528        let runtime = Arc::new(XlogDeviceRuntime::with_resource(
529            Arc::clone(&device),
530            0,
531            Arc::clone(&pool),
532            direct,
533        ));
534        Some((device, runtime, StreamId::DEFAULT))
535    }
536
537    #[test]
538    fn empty_commit_is_ok_in_both_modes() {
539        let Some((_d, rt, ls)) = try_async_runtime() else {
540            return;
541        };
542        LaunchRecorder::new_permissive(ls)
543            .commit(&rt)
544            .expect("permissive empty");
545        LaunchRecorder::new_strict(ls)
546            .commit(&rt)
547            .expect("strict empty");
548    }
549
550    #[test]
551    fn permissive_skips_legacy_silently() {
552        let Some(device) = CudaDevice::new(0).ok().map(Arc::new) else {
553            return;
554        };
555        let pool = Arc::new(StreamPool::with_defaults(Arc::clone(&device)));
556        let async_resource: Box<dyn DeviceMemoryResource + Send + Sync> = Box::new(
557            AsyncCudaResource::new(Arc::clone(&device), 0, Arc::clone(&pool)),
558        );
559        let runtime = Arc::new(XlogDeviceRuntime::with_resource(
560            Arc::clone(&device),
561            0,
562            Arc::clone(&pool),
563            async_resource,
564        ));
565        let launch_stream = pool.acquire().expect("acquire");
566
567        // Legacy manager — no runtime — produces None block.
568        let manager = Arc::new(crate::GpuMemoryManager::new(
569            Arc::clone(&device),
570            MemoryBudget::with_limit(1024 * 1024),
571        ));
572        let legacy = manager.alloc::<u8>(64).expect("legacy alloc");
573        assert!(legacy.runtime_block().is_none());
574
575        let mut rec = LaunchRecorder::new_permissive(launch_stream);
576        rec.read(&legacy);
577        assert_eq!(rec.recorded_count(), 0);
578        rec.preflight(&runtime).expect("permissive preflight");
579        rec.commit(&runtime).expect("permissive commit");
580    }
581
582    #[test]
583    fn strict_rejects_legacy_at_preflight() {
584        let Some((device, runtime, launch_stream)) = try_async_runtime() else {
585            return;
586        };
587        let manager = Arc::new(crate::GpuMemoryManager::new(
588            Arc::clone(&device),
589            MemoryBudget::with_limit(1024 * 1024),
590        ));
591        let legacy = manager.alloc::<u8>(64).expect("legacy alloc");
592
593        let mut rec = LaunchRecorder::new_strict(launch_stream);
594        rec.read(&legacy);
595        let err = rec.preflight(&runtime);
596        match err {
597            Err(ResourceError::StreamMisuse(msg)) => {
598                assert!(msg.contains("untracked buffer rejected"), "msg: {}", msg);
599            }
600            other => panic!(
601                "strict mode must reject untracked buffer at preflight; got {:?}",
602                other
603            ),
604        }
605    }
606
607    #[test]
608    fn preflight_rejects_direct_runtime_before_enqueue() {
609        let Some((device, runtime, launch_stream)) = try_direct_runtime() else {
610            return;
611        };
612        let manager = Arc::new(crate::GpuMemoryManager::with_runtime(
613            Arc::clone(&device),
614            MemoryBudget::with_limit(1024 * 1024),
615            Arc::clone(&runtime),
616        ));
617        let buf = manager.alloc::<u8>(64).expect("alloc");
618        assert!(buf.runtime_block().is_some());
619
620        let mut rec = LaunchRecorder::new_strict(launch_stream);
621        rec.read(&buf);
622        let err = rec.preflight(&runtime);
623        match err {
624            Err(ResourceError::StreamMisuse(msg)) => {
625                assert!(
626                    msg.contains("does not support cross-stream use tracking"),
627                    "msg: {}",
628                    msg
629                );
630            }
631            other => panic!(
632                "preflight must reject Direct-backed runtime before enqueue; got {:?}",
633                other
634            ),
635        }
636    }
637
638    #[test]
639    fn preflight_then_commit_async_runtime() {
640        let Some((device, runtime, launch_stream)) = try_async_runtime() else {
641            return;
642        };
643        let manager = Arc::new(crate::GpuMemoryManager::with_runtime(
644            Arc::clone(&device),
645            MemoryBudget::with_limit(1024 * 1024),
646            Arc::clone(&runtime),
647        ));
648        let buf = manager.alloc::<u8>(64).expect("alloc");
649
650        let mut rec = LaunchRecorder::new_strict(launch_stream);
651        rec.read(&buf);
652        rec.preflight(&runtime).expect("preflight ok");
653        // (in production: enqueue CUDA launch here)
654        rec.commit(&runtime).expect("commit ok");
655    }
656
657    #[test]
658    fn commit_rejects_un_preflighted_strict_recorder() {
659        let Some((device, runtime, launch_stream)) = try_async_runtime() else {
660            return;
661        };
662        let manager = Arc::new(crate::GpuMemoryManager::with_runtime(
663            Arc::clone(&device),
664            MemoryBudget::with_limit(1024 * 1024),
665            Arc::clone(&runtime),
666        ));
667        let buf = manager.alloc::<u8>(64).expect("alloc");
668
669        let mut rec = LaunchRecorder::new_strict(launch_stream);
670        rec.read(&buf);
671        let err = rec.commit(&runtime);
672        match err {
673            Err(ResourceError::StreamMisuse(msg)) => {
674                assert!(
675                    msg.contains("without a successful preflight"),
676                    "msg: {}",
677                    msg
678                );
679            }
680            other => panic!(
681                "non-empty un-preflighted commit must return StreamMisuse, got {:?}",
682                other
683            ),
684        }
685    }
686
687    #[test]
688    fn empty_recorder_commit_without_preflight_is_ok() {
689        let Some((_d, rt, ls)) = try_async_runtime() else {
690            return;
691        };
692        LaunchRecorder::new_strict(ls)
693            .commit(&rt)
694            .expect("empty strict commit without preflight");
695    }
696
697    #[test]
698    fn note_after_preflight_via_standard_method_is_rejected() {
699        let Some((device, runtime, launch_stream)) = try_async_runtime() else {
700            return;
701        };
702        let manager = Arc::new(crate::GpuMemoryManager::with_runtime(
703            Arc::clone(&device),
704            MemoryBudget::with_limit(1024 * 1024),
705            Arc::clone(&runtime),
706        ));
707        let buf_a = manager.alloc::<u8>(64).expect("alloc a");
708        let buf_b = manager.alloc::<u8>(64).expect("alloc b");
709
710        let mut rec = LaunchRecorder::new_strict(launch_stream);
711        rec.read(&buf_a);
712        rec.preflight(&runtime).expect("preflight ok");
713        rec.read(&buf_b);
714        let err = rec.commit(&runtime);
715        match err {
716            Err(ResourceError::StreamMisuse(msg)) => {
717                assert!(msg.contains("recorded after preflight"), "msg: {}", msg);
718            }
719            other => panic!(
720                "post-preflight standard-method record must be rejected; got {:?}",
721                other
722            ),
723        }
724    }
725
726    /// Pre-launch fresh-write path: fresh outputs are recorded
727    /// BEFORE preflight via the regular `write` API. Snapshot
728    /// drops the source borrow, so kernel `&mut` borrows after
729    /// preflight remain valid.
730    #[test]
731    fn pre_preflight_fresh_write_is_accepted() {
732        let Some((device, runtime, launch_stream)) = try_async_runtime() else {
733            return;
734        };
735        let manager = Arc::new(crate::GpuMemoryManager::with_runtime(
736            Arc::clone(&device),
737            MemoryBudget::with_limit(1024 * 1024),
738            Arc::clone(&runtime),
739        ));
740        let buf_a = manager.alloc::<u8>(64).expect("alloc a");
741        let mut buf_fresh = manager.alloc::<u8>(64).expect("alloc fresh");
742
743        let mut rec = LaunchRecorder::new_strict(launch_stream);
744        rec.read(&buf_a);
745        rec.write(&buf_fresh);
746        rec.preflight(&runtime).expect("preflight ok");
747        // Borrows are released; kernel-style &mut works here.
748        let _kernel_param = &mut buf_fresh;
749        rec.commit(&runtime).expect("commit ok");
750    }
751
752    /// Read+write of the same block in a single recorder
753    /// dedupes to a single ReadWrite prepare/finish call.
754    #[test]
755    fn read_then_write_same_block_dedupes_to_read_write() {
756        let Some((device, runtime, launch_stream)) = try_async_runtime() else {
757            return;
758        };
759        let manager = Arc::new(crate::GpuMemoryManager::with_runtime(
760            Arc::clone(&device),
761            MemoryBudget::with_limit(1024 * 1024),
762            Arc::clone(&runtime),
763        ));
764        let buf = manager.alloc::<u8>(64).expect("alloc");
765
766        let mut rec = LaunchRecorder::new_strict(launch_stream);
767        rec.read(&buf);
768        rec.write(&buf);
769        rec.preflight(&runtime).expect("preflight");
770        rec.commit(&runtime).expect("commit");
771    }
772
773    /// Locks the dedup key: `(ptr, generation, device_ordinal)`,
774    /// not `ptr` alone. Two `RecordedUse`s sharing a ptr but
775    /// differing in generation MUST be treated as distinct
776    /// entries — otherwise an ABA reuse inside a single recorder
777    /// would silently collapse an event for the new allocation
778    /// onto the old block's prepare/finish chain.
779    #[test]
780    fn dedup_keys_on_full_block_id_not_ptr_alone() {
781        // Construct two RecordedUses with the same ptr but
782        // distinct generations — directly drive `dedup_uses` so
783        // the test is deterministic and does not require ABA to
784        // actually occur on real CUDA.
785        let block_a = BlockId {
786            ptr: 0xdead_beef,
787            generation: Generation(1),
788            alloc_stream: StreamId::DEFAULT,
789            device_ordinal: 0,
790        };
791        let block_b = BlockId {
792            ptr: 0xdead_beef,
793            generation: Generation(2),
794            alloc_stream: StreamId::DEFAULT,
795            device_ordinal: 0,
796        };
797        let uses = vec![
798            RecordedUse {
799                block: block_a,
800                access: Access::Read,
801                label: "read",
802            },
803            RecordedUse {
804                block: block_b,
805                access: Access::Write,
806                label: "write",
807            },
808        ];
809        let deduped = dedup_uses(&uses);
810        assert_eq!(deduped.len(), 2, "ABA generations must NOT collapse");
811        assert_eq!(deduped[0].block.generation, Generation(1));
812        assert_eq!(deduped[0].access, Access::Read);
813        assert_eq!(deduped[1].block.generation, Generation(2));
814        assert_eq!(deduped[1].access, Access::Write);
815
816        // Same ptr + same generation + duplicate access must
817        // collapse into one entry with combined access.
818        let same_id = vec![
819            RecordedUse {
820                block: block_a,
821                access: Access::Read,
822                label: "read",
823            },
824            RecordedUse {
825                block: block_a,
826                access: Access::Write,
827                label: "write",
828            },
829        ];
830        let collapsed = dedup_uses(&same_id);
831        assert_eq!(collapsed.len(), 1);
832        assert_eq!(collapsed[0].access, Access::ReadWrite);
833    }
834
835    #[test]
836    fn read_column_owned_runtime_backed() {
837        use crate::memory::CudaColumn;
838        let Some((device, runtime, launch_stream)) = try_async_runtime() else {
839            return;
840        };
841        let manager = Arc::new(crate::GpuMemoryManager::with_runtime(
842            Arc::clone(&device),
843            MemoryBudget::with_limit(1024 * 1024),
844            Arc::clone(&runtime),
845        ));
846        let slice = manager.alloc::<u8>(64).expect("alloc");
847        let col = CudaColumn::owned(slice);
848        assert!(col.runtime_block().is_some());
849
850        let mut rec = LaunchRecorder::new_strict(launch_stream);
851        rec.read_column(&col);
852        assert_eq!(rec.recorded_count(), 1);
853        rec.preflight(&runtime).expect("preflight");
854        rec.commit(&runtime).expect("commit");
855    }
856}