Skip to main content

xlog_cuda/provider/
wcoj_metadata.rs

1use std::collections::BTreeMap;
2use std::ffi::c_void;
3
4use cudarc::driver::sys;
5use xlog_core::{AggOp, Result, ScalarType, Schema, XlogError};
6
7use super::{arith_kernels, wcoj_kernels, CudaKernelProvider, ARITH_MODULE, WCOJ_MODULE};
8use crate::device_runtime::StreamId;
9use crate::launch::LaunchRecorder;
10use crate::memory::{CudaColumn, TrackedCudaSlice};
11use crate::wcoj_metadata::{
12    Wcoj4CycleRootAggValue, WcojCycle4HgWorkPlanU32, WcojCycle4HgWorkPlanU64, WcojRelationMetadata,
13    WcojRootAggValue, WcojTriangleHgCountPhaseU32, WcojTriangleHgWorkPlanU32,
14    WcojTriangleHgWorkPlanU64,
15};
16use crate::{AsKernelParam, CudaBuffer, LaunchAsync, LaunchConfig};
17
/// Threads per block (`block_dim.x`) for the WCOJ kernel launches visible in
/// this file; the work-plan builder also sizes its grid as
/// `n_xy.div_ceil(BLOCK_SIZE)`.
const BLOCK_SIZE: u32 = 256;
/// NOTE(review): not referenced in the visible part of this file — presumably
/// the block size of a histogram-guided count-phase launch; confirm at its
/// use site.
const HG_COUNT_BLOCK_SIZE: u32 = 512;
20
21impl CudaKernelProvider {
22    pub fn wcoj_build_metadata_u32_recorded(
23        &self,
24        input: &CudaBuffer,
25        key_col_idx: usize,
26        launch_stream: StreamId,
27    ) -> Result<WcojRelationMetadata<u32>> {
28        self.validate_metadata_column(input, key_col_idx, MetadataWidth::U32)?;
29        let keys = metadata_column_u32(input, key_col_idx)?;
30        self.build_metadata_u32_from_column(input, key_col_idx, keys, launch_stream)
31    }
32
33    pub fn wcoj_build_metadata_u64_recorded(
34        &self,
35        input: &CudaBuffer,
36        key_col_idx: usize,
37        launch_stream: StreamId,
38    ) -> Result<WcojRelationMetadata<u64>> {
39        self.validate_metadata_column(input, key_col_idx, MetadataWidth::U64)?;
40        let keys = metadata_column_u64(input, key_col_idx)?;
41        self.build_metadata_u64_from_column(input, key_col_idx, keys, launch_stream)
42    }
43
44    pub fn wcoj_triangle_hg_work_plan_u32_recorded(
45        &self,
46        e_xy: &CudaBuffer,
47        e_yz: &CudaBuffer,
48        e_xz: &CudaBuffer,
49        block_work_unit: u32,
50        launch_stream: StreamId,
51    ) -> Result<WcojTriangleHgWorkPlanU32> {
52        let ctx = "wcoj_triangle_hg_work_plan_u32_recorded";
53        if block_work_unit == 0 {
54            return Err(XlogError::Kernel(format!(
55                "{ctx}: block_work_unit must be nonzero"
56            )));
57        }
58        validate_binary_u32(ctx, "e_xy", e_xy)?;
59        validate_binary_u32(ctx, "e_yz", e_yz)?;
60        validate_binary_u32(ctx, "e_xz", e_xz)?;
61
62        let n_xy = self.metadata_logical_rows(e_xy)?;
63        let n_yz = self.metadata_logical_rows(e_yz)?;
64        let n_xz = self.metadata_logical_rows(e_xz)?;
65        let prefix_len = n_xy
66            .checked_add(1)
67            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: prefix length overflow")))?;
68        let mut xy_work_prefix = self.memory().alloc::<u32>(prefix_len as usize)?;
69        let mut xy_yz_start = self.memory().alloc::<u32>(n_xy as usize)?;
70        let mut xy_yz_end = self.memory().alloc::<u32>(n_xy as usize)?;
71        let mut xy_xz_start = self.memory().alloc::<u32>(n_xy as usize)?;
72        let mut xy_xz_end = self.memory().alloc::<u32>(n_xy as usize)?;
73
74        if n_xy == 0 {
75            let block_counts = self.memory().alloc::<u32>(1)?;
76            let block_offsets = self.memory().alloc::<u32>(1)?;
77            let scratch_x = self.memory().alloc::<u32>(1)?;
78            let scratch_y = self.memory().alloc::<u32>(1)?;
79            let scratch_z = self.memory().alloc::<u32>(1)?;
80            return Ok(WcojTriangleHgWorkPlanU32 {
81                xy_work_prefix,
82                xy_yz_start,
83                xy_yz_end,
84                xy_xz_start,
85                xy_xz_end,
86                block_counts,
87                block_offsets,
88                scratch_x,
89                scratch_y,
90                scratch_z,
91                total_work: 0,
92                block_work_unit,
93                row_count: 0,
94            });
95        }
96
97        let runtime = self.memory().runtime().ok_or_else(|| {
98            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
99        })?;
100        let cu_stream = runtime
101            .stream_pool()
102            .resolve(launch_stream)
103            .ok_or_else(|| {
104                XlogError::Kernel(format!(
105                    "{ctx}: launch_stream StreamId({}) does not resolve",
106                    launch_stream.0
107                ))
108            })?;
109
110        let xy_col0 = metadata_column_u32(e_xy, 0)?;
111        let xy_col1 = metadata_column_u32(e_xy, 1)?;
112        let yz_col0 = metadata_column_u32(e_yz, 0)?;
113        let xz_col0 = metadata_column_u32(e_xz, 0)?;
114
115        let mut rec = LaunchRecorder::new_strict(launch_stream);
116        rec.read(e_xy.num_rows_device());
117        rec.read(e_yz.num_rows_device());
118        rec.read(e_xz.num_rows_device());
119        rec.read_column(e_xy.column(0).expect("xy.col0"));
120        rec.read_column(e_xy.column(1).expect("xy.col1"));
121        rec.read_column(e_yz.column(0).expect("yz.col0"));
122        rec.read_column(e_xz.column(0).expect("xz.col0"));
123        rec.write(&xy_work_prefix);
124        rec.write(&xy_yz_start);
125        rec.write(&xy_yz_end);
126        rec.write(&xy_xz_start);
127        rec.write(&xy_xz_end);
128        rec.preflight(runtime)
129            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
130
131        let kernel = self
132            .device()
133            .inner()
134            .get_func(
135                WCOJ_MODULE,
136                wcoj_kernels::WCOJ_TRIANGLE_BUILD_HG_WORK_PLAN_U32,
137            )
138            .ok_or_else(|| {
139                XlogError::Kernel(
140                    "wcoj_triangle_build_hg_work_plan_u32 kernel not found".to_string(),
141                )
142            })?;
143        let grid = n_xy.div_ceil(BLOCK_SIZE);
144        unsafe {
145            kernel
146                .clone()
147                .launch_on_stream(
148                    &cu_stream,
149                    LaunchConfig {
150                        grid_dim: (grid, 1, 1),
151                        block_dim: (BLOCK_SIZE, 1, 1),
152                        shared_mem_bytes: 0,
153                    },
154                    (
155                        xy_col0,
156                        xy_col1,
157                        n_xy,
158                        yz_col0,
159                        n_yz,
160                        xz_col0,
161                        n_xz,
162                        &mut xy_work_prefix,
163                        &mut xy_yz_start,
164                        &mut xy_yz_end,
165                        &mut xy_xz_start,
166                        &mut xy_xz_end,
167                    ),
168                )
169                .map_err(|e| {
170                    XlogError::Kernel(format!(
171                        "wcoj_triangle_build_hg_work_plan_u32 launch failed: {e}"
172                    ))
173                })?;
174        }
175        self.multiblock_scan_u32_inplace_on_stream(
176            &mut xy_work_prefix,
177            prefix_len,
178            &cu_stream,
179            launch_stream,
180            runtime,
181        )?;
182        rec.commit(runtime)
183            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
184        cu_stream
185            .synchronize()
186            .map_err(|e| XlogError::Kernel(format!("{ctx}: stream sync failed: {e}")))?;
187        let total_work = self.dtoh_scalar_untracked::<u32>(&xy_work_prefix, n_xy as usize)?;
188        let scratch_slots = if total_work == 0 {
189            1usize
190        } else {
191            let grid = total_work.div_ceil(block_work_unit);
192            (grid as usize)
193                .checked_mul(block_work_unit as usize)
194                .ok_or_else(|| XlogError::Kernel(format!("{ctx}: scratch slot overflow")))?
195        };
196        let scratch_x = self.memory().alloc::<u32>(scratch_slots)?;
197        let scratch_y = self.memory().alloc::<u32>(scratch_slots)?;
198        let scratch_z = self.memory().alloc::<u32>(scratch_slots)?;
199        let grid = if total_work == 0 {
200            1
201        } else {
202            total_work.div_ceil(block_work_unit)
203        };
204        let block_counts = self.memory().alloc::<u32>(grid as usize)?;
205        let block_offsets = self.memory().alloc::<u32>(grid as usize)?;
206
207        Ok(WcojTriangleHgWorkPlanU32 {
208            xy_work_prefix,
209            xy_yz_start,
210            xy_yz_end,
211            xy_xz_start,
212            xy_xz_end,
213            block_counts,
214            block_offsets,
215            scratch_x,
216            scratch_y,
217            scratch_z,
218            total_work,
219            block_work_unit,
220            row_count: n_xy,
221        })
222    }
223
224    pub fn wcoj_triangle_count_hg_u32_recorded(
225        &self,
226        e_yz: &CudaBuffer,
227        e_xz: &CudaBuffer,
228        plan: &WcojTriangleHgWorkPlanU32,
229        launch_stream: StreamId,
230    ) -> Result<CudaBuffer> {
231        let ctx = "wcoj_triangle_count_hg_u32_recorded";
232        validate_binary_u32(ctx, "e_yz", e_yz)?;
233        validate_binary_u32(ctx, "e_xz", e_xz)?;
234        let n_yz = self.metadata_logical_rows(e_yz)?;
235        let n_xz = self.metadata_logical_rows(e_xz)?;
236        let grid = if plan.total_work == 0 {
237            1
238        } else {
239            plan.total_work.div_ceil(plan.block_work_unit)
240        };
241        let bytes_count = (grid as usize)
242            .checked_mul(std::mem::size_of::<u32>())
243            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: count byte size overflow")))?;
244        let mut count_bytes = self.memory().alloc::<u8>(bytes_count)?;
245        let d_num_rows = self.memory().alloc::<u32>(1)?;
246
247        let runtime = self.memory().runtime().ok_or_else(|| {
248            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
249        })?;
250        let cu_stream = runtime
251            .stream_pool()
252            .resolve(launch_stream)
253            .ok_or_else(|| {
254                XlogError::Kernel(format!(
255                    "{ctx}: launch_stream StreamId({}) does not resolve",
256                    launch_stream.0
257                ))
258            })?;
259        let yz_col1 = metadata_column_u32(e_yz, 1)?;
260        let xz_col1 = metadata_column_u32(e_xz, 1)?;
261
262        let mut rec = LaunchRecorder::new_strict(launch_stream);
263        rec.read(e_yz.num_rows_device());
264        rec.read(e_xz.num_rows_device());
265        rec.read_column(e_yz.column(1).expect("yz.col1"));
266        rec.read_column(e_xz.column(1).expect("xz.col1"));
267        rec.read(&plan.xy_work_prefix);
268        rec.read(&plan.xy_yz_start);
269        rec.read(&plan.xy_yz_end);
270        rec.read(&plan.xy_xz_start);
271        rec.read(&plan.xy_xz_end);
272        rec.write(&count_bytes);
273        rec.write(&d_num_rows);
274        rec.preflight(runtime)
275            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
276
277        self.htod_launch_metadata_async_copy_one(
278            &grid,
279            &d_num_rows,
280            &cu_stream,
281            &format!("{ctx}: d_num_rows"),
282        )?;
283
284        let kernel = self
285            .device()
286            .inner()
287            .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_TRIANGLE_COUNT_HG_U32)
288            .ok_or_else(|| {
289                XlogError::Kernel("wcoj_triangle_count_hg_u32 kernel not found".to_string())
290            })?;
291        let count_u32 = unsafe { reinterpret_u8_as_u32(&mut count_bytes) };
292        let mut params: Vec<*mut c_void> = vec![
293            yz_col1.as_kernel_param(),
294            n_yz.as_kernel_param(),
295            xz_col1.as_kernel_param(),
296            n_xz.as_kernel_param(),
297            (&plan.xy_work_prefix).as_kernel_param(),
298            (&plan.xy_yz_start).as_kernel_param(),
299            (&plan.xy_yz_end).as_kernel_param(),
300            (&plan.xy_xz_start).as_kernel_param(),
301            (&plan.xy_xz_end).as_kernel_param(),
302            plan.row_count.as_kernel_param(),
303            plan.total_work.as_kernel_param(),
304            plan.block_work_unit.as_kernel_param(),
305            count_u32.as_kernel_param(),
306        ];
307        unsafe {
308            kernel
309                .clone()
310                .launch_on_stream(
311                    &cu_stream,
312                    LaunchConfig {
313                        grid_dim: (grid, 1, 1),
314                        block_dim: (BLOCK_SIZE, 1, 1),
315                        shared_mem_bytes: 0,
316                    },
317                    &mut params,
318                )
319                .map_err(|e| XlogError::Kernel(format!("{ctx}: launch failed: {e}")))?;
320        }
321        rec.commit(runtime)
322            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
323        cu_stream
324            .synchronize()
325            .map_err(|e| XlogError::Kernel(format!("{ctx}: stream sync failed: {e}")))?;
326
327        let schema = Schema::new(vec![("count".to_string(), ScalarType::U32)]);
328        Ok(CudaBuffer::from_columns_with_host_count(
329            vec![count_bytes.into()],
330            grid as u64,
331            d_num_rows,
332            schema,
333            grid,
334        ))
335    }
336
337    pub fn wcoj_triangle_hg_u32_recorded(
338        &self,
339        e_xy: &CudaBuffer,
340        e_yz: &CudaBuffer,
341        e_xz: &CudaBuffer,
342        block_work_unit: u32,
343        launch_stream: StreamId,
344    ) -> Result<CudaBuffer> {
345        let ctx = "wcoj_triangle_hg_u32_recorded";
346        validate_binary_u32(ctx, "e_xy", e_xy)?;
347        validate_binary_u32(ctx, "e_yz", e_yz)?;
348        validate_binary_u32(ctx, "e_xz", e_xz)?;
349        let plan = self.wcoj_triangle_hg_work_plan_u32_recorded(
350            e_xy,
351            e_yz,
352            e_xz,
353            block_work_unit,
354            launch_stream,
355        )?;
356        self.wcoj_triangle_hg_u32_with_plan_recorded(e_xy, e_yz, e_xz, &plan, launch_stream)
357    }
358
359    /// Aggregate-fused triangle group-by-root count: evaluate
360    /// `q(X, count) :- e_xy(X,Y), e_yz(Y,Z), e_xz(X,Z)` grouped by the
361    /// variable-order root X, WITHOUT materializing the triangle rows.
362    ///
363    /// Pipeline (all recorded; the triangle result never exists as rows):
364    /// 1. the standard histogram-guided work plan;
365    /// 2. `wcoj_triangle_groupby_root_count_hg_u32` accumulates per-e_xy-row
366    ///    match counts (integer atomicAdd — order-insensitive, deterministic
367    ///    values) into a zero-initialized `n_xy`-long array;
368    /// 3. a 2-column (X, count) staging buffer over the *input* rows is
369    ///    compacted to count>0 rows (group-by over the join result must not
370    ///    emit roots with no completion) and reduced per X via the recorded
371    ///    groupby Sum (rows are already X-sorted because e_xy is lex-sorted).
372    ///
373    /// All reduction work is O(n_xy) — input-sized, never join-output-sized.
374    ///
375    /// Output schema matches the unfused materialize+groupby-count baseline:
376    /// `col0` = X (e_xy.col0 type, U32/Symbol), `col1` = count (U64).
377    ///
378    /// # Errors
379    /// * `XlogError::Kernel` if the manager has no runtime, the launch
380    ///   stream does not resolve, an input is not 2-column U32/Symbol, or
381    ///   any kernel launch fails.
382    pub fn wcoj_triangle_groupby_root_count_u32_recorded(
383        &self,
384        e_xy: &CudaBuffer,
385        e_yz: &CudaBuffer,
386        e_xz: &CudaBuffer,
387        block_work_unit: u32,
388        launch_stream: StreamId,
389    ) -> Result<CudaBuffer> {
390        let ctx = "wcoj_triangle_groupby_root_count_u32_recorded";
391        // Layout-normalize per dispatch (sorted-fast-path clone when the
392        // input is already lex-sorted + unique): the fused path must give
393        // the same guarantee as the unfused pipeline instead of trusting
394        // store-buffer sortedness — unsorted/duplicated inputs previously
395        // produced silently wrong (empty) fused results.
396        let e_xy = &self.wcoj_layout_u32_recorded(e_xy, launch_stream)?;
397        let e_yz = &self.wcoj_layout_u32_recorded(e_yz, launch_stream)?;
398        let e_xz = &self.wcoj_layout_u32_recorded(e_xz, launch_stream)?;
399        validate_binary_u32(ctx, "e_xy", e_xy)?;
400        validate_binary_u32(ctx, "e_yz", e_yz)?;
401        validate_binary_u32(ctx, "e_xz", e_xz)?;
402        let plan = self.wcoj_triangle_hg_work_plan_u32_recorded(
403            e_xy,
404            e_yz,
405            e_xz,
406            block_work_unit,
407            launch_stream,
408        )?;
409        let n_xy = plan.row_count;
410        let x_type = e_xy.schema().column_type(0).expect("xy.col0 type");
411        let out_schema = Schema::new(vec![
412            ("x".to_string(), x_type),
413            ("count".to_string(), ScalarType::U64),
414        ]);
415        if n_xy == 0 || plan.total_work == 0 {
416            return self.create_empty_buffer(out_schema);
417        }
418
419        let runtime = self.memory().runtime().ok_or_else(|| {
420            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
421        })?;
422        let cu_stream = runtime
423            .stream_pool()
424            .resolve(launch_stream)
425            .ok_or_else(|| {
426                XlogError::Kernel(format!(
427                    "{ctx}: launch_stream StreamId({}) does not resolve",
428                    launch_stream.0
429                ))
430            })?;
431
432        let yz_col1 = metadata_column_u32(e_yz, 1)?;
433        let xz_col1 = metadata_column_u32(e_xz, 1)?;
434        let n_yz = self.metadata_logical_rows(e_yz)?;
435        let n_xz = self.metadata_logical_rows(e_xz)?;
436
437        // Per-e_xy-row match counters, zero-initialized. Allocated as the
438        // u8-backed column layout so the array doubles as the staging
439        // buffer's count column after the kernel fills it.
440        let mut row_counts = self
441            .memory()
442            .alloc::<u8>(n_xy as usize * std::mem::size_of::<u32>())?;
443        self.device()
444            .inner()
445            .memset_zeros(&mut row_counts)
446            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row counts failed: {e}")))?;
447
448        let grid = plan.total_work.div_ceil(plan.block_work_unit);
449        let mut rec = LaunchRecorder::new_strict(launch_stream);
450        rec.read(e_xy.num_rows_device());
451        rec.read(e_yz.num_rows_device());
452        rec.read(e_xz.num_rows_device());
453        rec.read_column(e_yz.column(1).expect("yz.col1"));
454        rec.read_column(e_xz.column(1).expect("xz.col1"));
455        rec.read(&plan.xy_work_prefix);
456        rec.read(&plan.xy_yz_start);
457        rec.read(&plan.xy_yz_end);
458        rec.read(&plan.xy_xz_start);
459        rec.read(&plan.xy_xz_end);
460        rec.write(&row_counts);
461        rec.preflight(runtime)
462            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
463        {
464            let kernel = self
465                .device()
466                .inner()
467                .get_func(
468                    WCOJ_MODULE,
469                    wcoj_kernels::WCOJ_TRIANGLE_GROUPBY_ROOT_COUNT_HG_U32,
470                )
471                .ok_or_else(|| {
472                    XlogError::Kernel(
473                        "wcoj_triangle_groupby_root_count_hg_u32 kernel not found".to_string(),
474                    )
475                })?;
476            let mut params: Vec<*mut c_void> = vec![
477                yz_col1.as_kernel_param(),
478                n_yz.as_kernel_param(),
479                xz_col1.as_kernel_param(),
480                n_xz.as_kernel_param(),
481                (&plan.xy_work_prefix).as_kernel_param(),
482                (&plan.xy_yz_start).as_kernel_param(),
483                (&plan.xy_yz_end).as_kernel_param(),
484                (&plan.xy_xz_start).as_kernel_param(),
485                (&plan.xy_xz_end).as_kernel_param(),
486                plan.row_count.as_kernel_param(),
487                plan.total_work.as_kernel_param(),
488                plan.block_work_unit.as_kernel_param(),
489                (&row_counts).as_kernel_param(),
490            ];
491            unsafe {
492                kernel
493                    .clone()
494                    .launch_on_stream(
495                        &cu_stream,
496                        LaunchConfig {
497                            grid_dim: (grid, 1, 1),
498                            block_dim: (BLOCK_SIZE, 1, 1),
499                            shared_mem_bytes: 0,
500                        },
501                        &mut params,
502                    )
503                    .map_err(|e| {
504                        XlogError::Kernel(format!("{ctx}: groupby-count launch failed: {e}"))
505                    })?;
506            }
507        }
508        rec.commit(runtime)
509            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
510
511        // Staging buffer (X, count) over the n_xy input rows: X is a
512        // device-to-device copy of e_xy.col0; the count column is the
513        // kernel-filled array. Rows stay lex-sorted by X.
514        let x_src = match e_xy.column(0).expect("xy.col0") {
515            CudaColumn::Owned(slice) => slice,
516            _ => {
517                return Err(XlogError::Kernel(format!(
518                    "{ctx}: e_xy.col0 must be an owned CudaColumn"
519                )))
520            }
521        };
522        let x_copy = self
523            .memory()
524            .alloc::<u8>(n_xy as usize * std::mem::size_of::<u32>())?;
525        // Explicit-length copy: layout-normalized columns are allocated at
526        // capacity, which can exceed the logical n_xy * 4 bytes a full-slice
527        // typed copy would assert on.
528        unsafe {
529            let res = sys::cuMemcpyDtoD_v2(
530                *x_copy.device_ptr(),
531                *x_src.device_ptr(),
532                n_xy as usize * std::mem::size_of::<u32>(),
533            );
534            if res != sys::cudaError_enum::CUDA_SUCCESS {
535                return Err(XlogError::Kernel(format!(
536                    "{ctx}: copy X column failed: {res:?}"
537                )));
538            }
539        }
540        let mut d_num_rows = self.memory().alloc::<u32>(1)?;
541        self.device()
542            .inner()
543            .dtod_copy(e_xy.num_rows_device(), &mut d_num_rows)
544            .map_err(|e| XlogError::Kernel(format!("{ctx}: copy row count failed: {e}")))?;
545        let staging_schema = Schema::new(vec![
546            ("x".to_string(), x_type),
547            ("count".to_string(), ScalarType::U32),
548        ]);
549        let staging = CudaBuffer::from_columns_with_host_count(
550            vec![x_copy.into(), row_counts.into()],
551            n_xy as u64,
552            d_num_rows,
553            staging_schema,
554            n_xy,
555        );
556
557        // Keep only roots with at least one completed triangle, then reduce
558        // per X. Both steps run over input-sized data.
559        let mask = self.compare_const_mask_recorded::<u32>(
560            &staging,
561            1,
562            0u32,
563            crate::CompareOp::Gt,
564            launch_stream,
565        )?;
566        let compacted =
567            self.compact_buffer_by_device_mask_counted_recorded(&staging, &mask, launch_stream)?;
568        self.groupby_multi_agg_recorded(
569            &compacted,
570            &[0],
571            &[(1, xlog_core::AggOp::Sum)],
572            launch_stream,
573        )
574    }
575
576    /// Aggregate-fused triangle group-by-root sum/min/max: evaluate
577    /// `q(X, agg(V)) :- e_xy(X,Y), e_yz(Y,Z), e_xz(X,Z)` with
578    /// `agg ∈ {Sum, Min, Max}` and `V ∈ {Y, Z}` grouped by the
579    /// variable-order root X, WITHOUT materializing the triangle rows.
580    ///
581    /// Pipeline (all recorded; the triangle result never exists as rows):
582    /// 1. the standard histogram-guided work plan;
583    /// 2. the per-op fused kernel accumulates, per e_xy row, a match count
584    ///    (compaction mask) and the per-row partial aggregate (integer
585    ///    atomics — order-insensitive, deterministic values). Sum partials
586    ///    are u64 (a per-row partial can exceed `u32::MAX`); min partials
587    ///    start at `u32::MAX`, max partials at 0;
588    /// 3. a 3-column (X, count, agg) staging buffer over the *input* rows
589    ///    is compacted to count>0 rows (groups with no completion must be
590    ///    absent) and reduced per X via the recorded groupby with the same
591    ///    `AggOp` (Sum over the u64 partials; Min/Max over u32).
592    ///
593    /// All reduction work is O(n_xy) — input-sized, never join-output-sized.
594    ///
595    /// Output schema matches the unfused materialize+groupby baseline:
596    /// `col0` = X (e_xy.col0 type, U32/Symbol), `col1` = U64 for Sum,
597    /// U32 for Min/Max.
598    ///
599    /// Bag semantics: every (Y, Z) completion contributes its value,
600    /// exactly like aggregating the materialized projection.
601    ///
602    /// # Errors
603    /// * `XlogError::Kernel` if `agg_op` is not Sum/Min/Max, the value
604    ///   columns are not plain U32, the manager has no runtime, the launch
605    ///   stream does not resolve, an input is not 2-column U32/Symbol, or
606    ///   any kernel launch fails.
607    pub fn wcoj_triangle_groupby_root_agg_u32_recorded(
608        &self,
609        e_xy: &CudaBuffer,
610        e_yz: &CudaBuffer,
611        e_xz: &CudaBuffer,
612        agg_op: AggOp,
613        value: WcojRootAggValue,
614        block_work_unit: u32,
615        launch_stream: StreamId,
616    ) -> Result<CudaBuffer> {
617        let ctx = "wcoj_triangle_groupby_root_agg_u32_recorded";
618        // Layout-normalize per dispatch (sorted-fast-path clone when the
619        // input is already lex-sorted + unique): the fused path must give
620        // the same guarantee as the unfused pipeline instead of trusting
621        // store-buffer sortedness — unsorted/duplicated inputs previously
622        // produced silently wrong (empty) fused results.
623        let e_xy = &self.wcoj_layout_u32_recorded(e_xy, launch_stream)?;
624        let e_yz = &self.wcoj_layout_u32_recorded(e_yz, launch_stream)?;
625        let e_xz = &self.wcoj_layout_u32_recorded(e_xz, launch_stream)?;
626        let (kernel_name, agg_elem_size, agg_scalar, agg_name) = match agg_op {
627            AggOp::Sum => (
628                wcoj_kernels::WCOJ_TRIANGLE_GROUPBY_ROOT_SUM_HG_U32,
629                std::mem::size_of::<u64>(),
630                ScalarType::U64,
631                "sum_0",
632            ),
633            AggOp::Min => (
634                wcoj_kernels::WCOJ_TRIANGLE_GROUPBY_ROOT_MIN_HG_U32,
635                std::mem::size_of::<u32>(),
636                ScalarType::U32,
637                "min_0",
638            ),
639            AggOp::Max => (
640                wcoj_kernels::WCOJ_TRIANGLE_GROUPBY_ROOT_MAX_HG_U32,
641                std::mem::size_of::<u32>(),
642                ScalarType::U32,
643                "max_0",
644            ),
645            other => {
646                return Err(XlogError::Kernel(format!(
647                    "{ctx}: unsupported AggOp {other:?} (Sum/Min/Max only; use \
648                     wcoj_triangle_groupby_root_count_u32_recorded for Count)"
649                )))
650            }
651        };
652        validate_binary_u32(ctx, "e_xy", e_xy)?;
653        validate_binary_u32(ctx, "e_yz", e_yz)?;
654        validate_binary_u32(ctx, "e_xz", e_xz)?;
655        // The aggregate value is arithmetic: require plain U32 value
656        // columns (Symbol ids are not summable/orderable data).
657        let value_cols: &[(&CudaBuffer, &str)] = match value {
658            WcojRootAggValue::Y => &[(e_xy, "e_xy")],
659            WcojRootAggValue::Z => &[(e_yz, "e_yz"), (e_xz, "e_xz")],
660        };
661        for (buf, label) in value_cols {
662            let ty = buf.schema().column_type(1).expect("validated 2-col");
663            if ty != ScalarType::U32 {
664                return Err(XlogError::Kernel(format!(
665                    "{ctx}: {label}.col1 supplies the aggregate value and must be U32, got {ty:?}"
666                )));
667            }
668        }
669
670        let plan = self.wcoj_triangle_hg_work_plan_u32_recorded(
671            e_xy,
672            e_yz,
673            e_xz,
674            block_work_unit,
675            launch_stream,
676        )?;
677        let n_xy = plan.row_count;
678        let x_type = e_xy.schema().column_type(0).expect("xy.col0 type");
679        let out_schema = Schema::new(vec![
680            ("x".to_string(), x_type),
681            (agg_name.to_string(), agg_scalar),
682        ]);
683        if n_xy == 0 || plan.total_work == 0 {
684            return self.create_empty_buffer(out_schema);
685        }
686
687        let runtime = self.memory().runtime().ok_or_else(|| {
688            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
689        })?;
690        let cu_stream = runtime
691            .stream_pool()
692            .resolve(launch_stream)
693            .ok_or_else(|| {
694                XlogError::Kernel(format!(
695                    "{ctx}: launch_stream StreamId({}) does not resolve",
696                    launch_stream.0
697                ))
698            })?;
699
700        let yz_col1 = metadata_column_u32(e_yz, 1)?;
701        let xz_col1 = metadata_column_u32(e_xz, 1)?;
702        let xy_col1 = metadata_column_u32(e_xy, 1)?;
703        let n_yz = self.metadata_logical_rows(e_yz)?;
704        let n_xz = self.metadata_logical_rows(e_xz)?;
705        let value_from_z: u32 = match value {
706            WcojRootAggValue::Y => 0,
707            WcojRootAggValue::Z => 1,
708        };
709
710        // Per-e_xy-row match counters + aggregate partials, allocated as
711        // the u8-backed column layout so the arrays double as the staging
712        // buffer's columns after the kernel fills them.
713        let mut row_counts = self
714            .memory()
715            .alloc::<u8>(n_xy as usize * std::mem::size_of::<u32>())?;
716        self.device()
717            .inner()
718            .memset_zeros(&mut row_counts)
719            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row counts failed: {e}")))?;
720        let mut row_agg = self.memory().alloc::<u8>(n_xy as usize * agg_elem_size)?;
721        self.device()
722            .inner()
723            .memset_zeros(&mut row_agg)
724            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row aggregates failed: {e}")))?;
725
726        let grid = plan.total_work.div_ceil(plan.block_work_unit);
727        let mut rec = LaunchRecorder::new_strict(launch_stream);
728        rec.read(e_xy.num_rows_device());
729        rec.read(e_yz.num_rows_device());
730        rec.read(e_xz.num_rows_device());
731        rec.read_column(e_yz.column(1).expect("yz.col1"));
732        rec.read_column(e_xz.column(1).expect("xz.col1"));
733        rec.read_column(e_xy.column(1).expect("xy.col1"));
734        rec.read(&plan.xy_work_prefix);
735        rec.read(&plan.xy_yz_start);
736        rec.read(&plan.xy_yz_end);
737        rec.read(&plan.xy_xz_start);
738        rec.read(&plan.xy_xz_end);
739        rec.write(&row_counts);
740        rec.write(&row_agg);
741        rec.preflight(runtime)
742            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
743        if matches!(agg_op, AggOp::Min) {
744            // Min identity: u32::MAX (compaction drops untouched rows).
745            let fill = self
746                .device()
747                .inner()
748                .get_func(ARITH_MODULE, arith_kernels::ARITH_FILL_CONST_U32)
749                .ok_or_else(|| {
750                    XlogError::Kernel("arith_fill_const_u32 kernel not found".to_string())
751                })?;
752            let row_agg_u32 = unsafe { reinterpret_u8_as_u32(&mut row_agg) };
753            // SAFETY: arith_fill_const_u32(value, n, output)
754            unsafe {
755                fill.clone()
756                    .launch_on_stream(
757                        &cu_stream,
758                        LaunchConfig::for_num_elems(n_xy),
759                        (u32::MAX, n_xy, &mut *row_agg_u32),
760                    )
761                    .map_err(|e| {
762                        XlogError::Kernel(format!("{ctx}: min identity fill failed: {e}"))
763                    })?;
764            }
765        }
766        {
767            let kernel = self
768                .device()
769                .inner()
770                .get_func(WCOJ_MODULE, kernel_name)
771                .ok_or_else(|| XlogError::Kernel(format!("{kernel_name} kernel not found")))?;
772            let mut params: Vec<*mut c_void> = vec![
773                yz_col1.as_kernel_param(),
774                n_yz.as_kernel_param(),
775                xz_col1.as_kernel_param(),
776                n_xz.as_kernel_param(),
777                xy_col1.as_kernel_param(),
778                value_from_z.as_kernel_param(),
779                (&plan.xy_work_prefix).as_kernel_param(),
780                (&plan.xy_yz_start).as_kernel_param(),
781                (&plan.xy_yz_end).as_kernel_param(),
782                (&plan.xy_xz_start).as_kernel_param(),
783                (&plan.xy_xz_end).as_kernel_param(),
784                plan.row_count.as_kernel_param(),
785                plan.total_work.as_kernel_param(),
786                plan.block_work_unit.as_kernel_param(),
787                (&row_counts).as_kernel_param(),
788                (&row_agg).as_kernel_param(),
789            ];
790            unsafe {
791                kernel
792                    .clone()
793                    .launch_on_stream(
794                        &cu_stream,
795                        LaunchConfig {
796                            grid_dim: (grid, 1, 1),
797                            block_dim: (BLOCK_SIZE, 1, 1),
798                            shared_mem_bytes: 0,
799                        },
800                        &mut params,
801                    )
802                    .map_err(|e| {
803                        XlogError::Kernel(format!("{ctx}: groupby-agg launch failed: {e}"))
804                    })?;
805            }
806        }
807        rec.commit(runtime)
808            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
809
810        // Staging buffer (X, count, agg) over the n_xy input rows: X is a
811        // device-to-device copy of e_xy.col0; count and agg are the
812        // kernel-filled arrays. Rows stay lex-sorted by X.
813        let x_src = match e_xy.column(0).expect("xy.col0") {
814            CudaColumn::Owned(slice) => slice,
815            _ => {
816                return Err(XlogError::Kernel(format!(
817                    "{ctx}: e_xy.col0 must be an owned CudaColumn"
818                )))
819            }
820        };
821        let x_copy = self
822            .memory()
823            .alloc::<u8>(n_xy as usize * std::mem::size_of::<u32>())?;
824        // Explicit-length copy: layout-normalized columns are allocated at
825        // capacity, which can exceed the logical n_xy * 4 bytes a full-slice
826        // typed copy would assert on.
827        unsafe {
828            let res = sys::cuMemcpyDtoD_v2(
829                *x_copy.device_ptr(),
830                *x_src.device_ptr(),
831                n_xy as usize * std::mem::size_of::<u32>(),
832            );
833            if res != sys::cudaError_enum::CUDA_SUCCESS {
834                return Err(XlogError::Kernel(format!(
835                    "{ctx}: copy X column failed: {res:?}"
836                )));
837            }
838        }
839        let mut d_num_rows = self.memory().alloc::<u32>(1)?;
840        self.device()
841            .inner()
842            .dtod_copy(e_xy.num_rows_device(), &mut d_num_rows)
843            .map_err(|e| XlogError::Kernel(format!("{ctx}: copy row count failed: {e}")))?;
844        let staging_schema = Schema::new(vec![
845            ("x".to_string(), x_type),
846            ("count".to_string(), ScalarType::U32),
847            ("agg".to_string(), agg_scalar),
848        ]);
849        let staging = CudaBuffer::from_columns_with_host_count(
850            vec![x_copy.into(), row_counts.into(), row_agg.into()],
851            n_xy as u64,
852            d_num_rows,
853            staging_schema,
854            n_xy,
855        );
856
857        // Keep only roots with at least one completed triangle, then reduce
858        // per X with the same AggOp. Both steps run over input-sized data.
859        let mask = self.compare_const_mask_recorded::<u32>(
860            &staging,
861            1,
862            0u32,
863            crate::CompareOp::Gt,
864            launch_stream,
865        )?;
866        let compacted =
867            self.compact_buffer_by_device_mask_counted_recorded(&staging, &mask, launch_stream)?;
868        self.groupby_multi_agg_recorded(&compacted, &[0], &[(2, agg_op)], launch_stream)
869    }
870
871    /// U64-key aggregate-fused triangle count sibling of
872    /// [`Self::wcoj_triangle_groupby_root_count_u32_recorded`]: evaluate
873    /// `q(X, count)` over the triangle shape grouped by the root X for U64
874    /// relations, WITHOUT materializing the triangle rows.
875    ///
876    /// The recorded groupby is U32/Symbol-key only, so the per-X reduction
877    /// reuses the WCOJ relation metadata instead: e_xy is lex-sorted, so
878    /// `wcoj_build_metadata_u64_recorded` yields one (unique X, group start)
879    /// pair per root, and `wcoj_groupby_root_segment_sum_counts_u32`
880    /// accumulates the per-row match counts into per-unique-root u64
881    /// totals (integer atomicAdd — deterministic). Roots with zero
882    /// completions are compacted away. All reduction work is O(n_xy).
883    ///
884    /// Output schema matches the unfused materialize+groupby baseline:
885    /// `col0` = X (U64), `col1` = count (U64).
886    ///
887    /// # Errors
888    /// * `XlogError::Kernel` if the manager has no runtime, the launch
889    ///   stream does not resolve, an input is not 2-column U64, or any
890    ///   kernel launch fails.
891    pub fn wcoj_triangle_groupby_root_count_u64_recorded(
892        &self,
893        e_xy: &CudaBuffer,
894        e_yz: &CudaBuffer,
895        e_xz: &CudaBuffer,
896        block_work_unit: u32,
897        launch_stream: StreamId,
898    ) -> Result<CudaBuffer> {
899        let ctx = "wcoj_triangle_groupby_root_count_u64_recorded";
900        // Layout-normalize per dispatch (sorted-fast-path clone when the
901        // input is already lex-sorted + unique): the fused path must give
902        // the same guarantee as the unfused pipeline instead of trusting
903        // store-buffer sortedness — unsorted/duplicated inputs previously
904        // produced silently wrong (empty) fused results.
905        let e_xy = &self.wcoj_layout_u64_recorded(e_xy, launch_stream)?;
906        let e_yz = &self.wcoj_layout_u64_recorded(e_yz, launch_stream)?;
907        let e_xz = &self.wcoj_layout_u64_recorded(e_xz, launch_stream)?;
908        validate_binary_u64(ctx, "e_xy", e_xy)?;
909        validate_binary_u64(ctx, "e_yz", e_yz)?;
910        validate_binary_u64(ctx, "e_xz", e_xz)?;
911        let plan = self.wcoj_triangle_hg_work_plan_u64_recorded(
912            e_xy,
913            e_yz,
914            e_xz,
915            block_work_unit,
916            launch_stream,
917        )?;
918        let n_xy = plan.row_count;
919        let out_schema = Schema::new(vec![
920            ("x".to_string(), ScalarType::U64),
921            ("count".to_string(), ScalarType::U64),
922        ]);
923        if n_xy == 0 || plan.total_work == 0 {
924            return self.create_empty_buffer(out_schema);
925        }
926
927        let runtime = self.memory().runtime().ok_or_else(|| {
928            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
929        })?;
930        let cu_stream = runtime
931            .stream_pool()
932            .resolve(launch_stream)
933            .ok_or_else(|| {
934                XlogError::Kernel(format!(
935                    "{ctx}: launch_stream StreamId({}) does not resolve",
936                    launch_stream.0
937                ))
938            })?;
939
940        let yz_col1 = metadata_column_u64(e_yz, 1)?;
941        let xz_col1 = metadata_column_u64(e_xz, 1)?;
942        let n_yz = self.metadata_logical_rows(e_yz)?;
943        let n_xz = self.metadata_logical_rows(e_xz)?;
944
945        // Per-e_xy-row match counters, zero-initialized.
946        let mut row_counts = self.memory().alloc::<u32>(n_xy as usize)?;
947        self.device()
948            .inner()
949            .memset_zeros(&mut row_counts)
950            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row counts failed: {e}")))?;
951
952        let grid = plan.total_work.div_ceil(plan.block_work_unit);
953        let mut rec = LaunchRecorder::new_strict(launch_stream);
954        rec.read(e_xy.num_rows_device());
955        rec.read(e_yz.num_rows_device());
956        rec.read(e_xz.num_rows_device());
957        rec.read_column(e_yz.column(1).expect("yz.col1"));
958        rec.read_column(e_xz.column(1).expect("xz.col1"));
959        rec.read(&plan.xy_work_prefix);
960        rec.read(&plan.xy_yz_start);
961        rec.read(&plan.xy_yz_end);
962        rec.read(&plan.xy_xz_start);
963        rec.read(&plan.xy_xz_end);
964        rec.write(&row_counts);
965        rec.preflight(runtime)
966            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
967        {
968            let kernel = self
969                .device()
970                .inner()
971                .get_func(
972                    WCOJ_MODULE,
973                    wcoj_kernels::WCOJ_TRIANGLE_GROUPBY_ROOT_COUNT_HG_U64,
974                )
975                .ok_or_else(|| {
976                    XlogError::Kernel(
977                        "wcoj_triangle_groupby_root_count_hg_u64 kernel not found".to_string(),
978                    )
979                })?;
980            let mut params: Vec<*mut c_void> = vec![
981                yz_col1.as_kernel_param(),
982                n_yz.as_kernel_param(),
983                xz_col1.as_kernel_param(),
984                n_xz.as_kernel_param(),
985                (&plan.xy_work_prefix).as_kernel_param(),
986                (&plan.xy_yz_start).as_kernel_param(),
987                (&plan.xy_yz_end).as_kernel_param(),
988                (&plan.xy_xz_start).as_kernel_param(),
989                (&plan.xy_xz_end).as_kernel_param(),
990                plan.row_count.as_kernel_param(),
991                plan.total_work.as_kernel_param(),
992                plan.block_work_unit.as_kernel_param(),
993                (&row_counts).as_kernel_param(),
994            ];
995            unsafe {
996                kernel
997                    .clone()
998                    .launch_on_stream(
999                        &cu_stream,
1000                        LaunchConfig {
1001                            grid_dim: (grid, 1, 1),
1002                            block_dim: (BLOCK_SIZE, 1, 1),
1003                            shared_mem_bytes: 0,
1004                        },
1005                        &mut params,
1006                    )
1007                    .map_err(|e| {
1008                        XlogError::Kernel(format!("{ctx}: groupby-count launch failed: {e}"))
1009                    })?;
1010            }
1011        }
1012        rec.commit(runtime)
1013            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
1014
1015        // Per-X reduction via the relation metadata: one (unique X, group
1016        // start) pair per root; e_xy is lex-sorted by X so group rows are
1017        // contiguous.
1018        let meta = self.wcoj_build_metadata_u64_recorded(e_xy, 0, launch_stream)?;
1019        let key_count = meta.key_count;
1020        if key_count == 0 {
1021            return self.create_empty_buffer(out_schema);
1022        }
1023        let mut sums = self
1024            .memory()
1025            .alloc::<u8>(key_count as usize * std::mem::size_of::<u64>())?;
1026        self.device()
1027            .inner()
1028            .memset_zeros(&mut sums)
1029            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero group sums failed: {e}")))?;
1030
1031        let mut rec_sum = LaunchRecorder::new_strict(launch_stream);
1032        rec_sum.read(&row_counts);
1033        rec_sum.read(&meta.prefix_sum);
1034        rec_sum.write(&sums);
1035        rec_sum
1036            .preflight(runtime)
1037            .map_err(|e| XlogError::Kernel(format!("{ctx}: reduce preflight failed: {e}")))?;
1038        {
1039            let kernel = self
1040                .device()
1041                .inner()
1042                .get_func(
1043                    WCOJ_MODULE,
1044                    wcoj_kernels::WCOJ_GROUPBY_ROOT_SEGMENT_SUM_COUNTS_U32,
1045                )
1046                .ok_or_else(|| {
1047                    XlogError::Kernel(
1048                        "wcoj_groupby_root_segment_sum_counts_u32 kernel not found".to_string(),
1049                    )
1050                })?;
1051            let reduce_grid = n_xy.div_ceil(BLOCK_SIZE);
1052            let mut params: Vec<*mut c_void> = vec![
1053                (&row_counts).as_kernel_param(),
1054                n_xy.as_kernel_param(),
1055                (&meta.prefix_sum).as_kernel_param(),
1056                key_count.as_kernel_param(),
1057                (&sums).as_kernel_param(),
1058            ];
1059            unsafe {
1060                kernel
1061                    .clone()
1062                    .launch_on_stream(
1063                        &cu_stream,
1064                        LaunchConfig {
1065                            grid_dim: (reduce_grid, 1, 1),
1066                            block_dim: (BLOCK_SIZE, 1, 1),
1067                            shared_mem_bytes: 0,
1068                        },
1069                        &mut params,
1070                    )
1071                    .map_err(|e| XlogError::Kernel(format!("{ctx}: reduce launch failed: {e}")))?;
1072            }
1073        }
1074        rec_sum
1075            .commit(runtime)
1076            .map_err(|e| XlogError::Kernel(format!("{ctx}: reduce commit failed: {e}")))?;
1077
1078        // (unique X, total) buffer over the key_count roots, then drop the
1079        // roots with no completion. The copies run on launch_stream and the
1080        // fresh destination blocks are registered through the strict
1081        // recorder BEFORE the enqueue — a raw async copy into a freshly
1082        // pool-allocated block without recording is a visibility race.
1083        let x_copy = self
1084            .memory()
1085            .alloc::<u8>(key_count as usize * std::mem::size_of::<u64>())?;
1086        let d_num_rows = self.memory().alloc::<u32>(1)?;
1087        let mut rec_copy = LaunchRecorder::new_strict(launch_stream);
1088        rec_copy.read(&meta.unique_keys);
1089        rec_copy.write(&x_copy);
1090        rec_copy.write(&d_num_rows);
1091        rec_copy
1092            .preflight(runtime)
1093            .map_err(|e| XlogError::Kernel(format!("{ctx}: copy preflight failed: {e}")))?;
1094        unsafe {
1095            let res = sys::cuMemcpyDtoDAsync_v2(
1096                *x_copy.device_ptr(),
1097                *meta.unique_keys.device_ptr(),
1098                key_count as usize * std::mem::size_of::<u64>(),
1099                cu_stream.cu_stream(),
1100            );
1101            if res != sys::cudaError_enum::CUDA_SUCCESS {
1102                return Err(XlogError::Kernel(format!(
1103                    "{ctx}: DtoD unique keys copy failed: {res:?}"
1104                )));
1105            }
1106        }
1107        self.htod_launch_metadata_async_copy_one(
1108            &key_count,
1109            &d_num_rows,
1110            &cu_stream,
1111            &format!("{ctx}: d_num_rows"),
1112        )?;
1113        rec_copy
1114            .commit(runtime)
1115            .map_err(|e| XlogError::Kernel(format!("{ctx}: copy commit failed: {e}")))?;
1116        cu_stream
1117            .synchronize()
1118            .map_err(|e| XlogError::Kernel(format!("{ctx}: stream sync failed: {e}")))?;
1119        let staging_schema = Schema::new(vec![
1120            ("x".to_string(), ScalarType::U64),
1121            ("count".to_string(), ScalarType::U64),
1122        ]);
1123        let staging = CudaBuffer::from_columns_with_host_count(
1124            vec![x_copy.into(), sums.into()],
1125            u64::from(key_count),
1126            d_num_rows,
1127            staging_schema,
1128            key_count,
1129        );
1130        let mask = self.compare_const_mask_recorded::<u64>(
1131            &staging,
1132            1,
1133            0u64,
1134            crate::CompareOp::Gt,
1135            launch_stream,
1136        )?;
1137        self.compact_buffer_by_device_mask_counted_recorded(&staging, &mask, launch_stream)
1138    }
1139
1140    /// U64-key aggregate-fused triangle sum/min/max sibling of
1141    /// [`Self::wcoj_triangle_groupby_root_agg_u32_recorded`]: evaluate
1142    /// `q(X, agg(V)) :- e_xy(X,Y), e_yz(Y,Z), e_xz(X,Z)` with
1143    /// `agg ∈ {Sum, Min, Max}` and `V ∈ {Y, Z}` over U64 relations,
1144    /// grouped by the variable-order root X, WITHOUT materializing the
1145    /// triangle rows.
1146    ///
1147    /// The recorded groupby is U32/Symbol-key only, so the per-X reduction
1148    /// reuses the WCOJ relation metadata (one unique root per group, e_xy
1149    /// lex-sorted) like the u64 count path:
1150    /// 1. the per-op fused kernel accumulates, per e_xy row, a match count
1151    ///    and a u64 aggregate partial (integer atomics — deterministic;
1152    ///    sum wraps on overflow exactly like `groupby_sum_u64`; min
1153    ///    partials start at `u64::MAX`, max partials at 0);
1154    /// 2. `wcoj_groupby_root_segment_sum_counts_u32` reduces per-row match
1155    ///    counts to per-unique-root totals (the presence mask), and the
1156    ///    per-op `wcoj_groupby_root_segment_{sum,min,max}_values_u64`
1157    ///    kernel folds the per-row partials into per-unique-root u64
1158    ///    aggregates, skipping zero-match rows;
1159    /// 3. a (X, agg) staging buffer over the unique roots is compacted to
1160    ///    count>0 groups.
1161    ///
1162    /// All reduction work is O(n_xy) — input-sized, never join-output-sized.
1163    ///
1164    /// Output schema matches the unfused materialize+groupby baseline
1165    /// (legacy groupby widened to u64 values): `col0` = X (U64),
1166    /// `col1` = U64 for sum, min and max alike.
1167    ///
1168    /// # Errors
1169    /// * `XlogError::Kernel` if `agg_op` is not Sum/Min/Max, the manager
1170    ///   has no runtime, the launch stream does not resolve, an input is
1171    ///   not 2-column U64, or any kernel launch fails.
1172    pub fn wcoj_triangle_groupby_root_agg_u64_recorded(
1173        &self,
1174        e_xy: &CudaBuffer,
1175        e_yz: &CudaBuffer,
1176        e_xz: &CudaBuffer,
1177        agg_op: AggOp,
1178        value: WcojRootAggValue,
1179        block_work_unit: u32,
1180        launch_stream: StreamId,
1181    ) -> Result<CudaBuffer> {
1182        let ctx = "wcoj_triangle_groupby_root_agg_u64_recorded";
1183        // Layout-normalize per dispatch (sorted-fast-path clone when the
1184        // input is already lex-sorted + unique): the fused path must give
1185        // the same guarantee as the unfused pipeline instead of trusting
1186        // store-buffer sortedness — unsorted/duplicated inputs previously
1187        // produced silently wrong (empty) fused results.
1188        let e_xy = &self.wcoj_layout_u64_recorded(e_xy, launch_stream)?;
1189        let e_yz = &self.wcoj_layout_u64_recorded(e_yz, launch_stream)?;
1190        let e_xz = &self.wcoj_layout_u64_recorded(e_xz, launch_stream)?;
1191        let (kernel_name, segment_kernel_name, agg_name) = match agg_op {
1192            AggOp::Sum => (
1193                wcoj_kernels::WCOJ_TRIANGLE_GROUPBY_ROOT_SUM_HG_U64,
1194                wcoj_kernels::WCOJ_GROUPBY_ROOT_SEGMENT_SUM_VALUES_U64,
1195                "sum_0",
1196            ),
1197            AggOp::Min => (
1198                wcoj_kernels::WCOJ_TRIANGLE_GROUPBY_ROOT_MIN_HG_U64,
1199                wcoj_kernels::WCOJ_GROUPBY_ROOT_SEGMENT_MIN_VALUES_U64,
1200                "min_0",
1201            ),
1202            AggOp::Max => (
1203                wcoj_kernels::WCOJ_TRIANGLE_GROUPBY_ROOT_MAX_HG_U64,
1204                wcoj_kernels::WCOJ_GROUPBY_ROOT_SEGMENT_MAX_VALUES_U64,
1205                "max_0",
1206            ),
1207            other => {
1208                return Err(XlogError::Kernel(format!(
1209                    "{ctx}: unsupported AggOp {other:?} (Sum/Min/Max only; use \
1210                     wcoj_triangle_groupby_root_count_u64_recorded for Count)"
1211                )))
1212            }
1213        };
1214        validate_binary_u64(ctx, "e_xy", e_xy)?;
1215        validate_binary_u64(ctx, "e_yz", e_yz)?;
1216        validate_binary_u64(ctx, "e_xz", e_xz)?;
1217        let plan = self.wcoj_triangle_hg_work_plan_u64_recorded(
1218            e_xy,
1219            e_yz,
1220            e_xz,
1221            block_work_unit,
1222            launch_stream,
1223        )?;
1224        let n_xy = plan.row_count;
1225        let out_schema = Schema::new(vec![
1226            ("x".to_string(), ScalarType::U64),
1227            (agg_name.to_string(), ScalarType::U64),
1228        ]);
1229        if n_xy == 0 || plan.total_work == 0 {
1230            return self.create_empty_buffer(out_schema);
1231        }
1232
1233        let runtime = self.memory().runtime().ok_or_else(|| {
1234            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
1235        })?;
1236        let cu_stream = runtime
1237            .stream_pool()
1238            .resolve(launch_stream)
1239            .ok_or_else(|| {
1240                XlogError::Kernel(format!(
1241                    "{ctx}: launch_stream StreamId({}) does not resolve",
1242                    launch_stream.0
1243                ))
1244            })?;
1245
1246        let yz_col1 = metadata_column_u64(e_yz, 1)?;
1247        let xz_col1 = metadata_column_u64(e_xz, 1)?;
1248        let xy_col1 = metadata_column_u64(e_xy, 1)?;
1249        let n_yz = self.metadata_logical_rows(e_yz)?;
1250        let n_xz = self.metadata_logical_rows(e_xz)?;
1251        let value_from_z: u32 = match value {
1252            WcojRootAggValue::Y => 0,
1253            WcojRootAggValue::Z => 1,
1254        };
1255
1256        // Per-e_xy-row match counters + u64 aggregate partials.
1257        let mut row_counts = self.memory().alloc::<u32>(n_xy as usize)?;
1258        self.device()
1259            .inner()
1260            .memset_zeros(&mut row_counts)
1261            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row counts failed: {e}")))?;
1262        let mut row_agg = self
1263            .memory()
1264            .alloc::<u8>(n_xy as usize * std::mem::size_of::<u64>())?;
1265        self.device()
1266            .inner()
1267            .memset_zeros(&mut row_agg)
1268            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row aggregates failed: {e}")))?;
1269
1270        let grid = plan.total_work.div_ceil(plan.block_work_unit);
1271        let mut rec = LaunchRecorder::new_strict(launch_stream);
1272        rec.read(e_xy.num_rows_device());
1273        rec.read(e_yz.num_rows_device());
1274        rec.read(e_xz.num_rows_device());
1275        rec.read_column(e_yz.column(1).expect("yz.col1"));
1276        rec.read_column(e_xz.column(1).expect("xz.col1"));
1277        rec.read_column(e_xy.column(1).expect("xy.col1"));
1278        rec.read(&plan.xy_work_prefix);
1279        rec.read(&plan.xy_yz_start);
1280        rec.read(&plan.xy_yz_end);
1281        rec.read(&plan.xy_xz_start);
1282        rec.read(&plan.xy_xz_end);
1283        rec.write(&row_counts);
1284        rec.write(&row_agg);
1285        rec.preflight(runtime)
1286            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
1287        if matches!(agg_op, AggOp::Min) {
1288            // Min identity: u64::MAX (compaction drops untouched groups).
1289            let fill = self
1290                .device()
1291                .inner()
1292                .get_func(ARITH_MODULE, arith_kernels::ARITH_FILL_CONST_U64)
1293                .ok_or_else(|| {
1294                    XlogError::Kernel("arith_fill_const_u64 kernel not found".to_string())
1295                })?;
1296            let row_agg_u64 = unsafe { reinterpret_u8_as_u64(&mut row_agg) };
1297            // SAFETY: arith_fill_const_u64(value, n, output)
1298            unsafe {
1299                fill.clone()
1300                    .launch_on_stream(
1301                        &cu_stream,
1302                        LaunchConfig::for_num_elems(n_xy),
1303                        (u64::MAX, n_xy, &mut *row_agg_u64),
1304                    )
1305                    .map_err(|e| {
1306                        XlogError::Kernel(format!("{ctx}: min identity fill failed: {e}"))
1307                    })?;
1308            }
1309        }
1310        {
1311            let kernel = self
1312                .device()
1313                .inner()
1314                .get_func(WCOJ_MODULE, kernel_name)
1315                .ok_or_else(|| XlogError::Kernel(format!("{kernel_name} kernel not found")))?;
1316            let mut params: Vec<*mut c_void> = vec![
1317                yz_col1.as_kernel_param(),
1318                n_yz.as_kernel_param(),
1319                xz_col1.as_kernel_param(),
1320                n_xz.as_kernel_param(),
1321                xy_col1.as_kernel_param(),
1322                value_from_z.as_kernel_param(),
1323                (&plan.xy_work_prefix).as_kernel_param(),
1324                (&plan.xy_yz_start).as_kernel_param(),
1325                (&plan.xy_yz_end).as_kernel_param(),
1326                (&plan.xy_xz_start).as_kernel_param(),
1327                (&plan.xy_xz_end).as_kernel_param(),
1328                plan.row_count.as_kernel_param(),
1329                plan.total_work.as_kernel_param(),
1330                plan.block_work_unit.as_kernel_param(),
1331                (&row_counts).as_kernel_param(),
1332                (&row_agg).as_kernel_param(),
1333            ];
1334            unsafe {
1335                kernel
1336                    .clone()
1337                    .launch_on_stream(
1338                        &cu_stream,
1339                        LaunchConfig {
1340                            grid_dim: (grid, 1, 1),
1341                            block_dim: (BLOCK_SIZE, 1, 1),
1342                            shared_mem_bytes: 0,
1343                        },
1344                        &mut params,
1345                    )
1346                    .map_err(|e| {
1347                        XlogError::Kernel(format!("{ctx}: groupby-agg launch failed: {e}"))
1348                    })?;
1349            }
1350        }
1351        rec.commit(runtime)
1352            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
1353
1354        // Per-X reduction via the relation metadata: one (unique X, group
1355        // start) pair per root; e_xy is lex-sorted by X so group rows are
1356        // contiguous.
1357        let meta = self.wcoj_build_metadata_u64_recorded(e_xy, 0, launch_stream)?;
1358        let key_count = meta.key_count;
1359        if key_count == 0 {
1360            return self.create_empty_buffer(out_schema);
1361        }
1362        let mut count_sums = self
1363            .memory()
1364            .alloc::<u8>(key_count as usize * std::mem::size_of::<u64>())?;
1365        self.device()
1366            .inner()
1367            .memset_zeros(&mut count_sums)
1368            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero group counts failed: {e}")))?;
1369        let mut group_agg = self
1370            .memory()
1371            .alloc::<u8>(key_count as usize * std::mem::size_of::<u64>())?;
1372        self.device()
1373            .inner()
1374            .memset_zeros(&mut group_agg)
1375            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero group aggregates failed: {e}")))?;
1376
1377        let mut rec_reduce = LaunchRecorder::new_strict(launch_stream);
1378        rec_reduce.read(&row_counts);
1379        rec_reduce.read(&row_agg);
1380        rec_reduce.read(&meta.prefix_sum);
1381        rec_reduce.write(&count_sums);
1382        rec_reduce.write(&group_agg);
1383        rec_reduce
1384            .preflight(runtime)
1385            .map_err(|e| XlogError::Kernel(format!("{ctx}: reduce preflight failed: {e}")))?;
1386        if matches!(agg_op, AggOp::Min) {
1387            let fill = self
1388                .device()
1389                .inner()
1390                .get_func(ARITH_MODULE, arith_kernels::ARITH_FILL_CONST_U64)
1391                .ok_or_else(|| {
1392                    XlogError::Kernel("arith_fill_const_u64 kernel not found".to_string())
1393                })?;
1394            let group_agg_u64 = unsafe { reinterpret_u8_as_u64(&mut group_agg) };
1395            // SAFETY: arith_fill_const_u64(value, n, output)
1396            unsafe {
1397                fill.clone()
1398                    .launch_on_stream(
1399                        &cu_stream,
1400                        LaunchConfig::for_num_elems(key_count),
1401                        (u64::MAX, key_count, &mut *group_agg_u64),
1402                    )
1403                    .map_err(|e| {
1404                        XlogError::Kernel(format!("{ctx}: group min identity fill failed: {e}"))
1405                    })?;
1406            }
1407        }
1408        let reduce_grid = n_xy.div_ceil(BLOCK_SIZE);
1409        {
1410            let kernel = self
1411                .device()
1412                .inner()
1413                .get_func(
1414                    WCOJ_MODULE,
1415                    wcoj_kernels::WCOJ_GROUPBY_ROOT_SEGMENT_SUM_COUNTS_U32,
1416                )
1417                .ok_or_else(|| {
1418                    XlogError::Kernel(
1419                        "wcoj_groupby_root_segment_sum_counts_u32 kernel not found".to_string(),
1420                    )
1421                })?;
1422            let mut params: Vec<*mut c_void> = vec![
1423                (&row_counts).as_kernel_param(),
1424                n_xy.as_kernel_param(),
1425                (&meta.prefix_sum).as_kernel_param(),
1426                key_count.as_kernel_param(),
1427                (&count_sums).as_kernel_param(),
1428            ];
1429            unsafe {
1430                kernel
1431                    .clone()
1432                    .launch_on_stream(
1433                        &cu_stream,
1434                        LaunchConfig {
1435                            grid_dim: (reduce_grid, 1, 1),
1436                            block_dim: (BLOCK_SIZE, 1, 1),
1437                            shared_mem_bytes: 0,
1438                        },
1439                        &mut params,
1440                    )
1441                    .map_err(|e| {
1442                        XlogError::Kernel(format!("{ctx}: count reduce launch failed: {e}"))
1443                    })?;
1444            }
1445        }
1446        {
1447            let kernel = self
1448                .device()
1449                .inner()
1450                .get_func(WCOJ_MODULE, segment_kernel_name)
1451                .ok_or_else(|| {
1452                    XlogError::Kernel(format!("{segment_kernel_name} kernel not found"))
1453                })?;
1454            let mut params: Vec<*mut c_void> = vec![
1455                (&row_counts).as_kernel_param(),
1456                (&row_agg).as_kernel_param(),
1457                n_xy.as_kernel_param(),
1458                (&meta.prefix_sum).as_kernel_param(),
1459                key_count.as_kernel_param(),
1460                (&group_agg).as_kernel_param(),
1461            ];
1462            unsafe {
1463                kernel
1464                    .clone()
1465                    .launch_on_stream(
1466                        &cu_stream,
1467                        LaunchConfig {
1468                            grid_dim: (reduce_grid, 1, 1),
1469                            block_dim: (BLOCK_SIZE, 1, 1),
1470                            shared_mem_bytes: 0,
1471                        },
1472                        &mut params,
1473                    )
1474                    .map_err(|e| {
1475                        XlogError::Kernel(format!("{ctx}: agg reduce launch failed: {e}"))
1476                    })?;
1477            }
1478        }
1479        rec_reduce
1480            .commit(runtime)
1481            .map_err(|e| XlogError::Kernel(format!("{ctx}: reduce commit failed: {e}")))?;
1482
1483        // (unique X, agg) staging plus a counts-only buffer whose mask
1484        // drops groups with no completion. Fresh destination blocks are
1485        // registered through the strict recorder BEFORE the enqueue (a raw
1486        // async copy into a freshly pool-allocated block without recording
1487        // is a visibility race).
1488        let x_copy = self
1489            .memory()
1490            .alloc::<u8>(key_count as usize * std::mem::size_of::<u64>())?;
1491        let d_num_rows_agg = self.memory().alloc::<u32>(1)?;
1492        let d_num_rows_counts = self.memory().alloc::<u32>(1)?;
1493        let mut rec_copy = LaunchRecorder::new_strict(launch_stream);
1494        rec_copy.read(&meta.unique_keys);
1495        rec_copy.write(&x_copy);
1496        rec_copy.write(&d_num_rows_agg);
1497        rec_copy.write(&d_num_rows_counts);
1498        rec_copy
1499            .preflight(runtime)
1500            .map_err(|e| XlogError::Kernel(format!("{ctx}: copy preflight failed: {e}")))?;
1501        unsafe {
1502            let res = sys::cuMemcpyDtoDAsync_v2(
1503                *x_copy.device_ptr(),
1504                *meta.unique_keys.device_ptr(),
1505                key_count as usize * std::mem::size_of::<u64>(),
1506                cu_stream.cu_stream(),
1507            );
1508            if res != sys::cudaError_enum::CUDA_SUCCESS {
1509                return Err(XlogError::Kernel(format!(
1510                    "{ctx}: DtoD unique keys copy failed: {res:?}"
1511                )));
1512            }
1513        }
1514        self.htod_launch_metadata_async_copy_one(
1515            &key_count,
1516            &d_num_rows_agg,
1517            &cu_stream,
1518            &format!("{ctx}: d_num_rows_agg"),
1519        )?;
1520        self.htod_launch_metadata_async_copy_one(
1521            &key_count,
1522            &d_num_rows_counts,
1523            &cu_stream,
1524            &format!("{ctx}: d_num_rows_counts"),
1525        )?;
1526        rec_copy
1527            .commit(runtime)
1528            .map_err(|e| XlogError::Kernel(format!("{ctx}: copy commit failed: {e}")))?;
1529        cu_stream
1530            .synchronize()
1531            .map_err(|e| XlogError::Kernel(format!("{ctx}: stream sync failed: {e}")))?;
1532
1533        let counts_schema = Schema::new(vec![("count".to_string(), ScalarType::U64)]);
1534        let counts_buf = CudaBuffer::from_columns_with_host_count(
1535            vec![count_sums.into()],
1536            u64::from(key_count),
1537            d_num_rows_counts,
1538            counts_schema,
1539            key_count,
1540        );
1541        let staging = CudaBuffer::from_columns_with_host_count(
1542            vec![x_copy.into(), group_agg.into()],
1543            u64::from(key_count),
1544            d_num_rows_agg,
1545            out_schema,
1546            key_count,
1547        );
1548        let mask = self.compare_const_mask_recorded::<u64>(
1549            &counts_buf,
1550            0,
1551            0u64,
1552            crate::CompareOp::Gt,
1553            launch_stream,
1554        )?;
1555        self.compact_buffer_by_device_mask_counted_recorded(&staging, &mask, launch_stream)
1556    }
1557
1558    pub fn wcoj_triangle_hg_count_phase_u32_recorded(
1559        &self,
1560        e_xy: &CudaBuffer,
1561        e_yz: &CudaBuffer,
1562        e_xz: &CudaBuffer,
1563        plan: &WcojTriangleHgWorkPlanU32,
1564        launch_stream: StreamId,
1565    ) -> Result<WcojTriangleHgCountPhaseU32> {
1566        let ctx = "wcoj_triangle_hg_count_phase_u32_recorded";
1567        validate_binary_u32(ctx, "e_xy", e_xy)?;
1568        validate_binary_u32(ctx, "e_yz", e_yz)?;
1569        validate_binary_u32(ctx, "e_xz", e_xz)?;
1570        let grid = plan.total_work.div_ceil(plan.block_work_unit);
1571        if grid > 1024 {
1572            return Err(XlogError::Kernel(format!(
1573                "{ctx}: spike phase path requires grid <= 1024, got {grid}"
1574            )));
1575        }
1576        let total_rows_device = self.memory().alloc::<u32>(1)?;
1577        if plan.total_work == 0 {
1578            return Ok(WcojTriangleHgCountPhaseU32 {
1579                total_rows_device,
1580                total_rows: 0,
1581            });
1582        }
1583
1584        let runtime = self.memory().runtime().ok_or_else(|| {
1585            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
1586        })?;
1587        let cu_stream = runtime
1588            .stream_pool()
1589            .resolve(launch_stream)
1590            .ok_or_else(|| {
1591                XlogError::Kernel(format!(
1592                    "{ctx}: launch_stream StreamId({}) does not resolve",
1593                    launch_stream.0
1594                ))
1595            })?;
1596
1597        let yz_col1 = metadata_column_u32(e_yz, 1)?;
1598        let xz_col1 = metadata_column_u32(e_xz, 1)?;
1599        let n_yz = self.metadata_logical_rows(e_yz)?;
1600        let n_xz = self.metadata_logical_rows(e_xz)?;
1601
1602        let mut rec_hg = LaunchRecorder::new_strict(launch_stream);
1603        rec_hg.read(e_xy.num_rows_device());
1604        rec_hg.read(e_yz.num_rows_device());
1605        rec_hg.read(e_xz.num_rows_device());
1606        rec_hg.read_column(e_yz.column(1).expect("yz.col1"));
1607        rec_hg.read_column(e_xz.column(1).expect("xz.col1"));
1608        rec_hg.read(&plan.xy_work_prefix);
1609        rec_hg.read(&plan.xy_yz_start);
1610        rec_hg.read(&plan.xy_yz_end);
1611        rec_hg.read(&plan.xy_xz_start);
1612        rec_hg.read(&plan.xy_xz_end);
1613        rec_hg.read_write(&plan.block_counts);
1614        rec_hg.read_write(&plan.block_offsets);
1615        rec_hg.write(&total_rows_device);
1616        rec_hg
1617            .preflight(runtime)
1618            .map_err(|e| XlogError::Kernel(format!("{ctx}: count preflight failed: {e}")))?;
1619        {
1620            let kernel = self
1621                .device()
1622                .inner()
1623                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_TRIANGLE_COUNT_HG_U32)
1624                .ok_or_else(|| {
1625                    XlogError::Kernel("wcoj_triangle_count_hg_u32 kernel not found".to_string())
1626                })?;
1627            let mut params: Vec<*mut c_void> = vec![
1628                yz_col1.as_kernel_param(),
1629                n_yz.as_kernel_param(),
1630                xz_col1.as_kernel_param(),
1631                n_xz.as_kernel_param(),
1632                (&plan.xy_work_prefix).as_kernel_param(),
1633                (&plan.xy_yz_start).as_kernel_param(),
1634                (&plan.xy_yz_end).as_kernel_param(),
1635                (&plan.xy_xz_start).as_kernel_param(),
1636                (&plan.xy_xz_end).as_kernel_param(),
1637                plan.row_count.as_kernel_param(),
1638                plan.total_work.as_kernel_param(),
1639                plan.block_work_unit.as_kernel_param(),
1640                (&plan.block_counts).as_kernel_param(),
1641            ];
1642            unsafe {
1643                kernel
1644                    .clone()
1645                    .launch_on_stream(
1646                        &cu_stream,
1647                        LaunchConfig {
1648                            grid_dim: (grid, 1, 1),
1649                            block_dim: (BLOCK_SIZE, 1, 1),
1650                            shared_mem_bytes: 0,
1651                        },
1652                        &mut params,
1653                    )
1654                    .map_err(|e| XlogError::Kernel(format!("{ctx}: count launch failed: {e}")))?;
1655            }
1656        }
1657        {
1658            let kernel = self
1659                .device()
1660                .inner()
1661                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_SCAN_HG_BLOCK_COUNTS_U32)
1662                .ok_or_else(|| {
1663                    XlogError::Kernel("wcoj_scan_hg_block_counts_u32 kernel not found".to_string())
1664                })?;
1665            let mut params: Vec<*mut c_void> = vec![
1666                (&plan.block_counts).as_kernel_param(),
1667                grid.as_kernel_param(),
1668                (&plan.block_offsets).as_kernel_param(),
1669                (&total_rows_device).as_kernel_param(),
1670            ];
1671            unsafe {
1672                kernel
1673                    .clone()
1674                    .launch_on_stream(
1675                        &cu_stream,
1676                        LaunchConfig {
1677                            grid_dim: (1, 1, 1),
1678                            block_dim: (1024, 1, 1),
1679                            shared_mem_bytes: 0,
1680                        },
1681                        &mut params,
1682                    )
1683                    .map_err(|e| XlogError::Kernel(format!("{ctx}: scan launch failed: {e}")))?;
1684            }
1685        }
1686        rec_hg
1687            .commit(runtime)
1688            .map_err(|e| XlogError::Kernel(format!("{ctx}: count commit failed: {e}")))?;
1689        cu_stream
1690            .synchronize()
1691            .map_err(|e| XlogError::Kernel(format!("{ctx}: count stream sync failed: {e}")))?;
1692        let total_rows = self
1693            .dtoh_scalar_untracked::<u32>(&total_rows_device, 0)
1694            .map_err(|e| XlogError::Kernel(format!("{ctx}: read total rows failed: {e}")))?;
1695        Ok(WcojTriangleHgCountPhaseU32 {
1696            total_rows_device,
1697            total_rows,
1698        })
1699    }
1700
1701    pub fn wcoj_triangle_hg_materialize_phase_u32_recorded(
1702        &self,
1703        e_xy: &CudaBuffer,
1704        e_yz: &CudaBuffer,
1705        e_xz: &CudaBuffer,
1706        plan: &WcojTriangleHgWorkPlanU32,
1707        count: WcojTriangleHgCountPhaseU32,
1708        launch_stream: StreamId,
1709    ) -> Result<CudaBuffer> {
1710        let ctx = "wcoj_triangle_hg_materialize_phase_u32_recorded";
1711        validate_binary_u32(ctx, "e_xy", e_xy)?;
1712        validate_binary_u32(ctx, "e_yz", e_yz)?;
1713        validate_binary_u32(ctx, "e_xz", e_xz)?;
1714        let out_schema = Schema::new(vec![
1715            (
1716                "x".to_string(),
1717                e_xy.schema().column_type(0).expect("xy.col0 type"),
1718            ),
1719            (
1720                "y".to_string(),
1721                e_xy.schema().column_type(1).expect("xy.col1 type"),
1722            ),
1723            (
1724                "z".to_string(),
1725                e_yz.schema().column_type(1).expect("yz.col1 type"),
1726            ),
1727        ]);
1728        if count.total_rows == 0 {
1729            return self.create_empty_buffer(out_schema);
1730        }
1731        let grid = plan.total_work.div_ceil(plan.block_work_unit);
1732        if grid > 1024 {
1733            return Err(XlogError::Kernel(format!(
1734                "{ctx}: spike phase path requires grid <= 1024, got {grid}"
1735            )));
1736        }
1737        let runtime = self.memory().runtime().ok_or_else(|| {
1738            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
1739        })?;
1740        let cu_stream = runtime
1741            .stream_pool()
1742            .resolve(launch_stream)
1743            .ok_or_else(|| {
1744                XlogError::Kernel(format!(
1745                    "{ctx}: launch_stream StreamId({}) does not resolve",
1746                    launch_stream.0
1747                ))
1748            })?;
1749        let xy_col0 = metadata_column_u32(e_xy, 0)?;
1750        let xy_col1 = metadata_column_u32(e_xy, 1)?;
1751        let yz_col1 = metadata_column_u32(e_yz, 1)?;
1752        let xz_col1 = metadata_column_u32(e_xz, 1)?;
1753        let n_yz = self.metadata_logical_rows(e_yz)?;
1754        let n_xz = self.metadata_logical_rows(e_xz)?;
1755        let bytes_per_col = (count.total_rows as usize)
1756            .checked_mul(std::mem::size_of::<u32>())
1757            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: output byte size overflow")))?;
1758        let mut out_x = self.memory().alloc::<u8>(bytes_per_col)?;
1759        let mut out_y = self.memory().alloc::<u8>(bytes_per_col)?;
1760        let mut out_z = self.memory().alloc::<u8>(bytes_per_col)?;
1761        let mut rec_mat = LaunchRecorder::new_strict(launch_stream);
1762        rec_mat.read(e_xy.num_rows_device());
1763        rec_mat.read(e_yz.num_rows_device());
1764        rec_mat.read(e_xz.num_rows_device());
1765        rec_mat.read_column(e_xy.column(0).expect("xy.col0"));
1766        rec_mat.read_column(e_xy.column(1).expect("xy.col1"));
1767        rec_mat.read_column(e_yz.column(1).expect("yz.col1"));
1768        rec_mat.read_column(e_xz.column(1).expect("xz.col1"));
1769        rec_mat.read(&plan.xy_work_prefix);
1770        rec_mat.read(&plan.xy_yz_start);
1771        rec_mat.read(&plan.xy_yz_end);
1772        rec_mat.read(&plan.xy_xz_start);
1773        rec_mat.read(&plan.xy_xz_end);
1774        rec_mat.read(&plan.block_offsets);
1775        rec_mat.write(&out_x);
1776        rec_mat.write(&out_y);
1777        rec_mat.write(&out_z);
1778        rec_mat
1779            .preflight(runtime)
1780            .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize preflight failed: {e}")))?;
1781        {
1782            let kernel = self
1783                .device()
1784                .inner()
1785                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_TRIANGLE_MATERIALIZE_HG_U32)
1786                .ok_or_else(|| {
1787                    XlogError::Kernel(
1788                        "wcoj_triangle_materialize_hg_u32 kernel not found".to_string(),
1789                    )
1790                })?;
1791            let out_x_u32 = unsafe { reinterpret_u8_as_u32(&mut out_x) };
1792            let out_y_u32 = unsafe { reinterpret_u8_as_u32(&mut out_y) };
1793            let out_z_u32 = unsafe { reinterpret_u8_as_u32(&mut out_z) };
1794            let mut params: Vec<*mut c_void> = vec![
1795                xy_col0.as_kernel_param(),
1796                xy_col1.as_kernel_param(),
1797                yz_col1.as_kernel_param(),
1798                n_yz.as_kernel_param(),
1799                xz_col1.as_kernel_param(),
1800                n_xz.as_kernel_param(),
1801                (&plan.xy_work_prefix).as_kernel_param(),
1802                (&plan.xy_yz_start).as_kernel_param(),
1803                (&plan.xy_yz_end).as_kernel_param(),
1804                (&plan.xy_xz_start).as_kernel_param(),
1805                (&plan.xy_xz_end).as_kernel_param(),
1806                plan.row_count.as_kernel_param(),
1807                plan.total_work.as_kernel_param(),
1808                plan.block_work_unit.as_kernel_param(),
1809                (&plan.block_offsets).as_kernel_param(),
1810                count.total_rows.as_kernel_param(),
1811                out_x_u32.as_kernel_param(),
1812                out_y_u32.as_kernel_param(),
1813                out_z_u32.as_kernel_param(),
1814            ];
1815            unsafe {
1816                kernel
1817                    .clone()
1818                    .launch_on_stream(
1819                        &cu_stream,
1820                        LaunchConfig {
1821                            grid_dim: (grid, 1, 1),
1822                            block_dim: (BLOCK_SIZE, 1, 1),
1823                            shared_mem_bytes: 0,
1824                        },
1825                        &mut params,
1826                    )
1827                    .map_err(|e| {
1828                        XlogError::Kernel(format!("{ctx}: materialize launch failed: {e}"))
1829                    })?;
1830            }
1831        }
1832        rec_mat
1833            .commit(runtime)
1834            .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize commit failed: {e}")))?;
1835        cu_stream.synchronize().map_err(|e| {
1836            XlogError::Kernel(format!("{ctx}: materialize stream sync failed: {e}"))
1837        })?;
1838        Ok(CudaBuffer::from_columns_with_host_count(
1839            vec![out_x.into(), out_y.into(), out_z.into()],
1840            count.total_rows as u64,
1841            count.total_rows_device,
1842            out_schema,
1843            count.total_rows,
1844        ))
1845    }
1846
1847    pub fn wcoj_triangle_hg_u32_with_plan_recorded(
1848        &self,
1849        e_xy: &CudaBuffer,
1850        e_yz: &CudaBuffer,
1851        e_xz: &CudaBuffer,
1852        plan: &WcojTriangleHgWorkPlanU32,
1853        launch_stream: StreamId,
1854    ) -> Result<CudaBuffer> {
1855        let ctx = "wcoj_triangle_hg_u32_with_plan_recorded";
1856        validate_binary_u32(ctx, "e_xy", e_xy)?;
1857        validate_binary_u32(ctx, "e_yz", e_yz)?;
1858        validate_binary_u32(ctx, "e_xz", e_xz)?;
1859        let out_schema = Schema::new(vec![
1860            (
1861                "x".to_string(),
1862                e_xy.schema().column_type(0).expect("xy.col0 type"),
1863            ),
1864            (
1865                "y".to_string(),
1866                e_xy.schema().column_type(1).expect("xy.col1 type"),
1867            ),
1868            (
1869                "z".to_string(),
1870                e_yz.schema().column_type(1).expect("yz.col1 type"),
1871            ),
1872        ]);
1873        if plan.total_work == 0 {
1874            return self.create_empty_buffer(out_schema);
1875        }
1876
1877        let grid = plan.total_work.div_ceil(plan.block_work_unit);
1878        let bytes_count = (grid as usize)
1879            .checked_mul(std::mem::size_of::<u32>())
1880            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: count byte size overflow")))?;
1881        let mut local_counts = None;
1882        let mut local_offsets = None;
1883        if grid > 1024 {
1884            local_counts = Some(self.memory().alloc::<u32>(grid as usize)?);
1885            local_offsets = Some(self.memory().alloc::<u32>(grid as usize)?);
1886        }
1887        let total_rows_device = self.memory().alloc::<u32>(1)?;
1888
1889        let runtime = self.memory().runtime().ok_or_else(|| {
1890            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
1891        })?;
1892        let cu_stream = runtime
1893            .stream_pool()
1894            .resolve(launch_stream)
1895            .ok_or_else(|| {
1896                XlogError::Kernel(format!(
1897                    "{ctx}: launch_stream StreamId({}) does not resolve",
1898                    launch_stream.0
1899                ))
1900            })?;
1901
1902        let xy_col0 = metadata_column_u32(e_xy, 0)?;
1903        let xy_col1 = metadata_column_u32(e_xy, 1)?;
1904        let yz_col1 = metadata_column_u32(e_yz, 1)?;
1905        let xz_col1 = metadata_column_u32(e_xz, 1)?;
1906        let n_yz = self.metadata_logical_rows(e_yz)?;
1907        let n_xz = self.metadata_logical_rows(e_xz)?;
1908
1909        let count_u32 = if grid <= 1024 {
1910            &plan.block_counts
1911        } else {
1912            local_counts
1913                .as_ref()
1914                .expect("local HG counts allocated when grid exceeds single-block scan")
1915        };
1916        let mut rec_hg = LaunchRecorder::new_strict(launch_stream);
1917        rec_hg.read(e_xy.num_rows_device());
1918        rec_hg.read(e_yz.num_rows_device());
1919        rec_hg.read(e_xz.num_rows_device());
1920        rec_hg.read_column(e_xy.column(0).expect("xy.col0"));
1921        rec_hg.read_column(e_xy.column(1).expect("xy.col1"));
1922        rec_hg.read_column(e_yz.column(1).expect("yz.col1"));
1923        rec_hg.read_column(e_xz.column(1).expect("xz.col1"));
1924        rec_hg.read(&plan.xy_work_prefix);
1925        rec_hg.read(&plan.xy_yz_start);
1926        rec_hg.read(&plan.xy_yz_end);
1927        rec_hg.read(&plan.xy_xz_start);
1928        rec_hg.read(&plan.xy_xz_end);
1929        rec_hg.read_write(count_u32);
1930        if grid <= 1024 {
1931            rec_hg.read_write(&plan.block_offsets);
1932        } else {
1933            rec_hg.read_write(
1934                local_offsets
1935                    .as_ref()
1936                    .expect("local HG offsets allocated when grid exceeds single-block scan"),
1937            );
1938        }
1939        rec_hg.write(&total_rows_device);
1940        rec_hg.read_write(&plan.scratch_x);
1941        rec_hg.read_write(&plan.scratch_y);
1942        rec_hg.read_write(&plan.scratch_z);
1943        rec_hg
1944            .preflight(runtime)
1945            .map_err(|e| XlogError::Kernel(format!("{ctx}: HG preflight failed: {e}")))?;
1946        {
1947            let kernel = self
1948                .device()
1949                .inner()
1950                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_TRIANGLE_COUNT_HG_CACHED_U32)
1951                .ok_or_else(|| {
1952                    XlogError::Kernel(
1953                        "wcoj_triangle_count_hg_cached_u32 kernel not found".to_string(),
1954                    )
1955                })?;
1956            let mut params: Vec<*mut c_void> = vec![
1957                xy_col0.as_kernel_param(),
1958                xy_col1.as_kernel_param(),
1959                yz_col1.as_kernel_param(),
1960                n_yz.as_kernel_param(),
1961                xz_col1.as_kernel_param(),
1962                n_xz.as_kernel_param(),
1963                (&plan.xy_work_prefix).as_kernel_param(),
1964                (&plan.xy_yz_start).as_kernel_param(),
1965                (&plan.xy_yz_end).as_kernel_param(),
1966                (&plan.xy_xz_start).as_kernel_param(),
1967                (&plan.xy_xz_end).as_kernel_param(),
1968                plan.row_count.as_kernel_param(),
1969                plan.total_work.as_kernel_param(),
1970                plan.block_work_unit.as_kernel_param(),
1971                count_u32.as_kernel_param(),
1972                (&plan.scratch_x).as_kernel_param(),
1973                (&plan.scratch_y).as_kernel_param(),
1974                (&plan.scratch_z).as_kernel_param(),
1975            ];
1976            unsafe {
1977                kernel
1978                    .clone()
1979                    .launch_on_stream(
1980                        &cu_stream,
1981                        LaunchConfig {
1982                            grid_dim: (grid, 1, 1),
1983                            block_dim: (HG_COUNT_BLOCK_SIZE, 1, 1),
1984                            shared_mem_bytes: 0,
1985                        },
1986                        &mut params,
1987                    )
1988                    .map_err(|e| {
1989                        XlogError::Kernel(format!("{ctx}: cached count launch failed: {e}"))
1990                    })?;
1991            }
1992        }
1993        if grid <= 1024 {
1994            let kernel = self
1995                .device()
1996                .inner()
1997                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_SCAN_HG_BLOCK_COUNTS_U32)
1998                .ok_or_else(|| {
1999                    XlogError::Kernel("wcoj_scan_hg_block_counts_u32 kernel not found".to_string())
2000                })?;
2001            let mut params: Vec<*mut c_void> = vec![
2002                count_u32.as_kernel_param(),
2003                grid.as_kernel_param(),
2004                (&plan.block_offsets).as_kernel_param(),
2005                (&total_rows_device).as_kernel_param(),
2006            ];
2007            unsafe {
2008                kernel
2009                    .clone()
2010                    .launch_on_stream(
2011                        &cu_stream,
2012                        LaunchConfig {
2013                            grid_dim: (1, 1, 1),
2014                            block_dim: (1024, 1, 1),
2015                            shared_mem_bytes: 0,
2016                        },
2017                        &mut params,
2018                    )
2019                    .map_err(|e| {
2020                        XlogError::Kernel(format!("{ctx}: HG block-count scan failed: {e}"))
2021                    })?;
2022            }
2023        } else {
2024            let offsets_mut = local_offsets
2025                .as_mut()
2026                .expect("local HG offsets allocated when grid exceeds single-block scan");
2027            unsafe {
2028                let res = sys::cuMemcpyDtoDAsync_v2(
2029                    *offsets_mut.device_ptr(),
2030                    *count_u32.device_ptr(),
2031                    bytes_count,
2032                    cu_stream.cu_stream(),
2033                );
2034                if res != sys::cudaError_enum::CUDA_SUCCESS {
2035                    return Err(XlogError::Kernel(format!(
2036                        "{ctx}: DtoD count to offsets failed: {res:?}"
2037                    )));
2038                }
2039            }
2040            self.multiblock_scan_u32_inplace_on_stream(
2041                offsets_mut,
2042                grid,
2043                &cu_stream,
2044                launch_stream,
2045                runtime,
2046            )?;
2047            let total_kernel = self
2048                .device()
2049                .inner()
2050                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_COMPUTE_TOTAL)
2051                .ok_or_else(|| {
2052                    XlogError::Kernel("wcoj_compute_total kernel not found".to_string())
2053                })?;
2054            let mut params: Vec<*mut c_void> = vec![
2055                count_u32.as_kernel_param(),
2056                (&*offsets_mut).as_kernel_param(),
2057                grid.as_kernel_param(),
2058                (&total_rows_device).as_kernel_param(),
2059            ];
2060            unsafe {
2061                total_kernel
2062                    .clone()
2063                    .launch_on_stream(
2064                        &cu_stream,
2065                        LaunchConfig {
2066                            grid_dim: (1, 1, 1),
2067                            block_dim: (1, 1, 1),
2068                            shared_mem_bytes: 0,
2069                        },
2070                        &mut params,
2071                    )
2072                    .map_err(|e| {
2073                        XlogError::Kernel(format!("{ctx}: HG total reducer failed: {e}"))
2074                    })?;
2075            }
2076        }
2077        rec_hg
2078            .commit(runtime)
2079            .map_err(|e| XlogError::Kernel(format!("{ctx}: HG count commit failed: {e}")))?;
2080        cu_stream
2081            .synchronize()
2082            .map_err(|e| XlogError::Kernel(format!("{ctx}: count stream sync failed: {e}")))?;
2083        let total_rows = self
2084            .dtoh_scalar_untracked::<u32>(&total_rows_device, 0)
2085            .map_err(|e| XlogError::Kernel(format!("{ctx}: read total rows failed: {e}")))?;
2086        if total_rows == 0 {
2087            return self.create_empty_buffer(out_schema);
2088        }
2089
2090        let bytes_per_col = (total_rows as usize)
2091            .checked_mul(std::mem::size_of::<u32>())
2092            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: output byte size overflow")))?;
2093        let mut out_x = self.memory().alloc::<u8>(bytes_per_col)?;
2094        let mut out_y = self.memory().alloc::<u8>(bytes_per_col)?;
2095        let mut out_z = self.memory().alloc::<u8>(bytes_per_col)?;
2096        let materialize_offsets = if grid <= 1024 {
2097            &plan.block_offsets
2098        } else {
2099            local_offsets
2100                .as_ref()
2101                .expect("local HG offsets allocated when grid exceeds single-block scan")
2102        };
2103        let mut rec_mat = LaunchRecorder::new_strict(launch_stream);
2104        rec_mat.read(count_u32);
2105        rec_mat.read(materialize_offsets);
2106        rec_mat.read(&plan.scratch_x);
2107        rec_mat.read(&plan.scratch_y);
2108        rec_mat.read(&plan.scratch_z);
2109        rec_mat.write(&out_x);
2110        rec_mat.write(&out_y);
2111        rec_mat.write(&out_z);
2112        rec_mat
2113            .preflight(runtime)
2114            .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize preflight failed: {e}")))?;
2115        {
2116            let kernel = self
2117                .device()
2118                .inner()
2119                .get_func(
2120                    WCOJ_MODULE,
2121                    wcoj_kernels::WCOJ_TRIANGLE_MATERIALIZE_HG_CACHED_U32,
2122                )
2123                .ok_or_else(|| {
2124                    XlogError::Kernel(
2125                        "wcoj_triangle_materialize_hg_cached_u32 kernel not found".to_string(),
2126                    )
2127                })?;
2128            let out_x_u32 = unsafe { reinterpret_u8_as_u32(&mut out_x) };
2129            let out_y_u32 = unsafe { reinterpret_u8_as_u32(&mut out_y) };
2130            let out_z_u32 = unsafe { reinterpret_u8_as_u32(&mut out_z) };
2131            let mut params: Vec<*mut c_void> = vec![
2132                count_u32.as_kernel_param(),
2133                materialize_offsets.as_kernel_param(),
2134                plan.block_work_unit.as_kernel_param(),
2135                total_rows.as_kernel_param(),
2136                (&plan.scratch_x).as_kernel_param(),
2137                (&plan.scratch_y).as_kernel_param(),
2138                (&plan.scratch_z).as_kernel_param(),
2139                out_x_u32.as_kernel_param(),
2140                out_y_u32.as_kernel_param(),
2141                out_z_u32.as_kernel_param(),
2142            ];
2143            unsafe {
2144                kernel
2145                    .clone()
2146                    .launch_on_stream(
2147                        &cu_stream,
2148                        LaunchConfig {
2149                            grid_dim: (grid, 1, 1),
2150                            block_dim: (BLOCK_SIZE, 1, 1),
2151                            shared_mem_bytes: 0,
2152                        },
2153                        &mut params,
2154                    )
2155                    .map_err(|e| {
2156                        XlogError::Kernel(format!("{ctx}: materialize launch failed: {e}"))
2157                    })?;
2158            }
2159        }
2160        rec_mat
2161            .commit(runtime)
2162            .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize commit failed: {e}")))?;
2163        cu_stream.synchronize().map_err(|e| {
2164            XlogError::Kernel(format!("{ctx}: materialize stream sync failed: {e}"))
2165        })?;
2166
2167        Ok(CudaBuffer::from_columns_with_host_count(
2168            vec![out_x.into(), out_y.into(), out_z.into()],
2169            total_rows as u64,
2170            total_rows_device,
2171            out_schema,
2172            total_rows,
2173        ))
2174    }
2175
2176    pub fn wcoj_triangle_hg_work_plan_u64_recorded(
2177        &self,
2178        e_xy: &CudaBuffer,
2179        e_yz: &CudaBuffer,
2180        e_xz: &CudaBuffer,
2181        block_work_unit: u32,
2182        launch_stream: StreamId,
2183    ) -> Result<WcojTriangleHgWorkPlanU64> {
            // Builds a `WcojTriangleHgWorkPlanU64` for three u64 relations: launches
            // the `WCOJ_TRIANGLE_BUILD_HG_WORK_PLAN_U64` kernel, scans `xy_work_prefix`
            // in place, reads the scanned total back to the host and sizes the
            // per-block count/offset buffers from it.
            //
            // Parameters:
            // - `e_xy`, `e_yz`, `e_xz`: input relations, each checked with
            //   `validate_binary_u64`.
            // - `block_work_unit`: divisor used to size `block_counts` and
            //   `block_offsets`; must be nonzero.
            // - `launch_stream`: stream id used for the recorder, the kernel launch
            //   and the scan.
            //
            // Errors: `XlogError::Kernel` for a zero `block_work_unit`, prefix length
            // overflow, a missing runtime, an unresolvable stream, a missing kernel,
            // or a failed preflight/launch/commit/sync. Errors from validation,
            // row-count lookup, allocation, the scan helper and the host readback
            // are propagated unchanged via `?`.
2184        let ctx = "wcoj_triangle_hg_work_plan_u64_recorded";
2185        if block_work_unit == 0 {
2186            return Err(XlogError::Kernel(format!(
2187                "{ctx}: block_work_unit must be nonzero"
2188            )));
2189        }
2190        validate_binary_u64(ctx, "e_xy", e_xy)?;
2191        validate_binary_u64(ctx, "e_yz", e_yz)?;
2192        validate_binary_u64(ctx, "e_xz", e_xz)?;
2193
2194        let n_xy = self.metadata_logical_rows(e_xy)?;
2195        let n_yz = self.metadata_logical_rows(e_yz)?;
2196        let n_xz = self.metadata_logical_rows(e_xz)?;
            // The prefix buffer has one more slot than there are e_xy rows; the
            // scanned total is read back from index `n_xy` further down.
2197        let prefix_len = n_xy
2198            .checked_add(1)
2199            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: prefix length overflow")))?;
2200        let mut xy_work_prefix = self.memory().alloc::<u32>(prefix_len as usize)?;
2201        let mut xy_yz_start = self.memory().alloc::<u32>(n_xy as usize)?;
2202        let mut xy_yz_end = self.memory().alloc::<u32>(n_xy as usize)?;
2203        let mut xy_xz_start = self.memory().alloc::<u32>(n_xy as usize)?;
2204        let mut xy_xz_end = self.memory().alloc::<u32>(n_xy as usize)?;
2205
            // No e_xy rows: return a zero-work plan without touching the runtime or
            // launching anything. The buffers above are returned as allocated (no
            // kernel has written them) and the block buffers get a single element.
2206        if n_xy == 0 {
2207            let block_counts = self.memory().alloc::<u32>(1)?;
2208            let block_offsets = self.memory().alloc::<u32>(1)?;
2209            return Ok(WcojTriangleHgWorkPlanU64 {
2210                xy_work_prefix,
2211                xy_yz_start,
2212                xy_yz_end,
2213                xy_xz_start,
2214                xy_xz_end,
2215                block_counts,
2216                block_offsets,
2217                total_work: 0,
2218                block_work_unit,
2219                row_count: n_xy,
2220            });
2221        }
2222
2223        let runtime = self.memory().runtime().ok_or_else(|| {
2224            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
2225        })?;
2226        let cu_stream = runtime
2227            .stream_pool()
2228            .resolve(launch_stream)
2229            .ok_or_else(|| {
2230                XlogError::Kernel(format!(
2231                    "{ctx}: launch_stream StreamId({}) does not resolve",
2232                    launch_stream.0
2233                ))
2234            })?;
2235
            // The plan kernel receives both columns of e_xy but only column 0 of
            // e_yz and e_xz.
2236        let xy_col0 = metadata_column_u64(e_xy, 0)?;
2237        let xy_col1 = metadata_column_u64(e_xy, 1)?;
2238        let yz_col0 = metadata_column_u64(e_yz, 0)?;
2239        let xz_col0 = metadata_column_u64(e_xz, 0)?;
2240
            // Register every buffer the launch reads or writes with the recorder.
            // Preflight runs before the launch; commit runs after the scan below.
2241        let mut rec = LaunchRecorder::new_strict(launch_stream);
2242        rec.read(e_xy.num_rows_device());
2243        rec.read(e_yz.num_rows_device());
2244        rec.read(e_xz.num_rows_device());
2245        rec.read_column(e_xy.column(0).expect("xy.col0"));
2246        rec.read_column(e_xy.column(1).expect("xy.col1"));
2247        rec.read_column(e_yz.column(0).expect("yz.col0"));
2248        rec.read_column(e_xz.column(0).expect("xz.col0"));
2249        rec.write(&xy_work_prefix);
2250        rec.write(&xy_yz_start);
2251        rec.write(&xy_yz_end);
2252        rec.write(&xy_xz_start);
2253        rec.write(&xy_xz_end);
2254        rec.preflight(runtime)
2255            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
2256
2257        let kernel = self
2258            .device()
2259            .inner()
2260            .get_func(
2261                WCOJ_MODULE,
2262                wcoj_kernels::WCOJ_TRIANGLE_BUILD_HG_WORK_PLAN_U64,
2263            )
2264            .ok_or_else(|| {
2265                XlogError::Kernel(
2266                    "wcoj_triangle_build_hg_work_plan_u64 kernel not found".to_string(),
2267                )
2268            })?;
            // Enough BLOCK_SIZE-wide blocks to cover `n_xy` threads (presumably one
            // thread per e_xy row; confirm against the kernel source).
2269        let grid = n_xy.div_ceil(BLOCK_SIZE);
            // NOTE(review): the argument tuple is positional and presumably mirrors
            // the device kernel signature; keep the order as is.
2270        unsafe {
2271            kernel
2272                .clone()
2273                .launch_on_stream(
2274                    &cu_stream,
2275                    LaunchConfig {
2276                        grid_dim: (grid, 1, 1),
2277                        block_dim: (BLOCK_SIZE, 1, 1),
2278                        shared_mem_bytes: 0,
2279                    },
2280                    (
2281                        xy_col0,
2282                        xy_col1,
2283                        n_xy,
2284                        yz_col0,
2285                        n_yz,
2286                        xz_col0,
2287                        n_xz,
2288                        &mut xy_work_prefix,
2289                        &mut xy_yz_start,
2290                        &mut xy_yz_end,
2291                        &mut xy_xz_start,
2292                        &mut xy_xz_end,
2293                    ),
2294                )
2295                .map_err(|e| {
2296                    XlogError::Kernel(format!(
2297                        "wcoj_triangle_build_hg_work_plan_u64 launch failed: {e}"
2298                    ))
2299                })?;
2300        }
            // In-place scan over all `prefix_len` entries, on the same stream as the
            // kernel launch above.
2301        self.multiblock_scan_u32_inplace_on_stream(
2302            &mut xy_work_prefix,
2303            prefix_len,
2304            &cu_stream,
2305            launch_stream,
2306            runtime,
2307        )?;
2308        rec.commit(runtime)
2309            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
            // The stream is synchronized before the host reads the last prefix entry.
2310        cu_stream
2311            .synchronize()
2312            .map_err(|e| XlogError::Kernel(format!("{ctx}: stream sync failed: {e}")))?;
2313        let total_work = self.dtoh_scalar_untracked::<u32>(&xy_work_prefix, n_xy as usize)?;
            // Block buffers hold ceil(total_work / block_work_unit) entries, with a
            // minimum of one entry when there is no work.
2314        let grid = if total_work == 0 {
2315            1
2316        } else {
2317            total_work.div_ceil(block_work_unit)
2318        };
2319        let block_counts = self.memory().alloc::<u32>(grid as usize)?;
2320        let block_offsets = self.memory().alloc::<u32>(grid as usize)?;
2321
2322        Ok(WcojTriangleHgWorkPlanU64 {
2323            xy_work_prefix,
2324            xy_yz_start,
2325            xy_yz_end,
2326            xy_xz_start,
2327            xy_xz_end,
2328            block_counts,
2329            block_offsets,
2330            total_work,
2331            block_work_unit,
2332            row_count: n_xy,
2333        })
2334    }
2335
2336    pub fn wcoj_triangle_hg_u64_recorded(
2337        &self,
2338        e_xy: &CudaBuffer,
2339        e_yz: &CudaBuffer,
2340        e_xz: &CudaBuffer,
2341        block_work_unit: u32,
2342        launch_stream: StreamId,
2343    ) -> Result<CudaBuffer> {
            // Runs the u64 triangle HG pipeline on `launch_stream` and returns a
            // three-column U64 buffer. The steps are:
            //   1. build the work plan via `wcoj_triangle_hg_work_plan_u64_recorded`;
            //   2. launch the count kernel, which receives a per-block count buffer;
            //   3. turn the counts into offsets plus a device-side total (single-block
            //      scan kernel for grids of at most 1024 blocks, otherwise a
            //      device-to-device copy, multi-block scan and total kernel);
            //   4. read the total to the host, allocate the output columns and launch
            //      the materialize kernel.
            //
            // Parameters:
            // - `e_xy`, `e_yz`, `e_xz`: input relations, each checked with
            //   `validate_binary_u64`.
            // - `block_work_unit`: forwarded to the work plan, which rejects zero.
            // - `launch_stream`: stream id used for all recorders and launches.
            //
            // Returns an empty buffer (via `create_empty_buffer`) when the plan has
            // zero total work or the counted row total is zero.
            //
            // Errors: `XlogError::Kernel` for byte-size overflow, a missing runtime,
            // an unresolvable stream, a missing kernel, or a failed
            // preflight/launch/copy/commit/sync/readback. Errors from validation,
            // the plan, allocation and the scan helper are propagated via `?`.
2344        let ctx = "wcoj_triangle_hg_u64_recorded";
2345        validate_binary_u64(ctx, "e_xy", e_xy)?;
2346        validate_binary_u64(ctx, "e_yz", e_yz)?;
2347        validate_binary_u64(ctx, "e_xz", e_xz)?;
2348        let plan = self.wcoj_triangle_hg_work_plan_u64_recorded(
2349            e_xy,
2350            e_yz,
2351            e_xz,
2352            block_work_unit,
2353            launch_stream,
2354        )?;
            // The output schema is fixed: three U64 columns named col0, col1, col2.
2355        let out_schema = Schema::new(vec![
2356            ("col0".to_string(), ScalarType::U64),
2357            ("col1".to_string(), ScalarType::U64),
2358            ("col2".to_string(), ScalarType::U64),
2359        ]);
2360        if plan.total_work == 0 {
2361            return self.create_empty_buffer(out_schema);
2362        }
2363
            // Grid size for the count and materialize launches:
            // ceil(total_work / block_work_unit).
2364        let grid = plan.total_work.div_ceil(plan.block_work_unit);
            // Byte length of `grid` u32 counts; only used by the device-to-device
            // copy in the large-grid path.
2365        let bytes_count = (grid as usize)
2366            .checked_mul(std::mem::size_of::<u32>())
2367            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: count byte size overflow")))?;
            // Grids above 1024 blocks use locally allocated count/offset buffers
            // instead of the plan's `block_counts` / `block_offsets`.
2368        let mut local_counts = None;
2369        let mut local_offsets = None;
2370        if grid > 1024 {
2371            local_counts = Some(self.memory().alloc::<u32>(grid as usize)?);
2372            local_offsets = Some(self.memory().alloc::<u32>(grid as usize)?);
2373        }
            // Single-element device buffer receiving the total output row count; it
            // is also handed to the returned `CudaBuffer` at the end.
2374        let total_rows_device = self.memory().alloc::<u32>(1)?;
2375
2376        let runtime = self.memory().runtime().ok_or_else(|| {
2377            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
2378        })?;
2379        let cu_stream = runtime
2380            .stream_pool()
2381            .resolve(launch_stream)
2382            .ok_or_else(|| {
2383                XlogError::Kernel(format!(
2384                    "{ctx}: launch_stream StreamId({}) does not resolve",
2385                    launch_stream.0
2386                ))
2387            })?;
2388
            // Column 1 of e_yz and e_xz feeds both the count and materialize
            // kernels; the e_xy columns are only passed to the materialize kernel.
2389        let xy_col0 = metadata_column_u64(e_xy, 0)?;
2390        let xy_col1 = metadata_column_u64(e_xy, 1)?;
2391        let yz_col1 = metadata_column_u64(e_yz, 1)?;
2392        let xz_col1 = metadata_column_u64(e_xz, 1)?;
2393        let n_yz = self.metadata_logical_rows(e_yz)?;
2394        let n_xz = self.metadata_logical_rows(e_xz)?;
2395
            // Select the count buffer that matches the grid-size path chosen above.
2396        let count_u32 = if grid <= 1024 {
2397            &plan.block_counts
2398        } else {
2399            local_counts
2400                .as_ref()
2401                .expect("local HG counts allocated when grid exceeds single-block scan")
2402        };
            // Recorder scope for the count phase: it spans the count kernel and the
            // offset/total computation, and is committed after both.
2403        let mut rec_hg = LaunchRecorder::new_strict(launch_stream);
2404        rec_hg.read(e_xy.num_rows_device());
2405        rec_hg.read(e_yz.num_rows_device());
2406        rec_hg.read(e_xz.num_rows_device());
2407        rec_hg.read_column(e_yz.column(1).expect("yz.col1"));
2408        rec_hg.read_column(e_xz.column(1).expect("xz.col1"));
2409        rec_hg.read(&plan.xy_work_prefix);
2410        rec_hg.read(&plan.xy_yz_start);
2411        rec_hg.read(&plan.xy_yz_end);
2412        rec_hg.read(&plan.xy_xz_start);
2413        rec_hg.read(&plan.xy_xz_end);
2414        rec_hg.read_write(count_u32);
2415        if grid <= 1024 {
2416            rec_hg.read_write(&plan.block_offsets);
2417        } else {
2418            rec_hg.read_write(
2419                local_offsets
2420                    .as_ref()
2421                    .expect("local HG offsets allocated when grid exceeds single-block scan"),
2422            );
2423        }
2424        rec_hg.write(&total_rows_device);
2425        rec_hg
2426            .preflight(runtime)
2427            .map_err(|e| XlogError::Kernel(format!("{ctx}: HG preflight failed: {e}")))?;
            // Count phase launch, one block per entry of the count buffer.
2428        {
2429            let kernel = self
2430                .device()
2431                .inner()
2432                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_TRIANGLE_COUNT_HG_U64)
2433                .ok_or_else(|| {
2434                    XlogError::Kernel("wcoj_triangle_count_hg_u64 kernel not found".to_string())
2435                })?;
                // NOTE(review): parameters are positional and presumably mirror the
                // device kernel signature; keep the order as is.
2436            let mut params: Vec<*mut c_void> = vec![
2437                yz_col1.as_kernel_param(),
2438                n_yz.as_kernel_param(),
2439                xz_col1.as_kernel_param(),
2440                n_xz.as_kernel_param(),
2441                (&plan.xy_work_prefix).as_kernel_param(),
2442                (&plan.xy_yz_start).as_kernel_param(),
2443                (&plan.xy_yz_end).as_kernel_param(),
2444                (&plan.xy_xz_start).as_kernel_param(),
2445                (&plan.xy_xz_end).as_kernel_param(),
2446                plan.row_count.as_kernel_param(),
2447                plan.total_work.as_kernel_param(),
2448                plan.block_work_unit.as_kernel_param(),
2449                count_u32.as_kernel_param(),
2450            ];
2451            unsafe {
2452                kernel
2453                    .clone()
2454                    .launch_on_stream(
2455                        &cu_stream,
2456                        LaunchConfig {
2457                            grid_dim: (grid, 1, 1),
2458                            block_dim: (BLOCK_SIZE, 1, 1),
2459                            shared_mem_bytes: 0,
2460                        },
2461                        &mut params,
2462                    )
2463                    .map_err(|e| XlogError::Kernel(format!("{ctx}: count launch failed: {e}")))?;
2464            }
2465        }
            // Small grid: one 1024-thread block is given the counts, the grid size,
            // `plan.block_offsets` and `total_rows_device`.
