1use std::ffi::c_void;
51use std::time::Instant;
52
53use cudarc::driver::sys;
54use xlog_core::{Result, ScalarType, Schema, XlogError};
55
56use super::{wcoj_kernels, CudaKernelProvider, WCOJ_MODULE};
57use crate::device_runtime::StreamId;
58use crate::launch::LaunchRecorder;
59use crate::memory::{CudaColumn, TrackedCudaSlice};
60use crate::wcoj_metadata::WcojRelationMetadata;
61use crate::CudaBuffer;
62use crate::{AsKernelParam, LaunchAsync, LaunchConfig};
63
/// Threads per block for the 1-D kernel launches issued from this module
/// (layout check and k-clique count launches use it as `block_dim.x`).
const BLOCK_SIZE: u32 = 256;
65
66fn column_u32(input: &CudaBuffer, col_idx: usize) -> Result<&TrackedCudaSlice<u32>> {
67 let col = input.column(col_idx).ok_or_else(|| {
68 XlogError::Kernel(format!(
69 "wcoj_layout_u32_recorded: column {col_idx} not found"
70 ))
71 })?;
72 match col {
73 CudaColumn::Owned(slice) => unsafe {
74 Ok(&*(slice as *const TrackedCudaSlice<u8> as *const TrackedCudaSlice<u32>))
75 },
76 _ => Err(XlogError::Kernel(
77 "wcoj_layout_u32_recorded: input column must be owned".to_string(),
78 )),
79 }
80}
81
82fn column_u64(input: &CudaBuffer, col_idx: usize) -> Result<&TrackedCudaSlice<u64>> {
83 let col = input.column(col_idx).ok_or_else(|| {
84 XlogError::Kernel(format!(
85 "wcoj_layout_u64_recorded: column {col_idx} not found"
86 ))
87 })?;
88 match col {
89 CudaColumn::Owned(slice) => unsafe {
90 Ok(&*(slice as *const TrackedCudaSlice<u8> as *const TrackedCudaSlice<u64>))
91 },
92 _ => Err(XlogError::Kernel(
93 "wcoj_layout_u64_recorded: input column must be owned".to_string(),
94 )),
95 }
96}
97
98impl CudaKernelProvider {
99 pub fn wcoj_layout_u32_recorded(
139 &self,
140 input: &CudaBuffer,
141 launch_stream: StreamId,
142 ) -> Result<CudaBuffer> {
143 if self.memory().runtime().is_none() {
147 return Err(XlogError::Kernel(
148 "wcoj_layout_u32_recorded requires a runtime-backed \
149 GpuMemoryManager (constructed via with_runtime)"
150 .to_string(),
151 ));
152 }
153 if input.arity() != 2 {
158 return Err(XlogError::Kernel(format!(
159 "wcoj_layout_u32_recorded: input must be 2-column, got arity {}",
160 input.arity()
161 )));
162 }
163 for col_idx in 0..2 {
164 let ty = input.schema.column_type(col_idx).ok_or_else(|| {
165 XlogError::Kernel(format!(
166 "wcoj_layout_u32_recorded: column {} type missing",
167 col_idx
168 ))
169 })?;
170 if !matches!(ty, ScalarType::U32 | ScalarType::Symbol) {
171 return Err(XlogError::Kernel(format!(
172 "wcoj_layout_u32_recorded: column {} must be U32 or Symbol, got {:?}",
173 col_idx, ty
174 )));
175 }
176 }
177 match self.try_wcoj_layout_fast_path_u32(input, launch_stream) {
186 Ok(Some(out)) => {
187 self.record_wcoj_layout_fast_path_hit();
188 return Ok(out);
189 }
190 Ok(None) => {
191 }
195 Err(_) => {
196 }
199 }
200 self.dedup_full_row_recorded(input, launch_stream)
206 }
207
208 pub fn wcoj_layout_sort_u32_recorded(
251 &self,
252 input: &CudaBuffer,
253 launch_stream: StreamId,
254 ) -> Result<CudaBuffer> {
255 self.record_wcoj_layout_sort_invocation();
256 if self.memory().runtime().is_none() {
257 return Err(XlogError::Kernel(
258 "wcoj_layout_sort_u32_recorded requires a runtime-backed \
259 GpuMemoryManager (constructed via with_runtime)"
260 .to_string(),
261 ));
262 }
263 if input.arity() < 2 {
264 return Err(XlogError::Kernel(format!(
265 "wcoj_layout_sort_u32_recorded: input must have arity >= 2, got {}",
266 input.arity()
267 )));
268 }
269 for col_idx in 0..input.arity() {
270 let ty = input.schema.column_type(col_idx).ok_or_else(|| {
271 XlogError::Kernel(format!(
272 "wcoj_layout_sort_u32_recorded: column {} type missing",
273 col_idx
274 ))
275 })?;
276 if !matches!(ty, ScalarType::U32 | ScalarType::Symbol) {
277 return Err(XlogError::Kernel(format!(
278 "wcoj_layout_sort_u32_recorded: column {} must be U32 or Symbol \
279 (4-byte width-class), got {:?}",
280 col_idx, ty
281 )));
282 }
283 }
284 self.dedup_full_row_recorded(input, launch_stream)
285 }
286
287 pub fn wcoj_triangle_u32_recorded(
311 &self,
312 e_xy: &CudaBuffer,
313 e_yz: &CudaBuffer,
314 e_xz: &CudaBuffer,
315 launch_stream: StreamId,
316 ) -> Result<CudaBuffer> {
317 self.wcoj_triangle_hg_u32_recorded(
318 e_xy,
319 e_yz,
320 e_xz,
321 crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT,
322 launch_stream,
323 )
324 }
325
326 pub fn wcoj_4cycle_u32_recorded(
354 &self,
355 e1: &CudaBuffer,
356 e2: &CudaBuffer,
357 e3: &CudaBuffer,
358 e4: &CudaBuffer,
359 launch_stream: StreamId,
360 ) -> Result<CudaBuffer> {
361 self.wcoj_4cycle_hg_u32_recorded(
362 e1,
363 e2,
364 e3,
365 e4,
366 crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT,
367 launch_stream,
368 )
369 }
370
371 fn logical_row_count_u32(&self, buf: &CudaBuffer) -> Result<u32> {
372 if let Some(c) = buf.cached_row_count() {
373 return Ok(c);
374 }
375 self.dtoh_scalar_untracked::<u32>(buf.num_rows_device(), 0)
376 }
377
378 pub fn wcoj_layout_u64_recorded(
395 &self,
396 input: &CudaBuffer,
397 launch_stream: StreamId,
398 ) -> Result<CudaBuffer> {
399 if self.memory().runtime().is_none() {
400 return Err(XlogError::Kernel(
401 "wcoj_layout_u64_recorded requires a runtime-backed \
402 GpuMemoryManager (constructed via with_runtime)"
403 .to_string(),
404 ));
405 }
406 if input.arity() != 2 {
407 return Err(XlogError::Kernel(format!(
408 "wcoj_layout_u64_recorded: input must be 2-column, got arity {}",
409 input.arity()
410 )));
411 }
412 for col_idx in 0..2 {
413 let ty = input.schema.column_type(col_idx).ok_or_else(|| {
414 XlogError::Kernel(format!(
415 "wcoj_layout_u64_recorded: column {} type missing",
416 col_idx
417 ))
418 })?;
419 if !matches!(ty, ScalarType::U64) {
420 return Err(XlogError::Kernel(format!(
421 "wcoj_layout_u64_recorded: column {} must be U64, got {:?}",
422 col_idx, ty
423 )));
424 }
425 }
426 if let Ok(Some(out)) = self.try_wcoj_layout_fast_path_u64(input, launch_stream) {
430 self.record_wcoj_layout_fast_path_hit();
431 return Ok(out);
432 }
433 self.dedup_full_row_recorded(input, launch_stream)
437 }
438
439 pub fn wcoj_layout_sort_u64_recorded(
471 &self,
472 input: &CudaBuffer,
473 launch_stream: StreamId,
474 ) -> Result<CudaBuffer> {
475 self.record_wcoj_layout_sort_invocation();
476 if self.memory().runtime().is_none() {
477 return Err(XlogError::Kernel(
478 "wcoj_layout_sort_u64_recorded requires a runtime-backed \
479 GpuMemoryManager (constructed via with_runtime)"
480 .to_string(),
481 ));
482 }
483 if input.arity() < 2 {
484 return Err(XlogError::Kernel(format!(
485 "wcoj_layout_sort_u64_recorded: input must have arity >= 2, got {}",
486 input.arity()
487 )));
488 }
489 for col_idx in 0..input.arity() {
490 let ty = input.schema.column_type(col_idx).ok_or_else(|| {
491 XlogError::Kernel(format!(
492 "wcoj_layout_sort_u64_recorded: column {} type missing",
493 col_idx
494 ))
495 })?;
496 if !matches!(ty, ScalarType::U64) {
497 return Err(XlogError::Kernel(format!(
498 "wcoj_layout_sort_u64_recorded: column {} must be U64 \
499 (8-byte width-class), got {:?}",
500 col_idx, ty
501 )));
502 }
503 }
504 self.dedup_full_row_recorded(input, launch_stream)
505 }
506
507 pub fn wcoj_triangle_u64_recorded(
519 &self,
520 e_xy: &CudaBuffer,
521 e_yz: &CudaBuffer,
522 e_xz: &CudaBuffer,
523 launch_stream: StreamId,
524 ) -> Result<CudaBuffer> {
525 self.wcoj_triangle_hg_u64_recorded(
526 e_xy,
527 e_yz,
528 e_xz,
529 crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT,
530 launch_stream,
531 )
532 }
533
534 pub fn wcoj_4cycle_u64_recorded(
551 &self,
552 e1: &CudaBuffer,
553 e2: &CudaBuffer,
554 e3: &CudaBuffer,
555 e4: &CudaBuffer,
556 launch_stream: StreamId,
557 ) -> Result<CudaBuffer> {
558 self.wcoj_4cycle_hg_u64_recorded(
559 e1,
560 e2,
561 e3,
562 e4,
563 crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT,
564 launch_stream,
565 )
566 }
567}
568
569impl CudaKernelProvider {
599 fn try_wcoj_layout_fast_path_u32(
608 &self,
609 input: &CudaBuffer,
610 launch_stream: StreamId,
611 ) -> Result<Option<CudaBuffer>> {
612 let runtime = self
613 .memory()
614 .runtime()
615 .ok_or_else(|| XlogError::Kernel("wcoj_layout fast-path: no runtime".to_string()))?;
616 let cu_stream = runtime
617 .stream_pool()
618 .resolve(launch_stream)
619 .ok_or_else(|| {
620 XlogError::Kernel("wcoj_layout fast-path: stream resolve".to_string())
621 })?;
622
623 let n = self.logical_row_count_u32(input)?;
626 if n == 0 {
627 return Ok(None);
631 }
632 if n == 1 {
633 return Ok(Some(self.recorded_clone_2col_4byte(
635 input,
636 n,
637 launch_stream,
638 &cu_stream,
639 runtime,
640 )?));
641 }
642
643 let mut flag_buf = self.memory.alloc::<u32>(1)?;
645
646 let col0 = column_u32(input, 0)?;
647 let col1 = column_u32(input, 1)?;
648
649 let device = self.device.inner();
653 let kernel = device
654 .get_func(
655 WCOJ_MODULE,
656 wcoj_kernels::WCOJ_LAYOUT_CHECK_SORTED_UNIQUE_U32,
657 )
658 .ok_or_else(|| {
659 XlogError::Kernel("wcoj_layout_check_sorted_unique_u32 kernel not found".into())
660 })?;
661
662 let mut rec = LaunchRecorder::new_strict(launch_stream);
663 rec.read(input.num_rows_device());
664 rec.read_column(input.column(0).expect("col0"));
665 rec.read_column(input.column(1).expect("col1"));
666 rec.write(&flag_buf);
667 rec.preflight(runtime)
668 .map_err(|e| XlogError::Kernel(format!("wcoj_layout fast-path: preflight {e}")))?;
669
670 let one: u32 = 1;
676 let grid = n.div_ceil(BLOCK_SIZE);
677 let queued_result: Result<()> = (|| {
678 self.htod_launch_metadata_async_copy_one(
679 &one,
680 &flag_buf,
681 &cu_stream,
682 "wcoj_layout fast-path flag init",
683 )?;
684
685 unsafe {
689 kernel
690 .clone()
691 .launch_on_stream(
692 &cu_stream,
693 LaunchConfig {
694 grid_dim: (grid, 1, 1),
695 block_dim: (BLOCK_SIZE, 1, 1),
696 shared_mem_bytes: 0,
697 },
698 (col0, col1, n, &mut flag_buf),
699 )
700 .map_err(|e| {
701 XlogError::Kernel(format!(
702 "wcoj_layout_check_sorted_unique_u32 launch: {e}"
703 ))
704 })?;
705 }
706 Ok(())
707 })();
708 if let Err(e) = queued_result {
709 let _ = cu_stream.synchronize();
710 return Err(e);
711 }
712
713 if let Err(e) = rec.commit(runtime) {
714 let _ = cu_stream.synchronize();
715 return Err(XlogError::Kernel(format!(
716 "wcoj_layout fast-path: commit {e}"
717 )));
718 }
719
720 cu_stream
721 .synchronize()
722 .map_err(|e| XlogError::Kernel(format!("wcoj_layout fast-path: sync {e}")))?;
723
724 let flag_val = self.dtoh_scalar_untracked::<u32>(&flag_buf, 0)?;
725 if flag_val == 1 {
726 Ok(Some(self.recorded_clone_2col_4byte(
727 input,
728 n,
729 launch_stream,
730 &cu_stream,
731 runtime,
732 )?))
733 } else {
734 Ok(None)
735 }
736 }
737
738 fn try_wcoj_layout_fast_path_u64(
740 &self,
741 input: &CudaBuffer,
742 launch_stream: StreamId,
743 ) -> Result<Option<CudaBuffer>> {
744 let runtime = self
745 .memory()
746 .runtime()
747 .ok_or_else(|| XlogError::Kernel("wcoj_layout fast-path: no runtime".to_string()))?;
748 let cu_stream = runtime
749 .stream_pool()
750 .resolve(launch_stream)
751 .ok_or_else(|| {
752 XlogError::Kernel("wcoj_layout fast-path: stream resolve".to_string())
753 })?;
754
755 let n = self.logical_row_count_u32(input)?;
756 if n == 0 {
757 return Ok(None);
758 }
759 if n == 1 {
760 return Ok(Some(self.recorded_clone_2col_8byte(
761 input,
762 n,
763 launch_stream,
764 &cu_stream,
765 runtime,
766 )?));
767 }
768
769 let mut flag_buf = self.memory.alloc::<u32>(1)?;
770 let col0 = column_u64(input, 0)?;
771 let col1 = column_u64(input, 1)?;
772
773 let device = self.device.inner();
774 let kernel = device
775 .get_func(
776 WCOJ_MODULE,
777 wcoj_kernels::WCOJ_LAYOUT_CHECK_SORTED_UNIQUE_U64,
778 )
779 .ok_or_else(|| {
780 XlogError::Kernel("wcoj_layout_check_sorted_unique_u64 kernel not found".into())
781 })?;
782
783 let mut rec = LaunchRecorder::new_strict(launch_stream);
784 rec.read(input.num_rows_device());
785 rec.read_column(input.column(0).expect("col0"));
786 rec.read_column(input.column(1).expect("col1"));
787 rec.write(&flag_buf);
788 rec.preflight(runtime)
789 .map_err(|e| XlogError::Kernel(format!("wcoj_layout fast-path u64: preflight {e}")))?;
790
791 let one: u32 = 1;
792 let grid = n.div_ceil(BLOCK_SIZE);
793 let queued_result: Result<()> = (|| {
794 self.htod_launch_metadata_async_copy_one(
795 &one,
796 &flag_buf,
797 &cu_stream,
798 "wcoj_layout fast-path u64 flag init",
799 )?;
800
801 unsafe {
802 kernel
803 .clone()
804 .launch_on_stream(
805 &cu_stream,
806 LaunchConfig {
807 grid_dim: (grid, 1, 1),
808 block_dim: (BLOCK_SIZE, 1, 1),
809 shared_mem_bytes: 0,
810 },
811 (col0, col1, n, &mut flag_buf),
812 )
813 .map_err(|e| {
814 XlogError::Kernel(format!(
815 "wcoj_layout_check_sorted_unique_u64 launch: {e}"
816 ))
817 })?;
818 }
819 Ok(())
820 })();
821 if let Err(e) = queued_result {
822 let _ = cu_stream.synchronize();
823 return Err(e);
824 }
825
826 if let Err(e) = rec.commit(runtime) {
827 let _ = cu_stream.synchronize();
828 return Err(XlogError::Kernel(format!(
829 "wcoj_layout fast-path u64: commit {e}"
830 )));
831 }
832
833 cu_stream
834 .synchronize()
835 .map_err(|e| XlogError::Kernel(format!("wcoj_layout fast-path u64: sync {e}")))?;
836
837 let flag_val = self.dtoh_scalar_untracked::<u32>(&flag_buf, 0)?;
838 if flag_val == 1 {
839 Ok(Some(self.recorded_clone_2col_8byte(
840 input,
841 n,
842 launch_stream,
843 &cu_stream,
844 runtime,
845 )?))
846 } else {
847 Ok(None)
848 }
849 }
850
851 fn recorded_clone_2col_4byte(
858 &self,
859 input: &CudaBuffer,
860 n: u32,
861 launch_stream: StreamId,
862 cu_stream: &cudarc::driver::CudaStream,
863 runtime: &std::sync::Arc<crate::device_runtime::XlogDeviceRuntime>,
864 ) -> Result<CudaBuffer> {
865 let bpc = (n as usize) * 4;
866 let out_col0 = self.memory.alloc::<u8>(bpc)?;
867 let out_col1 = self.memory.alloc::<u8>(bpc)?;
868 let out_d_num_rows = self.memory.alloc::<u32>(1)?;
869
870 let src_col0 = input.column(0).expect("col0");
871 let src_col1 = input.column(1).expect("col1");
872
873 let mut rec = LaunchRecorder::new_strict(launch_stream);
874 rec.read(input.num_rows_device());
875 rec.read_column(src_col0);
876 rec.read_column(src_col1);
877 rec.write(&out_col0);
878 rec.write(&out_col1);
879 rec.write(&out_d_num_rows);
880 rec.preflight(runtime)
881 .map_err(|e| XlogError::Kernel(format!("wcoj_layout clone 4B: preflight {e}")))?;
882
883 let queued_result: Result<()> = (|| {
884 unsafe {
885 let r0 = sys::cuMemcpyDtoDAsync_v2(
886 *out_col0.device_ptr(),
887 *src_col0.device_ptr(),
888 bpc,
889 cu_stream.cu_stream(),
890 );
891 if r0 != sys::cudaError_enum::CUDA_SUCCESS {
892 return Err(XlogError::Kernel(format!(
893 "wcoj_layout clone 4B: dtod col0 failed: {r0:?}"
894 )));
895 }
896 let r1 = sys::cuMemcpyDtoDAsync_v2(
897 *out_col1.device_ptr(),
898 *src_col1.device_ptr(),
899 bpc,
900 cu_stream.cu_stream(),
901 );
902 if r1 != sys::cudaError_enum::CUDA_SUCCESS {
903 return Err(XlogError::Kernel(format!(
904 "wcoj_layout clone 4B: dtod col1 failed: {r1:?}"
905 )));
906 }
907 }
908 self.htod_launch_metadata_async_copy_one(
909 &n,
910 &out_d_num_rows,
911 cu_stream,
912 "wcoj_layout clone 4B d_num_rows",
913 )?;
914 Ok(())
915 })();
916 if let Err(e) = queued_result {
917 let _ = cu_stream.synchronize();
918 return Err(e);
919 }
920
921 if let Err(e) = rec.commit(runtime) {
922 let _ = cu_stream.synchronize();
923 return Err(XlogError::Kernel(format!(
924 "wcoj_layout clone 4B: commit {e}"
925 )));
926 }
927
928 Ok(CudaBuffer::from_columns_with_host_count(
929 vec![out_col0.into(), out_col1.into()],
930 n as u64,
931 out_d_num_rows,
932 input.schema().clone(),
933 n,
934 ))
935 }
936
937 fn recorded_clone_2col_8byte(
939 &self,
940 input: &CudaBuffer,
941 n: u32,
942 launch_stream: StreamId,
943 cu_stream: &cudarc::driver::CudaStream,
944 runtime: &std::sync::Arc<crate::device_runtime::XlogDeviceRuntime>,
945 ) -> Result<CudaBuffer> {
946 let bpc = (n as usize) * 8;
947 let out_col0 = self.memory.alloc::<u8>(bpc)?;
948 let out_col1 = self.memory.alloc::<u8>(bpc)?;
949 let out_d_num_rows = self.memory.alloc::<u32>(1)?;
950
951 let src_col0 = input.column(0).expect("col0");
952 let src_col1 = input.column(1).expect("col1");
953
954 let mut rec = LaunchRecorder::new_strict(launch_stream);
955 rec.read(input.num_rows_device());
956 rec.read_column(src_col0);
957 rec.read_column(src_col1);
958 rec.write(&out_col0);
959 rec.write(&out_col1);
960 rec.write(&out_d_num_rows);
961 rec.preflight(runtime)
962 .map_err(|e| XlogError::Kernel(format!("wcoj_layout clone 8B: preflight {e}")))?;
963
964 let queued_result: Result<()> = (|| {
965 unsafe {
966 let r0 = sys::cuMemcpyDtoDAsync_v2(
967 *out_col0.device_ptr(),
968 *src_col0.device_ptr(),
969 bpc,
970 cu_stream.cu_stream(),
971 );
972 if r0 != sys::cudaError_enum::CUDA_SUCCESS {
973 return Err(XlogError::Kernel(format!(
974 "wcoj_layout clone 8B: dtod col0 failed: {r0:?}"
975 )));
976 }
977 let r1 = sys::cuMemcpyDtoDAsync_v2(
978 *out_col1.device_ptr(),
979 *src_col1.device_ptr(),
980 bpc,
981 cu_stream.cu_stream(),
982 );
983 if r1 != sys::cudaError_enum::CUDA_SUCCESS {
984 return Err(XlogError::Kernel(format!(
985 "wcoj_layout clone 8B: dtod col1 failed: {r1:?}"
986 )));
987 }
988 }
989 self.htod_launch_metadata_async_copy_one(
990 &n,
991 &out_d_num_rows,
992 cu_stream,
993 "wcoj_layout clone 8B d_num_rows",
994 )?;
995 Ok(())
996 })();
997 if let Err(e) = queued_result {
998 let _ = cu_stream.synchronize();
999 return Err(e);
1000 }
1001
1002 if let Err(e) = rec.commit(runtime) {
1003 let _ = cu_stream.synchronize();
1004 return Err(XlogError::Kernel(format!(
1005 "wcoj_layout clone 8B: commit {e}"
1006 )));
1007 }
1008
1009 Ok(CudaBuffer::from_columns_with_host_count(
1010 vec![out_col0.into(), out_col1.into()],
1011 n as u64,
1012 out_d_num_rows,
1013 input.schema().clone(),
1014 n,
1015 ))
1016 }
1017}
1018
/// Element width of the edge columns in a k-clique request. It selects
/// between the `*_U32` and `*_U64` clique kernels.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CliqueWidthClass {
    /// 4-byte columns (`ScalarType::U32` or `ScalarType::Symbol`).
    FourByte,
    /// 8-byte columns (`ScalarType::U64`).
    EightByte,
}
1041
1042impl CliqueWidthClass {
1043 fn elem_bytes(self) -> usize {
1044 match self {
1045 CliqueWidthClass::FourByte => 4,
1046 CliqueWidthClass::EightByte => 8,
1047 }
1048 }
1049 fn label(self) -> &'static str {
1050 match self {
1051 CliqueWidthClass::FourByte => "u32",
1052 CliqueWidthClass::EightByte => "u64",
1053 }
1054 }
1055 fn validate_col_type(self, ty: ScalarType) -> bool {
1056 match self {
1057 CliqueWidthClass::FourByte => matches!(ty, ScalarType::U32 | ScalarType::Symbol),
1058 CliqueWidthClass::EightByte => matches!(ty, ScalarType::U64),
1059 }
1060 }
1061}
1062
1063fn clique_kernel_name(k: usize, materialize: bool, w: CliqueWidthClass) -> &'static str {
1065 match (k, materialize, w) {
1066 (5, false, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE5_COUNT_HG_U32,
1067 (5, true, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE5_MATERIALIZE_HG_U32,
1068 (5, false, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE5_COUNT_HG_U64,
1069 (5, true, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE5_MATERIALIZE_HG_U64,
1070 (6, false, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE6_COUNT_HG_U32,
1071 (6, true, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE6_MATERIALIZE_HG_U32,
1072 (6, false, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE6_COUNT_HG_U64,
1073 (6, true, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE6_MATERIALIZE_HG_U64,
1074 (7, false, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE7_COUNT_HG_U32,
1075 (7, true, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE7_MATERIALIZE_HG_U32,
1076 (7, false, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE7_COUNT_HG_U64,
1077 (7, true, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE7_MATERIALIZE_HG_U64,
1078 (8, false, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE8_COUNT_HG_U32,
1079 (8, true, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE8_MATERIALIZE_HG_U32,
1080 (8, false, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE8_COUNT_HG_U64,
1081 (8, true, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE8_MATERIALIZE_HG_U64,
1082 _ => panic!("clique_kernel_name: K must be 5..8, got {}", k),
1083 }
1084}
1085
1086enum CliqueLeaderMetadata {
1087 U32(WcojRelationMetadata<u32>),
1088 U64(WcojRelationMetadata<u64>),
1089}
1090
1091impl CliqueLeaderMetadata {
1092 fn total_rows_u32(&self, entry_label: &str) -> Result<u32> {
1093 let total = match self {
1094 CliqueLeaderMetadata::U32(metadata) => metadata.total,
1095 CliqueLeaderMetadata::U64(metadata) => metadata.total,
1096 };
1097 u32::try_from(total).map_err(|_| {
1098 XlogError::Kernel(format!(
1099 "{}: leader metadata total {} exceeds u32 kernel surface",
1100 entry_label, total
1101 ))
1102 })
1103 }
1104
1105 fn key_count(&self) -> u32 {
1106 match self {
1107 CliqueLeaderMetadata::U32(metadata) => metadata.key_count,
1108 CliqueLeaderMetadata::U64(metadata) => metadata.key_count,
1109 }
1110 }
1111}
1112
1113fn validate_clique_metadata_leader<'a>(
1114 k: usize,
1115 edges: &'a [&CudaBuffer],
1116 leader_edge_idx: u32,
1117 width_class: CliqueWidthClass,
1118 entry_label: &str,
1119) -> Result<&'a CudaBuffer> {
1120 if !(5..=8).contains(&k) {
1121 return Err(XlogError::Kernel(format!(
1122 "{}: k must be 5..8, got {}",
1123 entry_label, k
1124 )));
1125 }
1126 let expected_edges = k * (k - 1) / 2;
1127 if edges.len() != expected_edges {
1128 return Err(XlogError::Kernel(format!(
1129 "{}: expected {} edges (= C({}, 2)), got {}",
1130 entry_label,
1131 expected_edges,
1132 k,
1133 edges.len()
1134 )));
1135 }
1136 let leader_slot = usize::try_from(leader_edge_idx)
1137 .ok()
1138 .filter(|idx| *idx < expected_edges)
1139 .ok_or_else(|| {
1140 XlogError::Kernel(format!(
1141 "{}: leader_edge_idx {} out of range for {} edges",
1142 entry_label, leader_edge_idx, expected_edges
1143 ))
1144 })?;
1145 let leader = edges[leader_slot];
1146 if leader.arity() != 2 {
1147 return Err(XlogError::Kernel(format!(
1148 "{}: leader edge must be 2-column, got arity {}",
1149 entry_label,
1150 leader.arity()
1151 )));
1152 }
1153 let ty = leader.schema.column_type(0).ok_or_else(|| {
1154 XlogError::Kernel(format!(
1155 "{}: leader edge column 0 type missing",
1156 entry_label
1157 ))
1158 })?;
1159 if !width_class.validate_col_type(ty) {
1160 return Err(XlogError::Kernel(format!(
1161 "{}: leader edge column 0 type {:?} not in {} width-class",
1162 entry_label,
1163 ty,
1164 width_class.label()
1165 )));
1166 }
1167 Ok(leader)
1168}
1169
1170impl CudaKernelProvider {
1171 fn wcoj_clique_metadata_recorded_u32_inner(
1172 &self,
1173 k: usize,
1174 edges: &[&CudaBuffer],
1175 leader_edge_idx: u32,
1176 launch_stream: StreamId,
1177 entry_label: &str,
1178 ) -> Result<WcojRelationMetadata<u32>> {
1179 let leader = validate_clique_metadata_leader(
1180 k,
1181 edges,
1182 leader_edge_idx,
1183 CliqueWidthClass::FourByte,
1184 entry_label,
1185 )?;
1186 self.wcoj_build_metadata_u32_recorded(leader, 0, launch_stream)
1187 }
1188
1189 fn wcoj_clique_metadata_recorded_u64_inner(
1190 &self,
1191 k: usize,
1192 edges: &[&CudaBuffer],
1193 leader_edge_idx: u32,
1194 launch_stream: StreamId,
1195 entry_label: &str,
1196 ) -> Result<WcojRelationMetadata<u64>> {
1197 let leader = validate_clique_metadata_leader(
1198 k,
1199 edges,
1200 leader_edge_idx,
1201 CliqueWidthClass::EightByte,
1202 entry_label,
1203 )?;
1204 self.wcoj_build_metadata_u64_recorded(leader, 0, launch_stream)
1205 }
1206
1207 pub fn wcoj_clique5_metadata_recorded_u32(
1209 &self,
1210 edges: &[&CudaBuffer; 10],
1211 leader_edge_idx: u32,
1212 launch_stream: StreamId,
1213 ) -> Result<WcojRelationMetadata<u32>> {
1214 self.wcoj_clique_metadata_recorded_u32_inner(
1215 5,
1216 edges,
1217 leader_edge_idx,
1218 launch_stream,
1219 "wcoj_clique5_metadata_recorded_u32",
1220 )
1221 }
1222
1223 pub fn wcoj_clique5_metadata_recorded_u64(
1225 &self,
1226 edges: &[&CudaBuffer; 10],
1227 leader_edge_idx: u32,
1228 launch_stream: StreamId,
1229 ) -> Result<WcojRelationMetadata<u64>> {
1230 self.wcoj_clique_metadata_recorded_u64_inner(
1231 5,
1232 edges,
1233 leader_edge_idx,
1234 launch_stream,
1235 "wcoj_clique5_metadata_recorded_u64",
1236 )
1237 }
1238
1239 pub fn wcoj_clique6_metadata_recorded_u32(
1241 &self,
1242 edges: &[&CudaBuffer; 15],
1243 leader_edge_idx: u32,
1244 launch_stream: StreamId,
1245 ) -> Result<WcojRelationMetadata<u32>> {
1246 self.wcoj_clique_metadata_recorded_u32_inner(
1247 6,
1248 edges,
1249 leader_edge_idx,
1250 launch_stream,
1251 "wcoj_clique6_metadata_recorded_u32",
1252 )
1253 }
1254
1255 pub fn wcoj_clique6_metadata_recorded_u64(
1257 &self,
1258 edges: &[&CudaBuffer; 15],
1259 leader_edge_idx: u32,
1260 launch_stream: StreamId,
1261 ) -> Result<WcojRelationMetadata<u64>> {
1262 self.wcoj_clique_metadata_recorded_u64_inner(
1263 6,
1264 edges,
1265 leader_edge_idx,
1266 launch_stream,
1267 "wcoj_clique6_metadata_recorded_u64",
1268 )
1269 }
1270
1271 #[allow(clippy::too_many_arguments)]
1286 fn wcoj_clique_recorded_inner(
1287 &self,
1288 k: usize,
1289 edges: &[&CudaBuffer],
1290 leader_edge_idx: u32,
1291 edge_order: Option<&[u8]>,
1292 iteration_order: Option<&[u8]>,
1293 width_class: CliqueWidthClass,
1294 launch_stream: StreamId,
1295 entry_label: &str,
1296 ) -> Result<CudaBuffer> {
1297 let runtime = self.memory().runtime().ok_or_else(|| {
1298 XlogError::Kernel(format!(
1299 "{} requires a runtime-backed GpuMemoryManager (with_runtime)",
1300 entry_label
1301 ))
1302 })?;
1303 let cu_stream = runtime
1304 .stream_pool()
1305 .resolve(launch_stream)
1306 .ok_or_else(|| {
1307 XlogError::Kernel(format!(
1308 "{}: launch_stream StreamId({}) does not resolve",
1309 entry_label, launch_stream.0
1310 ))
1311 })?;
1312 if !(5..=8).contains(&k) {
1313 return Err(XlogError::Kernel(format!(
1314 "{}: k must be 5..8, got {}",
1315 entry_label, k
1316 )));
1317 }
1318 let expected_edges = k * (k - 1) / 2;
1319 if edges.len() != expected_edges {
1320 return Err(XlogError::Kernel(format!(
1321 "{}: expected {} edges (= C({}, 2)), got {}",
1322 entry_label,
1323 expected_edges,
1324 k,
1325 edges.len()
1326 )));
1327 }
1328 if usize::try_from(leader_edge_idx)
1329 .ok()
1330 .is_none_or(|idx| idx >= expected_edges)
1331 {
1332 return Err(XlogError::Kernel(format!(
1333 "{}: leader_edge_idx {} out of range for {} edges",
1334 entry_label, leader_edge_idx, expected_edges
1335 )));
1336 }
1337 match (edge_order, iteration_order) {
1338 (Some(edge_order), Some(iteration_order)) => {
1339 validate_clique_u8_permutation(
1340 edge_order,
1341 expected_edges,
1342 "edge_order",
1343 entry_label,
1344 )?;
1345 validate_clique_u8_permutation(iteration_order, k, "iteration_order", entry_label)?;
1346 }
1347 (None, None) => {}
1348 _ => {
1349 return Err(XlogError::Kernel(format!(
1350 "{}: edge_order and iteration_order must both be present or both be omitted",
1351 entry_label
1352 )));
1353 }
1354 }
1355
1356 for (i, buf) in edges.iter().enumerate() {
1358 if buf.arity() != 2 {
1359 return Err(XlogError::Kernel(format!(
1360 "{}: edge[{}] must be 2-column, got arity {}",
1361 entry_label,
1362 i,
1363 buf.arity()
1364 )));
1365 }
1366 for col_idx in 0..2 {
1367 let ty = buf.schema.column_type(col_idx).ok_or_else(|| {
1368 XlogError::Kernel(format!(
1369 "{}: edge[{}] column {} type missing",
1370 entry_label, i, col_idx
1371 ))
1372 })?;
1373 if !width_class.validate_col_type(ty) {
1374 return Err(XlogError::Kernel(format!(
1375 "{}: edge[{}] column {} type {:?} not in {} width-class",
1376 entry_label,
1377 i,
1378 col_idx,
1379 ty,
1380 width_class.label()
1381 )));
1382 }
1383 }
1384 }
1385
1386 let mut head_types = Vec::with_capacity(k);
1390 let leader_slot = edge_order.map(|order| order[0] as usize).unwrap_or(0);
1391 head_types.push(edges[leader_slot].schema.column_type(0).expect("validated"));
1392 head_types.push(edges[leader_slot].schema.column_type(1).expect("validated"));
1393 for i in 2..k {
1394 let logical_edge = i - 1;
1395 let edge_slot = edge_order
1396 .map(|order| order[logical_edge] as usize)
1397 .unwrap_or(logical_edge);
1398 head_types.push(edges[edge_slot].schema.column_type(1).expect("validated"));
1399 }
1400 let out_schema = Schema::new(
1401 head_types
1402 .iter()
1403 .enumerate()
1404 .map(|(i, t)| (format!("col{}", i), *t))
1405 .collect(),
1406 );
1407
1408 let leader_slot = usize::try_from(leader_edge_idx).expect("validated");
1409 let n_leader = self.logical_row_count_u32(edges[leader_slot])?;
1410 if n_leader == 0 {
1411 return self.create_empty_buffer(out_schema);
1412 }
1413 let metadata_start = Instant::now();
1416 let leader_metadata = match width_class {
1417 CliqueWidthClass::FourByte => {
1418 CliqueLeaderMetadata::U32(self.wcoj_clique_metadata_recorded_u32_inner(
1419 k,
1420 edges,
1421 leader_edge_idx,
1422 launch_stream,
1423 entry_label,
1424 )?)
1425 }
1426 CliqueWidthClass::EightByte => {
1427 CliqueLeaderMetadata::U64(self.wcoj_clique_metadata_recorded_u64_inner(
1428 k,
1429 edges,
1430 leader_edge_idx,
1431 launch_stream,
1432 entry_label,
1433 )?)
1434 }
1435 };
1436 self.record_kclique_metadata_build_nanos(metadata_start.elapsed().as_nanos());
1437 let leader_work_total = leader_metadata.total_rows_u32(entry_label)?;
1438 if leader_work_total != n_leader {
1439 return Err(XlogError::Kernel(format!(
1440 "{}: leader metadata total {} does not match leader row count {}",
1441 entry_label, leader_work_total, n_leader
1442 )));
1443 }
1444 let leader_metadata_key_count = leader_metadata.key_count();
1445 if leader_metadata_key_count == 0 {
1446 return self.create_empty_buffer(out_schema);
1447 }
1448
1449 let mut edge_col0_ptrs: Vec<u64> = Vec::with_capacity(expected_edges);
1453 let mut edge_col1_ptrs: Vec<u64> = Vec::with_capacity(expected_edges);
1454 let mut edge_n_host: Vec<u32> = Vec::with_capacity(expected_edges);
1455 for buf in edges.iter() {
1456 let col0 = buf.column(0).expect("validated");
1457 let col1 = buf.column(1).expect("validated");
1458 edge_col0_ptrs.push(*col0.device_ptr());
1459 edge_col1_ptrs.push(*col1.device_ptr());
1460 edge_n_host.push(self.logical_row_count_u32(buf)?);
1461 }
1462
1463 let mut d_edge_col0 = self.memory.alloc::<u64>(expected_edges)?;
1465 let mut d_edge_col1 = self.memory.alloc::<u64>(expected_edges)?;
1466 let mut d_edge_n = self.memory.alloc::<u32>(expected_edges)?;
1467 let device = self.device.inner();
1468 self.htod_launch_metadata_sync_copy_into(&edge_col0_ptrs, &mut d_edge_col0)
1469 .map_err(|e| {
1470 XlogError::Kernel(format!(
1471 "{}: htod edge_col0_ptrs failed: {}",
1472 entry_label, e
1473 ))
1474 })?;
1475 self.htod_launch_metadata_sync_copy_into(&edge_col1_ptrs, &mut d_edge_col1)
1476 .map_err(|e| {
1477 XlogError::Kernel(format!(
1478 "{}: htod edge_col1_ptrs failed: {}",
1479 entry_label, e
1480 ))
1481 })?;
1482 self.htod_launch_metadata_sync_copy_into(&edge_n_host, &mut d_edge_n)
1483 .map_err(|e| {
1484 XlogError::Kernel(format!("{}: htod edge_n failed: {}", entry_label, e))
1485 })?;
1486 let d_edge_order = if let Some(edge_order) = edge_order {
1487 let mut buf = self.memory.alloc::<u8>(expected_edges)?;
1488 self.htod_launch_metadata_sync_copy_into(edge_order, &mut buf)
1489 .map_err(|e| {
1490 XlogError::Kernel(format!("{}: htod edge_order failed: {}", entry_label, e))
1491 })?;
1492 Some(buf)
1493 } else {
1494 None
1495 };
1496 let d_iteration_order = if let Some(iteration_order) = iteration_order {
1497 let mut buf = self.memory.alloc::<u8>(k)?;
1498 self.htod_launch_metadata_sync_copy_into(iteration_order, &mut buf)
1499 .map_err(|e| {
1500 XlogError::Kernel(format!(
1501 "{}: htod iteration_order failed: {}",
1502 entry_label, e
1503 ))
1504 })?;
1505 Some(buf)
1506 } else {
1507 None
1508 };
1509
1510 let block_work_unit = crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT;
1512 let grid = leader_work_total.div_ceil(block_work_unit);
1513 let count_buf = self.memory.alloc::<u32>(grid as usize)?;
1514 let thread_counts_buf = self
1515 .memory
1516 .alloc::<u32>((grid as usize) * (BLOCK_SIZE as usize))?;
1517 let mut offsets_buf = self.memory.alloc::<u32>(grid as usize)?;
1518 let d_total = self.memory.alloc::<u32>(1)?;
1519
1520 let mut rec_count = LaunchRecorder::new_strict(launch_stream);
1521 for buf in edges.iter() {
1522 rec_count.read(buf.num_rows_device());
1523 rec_count.read_column(buf.column(0).expect("validated"));
1524 rec_count.read_column(buf.column(1).expect("validated"));
1525 }
1526 rec_count.read(&d_edge_col0);
1527 rec_count.read(&d_edge_col1);
1528 rec_count.read(&d_edge_n);
1529 if let Some(buf) = d_edge_order.as_ref() {
1530 rec_count.read(buf);
1531 }
1532 if let Some(buf) = d_iteration_order.as_ref() {
1533 rec_count.read(buf);
1534 }
1535 match &leader_metadata {
1536 CliqueLeaderMetadata::U32(leader_metadata) => {
1537 rec_count.read(&leader_metadata.unique_keys);
1538 rec_count.read(&leader_metadata.fan_out);
1539 rec_count.read(&leader_metadata.prefix_sum);
1540 }
1541 CliqueLeaderMetadata::U64(leader_metadata) => {
1542 rec_count.read(&leader_metadata.unique_keys);
1543 rec_count.read(&leader_metadata.fan_out);
1544 rec_count.read(&leader_metadata.prefix_sum);
1545 }
1546 }
1547 rec_count.write(&count_buf);
1548 rec_count.write(&thread_counts_buf);
1549 rec_count.write(&offsets_buf);
1550 rec_count.write(&d_total);
1551 rec_count.preflight(runtime).map_err(|e| {
1552 XlogError::Kernel(format!("{}: count preflight failed: {}", entry_label, e))
1553 })?;
1554
1555 let count_kernel = device
1556 .get_func(WCOJ_MODULE, clique_kernel_name(k, false, width_class))
1557 .ok_or_else(|| {
1558 XlogError::Kernel(format!(
1559 "{}: count kernel '{}' not found",
1560 entry_label,
1561 clique_kernel_name(k, false, width_class)
1562 ))
1563 })?;
1564 let count_config = LaunchConfig {
1565 grid_dim: (grid, 1, 1),
1566 block_dim: (BLOCK_SIZE, 1, 1),
1567 shared_mem_bytes: 0,
1568 };
1569
1570 let null_order_ptr = 0_u64;
1590 let edge_order_param = match d_edge_order.as_ref() {
1591 Some(buf) => buf.as_kernel_param(),
1592 None => null_order_ptr.as_kernel_param(),
1593 };
1594 let iteration_order_param = match d_iteration_order.as_ref() {
1595 Some(buf) => buf.as_kernel_param(),
1596 None => null_order_ptr.as_kernel_param(),
1597 };
1598 unsafe {
1599 let mut params: Vec<*mut c_void> = match &leader_metadata {
1600 CliqueLeaderMetadata::U32(leader_metadata) => vec![
1601 (&d_edge_col0).as_kernel_param(),
1602 (&d_edge_col1).as_kernel_param(),
1603 (&d_edge_n).as_kernel_param(),
1604 leader_edge_idx.as_kernel_param(),
1605 edge_order_param,
1606 iteration_order_param,
1607 n_leader.as_kernel_param(),
1608 (&leader_metadata.unique_keys).as_kernel_param(),
1609 (&leader_metadata.fan_out).as_kernel_param(),
1610 (&leader_metadata.prefix_sum).as_kernel_param(),
1611 leader_metadata_key_count.as_kernel_param(),
1612 block_work_unit.as_kernel_param(),
1613 (&count_buf).as_kernel_param(),
1614 (&thread_counts_buf).as_kernel_param(),
1615 ],
1616 CliqueLeaderMetadata::U64(leader_metadata) => vec![
1617 (&d_edge_col0).as_kernel_param(),
1618 (&d_edge_col1).as_kernel_param(),
1619 (&d_edge_n).as_kernel_param(),
1620 leader_edge_idx.as_kernel_param(),
1621 edge_order_param,
1622 iteration_order_param,
1623 n_leader.as_kernel_param(),
1624 (&leader_metadata.unique_keys).as_kernel_param(),
1625 (&leader_metadata.fan_out).as_kernel_param(),
1626 (&leader_metadata.prefix_sum).as_kernel_param(),
1627 leader_metadata_key_count.as_kernel_param(),
1628 block_work_unit.as_kernel_param(),
1629 (&count_buf).as_kernel_param(),
1630 (&thread_counts_buf).as_kernel_param(),
1631 ],
1632 };
1633 count_kernel
1634 .clone()
1635 .launch_on_stream(&cu_stream, count_config, &mut params)
1636 .map_err(|e| {
1637 XlogError::Kernel(format!(
1638 "{}: count kernel launch failed: {}",
1639 entry_label, e
1640 ))
1641 })?;
1642 }
1643
1644 let bytes_count = (grid as usize) * std::mem::size_of::<u32>();
1646 unsafe {
1647 let res = sys::cuMemcpyDtoDAsync_v2(
1648 *offsets_buf.device_ptr(),
1649 *count_buf.device_ptr(),
1650 bytes_count,
1651 cu_stream.cu_stream(),
1652 );
1653 if res != sys::cudaError_enum::CUDA_SUCCESS {
1654 return Err(XlogError::Kernel(format!(
1655 "{}: dtod count → offsets failed: {:?}",
1656 entry_label, res
1657 )));
1658 }
1659 }
1660
1661 self.multiblock_scan_u32_inplace_on_stream(
1663 &mut offsets_buf,
1664 grid,
1665 &cu_stream,
1666 launch_stream,
1667 runtime,
1668 )?;
1669
1670 let total_kernel = device
1672 .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_COMPUTE_TOTAL)
1673 .ok_or_else(|| XlogError::Kernel("wcoj_compute_total kernel not found".to_string()))?;
1674 unsafe {
1675 total_kernel
1676 .clone()
1677 .launch_on_stream(
1678 &cu_stream,
1679 LaunchConfig {
1680 grid_dim: (1, 1, 1),
1681 block_dim: (1, 1, 1),
1682 shared_mem_bytes: 0,
1683 },
1684 (&count_buf, &offsets_buf, grid, &d_total),
1685 )
1686 .map_err(|e| {
1687 XlogError::Kernel(format!("wcoj_compute_total launch failed: {}", e))
1688 })?;
1689 }
1690
1691 rec_count.commit(runtime).map_err(|e| {
1692 XlogError::Kernel(format!(
1693 "{}: count+scan+total commit failed: {}",
1694 entry_label, e
1695 ))
1696 })?;
1697
1698 cu_stream.synchronize().map_err(|e| {
1699 XlogError::Kernel(format!(
1700 "{}: stream sync after total failed: {}",
1701 entry_label, e
1702 ))
1703 })?;
1704 let total_rows = self
1705 .dtoh_scalar_untracked::<u32>(&d_total, 0)
1706 .map_err(|e| {
1707 XlogError::Kernel(format!("{}: read d_total failed: {}", entry_label, e))
1708 })?;
1709
1710 if total_rows == 0 {
1711 return self.create_empty_buffer(out_schema);
1712 }
1713
1714 let elem_bytes = width_class.elem_bytes();
1716 let bytes_per_col = (total_rows as usize) * elem_bytes;
1717 let mut out_col_bufs: Vec<TrackedCudaSlice<u8>> = Vec::with_capacity(k);
1718 let mut out_col_ptrs: Vec<u64> = Vec::with_capacity(k);
1719 for _ in 0..k {
1720 let buf = self.memory.alloc::<u8>(bytes_per_col)?;
1721 out_col_ptrs.push(*buf.device_ptr());
1722 out_col_bufs.push(buf);
1723 }
1724 let mut d_out_cols = self.memory.alloc::<u64>(k)?;
1725 self.htod_launch_metadata_sync_copy_into(&out_col_ptrs, &mut d_out_cols)
1726 .map_err(|e| {
1727 XlogError::Kernel(format!("{}: htod out_col_ptrs failed: {}", entry_label, e))
1728 })?;
1729 let out_d_num_rows = self.memory.alloc::<u32>(1)?;
1730
1731 self.htod_launch_metadata_async_copy_one(
1733 &total_rows,
1734 &out_d_num_rows,
1735 &cu_stream,
1736 &format!("{entry_label}: out_d_num_rows"),
1737 )?;
1738
1739 let mut rec_mat = LaunchRecorder::new_strict(launch_stream);
1740 for buf in edges.iter() {
1741 rec_mat.read(buf.num_rows_device());
1742 rec_mat.read_column(buf.column(0).expect("validated"));
1743 rec_mat.read_column(buf.column(1).expect("validated"));
1744 }
1745 rec_mat.read(&d_edge_col0);
1746 rec_mat.read(&d_edge_col1);
1747 rec_mat.read(&d_edge_n);
1748 if let Some(buf) = d_edge_order.as_ref() {
1749 rec_mat.read(buf);
1750 }
1751 if let Some(buf) = d_iteration_order.as_ref() {
1752 rec_mat.read(buf);
1753 }
1754 match &leader_metadata {
1755 CliqueLeaderMetadata::U32(leader_metadata) => {
1756 rec_mat.read(&leader_metadata.unique_keys);
1757 rec_mat.read(&leader_metadata.fan_out);
1758 rec_mat.read(&leader_metadata.prefix_sum);
1759 }
1760 CliqueLeaderMetadata::U64(leader_metadata) => {
1761 rec_mat.read(&leader_metadata.unique_keys);
1762 rec_mat.read(&leader_metadata.fan_out);
1763 rec_mat.read(&leader_metadata.prefix_sum);
1764 }
1765 }
1766 rec_mat.read(&thread_counts_buf);
1767 rec_mat.read(&offsets_buf);
1768 rec_mat.read(&d_out_cols);
1769 for buf in out_col_bufs.iter() {
1770 rec_mat.write(buf);
1771 }
1772 rec_mat.write(&out_d_num_rows);
1773 rec_mat.preflight(runtime).map_err(|e| {
1774 XlogError::Kernel(format!(
1775 "{}: materialize preflight failed: {}",
1776 entry_label, e
1777 ))
1778 })?;
1779
1780 let materialize_kernel = device
1781 .get_func(WCOJ_MODULE, clique_kernel_name(k, true, width_class))
1782 .ok_or_else(|| {
1783 XlogError::Kernel(format!(
1784 "{}: materialize kernel '{}' not found",
1785 entry_label,
1786 clique_kernel_name(k, true, width_class)
1787 ))
1788 })?;
1789
1790 let mat_config = LaunchConfig {
1811 grid_dim: (grid, 1, 1),
1812 block_dim: (BLOCK_SIZE, 1, 1),
1813 shared_mem_bytes: 0,
1814 };
1815 unsafe {
1816 let mut params: Vec<*mut c_void> = match &leader_metadata {
1817 CliqueLeaderMetadata::U32(leader_metadata) => vec![
1818 (&d_edge_col0).as_kernel_param(),
1819 (&d_edge_col1).as_kernel_param(),
1820 (&d_edge_n).as_kernel_param(),
1821 leader_edge_idx.as_kernel_param(),
1822 edge_order_param,
1823 iteration_order_param,
1824 n_leader.as_kernel_param(),
1825 (&leader_metadata.unique_keys).as_kernel_param(),
1826 (&leader_metadata.fan_out).as_kernel_param(),
1827 (&leader_metadata.prefix_sum).as_kernel_param(),
1828 leader_metadata_key_count.as_kernel_param(),
1829 block_work_unit.as_kernel_param(),
1830 (&thread_counts_buf).as_kernel_param(),
1831 (&offsets_buf).as_kernel_param(),
1832 total_rows.as_kernel_param(),
1833 (&d_out_cols).as_kernel_param(),
1834 ],
1835 CliqueLeaderMetadata::U64(leader_metadata) => vec![
1836 (&d_edge_col0).as_kernel_param(),
1837 (&d_edge_col1).as_kernel_param(),
1838 (&d_edge_n).as_kernel_param(),
1839 leader_edge_idx.as_kernel_param(),
1840 edge_order_param,
1841 iteration_order_param,
1842 n_leader.as_kernel_param(),
1843 (&leader_metadata.unique_keys).as_kernel_param(),
1844 (&leader_metadata.fan_out).as_kernel_param(),
1845 (&leader_metadata.prefix_sum).as_kernel_param(),
1846 leader_metadata_key_count.as_kernel_param(),
1847 block_work_unit.as_kernel_param(),
1848 (&thread_counts_buf).as_kernel_param(),
1849 (&offsets_buf).as_kernel_param(),
1850 total_rows.as_kernel_param(),
1851 (&d_out_cols).as_kernel_param(),
1852 ],
1853 };
1854 materialize_kernel
1855 .clone()
1856 .launch_on_stream(&cu_stream, mat_config, &mut params)
1857 .map_err(|e| {
1858 XlogError::Kernel(format!("{}: materialize launch failed: {}", entry_label, e))
1859 })?;
1860 }
1861
1862 rec_mat.commit(runtime).map_err(|e| {
1863 XlogError::Kernel(format!("{}: materialize commit failed: {}", entry_label, e))
1864 })?;
1865
1866 let columns: Vec<CudaColumn> = out_col_bufs.into_iter().map(|b| b.into()).collect();
1867 Ok(CudaBuffer::from_columns_with_host_count(
1868 columns,
1869 total_rows as u64,
1870 out_d_num_rows,
1871 out_schema,
1872 total_rows,
1873 ))
1874 }
1875
1876 pub fn wcoj_clique5_u32_recorded(
1886 &self,
1887 edges: &[&CudaBuffer; 10],
1888 launch_stream: StreamId,
1889 ) -> Result<CudaBuffer> {
1890 self.wcoj_clique_recorded_inner(
1891 5,
1892 edges,
1893 0,
1894 None,
1895 None,
1896 CliqueWidthClass::FourByte,
1897 launch_stream,
1898 "wcoj_clique5_u32_recorded",
1899 )
1900 }
1901
1902 pub fn wcoj_clique5_u32_recorded_planned(
1904 &self,
1905 edges: &[&CudaBuffer; 10],
1906 leader_edge_idx: u32,
1907 edge_order: &[u8],
1908 iteration_order: &[u8],
1909 launch_stream: StreamId,
1910 ) -> Result<CudaBuffer> {
1911 self.wcoj_clique_recorded_inner(
1912 5,
1913 edges,
1914 leader_edge_idx,
1915 Some(edge_order),
1916 Some(iteration_order),
1917 CliqueWidthClass::FourByte,
1918 launch_stream,
1919 "wcoj_clique5_u32_recorded_planned",
1920 )
1921 }
1922
1923 pub fn wcoj_clique5_u64_recorded(
1925 &self,
1926 edges: &[&CudaBuffer; 10],
1927 launch_stream: StreamId,
1928 ) -> Result<CudaBuffer> {
1929 self.wcoj_clique_recorded_inner(
1930 5,
1931 edges,
1932 0,
1933 None,
1934 None,
1935 CliqueWidthClass::EightByte,
1936 launch_stream,
1937 "wcoj_clique5_u64_recorded",
1938 )
1939 }
1940
1941 pub fn wcoj_clique5_u64_recorded_planned(
1943 &self,
1944 edges: &[&CudaBuffer; 10],
1945 leader_edge_idx: u32,
1946 edge_order: &[u8],
1947 iteration_order: &[u8],
1948 launch_stream: StreamId,
1949 ) -> Result<CudaBuffer> {
1950 self.wcoj_clique_recorded_inner(
1951 5,
1952 edges,
1953 leader_edge_idx,
1954 Some(edge_order),
1955 Some(iteration_order),
1956 CliqueWidthClass::EightByte,
1957 launch_stream,
1958 "wcoj_clique5_u64_recorded_planned",
1959 )
1960 }
1961
1962 pub fn wcoj_clique6_u32_recorded(
1968 &self,
1969 edges: &[&CudaBuffer; 15],
1970 launch_stream: StreamId,
1971 ) -> Result<CudaBuffer> {
1972 self.wcoj_clique_recorded_inner(
1973 6,
1974 edges,
1975 0,
1976 None,
1977 None,
1978 CliqueWidthClass::FourByte,
1979 launch_stream,
1980 "wcoj_clique6_u32_recorded",
1981 )
1982 }
1983
1984 pub fn wcoj_clique6_u32_recorded_planned(
1986 &self,
1987 edges: &[&CudaBuffer; 15],
1988 leader_edge_idx: u32,
1989 edge_order: &[u8],
1990 iteration_order: &[u8],
1991 launch_stream: StreamId,
1992 ) -> Result<CudaBuffer> {
1993 self.wcoj_clique_recorded_inner(
1994 6,
1995 edges,
1996 leader_edge_idx,
1997 Some(edge_order),
1998 Some(iteration_order),
1999 CliqueWidthClass::FourByte,
2000 launch_stream,
2001 "wcoj_clique6_u32_recorded_planned",
2002 )
2003 }
2004
2005 pub fn wcoj_clique6_u64_recorded(
2007 &self,
2008 edges: &[&CudaBuffer; 15],
2009 launch_stream: StreamId,
2010 ) -> Result<CudaBuffer> {
2011 self.wcoj_clique_recorded_inner(
2012 6,
2013 edges,
2014 0,
2015 None,
2016 None,
2017 CliqueWidthClass::EightByte,
2018 launch_stream,
2019 "wcoj_clique6_u64_recorded",
2020 )
2021 }
2022
2023 pub fn wcoj_clique6_u64_recorded_planned(
2025 &self,
2026 edges: &[&CudaBuffer; 15],
2027 leader_edge_idx: u32,
2028 edge_order: &[u8],
2029 iteration_order: &[u8],
2030 launch_stream: StreamId,
2031 ) -> Result<CudaBuffer> {
2032 self.wcoj_clique_recorded_inner(
2033 6,
2034 edges,
2035 leader_edge_idx,
2036 Some(edge_order),
2037 Some(iteration_order),
2038 CliqueWidthClass::EightByte,
2039 launch_stream,
2040 "wcoj_clique6_u64_recorded_planned",
2041 )
2042 }
2043
2044 pub fn wcoj_clique7_u32_recorded(
2046 &self,
2047 edges: &[&CudaBuffer; 21],
2048 launch_stream: StreamId,
2049 ) -> Result<CudaBuffer> {
2050 self.wcoj_clique_recorded_inner(
2051 7,
2052 edges,
2053 0,
2054 None,
2055 None,
2056 CliqueWidthClass::FourByte,
2057 launch_stream,
2058 "wcoj_clique7_u32_recorded",
2059 )
2060 }
2061
2062 pub fn wcoj_clique7_u32_recorded_planned(
2064 &self,
2065 edges: &[&CudaBuffer; 21],
2066 leader_edge_idx: u32,
2067 edge_order: &[u8],
2068 iteration_order: &[u8],
2069 launch_stream: StreamId,
2070 ) -> Result<CudaBuffer> {
2071 self.wcoj_clique_recorded_inner(
2072 7,
2073 edges,
2074 leader_edge_idx,
2075 Some(edge_order),
2076 Some(iteration_order),
2077 CliqueWidthClass::FourByte,
2078 launch_stream,
2079 "wcoj_clique7_u32_recorded_planned",
2080 )
2081 }
2082
2083 pub fn wcoj_clique7_u64_recorded(
2085 &self,
2086 edges: &[&CudaBuffer; 21],
2087 launch_stream: StreamId,
2088 ) -> Result<CudaBuffer> {
2089 self.wcoj_clique_recorded_inner(
2090 7,
2091 edges,
2092 0,
2093 None,
2094 None,
2095 CliqueWidthClass::EightByte,
2096 launch_stream,
2097 "wcoj_clique7_u64_recorded",
2098 )
2099 }
2100
2101 pub fn wcoj_clique7_u64_recorded_planned(
2103 &self,
2104 edges: &[&CudaBuffer; 21],
2105 leader_edge_idx: u32,
2106 edge_order: &[u8],
2107 iteration_order: &[u8],
2108 launch_stream: StreamId,
2109 ) -> Result<CudaBuffer> {
2110 self.wcoj_clique_recorded_inner(
2111 7,
2112 edges,
2113 leader_edge_idx,
2114 Some(edge_order),
2115 Some(iteration_order),
2116 CliqueWidthClass::EightByte,
2117 launch_stream,
2118 "wcoj_clique7_u64_recorded_planned",
2119 )
2120 }
2121
2122 pub fn wcoj_clique8_u32_recorded(
2124 &self,
2125 edges: &[&CudaBuffer; 28],
2126 launch_stream: StreamId,
2127 ) -> Result<CudaBuffer> {
2128 self.wcoj_clique_recorded_inner(
2129 8,
2130 edges,
2131 0,
2132 None,
2133 None,
2134 CliqueWidthClass::FourByte,
2135 launch_stream,
2136 "wcoj_clique8_u32_recorded",
2137 )
2138 }
2139
2140 pub fn wcoj_clique8_u32_recorded_planned(
2142 &self,
2143 edges: &[&CudaBuffer; 28],
2144 leader_edge_idx: u32,
2145 edge_order: &[u8],
2146 iteration_order: &[u8],
2147 launch_stream: StreamId,
2148 ) -> Result<CudaBuffer> {
2149 self.wcoj_clique_recorded_inner(
2150 8,
2151 edges,
2152 leader_edge_idx,
2153 Some(edge_order),
2154 Some(iteration_order),
2155 CliqueWidthClass::FourByte,
2156 launch_stream,
2157 "wcoj_clique8_u32_recorded_planned",
2158 )
2159 }
2160
2161 pub fn wcoj_clique8_u64_recorded(
2163 &self,
2164 edges: &[&CudaBuffer; 28],
2165 launch_stream: StreamId,
2166 ) -> Result<CudaBuffer> {
2167 self.wcoj_clique_recorded_inner(
2168 8,
2169 edges,
2170 0,
2171 None,
2172 None,
2173 CliqueWidthClass::EightByte,
2174 launch_stream,
2175 "wcoj_clique8_u64_recorded",
2176 )
2177 }
2178
2179 pub fn wcoj_clique8_u64_recorded_planned(
2181 &self,
2182 edges: &[&CudaBuffer; 28],
2183 leader_edge_idx: u32,
2184 edge_order: &[u8],
2185 iteration_order: &[u8],
2186 launch_stream: StreamId,
2187 ) -> Result<CudaBuffer> {
2188 self.wcoj_clique_recorded_inner(
2189 8,
2190 edges,
2191 leader_edge_idx,
2192 Some(edge_order),
2193 Some(iteration_order),
2194 CliqueWidthClass::EightByte,
2195 launch_stream,
2196 "wcoj_clique8_u64_recorded_planned",
2197 )
2198 }
2199
2200 #[allow(clippy::too_many_arguments)]
2225 fn wcoj_clique_groupby_root_count_recorded_inner(
2226 &self,
2227 k: usize,
2228 edges: &[&CudaBuffer],
2229 leader_edge_idx: u32,
2230 edge_order: &[u8],
2231 iteration_order: &[u8],
2232 launch_stream: StreamId,
2233 entry_label: &str,
2234 ) -> Result<CudaBuffer> {
2235 let runtime = self.memory().runtime().ok_or_else(|| {
2236 XlogError::Kernel(format!(
2237 "{} requires a runtime-backed GpuMemoryManager (with_runtime)",
2238 entry_label
2239 ))
2240 })?;
2241 let cu_stream = runtime
2242 .stream_pool()
2243 .resolve(launch_stream)
2244 .ok_or_else(|| {
2245 XlogError::Kernel(format!(
2246 "{}: launch_stream StreamId({}) does not resolve",
2247 entry_label, launch_stream.0
2248 ))
2249 })?;
2250 if !matches!(k, 5 | 6) {
2251 return Err(XlogError::Kernel(format!(
2252 "{}: fused count-by-root supports k 5..6, got {}",
2253 entry_label, k
2254 )));
2255 }
2256 let expected_edges = k * (k - 1) / 2;
2257 if edges.len() != expected_edges {
2258 return Err(XlogError::Kernel(format!(
2259 "{}: expected {} edges (= C({}, 2)), got {}",
2260 entry_label,
2261 expected_edges,
2262 k,
2263 edges.len()
2264 )));
2265 }
2266 let leader_slot = usize::try_from(leader_edge_idx)
2267 .ok()
2268 .filter(|idx| *idx < expected_edges)
2269 .ok_or_else(|| {
2270 XlogError::Kernel(format!(
2271 "{}: leader_edge_idx {} out of range for {} edges",
2272 entry_label, leader_edge_idx, expected_edges
2273 ))
2274 })?;
2275 validate_clique_u8_permutation(edge_order, expected_edges, "edge_order", entry_label)?;
2276 validate_clique_u8_permutation(iteration_order, k, "iteration_order", entry_label)?;
2277
2278 let mut laid_out: Vec<CudaBuffer> = Vec::with_capacity(expected_edges);
2282 for buf in edges {
2283 laid_out.push(self.wcoj_layout_u32_recorded(buf, launch_stream)?);
2284 }
2285 let edges: Vec<&CudaBuffer> = laid_out.iter().collect();
2286 let leader = edges[leader_slot];
2287
2288 let w_type = leader.schema.column_type(0).ok_or_else(|| {
2289 XlogError::Kernel(format!(
2290 "{}: leader edge column 0 type missing",
2291 entry_label
2292 ))
2293 })?;
2294 let out_schema = Schema::new(vec![
2295 ("root".to_string(), w_type),
2296 ("count".to_string(), ScalarType::U64),
2297 ]);
2298 let n_leader = self.logical_row_count_u32(leader)?;
2299 if n_leader == 0 {
2300 return self.create_empty_buffer(out_schema);
2301 }
2302
2303 let leader_metadata = self.wcoj_clique_metadata_recorded_u32_inner(
2304 k,
2305 &edges,
2306 leader_edge_idx,
2307 launch_stream,
2308 entry_label,
2309 )?;
2310 let leader_work_total = u32::try_from(leader_metadata.total).map_err(|_| {
2311 XlogError::Kernel(format!(
2312 "{}: leader metadata total {} exceeds u32 kernel surface",
2313 entry_label, leader_metadata.total
2314 ))
2315 })?;
2316 if leader_work_total != n_leader {
2317 return Err(XlogError::Kernel(format!(
2318 "{}: leader metadata total {} does not match leader row count {}",
2319 entry_label, leader_work_total, n_leader
2320 )));
2321 }
2322 if leader_metadata.key_count == 0 {
2323 return self.create_empty_buffer(out_schema);
2324 }
2325
2326 let mut edge_col0_ptrs: Vec<u64> = Vec::with_capacity(expected_edges);
2330 let mut edge_col1_ptrs: Vec<u64> = Vec::with_capacity(expected_edges);
2331 let mut edge_n_host: Vec<u32> = Vec::with_capacity(expected_edges);
2332 for buf in edges.iter() {
2333 let col0 = buf.column(0).ok_or_else(|| {
2334 XlogError::Kernel(format!("{}: edge column 0 missing", entry_label))
2335 })?;
2336 let col1 = buf.column(1).ok_or_else(|| {
2337 XlogError::Kernel(format!("{}: edge column 1 missing", entry_label))
2338 })?;
2339 edge_col0_ptrs.push(*col0.device_ptr());
2340 edge_col1_ptrs.push(*col1.device_ptr());
2341 edge_n_host.push(self.logical_row_count_u32(buf)?);
2342 }
2343 let mut d_edge_col0 = self.memory.alloc::<u64>(expected_edges)?;
2344 let mut d_edge_col1 = self.memory.alloc::<u64>(expected_edges)?;
2345 let mut d_edge_n = self.memory.alloc::<u32>(expected_edges)?;
2346 self.htod_launch_metadata_sync_copy_into(&edge_col0_ptrs, &mut d_edge_col0)
2347 .map_err(|e| {
2348 XlogError::Kernel(format!(
2349 "{}: htod edge_col0_ptrs failed: {}",
2350 entry_label, e
2351 ))
2352 })?;
2353 self.htod_launch_metadata_sync_copy_into(&edge_col1_ptrs, &mut d_edge_col1)
2354 .map_err(|e| {
2355 XlogError::Kernel(format!(
2356 "{}: htod edge_col1_ptrs failed: {}",
2357 entry_label, e
2358 ))
2359 })?;
2360 self.htod_launch_metadata_sync_copy_into(&edge_n_host, &mut d_edge_n)
2361 .map_err(|e| {
2362 XlogError::Kernel(format!("{}: htod edge_n failed: {}", entry_label, e))
2363 })?;
2364 let mut d_edge_order = self.memory.alloc::<u8>(expected_edges)?;
2365 self.htod_launch_metadata_sync_copy_into(edge_order, &mut d_edge_order)
2366 .map_err(|e| {
2367 XlogError::Kernel(format!("{}: htod edge_order failed: {}", entry_label, e))
2368 })?;
2369 let mut d_iteration_order = self.memory.alloc::<u8>(k)?;
2370 self.htod_launch_metadata_sync_copy_into(iteration_order, &mut d_iteration_order)
2371 .map_err(|e| {
2372 XlogError::Kernel(format!(
2373 "{}: htod iteration_order failed: {}",
2374 entry_label, e
2375 ))
2376 })?;
2377
2378 let mut row_counts = self
2382 .memory()
2383 .alloc::<u8>(n_leader as usize * std::mem::size_of::<u32>())?;
2384 self.device()
2385 .inner()
2386 .memset_zeros(&mut row_counts)
2387 .map_err(|e| {
2388 XlogError::Kernel(format!("{}: zero row counts failed: {}", entry_label, e))
2389 })?;
2390
2391 let block_work_unit = crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT;
2392 let grid = leader_work_total.div_ceil(block_work_unit);
2393 let mut rec = LaunchRecorder::new_strict(launch_stream);
2394 for buf in edges.iter() {
2395 rec.read(buf.num_rows_device());
2396 rec.read_column(buf.column(0).expect("validated"));
2397 rec.read_column(buf.column(1).expect("validated"));
2398 }
2399 rec.read(&d_edge_col0);
2400 rec.read(&d_edge_col1);
2401 rec.read(&d_edge_n);
2402 rec.read(&d_edge_order);
2403 rec.read(&d_iteration_order);
2404 rec.read(&leader_metadata.unique_keys);
2405 rec.read(&leader_metadata.fan_out);
2406 rec.read(&leader_metadata.prefix_sum);
2407 rec.write(&row_counts);
2408 rec.preflight(runtime)
2409 .map_err(|e| XlogError::Kernel(format!("{}: preflight failed: {}", entry_label, e)))?;
2410
2411 let kernel_name = match k {
2412 5 => wcoj_kernels::WCOJ_CLIQUE5_GROUPBY_ROOT_COUNT_HG_U32,
2413 _ => wcoj_kernels::WCOJ_CLIQUE6_GROUPBY_ROOT_COUNT_HG_U32,
2414 };
2415 let kernel = self
2416 .device()
2417 .inner()
2418 .get_func(WCOJ_MODULE, kernel_name)
2419 .ok_or_else(|| {
2420 XlogError::Kernel(format!(
2421 "{}: kernel '{}' not found",
2422 entry_label, kernel_name
2423 ))
2424 })?;
2425 unsafe {
2444 let mut params: Vec<*mut c_void> = vec![
2445 (&d_edge_col0).as_kernel_param(),
2446 (&d_edge_col1).as_kernel_param(),
2447 (&d_edge_n).as_kernel_param(),
2448 leader_edge_idx.as_kernel_param(),
2449 (&d_edge_order).as_kernel_param(),
2450 (&d_iteration_order).as_kernel_param(),
2451 n_leader.as_kernel_param(),
2452 (&leader_metadata.unique_keys).as_kernel_param(),
2453 (&leader_metadata.fan_out).as_kernel_param(),
2454 (&leader_metadata.prefix_sum).as_kernel_param(),
2455 leader_metadata.key_count.as_kernel_param(),
2456 block_work_unit.as_kernel_param(),
2457 (&row_counts).as_kernel_param(),
2458 ];
2459 kernel
2460 .clone()
2461 .launch_on_stream(
2462 &cu_stream,
2463 LaunchConfig {
2464 grid_dim: (grid, 1, 1),
2465 block_dim: (BLOCK_SIZE, 1, 1),
2466 shared_mem_bytes: 0,
2467 },
2468 &mut params,
2469 )
2470 .map_err(|e| {
2471 XlogError::Kernel(format!(
2472 "{}: groupby-count launch failed: {}",
2473 entry_label, e
2474 ))
2475 })?;
2476 }
2477 rec.commit(runtime)
2478 .map_err(|e| XlogError::Kernel(format!("{}: commit failed: {}", entry_label, e)))?;
2479
2480 let root_src = match leader.column(0).expect("validated") {
2485 CudaColumn::Owned(slice) => slice,
2486 _ => {
2487 return Err(XlogError::Kernel(format!(
2488 "{}: leader.col0 must be an owned CudaColumn",
2489 entry_label
2490 )))
2491 }
2492 };
2493 let root_copy = self
2494 .memory()
2495 .alloc::<u8>(n_leader as usize * std::mem::size_of::<u32>())?;
2496 unsafe {
2500 let res = sys::cuMemcpyDtoD_v2(
2501 *root_copy.device_ptr(),
2502 *root_src.device_ptr(),
2503 n_leader as usize * std::mem::size_of::<u32>(),
2504 );
2505 if res != sys::cudaError_enum::CUDA_SUCCESS {
2506 return Err(XlogError::Kernel(format!(
2507 "{}: copy root column failed: {:?}",
2508 entry_label, res
2509 )));
2510 }
2511 }
2512 let mut d_num_rows = self.memory().alloc::<u32>(1)?;
2513 self.device()
2514 .inner()
2515 .dtod_copy(leader.num_rows_device(), &mut d_num_rows)
2516 .map_err(|e| {
2517 XlogError::Kernel(format!("{}: copy row count failed: {}", entry_label, e))
2518 })?;
2519 let staging_schema = Schema::new(vec![
2520 ("root".to_string(), w_type),
2521 ("count".to_string(), ScalarType::U32),
2522 ]);
2523 let staging = CudaBuffer::from_columns_with_host_count(
2524 vec![root_copy.into(), row_counts.into()],
2525 n_leader as u64,
2526 d_num_rows,
2527 staging_schema,
2528 n_leader,
2529 );
2530
2531 let mask = self.compare_const_mask_recorded::<u32>(
2534 &staging,
2535 1,
2536 0u32,
2537 crate::CompareOp::Gt,
2538 launch_stream,
2539 )?;
2540 let compacted =
2541 self.compact_buffer_by_device_mask_counted_recorded(&staging, &mask, launch_stream)?;
2542 self.groupby_multi_agg_recorded(
2543 &compacted,
2544 &[0],
2545 &[(1, xlog_core::AggOp::Sum)],
2546 launch_stream,
2547 )
2548 }
2549
2550 pub fn wcoj_clique5_groupby_root_count_u32_recorded_planned(
2554 &self,
2555 edges: &[&CudaBuffer; 10],
2556 leader_edge_idx: u32,
2557 edge_order: &[u8],
2558 iteration_order: &[u8],
2559 launch_stream: StreamId,
2560 ) -> Result<CudaBuffer> {
2561 self.wcoj_clique_groupby_root_count_recorded_inner(
2562 5,
2563 edges,
2564 leader_edge_idx,
2565 edge_order,
2566 iteration_order,
2567 launch_stream,
2568 "wcoj_clique5_groupby_root_count_u32_recorded_planned",
2569 )
2570 }
2571
2572 pub fn wcoj_clique6_groupby_root_count_u32_recorded_planned(
2576 &self,
2577 edges: &[&CudaBuffer; 15],
2578 leader_edge_idx: u32,
2579 edge_order: &[u8],
2580 iteration_order: &[u8],
2581 launch_stream: StreamId,
2582 ) -> Result<CudaBuffer> {
2583 self.wcoj_clique_groupby_root_count_recorded_inner(
2584 6,
2585 edges,
2586 leader_edge_idx,
2587 edge_order,
2588 iteration_order,
2589 launch_stream,
2590 "wcoj_clique6_groupby_root_count_u32_recorded_planned",
2591 )
2592 }
2593}
2594
2595fn validate_clique_u8_permutation(
2596 values: &[u8],
2597 len: usize,
2598 label: &str,
2599 entry_label: &str,
2600) -> Result<()> {
2601 if values.len() != len {
2602 return Err(XlogError::Kernel(format!(
2603 "{}: {} length {} must equal {}",
2604 entry_label,
2605 label,
2606 values.len(),
2607 len
2608 )));
2609 }
2610 let mut seen = vec![false; len];
2611 for &value in values {
2612 let idx = usize::from(value);
2613 if idx >= len {
2614 return Err(XlogError::Kernel(format!(
2615 "{}: {} value {} out of range 0..{}",
2616 entry_label, label, value, len
2617 )));
2618 }
2619 if seen[idx] {
2620 return Err(XlogError::Kernel(format!(
2621 "{}: {} duplicates value {}",
2622 entry_label, label, value
2623 )));
2624 }
2625 seen[idx] = true;
2626 }
2627 Ok(())
2628}