Skip to main content

xlog_cuda/
memory.rs

//! CUDA memory management
//!
//! This module provides GPU memory management with budget enforcement.
//! It wraps cudarc's allocation functions (or, when an `XlogDeviceRuntime`
//! is attached, the runtime's resource stack) and tracks total allocated memory.
5
6use std::mem::ManuallyDrop;
7use std::ops::{Deref, DerefMut};
8use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
9use std::sync::Arc;
10
11use cudarc::driver::{CudaSlice, CudaStream, DevicePtr, DevicePtrMut, DeviceSlice, SyncOnDrop};
12use xlog_core::{MemoryBudget, Result, Schema, XlogError};
13
14use crate::arrow_device::ArrowDeviceImport;
15use crate::cuda_compat::{AsKernelParam, DeviceParamStorage, IntoKernelParamStorage};
16use crate::device_runtime::{AllocTag, DeviceBlock, ResourceError, StreamId, XlogDeviceRuntime};
17use crate::dlpack::DlpackManagedTensor;
18use crate::CudaDevice;
19
20/// GPU memory manager with budget enforcement
21///
22/// Tracks allocated GPU memory and enforces a memory budget.
23/// When the budget would be exceeded, returns `XlogError::ResourceExhausted`.
24///
25/// # v0.6 device-runtime routing (opt-in)
26///
27/// Constructing via [`GpuMemoryManager::with_runtime`] attaches an
28/// [`XlogDeviceRuntime`] that mediates allocations through the v0.6
29/// resource stack (e.g., `GlobalDeviceBudget` → `LoggingResource` →
30/// `AsyncCudaResource`). When attached:
31///   * [`GpuMemoryManager::alloc::<T>`] routes the underlying
32///     allocation through the runtime and produces a typed view via
33///     cudarc's `upgrade_device_ptr::<T>`. The returned
34///     [`TrackedCudaSlice`] frees through the runtime on drop.
35///   * [`GpuMemoryManager::alloc_raw`] is the explicit raw-bytes
36///     entry point (no typed view), also runtime-routed.
37///
38/// Both budgets apply: the manager's local `MemoryBudget` AND any
39/// `GlobalDeviceBudget` stacked above the runtime's underlying
40/// resource.
41///
42/// When the manager is constructed via [`GpuMemoryManager::new`]
43/// (no runtime attached), `alloc::<T>` and the rest of the public
44/// API behave bit-for-bit identically to pre-migration: cudarc's
45/// `device.alloc::<T>(len)` allocates and `cudarc` frees on drop.
46/// `alloc_raw` returns `XlogError::Kernel` when no runtime is
47/// attached (no silent fallback). `CudaKernelProvider::new`
48/// continues to construct the manager via `new` for now;
49/// runtime-routed providers are an opt-in through `with_runtime`
50/// at construction sites that need it.
51pub struct GpuMemoryManager {
52    /// The CUDA device for memory operations
53    device: Arc<CudaDevice>,
54    /// Memory budget configuration
55    budget: MemoryBudget,
56    /// Currently allocated bytes (tracked atomically for thread safety)
57    allocated: AtomicU64,
58    /// High-water mark of `allocated` since construction or the last
59    /// [`reset_peak`](Self::reset_peak). Updated at the two reservation
60    /// funnels (`alloc`, `alloc_raw`); used by measurement harnesses to
61    /// report true peak device-memory pressure across an operation.
62    peak: AtomicU64,
63    /// Count of `alloc` calls (device allocation requests). Resettable; used by
64    /// the GPU-resident MC engine's no-host gate to prove that **zero** device
65    /// allocations happen inside the measured region (all arenas are allocated
66    /// before it). Distinct from `allocated` (bytes).
67    alloc_count: AtomicU64,
68    /// Optional v0.6 device runtime. When set, [`alloc_raw`]
69    /// reserves through the runtime's resource stack in addition
70    /// to enforcing the local budget; both must accept for the
71    /// allocation to proceed.
72    runtime: Option<Arc<XlogDeviceRuntime>>,
73}
74
/// Selects which allocator owns the underlying device memory of a
/// [`TrackedCudaSlice`], and therefore which release path the
/// `TrackedCudaSlice` `Drop` impl takes. Internal — surfaced only via
/// methods on `TrackedCudaSlice` (e.g. `runtime_block`).
///
/// Runtime-routed allocations carry `Runtime`. Legacy allocations, and
/// zero-byte allocations made while a runtime is attached, carry
/// `Cudarc`.
enum Backing {
    /// cudarc owns the memory. `TrackedCudaSlice::inner` is the handle
    /// returned by cudarc's `alloc::<T>(..)` (or, after `into_bytes`, the
    /// `u8` view that took over from it). The `TrackedCudaSlice` `Drop`
    /// impl releases it by calling `ManuallyDrop::drop` on that handle,
    /// so cudarc's own free path runs.
    Cudarc,
    /// The [`XlogDeviceRuntime`] owns the allocation via its resource
    /// stack. `TrackedCudaSlice::inner` is only a typed view created by
    /// cudarc's `upgrade_device_ptr::<T>` over the runtime's raw pointer,
    /// so its destructor must never run (cudarc would free memory it
    /// does not own). On drop, the block is moved out and handed to the
    /// runtime's `deallocate`; the view is simply left undropped inside
    /// its `ManuallyDrop`.
    Runtime {
        /// Runtime that produced `block` and must receive it back.
        runtime: Arc<XlogDeviceRuntime>,
        /// `Some` until `Drop` moves it out with `Option::take` so it
        /// can be passed by value to `deallocate`.
        block: Option<DeviceBlock>,
    },
}
100
/// Reports whether the free-time poison probe is switched on.
///
/// Setting `XLOG_DEBUG_POISON_FREE=1` makes the legacy (cudarc-backed)
/// drop path of `TrackedCudaSlice` fill the allocation with `0xDD`
/// before releasing it, so a stale alias reads a recognizable pattern.
/// The variable is consulted only on the first call; the cached answer
/// is returned afterwards.
fn poison_free_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    let cached = ENABLED.get_or_init(|| {
        matches!(std::env::var("XLOG_DEBUG_POISON_FREE").as_deref(), Ok("1"))
    });
    *cached
}
108
/// Reports whether the allocation-time poison probe is switched on.
///
/// Setting `XLOG_DEBUG_POISON_ALLOC=1` makes fresh legacy (cudarc-backed)
/// allocations get filled with `0xDD`, so reading memory that was never
/// written produces a deterministic, recognizable pattern. The variable
/// is consulted only on the first call; the cached answer is returned
/// afterwards.
fn poison_alloc_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    let read_env = || std::env::var("XLOG_DEBUG_POISON_ALLOC").is_ok_and(|value| value == "1");
    *ENABLED.get_or_init(read_env)
}
116
/// Debug probe: track live legacy allocation ranges and panic if the
/// allocator ever hands out a region overlapping one that is still
/// live (double-hand-out / use-after-free detector, timing
/// independent). Gated on `XLOG_DEBUG_ALLOC_GUARD=1`, read once per
/// process.
///
/// Returns the registry (allocation start address → length in bytes)
/// when the probe is enabled, `None` otherwise.
fn alloc_guard() -> Option<&'static std::sync::Mutex<std::collections::BTreeMap<u64, u64>>> {
    static GUARD: std::sync::OnceLock<
        Option<std::sync::Mutex<std::collections::BTreeMap<u64, u64>>>,
    > = std::sync::OnceLock::new();
    GUARD
        .get_or_init(|| {
            if std::env::var("XLOG_DEBUG_ALLOC_GUARD").map(|v| v == "1") == Ok(true) {
                Some(std::sync::Mutex::new(std::collections::BTreeMap::new()))
            } else {
                None
            }
        })
        .as_ref()
}

/// Register the live range `[ptr, ptr + bytes)` with the alloc guard.
///
/// No-op when the probe is disabled or `bytes == 0`.
///
/// # Panics
/// If the new range overlaps a range that is still registered. The
/// registry lock is released *before* panicking, so the mutex is not
/// poisoned and `alloc_guard_remove` (called from `TrackedCudaSlice`'s
/// `Drop` impl, possibly during the resulting unwind) keeps working
/// instead of panicking a second time and aborting the process.
fn alloc_guard_insert(ptr: u64, bytes: u64) {
    let Some(guard) = alloc_guard() else { return };
    if bytes == 0 {
        return;
    }
    // Exclusive end of the new range. Saturating so an address near
    // `u64::MAX` cannot overflow (a debug-build panic while locked).
    let end = ptr.saturating_add(bytes);
    let overlap = {
        // The map holds plain (start, len) pairs, so a poisoned lock
        // cannot have left it logically inconsistent; recover it.
        let mut live = guard
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // Nearest live range starting at or below `ptr`: overlaps if it
        // extends past `ptr`.
        let below = live.range(..=ptr).next_back().map(|(&p, &b)| (p, b));
        // First live range starting strictly above `ptr`: overlaps if the
        // new range extends past its start. `Bound::Excluded` avoids the
        // `ptr + 1` overflow of a `ptr + 1..` range.
        let above = live
            .range((std::ops::Bound::Excluded(ptr), std::ops::Bound::Unbounded))
            .next()
            .map(|(&p, _)| p);
        match (below, above) {
            (Some((p, b)), _) if p.saturating_add(b) > ptr => Some(format!(
                "ALLOC GUARD: new allocation [{:#x}, {:#x}) overlaps live [{:#x}, {:#x})",
                ptr,
                end,
                p,
                p.saturating_add(b)
            )),
            (_, Some(p)) if end > p => Some(format!(
                "ALLOC GUARD: new allocation [{:#x}, {:#x}) overlaps live starting at {:#x}",
                ptr, end, p
            )),
            _ => {
                live.insert(ptr, bytes);
                None
            }
        }
    }; // lock released here
    if let Some(message) = overlap {
        panic!("{}", message);
    }
}