2466        if grid <= 1024 {
2467            let kernel = self
2468                .device()
2469                .inner()
2470                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_SCAN_HG_BLOCK_COUNTS_U32)
2471                .ok_or_else(|| {
2472                    XlogError::Kernel("wcoj_scan_hg_block_counts_u32 kernel not found".to_string())
2473                })?;
2474            let mut params: Vec<*mut c_void> = vec![
2475                count_u32.as_kernel_param(),
2476                grid.as_kernel_param(),
2477                (&plan.block_offsets).as_kernel_param(),
2478                (&total_rows_device).as_kernel_param(),
2479            ];
2480            unsafe {
2481                kernel
2482                    .clone()
2483                    .launch_on_stream(
2484                        &cu_stream,
2485                        LaunchConfig {
2486                            grid_dim: (1, 1, 1),
2487                            block_dim: (1024, 1, 1),
2488                            shared_mem_bytes: 0,
2489                        },
2490                        &mut params,
2491                    )
2492                    .map_err(|e| {
2493                        XlogError::Kernel(format!("{ctx}: HG block-count scan failed: {e}"))
2494                    })?;
2495            }
2496        } else {
                // Large grid: copy the counts into the local offsets buffer, scan
                // that buffer in place, then launch a single-thread kernel that is
                // given the counts, the scanned offsets, the grid size and
                // `total_rows_device`.
2497            let offsets_mut = local_offsets
2498                .as_mut()
2499                .expect("local HG offsets allocated when grid exceeds single-block scan");
2500            unsafe {
2501                let res = sys::cuMemcpyDtoDAsync_v2(
2502                    *offsets_mut.device_ptr(),
2503                    *count_u32.device_ptr(),
2504                    bytes_count,
2505                    cu_stream.cu_stream(),
2506                );
2507                if res != sys::cudaError_enum::CUDA_SUCCESS {
2508                    return Err(XlogError::Kernel(format!(
2509                        "{ctx}: DtoD count to offsets failed: {res:?}"
2510                    )));
2511                }
2512            }
2513            self.multiblock_scan_u32_inplace_on_stream(
2514                offsets_mut,
2515                grid,
2516                &cu_stream,
2517                launch_stream,
2518                runtime,
2519            )?;
2520            let total_kernel = self
2521                .device()
2522                .inner()
2523                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_COMPUTE_TOTAL)
2524                .ok_or_else(|| {
2525                    XlogError::Kernel("wcoj_compute_total kernel not found".to_string())
2526                })?;
2527            let mut params: Vec<*mut c_void> = vec![
2528                count_u32.as_kernel_param(),
2529                (&*offsets_mut).as_kernel_param(),
2530                grid.as_kernel_param(),
2531                (&total_rows_device).as_kernel_param(),
2532            ];
2533            unsafe {
2534                total_kernel
2535                    .clone()
2536                    .launch_on_stream(
2537                        &cu_stream,
2538                        LaunchConfig {
2539                            grid_dim: (1, 1, 1),
2540                            block_dim: (1, 1, 1),
2541                            shared_mem_bytes: 0,
2542                        },
2543                        &mut params,
2544                    )
2545                    .map_err(|e| {
2546                        XlogError::Kernel(format!("{ctx}: HG total reducer failed: {e}"))
2547                    })?;
2548            }
2549        }
2550        rec_hg
2551            .commit(runtime)
2552            .map_err(|e| XlogError::Kernel(format!("{ctx}: HG count commit failed: {e}")))?;
            // The output columns are sized from the host-side row total, so the
            // stream is synchronized before that value is read back.
