Skip to main content

xlog_cuda/provider/
filter.rs

1//! Filter and compare operations on GPU buffers.
2//!
3//! Generic versions of `filter`, `compare_columns`, plus the shared helpers
4//! `compare_const_mask` and `compare_columns_mask` (moved from mod.rs).
5
6use std::marker::PhantomData;
7
8use crate::{DeviceRepr, DeviceSlice, KernelScalar, LaunchAsync, LaunchConfig};
9use xlog_core::{Result, ScalarType, XlogError};
10
11use super::{filter_kernels, scan_kernels, RawCudaView, FILTER_MODULE, SCAN_MODULE};
12use crate::device_runtime::StreamId;
13use crate::launch::LaunchRecorder;
14use crate::memory::{CudaColumn, TrackedCudaSlice};
15use crate::type_seam::GpuScalar;
16use crate::{CompareOp, CudaBuffer};
17
18impl super::CudaKernelProvider {
19    // ------------------------------------------------------------------
20    // Generic public API
21    // ------------------------------------------------------------------
22
23    /// Generic compare-columns: produce a device mask for `left <op> right`.
24    ///
25    /// Replaces: `compare_columns_u32`, `compare_columns_i32`, `compare_columns_i64`,
26    /// `compare_columns_u64`, `compare_columns_f32`, `compare_columns_f64`,
27    /// `compare_columns_u8`.
28    pub fn compare_columns<T: GpuScalar>(
29        &self,
30        input: &CudaBuffer,
31        left: usize,
32        right: usize,
33        op: CompareOp,
34    ) -> Result<TrackedCudaSlice<u8>> {
35        self.compare_columns_mask::<T>(
36            input,
37            left,
38            right,
39            op,
40            T::allowed_scalar_types(),
41            T::compare_col_kernel(),
42        )
43    }
44
45    /// Generic filter: keep rows where `column[col] <op> value`.
46    ///
47    /// Dispatches between fused compare+scan+compact (u32, f64) and
48    /// mask+compact (all other types).
49    ///
50    /// Replaces: `filter_u32`, `filter_f64`, `filter_i32`, `filter_u64`,
51    /// `filter_f32`, `filter_bool`.
52    pub fn filter<T: GpuScalar>(
53        &self,
54        input: &CudaBuffer,
55        col: usize,
56        value: T,
57        op: CompareOp,
58    ) -> Result<CudaBuffer> {
59        // Opt-in dispatch: route through the recorded filter path
60        // when the env var is set AND the manager is runtime-backed
61        // AND a launch stream can be acquired from the pool.
62        // Default behavior is unchanged (legacy fused / mask+compact).
63        if Self::use_recorded_filters_env() {
64            if let Some(launch_stream) = self.recorded_op_stream_or_init() {
65                return self.filter_recorded::<T>(input, col, value, op, launch_stream);
66            }
67        }
68
69        if input.is_empty() {
70            return self.create_empty_buffer(input.schema.clone());
71        }
72
73        if T::filter_scan_phase1_kernel().is_some() {
74            // Fused compare+scan+compact path (u32, f64).
75            self.filter_fused_scan::<T>(input, col, value, op)
76        } else {
77            // Mask + compact path (i32, u64, f32, i64, u8, bool).
78            let mask = self.compare_const_mask::<T>(
79                input,
80                col,
81                value,
82                op,
83                T::allowed_scalar_types(),
84                T::filter_compare_kernel(),
85            )?;
86            self.filter_by_device_mask(input, &mask)
87        }
88    }
89
90    /// Strict-recorder, end-to-end variant of [`Self::filter`] —
91    /// the first composed migrated DATA path.
92    ///
93    /// Composes [`Self::compare_const_mask_recorded`] and
94    /// [`Self::compact_buffer_by_device_mask_counted_recorded`]
95    /// on a single `launch_stream`. Each primitive builds its
96    /// own [`crate::launch::LaunchRecorder`], records its uses,
97    /// preflights, runs its kernels, and commits independently.
98    ///
99    /// Composition correctness rests on the runtime's
100    /// "record-all, wait-all" semantics: every
101    /// `record_block_use` call APPENDS a fresh event to the
102    /// live entry's `last_use_events: Vec<CudaEvent>`, and
103    /// `deallocate` waits on EVERY event in that vector before
104    /// queueing `cuMemFreeAsync`. So the compare's commit and
105    /// the compact's later commit each push their own event
106    /// for `input.column[i]` (and other shared buffers), and
107    /// the deallocate gates the free behind both — closing
108    /// the cross-stream lifetime gap end-to-end. (Latest-event
109    /// coalescing per `(block, launch_stream)` is a possible
110    /// future optimization; today every recorded use is
111    /// retained and waited on.)
112    ///
113    /// # Dispatch
114    /// For types with a fused `filter_compare_*_scan_phase1`
115    /// kernel (`u32`, `f64`), routes to
116    /// [`Self::filter_fused_scan_recorded`] — single-pass
117    /// compare+scan+compact mirror of the legacy fast path.
118    /// For all other types, composes
119    /// [`Self::compare_const_mask_recorded`] +
120    /// [`Self::compact_buffer_by_device_mask_counted_recorded`].
121    ///
122    /// # Errors
123    /// Propagates the structured `XlogError::Kernel` errors
124    /// produced by either underlying recorded primitive
125    /// (legacy manager, unresolved launch_stream, external
126    /// column, preflight / commit failures, kernel launch
127    /// failures, `cu_stream.synchronize()` before host scalar
128    /// read).
129    pub fn filter_recorded<T: GpuScalar>(
130        &self,
131        input: &CudaBuffer,
132        col: usize,
133        value: T,
134        op: CompareOp,
135        launch_stream: StreamId,
136    ) -> Result<CudaBuffer> {
137        if input.is_empty() {
138            return self.create_empty_buffer(input.schema.clone());
139        }
140        if T::filter_scan_phase1_kernel().is_some() {
141            return self.filter_fused_scan_recorded::<T>(input, col, value, op, launch_stream);
142        }
143        let d_mask = self.compare_const_mask_recorded::<T>(input, col, value, op, launch_stream)?;
144        self.compact_buffer_by_device_mask_counted_recorded(input, &d_mask, launch_stream)
145    }
146
147    /// Strict-recorder variant of [`Self::filter_fused_scan`] —
148    /// the migrated fused compare+scan+compact fast path for
149    /// `u32` and `f64`.
150    ///
151    /// Mirrors the legacy chain on a single explicit
152    /// `launch_stream`:
153    ///   1. `filter_compare_T_scan_phase1` — fused compare +
154    ///      block-local scan that produces `d_mask`,
155    ///      `d_prefix_sum`, `d_block_sums` in one launch.
156    ///   2. When `num_blocks > 1`,
157    ///      `multiblock_scan_u32_inplace_on_stream` on
158    ///      `d_block_sums` followed by
159    ///      `multiblock_scan_phase3` to propagate block offsets
160    ///      into `d_prefix_sum`.
161    ///   3. `capture_compact_count` — writes `d_out_count` for
162    ///      the masked total.
163    ///   4. `cu_stream.synchronize()` — explicitly orders the
164    ///      host scalar read of `d_out_count` against the
165    ///      pending capture kernel.
166    ///   5. `dtoh_scalar_untracked(&d_out_count, 0)` →
167    ///      `output_rows`.
168    ///   6. Per-input-column `compact_bytes_by_mask` on the
169    ///      same `launch_stream`.
170    ///
171    /// # Strict-mode contract
172    /// Identical to
173    /// [`Self::compact_buffer_by_device_mask_counted_recorded`]:
174    /// `input.num_rows_device()` and every `input.column(i)`
175    /// recorded as reads BEFORE preflight; every fresh
176    /// runtime-backed allocation (`d_mask`, `d_prefix_sum`,
177    /// `d_block_sums`, `d_out_count`, each `dst_col`) recorded
178    /// via `write` BEFORE preflight; the recorder snapshots
179    /// block identity at record time and drops the source
180    /// borrow, so kernel `&mut` borrows after preflight remain
181    /// valid before the kernels enqueue.
182    ///
183    /// # Panics
184    /// `T::filter_scan_phase1_kernel()` must be `Some` —
185    /// callers should only reach this method for `u32` / `f64`.
186    pub fn filter_fused_scan_recorded<T: GpuScalar>(
187        &self,
188        input: &CudaBuffer,
189        col: usize,
190        value: T,
191        op: CompareOp,
192        launch_stream: StreamId,
193    ) -> Result<CudaBuffer> {
194        let runtime = self.memory.runtime().ok_or_else(|| {
195            XlogError::Kernel(
196                "filter_fused_scan_recorded requires a runtime-backed GpuMemoryManager \
197                 (constructed via with_runtime)"
198                    .to_string(),
199            )
200        })?;
201        let pool = runtime.stream_pool();
202        let cu_stream = pool.resolve(launch_stream).ok_or_else(|| {
203            XlogError::Kernel(format!(
204                "filter_fused_scan_recorded: launch_stream StreamId({}) does not resolve",
205                launch_stream.0
206            ))
207        })?;
208
209        if input.num_rows() > u32::MAX as u64 {
210            return Err(XlogError::Kernel(format!(
211                "filter supports at most {} rows, got {}",
212                u32::MAX,
213                input.num_rows()
214            )));
215        }
216        if col >= input.arity() {
217            return Err(XlogError::Kernel(format!(
218                "Column index {} out of bounds (arity {})",
219                col,
220                input.arity()
221            )));
222        }
223        let col_type = input
224            .schema()
225            .column_type(col)
226            .ok_or_else(|| XlogError::Kernel("Missing column type".into()))?;
227        if !T::allowed_scalar_types().contains(&col_type) {
228            return Err(XlogError::Kernel(format!(
229                "Column {} is {:?} (expected one of {:?})",
230                col,
231                col_type,
232                T::allowed_scalar_types()
233            )));
234        }
235        if input.is_empty() {
236            return self.create_empty_buffer(input.schema.clone());
237        }
238
239        let n = input.num_rows() as usize;
240        let num_rows = input.num_rows() as u32;
241        let block_size = 256u32;
242        let num_blocks = num_rows.div_ceil(block_size);
243        let row_cap = u64::from(num_rows);
244        let device = self.device.inner();
245
246        let col_data = input
247            .column(col)
248            .ok_or_else(|| XlogError::Kernel(format!("Column {} not found", col)))?;
249        let col_view = Self::column_as_typed_view::<T>(col_data, n)?;
250
251        // Allocate ALL fresh runtime-backed buffers BEFORE the
252        // recorder (Rust drop order — these are recorded as
253        // pre-launch writes via the standard `write` API; the
254        // recorder snapshots block identity at record time so
255        // the kernel `&mut` borrow after preflight is unaffected).
256        let d_mask = self.memory.alloc::<u8>(n)?;
257        let d_prefix_sum = self.memory.alloc::<u32>(n)?;
258        let mut d_block_sums = self.memory.alloc::<u32>(num_blocks as usize)?;
259        let mut d_out_count = self.memory.alloc::<u32>(1)?;
260
261        let mut dst_cols: Vec<TrackedCudaSlice<u8>> = Vec::with_capacity(input.columns.len());
262        for col_idx in 0..input.columns.len() {
263            let elem_size = input
264                .schema
265                .column_type(col_idx)
266                .map(|t| t.size_bytes())
267                .unwrap_or(4);
268            let output_bytes = (row_cap as usize) * elem_size;
269            dst_cols.push(self.memory.alloc::<u8>(output_bytes)?);
270        }
271
272        let mut rec = LaunchRecorder::new_strict(launch_stream);
273        rec.read(input.num_rows_device());
274        for col_idx in 0..input.columns.len() {
275            let src_col = input
276                .column(col_idx)
277                .ok_or_else(|| XlogError::Kernel(format!("Column {} not found", col_idx)))?;
278            rec.read_column(src_col);
279        }
280        rec.write(&d_mask);
281        rec.write(&d_prefix_sum);
282        rec.write(&d_block_sums);
283        rec.write(&d_out_count);
284        for dst_col in &dst_cols {
285            rec.write(dst_col);
286        }
287        rec.preflight(runtime).map_err(|e| {
288            XlogError::Kernel(format!(
289                "filter_fused_scan_recorded: launch recorder preflight failed: {}",
290                e
291            ))
292        })?;
293
294        // Step 1: fused compare + block-local scan.
295        let scan_kernel_name = T::filter_scan_phase1_kernel()
296            .expect("filter_fused_scan_recorded called without scan phase1 kernel");
297        let filter_scan_fn = device
298            .get_func(FILTER_MODULE, scan_kernel_name)
299            .ok_or_else(|| XlogError::Kernel(format!("{} kernel not found", scan_kernel_name)))?;
300        // SAFETY: filter_compare_*_scan_phase1(column, constant, num_rows,
301        // num_rows_device, op, mask, prefix_sum, block_sums)
302        unsafe {
303            filter_scan_fn.clone().launch_on_stream(
304                &cu_stream,
305                LaunchConfig {
306                    grid_dim: (num_blocks, 1, 1),
307                    block_dim: (block_size, 1, 1),
308                    shared_mem_bytes: 0,
309                },
310                (
311                    &col_view,
312                    value,
313                    num_rows,
314                    input.num_rows_device(),
315                    op as u8,
316                    &d_mask,
317                    &d_prefix_sum,
318                    &d_block_sums,
319                ),
320            )
321        }
322        .map_err(|e| {
323            XlogError::Kernel(format!("{} (on_stream) failed: {}", scan_kernel_name, e))
324        })?;
325
326        // Step 2: multi-block scan propagation (only when there
327        // is more than one block).
328        if num_blocks > 1 {
329            self.multiblock_scan_u32_inplace_on_stream(
330                &mut d_block_sums,
331                num_blocks,
332                &cu_stream,
333                launch_stream,
334                runtime,
335            )?;
336
337            let phase3_fn = device
338                .get_func(SCAN_MODULE, scan_kernels::MULTIBLOCK_SCAN_PHASE3)
339                .ok_or_else(|| {
340                    XlogError::Kernel("Failed to get multiblock_scan_phase3 kernel".to_string())
341                })?;
342            // SAFETY: multiblock_scan_phase3(prefix_sum, block_offsets, n)
343            unsafe {
344                phase3_fn.clone().launch_on_stream(
345                    &cu_stream,
346                    LaunchConfig {
347                        grid_dim: (num_blocks, 1, 1),
348                        block_dim: (block_size, 1, 1),
349                        shared_mem_bytes: 0,
350                    },
351                    (&d_prefix_sum, &d_block_sums, num_rows),
352                )
353            }
354            .map_err(|e| {
355                XlogError::Kernel(format!("multiblock_scan_phase3 (on_stream) failed: {}", e))
356            })?;
357        }
358
359        // Step 3: capture compact count on launch_stream.
360        let capture_fn = device
361            .get_func(FILTER_MODULE, filter_kernels::CAPTURE_COMPACT_COUNT)
362            .ok_or_else(|| {
363                XlogError::Kernel("capture_compact_count kernel not found".to_string())
364            })?;
365        // SAFETY: capture_compact_count(prefix_sum, mask, n, out_count)
366        unsafe {
367            capture_fn.clone().launch_on_stream(
368                &cu_stream,
369                LaunchConfig {
370                    grid_dim: (1, 1, 1),
371                    block_dim: (1, 1, 1),
372                    shared_mem_bytes: 0,
373                },
374                (&d_prefix_sum, &d_mask, num_rows, &mut d_out_count),
375            )
376        }
377        .map_err(|e| {
378            XlogError::Kernel(format!("capture_compact_count (on_stream) failed: {}", e))
379        })?;
380
381        // Step 4: explicit barrier before host scalar read.
382        // Non-blocking streams do NOT get default-stream
383        // implicit synchronization, so the dtoh_scalar_untracked
384        // call below would otherwise race the still-pending
385        // capture kernel.
386        cu_stream.synchronize().map_err(|e| {
387            XlogError::Kernel(format!(
388                "filter_fused_scan_recorded: launch_stream synchronize before host scalar \
389                 read failed: {}",
390                e
391            ))
392        })?;
393        let output_rows = self.dtoh_scalar_untracked(&d_out_count, 0)? as u64;
394
395        // Step 5: per-column compact_bytes_by_mask on
396        // launch_stream. Same shape as
397        // compact_buffer_by_device_mask_counted_recorded; only
398        // run when there are rows to keep.
399        if output_rows > 0 {
400            let compact_fn = device
401                .get_func(FILTER_MODULE, filter_kernels::COMPACT_BYTES_BY_MASK)
402                .ok_or_else(|| {
403                    XlogError::Kernel("compact_bytes_by_mask kernel not found".to_string())
404                })?;
405            let cfg = LaunchConfig {
406                grid_dim: (num_blocks, 1, 1),
407                block_dim: (block_size, 1, 1),
408                shared_mem_bytes: 0,
409            };
410            for (col_idx, dst_col) in dst_cols.iter().enumerate() {
411                let src_col = input
412                    .column(col_idx)
413                    .ok_or_else(|| XlogError::Kernel(format!("Column {} not found", col_idx)))?;
414                let elem_size = input
415                    .schema
416                    .column_type(col_idx)
417                    .map(|t| t.size_bytes())
418                    .unwrap_or(4) as u32;
419                // SAFETY: compact_bytes_by_mask(input, mask, prefix_sum, n, elem_size, output)
420                unsafe {
421                    compact_fn.clone().launch_on_stream(
422                        &cu_stream,
423                        cfg,
424                        (
425                            src_col,
426                            &d_mask,
427                            &d_prefix_sum,
428                            num_rows,
429                            elem_size,
430                            dst_col,
431                        ),
432                    )
433                }
434                .map_err(|e| {
435                    XlogError::Kernel(format!("compact_bytes_by_mask (on_stream) failed: {}", e))
436                })?;
437            }
438        }
439
440        // Record fresh writes via the post-preflight escape
441        // hatch and commit.
442        rec.commit(runtime).map_err(|e| {
443            XlogError::Kernel(format!(
444                "filter_fused_scan_recorded: launch recorder commit failed: {}",
445                e
446            ))
447        })?;
448
449        let new_columns: Vec<CudaColumn> = dst_cols.into_iter().map(|s| s.into()).collect();
450        Ok(CudaBuffer::from_columns_with_host_count(
451            new_columns,
452            row_cap,
453            d_out_count,
454            input.schema.clone(),
455            output_rows as u32,
456        ))
457    }
458
459    /// Strict-recorder, end-to-end variant of column-column
460    /// filter: keep rows where `column[left] <op> column[right]`.
461    ///
462    /// Composes [`Self::compare_columns_mask_recorded`] and
463    /// [`Self::compact_buffer_by_device_mask_counted_recorded`]
464    /// on a single `launch_stream`. Same composition contract
465    /// as [`Self::filter_recorded`]: each primitive builds its
466    /// own recorder and commits independently; the runtime
467    /// appends every recorded event to `last_use_events`, and
468    /// `deallocate` waits on every event, so input columns
469    /// referenced by BOTH the compare AND the per-column
470    /// compacts are correctly gated end-to-end.
471    ///
472    /// # Errors
473    /// Propagates the structured `XlogError::Kernel` errors
474    /// produced by either underlying recorded primitive
475    /// (legacy manager, unresolved launch_stream, external
476    /// column on either input side, preflight / commit
477    /// failures, kernel launch failures,
478    /// `cu_stream.synchronize()` before host scalar read).
479    pub fn filter_columns_recorded<T: GpuScalar>(
480        &self,
481        input: &CudaBuffer,
482        left: usize,
483        right: usize,
484        op: CompareOp,
485        launch_stream: StreamId,
486    ) -> Result<CudaBuffer> {
487        if input.is_empty() {
488            return self.create_empty_buffer(input.schema.clone());
489        }
490        let d_mask =
491            self.compare_columns_mask_recorded::<T>(input, left, right, op, launch_stream)?;
492        self.compact_buffer_by_device_mask_counted_recorded(input, &d_mask, launch_stream)
493    }
494
495    // ------------------------------------------------------------------
496    // Private helpers (moved from mod.rs)
497    // ------------------------------------------------------------------
498
499    /// Generate a device mask by comparing a column against a constant value.
500    ///
501    /// Each output byte is 1 if the comparison holds, 0 otherwise.
502    pub(crate) fn compare_const_mask<T: KernelScalar>(
503        &self,
504        input: &CudaBuffer,
505        col: usize,
506        value: T,
507        op: CompareOp,
508        allowed_types: &[ScalarType],
509        kernel: &str,
510    ) -> Result<TrackedCudaSlice<u8>> {
511        if input.num_rows() > u32::MAX as u64 {
512            return Err(XlogError::Kernel(format!(
513                "Filter supports at most {} rows, got {}",
514                u32::MAX,
515                input.num_rows()
516            )));
517        }
518        if col >= input.arity() {
519            return Err(XlogError::Kernel(format!(
520                "Column index {} out of bounds (arity {})",
521                col,
522                input.arity()
523            )));
524        }
525
526        if input.is_empty() {
527            return self.memory.alloc::<u8>(0);
528        }
529
530        let col_type = input
531            .schema()
532            .column_type(col)
533            .ok_or_else(|| XlogError::Kernel("Missing column type".into()))?;
534        if !allowed_types.contains(&col_type) {
535            return Err(XlogError::Kernel(format!(
536                "Column {} is {:?} (expected {:?})",
537                col, col_type, allowed_types
538            )));
539        }
540
541        let num_rows = input.num_rows() as u32;
542        let expected_bytes = (num_rows as usize)
543            .checked_mul(std::mem::size_of::<T>())
544            .ok_or_else(|| XlogError::Kernel("filter compare size overflow".into()))?;
545        let col_data = input
546            .column(col)
547            .ok_or_else(|| XlogError::Kernel(format!("Column {} not found", col)))?;
548        if col_data.num_bytes() != expected_bytes {
549            return Err(XlogError::Kernel(format!(
550                "Column {} has {} bytes but expected {} for {} rows",
551                col,
552                col_data.num_bytes(),
553                expected_bytes,
554                input.num_rows()
555            )));
556        }
557
558        let block_size = 256u32;
559        let num_blocks = num_rows.div_ceil(block_size);
560        let config = LaunchConfig {
561            grid_dim: (num_blocks, 1, 1),
562            block_dim: (block_size, 1, 1),
563            shared_mem_bytes: 0,
564        };
565
566        let mut d_mask = self.memory.alloc::<u8>(num_rows as usize)?;
567        let func = self
568            .device
569            .inner()
570            .get_func(FILTER_MODULE, kernel)
571            .ok_or_else(|| XlogError::Kernel("filter compare kernel not found".into()))?;
572
573        // SAFETY: kernel arguments match the PTX signature; device buffers were allocated with sufficient size
574        unsafe {
575            func.clone()
576                .launch(config, (col_data, value, num_rows, op as u8, &mut d_mask))
577        }
578        .map_err(|e| XlogError::Kernel(format!("filter compare failed: {}", e)))?;
579
580        Ok(d_mask)
581    }
582
583    /// Strict-recorder variant of [`Self::compare_const_mask`].
584    ///
585    /// Runs the filter compare kernel on the caller-supplied
586    /// `launch_stream` and threads the column read through the
587    /// runtime via [`LaunchRecorder`]. This is the second
588    /// migrated launch path (after `memset_recorded`) and the
589    /// first kernel-driven one — it is intentionally a sibling
590    /// of the legacy [`Self::compare_const_mask`] rather than a
591    /// replacement. Existing callers stay on the legacy path
592    /// until the broader filter migration lands.
593    ///
594    /// # Strict-mode contract
595    /// * Requires the provider's manager to be built via
596    ///   [`crate::GpuMemoryManager::with_runtime`]; otherwise
597    ///   returns `XlogError::Kernel` before any allocation.
598    /// * `input.column(col)` is recorded as a read; external
599    ///   (`CudaColumn::Dlpack` / `CudaColumn::ArrowDevice`)
600    ///   columns are rejected at preflight, before the kernel
601    ///   is enqueued.
602    /// * `d_mask` is freshly allocated through the same
603    ///   runtime-backed manager. By construction its
604    ///   `runtime_block()` is `Some`, so its write recording
605    ///   cannot strict-reject. The write is therefore noted
606    ///   AFTER the kernel is enqueued — this sidesteps the
607    ///   borrow conflict between `&mut d_mask` (cudarc kernel
608    ///   param) and `&d_mask` (recorder). A future migration
609    ///   that may write to a buffer of unknown provenance must
610    ///   instead capture identity pre-launch (e.g. via a raw
611    ///   view) so strict rejection happens at preflight.
612    ///
613    /// # Errors
614    ///   * `XlogError::Kernel` if the manager has no runtime,
615    ///     or if `launch_stream` does not resolve.
616    ///   * `XlogError::Kernel` from preflight (external column,
617    ///     unsupported active resource).
618    ///   * `XlogError::Kernel` from the underlying CUDA launch.
619    ///   * `XlogError::Kernel` from commit on transient
620    ///     `record_block_use` failure.
621    pub fn compare_const_mask_recorded<T: GpuScalar>(
622        &self,
623        input: &CudaBuffer,
624        col: usize,
625        value: T,
626        op: CompareOp,
627        launch_stream: StreamId,
628    ) -> Result<TrackedCudaSlice<u8>> {
629        let allowed_types = T::allowed_scalar_types();
630        let kernel = T::filter_compare_kernel();
631        let runtime = self.memory.runtime().ok_or_else(|| {
632            XlogError::Kernel(
633                "compare_const_mask_recorded requires a runtime-backed GpuMemoryManager \
634                 (constructed via with_runtime)"
635                    .to_string(),
636            )
637        })?;
638        let pool = runtime.stream_pool();
639        let cu_stream = pool.resolve(launch_stream).ok_or_else(|| {
640            XlogError::Kernel(format!(
641                "compare_const_mask_recorded: launch_stream StreamId({}) does not resolve",
642                launch_stream.0
643            ))
644        })?;
645
646        if input.num_rows() > u32::MAX as u64 {
647            return Err(XlogError::Kernel(format!(
648                "Filter supports at most {} rows, got {}",
649                u32::MAX,
650                input.num_rows()
651            )));
652        }
653        if col >= input.arity() {
654            return Err(XlogError::Kernel(format!(
655                "Column index {} out of bounds (arity {})",
656                col,
657                input.arity()
658            )));
659        }
660
661        if input.is_empty() {
662            return self.memory.alloc::<u8>(0);
663        }
664
665        let col_type = input
666            .schema()
667            .column_type(col)
668            .ok_or_else(|| XlogError::Kernel("Missing column type".into()))?;
669        if !allowed_types.contains(&col_type) {
670            return Err(XlogError::Kernel(format!(
671                "Column {} is {:?} (expected {:?})",
672                col, col_type, allowed_types
673            )));
674        }
675
676        let num_rows = input.num_rows() as u32;
677        let expected_bytes = (num_rows as usize)
678            .checked_mul(std::mem::size_of::<T>())
679            .ok_or_else(|| XlogError::Kernel("filter compare size overflow".into()))?;
680        let col_data = input
681            .column(col)
682            .ok_or_else(|| XlogError::Kernel(format!("Column {} not found", col)))?;
683        if col_data.num_bytes() != expected_bytes {
684            return Err(XlogError::Kernel(format!(
685                "Column {} has {} bytes but expected {} for {} rows",
686                col,
687                col_data.num_bytes(),
688                expected_bytes,
689                input.num_rows()
690            )));
691        }
692
693        let block_size = 256u32;
694        let num_blocks = num_rows.div_ceil(block_size);
695        let config = LaunchConfig {
696            grid_dim: (num_blocks, 1, 1),
697            block_dim: (block_size, 1, 1),
698            shared_mem_bytes: 0,
699        };
700
701        let mut d_mask = self.memory.alloc::<u8>(num_rows as usize)?;
702        let func = self
703            .device
704            .inner()
705            .get_func(FILTER_MODULE, kernel)
706            .ok_or_else(|| XlogError::Kernel("filter compare kernel not found".into()))?;
707
708        // Strict recorder + PREFLIGHT before the kernel queues.
709        // External columns (DLPack / Arrow) and unsupported
710        // active resources are caught here without any CUDA
711        // work in flight.
712        let mut rec = LaunchRecorder::new_strict(launch_stream);
713        rec.read_column(col_data);
714        rec.write(&d_mask);
715        rec.preflight(runtime).map_err(|e| {
716            XlogError::Kernel(format!(
717                "compare_const_mask_recorded: launch recorder preflight failed: {}",
718                e
719            ))
720        })?;
721
722        // SAFETY: PTX kernel signature matches the params tuple;
723        // col_data is validated above and lives through the
724        // launch (held by `input`); d_mask was allocated by the
725        // same runtime-backed manager and matches `num_rows`.
726        // launch_on_stream queues on `cu_stream` and returns
727        // immediately.
728        unsafe {
729            func.clone().launch_on_stream(
730                &cu_stream,
731                config,
732                (col_data, value, num_rows, op as u8, &mut d_mask),
733            )
734        }
735        .map_err(|e| {
736            XlogError::Kernel(format!("compare_const_mask_recorded launch failed: {}", e))
737        })?;
738
739        // Record the write AFTER the launch enqueues, using the
740        // explicit escape hatch. d_mask is the freshly-allocated
741        // runtime-backed output of THIS call; the kernel-param
742        // borrow rules force this ordering. See the
743        // "Strict-mode contract" on this method.
744        rec.commit(runtime).map_err(|e| {
745            XlogError::Kernel(format!(
746                "compare_const_mask_recorded: launch recorder commit failed: {}",
747                e
748            ))
749        })?;
750
751        Ok(d_mask)
752    }
753
754    /// Generate a device mask by comparing two columns element-wise.
755    pub(crate) fn compare_columns_mask<T: DeviceRepr>(
756        &self,
757        input: &CudaBuffer,
758        left: usize,
759        right: usize,
760        op: CompareOp,
761        allowed_types: &[ScalarType],
762        kernel: &str,
763    ) -> Result<TrackedCudaSlice<u8>> {
764        if input.num_rows() > u32::MAX as u64 {
765            return Err(XlogError::Kernel(format!(
766                "Filter supports at most {} rows, got {}",
767                u32::MAX,
768                input.num_rows()
769            )));
770        }
771        if left >= input.arity() || right >= input.arity() {
772            return Err(XlogError::Kernel(format!(
773                "Column indices {} or {} out of bounds (arity {})",
774                left,
775                right,
776                input.arity()
777            )));
778        }
779
780        if input.is_empty() {
781            return self.memory.alloc::<u8>(0);
782        }
783
784        let left_type = input
785            .schema()
786            .column_type(left)
787            .ok_or_else(|| XlogError::Kernel("Missing left column type".into()))?;
788        let right_type = input
789            .schema()
790            .column_type(right)
791            .ok_or_else(|| XlogError::Kernel("Missing right column type".into()))?;
792
793        if left_type != right_type {
794            return Err(XlogError::Kernel(
795                "Column-column compare requires matching types".into(),
796            ));
797        }
798        if !allowed_types.contains(&left_type) {
799            return Err(XlogError::Kernel(format!(
800                "Column type {:?} not supported for compare",
801                left_type
802            )));
803        }
804
805        let num_rows = input.num_rows() as u32;
806        let expected_bytes = (num_rows as usize)
807            .checked_mul(std::mem::size_of::<T>())
808            .ok_or_else(|| XlogError::Kernel("compare columns size overflow".into()))?;
809
810        let left_col = input
811            .column(left)
812            .ok_or_else(|| XlogError::Kernel(format!("Column {} not found", left)))?;
813        let right_col = input
814            .column(right)
815            .ok_or_else(|| XlogError::Kernel(format!("Column {} not found", right)))?;
816
817        if left_col.num_bytes() != expected_bytes || right_col.num_bytes() != expected_bytes {
818            return Err(XlogError::Kernel(format!(
819                "Compare columns expect {} bytes per column for {} rows",
820                expected_bytes,
821                input.num_rows()
822            )));
823        }
824
825        let block_size = 256u32;
826        let num_blocks = num_rows.div_ceil(block_size);
827        let config = LaunchConfig {
828            grid_dim: (num_blocks, 1, 1),
829            block_dim: (block_size, 1, 1),
830            shared_mem_bytes: 0,
831        };
832
833        let mut d_mask = self.memory.alloc::<u8>(num_rows as usize)?;
834        let func = self
835            .device
836            .inner()
837            .get_func(FILTER_MODULE, kernel)
838            .ok_or_else(|| XlogError::Kernel("filter compare kernel not found".into()))?;
839
840        // SAFETY: kernel arguments match the PTX signature; device buffers were allocated with sufficient size
841        unsafe {
842            func.clone().launch(
843                config,
844                (left_col, right_col, num_rows, op as u8, &mut d_mask),
845            )
846        }
847        .map_err(|e| XlogError::Kernel(format!("filter compare failed: {}", e)))?;
848
849        Ok(d_mask)
850    }
851
852    /// Strict-recorder variant of [`Self::compare_columns_mask`].
853    ///
854    /// Runs the column-column compare kernel on the
855    /// caller-supplied `launch_stream` and threads BOTH column
856    /// reads through the runtime via [`LaunchRecorder`]. Sibling
857    /// of the legacy [`Self::compare_columns_mask`]; existing
858    /// callers stay on the legacy path.
859    ///
860    /// # Strict-mode contract
861    /// * Requires the provider's manager to be built via
862    ///   [`crate::GpuMemoryManager::with_runtime`]; otherwise
863    ///   returns `XlogError::Kernel` before any allocation.
864    /// * `input.column(left)` and `input.column(right)` are both
865    ///   recorded as reads BEFORE preflight. External (DLPack /
866    ///   Arrow) columns on either side are rejected at preflight,
867    ///   before the kernel is enqueued.
868    /// * `d_mask` is freshly allocated by the same runtime-backed
869    ///   manager; its write is recorded via the standard `write`
870    ///   API BEFORE preflight (the recorder snapshots block
871    ///   identity, so the kernel `&mut d_mask` borrow after
872    ///   preflight is unaffected).
873    ///
874    /// # Errors
875    ///   * `XlogError::Kernel` if the manager has no runtime,
876    ///     or if `launch_stream` does not resolve.
877    ///   * `XlogError::Kernel` from preflight (external column
878    ///     on either side, unsupported active resource).
879    ///   * `XlogError::Kernel` from the underlying CUDA launch.
880    ///   * `XlogError::Kernel` from commit on transient
881    ///     `record_block_use` failure.
882    pub fn compare_columns_mask_recorded<T: GpuScalar>(
883        &self,
884        input: &CudaBuffer,
885        left: usize,
886        right: usize,
887        op: CompareOp,
888        launch_stream: StreamId,
889    ) -> Result<TrackedCudaSlice<u8>> {
890        let allowed_types = T::allowed_scalar_types();
891        let kernel = T::compare_col_kernel();
892
893        let runtime = self.memory.runtime().ok_or_else(|| {
894            XlogError::Kernel(
895                "compare_columns_mask_recorded requires a runtime-backed GpuMemoryManager \
896                 (constructed via with_runtime)"
897                    .to_string(),
898            )
899        })?;
900        let pool = runtime.stream_pool();
901        let cu_stream = pool.resolve(launch_stream).ok_or_else(|| {
902            XlogError::Kernel(format!(
903                "compare_columns_mask_recorded: launch_stream StreamId({}) does not resolve",
904                launch_stream.0
905            ))
906        })?;
907
908        if input.num_rows() > u32::MAX as u64 {
909            return Err(XlogError::Kernel(format!(
910                "Filter supports at most {} rows, got {}",
911                u32::MAX,
912                input.num_rows()
913            )));
914        }
915        if left >= input.arity() || right >= input.arity() {
916            return Err(XlogError::Kernel(format!(
917                "Column indices {} or {} out of bounds (arity {})",
918                left,
919                right,
920                input.arity()
921            )));
922        }
923
924        if input.is_empty() {
925            return self.memory.alloc::<u8>(0);
926        }
927
928        let left_type = input
929            .schema()
930            .column_type(left)
931            .ok_or_else(|| XlogError::Kernel("Missing left column type".into()))?;
932        let right_type = input
933            .schema()
934            .column_type(right)
935            .ok_or_else(|| XlogError::Kernel("Missing right column type".into()))?;
936        if left_type != right_type {
937            return Err(XlogError::Kernel(
938                "Column-column compare requires matching types".into(),
939            ));
940        }
941        if !allowed_types.contains(&left_type) {
942            return Err(XlogError::Kernel(format!(
943                "Column type {:?} not supported for compare",
944                left_type
945            )));
946        }
947
948        let num_rows = input.num_rows() as u32;
949        let expected_bytes = (num_rows as usize)
950            .checked_mul(std::mem::size_of::<T>())
951            .ok_or_else(|| XlogError::Kernel("compare columns size overflow".into()))?;
952        let left_col = input
953            .column(left)
954            .ok_or_else(|| XlogError::Kernel(format!("Column {} not found", left)))?;
955        let right_col = input
956            .column(right)
957            .ok_or_else(|| XlogError::Kernel(format!("Column {} not found", right)))?;
958        if left_col.num_bytes() != expected_bytes || right_col.num_bytes() != expected_bytes {
959            return Err(XlogError::Kernel(format!(
960                "Compare columns expect {} bytes per column for {} rows",
961                expected_bytes,
962                input.num_rows()
963            )));
964        }
965
966        let block_size = 256u32;
967        let num_blocks = num_rows.div_ceil(block_size);
968        let config = LaunchConfig {
969            grid_dim: (num_blocks, 1, 1),
970            block_dim: (block_size, 1, 1),
971            shared_mem_bytes: 0,
972        };
973
974        let mut d_mask = self.memory.alloc::<u8>(num_rows as usize)?;
975        let func = self
976            .device
977            .inner()
978            .get_func(FILTER_MODULE, kernel)
979            .ok_or_else(|| XlogError::Kernel("filter compare kernel not found".into()))?;
980
981        // Record BOTH column reads BEFORE preflight. Strict mode
982        // catches external columns on either side here, before
983        // any CUDA work is queued.
984        let mut rec = LaunchRecorder::new_strict(launch_stream);
985        rec.read_column(left_col);
986        rec.read_column(right_col);
987        rec.write(&d_mask);
988        rec.preflight(runtime).map_err(|e| {
989            XlogError::Kernel(format!(
990                "compare_columns_mask_recorded: launch recorder preflight failed: {}",
991                e
992            ))
993        })?;
994
995        // SAFETY: PTX kernel signature matches the params tuple;
996        // both columns are validated above and live through the
997        // launch (held by `input`); d_mask was allocated by the
998        // same runtime-backed manager and matches `num_rows`.
999        // launch_on_stream queues on `cu_stream` and returns
1000        // immediately.
1001        unsafe {
1002            func.clone().launch_on_stream(
1003                &cu_stream,
1004                config,
1005                (left_col, right_col, num_rows, op as u8, &mut d_mask),
1006            )
1007        }
1008        .map_err(|e| {
1009            XlogError::Kernel(format!(
1010                "compare_columns_mask_recorded launch failed: {}",
1011                e
1012            ))
1013        })?;
1014
1015        // Record d_mask write AFTER the launch enqueues, via the
1016        // explicit escape hatch — d_mask is the freshly-allocated
1017        // runtime-backed output of THIS call.
1018        rec.commit(runtime).map_err(|e| {
1019            XlogError::Kernel(format!(
1020                "compare_columns_mask_recorded: launch recorder commit failed: {}",
1021                e
1022            ))
1023        })?;
1024
1025        Ok(d_mask)
1026    }
1027
1028    // ------------------------------------------------------------------
1029    // Fused compare+scan+compact path (generic over T)
1030    // ------------------------------------------------------------------
1031
1032    /// Fused compare+scan+compact filter. Used for types with a dedicated
1033    /// `filter_scan_phase1` kernel (u32 and f64).
1034    fn filter_fused_scan<T: GpuScalar>(
1035        &self,
1036        input: &CudaBuffer,
1037        col: usize,
1038        value: T,
1039        op: CompareOp,
1040    ) -> Result<CudaBuffer> {
1041        if input.num_rows() > u32::MAX as u64 {
1042            return Err(XlogError::Kernel(format!(
1043                "filter supports at most {} rows, got {}",
1044                u32::MAX,
1045                input.num_rows()
1046            )));
1047        }
1048
1049        let n = input.num_rows() as usize;
1050        let num_rows = input.num_rows() as u32;
1051        let device = self.device.inner();
1052
1053        // Validate column index
1054        if col >= input.arity() {
1055            return Err(XlogError::Kernel(format!(
1056                "Column index {} out of bounds (arity {})",
1057                col,
1058                input.arity()
1059            )));
1060        }
1061
1062        // Validate column type
1063        let col_type = input
1064            .schema()
1065            .column_type(col)
1066            .ok_or_else(|| XlogError::Kernel("Missing column type".into()))?;
1067        if !T::allowed_scalar_types().contains(&col_type) {
1068            return Err(XlogError::Kernel(format!(
1069                "Column {} is {:?} (expected one of {:?})",
1070                col,
1071                col_type,
1072                T::allowed_scalar_types()
1073            )));
1074        }
1075
1076        // Get the filter column as a typed view
1077        let col_data = input
1078            .column(col)
1079            .ok_or_else(|| XlogError::Kernel(format!("Column {} not found", col)))?;
1080        let col_view = Self::column_as_typed_view::<T>(col_data, n)?;
1081
1082        let block_size = 256u32;
1083        let num_blocks = num_rows.div_ceil(block_size);
1084        let config = LaunchConfig {
1085            grid_dim: (num_blocks, 1, 1),
1086            block_dim: (block_size, 1, 1),
1087            shared_mem_bytes: 0,
1088        };
1089
1090        // Fused compare + scan phase1.
1091        let d_mask = self.memory.alloc::<u8>(n)?;
1092        let d_prefix_sum = self.memory.alloc::<u32>(n)?;
1093        let mut d_block_sums = self.memory.alloc::<u32>(num_blocks as usize)?;
1094
1095        let scan_kernel_name = T::filter_scan_phase1_kernel()
1096            .expect("filter_fused_scan called without scan phase1 kernel");
1097        let filter_scan_fn = device
1098            .get_func(FILTER_MODULE, scan_kernel_name)
1099            .ok_or_else(|| XlogError::Kernel(format!("{} kernel not found", scan_kernel_name)))?;
1100
1101        // SAFETY: filter_compare_*_scan_phase1(column, constant, num_rows, num_rows_device, op, mask, prefix_sum, block_sums)
1102        unsafe {
1103            filter_scan_fn.clone().launch(
1104                config,
1105                (
1106                    &col_view,
1107                    value,
1108                    num_rows,
1109                    input.num_rows_device(),
1110                    op as u8,
1111                    &d_mask,
1112                    &d_prefix_sum,
1113                    &d_block_sums,
1114                ),
1115            )
1116        }
1117        .map_err(|e| XlogError::Kernel(format!("{} failed: {}", scan_kernel_name, e)))?;
1118
1119        if num_blocks > 1 {
1120            self.multiblock_scan_u32_inplace(&mut d_block_sums, num_blocks)?;
1121
1122            let phase3_fn = device
1123                .get_func(SCAN_MODULE, scan_kernels::MULTIBLOCK_SCAN_PHASE3)
1124                .ok_or_else(|| {
1125                    XlogError::Kernel("Failed to get multiblock_scan_phase3 kernel".to_string())
1126                })?;
1127
1128            // SAFETY: multiblock_scan_phase3(uint32_t* prefix_sum, const uint32_t* block_offsets, uint32_t n)
1129            unsafe {
1130                phase3_fn.clone().launch(
1131                    LaunchConfig {
1132                        grid_dim: (num_blocks, 1, 1),
1133                        block_dim: (block_size, 1, 1),
1134                        shared_mem_bytes: 0,
1135                    },
1136                    (&d_prefix_sum, &d_block_sums, num_rows),
1137                )
1138            }
1139            .map_err(|e| XlogError::Kernel(format!("multiblock_scan_phase3 failed: {}", e)))?;
1140        }
1141
1142        self.device.synchronize()?;
1143
1144        let d_out_count = self.capture_compact_count(&d_prefix_sum, &d_mask, num_rows)?;
1145        self.compact_buffer_by_device_mask_device_count(input, &d_mask, &d_prefix_sum, d_out_count)
1146    }
1147
1148    // ------------------------------------------------------------------
1149    // Generic column view helper
1150    // ------------------------------------------------------------------
1151
1152    /// Reinterpret a `CudaColumn` as a typed `RawCudaView<T>` for kernel access.
1153    ///
1154    /// This is the generic equivalent of `column_as_u32_view`, `column_as_f64_view`, etc.
1155    fn column_as_typed_view<'a, T: GpuScalar>(
1156        col: &'a CudaColumn,
1157        num_elements: usize,
1158    ) -> Result<RawCudaView<'a, T>> {
1159        let required_bytes = num_elements * T::BYTE_WIDTH;
1160        if col.num_bytes() < required_bytes {
1161            return Err(XlogError::Kernel(format!(
1162                "Column has {} bytes but {} required for {} elements of size {}",
1163                col.num_bytes(),
1164                required_bytes,
1165                num_elements,
1166                T::BYTE_WIDTH,
1167            )));
1168        }
1169        let ptr = *col.device_ptr();
1170        if T::BYTE_WIDTH > 1 && !(ptr as usize).is_multiple_of(T::BYTE_WIDTH) {
1171            return Err(XlogError::Kernel(format!(
1172                "Column device pointer is not {}-byte aligned",
1173                T::BYTE_WIDTH,
1174            )));
1175        }
1176        Ok(RawCudaView {
1177            ptr,
1178            len: num_elements,
1179            stream: col.stream().clone(),
1180            source_block: col.runtime_block(),
1181            _marker: PhantomData,
1182        })
1183    }
1184
1185    // ------------------------------------------------------------------
1186    // Host-mask prefix sum + filter/compact infrastructure
1187    // (moved from mod.rs)
1188    // ------------------------------------------------------------------
1189
1190    pub fn prefix_sum_mask(&self, mask: &[u8]) -> Result<(Vec<u32>, u32)> {
1191        if mask.is_empty() {
1192            return Ok((vec![], 0));
1193        }
1194
1195        let n = mask.len();
1196        if n > u32::MAX as usize {
1197            return Err(XlogError::Kernel(format!(
1198                "Mask length {} exceeds u32::MAX",
1199                n
1200            )));
1201        }
1202
1203        // For small inputs, use CPU scan (faster than kernel launch overhead)
1204        if n <= 256 {
1205            return self.prefix_sum_mask_cpu(mask);
1206        }
1207
1208        // For larger inputs, use multi-block GPU scan
1209        self.prefix_sum_mask_gpu_multiblock(mask)
1210    }
1211
1212    /// CPU implementation for small prefix sums (avoids kernel launch overhead)
1213    fn prefix_sum_mask_cpu(&self, mask: &[u8]) -> Result<(Vec<u32>, u32)> {
1214        let mut prefix_sum = Vec::with_capacity(mask.len());
1215        let mut sum = 0u32;
1216        for &m in mask {
1217            prefix_sum.push(sum);
1218            sum += m as u32;
1219        }
1220        Ok((prefix_sum, sum))
1221    }
1222
1223    /// Multi-block GPU implementation for large prefix sums
1224    /// Uses three-phase algorithm:
1225    /// 1. Each block computes local exclusive scan and outputs block total
1226    /// 2. Scan the block totals to get block offsets
1227    /// 3. Add block offsets to each element
1228    fn prefix_sum_mask_gpu_multiblock(&self, mask: &[u8]) -> Result<(Vec<u32>, u32)> {
1229        let n = mask.len();
1230        let device = self.device.inner();
1231        let block_size = 256u32;
1232        let num_blocks = (n as u32).div_ceil(block_size);
1233
1234        // Upload mask to GPU
1235        let d_mask = self
1236            .htod_sync_copy_tracked(mask)
1237            .map_err(|e| XlogError::Kernel(format!("Failed to upload mask: {}", e)))?;
1238
1239        // Allocate output for prefix sum (using memory manager for budget enforcement)
1240        let d_prefix_sum = self.memory.alloc::<u32>(n)?;
1241
1242        // Allocate block sums array (using memory manager for budget enforcement)
1243        let mut d_block_sums = self.memory.alloc::<u32>(num_blocks as usize)?;
1244
1245        // Phase 1: Block-level exclusive scans + collect block totals
1246        let phase1_fn = device
1247            .get_func(SCAN_MODULE, scan_kernels::MULTIBLOCK_SCAN_PHASE1)
1248            .ok_or_else(|| {
1249                XlogError::Kernel("Failed to get multiblock_scan_phase1 kernel".to_string())
1250            })?;
1251
1252        // SAFETY: Kernel parameters match expected signature:
1253        // multiblock_scan_phase1(const uint8_t* mask, uint32_t* prefix_sum, uint32_t* block_sums, uint32_t n)
1254        // SAFETY: kernel arguments match the PTX signature; device buffers were allocated with sufficient size
1255        unsafe {
1256            phase1_fn.clone().launch(
1257                LaunchConfig {
1258                    grid_dim: (num_blocks, 1, 1),
1259                    block_dim: (block_size, 1, 1),
1260                    shared_mem_bytes: 0,
1261                },
1262                (&d_mask, &d_prefix_sum, &d_block_sums, n as u32),
1263            )
1264        }
1265        .map_err(|e| {
1266            XlogError::Kernel(format!("Failed to launch multiblock_scan_phase1: {}", e))
1267        })?;
1268
1269        // Phase 2: Scan block sums (only if we have more than 1 block)
1270        if num_blocks > 1 {
1271            self.multiblock_scan_u32_inplace(&mut d_block_sums, num_blocks)?;
1272
1273            // Phase 3: Add block offsets
1274            let phase3_fn = device
1275                .get_func(SCAN_MODULE, scan_kernels::MULTIBLOCK_SCAN_PHASE3)
1276                .ok_or_else(|| {
1277                    XlogError::Kernel("Failed to get multiblock_scan_phase3 kernel".to_string())
1278                })?;
1279
1280            // SAFETY: Kernel parameters match expected signature:
1281            // multiblock_scan_phase3(uint32_t* prefix_sum, const uint32_t* block_offsets, uint32_t n)
1282            // SAFETY: kernel arguments match the PTX signature; device buffers were allocated with sufficient size
1283            unsafe {
1284                phase3_fn.clone().launch(
1285                    LaunchConfig {
1286                        grid_dim: (num_blocks, 1, 1),
1287                        block_dim: (block_size, 1, 1),
1288                        shared_mem_bytes: 0,
1289                    },
1290                    (&d_prefix_sum, &d_block_sums, n as u32),
1291                )
1292            }
1293            .map_err(|e| {
1294                XlogError::Kernel(format!("Failed to launch multiblock_scan_phase3: {}", e))
1295            })?;
1296        }
1297
1298        // Synchronize and download results
1299        self.device.synchronize()?;
1300
1301        let prefix_sum = device
1302            .dtoh_sync_copy(&d_prefix_sum)
1303            .map_err(|e| XlogError::Kernel(format!("Failed to download prefix_sum: {}", e)))?;
1304
1305        // Compute count from the last prefix sum value + last mask value
1306        let count = prefix_sum[n - 1] + mask[n - 1] as u32;
1307
1308        Ok((prefix_sum, count))
1309    }
1310
1311    /// Filter buffer by pre-computed mask.
1312    ///
1313    /// # Arguments
1314    /// * `input` - The input buffer to filter
1315    /// * `mask` - Mask slice where non-zero means keep the row
1316    ///
1317    /// # Errors
1318    /// Returns error if mask length doesn't match buffer rows.
1319    pub fn filter_by_mask(&self, input: &CudaBuffer, mask: &[u8]) -> Result<CudaBuffer> {
1320        if input.num_rows() == 0 {
1321            return self.create_empty_buffer(input.schema.clone());
1322        }
1323
1324        let n = input.num_rows() as usize;
1325        if mask.len() != n {
1326            return Err(XlogError::Kernel(format!(
1327                "Mask length {} doesn't match buffer rows {}",
1328                mask.len(),
1329                n
1330            )));
1331        }
1332
1333        // Compute prefix sum and count
1334        let (prefix_sum, count) = self.prefix_sum_mask(mask)?;
1335
1336        if count == 0 {
1337            return self.create_empty_buffer(input.schema.clone());
1338        }
1339
1340        // Compact all columns using mask
1341        self.compact_buffer_by_mask(input, mask, &prefix_sum, count as u64)
1342    }
1343
1344    /// Compact buffer columns using mask and prefix sum indices
1345    fn compact_buffer_by_mask(
1346        &self,
1347        input: &CudaBuffer,
1348        mask: &[u8],
1349        prefix_sum: &[u32],
1350        output_count: u64,
1351    ) -> Result<CudaBuffer> {
1352        // Upload mask and prefix sum to GPU
1353        let d_mask = self
1354            .htod_sync_copy_tracked(mask)
1355            .map_err(|e| XlogError::Kernel(format!("Failed to upload mask: {}", e)))?;
1356        let d_prefix_sum = self
1357            .htod_sync_copy_tracked(prefix_sum)
1358            .map_err(|e| XlogError::Kernel(format!("Failed to upload prefix_sum: {}", e)))?;
1359
1360        self.compact_buffer_by_device_mask(input, &d_mask, &d_prefix_sum, output_count)
1361    }
1362
1363    /// Compact a buffer using a device-resident mask.
1364    ///
1365    /// Computes prefix sum and output count fully on-device.
1366    /// Strict-recorder variant of
1367    /// [`Self::compact_buffer_by_device_mask_counted`] — the
1368    /// first migrated COMPACT path.
1369    ///
1370    /// The compact pipeline is a multi-kernel chain:
1371    /// `mask_clamp_rows` → `multiblock_scan_phase1` →
1372    /// `multiblock_scan_u32_inplace_on_stream` (recursive,
1373    /// only when `num_blocks > 1`) → `multiblock_scan_phase3` →
1374    /// `capture_compact_count` → host scalar read of
1375    /// `d_out_count` → per-column `compact_bytes_by_mask`.
1376    /// **Every kernel runs on the same explicit `launch_stream`
1377    /// via `launch_on_stream`**, and the host scalar read at
1378    /// the chain's middle is explicitly ordered by
1379    /// `cu_stream.synchronize()` — non-blocking streams do
1380    /// NOT get default-stream implicit ordering.
1381    ///
1382    /// # Strict-mode contract
1383    /// * Requires the provider's manager to be built via
1384    ///   [`crate::GpuMemoryManager::with_runtime`]; otherwise
1385    ///   returns `XlogError::Kernel` before any allocation.
1386    /// * `d_mask` is recorded as a read.
1387    /// * `input.num_rows_device()` is recorded as a read.
1388    /// * Each `input.column(i)` is recorded as a read; external
1389    ///   columns on any side are rejected at preflight, before
1390    ///   any CUDA work is enqueued.
1391    /// * Every fresh runtime-backed allocation that this
1392    ///   function makes (`d_mask_clamped`, `d_prefix_sum`,
1393    ///   `d_block_sums`, `d_out_count`, each `dst_col`) is
1394    ///   recorded via `write` BEFORE the
1395    ///   kernel chain enqueues. Locals that drop at end-of-scope
1396    ///   (`d_mask_clamped`, `d_prefix_sum`, `d_block_sums`)
1397    ///   stay safe because the runtime's deallocate queues
1398    ///   `cuStreamWaitEvent(alloc_stream, recorded_event)`
1399    ///   BEFORE `cuMemFreeAsync`, gating the free on the
1400    ///   launch_stream chain.
1401    /// * Intermediate `block_sums` allocations created by the
1402    ///   recursive scan helper are recorded directly inside the
1403    ///   helper (they don't outlive the helper call).
1404    ///
1405    /// # Errors
1406    ///   * `XlogError::Kernel` if the manager has no runtime,
1407    ///     or if `launch_stream` does not resolve.
1408    ///   * `XlogError::Kernel` from preflight (external column
1409    ///     on any side, unsupported active resource).
1410    ///   * `XlogError::Kernel` from any underlying CUDA launch
1411    ///     or from the launch_stream synchronize before the
1412    ///     host scalar read.
1413    ///   * `XlogError::Kernel` from commit on transient
1414    ///     `record_block_use` failure.
1415    pub fn compact_buffer_by_device_mask_counted_recorded(
1416        &self,
1417        input: &CudaBuffer,
1418        d_mask: &TrackedCudaSlice<u8>,
1419        launch_stream: StreamId,
1420    ) -> Result<CudaBuffer> {
1421        let runtime = self.memory.runtime().ok_or_else(|| {
1422            XlogError::Kernel(
1423                "compact_buffer_by_device_mask_counted_recorded requires a \
1424                 runtime-backed GpuMemoryManager (constructed via with_runtime)"
1425                    .to_string(),
1426            )
1427        })?;
1428        let pool = runtime.stream_pool();
1429        let cu_stream = pool.resolve(launch_stream).ok_or_else(|| {
1430            XlogError::Kernel(format!(
1431                "compact_buffer_by_device_mask_counted_recorded: launch_stream \
1432                 StreamId({}) does not resolve",
1433                launch_stream.0
1434            ))
1435        })?;
1436
1437        let n = input.num_rows() as u32;
1438        if n == 0 {
1439            return self.create_empty_buffer(input.schema.clone());
1440        }
1441        // The bound `d_mask.len() >= n (row_cap)` is too
1442        // strict — `mask_clamp_rows` only reads `d_mask[i]`
1443        // for `i < num_rows_device` (logical count). Real
1444        // callers (hash_join_semi/anti recorded) pass masks
1445        // sized to logical count which can be < row_cap. We
1446        // require only that the mask is non-empty; OOB reads
1447        // are bounded by the kernel's own check against the
1448        // device-resident logical row count.
1449        if d_mask.is_empty() {
1450            return Err(XlogError::Kernel(
1451                "compact_buffer_by_device_mask_counted_recorded: empty d_mask".to_string(),
1452            ));
1453        }
1454
1455        let device = self.device.inner();
1456        let block_size = 256u32;
1457        let num_blocks = n.div_ceil(block_size);
1458        let row_cap = u64::from(n);
1459
1460        // Allocate ALL fresh runtime-backed buffers up front,
1461        // BEFORE the recorder is constructed. The recorder
1462        // snapshots each block's identity at record time and
1463        // drops the slice borrow, so the buffers can be
1464        // mutably borrowed by kernel launches after preflight.
1465        // Output column sizes are known up front from
1466        // `row_cap = n`, so this
1467        // is sound — the host scalar read of `d_out_count` only
1468        // tells us `output_rows`, which we use as metadata, not
1469        // for sizing.
1470        let mut d_mask_clamped = self.memory.alloc::<u8>(n as usize)?;
1471        let d_prefix_sum = self.memory.alloc::<u32>(n as usize)?;
1472        let mut d_block_sums = self.memory.alloc::<u32>(num_blocks as usize)?;
1473        let mut d_out_count = self.memory.alloc::<u32>(1)?;
1474
1475        let mut dst_cols: Vec<TrackedCudaSlice<u8>> = Vec::with_capacity(input.columns.len());
1476        for col_idx in 0..input.columns.len() {
1477            let elem_size = input
1478                .schema
1479                .column_type(col_idx)
1480                .map(|t| t.size_bytes())
1481                .unwrap_or(4);
1482            let output_bytes = (row_cap as usize) * elem_size;
1483            dst_cols.push(self.memory.alloc::<u8>(output_bytes)?);
1484        }
1485
1486        // Build recorder, record reads BEFORE preflight.
1487        let mut rec = LaunchRecorder::new_strict(launch_stream);
1488        rec.read(d_mask);
1489        rec.read(input.num_rows_device());
1490        for col_idx in 0..input.columns.len() {
1491            let src_col = input
1492                .column(col_idx)
1493                .ok_or_else(|| XlogError::Kernel(format!("Column {} not found", col_idx)))?;
1494            rec.read_column(src_col);
1495        }
1496        rec.write(&d_mask_clamped);
1497        rec.write(&d_prefix_sum);
1498        rec.write(&d_block_sums);
1499        rec.write(&d_out_count);
1500        for dst_col in &dst_cols {
1501            rec.write(dst_col);
1502        }
1503        rec.preflight(runtime).map_err(|e| {
1504            XlogError::Kernel(format!(
1505                "compact_buffer_by_device_mask_counted_recorded: launch recorder \
1506                 preflight failed: {}",
1507                e
1508            ))
1509        })?;
1510
1511        // Step 1: mask_clamp_rows on launch_stream.
1512        let clamp_fn = device
1513            .get_func(FILTER_MODULE, filter_kernels::MASK_CLAMP_ROWS)
1514            .ok_or_else(|| XlogError::Kernel("mask_clamp_rows kernel not found".to_string()))?;
1515        // SAFETY: mask_clamp_rows(in_mask, num_rows_device, row_cap, out_mask)
1516        unsafe {
1517            clamp_fn.clone().launch_on_stream(
1518                &cu_stream,
1519                LaunchConfig {
1520                    grid_dim: (num_blocks, 1, 1),
1521                    block_dim: (block_size, 1, 1),
1522                    shared_mem_bytes: 0,
1523                },
1524                (d_mask, input.num_rows_device(), n, &mut d_mask_clamped),
1525            )
1526        }
1527        .map_err(|e| XlogError::Kernel(format!("mask_clamp_rows (on_stream) failed: {}", e)))?;
1528
1529        // Step 2: multiblock_scan_phase1 on launch_stream.
1530        let phase1_fn = device
1531            .get_func(SCAN_MODULE, scan_kernels::MULTIBLOCK_SCAN_PHASE1)
1532            .ok_or_else(|| {
1533                XlogError::Kernel("Failed to get multiblock_scan_phase1 kernel".to_string())
1534            })?;
1535        // SAFETY: multiblock_scan_phase1(const u8 mask, u32 prefix_sum, u32 block_sums, u32 n)
1536        unsafe {
1537            phase1_fn.clone().launch_on_stream(
1538                &cu_stream,
1539                LaunchConfig {
1540                    grid_dim: (num_blocks, 1, 1),
1541                    block_dim: (block_size, 1, 1),
1542                    shared_mem_bytes: 0,
1543                },
1544                (&d_mask_clamped, &d_prefix_sum, &d_block_sums, n),
1545            )
1546        }
1547        .map_err(|e| {
1548            XlogError::Kernel(format!("multiblock_scan_phase1 (on_stream) failed: {}", e))
1549        })?;
1550
1551        // Step 3: scan inplace on block_sums + phase3 propagate
1552        // (only when there is more than one block).
1553        if num_blocks > 1 {
1554            self.multiblock_scan_u32_inplace_on_stream(
1555                &mut d_block_sums,
1556                num_blocks,
1557                &cu_stream,
1558                launch_stream,
1559                runtime,
1560            )?;
1561
1562            let phase3_fn = device
1563                .get_func(SCAN_MODULE, scan_kernels::MULTIBLOCK_SCAN_PHASE3)
1564                .ok_or_else(|| {
1565                    XlogError::Kernel("Failed to get multiblock_scan_phase3 kernel".to_string())
1566                })?;
1567            // SAFETY: multiblock_scan_phase3(prefix_sum, block_offsets, n)
1568            unsafe {
1569                phase3_fn.clone().launch_on_stream(
1570                    &cu_stream,
1571                    LaunchConfig {
1572                        grid_dim: (num_blocks, 1, 1),
1573                        block_dim: (block_size, 1, 1),
1574                        shared_mem_bytes: 0,
1575                    },
1576                    (&d_prefix_sum, &d_block_sums, n),
1577                )
1578            }
1579            .map_err(|e| {
1580                XlogError::Kernel(format!("multiblock_scan_phase3 (on_stream) failed: {}", e))
1581            })?;
1582        }
1583
1584        // Step 4: capture_compact_count on launch_stream.
1585        // (`d_out_count` was pre-allocated up front; see header.)
1586        let capture_fn = device
1587            .get_func(FILTER_MODULE, filter_kernels::CAPTURE_COMPACT_COUNT)
1588            .ok_or_else(|| {
1589                XlogError::Kernel("capture_compact_count kernel not found".to_string())
1590            })?;
1591        // SAFETY: capture_compact_count(prefix_sum, mask, n, out_count)
1592        //
1593        // Use the clamped mask, not the caller's original mask.
1594        // Some recorded callers provide a mask sized to the
1595        // device-resident logical row count while `n` is the
1596        // buffer row capacity. `mask_clamp_rows` expanded that
1597        // shorter domain into a row-capacity-sized mask with
1598        // slack rows forced to zero; every downstream consumer
1599        // in this compaction chain must use that expanded mask.
1600        unsafe {
1601            capture_fn.clone().launch_on_stream(
1602                &cu_stream,
1603                LaunchConfig {
1604                    grid_dim: (1, 1, 1),
1605                    block_dim: (1, 1, 1),
1606                    shared_mem_bytes: 0,
1607                },
1608                (&d_prefix_sum, &d_mask_clamped, n, &mut d_out_count),
1609            )
1610        }
1611        .map_err(|e| {
1612            XlogError::Kernel(format!("capture_compact_count (on_stream) failed: {}", e))
1613        })?;
1614
1615        // Explicit ordering for the host scalar read of
1616        // `d_out_count`. `dtoh_scalar_untracked` routes its
1617        // copy through the device's default cudarc stream,
1618        // which does NOT get implicit synchronization with the
1619        // non-blocking `launch_stream`. Without this barrier
1620        // we would race the still-pending capture kernel.
1621        cu_stream.synchronize().map_err(|e| {
1622            XlogError::Kernel(format!(
1623                "compact_buffer_by_device_mask_counted_recorded: launch_stream \
1624                 synchronize before host scalar read failed: {}",
1625                e
1626            ))
1627        })?;
1628
1629        let output_rows = self.dtoh_scalar_untracked(&d_out_count, 0)? as u64;
1630
1631        // Step 5: per-column compact_bytes_by_mask on
1632        // launch_stream. Only run when output_rows > 0; an
1633        // empty mask still allocates row_cap-sized columns
1634        // (matching legacy) but skips the kernel.
1635        if output_rows > 0 {
1636            let compact_fn = device
1637                .get_func(FILTER_MODULE, filter_kernels::COMPACT_BYTES_BY_MASK)
1638                .ok_or_else(|| {
1639                    XlogError::Kernel("compact_bytes_by_mask kernel not found".to_string())
1640                })?;
1641            let grid_size = n.div_ceil(block_size);
1642            let cfg = LaunchConfig {
1643                grid_dim: (grid_size, 1, 1),
1644                block_dim: (block_size, 1, 1),
1645                shared_mem_bytes: 0,
1646            };
1647            for (col_idx, dst_col) in dst_cols.iter().enumerate() {
1648                let src_col = input
1649                    .column(col_idx)
1650                    .ok_or_else(|| XlogError::Kernel(format!("Column {} not found", col_idx)))?;
1651                let elem_size = input
1652                    .schema
1653                    .column_type(col_idx)
1654                    .map(|t| t.size_bytes())
1655                    .unwrap_or(4) as u32;
1656                // SAFETY: compact_bytes_by_mask(input, mask, prefix_sum, n, elem_size, output)
1657                //
1658                // Same domain rule as capture_compact_count:
1659                // compact over the row-capacity-sized clamped
1660                // mask so rows >= logical_count are never
1661                // materialized from valid-looking slack.
1662                unsafe {
1663                    compact_fn.clone().launch_on_stream(
1664                        &cu_stream,
1665                        cfg,
1666                        (
1667                            src_col,
1668                            &d_mask_clamped,
1669                            &d_prefix_sum,
1670                            n,
1671                            elem_size,
1672                            dst_col,
1673                        ),
1674                    )
1675                }
1676                .map_err(|e| {
1677                    XlogError::Kernel(format!("compact_bytes_by_mask (on_stream) failed: {}", e))
1678                })?;
1679            }
1680        }
1681
1682        // Record fresh writes via the post-preflight escape
1683        // hatch. ALL fresh runtime-backed allocations made by
1684        // this function are recorded so that drops at
1685        // end-of-scope (or on the returned buffer's drop) are
1686        // correctly serialized with the launch_stream chain.
1687        rec.commit(runtime).map_err(|e| {
1688            XlogError::Kernel(format!(
1689                "compact_buffer_by_device_mask_counted_recorded: launch recorder \
1690                 commit failed: {}",
1691                e
1692            ))
1693        })?;
1694
1695        let new_columns: Vec<CudaColumn> = dst_cols.into_iter().map(|s| s.into()).collect();
1696        Ok(CudaBuffer::from_columns_with_host_count(
1697            new_columns,
1698            row_cap,
1699            d_out_count,
1700            input.schema.clone(),
1701            output_rows as u32,
1702        ))
1703    }
1704
1705    pub fn compact_buffer_by_device_mask_counted(
1706        &self,
1707        input: &CudaBuffer,
1708        d_mask: &TrackedCudaSlice<u8>,
1709    ) -> Result<CudaBuffer> {
1710        let n = input.num_rows() as u32;
1711        if n == 0 {
1712            return self.create_empty_buffer(input.schema.clone());
1713        }
1714        if n as usize > d_mask.len() {
1715            return Err(XlogError::Kernel(format!(
1716                "compact_buffer_by_device_mask_counted: mask len {} < rows {}",
1717                d_mask.len(),
1718                n
1719            )));
1720        }
1721
1722        let device = self.device.inner();
1723        let block_size = 256u32;
1724        let num_blocks = n.div_ceil(block_size);
1725
1726        let mut d_mask_clamped = self.memory.alloc::<u8>(n as usize)?;
1727        let clamp_fn = device
1728            .get_func(FILTER_MODULE, filter_kernels::MASK_CLAMP_ROWS)
1729            .ok_or_else(|| XlogError::Kernel("mask_clamp_rows kernel not found".to_string()))?;
1730
1731        // SAFETY: mask_clamp_rows(in_mask, num_rows_device, row_cap, out_mask)
1732        unsafe {
1733            clamp_fn.clone().launch(
1734                LaunchConfig {
1735                    grid_dim: (num_blocks, 1, 1),
1736                    block_dim: (block_size, 1, 1),
1737                    shared_mem_bytes: 0,
1738                },
1739                (d_mask, input.num_rows_device(), n, &mut d_mask_clamped),
1740            )
1741        }
1742        .map_err(|e| XlogError::Kernel(format!("mask_clamp_rows failed: {}", e)))?;
1743
1744        let d_prefix_sum = self.memory.alloc::<u32>(n as usize)?;
1745        let mut d_block_sums = self.memory.alloc::<u32>(num_blocks as usize)?;
1746
1747        let phase1_fn = device
1748            .get_func(SCAN_MODULE, scan_kernels::MULTIBLOCK_SCAN_PHASE1)
1749            .ok_or_else(|| {
1750                XlogError::Kernel("Failed to get multiblock_scan_phase1 kernel".to_string())
1751            })?;
1752
1753        // SAFETY: multiblock_scan_phase1(const uint8_t* mask, uint32_t* prefix_sum, uint32_t* block_sums, uint32_t n)
1754        unsafe {
1755            phase1_fn.clone().launch(
1756                LaunchConfig {
1757                    grid_dim: (num_blocks, 1, 1),
1758                    block_dim: (block_size, 1, 1),
1759                    shared_mem_bytes: 0,
1760                },
1761                (&d_mask_clamped, &d_prefix_sum, &d_block_sums, n),
1762            )
1763        }
1764        .map_err(|e| XlogError::Kernel(format!("multiblock_scan_phase1 failed: {}", e)))?;
1765
1766        if num_blocks > 1 {
1767            self.multiblock_scan_u32_inplace(&mut d_block_sums, num_blocks)?;
1768
1769            let phase3_fn = device
1770                .get_func(SCAN_MODULE, scan_kernels::MULTIBLOCK_SCAN_PHASE3)
1771                .ok_or_else(|| {
1772                    XlogError::Kernel("Failed to get multiblock_scan_phase3 kernel".to_string())
1773                })?;
1774
1775            // SAFETY: multiblock_scan_phase3(prefix_sum, block_offsets, n)
1776            unsafe {
1777                phase3_fn.clone().launch(
1778                    LaunchConfig {
1779                        grid_dim: (num_blocks, 1, 1),
1780                        block_dim: (block_size, 1, 1),
1781                        shared_mem_bytes: 0,
1782                    },
1783                    (&d_prefix_sum, &d_block_sums, n),
1784                )
1785            }
1786            .map_err(|e| XlogError::Kernel(format!("multiblock_scan_phase3 failed: {}", e)))?;
1787        }
1788
1789        let d_out_count = self.capture_compact_count(&d_prefix_sum, d_mask, n)?;
1790
1791        self.compact_buffer_by_device_mask_device_count(input, d_mask, &d_prefix_sum, d_out_count)
1792    }
1793
1794    pub(crate) fn capture_compact_count(
1795        &self,
1796        d_prefix_sum: &cudarc::driver::CudaSlice<u32>,
1797        d_mask: &cudarc::driver::CudaSlice<u8>,
1798        n: u32,
1799    ) -> Result<TrackedCudaSlice<u32>> {
1800        let mut d_out_count = self.memory.alloc::<u32>(1)?;
1801        let device = self.device.inner();
1802        let capture_fn = device
1803            .get_func(FILTER_MODULE, filter_kernels::CAPTURE_COMPACT_COUNT)
1804            .ok_or_else(|| {
1805                XlogError::Kernel("capture_compact_count kernel not found".to_string())
1806            })?;
1807        // SAFETY: kernel arguments match the PTX signature; device buffers were allocated with sufficient size
1808        unsafe {
1809            capture_fn.clone().launch(
1810                LaunchConfig {
1811                    grid_dim: (1, 1, 1),
1812                    block_dim: (1, 1, 1),
1813                    shared_mem_bytes: 0,
1814                },
1815                (d_prefix_sum, d_mask, n, &mut d_out_count),
1816            )
1817        }
1818        .map_err(|e| XlogError::Kernel(format!("capture_compact_count failed: {}", e)))?;
1819        Ok(d_out_count)
1820    }
1821
1822    pub(crate) fn compact_buffer_by_device_mask_device_count(
1823        &self,
1824        input: &CudaBuffer,
1825        d_mask: &cudarc::driver::CudaSlice<u8>,
1826        d_prefix_sum: &cudarc::driver::CudaSlice<u32>,
1827        d_out_count: TrackedCudaSlice<u32>,
1828    ) -> Result<CudaBuffer> {
1829        let mask_len = u32::try_from(d_mask.len()).map_err(|_| {
1830            XlogError::Kernel(format!(
1831                "compact_buffer_by_device_mask_device_count: mask len {} exceeds u32::MAX",
1832                d_mask.len()
1833            ))
1834        })?;
1835        let prefix_len = u32::try_from(d_prefix_sum.len()).map_err(|_| {
1836            XlogError::Kernel(format!(
1837                "compact_buffer_by_device_mask_device_count: prefix sum len {} exceeds u32::MAX",
1838                d_prefix_sum.len()
1839            ))
1840        })?;
1841        if prefix_len < mask_len {
1842            return Err(XlogError::Kernel(format!(
1843                "compact_buffer_by_device_mask_device_count: prefix sum len {} < mask len {}",
1844                prefix_len, mask_len
1845            )));
1846        }
1847        if mask_len as u64 > input.num_rows() {
1848            return Err(XlogError::Kernel(format!(
1849                "compact_buffer_by_device_mask_device_count: mask len {} > row cap {}",
1850                mask_len,
1851                input.num_rows()
1852            )));
1853        }
1854        if mask_len == 0 {
1855            return self.create_empty_buffer(input.schema.clone());
1856        }
1857        let n = mask_len;
1858        let device = self.device.inner();
1859
1860        let compact_fn = device
1861            .get_func(FILTER_MODULE, filter_kernels::COMPACT_BYTES_BY_MASK)
1862            .ok_or_else(|| {
1863                XlogError::Kernel("compact_bytes_by_mask kernel not found".to_string())
1864            })?;
1865
1866        let block_size = 256u32;
1867        let grid_size = n.div_ceil(block_size);
1868        let config = LaunchConfig {
1869            grid_dim: (grid_size, 1, 1),
1870            block_dim: (block_size, 1, 1),
1871            shared_mem_bytes: 0,
1872        };
1873
1874        // The compact kernel only writes `output_rows` elements, but downstream
1875        // metadata must still remember the logical span covered by the mask.
1876        // This keeps host-visible `row_cap` aligned with the masked prefix while
1877        // the device-side row count tracks the actual compacted cardinality.
1878        let output_rows = self.dtoh_scalar_untracked(&d_out_count, 0)? as u64;
1879        let row_cap = u64::from(n);
1880
1881        if output_rows == 0 {
1882            let mut new_columns = Vec::with_capacity(input.columns.len());
1883            for col_idx in 0..input.columns.len() {
1884                let elem_size = input
1885                    .schema
1886                    .column_type(col_idx)
1887                    .map(|t| t.size_bytes())
1888                    .unwrap_or(4);
1889                let output_bytes = (row_cap as usize) * elem_size;
1890                new_columns.push(self.memory.alloc::<u8>(output_bytes)?.into());
1891            }
1892            let d_zero_rows = self.upload_device_row_count(0)?;
1893            return Ok(CudaBuffer::from_columns_with_host_count(
1894                new_columns,
1895                row_cap,
1896                d_zero_rows,
1897                input.schema.clone(),
1898                0,
1899            ));
1900        }
1901
1902        let mut new_columns = Vec::with_capacity(input.columns.len());
1903        for col_idx in 0..input.columns.len() {
1904            let src_col = input
1905                .column(col_idx)
1906                .ok_or_else(|| XlogError::Kernel(format!("Column {} not found", col_idx)))?;
1907
1908            let elem_size = input
1909                .schema
1910                .column_type(col_idx)
1911                .map(|t| t.size_bytes())
1912                .unwrap_or(4) as u32;
1913
1914            let output_bytes = (row_cap as usize) * (elem_size as usize);
1915            let dst_col = self.memory.alloc::<u8>(output_bytes)?;
1916
1917            // SAFETY: Kernel signature matches:
1918            // compact_bytes_by_mask(input, mask, prefix_sum, num_rows, elem_size, output)
1919            // SAFETY: kernel arguments match the PTX signature; device buffers were allocated with sufficient size
1920            unsafe {
1921                compact_fn.clone().launch(
1922                    config,
1923                    (src_col, d_mask, d_prefix_sum, n, elem_size, &dst_col),
1924                )
1925            }
1926            .map_err(|e| XlogError::Kernel(format!("compact_bytes_by_mask failed: {}", e)))?;
1927
1928            new_columns.push(dst_col.into());
1929        }
1930
1931        self.device.synchronize()?;
1932
1933        Ok(CudaBuffer::from_columns_with_host_count(
1934            new_columns,
1935            row_cap,
1936            d_out_count,
1937            input.schema.clone(),
1938            output_rows as u32,
1939        ))
1940    }
1941
1942    fn compact_buffer_by_device_mask(
1943        &self,
1944        input: &CudaBuffer,
1945        d_mask: &cudarc::driver::CudaSlice<u8>,
1946        d_prefix_sum: &cudarc::driver::CudaSlice<u32>,
1947        output_count: u64,
1948    ) -> Result<CudaBuffer> {
1949        let n = input.num_rows() as u32;
1950        let device = self.device.inner();
1951
1952        // Get compact kernel
1953        let compact_fn = device
1954            .get_func(FILTER_MODULE, filter_kernels::COMPACT_BYTES_BY_MASK)
1955            .ok_or_else(|| {
1956                XlogError::Kernel("compact_bytes_by_mask kernel not found".to_string())
1957            })?;
1958
1959        let block_size = 256u32;
1960        let grid_size = n.div_ceil(block_size);
1961        let config = LaunchConfig {
1962            grid_dim: (grid_size, 1, 1),
1963            block_dim: (block_size, 1, 1),
1964            shared_mem_bytes: 0,
1965        };
1966
1967        // Compact each column
1968        let mut new_columns = Vec::with_capacity(input.columns.len());
1969        for col_idx in 0..input.columns.len() {
1970            let src_col = input
1971                .column(col_idx)
1972                .ok_or_else(|| XlogError::Kernel(format!("Column {} not found", col_idx)))?;
1973
1974            let elem_size = input
1975                .schema
1976                .column_type(col_idx)
1977                .map(|t| t.size_bytes())
1978                .unwrap_or(4) as u32;
1979
1980            let output_bytes = (output_count as usize) * (elem_size as usize);
1981            let dst_col = self.memory.alloc::<u8>(output_bytes)?;
1982
1983            // SAFETY: Kernel signature matches:
1984            // compact_bytes_by_mask(input, mask, prefix_sum, num_rows, elem_size, output)
1985            // SAFETY: kernel arguments match the PTX signature; device buffers were allocated with sufficient size
1986            unsafe {
1987                compact_fn.clone().launch(
1988                    config,
1989                    (src_col, d_mask, d_prefix_sum, n, elem_size, &dst_col),
1990                )
1991            }
1992            .map_err(|e| XlogError::Kernel(format!("compact_bytes_by_mask failed: {}", e)))?;
1993
1994            new_columns.push(dst_col.into());
1995        }
1996
1997        self.device.synchronize()?;
1998
1999        self.buffer_from_columns(new_columns, output_count, input.schema.clone())
2000    }
2001
2002    pub fn filter_by_device_mask(
2003        &self,
2004        input: &CudaBuffer,
2005        d_mask: &cudarc::driver::CudaSlice<u8>,
2006    ) -> Result<CudaBuffer> {
2007        if input.is_empty() {
2008            return self.create_empty_buffer(input.schema().clone());
2009        }
2010
2011        if input.num_rows() > u32::MAX as u64 {
2012            return Err(XlogError::Kernel(format!(
2013                "Device-mask filtering supports at most {} rows, got {}",
2014                u32::MAX,
2015                input.num_rows()
2016            )));
2017        }
2018
2019        let n = input.num_rows() as u32;
2020        let device = self.device.inner();
2021
2022        let block_size = 256u32;
2023        let num_blocks = n.div_ceil(block_size);
2024
2025        let mut d_mask_clamped = self.memory.alloc::<u8>(n as usize)?;
2026        let clamp_fn = device
2027            .get_func(FILTER_MODULE, filter_kernels::MASK_CLAMP_ROWS)
2028            .ok_or_else(|| XlogError::Kernel("mask_clamp_rows kernel not found".to_string()))?;
2029
2030        // SAFETY: mask_clamp_rows(in_mask, num_rows_device, row_cap, out_mask)
2031        unsafe {
2032            clamp_fn.clone().launch(
2033                LaunchConfig {
2034                    grid_dim: (num_blocks, 1, 1),
2035                    block_dim: (block_size, 1, 1),
2036                    shared_mem_bytes: 0,
2037                },
2038                (d_mask, input.num_rows_device(), n, &mut d_mask_clamped),
2039            )
2040        }
2041        .map_err(|e| XlogError::Kernel(format!("mask_clamp_rows failed: {}", e)))?;
2042
2043        let d_prefix_sum = self.memory.alloc::<u32>(n as usize)?;
2044        let mut d_block_sums = self.memory.alloc::<u32>(num_blocks as usize)?;
2045
2046        let phase1_fn = device
2047            .get_func(SCAN_MODULE, scan_kernels::MULTIBLOCK_SCAN_PHASE1)
2048            .ok_or_else(|| {
2049                XlogError::Kernel("Failed to get multiblock_scan_phase1 kernel".to_string())
2050            })?;
2051
2052        // SAFETY: multiblock_scan_phase1(const uint8_t* mask, uint32_t* prefix_sum, uint32_t* block_sums, uint32_t n)
2053        unsafe {
2054            phase1_fn.clone().launch(
2055                LaunchConfig {
2056                    grid_dim: (num_blocks, 1, 1),
2057                    block_dim: (block_size, 1, 1),
2058                    shared_mem_bytes: 0,
2059                },
2060                (&d_mask_clamped, &d_prefix_sum, &d_block_sums, n),
2061            )
2062        }
2063        .map_err(|e| XlogError::Kernel(format!("multiblock_scan_phase1 failed: {}", e)))?;
2064
2065        if num_blocks > 1 {
2066            self.multiblock_scan_u32_inplace(&mut d_block_sums, num_blocks)?;
2067
2068            let phase3_fn = device
2069                .get_func(SCAN_MODULE, scan_kernels::MULTIBLOCK_SCAN_PHASE3)
2070                .ok_or_else(|| {
2071                    XlogError::Kernel("Failed to get multiblock_scan_phase3 kernel".to_string())
2072                })?;
2073
2074            // SAFETY: multiblock_scan_phase3(uint32_t* prefix_sum, const uint32_t* block_offsets, uint32_t n)
2075            unsafe {
2076                phase3_fn.clone().launch(
2077                    LaunchConfig {
2078                        grid_dim: (num_blocks, 1, 1),
2079                        block_dim: (block_size, 1, 1),
2080                        shared_mem_bytes: 0,
2081                    },
2082                    (&d_prefix_sum, &d_block_sums, n),
2083                )
2084            }
2085            .map_err(|e| XlogError::Kernel(format!("multiblock_scan_phase3 failed: {}", e)))?;
2086        }
2087
2088        self.device.synchronize()?;
2089
2090        let d_out_count = self.capture_compact_count(&d_prefix_sum, &d_mask_clamped, n)?;
2091        self.compact_buffer_by_device_mask_device_count(
2092            input,
2093            &d_mask_clamped,
2094            &d_prefix_sum,
2095            d_out_count,
2096        )
2097    }
2098}