/// Unregister the range starting at `ptr`.
///
/// No-op when the probe is disabled or nothing is registered at `ptr`.
/// Recovers from a poisoned lock so it never panics from inside a
/// `Drop` impl.
fn alloc_guard_remove(ptr: u64) {
    let Some(guard) = alloc_guard() else { return };
    guard
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .remove(&ptr);
}
172
/// A `CudaSlice` that automatically updates `GpuMemoryManager`
/// allocation tracking on drop. Inner slice is wrapped in
/// `ManuallyDrop` so the [`Backing`] enum can choose between
/// cudarc-side free (legacy) and runtime-side deallocate (migrated)
/// without producing a double-free.
// NOTE: after `Drop::drop` runs, the remaining fields are dropped in
// declaration order (`manager` before `backing`); keep that in mind
// before reordering.
pub struct TrackedCudaSlice<T: cudarc::driver::DeviceRepr> {
    /// Size of the allocation in bytes; returned to the manager via
    /// `GpuMemoryManager::record_free` when the slice is dropped.
    bytes: u64,
    /// Manager whose `allocated` counter this slice was charged against.
    manager: Arc<GpuMemoryManager>,
    /// Typed cudarc view of the memory. `Drop` decides whether its
    /// destructor runs (`Backing::Cudarc`) or not (`Backing::Runtime`).
    inner: ManuallyDrop<CudaSlice<T>>,
    /// Raw device address of the allocation, cached at construction and
    /// handed out by `device_ptr`/`device_ptr_value`/`as_kernel_param`.
    raw_ptr: cudarc::driver::sys::CUdeviceptr,
    /// Which allocator owns the memory, and therefore how it is released.
    backing: Backing,
}
185
186impl<T: cudarc::driver::DeviceRepr> Deref for TrackedCudaSlice<T> {
187    type Target = CudaSlice<T>;
188
189    fn deref(&self) -> &Self::Target {
190        &self.inner
191    }
192}
193
194impl<T: cudarc::driver::DeviceRepr> DerefMut for TrackedCudaSlice<T> {
195    fn deref_mut(&mut self) -> &mut Self::Target {
196        &mut self.inner
197    }
198}
199
200impl<T: cudarc::driver::DeviceRepr> DeviceSlice<T> for TrackedCudaSlice<T> {
201    fn len(&self) -> usize {
202        self.inner.len()
203    }
204
205    fn stream(&self) -> &Arc<CudaStream> {
206        self.inner.stream()
207    }
208}
209
210impl<T: cudarc::driver::DeviceRepr> DevicePtr<T> for TrackedCudaSlice<T> {
211    fn device_ptr<'a>(
212        &'a self,
213        stream: &'a CudaStream,
214    ) -> (cudarc::driver::sys::CUdeviceptr, SyncOnDrop<'a>) {
215        // Explicit `&*` deref through ManuallyDrop — the trait
216        // method is not auto-resolved through the wrapper.
217        DevicePtr::device_ptr(&*self.inner, stream)
218    }
219}
220
221impl<T: cudarc::driver::DeviceRepr> DevicePtrMut<T> for TrackedCudaSlice<T> {
222    fn device_ptr_mut<'a>(
223        &'a mut self,
224        stream: &'a CudaStream,
225    ) -> (cudarc::driver::sys::CUdeviceptr, SyncOnDrop<'a>) {
226        DevicePtrMut::device_ptr_mut(&mut *self.inner, stream)
227    }
228}
229
230impl<T: cudarc::driver::DeviceRepr> TrackedCudaSlice<T> {
231    pub fn device_ptr(&self) -> &cudarc::driver::sys::CUdeviceptr {
232        &self.raw_ptr
233    }
234
235    pub fn device_ptr_value(&self) -> cudarc::driver::sys::CUdeviceptr {
236        self.raw_ptr
237    }
238
239    /// Stable address of the memory manager that owns this allocation.
240    pub fn memory_manager_ptr_value(&self) -> usize {
241        Arc::as_ptr(&self.manager) as usize
242    }
243
244    /// Borrow the underlying [`DeviceBlock`] for runtime-backed
245    /// allocations. Returns `None` for legacy cudarc-backed
246    /// slices ([`Backing::Cudarc`]) — those are not tracked by
247    /// the v0.6 device runtime and therefore have no
248    /// runtime-side block to record uses against.
249    ///
250    /// Callers (notably [`crate::launch::LaunchRecorder`]) use
251    /// this to attach cross-stream uses via
252    /// [`crate::device_runtime::XlogDeviceRuntime::record_block_use`].
253    /// A `None` return signals that the slice is on the legacy
254    /// path and the recorder cannot track its lifetime — callers
255    /// must either route the allocation through
256    /// [`GpuMemoryManager::with_runtime`] or accept that no
257    /// cross-stream safety applies to this buffer.
258    pub fn runtime_block(&self) -> Option<&crate::device_runtime::DeviceBlock> {
259        match &self.backing {
260            Backing::Cudarc => None,
261            Backing::Runtime { block, .. } => block.as_ref(),
262        }
263    }
264
265    /// Reinterpret this typed allocation as a raw byte allocation.
266    ///
267    /// This is a zero-copy conversion used by XLOG's columnar
268    /// `CudaBuffer` representation, which stores device memory as
269    /// untyped bytes + a schema. The conversion preserves the
270    /// underlying [`Backing`] — runtime-routed slices remain
271    /// runtime-routed, legacy cudarc slices remain cudarc-routed —
272    /// so deallocation continues to match the original allocator.
273    pub fn into_bytes(self) -> TrackedCudaSlice<u8> {
274        // Wrap `self` in `ManuallyDrop` so its `Drop` impl never
275        // runs — we are doing the cleanup manually below by either
276        // (a) leaving the original `inner` forgotten and reusing
277        // its `backing` (Runtime mode), or (b) leaving the original
278        // `inner` forgotten while the new u8 view takes ownership
279        // via `upgrade_device_ptr` (Cudarc mode — same dance as
280        // the pre-migration code).
281        let this = ManuallyDrop::new(self);
282        let bytes = this.bytes;
283        let manager = Arc::clone(&this.manager);
284        let ptr = this.raw_ptr;
285
286        let len_bytes: usize = bytes
287            .try_into()
288            .expect("TrackedCudaSlice byte size must fit into usize");
289
290        // SAFETY: `this` is `ManuallyDrop`, so its destructor will
291        // not run. We bit-copy `backing` out of the original; the
292        // original location is forgotten along with the rest of
293        // `this`. This is sound because each field is owned and not
294        // touched again.
295        let backing: Backing = unsafe { std::ptr::read(&this.backing) };
296
297        // SAFETY: the runtime / cudarc-side memory is still live —
298        // the original `inner` ManuallyDrop never had its
299        // destructor called, so cudarc has not freed. The new
300        // `CudaSlice<u8>` is a typed view over the same bytes.
301        // For Cudarc backing the new view will free on drop (one
302        // alloc, one free, balanced — same as pre-migration).
303        // For Runtime backing the new view will be `mem::forget`
304        // -ed by the new `Drop` impl, and the runtime's
305        // `deallocate(block)` (carried in `backing`) is the sole
306        // free path.
307        let new_inner = unsafe {
308            manager
309                .device
310                .inner()
311                .upgrade_device_ptr::<u8>(ptr, len_bytes)
312        };
313
314        TrackedCudaSlice {
315            bytes,
316            manager,
317            inner: ManuallyDrop::new(new_inner),
318            raw_ptr: ptr,
319            backing,
320        }
321    }
322}
323
324impl<T: cudarc::driver::DeviceRepr> AsKernelParam for &TrackedCudaSlice<T> {
325    fn as_kernel_param(&self) -> *mut std::ffi::c_void {
326        ((*self).device_ptr() as *const cudarc::driver::sys::CUdeviceptr)
327            .cast_mut()
328            .cast()
329    }
330}
331
332impl<T: cudarc::driver::DeviceRepr> AsKernelParam for &mut TrackedCudaSlice<T> {
333    fn as_kernel_param(&self) -> *mut std::ffi::c_void {
334        ((self.device_ptr()) as *const cudarc::driver::sys::CUdeviceptr)
335            .cast_mut()
336            .cast()
337    }
338}
339
340impl<'a, T: cudarc::driver::DeviceRepr> IntoKernelParamStorage for &'a TrackedCudaSlice<T> {
341    type Storage = DeviceParamStorage<'a>;
342
343    fn into_kernel_param_storage(self) -> Self::Storage {
344        let (ptr, sync) = DevicePtr::device_ptr(&*self.inner, self.inner.stream());
345        DeviceParamStorage::synced(ptr, sync)
346    }
347}
348
349impl<T: cudarc::driver::DeviceRepr> IntoKernelParamStorage for &mut TrackedCudaSlice<T> {
350    type Storage = DeviceParamStorage<'static>;
351
352    fn into_kernel_param_storage(self) -> Self::Storage {
353        let stream = self.inner.stream().clone();
354        let (ptr, sync) = DevicePtrMut::device_ptr_mut(&mut *self.inner, &stream);
355        std::mem::forget(sync);
356        DeviceParamStorage::unsynced(ptr)
357    }
358}
359
360impl<T: cudarc::driver::DeviceRepr> Drop for TrackedCudaSlice<T> {
361    fn drop(&mut self) {
362        match &mut self.backing {
363            Backing::Cudarc => {
364                // Debug probe (XLOG_DEBUG_POISON_FREE=1): overwrite the
365                // allocation with 0xDD before cudarc frees it, so any
366                // still-live alias of this memory reads the poison
367                // pattern instead of recycled contents. Diagnostic only;
368                // off unless the env var is set.
369                if poison_free_enabled() && self.bytes > 0 {
370                    unsafe {
371                        let _ = cudarc::driver::sys::cuMemsetD8_v2(
372                            self.raw_ptr,
373                            0xDD,
374                            self.bytes as usize,
375                        );
376                    }
377                }
378                alloc_guard_remove(self.raw_ptr);
379                // SAFETY: drop runs at most once per slice, and the
380                // inner CudaSlice<T> has not been moved out by any
381                // method (`into_bytes` consumes `self` by value and
382                // leaves the original ManuallyDrop forgotten).
383                unsafe { ManuallyDrop::drop(&mut self.inner) };
384            }
385            Backing::Runtime { runtime, block } => {
386                // Runtime owns the underlying memory. Tell it to
387                // deallocate the block; the inner `CudaSlice<T>` is
388                // a typed view that must NOT free on its own,
389                // which `ManuallyDrop` ensures by simply not
390                // calling its destructor here.
391                if let Some(block) = block.take() {
392                    let _ = runtime.deallocate(block);
393                }
394            }
395        }
396        self.manager.record_free(self.bytes);
397    }
398}
399
400impl GpuMemoryManager {
401    /// Create a new GPU memory manager
402    ///
403    /// # Arguments
404    /// * `device` - The CUDA device to allocate memory on
405    /// * `budget` - Memory budget configuration
406    pub fn new(device: Arc<CudaDevice>, budget: MemoryBudget) -> Self {
407        Self {
408            device,
409            budget,
410            allocated: AtomicU64::new(0),
411            peak: AtomicU64::new(0),
412            alloc_count: AtomicU64::new(0),
413            runtime: None,
414        }
415    }
416
417    /// Like [`new`], but additionally attaches a v0.6
418    /// [`XlogDeviceRuntime`]. The runtime mediates **both**
419    /// [`alloc::<T>`](Self::alloc) and [`alloc_raw`](Self::alloc_raw)
420    /// through the v0.6 resource stack: typed `alloc::<T>` returns a
421    /// [`TrackedCudaSlice<T>`] whose underlying memory is owned by
422    /// the runtime (typed view via cudarc's `upgrade_device_ptr::<T>`,
423    /// freed through the runtime on drop). The legacy cudarc path is
424    /// only used when the manager is built via [`new`] (no runtime
425    /// attached). Provider construction does not yet require the
426    /// runtime; callers that want runtime-routed allocations opt in
427    /// here.
428    pub fn with_runtime(
429        device: Arc<CudaDevice>,
430        budget: MemoryBudget,
431        runtime: Arc<XlogDeviceRuntime>,
432    ) -> Self {
433        Self {
434            device,
435            budget,
436            allocated: AtomicU64::new(0),
437            peak: AtomicU64::new(0),
438            alloc_count: AtomicU64::new(0),
439            runtime: Some(runtime),
440        }
441    }
442
443    /// Borrow the attached device runtime, if any. `None` when the
444    /// manager was constructed via [`new`]. Test/diagnostic
445    /// accessor; production call sites that need the runtime own
446    /// it directly.
447    pub fn runtime(&self) -> Option<&Arc<XlogDeviceRuntime>> {
448        self.runtime.as_ref()
449    }
450
451    /// Allocate GPU memory for `len` elements of type `T`
452    ///
453    /// # Arguments
454    /// * `len` - Number of elements to allocate
455    ///
456    /// # Returns
457    /// A tracked `CudaSlice<T>` containing the allocated memory
458    ///
459    /// # Errors
460    /// - `XlogError::ResourceExhausted` if allocation would exceed budget
461    /// - `XlogError::Kernel` if CUDA allocation fails
462    ///
463    /// # v0.6 routing
464    /// When the manager has an attached [`XlogDeviceRuntime`]
465    /// (constructed via [`with_runtime`]), the underlying allocation
466    /// is routed through the runtime's resource stack and a typed
467    /// view is created via cudarc's `upgrade_device_ptr::<T>` over
468    /// the runtime's raw pointer. The returned [`TrackedCudaSlice`]
469    /// frees through the runtime on drop. Without a runtime
470    /// attached, the legacy cudarc `alloc::<T>` path is used and
471    /// drop frees through cudarc — bit-for-bit identical to
472    /// pre-migration behavior.
473    pub fn alloc<T: cudarc::driver::DeviceRepr>(
474        self: &Arc<Self>,
475        len: usize,
476    ) -> Result<TrackedCudaSlice<T>> {
477        // Count every device allocation request (resettable no-host-gate counter).
478        self.alloc_count.fetch_add(1, Ordering::Relaxed);
479
480        // Fix Issue 2: Use checked_mul to prevent integer overflow before cast
481        let bytes = (len as u64)
482            .checked_mul(std::mem::size_of::<T>() as u64)
483            .ok_or_else(|| XlogError::Kernel("Allocation size overflow".to_string()))?;
484
485        // Fix Issue 1: Use compare_exchange loop to prevent TOCTOU race condition
486        // Two threads could both pass check_budget() but exceed budget together
487        loop {
488            let current = self.allocated.load(Ordering::SeqCst);
489            let new_val = current.saturating_add(bytes);
490            if new_val > self.budget.device_bytes {
491                return Err(XlogError::ResourceExhausted {
492                    context: "GPU memory allocation".to_string(),
493                    estimated_bytes: bytes,
494                    budget_bytes: self.budget.device_bytes,
495                });
496            }
497            if self
498                .allocated
499                .compare_exchange(current, new_val, Ordering::SeqCst, Ordering::SeqCst)
500                .is_ok()
501            {
502                self.peak.fetch_max(new_val, Ordering::SeqCst);
503                break;
504            }
505        }
506
507        if let Some(runtime) = &self.runtime {
508            // Zero-byte allocations (empty Vec, empty buffer) are
509            // legitimate in production code. The v0.6 resource
510            // stack rejects zero-byte requests by contract
511            // (DirectCudaResource and AsyncCudaResource both error
512            // on `bytes == 0` because `cuMemAlloc(0)` is undefined
513            // behavior in the CUDA driver). Cudarc's `alloc::<T>(0)`
514            // does the right thing — returns an empty CudaSlice<T>
515            // without calling the driver — so route zero-byte
516            // requests through the legacy path even when a runtime
517            // is attached. The resulting slice carries
518            // `Backing::Cudarc`; its drop is a no-op against
519            // cudarc's empty handle.
520            //
521            // `len == 0` and `bytes == 0` are equivalent here only
522            // if `T` has nonzero size (the common case). For
523            // zero-sized types (rare but valid in Rust) `bytes`
524            // would also be 0 regardless of `len`; the cudarc empty
525            // path handles both consistently.
526            if bytes == 0 {
527                let slice = unsafe {
528                    self.device.inner().alloc::<T>(len).map_err(|e| {
529                        self.allocated.fetch_sub(bytes, Ordering::SeqCst);
530                        XlogError::Kernel(format!("GPU allocation failed (zero-byte): {}", e))
531                    })?
532                };
533                let (raw_ptr, sync) = DevicePtr::device_ptr(&slice, slice.stream());
534                std::mem::forget(sync);
535                return Ok(TrackedCudaSlice {
536                    bytes,
537                    manager: Arc::clone(self),
538                    inner: ManuallyDrop::new(slice),
539                    raw_ptr,
540                    backing: Backing::Cudarc,
541                });
542            }
543
544            // v0.6 path: route through the runtime resource stack.
545            // Convert checked: `bytes` is u64 from
546            // `len * size_of::<T>()`, and the runtime trait surface
547            // uses `usize`. On 64-bit targets this is lossless; on
548            // 32-bit a stray `bytes as usize` would silently
549            // truncate and desync manager accounting (which still
550            // tracks the full u64) from the runtime's view. Surface
551            // the overflow as `XlogError::Kernel` and roll back the
552            // local reservation so the manager stays consistent.
553            let bytes_usize = match usize::try_from(bytes) {
554                Ok(v) => v,
555                Err(_) => {
556                    self.allocated.fetch_sub(bytes, Ordering::SeqCst);
557                    return Err(XlogError::Kernel(format!(
558                        "GPU allocation size {} bytes exceeds platform usize",
559                        bytes
560                    )));
561                }
562            };
563            let block = match runtime.allocate(bytes_usize, StreamId::DEFAULT, AllocTag::UNTAGGED) {
564                Ok(b) => b,
565                Err(e) => {
566                    self.allocated.fetch_sub(bytes, Ordering::SeqCst);
567                    return Err(map_resource_error(e));
568                }
569            };
570            let raw_ptr = block.ptr;
571            // SAFETY: `block.ptr` is a live device pointer of size
572            // `bytes` returned by the runtime; `len * size_of::<T>()`
573            // == `bytes` by construction. The resulting CudaSlice<T>
574            // is a typed view; the `Backing::Runtime` Drop branch
575            // forgets it (via ManuallyDrop + no destructor call) so
576            // cudarc never frees — the runtime's deallocate is the
577            // sole free path.
578            let typed_view = unsafe { self.device.inner().upgrade_device_ptr::<T>(raw_ptr, len) };
579            return Ok(TrackedCudaSlice {
580                bytes,
581                manager: Arc::clone(self),
582                inner: ManuallyDrop::new(typed_view),
583                raw_ptr,
584                backing: Backing::Runtime {
585                    runtime: Arc::clone(runtime),
586                    block: Some(block),
587                },
588            });
589        }
590
591        // Legacy path: cudarc allocator. SAFETY: budget reserved
592        // atomically above and the device is valid; cudarc's
593        // alloc returns properly aligned memory for type T.
594        let slice = unsafe {
595            self.device.inner().alloc::<T>(len).map_err(|e| {
596                // Rollback the allocation tracking if CUDA allocation fails
597                self.allocated.fetch_sub(bytes, Ordering::SeqCst);
598                XlogError::Kernel(format!("GPU allocation failed: {}", e))
599            })?
600        };
601        let (raw_ptr, sync) = DevicePtr::device_ptr(&slice, slice.stream());
602        std::mem::forget(sync);
603        alloc_guard_insert(raw_ptr, bytes);
604
605        // Debug probe (XLOG_DEBUG_POISON_ALLOC=1): poison fresh legacy
606        // allocations with 0xDD so any read of unwritten allocation
607        // contents becomes a deterministic, recognizable pattern
608        // instead of whatever the recycled memory held. Diagnostic
609        // only; off unless the env var is set.
610        if poison_alloc_enabled() && bytes > 0 {
611            unsafe {
612                let _ = cudarc::driver::sys::cuMemsetD8Async(
613                    raw_ptr,
614                    0xDD,
615                    bytes as usize,
616                    std::ptr::null_mut(),
617                );
618            }
619        }
620
621        Ok(TrackedCudaSlice {
622            bytes,
623            manager: Arc::clone(self),
624            inner: ManuallyDrop::new(slice),
625            raw_ptr,
626            backing: Backing::Cudarc,
627        })
628    }
629
630    /// Check if an allocation of `bytes` would exceed the budget
631    ///
632    /// # Arguments
633    /// * `bytes` - Number of bytes to allocate
634    ///
635    /// # Returns
636    /// `Ok(())` if allocation is within budget
637    ///
638    /// # Errors
639    /// `XlogError::ResourceExhausted` if allocation would exceed budget
640    pub fn check_budget(&self, bytes: u64) -> Result<()> {
641        let current = self.allocated.load(Ordering::SeqCst);
642        let proposed = current.saturating_add(bytes);
643
644        if proposed > self.budget.device_bytes {
645            return Err(XlogError::ResourceExhausted {
646                context: "GPU memory allocation".to_string(),
647                estimated_bytes: bytes,
648                budget_bytes: self.budget.device_bytes,
649            });
650        }
651
652        Ok(())
653    }
654
655    /// Get the current allocated memory in bytes
656    pub fn allocated_bytes(&self) -> u64 {
657        self.allocated.load(Ordering::SeqCst)
658    }
659
660    /// High-water mark of allocated bytes since construction or the
661    /// last [`reset_peak`](Self::reset_peak). Always ≥
662    /// [`allocated_bytes`](Self::allocated_bytes) at the moment it was
663    /// recorded. Measurement-harness API (S3 peak-memory gate).
664    pub fn peak_bytes(&self) -> u64 {
665        self.peak.load(Ordering::SeqCst)
666    }
667
668    /// Reset the peak high-water mark to the *current* allocated
669    /// level, so a measurement window starts from live state rather
670    /// than zero. Measurement-harness API.
671    pub fn reset_peak(&self) {
672        self.peak
673            .store(self.allocated.load(Ordering::SeqCst), Ordering::SeqCst);
674    }
675
676    /// Number of `alloc` calls issued so far (device allocation requests).
677    /// The GPU-resident MC engine snapshots this around the measured region to
678    /// prove `per_operator_host_allocations == 0` (all arenas pre-allocated).
679    pub fn alloc_count(&self) -> u64 {
680        self.alloc_count.load(Ordering::Relaxed)
681    }
682
683    /// Reset the allocation-request counter to zero.
684    pub fn reset_alloc_count(&self) {
685        self.alloc_count.store(0, Ordering::Relaxed);
686    }
687
688    /// Get the memory budget
689    pub fn budget(&self) -> &MemoryBudget {
690        &self.budget
691    }
692
693    /// Get the underlying CUDA device
694    pub fn device(&self) -> &Arc<CudaDevice> {
695        &self.device
696    }
697
698    /// Record that memory has been freed
699    ///
700    /// Note: cudarc automatically frees memory when CudaSlice is dropped.
701    /// This method should be called to update tracking when memory is freed.
702    pub fn record_free(&self, bytes: u64) {
703        self.allocated.fetch_sub(bytes, Ordering::SeqCst);
704    }
705
706    /// v0.6 device-runtime entry point: allocate `bytes` raw bytes
707    /// through the attached [`XlogDeviceRuntime`].
708    ///
709    /// Returns a [`RuntimeAllocBlock`] that owns the allocation. On
710    /// drop, the block deallocates through the runtime and updates
711    /// both the manager's local `allocated` counter and the
712    /// runtime's bookkeeping.
713    ///
714    /// Both budgets apply: the manager's local
715    /// `MemoryBudget::device_bytes` AND any `GlobalDeviceBudget`
716    /// stacked above the runtime's underlying resource. Either
717    /// rejecting the request returns an `XlogError`. On runtime
718    /// rejection the local reservation is rolled back so subsequent
719    /// allocations see consistent state.
720    ///
721    /// # Errors
722    /// * `XlogError::Kernel` if no runtime is attached.
723    /// * `XlogError::ResourceExhausted` if the local budget cannot
724    ///   accommodate the request.
725    /// * `XlogError::Kernel` (with the resource error rendered)
726    ///   if the runtime rejects the request — including the
727    ///   runtime's own `OutOfBudget`, which is mapped here so
728    ///   callers see a single error surface.
729    pub fn alloc_raw(self: &Arc<Self>, bytes: usize, tag: AllocTag) -> Result<RuntimeAllocBlock> {
730        let runtime = self.runtime.as_ref().ok_or_else(|| {
731            XlogError::Kernel(
732                "GpuMemoryManager::alloc_raw called without an attached XlogDeviceRuntime; \
733                 construct via with_runtime to enable runtime routing"
734                    .to_string(),
735            )
736        })?;
737
738        let bytes_u64 = bytes as u64;
739
740        // Reserve against the local budget first (preserves the
741        // pre-existing semantics for callers that mix alloc and
742        // alloc_raw under a single MemoryBudget).
743        loop {
744            let current = self.allocated.load(Ordering::SeqCst);
745            let new_val = current.saturating_add(bytes_u64);
746            if new_val > self.budget.device_bytes {
747                return Err(XlogError::ResourceExhausted {
748                    context: "GPU memory allocation (runtime path)".to_string(),
749                    estimated_bytes: bytes_u64,
750                    budget_bytes: self.budget.device_bytes,
751                });
752            }
753            if self
754                .allocated
755                .compare_exchange(current, new_val, Ordering::SeqCst, Ordering::SeqCst)
756                .is_ok()
757            {
758                self.peak.fetch_max(new_val, Ordering::SeqCst);
759                break;
760            }
761        }
762
763        // Route through the runtime. Stream is the runtime's
764        // default for now; once stream-aware kernel launches start
765        // routing through alloc_raw the caller will pass an
766        // explicit StreamId.
767        match runtime.allocate(bytes, StreamId::DEFAULT, tag) {
768            Ok(block) => Ok(RuntimeAllocBlock {
769                bytes: bytes_u64,
770                manager: Arc::clone(self),
771                runtime: Arc::clone(runtime),
772                block: Some(block),
773            }),
774            Err(e) => {
775                // Roll back local reservation; runtime did not
776                // accept the bytes.
777                self.allocated.fetch_sub(bytes_u64, Ordering::SeqCst);
778                Err(map_resource_error(e))
779            }
780        }
781    }
782
783    /// Get remaining budget in bytes
784    pub fn remaining_bytes(&self) -> u64 {
785        let allocated = self.allocated.load(Ordering::SeqCst);
786        self.budget.device_bytes.saturating_sub(allocated)
787    }
788
789    /// Reset allocation tracking
790    ///
791    /// This should be called when GPU memory has been freed but the tracker
792    /// hasn't been updated (e.g., when CudaSlice instances are dropped without
793    /// calling record_free). This is a temporary workaround until proper
794    /// RAII-based tracking is implemented.
795    pub fn reset_tracking(&self) {
796        self.allocated.store(0, Ordering::SeqCst);
797        self.peak.store(0, Ordering::SeqCst);
798    }
799}
800
801fn map_resource_error(e: ResourceError) -> XlogError {
802    match e {
803        ResourceError::OutOfBudget {
804            requested,
805            remaining,
806        } => XlogError::ResourceExhausted {
807            context: format!(
808                "device-runtime budget refused allocation ({} bytes, {} remaining)",
809                requested, remaining
810            ),
811            estimated_bytes: requested as u64,
812            budget_bytes: (requested + remaining) as u64,
813        },
814        ResourceError::Driver(msg) => XlogError::Kernel(format!("device-runtime driver: {}", msg)),
815        ResourceError::StreamMisuse(msg) => {
816            XlogError::Kernel(format!("device-runtime stream misuse: {}", msg))
817        }
818        ResourceError::UseAfterFree { generation } => XlogError::Kernel(format!(
819            "device-runtime use-after-free on generation {:?}",
820            generation
821        )),
822        ResourceError::OutOfBounds { generation } => XlogError::Kernel(format!(
823            "device-runtime out-of-bounds on generation {:?}",
824            generation
825        )),
826    }
827}
828
/// Owned handle for a raw allocation routed through
/// [`GpuMemoryManager::alloc_raw`] / the v0.6 device runtime.
///
/// There is a manual `Debug` impl below. The runtime and manager handles
/// inside this struct are not `Debug`, so a derive would not
/// compile.
///
/// On drop, the block deallocates through the runtime, returning the bytes
/// to the runtime's bookkeeping (pending if the runtime's backend
/// is async). It also decrements the manager's local `allocated`
/// counter by `bytes`. The block exposes only the raw device pointer and
/// byte length. Typed views are the caller's responsibility; for a typed,
/// runtime-routed allocation use `GpuMemoryManager::alloc::<T>` instead.
///
/// The strong `manager` / `runtime` references keep both alive for as
/// long as any block exists, so `Drop` can always reach them.
pub struct RuntimeAllocBlock {
    /// Size of the allocation in bytes. This exact amount is subtracted from
    /// the manager's `allocated` counter on drop.
    bytes: u64,
    /// Manager whose local budget counter this block was reserved against.
    manager: Arc<GpuMemoryManager>,
    /// Runtime that produced `block` and must receive it back on drop.
    runtime: Arc<XlogDeviceRuntime>,
    /// `None` after Drop fires; `Some(_)` while the block is live.
    /// Wrapped in Option so `Drop` can move the block out and pass
    /// it by value to `runtime.deallocate`.
    block: Option<DeviceBlock>,
}
852
853impl RuntimeAllocBlock {
854    /// Raw device pointer for this allocation. Live until the
855    /// block is dropped.
856    pub fn ptr(&self) -> u64 {
857        self.block
858            .as_ref()
859            .expect("RuntimeAllocBlock used after drop")
860            .ptr
861    }
862
863    /// Allocation size in bytes.
864    pub fn bytes(&self) -> usize {
865        self.bytes as usize
866    }
867
868    /// Borrow the underlying [`DeviceBlock`] metadata. Test/
869    /// diagnostic accessor.
870    pub fn device_block(&self) -> &DeviceBlock {
871        self.block
872            .as_ref()
873            .expect("RuntimeAllocBlock used after drop")
874    }
875}
876
877impl std::fmt::Debug for RuntimeAllocBlock {
878    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
879        let mut dbg = f.debug_struct("RuntimeAllocBlock");
880        dbg.field("bytes", &self.bytes);
881        match &self.block {
882            Some(b) => {
883                dbg.field("ptr", &format_args!("{:#x}", b.ptr));
884                dbg.field("device_ordinal", &b.device_ordinal);
885                dbg.field("alloc_stream", &b.alloc_stream);
886                dbg.field("tag", &b.tag);
887                dbg.field("generation", &b.generation);
888                dbg.field("state", &b.state);
889            }
890            None => {
891                dbg.field("block", &"<dropped>");
892            }
893        }
894        dbg.finish()
895    }
896}
897
898impl Drop for RuntimeAllocBlock {
899    fn drop(&mut self) {
900        if let Some(block) = self.block.take() {
901            // Runtime deallocate may queue an async free (see
902            // AsyncCudaResource); the local manager counter
903            // releases immediately because the block.bytes are
904            // no longer "live from the manager's perspective".
905            // Runtime-side bookkeeping converges after
906            // `runtime.reap_pending()`.
907            let _ = self.runtime.deallocate(block);
908            self.manager
909                .allocated
910                .fetch_sub(self.bytes, Ordering::SeqCst);
911        }
912    }
913}
914
/// Column data stored in device memory.
///
/// Most columns are owned by XLOG (`Owned`) and tracked against the memory budget. Columns may
/// also be imported via DLPack (`Dlpack`) or Arrow device (`ArrowDevice`) without copies; these are
/// freed via the DLPack deleter or Arrow release callback.
///
/// `Dlpack` / `ArrowDevice` columns can also wrap memory XLOG itself owns and is merely exporting
/// (built with `CudaColumn::dlpack_xlog_owned` / `CudaColumn::arrow_device_xlog_owned`).
/// `CudaColumn::is_external` tells the two cases apart.
pub enum CudaColumn {
    /// Bytes allocated by a `GpuMemoryManager` and counted against its budget.
    Owned(TrackedCudaSlice<u8>),
    /// Device memory described by a DLPack managed tensor.
    Dlpack(DlpackColumn),
    /// Device memory described by an Arrow device import.
    ArrowDevice(ArrowDeviceColumn),
}
925
/// Payload of [`CudaColumn::Dlpack`]: a device pointer/length pair plus the
/// handles that keep the memory behind it alive.
///
/// Field order is significant. Rust drops struct fields in declaration
/// order, so `_tensor` is released before `source_slice`, as the drop
/// order documented on `source_slice` requires.
pub struct DlpackColumn {
    /// Device address at which the column's bytes start.
    ptr: cudarc::driver::sys::CUdeviceptr,
    /// Length of the column in bytes.
    len_bytes: usize,
    /// Stream reported for this column by `CudaColumn::stream`.
    stream: Arc<CudaStream>,
    /// DLPack tensor backing the memory. It is held (never read) so the memory
    /// it manages outlives the column.
    _tensor: DlpackManagedTensor,
    /// `Some` when this DLPack column wraps memory that xlog
    /// itself owns through the device runtime, i.e. the
    /// caller exported an xlog-allocated slice via DLPack and
    /// kept ownership inside xlog. The strong reference keeps
    /// the source slice's [`crate::device_runtime::DeviceBlock`]
    /// reachable for runtime-block identity propagation. It also
    /// keeps the underlying allocation alive across the
    /// DLPack handoff (drop order: column → tensor →
    /// `source_slice` → `runtime.deallocate`).
    ///
    /// `None` for true external DLPack producers; those
    /// columns continue to be rejected by strict-mode launch
    /// recorders.
    source_slice: Option<Arc<TrackedCudaSlice<u8>>>,
}
946
/// Payload of [`CudaColumn::ArrowDevice`]: a device pointer/length pair plus
/// the handles that keep the memory behind it alive.
///
/// Field order is significant. Rust drops struct fields in declaration
/// order, so `_import` is released before `source_slice`.
pub struct ArrowDeviceColumn {
    /// Device address at which the column's bytes start.
    ptr: cudarc::driver::sys::CUdeviceptr,
    /// Length of the column in bytes.
    len_bytes: usize,
    /// Stream reported for this column by `CudaColumn::stream`.
    stream: Arc<CudaStream>,
    /// Shared Arrow device import backing the memory. It is held (never read)
    /// so the imported buffers stay alive while this column exists.
    _import: Arc<ArrowDeviceImport>,
    /// Same role as [`DlpackColumn::source_slice`]: `Some` for
    /// xlog-owned Arrow device columns, `None` for true
    /// external Arrow producers.
    source_slice: Option<Arc<TrackedCudaSlice<u8>>>,
}
957
958impl CudaColumn {
959    pub fn owned(slice: TrackedCudaSlice<u8>) -> Self {
960        Self::Owned(slice)
961    }
962
963    pub fn dlpack(
964        ptr: cudarc::driver::sys::CUdeviceptr,
965        len_bytes: usize,
966        stream: Arc<CudaStream>,
967        tensor: DlpackManagedTensor,
968    ) -> Self {
969        Self::Dlpack(DlpackColumn {
970            ptr,
971            len_bytes,
972            stream,
973            _tensor: tensor,
974            source_slice: None,
975        })
976    }
977
978    /// Construct a DLPack column that wraps memory **xlog
979    /// itself owns** through the device runtime.
980    ///
981    /// Use this when xlog allocated `source_slice` via the
982    /// runtime-backed manager and is exporting it as a DLPack
983    /// tensor for inspection by external code while retaining
984    /// ownership. The resulting column reports
985    /// [`Self::is_external`] as `false` and
986    /// [`Self::runtime_block`] returns the slice's
987    /// [`crate::device_runtime::DeviceBlock`] — strict-mode
988    /// launch recorders will record it normally instead of
989    /// rejecting.
990    ///
991    /// True external DLPack producers (DLPack tensors handed
992    /// to xlog by another framework) must continue to use
993    /// [`Self::dlpack`].
994    pub fn dlpack_xlog_owned(
995        source_slice: Arc<TrackedCudaSlice<u8>>,
996        stream: Arc<CudaStream>,
997        tensor: DlpackManagedTensor,
998    ) -> Self {
999        let ptr = *source_slice.device_ptr();
1000        let len_bytes = source_slice.len();
1001        Self::Dlpack(DlpackColumn {
1002            ptr,
1003            len_bytes,
1004            stream,
1005            _tensor: tensor,
1006            source_slice: Some(source_slice),
1007        })
1008    }
1009
1010    pub fn arrow_device(
1011        ptr: cudarc::driver::sys::CUdeviceptr,
1012        len_bytes: usize,
1013        stream: Arc<CudaStream>,
1014        import: Arc<ArrowDeviceImport>,
1015    ) -> Self {
1016        Self::ArrowDevice(ArrowDeviceColumn {
1017            ptr,
1018            len_bytes,
1019            stream,
1020            _import: import,
1021            source_slice: None,
1022        })
1023    }
1024
1025    /// Construct an Arrow device column that wraps memory
1026    /// **xlog itself owns** through the device runtime. Same
1027    /// contract as [`Self::dlpack_xlog_owned`]: identity is
1028    /// preserved, strict recorders accept the column, and
1029    /// drop order keeps the underlying allocation alive
1030    /// through the Arrow handoff.
1031    ///
1032    /// True external Arrow device producers must continue to
1033    /// use [`Self::arrow_device`].
1034    pub fn arrow_device_xlog_owned(
1035        source_slice: Arc<TrackedCudaSlice<u8>>,
1036        stream: Arc<CudaStream>,
1037        import: Arc<ArrowDeviceImport>,
1038    ) -> Self {
1039        let ptr = *source_slice.device_ptr();
1040        let len_bytes = source_slice.len();
1041        Self::ArrowDevice(ArrowDeviceColumn {
1042            ptr,
1043            len_bytes,
1044            stream,
1045            _import: import,
1046            source_slice: Some(source_slice),
1047        })
1048    }
1049
1050    pub fn stream(&self) -> &Arc<CudaStream> {
1051        match self {
1052            CudaColumn::Owned(slice) => slice.stream(),
1053            CudaColumn::Dlpack(col) => &col.stream,
1054            CudaColumn::ArrowDevice(col) => &col.stream,
1055        }
1056    }
1057
1058    pub fn device_ptr(&self) -> &cudarc::driver::sys::CUdeviceptr {
1059        match self {
1060            CudaColumn::Owned(slice) => slice.device_ptr(),
1061            CudaColumn::Dlpack(col) => &col.ptr,
1062            CudaColumn::ArrowDevice(col) => &col.ptr,
1063        }
1064    }
1065
1066    /// Borrow the underlying [`crate::device_runtime::DeviceBlock`].
1067    ///
1068    /// Returns `Some(&block)` when xlog owns the memory through
1069    /// the runtime — `Owned` slices that were allocated via a
1070    /// runtime-backed manager, AND `Dlpack` / `ArrowDevice`
1071    /// columns constructed via the `*_xlog_owned` constructors
1072    /// (where the source slice's block is reachable through
1073    /// the retained `Arc<TrackedCudaSlice<u8>>`).
1074    ///
1075    /// Returns `None` for legacy cudarc-backed `Owned` slices
1076    /// (no runtime block exists) and for true external
1077    /// `Dlpack` / `ArrowDevice` columns (xlog never owned the
1078    /// allocation). Strict-mode launch recorders reject `None`
1079    /// returns; permissive recorders silently skip.
1080    pub fn runtime_block(&self) -> Option<&crate::device_runtime::DeviceBlock> {
1081        match self {
1082            CudaColumn::Owned(slice) => slice.runtime_block(),
1083            CudaColumn::Dlpack(col) => col.source_slice.as_ref().and_then(|s| s.runtime_block()),
1084            CudaColumn::ArrowDevice(col) => {
1085                col.source_slice.as_ref().and_then(|s| s.runtime_block())
1086            }
1087        }
1088    }
1089
1090    /// Whether this column wraps externally-managed device
1091    /// memory.
1092    ///
1093    /// Returns `true` only for `Dlpack` / `ArrowDevice` columns
1094    /// where xlog never owned the allocation (no `source_slice`).
1095    /// `Dlpack` / `ArrowDevice` columns built via
1096    /// `*_xlog_owned` constructors return `false` — xlog still
1097    /// owns the memory; the DLPack / Arrow handle is just an
1098    /// export wrapper.
1099    ///
1100    /// External memory has no xlog-side runtime identity;
1101    /// strict launch recorders reject such columns and require
1102    /// callers to coordinate cross-stream synchronization
1103    /// themselves.
1104    pub fn is_external(&self) -> bool {
1105        match self {
1106            CudaColumn::Owned(_) => false,
1107            CudaColumn::Dlpack(col) => col.source_slice.is_none(),
1108            CudaColumn::ArrowDevice(col) => col.source_slice.is_none(),
1109        }
1110    }
1111}
1112
1113impl From<TrackedCudaSlice<u8>> for CudaColumn {
1114    fn from(value: TrackedCudaSlice<u8>) -> Self {
1115        CudaColumn::Owned(value)
1116    }
1117}
1118
1119impl DeviceSlice<u8> for CudaColumn {
1120    fn len(&self) -> usize {
1121        match self {
1122            CudaColumn::Owned(slice) => slice.len(),
1123            CudaColumn::Dlpack(col) => col.len_bytes,
1124            CudaColumn::ArrowDevice(col) => col.len_bytes,
1125        }
1126    }
1127
1128    fn stream(&self) -> &Arc<CudaStream> {
1129        self.stream()
1130    }
1131}
1132
1133impl DevicePtr<u8> for CudaColumn {
1134    fn device_ptr<'a>(
1135        &'a self,
1136        stream: &'a CudaStream,
1137    ) -> (cudarc::driver::sys::CUdeviceptr, SyncOnDrop<'a>) {
1138        match self {
1139            CudaColumn::Owned(slice) => DevicePtr::device_ptr(slice, stream),
1140            CudaColumn::Dlpack(col) => (col.ptr, SyncOnDrop::Sync(None)),
1141            CudaColumn::ArrowDevice(col) => (col.ptr, SyncOnDrop::Sync(None)),
1142        }
1143    }
1144}
1145
1146impl DevicePtrMut<u8> for CudaColumn {
1147    fn device_ptr_mut<'a>(
1148        &'a mut self,
1149        stream: &'a CudaStream,
1150    ) -> (cudarc::driver::sys::CUdeviceptr, SyncOnDrop<'a>) {
1151        match self {
1152            CudaColumn::Owned(slice) => DevicePtrMut::device_ptr_mut(slice, stream),
1153            CudaColumn::Dlpack(col) => (col.ptr, SyncOnDrop::Sync(None)),
1154            CudaColumn::ArrowDevice(col) => (col.ptr, SyncOnDrop::Sync(None)),
1155        }
1156    }
1157}
1158
1159impl AsKernelParam for &CudaColumn {
1160    fn as_kernel_param(&self) -> *mut std::ffi::c_void {
1161        ((self.device_ptr()) as *const cudarc::driver::sys::CUdeviceptr)
1162            .cast_mut()
1163            .cast()
1164    }
1165}
1166
1167impl AsKernelParam for &mut CudaColumn {
1168    fn as_kernel_param(&self) -> *mut std::ffi::c_void {
1169        ((self.device_ptr()) as *const cudarc::driver::sys::CUdeviceptr)
1170            .cast_mut()
1171            .cast()
1172    }
1173}
1174
1175impl<'a> IntoKernelParamStorage for &'a CudaColumn {
1176    type Storage = DeviceParamStorage<'a>;
1177
1178    fn into_kernel_param_storage(self) -> Self::Storage {
1179        match self {
1180            CudaColumn::Owned(slice) => slice.into_kernel_param_storage(),
1181            CudaColumn::Dlpack(col) => DeviceParamStorage::unsynced(col.ptr),
1182            CudaColumn::ArrowDevice(col) => DeviceParamStorage::unsynced(col.ptr),
1183        }
1184    }
1185}
1186
1187impl<'a> IntoKernelParamStorage for &'a mut CudaColumn {
1188    type Storage = DeviceParamStorage<'a>;
1189
1190    fn into_kernel_param_storage(self) -> Self::Storage {
1191        match self {
1192            CudaColumn::Owned(slice) => slice.into_kernel_param_storage(),
1193            CudaColumn::Dlpack(col) => DeviceParamStorage::unsynced(col.ptr),
1194            CudaColumn::ArrowDevice(col) => DeviceParamStorage::unsynced(col.ptr),
1195        }
1196    }
1197}
1198
/// Column-oriented GPU buffer
///
/// Holds columnar data on the GPU with an associated schema.
/// Each column is stored as a separate [`CudaColumn`] holding that
/// column's raw bytes.
pub struct CudaBuffer {
    /// Column data stored as raw bytes, one entry per schema column
    pub columns: Vec<CudaColumn>,
    /// Row capacity for allocated columns. This is not the logical row
    /// count; that lives in `d_num_rows`.
    pub row_cap: u64,
    /// Device-resident row count (len = 1)
    pub d_num_rows: TrackedCudaSlice<u32>,
    /// Schema describing the column types
    pub schema: Schema,
    /// Cached host-side row count (u32::MAX = not yet cached).
    /// Avoids repeated synchronous D2H transfers for the immutable row count.
    cached_row_count: AtomicU32,
}
1216
1217impl CudaBuffer {
1218    /// Create a buffer from existing columns
1219    ///
1220    /// # Arguments
1221    /// * `columns` - Pre-allocated column data
1222    /// * `row_cap` - Row capacity for the buffer
1223    /// * `d_num_rows` - Device-resident row count
1224    /// * `schema` - Schema describing the columns
1225    ///
1226    /// # Panics
1227    /// Panics if the number of columns doesn't match the schema arity
1228    pub fn from_columns(
1229        columns: Vec<CudaColumn>,
1230        row_cap: u64,
1231        d_num_rows: TrackedCudaSlice<u32>,
1232        schema: Schema,
1233    ) -> Self {
1234        assert_eq!(
1235            columns.len(),
1236            schema.arity(),
1237            "Number of columns ({}) must match schema arity ({})",
1238            columns.len(),
1239            schema.arity()
1240        );
1241        Self {
1242            columns,
1243            row_cap,
1244            d_num_rows,
1245            schema,
1246            cached_row_count: AtomicU32::new(u32::MAX),
1247        }
1248    }
1249
1250    /// Like `from_columns`, but eagerly populates the row-count cache.
1251    /// Use when the host already knows the exact row count (e.g., `buffer_from_columns`).
1252    pub fn from_columns_with_host_count(
1253        columns: Vec<CudaColumn>,
1254        row_cap: u64,
1255        d_num_rows: TrackedCudaSlice<u32>,
1256        schema: Schema,
1257        host_row_count: u32,
1258    ) -> Self {
1259        assert_eq!(
1260            columns.len(),
1261            schema.arity(),
1262            "Number of columns ({}) must match schema arity ({})",
1263            columns.len(),
1264            schema.arity()
1265        );
1266        Self {
1267            columns,
1268            row_cap,
1269            d_num_rows,
1270            schema,
1271            cached_row_count: AtomicU32::new(host_row_count),
1272        }
1273    }
1274
1275    /// Returns the cached row count if available (not sentinel `u32::MAX`).
1276    pub fn cached_row_count(&self) -> Option<u32> {
1277        let v = self.cached_row_count.load(Ordering::Relaxed);
1278        if v == u32::MAX {
1279            None
1280        } else {
1281            Some(v)
1282        }
1283    }
1284
1285    /// Sets the cached row count if not already set (CAS from sentinel).
1286    /// No-op if already cached.
1287    pub fn set_cached_row_count_if_unset(&self, count: u32) {
1288        let _ = self.cached_row_count.compare_exchange(
1289            u32::MAX,
1290            count,
1291            Ordering::Relaxed,
1292            Ordering::Relaxed,
1293        );
1294    }
1295
1296    /// Get the row capacity
1297    pub fn num_rows(&self) -> u64 {
1298        self.row_cap
1299    }
1300
1301    /// Get the device-resident row count
1302    pub fn num_rows_device(&self) -> &TrackedCudaSlice<u32> {
1303        &self.d_num_rows
1304    }
1305
1306    /// Check if the buffer has zero row capacity
1307    pub fn is_empty(&self) -> bool {
1308        self.row_cap == 0
1309    }
1310
1311    /// Get the schema
1312    pub fn schema(&self) -> &Schema {
1313        &self.schema
1314    }
1315
1316    /// Get the number of columns (arity)
1317    pub fn arity(&self) -> usize {
1318        self.schema.arity()
1319    }
1320
1321    /// Estimated memory usage in bytes
1322    pub fn estimated_bytes(&self) -> u64 {
1323        self.row_cap * self.schema.row_size_bytes() as u64
1324    }
1325
1326    /// Get a reference to a specific column by index
1327    pub fn column(&self, index: usize) -> Option<&CudaColumn> {
1328        self.columns.get(index)
1329    }
1330}
1331
1332pub fn validate_logical_row_count(row_cap: u64, logical_rows: usize) -> Result<usize> {
1333    let row_cap_usize = usize::try_from(row_cap)
1334        .map_err(|_| XlogError::Kernel(format!("Row capacity {} exceeds usize::MAX", row_cap)))?;
1335    if logical_rows > row_cap_usize {
1336        return Err(XlogError::Kernel(format!(
1337            "Logical row count {} exceeds row capacity {}",
1338            logical_rows, row_cap
1339        )));
1340    }
1341    debug_assert!(logical_rows <= row_cap_usize);
1342    Ok(logical_rows)
1343}
1344
1345#[cfg(test)]
1346mod tests {
1347    use super::*;
1348    use xlog_core::ScalarType;
1349
1350    fn try_device() -> Option<Arc<CudaDevice>> {
1351        match CudaDevice::new(0) {
1352            Ok(d) => Some(Arc::new(d)),
1353            Err(e) => {
1354                eprintln!("Skipping test: CUDA runtime unavailable: {}", e);
1355                None
1356            }
1357        }
1358    }
1359
1360    // Test CudaBuffer without requiring a GPU
1361    #[test]
1362    fn test_cuda_buffer_empty() {
1363        let Some(device) = try_device() else {
1364            return;
1365        };
1366        let budget = MemoryBudget::with_limit(1024 * 1024);
1367        let manager = Arc::new(GpuMemoryManager::new(device, budget));
1368        let mut d_num_rows = manager.alloc::<u32>(1).unwrap();
1369        manager
1370            .device()
1371            .inner()
1372            .htod_sync_copy_into(&[0u32], &mut d_num_rows)
1373            .unwrap();
1374        let buffer = CudaBuffer::from_columns(Vec::new(), 0, d_num_rows, Schema::new(vec![]));
1375        assert!(buffer.is_empty());
1376        assert_eq!(buffer.num_rows(), 0);
1377        assert_eq!(buffer.arity(), 0);
1378        assert_eq!(buffer.estimated_bytes(), 0);
1379    }
1380
1381    #[test]
1382    fn test_cuda_buffer_schema() {
1383        let schema = Schema::new(vec![
1384            ("a".to_string(), ScalarType::U32),
1385            ("b".to_string(), ScalarType::U64),
1386        ]);
1387
1388        let Some(device) = try_device() else {
1389            return;
1390        };
1391        let budget = MemoryBudget::with_limit(1024 * 1024);
1392        let manager = Arc::new(GpuMemoryManager::new(device, budget));
1393        let mut d_num_rows = manager.alloc::<u32>(1).unwrap();
1394        manager
1395            .device()
1396            .inner()
1397            .htod_sync_copy_into(&[100u32], &mut d_num_rows)
1398            .unwrap();
1399
1400        // Allocate dummy columns matching the schema arity (100 rows each)
1401        let col_a = CudaColumn::owned(manager.alloc::<u8>(100 * 4).unwrap()); // U32: 4 bytes
1402        let col_b = CudaColumn::owned(manager.alloc::<u8>(100 * 8).unwrap()); // U64: 8 bytes
1403        let buffer = CudaBuffer::from_columns(vec![col_a, col_b], 100, d_num_rows, schema.clone());
1404
1405        assert_eq!(buffer.num_rows(), 100);
1406        assert_eq!(buffer.arity(), 2);
1407        // 4 bytes (U32) + 8 bytes (U64) = 12 bytes per row * 100 rows
1408        assert_eq!(buffer.estimated_bytes(), 1200);
1409        assert_eq!(buffer.schema(), &schema);
1410    }
1411
1412    // Tests requiring GPU
1413    #[test]
1414    fn test_memory_manager_creation() {
1415        let Some(device) = try_device() else {
1416            return;
1417        };
1418        let budget = MemoryBudget::with_limit(1024 * 1024); // 1 MB
1419        let manager = Arc::new(GpuMemoryManager::new(device, budget));
1420
1421        assert_eq!(manager.allocated_bytes(), 0);
1422        assert_eq!(manager.budget().device_bytes, 1024 * 1024);
1423        assert_eq!(manager.remaining_bytes(), 1024 * 1024);
1424    }
1425
1426    #[test]
1427    fn test_memory_manager_alloc() {
1428        let Some(device) = try_device() else {
1429            return;
1430        };
1431        let budget = MemoryBudget::with_limit(1024 * 1024); // 1 MB
1432        let manager = Arc::new(GpuMemoryManager::new(device, budget));
1433
1434        // Allocate 256 u32 values = 1024 bytes
1435        let _slice = manager
1436            .alloc::<u32>(256)
1437            .expect("Allocation should succeed");
1438
1439        assert_eq!(manager.allocated_bytes(), 1024);
1440        assert_eq!(manager.remaining_bytes(), 1024 * 1024 - 1024);
1441    }
1442
1443    #[test]
1444    fn test_memory_manager_budget_exceeded() {
1445        let Some(device) = try_device() else {
1446            return;
1447        };
1448        let budget = MemoryBudget::with_limit(1024); // 1 KB limit
1449        let manager = Arc::new(GpuMemoryManager::new(device, budget));
1450
1451        // Try to allocate 512 u32 values = 2048 bytes (exceeds 1KB budget)
1452        let result = manager.alloc::<u32>(512);
1453
1454        assert!(result.is_err());
1455        if let Err(XlogError::ResourceExhausted {
1456            estimated_bytes,
1457            budget_bytes,
1458            ..
1459        }) = result
1460        {
1461            assert_eq!(estimated_bytes, 2048);
1462            assert_eq!(budget_bytes, 1024);
1463        } else {
1464            panic!("Expected ResourceExhausted error");
1465        }
1466    }
1467
1468    #[test]
1469    fn test_memory_manager_check_budget() {
1470        let Some(device) = try_device() else {
1471            return;
1472        };
1473        let budget = MemoryBudget::with_limit(1000);
1474        let manager = Arc::new(GpuMemoryManager::new(device, budget));
1475
1476        // Check that 500 bytes is within budget
1477        assert!(manager.check_budget(500).is_ok());
1478
1479        // Check that 1001 bytes exceeds budget
1480        assert!(manager.check_budget(1001).is_err());
1481    }
1482
    /// Successive allocations accumulate in `allocated_bytes`. A request
    /// that would push the total past the 4 KB budget is refused and leaves
    /// the counter unchanged.
    #[test]
    fn test_memory_manager_multiple_allocs() {
        let Some(device) = try_device() else {
            return;
        };
        let budget = MemoryBudget::with_limit(4096); // 4 KB
        let manager = Arc::new(GpuMemoryManager::new(device, budget));

        // First allocation: 256 u32 = 1024 bytes.
        // `_slice1` / `_slice2` are named bindings (not `_`), so they live
        // until the end of the test and keep their bytes counted.
        let _slice1 = manager
            .alloc::<u32>(256)
            .expect("First allocation should succeed");
        assert_eq!(manager.allocated_bytes(), 1024);

        // Second allocation: 256 u32 = 1024 bytes
        let _slice2 = manager
            .alloc::<u32>(256)
            .expect("Second allocation should succeed");
        assert_eq!(manager.allocated_bytes(), 2048);

        // Third allocation that would exceed budget
        let result = manager.alloc::<u32>(1024); // 4096 bytes, would make total 6144
        assert!(result.is_err());

        // Allocated should still be 2048: the refused request reserved nothing.
        assert_eq!(manager.allocated_bytes(), 2048);
    }
1510
    /// Dropping a tracked slice releases its bytes from the manager's
    /// counter. Despite the name, this test never calls `record_free`; it
    /// checks the automatic release performed on drop.
    #[test]
    fn test_memory_manager_record_free() {
        let Some(device) = try_device() else {
            return;
        };
        let budget = MemoryBudget::with_limit(4096);
        let manager = Arc::new(GpuMemoryManager::new(device, budget));

        // Allocate 256 u32 = 1024 bytes
        let slice = manager
            .alloc::<u32>(256)
            .expect("Allocation should succeed");
        assert_eq!(manager.allocated_bytes(), 1024);

        // Drop should automatically update tracking
        drop(slice);
        assert_eq!(manager.allocated_bytes(), 0);
        assert_eq!(manager.remaining_bytes(), 4096);
    }
1530
    /// `peak_bytes` is a high-water mark. It rises with allocations, is not
    /// lowered by frees, and `reset_peak` restarts it from the current live
    /// total. The assertions depend on the exact allocate/drop order below.
    #[test]
    fn test_memory_manager_peak_tracking() {
        let Some(device) = try_device() else {
            return;
        };
        let budget = MemoryBudget::with_limit(8192);
        let manager = Arc::new(GpuMemoryManager::new(device, budget));

        let a = manager.alloc::<u32>(256).expect("alloc a"); // 1024 B
        let b = manager.alloc::<u32>(512).expect("alloc b"); // 2048 B
        // Live total 1024 + 2048 = 3072.
        assert_eq!(manager.peak_bytes(), 3072);

        // Frees lower `allocated` but never the peak.
        drop(b);
        assert_eq!(manager.allocated_bytes(), 1024);
        assert_eq!(manager.peak_bytes(), 3072);

        // reset_peak restarts the window from live state (only `a` is live).
        manager.reset_peak();
        assert_eq!(manager.peak_bytes(), 1024);

        let c = manager.alloc::<u32>(128).expect("alloc c"); // 512 B
        // New peak within the window: 1024 + 512.
        assert_eq!(manager.peak_bytes(), 1536);

        drop(c);
        drop(a);
        assert_eq!(manager.allocated_bytes(), 0);
        assert_eq!(manager.peak_bytes(), 1536);
    }
1560
1561    #[test]
1562    fn test_cuda_buffer_from_columns() {
1563        let Some(device) = try_device() else {
1564            return;
1565        };
1566        let budget = MemoryBudget::with_limit(1024 * 1024);
1567        let manager = Arc::new(GpuMemoryManager::new(device, budget));
1568
1569        let schema = Schema::new(vec![
1570            ("col1".to_string(), ScalarType::U32),
1571            ("col2".to_string(), ScalarType::U32),
1572        ]);
1573
1574        // Allocate columns (100 rows * 4 bytes = 400 bytes each)
1575        let col1 = manager.alloc::<u8>(400).expect("Alloc col1");
1576        let col2 = manager.alloc::<u8>(400).expect("Alloc col2");
1577
1578        let mut d_num_rows = manager.alloc::<u32>(1).expect("Alloc row count");
1579        manager
1580            .device()
1581            .inner()
1582            .htod_sync_copy_into(&[100u32], &mut d_num_rows)
1583            .expect("Upload row count");
1584        let buffer =
1585            CudaBuffer::from_columns(vec![col1.into(), col2.into()], 100, d_num_rows, schema);
1586
1587        assert_eq!(buffer.num_rows(), 100);
1588        assert_eq!(buffer.arity(), 2);
1589        assert!(!buffer.is_empty());
1590        assert!(buffer.column(0).is_some());
1591        assert!(buffer.column(1).is_some());
1592        assert!(buffer.column(2).is_none());
1593    }
1594
1595    #[test]
1596    fn test_cuda_buffer_from_columns_mismatch() {
1597        let schema = Schema::new(vec![
1598            ("col1".to_string(), ScalarType::U32),
1599            ("col2".to_string(), ScalarType::U32),
1600        ]);
1601
1602        let Some(device) = try_device() else {
1603            return;
1604        };
1605        let budget = MemoryBudget::with_limit(1024 * 1024);
1606        let manager = Arc::new(GpuMemoryManager::new(device, budget));
1607        let mut d_num_rows = manager.alloc::<u32>(1).expect("Alloc row count");
1608        manager
1609            .device()
1610            .inner()
1611            .htod_sync_copy_into(&[100u32], &mut d_num_rows)
1612            .expect("Upload row count");
1613
1614        // This should panic: 0 columns but schema has 2.
1615        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1616            CudaBuffer::from_columns(vec![], 100, d_num_rows, schema);
1617        }));
1618        assert!(
1619            result.is_err(),
1620            "Expected from_columns to panic on schema mismatch"
1621        );
1622    }
1623
1624    fn try_runtime() -> Option<(
1625        Arc<CudaDevice>,
1626        Arc<crate::device_runtime::XlogDeviceRuntime>,
1627    )> {
1628        use crate::device_runtime::{
1629            AsyncCudaResource, DeviceMemoryResource, GlobalDeviceBudget, StreamPool,
1630            XlogDeviceRuntime,
1631        };
1632        let device = try_device()?;
1633        let pool = Arc::new(StreamPool::with_defaults(Arc::clone(&device)));
1634        let async_resource: Box<dyn DeviceMemoryResource + Send + Sync> = Box::new(
1635            AsyncCudaResource::new(Arc::clone(&device), 0, Arc::clone(&pool)),
1636        );
1637        let budget: Box<dyn DeviceMemoryResource + Send + Sync> =
1638            Box::new(GlobalDeviceBudget::new(async_resource, 64 * 1024 * 1024));
1639        Some((
1640            Arc::clone(&device),
1641            Arc::new(XlogDeviceRuntime::with_resource(
1642                Arc::clone(&device),
1643                0,
1644                pool,
1645                budget,
1646            )),
1647        ))
1648    }
1649
1650    /// xlog-owned DLPack column constructed from a
1651    /// runtime-backed slice exposes its `DeviceBlock` via
1652    /// `runtime_block()` and reports `is_external() == false`.
1653    /// The recorder will record it normally instead of
1654    /// strict-rejecting.
1655    ///
1656    /// Uses a null-pointer `DlpackManagedTensor` purely as a
1657    /// drop-safe placeholder — the recorder never derefs the
1658    /// tensor, only the source slice.
1659    #[test]
1660    fn test_xlog_owned_dlpack_runtime_backed_carries_identity() {
1661        let Some((device, runtime)) = try_runtime() else {
1662            return;
1663        };
1664        let manager = Arc::new(GpuMemoryManager::with_runtime(
1665            Arc::clone(&device),
1666            MemoryBudget::with_limit(1024 * 1024),
1667            Arc::clone(&runtime),
1668        ));
1669        let slice = manager.alloc::<u8>(64).expect("alloc runtime-backed");
1670        assert!(slice.runtime_block().is_some());
1671        let stream = device.inner().stream().clone();
1672        // SAFETY: null-pointer DlpackManagedTensor is drop-safe
1673        // (the Drop impl checks for null before invoking the
1674        // deleter). Acceptable for a unit fixture that exercises
1675        // identity propagation, not the tensor lifecycle.
1676        let tensor = unsafe { DlpackManagedTensor::from_raw(std::ptr::null_mut()) };
1677        let col = CudaColumn::dlpack_xlog_owned(Arc::new(slice), stream, tensor);
1678        assert!(
1679            !col.is_external(),
1680            "xlog-owned DLPack column must report is_external=false"
1681        );
1682        assert!(
1683            col.runtime_block().is_some(),
1684            "xlog-owned DLPack column over a runtime-backed slice must expose runtime_block"
1685        );
1686    }
1687
1688    /// xlog-owned DLPack over a LEGACY (cudarc-backed) slice:
1689    /// `is_external()` is still false (xlog owns the
1690    /// allocation), but `runtime_block()` is None because the
1691    /// underlying slice has no runtime block. Strict recorders
1692    /// will reject with the "legacy cudarc-backed" message
1693    /// rather than the "external memory" message.
1694    #[test]
1695    fn test_xlog_owned_dlpack_legacy_backed_no_runtime_block() {
1696        let Some(device) = try_device() else {
1697            return;
1698        };
1699        let manager = Arc::new(GpuMemoryManager::new(
1700            Arc::clone(&device),
1701            MemoryBudget::with_limit(1024 * 1024),
1702        ));
1703        let slice = manager.alloc::<u8>(64).expect("alloc legacy");
1704        assert!(slice.runtime_block().is_none());
1705        let stream = device.inner().stream().clone();
1706        let tensor = unsafe { DlpackManagedTensor::from_raw(std::ptr::null_mut()) };
1707        let col = CudaColumn::dlpack_xlog_owned(Arc::new(slice), stream, tensor);
1708        assert!(
1709            !col.is_external(),
1710            "xlog-owned DLPack column is owned regardless of allocator backing"
1711        );
1712        assert!(
1713            col.runtime_block().is_none(),
1714            "legacy-backed slice has no runtime block, even when wrapped xlog-owned"
1715        );
1716    }
1717
1718    /// True external DLPack (no source_slice) — the existing
1719    /// `dlpack` constructor — keeps reporting `is_external=true`
1720    /// and `runtime_block=None`. Strict recorders reject with
1721    /// the "external memory" message.
1722    #[test]
1723    fn test_external_dlpack_remains_external() {
1724        let Some(device) = try_device() else {
1725            return;
1726        };
1727        let stream = device.inner().stream().clone();
1728        let tensor = unsafe { DlpackManagedTensor::from_raw(std::ptr::null_mut()) };
1729        // Bogus ptr/len — never dereferenced in this unit test
1730        // (we only inspect the column metadata).
1731        let col = CudaColumn::dlpack(0, 0, stream, tensor);
1732        assert!(
1733            col.is_external(),
1734            "true external DLPack column must report is_external=true"
1735        );
1736        assert!(
1737            col.runtime_block().is_none(),
1738            "true external DLPack column has no xlog-side runtime block"
1739        );
1740    }
1741
1742    /// xlog-owned Arrow device column carries identity through
1743    /// `arrow_device_xlog_owned`. Mirrors the DLPack test;
1744    /// builds a minimal `ArrowDeviceImport` from an empty
1745    /// `ArrayData`.
1746    #[test]
1747    fn test_xlog_owned_arrow_device_runtime_backed_carries_identity() {
1748        let Some((device, runtime)) = try_runtime() else {
1749            return;
1750        };
1751        let manager = Arc::new(GpuMemoryManager::with_runtime(
1752            Arc::clone(&device),
1753            MemoryBudget::with_limit(1024 * 1024),
1754            Arc::clone(&runtime),
1755        ));
1756        let slice = manager.alloc::<u8>(64).expect("alloc runtime-backed");
1757        assert!(slice.runtime_block().is_some());
1758        let stream = device.inner().stream().clone();
1759        // Synthesize a minimal ArrowDeviceImport via empty
1760        // ArrayData; Arrow is not exercised on the data path
1761        // here — the recorder only reads the column metadata.
1762        let import = Arc::new(crate::arrow_device::ArrowDeviceImport::new(
1763            arrow::array::ArrayData::new_null(&arrow::datatypes::DataType::UInt8, 0),
1764        ));
1765        let col = CudaColumn::arrow_device_xlog_owned(Arc::new(slice), stream, import);
1766        assert!(
1767            !col.is_external(),
1768            "xlog-owned Arrow device column must report is_external=false"
1769        );
1770        assert!(
1771            col.runtime_block().is_some(),
1772            "xlog-owned Arrow column over a runtime-backed slice must expose runtime_block"
1773        );
1774    }
1775
1776    /// True external Arrow device column (no source_slice)
1777    /// keeps reporting external + no runtime block.
1778    #[test]
1779    fn test_external_arrow_device_remains_external() {
1780        let Some(device) = try_device() else {
1781            return;
1782        };
1783        let stream = device.inner().stream().clone();
1784        let import = Arc::new(crate::arrow_device::ArrowDeviceImport::new(
1785            arrow::array::ArrayData::new_null(&arrow::datatypes::DataType::UInt8, 0),
1786        ));
1787        let col = CudaColumn::arrow_device(0, 0, stream, import);
1788        assert!(
1789            col.is_external(),
1790            "true external Arrow column must report is_external=true"
1791        );
1792        assert!(
1793            col.runtime_block().is_none(),
1794            "true external Arrow column has no xlog-side runtime block"
1795        );
1796    }
1797}