2553        cu_stream
2554            .synchronize()
2555            .map_err(|e| XlogError::Kernel(format!("{ctx}: count stream sync failed: {e}")))?;
2556        let total_rows = self
2557            .dtoh_scalar_untracked::<u32>(&total_rows_device, 0)
2558            .map_err(|e| XlogError::Kernel(format!("{ctx}: read total rows failed: {e}")))?;
2559        if total_rows == 0 {
2560            return self.create_empty_buffer(out_schema);
2561        }
2562
            // Each output column is allocated as raw bytes sized for `total_rows`
            // u64 values.
2563        let bytes_per_col = (total_rows as usize)
2564            .checked_mul(std::mem::size_of::<u64>())
2565            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: output byte size overflow")))?;
2566        let mut out_x = self.memory().alloc::<u8>(bytes_per_col)?;
2567        let mut out_y = self.memory().alloc::<u8>(bytes_per_col)?;
2568        let mut out_z = self.memory().alloc::<u8>(bytes_per_col)?;
            // Use the offsets buffer that the scan path above wrote to.
2569        let materialize_offsets = if grid <= 1024 {
2570            &plan.block_offsets
2571        } else {
2572            local_offsets
2573                .as_ref()
2574                .expect("local HG offsets allocated when grid exceeds single-block scan")
2575        };
            // Separate recorder scope for the materialize launch.
