Skip to main content

xlog_cuda/provider/
transfer.rs

//! Host <-> device transfer operations.
//!
//! Generic versions of `download_column`, `download_column_untracked`, and
//! `create_buffer_from_slice`, replacing the 16 type-specialized functions
//! with 3 generics.
5
6use std::sync::atomic::Ordering;
7use xlog_core::{Result, Schema, XlogError};
8
9use crate::type_seam::GpuScalar;
10use crate::CudaBuffer;
11
12impl super::CudaKernelProvider {
13    /// Download a single column from GPU to host as `Vec<T>`.
14    ///
15    /// Replaces: `download_column_u32`, `download_column_u64`,
16    /// `download_column_i32`, `download_column_i64`, `download_column_f32`,
17    /// `download_column_f64`, `download_column_u8`, `download_column_bool`.
18    ///
19    /// Increments `d2h_transfer_count` (the per-call ILP-style counter)
20    /// and is checked by the strict deterministic-Datalog D2H guard when
21    /// it is enabled (see `enable_strict_deterministic_d2h`).
22    pub fn download_column<T: GpuScalar>(
23        &self,
24        buffer: &CudaBuffer,
25        col_idx: usize,
26    ) -> Result<Vec<T>> {
27        // Gate first so a deterministic-strict run fails before any host
28        // allocation or counter mutation. Zero-row downloads are a no-op
29        // in `download_column_inner` and never issue a D2H transfer, so
30        // the gate must not fire for them. The row count we resolve here
31        // is threaded into the inner helper so it does not look it up
32        // again — the cache makes a second call cheap, but the explicit
33        // hand-off keeps the contract clear.
34        let num_rows = self.device_row_count(buffer)?;
35        if num_rows > 0 {
36            self.gate_column_download::<T>("download_column", num_rows)?;
37            self.d2h_transfer_count.fetch_add(1, Ordering::Relaxed);
38        }
39        self.download_column_inner_with_rows::<T>(buffer, col_idx, num_rows)
40    }
41
42    /// Download a column WITHOUT incrementing the per-call
43    /// `d2h_transfer_count` (the ILP-style counter). Records in
44    /// `transfer_tracker` for byte/call profiling stats.
45    ///
46    /// IS still checked by the strict deterministic-Datalog D2H guard when
47    /// it is enabled — "untracked" only refers to `d2h_transfer_count`,
48    /// not to the deterministic gate. Use `dtoh_scalar_untracked` for
49    /// metadata reads that must remain allowed under the gate.
50    ///
51    /// Replaces: `download_f64_untracked` (now generic over `T`).
52    pub fn download_column_untracked<T: GpuScalar>(
53        &self,
54        buffer: &CudaBuffer,
55        col_idx: usize,
56    ) -> Result<Vec<T>> {
57        let col = buffer.column(col_idx).ok_or_else(|| {
58            XlogError::kernel_ctx("download_column_untracked", "column not found", &col_idx)
59        })?;
60
61        let num_rows = self.device_row_count(buffer)?;
62        if num_rows == 0 {
63            return Ok(vec![]);
64        }
65
66        // Gate first so a deterministic-strict run fails before any host
67        // allocation, mirroring `download_column`. The downstream
68        // `dtoh_sync_copy_into_tracked` would also gate, but we stop earlier
69        // and with a more specific op label.
70        self.gate_column_download::<T>("download_column_untracked", num_rows)?;
71
72        let num_bytes = num_rows.checked_mul(T::BYTE_WIDTH).ok_or_else(|| {
73            XlogError::kernel_ctx("download_column_untracked", "byte size overflow", &num_rows)
74        })?;
75        let col_view = self.column_bytes_view(col, num_bytes)?;
76        let mut bytes = vec![0u8; num_bytes];
77        self.dtoh_sync_copy_into_tracked(&col_view, &mut bytes)?;
78
79        Ok(bytes
80            .chunks_exact(T::BYTE_WIDTH)
81            .map(|c| T::from_le_bytes(c))
82            .collect())
83    }
84
85    /// Shared implementation for tracked column downloads, with the row
86    /// count threaded in by the caller so we do not look it up twice.
87    ///
88    /// Uses the provider's tracked D2H chokepoint so transfer-budget traces
89    /// observe column downloads. The caller is responsible for the early
90    /// strict deterministic-D2H gate check and for incrementing
91    /// `d2h_transfer_count`; this helper assumes both have already happened.
92    fn download_column_inner_with_rows<T: GpuScalar>(
93        &self,
94        buffer: &CudaBuffer,
95        col_idx: usize,
96        num_rows: usize,
97    ) -> Result<Vec<T>> {
98        let col = buffer.column(col_idx).ok_or_else(|| {
99            XlogError::kernel_ctx("download_column", "column not found", &col_idx)
100        })?;
101
102        if num_rows == 0 {
103            return Ok(vec![]);
104        }
105
106        let num_bytes = num_rows.checked_mul(T::BYTE_WIDTH).ok_or_else(|| {
107            XlogError::kernel_ctx("download_column", "byte size overflow", &num_rows)
108        })?;
109        let col_view = self.column_bytes_view(col, num_bytes)?;
110        let mut bytes = vec![0u8; num_bytes];
111        self.dtoh_sync_copy_into_tracked(&col_view, &mut bytes)
112            .map_err(|e| XlogError::kernel_ctx("download_column", "dtoh copy failed", &e))?;
113
114        Ok(bytes
115            .chunks_exact(T::BYTE_WIDTH)
116            .map(|c| T::from_le_bytes(c))
117            .collect())
118    }
119
120    /// Upload a typed slice as a single-column GPU buffer.
121    ///
122    /// Replaces: `create_buffer_from_u32_slice`, `create_buffer_from_u64_slice`,
123    /// `create_buffer_from_i32_slice`, `create_buffer_from_i64_slice`,
124    /// `create_buffer_from_f32_slice`, `create_buffer_from_f64_slice`,
125    /// `create_buffer_from_u8_slice`.
126    pub fn create_buffer_from_slice<T: GpuScalar>(
127        &self,
128        data: &[T],
129        schema: Schema,
130    ) -> Result<CudaBuffer> {
131        let num_bytes = data.len().checked_mul(T::BYTE_WIDTH).ok_or_else(|| {
132            XlogError::kernel_ctx(
133                "create_buffer_from_slice",
134                "byte size overflow",
135                &data.len(),
136            )
137        })?;
138
139        let mut bytes = vec![0u8; num_bytes];
140        for (i, val) in data.iter().enumerate() {
141            let offset = i * T::BYTE_WIDTH;
142            val.to_le_bytes_into(&mut bytes[offset..offset + T::BYTE_WIDTH]);
143        }
144
145        let mut col = self.memory.alloc::<u8>(bytes.len())?;
146        self.htod_sync_copy_into_tracked(&bytes, &mut col)
147            .map_err(|e| {
148                XlogError::kernel_ctx("create_buffer_from_slice", "htod copy failed", &e)
149            })?;
150
151        self.buffer_from_columns(vec![col.into()], data.len() as u64, schema)
152    }
153
154    /// Probe the deterministic-D2H gate for a column download of `num_rows`
155    /// rows of scalar `T`. Returns `Err` and increments the violation counter
156    /// when the gate is enabled. Used by `download_column` and
157    /// `download_column_untracked` so the gate fires before any host buffer
158    /// is allocated or counter mutated, mirroring the chokepoint in
159    /// `dtoh_sync_copy_into_tracked`.
160    fn gate_column_download<T: GpuScalar>(&self, op: &'static str, num_rows: usize) -> Result<()> {
161        let bytes = num_rows
162            .checked_mul(T::BYTE_WIDTH)
163            .ok_or_else(|| XlogError::kernel_ctx(op, "byte size overflow", &num_rows))?;
164        self.check_deterministic_d2h(op, bytes as u64)
165    }
166}