xlog_cuda/provider/transfer.rs
1//! Host <-> device transfer operations.
2//!
3//! Generic versions of `download_column` and `create_buffer_from_slice`,
4//! replacing the 15 type-specialized functions with 3 generics.
5
6use std::sync::atomic::Ordering;
7use xlog_core::{Result, Schema, XlogError};
8
9use crate::type_seam::GpuScalar;
10use crate::CudaBuffer;
11
12impl super::CudaKernelProvider {
13 /// Download a single column from GPU to host as `Vec<T>`.
14 ///
15 /// Replaces: `download_column_u32`, `download_column_u64`,
16 /// `download_column_i32`, `download_column_i64`, `download_column_f32`,
17 /// `download_column_f64`, `download_column_u8`, `download_column_bool`.
18 ///
19 /// Increments `d2h_transfer_count` (the per-call ILP-style counter)
20 /// and is checked by the strict deterministic-Datalog D2H guard when
21 /// it is enabled (see `enable_strict_deterministic_d2h`).
22 pub fn download_column<T: GpuScalar>(
23 &self,
24 buffer: &CudaBuffer,
25 col_idx: usize,
26 ) -> Result<Vec<T>> {
27 // Gate first so a deterministic-strict run fails before any host
28 // allocation or counter mutation. Zero-row downloads are a no-op
29 // in `download_column_inner` and never issue a D2H transfer, so
30 // the gate must not fire for them. The row count we resolve here
31 // is threaded into the inner helper so it does not look it up
32 // again — the cache makes a second call cheap, but the explicit
33 // hand-off keeps the contract clear.
34 let num_rows = self.device_row_count(buffer)?;
35 if num_rows > 0 {
36 self.gate_column_download::<T>("download_column", num_rows)?;
37 self.d2h_transfer_count.fetch_add(1, Ordering::Relaxed);
38 }
39 self.download_column_inner_with_rows::<T>(buffer, col_idx, num_rows)
40 }
41
42 /// Download a column WITHOUT incrementing the per-call
43 /// `d2h_transfer_count` (the ILP-style counter). Records in
44 /// `transfer_tracker` for byte/call profiling stats.
45 ///
46 /// IS still checked by the strict deterministic-Datalog D2H guard when
47 /// it is enabled — "untracked" only refers to `d2h_transfer_count`,
48 /// not to the deterministic gate. Use `dtoh_scalar_untracked` for
49 /// metadata reads that must remain allowed under the gate.
50 ///
51 /// Replaces: `download_f64_untracked` (now generic over `T`).
52 pub fn download_column_untracked<T: GpuScalar>(
53 &self,
54 buffer: &CudaBuffer,
55 col_idx: usize,
56 ) -> Result<Vec<T>> {
57 let col = buffer.column(col_idx).ok_or_else(|| {
58 XlogError::kernel_ctx("download_column_untracked", "column not found", &col_idx)
59 })?;
60
61 let num_rows = self.device_row_count(buffer)?;
62 if num_rows == 0 {
63 return Ok(vec![]);
64 }
65
66 // Gate first so a deterministic-strict run fails before any host
67 // allocation, mirroring `download_column`. The downstream
68 // `dtoh_sync_copy_into_tracked` would also gate, but we stop earlier
69 // and with a more specific op label.
70 self.gate_column_download::<T>("download_column_untracked", num_rows)?;
71
72 let num_bytes = num_rows.checked_mul(T::BYTE_WIDTH).ok_or_else(|| {
73 XlogError::kernel_ctx("download_column_untracked", "byte size overflow", &num_rows)
74 })?;
75 let col_view = self.column_bytes_view(col, num_bytes)?;
76 let mut bytes = vec![0u8; num_bytes];
77 self.dtoh_sync_copy_into_tracked(&col_view, &mut bytes)?;
78
79 Ok(bytes
80 .chunks_exact(T::BYTE_WIDTH)
81 .map(|c| T::from_le_bytes(c))
82 .collect())
83 }
84
85 /// Shared implementation for tracked column downloads, with the row
86 /// count threaded in by the caller so we do not look it up twice.
87 ///
88 /// Uses the provider's tracked D2H chokepoint so transfer-budget traces
89 /// observe column downloads. The caller is responsible for the early
90 /// strict deterministic-D2H gate check and for incrementing
91 /// `d2h_transfer_count`; this helper assumes both have already happened.
92 fn download_column_inner_with_rows<T: GpuScalar>(
93 &self,
94 buffer: &CudaBuffer,
95 col_idx: usize,
96 num_rows: usize,
97 ) -> Result<Vec<T>> {
98 let col = buffer.column(col_idx).ok_or_else(|| {
99 XlogError::kernel_ctx("download_column", "column not found", &col_idx)
100 })?;
101
102 if num_rows == 0 {
103 return Ok(vec![]);
104 }
105
106 let num_bytes = num_rows.checked_mul(T::BYTE_WIDTH).ok_or_else(|| {
107 XlogError::kernel_ctx("download_column", "byte size overflow", &num_rows)
108 })?;
109 let col_view = self.column_bytes_view(col, num_bytes)?;
110 let mut bytes = vec![0u8; num_bytes];
111 self.dtoh_sync_copy_into_tracked(&col_view, &mut bytes)
112 .map_err(|e| XlogError::kernel_ctx("download_column", "dtoh copy failed", &e))?;
113
114 Ok(bytes
115 .chunks_exact(T::BYTE_WIDTH)
116 .map(|c| T::from_le_bytes(c))
117 .collect())
118 }
119
120 /// Upload a typed slice as a single-column GPU buffer.
121 ///
122 /// Replaces: `create_buffer_from_u32_slice`, `create_buffer_from_u64_slice`,
123 /// `create_buffer_from_i32_slice`, `create_buffer_from_i64_slice`,
124 /// `create_buffer_from_f32_slice`, `create_buffer_from_f64_slice`,
125 /// `create_buffer_from_u8_slice`.
126 pub fn create_buffer_from_slice<T: GpuScalar>(
127 &self,
128 data: &[T],
129 schema: Schema,
130 ) -> Result<CudaBuffer> {
131 let num_bytes = data.len().checked_mul(T::BYTE_WIDTH).ok_or_else(|| {
132 XlogError::kernel_ctx(
133 "create_buffer_from_slice",
134 "byte size overflow",
135 &data.len(),
136 )
137 })?;
138
139 let mut bytes = vec![0u8; num_bytes];
140 for (i, val) in data.iter().enumerate() {
141 let offset = i * T::BYTE_WIDTH;
142 val.to_le_bytes_into(&mut bytes[offset..offset + T::BYTE_WIDTH]);
143 }
144
145 let mut col = self.memory.alloc::<u8>(bytes.len())?;
146 self.htod_sync_copy_into_tracked(&bytes, &mut col)
147 .map_err(|e| {
148 XlogError::kernel_ctx("create_buffer_from_slice", "htod copy failed", &e)
149 })?;
150
151 self.buffer_from_columns(vec![col.into()], data.len() as u64, schema)
152 }
153
154 /// Probe the deterministic-D2H gate for a column download of `num_rows`
155 /// rows of scalar `T`. Returns `Err` and increments the violation counter
156 /// when the gate is enabled. Used by `download_column` and
157 /// `download_column_untracked` so the gate fires before any host buffer
158 /// is allocated or counter mutated, mirroring the chokepoint in
159 /// `dtoh_sync_copy_into_tracked`.
160 fn gate_column_download<T: GpuScalar>(&self, op: &'static str, num_rows: usize) -> Result<()> {
161 let bytes = num_rows
162 .checked_mul(T::BYTE_WIDTH)
163 .ok_or_else(|| XlogError::kernel_ctx(op, "byte size overflow", &num_rows))?;
164 self.check_deterministic_d2h(op, bytes as u64)
165 }
166}