2576        let mut rec_mat = LaunchRecorder::new_strict(launch_stream);
2577        rec_mat.read(materialize_offsets);
2578        rec_mat.read(e_xy.num_rows_device());
2579        rec_mat.read(e_yz.num_rows_device());
2580        rec_mat.read(e_xz.num_rows_device());
2581        rec_mat.read_column(e_xy.column(0).expect("xy.col0"));
2582        rec_mat.read_column(e_xy.column(1).expect("xy.col1"));
2583        rec_mat.read_column(e_yz.column(1).expect("yz.col1"));
2584        rec_mat.read_column(e_xz.column(1).expect("xz.col1"));
2585        rec_mat.read(&plan.xy_work_prefix);
2586        rec_mat.read(&plan.xy_yz_start);
2587        rec_mat.read(&plan.xy_yz_end);
2588        rec_mat.read(&plan.xy_xz_start);
2589        rec_mat.read(&plan.xy_xz_end);
2590        rec_mat.write(&out_x);
2591        rec_mat.write(&out_y);
2592        rec_mat.write(&out_z);
2593        rec_mat
2594            .preflight(runtime)
2595            .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize preflight failed: {e}")))?;
2596        {
2597            let kernel = self
2598                .device()
2599                .inner()
2600                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_TRIANGLE_MATERIALIZE_HG_U64)
2601                .ok_or_else(|| {
2602                    XlogError::Kernel(
2603                        "wcoj_triangle_materialize_hg_u64 kernel not found".to_string(),
2604                    )
2605                })?;
                // The byte allocations are passed to the kernel through u64 views.
                // NOTE(review): soundness depends on the contract of
                // `reinterpret_u8_as_u64`, which is defined outside this block.
2606            let out_x_u64 = unsafe { reinterpret_u8_as_u64(&mut out_x) };
2607            let out_y_u64 = unsafe { reinterpret_u8_as_u64(&mut out_y) };
2608            let out_z_u64 = unsafe { reinterpret_u8_as_u64(&mut out_z) };
                // NOTE(review): parameters are positional and presumably mirror the
                // device kernel signature; keep the order as is.
2609            let mut params: Vec<*mut c_void> = vec![
2610                xy_col0.as_kernel_param(),
2611                xy_col1.as_kernel_param(),
2612                yz_col1.as_kernel_param(),
2613                n_yz.as_kernel_param(),
2614                xz_col1.as_kernel_param(),
2615                n_xz.as_kernel_param(),
2616                (&plan.xy_work_prefix).as_kernel_param(),
2617                (&plan.xy_yz_start).as_kernel_param(),
2618                (&plan.xy_yz_end).as_kernel_param(),
2619                (&plan.xy_xz_start).as_kernel_param(),
2620                (&plan.xy_xz_end).as_kernel_param(),
2621                plan.row_count.as_kernel_param(),
2622                plan.total_work.as_kernel_param(),
2623                plan.block_work_unit.as_kernel_param(),
2624                materialize_offsets.as_kernel_param(),
2625                total_rows.as_kernel_param(),
2626                out_x_u64.as_kernel_param(),
2627                out_y_u64.as_kernel_param(),
2628                out_z_u64.as_kernel_param(),
2629            ];
                // Same grid and block dimensions as the count launch.
