1use std::collections::BTreeMap;
2use std::ffi::c_void;
3
4use cudarc::driver::sys;
5use xlog_core::{AggOp, Result, ScalarType, Schema, XlogError};
6
7use super::{arith_kernels, wcoj_kernels, CudaKernelProvider, ARITH_MODULE, WCOJ_MODULE};
8use crate::device_runtime::StreamId;
9use crate::launch::LaunchRecorder;
10use crate::memory::{CudaColumn, TrackedCudaSlice};
11use crate::wcoj_metadata::{
12 Wcoj4CycleRootAggValue, WcojCycle4HgWorkPlanU32, WcojCycle4HgWorkPlanU64, WcojRelationMetadata,
13 WcojRootAggValue, WcojTriangleHgCountPhaseU32, WcojTriangleHgWorkPlanU32,
14 WcojTriangleHgWorkPlanU64,
15};
16use crate::{AsKernelParam, CudaBuffer, LaunchAsync, LaunchConfig};
17
18const BLOCK_SIZE: u32 = 256;
19const HG_COUNT_BLOCK_SIZE: u32 = 512;
20
21impl CudaKernelProvider {
22 pub fn wcoj_build_metadata_u32_recorded(
23 &self,
24 input: &CudaBuffer,
25 key_col_idx: usize,
26 launch_stream: StreamId,
27 ) -> Result<WcojRelationMetadata<u32>> {
28 self.validate_metadata_column(input, key_col_idx, MetadataWidth::U32)?;
29 let keys = metadata_column_u32(input, key_col_idx)?;
30 self.build_metadata_u32_from_column(input, key_col_idx, keys, launch_stream)
31 }
32
33 pub fn wcoj_build_metadata_u64_recorded(
34 &self,
35 input: &CudaBuffer,
36 key_col_idx: usize,
37 launch_stream: StreamId,
38 ) -> Result<WcojRelationMetadata<u64>> {
39 self.validate_metadata_column(input, key_col_idx, MetadataWidth::U64)?;
40 let keys = metadata_column_u64(input, key_col_idx)?;
41 self.build_metadata_u64_from_column(input, key_col_idx, keys, launch_stream)
42 }
43
    /// Builds the per-row work plan consumed by the `u32` triangle HG kernels
    /// (for example `wcoj_triangle_count_hg_u32_recorded`).
    ///
    /// The relation shapes are inferred from parameter names only; confirm with callers.
    /// Presumably the inputs are edge relations `e_xy(x, y)`, `e_yz(y, z)` and `e_xz(x, z)`.
    ///
    /// For every row of `e_xy`, the build kernel fills `xy_work_prefix` and the
    /// `xy_yz_start/end` and `xy_xz_start/end` arrays. These are presumably a per-row
    /// work amount and ranges into `e_yz` / `e_xz`; confirm against the CUDA kernel.
    /// `xy_work_prefix` (length `n_xy + 1`) is then scanned in place. Element `n_xy`
    /// is read back as `total_work`, which sizes:
    /// - `block_counts` / `block_offsets`: one slot per block of `block_work_unit` items;
    /// - `scratch_x/y/z`: one slot per work item, rounded up to whole blocks.
    ///
    /// Side effects: enqueues the build kernel and the scan on `launch_stream`, then
    /// synchronizes that stream before the host readback.
    ///
    /// # Errors
    /// Returns `XlogError::Kernel` in these cases:
    /// - `block_work_unit == 0`;
    /// - the memory manager has no runtime, or `launch_stream` does not resolve;
    /// - preflight or commit fails;
    /// - the kernel is missing or its launch fails;
    /// - the stream sync fails;
    /// - a size computation overflows.
    ///
    /// Input validation, allocation, scan and readback errors are propagated.
    pub fn wcoj_triangle_hg_work_plan_u32_recorded(
        &self,
        e_xy: &CudaBuffer,
        e_yz: &CudaBuffer,
        e_xz: &CudaBuffer,
        block_work_unit: u32,
        launch_stream: StreamId,
    ) -> Result<WcojTriangleHgWorkPlanU32> {
        let ctx = "wcoj_triangle_hg_work_plan_u32_recorded";
        // Every `div_ceil(block_work_unit)` below would divide by zero otherwise.
        if block_work_unit == 0 {
            return Err(XlogError::Kernel(format!(
                "{ctx}: block_work_unit must be nonzero"
            )));
        }
        validate_binary_u32(ctx, "e_xy", e_xy)?;
        validate_binary_u32(ctx, "e_yz", e_yz)?;
        validate_binary_u32(ctx, "e_xz", e_xz)?;

        let n_xy = self.metadata_logical_rows(e_xy)?;
        let n_yz = self.metadata_logical_rows(e_yz)?;
        let n_xz = self.metadata_logical_rows(e_xz)?;
        // One extra slot so the scanned prefix can carry the grand total at index `n_xy`.
        let prefix_len = n_xy
            .checked_add(1)
            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: prefix length overflow")))?;
        let mut xy_work_prefix = self.memory().alloc::<u32>(prefix_len as usize)?;
        let mut xy_yz_start = self.memory().alloc::<u32>(n_xy as usize)?;
        let mut xy_yz_end = self.memory().alloc::<u32>(n_xy as usize)?;
        let mut xy_xz_start = self.memory().alloc::<u32>(n_xy as usize)?;
        let mut xy_xz_end = self.memory().alloc::<u32>(n_xy as usize)?;

        if n_xy == 0 {
            // Empty `e_xy`: nothing is launched. Each block/scratch buffer gets a single
            // placeholder slot (not zero), and the plan reports no work and no rows.
            let block_counts = self.memory().alloc::<u32>(1)?;
            let block_offsets = self.memory().alloc::<u32>(1)?;
            let scratch_x = self.memory().alloc::<u32>(1)?;
            let scratch_y = self.memory().alloc::<u32>(1)?;
            let scratch_z = self.memory().alloc::<u32>(1)?;
            return Ok(WcojTriangleHgWorkPlanU32 {
                xy_work_prefix,
                xy_yz_start,
                xy_yz_end,
                xy_xz_start,
                xy_xz_end,
                block_counts,
                block_offsets,
                scratch_x,
                scratch_y,
                scratch_z,
                total_work: 0,
                block_work_unit,
                row_count: 0,
            });
        }

        let runtime = self.memory().runtime().ok_or_else(|| {
            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
        })?;
        let cu_stream = runtime
            .stream_pool()
            .resolve(launch_stream)
            .ok_or_else(|| {
                XlogError::Kernel(format!(
                    "{ctx}: launch_stream StreamId({}) does not resolve",
                    launch_stream.0
                ))
            })?;

        let xy_col0 = metadata_column_u32(e_xy, 0)?;
        let xy_col1 = metadata_column_u32(e_xy, 1)?;
        let yz_col0 = metadata_column_u32(e_yz, 0)?;
        let xz_col0 = metadata_column_u32(e_xz, 0)?;

        // Declare every buffer the build kernel and the scan touch. The recorder is
        // preflighted before the first launch and committed after the scan.
        let mut rec = LaunchRecorder::new_strict(launch_stream);
        rec.read(e_xy.num_rows_device());
        rec.read(e_yz.num_rows_device());
        rec.read(e_xz.num_rows_device());
        rec.read_column(e_xy.column(0).expect("xy.col0"));
        rec.read_column(e_xy.column(1).expect("xy.col1"));
        rec.read_column(e_yz.column(0).expect("yz.col0"));
        rec.read_column(e_xz.column(0).expect("xz.col0"));
        rec.write(&xy_work_prefix);
        rec.write(&xy_yz_start);
        rec.write(&xy_yz_end);
        rec.write(&xy_xz_start);
        rec.write(&xy_xz_end);
        rec.preflight(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;

        let kernel = self
            .device()
            .inner()
            .get_func(
                WCOJ_MODULE,
                wcoj_kernels::WCOJ_TRIANGLE_BUILD_HG_WORK_PLAN_U32,
            )
            .ok_or_else(|| {
                XlogError::Kernel(
                    "wcoj_triangle_build_hg_work_plan_u32 kernel not found".to_string(),
                )
            })?;
        // Enough BLOCK_SIZE-thread blocks to cover all `n_xy` rows of `e_xy`.
        let grid = n_xy.div_ceil(BLOCK_SIZE);
        unsafe {
            kernel
                .clone()
                .launch_on_stream(
                    &cu_stream,
                    LaunchConfig {
                        grid_dim: (grid, 1, 1),
                        block_dim: (BLOCK_SIZE, 1, 1),
                        shared_mem_bytes: 0,
                    },
                    (
                        xy_col0,
                        xy_col1,
                        n_xy,
                        yz_col0,
                        n_yz,
                        xz_col0,
                        n_xz,
                        &mut xy_work_prefix,
                        &mut xy_yz_start,
                        &mut xy_yz_end,
                        &mut xy_xz_start,
                        &mut xy_xz_end,
                    ),
                )
                .map_err(|e| {
                    XlogError::Kernel(format!(
                        "wcoj_triangle_build_hg_work_plan_u32 launch failed: {e}"
                    ))
                })?;
        }
        // In-place scan over all `prefix_len` slots. Element `n_xy` is read below as the
        // total, which presumably makes this an exclusive scan; confirm in the scan helper.
        self.multiblock_scan_u32_inplace_on_stream(
            &mut xy_work_prefix,
            prefix_len,
            &cu_stream,
            launch_stream,
            runtime,
        )?;
        rec.commit(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
        // The host readback must observe the finished scan, so wait for the stream first.
        cu_stream
            .synchronize()
            .map_err(|e| XlogError::Kernel(format!("{ctx}: stream sync failed: {e}")))?;
        let total_work = self.dtoh_scalar_untracked::<u32>(&xy_work_prefix, n_xy as usize)?;
        // Scratch holds one slot per work item, rounded up to whole `block_work_unit` blocks.
        // Use a single slot when there is no work so the allocation is never empty.
        let scratch_slots = if total_work == 0 {
            1usize
        } else {
            let grid = total_work.div_ceil(block_work_unit);
            (grid as usize)
                .checked_mul(block_work_unit as usize)
                .ok_or_else(|| XlogError::Kernel(format!("{ctx}: scratch slot overflow")))?
        };
        let scratch_x = self.memory().alloc::<u32>(scratch_slots)?;
        let scratch_y = self.memory().alloc::<u32>(scratch_slots)?;
        let scratch_z = self.memory().alloc::<u32>(scratch_slots)?;
        // Same block count as in `scratch_slots` above, and at least 1. This matches the
        // `grid` that `wcoj_triangle_count_hg_u32_recorded` derives from the plan.
        let grid = if total_work == 0 {
            1
        } else {
            total_work.div_ceil(block_work_unit)
        };
        let block_counts = self.memory().alloc::<u32>(grid as usize)?;
        let block_offsets = self.memory().alloc::<u32>(grid as usize)?;

        Ok(WcojTriangleHgWorkPlanU32 {
            xy_work_prefix,
            xy_yz_start,
            xy_yz_end,
            xy_xz_start,
            xy_xz_end,
            block_counts,
            block_offsets,
            scratch_x,
            scratch_y,
            scratch_z,
            total_work,
            block_work_unit,
            row_count: n_xy,
        })
    }
223
    /// Runs the `u32` triangle HG count kernel over a precomputed work `plan`.
    ///
    /// The launch uses `ceil(plan.total_work / plan.block_work_unit)` blocks, or exactly
    /// one block when `plan.total_work == 0`. It returns a single-column buffer
    /// `count: U32` with one row per launched block; the host and device row counts
    /// are both set to `grid`. Each row is presumably that block's partial triangle
    /// count, to be summed by the caller; confirm against the kernel.
    ///
    /// Nothing here checks that `plan` was built from these same `e_yz` / `e_xz`
    /// buffers; callers must guarantee it.
    ///
    /// Side effects: enqueues an async row-count upload and the kernel on
    /// `launch_stream`, then synchronizes that stream before returning.
    ///
    /// # Errors
    /// Returns `XlogError::Kernel` in these cases:
    /// - a size computation overflows;
    /// - the memory manager has no runtime, or `launch_stream` does not resolve;
    /// - preflight or commit fails;
    /// - the kernel is missing or its launch fails;
    /// - the stream sync fails.
    ///
    /// Validation, allocation and upload errors are propagated.
    pub fn wcoj_triangle_count_hg_u32_recorded(
        &self,
        e_yz: &CudaBuffer,
        e_xz: &CudaBuffer,
        plan: &WcojTriangleHgWorkPlanU32,
        launch_stream: StreamId,
    ) -> Result<CudaBuffer> {
        let ctx = "wcoj_triangle_count_hg_u32_recorded";
        validate_binary_u32(ctx, "e_yz", e_yz)?;
        validate_binary_u32(ctx, "e_xz", e_xz)?;
        let n_yz = self.metadata_logical_rows(e_yz)?;
        let n_xz = self.metadata_logical_rows(e_xz)?;
        // Launch at least one block, so the output always has at least one row.
        let grid = if plan.total_work == 0 {
            1
        } else {
            plan.total_work.div_ceil(plan.block_work_unit)
        };
        let bytes_count = (grid as usize)
            .checked_mul(std::mem::size_of::<u32>())
            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: count byte size overflow")))?;
        // Not zeroed here. The kernel is presumably expected to write every per-block
        // slot; confirm.
        let mut count_bytes = self.memory().alloc::<u8>(bytes_count)?;
        let d_num_rows = self.memory().alloc::<u32>(1)?;

        let runtime = self.memory().runtime().ok_or_else(|| {
            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
        })?;
        let cu_stream = runtime
            .stream_pool()
            .resolve(launch_stream)
            .ok_or_else(|| {
                XlogError::Kernel(format!(
                    "{ctx}: launch_stream StreamId({}) does not resolve",
                    launch_stream.0
                ))
            })?;
        let yz_col1 = metadata_column_u32(e_yz, 1)?;
        let xz_col1 = metadata_column_u32(e_xz, 1)?;

        // Declare kernel inputs (relation columns and plan arrays) and outputs.
        let mut rec = LaunchRecorder::new_strict(launch_stream);
        rec.read(e_yz.num_rows_device());
        rec.read(e_xz.num_rows_device());
        rec.read_column(e_yz.column(1).expect("yz.col1"));
        rec.read_column(e_xz.column(1).expect("xz.col1"));
        rec.read(&plan.xy_work_prefix);
        rec.read(&plan.xy_yz_start);
        rec.read(&plan.xy_yz_end);
        rec.read(&plan.xy_xz_start);
        rec.read(&plan.xy_xz_end);
        rec.write(&count_bytes);
        rec.write(&d_num_rows);
        rec.preflight(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;

        // Device-side row count of the result = number of blocks launched.
        self.htod_launch_metadata_async_copy_one(
            &grid,
            &d_num_rows,
            &cu_stream,
            &format!("{ctx}: d_num_rows"),
        )?;

        let kernel = self
            .device()
            .inner()
            .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_TRIANGLE_COUNT_HG_U32)
            .ok_or_else(|| {
                XlogError::Kernel("wcoj_triangle_count_hg_u32 kernel not found".to_string())
            })?;
        // The output is stored as bytes (so it can become a generic column) but written
        // by the kernel as u32.
        let count_u32 = unsafe { reinterpret_u8_as_u32(&mut count_bytes) };
        // Positional kernel arguments. The order must match the CUDA kernel signature,
        // which is not visible here.
        let mut params: Vec<*mut c_void> = vec![
            yz_col1.as_kernel_param(),
            n_yz.as_kernel_param(),
            xz_col1.as_kernel_param(),
            n_xz.as_kernel_param(),
            (&plan.xy_work_prefix).as_kernel_param(),
            (&plan.xy_yz_start).as_kernel_param(),
            (&plan.xy_yz_end).as_kernel_param(),
            (&plan.xy_xz_start).as_kernel_param(),
            (&plan.xy_xz_end).as_kernel_param(),
            plan.row_count.as_kernel_param(),
            plan.total_work.as_kernel_param(),
            plan.block_work_unit.as_kernel_param(),
            count_u32.as_kernel_param(),
        ];
        // NOTE(review): this launch uses `BLOCK_SIZE`, but the file also declares
        // `HG_COUNT_BLOCK_SIZE`. Confirm which block size this kernel expects.
        unsafe {
            kernel
                .clone()
                .launch_on_stream(
                    &cu_stream,
                    LaunchConfig {
                        grid_dim: (grid, 1, 1),
                        block_dim: (BLOCK_SIZE, 1, 1),
                        shared_mem_bytes: 0,
                    },
                    &mut params,
                )
                .map_err(|e| XlogError::Kernel(format!("{ctx}: launch failed: {e}")))?;
        }
        rec.commit(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
        cu_stream
            .synchronize()
            .map_err(|e| XlogError::Kernel(format!("{ctx}: stream sync failed: {e}")))?;

        let schema = Schema::new(vec![("count".to_string(), ScalarType::U32)]);
        Ok(CudaBuffer::from_columns_with_host_count(
            vec![count_bytes.into()],
            grid as u64,
            d_num_rows,
            schema,
            grid,
        ))
    }
336
337 pub fn wcoj_triangle_hg_u32_recorded(
338 &self,
339 e_xy: &CudaBuffer,
340 e_yz: &CudaBuffer,
341 e_xz: &CudaBuffer,
342 block_work_unit: u32,
343 launch_stream: StreamId,
344 ) -> Result<CudaBuffer> {
345 let ctx = "wcoj_triangle_hg_u32_recorded";
346 validate_binary_u32(ctx, "e_xy", e_xy)?;
347 validate_binary_u32(ctx, "e_yz", e_yz)?;
348 validate_binary_u32(ctx, "e_xz", e_xz)?;
349 let plan = self.wcoj_triangle_hg_work_plan_u32_recorded(
350 e_xy,
351 e_yz,
352 e_xz,
353 block_work_unit,
354 launch_stream,
355 )?;
356 self.wcoj_triangle_hg_u32_with_plan_recorded(e_xy, e_yz, e_xz, &plan, launch_stream)
357 }
358
    /// Counts HG triangle matches per root value `x` (column 0 of `e_xy`), for `u32` keys.
    ///
    /// Pipeline:
    /// 1. Re-lay out all three inputs with `wcoj_layout_u32_recorded` (the originals are
    ///    shadowed) and build an HG work plan.
    /// 2. Launch the group-by count kernel, which fills one `u32` counter per `e_xy` row.
    /// 3. Pair those counters with a copy of `e_xy`'s column 0, drop rows whose counter
    ///    is 0, and `Sum` the counters grouped by `x` via `groupby_multi_agg_recorded`.
    ///
    /// If `e_xy` is empty or the plan has no work, this returns an empty buffer with
    /// schema `(x: <e_xy col0 type>, count: U64)`.
    ///
    /// NOTE(review): the non-empty result is whatever `groupby_multi_agg_recorded`
    /// produces. Confirm its column names and types match the empty-path schema above.
    ///
    /// # Errors
    /// Returns `XlogError::Kernel` in these cases:
    /// - the memory manager has no runtime, or `launch_stream` does not resolve;
    /// - zeroing fails;
    /// - preflight or commit fails;
    /// - the kernel is missing or its launch fails;
    /// - `e_xy` column 0 is not an owned column;
    /// - the column or row-count copy fails.
    ///
    /// Errors from layout, validation, planning, masking, compaction and group-by
    /// are propagated.
    pub fn wcoj_triangle_groupby_root_count_u32_recorded(
        &self,
        e_xy: &CudaBuffer,
        e_yz: &CudaBuffer,
        e_xz: &CudaBuffer,
        block_work_unit: u32,
        launch_stream: StreamId,
    ) -> Result<CudaBuffer> {
        let ctx = "wcoj_triangle_groupby_root_count_u32_recorded";
        // The HG kernels run on the re-laid-out copies; presumably this is the
        // sorted/indexed layout they require. Confirm in `wcoj_layout_u32_recorded`.
        let e_xy = &self.wcoj_layout_u32_recorded(e_xy, launch_stream)?;
        let e_yz = &self.wcoj_layout_u32_recorded(e_yz, launch_stream)?;
        let e_xz = &self.wcoj_layout_u32_recorded(e_xz, launch_stream)?;
        validate_binary_u32(ctx, "e_xy", e_xy)?;
        validate_binary_u32(ctx, "e_yz", e_yz)?;
        validate_binary_u32(ctx, "e_xz", e_xz)?;
        let plan = self.wcoj_triangle_hg_work_plan_u32_recorded(
            e_xy,
            e_yz,
            e_xz,
            block_work_unit,
            launch_stream,
        )?;
        let n_xy = plan.row_count;
        let x_type = e_xy.schema().column_type(0).expect("xy.col0 type");
        let out_schema = Schema::new(vec![
            ("x".to_string(), x_type),
            ("count".to_string(), ScalarType::U64),
        ]);
        if n_xy == 0 || plan.total_work == 0 {
            return self.create_empty_buffer(out_schema);
        }

        let runtime = self.memory().runtime().ok_or_else(|| {
            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
        })?;
        let cu_stream = runtime
            .stream_pool()
            .resolve(launch_stream)
            .ok_or_else(|| {
                XlogError::Kernel(format!(
                    "{ctx}: launch_stream StreamId({}) does not resolve",
                    launch_stream.0
                ))
            })?;

        let yz_col1 = metadata_column_u32(e_yz, 1)?;
        let xz_col1 = metadata_column_u32(e_xz, 1)?;
        let n_yz = self.metadata_logical_rows(e_yz)?;
        let n_xz = self.metadata_logical_rows(e_xz)?;

        // One u32 counter per `e_xy` row, stored as bytes so it can become a column
        // later. It is zeroed because the kernel's writes are presumably accumulating.
        let mut row_counts = self
            .memory()
            .alloc::<u8>(n_xy as usize * std::mem::size_of::<u32>())?;
        // NOTE(review): this memset goes through `self.device().inner()`, not explicitly
        // onto `cu_stream`. Confirm it is ordered before the kernel launch below.
        self.device()
            .inner()
            .memset_zeros(&mut row_counts)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row counts failed: {e}")))?;

        // Non-zero by the `total_work == 0` early return above.
        let grid = plan.total_work.div_ceil(plan.block_work_unit);
        let mut rec = LaunchRecorder::new_strict(launch_stream);
        rec.read(e_xy.num_rows_device());
        rec.read(e_yz.num_rows_device());
        rec.read(e_xz.num_rows_device());
        rec.read_column(e_yz.column(1).expect("yz.col1"));
        rec.read_column(e_xz.column(1).expect("xz.col1"));
        rec.read(&plan.xy_work_prefix);
        rec.read(&plan.xy_yz_start);
        rec.read(&plan.xy_yz_end);
        rec.read(&plan.xy_xz_start);
        rec.read(&plan.xy_xz_end);
        rec.write(&row_counts);
        rec.preflight(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
        {
            let kernel = self
                .device()
                .inner()
                .get_func(
                    WCOJ_MODULE,
                    wcoj_kernels::WCOJ_TRIANGLE_GROUPBY_ROOT_COUNT_HG_U32,
                )
                .ok_or_else(|| {
                    XlogError::Kernel(
                        "wcoj_triangle_groupby_root_count_hg_u32 kernel not found".to_string(),
                    )
                })?;
            // Positional kernel arguments. The order must match the CUDA kernel signature.
            let mut params: Vec<*mut c_void> = vec![
                yz_col1.as_kernel_param(),
                n_yz.as_kernel_param(),
                xz_col1.as_kernel_param(),
                n_xz.as_kernel_param(),
                (&plan.xy_work_prefix).as_kernel_param(),
                (&plan.xy_yz_start).as_kernel_param(),
                (&plan.xy_yz_end).as_kernel_param(),
                (&plan.xy_xz_start).as_kernel_param(),
                (&plan.xy_xz_end).as_kernel_param(),
                plan.row_count.as_kernel_param(),
                plan.total_work.as_kernel_param(),
                plan.block_work_unit.as_kernel_param(),
                (&row_counts).as_kernel_param(),
            ];
            unsafe {
                kernel
                    .clone()
                    .launch_on_stream(
                        &cu_stream,
                        LaunchConfig {
                            grid_dim: (grid, 1, 1),
                            block_dim: (BLOCK_SIZE, 1, 1),
                            shared_mem_bytes: 0,
                        },
                        &mut params,
                    )
                    .map_err(|e| {
                        XlogError::Kernel(format!("{ctx}: groupby-count launch failed: {e}"))
                    })?;
            }
        }
        rec.commit(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;

        // Copy the root column X so the staging buffer owns its own data.
        let x_src = match e_xy.column(0).expect("xy.col0") {
            CudaColumn::Owned(slice) => slice,
            _ => {
                return Err(XlogError::Kernel(format!(
                    "{ctx}: e_xy.col0 must be an owned CudaColumn"
                )))
            }
        };
        let x_copy = self
            .memory()
            .alloc::<u8>(n_xy as usize * std::mem::size_of::<u32>())?;
        // SAFETY (review): `x_copy` was just allocated with exactly this byte count.
        // `x_src` is assumed to hold at least `n_xy` u32 values after
        // `validate_binary_u32`; confirm.
        // NOTE(review): this is the non-Async driver copy. It is neither issued on
        // `cu_stream` nor declared to `rec`.
        unsafe {
            let res = sys::cuMemcpyDtoD_v2(
                *x_copy.device_ptr(),
                *x_src.device_ptr(),
                n_xy as usize * std::mem::size_of::<u32>(),
            );
            if res != sys::cudaError_enum::CUDA_SUCCESS {
                return Err(XlogError::Kernel(format!(
                    "{ctx}: copy X column failed: {res:?}"
                )));
            }
        }
        let mut d_num_rows = self.memory().alloc::<u32>(1)?;
        self.device()
            .inner()
            .dtod_copy(e_xy.num_rows_device(), &mut d_num_rows)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: copy row count failed: {e}")))?;
        // Staging rows align 1:1 with `e_xy` rows: (x, per-row triangle count).
        let staging_schema = Schema::new(vec![
            ("x".to_string(), x_type),
            ("count".to_string(), ScalarType::U32),
        ]);
        let staging = CudaBuffer::from_columns_with_host_count(
            vec![x_copy.into(), row_counts.into()],
            n_xy as u64,
            d_num_rows,
            staging_schema,
            n_xy,
        );

        // Keep only rows with count > 0, then sum counts grouped by x.
        let mask = self.compare_const_mask_recorded::<u32>(
            &staging,
            1,
            0u32,
            crate::CompareOp::Gt,
            launch_stream,
        )?;
        let compacted =
            self.compact_buffer_by_device_mask_counted_recorded(&staging, &mask, launch_stream)?;
        self.groupby_multi_agg_recorded(
            &compacted,
            &[0],
            &[(1, xlog_core::AggOp::Sum)],
            launch_stream,
        )
    }
575
    /// Aggregates a value over HG triangle matches, grouped by the root `x` (column 0
    /// of `e_xy`), for `u32` keys.
    ///
    /// `agg_op` must be `Sum`, `Min` or `Max`; use
    /// `wcoj_triangle_groupby_root_count_u32_recorded` for counts. Per-row
    /// accumulators have these types:
    /// - `Sum`: `u64`;
    /// - `Min` / `Max`: `u32`.
    ///
    /// `value` selects the aggregated column:
    /// - `Y`: `e_xy` column 1;
    /// - `Z`: column 1 of `e_yz` and `e_xz`.
    ///
    /// Each selected column must be `U32`. The kernel receives `value_from_z`
    /// (0 = Y, 1 = Z) to pick the source.
    ///
    /// Pipeline:
    /// 1. Layout the inputs and build a plan.
    /// 2. Launch the per-row count/aggregate kernel.
    /// 3. Build staging `(x, count, agg)`, drop rows with count 0, and run
    ///    `groupby_multi_agg_recorded` with `agg_op` over column 2, keyed on column 0.
    ///
    /// Empty input or no work returns an empty buffer with schema
    /// `(x: <e_xy col0 type>, <sum_0|min_0|max_0>: <U64|U32>)`.
    ///
    /// NOTE(review): an unsupported `agg_op` is only rejected after the three layout
    /// passes have already run.
    ///
    /// # Errors
    /// Returns `XlogError::Kernel` in these cases:
    /// - `agg_op` is unsupported, or a value column is not `U32`;
    /// - the memory manager has no runtime, or `launch_stream` does not resolve;
    /// - zeroing or the `Min` identity fill fails;
    /// - preflight or commit fails;
    /// - a kernel is missing or its launch fails;
    /// - `e_xy` column 0 is not an owned column;
    /// - the column or row-count copy fails.
    ///
    /// Downstream errors are propagated.
    pub fn wcoj_triangle_groupby_root_agg_u32_recorded(
        &self,
        e_xy: &CudaBuffer,
        e_yz: &CudaBuffer,
        e_xz: &CudaBuffer,
        agg_op: AggOp,
        value: WcojRootAggValue,
        block_work_unit: u32,
        launch_stream: StreamId,
    ) -> Result<CudaBuffer> {
        let ctx = "wcoj_triangle_groupby_root_agg_u32_recorded";
        let e_xy = &self.wcoj_layout_u32_recorded(e_xy, launch_stream)?;
        let e_yz = &self.wcoj_layout_u32_recorded(e_yz, launch_stream)?;
        let e_xz = &self.wcoj_layout_u32_recorded(e_xz, launch_stream)?;
        // Per-op kernel, accumulator width (bytes), accumulator type, and output name.
        let (kernel_name, agg_elem_size, agg_scalar, agg_name) = match agg_op {
            AggOp::Sum => (
                wcoj_kernels::WCOJ_TRIANGLE_GROUPBY_ROOT_SUM_HG_U32,
                std::mem::size_of::<u64>(),
                ScalarType::U64,
                "sum_0",
            ),
            AggOp::Min => (
                wcoj_kernels::WCOJ_TRIANGLE_GROUPBY_ROOT_MIN_HG_U32,
                std::mem::size_of::<u32>(),
                ScalarType::U32,
                "min_0",
            ),
            AggOp::Max => (
                wcoj_kernels::WCOJ_TRIANGLE_GROUPBY_ROOT_MAX_HG_U32,
                std::mem::size_of::<u32>(),
                ScalarType::U32,
                "max_0",
            ),
            other => {
                return Err(XlogError::Kernel(format!(
                    "{ctx}: unsupported AggOp {other:?} (Sum/Min/Max only; use \
                     wcoj_triangle_groupby_root_count_u32_recorded for Count)"
                )))
            }
        };
        validate_binary_u32(ctx, "e_xy", e_xy)?;
        validate_binary_u32(ctx, "e_yz", e_yz)?;
        validate_binary_u32(ctx, "e_xz", e_xz)?;
        // The columns that supply the aggregated value must be U32.
        let value_cols: &[(&CudaBuffer, &str)] = match value {
            WcojRootAggValue::Y => &[(e_xy, "e_xy")],
            WcojRootAggValue::Z => &[(e_yz, "e_yz"), (e_xz, "e_xz")],
        };
        for (buf, label) in value_cols {
            let ty = buf.schema().column_type(1).expect("validated 2-col");
            if ty != ScalarType::U32 {
                return Err(XlogError::Kernel(format!(
                    "{ctx}: {label}.col1 supplies the aggregate value and must be U32, got {ty:?}"
                )));
            }
        }

        let plan = self.wcoj_triangle_hg_work_plan_u32_recorded(
            e_xy,
            e_yz,
            e_xz,
            block_work_unit,
            launch_stream,
        )?;
        let n_xy = plan.row_count;
        let x_type = e_xy.schema().column_type(0).expect("xy.col0 type");
        let out_schema = Schema::new(vec![
            ("x".to_string(), x_type),
            (agg_name.to_string(), agg_scalar),
        ]);
        if n_xy == 0 || plan.total_work == 0 {
            return self.create_empty_buffer(out_schema);
        }

        let runtime = self.memory().runtime().ok_or_else(|| {
            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
        })?;
        let cu_stream = runtime
            .stream_pool()
            .resolve(launch_stream)
            .ok_or_else(|| {
                XlogError::Kernel(format!(
                    "{ctx}: launch_stream StreamId({}) does not resolve",
                    launch_stream.0
                ))
            })?;

        let yz_col1 = metadata_column_u32(e_yz, 1)?;
        let xz_col1 = metadata_column_u32(e_xz, 1)?;
        let xy_col1 = metadata_column_u32(e_xy, 1)?;
        let n_yz = self.metadata_logical_rows(e_yz)?;
        let n_xz = self.metadata_logical_rows(e_xz)?;
        // Kernel flag that selects the value source.
        let value_from_z: u32 = match value {
            WcojRootAggValue::Y => 0,
            WcojRootAggValue::Z => 1,
        };

        // One u32 match counter per `e_xy` row, zero-initialised.
        let mut row_counts = self
            .memory()
            .alloc::<u8>(n_xy as usize * std::mem::size_of::<u32>())?;
        // NOTE(review): the memsets go through `self.device().inner()`, not explicitly
        // onto `cu_stream`. Confirm they are ordered before the launches below.
        self.device()
            .inner()
            .memset_zeros(&mut row_counts)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row counts failed: {e}")))?;
        // One accumulator per `e_xy` row. Zero is the identity for Sum and for u32 Max;
        // Min is re-filled with u32::MAX below.
        let mut row_agg = self.memory().alloc::<u8>(n_xy as usize * agg_elem_size)?;
        self.device()
            .inner()
            .memset_zeros(&mut row_agg)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row aggregates failed: {e}")))?;

        // Non-zero by the `total_work == 0` early return above.
        let grid = plan.total_work.div_ceil(plan.block_work_unit);
        let mut rec = LaunchRecorder::new_strict(launch_stream);
        rec.read(e_xy.num_rows_device());
        rec.read(e_yz.num_rows_device());
        rec.read(e_xz.num_rows_device());
        rec.read_column(e_yz.column(1).expect("yz.col1"));
        rec.read_column(e_xz.column(1).expect("xz.col1"));
        rec.read_column(e_xy.column(1).expect("xy.col1"));
        rec.read(&plan.xy_work_prefix);
        rec.read(&plan.xy_yz_start);
        rec.read(&plan.xy_yz_end);
        rec.read(&plan.xy_xz_start);
        rec.read(&plan.xy_xz_end);
        rec.write(&row_counts);
        rec.write(&row_agg);
        rec.preflight(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
        // Min needs u32::MAX as its identity. Fill on `cu_stream` so it runs before the
        // aggregate kernel that follows on the same stream.
        if matches!(agg_op, AggOp::Min) {
            let fill = self
                .device()
                .inner()
                .get_func(ARITH_MODULE, arith_kernels::ARITH_FILL_CONST_U32)
                .ok_or_else(|| {
                    XlogError::Kernel("arith_fill_const_u32 kernel not found".to_string())
                })?;
            let row_agg_u32 = unsafe { reinterpret_u8_as_u32(&mut row_agg) };
            unsafe {
                fill.clone()
                    .launch_on_stream(
                        &cu_stream,
                        LaunchConfig::for_num_elems(n_xy),
                        (u32::MAX, n_xy, &mut *row_agg_u32),
                    )
                    .map_err(|e| {
                        XlogError::Kernel(format!("{ctx}: min identity fill failed: {e}"))
                    })?;
            }
        }
        {
            let kernel = self
                .device()
                .inner()
                .get_func(WCOJ_MODULE, kernel_name)
                .ok_or_else(|| XlogError::Kernel(format!("{kernel_name} kernel not found")))?;
            // Positional kernel arguments. The order must match the CUDA kernel signature.
            let mut params: Vec<*mut c_void> = vec![
                yz_col1.as_kernel_param(),
                n_yz.as_kernel_param(),
                xz_col1.as_kernel_param(),
                n_xz.as_kernel_param(),
                xy_col1.as_kernel_param(),
                value_from_z.as_kernel_param(),
                (&plan.xy_work_prefix).as_kernel_param(),
                (&plan.xy_yz_start).as_kernel_param(),
                (&plan.xy_yz_end).as_kernel_param(),
                (&plan.xy_xz_start).as_kernel_param(),
                (&plan.xy_xz_end).as_kernel_param(),
                plan.row_count.as_kernel_param(),
                plan.total_work.as_kernel_param(),
                plan.block_work_unit.as_kernel_param(),
                (&row_counts).as_kernel_param(),
                (&row_agg).as_kernel_param(),
            ];
            unsafe {
                kernel
                    .clone()
                    .launch_on_stream(
                        &cu_stream,
                        LaunchConfig {
                            grid_dim: (grid, 1, 1),
                            block_dim: (BLOCK_SIZE, 1, 1),
                            shared_mem_bytes: 0,
                        },
                        &mut params,
                    )
                    .map_err(|e| {
                        XlogError::Kernel(format!("{ctx}: groupby-agg launch failed: {e}"))
                    })?;
            }
        }
        rec.commit(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;

        // Copy the root column X so the staging buffer owns its own data.
        let x_src = match e_xy.column(0).expect("xy.col0") {
            CudaColumn::Owned(slice) => slice,
            _ => {
                return Err(XlogError::Kernel(format!(
                    "{ctx}: e_xy.col0 must be an owned CudaColumn"
                )))
            }
        };
        let x_copy = self
            .memory()
            .alloc::<u8>(n_xy as usize * std::mem::size_of::<u32>())?;
        // SAFETY (review): `x_copy` was just allocated with exactly this byte count.
        // `x_src` is assumed to hold at least `n_xy` u32 values after
        // `validate_binary_u32`; confirm.
        // NOTE(review): this is the non-Async driver copy. It is neither issued on
        // `cu_stream` nor declared to `rec`.
        unsafe {
            let res = sys::cuMemcpyDtoD_v2(
                *x_copy.device_ptr(),
                *x_src.device_ptr(),
                n_xy as usize * std::mem::size_of::<u32>(),
            );
            if res != sys::cudaError_enum::CUDA_SUCCESS {
                return Err(XlogError::Kernel(format!(
                    "{ctx}: copy X column failed: {res:?}"
                )));
            }
        }
        let mut d_num_rows = self.memory().alloc::<u32>(1)?;
        self.device()
            .inner()
            .dtod_copy(e_xy.num_rows_device(), &mut d_num_rows)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: copy row count failed: {e}")))?;
        // Staging rows align 1:1 with `e_xy` rows: (x, match count, per-row aggregate).
        let staging_schema = Schema::new(vec![
            ("x".to_string(), x_type),
            ("count".to_string(), ScalarType::U32),
            ("agg".to_string(), agg_scalar),
        ]);
        let staging = CudaBuffer::from_columns_with_host_count(
            vec![x_copy.into(), row_counts.into(), row_agg.into()],
            n_xy as u64,
            d_num_rows,
            staging_schema,
            n_xy,
        );

        // Rows with no matches still hold the identity value, so drop them by count
        // before the final group-by.
        let mask = self.compare_const_mask_recorded::<u32>(
            &staging,
            1,
            0u32,
            crate::CompareOp::Gt,
            launch_stream,
        )?;
        let compacted =
            self.compact_buffer_by_device_mask_counted_recorded(&staging, &mask, launch_stream)?;
        self.groupby_multi_agg_recorded(&compacted, &[0], &[(2, agg_op)], launch_stream)
    }
870
    /// Counts HG triangle matches per root value `x` (column 0 of `e_xy`), for `u64` keys.
    ///
    /// Pipeline:
    /// 1. Re-lay out the inputs with `wcoj_layout_u64_recorded`, build an HG plan, and
    ///    launch the group-by count kernel. This fills one `u32` counter per `e_xy` row.
    /// 2. Build key-0 metadata for `e_xy`, then run a segmented sum of the per-row
    ///    counters into one `u64` total per distinct key (`meta.key_count` of them).
    ///    This replaces the generic `groupby_multi_agg_recorded` step.
    /// 3. Pair `meta.unique_keys` with the totals, then drop keys whose total is 0.
    ///
    /// Output schema is `(x: U64, count: U64)`. An empty buffer with that schema is
    /// returned when `e_xy` is empty, the plan has no work, or there are no keys.
    ///
    /// Side effects: enqueues work on `launch_stream` and synchronizes it before the
    /// final mask/compaction.
    ///
    /// # Errors
    /// Returns `XlogError::Kernel` in these cases:
    /// - the memory manager has no runtime, or `launch_stream` does not resolve;
    /// - zeroing fails;
    /// - any preflight or commit fails;
    /// - a kernel is missing or its launch fails;
    /// - the key copy fails;
    /// - the stream sync fails.
    ///
    /// Downstream errors are propagated.
    pub fn wcoj_triangle_groupby_root_count_u64_recorded(
        &self,
        e_xy: &CudaBuffer,
        e_yz: &CudaBuffer,
        e_xz: &CudaBuffer,
        block_work_unit: u32,
        launch_stream: StreamId,
    ) -> Result<CudaBuffer> {
        let ctx = "wcoj_triangle_groupby_root_count_u64_recorded";
        // The HG kernels run on the re-laid-out copies; the originals are shadowed.
        let e_xy = &self.wcoj_layout_u64_recorded(e_xy, launch_stream)?;
        let e_yz = &self.wcoj_layout_u64_recorded(e_yz, launch_stream)?;
        let e_xz = &self.wcoj_layout_u64_recorded(e_xz, launch_stream)?;
        validate_binary_u64(ctx, "e_xy", e_xy)?;
        validate_binary_u64(ctx, "e_yz", e_yz)?;
        validate_binary_u64(ctx, "e_xz", e_xz)?;
        let plan = self.wcoj_triangle_hg_work_plan_u64_recorded(
            e_xy,
            e_yz,
            e_xz,
            block_work_unit,
            launch_stream,
        )?;
        let n_xy = plan.row_count;
        let out_schema = Schema::new(vec![
            ("x".to_string(), ScalarType::U64),
            ("count".to_string(), ScalarType::U64),
        ]);
        if n_xy == 0 || plan.total_work == 0 {
            return self.create_empty_buffer(out_schema);
        }

        let runtime = self.memory().runtime().ok_or_else(|| {
            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
        })?;
        let cu_stream = runtime
            .stream_pool()
            .resolve(launch_stream)
            .ok_or_else(|| {
                XlogError::Kernel(format!(
                    "{ctx}: launch_stream StreamId({}) does not resolve",
                    launch_stream.0
                ))
            })?;

        let yz_col1 = metadata_column_u64(e_yz, 1)?;
        let xz_col1 = metadata_column_u64(e_xz, 1)?;
        let n_yz = self.metadata_logical_rows(e_yz)?;
        let n_xz = self.metadata_logical_rows(e_xz)?;

        // One u32 match counter per `e_xy` row, zero-initialised.
        let mut row_counts = self.memory().alloc::<u32>(n_xy as usize)?;
        // NOTE(review): the memsets go through `self.device().inner()`, not explicitly
        // onto `cu_stream`. Confirm they are ordered before the launches below.
        self.device()
            .inner()
            .memset_zeros(&mut row_counts)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row counts failed: {e}")))?;

        // Non-zero by the `total_work == 0` early return above.
        let grid = plan.total_work.div_ceil(plan.block_work_unit);
        let mut rec = LaunchRecorder::new_strict(launch_stream);
        rec.read(e_xy.num_rows_device());
        rec.read(e_yz.num_rows_device());
        rec.read(e_xz.num_rows_device());
        rec.read_column(e_yz.column(1).expect("yz.col1"));
        rec.read_column(e_xz.column(1).expect("xz.col1"));
        rec.read(&plan.xy_work_prefix);
        rec.read(&plan.xy_yz_start);
        rec.read(&plan.xy_yz_end);
        rec.read(&plan.xy_xz_start);
        rec.read(&plan.xy_xz_end);
        rec.write(&row_counts);
        rec.preflight(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
        {
            let kernel = self
                .device()
                .inner()
                .get_func(
                    WCOJ_MODULE,
                    wcoj_kernels::WCOJ_TRIANGLE_GROUPBY_ROOT_COUNT_HG_U64,
                )
                .ok_or_else(|| {
                    XlogError::Kernel(
                        "wcoj_triangle_groupby_root_count_hg_u64 kernel not found".to_string(),
                    )
                })?;
            // Positional kernel arguments. The order must match the CUDA kernel signature.
            let mut params: Vec<*mut c_void> = vec![
                yz_col1.as_kernel_param(),
                n_yz.as_kernel_param(),
                xz_col1.as_kernel_param(),
                n_xz.as_kernel_param(),
                (&plan.xy_work_prefix).as_kernel_param(),
                (&plan.xy_yz_start).as_kernel_param(),
                (&plan.xy_yz_end).as_kernel_param(),
                (&plan.xy_xz_start).as_kernel_param(),
                (&plan.xy_xz_end).as_kernel_param(),
                plan.row_count.as_kernel_param(),
                plan.total_work.as_kernel_param(),
                plan.block_work_unit.as_kernel_param(),
                (&row_counts).as_kernel_param(),
            ];
            unsafe {
                kernel
                    .clone()
                    .launch_on_stream(
                        &cu_stream,
                        LaunchConfig {
                            grid_dim: (grid, 1, 1),
                            block_dim: (BLOCK_SIZE, 1, 1),
                            shared_mem_bytes: 0,
                        },
                        &mut params,
                    )
                    .map_err(|e| {
                        XlogError::Kernel(format!("{ctx}: groupby-count launch failed: {e}"))
                    })?;
            }
        }
        rec.commit(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;

        // Key-0 metadata for the laid-out `e_xy`. `prefix_sum` presumably delimits the
        // row range of each distinct key in `e_xy` row order; confirm.
        let meta = self.wcoj_build_metadata_u64_recorded(e_xy, 0, launch_stream)?;
        let key_count = meta.key_count;
        if key_count == 0 {
            return self.create_empty_buffer(out_schema);
        }
        // One u64 total per distinct key, zero-initialised.
        let mut sums = self
            .memory()
            .alloc::<u8>(key_count as usize * std::mem::size_of::<u64>())?;
        self.device()
            .inner()
            .memset_zeros(&mut sums)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: zero group sums failed: {e}")))?;

        // Segmented reduction: per-row u32 counters -> per-key u64 totals.
        let mut rec_sum = LaunchRecorder::new_strict(launch_stream);
        rec_sum.read(&row_counts);
        rec_sum.read(&meta.prefix_sum);
        rec_sum.write(&sums);
        rec_sum
            .preflight(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: reduce preflight failed: {e}")))?;
        {
            let kernel = self
                .device()
                .inner()
                .get_func(
                    WCOJ_MODULE,
                    wcoj_kernels::WCOJ_GROUPBY_ROOT_SEGMENT_SUM_COUNTS_U32,
                )
                .ok_or_else(|| {
                    XlogError::Kernel(
                        "wcoj_groupby_root_segment_sum_counts_u32 kernel not found".to_string(),
                    )
                })?;
            // Grid sized to cover all `n_xy` per-row counters.
            let reduce_grid = n_xy.div_ceil(BLOCK_SIZE);
            let mut params: Vec<*mut c_void> = vec![
                (&row_counts).as_kernel_param(),
                n_xy.as_kernel_param(),
                (&meta.prefix_sum).as_kernel_param(),
                key_count.as_kernel_param(),
                (&sums).as_kernel_param(),
            ];
            unsafe {
                kernel
                    .clone()
                    .launch_on_stream(
                        &cu_stream,
                        LaunchConfig {
                            grid_dim: (reduce_grid, 1, 1),
                            block_dim: (BLOCK_SIZE, 1, 1),
                            shared_mem_bytes: 0,
                        },
                        &mut params,
                    )
                    .map_err(|e| XlogError::Kernel(format!("{ctx}: reduce launch failed: {e}")))?;
            }
        }
        rec_sum
            .commit(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: reduce commit failed: {e}")))?;

        // Output key column: a stream-ordered copy of the distinct keys.
        let x_copy = self
            .memory()
            .alloc::<u8>(key_count as usize * std::mem::size_of::<u64>())?;
        let d_num_rows = self.memory().alloc::<u32>(1)?;
        let mut rec_copy = LaunchRecorder::new_strict(launch_stream);
        rec_copy.read(&meta.unique_keys);
        rec_copy.write(&x_copy);
        rec_copy.write(&d_num_rows);
        rec_copy
            .preflight(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: copy preflight failed: {e}")))?;
        // SAFETY (review): `x_copy` was just allocated with exactly this byte count.
        // `meta.unique_keys` is assumed to hold at least `key_count` u64 keys; confirm.
        unsafe {
            let res = sys::cuMemcpyDtoDAsync_v2(
                *x_copy.device_ptr(),
                *meta.unique_keys.device_ptr(),
                key_count as usize * std::mem::size_of::<u64>(),
                cu_stream.cu_stream(),
            );
            if res != sys::cudaError_enum::CUDA_SUCCESS {
                return Err(XlogError::Kernel(format!(
                    "{ctx}: DtoD unique keys copy failed: {res:?}"
                )));
            }
        }
        // Device-side row count of the staging buffer = number of distinct keys.
        self.htod_launch_metadata_async_copy_one(
            &key_count,
            &d_num_rows,
            &cu_stream,
            &format!("{ctx}: d_num_rows"),
        )?;
        rec_copy
            .commit(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: copy commit failed: {e}")))?;
        cu_stream
            .synchronize()
            .map_err(|e| XlogError::Kernel(format!("{ctx}: stream sync failed: {e}")))?;
        let staging_schema = Schema::new(vec![
            ("x".to_string(), ScalarType::U64),
            ("count".to_string(), ScalarType::U64),
        ]);
        let staging = CudaBuffer::from_columns_with_host_count(
            vec![x_copy.into(), sums.into()],
            u64::from(key_count),
            d_num_rows,
            staging_schema,
            key_count,
        );
        // Drop keys that participated in no triangle.
        let mask = self.compare_const_mask_recorded::<u64>(
            &staging,
            1,
            0u64,
            crate::CompareOp::Gt,
            launch_stream,
        )?;
        self.compact_buffer_by_device_mask_counted_recorded(&staging, &mask, launch_stream)
    }
1139
/// Triangle join `e_xy ⋈ e_yz ⋈ e_xz` grouped by the root key (column 0 of `e_xy`), with a
/// per-group Sum / Min / Max over a u64 value.
///
/// `value` selects which side feeds the aggregate: `WcojRootAggValue::Y` passes
/// `value_from_z = 0` to the kernel and `WcojRootAggValue::Z` passes `1`.
///
/// Pipeline (all launches are enqueued on the stream resolved from `launch_stream`):
/// 1. Each input goes through `wcoj_layout_u64_recorded` and is validated as a binary u64
///    relation. An HG work plan is then built over the three relations.
/// 2. A per-row kernel writes, for every `e_xy` row, a u32 count (`row_counts`) and a u64
///    partial aggregate (`row_agg`).
/// 3. Metadata on `e_xy` column 0 (unique keys + prefix sum) drives two segment-reduce
///    kernels. They fold the per-row results into per-key counts (`count_sums`) and
///    per-key aggregates (`group_agg`).
/// 4. The unique keys are copied beside `group_agg`. Keys whose reduced count is zero are
///    compacted away.
///
/// Returns a two-column U64 buffer `("x", "<op>_0")`. An empty buffer with that schema is
/// returned when the plan has no rows or no work, or when the metadata reports no keys.
///
/// # Errors
/// Returns `XlogError::Kernel` in these cases:
/// - `agg_op` is not Sum/Min/Max (Count has its own entry point).
/// - The memory manager has no runtime.
/// - `launch_stream` does not resolve.
/// - A kernel is missing.
/// - A preflight, launch, copy, commit, or stream sync fails.
///
/// Errors from the layout, validation, work-plan, metadata, mask, and compaction helpers are
/// propagated.
pub fn wcoj_triangle_groupby_root_agg_u64_recorded(
    &self,
    e_xy: &CudaBuffer,
    e_yz: &CudaBuffer,
    e_xz: &CudaBuffer,
    agg_op: AggOp,
    value: WcojRootAggValue,
    block_work_unit: u32,
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let ctx = "wcoj_triangle_groupby_root_agg_u64_recorded";
    // Shadow each input with the buffer returned by the u64 layout helper. Temporary
    // lifetime extension keeps those buffers alive until the end of this function.
    let e_xy = &self.wcoj_layout_u64_recorded(e_xy, launch_stream)?;
    let e_yz = &self.wcoj_layout_u64_recorded(e_yz, launch_stream)?;
    let e_xz = &self.wcoj_layout_u64_recorded(e_xz, launch_stream)?;
    // Per op, pick three things: the per-row HG kernel, the matching segment-reduce kernel,
    // and the name of the output aggregate column.
    let (kernel_name, segment_kernel_name, agg_name) = match agg_op {
        AggOp::Sum => (
            wcoj_kernels::WCOJ_TRIANGLE_GROUPBY_ROOT_SUM_HG_U64,
            wcoj_kernels::WCOJ_GROUPBY_ROOT_SEGMENT_SUM_VALUES_U64,
            "sum_0",
        ),
        AggOp::Min => (
            wcoj_kernels::WCOJ_TRIANGLE_GROUPBY_ROOT_MIN_HG_U64,
            wcoj_kernels::WCOJ_GROUPBY_ROOT_SEGMENT_MIN_VALUES_U64,
            "min_0",
        ),
        AggOp::Max => (
            wcoj_kernels::WCOJ_TRIANGLE_GROUPBY_ROOT_MAX_HG_U64,
            wcoj_kernels::WCOJ_GROUPBY_ROOT_SEGMENT_MAX_VALUES_U64,
            "max_0",
        ),
        other => {
            return Err(XlogError::Kernel(format!(
                "{ctx}: unsupported AggOp {other:?} (Sum/Min/Max only; use \
                 wcoj_triangle_groupby_root_count_u64_recorded for Count)"
            )))
        }
    };
    validate_binary_u64(ctx, "e_xy", e_xy)?;
    validate_binary_u64(ctx, "e_yz", e_yz)?;
    validate_binary_u64(ctx, "e_xz", e_xz)?;
    let plan = self.wcoj_triangle_hg_work_plan_u64_recorded(
        e_xy,
        e_yz,
        e_xz,
        block_work_unit,
        launch_stream,
    )?;
    let n_xy = plan.row_count;
    let out_schema = Schema::new(vec![
        ("x".to_string(), ScalarType::U64),
        (agg_name.to_string(), ScalarType::U64),
    ]);
    // Nothing to aggregate: either `e_xy` has no rows or the plan found no work.
    if n_xy == 0 || plan.total_work == 0 {
        return self.create_empty_buffer(out_schema);
    }

    let runtime = self.memory().runtime().ok_or_else(|| {
        XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
    })?;
    let cu_stream = runtime
        .stream_pool()
        .resolve(launch_stream)
        .ok_or_else(|| {
            XlogError::Kernel(format!(
                "{ctx}: launch_stream StreamId({}) does not resolve",
                launch_stream.0
            ))
        })?;

    let yz_col1 = metadata_column_u64(e_yz, 1)?;
    let xz_col1 = metadata_column_u64(e_xz, 1)?;
    let xy_col1 = metadata_column_u64(e_xy, 1)?;
    let n_yz = self.metadata_logical_rows(e_yz)?;
    let n_xz = self.metadata_logical_rows(e_xz)?;
    // Kernel flag choosing the aggregated value: 0 = Y, 1 = Z.
    let value_from_z: u32 = match value {
        WcojRootAggValue::Y => 0,
        WcojRootAggValue::Z => 1,
    };

    // Per-`e_xy`-row outputs: a u32 count, and a u64 partial aggregate held as raw bytes.
    let mut row_counts = self.memory().alloc::<u32>(n_xy as usize)?;
    // NOTE(review): zeroing goes through the device handle rather than `cu_stream`.
    // Ordering against the `cu_stream` launches below depends on that handle's stream — confirm.
    self.device()
        .inner()
        .memset_zeros(&mut row_counts)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row counts failed: {e}")))?;
    let mut row_agg = self
        .memory()
        .alloc::<u8>(n_xy as usize * std::mem::size_of::<u64>())?;
    self.device()
        .inner()
        .memset_zeros(&mut row_agg)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row aggregates failed: {e}")))?;

    let grid = plan.total_work.div_ceil(plan.block_work_unit);
    // Record every buffer the per-row phase reads or writes. `preflight` runs before the
    // launches below and `commit` runs after them.
    let mut rec = LaunchRecorder::new_strict(launch_stream);
    rec.read(e_xy.num_rows_device());
    rec.read(e_yz.num_rows_device());
    rec.read(e_xz.num_rows_device());
    rec.read_column(e_yz.column(1).expect("yz.col1"));
    rec.read_column(e_xz.column(1).expect("xz.col1"));
    rec.read_column(e_xy.column(1).expect("xy.col1"));
    rec.read(&plan.xy_work_prefix);
    rec.read(&plan.xy_yz_start);
    rec.read(&plan.xy_yz_end);
    rec.read(&plan.xy_xz_start);
    rec.read(&plan.xy_xz_end);
    rec.write(&row_counts);
    rec.write(&row_agg);
    rec.preflight(runtime)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
    if matches!(agg_op, AggOp::Min) {
        // Min needs u64::MAX as its identity; a zero-initialized slot would always win.
        let fill = self
            .device()
            .inner()
            .get_func(ARITH_MODULE, arith_kernels::ARITH_FILL_CONST_U64)
            .ok_or_else(|| {
                XlogError::Kernel("arith_fill_const_u64 kernel not found".to_string())
            })?;
        // SAFETY (review): `row_agg` was allocated as `n_xy * size_of::<u64>()` bytes, so it
        // spans `n_xy` u64 slots. Assumes the allocation is u64-aligned — confirm.
        let row_agg_u64 = unsafe { reinterpret_u8_as_u64(&mut row_agg) };
        // SAFETY (review): the argument tuple is assumed to match the fill kernel's
        // (value, count, out) parameters.
        unsafe {
            fill.clone()
                .launch_on_stream(
                    &cu_stream,
                    LaunchConfig::for_num_elems(n_xy),
                    (u64::MAX, n_xy, &mut *row_agg_u64),
                )
                .map_err(|e| {
                    XlogError::Kernel(format!("{ctx}: min identity fill failed: {e}"))
                })?;
        }
    }
    {
        let kernel = self
            .device()
            .inner()
            .get_func(WCOJ_MODULE, kernel_name)
            .ok_or_else(|| XlogError::Kernel(format!("{kernel_name} kernel not found")))?;
        // Argument order must match the CUDA kernel's parameter list exactly.
        let mut params: Vec<*mut c_void> = vec![
            yz_col1.as_kernel_param(),
            n_yz.as_kernel_param(),
            xz_col1.as_kernel_param(),
            n_xz.as_kernel_param(),
            xy_col1.as_kernel_param(),
            value_from_z.as_kernel_param(),
            (&plan.xy_work_prefix).as_kernel_param(),
            (&plan.xy_yz_start).as_kernel_param(),
            (&plan.xy_yz_end).as_kernel_param(),
            (&plan.xy_xz_start).as_kernel_param(),
            (&plan.xy_xz_end).as_kernel_param(),
            plan.row_count.as_kernel_param(),
            plan.total_work.as_kernel_param(),
            plan.block_work_unit.as_kernel_param(),
            (&row_counts).as_kernel_param(),
            (&row_agg).as_kernel_param(),
        ];
        // SAFETY (review): relies on `params` matching the kernel signature in order and
        // type. It also relies on the referenced locals and `plan` buffers staying alive
        // until the stream work completes.
        unsafe {
            kernel
                .clone()
                .launch_on_stream(
                    &cu_stream,
                    LaunchConfig {
                        grid_dim: (grid, 1, 1),
                        block_dim: (BLOCK_SIZE, 1, 1),
                        shared_mem_bytes: 0,
                    },
                    &mut params,
                )
                .map_err(|e| {
                    XlogError::Kernel(format!("{ctx}: groupby-agg launch failed: {e}"))
                })?;
        }
    }
    rec.commit(runtime)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;

    // Group the per-row results by `e_xy` column 0. `unique_keys` become the output `x`
    // column; `prefix_sum` presumably delimits each key's run of rows — confirm.
    let meta = self.wcoj_build_metadata_u64_recorded(e_xy, 0, launch_stream)?;
    let key_count = meta.key_count;
    if key_count == 0 {
        return self.create_empty_buffer(out_schema);
    }
    // Per-key outputs, 8 bytes per key: reduced counts and reduced aggregates.
    let mut count_sums = self
        .memory()
        .alloc::<u8>(key_count as usize * std::mem::size_of::<u64>())?;
    self.device()
        .inner()
        .memset_zeros(&mut count_sums)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: zero group counts failed: {e}")))?;
    let mut group_agg = self
        .memory()
        .alloc::<u8>(key_count as usize * std::mem::size_of::<u64>())?;
    self.device()
        .inner()
        .memset_zeros(&mut group_agg)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: zero group aggregates failed: {e}")))?;

    let mut rec_reduce = LaunchRecorder::new_strict(launch_stream);
    rec_reduce.read(&row_counts);
    rec_reduce.read(&row_agg);
    rec_reduce.read(&meta.prefix_sum);
    rec_reduce.write(&count_sums);
    rec_reduce.write(&group_agg);
    rec_reduce
        .preflight(runtime)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: reduce preflight failed: {e}")))?;
    if matches!(agg_op, AggOp::Min) {
        // Same identity reset as for the per-row buffer, now for the per-key buffer.
        let fill = self
            .device()
            .inner()
            .get_func(ARITH_MODULE, arith_kernels::ARITH_FILL_CONST_U64)
            .ok_or_else(|| {
                XlogError::Kernel("arith_fill_const_u64 kernel not found".to_string())
            })?;
        // SAFETY (review): `group_agg` spans `key_count` u64 slots (allocated above).
        // Assumes u64 alignment — confirm.
        let group_agg_u64 = unsafe { reinterpret_u8_as_u64(&mut group_agg) };
        unsafe {
            fill.clone()
                .launch_on_stream(
                    &cu_stream,
                    LaunchConfig::for_num_elems(key_count),
                    (u64::MAX, key_count, &mut *group_agg_u64),
                )
                .map_err(|e| {
                    XlogError::Kernel(format!("{ctx}: group min identity fill failed: {e}"))
                })?;
        }
    }
    // Both reduce kernels use a grid sized to cover `n_xy` threads.
    let reduce_grid = n_xy.div_ceil(BLOCK_SIZE);
    {
        let kernel = self
            .device()
            .inner()
            .get_func(
                WCOJ_MODULE,
                wcoj_kernels::WCOJ_GROUPBY_ROOT_SEGMENT_SUM_COUNTS_U32,
            )
            .ok_or_else(|| {
                XlogError::Kernel(
                    "wcoj_groupby_root_segment_sum_counts_u32 kernel not found".to_string(),
                )
            })?;
        let mut params: Vec<*mut c_void> = vec![
            (&row_counts).as_kernel_param(),
            n_xy.as_kernel_param(),
            (&meta.prefix_sum).as_kernel_param(),
            key_count.as_kernel_param(),
            (&count_sums).as_kernel_param(),
        ];
        // SAFETY (review): assumes `params` matches the count-reduce kernel signature.
        unsafe {
            kernel
                .clone()
                .launch_on_stream(
                    &cu_stream,
                    LaunchConfig {
                        grid_dim: (reduce_grid, 1, 1),
                        block_dim: (BLOCK_SIZE, 1, 1),
                        shared_mem_bytes: 0,
                    },
                    &mut params,
                )
                .map_err(|e| {
                    XlogError::Kernel(format!("{ctx}: count reduce launch failed: {e}"))
                })?;
        }
    }
    {
        let kernel = self
            .device()
            .inner()
            .get_func(WCOJ_MODULE, segment_kernel_name)
            .ok_or_else(|| {
                XlogError::Kernel(format!("{segment_kernel_name} kernel not found"))
            })?;
        let mut params: Vec<*mut c_void> = vec![
            (&row_counts).as_kernel_param(),
            (&row_agg).as_kernel_param(),
            n_xy.as_kernel_param(),
            (&meta.prefix_sum).as_kernel_param(),
            key_count.as_kernel_param(),
            (&group_agg).as_kernel_param(),
        ];
        // SAFETY (review): assumes `params` matches the selected segment kernel's signature.
        unsafe {
            kernel
                .clone()
                .launch_on_stream(
                    &cu_stream,
                    LaunchConfig {
                        grid_dim: (reduce_grid, 1, 1),
                        block_dim: (BLOCK_SIZE, 1, 1),
                        shared_mem_bytes: 0,
                    },
                    &mut params,
                )
                .map_err(|e| {
                    XlogError::Kernel(format!("{ctx}: agg reduce launch failed: {e}"))
                })?;
        }
    }
    rec_reduce
        .commit(runtime)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: reduce commit failed: {e}")))?;

    // Copy the unique keys into a buffer the output can own (`meta` is dropped at the end of
    // this call). Also seed the device-side row counters of both staging buffers with `key_count`.
    let x_copy = self
        .memory()
        .alloc::<u8>(key_count as usize * std::mem::size_of::<u64>())?;
    let d_num_rows_agg = self.memory().alloc::<u32>(1)?;
    let d_num_rows_counts = self.memory().alloc::<u32>(1)?;
    let mut rec_copy = LaunchRecorder::new_strict(launch_stream);
    rec_copy.read(&meta.unique_keys);
    rec_copy.write(&x_copy);
    rec_copy.write(&d_num_rows_agg);
    rec_copy.write(&d_num_rows_counts);
    rec_copy
        .preflight(runtime)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: copy preflight failed: {e}")))?;
    // SAFETY (review): `x_copy` holds `key_count * size_of::<u64>()` bytes by construction.
    // `unique_keys` is assumed to hold at least that many bytes — confirm against the
    // metadata builder.
    unsafe {
        let res = sys::cuMemcpyDtoDAsync_v2(
            *x_copy.device_ptr(),
            *meta.unique_keys.device_ptr(),
            key_count as usize * std::mem::size_of::<u64>(),
            cu_stream.cu_stream(),
        );
        if res != sys::cudaError_enum::CUDA_SUCCESS {
            return Err(XlogError::Kernel(format!(
                "{ctx}: DtoD unique keys copy failed: {res:?}"
            )));
        }
    }
    self.htod_launch_metadata_async_copy_one(
        &key_count,
        &d_num_rows_agg,
        &cu_stream,
        &format!("{ctx}: d_num_rows_agg"),
    )?;
    self.htod_launch_metadata_async_copy_one(
        &key_count,
        &d_num_rows_counts,
        &cu_stream,
        &format!("{ctx}: d_num_rows_counts"),
    )?;
    rec_copy
        .commit(runtime)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: copy commit failed: {e}")))?;
    // Wait for all enqueued work, including the async copies that read the host-side
    // `key_count`, before wrapping the buffers and handing them to the mask/compaction helpers.
    cu_stream
        .synchronize()
        .map_err(|e| XlogError::Kernel(format!("{ctx}: stream sync failed: {e}")))?;

    // `counts_buf` only feeds the mask. `staging` holds the output columns (keys, aggregates).
    let counts_schema = Schema::new(vec![("count".to_string(), ScalarType::U64)]);
    let counts_buf = CudaBuffer::from_columns_with_host_count(
        vec![count_sums.into()],
        u64::from(key_count),
        d_num_rows_counts,
        counts_schema,
        key_count,
    );
    let staging = CudaBuffer::from_columns_with_host_count(
        vec![x_copy.into(), group_agg.into()],
        u64::from(key_count),
        d_num_rows_agg,
        out_schema,
        key_count,
    );
    // Keep only keys whose reduced count is > 0.
    let mask = self.compare_const_mask_recorded::<u64>(
        &counts_buf,
        0,
        0u64,
        crate::CompareOp::Gt,
        launch_stream,
    )?;
    self.compact_buffer_by_device_mask_counted_recorded(&staging, &mask, launch_stream)
}
1557
1558 pub fn wcoj_triangle_hg_count_phase_u32_recorded(
1559 &self,
1560 e_xy: &CudaBuffer,
1561 e_yz: &CudaBuffer,
1562 e_xz: &CudaBuffer,
1563 plan: &WcojTriangleHgWorkPlanU32,
1564 launch_stream: StreamId,
1565 ) -> Result<WcojTriangleHgCountPhaseU32> {
1566 let ctx = "wcoj_triangle_hg_count_phase_u32_recorded";
1567 validate_binary_u32(ctx, "e_xy", e_xy)?;
1568 validate_binary_u32(ctx, "e_yz", e_yz)?;
1569 validate_binary_u32(ctx, "e_xz", e_xz)?;
1570 let grid = plan.total_work.div_ceil(plan.block_work_unit);
1571 if grid > 1024 {
1572 return Err(XlogError::Kernel(format!(
1573 "{ctx}: spike phase path requires grid <= 1024, got {grid}"
1574 )));
1575 }
1576 let total_rows_device = self.memory().alloc::<u32>(1)?;
1577 if plan.total_work == 0 {
1578 return Ok(WcojTriangleHgCountPhaseU32 {
1579 total_rows_device,
1580 total_rows: 0,
1581 });
1582 }
1583
1584 let runtime = self.memory().runtime().ok_or_else(|| {
1585 XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
1586 })?;
1587 let cu_stream = runtime
1588 .stream_pool()
1589 .resolve(launch_stream)
1590 .ok_or_else(|| {
1591 XlogError::Kernel(format!(
1592 "{ctx}: launch_stream StreamId({}) does not resolve",
1593 launch_stream.0
1594 ))
1595 })?;
1596
1597 let yz_col1 = metadata_column_u32(e_yz, 1)?;
1598 let xz_col1 = metadata_column_u32(e_xz, 1)?;
1599 let n_yz = self.metadata_logical_rows(e_yz)?;
1600 let n_xz = self.metadata_logical_rows(e_xz)?;
1601
1602 let mut rec_hg = LaunchRecorder::new_strict(launch_stream);
1603 rec_hg.read(e_xy.num_rows_device());
1604 rec_hg.read(e_yz.num_rows_device());
1605 rec_hg.read(e_xz.num_rows_device());
1606 rec_hg.read_column(e_yz.column(1).expect("yz.col1"));
1607 rec_hg.read_column(e_xz.column(1).expect("xz.col1"));
1608 rec_hg.read(&plan.xy_work_prefix);
1609 rec_hg.read(&plan.xy_yz_start);
1610 rec_hg.read(&plan.xy_yz_end);
1611 rec_hg.read(&plan.xy_xz_start);
1612 rec_hg.read(&plan.xy_xz_end);
1613 rec_hg.read_write(&plan.block_counts);
1614 rec_hg.read_write(&plan.block_offsets);
1615 rec_hg.write(&total_rows_device);
1616 rec_hg
1617 .preflight(runtime)
1618 .map_err(|e| XlogError::Kernel(format!("{ctx}: count preflight failed: {e}")))?;
1619 {
1620 let kernel = self
1621 .device()
1622 .inner()
1623 .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_TRIANGLE_COUNT_HG_U32)
1624 .ok_or_else(|| {
1625 XlogError::Kernel("wcoj_triangle_count_hg_u32 kernel not found".to_string())
1626 })?;
1627 let mut params: Vec<*mut c_void> = vec![
1628 yz_col1.as_kernel_param(),
1629 n_yz.as_kernel_param(),
1630 xz_col1.as_kernel_param(),
1631 n_xz.as_kernel_param(),
1632 (&plan.xy_work_prefix).as_kernel_param(),
1633 (&plan.xy_yz_start).as_kernel_param(),
1634 (&plan.xy_yz_end).as_kernel_param(),
1635 (&plan.xy_xz_start).as_kernel_param(),
1636 (&plan.xy_xz_end).as_kernel_param(),
1637 plan.row_count.as_kernel_param(),
1638 plan.total_work.as_kernel_param(),
1639 plan.block_work_unit.as_kernel_param(),
1640 (&plan.block_counts).as_kernel_param(),
1641 ];
1642 unsafe {
1643 kernel
1644 .clone()
1645 .launch_on_stream(
1646 &cu_stream,
1647 LaunchConfig {
1648 grid_dim: (grid, 1, 1),
1649 block_dim: (BLOCK_SIZE, 1, 1),
1650 shared_mem_bytes: 0,
1651 },
1652 &mut params,
1653 )
1654 .map_err(|e| XlogError::Kernel(format!("{ctx}: count launch failed: {e}")))?;
1655 }
1656 }
1657 {
1658 let kernel = self
1659 .device()
1660 .inner()
1661 .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_SCAN_HG_BLOCK_COUNTS_U32)
1662 .ok_or_else(|| {
1663 XlogError::Kernel("wcoj_scan_hg_block_counts_u32 kernel not found".to_string())
1664 })?;
1665 let mut params: Vec<*mut c_void> = vec![
1666 (&plan.block_counts).as_kernel_param(),
1667 grid.as_kernel_param(),
1668 (&plan.block_offsets).as_kernel_param(),
1669 (&total_rows_device).as_kernel_param(),
1670 ];
1671 unsafe {
1672 kernel
1673 .clone()
1674 .launch_on_stream(
1675 &cu_stream,
1676 LaunchConfig {
1677 grid_dim: (1, 1, 1),
1678 block_dim: (1024, 1, 1),
1679 shared_mem_bytes: 0,
1680 },
1681 &mut params,
1682 )
1683 .map_err(|e| XlogError::Kernel(format!("{ctx}: scan launch failed: {e}")))?;
1684 }
1685 }
1686 rec_hg
1687 .commit(runtime)
1688 .map_err(|e| XlogError::Kernel(format!("{ctx}: count commit failed: {e}")))?;
1689 cu_stream
1690 .synchronize()
1691 .map_err(|e| XlogError::Kernel(format!("{ctx}: count stream sync failed: {e}")))?;
1692 let total_rows = self
1693 .dtoh_scalar_untracked::<u32>(&total_rows_device, 0)
1694 .map_err(|e| XlogError::Kernel(format!("{ctx}: read total rows failed: {e}")))?;
1695 Ok(WcojTriangleHgCountPhaseU32 {
1696 total_rows_device,
1697 total_rows,
1698 })
1699 }
1700
/// Materialize phase of the split (count, then materialize) u32 HG triangle join.
///
/// Writes the `(x, y, z)` rows counted by `wcoj_triangle_hg_count_phase_u32_recorded`, which
/// must have been run with the same `plan`. `count.total_rows` sizes the three u32 output
/// columns. The materialize kernel reads `plan.block_offsets` (written by that phase's scan)
/// to place each block's rows. `count.total_rows_device` becomes the output's device-side
/// row count.
///
/// Output column types mirror the inputs: x and y come from `e_xy`, z from column 1 of
/// `e_yz`. Returns an empty buffer with that schema when `count.total_rows == 0`.
///
/// # Errors
/// Returns `XlogError::Kernel` in these cases:
/// - The block grid exceeds 1024.
/// - The memory manager has no runtime.
/// - `launch_stream` does not resolve.
/// - The output size overflows.
/// - The kernel is missing.
/// - A preflight, launch, commit, or stream sync fails.
///
/// Validation errors are propagated.
///
/// # Panics
/// `div_ceil` panics if `plan.block_work_unit` is zero and `count.total_rows` is nonzero.
pub fn wcoj_triangle_hg_materialize_phase_u32_recorded(
    &self,
    e_xy: &CudaBuffer,
    e_yz: &CudaBuffer,
    e_xz: &CudaBuffer,
    plan: &WcojTriangleHgWorkPlanU32,
    count: WcojTriangleHgCountPhaseU32,
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let ctx = "wcoj_triangle_hg_materialize_phase_u32_recorded";
    validate_binary_u32(ctx, "e_xy", e_xy)?;
    validate_binary_u32(ctx, "e_yz", e_yz)?;
    validate_binary_u32(ctx, "e_xz", e_xz)?;
    // The `expect`s assume validation guaranteed at least two columns per input (presumably).
    let out_schema = Schema::new(vec![
        (
            "x".to_string(),
            e_xy.schema().column_type(0).expect("xy.col0 type"),
        ),
        (
            "y".to_string(),
            e_xy.schema().column_type(1).expect("xy.col1 type"),
        ),
        (
            "z".to_string(),
            e_yz.schema().column_type(1).expect("yz.col1 type"),
        ),
    ]);
    if count.total_rows == 0 {
        return self.create_empty_buffer(out_schema);
    }
    // Same grid as the count phase. The 1024 cap mirrors that phase's single-block scan.
    let grid = plan.total_work.div_ceil(plan.block_work_unit);
    if grid > 1024 {
        return Err(XlogError::Kernel(format!(
            "{ctx}: spike phase path requires grid <= 1024, got {grid}"
        )));
    }
    let runtime = self.memory().runtime().ok_or_else(|| {
        XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
    })?;
    let cu_stream = runtime
        .stream_pool()
        .resolve(launch_stream)
        .ok_or_else(|| {
            XlogError::Kernel(format!(
                "{ctx}: launch_stream StreamId({}) does not resolve",
                launch_stream.0
            ))
        })?;
    let xy_col0 = metadata_column_u32(e_xy, 0)?;
    let xy_col1 = metadata_column_u32(e_xy, 1)?;
    let yz_col1 = metadata_column_u32(e_yz, 1)?;
    let xz_col1 = metadata_column_u32(e_xz, 1)?;
    let n_yz = self.metadata_logical_rows(e_yz)?;
    let n_xz = self.metadata_logical_rows(e_xz)?;
    // Each output column holds `total_rows` u32 values, allocated as raw bytes.
    let bytes_per_col = (count.total_rows as usize)
        .checked_mul(std::mem::size_of::<u32>())
        .ok_or_else(|| XlogError::Kernel(format!("{ctx}: output byte size overflow")))?;
    let mut out_x = self.memory().alloc::<u8>(bytes_per_col)?;
    let mut out_y = self.memory().alloc::<u8>(bytes_per_col)?;
    let mut out_z = self.memory().alloc::<u8>(bytes_per_col)?;
    // Record every buffer the materialize kernel reads or writes.
    let mut rec_mat = LaunchRecorder::new_strict(launch_stream);
    rec_mat.read(e_xy.num_rows_device());
    rec_mat.read(e_yz.num_rows_device());
    rec_mat.read(e_xz.num_rows_device());
    rec_mat.read_column(e_xy.column(0).expect("xy.col0"));
    rec_mat.read_column(e_xy.column(1).expect("xy.col1"));
    rec_mat.read_column(e_yz.column(1).expect("yz.col1"));
    rec_mat.read_column(e_xz.column(1).expect("xz.col1"));
    rec_mat.read(&plan.xy_work_prefix);
    rec_mat.read(&plan.xy_yz_start);
    rec_mat.read(&plan.xy_yz_end);
    rec_mat.read(&plan.xy_xz_start);
    rec_mat.read(&plan.xy_xz_end);
    rec_mat.read(&plan.block_offsets);
    rec_mat.write(&out_x);
    rec_mat.write(&out_y);
    rec_mat.write(&out_z);
    rec_mat
        .preflight(runtime)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize preflight failed: {e}")))?;
    {
        let kernel = self
            .device()
            .inner()
            .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_TRIANGLE_MATERIALIZE_HG_U32)
            .ok_or_else(|| {
                XlogError::Kernel(
                    "wcoj_triangle_materialize_hg_u32 kernel not found".to_string(),
                )
            })?;
        // SAFETY (review): each byte buffer is `total_rows * size_of::<u32>()` bytes long.
        // Assumes u32 alignment of the allocation — confirm.
        let out_x_u32 = unsafe { reinterpret_u8_as_u32(&mut out_x) };
        let out_y_u32 = unsafe { reinterpret_u8_as_u32(&mut out_y) };
        let out_z_u32 = unsafe { reinterpret_u8_as_u32(&mut out_z) };
        // Argument order must match the CUDA kernel's parameter list exactly.
        let mut params: Vec<*mut c_void> = vec![
            xy_col0.as_kernel_param(),
            xy_col1.as_kernel_param(),
            yz_col1.as_kernel_param(),
            n_yz.as_kernel_param(),
            xz_col1.as_kernel_param(),
            n_xz.as_kernel_param(),
            (&plan.xy_work_prefix).as_kernel_param(),
            (&plan.xy_yz_start).as_kernel_param(),
            (&plan.xy_yz_end).as_kernel_param(),
            (&plan.xy_xz_start).as_kernel_param(),
            (&plan.xy_xz_end).as_kernel_param(),
            plan.row_count.as_kernel_param(),
            plan.total_work.as_kernel_param(),
            plan.block_work_unit.as_kernel_param(),
            (&plan.block_offsets).as_kernel_param(),
            count.total_rows.as_kernel_param(),
            out_x_u32.as_kernel_param(),
            out_y_u32.as_kernel_param(),
            out_z_u32.as_kernel_param(),
        ];
        // SAFETY (review): relies on `params` matching the kernel signature. The referenced
        // buffers stay alive until the stream is synchronized below.
        unsafe {
            kernel
                .clone()
                .launch_on_stream(
                    &cu_stream,
                    LaunchConfig {
                        grid_dim: (grid, 1, 1),
                        block_dim: (BLOCK_SIZE, 1, 1),
                        shared_mem_bytes: 0,
                    },
                    &mut params,
                )
                .map_err(|e| {
                    XlogError::Kernel(format!("{ctx}: materialize launch failed: {e}"))
                })?;
        }
    }
    rec_mat
        .commit(runtime)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize commit failed: {e}")))?;
    cu_stream.synchronize().map_err(|e| {
        XlogError::Kernel(format!("{ctx}: materialize stream sync failed: {e}"))
    })?;
    // The device row counter from the count phase becomes the output's device-side count.
    Ok(CudaBuffer::from_columns_with_host_count(
        vec![out_x.into(), out_y.into(), out_z.into()],
        count.total_rows as u64,
        count.total_rows_device,
        out_schema,
        count.total_rows,
    ))
}
1846
/// Full u32 HG triangle join over a precomputed work plan.
///
/// Runs three steps:
/// 1. A cached count pass.
/// 2. A scan of the per-block counts into offsets and a device-side total.
/// 3. A materialize pass that writes the `(x, y, z)` rows.
///
/// The block grid is `ceil(plan.total_work / plan.block_work_unit)`.
/// - Up to 1024 blocks: reuse `plan.block_counts` / `plan.block_offsets` and a single
///   1024-thread scan kernel.
/// - Larger grids: allocate local count and offset buffers, copy counts into offsets, scan
///   them with `multiblock_scan_u32_inplace_on_stream`, and compute the total with a separate
///   one-thread kernel.
///
/// The cached count kernel also receives `plan.scratch_x/y/z`, which the materialize kernel
/// later reads.
///
/// Output column types mirror the inputs: x and y come from `e_xy`, z from column 1 of
/// `e_yz`. The device data are written as u32. Returns an empty buffer with that schema when
/// the plan has no work or when no rows are produced.
///
/// # Errors
/// Returns `XlogError::Kernel` in these cases:
/// - The memory manager has no runtime.
/// - `launch_stream` does not resolve.
/// - A size computation overflows.
/// - A kernel is missing.
/// - A preflight, launch, copy, commit, stream sync, or read-back fails.
///
/// Validation and scan-helper errors are propagated.
///
/// # Panics
/// `div_ceil` panics if `plan.block_work_unit` is zero and `plan.total_work` is nonzero.
pub fn wcoj_triangle_hg_u32_with_plan_recorded(
    &self,
    e_xy: &CudaBuffer,
    e_yz: &CudaBuffer,
    e_xz: &CudaBuffer,
    plan: &WcojTriangleHgWorkPlanU32,
    launch_stream: StreamId,
) -> Result<CudaBuffer> {
    let ctx = "wcoj_triangle_hg_u32_with_plan_recorded";
    validate_binary_u32(ctx, "e_xy", e_xy)?;
    validate_binary_u32(ctx, "e_yz", e_yz)?;
    validate_binary_u32(ctx, "e_xz", e_xz)?;
    // The `expect`s assume validation guaranteed at least two columns per input (presumably).
    let out_schema = Schema::new(vec![
        (
            "x".to_string(),
            e_xy.schema().column_type(0).expect("xy.col0 type"),
        ),
        (
            "y".to_string(),
            e_xy.schema().column_type(1).expect("xy.col1 type"),
        ),
        (
            "z".to_string(),
            e_yz.schema().column_type(1).expect("yz.col1 type"),
        ),
    ]);
    if plan.total_work == 0 {
        return self.create_empty_buffer(out_schema);
    }

    let grid = plan.total_work.div_ceil(plan.block_work_unit);
    // Byte size of one per-block u32 array; used only by the multi-block DtoD copy.
    let bytes_count = (grid as usize)
        .checked_mul(std::mem::size_of::<u32>())
        .ok_or_else(|| XlogError::Kernel(format!("{ctx}: count byte size overflow")))?;
    // Grids too large for the single-block scan get their own count/offset buffers instead
    // of the plan's.
    let mut local_counts = None;
    let mut local_offsets = None;
    if grid > 1024 {
        local_counts = Some(self.memory().alloc::<u32>(grid as usize)?);
        local_offsets = Some(self.memory().alloc::<u32>(grid as usize)?);
    }
    let total_rows_device = self.memory().alloc::<u32>(1)?;

    let runtime = self.memory().runtime().ok_or_else(|| {
        XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
    })?;
    let cu_stream = runtime
        .stream_pool()
        .resolve(launch_stream)
        .ok_or_else(|| {
            XlogError::Kernel(format!(
                "{ctx}: launch_stream StreamId({}) does not resolve",
                launch_stream.0
            ))
        })?;

    let xy_col0 = metadata_column_u32(e_xy, 0)?;
    let xy_col1 = metadata_column_u32(e_xy, 1)?;
    let yz_col1 = metadata_column_u32(e_yz, 1)?;
    let xz_col1 = metadata_column_u32(e_xz, 1)?;
    let n_yz = self.metadata_logical_rows(e_yz)?;
    let n_xz = self.metadata_logical_rows(e_xz)?;

    // Per-block counts: the plan's buffer for the single-block path, otherwise the local one.
    let count_u32 = if grid <= 1024 {
        &plan.block_counts
    } else {
        local_counts
            .as_ref()
            .expect("local HG counts allocated when grid exceeds single-block scan")
    };
    // Record every buffer touched by the count pass and by whichever scan path runs.
    let mut rec_hg = LaunchRecorder::new_strict(launch_stream);
    rec_hg.read(e_xy.num_rows_device());
    rec_hg.read(e_yz.num_rows_device());
    rec_hg.read(e_xz.num_rows_device());
    rec_hg.read_column(e_xy.column(0).expect("xy.col0"));
    rec_hg.read_column(e_xy.column(1).expect("xy.col1"));
    rec_hg.read_column(e_yz.column(1).expect("yz.col1"));
    rec_hg.read_column(e_xz.column(1).expect("xz.col1"));
    rec_hg.read(&plan.xy_work_prefix);
    rec_hg.read(&plan.xy_yz_start);
    rec_hg.read(&plan.xy_yz_end);
    rec_hg.read(&plan.xy_xz_start);
    rec_hg.read(&plan.xy_xz_end);
    rec_hg.read_write(count_u32);
    if grid <= 1024 {
        rec_hg.read_write(&plan.block_offsets);
    } else {
        rec_hg.read_write(
            local_offsets
                .as_ref()
                .expect("local HG offsets allocated when grid exceeds single-block scan"),
        );
    }
    rec_hg.write(&total_rows_device);
    rec_hg.read_write(&plan.scratch_x);
    rec_hg.read_write(&plan.scratch_y);
    rec_hg.read_write(&plan.scratch_z);
    rec_hg
        .preflight(runtime)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: HG preflight failed: {e}")))?;
    {
        let kernel = self
            .device()
            .inner()
            .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_TRIANGLE_COUNT_HG_CACHED_U32)
            .ok_or_else(|| {
                XlogError::Kernel(
                    "wcoj_triangle_count_hg_cached_u32 kernel not found".to_string(),
                )
            })?;
        // Argument order must match the CUDA kernel's parameter list exactly. The scratch
        // buffers are handed to the count kernel here and read again by the materialize kernel.
        let mut params: Vec<*mut c_void> = vec![
            xy_col0.as_kernel_param(),
            xy_col1.as_kernel_param(),
            yz_col1.as_kernel_param(),
            n_yz.as_kernel_param(),
            xz_col1.as_kernel_param(),
            n_xz.as_kernel_param(),
            (&plan.xy_work_prefix).as_kernel_param(),
            (&plan.xy_yz_start).as_kernel_param(),
            (&plan.xy_yz_end).as_kernel_param(),
            (&plan.xy_xz_start).as_kernel_param(),
            (&plan.xy_xz_end).as_kernel_param(),
            plan.row_count.as_kernel_param(),
            plan.total_work.as_kernel_param(),
            plan.block_work_unit.as_kernel_param(),
            count_u32.as_kernel_param(),
            (&plan.scratch_x).as_kernel_param(),
            (&plan.scratch_y).as_kernel_param(),
            (&plan.scratch_z).as_kernel_param(),
        ];
        // SAFETY (review): relies on `params` matching the kernel signature. The referenced
        // buffers stay alive until the stream is synchronized below.
        unsafe {
            kernel
                .clone()
                .launch_on_stream(
                    &cu_stream,
                    LaunchConfig {
                        grid_dim: (grid, 1, 1),
                        block_dim: (HG_COUNT_BLOCK_SIZE, 1, 1),
                        shared_mem_bytes: 0,
                    },
                    &mut params,
                )
                .map_err(|e| {
                    XlogError::Kernel(format!("{ctx}: cached count launch failed: {e}"))
                })?;
        }
    }
    if grid <= 1024 {
        // Single-block path: one 1024-thread kernel writes the offsets and the total.
        let kernel = self
            .device()
            .inner()
            .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_SCAN_HG_BLOCK_COUNTS_U32)
            .ok_or_else(|| {
                XlogError::Kernel("wcoj_scan_hg_block_counts_u32 kernel not found".to_string())
            })?;
        let mut params: Vec<*mut c_void> = vec![
            count_u32.as_kernel_param(),
            grid.as_kernel_param(),
            (&plan.block_offsets).as_kernel_param(),
            (&total_rows_device).as_kernel_param(),
        ];
        unsafe {
            kernel
                .clone()
                .launch_on_stream(
                    &cu_stream,
                    LaunchConfig {
                        grid_dim: (1, 1, 1),
                        block_dim: (1024, 1, 1),
                        shared_mem_bytes: 0,
                    },
                    &mut params,
                )
                .map_err(|e| {
                    XlogError::Kernel(format!("{ctx}: HG block-count scan failed: {e}"))
                })?;
        }
    } else {
        // Multi-block path:
        // 1. Seed the offsets with a copy of the counts.
        // 2. Scan the offsets in place.
        // 3. Derive the total from counts and offsets.
        let offsets_mut = local_offsets
            .as_mut()
            .expect("local HG offsets allocated when grid exceeds single-block scan");
        // SAFETY (review): both buffers hold `grid` u32 values (`bytes_count` bytes). In this
        // branch `count_u32` is the locally allocated counts buffer.
        unsafe {
            let res = sys::cuMemcpyDtoDAsync_v2(
                *offsets_mut.device_ptr(),
                *count_u32.device_ptr(),
                bytes_count,
                cu_stream.cu_stream(),
            );
            if res != sys::cudaError_enum::CUDA_SUCCESS {
                return Err(XlogError::Kernel(format!(
                    "{ctx}: DtoD count to offsets failed: {res:?}"
                )));
            }
        }
        self.multiblock_scan_u32_inplace_on_stream(
            offsets_mut,
            grid,
            &cu_stream,
            launch_stream,
            runtime,
        )?;
        let total_kernel = self
            .device()
            .inner()
            .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_COMPUTE_TOTAL)
            .ok_or_else(|| {
                XlogError::Kernel("wcoj_compute_total kernel not found".to_string())
            })?;
        let mut params: Vec<*mut c_void> = vec![
            count_u32.as_kernel_param(),
            (&*offsets_mut).as_kernel_param(),
            grid.as_kernel_param(),
            (&total_rows_device).as_kernel_param(),
        ];
        unsafe {
            total_kernel
                .clone()
                .launch_on_stream(
                    &cu_stream,
                    LaunchConfig {
                        grid_dim: (1, 1, 1),
                        block_dim: (1, 1, 1),
                        shared_mem_bytes: 0,
                    },
                    &mut params,
                )
                .map_err(|e| {
                    XlogError::Kernel(format!("{ctx}: HG total reducer failed: {e}"))
                })?;
        }
    }
    rec_hg
        .commit(runtime)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: HG count commit failed: {e}")))?;
    // The host needs the total to size the outputs, so wait for the scan/total kernels.
    cu_stream
        .synchronize()
        .map_err(|e| XlogError::Kernel(format!("{ctx}: count stream sync failed: {e}")))?;
    let total_rows = self
        .dtoh_scalar_untracked::<u32>(&total_rows_device, 0)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: read total rows failed: {e}")))?;
    if total_rows == 0 {
        return self.create_empty_buffer(out_schema);
    }

    // Each output column holds `total_rows` u32 values, allocated as raw bytes.
    let bytes_per_col = (total_rows as usize)
        .checked_mul(std::mem::size_of::<u32>())
        .ok_or_else(|| XlogError::Kernel(format!("{ctx}: output byte size overflow")))?;
    let mut out_x = self.memory().alloc::<u8>(bytes_per_col)?;
    let mut out_y = self.memory().alloc::<u8>(bytes_per_col)?;
    let mut out_z = self.memory().alloc::<u8>(bytes_per_col)?;
    // Use the offsets produced by whichever scan path ran above.
    let materialize_offsets = if grid <= 1024 {
        &plan.block_offsets
    } else {
        local_offsets
            .as_ref()
            .expect("local HG offsets allocated when grid exceeds single-block scan")
    };
    let mut rec_mat = LaunchRecorder::new_strict(launch_stream);
    rec_mat.read(count_u32);
    rec_mat.read(materialize_offsets);
    rec_mat.read(&plan.scratch_x);
    rec_mat.read(&plan.scratch_y);
    rec_mat.read(&plan.scratch_z);
    rec_mat.write(&out_x);
    rec_mat.write(&out_y);
    rec_mat.write(&out_z);
    rec_mat
        .preflight(runtime)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize preflight failed: {e}")))?;
    {
        let kernel = self
            .device()
            .inner()
            .get_func(
                WCOJ_MODULE,
                wcoj_kernels::WCOJ_TRIANGLE_MATERIALIZE_HG_CACHED_U32,
            )
            .ok_or_else(|| {
                XlogError::Kernel(
                    "wcoj_triangle_materialize_hg_cached_u32 kernel not found".to_string(),
                )
            })?;
        // SAFETY (review): each byte buffer is `total_rows * size_of::<u32>()` bytes long.
        // Assumes u32 alignment of the allocation — confirm.
        let out_x_u32 = unsafe { reinterpret_u8_as_u32(&mut out_x) };
        let out_y_u32 = unsafe { reinterpret_u8_as_u32(&mut out_y) };
        let out_z_u32 = unsafe { reinterpret_u8_as_u32(&mut out_z) };
        let mut params: Vec<*mut c_void> = vec![
            count_u32.as_kernel_param(),
            materialize_offsets.as_kernel_param(),
            plan.block_work_unit.as_kernel_param(),
            total_rows.as_kernel_param(),
            (&plan.scratch_x).as_kernel_param(),
            (&plan.scratch_y).as_kernel_param(),
            (&plan.scratch_z).as_kernel_param(),
            out_x_u32.as_kernel_param(),
            out_y_u32.as_kernel_param(),
            out_z_u32.as_kernel_param(),
        ];
        // NOTE(review): this launch uses BLOCK_SIZE threads per block, while the cached count
        // pass used HG_COUNT_BLOCK_SIZE. This presumably is intentional for these kernels —
        // confirm against the CUDA source.
        unsafe {
            kernel
                .clone()
                .launch_on_stream(
                    &cu_stream,
                    LaunchConfig {
                        grid_dim: (grid, 1, 1),
                        block_dim: (BLOCK_SIZE, 1, 1),
                        shared_mem_bytes: 0,
                    },
                    &mut params,
                )
                .map_err(|e| {
                    XlogError::Kernel(format!("{ctx}: materialize launch failed: {e}"))
                })?;
        }
    }
    rec_mat
        .commit(runtime)
        .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize commit failed: {e}")))?;
    cu_stream.synchronize().map_err(|e| {
        XlogError::Kernel(format!("{ctx}: materialize stream sync failed: {e}"))
    })?;

    Ok(CudaBuffer::from_columns_with_host_count(
        vec![out_x.into(), out_y.into(), out_z.into()],
        total_rows as u64,
        total_rows_device,
        out_schema,
        total_rows,
    ))
}
2175
2176 pub fn wcoj_triangle_hg_work_plan_u64_recorded(
2177 &self,
2178 e_xy: &CudaBuffer,
2179 e_yz: &CudaBuffer,
2180 e_xz: &CudaBuffer,
2181 block_work_unit: u32,
2182 launch_stream: StreamId,
2183 ) -> Result<WcojTriangleHgWorkPlanU64> {
2184 let ctx = "wcoj_triangle_hg_work_plan_u64_recorded";
2185 if block_work_unit == 0 {
2186 return Err(XlogError::Kernel(format!(
2187 "{ctx}: block_work_unit must be nonzero"
2188 )));
2189 }
2190 validate_binary_u64(ctx, "e_xy", e_xy)?;
2191 validate_binary_u64(ctx, "e_yz", e_yz)?;
2192 validate_binary_u64(ctx, "e_xz", e_xz)?;
2193
2194 let n_xy = self.metadata_logical_rows(e_xy)?;
2195 let n_yz = self.metadata_logical_rows(e_yz)?;
2196 let n_xz = self.metadata_logical_rows(e_xz)?;
2197 let prefix_len = n_xy
2198 .checked_add(1)
2199 .ok_or_else(|| XlogError::Kernel(format!("{ctx}: prefix length overflow")))?;
2200 let mut xy_work_prefix = self.memory().alloc::<u32>(prefix_len as usize)?;
2201 let mut xy_yz_start = self.memory().alloc::<u32>(n_xy as usize)?;
2202 let mut xy_yz_end = self.memory().alloc::<u32>(n_xy as usize)?;
2203 let mut xy_xz_start = self.memory().alloc::<u32>(n_xy as usize)?;
2204 let mut xy_xz_end = self.memory().alloc::<u32>(n_xy as usize)?;
2205
2206 if n_xy == 0 {
2207 let block_counts = self.memory().alloc::<u32>(1)?;
2208 let block_offsets = self.memory().alloc::<u32>(1)?;
2209 return Ok(WcojTriangleHgWorkPlanU64 {
2210 xy_work_prefix,
2211 xy_yz_start,
2212 xy_yz_end,
2213 xy_xz_start,
2214 xy_xz_end,
2215 block_counts,
2216 block_offsets,
2217 total_work: 0,
2218 block_work_unit,
2219 row_count: n_xy,
2220 });
2221 }
2222
2223 let runtime = self.memory().runtime().ok_or_else(|| {
2224 XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
2225 })?;
2226 let cu_stream = runtime
2227 .stream_pool()
2228 .resolve(launch_stream)
2229 .ok_or_else(|| {
2230 XlogError::Kernel(format!(
2231 "{ctx}: launch_stream StreamId({}) does not resolve",
2232 launch_stream.0
2233 ))
2234 })?;
2235
2236 let xy_col0 = metadata_column_u64(e_xy, 0)?;
2237 let xy_col1 = metadata_column_u64(e_xy, 1)?;
2238 let yz_col0 = metadata_column_u64(e_yz, 0)?;
2239 let xz_col0 = metadata_column_u64(e_xz, 0)?;
2240
2241 let mut rec = LaunchRecorder::new_strict(launch_stream);
2242 rec.read(e_xy.num_rows_device());
2243 rec.read(e_yz.num_rows_device());
2244 rec.read(e_xz.num_rows_device());
2245 rec.read_column(e_xy.column(0).expect("xy.col0"));
2246 rec.read_column(e_xy.column(1).expect("xy.col1"));
2247 rec.read_column(e_yz.column(0).expect("yz.col0"));
2248 rec.read_column(e_xz.column(0).expect("xz.col0"));
2249 rec.write(&xy_work_prefix);
2250 rec.write(&xy_yz_start);
2251 rec.write(&xy_yz_end);
2252 rec.write(&xy_xz_start);
2253 rec.write(&xy_xz_end);
2254 rec.preflight(runtime)
2255 .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
2256
2257 let kernel = self
2258 .device()
2259 .inner()
2260 .get_func(
2261 WCOJ_MODULE,
2262 wcoj_kernels::WCOJ_TRIANGLE_BUILD_HG_WORK_PLAN_U64,
2263 )
2264 .ok_or_else(|| {
2265 XlogError::Kernel(
2266 "wcoj_triangle_build_hg_work_plan_u64 kernel not found".to_string(),
2267 )
2268 })?;
2269 let grid = n_xy.div_ceil(BLOCK_SIZE);
2270 unsafe {
2271 kernel
2272 .clone()
2273 .launch_on_stream(
2274 &cu_stream,
2275 LaunchConfig {
2276 grid_dim: (grid, 1, 1),
2277 block_dim: (BLOCK_SIZE, 1, 1),
2278 shared_mem_bytes: 0,
2279 },
2280 (
2281 xy_col0,
2282 xy_col1,
2283 n_xy,
2284 yz_col0,
2285 n_yz,
2286 xz_col0,
2287 n_xz,
2288 &mut xy_work_prefix,
2289 &mut xy_yz_start,
2290 &mut xy_yz_end,
2291 &mut xy_xz_start,
2292 &mut xy_xz_end,
2293 ),
2294 )
2295 .map_err(|e| {
2296 XlogError::Kernel(format!(
2297 "wcoj_triangle_build_hg_work_plan_u64 launch failed: {e}"
2298 ))
2299 })?;
2300 }
2301 self.multiblock_scan_u32_inplace_on_stream(
2302 &mut xy_work_prefix,
2303 prefix_len,
2304 &cu_stream,
2305 launch_stream,
2306 runtime,
2307 )?;
2308 rec.commit(runtime)
2309 .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
2310 cu_stream
2311 .synchronize()
2312 .map_err(|e| XlogError::Kernel(format!("{ctx}: stream sync failed: {e}")))?;
2313 let total_work = self.dtoh_scalar_untracked::<u32>(&xy_work_prefix, n_xy as usize)?;
2314 let grid = if total_work == 0 {
2315 1
2316 } else {
2317 total_work.div_ceil(block_work_unit)
2318 };
2319 let block_counts = self.memory().alloc::<u32>(grid as usize)?;
2320 let block_offsets = self.memory().alloc::<u32>(grid as usize)?;
2321
2322 Ok(WcojTriangleHgWorkPlanU64 {
2323 xy_work_prefix,
2324 xy_yz_start,
2325 xy_yz_end,
2326 xy_xz_start,
2327 xy_xz_end,
2328 block_counts,
2329 block_offsets,
2330 total_work,
2331 block_work_unit,
2332 row_count: n_xy,
2333 })
2334 }
2335
    /// Computes the triangle join over the binary `u64` relations `e_xy`,
    /// `e_yz` and `e_xz`.
    ///
    /// Returns a three-column buffer (`col0`, `col1`, `col2`, all `U64`). The columns are filled
    /// from the materialize kernel's x / y / z outputs.
    ///
    /// All three phases are issued on `launch_stream`:
    /// 1. **Plan.** Build the HG work plan with
    ///    `wcoj_triangle_hg_work_plan_u64_recorded`.
    /// 2. **Count.** Launch `ceil(total_work / block_work_unit)` blocks that
    ///    write into a `grid`-element counts buffer.
    ///    - Then turn the counts into a `grid`-element offsets buffer plus a
    ///      device-side output row total.
    /// 3. **Materialize.** Read the total back to the host and allocate the
    ///    output columns.
    ///    - Then launch the materialize kernel with the offsets.
    ///
    /// Each phase declares its device reads/writes on a strict `LaunchRecorder`. It preflights
    /// before launching, commits afterwards, and synchronizes the stream before
    /// the host reads anything back.
    ///
    /// Returns an empty buffer with the output schema when the plan reports no
    /// work or the count phase yields zero rows.
    ///
    /// # Errors
    /// Errors from validation, plan construction, allocation and the multi-block
    /// scan are propagated. Failures raised directly here are `XlogError::Kernel`:
    /// - the memory manager has no runtime;
    /// - `launch_stream` does not resolve;
    /// - a kernel is missing;
    /// - a launch, device-to-device copy, recorder preflight/commit, total read-back,
    ///   or stream synchronization fails.
    pub fn wcoj_triangle_hg_u64_recorded(
        &self,
        e_xy: &CudaBuffer,
        e_yz: &CudaBuffer,
        e_xz: &CudaBuffer,
        block_work_unit: u32,
        launch_stream: StreamId,
    ) -> Result<CudaBuffer> {
        let ctx = "wcoj_triangle_hg_u64_recorded";
        validate_binary_u64(ctx, "e_xy", e_xy)?;
        validate_binary_u64(ctx, "e_yz", e_yz)?;
        validate_binary_u64(ctx, "e_xz", e_xz)?;
        let plan = self.wcoj_triangle_hg_work_plan_u64_recorded(
            e_xy,
            e_yz,
            e_xz,
            block_work_unit,
            launch_stream,
        )?;
        let out_schema = Schema::new(vec![
            ("col0".to_string(), ScalarType::U64),
            ("col1".to_string(), ScalarType::U64),
            ("col2".to_string(), ScalarType::U64),
        ]);
        if plan.total_work == 0 {
            return self.create_empty_buffer(out_schema);
        }

        // One launched block per `block_work_unit` work items. The counts and offsets
        // buffers hold `grid` u32 entries each.
        let grid = plan.total_work.div_ceil(plan.block_work_unit);
        let bytes_count = (grid as usize)
            .checked_mul(std::mem::size_of::<u32>())
            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: count byte size overflow")))?;
        // The single-block scan kernel below is launched with 1024 threads, so
        // larger grids take the multi-block path with freshly allocated buffers.
        // NOTE(review): the work-plan builder already sizes
        // `plan.block_counts` / `plan.block_offsets` to this same `grid`. These
        // local buffers therefore look redundant; confirm before removing.
        let mut local_counts = None;
        let mut local_offsets = None;
        if grid > 1024 {
            local_counts = Some(self.memory().alloc::<u32>(grid as usize)?);
            local_offsets = Some(self.memory().alloc::<u32>(grid as usize)?);
        }
        // Device-side output row total. It is written by the single-block scan kernel or by
        // the `wcoj_compute_total` kernel, and is handed to the result buffer.
        let total_rows_device = self.memory().alloc::<u32>(1)?;

        let runtime = self.memory().runtime().ok_or_else(|| {
            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
        })?;
        let cu_stream = runtime
            .stream_pool()
            .resolve(launch_stream)
            .ok_or_else(|| {
                XlogError::Kernel(format!(
                    "{ctx}: launch_stream StreamId({}) does not resolve",
                    launch_stream.0
                ))
            })?;

        let xy_col0 = metadata_column_u64(e_xy, 0)?;
        let xy_col1 = metadata_column_u64(e_xy, 1)?;
        let yz_col1 = metadata_column_u64(e_yz, 1)?;
        let xz_col1 = metadata_column_u64(e_xz, 1)?;
        let n_yz = self.metadata_logical_rows(e_yz)?;
        let n_xz = self.metadata_logical_rows(e_xz)?;

        // Counts buffer written by the count kernel for this grid size.
        let count_u32 = if grid <= 1024 {
            &plan.block_counts
        } else {
            local_counts
                .as_ref()
                .expect("local HG counts allocated when grid exceeds single-block scan")
        };
        // Count phase: declare everything the count + scan/total kernels touch.
        let mut rec_hg = LaunchRecorder::new_strict(launch_stream);
        rec_hg.read(e_xy.num_rows_device());
        rec_hg.read(e_yz.num_rows_device());
        rec_hg.read(e_xz.num_rows_device());
        rec_hg.read_column(e_yz.column(1).expect("yz.col1"));
        rec_hg.read_column(e_xz.column(1).expect("xz.col1"));
        rec_hg.read(&plan.xy_work_prefix);
        rec_hg.read(&plan.xy_yz_start);
        rec_hg.read(&plan.xy_yz_end);
        rec_hg.read(&plan.xy_xz_start);
        rec_hg.read(&plan.xy_xz_end);
        rec_hg.read_write(count_u32);
        if grid <= 1024 {
            rec_hg.read_write(&plan.block_offsets);
        } else {
            rec_hg.read_write(
                local_offsets
                    .as_ref()
                    .expect("local HG offsets allocated when grid exceeds single-block scan"),
            );
        }
        rec_hg.write(&total_rows_device);
        rec_hg
            .preflight(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: HG preflight failed: {e}")))?;
        {
            let kernel = self
                .device()
                .inner()
                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_TRIANGLE_COUNT_HG_U64)
                .ok_or_else(|| {
                    XlogError::Kernel("wcoj_triangle_count_hg_u64 kernel not found".to_string())
                })?;
            // Parameter order must match the kernel signature exactly.
            let mut params: Vec<*mut c_void> = vec![
                yz_col1.as_kernel_param(),
                n_yz.as_kernel_param(),
                xz_col1.as_kernel_param(),
                n_xz.as_kernel_param(),
                (&plan.xy_work_prefix).as_kernel_param(),
                (&plan.xy_yz_start).as_kernel_param(),
                (&plan.xy_yz_end).as_kernel_param(),
                (&plan.xy_xz_start).as_kernel_param(),
                (&plan.xy_xz_end).as_kernel_param(),
                plan.row_count.as_kernel_param(),
                plan.total_work.as_kernel_param(),
                plan.block_work_unit.as_kernel_param(),
                count_u32.as_kernel_param(),
            ];
            unsafe {
                kernel
                    .clone()
                    .launch_on_stream(
                        &cu_stream,
                        LaunchConfig {
                            grid_dim: (grid, 1, 1),
                            block_dim: (BLOCK_SIZE, 1, 1),
                            shared_mem_bytes: 0,
                        },
                        &mut params,
                    )
                    .map_err(|e| XlogError::Kernel(format!("{ctx}: count launch failed: {e}")))?;
            }
        }
        if grid <= 1024 {
            // Single-block path: one 1024-thread block scans the counts into
            // `plan.block_offsets` and writes the total row count.
            let kernel = self
                .device()
                .inner()
                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_SCAN_HG_BLOCK_COUNTS_U32)
                .ok_or_else(|| {
                    XlogError::Kernel("wcoj_scan_hg_block_counts_u32 kernel not found".to_string())
                })?;
            let mut params: Vec<*mut c_void> = vec![
                count_u32.as_kernel_param(),
                grid.as_kernel_param(),
                (&plan.block_offsets).as_kernel_param(),
                (&total_rows_device).as_kernel_param(),
            ];
            unsafe {
                kernel
                    .clone()
                    .launch_on_stream(
                        &cu_stream,
                        LaunchConfig {
                            grid_dim: (1, 1, 1),
                            block_dim: (1024, 1, 1),
                            shared_mem_bytes: 0,
                        },
                        &mut params,
                    )
                    .map_err(|e| {
                        XlogError::Kernel(format!("{ctx}: HG block-count scan failed: {e}"))
                    })?;
            }
        } else {
            // Multi-block path, step 1: copy the counts into the offsets buffer so they
            // can be scanned in place.
            let offsets_mut = local_offsets
                .as_mut()
                .expect("local HG offsets allocated when grid exceeds single-block scan");
            // `bytes_count` is `grid * size_of::<u32>()`. Both buffers were
            // allocated with `grid` u32 elements, so the copy stays in bounds.
            unsafe {
                let res = sys::cuMemcpyDtoDAsync_v2(
                    *offsets_mut.device_ptr(),
                    *count_u32.device_ptr(),
                    bytes_count,
                    cu_stream.cu_stream(),
                );
                if res != sys::cudaError_enum::CUDA_SUCCESS {
                    return Err(XlogError::Kernel(format!(
                        "{ctx}: DtoD count to offsets failed: {res:?}"
                    )));
                }
            }
            // Step 2: scan the copied counts in place.
            self.multiblock_scan_u32_inplace_on_stream(
                offsets_mut,
                grid,
                &cu_stream,
                launch_stream,
                runtime,
            )?;
            // Step 3: a single-thread kernel derives the total from the counts and the
            // scanned offsets. Presumably it computes last offset + last count for an exclusive
            // scan; confirm against the `wcoj_compute_total` kernel.
            let total_kernel = self
                .device()
                .inner()
                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_COMPUTE_TOTAL)
                .ok_or_else(|| {
                    XlogError::Kernel("wcoj_compute_total kernel not found".to_string())
                })?;
            let mut params: Vec<*mut c_void> = vec![
                count_u32.as_kernel_param(),
                (&*offsets_mut).as_kernel_param(),
                grid.as_kernel_param(),
                (&total_rows_device).as_kernel_param(),
            ];
            unsafe {
                total_kernel
                    .clone()
                    .launch_on_stream(
                        &cu_stream,
                        LaunchConfig {
                            grid_dim: (1, 1, 1),
                            block_dim: (1, 1, 1),
                            shared_mem_bytes: 0,
                        },
                        &mut params,
                    )
                    .map_err(|e| {
                        XlogError::Kernel(format!("{ctx}: HG total reducer failed: {e}"))
                    })?;
            }
        }
        rec_hg
            .commit(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: HG count commit failed: {e}")))?;
        // The host needs the row total to size the outputs, so wait for the count phase.
        cu_stream
            .synchronize()
            .map_err(|e| XlogError::Kernel(format!("{ctx}: count stream sync failed: {e}")))?;
        let total_rows = self
            .dtoh_scalar_untracked::<u32>(&total_rows_device, 0)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: read total rows failed: {e}")))?;
        if total_rows == 0 {
            return self.create_empty_buffer(out_schema);
        }

        // Output columns are allocated as raw bytes (`total_rows` u64 values each).
        // They are reinterpreted as u64 only for the kernel launch.
        let bytes_per_col = (total_rows as usize)
            .checked_mul(std::mem::size_of::<u64>())
            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: output byte size overflow")))?;
        let mut out_x = self.memory().alloc::<u8>(bytes_per_col)?;
        let mut out_y = self.memory().alloc::<u8>(bytes_per_col)?;
        let mut out_z = self.memory().alloc::<u8>(bytes_per_col)?;
        // Offsets produced by whichever scan path ran above.
        let materialize_offsets = if grid <= 1024 {
            &plan.block_offsets
        } else {
            local_offsets
                .as_ref()
                .expect("local HG offsets allocated when grid exceeds single-block scan")
        };
        // Materialize phase: declare inputs read and the three output columns written.
        let mut rec_mat = LaunchRecorder::new_strict(launch_stream);
        rec_mat.read(materialize_offsets);
        rec_mat.read(e_xy.num_rows_device());
        rec_mat.read(e_yz.num_rows_device());
        rec_mat.read(e_xz.num_rows_device());
        rec_mat.read_column(e_xy.column(0).expect("xy.col0"));
        rec_mat.read_column(e_xy.column(1).expect("xy.col1"));
        rec_mat.read_column(e_yz.column(1).expect("yz.col1"));
        rec_mat.read_column(e_xz.column(1).expect("xz.col1"));
        rec_mat.read(&plan.xy_work_prefix);
        rec_mat.read(&plan.xy_yz_start);
        rec_mat.read(&plan.xy_yz_end);
        rec_mat.read(&plan.xy_xz_start);
        rec_mat.read(&plan.xy_xz_end);
        rec_mat.write(&out_x);
        rec_mat.write(&out_y);
        rec_mat.write(&out_z);
        rec_mat
            .preflight(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize preflight failed: {e}")))?;
        {
            let kernel = self
                .device()
                .inner()
                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_TRIANGLE_MATERIALIZE_HG_U64)
                .ok_or_else(|| {
                    XlogError::Kernel(
                        "wcoj_triangle_materialize_hg_u64 kernel not found".to_string(),
                    )
                })?;
            let out_x_u64 = unsafe { reinterpret_u8_as_u64(&mut out_x) };
            let out_y_u64 = unsafe { reinterpret_u8_as_u64(&mut out_y) };
            let out_z_u64 = unsafe { reinterpret_u8_as_u64(&mut out_z) };
            // Parameter order must match the kernel signature exactly.
            let mut params: Vec<*mut c_void> = vec![
                xy_col0.as_kernel_param(),
                xy_col1.as_kernel_param(),
                yz_col1.as_kernel_param(),
                n_yz.as_kernel_param(),
                xz_col1.as_kernel_param(),
                n_xz.as_kernel_param(),
                (&plan.xy_work_prefix).as_kernel_param(),
                (&plan.xy_yz_start).as_kernel_param(),
                (&plan.xy_yz_end).as_kernel_param(),
                (&plan.xy_xz_start).as_kernel_param(),
                (&plan.xy_xz_end).as_kernel_param(),
                plan.row_count.as_kernel_param(),
                plan.total_work.as_kernel_param(),
                plan.block_work_unit.as_kernel_param(),
                materialize_offsets.as_kernel_param(),
                total_rows.as_kernel_param(),
                out_x_u64.as_kernel_param(),
                out_y_u64.as_kernel_param(),
                out_z_u64.as_kernel_param(),
            ];
            unsafe {
                kernel
                    .clone()
                    .launch_on_stream(
                        &cu_stream,
                        LaunchConfig {
                            grid_dim: (grid, 1, 1),
                            block_dim: (BLOCK_SIZE, 1, 1),
                            shared_mem_bytes: 0,
                        },
                        &mut params,
                    )
                    .map_err(|e| {
                        XlogError::Kernel(format!("{ctx}: materialize launch failed: {e}"))
                    })?;
            }
        }
        rec_mat
            .commit(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize commit failed: {e}")))?;
        cu_stream.synchronize().map_err(|e| {
            XlogError::Kernel(format!("{ctx}: materialize stream sync failed: {e}"))
        })?;

        Ok(CudaBuffer::from_columns_with_host_count(
            vec![out_x.into(), out_y.into(), out_z.into()],
            total_rows as u64,
            total_rows_device,
            out_schema,
            total_rows,
        ))
    }
2662
2663 pub fn wcoj_4cycle_hg_work_plan_u32_recorded(
2664 &self,
2665 e1: &CudaBuffer,
2666 e2: &CudaBuffer,
2667 e3: &CudaBuffer,
2668 e4: &CudaBuffer,
2669 block_work_unit: u32,
2670 launch_stream: StreamId,
2671 ) -> Result<WcojCycle4HgWorkPlanU32> {
2672 let ctx = "wcoj_4cycle_hg_work_plan_u32_recorded";
2673 if block_work_unit == 0 {
2674 return Err(XlogError::Kernel(format!(
2675 "{ctx}: block_work_unit must be nonzero"
2676 )));
2677 }
2678 validate_binary_u32(ctx, "e1", e1)?;
2679 validate_binary_u32(ctx, "e2", e2)?;
2680 validate_binary_u32(ctx, "e3", e3)?;
2681 validate_binary_u32(ctx, "e4", e4)?;
2682
2683 let n_e1 = self.metadata_logical_rows(e1)?;
2684 let n_e2 = self.metadata_logical_rows(e2)?;
2685 let n_e3 = self.metadata_logical_rows(e3)?;
2686 let prefix_len = n_e1
2687 .checked_add(1)
2688 .ok_or_else(|| XlogError::Kernel(format!("{ctx}: prefix length overflow")))?;
2689 let e2_prefix_len = n_e2
2690 .checked_add(1)
2691 .ok_or_else(|| XlogError::Kernel(format!("{ctx}: e2 prefix length overflow")))?;
2692 let mut e1_work_prefix = self.memory().alloc::<u32>(prefix_len as usize)?;
2693 let mut e2_work_prefix = self.memory().alloc::<u32>(e2_prefix_len as usize)?;
2694 let mut e1_e2_start = self.memory().alloc::<u32>(n_e1 as usize)?;
2695 let mut e1_e2_end = self.memory().alloc::<u32>(n_e1 as usize)?;
2696
2697 if n_e1 == 0 || n_e2 == 0 || n_e3 == 0 || self.metadata_logical_rows(e4)? == 0 {
2698 let block_counts = self.memory().alloc::<u32>(1)?;
2699 let block_offsets = self.memory().alloc::<u32>(1)?;
2700 return Ok(WcojCycle4HgWorkPlanU32 {
2701 e1_work_prefix,
2702 e2_work_prefix,
2703 e1_e2_start,
2704 e1_e2_end,
2705 block_counts,
2706 block_offsets,
2707 total_work: 0,
2708 block_work_unit,
2709 row_count: n_e1,
2710 });
2711 }
2712
2713 let runtime = self.memory().runtime().ok_or_else(|| {
2714 XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
2715 })?;
2716 let cu_stream = runtime
2717 .stream_pool()
2718 .resolve(launch_stream)
2719 .ok_or_else(|| {
2720 XlogError::Kernel(format!(
2721 "{ctx}: launch_stream StreamId({}) does not resolve",
2722 launch_stream.0
2723 ))
2724 })?;
2725
2726 let e1_col1 = metadata_column_u32(e1, 1)?;
2727 let e2_col0 = metadata_column_u32(e2, 0)?;
2728 let e2_col1 = metadata_column_u32(e2, 1)?;
2729 let e3_col0 = metadata_column_u32(e3, 0)?;
2730
2731 let mut rec = LaunchRecorder::new_strict(launch_stream);
2732 rec.read(e1.num_rows_device());
2733 rec.read(e2.num_rows_device());
2734 rec.read(e3.num_rows_device());
2735 rec.read_column(e1.column(1).expect("e1.col1"));
2736 rec.read_column(e2.column(0).expect("e2.col0"));
2737 rec.read_column(e2.column(1).expect("e2.col1"));
2738 rec.read_column(e3.column(0).expect("e3.col0"));
2739 rec.read_write(&e2_work_prefix);
2740 rec.write(&e1_work_prefix);
2741 rec.write(&e1_e2_start);
2742 rec.write(&e1_e2_end);
2743 rec.preflight(runtime)
2744 .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
2745
2746 let e2_kernel = self
2747 .device()
2748 .inner()
2749 .get_func(
2750 WCOJ_MODULE,
2751 wcoj_kernels::WCOJ_4CYCLE_BUILD_E2_WORK_PREFIX_U32,
2752 )
2753 .ok_or_else(|| {
2754 XlogError::Kernel(
2755 "wcoj_4cycle_build_e2_work_prefix_u32 kernel not found".to_string(),
2756 )
2757 })?;
2758 let e2_grid = n_e2.div_ceil(BLOCK_SIZE);
2759 unsafe {
2760 e2_kernel
2761 .clone()
2762 .launch_on_stream(
2763 &cu_stream,
2764 LaunchConfig {
2765 grid_dim: (e2_grid, 1, 1),
2766 block_dim: (BLOCK_SIZE, 1, 1),
2767 shared_mem_bytes: 0,
2768 },
2769 (e2_col1, n_e2, e3_col0, n_e3, &mut e2_work_prefix),
2770 )
2771 .map_err(|e| {
2772 XlogError::Kernel(format!(
2773 "wcoj_4cycle_build_e2_work_prefix_u32 launch failed: {e}"
2774 ))
2775 })?;
2776 }
2777 self.multiblock_scan_u32_inplace_on_stream(
2778 &mut e2_work_prefix,
2779 e2_prefix_len,
2780 &cu_stream,
2781 launch_stream,
2782 runtime,
2783 )?;
2784
2785 let kernel = self
2786 .device()
2787 .inner()
2788 .get_func(
2789 WCOJ_MODULE,
2790 wcoj_kernels::WCOJ_4CYCLE_BUILD_HG_WORK_PLAN_U32,
2791 )
2792 .ok_or_else(|| {
2793 XlogError::Kernel("wcoj_4cycle_build_hg_work_plan_u32 kernel not found".to_string())
2794 })?;
2795 let grid = n_e1.div_ceil(BLOCK_SIZE);
2796 unsafe {
2797 kernel
2798 .clone()
2799 .launch_on_stream(
2800 &cu_stream,
2801 LaunchConfig {
2802 grid_dim: (grid, 1, 1),
2803 block_dim: (BLOCK_SIZE, 1, 1),
2804 shared_mem_bytes: 0,
2805 },
2806 (
2807 e1_col1,
2808 n_e1,
2809 e2_col0,
2810 n_e2,
2811 &e2_work_prefix,
2812 &mut e1_work_prefix,
2813 &mut e1_e2_start,
2814 &mut e1_e2_end,
2815 ),
2816 )
2817 .map_err(|e| {
2818 XlogError::Kernel(format!(
2819 "wcoj_4cycle_build_hg_work_plan_u32 launch failed: {e}"
2820 ))
2821 })?;
2822 }
2823 self.multiblock_scan_u32_inplace_on_stream(
2824 &mut e1_work_prefix,
2825 prefix_len,
2826 &cu_stream,
2827 launch_stream,
2828 runtime,
2829 )?;
2830 rec.commit(runtime)
2831 .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
2832 cu_stream
2833 .synchronize()
2834 .map_err(|e| XlogError::Kernel(format!("{ctx}: stream sync failed: {e}")))?;
2835 let total_work = self.dtoh_scalar_untracked::<u32>(&e1_work_prefix, n_e1 as usize)?;
2836 let grid = if total_work == 0 {
2837 1
2838 } else {
2839 total_work.div_ceil(block_work_unit)
2840 };
2841 let block_counts = self.memory().alloc::<u32>(grid as usize)?;
2842 let block_offsets = self.memory().alloc::<u32>(grid as usize)?;
2843
2844 Ok(WcojCycle4HgWorkPlanU32 {
2845 e1_work_prefix,
2846 e2_work_prefix,
2847 e1_e2_start,
2848 e1_e2_end,
2849 block_counts,
2850 block_offsets,
2851 total_work,
2852 block_work_unit,
2853 row_count: n_e1,
2854 })
2855 }
2856
    /// Computes the 4-cycle join over the binary `u32` relations `e1`..`e4`.
    ///
    /// Returns a four-column buffer whose columns come from the materialize kernel's
    /// w / x / y / z outputs. The column types are taken from the inputs:
    /// - `col0`: `e1.col0`
    /// - `col1`: `e1.col1`
    /// - `col2`: `e2.col1`
    /// - `col3`: `e3.col1`
    ///
    /// All three phases are issued on `launch_stream`:
    /// 1. **Plan.** Build the HG work plan with
    ///    `wcoj_4cycle_hg_work_plan_u32_recorded`.
    /// 2. **Count.** Launch `ceil(total_work / block_work_unit)` blocks that
    ///    write into a `grid`-element counts buffer.
    ///    - Then turn the counts into a `grid`-element offsets buffer plus a
    ///      device-side output row total.
    /// 3. **Materialize.** Read the total back to the host and allocate four
    ///    u32-wide output columns.
    ///    - Then launch the materialize kernel with the offsets.
    ///
    /// Each phase declares its device reads/writes on a strict `LaunchRecorder`. It preflights
    /// before launching, commits afterwards, and synchronizes the stream before
    /// the host reads anything back.
    ///
    /// Returns an empty buffer with the output schema when the plan reports no
    /// work or the count phase yields zero rows.
    ///
    /// # Errors
    /// Errors from validation, plan construction, allocation and the multi-block
    /// scan are propagated. Failures raised directly here are `XlogError::Kernel`:
    /// - the memory manager has no runtime;
    /// - `launch_stream` does not resolve;
    /// - a kernel is missing;
    /// - a launch, device-to-device copy, recorder preflight/commit, total read-back,
    ///   or stream synchronization fails.
    ///
    /// # Panics
    /// Panics if an input schema lacks the column types used for the output schema.
    /// The `expect` calls in the body fire on a missing column.
    pub fn wcoj_4cycle_hg_u32_recorded(
        &self,
        e1: &CudaBuffer,
        e2: &CudaBuffer,
        e3: &CudaBuffer,
        e4: &CudaBuffer,
        block_work_unit: u32,
        launch_stream: StreamId,
    ) -> Result<CudaBuffer> {
        let ctx = "wcoj_4cycle_hg_u32_recorded";
        validate_binary_u32(ctx, "e1", e1)?;
        validate_binary_u32(ctx, "e2", e2)?;
        validate_binary_u32(ctx, "e3", e3)?;
        validate_binary_u32(ctx, "e4", e4)?;
        let plan = self.wcoj_4cycle_hg_work_plan_u32_recorded(
            e1,
            e2,
            e3,
            e4,
            block_work_unit,
            launch_stream,
        )?;
        let out_schema = Schema::new(vec![
            (
                "col0".to_string(),
                e1.schema().column_type(0).expect("e1.col0 type"),
            ),
            (
                "col1".to_string(),
                e1.schema().column_type(1).expect("e1.col1 type"),
            ),
            (
                "col2".to_string(),
                e2.schema().column_type(1).expect("e2.col1 type"),
            ),
            (
                "col3".to_string(),
                e3.schema().column_type(1).expect("e3.col1 type"),
            ),
        ]);
        if plan.total_work == 0 {
            return self.create_empty_buffer(out_schema);
        }

        // One launched block per `block_work_unit` work items. The counts and offsets
        // buffers hold `grid` u32 entries each.
        let grid = plan.total_work.div_ceil(plan.block_work_unit);
        let bytes_count = (grid as usize)
            .checked_mul(std::mem::size_of::<u32>())
            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: count byte size overflow")))?;
        // The single-block scan kernel below is launched with 1024 threads, so
        // larger grids take the multi-block path with freshly allocated buffers.
        // NOTE(review): the work-plan builder already sizes
        // `plan.block_counts` / `plan.block_offsets` to this same `grid`. These
        // local buffers therefore look redundant; confirm before removing.
        let mut local_counts = None;
        let mut local_offsets = None;
        if grid > 1024 {
            local_counts = Some(self.memory().alloc::<u32>(grid as usize)?);
            local_offsets = Some(self.memory().alloc::<u32>(grid as usize)?);
        }
        // Device-side output row total. It is written by the single-block scan kernel or by
        // the `wcoj_compute_total` kernel, and is handed to the result buffer.
        let total_rows_device = self.memory().alloc::<u32>(1)?;

        let runtime = self.memory().runtime().ok_or_else(|| {
            XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
        })?;
        let cu_stream = runtime
            .stream_pool()
            .resolve(launch_stream)
            .ok_or_else(|| {
                XlogError::Kernel(format!(
                    "{ctx}: launch_stream StreamId({}) does not resolve",
                    launch_stream.0
                ))
            })?;

        let e1_col0 = metadata_column_u32(e1, 0)?;
        let e1_col1 = metadata_column_u32(e1, 1)?;
        let e2_col1 = metadata_column_u32(e2, 1)?;
        let e3_col0 = metadata_column_u32(e3, 0)?;
        let e3_col1 = metadata_column_u32(e3, 1)?;
        let e4_col0 = metadata_column_u32(e4, 0)?;
        let e4_col1 = metadata_column_u32(e4, 1)?;
        let n_e3 = self.metadata_logical_rows(e3)?;
        let n_e4 = self.metadata_logical_rows(e4)?;

        // Counts buffer written by the count kernel for this grid size.
        let count_u32 = if grid <= 1024 {
            &plan.block_counts
        } else {
            local_counts
                .as_ref()
                .expect("local HG counts allocated when grid exceeds single-block scan")
        };
        // Count phase: declare everything the count + scan/total kernels touch.
        let mut rec_hg = LaunchRecorder::new_strict(launch_stream);
        rec_hg.read(e1.num_rows_device());
        rec_hg.read(e2.num_rows_device());
        rec_hg.read(e3.num_rows_device());
        rec_hg.read(e4.num_rows_device());
        rec_hg.read_column(e1.column(0).expect("e1.col0"));
        rec_hg.read_column(e1.column(1).expect("e1.col1"));
        rec_hg.read_column(e2.column(1).expect("e2.col1"));
        rec_hg.read_column(e3.column(0).expect("e3.col0"));
        rec_hg.read_column(e3.column(1).expect("e3.col1"));
        rec_hg.read_column(e4.column(0).expect("e4.col0"));
        rec_hg.read_column(e4.column(1).expect("e4.col1"));
        rec_hg.read(&plan.e1_work_prefix);
        rec_hg.read(&plan.e2_work_prefix);
        rec_hg.read(&plan.e1_e2_start);
        rec_hg.read(&plan.e1_e2_end);
        rec_hg.read_write(count_u32);
        if grid <= 1024 {
            rec_hg.read_write(&plan.block_offsets);
        } else {
            rec_hg.read_write(
                local_offsets
                    .as_ref()
                    .expect("local HG offsets allocated when grid exceeds single-block scan"),
            );
        }
        rec_hg.write(&total_rows_device);
        rec_hg
            .preflight(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: HG preflight failed: {e}")))?;
        {
            let kernel = self
                .device()
                .inner()
                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_4CYCLE_COUNT_HG_U32)
                .ok_or_else(|| {
                    XlogError::Kernel("wcoj_4cycle_count_hg_u32 kernel not found".to_string())
                })?;
            // Parameter order must match the kernel signature exactly.
            let mut params: Vec<*mut c_void> = vec![
                e1_col0.as_kernel_param(),
                e1_col1.as_kernel_param(),
                plan.row_count.as_kernel_param(),
                e2_col1.as_kernel_param(),
                e3_col0.as_kernel_param(),
                e3_col1.as_kernel_param(),
                n_e3.as_kernel_param(),
                e4_col0.as_kernel_param(),
                e4_col1.as_kernel_param(),
                n_e4.as_kernel_param(),
                (&plan.e1_work_prefix).as_kernel_param(),
                (&plan.e2_work_prefix).as_kernel_param(),
                (&plan.e1_e2_start).as_kernel_param(),
                (&plan.e1_e2_end).as_kernel_param(),
                plan.total_work.as_kernel_param(),
                plan.block_work_unit.as_kernel_param(),
                count_u32.as_kernel_param(),
            ];
            unsafe {
                kernel
                    .clone()
                    .launch_on_stream(
                        &cu_stream,
                        LaunchConfig {
                            grid_dim: (grid, 1, 1),
                            block_dim: (BLOCK_SIZE, 1, 1),
                            shared_mem_bytes: 0,
                        },
                        &mut params,
                    )
                    .map_err(|e| XlogError::Kernel(format!("{ctx}: count launch failed: {e}")))?;
            }
        }
        if grid <= 1024 {
            // Single-block path: one 1024-thread block scans the counts into
            // `plan.block_offsets` and writes the total row count.
            let kernel = self
                .device()
                .inner()
                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_SCAN_HG_BLOCK_COUNTS_U32)
                .ok_or_else(|| {
                    XlogError::Kernel("wcoj_scan_hg_block_counts_u32 kernel not found".to_string())
                })?;
            let mut params: Vec<*mut c_void> = vec![
                count_u32.as_kernel_param(),
                grid.as_kernel_param(),
                (&plan.block_offsets).as_kernel_param(),
                (&total_rows_device).as_kernel_param(),
            ];
            unsafe {
                kernel
                    .clone()
                    .launch_on_stream(
                        &cu_stream,
                        LaunchConfig {
                            grid_dim: (1, 1, 1),
                            block_dim: (1024, 1, 1),
                            shared_mem_bytes: 0,
                        },
                        &mut params,
                    )
                    .map_err(|e| XlogError::Kernel(format!("{ctx}: scan failed: {e}")))?;
            }
        } else {
            // Multi-block path, step 1: copy the counts into the offsets buffer so they
            // can be scanned in place.
            let offsets_mut = local_offsets
                .as_mut()
                .expect("local HG offsets allocated when grid exceeds single-block scan");
            // `bytes_count` is `grid * size_of::<u32>()`. Both buffers were
            // allocated with `grid` u32 elements, so the copy stays in bounds.
            unsafe {
                let res = sys::cuMemcpyDtoDAsync_v2(
                    *offsets_mut.device_ptr(),
                    *count_u32.device_ptr(),
                    bytes_count,
                    cu_stream.cu_stream(),
                );
                if res != sys::cudaError_enum::CUDA_SUCCESS {
                    return Err(XlogError::Kernel(format!(
                        "{ctx}: DtoD count to offsets failed: {res:?}"
                    )));
                }
            }
            // Step 2: scan the copied counts in place.
            self.multiblock_scan_u32_inplace_on_stream(
                offsets_mut,
                grid,
                &cu_stream,
                launch_stream,
                runtime,
            )?;
            // Step 3: a single-thread kernel derives the total from the counts and the
            // scanned offsets. Presumably it computes last offset + last count for an exclusive
            // scan; confirm against the `wcoj_compute_total` kernel.
            let total_kernel = self
                .device()
                .inner()
                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_COMPUTE_TOTAL)
                .ok_or_else(|| {
                    XlogError::Kernel("wcoj_compute_total kernel not found".to_string())
                })?;
            let mut params: Vec<*mut c_void> = vec![
                count_u32.as_kernel_param(),
                (&*offsets_mut).as_kernel_param(),
                grid.as_kernel_param(),
                (&total_rows_device).as_kernel_param(),
            ];
            unsafe {
                total_kernel
                    .clone()
                    .launch_on_stream(
                        &cu_stream,
                        LaunchConfig {
                            grid_dim: (1, 1, 1),
                            block_dim: (1, 1, 1),
                            shared_mem_bytes: 0,
                        },
                        &mut params,
                    )
                    .map_err(|e| XlogError::Kernel(format!("{ctx}: total failed: {e}")))?;
            }
        }
        rec_hg
            .commit(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: count commit failed: {e}")))?;
        // The host needs the row total to size the outputs, so wait for the count phase.
        cu_stream
            .synchronize()
            .map_err(|e| XlogError::Kernel(format!("{ctx}: count stream sync failed: {e}")))?;
        let total_rows = self
            .dtoh_scalar_untracked::<u32>(&total_rows_device, 0)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: read total rows failed: {e}")))?;
        if total_rows == 0 {
            return self.create_empty_buffer(out_schema);
        }

        // Output columns are allocated as raw bytes (`total_rows` u32 values each).
        // They are reinterpreted as u32 only for the kernel launch.
        let bytes_per_col = (total_rows as usize)
            .checked_mul(std::mem::size_of::<u32>())
            .ok_or_else(|| XlogError::Kernel(format!("{ctx}: output byte size overflow")))?;
        let mut out_w = self.memory().alloc::<u8>(bytes_per_col)?;
        let mut out_x = self.memory().alloc::<u8>(bytes_per_col)?;
        let mut out_y = self.memory().alloc::<u8>(bytes_per_col)?;
        let mut out_z = self.memory().alloc::<u8>(bytes_per_col)?;
        // Offsets produced by whichever scan path ran above.
        let materialize_offsets = if grid <= 1024 {
            &plan.block_offsets
        } else {
            local_offsets
                .as_ref()
                .expect("local HG offsets allocated when grid exceeds single-block scan")
        };
        // Materialize phase: declare inputs read and the four output columns written.
        let mut rec_mat = LaunchRecorder::new_strict(launch_stream);
        rec_mat.read(materialize_offsets);
        rec_mat.read(e1.num_rows_device());
        rec_mat.read(e2.num_rows_device());
        rec_mat.read(e3.num_rows_device());
        rec_mat.read(e4.num_rows_device());
        rec_mat.read_column(e1.column(0).expect("e1.col0"));
        rec_mat.read_column(e1.column(1).expect("e1.col1"));
        rec_mat.read_column(e2.column(1).expect("e2.col1"));
        rec_mat.read_column(e3.column(0).expect("e3.col0"));
        rec_mat.read_column(e3.column(1).expect("e3.col1"));
        rec_mat.read_column(e4.column(0).expect("e4.col0"));
        rec_mat.read_column(e4.column(1).expect("e4.col1"));
        rec_mat.read(&plan.e1_work_prefix);
        rec_mat.read(&plan.e2_work_prefix);
        rec_mat.read(&plan.e1_e2_start);
        rec_mat.read(&plan.e1_e2_end);
        rec_mat.write(&out_w);
        rec_mat.write(&out_x);
        rec_mat.write(&out_y);
        rec_mat.write(&out_z);
        rec_mat
            .preflight(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize preflight failed: {e}")))?;
        {
            let kernel = self
                .device()
                .inner()
                .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_4CYCLE_MATERIALIZE_HG_U32)
                .ok_or_else(|| {
                    XlogError::Kernel("wcoj_4cycle_materialize_hg_u32 kernel not found".to_string())
                })?;
            let out_w_u32 = unsafe { reinterpret_u8_as_u32(&mut out_w) };
            let out_x_u32 = unsafe { reinterpret_u8_as_u32(&mut out_x) };
            let out_y_u32 = unsafe { reinterpret_u8_as_u32(&mut out_y) };
            let out_z_u32 = unsafe { reinterpret_u8_as_u32(&mut out_z) };
            // Parameter order must match the kernel signature exactly.
            let mut params: Vec<*mut c_void> = vec![
                e1_col0.as_kernel_param(),
                e1_col1.as_kernel_param(),
                plan.row_count.as_kernel_param(),
                e2_col1.as_kernel_param(),
                e3_col0.as_kernel_param(),
                e3_col1.as_kernel_param(),
                n_e3.as_kernel_param(),
                e4_col0.as_kernel_param(),
                e4_col1.as_kernel_param(),
                n_e4.as_kernel_param(),
                (&plan.e1_work_prefix).as_kernel_param(),
                (&plan.e2_work_prefix).as_kernel_param(),
                (&plan.e1_e2_start).as_kernel_param(),
                (&plan.e1_e2_end).as_kernel_param(),
                plan.total_work.as_kernel_param(),
                plan.block_work_unit.as_kernel_param(),
                materialize_offsets.as_kernel_param(),
                total_rows.as_kernel_param(),
                out_w_u32.as_kernel_param(),
                out_x_u32.as_kernel_param(),
                out_y_u32.as_kernel_param(),
                out_z_u32.as_kernel_param(),
            ];
            unsafe {
                kernel
                    .clone()
                    .launch_on_stream(
                        &cu_stream,
                        LaunchConfig {
                            grid_dim: (grid, 1, 1),
                            block_dim: (BLOCK_SIZE, 1, 1),
                            shared_mem_bytes: 0,
                        },
                        &mut params,
                    )
                    .map_err(|e| {
                        XlogError::Kernel(format!("{ctx}: materialize launch failed: {e}"))
                    })?;
            }
        }
        rec_mat
            .commit(runtime)
            .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize commit failed: {e}")))?;
        cu_stream.synchronize().map_err(|e| {
            XlogError::Kernel(format!("{ctx}: materialize stream sync failed: {e}"))
        })?;

        Ok(CudaBuffer::from_columns_with_host_count(
            vec![out_w.into(), out_x.into(), out_y.into(), out_z.into()],
            total_rows as u64,
            total_rows_device,
            out_schema,
            total_rows,
        ))
    }
3214
3215 pub fn wcoj_4cycle_groupby_root_count_u32_recorded(
3238 &self,
3239 e1: &CudaBuffer,
3240 e2: &CudaBuffer,
3241 e3: &CudaBuffer,
3242 e4: &CudaBuffer,
3243 block_work_unit: u32,
3244 launch_stream: StreamId,
3245 ) -> Result<CudaBuffer> {
3246 let ctx = "wcoj_4cycle_groupby_root_count_u32_recorded";
3247 let e1 = &self.wcoj_layout_u32_recorded(e1, launch_stream)?;
3253 let e2 = &self.wcoj_layout_u32_recorded(e2, launch_stream)?;
3254 let e3 = &self.wcoj_layout_u32_recorded(e3, launch_stream)?;
3255 let e4 = &self.wcoj_layout_u32_recorded(e4, launch_stream)?;
3256 validate_binary_u32(ctx, "e1", e1)?;
3257 validate_binary_u32(ctx, "e2", e2)?;
3258 validate_binary_u32(ctx, "e3", e3)?;
3259 validate_binary_u32(ctx, "e4", e4)?;
3260 let plan = self.wcoj_4cycle_hg_work_plan_u32_recorded(
3261 e1,
3262 e2,
3263 e3,
3264 e4,
3265 block_work_unit,
3266 launch_stream,
3267 )?;
3268 let n_e1 = plan.row_count;
3269 let w_type = e1.schema().column_type(0).expect("e1.col0 type");
3270 let out_schema = Schema::new(vec![
3271 ("w".to_string(), w_type),
3272 ("count".to_string(), ScalarType::U64),
3273 ]);
3274 if n_e1 == 0 || plan.total_work == 0 {
3275 return self.create_empty_buffer(out_schema);
3276 }
3277
3278 let runtime = self.memory().runtime().ok_or_else(|| {
3279 XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
3280 })?;
3281 let cu_stream = runtime
3282 .stream_pool()
3283 .resolve(launch_stream)
3284 .ok_or_else(|| {
3285 XlogError::Kernel(format!(
3286 "{ctx}: launch_stream StreamId({}) does not resolve",
3287 launch_stream.0
3288 ))
3289 })?;
3290
3291 let e1_col0 = metadata_column_u32(e1, 0)?;
3292 let e1_col1 = metadata_column_u32(e1, 1)?;
3293 let e2_col1 = metadata_column_u32(e2, 1)?;
3294 let e3_col0 = metadata_column_u32(e3, 0)?;
3295 let e3_col1 = metadata_column_u32(e3, 1)?;
3296 let e4_col0 = metadata_column_u32(e4, 0)?;
3297 let e4_col1 = metadata_column_u32(e4, 1)?;
3298 let n_e3 = self.metadata_logical_rows(e3)?;
3299 let n_e4 = self.metadata_logical_rows(e4)?;
3300
3301 let mut row_counts = self
3305 .memory()
3306 .alloc::<u8>(n_e1 as usize * std::mem::size_of::<u32>())?;
3307 self.device()
3308 .inner()
3309 .memset_zeros(&mut row_counts)
3310 .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row counts failed: {e}")))?;
3311
3312 let grid = plan.total_work.div_ceil(plan.block_work_unit);
3313 let mut rec = LaunchRecorder::new_strict(launch_stream);
3314 rec.read(e1.num_rows_device());
3315 rec.read(e2.num_rows_device());
3316 rec.read(e3.num_rows_device());
3317 rec.read(e4.num_rows_device());
3318 rec.read_column(e1.column(0).expect("e1.col0"));
3319 rec.read_column(e1.column(1).expect("e1.col1"));
3320 rec.read_column(e2.column(1).expect("e2.col1"));
3321 rec.read_column(e3.column(0).expect("e3.col0"));
3322 rec.read_column(e3.column(1).expect("e3.col1"));
3323 rec.read_column(e4.column(0).expect("e4.col0"));
3324 rec.read_column(e4.column(1).expect("e4.col1"));
3325 rec.read(&plan.e1_work_prefix);
3326 rec.read(&plan.e2_work_prefix);
3327 rec.read(&plan.e1_e2_start);
3328 rec.read(&plan.e1_e2_end);
3329 rec.write(&row_counts);
3330 rec.preflight(runtime)
3331 .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
3332 {
3333 let kernel = self
3334 .device()
3335 .inner()
3336 .get_func(
3337 WCOJ_MODULE,
3338 wcoj_kernels::WCOJ_4CYCLE_GROUPBY_ROOT_COUNT_HG_U32,
3339 )
3340 .ok_or_else(|| {
3341 XlogError::Kernel(
3342 "wcoj_4cycle_groupby_root_count_hg_u32 kernel not found".to_string(),
3343 )
3344 })?;
3345 let mut params: Vec<*mut c_void> = vec![
3346 e1_col0.as_kernel_param(),
3347 e1_col1.as_kernel_param(),
3348 plan.row_count.as_kernel_param(),
3349 e2_col1.as_kernel_param(),
3350 e3_col0.as_kernel_param(),
3351 e3_col1.as_kernel_param(),
3352 n_e3.as_kernel_param(),
3353 e4_col0.as_kernel_param(),
3354 e4_col1.as_kernel_param(),
3355 n_e4.as_kernel_param(),
3356 (&plan.e1_work_prefix).as_kernel_param(),
3357 (&plan.e2_work_prefix).as_kernel_param(),
3358 (&plan.e1_e2_start).as_kernel_param(),
3359 (&plan.e1_e2_end).as_kernel_param(),
3360 plan.total_work.as_kernel_param(),
3361 plan.block_work_unit.as_kernel_param(),
3362 (&row_counts).as_kernel_param(),
3363 ];
3364 unsafe {
3365 kernel
3366 .clone()
3367 .launch_on_stream(
3368 &cu_stream,
3369 LaunchConfig {
3370 grid_dim: (grid, 1, 1),
3371 block_dim: (BLOCK_SIZE, 1, 1),
3372 shared_mem_bytes: 0,
3373 },
3374 &mut params,
3375 )
3376 .map_err(|e| {
3377 XlogError::Kernel(format!("{ctx}: groupby-count launch failed: {e}"))
3378 })?;
3379 }
3380 }
3381 rec.commit(runtime)
3382 .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
3383
3384 let w_src = match e1.column(0).expect("e1.col0") {
3388 CudaColumn::Owned(slice) => slice,
3389 _ => {
3390 return Err(XlogError::Kernel(format!(
3391 "{ctx}: e1.col0 must be an owned CudaColumn"
3392 )))
3393 }
3394 };
3395 let mut w_copy = self
3396 .memory()
3397 .alloc::<u8>(n_e1 as usize * std::mem::size_of::<u32>())?;
3398 self.device()
3399 .inner()
3400 .dtod_copy(w_src, &mut w_copy)
3401 .map_err(|e| XlogError::Kernel(format!("{ctx}: copy W column failed: {e}")))?;
3402 let mut d_num_rows = self.memory().alloc::<u32>(1)?;
3403 self.device()
3404 .inner()
3405 .dtod_copy(e1.num_rows_device(), &mut d_num_rows)
3406 .map_err(|e| XlogError::Kernel(format!("{ctx}: copy row count failed: {e}")))?;
3407 let staging_schema = Schema::new(vec![
3408 ("w".to_string(), w_type),
3409 ("count".to_string(), ScalarType::U32),
3410 ]);
3411 let staging = CudaBuffer::from_columns_with_host_count(
3412 vec![w_copy.into(), row_counts.into()],
3413 n_e1 as u64,
3414 d_num_rows,
3415 staging_schema,
3416 n_e1,
3417 );
3418
3419 let mask = self.compare_const_mask_recorded::<u32>(
3422 &staging,
3423 1,
3424 0u32,
3425 crate::CompareOp::Gt,
3426 launch_stream,
3427 )?;
3428 let compacted =
3429 self.compact_buffer_by_device_mask_counted_recorded(&staging, &mask, launch_stream)?;
3430 self.groupby_multi_agg_recorded(
3431 &compacted,
3432 &[0],
3433 &[(1, xlog_core::AggOp::Sum)],
3434 launch_stream,
3435 )
3436 }
3437
3438 #[allow(clippy::too_many_arguments)]
3470 pub fn wcoj_4cycle_groupby_root_agg_u32_recorded(
3471 &self,
3472 e1: &CudaBuffer,
3473 e2: &CudaBuffer,
3474 e3: &CudaBuffer,
3475 e4: &CudaBuffer,
3476 agg_op: AggOp,
3477 value: Wcoj4CycleRootAggValue,
3478 block_work_unit: u32,
3479 launch_stream: StreamId,
3480 ) -> Result<CudaBuffer> {
3481 let ctx = "wcoj_4cycle_groupby_root_agg_u32_recorded";
3482 let e1 = &self.wcoj_layout_u32_recorded(e1, launch_stream)?;
3488 let e2 = &self.wcoj_layout_u32_recorded(e2, launch_stream)?;
3489 let e3 = &self.wcoj_layout_u32_recorded(e3, launch_stream)?;
3490 let e4 = &self.wcoj_layout_u32_recorded(e4, launch_stream)?;
3491 let (kernel_name, agg_elem_size, agg_scalar, agg_name) = match agg_op {
3492 AggOp::Sum => (
3493 wcoj_kernels::WCOJ_4CYCLE_GROUPBY_ROOT_SUM_HG_U32,
3494 std::mem::size_of::<u64>(),
3495 ScalarType::U64,
3496 "sum_0",
3497 ),
3498 AggOp::Min => (
3499 wcoj_kernels::WCOJ_4CYCLE_GROUPBY_ROOT_MIN_HG_U32,
3500 std::mem::size_of::<u32>(),
3501 ScalarType::U32,
3502 "min_0",
3503 ),
3504 AggOp::Max => (
3505 wcoj_kernels::WCOJ_4CYCLE_GROUPBY_ROOT_MAX_HG_U32,
3506 std::mem::size_of::<u32>(),
3507 ScalarType::U32,
3508 "max_0",
3509 ),
3510 other => {
3511 return Err(XlogError::Kernel(format!(
3512 "{ctx}: unsupported AggOp {other:?} (Sum/Min/Max only; use \
3513 wcoj_4cycle_groupby_root_count_u32_recorded for Count)"
3514 )))
3515 }
3516 };
3517 validate_binary_u32(ctx, "e1", e1)?;
3518 validate_binary_u32(ctx, "e2", e2)?;
3519 validate_binary_u32(ctx, "e3", e3)?;
3520 validate_binary_u32(ctx, "e4", e4)?;
3521 let (value_buf, value_label) = match value {
3527 Wcoj4CycleRootAggValue::X => (e1, "e1"),
3528 Wcoj4CycleRootAggValue::Y => (e2, "e2"),
3529 Wcoj4CycleRootAggValue::Z => (e3, "e3"),
3530 };
3531 {
3532 let ty = value_buf.schema().column_type(1).expect("validated 2-col");
3533 if ty != ScalarType::U32 {
3534 return Err(XlogError::Kernel(format!(
3535 "{ctx}: {value_label}.col1 supplies the aggregate value and must be U32, \
3536 got {ty:?}"
3537 )));
3538 }
3539 }
3540
3541 let plan = self.wcoj_4cycle_hg_work_plan_u32_recorded(
3542 e1,
3543 e2,
3544 e3,
3545 e4,
3546 block_work_unit,
3547 launch_stream,
3548 )?;
3549 let n_e1 = plan.row_count;
3550 let w_type = e1.schema().column_type(0).expect("e1.col0 type");
3551 let out_schema = Schema::new(vec![
3552 ("w".to_string(), w_type),
3553 (agg_name.to_string(), agg_scalar),
3554 ]);
3555 if n_e1 == 0 || plan.total_work == 0 {
3556 return self.create_empty_buffer(out_schema);
3557 }
3558
3559 let runtime = self.memory().runtime().ok_or_else(|| {
3560 XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
3561 })?;
3562 let cu_stream = runtime
3563 .stream_pool()
3564 .resolve(launch_stream)
3565 .ok_or_else(|| {
3566 XlogError::Kernel(format!(
3567 "{ctx}: launch_stream StreamId({}) does not resolve",
3568 launch_stream.0
3569 ))
3570 })?;
3571
3572 let e1_col0 = metadata_column_u32(e1, 0)?;
3573 let e1_col1 = metadata_column_u32(e1, 1)?;
3574 let e2_col1 = metadata_column_u32(e2, 1)?;
3575 let e3_col0 = metadata_column_u32(e3, 0)?;
3576 let e3_col1 = metadata_column_u32(e3, 1)?;
3577 let e4_col0 = metadata_column_u32(e4, 0)?;
3578 let e4_col1 = metadata_column_u32(e4, 1)?;
3579 let n_e3 = self.metadata_logical_rows(e3)?;
3580 let n_e4 = self.metadata_logical_rows(e4)?;
3581 let value_sel: u32 = match value {
3582 Wcoj4CycleRootAggValue::X => 0,
3583 Wcoj4CycleRootAggValue::Y => 1,
3584 Wcoj4CycleRootAggValue::Z => 2,
3585 };
3586
3587 let mut row_counts = self
3591 .memory()
3592 .alloc::<u8>(n_e1 as usize * std::mem::size_of::<u32>())?;
3593 self.device()
3594 .inner()
3595 .memset_zeros(&mut row_counts)
3596 .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row counts failed: {e}")))?;
3597 let mut row_agg = self.memory().alloc::<u8>(n_e1 as usize * agg_elem_size)?;
3598 self.device()
3599 .inner()
3600 .memset_zeros(&mut row_agg)
3601 .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row aggregates failed: {e}")))?;
3602
3603 let grid = plan.total_work.div_ceil(plan.block_work_unit);
3604 let mut rec = LaunchRecorder::new_strict(launch_stream);
3605 rec.read(e1.num_rows_device());
3606 rec.read(e2.num_rows_device());
3607 rec.read(e3.num_rows_device());
3608 rec.read(e4.num_rows_device());
3609 rec.read_column(e1.column(0).expect("e1.col0"));
3610 rec.read_column(e1.column(1).expect("e1.col1"));
3611 rec.read_column(e2.column(1).expect("e2.col1"));
3612 rec.read_column(e3.column(0).expect("e3.col0"));
3613 rec.read_column(e3.column(1).expect("e3.col1"));
3614 rec.read_column(e4.column(0).expect("e4.col0"));
3615 rec.read_column(e4.column(1).expect("e4.col1"));
3616 rec.read(&plan.e1_work_prefix);
3617 rec.read(&plan.e2_work_prefix);
3618 rec.read(&plan.e1_e2_start);
3619 rec.read(&plan.e1_e2_end);
3620 rec.write(&row_counts);
3621 rec.write(&row_agg);
3622 rec.preflight(runtime)
3623 .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
3624 if matches!(agg_op, AggOp::Min) {
3625 let fill = self
3627 .device()
3628 .inner()
3629 .get_func(ARITH_MODULE, arith_kernels::ARITH_FILL_CONST_U32)
3630 .ok_or_else(|| {
3631 XlogError::Kernel("arith_fill_const_u32 kernel not found".to_string())
3632 })?;
3633 let row_agg_u32 = unsafe { reinterpret_u8_as_u32(&mut row_agg) };
3634 unsafe {
3636 fill.clone()
3637 .launch_on_stream(
3638 &cu_stream,
3639 LaunchConfig::for_num_elems(n_e1),
3640 (u32::MAX, n_e1, &mut *row_agg_u32),
3641 )
3642 .map_err(|e| {
3643 XlogError::Kernel(format!("{ctx}: min identity fill failed: {e}"))
3644 })?;
3645 }
3646 }
3647 {
3648 let kernel = self
3649 .device()
3650 .inner()
3651 .get_func(WCOJ_MODULE, kernel_name)
3652 .ok_or_else(|| XlogError::Kernel(format!("{kernel_name} kernel not found")))?;
3653 let mut params: Vec<*mut c_void> = vec![
3654 e1_col0.as_kernel_param(),
3655 e1_col1.as_kernel_param(),
3656 plan.row_count.as_kernel_param(),
3657 e2_col1.as_kernel_param(),
3658 e3_col0.as_kernel_param(),
3659 e3_col1.as_kernel_param(),
3660 n_e3.as_kernel_param(),
3661 e4_col0.as_kernel_param(),
3662 e4_col1.as_kernel_param(),
3663 n_e4.as_kernel_param(),
3664 value_sel.as_kernel_param(),
3665 (&plan.e1_work_prefix).as_kernel_param(),
3666 (&plan.e2_work_prefix).as_kernel_param(),
3667 (&plan.e1_e2_start).as_kernel_param(),
3668 (&plan.e1_e2_end).as_kernel_param(),
3669 plan.total_work.as_kernel_param(),
3670 plan.block_work_unit.as_kernel_param(),
3671 (&row_counts).as_kernel_param(),
3672 (&row_agg).as_kernel_param(),
3673 ];
3674 unsafe {
3675 kernel
3676 .clone()
3677 .launch_on_stream(
3678 &cu_stream,
3679 LaunchConfig {
3680 grid_dim: (grid, 1, 1),
3681 block_dim: (BLOCK_SIZE, 1, 1),
3682 shared_mem_bytes: 0,
3683 },
3684 &mut params,
3685 )
3686 .map_err(|e| {
3687 XlogError::Kernel(format!("{ctx}: groupby-agg launch failed: {e}"))
3688 })?;
3689 }
3690 }
3691 rec.commit(runtime)
3692 .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
3693
3694 let w_src = match e1.column(0).expect("e1.col0") {
3698 CudaColumn::Owned(slice) => slice,
3699 _ => {
3700 return Err(XlogError::Kernel(format!(
3701 "{ctx}: e1.col0 must be an owned CudaColumn"
3702 )))
3703 }
3704 };
3705 let w_copy = self
3706 .memory()
3707 .alloc::<u8>(n_e1 as usize * std::mem::size_of::<u32>())?;
3708 unsafe {
3712 let res = sys::cuMemcpyDtoD_v2(
3713 *w_copy.device_ptr(),
3714 *w_src.device_ptr(),
3715 n_e1 as usize * std::mem::size_of::<u32>(),
3716 );
3717 if res != sys::cudaError_enum::CUDA_SUCCESS {
3718 return Err(XlogError::Kernel(format!(
3719 "{ctx}: copy W column failed: {res:?}"
3720 )));
3721 }
3722 }
3723 let mut d_num_rows = self.memory().alloc::<u32>(1)?;
3724 self.device()
3725 .inner()
3726 .dtod_copy(e1.num_rows_device(), &mut d_num_rows)
3727 .map_err(|e| XlogError::Kernel(format!("{ctx}: copy row count failed: {e}")))?;
3728 let staging_schema = Schema::new(vec![
3729 ("w".to_string(), w_type),
3730 ("count".to_string(), ScalarType::U32),
3731 ("agg".to_string(), agg_scalar),
3732 ]);
3733 let staging = CudaBuffer::from_columns_with_host_count(
3734 vec![w_copy.into(), row_counts.into(), row_agg.into()],
3735 n_e1 as u64,
3736 d_num_rows,
3737 staging_schema,
3738 n_e1,
3739 );
3740
3741 let mask = self.compare_const_mask_recorded::<u32>(
3744 &staging,
3745 1,
3746 0u32,
3747 crate::CompareOp::Gt,
3748 launch_stream,
3749 )?;
3750 let compacted =
3751 self.compact_buffer_by_device_mask_counted_recorded(&staging, &mask, launch_stream)?;
3752 self.groupby_multi_agg_recorded(&compacted, &[0], &[(2, agg_op)], launch_stream)
3753 }
3754
3755 pub fn wcoj_4cycle_groupby_root_count_u64_recorded(
3779 &self,
3780 e1: &CudaBuffer,
3781 e2: &CudaBuffer,
3782 e3: &CudaBuffer,
3783 e4: &CudaBuffer,
3784 block_work_unit: u32,
3785 launch_stream: StreamId,
3786 ) -> Result<CudaBuffer> {
3787 let ctx = "wcoj_4cycle_groupby_root_count_u64_recorded";
3788 let e1 = &self.wcoj_layout_u64_recorded(e1, launch_stream)?;
3794 let e2 = &self.wcoj_layout_u64_recorded(e2, launch_stream)?;
3795 let e3 = &self.wcoj_layout_u64_recorded(e3, launch_stream)?;
3796 let e4 = &self.wcoj_layout_u64_recorded(e4, launch_stream)?;
3797 validate_binary_u64(ctx, "e1", e1)?;
3798 validate_binary_u64(ctx, "e2", e2)?;
3799 validate_binary_u64(ctx, "e3", e3)?;
3800 validate_binary_u64(ctx, "e4", e4)?;
3801 let plan = self.wcoj_4cycle_hg_work_plan_u64_recorded(
3802 e1,
3803 e2,
3804 e3,
3805 e4,
3806 block_work_unit,
3807 launch_stream,
3808 )?;
3809 let n_e1 = plan.row_count;
3810 let out_schema = Schema::new(vec![
3811 ("w".to_string(), ScalarType::U64),
3812 ("count".to_string(), ScalarType::U64),
3813 ]);
3814 if n_e1 == 0 || plan.total_work == 0 {
3815 return self.create_empty_buffer(out_schema);
3816 }
3817
3818 let runtime = self.memory().runtime().ok_or_else(|| {
3819 XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
3820 })?;
3821 let cu_stream = runtime
3822 .stream_pool()
3823 .resolve(launch_stream)
3824 .ok_or_else(|| {
3825 XlogError::Kernel(format!(
3826 "{ctx}: launch_stream StreamId({}) does not resolve",
3827 launch_stream.0
3828 ))
3829 })?;
3830
3831 let e1_col0 = metadata_column_u64(e1, 0)?;
3832 let e1_col1 = metadata_column_u64(e1, 1)?;
3833 let e2_col1 = metadata_column_u64(e2, 1)?;
3834 let e3_col0 = metadata_column_u64(e3, 0)?;
3835 let e3_col1 = metadata_column_u64(e3, 1)?;
3836 let e4_col0 = metadata_column_u64(e4, 0)?;
3837 let e4_col1 = metadata_column_u64(e4, 1)?;
3838 let n_e3 = self.metadata_logical_rows(e3)?;
3839 let n_e4 = self.metadata_logical_rows(e4)?;
3840
3841 let mut row_counts = self.memory().alloc::<u32>(n_e1 as usize)?;
3843 self.device()
3844 .inner()
3845 .memset_zeros(&mut row_counts)
3846 .map_err(|e| XlogError::Kernel(format!("{ctx}: zero row counts failed: {e}")))?;
3847
3848 let grid = plan.total_work.div_ceil(plan.block_work_unit);
3849 let mut rec = LaunchRecorder::new_strict(launch_stream);
3850 rec.read(e1.num_rows_device());
3851 rec.read(e2.num_rows_device());
3852 rec.read(e3.num_rows_device());
3853 rec.read(e4.num_rows_device());
3854 rec.read_column(e1.column(0).expect("e1.col0"));
3855 rec.read_column(e1.column(1).expect("e1.col1"));
3856 rec.read_column(e2.column(1).expect("e2.col1"));
3857 rec.read_column(e3.column(0).expect("e3.col0"));
3858 rec.read_column(e3.column(1).expect("e3.col1"));
3859 rec.read_column(e4.column(0).expect("e4.col0"));
3860 rec.read_column(e4.column(1).expect("e4.col1"));
3861 rec.read(&plan.e1_work_prefix);
3862 rec.read(&plan.e2_work_prefix);
3863 rec.read(&plan.e1_e2_start);
3864 rec.read(&plan.e1_e2_end);
3865 rec.write(&row_counts);
3866 rec.preflight(runtime)
3867 .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
3868 {
3869 let kernel = self
3870 .device()
3871 .inner()
3872 .get_func(
3873 WCOJ_MODULE,
3874 wcoj_kernels::WCOJ_4CYCLE_GROUPBY_ROOT_COUNT_HG_U64,
3875 )
3876 .ok_or_else(|| {
3877 XlogError::Kernel(
3878 "wcoj_4cycle_groupby_root_count_hg_u64 kernel not found".to_string(),
3879 )
3880 })?;
3881 let mut params: Vec<*mut c_void> = vec![
3882 e1_col0.as_kernel_param(),
3883 e1_col1.as_kernel_param(),
3884 plan.row_count.as_kernel_param(),
3885 e2_col1.as_kernel_param(),
3886 e3_col0.as_kernel_param(),
3887 e3_col1.as_kernel_param(),
3888 n_e3.as_kernel_param(),
3889 e4_col0.as_kernel_param(),
3890 e4_col1.as_kernel_param(),
3891 n_e4.as_kernel_param(),
3892 (&plan.e1_work_prefix).as_kernel_param(),
3893 (&plan.e2_work_prefix).as_kernel_param(),
3894 (&plan.e1_e2_start).as_kernel_param(),
3895 (&plan.e1_e2_end).as_kernel_param(),
3896 plan.total_work.as_kernel_param(),
3897 plan.block_work_unit.as_kernel_param(),
3898 (&row_counts).as_kernel_param(),
3899 ];
3900 unsafe {
3901 kernel
3902 .clone()
3903 .launch_on_stream(
3904 &cu_stream,
3905 LaunchConfig {
3906 grid_dim: (grid, 1, 1),
3907 block_dim: (BLOCK_SIZE, 1, 1),
3908 shared_mem_bytes: 0,
3909 },
3910 &mut params,
3911 )
3912 .map_err(|e| {
3913 XlogError::Kernel(format!("{ctx}: groupby-count launch failed: {e}"))
3914 })?;
3915 }
3916 }
3917 rec.commit(runtime)
3918 .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
3919
3920 let meta = self.wcoj_build_metadata_u64_recorded(e1, 0, launch_stream)?;
3924 let key_count = meta.key_count;
3925 if key_count == 0 {
3926 return self.create_empty_buffer(out_schema);
3927 }
3928 let mut sums = self
3929 .memory()
3930 .alloc::<u8>(key_count as usize * std::mem::size_of::<u64>())?;
3931 self.device()
3932 .inner()
3933 .memset_zeros(&mut sums)
3934 .map_err(|e| XlogError::Kernel(format!("{ctx}: zero group sums failed: {e}")))?;
3935
3936 let mut rec_sum = LaunchRecorder::new_strict(launch_stream);
3937 rec_sum.read(&row_counts);
3938 rec_sum.read(&meta.prefix_sum);
3939 rec_sum.write(&sums);
3940 rec_sum
3941 .preflight(runtime)
3942 .map_err(|e| XlogError::Kernel(format!("{ctx}: reduce preflight failed: {e}")))?;
3943 {
3944 let kernel = self
3945 .device()
3946 .inner()
3947 .get_func(
3948 WCOJ_MODULE,
3949 wcoj_kernels::WCOJ_GROUPBY_ROOT_SEGMENT_SUM_COUNTS_U32,
3950 )
3951 .ok_or_else(|| {
3952 XlogError::Kernel(
3953 "wcoj_groupby_root_segment_sum_counts_u32 kernel not found".to_string(),
3954 )
3955 })?;
3956 let reduce_grid = n_e1.div_ceil(BLOCK_SIZE);
3957 let mut params: Vec<*mut c_void> = vec![
3958 (&row_counts).as_kernel_param(),
3959 n_e1.as_kernel_param(),
3960 (&meta.prefix_sum).as_kernel_param(),
3961 key_count.as_kernel_param(),
3962 (&sums).as_kernel_param(),
3963 ];
3964 unsafe {
3965 kernel
3966 .clone()
3967 .launch_on_stream(
3968 &cu_stream,
3969 LaunchConfig {
3970 grid_dim: (reduce_grid, 1, 1),
3971 block_dim: (BLOCK_SIZE, 1, 1),
3972 shared_mem_bytes: 0,
3973 },
3974 &mut params,
3975 )
3976 .map_err(|e| XlogError::Kernel(format!("{ctx}: reduce launch failed: {e}")))?;
3977 }
3978 }
3979 rec_sum
3980 .commit(runtime)
3981 .map_err(|e| XlogError::Kernel(format!("{ctx}: reduce commit failed: {e}")))?;
3982
3983 let w_copy = self
3989 .memory()
3990 .alloc::<u8>(key_count as usize * std::mem::size_of::<u64>())?;
3991 let d_num_rows = self.memory().alloc::<u32>(1)?;
3992 let mut rec_copy = LaunchRecorder::new_strict(launch_stream);
3993 rec_copy.read(&meta.unique_keys);
3994 rec_copy.write(&w_copy);
3995 rec_copy.write(&d_num_rows);
3996 rec_copy
3997 .preflight(runtime)
3998 .map_err(|e| XlogError::Kernel(format!("{ctx}: copy preflight failed: {e}")))?;
3999 unsafe {
4000 let res = sys::cuMemcpyDtoDAsync_v2(
4001 *w_copy.device_ptr(),
4002 *meta.unique_keys.device_ptr(),
4003 key_count as usize * std::mem::size_of::<u64>(),
4004 cu_stream.cu_stream(),
4005 );
4006 if res != sys::cudaError_enum::CUDA_SUCCESS {
4007 return Err(XlogError::Kernel(format!(
4008 "{ctx}: DtoD unique keys copy failed: {res:?}"
4009 )));
4010 }
4011 }
4012 self.htod_launch_metadata_async_copy_one(
4013 &key_count,
4014 &d_num_rows,
4015 &cu_stream,
4016 &format!("{ctx}: d_num_rows"),
4017 )?;
4018 rec_copy
4019 .commit(runtime)
4020 .map_err(|e| XlogError::Kernel(format!("{ctx}: copy commit failed: {e}")))?;
4021 cu_stream
4022 .synchronize()
4023 .map_err(|e| XlogError::Kernel(format!("{ctx}: stream sync failed: {e}")))?;
4024 let staging_schema = Schema::new(vec![
4025 ("w".to_string(), ScalarType::U64),
4026 ("count".to_string(), ScalarType::U64),
4027 ]);
4028 let staging = CudaBuffer::from_columns_with_host_count(
4029 vec![w_copy.into(), sums.into()],
4030 u64::from(key_count),
4031 d_num_rows,
4032 staging_schema,
4033 key_count,
4034 );
4035 let mask = self.compare_const_mask_recorded::<u64>(
4036 &staging,
4037 1,
4038 0u64,
4039 crate::CompareOp::Gt,
4040 launch_stream,
4041 )?;
4042 self.compact_buffer_by_device_mask_counted_recorded(&staging, &mask, launch_stream)
4043 }
4044
4045 pub fn wcoj_4cycle_hg_work_plan_u64_recorded(
4046 &self,
4047 e1: &CudaBuffer,
4048 e2: &CudaBuffer,
4049 e3: &CudaBuffer,
4050 e4: &CudaBuffer,
4051 block_work_unit: u32,
4052 launch_stream: StreamId,
4053 ) -> Result<WcojCycle4HgWorkPlanU64> {
4054 let ctx = "wcoj_4cycle_hg_work_plan_u64_recorded";
4055 if block_work_unit == 0 {
4056 return Err(XlogError::Kernel(format!(
4057 "{ctx}: block_work_unit must be nonzero"
4058 )));
4059 }
4060 validate_binary_u64(ctx, "e1", e1)?;
4061 validate_binary_u64(ctx, "e2", e2)?;
4062 validate_binary_u64(ctx, "e3", e3)?;
4063 validate_binary_u64(ctx, "e4", e4)?;
4064
4065 let n_e1 = self.metadata_logical_rows(e1)?;
4066 let n_e2 = self.metadata_logical_rows(e2)?;
4067 let n_e3 = self.metadata_logical_rows(e3)?;
4068 let prefix_len = n_e1
4069 .checked_add(1)
4070 .ok_or_else(|| XlogError::Kernel(format!("{ctx}: prefix length overflow")))?;
4071 let e2_prefix_len = n_e2
4072 .checked_add(1)
4073 .ok_or_else(|| XlogError::Kernel(format!("{ctx}: e2 prefix length overflow")))?;
4074 let mut e1_work_prefix = self.memory().alloc::<u32>(prefix_len as usize)?;
4075 let mut e2_work_prefix = self.memory().alloc::<u32>(e2_prefix_len as usize)?;
4076 let mut e1_e2_start = self.memory().alloc::<u32>(n_e1 as usize)?;
4077 let mut e1_e2_end = self.memory().alloc::<u32>(n_e1 as usize)?;
4078
4079 if n_e1 == 0 || n_e2 == 0 || n_e3 == 0 || self.metadata_logical_rows(e4)? == 0 {
4080 let block_counts = self.memory().alloc::<u32>(1)?;
4081 let block_offsets = self.memory().alloc::<u32>(1)?;
4082 return Ok(WcojCycle4HgWorkPlanU64 {
4083 e1_work_prefix,
4084 e2_work_prefix,
4085 e1_e2_start,
4086 e1_e2_end,
4087 block_counts,
4088 block_offsets,
4089 total_work: 0,
4090 block_work_unit,
4091 row_count: n_e1,
4092 });
4093 }
4094
4095 let runtime = self.memory().runtime().ok_or_else(|| {
4096 XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
4097 })?;
4098 let cu_stream = runtime
4099 .stream_pool()
4100 .resolve(launch_stream)
4101 .ok_or_else(|| {
4102 XlogError::Kernel(format!(
4103 "{ctx}: launch_stream StreamId({}) does not resolve",
4104 launch_stream.0
4105 ))
4106 })?;
4107
4108 let e1_col1 = metadata_column_u64(e1, 1)?;
4109 let e2_col0 = metadata_column_u64(e2, 0)?;
4110 let e2_col1 = metadata_column_u64(e2, 1)?;
4111 let e3_col0 = metadata_column_u64(e3, 0)?;
4112
4113 let mut rec = LaunchRecorder::new_strict(launch_stream);
4114 rec.read(e1.num_rows_device());
4115 rec.read(e2.num_rows_device());
4116 rec.read(e3.num_rows_device());
4117 rec.read_column(e1.column(1).expect("e1.col1"));
4118 rec.read_column(e2.column(0).expect("e2.col0"));
4119 rec.read_column(e2.column(1).expect("e2.col1"));
4120 rec.read_column(e3.column(0).expect("e3.col0"));
4121 rec.read_write(&e2_work_prefix);
4122 rec.write(&e1_work_prefix);
4123 rec.write(&e1_e2_start);
4124 rec.write(&e1_e2_end);
4125 rec.preflight(runtime)
4126 .map_err(|e| XlogError::Kernel(format!("{ctx}: preflight failed: {e}")))?;
4127
4128 let e2_kernel = self
4129 .device()
4130 .inner()
4131 .get_func(
4132 WCOJ_MODULE,
4133 wcoj_kernels::WCOJ_4CYCLE_BUILD_E2_WORK_PREFIX_U64,
4134 )
4135 .ok_or_else(|| {
4136 XlogError::Kernel(
4137 "wcoj_4cycle_build_e2_work_prefix_u64 kernel not found".to_string(),
4138 )
4139 })?;
4140 let e2_grid = n_e2.div_ceil(BLOCK_SIZE);
4141 unsafe {
4142 e2_kernel
4143 .clone()
4144 .launch_on_stream(
4145 &cu_stream,
4146 LaunchConfig {
4147 grid_dim: (e2_grid, 1, 1),
4148 block_dim: (BLOCK_SIZE, 1, 1),
4149 shared_mem_bytes: 0,
4150 },
4151 (e2_col1, n_e2, e3_col0, n_e3, &mut e2_work_prefix),
4152 )
4153 .map_err(|e| {
4154 XlogError::Kernel(format!(
4155 "wcoj_4cycle_build_e2_work_prefix_u64 launch failed: {e}"
4156 ))
4157 })?;
4158 }
4159 self.multiblock_scan_u32_inplace_on_stream(
4160 &mut e2_work_prefix,
4161 e2_prefix_len,
4162 &cu_stream,
4163 launch_stream,
4164 runtime,
4165 )?;
4166
4167 let kernel = self
4168 .device()
4169 .inner()
4170 .get_func(
4171 WCOJ_MODULE,
4172 wcoj_kernels::WCOJ_4CYCLE_BUILD_HG_WORK_PLAN_U64,
4173 )
4174 .ok_or_else(|| {
4175 XlogError::Kernel("wcoj_4cycle_build_hg_work_plan_u64 kernel not found".to_string())
4176 })?;
4177 let grid = n_e1.div_ceil(BLOCK_SIZE);
4178 unsafe {
4179 kernel
4180 .clone()
4181 .launch_on_stream(
4182 &cu_stream,
4183 LaunchConfig {
4184 grid_dim: (grid, 1, 1),
4185 block_dim: (BLOCK_SIZE, 1, 1),
4186 shared_mem_bytes: 0,
4187 },
4188 (
4189 e1_col1,
4190 n_e1,
4191 e2_col0,
4192 n_e2,
4193 &e2_work_prefix,
4194 &mut e1_work_prefix,
4195 &mut e1_e2_start,
4196 &mut e1_e2_end,
4197 ),
4198 )
4199 .map_err(|e| {
4200 XlogError::Kernel(format!(
4201 "wcoj_4cycle_build_hg_work_plan_u64 launch failed: {e}"
4202 ))
4203 })?;
4204 }
4205 self.multiblock_scan_u32_inplace_on_stream(
4206 &mut e1_work_prefix,
4207 prefix_len,
4208 &cu_stream,
4209 launch_stream,
4210 runtime,
4211 )?;
4212 rec.commit(runtime)
4213 .map_err(|e| XlogError::Kernel(format!("{ctx}: commit failed: {e}")))?;
4214 cu_stream
4215 .synchronize()
4216 .map_err(|e| XlogError::Kernel(format!("{ctx}: stream sync failed: {e}")))?;
4217 let total_work = self.dtoh_scalar_untracked::<u32>(&e1_work_prefix, n_e1 as usize)?;
4218 let grid = if total_work == 0 {
4219 1
4220 } else {
4221 total_work.div_ceil(block_work_unit)
4222 };
4223 let block_counts = self.memory().alloc::<u32>(grid as usize)?;
4224 let block_offsets = self.memory().alloc::<u32>(grid as usize)?;
4225
4226 Ok(WcojCycle4HgWorkPlanU64 {
4227 e1_work_prefix,
4228 e2_work_prefix,
4229 e1_e2_start,
4230 e1_e2_end,
4231 block_counts,
4232 block_offsets,
4233 total_work,
4234 block_work_unit,
4235 row_count: n_e1,
4236 })
4237 }
4238
4239 pub fn wcoj_4cycle_hg_u64_recorded(
4240 &self,
4241 e1: &CudaBuffer,
4242 e2: &CudaBuffer,
4243 e3: &CudaBuffer,
4244 e4: &CudaBuffer,
4245 block_work_unit: u32,
4246 launch_stream: StreamId,
4247 ) -> Result<CudaBuffer> {
4248 let ctx = "wcoj_4cycle_hg_u64_recorded";
4249 validate_binary_u64(ctx, "e1", e1)?;
4250 validate_binary_u64(ctx, "e2", e2)?;
4251 validate_binary_u64(ctx, "e3", e3)?;
4252 validate_binary_u64(ctx, "e4", e4)?;
4253 let plan = self.wcoj_4cycle_hg_work_plan_u64_recorded(
4254 e1,
4255 e2,
4256 e3,
4257 e4,
4258 block_work_unit,
4259 launch_stream,
4260 )?;
4261 let out_schema = Schema::new(vec![
4262 ("col0".to_string(), ScalarType::U64),
4263 ("col1".to_string(), ScalarType::U64),
4264 ("col2".to_string(), ScalarType::U64),
4265 ("col3".to_string(), ScalarType::U64),
4266 ]);
4267 if plan.total_work == 0 {
4268 return self.create_empty_buffer(out_schema);
4269 }
4270
4271 let grid = plan.total_work.div_ceil(plan.block_work_unit);
4272 let bytes_count = (grid as usize)
4273 .checked_mul(std::mem::size_of::<u32>())
4274 .ok_or_else(|| XlogError::Kernel(format!("{ctx}: count byte size overflow")))?;
4275 let mut local_counts = None;
4276 let mut local_offsets = None;
4277 if grid > 1024 {
4278 local_counts = Some(self.memory().alloc::<u32>(grid as usize)?);
4279 local_offsets = Some(self.memory().alloc::<u32>(grid as usize)?);
4280 }
4281 let total_rows_device = self.memory().alloc::<u32>(1)?;
4282
4283 let runtime = self.memory().runtime().ok_or_else(|| {
4284 XlogError::Kernel(format!("{ctx} requires a runtime-backed GpuMemoryManager"))
4285 })?;
4286 let cu_stream = runtime
4287 .stream_pool()
4288 .resolve(launch_stream)
4289 .ok_or_else(|| {
4290 XlogError::Kernel(format!(
4291 "{ctx}: launch_stream StreamId({}) does not resolve",
4292 launch_stream.0
4293 ))
4294 })?;
4295
4296 let e1_col0 = metadata_column_u64(e1, 0)?;
4297 let e1_col1 = metadata_column_u64(e1, 1)?;
4298 let e2_col1 = metadata_column_u64(e2, 1)?;
4299 let e3_col0 = metadata_column_u64(e3, 0)?;
4300 let e3_col1 = metadata_column_u64(e3, 1)?;
4301 let e4_col0 = metadata_column_u64(e4, 0)?;
4302 let e4_col1 = metadata_column_u64(e4, 1)?;
4303 let n_e3 = self.metadata_logical_rows(e3)?;
4304 let n_e4 = self.metadata_logical_rows(e4)?;
4305
4306 let count_u32 = if grid <= 1024 {
4307 &plan.block_counts
4308 } else {
4309 local_counts
4310 .as_ref()
4311 .expect("local HG counts allocated when grid exceeds single-block scan")
4312 };
4313 let mut rec_hg = LaunchRecorder::new_strict(launch_stream);
4314 rec_hg.read(e1.num_rows_device());
4315 rec_hg.read(e2.num_rows_device());
4316 rec_hg.read(e3.num_rows_device());
4317 rec_hg.read(e4.num_rows_device());
4318 rec_hg.read_column(e1.column(0).expect("e1.col0"));
4319 rec_hg.read_column(e1.column(1).expect("e1.col1"));
4320 rec_hg.read_column(e2.column(1).expect("e2.col1"));
4321 rec_hg.read_column(e3.column(0).expect("e3.col0"));
4322 rec_hg.read_column(e3.column(1).expect("e3.col1"));
4323 rec_hg.read_column(e4.column(0).expect("e4.col0"));
4324 rec_hg.read_column(e4.column(1).expect("e4.col1"));
4325 rec_hg.read(&plan.e1_work_prefix);
4326 rec_hg.read(&plan.e2_work_prefix);
4327 rec_hg.read(&plan.e1_e2_start);
4328 rec_hg.read(&plan.e1_e2_end);
4329 rec_hg.read_write(count_u32);
4330 if grid <= 1024 {
4331 rec_hg.read_write(&plan.block_offsets);
4332 } else {
4333 rec_hg.read_write(
4334 local_offsets
4335 .as_ref()
4336 .expect("local HG offsets allocated when grid exceeds single-block scan"),
4337 );
4338 }
4339 rec_hg.write(&total_rows_device);
4340 rec_hg
4341 .preflight(runtime)
4342 .map_err(|e| XlogError::Kernel(format!("{ctx}: HG preflight failed: {e}")))?;
4343 {
4344 let kernel = self
4345 .device()
4346 .inner()
4347 .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_4CYCLE_COUNT_HG_U64)
4348 .ok_or_else(|| {
4349 XlogError::Kernel("wcoj_4cycle_count_hg_u64 kernel not found".to_string())
4350 })?;
4351 let mut params: Vec<*mut c_void> = vec![
4352 e1_col0.as_kernel_param(),
4353 e1_col1.as_kernel_param(),
4354 plan.row_count.as_kernel_param(),
4355 e2_col1.as_kernel_param(),
4356 e3_col0.as_kernel_param(),
4357 e3_col1.as_kernel_param(),
4358 n_e3.as_kernel_param(),
4359 e4_col0.as_kernel_param(),
4360 e4_col1.as_kernel_param(),
4361 n_e4.as_kernel_param(),
4362 (&plan.e1_work_prefix).as_kernel_param(),
4363 (&plan.e2_work_prefix).as_kernel_param(),
4364 (&plan.e1_e2_start).as_kernel_param(),
4365 (&plan.e1_e2_end).as_kernel_param(),
4366 plan.total_work.as_kernel_param(),
4367 plan.block_work_unit.as_kernel_param(),
4368 count_u32.as_kernel_param(),
4369 ];
4370 unsafe {
4371 kernel
4372 .clone()
4373 .launch_on_stream(
4374 &cu_stream,
4375 LaunchConfig {
4376 grid_dim: (grid, 1, 1),
4377 block_dim: (BLOCK_SIZE, 1, 1),
4378 shared_mem_bytes: 0,
4379 },
4380 &mut params,
4381 )
4382 .map_err(|e| XlogError::Kernel(format!("{ctx}: count launch failed: {e}")))?;
4383 }
4384 }
4385 if grid <= 1024 {
4386 let kernel = self
4387 .device()
4388 .inner()
4389 .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_SCAN_HG_BLOCK_COUNTS_U32)
4390 .ok_or_else(|| {
4391 XlogError::Kernel("wcoj_scan_hg_block_counts_u32 kernel not found".to_string())
4392 })?;
4393 let mut params: Vec<*mut c_void> = vec![
4394 count_u32.as_kernel_param(),
4395 grid.as_kernel_param(),
4396 (&plan.block_offsets).as_kernel_param(),
4397 (&total_rows_device).as_kernel_param(),
4398 ];
4399 unsafe {
4400 kernel
4401 .clone()
4402 .launch_on_stream(
4403 &cu_stream,
4404 LaunchConfig {
4405 grid_dim: (1, 1, 1),
4406 block_dim: (1024, 1, 1),
4407 shared_mem_bytes: 0,
4408 },
4409 &mut params,
4410 )
4411 .map_err(|e| XlogError::Kernel(format!("{ctx}: scan failed: {e}")))?;
4412 }
4413 } else {
4414 let offsets_mut = local_offsets
4415 .as_mut()
4416 .expect("local HG offsets allocated when grid exceeds single-block scan");
4417 unsafe {
4418 let res = sys::cuMemcpyDtoDAsync_v2(
4419 *offsets_mut.device_ptr(),
4420 *count_u32.device_ptr(),
4421 bytes_count,
4422 cu_stream.cu_stream(),
4423 );
4424 if res != sys::cudaError_enum::CUDA_SUCCESS {
4425 return Err(XlogError::Kernel(format!(
4426 "{ctx}: DtoD count to offsets failed: {res:?}"
4427 )));
4428 }
4429 }
4430 self.multiblock_scan_u32_inplace_on_stream(
4431 offsets_mut,
4432 grid,
4433 &cu_stream,
4434 launch_stream,
4435 runtime,
4436 )?;
4437 let total_kernel = self
4438 .device()
4439 .inner()
4440 .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_COMPUTE_TOTAL)
4441 .ok_or_else(|| {
4442 XlogError::Kernel("wcoj_compute_total kernel not found".to_string())
4443 })?;
4444 let mut params: Vec<*mut c_void> = vec![
4445 count_u32.as_kernel_param(),
4446 (&*offsets_mut).as_kernel_param(),
4447 grid.as_kernel_param(),
4448 (&total_rows_device).as_kernel_param(),
4449 ];
4450 unsafe {
4451 total_kernel
4452 .clone()
4453 .launch_on_stream(
4454 &cu_stream,
4455 LaunchConfig {
4456 grid_dim: (1, 1, 1),
4457 block_dim: (1, 1, 1),
4458 shared_mem_bytes: 0,
4459 },
4460 &mut params,
4461 )
4462 .map_err(|e| XlogError::Kernel(format!("{ctx}: total failed: {e}")))?;
4463 }
4464 }
4465 rec_hg
4466 .commit(runtime)
4467 .map_err(|e| XlogError::Kernel(format!("{ctx}: count commit failed: {e}")))?;
4468 cu_stream
4469 .synchronize()
4470 .map_err(|e| XlogError::Kernel(format!("{ctx}: count stream sync failed: {e}")))?;
4471 let total_rows = self
4472 .dtoh_scalar_untracked::<u32>(&total_rows_device, 0)
4473 .map_err(|e| XlogError::Kernel(format!("{ctx}: read total rows failed: {e}")))?;
4474 if total_rows == 0 {
4475 return self.create_empty_buffer(out_schema);
4476 }
4477
4478 let bytes_per_col = (total_rows as usize)
4479 .checked_mul(std::mem::size_of::<u64>())
4480 .ok_or_else(|| XlogError::Kernel(format!("{ctx}: output byte size overflow")))?;
4481 let mut out_w = self.memory().alloc::<u8>(bytes_per_col)?;
4482 let mut out_x = self.memory().alloc::<u8>(bytes_per_col)?;
4483 let mut out_y = self.memory().alloc::<u8>(bytes_per_col)?;
4484 let mut out_z = self.memory().alloc::<u8>(bytes_per_col)?;
4485 let materialize_offsets = if grid <= 1024 {
4486 &plan.block_offsets
4487 } else {
4488 local_offsets
4489 .as_ref()
4490 .expect("local HG offsets allocated when grid exceeds single-block scan")
4491 };
4492 let mut rec_mat = LaunchRecorder::new_strict(launch_stream);
4493 rec_mat.read(materialize_offsets);
4494 rec_mat.read(e1.num_rows_device());
4495 rec_mat.read(e2.num_rows_device());
4496 rec_mat.read(e3.num_rows_device());
4497 rec_mat.read(e4.num_rows_device());
4498 rec_mat.read_column(e1.column(0).expect("e1.col0"));
4499 rec_mat.read_column(e1.column(1).expect("e1.col1"));
4500 rec_mat.read_column(e2.column(1).expect("e2.col1"));
4501 rec_mat.read_column(e3.column(0).expect("e3.col0"));
4502 rec_mat.read_column(e3.column(1).expect("e3.col1"));
4503 rec_mat.read_column(e4.column(0).expect("e4.col0"));
4504 rec_mat.read_column(e4.column(1).expect("e4.col1"));
4505 rec_mat.read(&plan.e1_work_prefix);
4506 rec_mat.read(&plan.e2_work_prefix);
4507 rec_mat.read(&plan.e1_e2_start);
4508 rec_mat.read(&plan.e1_e2_end);
4509 rec_mat.write(&out_w);
4510 rec_mat.write(&out_x);
4511 rec_mat.write(&out_y);
4512 rec_mat.write(&out_z);
4513 rec_mat
4514 .preflight(runtime)
4515 .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize preflight failed: {e}")))?;
4516 {
4517 let kernel = self
4518 .device()
4519 .inner()
4520 .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_4CYCLE_MATERIALIZE_HG_U64)
4521 .ok_or_else(|| {
4522 XlogError::Kernel("wcoj_4cycle_materialize_hg_u64 kernel not found".to_string())
4523 })?;
4524 let out_w_u64 = unsafe { reinterpret_u8_as_u64(&mut out_w) };
4525 let out_x_u64 = unsafe { reinterpret_u8_as_u64(&mut out_x) };
4526 let out_y_u64 = unsafe { reinterpret_u8_as_u64(&mut out_y) };
4527 let out_z_u64 = unsafe { reinterpret_u8_as_u64(&mut out_z) };
4528 let mut params: Vec<*mut c_void> = vec![
4529 e1_col0.as_kernel_param(),
4530 e1_col1.as_kernel_param(),
4531 plan.row_count.as_kernel_param(),
4532 e2_col1.as_kernel_param(),
4533 e3_col0.as_kernel_param(),
4534 e3_col1.as_kernel_param(),
4535 n_e3.as_kernel_param(),
4536 e4_col0.as_kernel_param(),
4537 e4_col1.as_kernel_param(),
4538 n_e4.as_kernel_param(),
4539 (&plan.e1_work_prefix).as_kernel_param(),
4540 (&plan.e2_work_prefix).as_kernel_param(),
4541 (&plan.e1_e2_start).as_kernel_param(),
4542 (&plan.e1_e2_end).as_kernel_param(),
4543 plan.total_work.as_kernel_param(),
4544 plan.block_work_unit.as_kernel_param(),
4545 materialize_offsets.as_kernel_param(),
4546 total_rows.as_kernel_param(),
4547 out_w_u64.as_kernel_param(),
4548 out_x_u64.as_kernel_param(),
4549 out_y_u64.as_kernel_param(),
4550 out_z_u64.as_kernel_param(),
4551 ];
4552 unsafe {
4553 kernel
4554 .clone()
4555 .launch_on_stream(
4556 &cu_stream,
4557 LaunchConfig {
4558 grid_dim: (grid, 1, 1),
4559 block_dim: (BLOCK_SIZE, 1, 1),
4560 shared_mem_bytes: 0,
4561 },
4562 &mut params,
4563 )
4564 .map_err(|e| {
4565 XlogError::Kernel(format!("{ctx}: materialize launch failed: {e}"))
4566 })?;
4567 }
4568 }
4569 rec_mat
4570 .commit(runtime)
4571 .map_err(|e| XlogError::Kernel(format!("{ctx}: materialize commit failed: {e}")))?;
4572 cu_stream.synchronize().map_err(|e| {
4573 XlogError::Kernel(format!("{ctx}: materialize stream sync failed: {e}"))
4574 })?;
4575
4576 Ok(CudaBuffer::from_columns_with_host_count(
4577 vec![out_w.into(), out_x.into(), out_y.into(), out_z.into()],
4578 total_rows as u64,
4579 total_rows_device,
4580 out_schema,
4581 total_rows,
4582 ))
4583 }
4584
4585 fn validate_metadata_column(
4586 &self,
4587 input: &CudaBuffer,
4588 key_col_idx: usize,
4589 width: MetadataWidth,
4590 ) -> Result<()> {
4591 if key_col_idx >= input.arity() {
4592 return Err(XlogError::Kernel(format!(
4593 "wcoj_build_metadata: key column {} out of range for arity {}",
4594 key_col_idx,
4595 input.arity()
4596 )));
4597 }
4598 let ty = input.schema().column_type(key_col_idx).ok_or_else(|| {
4599 XlogError::Kernel(format!(
4600 "wcoj_build_metadata: column {} type missing",
4601 key_col_idx
4602 ))
4603 })?;
4604 match (width, ty) {
4605 (MetadataWidth::U32, ScalarType::U32 | ScalarType::Symbol)
4606 | (MetadataWidth::U64, ScalarType::U64) => Ok(()),
4607 (MetadataWidth::U32, other) => Err(XlogError::Kernel(format!(
4608 "wcoj_build_metadata_u32_recorded: column {} must be U32 or Symbol, got {:?}",
4609 key_col_idx, other
4610 ))),
4611 (MetadataWidth::U64, other) => Err(XlogError::Kernel(format!(
4612 "wcoj_build_metadata_u64_recorded: column {} must be U64, got {:?}",
4613 key_col_idx, other
4614 ))),
4615 }
4616 }
4617
4618 fn build_metadata_u32_from_column(
4619 &self,
4620 input: &CudaBuffer,
4621 key_col_idx: usize,
4622 keys: &TrackedCudaSlice<u32>,
4623 launch_stream: StreamId,
4624 ) -> Result<WcojRelationMetadata<u32>> {
4625 let n = self.metadata_logical_rows(input)?;
4626 if n == 0 {
4627 return Ok(WcojRelationMetadata {
4628 unique_keys: self.memory().alloc::<u32>(0)?,
4629 fan_out: self.memory().alloc::<u32>(0)?,
4630 prefix_sum: self.memory().alloc::<u32>(0)?,
4631 per_candidate_root: BTreeMap::new(),
4632 total: 0,
4633 key_count: 0,
4634 row_count: 0,
4635 });
4636 }
4637 let mut boundary_mask = self.memory().alloc::<u32>(n as usize)?;
4638 let mut boundary_prefix = self.memory().alloc::<u32>(n as usize)?;
4639 self.mark_metadata_boundaries_u32(
4640 input,
4641 key_col_idx,
4642 keys,
4643 n,
4644 &mut boundary_mask,
4645 &mut boundary_prefix,
4646 launch_stream,
4647 )?;
4648 let key_count = self.metadata_scanned_count(&boundary_mask, &boundary_prefix, n)?;
4649
4650 let mut unique_keys = self.memory().alloc::<u32>(key_count as usize)?;
4651 let mut fan_out = self.memory().alloc::<u32>(key_count as usize)?;
4652 let mut prefix_sum = self.memory().alloc::<u32>(key_count as usize)?;
4653 self.scatter_metadata_u32(
4654 input,
4655 key_col_idx,
4656 keys,
4657 n,
4658 &boundary_mask,
4659 &boundary_prefix,
4660 &mut unique_keys,
4661 &mut fan_out,
4662 &mut prefix_sum,
4663 launch_stream,
4664 )?;
4665
4666 Ok(WcojRelationMetadata {
4667 unique_keys,
4668 fan_out,
4669 prefix_sum,
4670 per_candidate_root: BTreeMap::new(),
4671 total: u64::from(n),
4672 key_count,
4673 row_count: n,
4674 })
4675 }
4676
4677 fn build_metadata_u64_from_column(
4678 &self,
4679 input: &CudaBuffer,
4680 key_col_idx: usize,
4681 keys: &TrackedCudaSlice<u64>,
4682 launch_stream: StreamId,
4683 ) -> Result<WcojRelationMetadata<u64>> {
4684 let n = self.metadata_logical_rows(input)?;
4685 if n == 0 {
4686 return Ok(WcojRelationMetadata {
4687 unique_keys: self.memory().alloc::<u64>(0)?,
4688 fan_out: self.memory().alloc::<u32>(0)?,
4689 prefix_sum: self.memory().alloc::<u32>(0)?,
4690 per_candidate_root: BTreeMap::new(),
4691 total: 0,
4692 key_count: 0,
4693 row_count: 0,
4694 });
4695 }
4696 let mut boundary_mask = self.memory().alloc::<u32>(n as usize)?;
4697 let mut boundary_prefix = self.memory().alloc::<u32>(n as usize)?;
4698 self.mark_metadata_boundaries_u64(
4699 input,
4700 key_col_idx,
4701 keys,
4702 n,
4703 &mut boundary_mask,
4704 &mut boundary_prefix,
4705 launch_stream,
4706 )?;
4707 let key_count = self.metadata_scanned_count(&boundary_mask, &boundary_prefix, n)?;
4708
4709 let mut unique_keys = self.memory().alloc::<u64>(key_count as usize)?;
4710 let mut fan_out = self.memory().alloc::<u32>(key_count as usize)?;
4711 let mut prefix_sum = self.memory().alloc::<u32>(key_count as usize)?;
4712 self.scatter_metadata_u64(
4713 input,
4714 key_col_idx,
4715 keys,
4716 n,
4717 &boundary_mask,
4718 &boundary_prefix,
4719 &mut unique_keys,
4720 &mut fan_out,
4721 &mut prefix_sum,
4722 launch_stream,
4723 )?;
4724
4725 Ok(WcojRelationMetadata {
4726 unique_keys,
4727 fan_out,
4728 prefix_sum,
4729 per_candidate_root: BTreeMap::new(),
4730 total: u64::from(n),
4731 key_count,
4732 row_count: n,
4733 })
4734 }
4735
4736 #[allow(clippy::too_many_arguments)]
4737 fn mark_metadata_boundaries_u32(
4738 &self,
4739 input: &CudaBuffer,
4740 key_col_idx: usize,
4741 keys: &TrackedCudaSlice<u32>,
4742 n: u32,
4743 boundary_mask: &mut TrackedCudaSlice<u32>,
4744 boundary_prefix: &mut TrackedCudaSlice<u32>,
4745 launch_stream: StreamId,
4746 ) -> Result<()> {
4747 let runtime = self.memory().runtime().ok_or_else(|| {
4748 XlogError::Kernel(
4749 "wcoj_build_metadata_u32_recorded requires a runtime-backed GpuMemoryManager"
4750 .to_string(),
4751 )
4752 })?;
4753 let cu_stream = runtime
4754 .stream_pool()
4755 .resolve(launch_stream)
4756 .ok_or_else(|| {
4757 XlogError::Kernel(format!(
4758 "wcoj_build_metadata_u32_recorded: launch_stream StreamId({}) does not resolve",
4759 launch_stream.0
4760 ))
4761 })?;
4762 let mut rec = LaunchRecorder::new_strict(launch_stream);
4763 rec.read(input.num_rows_device());
4764 rec.read_column(input.column(key_col_idx).expect("metadata key column"));
4765 rec.write(boundary_mask);
4766 rec.write(boundary_prefix);
4767 rec.preflight(runtime).map_err(|e| {
4768 XlogError::Kernel(format!(
4769 "wcoj_build_metadata_u32_recorded: mark preflight failed: {e}"
4770 ))
4771 })?;
4772
4773 let kernel = self
4774 .device()
4775 .inner()
4776 .get_func(
4777 WCOJ_MODULE,
4778 wcoj_kernels::WCOJ_BUILD_METADATA_MARK_BOUNDARIES_U32,
4779 )
4780 .ok_or_else(|| {
4781 XlogError::Kernel(
4782 "wcoj_build_metadata_mark_boundaries_u32 kernel not found".to_string(),
4783 )
4784 })?;
4785 let grid = n.div_ceil(BLOCK_SIZE);
4786 unsafe {
4787 kernel
4788 .clone()
4789 .launch_on_stream(
4790 &cu_stream,
4791 LaunchConfig {
4792 grid_dim: (grid, 1, 1),
4793 block_dim: (BLOCK_SIZE, 1, 1),
4794 shared_mem_bytes: 0,
4795 },
4796 (keys, n, &mut *boundary_mask, &mut *boundary_prefix),
4797 )
4798 .map_err(|e| {
4799 XlogError::Kernel(format!(
4800 "wcoj_build_metadata_mark_boundaries_u32 launch failed: {e}"
4801 ))
4802 })?;
4803 }
4804 self.multiblock_scan_u32_inplace_on_stream(
4805 boundary_prefix,
4806 n,
4807 &cu_stream,
4808 launch_stream,
4809 runtime,
4810 )?;
4811 rec.commit(runtime).map_err(|e| {
4812 XlogError::Kernel(format!(
4813 "wcoj_build_metadata_u32_recorded: mark commit failed: {e}"
4814 ))
4815 })?;
4816 cu_stream.synchronize().map_err(|e| {
4817 XlogError::Kernel(format!(
4818 "wcoj_build_metadata_u32_recorded: mark stream sync failed: {e}"
4819 ))
4820 })?;
4821 Ok(())
4822 }
4823
4824 #[allow(clippy::too_many_arguments)]
4825 fn mark_metadata_boundaries_u64(
4826 &self,
4827 input: &CudaBuffer,
4828 key_col_idx: usize,
4829 keys: &TrackedCudaSlice<u64>,
4830 n: u32,
4831 boundary_mask: &mut TrackedCudaSlice<u32>,
4832 boundary_prefix: &mut TrackedCudaSlice<u32>,
4833 launch_stream: StreamId,
4834 ) -> Result<()> {
4835 let runtime = self.memory().runtime().ok_or_else(|| {
4836 XlogError::Kernel(
4837 "wcoj_build_metadata_u64_recorded requires a runtime-backed GpuMemoryManager"
4838 .to_string(),
4839 )
4840 })?;
4841 let cu_stream = runtime
4842 .stream_pool()
4843 .resolve(launch_stream)
4844 .ok_or_else(|| {
4845 XlogError::Kernel(format!(
4846 "wcoj_build_metadata_u64_recorded: launch_stream StreamId({}) does not resolve",
4847 launch_stream.0
4848 ))
4849 })?;
4850 let mut rec = LaunchRecorder::new_strict(launch_stream);
4851 rec.read(input.num_rows_device());
4852 rec.read_column(input.column(key_col_idx).expect("metadata key column"));
4853 rec.write(boundary_mask);
4854 rec.write(boundary_prefix);
4855 rec.preflight(runtime).map_err(|e| {
4856 XlogError::Kernel(format!(
4857 "wcoj_build_metadata_u64_recorded: mark preflight failed: {e}"
4858 ))
4859 })?;
4860
4861 let kernel = self
4862 .device()
4863 .inner()
4864 .get_func(
4865 WCOJ_MODULE,
4866 wcoj_kernels::WCOJ_BUILD_METADATA_MARK_BOUNDARIES_U64,
4867 )
4868 .ok_or_else(|| {
4869 XlogError::Kernel(
4870 "wcoj_build_metadata_mark_boundaries_u64 kernel not found".to_string(),
4871 )
4872 })?;
4873 let grid = n.div_ceil(BLOCK_SIZE);
4874 unsafe {
4875 kernel
4876 .clone()
4877 .launch_on_stream(
4878 &cu_stream,
4879 LaunchConfig {
4880 grid_dim: (grid, 1, 1),
4881 block_dim: (BLOCK_SIZE, 1, 1),
4882 shared_mem_bytes: 0,
4883 },
4884 (keys, n, &mut *boundary_mask, &mut *boundary_prefix),
4885 )
4886 .map_err(|e| {
4887 XlogError::Kernel(format!(
4888 "wcoj_build_metadata_mark_boundaries_u64 launch failed: {e}"
4889 ))
4890 })?;
4891 }
4892 self.multiblock_scan_u32_inplace_on_stream(
4893 boundary_prefix,
4894 n,
4895 &cu_stream,
4896 launch_stream,
4897 runtime,
4898 )?;
4899 rec.commit(runtime).map_err(|e| {
4900 XlogError::Kernel(format!(
4901 "wcoj_build_metadata_u64_recorded: mark commit failed: {e}"
4902 ))
4903 })?;
4904 cu_stream.synchronize().map_err(|e| {
4905 XlogError::Kernel(format!(
4906 "wcoj_build_metadata_u64_recorded: mark stream sync failed: {e}"
4907 ))
4908 })?;
4909 Ok(())
4910 }
4911
4912 #[allow(clippy::too_many_arguments)]
4913 fn scatter_metadata_u32(
4914 &self,
4915 input: &CudaBuffer,
4916 key_col_idx: usize,
4917 keys: &TrackedCudaSlice<u32>,
4918 n: u32,
4919 boundary_mask: &TrackedCudaSlice<u32>,
4920 boundary_prefix: &TrackedCudaSlice<u32>,
4921 unique_keys: &mut TrackedCudaSlice<u32>,
4922 fan_out: &mut TrackedCudaSlice<u32>,
4923 prefix_sum: &mut TrackedCudaSlice<u32>,
4924 launch_stream: StreamId,
4925 ) -> Result<()> {
4926 let runtime = self.memory().runtime().ok_or_else(|| {
4927 XlogError::Kernel(
4928 "wcoj_build_metadata_u32_recorded requires a runtime-backed GpuMemoryManager"
4929 .to_string(),
4930 )
4931 })?;
4932 let cu_stream = runtime
4933 .stream_pool()
4934 .resolve(launch_stream)
4935 .ok_or_else(|| {
4936 XlogError::Kernel(format!(
4937 "wcoj_build_metadata_u32_recorded: launch_stream StreamId({}) does not resolve",
4938 launch_stream.0
4939 ))
4940 })?;
4941 let mut rec = LaunchRecorder::new_strict(launch_stream);
4942 rec.read(input.num_rows_device());
4943 rec.read_column(input.column(key_col_idx).expect("metadata key column"));
4944 rec.read(boundary_mask);
4945 rec.read(boundary_prefix);
4946 rec.write(unique_keys);
4947 rec.write(fan_out);
4948 rec.write(prefix_sum);
4949 rec.preflight(runtime).map_err(|e| {
4950 XlogError::Kernel(format!(
4951 "wcoj_build_metadata_u32_recorded: scatter preflight failed: {e}"
4952 ))
4953 })?;
4954
4955 let kernel = self
4956 .device()
4957 .inner()
4958 .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_BUILD_METADATA_SCATTER_U32)
4959 .ok_or_else(|| {
4960 XlogError::Kernel("wcoj_build_metadata_scatter_u32 kernel not found".to_string())
4961 })?;
4962 let grid = n.div_ceil(BLOCK_SIZE);
4963 unsafe {
4964 kernel
4965 .clone()
4966 .launch_on_stream(
4967 &cu_stream,
4968 LaunchConfig {
4969 grid_dim: (grid, 1, 1),
4970 block_dim: (BLOCK_SIZE, 1, 1),
4971 shared_mem_bytes: 0,
4972 },
4973 (
4974 keys,
4975 n,
4976 boundary_mask,
4977 boundary_prefix,
4978 &mut *unique_keys,
4979 &mut *fan_out,
4980 &mut *prefix_sum,
4981 ),
4982 )
4983 .map_err(|e| {
4984 XlogError::Kernel(format!(
4985 "wcoj_build_metadata_scatter_u32 launch failed: {e}"
4986 ))
4987 })?;
4988 }
4989 rec.commit(runtime).map_err(|e| {
4990 XlogError::Kernel(format!(
4991 "wcoj_build_metadata_u32_recorded: scatter commit failed: {e}"
4992 ))
4993 })?;
4994 cu_stream.synchronize().map_err(|e| {
4995 XlogError::Kernel(format!(
4996 "wcoj_build_metadata_u32_recorded: scatter stream sync failed: {e}"
4997 ))
4998 })?;
4999 Ok(())
5000 }
5001
5002 #[allow(clippy::too_many_arguments)]
5003 fn scatter_metadata_u64(
5004 &self,
5005 input: &CudaBuffer,
5006 key_col_idx: usize,
5007 keys: &TrackedCudaSlice<u64>,
5008 n: u32,
5009 boundary_mask: &TrackedCudaSlice<u32>,
5010 boundary_prefix: &TrackedCudaSlice<u32>,
5011 unique_keys: &mut TrackedCudaSlice<u64>,
5012 fan_out: &mut TrackedCudaSlice<u32>,
5013 prefix_sum: &mut TrackedCudaSlice<u32>,
5014 launch_stream: StreamId,
5015 ) -> Result<()> {
5016 let runtime = self.memory().runtime().ok_or_else(|| {
5017 XlogError::Kernel(
5018 "wcoj_build_metadata_u64_recorded requires a runtime-backed GpuMemoryManager"
5019 .to_string(),
5020 )
5021 })?;
5022 let cu_stream = runtime
5023 .stream_pool()
5024 .resolve(launch_stream)
5025 .ok_or_else(|| {
5026 XlogError::Kernel(format!(
5027 "wcoj_build_metadata_u64_recorded: launch_stream StreamId({}) does not resolve",
5028 launch_stream.0
5029 ))
5030 })?;
5031 let mut rec = LaunchRecorder::new_strict(launch_stream);
5032 rec.read(input.num_rows_device());
5033 rec.read_column(input.column(key_col_idx).expect("metadata key column"));
5034 rec.read(boundary_mask);
5035 rec.read(boundary_prefix);
5036 rec.write(unique_keys);
5037 rec.write(fan_out);
5038 rec.write(prefix_sum);
5039 rec.preflight(runtime).map_err(|e| {
5040 XlogError::Kernel(format!(
5041 "wcoj_build_metadata_u64_recorded: scatter preflight failed: {e}"
5042 ))
5043 })?;
5044
5045 let kernel = self
5046 .device()
5047 .inner()
5048 .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_BUILD_METADATA_SCATTER_U64)
5049 .ok_or_else(|| {
5050 XlogError::Kernel("wcoj_build_metadata_scatter_u64 kernel not found".to_string())
5051 })?;
5052 let grid = n.div_ceil(BLOCK_SIZE);
5053 unsafe {
5054 kernel
5055 .clone()
5056 .launch_on_stream(
5057 &cu_stream,
5058 LaunchConfig {
5059 grid_dim: (grid, 1, 1),
5060 block_dim: (BLOCK_SIZE, 1, 1),
5061 shared_mem_bytes: 0,
5062 },
5063 (
5064 keys,
5065 n,
5066 boundary_mask,
5067 boundary_prefix,
5068 &mut *unique_keys,
5069 &mut *fan_out,
5070 &mut *prefix_sum,
5071 ),
5072 )
5073 .map_err(|e| {
5074 XlogError::Kernel(format!(
5075 "wcoj_build_metadata_scatter_u64 launch failed: {e}"
5076 ))
5077 })?;
5078 }
5079 rec.commit(runtime).map_err(|e| {
5080 XlogError::Kernel(format!(
5081 "wcoj_build_metadata_u64_recorded: scatter commit failed: {e}"
5082 ))
5083 })?;
5084 cu_stream.synchronize().map_err(|e| {
5085 XlogError::Kernel(format!(
5086 "wcoj_build_metadata_u64_recorded: scatter stream sync failed: {e}"
5087 ))
5088 })?;
5089 Ok(())
5090 }
5091
5092 fn metadata_scanned_count(
5093 &self,
5094 boundary_mask: &TrackedCudaSlice<u32>,
5095 boundary_prefix: &TrackedCudaSlice<u32>,
5096 n: u32,
5097 ) -> Result<u32> {
5098 let last = n - 1;
5099 let prefix_last = self.dtoh_scalar_untracked::<u32>(boundary_prefix, last as usize)?;
5100 let mask_last = self.dtoh_scalar_untracked::<u32>(boundary_mask, last as usize)?;
5101 Ok(prefix_last + mask_last)
5102 }
5103
5104 fn metadata_logical_rows(&self, input: &CudaBuffer) -> Result<u32> {
5105 if let Some(cached) = input.cached_row_count() {
5106 return Ok(cached);
5107 }
5108 self.dtoh_scalar_untracked::<u32>(input.num_rows_device(), 0)
5109 }
5110}
5111
/// Key width requested from `validate_metadata_column`.
///
/// `U32` accepts `U32` and `Symbol` key columns; `U64` accepts only `U64`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum MetadataWidth {
    /// 32-bit keys (`ScalarType::U32` or `ScalarType::Symbol`).
    U32,
    /// 64-bit keys (`ScalarType::U64`).
    U64,
}
5117
5118fn metadata_column_u32(input: &CudaBuffer, key_col_idx: usize) -> Result<&TrackedCudaSlice<u32>> {
5119 let col = input.column(key_col_idx).ok_or_else(|| {
5120 XlogError::Kernel(format!(
5121 "wcoj_build_metadata_u32_recorded: column {} not found",
5122 key_col_idx
5123 ))
5124 })?;
5125 match col {
5126 CudaColumn::Owned(slice) => unsafe {
5127 Ok(&*(slice as *const TrackedCudaSlice<u8> as *const TrackedCudaSlice<u32>))
5128 },
5129 _ => Err(XlogError::Kernel(
5130 "wcoj_build_metadata_u32_recorded: key column must be an owned CudaColumn".to_string(),
5131 )),
5132 }
5133}
5134
5135fn metadata_column_u64(input: &CudaBuffer, key_col_idx: usize) -> Result<&TrackedCudaSlice<u64>> {
5136 let col = input.column(key_col_idx).ok_or_else(|| {
5137 XlogError::Kernel(format!(
5138 "wcoj_build_metadata_u64_recorded: column {} not found",
5139 key_col_idx
5140 ))
5141 })?;
5142 match col {
5143 CudaColumn::Owned(slice) => unsafe {
5144 Ok(&*(slice as *const TrackedCudaSlice<u8> as *const TrackedCudaSlice<u64>))
5145 },
5146 _ => Err(XlogError::Kernel(
5147 "wcoj_build_metadata_u64_recorded: key column must be an owned CudaColumn".to_string(),
5148 )),
5149 }
5150}
5151
5152fn validate_binary_u32(ctx: &str, label: &str, input: &CudaBuffer) -> Result<()> {
5153 if input.arity() != 2 {
5154 return Err(XlogError::Kernel(format!(
5155 "{ctx}: {label} must be 2-column, got arity {}",
5156 input.arity()
5157 )));
5158 }
5159 for col_idx in 0..2 {
5160 let ty = input.schema().column_type(col_idx).ok_or_else(|| {
5161 XlogError::Kernel(format!("{ctx}: {label}.col{col_idx} type missing"))
5162 })?;
5163 if !matches!(ty, ScalarType::U32 | ScalarType::Symbol) {
5164 return Err(XlogError::Kernel(format!(
5165 "{ctx}: {label}.col{col_idx} must be U32 or Symbol, got {:?}",
5166 ty
5167 )));
5168 }
5169 }
5170 Ok(())
5171}
5172
5173fn validate_binary_u64(ctx: &str, label: &str, input: &CudaBuffer) -> Result<()> {
5174 if input.arity() != 2 {
5175 return Err(XlogError::Kernel(format!(
5176 "{ctx}: {label} must be 2-column, got arity {}",
5177 input.arity()
5178 )));
5179 }
5180 for col_idx in 0..2 {
5181 let ty = input.schema().column_type(col_idx).ok_or_else(|| {
5182 XlogError::Kernel(format!("{ctx}: {label}.col{col_idx} type missing"))
5183 })?;
5184 if !matches!(ty, ScalarType::U64) {
5185 return Err(XlogError::Kernel(format!(
5186 "{ctx}: {label}.col{col_idx} must be U64, got {:?}",
5187 ty
5188 )));
5189 }
5190 }
5191 Ok(())
5192}
5193
5194unsafe fn reinterpret_u8_as_u32(slice: &mut TrackedCudaSlice<u8>) -> &mut TrackedCudaSlice<u32> {
5195 &mut *(slice as *mut TrackedCudaSlice<u8> as *mut TrackedCudaSlice<u32>)
5196}
5197
5198unsafe fn reinterpret_u8_as_u64(slice: &mut TrackedCudaSlice<u8>) -> &mut TrackedCudaSlice<u64> {
5199 &mut *(slice as *mut TrackedCudaSlice<u8> as *mut TrackedCudaSlice<u64>)
5200}