1use std::mem::ManuallyDrop;
7use std::ops::{Deref, DerefMut};
8use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
9use std::sync::Arc;
10
11use cudarc::driver::{CudaSlice, CudaStream, DevicePtr, DevicePtrMut, DeviceSlice, SyncOnDrop};
12use xlog_core::{MemoryBudget, Result, Schema, XlogError};
13
14use crate::arrow_device::ArrowDeviceImport;
15use crate::cuda_compat::{AsKernelParam, DeviceParamStorage, IntoKernelParamStorage};
16use crate::device_runtime::{AllocTag, DeviceBlock, ResourceError, StreamId, XlogDeviceRuntime};
17use crate::dlpack::DlpackManagedTensor;
18use crate::CudaDevice;
19
/// Budget-enforcing allocator for device memory.
///
/// Every allocation is first reserved against `budget.device_bytes`. The matching release
/// happens when the returned handle is dropped, or when the underlying allocation fails
/// and the reservation is rolled back.
///
/// Field order matters: fields are dropped in declaration order.
pub struct GpuMemoryManager {
    /// Device used for cudarc-backed allocations and for wrapping runtime blocks as typed
    /// `CudaSlice` views.
    device: Arc<CudaDevice>,
    /// Limit enforced by `alloc`, `alloc_raw` and `check_budget` (`budget.device_bytes`).
    budget: MemoryBudget,
    /// Bytes currently reserved against the budget.
    allocated: AtomicU64,
    /// High-water mark of `allocated` since construction or the last reset.
    peak: AtomicU64,
    /// Number of `alloc` calls, including ones that failed; cleared by `reset_alloc_count`.
    alloc_count: AtomicU64,
    /// Optional device runtime.
    ///
    /// When set, `alloc` routes non-zero-byte requests through it and `alloc_raw` becomes
    /// usable.
    runtime: Option<Arc<XlogDeviceRuntime>>,
}
74
/// Which allocator owns the memory behind a [`TrackedCudaSlice`], and therefore how its
/// `Drop` impl releases it.
enum Backing {
    /// Memory was allocated through cudarc; dropping the slice's `inner` `CudaSlice`
    /// frees it.
    Cudarc,
    /// Memory is a block obtained from an [`XlogDeviceRuntime`].
    ///
    /// The slice's `inner` is only a typed view over the block and is never dropped. The
    /// block is handed back through `runtime.deallocate` when the slice is dropped.
    Runtime {
        /// Runtime the block is returned to.
        runtime: Arc<XlogDeviceRuntime>,
        /// The owned block; an `Option` so `Drop` can move it out with `take()`.
        block: Option<DeviceBlock>,
    },
}
100
/// Reports whether freed cudarc-backed memory should be overwritten with `0xDD`.
///
/// Enabled only when the environment variable `XLOG_DEBUG_POISON_FREE` is exactly `"1"`.
/// The variable is read once per process and cached.
fn poison_free_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    let flag = ENABLED.get_or_init(|| {
        matches!(std::env::var("XLOG_DEBUG_POISON_FREE").as_deref(), Ok("1"))
    });
    *flag
}
108
/// Reports whether fresh cudarc-backed allocations should be filled with `0xDD`.
///
/// Enabled only when `XLOG_DEBUG_POISON_ALLOC` is exactly `"1"`. Any other value, or an
/// unset or non-UTF-8 variable, disables it. The result is cached after the first call.
fn poison_alloc_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *ENABLED.get_or_init(|| match std::env::var("XLOG_DEBUG_POISON_ALLOC") {
        Ok(value) => value == "1",
        Err(_) => false,
    })
}
116
/// Returns the process-wide map of live cudarc allocations (start address -> byte length).
///
/// The map exists only when `XLOG_DEBUG_ALLOC_GUARD` is exactly `"1"` at the first call;
/// otherwise this returns `None` forever. The decision is cached in a `OnceLock`.
fn alloc_guard() -> Option<&'static std::sync::Mutex<std::collections::BTreeMap<u64, u64>>> {
    type LiveRanges = std::sync::Mutex<std::collections::BTreeMap<u64, u64>>;
    static GUARD: std::sync::OnceLock<Option<LiveRanges>> = std::sync::OnceLock::new();

    let slot = GUARD.get_or_init(|| {
        let enabled = matches!(std::env::var("XLOG_DEBUG_ALLOC_GUARD").as_deref(), Ok("1"));
        enabled.then(|| std::sync::Mutex::new(std::collections::BTreeMap::new()))
    });
    slot.as_ref()
}
135
136fn alloc_guard_insert(ptr: u64, bytes: u64) {
137 let Some(guard) = alloc_guard() else { return };
138 if bytes == 0 {
139 return;
140 }
141 let mut live = guard.lock().unwrap();
142 if let Some((&p, &b)) = live.range(..=ptr).next_back() {
145 if p + b > ptr {
146 panic!(
147 "ALLOC GUARD: new allocation [{:#x}, {:#x}) overlaps live [{:#x}, {:#x})",
148 ptr,
149 ptr + bytes,
150 p,
151 p + b
152 );
153 }
154 }
155 if let Some((&p, _)) = live.range(ptr + 1..).next() {
156 if ptr + bytes > p {
157 panic!(
158 "ALLOC GUARD: new allocation [{:#x}, {:#x}) overlaps live starting at {:#x}",
159 ptr,
160 ptr + bytes,
161 p
162 );
163 }
164 }
165 live.insert(ptr, bytes);
166}
167
168fn alloc_guard_remove(ptr: u64) {
169 let Some(guard) = alloc_guard() else { return };
170 guard.lock().unwrap().remove(&ptr);
171}
172
/// A typed device slice whose size is charged against a [`GpuMemoryManager`] budget.
///
/// Dropping it frees (or returns) the memory according to `backing` and then calls
/// `manager.record_free(bytes)`.
///
/// Field order matters: fields are dropped in declaration order after `Drop::drop` runs.
pub struct TrackedCudaSlice<T: cudarc::driver::DeviceRepr> {
    /// Size in bytes charged against the manager's budget.
    bytes: u64,
    /// Manager whose accounting is updated when this slice is dropped.
    manager: Arc<GpuMemoryManager>,
    /// The cudarc view of the memory.
    ///
    /// It is `ManuallyDrop` so `Drop` decides whether dropping it (and thus freeing through
    /// cudarc) is correct; only `Backing::Cudarc` does so.
    inner: ManuallyDrop<CudaSlice<T>>,
    /// Device address captured at creation. Its address doubles as the kernel-argument slot
    /// handed out by the `AsKernelParam` impls.
    raw_ptr: cudarc::driver::sys::CUdeviceptr,
    /// Which allocator owns the memory.
    backing: Backing,
}
185
186impl<T: cudarc::driver::DeviceRepr> Deref for TrackedCudaSlice<T> {
187 type Target = CudaSlice<T>;
188
189 fn deref(&self) -> &Self::Target {
190 &self.inner
191 }
192}
193
194impl<T: cudarc::driver::DeviceRepr> DerefMut for TrackedCudaSlice<T> {
195 fn deref_mut(&mut self) -> &mut Self::Target {
196 &mut self.inner
197 }
198}
199
200impl<T: cudarc::driver::DeviceRepr> DeviceSlice<T> for TrackedCudaSlice<T> {
201 fn len(&self) -> usize {
202 self.inner.len()
203 }
204
205 fn stream(&self) -> &Arc<CudaStream> {
206 self.inner.stream()
207 }
208}
209
210impl<T: cudarc::driver::DeviceRepr> DevicePtr<T> for TrackedCudaSlice<T> {
211 fn device_ptr<'a>(
212 &'a self,
213 stream: &'a CudaStream,
214 ) -> (cudarc::driver::sys::CUdeviceptr, SyncOnDrop<'a>) {
215 DevicePtr::device_ptr(&*self.inner, stream)
218 }
219}
220
221impl<T: cudarc::driver::DeviceRepr> DevicePtrMut<T> for TrackedCudaSlice<T> {
222 fn device_ptr_mut<'a>(
223 &'a mut self,
224 stream: &'a CudaStream,
225 ) -> (cudarc::driver::sys::CUdeviceptr, SyncOnDrop<'a>) {
226 DevicePtrMut::device_ptr_mut(&mut *self.inner, stream)
227 }
228}
229
230impl<T: cudarc::driver::DeviceRepr> TrackedCudaSlice<T> {
231 pub fn device_ptr(&self) -> &cudarc::driver::sys::CUdeviceptr {
232 &self.raw_ptr
233 }
234
235 pub fn device_ptr_value(&self) -> cudarc::driver::sys::CUdeviceptr {
236 self.raw_ptr
237 }
238
239 pub fn memory_manager_ptr_value(&self) -> usize {
241 Arc::as_ptr(&self.manager) as usize
242 }
243
244 pub fn runtime_block(&self) -> Option<&crate::device_runtime::DeviceBlock> {
259 match &self.backing {
260 Backing::Cudarc => None,
261 Backing::Runtime { block, .. } => block.as_ref(),
262 }
263 }
264
265 pub fn into_bytes(self) -> TrackedCudaSlice<u8> {
274 let this = ManuallyDrop::new(self);
282 let bytes = this.bytes;
283 let manager = Arc::clone(&this.manager);
284 let ptr = this.raw_ptr;
285
286 let len_bytes: usize = bytes
287 .try_into()
288 .expect("TrackedCudaSlice byte size must fit into usize");
289
290 let backing: Backing = unsafe { std::ptr::read(&this.backing) };
296
297 let new_inner = unsafe {
308 manager
309 .device
310 .inner()
311 .upgrade_device_ptr::<u8>(ptr, len_bytes)
312 };
313
314 TrackedCudaSlice {
315 bytes,
316 manager,
317 inner: ManuallyDrop::new(new_inner),
318 raw_ptr: ptr,
319 backing,
320 }
321 }
322}
323
324impl<T: cudarc::driver::DeviceRepr> AsKernelParam for &TrackedCudaSlice<T> {
325 fn as_kernel_param(&self) -> *mut std::ffi::c_void {
326 ((*self).device_ptr() as *const cudarc::driver::sys::CUdeviceptr)
327 .cast_mut()
328 .cast()
329 }
330}
331
332impl<T: cudarc::driver::DeviceRepr> AsKernelParam for &mut TrackedCudaSlice<T> {
333 fn as_kernel_param(&self) -> *mut std::ffi::c_void {
334 ((self.device_ptr()) as *const cudarc::driver::sys::CUdeviceptr)
335 .cast_mut()
336 .cast()
337 }
338}
339
340impl<'a, T: cudarc::driver::DeviceRepr> IntoKernelParamStorage for &'a TrackedCudaSlice<T> {
341 type Storage = DeviceParamStorage<'a>;
342
343 fn into_kernel_param_storage(self) -> Self::Storage {
344 let (ptr, sync) = DevicePtr::device_ptr(&*self.inner, self.inner.stream());
345 DeviceParamStorage::synced(ptr, sync)
346 }
347}
348
349impl<T: cudarc::driver::DeviceRepr> IntoKernelParamStorage for &mut TrackedCudaSlice<T> {
350 type Storage = DeviceParamStorage<'static>;
351
352 fn into_kernel_param_storage(self) -> Self::Storage {
353 let stream = self.inner.stream().clone();
354 let (ptr, sync) = DevicePtrMut::device_ptr_mut(&mut *self.inner, &stream);
355 std::mem::forget(sync);
356 DeviceParamStorage::unsynced(ptr)
357 }
358}
359
/// Releases the memory according to its backing, then returns the bytes to the manager's
/// budget.
impl<T: cudarc::driver::DeviceRepr> Drop for TrackedCudaSlice<T> {
    fn drop(&mut self) {
        match &mut self.backing {
            Backing::Cudarc => {
                // Debug aid: overwrite the memory with 0xDD before it is freed so stale
                // reads are recognisable. Uses the non-async driver memset; its result is
                // ignored because Drop cannot report errors.
                if poison_free_enabled() && self.bytes > 0 {
                    unsafe {
                        let _ = cudarc::driver::sys::cuMemsetD8_v2(
                            self.raw_ptr,
                            0xDD,
                            self.bytes as usize,
                        );
                    }
                }
                // Remove from the overlap guard *before* freeing. Otherwise another thread
                // could receive the same address and trip a false overlap.
                alloc_guard_remove(self.raw_ptr);
                // The cudarc slice owns this memory; dropping it frees the allocation.
                unsafe { ManuallyDrop::drop(&mut self.inner) };
            }
            Backing::Runtime { runtime, block } => {
                // `inner` is only a typed view here and is intentionally not dropped. The
                // block is returned to the runtime instead. A deallocation error cannot be
                // propagated out of Drop and is ignored.
                if let Some(block) = block.take() {
                    let _ = runtime.deallocate(block);
                }
            }
        }
        self.manager.record_free(self.bytes);
    }
}
399
400impl GpuMemoryManager {
401 pub fn new(device: Arc<CudaDevice>, budget: MemoryBudget) -> Self {
407 Self {
408 device,
409 budget,
410 allocated: AtomicU64::new(0),
411 peak: AtomicU64::new(0),
412 alloc_count: AtomicU64::new(0),
413 runtime: None,
414 }
415 }
416
417 pub fn with_runtime(
429 device: Arc<CudaDevice>,
430 budget: MemoryBudget,
431 runtime: Arc<XlogDeviceRuntime>,
432 ) -> Self {
433 Self {
434 device,
435 budget,
436 allocated: AtomicU64::new(0),
437 peak: AtomicU64::new(0),
438 alloc_count: AtomicU64::new(0),
439 runtime: Some(runtime),
440 }
441 }
442
443 pub fn runtime(&self) -> Option<&Arc<XlogDeviceRuntime>> {
448 self.runtime.as_ref()
449 }
450
451 pub fn alloc<T: cudarc::driver::DeviceRepr>(
474 self: &Arc<Self>,
475 len: usize,
476 ) -> Result<TrackedCudaSlice<T>> {
477 self.alloc_count.fetch_add(1, Ordering::Relaxed);
479
480 let bytes = (len as u64)
482 .checked_mul(std::mem::size_of::<T>() as u64)
483 .ok_or_else(|| XlogError::Kernel("Allocation size overflow".to_string()))?;
484
485 loop {
488 let current = self.allocated.load(Ordering::SeqCst);
489 let new_val = current.saturating_add(bytes);
490 if new_val > self.budget.device_bytes {
491 return Err(XlogError::ResourceExhausted {
492 context: "GPU memory allocation".to_string(),
493 estimated_bytes: bytes,
494 budget_bytes: self.budget.device_bytes,
495 });
496 }
497 if self
498 .allocated
499 .compare_exchange(current, new_val, Ordering::SeqCst, Ordering::SeqCst)
500 .is_ok()
501 {
502 self.peak.fetch_max(new_val, Ordering::SeqCst);
503 break;
504 }
505 }
506
507 if let Some(runtime) = &self.runtime {
508 if bytes == 0 {
527 let slice = unsafe {
528 self.device.inner().alloc::<T>(len).map_err(|e| {
529 self.allocated.fetch_sub(bytes, Ordering::SeqCst);
530 XlogError::Kernel(format!("GPU allocation failed (zero-byte): {}", e))
531 })?
532 };
533 let (raw_ptr, sync) = DevicePtr::device_ptr(&slice, slice.stream());
534 std::mem::forget(sync);
535 return Ok(TrackedCudaSlice {
536 bytes,
537 manager: Arc::clone(self),
538 inner: ManuallyDrop::new(slice),
539 raw_ptr,
540 backing: Backing::Cudarc,
541 });
542 }
543
544 let bytes_usize = match usize::try_from(bytes) {
554 Ok(v) => v,
555 Err(_) => {
556 self.allocated.fetch_sub(bytes, Ordering::SeqCst);
557 return Err(XlogError::Kernel(format!(
558 "GPU allocation size {} bytes exceeds platform usize",
559 bytes
560 )));
561 }
562 };
563 let block = match runtime.allocate(bytes_usize, StreamId::DEFAULT, AllocTag::UNTAGGED) {
564 Ok(b) => b,
565 Err(e) => {
566 self.allocated.fetch_sub(bytes, Ordering::SeqCst);
567 return Err(map_resource_error(e));
568 }
569 };
570 let raw_ptr = block.ptr;
571 let typed_view = unsafe { self.device.inner().upgrade_device_ptr::<T>(raw_ptr, len) };
579 return Ok(TrackedCudaSlice {
580 bytes,
581 manager: Arc::clone(self),
582 inner: ManuallyDrop::new(typed_view),
583 raw_ptr,
584 backing: Backing::Runtime {
585 runtime: Arc::clone(runtime),
586 block: Some(block),
587 },
588 });
589 }
590
591 let slice = unsafe {
595 self.device.inner().alloc::<T>(len).map_err(|e| {
596 self.allocated.fetch_sub(bytes, Ordering::SeqCst);
598 XlogError::Kernel(format!("GPU allocation failed: {}", e))
599 })?
600 };
601 let (raw_ptr, sync) = DevicePtr::device_ptr(&slice, slice.stream());
602 std::mem::forget(sync);
603 alloc_guard_insert(raw_ptr, bytes);
604
605 if poison_alloc_enabled() && bytes > 0 {
611 unsafe {
612 let _ = cudarc::driver::sys::cuMemsetD8Async(
613 raw_ptr,
614 0xDD,
615 bytes as usize,
616 std::ptr::null_mut(),
617 );
618 }
619 }
620
621 Ok(TrackedCudaSlice {
622 bytes,
623 manager: Arc::clone(self),
624 inner: ManuallyDrop::new(slice),
625 raw_ptr,
626 backing: Backing::Cudarc,
627 })
628 }
629
630 pub fn check_budget(&self, bytes: u64) -> Result<()> {
641 let current = self.allocated.load(Ordering::SeqCst);
642 let proposed = current.saturating_add(bytes);
643
644 if proposed > self.budget.device_bytes {
645 return Err(XlogError::ResourceExhausted {
646 context: "GPU memory allocation".to_string(),
647 estimated_bytes: bytes,
648 budget_bytes: self.budget.device_bytes,
649 });
650 }
651
652 Ok(())
653 }
654
655 pub fn allocated_bytes(&self) -> u64 {
657 self.allocated.load(Ordering::SeqCst)
658 }
659
660 pub fn peak_bytes(&self) -> u64 {
665 self.peak.load(Ordering::SeqCst)
666 }
667
668 pub fn reset_peak(&self) {
672 self.peak
673 .store(self.allocated.load(Ordering::SeqCst), Ordering::SeqCst);
674 }
675
676 pub fn alloc_count(&self) -> u64 {
680 self.alloc_count.load(Ordering::Relaxed)
681 }
682
683 pub fn reset_alloc_count(&self) {
685 self.alloc_count.store(0, Ordering::Relaxed);
686 }
687
688 pub fn budget(&self) -> &MemoryBudget {
690 &self.budget
691 }
692
693 pub fn device(&self) -> &Arc<CudaDevice> {
695 &self.device
696 }
697
698 pub fn record_free(&self, bytes: u64) {
703 self.allocated.fetch_sub(bytes, Ordering::SeqCst);
704 }
705
706 pub fn alloc_raw(self: &Arc<Self>, bytes: usize, tag: AllocTag) -> Result<RuntimeAllocBlock> {
730 let runtime = self.runtime.as_ref().ok_or_else(|| {
731 XlogError::Kernel(
732 "GpuMemoryManager::alloc_raw called without an attached XlogDeviceRuntime; \
733 construct via with_runtime to enable runtime routing"
734 .to_string(),
735 )
736 })?;
737
738 let bytes_u64 = bytes as u64;
739
740 loop {
744 let current = self.allocated.load(Ordering::SeqCst);
745 let new_val = current.saturating_add(bytes_u64);
746 if new_val > self.budget.device_bytes {
747 return Err(XlogError::ResourceExhausted {
748 context: "GPU memory allocation (runtime path)".to_string(),
749 estimated_bytes: bytes_u64,
750 budget_bytes: self.budget.device_bytes,
751 });
752 }
753 if self
754 .allocated
755 .compare_exchange(current, new_val, Ordering::SeqCst, Ordering::SeqCst)
756 .is_ok()
757 {
758 self.peak.fetch_max(new_val, Ordering::SeqCst);
759 break;
760 }
761 }
762
763 match runtime.allocate(bytes, StreamId::DEFAULT, tag) {
768 Ok(block) => Ok(RuntimeAllocBlock {
769 bytes: bytes_u64,
770 manager: Arc::clone(self),
771 runtime: Arc::clone(runtime),
772 block: Some(block),
773 }),
774 Err(e) => {
775 self.allocated.fetch_sub(bytes_u64, Ordering::SeqCst);
778 Err(map_resource_error(e))
779 }
780 }
781 }
782
783 pub fn remaining_bytes(&self) -> u64 {
785 let allocated = self.allocated.load(Ordering::SeqCst);
786 self.budget.device_bytes.saturating_sub(allocated)
787 }
788
789 pub fn reset_tracking(&self) {
796 self.allocated.store(0, Ordering::SeqCst);
797 self.peak.store(0, Ordering::SeqCst);
798 }
799}
800
801fn map_resource_error(e: ResourceError) -> XlogError {
802 match e {
803 ResourceError::OutOfBudget {
804 requested,
805 remaining,
806 } => XlogError::ResourceExhausted {
807 context: format!(
808 "device-runtime budget refused allocation ({} bytes, {} remaining)",
809 requested, remaining
810 ),
811 estimated_bytes: requested as u64,
812 budget_bytes: (requested + remaining) as u64,
813 },
814 ResourceError::Driver(msg) => XlogError::Kernel(format!("device-runtime driver: {}", msg)),
815 ResourceError::StreamMisuse(msg) => {
816 XlogError::Kernel(format!("device-runtime stream misuse: {}", msg))
817 }
818 ResourceError::UseAfterFree { generation } => XlogError::Kernel(format!(
819 "device-runtime use-after-free on generation {:?}",
820 generation
821 )),
822 ResourceError::OutOfBounds { generation } => XlogError::Kernel(format!(
823 "device-runtime out-of-bounds on generation {:?}",
824 generation
825 )),
826 }
827}
828
/// Untyped device allocation obtained from the manager's runtime via
/// [`GpuMemoryManager::alloc_raw`].
///
/// Dropping it returns the block to the runtime and releases its bytes from the manager's
/// budget.
pub struct RuntimeAllocBlock {
    /// Size in bytes, as charged against the manager's budget.
    bytes: u64,
    /// Manager whose accounting is decremented on drop.
    manager: Arc<GpuMemoryManager>,
    /// Runtime the block is returned to on drop.
    runtime: Arc<XlogDeviceRuntime>,
    /// The live block; `None` only once `Drop` has taken it.
    block: Option<DeviceBlock>,
}
852
853impl RuntimeAllocBlock {
854 pub fn ptr(&self) -> u64 {
857 self.block
858 .as_ref()
859 .expect("RuntimeAllocBlock used after drop")
860 .ptr
861 }
862
863 pub fn bytes(&self) -> usize {
865 self.bytes as usize
866 }
867
868 pub fn device_block(&self) -> &DeviceBlock {
871 self.block
872 .as_ref()
873 .expect("RuntimeAllocBlock used after drop")
874 }
875}
876
877impl std::fmt::Debug for RuntimeAllocBlock {
878 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
879 let mut dbg = f.debug_struct("RuntimeAllocBlock");
880 dbg.field("bytes", &self.bytes);
881 match &self.block {
882 Some(b) => {
883 dbg.field("ptr", &format_args!("{:#x}", b.ptr));
884 dbg.field("device_ordinal", &b.device_ordinal);
885 dbg.field("alloc_stream", &b.alloc_stream);
886 dbg.field("tag", &b.tag);
887 dbg.field("generation", &b.generation);
888 dbg.field("state", &b.state);
889 }
890 None => {
891 dbg.field("block", &"<dropped>");
892 }
893 }
894 dbg.finish()
895 }
896}
897
898impl Drop for RuntimeAllocBlock {
899 fn drop(&mut self) {
900 if let Some(block) = self.block.take() {
901 let _ = self.runtime.deallocate(block);
908 self.manager
909 .allocated
910 .fetch_sub(self.bytes, Ordering::SeqCst);
911 }
912 }
913}
914
/// A device-resident byte column, tagged by who owns the underlying memory.
pub enum CudaColumn {
    /// Allocated by xlog through a [`GpuMemoryManager`].
    Owned(TrackedCudaSlice<u8>),
    /// Memory described by a DLPack tensor. It is either external, or a view over an
    /// xlog-owned slice (see `DlpackColumn::source_slice`).
    Dlpack(DlpackColumn),
    /// Memory described by an Arrow device import. It is either external, or a view over
    /// an xlog-owned slice.
    ArrowDevice(ArrowDeviceColumn),
}
925
/// Column view over memory referenced by a DLPack managed tensor.
pub struct DlpackColumn {
    /// Device address of the first byte.
    ptr: cudarc::driver::sys::CUdeviceptr,
    /// Column length in bytes.
    len_bytes: usize,
    /// Stream reported by `CudaColumn::stream` for this column.
    stream: Arc<CudaStream>,
    /// Never read. It is stored so the tensor lives as long as the column.
    /// NOTE(review): presumably so its deleter runs only once the column is gone; confirm
    /// against `DlpackManagedTensor`'s drop behaviour.
    _tensor: DlpackManagedTensor,
    /// The xlog slice this tensor views when the memory is xlog-owned; `None` for external
    /// memory (`CudaColumn::is_external`).
    source_slice: Option<Arc<TrackedCudaSlice<u8>>>,
}
946
/// Column view over memory described by an Arrow device-array import.
pub struct ArrowDeviceColumn {
    /// Device address of the first byte.
    ptr: cudarc::driver::sys::CUdeviceptr,
    /// Column length in bytes.
    len_bytes: usize,
    /// Stream reported by `CudaColumn::stream` for this column.
    stream: Arc<CudaStream>,
    /// Never read. It keeps the import (and presumably the memory it references) alive as
    /// long as the column — confirm.
    _import: Arc<ArrowDeviceImport>,
    /// The xlog slice being viewed when the memory is xlog-owned; `None` for external memory.
    source_slice: Option<Arc<TrackedCudaSlice<u8>>>,
}
957
958impl CudaColumn {
959 pub fn owned(slice: TrackedCudaSlice<u8>) -> Self {
960 Self::Owned(slice)
961 }
962
963 pub fn dlpack(
964 ptr: cudarc::driver::sys::CUdeviceptr,
965 len_bytes: usize,
966 stream: Arc<CudaStream>,
967 tensor: DlpackManagedTensor,
968 ) -> Self {
969 Self::Dlpack(DlpackColumn {
970 ptr,
971 len_bytes,
972 stream,
973 _tensor: tensor,
974 source_slice: None,
975 })
976 }
977
978 pub fn dlpack_xlog_owned(
995 source_slice: Arc<TrackedCudaSlice<u8>>,
996 stream: Arc<CudaStream>,
997 tensor: DlpackManagedTensor,
998 ) -> Self {
999 let ptr = *source_slice.device_ptr();
1000 let len_bytes = source_slice.len();
1001 Self::Dlpack(DlpackColumn {
1002 ptr,
1003 len_bytes,
1004 stream,
1005 _tensor: tensor,
1006 source_slice: Some(source_slice),
1007 })
1008 }
1009
1010 pub fn arrow_device(
1011 ptr: cudarc::driver::sys::CUdeviceptr,
1012 len_bytes: usize,
1013 stream: Arc<CudaStream>,
1014 import: Arc<ArrowDeviceImport>,
1015 ) -> Self {
1016 Self::ArrowDevice(ArrowDeviceColumn {
1017 ptr,
1018 len_bytes,
1019 stream,
1020 _import: import,
1021 source_slice: None,
1022 })
1023 }
1024
1025 pub fn arrow_device_xlog_owned(
1035 source_slice: Arc<TrackedCudaSlice<u8>>,
1036 stream: Arc<CudaStream>,
1037 import: Arc<ArrowDeviceImport>,
1038 ) -> Self {
1039 let ptr = *source_slice.device_ptr();
1040 let len_bytes = source_slice.len();
1041 Self::ArrowDevice(ArrowDeviceColumn {
1042 ptr,
1043 len_bytes,
1044 stream,
1045 _import: import,
1046 source_slice: Some(source_slice),
1047 })
1048 }
1049
1050 pub fn stream(&self) -> &Arc<CudaStream> {
1051 match self {
1052 CudaColumn::Owned(slice) => slice.stream(),
1053 CudaColumn::Dlpack(col) => &col.stream,
1054 CudaColumn::ArrowDevice(col) => &col.stream,
1055 }
1056 }
1057
1058 pub fn device_ptr(&self) -> &cudarc::driver::sys::CUdeviceptr {
1059 match self {
1060 CudaColumn::Owned(slice) => slice.device_ptr(),
1061 CudaColumn::Dlpack(col) => &col.ptr,
1062 CudaColumn::ArrowDevice(col) => &col.ptr,
1063 }
1064 }
1065
1066 pub fn runtime_block(&self) -> Option<&crate::device_runtime::DeviceBlock> {
1081 match self {
1082 CudaColumn::Owned(slice) => slice.runtime_block(),
1083 CudaColumn::Dlpack(col) => col.source_slice.as_ref().and_then(|s| s.runtime_block()),
1084 CudaColumn::ArrowDevice(col) => {
1085 col.source_slice.as_ref().and_then(|s| s.runtime_block())
1086 }
1087 }
1088 }
1089
1090 pub fn is_external(&self) -> bool {
1105 match self {
1106 CudaColumn::Owned(_) => false,
1107 CudaColumn::Dlpack(col) => col.source_slice.is_none(),
1108 CudaColumn::ArrowDevice(col) => col.source_slice.is_none(),
1109 }
1110 }
1111}
1112
1113impl From<TrackedCudaSlice<u8>> for CudaColumn {
1114 fn from(value: TrackedCudaSlice<u8>) -> Self {
1115 CudaColumn::Owned(value)
1116 }
1117}
1118
1119impl DeviceSlice<u8> for CudaColumn {
1120 fn len(&self) -> usize {
1121 match self {
1122 CudaColumn::Owned(slice) => slice.len(),
1123 CudaColumn::Dlpack(col) => col.len_bytes,
1124 CudaColumn::ArrowDevice(col) => col.len_bytes,
1125 }
1126 }
1127
1128 fn stream(&self) -> &Arc<CudaStream> {
1129 self.stream()
1130 }
1131}
1132
1133impl DevicePtr<u8> for CudaColumn {
1134 fn device_ptr<'a>(
1135 &'a self,
1136 stream: &'a CudaStream,
1137 ) -> (cudarc::driver::sys::CUdeviceptr, SyncOnDrop<'a>) {
1138 match self {
1139 CudaColumn::Owned(slice) => DevicePtr::device_ptr(slice, stream),
1140 CudaColumn::Dlpack(col) => (col.ptr, SyncOnDrop::Sync(None)),
1141 CudaColumn::ArrowDevice(col) => (col.ptr, SyncOnDrop::Sync(None)),
1142 }
1143 }
1144}
1145
1146impl DevicePtrMut<u8> for CudaColumn {
1147 fn device_ptr_mut<'a>(
1148 &'a mut self,
1149 stream: &'a CudaStream,
1150 ) -> (cudarc::driver::sys::CUdeviceptr, SyncOnDrop<'a>) {
1151 match self {
1152 CudaColumn::Owned(slice) => DevicePtrMut::device_ptr_mut(slice, stream),
1153 CudaColumn::Dlpack(col) => (col.ptr, SyncOnDrop::Sync(None)),
1154 CudaColumn::ArrowDevice(col) => (col.ptr, SyncOnDrop::Sync(None)),
1155 }
1156 }
1157}
1158
1159impl AsKernelParam for &CudaColumn {
1160 fn as_kernel_param(&self) -> *mut std::ffi::c_void {
1161 ((self.device_ptr()) as *const cudarc::driver::sys::CUdeviceptr)
1162 .cast_mut()
1163 .cast()
1164 }
1165}
1166
1167impl AsKernelParam for &mut CudaColumn {
1168 fn as_kernel_param(&self) -> *mut std::ffi::c_void {
1169 ((self.device_ptr()) as *const cudarc::driver::sys::CUdeviceptr)
1170 .cast_mut()
1171 .cast()
1172 }
1173}
1174
1175impl<'a> IntoKernelParamStorage for &'a CudaColumn {
1176 type Storage = DeviceParamStorage<'a>;
1177
1178 fn into_kernel_param_storage(self) -> Self::Storage {
1179 match self {
1180 CudaColumn::Owned(slice) => slice.into_kernel_param_storage(),
1181 CudaColumn::Dlpack(col) => DeviceParamStorage::unsynced(col.ptr),
1182 CudaColumn::ArrowDevice(col) => DeviceParamStorage::unsynced(col.ptr),
1183 }
1184 }
1185}
1186
1187impl<'a> IntoKernelParamStorage for &'a mut CudaColumn {
1188 type Storage = DeviceParamStorage<'a>;
1189
1190 fn into_kernel_param_storage(self) -> Self::Storage {
1191 match self {
1192 CudaColumn::Owned(slice) => slice.into_kernel_param_storage(),
1193 CudaColumn::Dlpack(col) => DeviceParamStorage::unsynced(col.ptr),
1194 CudaColumn::ArrowDevice(col) => DeviceParamStorage::unsynced(col.ptr),
1195 }
1196 }
1197}
1198
/// A relation stored on the device: one column per schema attribute plus row bookkeeping.
pub struct CudaBuffer {
    /// One column per schema attribute (the constructors assert the counts match).
    pub columns: Vec<CudaColumn>,
    /// Row capacity of the columns; `num_rows` returns this value.
    pub row_cap: u64,
    /// Device-side row counter.
    /// NOTE(review): the tests allocate it with one element and upload a count into it;
    /// presumably it holds the logical row count written by kernels — confirm.
    pub d_num_rows: TrackedCudaSlice<u32>,
    /// Schema describing the columns.
    pub schema: Schema,
    /// Host-side cache of the row count; `u32::MAX` means "not known yet".
    cached_row_count: AtomicU32,
}
1216
1217impl CudaBuffer {
1218 pub fn from_columns(
1229 columns: Vec<CudaColumn>,
1230 row_cap: u64,
1231 d_num_rows: TrackedCudaSlice<u32>,
1232 schema: Schema,
1233 ) -> Self {
1234 assert_eq!(
1235 columns.len(),
1236 schema.arity(),
1237 "Number of columns ({}) must match schema arity ({})",
1238 columns.len(),
1239 schema.arity()
1240 );
1241 Self {
1242 columns,
1243 row_cap,
1244 d_num_rows,
1245 schema,
1246 cached_row_count: AtomicU32::new(u32::MAX),
1247 }
1248 }
1249
1250 pub fn from_columns_with_host_count(
1253 columns: Vec<CudaColumn>,
1254 row_cap: u64,
1255 d_num_rows: TrackedCudaSlice<u32>,
1256 schema: Schema,
1257 host_row_count: u32,
1258 ) -> Self {
1259 assert_eq!(
1260 columns.len(),
1261 schema.arity(),
1262 "Number of columns ({}) must match schema arity ({})",
1263 columns.len(),
1264 schema.arity()
1265 );
1266 Self {
1267 columns,
1268 row_cap,
1269 d_num_rows,
1270 schema,
1271 cached_row_count: AtomicU32::new(host_row_count),
1272 }
1273 }
1274
1275 pub fn cached_row_count(&self) -> Option<u32> {
1277 let v = self.cached_row_count.load(Ordering::Relaxed);
1278 if v == u32::MAX {
1279 None
1280 } else {
1281 Some(v)
1282 }
1283 }
1284
1285 pub fn set_cached_row_count_if_unset(&self, count: u32) {
1288 let _ = self.cached_row_count.compare_exchange(
1289 u32::MAX,
1290 count,
1291 Ordering::Relaxed,
1292 Ordering::Relaxed,
1293 );
1294 }
1295
1296 pub fn num_rows(&self) -> u64 {
1298 self.row_cap
1299 }
1300
1301 pub fn num_rows_device(&self) -> &TrackedCudaSlice<u32> {
1303 &self.d_num_rows
1304 }
1305
1306 pub fn is_empty(&self) -> bool {
1308 self.row_cap == 0
1309 }
1310
1311 pub fn schema(&self) -> &Schema {
1313 &self.schema
1314 }
1315
1316 pub fn arity(&self) -> usize {
1318 self.schema.arity()
1319 }
1320
1321 pub fn estimated_bytes(&self) -> u64 {
1323 self.row_cap * self.schema.row_size_bytes() as u64
1324 }
1325
1326 pub fn column(&self, index: usize) -> Option<&CudaColumn> {
1328 self.columns.get(index)
1329 }
1330}
1331
1332pub fn validate_logical_row_count(row_cap: u64, logical_rows: usize) -> Result<usize> {
1333 let row_cap_usize = usize::try_from(row_cap)
1334 .map_err(|_| XlogError::Kernel(format!("Row capacity {} exceeds usize::MAX", row_cap)))?;
1335 if logical_rows > row_cap_usize {
1336 return Err(XlogError::Kernel(format!(
1337 "Logical row count {} exceeds row capacity {}",
1338 logical_rows, row_cap
1339 )));
1340 }
1341 debug_assert!(logical_rows <= row_cap_usize);
1342 Ok(logical_rows)
1343}
1344
1345#[cfg(test)]
1346mod tests {
1347 use super::*;
1348 use xlog_core::ScalarType;
1349
1350 fn try_device() -> Option<Arc<CudaDevice>> {
1351 match CudaDevice::new(0) {
1352 Ok(d) => Some(Arc::new(d)),
1353 Err(e) => {
1354 eprintln!("Skipping test: CUDA runtime unavailable: {}", e);
1355 None
1356 }
1357 }
1358 }
1359
1360 #[test]
1362 fn test_cuda_buffer_empty() {
1363 let Some(device) = try_device() else {
1364 return;
1365 };
1366 let budget = MemoryBudget::with_limit(1024 * 1024);
1367 let manager = Arc::new(GpuMemoryManager::new(device, budget));
1368 let mut d_num_rows = manager.alloc::<u32>(1).unwrap();
1369 manager
1370 .device()
1371 .inner()
1372 .htod_sync_copy_into(&[0u32], &mut d_num_rows)
1373 .unwrap();
1374 let buffer = CudaBuffer::from_columns(Vec::new(), 0, d_num_rows, Schema::new(vec![]));
1375 assert!(buffer.is_empty());
1376 assert_eq!(buffer.num_rows(), 0);
1377 assert_eq!(buffer.arity(), 0);
1378 assert_eq!(buffer.estimated_bytes(), 0);
1379 }
1380
1381 #[test]
1382 fn test_cuda_buffer_schema() {
1383 let schema = Schema::new(vec![
1384 ("a".to_string(), ScalarType::U32),
1385 ("b".to_string(), ScalarType::U64),
1386 ]);
1387
1388 let Some(device) = try_device() else {
1389 return;
1390 };
1391 let budget = MemoryBudget::with_limit(1024 * 1024);
1392 let manager = Arc::new(GpuMemoryManager::new(device, budget));
1393 let mut d_num_rows = manager.alloc::<u32>(1).unwrap();
1394 manager
1395 .device()
1396 .inner()
1397 .htod_sync_copy_into(&[100u32], &mut d_num_rows)
1398 .unwrap();
1399
1400 let col_a = CudaColumn::owned(manager.alloc::<u8>(100 * 4).unwrap()); let col_b = CudaColumn::owned(manager.alloc::<u8>(100 * 8).unwrap()); let buffer = CudaBuffer::from_columns(vec![col_a, col_b], 100, d_num_rows, schema.clone());
1404
1405 assert_eq!(buffer.num_rows(), 100);
1406 assert_eq!(buffer.arity(), 2);
1407 assert_eq!(buffer.estimated_bytes(), 1200);
1409 assert_eq!(buffer.schema(), &schema);
1410 }
1411
1412 #[test]
1414 fn test_memory_manager_creation() {
1415 let Some(device) = try_device() else {
1416 return;
1417 };
1418 let budget = MemoryBudget::with_limit(1024 * 1024); let manager = Arc::new(GpuMemoryManager::new(device, budget));
1420
1421 assert_eq!(manager.allocated_bytes(), 0);
1422 assert_eq!(manager.budget().device_bytes, 1024 * 1024);
1423 assert_eq!(manager.remaining_bytes(), 1024 * 1024);
1424 }
1425
1426 #[test]
1427 fn test_memory_manager_alloc() {
1428 let Some(device) = try_device() else {
1429 return;
1430 };
1431 let budget = MemoryBudget::with_limit(1024 * 1024); let manager = Arc::new(GpuMemoryManager::new(device, budget));
1433
1434 let _slice = manager
1436 .alloc::<u32>(256)
1437 .expect("Allocation should succeed");
1438
1439 assert_eq!(manager.allocated_bytes(), 1024);
1440 assert_eq!(manager.remaining_bytes(), 1024 * 1024 - 1024);
1441 }
1442
1443 #[test]
1444 fn test_memory_manager_budget_exceeded() {
1445 let Some(device) = try_device() else {
1446 return;
1447 };
1448 let budget = MemoryBudget::with_limit(1024); let manager = Arc::new(GpuMemoryManager::new(device, budget));
1450
1451 let result = manager.alloc::<u32>(512);
1453
1454 assert!(result.is_err());
1455 if let Err(XlogError::ResourceExhausted {
1456 estimated_bytes,
1457 budget_bytes,
1458 ..
1459 }) = result
1460 {
1461 assert_eq!(estimated_bytes, 2048);
1462 assert_eq!(budget_bytes, 1024);
1463 } else {
1464 panic!("Expected ResourceExhausted error");
1465 }
1466 }
1467
1468 #[test]
1469 fn test_memory_manager_check_budget() {
1470 let Some(device) = try_device() else {
1471 return;
1472 };
1473 let budget = MemoryBudget::with_limit(1000);
1474 let manager = Arc::new(GpuMemoryManager::new(device, budget));
1475
1476 assert!(manager.check_budget(500).is_ok());
1478
1479 assert!(manager.check_budget(1001).is_err());
1481 }
1482
1483 #[test]
1484 fn test_memory_manager_multiple_allocs() {
1485 let Some(device) = try_device() else {
1486 return;
1487 };
1488 let budget = MemoryBudget::with_limit(4096); let manager = Arc::new(GpuMemoryManager::new(device, budget));
1490
1491 let _slice1 = manager
1493 .alloc::<u32>(256)
1494 .expect("First allocation should succeed");
1495 assert_eq!(manager.allocated_bytes(), 1024);
1496
1497 let _slice2 = manager
1499 .alloc::<u32>(256)
1500 .expect("Second allocation should succeed");
1501 assert_eq!(manager.allocated_bytes(), 2048);
1502
1503 let result = manager.alloc::<u32>(1024); assert!(result.is_err());
1506
1507 assert_eq!(manager.allocated_bytes(), 2048);
1509 }
1510
1511 #[test]
1512 fn test_memory_manager_record_free() {
1513 let Some(device) = try_device() else {
1514 return;
1515 };
1516 let budget = MemoryBudget::with_limit(4096);
1517 let manager = Arc::new(GpuMemoryManager::new(device, budget));
1518
1519 let slice = manager
1521 .alloc::<u32>(256)
1522 .expect("Allocation should succeed");
1523 assert_eq!(manager.allocated_bytes(), 1024);
1524
1525 drop(slice);
1527 assert_eq!(manager.allocated_bytes(), 0);
1528 assert_eq!(manager.remaining_bytes(), 4096);
1529 }
1530
1531 #[test]
1532 fn test_memory_manager_peak_tracking() {
1533 let Some(device) = try_device() else {
1534 return;
1535 };
1536 let budget = MemoryBudget::with_limit(8192);
1537 let manager = Arc::new(GpuMemoryManager::new(device, budget));
1538
1539 let a = manager.alloc::<u32>(256).expect("alloc a"); let b = manager.alloc::<u32>(512).expect("alloc b"); assert_eq!(manager.peak_bytes(), 3072);
1542
1543 drop(b);
1545 assert_eq!(manager.allocated_bytes(), 1024);
1546 assert_eq!(manager.peak_bytes(), 3072);
1547
1548 manager.reset_peak();
1550 assert_eq!(manager.peak_bytes(), 1024);
1551
1552 let c = manager.alloc::<u32>(128).expect("alloc c"); assert_eq!(manager.peak_bytes(), 1536);
1554
1555 drop(c);
1556 drop(a);
1557 assert_eq!(manager.allocated_bytes(), 0);
1558 assert_eq!(manager.peak_bytes(), 1536);
1559 }
1560
1561 #[test]
1562 fn test_cuda_buffer_from_columns() {
1563 let Some(device) = try_device() else {
1564 return;
1565 };
1566 let budget = MemoryBudget::with_limit(1024 * 1024);
1567 let manager = Arc::new(GpuMemoryManager::new(device, budget));
1568
1569 let schema = Schema::new(vec![
1570 ("col1".to_string(), ScalarType::U32),
1571 ("col2".to_string(), ScalarType::U32),
1572 ]);
1573
1574 let col1 = manager.alloc::<u8>(400).expect("Alloc col1");
1576 let col2 = manager.alloc::<u8>(400).expect("Alloc col2");
1577
1578 let mut d_num_rows = manager.alloc::<u32>(1).expect("Alloc row count");
1579 manager
1580 .device()
1581 .inner()
1582 .htod_sync_copy_into(&[100u32], &mut d_num_rows)
1583 .expect("Upload row count");
1584 let buffer =
1585 CudaBuffer::from_columns(vec![col1.into(), col2.into()], 100, d_num_rows, schema);
1586
1587 assert_eq!(buffer.num_rows(), 100);
1588 assert_eq!(buffer.arity(), 2);
1589 assert!(!buffer.is_empty());
1590 assert!(buffer.column(0).is_some());
1591 assert!(buffer.column(1).is_some());
1592 assert!(buffer.column(2).is_none());
1593 }
1594
#[test]
fn test_cuda_buffer_from_columns_mismatch() {
    // Returns early when `try_device` finds no usable device.
    let device = match try_device() {
        Some(dev) => dev,
        None => return,
    };
    let mgr = Arc::new(GpuMemoryManager::new(
        device,
        MemoryBudget::with_limit(1024 * 1024),
    ));

    let mut device_rows = mgr.alloc::<u32>(1).expect("Alloc row count");
    mgr.device()
        .inner()
        .htod_sync_copy_into(&[100u32], &mut device_rows)
        .expect("Upload row count");

    // The schema declares two columns, yet no columns are supplied; this
    // mismatch is expected to make `from_columns` panic.
    let schema = Schema::new(vec![
        ("col1".to_string(), ScalarType::U32),
        ("col2".to_string(), ScalarType::U32),
    ]);

    // `catch_unwind` (rather than `#[should_panic]`) keeps the early-return
    // path above a passing test on machines without a device.
    let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
        CudaBuffer::from_columns(Vec::new(), 100, device_rows, schema);
    }));
    assert!(
        outcome.is_err(),
        "Expected from_columns to panic on schema mismatch"
    );
}
1623
1624 fn try_runtime() -> Option<(
1625 Arc<CudaDevice>,
1626 Arc<crate::device_runtime::XlogDeviceRuntime>,
1627 )> {
1628 use crate::device_runtime::{
1629 AsyncCudaResource, DeviceMemoryResource, GlobalDeviceBudget, StreamPool,
1630 XlogDeviceRuntime,
1631 };
1632 let device = try_device()?;
1633 let pool = Arc::new(StreamPool::with_defaults(Arc::clone(&device)));
1634 let async_resource: Box<dyn DeviceMemoryResource + Send + Sync> = Box::new(
1635 AsyncCudaResource::new(Arc::clone(&device), 0, Arc::clone(&pool)),
1636 );
1637 let budget: Box<dyn DeviceMemoryResource + Send + Sync> =
1638 Box::new(GlobalDeviceBudget::new(async_resource, 64 * 1024 * 1024));
1639 Some((
1640 Arc::clone(&device),
1641 Arc::new(XlogDeviceRuntime::with_resource(
1642 Arc::clone(&device),
1643 0,
1644 pool,
1645 budget,
1646 )),
1647 ))
1648 }
1649
#[test]
fn test_xlog_owned_dlpack_runtime_backed_carries_identity() {
    // Returns early when `try_runtime` cannot build a device + runtime pair.
    let (device, runtime) = match try_runtime() {
        Some(pair) => pair,
        None => return,
    };
    let mgr = Arc::new(GpuMemoryManager::with_runtime(
        Arc::clone(&device),
        MemoryBudget::with_limit(1024 * 1024),
        Arc::clone(&runtime),
    ));

    // A runtime-backed manager must hand out a slice that carries a runtime block.
    let slice = mgr.alloc::<u8>(64).expect("alloc runtime-backed");
    assert!(slice.runtime_block().is_some());

    let stream_handle = device.inner().stream().clone();
    // SAFETY: NOTE(review): a null pointer is passed; this presumes `from_raw`
    // treats null as an inert handle that is never dereferenced or released —
    // confirm against `DlpackManagedTensor::from_raw`'s safety contract.
    let placeholder = unsafe { DlpackManagedTensor::from_raw(std::ptr::null_mut()) };
    let column = CudaColumn::dlpack_xlog_owned(Arc::new(slice), stream_handle, placeholder);

    assert!(
        !column.is_external(),
        "xlog-owned DLPack column must report is_external=false"
    );
    assert!(
        column.runtime_block().is_some(),
        "xlog-owned DLPack column over a runtime-backed slice must expose runtime_block"
    );
}
1687
#[test]
fn test_xlog_owned_dlpack_legacy_backed_no_runtime_block() {
    // Returns early when `try_device` finds no usable device.
    let device = match try_device() {
        Some(dev) => dev,
        None => return,
    };
    // Built via `GpuMemoryManager::new`, i.e. without a runtime attached.
    let mgr = Arc::new(GpuMemoryManager::new(
        Arc::clone(&device),
        MemoryBudget::with_limit(1024 * 1024),
    ));

    let slice = mgr.alloc::<u8>(64).expect("alloc legacy");
    assert!(slice.runtime_block().is_none());

    let stream_handle = device.inner().stream().clone();
    // SAFETY: NOTE(review): a null pointer is passed; this presumes `from_raw`
    // treats null as an inert handle that is never dereferenced or released —
    // confirm against `DlpackManagedTensor::from_raw`'s safety contract.
    let placeholder = unsafe { DlpackManagedTensor::from_raw(std::ptr::null_mut()) };
    let column = CudaColumn::dlpack_xlog_owned(Arc::new(slice), stream_handle, placeholder);

    assert!(
        !column.is_external(),
        "xlog-owned DLPack column is owned regardless of allocator backing"
    );
    assert!(
        column.runtime_block().is_none(),
        "legacy-backed slice has no runtime block, even when wrapped xlog-owned"
    );
}
1717
#[test]
fn test_external_dlpack_remains_external() {
    // Returns early when `try_device` finds no usable device.
    let device = match try_device() {
        Some(dev) => dev,
        None => return,
    };
    let stream_handle = device.inner().stream().clone();
    // SAFETY: NOTE(review): a null pointer is passed; this presumes `from_raw`
    // treats null as an inert handle that is never dereferenced or released —
    // confirm against `DlpackManagedTensor::from_raw`'s safety contract.
    let placeholder = unsafe { DlpackManagedTensor::from_raw(std::ptr::null_mut()) };
    // NOTE(review): the two zeros look like a null device pointer and zero length —
    // confirm against `CudaColumn::dlpack`.
    let column = CudaColumn::dlpack(0, 0, stream_handle, placeholder);

    assert!(
        column.is_external(),
        "true external DLPack column must report is_external=true"
    );
    assert!(
        column.runtime_block().is_none(),
        "true external DLPack column has no xlog-side runtime block"
    );
}
1741
#[test]
fn test_xlog_owned_arrow_device_runtime_backed_carries_identity() {
    // Returns early when `try_runtime` cannot build a device + runtime pair.
    let (device, runtime) = match try_runtime() {
        Some(pair) => pair,
        None => return,
    };
    let mgr = Arc::new(GpuMemoryManager::with_runtime(
        Arc::clone(&device),
        MemoryBudget::with_limit(1024 * 1024),
        Arc::clone(&runtime),
    ));

    // A runtime-backed manager must hand out a slice that carries a runtime block.
    let slice = mgr.alloc::<u8>(64).expect("alloc runtime-backed");
    assert!(slice.runtime_block().is_some());

    let stream_handle = device.inner().stream().clone();
    // Zero-length null UInt8 array used only as a placeholder import payload.
    let empty_data =
        arrow::array::ArrayData::new_null(&arrow::datatypes::DataType::UInt8, 0);
    let import = Arc::new(crate::arrow_device::ArrowDeviceImport::new(empty_data));
    let column = CudaColumn::arrow_device_xlog_owned(Arc::new(slice), stream_handle, import);

    assert!(
        !column.is_external(),
        "xlog-owned Arrow device column must report is_external=false"
    );
    assert!(
        column.runtime_block().is_some(),
        "xlog-owned Arrow column over a runtime-backed slice must expose runtime_block"
    );
}
1775
#[test]
fn test_external_arrow_device_remains_external() {
    // Returns early when `try_device` finds no usable device.
    let device = match try_device() {
        Some(dev) => dev,
        None => return,
    };
    let stream_handle = device.inner().stream().clone();
    // Zero-length null UInt8 array used only as a placeholder import payload.
    let empty_data =
        arrow::array::ArrayData::new_null(&arrow::datatypes::DataType::UInt8, 0);
    let import = Arc::new(crate::arrow_device::ArrowDeviceImport::new(empty_data));
    // NOTE(review): the two zeros look like a null device pointer and zero length —
    // confirm against `CudaColumn::arrow_device`.
    let column = CudaColumn::arrow_device(0, 0, stream_handle, import);

    assert!(
        column.is_external(),
        "true external Arrow column must report is_external=true"
    );
    assert!(
        column.runtime_block().is_none(),
        "true external Arrow column has no xlog-side runtime block"
    );
}
1797}