2630            unsafe {
2631                kernel
2632                    .clone()
2633                    .launch_on_stream(
2634                        &cu_stream,
2635                        LaunchConfig {
2636                            grid_dim: (grid, 1, 1),
2637                            block_dim: (BLOCK_SIZE, 1, 1),
2638                            shared_mem_bytes: 0,
2639                        },
2640                        &mut params,
2641                    )
2642                    .map_err(|e| {
2643                        XlogError::Kernel(format!("{ctx}: materialize launch failed: {e}"))
2644                    })?;
2645            }
2646        }
2647        rec_mat
2648            .commit(runtime)
2649            .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize commit failed: {e}")))?;
2650        cu_stream.synchronize().map_err(|e| {
2651            XlogError::Kernel(format!("{ctx}: materialize stream sync failed: {e}"))
2652        })?;
2653
            // The result carries the three columns, the device-side row-count buffer
            // and the host-side row count.
2654        Ok(CudaBuffer::from_columns_with_host_count(
2655            vec![out_x.into(), out_y.into(), out_z.into()],
2656            total_rows as u64,
2657            total_rows_device,
2658            out_schema,
2659            total_rows,
2660        ))
2661    }
2662
2663    pub fn wcoj_4cycle_hg_work_plan_u32_recorded(
2664        &self,
2665        e1: &CudaBuffer,
2666        e2: &CudaBuffer,
2667        e3: &CudaBuffer,
2668        e4: &CudaBuffer,
2669        block_work_unit: u32,
2670        launch_stream: StreamId,
2671    ) -> Result<WcojCycle4HgWorkPlanU32> {
            // Builds a `WcojCycle4HgWorkPlanU32` for four u32 relations. Two kernels
            // run on `launch_stream`, each followed by an in-place scan:
            //   1. `WCOJ_4CYCLE_BUILD_E2_WORK_PREFIX_U32` fills `e2_work_prefix`;
            //   2. `WCOJ_4CYCLE_BUILD_HG_WORK_PLAN_U32` takes the scanned
            //      `e2_work_prefix` and fills `e1_work_prefix`, `e1_e2_start` and
            //      `e1_e2_end`.
            // The scanned total is then read to the host and used to size the
            // per-block count/offset buffers.
            //
            // Parameters:
            // - `e1`..`e4`: input relations, each checked with `validate_binary_u32`.
            //   In this function `e4` is only validated and checked for emptiness;
            //   none of its columns are passed to a kernel.
            // - `block_work_unit`: divisor used to size `block_counts` and
            //   `block_offsets`; must be nonzero.
            // - `launch_stream`: stream id used for the recorder, launches and scans.
            //
            // Errors: `XlogError::Kernel` for a zero `block_work_unit`, prefix length
            // overflow, a missing runtime, an unresolvable stream, a missing kernel,
            // or a failed preflight/launch/commit/sync. Errors from validation,
            // row-count lookup, allocation, the scan helper and the host readback
            // are propagated unchanged via `?`.
2672        let ctx = "wcoj_4cycle_hg_work_plan_u32_recorded";
2673        if block_work_unit == 0 {
2674            return Err(XlogError::Kernel(format!(
2675                "{ctx}: block_work_unit must be nonzero"
2676            )));
2677        }
2678        validate_binary_u32(ctx, "e1", e1)?;
2679        validate_binary_u32(ctx, "e2", e2)?;
2680        validate_binary_u32(ctx, "e3", e3)?;
2681        validate_binary_u32(ctx, "e4", e4)?;
2682
2683        let n_e1 = self.metadata_logical_rows(e1)?;
2684        let n_e2 = self.metadata_logical_rows(e2)?;
2685        let n_e3 = self.metadata_logical_rows(e3)?;
            // Both prefix buffers have one more slot than their relation has rows;
            // the e1 total is read back from index `n_e1` further down.
2686        let prefix_len = n_e1
2687            .checked_add(1)
2688            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: prefix length overflow")))?;
2689        let e2_prefix_len = n_e2
2690            .checked_add(1)
2691            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: e2 prefix length overflow")))?;
2692        let mut e1_work_prefix = self.memory().alloc::<u32>(prefix_len as usize)?;
2693        let mut e2_work_prefix = self.memory().alloc::<u32>(e2_prefix_len as usize)?;
2694        let mut e1_e2_start = self.memory().alloc::<u32>(n_e1 as usize)?;
2695        let mut e1_e2_end = self.memory().alloc::<u32>(n_e1 as usize)?;
2696
            // Any empty relation yields a zero-work plan with no launches. Because
            // `||` short-circuits, e4's row count is only queried when e1, e2 and e3
            // are all non-empty. The buffers above are returned as allocated (no
            // kernel has written them).
2697        if n_e1 == 0 || n_e2 == 0 || n_e3 == 0 || self.metadata_logical_rows(e4)? == 0 {
2698            let block_counts = self.memory().alloc::<u32>(1)?;
2699            let block_offsets = self.memory().alloc::<u32>(1)?;
2700            return Ok(WcojCycle4HgWorkPlanU32 {
2701                e1_work_prefix,
2702                e2_work_prefix,
2703                e1_e2_start,
2704                e1_e2_end,
2705                block_counts,
2706                block_offsets,
2707                total_work: 0,
2708                block_work_unit,
2709                row_count: n_e1,
2710            });
2711        }
2712
2713        let runtime = self.memory().runtime().ok_or_else(|| {
2714            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
2715        })?;
2716        let cu_stream = runtime
2717            .stream_pool()
2718            .resolve(launch_stream)
2719            .ok_or_else(|| {
2720                XlogError::Kernel(format!(
2721                    "{ctx}: launch_stream StreamId({}) does not resolve",
2722                    launch_stream.0
2723                ))
2724            })?;
2725
2726        let e1_col1 = metadata_column_u32(e1, 1)?;
2727        let e2_col0 = metadata_column_u32(e2, 0)?;
2728        let e2_col1 = metadata_column_u32(e2, 1)?;
2729        let e3_col0 = metadata_column_u32(e3, 0)?;
2730
            // One recorder scope covers both kernel launches and both scans.
            // `e2_work_prefix` is registered read-write: the first kernel takes it
            // mutably and the second kernel takes it by shared reference.
2731        let mut rec = LaunchRecorder::new_strict(launch_stream);
2732        rec.read(e1.num_rows_device());
2733        rec.read(e2.num_rows_device());
2734        rec.read(e3.num_rows_device());
2735        rec.read_column(e1.column(1).expect("e1.col1"));
2736        rec.read_column(e2.column(0).expect("e2.col0"));
2737        rec.read_column(e2.column(1).expect("e2.col1"));
2738        rec.read_column(e3.column(0).expect("e3.col0"));
2739        rec.read_write(&e2_work_prefix);
2740        rec.write(&e1_work_prefix);
2741        rec.write(&e1_e2_start);
2742        rec.write(&e1_e2_end);
2743        rec.preflight(runtime)
2744            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
2745
            // Stage 1: the e2 work-prefix kernel, launched with enough
            // BLOCK_SIZE-wide blocks to cover `n_e2` threads.
2746        let e2_kernel = self
2747            .device()
2748            .inner()
2749            .get_func(
2750                WCOJ_MODULE,
2751                wcoj_kernels::WCOJ_4CYCLE_BUILD_E2_WORK_PREFIX_U32,
2752            )
2753            .ok_or_else(|| {
2754                XlogError::Kernel(
2755                    "wcoj_4cycle_build_e2_work_prefix_u32 kernel not found".to_string(),
2756                )
2757            })?;
2758        let e2_grid = n_e2.div_ceil(BLOCK_SIZE);
            // NOTE(review): the argument tuple is positional and presumably mirrors
            // the device kernel signature; keep the order as is.
2759        unsafe {
2760            e2_kernel
2761                .clone()
2762                .launch_on_stream(
2763                    &cu_stream,
2764                    LaunchConfig {
2765                        grid_dim: (e2_grid, 1, 1),
2766                        block_dim: (BLOCK_SIZE, 1, 1),
2767                        shared_mem_bytes: 0,
2768                    },
2769                    (e2_col1, n_e2, e3_col0, n_e3, &mut e2_work_prefix),
2770                )
2771                .map_err(|e| {
2772                    XlogError::Kernel(format!(
2773                        "wcoj_4cycle_build_e2_work_prefix_u32 launch failed: {e}"
2774                    ))
2775                })?;
2776        }
            // The e2 prefix is scanned in place before the second kernel is given it.
2777        self.multiblock_scan_u32_inplace_on_stream(
2778            &mut e2_work_prefix,
2779            e2_prefix_len,
2780            &cu_stream,
2781            launch_stream,
2782            runtime,
2783        )?;
2784
            // Stage 2: the e1 work-plan kernel, launched with enough
            // BLOCK_SIZE-wide blocks to cover `n_e1` threads.
2785        let kernel = self
2786            .device()
2787            .inner()
2788            .get_func(
2789                WCOJ_MODULE,
2790                wcoj_kernels::WCOJ_4CYCLE_BUILD_HG_WORK_PLAN_U32,
2791            )
2792            .ok_or_else(|| {
2793                XlogError::Kernel("wcoj_4cycle_build_hg_work_plan_u32 kernel not found".to_string())
2794            })?;
2795        let grid = n_e1.div_ceil(BLOCK_SIZE);
2796        unsafe {
2797            kernel
2798                .clone()
2799                .launch_on_stream(
2800                    &cu_stream,
2801                    LaunchConfig {
2802                        grid_dim: (grid, 1, 1),
2803                        block_dim: (BLOCK_SIZE, 1, 1),
2804                        shared_mem_bytes: 0,
2805                    },
2806                    (
2807                        e1_col1,
2808                        n_e1,
2809                        e2_col0,
2810                        n_e2,
2811                        &e2_work_prefix,
2812                        &mut e1_work_prefix,
2813                        &mut e1_e2_start,
2814                        &mut e1_e2_end,
2815                    ),
2816                )
2817                .map_err(|e| {
2818                    XlogError::Kernel(format!(
2819                        "wcoj_4cycle_build_hg_work_plan_u32 launch failed: {e}"
2820                    ))
2821                })?;
2822        }
2823        self.multiblock_scan_u32_inplace_on_stream(
2824            &mut e1_work_prefix,
2825            prefix_len,
2826            &cu_stream,
2827            launch_stream,
2828            runtime,
2829        )?;
2830        rec.commit(runtime)
2831            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
            // The stream is synchronized before the host reads the last e1 prefix
            // entry.
2832        cu_stream
2833            .synchronize()
2834            .map_err(|e| XlogError::Kernel(format!("{ctx}: stream sync failed: {e}")))?;
2835        let total_work = self.dtoh_scalar_untracked::<u32>(&e1_work_prefix, n_e1 as usize)?;
            // Block buffers hold ceil(total_work / block_work_unit) entries, with a
            // minimum of one entry when there is no work.
2836        let grid = if total_work == 0 {
2837            1
2838        } else {
2839            total_work.div_ceil(block_work_unit)
2840        };
2841        let block_counts = self.memory().alloc::<u32>(grid as usize)?;
2842        let block_offsets = self.memory().alloc::<u32>(grid as usize)?;
2843
2844        Ok(WcojCycle4HgWorkPlanU32 {
2845            e1_work_prefix,
2846            e2_work_prefix,
2847            e1_e2_start,
2848            e1_e2_end,
2849            block_counts,
2850            block_offsets,
2851            total_work,
2852            block_work_unit,
2853            row_count: n_e1,
2854        })
2855    }
2856
2857    pub fn wcoj_4cycle_hg_u32_recorded(
2858        &self,
2859        e1: &CudaBuffer,
2860        e2: &CudaBuffer,
2861        e3: &CudaBuffer,
2862        e4: &CudaBuffer,
2863        block_work_unit: u32,
2864        launch_stream: StreamId,
2865    ) -> Result<CudaBuffer> {
            // Materializes the 4-cycle result over the four binary u32 relations
            // `e1..e4` into a four-column buffer (`col0..col3`), using the
            // histogram-guided work plan. Runs in two recorded phases on
            // `launch_stream`:
            //   1. count:       the count kernel fills a `grid`-sized u32 array, which
            //                   is scanned into offsets plus a device-side row total;
            //   2. materialize: the total is read back to the host, four output
            //                   columns are allocated, and the materialize kernel
            //                   writes them using the offsets from phase 1.
            //
            // Returns an empty buffer carrying the output schema when the plan has
            // no work or when the count phase reports zero rows.
            //
            // Errors (`XlogError::Kernel`, unless propagated from a helper via `?`):
            // no runtime behind the memory manager, `launch_stream` does not
            // resolve, a kernel is missing from `WCOJ_MODULE`, recorder
            // preflight/commit fails, a launch/copy/sync fails, or a byte-size
            // computation overflows.
2866        let ctx = "wcoj_4cycle_hg_u32_recorded";
            // Reject inputs that fail the binary-u32 check before doing any planning.
2867        validate_binary_u32(ctx, "e1", e1)?;
2868        validate_binary_u32(ctx, "e2", e2)?;
2869        validate_binary_u32(ctx, "e3", e3)?;
2870        validate_binary_u32(ctx, "e4", e4)?;
            // The plan supplies `total_work`, `block_work_unit`, `row_count` and the
            // device buffers (`e1_work_prefix`, `e2_work_prefix`, `e1_e2_start`,
            // `e1_e2_end`, `block_counts`, `block_offsets`) consumed below.
2871        let plan = self.wcoj_4cycle_hg_work_plan_u32_recorded(
2872            e1,
2873            e2,
2874            e3,
2875            e4,
2876            block_work_unit,
2877            launch_stream,
2878        )?;
            // Output column types: col0/col1 come from e1's two columns, col2 from
            // e2.col1, col3 from e3.col1. The `expect`s rely on the validation above
            // having established that each input has two columns (assumption —
            // `validate_binary_u32` is defined outside this view).
2879        let out_schema = Schema::new(vec![
2880            (
2881                "col0".to_string(),
2882                e1.schema().column_type(0).expect("e1.col0 type"),
2883            ),
2884            (
2885                "col1".to_string(),
2886                e1.schema().column_type(1).expect("e1.col1 type"),
2887            ),
2888            (
2889                "col2".to_string(),
2890                e2.schema().column_type(1).expect("e2.col1 type"),
2891            ),
2892            (
2893                "col3".to_string(),
2894                e3.schema().column_type(1).expect("e3.col1 type"),
2895            ),
2896        ]);
            // Nothing planned: return early, before the runtime/stream are needed.
2897        if plan.total_work == 0 {
2898            return self.create_empty_buffer(out_schema);
2899        }
2900
            // Number of grid blocks: planned work divided by the plan's work unit,
            // rounded up. Used as `grid_dim.x` for both the count and the
            // materialize launch.
2901        let grid = plan.total_work.div_ceil(plan.block_work_unit);
            // Byte length of `grid` u32 values; only consumed by the device-to-device
            // copy in the multi-block scan branch below.
2902        let bytes_count = (grid as usize)
2903            .checked_mul(std::mem::size_of::<u32>())
2904            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: count byte size overflow")))?;
            // Private count/offset scratch is allocated only when the grid exceeds
            // 1024 blocks; at or below that, the plan's own `block_counts` /
            // `block_offsets` buffers are used instead (see the selections below).
2905        let mut local_counts = None;
2906        let mut local_offsets = None;
2907        if grid > 1024 {
2908            local_counts = Some(self.memory().alloc::<u32>(grid as usize)?);
2909            local_offsets = Some(self.memory().alloc::<u32>(grid as usize)?);
2910        }
            // Single-element device cell that receives the total output row count.
2911        let total_rows_device = self.memory().alloc::<u32>(1)?;
2912
2913        let runtime = self.memory().runtime().ok_or_else(|| {
2914            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
2915        })?;
2916        let cu_stream = runtime
2917            .stream_pool()
2918            .resolve(launch_stream)
2919            .ok_or_else(|| {
2920                XlogError::Kernel(format!(
2921                    "{ctx}: launch_stream StreamId({}) does not resolve",
2922                    launch_stream.0
2923                ))
2924            })?;
2925
            // Column handles passed as kernel arguments. e2.col0 is intentionally not
            // fetched: neither kernel below takes it (the plan's `e1_e2_start` /
            // `e1_e2_end` are passed instead).
2926        let e1_col0 = metadata_column_u32(e1, 0)?;
2927        let e1_col1 = metadata_column_u32(e1, 1)?;
2928        let e2_col1 = metadata_column_u32(e2, 1)?;
2929        let e3_col0 = metadata_column_u32(e3, 0)?;
2930        let e3_col1 = metadata_column_u32(e3, 1)?;
2931        let e4_col0 = metadata_column_u32(e4, 0)?;
2932        let e4_col1 = metadata_column_u32(e4, 1)?;
2933        let n_e3 = self.metadata_logical_rows(e3)?;
2934        let n_e4 = self.metadata_logical_rows(e4)?;
2935
            // Count destination: the plan's buffer for small grids, the private
            // scratch otherwise.
2936        let count_u32 = if grid <= 1024 {
2937            &plan.block_counts
2938        } else {
2939            local_counts
2940                .as_ref()
2941                .expect("local HG counts allocated when grid exceeds single-block scan")
2942        };
            // Declare every buffer the count phase touches (inputs as reads,
            // count/offset buffers as read-write, the total cell as write), then
            // preflight before any launch. The exact preflight/commit semantics live
            // in `LaunchRecorder`, outside this view.
2943        let mut rec_hg = LaunchRecorder::new_strict(launch_stream);
2944        rec_hg.read(e1.num_rows_device());
2945        rec_hg.read(e2.num_rows_device());
2946        rec_hg.read(e3.num_rows_device());
2947        rec_hg.read(e4.num_rows_device());
2948        rec_hg.read_column(e1.column(0).expect("e1.col0"));
2949        rec_hg.read_column(e1.column(1).expect("e1.col1"));
2950        rec_hg.read_column(e2.column(1).expect("e2.col1"));
2951        rec_hg.read_column(e3.column(0).expect("e3.col0"));
2952        rec_hg.read_column(e3.column(1).expect("e3.col1"));
2953        rec_hg.read_column(e4.column(0).expect("e4.col0"));
2954        rec_hg.read_column(e4.column(1).expect("e4.col1"));
2955        rec_hg.read(&plan.e1_work_prefix);
2956        rec_hg.read(&plan.e2_work_prefix);
2957        rec_hg.read(&plan.e1_e2_start);
2958        rec_hg.read(&plan.e1_e2_end);
2959        rec_hg.read_write(count_u32);
2960        if grid <= 1024 {
2961            rec_hg.read_write(&plan.block_offsets);
2962        } else {
2963            rec_hg.read_write(
2964                local_offsets
2965                    .as_ref()
2966                    .expect("local HG offsets allocated when grid exceeds single-block scan"),
2967            );
2968        }
2969        rec_hg.write(&total_rows_device);
2970        rec_hg
2971            .preflight(runtime)
2972            .map_err(|e| XlogError::Kernel(format!("{ctx}: HG preflight failed: {e}")))?;
            // Phase 1a: count kernel, `grid` blocks of `BLOCK_SIZE` threads.
2973        {
2974            let kernel = self
2975                .device()
2976                .inner()
2977                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_4CYCLE_COUNT_HG_U32)
2978                .ok_or_else(|| {
2979                    XlogError::Kernel("wcoj_4cycle_count_hg_u32 kernel not found".to_string())
2980                })?;
                // Positional arguments: the order must match the kernel's parameter
                // list (kernel source is not visible here — keep in sync with it).
2981            let mut params: Vec<*mut c_void> = vec![
2982                e1_col0.as_kernel_param(),
2983                e1_col1.as_kernel_param(),
2984                plan.row_count.as_kernel_param(),
2985                e2_col1.as_kernel_param(),
2986                e3_col0.as_kernel_param(),
2987                e3_col1.as_kernel_param(),
2988                n_e3.as_kernel_param(),
2989                e4_col0.as_kernel_param(),
2990                e4_col1.as_kernel_param(),
2991                n_e4.as_kernel_param(),
2992                (&plan.e1_work_prefix).as_kernel_param(),
2993                (&plan.e2_work_prefix).as_kernel_param(),
2994                (&plan.e1_e2_start).as_kernel_param(),
2995                (&plan.e1_e2_end).as_kernel_param(),
2996                plan.total_work.as_kernel_param(),
2997                plan.block_work_unit.as_kernel_param(),
2998                count_u32.as_kernel_param(),
2999            ];
3000            unsafe {
3001                kernel
3002                    .clone()
3003                    .launch_on_stream(
3004                        &cu_stream,
3005                        LaunchConfig {
3006                            grid_dim: (grid, 1, 1),
3007                            block_dim: (BLOCK_SIZE, 1, 1),
3008                            shared_mem_bytes: 0,
3009                        },
3010                        &mut params,
3011                    )
3012                    .map_err(|e| XlogError::Kernel(format!("{ctx}: count launch failed: {e}")))?;
3013            }
3014        }
            // Phase 1b: turn the counts into offsets and a total.
3015        if grid <= 1024 {
                // Small grid: a single 1024-thread block scans the plan's counts into
                // `plan.block_offsets` and writes the total cell.
3016            let kernel = self
3017                .device()
3018                .inner()
3019                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_SCAN_HG_BLOCK_COUNTS_U32)
3020                .ok_or_else(|| {
3021                    XlogError::Kernel("wcoj_scan_hg_block_counts_u32 kernel not found".to_string())
3022                })?;
3023            let mut params: Vec<*mut c_void> = vec![
3024                count_u32.as_kernel_param(),
3025                grid.as_kernel_param(),
3026                (&plan.block_offsets).as_kernel_param(),
3027                (&total_rows_device).as_kernel_param(),
3028            ];
3029            unsafe {
3030                kernel
3031                    .clone()
3032                    .launch_on_stream(
3033                        &cu_stream,
3034                        LaunchConfig {
3035                            grid_dim: (1, 1, 1),
3036                            block_dim: (1024, 1, 1),
3037                            shared_mem_bytes: 0,
3038                        },
3039                        &mut params,
3040                    )
3041                    .map_err(|e| XlogError::Kernel(format!("{ctx}: scan failed: {e}")))?;
3042            }
3043        } else {
                // Large grid: copy the counts into the private offsets buffer on the
                // launch stream, scan that buffer in place, then run a one-thread
                // kernel that is handed counts, offsets and `grid` and writes the
                // total cell.
3044            let offsets_mut = local_offsets
3045                .as_mut()
3046                .expect("local HG offsets allocated when grid exceeds single-block scan");
3047            unsafe {
                    // Raw driver call: asynchronous device-to-device copy of
                    // `bytes_count` bytes (`grid` u32 values) enqueued on `cu_stream`.
3048                let res = sys::cuMemcpyDtoDAsync_v2(
3049                    *offsets_mut.device_ptr(),
3050                    *count_u32.device_ptr(),
3051                    bytes_count,
3052                    cu_stream.cu_stream(),
3053                );
3054                if res != sys::cudaError_enum::CUDA_SUCCESS {
3055                    return Err(XlogError::Kernel(format!(
3056                        "{ctx}: DtoD count to offsets failed: {res:?}"
3057                    )));
3058                }
3059            }
3060            self.multiblock_scan_u32_inplace_on_stream(
3061                offsets_mut,
3062                grid,
3063                &cu_stream,
3064                launch_stream,
3065                runtime,
3066            )?;
3067            let total_kernel = self
3068                .device()
3069                .inner()
3070                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_COMPUTE_TOTAL)
3071                .ok_or_else(|| {
3072                    XlogError::Kernel("wcoj_compute_total kernel not found".to_string())
3073                })?;
3074            let mut params: Vec<*mut c_void> = vec![
3075                count_u32.as_kernel_param(),
3076                (&*offsets_mut).as_kernel_param(),
3077                grid.as_kernel_param(),
3078                (&total_rows_device).as_kernel_param(),
3079            ];
3080            unsafe {
3081                total_kernel
3082                    .clone()
3083                    .launch_on_stream(
3084                        &cu_stream,
3085                        LaunchConfig {
3086                            grid_dim: (1, 1, 1),
3087                            block_dim: (1, 1, 1),
3088                            shared_mem_bytes: 0,
3089                        },
3090                        &mut params,
3091                    )
3092                    .map_err(|e| XlogError::Kernel(format!("{ctx}: total failed: {e}")))?;
3093            }
3094        }
            // Commit the count-phase recorder, then block on the stream: the total
            // is needed on the host to size the output allocations.
3095        rec_hg
3096            .commit(runtime)
3097            .map_err(|e| XlogError::Kernel(format!("{ctx}: count commit failed: {e}")))?;
3098        cu_stream
3099            .synchronize()
3100            .map_err(|e| XlogError::Kernel(format!("{ctx}: count stream sync failed: {e}")))?;
3101        let total_rows = self
3102            .dtoh_scalar_untracked::<u32>(&total_rows_device, 0)
3103            .map_err(|e| XlogError::Kernel(format!("{ctx}: read total rows failed: {e}")))?;
3104        if total_rows == 0 {
3105            return self.create_empty_buffer(out_schema);
3106        }
3107
            // Phase 2: four byte-backed output columns, each `total_rows` u32 values.
3108        let bytes_per_col = (total_rows as usize)
3109            .checked_mul(std::mem::size_of::<u32>())
3110            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: output byte size overflow")))?;
3111        let mut out_w = self.memory().alloc::<u8>(bytes_per_col)?;
3112        let mut out_x = self.memory().alloc::<u8>(bytes_per_col)?;
3113        let mut out_y = self.memory().alloc::<u8>(bytes_per_col)?;
3114        let mut out_z = self.memory().alloc::<u8>(bytes_per_col)?;
            // Offsets come from whichever buffer the scan branch above filled.
3115        let materialize_offsets = if grid <= 1024 {
3116            &plan.block_offsets
3117        } else {
3118            local_offsets
3119                .as_ref()
3120                .expect("local HG offsets allocated when grid exceeds single-block scan")
3121        };
            // Separate recorder for the materialize launch: offsets and inputs are
            // reads, the four output columns are writes.
3122        let mut rec_mat = LaunchRecorder::new_strict(launch_stream);
3123        rec_mat.read(materialize_offsets);
3124        rec_mat.read(e1.num_rows_device());
3125        rec_mat.read(e2.num_rows_device());
3126        rec_mat.read(e3.num_rows_device());
3127        rec_mat.read(e4.num_rows_device());
3128        rec_mat.read_column(e1.column(0).expect("e1.col0"));
3129        rec_mat.read_column(e1.column(1).expect("e1.col1"));
3130        rec_mat.read_column(e2.column(1).expect("e2.col1"));
3131        rec_mat.read_column(e3.column(0).expect("e3.col0"));
3132        rec_mat.read_column(e3.column(1).expect("e3.col1"));
3133        rec_mat.read_column(e4.column(0).expect("e4.col0"));
3134        rec_mat.read_column(e4.column(1).expect("e4.col1"));
3135        rec_mat.read(&plan.e1_work_prefix);
3136        rec_mat.read(&plan.e2_work_prefix);
3137        rec_mat.read(&plan.e1_e2_start);
3138        rec_mat.read(&plan.e1_e2_end);
3139        rec_mat.write(&out_w);
3140        rec_mat.write(&out_x);
3141        rec_mat.write(&out_y);
3142        rec_mat.write(&out_z);
3143        rec_mat
3144            .preflight(runtime)
3145            .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize preflight failed: {e}")))?;
3146        {
3147            let kernel = self
3148                .device()
3149                .inner()
3150                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_4CYCLE_MATERIALIZE_HG_U32)
3151                .ok_or_else(|| {
3152                    XlogError::Kernel("wcoj_4cycle_materialize_hg_u32 kernel not found".to_string())
3153                })?;
                // u32-typed views over the byte-backed output columns, used only as
                // kernel arguments; they are scoped to this block so the columns can
                // be moved into the result afterwards.
3154            let out_w_u32 = unsafe { reinterpret_u8_as_u32(&mut out_w) };
3155            let out_x_u32 = unsafe { reinterpret_u8_as_u32(&mut out_x) };
3156            let out_y_u32 = unsafe { reinterpret_u8_as_u32(&mut out_y) };
3157            let out_z_u32 = unsafe { reinterpret_u8_as_u32(&mut out_z) };
                // Same leading arguments as the count kernel, followed by the
                // offsets, the host-side total and the four output columns.
3158            let mut params: Vec<*mut c_void> = vec![
3159                e1_col0.as_kernel_param(),
3160                e1_col1.as_kernel_param(),
3161                plan.row_count.as_kernel_param(),
3162                e2_col1.as_kernel_param(),
3163                e3_col0.as_kernel_param(),
3164                e3_col1.as_kernel_param(),
3165                n_e3.as_kernel_param(),
3166                e4_col0.as_kernel_param(),
3167                e4_col1.as_kernel_param(),
3168                n_e4.as_kernel_param(),
3169                (&plan.e1_work_prefix).as_kernel_param(),
3170                (&plan.e2_work_prefix).as_kernel_param(),
3171                (&plan.e1_e2_start).as_kernel_param(),
3172                (&plan.e1_e2_end).as_kernel_param(),
3173                plan.total_work.as_kernel_param(),
3174                plan.block_work_unit.as_kernel_param(),
3175                materialize_offsets.as_kernel_param(),
3176                total_rows.as_kernel_param(),
3177                out_w_u32.as_kernel_param(),
3178                out_x_u32.as_kernel_param(),
3179                out_y_u32.as_kernel_param(),
3180                out_z_u32.as_kernel_param(),
3181            ];
3182            unsafe {
3183                kernel
3184                    .clone()
3185                    .launch_on_stream(
3186                        &cu_stream,
3187                        LaunchConfig {
3188                            grid_dim: (grid, 1, 1),
3189                            block_dim: (BLOCK_SIZE, 1, 1),
3190                            shared_mem_bytes: 0,
3191                        },
3192                        &mut params,
3193                    )
3194                    .map_err(|e| {
3195                        XlogError::Kernel(format!("{ctx}: materialize launch failed: {e}"))
3196                    })?;
3197            }
3198        }
            // Commit, then wait for the stream before handing the columns out.
