Skip to main content

xlog_cuda/
arrow_device.rs

1use std::ffi::c_void;
2use std::sync::Arc;
3
4use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
5
6use crate::memory::{CudaBuffer, TrackedCudaSlice};
7
/// Device-type tag identifying CUDA device memory.
///
/// The value `2` matches `ARROW_DEVICE_CUDA` in the Arrow C Device Data
/// Interface; it is the value expected in [`ArrowDeviceArray::device_type`]
/// for CUDA-backed arrays.
pub const ARROW_DEVICE_CUDA: i32 = 2;
9
10pub(crate) struct ArrowCudaAllocation {
11    _buffer: Arc<CudaBuffer>,
12    _extra: Vec<TrackedCudaSlice<u8>>,
13}
14
15impl ArrowCudaAllocation {
16    pub(crate) fn new(buffer: Arc<CudaBuffer>, extra: Vec<TrackedCudaSlice<u8>>) -> Self {
17        Self {
18            _buffer: buffer,
19            _extra: extra,
20        }
21    }
22}
23
24// SAFETY: ArrowCudaAllocation is used only as a keepalive handle for GPU buffers.
25// Dropping CUDA allocations is thread-safe, and no device memory is accessed.
26unsafe impl Send for ArrowCudaAllocation {}
27unsafe impl Sync for ArrowCudaAllocation {}
28
29#[repr(C)]
30pub struct ArrowDeviceArray {
31    pub device_type: i32,
32    pub device_id: i32,
33    pub array: *mut FFI_ArrowArray,
34    pub schema: *mut FFI_ArrowSchema,
35    pub release: Option<unsafe extern "C" fn(*mut ArrowDeviceArray)>,
36    pub private_data: *mut c_void,
37}
38
39unsafe extern "C" fn release_arrow_device_array(ptr: *mut ArrowDeviceArray) {
40    if ptr.is_null() {
41        return;
42    }
43    let dev = &mut *ptr;
44    if !dev.array.is_null() {
45        // SAFETY: dev.array is non-null (checked); was originally created via Box::into_raw; we are the sole owner
46        unsafe {
47            drop(Box::from_raw(dev.array));
48        }
49    }
50    if !dev.schema.is_null() {
51        // SAFETY: dev.schema is non-null (checked); was originally created via Box::into_raw; we are the sole owner
52        unsafe {
53            drop(Box::from_raw(dev.schema));
54        }
55    }
56    if !dev.private_data.is_null() {
57        // SAFETY: memory layout is guaranteed by the Arrow C Data Interface specification
58        unsafe {
59            drop(Box::from_raw(
60                dev.private_data.cast::<ArrowCudaAllocation>(),
61            ));
62        }
63    }
64    dev.release = None;
65}
66
67pub struct ArrowDeviceArrayOwned {
68    ptr: *mut ArrowDeviceArray,
69}
70
71impl ArrowDeviceArrayOwned {
72    pub fn as_ptr(&self) -> *mut ArrowDeviceArray {
73        self.ptr
74    }
75
76    pub fn into_raw(self) -> *mut ArrowDeviceArray {
77        let ptr = self.ptr;
78        std::mem::forget(self);
79        ptr
80    }
81
82    /// Rebuild an owned wrapper from a raw `ArrowDeviceArray` pointer.
83    ///
84    /// # Safety
85    /// `ptr` must be a valid, uniquely owned pointer produced by
86    /// `ArrowDeviceArrayOwned::into_raw` or an equivalent allocation that
87    /// transfers ownership of the underlying `ArrowDeviceArray`.
88    pub unsafe fn from_raw(ptr: *mut ArrowDeviceArray) -> Self {
89        Self { ptr }
90    }
91}
92
93impl Drop for ArrowDeviceArrayOwned {
94    fn drop(&mut self) {
95        // SAFETY: ptr is non-null (checked); release was set by the Arrow producer and is valid for the array lifetime
96        unsafe {
97            if !self.ptr.is_null() {
98                if let Some(release) = (*self.ptr).release {
99                    release(self.ptr);
100                }
101            }
102        }
103    }
104}
105
106impl ArrowDeviceArray {
107    #[allow(clippy::new_ret_no_self)]
108    pub fn new(
109        device_type: i32,
110        device_id: i32,
111        array: *mut FFI_ArrowArray,
112        schema: *mut FFI_ArrowSchema,
113    ) -> ArrowDeviceArrayOwned {
114        let dev = ArrowDeviceArray {
115            device_type,
116            device_id,
117            array,
118            schema,
119            release: Some(release_arrow_device_array),
120            private_data: std::ptr::null_mut(),
121        };
122        ArrowDeviceArrayOwned {
123            ptr: Box::into_raw(Box::new(dev)),
124        }
125    }
126}
127
128/// Keepalive wrapper for imported Arrow device arrays.
129///
130/// Holds the Arrow ArrayData so the FFI buffers remain alive until all
131/// device-backed columns are dropped.
132pub struct ArrowDeviceImport {
133    _data: arrow::array::ArrayData,
134}
135
136impl ArrowDeviceImport {
137    pub fn new(data: arrow::array::ArrayData) -> Self {
138        Self { _data: data }
139    }
140}
141
142impl ArrowDeviceArrayOwned {
143    /// Take ownership of the underlying FFI array + schema.
144    ///
145    /// # Safety
146    /// The caller must ensure the returned FFI objects are eventually released.
147    pub unsafe fn into_ffi_parts(self) -> (i32, i32, FFI_ArrowArray, FFI_ArrowSchema) {
148        let ptr = self.into_raw();
149        let dev = &mut *ptr;
150        let device_type = dev.device_type;
151        let device_id = dev.device_id;
152
153        let array_ptr = dev.array;
154        let schema_ptr = dev.schema;
155        dev.array = std::ptr::null_mut();
156        dev.schema = std::ptr::null_mut();
157
158        if let Some(release) = dev.release {
159            release(ptr);
160        } else {
161            drop(Box::from_raw(ptr));
162        }
163
164        // Take ownership of the exported FFI structs and free the heap allocations
165        // that held them. The returned values will run their own `Drop` (calling
166        // the Arrow C-Data release callbacks) when eventually dropped.
167        let array = *Box::from_raw(array_ptr);
168        let schema = *Box::from_raw(schema_ptr);
169        (device_type, device_id, array, schema)
170    }
171}