3199        rec_mat
3200            .commit(runtime)
3201            .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize commit failed: {e}")))?;
3202        cu_stream.synchronize().map_err(|e| {
3203            XlogError::Kernel(format!("{ctx}: materialize stream sync failed: {e}"))
3204        })?;
3205
            // The result takes ownership of the four columns and of the device-side
            // row-count cell; the host-side count is passed alongside.
3206        Ok(CudaBuffer::from_columns_with_host_count(
3207            vec![out_w.into(), out_x.into(), out_y.into(), out_z.into()],
3208            total_rows as u64,
3209            total_rows_device,
3210            out_schema,
3211            total_rows,
3212        ))
3213    }
3214
3215    /// Aggregate-fused 4-cycle group-by-root count: evaluate
3216    /// `q(W, count) :- e1(W,X), e2(X,Y), e3(Y,Z), e4(Z,W)` grouped by the
3217    /// variable-order root W, WITHOUT materializing the 4-cycle rows.
3218    ///
3219    /// Pipeline (all recorded; the 4-cycle result never exists as rows):
3220    /// 1. the standard 4-cycle histogram-guided work plan;
3221    /// 2. `wcoj_4cycle_groupby_root_count_hg_u32` accumulates, per e1 row,
3222    ///    a match count (integer atomicAdd — order-insensitive,
3223    ///    deterministic values);
3224    /// 3. a (W, count) staging buffer over the *input* rows is compacted
3225    ///    to count>0 rows (roots with no completion must be absent) and
3226    ///    reduced per W via the recorded groupby Sum.
3227    ///
3228    /// All reduction work is O(n_e1) — input-sized, never join-output-sized.
3229    ///
3230    /// Output schema matches the unfused materialize+groupby baseline:
3231    /// `col0` = W (e1.col0 type, U32/Symbol), `col1` = count (U64).
3232    ///
3233    /// # Errors
3234    /// * `XlogError::Kernel` if the manager has no runtime, the launch
3235    ///   stream does not resolve, an input is not 2-column U32/Symbol, or
3236    ///   any kernel launch fails.
3237    pub fn wcoj_4cycle_groupby_root_count_u32_recorded(
3238        &self,
3239        e1: &CudaBuffer,
3240        e2: &CudaBuffer,
3241        e3: &CudaBuffer,
3242        e4: &CudaBuffer,
3243        block_work_unit: u32,
3244        launch_stream: StreamId,
3245    ) -> Result<CudaBuffer> {
3246        let ctx = "wcoj_4cycle_groupby_root_count_u32_recorded";
3247        // Layout-normalize per dispatch (sorted-fast-path clone when the
3248        // input is already lex-sorted + unique): the fused path must give
3249        // the same guarantee as the unfused pipeline instead of trusting
3250        // store-buffer sortedness — unsorted/duplicated inputs previously
3251        // produced silently wrong (empty) fused results.
            // The four bindings below shadow the parameters: from here on `e1..e4`
            // refer to the normalized buffers, not the caller's originals.
3252        let e1 = &self.wcoj_layout_u32_recorded(e1, launch_stream)?;
3253        let e2 = &self.wcoj_layout_u32_recorded(e2, launch_stream)?;
3254        let e3 = &self.wcoj_layout_u32_recorded(e3, launch_stream)?;
3255        let e4 = &self.wcoj_layout_u32_recorded(e4, launch_stream)?;
            // Validation runs on the normalized buffers, i.e. after the layout step.
3256        validate_binary_u32(ctx, "e1", e1)?;
3257        validate_binary_u32(ctx, "e2", e2)?;
3258        validate_binary_u32(ctx, "e3", e3)?;
3259        validate_binary_u32(ctx, "e4", e4)?;
3260        let plan = self.wcoj_4cycle_hg_work_plan_u32_recorded(
3261            e1,
3262            e2,
3263            e3,
3264            e4,
3265            block_work_unit,
3266            launch_stream,
3267        )?;
3268        let n_e1 = plan.row_count;
3269        let w_type = e1.schema().column_type(0).expect("e1.col0 type");
            // Schema used only for the empty-result early return below.
            // NOTE(review): the non-empty path returns whatever schema
            // `groupby_multi_agg_recorded` produces; the sibling agg variant names
            // its Sum column `sum_0`, so confirm the column names agree with
            // ("w", "count") here.
3270        let out_schema = Schema::new(vec![
3271            ("w".to_string(), w_type),
3272            ("count".to_string(), ScalarType::U64),
3273        ]);
            // No e1 rows or no planned work: return empty before the runtime is needed.
3274        if n_e1 == 0 || plan.total_work == 0 {
3275            return self.create_empty_buffer(out_schema);
3276        }
3277
3278        let runtime = self.memory().runtime().ok_or_else(|| {
3279            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
3280        })?;
3281        let cu_stream = runtime
3282            .stream_pool()
3283            .resolve(launch_stream)
3284            .ok_or_else(|| {
3285                XlogError::Kernel(format!(
3286                    "{ctx}: launch_stream StreamId({}) does not resolve",
3287                    launch_stream.0
3288                ))
3289            })?;
3290
            // Column handles passed as kernel arguments; e2.col0 is not passed (the
            // plan's `e1_e2_start` / `e1_e2_end` are passed instead).
3291        let e1_col0 = metadata_column_u32(e1, 0)?;
3292        let e1_col1 = metadata_column_u32(e1, 1)?;
3293        let e2_col1 = metadata_column_u32(e2, 1)?;
3294        let e3_col0 = metadata_column_u32(e3, 0)?;
3295        let e3_col1 = metadata_column_u32(e3, 1)?;
3296        let e4_col0 = metadata_column_u32(e4, 0)?;
3297        let e4_col1 = metadata_column_u32(e4, 1)?;
3298        let n_e3 = self.metadata_logical_rows(e3)?;
3299        let n_e4 = self.metadata_logical_rows(e4)?;
3300
3301        // Per-e1-row match counters, zero-initialized. Allocated as the
3302        // u8-backed column layout so the array doubles as the staging
3303        // buffer's count column after the kernel fills it.
            // NOTE(review): the byte size is an unchecked multiplication, whereas
            // `wcoj_4cycle_hg_u32_recorded` uses `checked_mul` for its sizes.
            // NOTE(review): `memset_zeros` is issued through `self.device().inner()`
            // while the kernel below launches on `cu_stream`; which stream the
            // memset runs on is not visible here — confirm it is ordered before
            // the kernel.
3304        let mut row_counts = self
3305            .memory()
3306            .alloc::<u8>(n_e1 as usize * std::mem::size_of::<u32>())?;
3307        self.device()
3308            .inner()
3309            .memset_zeros(&mut row_counts)
3310            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row counts failed: {e}")))?;
3311
            // Number of grid blocks: planned work divided by the plan's work unit,
            // rounded up.
3312        let grid = plan.total_work.div_ceil(plan.block_work_unit);
            // Declare inputs and plan buffers as reads and the counter array as a
            // write, then preflight before launching.
3313        let mut rec = LaunchRecorder::new_strict(launch_stream);
3314        rec.read(e1.num_rows_device());
3315        rec.read(e2.num_rows_device());
3316        rec.read(e3.num_rows_device());
3317        rec.read(e4.num_rows_device());
3318        rec.read_column(e1.column(0).expect("e1.col0"));
3319        rec.read_column(e1.column(1).expect("e1.col1"));
3320        rec.read_column(e2.column(1).expect("e2.col1"));
3321        rec.read_column(e3.column(0).expect("e3.col0"));
3322        rec.read_column(e3.column(1).expect("e3.col1"));
3323        rec.read_column(e4.column(0).expect("e4.col0"));
3324        rec.read_column(e4.column(1).expect("e4.col1"));
3325        rec.read(&plan.e1_work_prefix);
3326        rec.read(&plan.e2_work_prefix);
3327        rec.read(&plan.e1_e2_start);
3328        rec.read(&plan.e1_e2_end);
3329        rec.write(&row_counts);
3330        rec.preflight(runtime)
3331            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
3332        {
3333            let kernel = self
3334                .device()
3335                .inner()
3336                .get_func(
3337                    WCOJ_MODULE,
3338                    wcoj_kernels::WCOJ_4CYCLE_GROUPBY_ROOT_COUNT_HG_U32,
3339                )
3340                .ok_or_else(|| {
3341                    XlogError::Kernel(
3342                        "wcoj_4cycle_groupby_root_count_hg_u32 kernel not found".to_string(),
3343                    )
3344                })?;
                // Positional arguments: the order must match the kernel's parameter
                // list (kernel source is not visible here — keep in sync with it).
                // The last argument is the byte-backed counter array.
3345            let mut params: Vec<*mut c_void> = vec![
3346                e1_col0.as_kernel_param(),
3347                e1_col1.as_kernel_param(),
3348                plan.row_count.as_kernel_param(),
3349                e2_col1.as_kernel_param(),
3350                e3_col0.as_kernel_param(),
3351                e3_col1.as_kernel_param(),
3352                n_e3.as_kernel_param(),
3353                e4_col0.as_kernel_param(),
3354                e4_col1.as_kernel_param(),
3355                n_e4.as_kernel_param(),
3356                (&plan.e1_work_prefix).as_kernel_param(),
3357                (&plan.e2_work_prefix).as_kernel_param(),
3358                (&plan.e1_e2_start).as_kernel_param(),
3359                (&plan.e1_e2_end).as_kernel_param(),
3360                plan.total_work.as_kernel_param(),
3361                plan.block_work_unit.as_kernel_param(),
3362                (&row_counts).as_kernel_param(),
3363            ];
3364            unsafe {
3365                kernel
3366                    .clone()
3367                    .launch_on_stream(
3368                        &cu_stream,
3369                        LaunchConfig {
3370                            grid_dim: (grid, 1, 1),
3371                            block_dim: (BLOCK_SIZE, 1, 1),
3372                            shared_mem_bytes: 0,
3373                        },
3374                        &mut params,
3375                    )
3376                    .map_err(|e| {
3377                        XlogError::Kernel(format!("{ctx}: groupby-count launch failed: {e}"))
3378                    })?;
3379            }
3380        }
3381        rec.commit(runtime)
3382            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
3383
3384        // Staging buffer (W, count) over the n_e1 input rows: W is a
3385        // device-to-device copy of e1.col0; the count column is the
3386        // kernel-filled array. Rows stay lex-sorted by W.
            // Only an owned column can be copied here; any other `CudaColumn`
            // variant is reported as an error.
3387        let w_src = match e1.column(0).expect("e1.col0") {
3388            CudaColumn::Owned(slice) => slice,
3389            _ => {
3390                return Err(XlogError::Kernel(format!(
3391                    "{ctx}: e1.col0 must be an owned CudaColumn"
3392                )))
3393            }
3394        };
            // NOTE(review): both `dtod_copy` calls go through `self.device().inner()`
            // rather than `cu_stream` — confirm their stream ordering relative to
            // the recorded work on `launch_stream`.
3395        let mut w_copy = self
3396            .memory()
3397            .alloc::<u8>(n_e1 as usize * std::mem::size_of::<u32>())?;
3398        self.device()
3399            .inner()
3400            .dtod_copy(w_src, &mut w_copy)
3401            .map_err(|e| XlogError::Kernel(format!("{ctx}: copy W column failed: {e}")))?;
            // The staging buffer gets its own device-side row-count cell, copied
            // from e1's.
3402        let mut d_num_rows = self.memory().alloc::<u32>(1)?;
3403        self.device()
3404            .inner()
3405            .dtod_copy(e1.num_rows_device(), &mut d_num_rows)
3406            .map_err(|e| XlogError::Kernel(format!("{ctx}: copy row count failed: {e}")))?;
            // The staging count column is declared U32, matching the 4-byte
            // counters allocated above.
3407        let staging_schema = Schema::new(vec![
3408            ("w".to_string(), w_type),
3409            ("count".to_string(), ScalarType::U32),
3410        ]);
            // `w_copy` and `row_counts` are moved into the staging buffer here.
3411        let staging = CudaBuffer::from_columns_with_host_count(
3412            vec![w_copy.into(), row_counts.into()],
3413            n_e1 as u64,
3414            d_num_rows,
3415            staging_schema,
3416            n_e1,
3417        );
3418
3419        // Keep only roots with at least one completed 4-cycle, then reduce
3420        // per W. Both steps run over input-sized data.
            // Mask: staging column 1 (count) > 0.
3421        let mask = self.compare_const_mask_recorded::<u32>(
3422            &staging,
3423            1,
3424            0u32,
3425            crate::CompareOp::Gt,
3426            launch_stream,
3427        )?;
3428        let compacted =
3429            self.compact_buffer_by_device_mask_counted_recorded(&staging, &mask, launch_stream)?;
            // Group by column 0 (W) and Sum column 1 (count); the groupby result is
            // returned to the caller unchanged.
3430        self.groupby_multi_agg_recorded(
3431            &compacted,
3432            &[0],
3433            &[(1, xlog_core::AggOp::Sum)],
3434            launch_stream,
3435        )
3436    }
3437
3438    /// Aggregate-fused 4-cycle group-by-root sum/min/max: evaluate
3439    /// `q(W, agg(V)) :- e1(W,X), e2(X,Y), e3(Y,Z), e4(Z,W)` with
3440    /// `agg ∈ {Sum, Min, Max}` and `V ∈ {X, Y, Z}` grouped by the
3441    /// variable-order root W, WITHOUT materializing the 4-cycle rows.
3442    ///
3443    /// Pipeline (all recorded; the 4-cycle result never exists as rows):
3444    /// 1. the standard 4-cycle histogram-guided work plan;
3445    /// 2. the per-op fused kernel accumulates, per e1 row, a match count
3446    ///    (compaction mask) and the per-row partial aggregate (integer
3447    ///    atomics — order-insensitive, deterministic values). Sum partials
3448    ///    are u64 (a per-row partial can exceed `u32::MAX`); min partials
3449    ///    start at `u32::MAX`, max partials at 0;
3450    /// 3. a 3-column (W, count, agg) staging buffer over the *input* rows
3451    ///    is compacted to count>0 rows (roots with no completion must be
3452    ///    absent) and reduced per W via the recorded groupby with the same
3453    ///    `AggOp` (Sum over the u64 partials; Min/Max over u32).
3454    ///
3455    /// All reduction work is O(n_e1) — input-sized, never join-output-sized.
3456    ///
3457    /// Output schema matches the unfused materialize+groupby baseline:
3458    /// `col0` = W (e1.col0 type, U32/Symbol), `col1` = U64 for Sum,
3459    /// U32 for Min/Max.
3460    ///
3461    /// Bag semantics: every (X, Y, Z) completion contributes its value,
3462    /// exactly like aggregating the materialized projection.
3463    ///
3464    /// # Errors
3465    /// * `XlogError::Kernel` if `agg_op` is not Sum/Min/Max, the value
3466    ///   column is not plain U32, the manager has no runtime, the launch
3467    ///   stream does not resolve, an input is not 2-column U32/Symbol, or
3468    ///   any kernel launch fails.
3469    #[allow(clippy::too_many_arguments)]
3470    pub fn wcoj_4cycle_groupby_root_agg_u32_recorded(
3471        &self,
3472        e1: &CudaBuffer,
3473        e2: &CudaBuffer,
3474        e3: &CudaBuffer,
3475        e4: &CudaBuffer,
3476        agg_op: AggOp,
3477        value: Wcoj4CycleRootAggValue,
3478        block_work_unit: u32,
3479        launch_stream: StreamId,
3480    ) -> Result<CudaBuffer> {
3481        let ctx = "wcoj_4cycle_groupby_root_agg_u32_recorded";
3482        // Layout-normalize per dispatch (sorted-fast-path clone when the
3483        // input is already lex-sorted + unique): the fused path must give
3484        // the same guarantee as the unfused pipeline instead of trusting
3485        // store-buffer sortedness — unsorted/duplicated inputs previously
3486        // produced silently wrong (empty) fused results.
3487        let e1 = &self.wcoj_layout_u32_recorded(e1, launch_stream)?;
3488        let e2 = &self.wcoj_layout_u32_recorded(e2, launch_stream)?;
3489        let e3 = &self.wcoj_layout_u32_recorded(e3, launch_stream)?;
3490        let e4 = &self.wcoj_layout_u32_recorded(e4, launch_stream)?;
3491        let (kernel_name, agg_elem_size, agg_scalar, agg_name) = match agg_op {
3492            AggOp::Sum => (
3493                wcoj_kernels::WCOJ_4CYCLE_GROUPBY_ROOT_SUM_HG_U32,
3494                std::mem::size_of::<u64>(),
3495                ScalarType::U64,
3496                "sum_0",
3497            ),
3498            AggOp::Min => (
3499                wcoj_kernels::WCOJ_4CYCLE_GROUPBY_ROOT_MIN_HG_U32,
3500                std::mem::size_of::<u32>(),
3501                ScalarType::U32,
3502                "min_0",
3503            ),
3504            AggOp::Max => (
3505                wcoj_kernels::WCOJ_4CYCLE_GROUPBY_ROOT_MAX_HG_U32,
3506                std::mem::size_of::<u32>(),
3507                ScalarType::U32,
3508                "max_0",
3509            ),
3510            other => {
3511                return Err(XlogError::Kernel(format!(
3512                    "{ctx}: unsupported AggOp {other:?} (Sum/Min/Max only; use \
3513                     wcoj_4cycle_groupby_root_count_u32_recorded for Count)"
3514                )))
3515            }
3516        };
3517        validate_binary_u32(ctx, "e1", e1)?;
3518        validate_binary_u32(ctx, "e2", e2)?;
3519        validate_binary_u32(ctx, "e3", e3)?;
3520        validate_binary_u32(ctx, "e4", e4)?;
3521        // The aggregate value is arithmetic: require a plain U32 value
3522        // column (Symbol ids are not summable/orderable data). The column
3523        // checked is exactly the one the kernel reads the value from —
3524        // and the one whose type the materialized (W, X, Y, Z) baseline
3525        // schema carries (`build_4cycle_head_schema`).
3526        let (value_buf, value_label) = match value {
3527            Wcoj4CycleRootAggValue::X => (e1, "e1"),
3528            Wcoj4CycleRootAggValue::Y => (e2, "e2"),
3529            Wcoj4CycleRootAggValue::Z => (e3, "e3"),
3530        };
3531        {
3532            let ty = value_buf.schema().column_type(1).expect("validated 2-col");
3533            if ty != ScalarType::U32 {
3534                return Err(XlogError::Kernel(format!(
3535                    "{ctx}: {value_label}.col1 supplies the aggregate value and must be U32, \
3536                     got {ty:?}"
3537                )));
3538            }
3539        }
3540
3541        let plan = self.wcoj_4cycle_hg_work_plan_u32_recorded(
3542            e1,
3543            e2,
3544            e3,
3545            e4,
3546            block_work_unit,
3547            launch_stream,
3548        )?;
3549        let n_e1 = plan.row_count;
3550        let w_type = e1.schema().column_type(0).expect("e1.col0 type");
3551        let out_schema = Schema::new(vec![
3552            ("w".to_string(), w_type),
3553            (agg_name.to_string(), agg_scalar),
3554        ]);
3555        if n_e1 == 0 || plan.total_work == 0 {
3556            return self.create_empty_buffer(out_schema);
3557        }
3558
3559        let runtime = self.memory().runtime().ok_or_else(|| {
3560            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
3561        })?;
3562        let cu_stream = runtime
3563            .stream_pool()
3564            .resolve(launch_stream)
3565            .ok_or_else(|| {
3566                XlogError::Kernel(format!(
3567                    "{ctx}: launch_stream StreamId({}) does not resolve",
3568                    launch_stream.0
3569                ))
3570            })?;
3571
3572        let e1_col0 = metadata_column_u32(e1, 0)?;
3573        let e1_col1 = metadata_column_u32(e1, 1)?;
3574        let e2_col1 = metadata_column_u32(e2, 1)?;
3575        let e3_col0 = metadata_column_u32(e3, 0)?;
3576        let e3_col1 = metadata_column_u32(e3, 1)?;
3577        let e4_col0 = metadata_column_u32(e4, 0)?;
3578        let e4_col1 = metadata_column_u32(e4, 1)?;
3579        let n_e3 = self.metadata_logical_rows(e3)?;
3580        let n_e4 = self.metadata_logical_rows(e4)?;
3581        let value_sel: u32 = match value {
3582            Wcoj4CycleRootAggValue::X => 0,
3583            Wcoj4CycleRootAggValue::Y => 1,
3584            Wcoj4CycleRootAggValue::Z => 2,
3585        };
3586
3587        // Per-e1-row match counters + aggregate partials, allocated as
3588        // the u8-backed column layout so the arrays double as the staging
3589        // buffer's columns after the kernel fills them.
3590        let mut row_counts = self
3591            .memory()
3592            .alloc::<u8>(n_e1 as usize * std::mem::size_of::<u32>())?;
3593        self.device()
3594            .inner()
3595            .memset_zeros(&mut row_counts)
3596            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row counts failed: {e}")))?;
3597        let mut row_agg = self.memory().alloc::<u8>(n_e1 as usize * agg_elem_size)?;
3598        self.device()
3599            .inner()
3600            .memset_zeros(&mut row_agg)
3601            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row aggregates failed: {e}")))?;
3602
3603        let grid = plan.total_work.div_ceil(plan.block_work_unit);
3604        let mut rec = LaunchRecorder::new_strict(launch_stream);
3605        rec.read(e1.num_rows_device());
3606        rec.read(e2.num_rows_device());
3607        rec.read(e3.num_rows_device());
3608        rec.read(e4.num_rows_device());
3609        rec.read_column(e1.column(0).expect("e1.col0"));
3610        rec.read_column(e1.column(1).expect("e1.col1"));
3611        rec.read_column(e2.column(1).expect("e2.col1"));
3612        rec.read_column(e3.column(0).expect("e3.col0"));
3613        rec.read_column(e3.column(1).expect("e3.col1"));
3614        rec.read_column(e4.column(0).expect("e4.col0"));
3615        rec.read_column(e4.column(1).expect("e4.col1"));
3616        rec.read(&plan.e1_work_prefix);
3617        rec.read(&plan.e2_work_prefix);
3618        rec.read(&plan.e1_e2_start);
3619        rec.read(&plan.e1_e2_end);
3620        rec.write(&row_counts);
3621        rec.write(&row_agg);
3622        rec.preflight(runtime)
3623            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
3624        if matches!(agg_op, AggOp::Min) {
3625            // Min identity: u32::MAX (compaction drops untouched rows).
3626            let fill = self
3627                .device()
3628                .inner()
3629                .get_func(ARITH_MODULE, arith_kernels::ARITH_FILL_CONST_U32)
3630                .ok_or_else(|| {
3631                    XlogError::Kernel("arith_fill_const_u32 kernel not found".to_string())
3632                })?;
3633            let row_agg_u32 = unsafe { reinterpret_u8_as_u32(&mut row_agg) };
3634            // SAFETY: arith_fill_const_u32(value, n, output)
3635            unsafe {
3636                fill.clone()
3637                    .launch_on_stream(
3638                        &cu_stream,
3639                        LaunchConfig::for_num_elems(n_e1),
3640                        (u32::MAX, n_e1, &mut *row_agg_u32),
3641                    )
3642                    .map_err(|e| {
3643                        XlogError::Kernel(format!("{ctx}: min identity fill failed: {e}"))
3644                    })?;
3645            }
3646        }
3647        {
3648            let kernel = self
3649                .device()
3650                .inner()
3651                .get_func(WCOJ_MODULE, kernel_name)
3652                .ok_or_else(|| XlogError::Kernel(format!("{kernel_name} kernel not found")))?;
3653            let mut params: Vec<*mut c_void> = vec![
3654                e1_col0.as_kernel_param(),
3655                e1_col1.as_kernel_param(),
3656                plan.row_count.as_kernel_param(),
3657                e2_col1.as_kernel_param(),
3658                e3_col0.as_kernel_param(),
3659                e3_col1.as_kernel_param(),
3660                n_e3.as_kernel_param(),
3661                e4_col0.as_kernel_param(),
3662                e4_col1.as_kernel_param(),
3663                n_e4.as_kernel_param(),
3664                value_sel.as_kernel_param(),
3665                (&plan.e1_work_prefix).as_kernel_param(),
3666                (&plan.e2_work_prefix).as_kernel_param(),
3667                (&plan.e1_e2_start).as_kernel_param(),
3668                (&plan.e1_e2_end).as_kernel_param(),
3669                plan.total_work.as_kernel_param(),
3670                plan.block_work_unit.as_kernel_param(),
3671                (&row_counts).as_kernel_param(),
3672                (&row_agg).as_kernel_param(),
3673            ];
3674            unsafe {
3675                kernel
3676                    .clone()
3677                    .launch_on_stream(
3678                        &cu_stream,
3679                        LaunchConfig {
3680                            grid_dim: (grid, 1, 1),
3681                            block_dim: (BLOCK_SIZE, 1, 1),
3682                            shared_mem_bytes: 0,
3683                        },
3684                        &mut params,
3685                    )
3686                    .map_err(|e| {
3687                        XlogError::Kernel(format!("{ctx}: groupby-agg launch failed: {e}"))
3688                    })?;
3689            }
3690        }
3691        rec.commit(runtime)
3692            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
3693
3694        // Staging buffer (W, count, agg) over the n_e1 input rows: W is a
3695        // device-to-device copy of e1.col0; count and agg are the
3696        // kernel-filled arrays. Rows stay lex-sorted by W.
3697        let w_src = match e1.column(0).expect("e1.col0") {
3698            CudaColumn::Owned(slice) => slice,
3699            _ => {
3700                return Err(XlogError::Kernel(format!(
3701                    "{ctx}: e1.col0 must be an owned CudaColumn"
3702                )))
3703            }
3704        };
3705        let w_copy = self
3706            .memory()
3707            .alloc::<u8>(n_e1 as usize * std::mem::size_of::<u32>())?;
3708        // Explicit-length copy: layout-normalized columns are allocated at
3709        // capacity, which can exceed the logical n_e1 * 4 bytes a full-slice
3710        // typed copy would assert on.
3711        unsafe {
3712            let res = sys::cuMemcpyDtoD_v2(
3713                *w_copy.device_ptr(),
3714                *w_src.device_ptr(),
3715                n_e1 as usize * std::mem::size_of::<u32>(),
3716            );
3717            if res != sys::cudaError_enum::CUDA_SUCCESS {
3718                return Err(XlogError::Kernel(format!(
3719                    "{ctx}: copy W column failed: {res:?}"
3720                )));
3721            }
3722        }
3723        let mut d_num_rows = self.memory().alloc::<u32>(1)?;
3724        self.device()
3725            .inner()
3726            .dtod_copy(e1.num_rows_device(), &mut d_num_rows)
3727            .map_err(|e| XlogError::Kernel(format!("{ctx}: copy row count failed: {e}")))?;
3728        let staging_schema = Schema::new(vec![
3729            ("w".to_string(), w_type),
3730            ("count".to_string(), ScalarType::U32),
3731            ("agg".to_string(), agg_scalar),
3732        ]);
3733        let staging = CudaBuffer::from_columns_with_host_count(
3734            vec![w_copy.into(), row_counts.into(), row_agg.into()],
3735            n_e1 as u64,
3736            d_num_rows,
3737            staging_schema,
3738            n_e1,
3739        );
3740
3741        // Keep only roots with at least one completed 4-cycle, then reduce
3742        // per W with the same AggOp. Both steps run over input-sized data.
3743        let mask = self.compare_const_mask_recorded::<u32>(
3744            &staging,
3745            1,
3746            0u32,
3747            crate::CompareOp::Gt,
3748            launch_stream,
3749        )?;
3750        let compacted =
3751            self.compact_buffer_by_device_mask_counted_recorded(&staging, &mask, launch_stream)?;
3752        self.groupby_multi_agg_recorded(&compacted, &[0], &[(2, agg_op)], launch_stream)
3753    }
3754
3755    /// U64-key aggregate-fused 4-cycle count sibling of
3756    /// [`Self::wcoj_4cycle_groupby_root_count_u32_recorded`]: evaluate
3757    /// `q(W, count) :- e1(W,X), e2(X,Y), e3(Y,Z), e4(Z,W)` grouped by the
3758    /// variable-order root W for U64 relations, WITHOUT materializing the
3759    /// 4-cycle rows.
3760    ///
3761    /// The recorded groupby is U32/Symbol-key only, so the per-W reduction
3762    /// reuses the WCOJ relation metadata instead (mirroring
3763    /// [`Self::wcoj_triangle_groupby_root_count_u64_recorded`]): e1 is
3764    /// lex-sorted, so `wcoj_build_metadata_u64_recorded` yields one
3765    /// (unique W, group start) pair per root, and
3766    /// `wcoj_groupby_root_segment_sum_counts_u32` accumulates the per-row
3767    /// match counts into per-unique-root u64 totals (integer atomicAdd —
3768    /// deterministic). Roots with zero completions are compacted away.
3769    /// All reduction work is O(n_e1).
3770    ///
3771    /// Output schema matches the unfused materialize+groupby baseline:
3772    /// `col0` = W (U64), `col1` = count (U64).
3773    ///
3774    /// # Errors
3775    /// * `XlogError::Kernel` if the manager has no runtime, the launch
3776    ///   stream does not resolve, an input is not 2-column U64, or any
3777    ///   kernel launch fails.
3778    pub fn wcoj_4cycle_groupby_root_count_u64_recorded(
3779        &self,
3780        e1: &CudaBuffer,
3781        e2: &CudaBuffer,
3782        e3: &CudaBuffer,
3783        e4: &CudaBuffer,
3784        block_work_unit: u32,
3785        launch_stream: StreamId,
3786    ) -> Result<CudaBuffer> {
3787        let ctx = "wcoj_4cycle_groupby_root_count_u64_recorded";
3788        // Layout-normalize per dispatch (sorted-fast-path clone when the
3789        // input is already lex-sorted + unique): the fused path must give
3790        // the same guarantee as the unfused pipeline instead of trusting
3791        // store-buffer sortedness — unsorted/duplicated inputs previously
3792        // produced silently wrong (empty) fused results.
3793        let e1 = &self.wcoj_layout_u64_recorded(e1, launch_stream)?;
3794        let e2 = &self.wcoj_layout_u64_recorded(e2, launch_stream)?;
3795        let e3 = &self.wcoj_layout_u64_recorded(e3, launch_stream)?;
3796        let e4 = &self.wcoj_layout_u64_recorded(e4, launch_stream)?;
3797        validate_binary_u64(ctx, "e1", e1)?;
3798        validate_binary_u64(ctx, "e2", e2)?;
3799        validate_binary_u64(ctx, "e3", e3)?;
3800        validate_binary_u64(ctx, "e4", e4)?;
3801        let plan = self.wcoj_4cycle_hg_work_plan_u64_recorded(
3802            e1,
3803            e2,
3804            e3,
3805            e4,
3806            block_work_unit,
3807            launch_stream,
3808        )?;
3809        let n_e1 = plan.row_count;
3810        let out_schema = Schema::new(vec![
3811            ("w".to_string(), ScalarType::U64),
3812            ("count".to_string(), ScalarType::U64),
3813        ]);
3814        if n_e1 == 0 || plan.total_work == 0 {
3815            return self.create_empty_buffer(out_schema);
3816        }
3817
3818        let runtime = self.memory().runtime().ok_or_else(|| {
3819            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
3820        })?;
3821        let cu_stream = runtime
3822            .stream_pool()
3823            .resolve(launch_stream)
3824            .ok_or_else(|| {
3825                XlogError::Kernel(format!(
3826                    "{ctx}: launch_stream StreamId({}) does not resolve",
3827                    launch_stream.0
3828                ))
3829            })?;
3830
3831        let e1_col0 = metadata_column_u64(e1, 0)?;
3832        let e1_col1 = metadata_column_u64(e1, 1)?;
3833        let e2_col1 = metadata_column_u64(e2, 1)?;
3834        let e3_col0 = metadata_column_u64(e3, 0)?;
3835        let e3_col1 = metadata_column_u64(e3, 1)?;
3836        let e4_col0 = metadata_column_u64(e4, 0)?;
3837        let e4_col1 = metadata_column_u64(e4, 1)?;
3838        let n_e3 = self.metadata_logical_rows(e3)?;
3839        let n_e4 = self.metadata_logical_rows(e4)?;
3840
3841        // Per-e1-row match counters, zero-initialized.
3842        let mut row_counts = self.memory().alloc::<u32>(n_e1 as usize)?;
3843        self.device()
3844            .inner()
3845            .memset_zeros(&mut row_counts)
3846            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row counts failed: {e}")))?;
3847
3848        let grid = plan.total_work.div_ceil(plan.block_work_unit);
3849        let mut rec = LaunchRecorder::new_strict(launch_stream);
3850        rec.read(e1.num_rows_device());
3851        rec.read(e2.num_rows_device());
3852        rec.read(e3.num_rows_device());
3853        rec.read(e4.num_rows_device());
3854        rec.read_column(e1.column(0).expect("e1.col0"));
3855        rec.read_column(e1.column(1).expect("e1.col1"));
3856        rec.read_column(e2.column(1).expect("e2.col1"));
3857        rec.read_column(e3.column(0).expect("e3.col0"));
3858        rec.read_column(e3.column(1).expect("e3.col1"));
3859        rec.read_column(e4.column(0).expect("e4.col0"));
3860        rec.read_column(e4.column(1).expect("e4.col1"));
3861        rec.read(&plan.e1_work_prefix);
3862        rec.read(&plan.e2_work_prefix);
3863        rec.read(&plan.e1_e2_start);
3864        rec.read(&plan.e1_e2_end);
3865        rec.write(&row_counts);
3866        rec.preflight(runtime)
3867            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
3868        {
3869            let kernel = self
3870                .device()
3871                .inner()
3872                .get_func(
3873                    WCOJ_MODULE,
3874                    wcoj_kernels::WCOJ_4CYCLE_GROUPBY_ROOT_COUNT_HG_U64,
3875                )
3876                .ok_or_else(|| {
3877                    XlogError::Kernel(
3878                        "wcoj_4cycle_groupby_root_count_hg_u64 kernel not found".to_string(),
3879                    )
3880                })?;
3881            let mut params: Vec<*mut c_void> = vec![
3882                e1_col0.as_kernel_param(),
3883                e1_col1.as_kernel_param(),
3884                plan.row_count.as_kernel_param(),
3885                e2_col1.as_kernel_param(),
3886                e3_col0.as_kernel_param(),
3887                e3_col1.as_kernel_param(),
3888                n_e3.as_kernel_param(),
3889                e4_col0.as_kernel_param(),
3890                e4_col1.as_kernel_param(),
3891                n_e4.as_kernel_param(),
3892                (&plan.e1_work_prefix).as_kernel_param(),
3893                (&plan.e2_work_prefix).as_kernel_param(),
3894                (&plan.e1_e2_start).as_kernel_param(),
3895                (&plan.e1_e2_end).as_kernel_param(),
3896                plan.total_work.as_kernel_param(),
3897                plan.block_work_unit.as_kernel_param(),
3898                (&row_counts).as_kernel_param(),
3899            ];
3900            unsafe {
3901                kernel
3902                    .clone()
3903                    .launch_on_stream(
3904                        &cu_stream,
3905                        LaunchConfig {
3906                            grid_dim: (grid, 1, 1),
3907                            block_dim: (BLOCK_SIZE, 1, 1),
3908                            shared_mem_bytes: 0,
3909                        },
3910                        &mut params,
3911                    )
3912                    .map_err(|e| {
3913                        XlogError::Kernel(format!("{ctx}: groupby-count launch failed: {e}"))
3914                    })?;
3915            }
3916        }
3917        rec.commit(runtime)
3918            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
3919
3920        // Per-W reduction via the relation metadata: one (unique W, group
3921        // start) pair per root; e1 is lex-sorted by W so group rows are
3922        // contiguous.
3923        let meta = self.wcoj_build_metadata_u64_recorded(e1, 0, launch_stream)?;
3924        let key_count = meta.key_count;
3925        if key_count == 0 {
3926            return self.create_empty_buffer(out_schema);
3927        }
3928        let mut sums = self
3929            .memory()
3930            .alloc::<u8>(key_count as usize * std::mem::size_of::<u64>())?;
3931        self.device()
3932            .inner()
3933            .memset_zeros(&mut sums)
3934            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero group sums failed: {e}")))?;
3935
3936        let mut rec_sum = LaunchRecorder::new_strict(launch_stream);
3937        rec_sum.read(&row_counts);
3938        rec_sum.read(&meta.prefix_sum);
3939        rec_sum.write(&sums);
3940        rec_sum
3941            .preflight(runtime)
3942            .map_err(|e| XlogError::Kernel(format!("{ctx}: reduce preflight failed: {e}")))?;
3943        {
3944            let kernel = self
3945                .device()
3946                .inner()
3947                .get_func(
3948                    WCOJ_MODULE,
3949                    wcoj_kernels::WCOJ_GROUPBY_ROOT_SEGMENT_SUM_COUNTS_U32,
3950                )
3951                .ok_or_else(|| {
3952                    XlogError::Kernel(
3953                        "wcoj_groupby_root_segment_sum_counts_u32 kernel not found".to_string(),
3954                    )
3955                })?;
3956            let reduce_grid = n_e1.div_ceil(BLOCK_SIZE);
3957            let mut params: Vec<*mut c_void> = vec![
3958                (&row_counts).as_kernel_param(),
3959                n_e1.as_kernel_param(),
3960                (&meta.prefix_sum).as_kernel_param(),
3961                key_count.as_kernel_param(),
3962                (&sums).as_kernel_param(),
3963            ];
3964            unsafe {
3965                kernel
3966                    .clone()
3967                    .launch_on_stream(
3968                        &cu_stream,
3969                        LaunchConfig {
3970                            grid_dim: (reduce_grid, 1, 1),
3971                            block_dim: (BLOCK_SIZE, 1, 1),
3972                            shared_mem_bytes: 0,
3973                        },
3974                        &mut params,
3975                    )
3976                    .map_err(|e| XlogError::Kernel(format!("{ctx}: reduce launch failed: {e}")))?;
3977            }
3978        }
3979        rec_sum
3980            .commit(runtime)
3981            .map_err(|e| XlogError::Kernel(format!("{ctx}: reduce commit failed: {e}")))?;
3982
3983        // (unique W, total) buffer over the key_count roots, then drop the
3984        // roots with no completion. The copies run on launch_stream and the
3985        // fresh destination blocks are registered through the strict
3986        // recorder BEFORE the enqueue — a raw async copy into a freshly
3987        // pool-allocated block without recording is a visibility race.
3988        let w_copy = self
3989            .memory()
3990            .alloc::<u8>(key_count as usize * std::mem::size_of::<u64>())?;
3991        let d_num_rows = self.memory().alloc::<u32>(1)?;
3992        let mut rec_copy = LaunchRecorder::new_strict(launch_stream);
3993        rec_copy.read(&meta.unique_keys);
3994        rec_copy.write(&w_copy);
3995        rec_copy.write(&d_num_rows);
3996        rec_copy
3997            .preflight(runtime)
3998            .map_err(|e| XlogError::Kernel(format!("{ctx}: copy preflight failed: {e}")))?;
3999        unsafe {
4000            let res = sys::cuMemcpyDtoDAsync_v2(
4001                *w_copy.device_ptr(),
4002                *meta.unique_keys.device_ptr(),
4003                key_count as usize * std::mem::size_of::<u64>(),
4004                cu_stream.cu_stream(),
4005            );
4006            if res != sys::cudaError_enum::CUDA_SUCCESS {
4007                return Err(XlogError::Kernel(format!(
4008                    "{ctx}: DtoD unique keys copy failed: {res:?}"
4009                )));
4010            }
4011        }
4012        self.htod_launch_metadata_async_copy_one(
4013            &key_count,
4014            &d_num_rows,
4015            &cu_stream,
4016            &format!("{ctx}: d_num_rows"),
4017        )?;
4018        rec_copy
4019            .commit(runtime)
4020            .map_err(|e| XlogError::Kernel(format!("{ctx}: copy commit failed: {e}")))?;
4021        cu_stream
4022            .synchronize()
4023            .map_err(|e| XlogError::Kernel(format!("{ctx}: stream sync failed: {e}")))?;
4024        let staging_schema = Schema::new(vec![
4025            ("w".to_string(), ScalarType::U64),
4026            ("count".to_string(), ScalarType::U64),
4027        ]);
4028        let staging = CudaBuffer::from_columns_with_host_count(
4029            vec![w_copy.into(), sums.into()],
4030            u64::from(key_count),
4031            d_num_rows,
4032            staging_schema,
4033            key_count,
4034        );
4035        let mask = self.compare_const_mask_recorded::<u64>(
4036            &staging,
4037            1,
4038            0u64,
4039            crate::CompareOp::Gt,
4040            launch_stream,
4041        )?;
4042        self.compact_buffer_by_device_mask_counted_recorded(&staging, &mask, launch_stream)
4043    }
4044
4045    pub fn wcoj_4cycle_hg_work_plan_u64_recorded(
4046        &self,
4047        e1: &CudaBuffer,
4048        e2: &CudaBuffer,
4049        e3: &CudaBuffer,
4050        e4: &CudaBuffer,
4051        block_work_unit: u32,
4052        launch_stream: StreamId,
4053    ) -> Result<WcojCycle4HgWorkPlanU64> {
4054        let ctx = "wcoj_4cycle_hg_work_plan_u64_recorded";
4055        if block_work_unit == 0 {
4056            return Err(XlogError::Kernel(format!(
4057                "{ctx}: block_work_unit must be nonzero"
4058            )));
4059        }
4060        validate_binary_u64(ctx, "e1", e1)?;
4061        validate_binary_u64(ctx, "e2", e2)?;
4062        validate_binary_u64(ctx, "e3", e3)?;
4063        validate_binary_u64(ctx, "e4", e4)?;
4064
4065        let n_e1 = self.metadata_logical_rows(e1)?;
4066        let n_e2 = self.metadata_logical_rows(e2)?;
4067        let n_e3 = self.metadata_logical_rows(e3)?;
4068        let prefix_len = n_e1
4069            .checked_add(1)
4070            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: prefix length overflow")))?;
4071        let e2_prefix_len = n_e2
4072            .checked_add(1)
4073            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: e2 prefix length overflow")))?;
4074        let mut e1_work_prefix = self.memory().alloc::<u32>(prefix_len as usize)?;
4075        let mut e2_work_prefix = self.memory().alloc::<u32>(e2_prefix_len as usize)?;
4076        let mut e1_e2_start = self.memory().alloc::<u32>(n_e1 as usize)?;
4077        let mut e1_e2_end = self.memory().alloc::<u32>(n_e1 as usize)?;
4078
4079        if n_e1 == 0 || n_e2 == 0 || n_e3 == 0 || self.metadata_logical_rows(e4)? == 0 {
4080            let block_counts = self.memory().alloc::<u32>(1)?;
4081            let block_offsets = self.memory().alloc::<u32>(1)?;
4082            return Ok(WcojCycle4HgWorkPlanU64 {
4083                e1_work_prefix,
4084                e2_work_prefix,
4085                e1_e2_start,
4086                e1_e2_end,
4087                block_counts,
4088                block_offsets,
4089                total_work: 0,
4090                block_work_unit,
4091                row_count: n_e1,
4092            });
4093        }
4094
4095        let runtime = self.memory().runtime().ok_or_else(|| {
4096            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
4097        })?;
4098        let cu_stream = runtime
4099            .stream_pool()
4100            .resolve(launch_stream)
4101            .ok_or_else(|| {
4102                XlogError::Kernel(format!(
4103                    "{ctx}: launch_stream StreamId({}) does not resolve",
4104                    launch_stream.0
4105                ))
4106            })?;
4107
4108        let e1_col1 = metadata_column_u64(e1, 1)?;
4109        let e2_col0 = metadata_column_u64(e2, 0)?;
4110        let e2_col1 = metadata_column_u64(e2, 1)?;
4111        let e3_col0 = metadata_column_u64(e3, 0)?;
4112
4113        let mut rec = LaunchRecorder::new_strict(launch_stream);
4114        rec.read(e1.num_rows_device());
4115        rec.read(e2.num_rows_device());
4116        rec.read(e3.num_rows_device());
4117        rec.read_column(e1.column(1).expect("e1.col1"));
4118        rec.read_column(e2.column(0).expect("e2.col0"));
4119        rec.read_column(e2.column(1).expect("e2.col1"));
4120        rec.read_column(e3.column(0).expect("e3.col0"));
4121        rec.read_write(&e2_work_prefix);
4122        rec.write(&e1_work_prefix);
4123        rec.write(&e1_e2_start);
4124        rec.write(&e1_e2_end);
4125        rec.preflight(runtime)
4126            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
4127
4128        let e2_kernel = self
4129            .device()
4130            .inner()
4131            .get_func(
4132                WCOJ_MODULE,
4133                wcoj_kernels::WCOJ_4CYCLE_BUILD_E2_WORK_PREFIX_U64,
4134            )
4135            .ok_or_else(|| {
4136                XlogError::Kernel(
4137                    "wcoj_4cycle_build_e2_work_prefix_u64 kernel not found".to_string(),
4138                )
4139            })?;
4140        let e2_grid = n_e2.div_ceil(BLOCK_SIZE);
4141        unsafe {
4142            e2_kernel
4143                .clone()
4144                .launch_on_stream(
4145                    &cu_stream,
4146                    LaunchConfig {
4147                        grid_dim: (e2_grid, 1, 1),
4148                        block_dim: (BLOCK_SIZE, 1, 1),
4149                        shared_mem_bytes: 0,
4150                    },
4151                    (e2_col1, n_e2, e3_col0, n_e3, &mut e2_work_prefix),
4152                )
4153                .map_err(|e| {
4154                    XlogError::Kernel(format!(
4155                        "wcoj_4cycle_build_e2_work_prefix_u64 launch failed: {e}"
4156                    ))
4157                })?;
4158        }
4159        self.multiblock_scan_u32_inplace_on_stream(
4160            &mut e2_work_prefix,
4161            e2_prefix_len,
4162            &cu_stream,
4163            launch_stream,
4164            runtime,
4165        )?;
4166
4167        let kernel = self
4168            .device()
4169            .inner()
4170            .get_func(
4171                WCOJ_MODULE,
4172                wcoj_kernels::WCOJ_4CYCLE_BUILD_HG_WORK_PLAN_U64,
4173            )
4174            .ok_or_else(|| {
4175                XlogError::Kernel("wcoj_4cycle_build_hg_work_plan_u64 kernel not found".to_string())
4176            })?;
4177        let grid = n_e1.div_ceil(BLOCK_SIZE);
4178        unsafe {
4179            kernel
4180                .clone()
4181                .launch_on_stream(
4182                    &cu_stream,
4183                    LaunchConfig {
4184                        grid_dim: (grid, 1, 1),
4185                        block_dim: (BLOCK_SIZE, 1, 1),
4186                        shared_mem_bytes: 0,
4187                    },
4188                    (
4189                        e1_col1,
4190                        n_e1,
4191                        e2_col0,
4192                        n_e2,
4193                        &e2_work_prefix,
4194                        &mut e1_work_prefix,
4195                        &mut e1_e2_start,
4196                        &mut e1_e2_end,
4197                    ),
4198                )
4199                .map_err(|e| {
4200                    XlogError::Kernel(format!(
4201                        "wcoj_4cycle_build_hg_work_plan_u64 launch failed: {e}"
4202                    ))
4203                })?;
4204        }
4205        self.multiblock_scan_u32_inplace_on_stream(
4206            &mut e1_work_prefix,
4207            prefix_len,
4208            &cu_stream,
4209            launch_stream,
4210            runtime,
4211        )?;
4212        rec.commit(runtime)
4213            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
4214        cu_stream
4215            .synchronize()
4216            .map_err(|e| XlogError::Kernel(format!("{ctx}: stream sync failed: {e}")))?;
4217        let total_work = self.dtoh_scalar_untracked::<u32>(&e1_work_prefix, n_e1 as usize)?;
4218        let grid = if total_work == 0 {
4219            1
4220        } else {
4221            total_work.div_ceil(block_work_unit)
4222        };
4223        let block_counts = self.memory().alloc::<u32>(grid as usize)?;
4224        let block_offsets = self.memory().alloc::<u32>(grid as usize)?;
4225
4226        Ok(WcojCycle4HgWorkPlanU64 {
4227            e1_work_prefix,
4228            e2_work_prefix,
4229            e1_e2_start,
4230            e1_e2_end,
4231            block_counts,
4232            block_offsets,
4233            total_work,
4234            block_work_unit,
4235            row_count: n_e1,
4236        })
4237    }
4238
4239    pub fn wcoj_4cycle_hg_u64_recorded(
4240        &self,
4241        e1: &CudaBuffer,
4242        e2: &CudaBuffer,
4243        e3: &CudaBuffer,
4244        e4: &CudaBuffer,
4245        block_work_unit: u32,
4246        launch_stream: StreamId,
4247    ) -> Result<CudaBuffer> {
4248        let ctx = "wcoj_4cycle_hg_u64_recorded";
4249        validate_binary_u64(ctx, "e1", e1)?;
4250        validate_binary_u64(ctx, "e2", e2)?;
4251        validate_binary_u64(ctx, "e3", e3)?;
4252        validate_binary_u64(ctx, "e4", e4)?;
4253        let plan = self.wcoj_4cycle_hg_work_plan_u64_recorded(
4254            e1,
4255            e2,
4256            e3,
4257            e4,
4258            block_work_unit,
4259            launch_stream,
4260        )?;
4261        let out_schema = Schema::new(vec![
4262            ("col0".to_string(), ScalarType::U64),
4263            ("col1".to_string(), ScalarType::U64),
4264            ("col2".to_string(), ScalarType::U64),
4265            ("col3".to_string(), ScalarType::U64),
4266        ]);
4267        if plan.total_work == 0 {
4268            return self.create_empty_buffer(out_schema);
4269        }
4270
4271        let grid = plan.total_work.div_ceil(plan.block_work_unit);
4272        let bytes_count = (grid as usize)
4273            .checked_mul(std::mem::size_of::<u32>())
4274            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: count byte size overflow")))?;
4275        let mut local_counts = None;
4276        let mut local_offsets = None;
4277        if grid > 1024 {
4278            local_counts = Some(self.memory().alloc::<u32>(grid as usize)?);
4279            local_offsets = Some(self.memory().alloc::<u32>(grid as usize)?);
4280        }
4281        let total_rows_device = self.memory().alloc::<u32>(1)?;
4282
4283        let runtime = self.memory().runtime().ok_or_else(|| {
4284            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
4285        })?;
4286        let cu_stream = runtime
4287            .stream_pool()
4288            .resolve(launch_stream)
4289            .ok_or_else(|| {
4290                XlogError::Kernel(format!(
4291                    "{ctx}: launch_stream StreamId({}) does not resolve",
4292                    launch_stream.0
4293                ))
4294            })?;
4295
4296        let e1_col0 = metadata_column_u64(e1, 0)?;
4297        let e1_col1 = metadata_column_u64(e1, 1)?;
4298        let e2_col1 = metadata_column_u64(e2, 1)?;
4299        let e3_col0 = metadata_column_u64(e3, 0)?;
4300        let e3_col1 = metadata_column_u64(e3, 1)?;
4301        let e4_col0 = metadata_column_u64(e4, 0)?;
4302        let e4_col1 = metadata_column_u64(e4, 1)?;
4303        let n_e3 = self.metadata_logical_rows(e3)?;
4304        let n_e4 = self.metadata_logical_rows(e4)?;
4305
4306        let count_u32 = if grid <= 1024 {
4307            &plan.block_counts
4308        } else {
4309            local_counts
4310                .as_ref()
4311                .expect("local HG counts allocated when grid exceeds single-block scan")
4312        };
4313        let mut rec_hg = LaunchRecorder::new_strict(launch_stream);
4314        rec_hg.read(e1.num_rows_device());
4315        rec_hg.read(e2.num_rows_device());
4316        rec_hg.read(e3.num_rows_device());
4317        rec_hg.read(e4.num_rows_device());
4318        rec_hg.read_column(e1.column(0).expect("e1.col0"));
4319        rec_hg.read_column(e1.column(1).expect("e1.col1"));
4320        rec_hg.read_column(e2.column(1).expect("e2.col1"));
4321        rec_hg.read_column(e3.column(0).expect("e3.col0"));
4322        rec_hg.read_column(e3.column(1).expect("e3.col1"));
4323        rec_hg.read_column(e4.column(0).expect("e4.col0"));
4324        rec_hg.read_column(e4.column(1).expect("e4.col1"));
4325        rec_hg.read(&plan.e1_work_prefix);
4326        rec_hg.read(&plan.e2_work_prefix);
4327        rec_hg.read(&plan.e1_e2_start);
4328        rec_hg.read(&plan.e1_e2_end);
4329        rec_hg.read_write(count_u32);
4330        if grid <= 1024 {
4331            rec_hg.read_write(&plan.block_offsets);
4332        } else {
4333            rec_hg.read_write(
4334                local_offsets
4335                    .as_ref()
4336                    .expect("local HG offsets allocated when grid exceeds single-block scan"),
4337            );
4338        }
4339        rec_hg.write(&total_rows_device);
4340        rec_hg
4341            .preflight(runtime)
4342            .map_err(|e| XlogError::Kernel(format!("{ctx}: HG preflight failed: {e}")))?;
4343        {
4344            let kernel = self
4345                .device()
4346                .inner()
4347                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_4CYCLE_COUNT_HG_U64)
4348                .ok_or_else(|| {
4349                    XlogError::Kernel("wcoj_4cycle_count_hg_u64 kernel not found".to_string())
4350                })?;
4351            let mut params: Vec<*mut c_void> = vec![
4352                e1_col0.as_kernel_param(),
4353                e1_col1.as_kernel_param(),
4354                plan.row_count.as_kernel_param(),
4355                e2_col1.as_kernel_param(),
4356                e3_col0.as_kernel_param(),
4357                e3_col1.as_kernel_param(),
4358                n_e3.as_kernel_param(),
4359                e4_col0.as_kernel_param(),
4360                e4_col1.as_kernel_param(),
4361                n_e4.as_kernel_param(),
4362                (&plan.e1_work_prefix).as_kernel_param(),
4363                (&plan.e2_work_prefix).as_kernel_param(),
4364                (&plan.e1_e2_start).as_kernel_param(),
4365                (&plan.e1_e2_end).as_kernel_param(),
4366                plan.total_work.as_kernel_param(),
4367                plan.block_work_unit.as_kernel_param(),
4368                count_u32.as_kernel_param(),
4369            ];
4370            unsafe {
4371                kernel
4372                    .clone()
4373                    .launch_on_stream(
4374                        &cu_stream,
4375                        LaunchConfig {
4376                            grid_dim: (grid, 1, 1),
4377                            block_dim: (BLOCK_SIZE, 1, 1),
4378                            shared_mem_bytes: 0,
4379                        },
4380                        &mut params,
4381                    )
4382                    .map_err(|e| XlogError::Kernel(format!("{ctx}: count launch failed: {e}")))?;
4383            }
4384        }
4385        if grid <= 1024 {
4386            let kernel = self
4387                .device()
4388                .inner()
4389                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_SCAN_HG_BLOCK_COUNTS_U32)
4390                .ok_or_else(|| {
4391                    XlogError::Kernel("wcoj_scan_hg_block_counts_u32 kernel not found".to_string())
4392                })?;
4393            let mut params: Vec<*mut c_void> = vec![
4394                count_u32.as_kernel_param(),
4395                grid.as_kernel_param(),
4396                (&plan.block_offsets).as_kernel_param(),
4397                (&total_rows_device).as_kernel_param(),
4398            ];
4399            unsafe {
4400                kernel
4401                    .clone()
4402                    .launch_on_stream(
4403                        &cu_stream,
4404                        LaunchConfig {
4405                            grid_dim: (1, 1, 1),
4406                            block_dim: (1024, 1, 1),
4407                            shared_mem_bytes: 0,
4408                        },
4409                        &mut params,
4410                    )
4411                    .map_err(|e| XlogError::Kernel(format!("{ctx}: scan failed: {e}")))?;
4412            }
4413        } else {
4414            let offsets_mut = local_offsets
4415                .as_mut()
4416                .expect("local HG offsets allocated when grid exceeds single-block scan");
4417            unsafe {
4418                let res = sys::cuMemcpyDtoDAsync_v2(
4419                    *offsets_mut.device_ptr(),
4420                    *count_u32.device_ptr(),
4421                    bytes_count,
4422                    cu_stream.cu_stream(),
4423                );
4424                if res != sys::cudaError_enum::CUDA_SUCCESS {
4425                    return Err(XlogError::Kernel(format!(
4426                        "{ctx}: DtoD count to offsets failed: {res:?}"
4427                    )));
4428                }
4429            }
4430            self.multiblock_scan_u32_inplace_on_stream(
4431                offsets_mut,
4432                grid,
4433                &cu_stream,
4434                launch_stream,
4435                runtime,
4436            )?;
4437            let total_kernel = self
4438                .device()
4439                .inner()
4440                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_COMPUTE_TOTAL)
4441                .ok_or_else(|| {
4442                    XlogError::Kernel("wcoj_compute_total kernel not found".to_string())
4443                })?;
4444            let mut params: Vec<*mut c_void> = vec![
4445                count_u32.as_kernel_param(),
4446                (&*offsets_mut).as_kernel_param(),
4447                grid.as_kernel_param(),
4448                (&total_rows_device).as_kernel_param(),
4449            ];
4450            unsafe {
4451                total_kernel
4452                    .clone()
4453                    .launch_on_stream(
4454                        &cu_stream,
4455                        LaunchConfig {
4456                            grid_dim: (1, 1, 1),
4457                            block_dim: (1, 1, 1),
4458                            shared_mem_bytes: 0,
4459                        },
4460                        &mut params,
4461                    )
4462                    .map_err(|e| XlogError::Kernel(format!("{ctx}: total failed: {e}")))?;
4463            }
4464        }
4465        rec_hg
4466            .commit(runtime)
4467            .map_err(|e| XlogError::Kernel(format!("{ctx}: count commit failed: {e}")))?;
4468        cu_stream
4469            .synchronize()
4470            .map_err(|e| XlogError::Kernel(format!("{ctx}: count stream sync failed: {e}")))?;
4471        let total_rows = self
4472            .dtoh_scalar_untracked::<u32>(&total_rows_device, 0)
4473            .map_err(|e| XlogError::Kernel(format!("{ctx}: read total rows failed: {e}")))?;
4474        if total_rows == 0 {
4475            return self.create_empty_buffer(out_schema);
4476        }
4477
4478        let bytes_per_col = (total_rows as usize)
4479            .checked_mul(std::mem::size_of::<u64>())
4480            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: output byte size overflow")))?;
4481        let mut out_w = self.memory().alloc::<u8>(bytes_per_col)?;
4482        let mut out_x = self.memory().alloc::<u8>(bytes_per_col)?;
4483        let mut out_y = self.memory().alloc::<u8>(bytes_per_col)?;
4484        let mut out_z = self.memory().alloc::<u8>(bytes_per_col)?;
4485        let materialize_offsets = if grid <= 1024 {
4486            &plan.block_offsets
4487        } else {
4488            local_offsets
4489                .as_ref()
4490                .expect("local HG offsets allocated when grid exceeds single-block scan")
4491        };
4492        let mut rec_mat = LaunchRecorder::new_strict(launch_stream);
4493        rec_mat.read(materialize_offsets);
4494        rec_mat.read(e1.num_rows_device());
4495        rec_mat.read(e2.num_rows_device());
4496        rec_mat.read(e3.num_rows_device());
4497        rec_mat.read(e4.num_rows_device());
4498        rec_mat.read_column(e1.column(0).expect("e1.col0"));
4499        rec_mat.read_column(e1.column(1).expect("e1.col1"));
4500        rec_mat.read_column(e2.column(1).expect("e2.col1"));
4501        rec_mat.read_column(e3.column(0).expect("e3.col0"));
4502        rec_mat.read_column(e3.column(1).expect("e3.col1"));
4503        rec_mat.read_column(e4.column(0).expect("e4.col0"));
4504        rec_mat.read_column(e4.column(1).expect("e4.col1"));
4505        rec_mat.read(&plan.e1_work_prefix);
4506        rec_mat.read(&plan.e2_work_prefix);
4507        rec_mat.read(&plan.e1_e2_start);
4508        rec_mat.read(&plan.e1_e2_end);
4509        rec_mat.write(&out_w);
4510        rec_mat.write(&out_x);
4511        rec_mat.write(&out_y);
4512        rec_mat.write(&out_z);
4513        rec_mat
4514            .preflight(runtime)
4515            .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize preflight failed: {e}")))?;
4516        {
4517            let kernel = self
4518                .device()
4519                .inner()
4520                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_4CYCLE_MATERIALIZE_HG_U64)
4521                .ok_or_else(|| {
4522                    XlogError::Kernel("wcoj_4cycle_materialize_hg_u64 kernel not found".to_string())
4523                })?;
4524            let out_w_u64 = unsafe { reinterpret_u8_as_u64(&mut out_w) };
4525            let out_x_u64 = unsafe { reinterpret_u8_as_u64(&mut out_x) };
4526            let out_y_u64 = unsafe { reinterpret_u8_as_u64(&mut out_y) };
4527            let out_z_u64 = unsafe { reinterpret_u8_as_u64(&mut out_z) };
4528            let mut params: Vec<*mut c_void> = vec![
4529                e1_col0.as_kernel_param(),
4530                e1_col1.as_kernel_param(),
4531                plan.row_count.as_kernel_param(),
4532                e2_col1.as_kernel_param(),
4533                e3_col0.as_kernel_param(),
4534                e3_col1.as_kernel_param(),
4535                n_e3.as_kernel_param(),
4536                e4_col0.as_kernel_param(),
4537                e4_col1.as_kernel_param(),
4538                n_e4.as_kernel_param(),
4539                (&plan.e1_work_prefix).as_kernel_param(),
4540                (&plan.e2_work_prefix).as_kernel_param(),
4541                (&plan.e1_e2_start).as_kernel_param(),
4542                (&plan.e1_e2_end).as_kernel_param(),
4543                plan.total_work.as_kernel_param(),
4544                plan.block_work_unit.as_kernel_param(),
4545                materialize_offsets.as_kernel_param(),
4546                total_rows.as_kernel_param(),
4547                out_w_u64.as_kernel_param(),
4548                out_x_u64.as_kernel_param(),
4549                out_y_u64.as_kernel_param(),
4550                out_z_u64.as_kernel_param(),
4551            ];
4552            unsafe {
4553                kernel
4554                    .clone()
4555                    .launch_on_stream(
4556                        &cu_stream,
4557                        LaunchConfig {
4558                            grid_dim: (grid, 1, 1),
4559                            block_dim: (BLOCK_SIZE, 1, 1),
4560                            shared_mem_bytes: 0,
4561                        },
4562                        &mut params,
4563                    )
4564                    .map_err(|e| {
4565                        XlogError::Kernel(format!("{ctx}: materialize launch failed: {e}"))
4566                    })?;
4567            }
4568        }
4569        rec_mat
4570            .commit(runtime)
4571            .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize commit failed: {e}")))?;
4572        cu_stream.synchronize().map_err(|e| {
4573            XlogError::Kernel(format!("{ctx}: materialize stream sync failed: {e}"))
4574        })?;
4575
4576        Ok(CudaBuffer::from_columns_with_host_count(
4577            vec![out_w.into(), out_x.into(), out_y.into(), out_z.into()],
4578            total_rows as u64,
4579            total_rows_device,
4580            out_schema,
4581            total_rows,
4582        ))
4583    }
4584
4585    fn validate_metadata_column(
4586        &self,
4587        input: &CudaBuffer,
4588        key_col_idx: usize,
4589        width: MetadataWidth,
4590    ) -> Result<()> {
4591        if key_col_idx >= input.arity() {
4592            return Err(XlogError::Kernel(format!(
4593                "wcoj_build_metadata: key column {} out of range for arity {}",
4594                key_col_idx,
4595                input.arity()
4596            )));
4597        }
4598        let ty = input.schema().column_type(key_col_idx).ok_or_else(|| {
4599            XlogError::Kernel(format!(
4600                "wcoj_build_metadata: column {} type missing",
4601                key_col_idx
4602            ))
4603        })?;
4604        match (width, ty) {
4605            (MetadataWidth::U32, ScalarType::U32 | ScalarType::Symbol)
4606            | (MetadataWidth::U64, ScalarType::U64) => Ok(()),
4607            (MetadataWidth::U32, other) => Err(XlogError::Kernel(format!(
4608                "wcoj_build_metadata_u32_recorded: column {} must be U32 or Symbol, got {:?}",
4609                key_col_idx, other
4610            ))),
4611            (MetadataWidth::U64, other) => Err(XlogError::Kernel(format!(
4612                "wcoj_build_metadata_u64_recorded: column {} must be U64, got {:?}",
4613                key_col_idx, other
4614            ))),
4615        }
4616    }
4617
4618    fn build_metadata_u32_from_column(
4619        &self,
4620        input: &CudaBuffer,
4621        key_col_idx: usize,
4622        keys: &TrackedCudaSlice<u32>,
4623        launch_stream: StreamId,
4624    ) -> Result<WcojRelationMetadata<u32>> {
4625        let n = self.metadata_logical_rows(input)?;
4626        if n == 0 {
4627            return Ok(WcojRelationMetadata {
4628                unique_keys: self.memory().alloc::<u32>(0)?,
4629                fan_out: self.memory().alloc::<u32>(0)?,
4630                prefix_sum: self.memory().alloc::<u32>(0)?,
4631                per_candidate_root: BTreeMap::new(),
4632                total: 0,
4633                key_count: 0,
4634                row_count: 0,
4635            });
4636        }
4637        let mut boundary_mask = self.memory().alloc::<u32>(n as usize)?;
4638        let mut boundary_prefix = self.memory().alloc::<u32>(n as usize)?;
4639        self.mark_metadata_boundaries_u32(
4640            input,
4641            key_col_idx,
4642            keys,
4643            n,
4644            &mut boundary_mask,
4645            &mut boundary_prefix,
4646            launch_stream,
4647        )?;
4648        let key_count = self.metadata_scanned_count(&boundary_mask, &boundary_prefix, n)?;
4649
4650        let mut unique_keys = self.memory().alloc::<u32>(key_count as usize)?;
4651        let mut fan_out = self.memory().alloc::<u32>(key_count as usize)?;
4652        let mut prefix_sum = self.memory().alloc::<u32>(key_count as usize)?;
4653        self.scatter_metadata_u32(
4654            input,
4655            key_col_idx,
4656            keys,
4657            n,
4658            &boundary_mask,
4659            &boundary_prefix,
4660            &mut unique_keys,
4661            &mut fan_out,
4662            &mut prefix_sum,
4663            launch_stream,
4664        )?;
4665
4666        Ok(WcojRelationMetadata {
4667            unique_keys,
4668            fan_out,
4669            prefix_sum,
4670            per_candidate_root: BTreeMap::new(),
4671            total: u64::from(n),
4672            key_count,
4673            row_count: n,
4674        })
4675    }
4676
4677    fn build_metadata_u64_from_column(
4678        &self,
4679        input: &CudaBuffer,
4680        key_col_idx: usize,
4681        keys: &TrackedCudaSlice<u64>,
4682        launch_stream: StreamId,
4683    ) -> Result<WcojRelationMetadata<u64>> {
4684        let n = self.metadata_logical_rows(input)?;
4685        if n == 0 {
4686            return Ok(WcojRelationMetadata {
4687                unique_keys: self.memory().alloc::<u64>(0)?,
4688                fan_out: self.memory().alloc::<u32>(0)?,
4689                prefix_sum: self.memory().alloc::<u32>(0)?,
4690                per_candidate_root: BTreeMap::new(),
4691                total: 0,
4692                key_count: 0,
4693                row_count: 0,
4694            });
4695        }
4696        let mut boundary_mask = self.memory().alloc::<u32>(n as usize)?;
4697        let mut boundary_prefix = self.memory().alloc::<u32>(n as usize)?;
4698        self.mark_metadata_boundaries_u64(
4699            input,
4700            key_col_idx,
4701            keys,
4702            n,
4703            &mut boundary_mask,
4704            &mut boundary_prefix,
4705            launch_stream,
4706        )?;
4707        let key_count = self.metadata_scanned_count(&boundary_mask, &boundary_prefix, n)?;
4708
4709        let mut unique_keys = self.memory().alloc::<u64>(key_count as usize)?;
4710        let mut fan_out = self.memory().alloc::<u32>(key_count as usize)?;
4711        let mut prefix_sum = self.memory().alloc::<u32>(key_count as usize)?;
4712        self.scatter_metadata_u64(
4713            input,
4714            key_col_idx,
4715            keys,
4716            n,
4717            &boundary_mask,
4718            &boundary_prefix,
4719            &mut unique_keys,
4720            &mut fan_out,
4721            &mut prefix_sum,
4722            launch_stream,
4723        )?;
4724
4725        Ok(WcojRelationMetadata {
4726            unique_keys,
4727            fan_out,
4728            prefix_sum,
4729            per_candidate_root: BTreeMap::new(),
4730            total: u64::from(n),
4731            key_count,
4732            row_count: n,
4733        })
4734    }
4735
4736    #[allow(clippy::too_many_arguments)]
4737    fn mark_metadata_boundaries_u32(
4738        &self,
4739        input: &CudaBuffer,
4740        key_col_idx: usize,
4741        keys: &TrackedCudaSlice<u32>,
4742        n: u32,
4743        boundary_mask: &mut TrackedCudaSlice<u32>,
4744        boundary_prefix: &mut TrackedCudaSlice<u32>,
4745        launch_stream: StreamId,
4746    ) -> Result<()> {
4747        let runtime = self.memory().runtime().ok_or_else(|| {
4748            XlogError::Kernel(
4749                "wcoj_build_metadata_u32_recorded requires a runtime-backed GpuMemoryManager"
4750                    .to_string(),
4751            )
4752        })?;
4753        let cu_stream = runtime
4754            .stream_pool()
4755            .resolve(launch_stream)
4756            .ok_or_else(|| {
4757                XlogError::Kernel(format!(
4758                    "wcoj_build_metadata_u32_recorded: launch_stream StreamId({}) does not resolve",
4759                    launch_stream.0
4760                ))
4761            })?;
4762        let mut rec = LaunchRecorder::new_strict(launch_stream);
4763        rec.read(input.num_rows_device());
4764        rec.read_column(input.column(key_col_idx).expect("metadata key column"));
4765        rec.write(boundary_mask);
4766        rec.write(boundary_prefix);
4767        rec.preflight(runtime).map_err(|e| {
4768            XlogError::Kernel(format!(
4769                "wcoj_build_metadata_u32_recorded: mark preflight failed: {e}"
4770            ))
4771        })?;
4772
4773        let kernel = self
4774            .device()
4775            .inner()
4776            .get_func(
4777                WCOJ_MODULE,
4778                wcoj_kernels::WCOJ_BUILD_METADATA_MARK_BOUNDARIES_U32,
4779            )
4780            .ok_or_else(|| {
4781                XlogError::Kernel(
4782                    "wcoj_build_metadata_mark_boundaries_u32 kernel not found".to_string(),
4783                )
4784            })?;
4785        let grid = n.div_ceil(BLOCK_SIZE);
4786        unsafe {
4787            kernel
4788                .clone()
4789                .launch_on_stream(
4790                    &cu_stream,
4791                    LaunchConfig {
4792                        grid_dim: (grid, 1, 1),
4793                        block_dim: (BLOCK_SIZE, 1, 1),
4794                        shared_mem_bytes: 0,
4795                    },
4796                    (keys, n, &mut *boundary_mask, &mut *boundary_prefix),
4797                )
4798                .map_err(|e| {
4799                    XlogError::Kernel(format!(
4800                        "wcoj_build_metadata_mark_boundaries_u32 launch failed: {e}"
4801                    ))
4802                })?;
4803        }
4804        self.multiblock_scan_u32_inplace_on_stream(
4805            boundary_prefix,
4806            n,
4807            &cu_stream,
4808            launch_stream,
4809            runtime,
4810        )?;
4811        rec.commit(runtime).map_err(|e| {
4812            XlogError::Kernel(format!(
4813                "wcoj_build_metadata_u32_recorded: mark commit failed: {e}"
4814            ))
4815        })?;
4816        cu_stream.synchronize().map_err(|e| {
4817            XlogError::Kernel(format!(
4818                "wcoj_build_metadata_u32_recorded: mark stream sync failed: {e}"
4819            ))
4820        })?;
4821        Ok(())
4822    }
4823
4824    #[allow(clippy::too_many_arguments)]
4825    fn mark_metadata_boundaries_u64(
4826        &self,
4827        input: &CudaBuffer,
4828        key_col_idx: usize,
4829        keys: &TrackedCudaSlice<u64>,
4830        n: u32,
4831        boundary_mask: &mut TrackedCudaSlice<u32>,
4832        boundary_prefix: &mut TrackedCudaSlice<u32>,
4833        launch_stream: StreamId,
4834    ) -> Result<()> {
4835        let runtime = self.memory().runtime().ok_or_else(|| {
4836            XlogError::Kernel(
4837                "wcoj_build_metadata_u64_recorded requires a runtime-backed GpuMemoryManager"
4838                    .to_string(),
4839            )
4840        })?;
4841        let cu_stream = runtime
4842            .stream_pool()
4843            .resolve(launch_stream)
4844            .ok_or_else(|| {
4845                XlogError::Kernel(format!(
4846                    "wcoj_build_metadata_u64_recorded: launch_stream StreamId({}) does not resolve",
4847                    launch_stream.0
4848                ))
4849            })?;
4850        let mut rec = LaunchRecorder::new_strict(launch_stream);
4851        rec.read(input.num_rows_device());
4852        rec.read_column(input.column(key_col_idx).expect("metadata key column"));
4853        rec.write(boundary_mask);
4854        rec.write(boundary_prefix);
4855        rec.preflight(runtime).map_err(|e| {
4856            XlogError::Kernel(format!(
4857                "wcoj_build_metadata_u64_recorded: mark preflight failed: {e}"
4858            ))
4859        })?;
4860
4861        let kernel = self
4862            .device()
4863            .inner()
4864            .get_func(
4865                WCOJ_MODULE,
4866                wcoj_kernels::WCOJ_BUILD_METADATA_MARK_BOUNDARIES_U64,
4867            )
4868            .ok_or_else(|| {
4869                XlogError::Kernel(
4870                    "wcoj_build_metadata_mark_boundaries_u64 kernel not found".to_string(),
4871                )
4872            })?;
4873        let grid = n.div_ceil(BLOCK_SIZE);
4874        unsafe {
4875            kernel
4876                .clone()
4877                .launch_on_stream(
4878                    &cu_stream,
4879                    LaunchConfig {
4880                        grid_dim: (grid, 1, 1),
4881                        block_dim: (BLOCK_SIZE, 1, 1),
4882                        shared_mem_bytes: 0,
4883                    },
4884                    (keys, n, &mut *boundary_mask, &mut *boundary_prefix),
4885                )
4886                .map_err(|e| {
4887                    XlogError::Kernel(format!(
4888                        "wcoj_build_metadata_mark_boundaries_u64 launch failed: {e}"
4889                    ))
4890                })?;
4891        }
4892        self.multiblock_scan_u32_inplace_on_stream(
4893            boundary_prefix,
4894            n,
4895            &cu_stream,
4896            launch_stream,
4897            runtime,
4898        )?;
4899        rec.commit(runtime).map_err(|e| {
4900            XlogError::Kernel(format!(
4901                "wcoj_build_metadata_u64_recorded: mark commit failed: {e}"
4902            ))
4903        })?;
4904        cu_stream.synchronize().map_err(|e| {
4905            XlogError::Kernel(format!(
4906                "wcoj_build_metadata_u64_recorded: mark stream sync failed: {e}"
4907            ))
4908        })?;
4909        Ok(())
4910    }
4911
4912    #[allow(clippy::too_many_arguments)]
4913    fn scatter_metadata_u32(
4914        &self,
4915        input: &CudaBuffer,
4916        key_col_idx: usize,
4917        keys: &TrackedCudaSlice<u32>,
4918        n: u32,
4919        boundary_mask: &TrackedCudaSlice<u32>,
4920        boundary_prefix: &TrackedCudaSlice<u32>,
4921        unique_keys: &mut TrackedCudaSlice<u32>,
4922        fan_out: &mut TrackedCudaSlice<u32>,
4923        prefix_sum: &mut TrackedCudaSlice<u32>,
4924        launch_stream: StreamId,
4925    ) -> Result<()> {
4926        let runtime = self.memory().runtime().ok_or_else(|| {
4927            XlogError::Kernel(
4928                "wcoj_build_metadata_u32_recorded requires a runtime-backed GpuMemoryManager"
4929                    .to_string(),
4930            )
4931        })?;
4932        let cu_stream = runtime
4933            .stream_pool()
4934            .resolve(launch_stream)
4935            .ok_or_else(|| {
4936                XlogError::Kernel(format!(
4937                    "wcoj_build_metadata_u32_recorded: launch_stream StreamId({}) does not resolve",
4938                    launch_stream.0
4939                ))
4940            })?;
4941        let mut rec = LaunchRecorder::new_strict(launch_stream);
4942        rec.read(input.num_rows_device());
4943        rec.read_column(input.column(key_col_idx).expect("metadata key column"));
4944        rec.read(boundary_mask);
4945        rec.read(boundary_prefix);
4946        rec.write(unique_keys);
4947        rec.write(fan_out);
4948        rec.write(prefix_sum);
4949        rec.preflight(runtime).map_err(|e| {
4950            XlogError::Kernel(format!(
4951                "wcoj_build_metadata_u32_recorded: scatter preflight failed: {e}"
4952            ))
4953        })?;
4954
4955        let kernel = self
4956            .device()
4957            .inner()
4958            .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_BUILD_METADATA_SCATTER_U32)
4959            .ok_or_else(|| {
4960                XlogError::Kernel("wcoj_build_metadata_scatter_u32 kernel not found".to_string())
4961            })?;
4962        let grid = n.div_ceil(BLOCK_SIZE);
4963        unsafe {
4964            kernel
4965                .clone()
4966                .launch_on_stream(
4967                    &cu_stream,
4968                    LaunchConfig {
4969                        grid_dim: (grid, 1, 1),
4970                        block_dim: (BLOCK_SIZE, 1, 1),
4971                        shared_mem_bytes: 0,
4972                    },
4973                    (
4974                        keys,
4975                        n,
4976                        boundary_mask,
4977                        boundary_prefix,
4978                        &mut *unique_keys,
4979                        &mut *fan_out,
4980                        &mut *prefix_sum,
4981                    ),
4982                )
4983                .map_err(|e| {
4984                    XlogError::Kernel(format!(
4985                        "wcoj_build_metadata_scatter_u32 launch failed: {e}"
4986                    ))
4987                })?;
4988        }
4989        rec.commit(runtime).map_err(|e| {
4990            XlogError::Kernel(format!(
4991                "wcoj_build_metadata_u32_recorded: scatter commit failed: {e}"
4992            ))
4993        })?;
4994        cu_stream.synchronize().map_err(|e| {
4995            XlogError::Kernel(format!(
4996                "wcoj_build_metadata_u32_recorded: scatter stream sync failed: {e}"
4997            ))
4998        })?;
4999        Ok(())
5000    }
5001
5002    #[allow(clippy::too_many_arguments)]
5003    fn scatter_metadata_u64(
5004        &self,
5005        input: &CudaBuffer,
5006        key_col_idx: usize,
5007        keys: &TrackedCudaSlice<u64>,
5008        n: u32,
5009        boundary_mask: &TrackedCudaSlice<u32>,
5010        boundary_prefix: &TrackedCudaSlice<u32>,
5011        unique_keys: &mut TrackedCudaSlice<u64>,
5012        fan_out: &mut TrackedCudaSlice<u32>,
5013        prefix_sum: &mut TrackedCudaSlice<u32>,
5014        launch_stream: StreamId,
5015    ) -> Result<()> {
5016        let runtime = self.memory().runtime().ok_or_else(|| {
5017            XlogError::Kernel(
5018                "wcoj_build_metadata_u64_recorded requires a runtime-backed GpuMemoryManager"
5019                    .to_string(),
5020            )
5021        })?;
5022        let cu_stream = runtime
5023            .stream_pool()
5024            .resolve(launch_stream)
5025            .ok_or_else(|| {
5026                XlogError::Kernel(format!(
5027                    "wcoj_build_metadata_u64_recorded: launch_stream StreamId({}) does not resolve",
5028                    launch_stream.0
5029                ))
5030            })?;
5031        let mut rec = LaunchRecorder::new_strict(launch_stream);
5032        rec.read(input.num_rows_device());
5033        rec.read_column(input.column(key_col_idx).expect("metadata key column"));
5034        rec.read(boundary_mask);
5035        rec.read(boundary_prefix);
5036        rec.write(unique_keys);
5037        rec.write(fan_out);
5038        rec.write(prefix_sum);
5039        rec.preflight(runtime).map_err(|e| {
5040            XlogError::Kernel(format!(
5041                "wcoj_build_metadata_u64_recorded: scatter preflight failed: {e}"
5042            ))
5043        })?;
5044
5045        let kernel = self
5046            .device()
5047            .inner()
5048            .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_BUILD_METADATA_SCATTER_U64)
5049            .ok_or_else(|| {
5050                XlogError::Kernel("wcoj_build_metadata_scatter_u64 kernel not found".to_string())
5051            })?;
5052        let grid = n.div_ceil(BLOCK_SIZE);
5053        unsafe {
5054            kernel
5055                .clone()
5056                .launch_on_stream(
5057                    &cu_stream,
5058                    LaunchConfig {
5059                        grid_dim: (grid, 1, 1),
5060                        block_dim: (BLOCK_SIZE, 1, 1),
5061                        shared_mem_bytes: 0,
5062                    },
5063                    (
5064                        keys,
5065                        n,
5066                        boundary_mask,
5067                        boundary_prefix,
5068                        &mut *unique_keys,
5069                        &mut *fan_out,
5070                        &mut *prefix_sum,
5071                    ),
5072                )
5073                .map_err(|e| {
5074                    XlogError::Kernel(format!(
5075                        "wcoj_build_metadata_scatter_u64 launch failed: {e}"
5076                    ))
5077                })?;
5078        }
5079        rec.commit(runtime).map_err(|e| {
5080            XlogError::Kernel(format!(
5081                "wcoj_build_metadata_u64_recorded: scatter commit failed: {e}"
5082            ))
5083        })?;
5084        cu_stream.synchronize().map_err(|e| {
5085            XlogError::Kernel(format!(
5086                "wcoj_build_metadata_u64_recorded: scatter stream sync failed: {e}"
5087            ))
5088        })?;
5089        Ok(())
5090    }
5091
5092    fn metadata_scanned_count(
5093        &self,
5094        boundary_mask: &TrackedCudaSlice<u32>,
5095        boundary_prefix: &TrackedCudaSlice<u32>,
5096        n: u32,
5097    ) -> Result<u32> {
5098        let last = n - 1;
5099        let prefix_last = self.dtoh_scalar_untracked::<u32>(boundary_prefix, last as usize)?;
5100        let mask_last = self.dtoh_scalar_untracked::<u32>(boundary_mask, last as usize)?;
5101        Ok(prefix_last + mask_last)
5102    }
5103
5104    fn metadata_logical_rows(&self, input: &CudaBuffer) -> Result<u32> {
5105        if let Some(cached) = input.cached_row_count() {
5106            return Ok(cached);
5107        }
5108        self.dtoh_scalar_untracked::<u32>(input.num_rows_device(), 0)
5109    }
5110}
5111
/// Key width handed to `validate_metadata_column` by the metadata build entry points.
#[derive(Copy, Clone)]
enum MetadataWidth {
    /// Passed by `wcoj_build_metadata_u32_recorded`.
    U32,
    /// Passed by `wcoj_build_metadata_u64_recorded`.
    U64,
}
5117
5118fn metadata_column_u32(input: &CudaBuffer, key_col_idx: usize) -> Result<&TrackedCudaSlice<u32>> {
5119    let col = input.column(key_col_idx).ok_or_else(|| {
5120        XlogError::Kernel(format!(
5121            "wcoj_build_metadata_u32_recorded: column {} not found",
5122            key_col_idx
5123        ))
5124    })?;
5125    match col {
5126        CudaColumn::Owned(slice) => unsafe {
5127            Ok(&*(slice as *const TrackedCudaSlice<u8> as *const TrackedCudaSlice<u32>))
5128        },
5129        _ => Err(XlogError::Kernel(
5130            "wcoj_build_metadata_u32_recorded: key column must be an owned CudaColumn".to_string(),
5131        )),
5132    }
5133}
5134
5135fn metadata_column_u64(input: &CudaBuffer, key_col_idx: usize) -> Result<&TrackedCudaSlice<u64>> {
5136    let col = input.column(key_col_idx).ok_or_else(|| {
5137        XlogError::Kernel(format!(
5138            "wcoj_build_metadata_u64_recorded: column {} not found",
5139            key_col_idx
5140        ))
5141    })?;
5142    match col {
5143        CudaColumn::Owned(slice) => unsafe {
5144            Ok(&*(slice as *const TrackedCudaSlice<u8> as *const TrackedCudaSlice<u64>))
5145        },
5146        _ => Err(XlogError::Kernel(
5147            "wcoj_build_metadata_u64_recorded: key column must be an owned CudaColumn".to_string(),
5148        )),
5149    }
5150}
5151
5152fn validate_binary_u32(ctx: &str, label: &str, input: &CudaBuffer) -> Result<()> {
5153    if input.arity() != 2 {
5154        return Err(XlogError::Kernel(format!(
5155            "{ctx}: {label} must be 2-column, got arity {}",
5156            input.arity()
5157        )));
5158    }
5159    for col_idx in 0..2 {
5160        let ty = input.schema().column_type(col_idx).ok_or_else(|| {
5161            XlogError::Kernel(format!("{ctx}: {label}.col{col_idx} type missing"))
5162        })?;
5163        if !matches!(ty, ScalarType::U32 | ScalarType::Symbol) {
5164            return Err(XlogError::Kernel(format!(
5165                "{ctx}: {label}.col{col_idx} must be U32 or Symbol, got {:?}",
5166                ty
5167            )));
5168        }
5169    }
5170    Ok(())
5171}
5172
5173fn validate_binary_u64(ctx: &str, label: &str, input: &CudaBuffer) -> Result<()> {
5174    if input.arity() != 2 {
5175        return Err(XlogError::Kernel(format!(
5176            "{ctx}: {label} must be 2-column, got arity {}",
5177            input.arity()
5178        )));
5179    }
5180    for col_idx in 0..2 {
5181        let ty = input.schema().column_type(col_idx).ok_or_else(|| {
5182            XlogError::Kernel(format!("{ctx}: {label}.col{col_idx} type missing"))
5183        })?;
5184        if !matches!(ty, ScalarType::U64) {
5185            return Err(XlogError::Kernel(format!(
5186                "{ctx}: {label}.col{col_idx} must be U64, got {:?}",
5187                ty
5188            )));
5189        }
5190    }
5191    Ok(())
5192}
5193
5194unsafe fn reinterpret_u8_as_u32(slice: &mut TrackedCudaSlice<u8>) -> &mut TrackedCudaSlice<u32> {
5195    &mut *(slice as *mut TrackedCudaSlice<u8> as *mut TrackedCudaSlice<u32>)
5196}
5197
5198unsafe fn reinterpret_u8_as_u64(slice: &mut TrackedCudaSlice<u8>) -> &mut TrackedCudaSlice<u64> {
5199    &mut *(slice as *mut TrackedCudaSlice<u8> as *mut TrackedCudaSlice<u64>)
5200}