Skip to main content

xlog_cuda/provider/
wcoj.rs

1//! v0.6.2 GPU 3-way Worst-Case Optimal Join — provider entries.
2//!
3//! Public methods (parallel u32 and u64 entries — see
4//! [`CudaKernelProvider::wcoj_triangle_u32_recorded`] and
5//! [`CudaKernelProvider::wcoj_triangle_u64_recorded`]):
6//!
7//!   * **U32 / Symbol** entry takes 2-column inputs whose columns
8//!     may be [`xlog_core::ScalarType::U32`] or
9//!     [`xlog_core::ScalarType::Symbol`] (both share the same
10//!     4-byte physical layout). It routes through the
11//!     histogram-guided block-slice triangle path.
12//!   * **U64** entry takes 2-column inputs whose columns are
13//!     [`xlog_core::ScalarType::U64`] only. Backed by parallel
14//!     `_u64` count + materialize kernels in `wcoj.cu`; counters
15//!     and the `wcoj_compute_total` reducer are reused unchanged
16//!     (they're bounded by `u32::MAX` rows).
17//!   * Mixed-width inputs in the same triangle are rejected at
18//!     the provider level — each entry's schema guard requires
19//!     all three relations match its width.
20//!   * **Sorted, deduped inputs.** Caller-supplied:
21//!     - `e_xy` lex-sorted+deduped by (X, Y),
22//!     - `e_yz` lex-sorted+deduped by (Y, Z),
23//!     - `e_xz` lex-sorted+deduped by (X, Z).
24//!       Physical layout construction is a separate slice — this
25//!       entry assumes the caller has already arranged input layout.
26//!   * **Two-phase count → device-scan → materialize.** Mirrors
27//!     SRDatalog (Sun et al., arXiv 2604.20073) Section 4's
28//!     deterministic two-phase pipeline. Row counts are
29//!     prefix-summed on device; the only host visit between the
30//!     two phases is a single 4-byte `dtoh_scalar_untracked` of
31//!     the inclusive total (sanctioned metadata read, exempt from
32//!     the strict deterministic-D2H gate).
33//!   * **Strict [`LaunchRecorder`] discipline.** Two recorders
34//!     run sequentially on the caller-supplied launch stream:
35//!     1. count+scan recorder: reads `e_xy` / `e_yz` / `e_xz`
36//!        columns + their `d_num_rows`; writes `count_buf`,
37//!        `offsets_buf`, `d_total`. Spans the count kernel,
38//!        the dtod copy `count_buf → offsets_buf`, the
39//!        device-side prefix-sum on `offsets_buf`, and
40//!        `wcoj_compute_total`.
41//!     2. materialize recorder: reads same inputs + `offsets_buf`;
42//!        writes the three output columns + output `d_num_rows`.
//!   * **Output deterministic and lex-sorted by (X, Y, Z).** Locked
//!     by the test file `tests/test_wcoj_triangle_u32.rs`.
45//!   * **Set semantics on deduped input.** If the caller violates
46//!     the dedup contract on an input, the kernel may emit
47//!     duplicates; the test suite documents this as caller
48//!     responsibility.
49//!
50use std::ffi::c_void;
51use std::time::Instant;
52
53use cudarc::driver::sys;
54use xlog_core::{Result, ScalarType, Schema, XlogError};
55
56use super::{wcoj_kernels, CudaKernelProvider, WCOJ_MODULE};
57use crate::device_runtime::StreamId;
58use crate::launch::LaunchRecorder;
59use crate::memory::{CudaColumn, TrackedCudaSlice};
60use crate::wcoj_metadata::WcojRelationMetadata;
61use crate::CudaBuffer;
62use crate::{AsKernelParam, LaunchAsync, LaunchConfig};
63
/// Threads per CUDA block for kernel launches in this file. The layout
/// sorted+unique checker uses it as `block_dim.x` and derives its grid
/// size as `n.div_ceil(BLOCK_SIZE)`.
const BLOCK_SIZE: u32 = 256;
65
66fn column_u32(input: &CudaBuffer, col_idx: usize) -> Result<&TrackedCudaSlice<u32>> {
67    let col = input.column(col_idx).ok_or_else(|| {
68        XlogError::Kernel(format!(
69            "wcoj_layout_u32_recorded: column {col_idx} not found"
70        ))
71    })?;
72    match col {
73        CudaColumn::Owned(slice) => unsafe {
74            Ok(&*(slice as *const TrackedCudaSlice<u8> as *const TrackedCudaSlice<u32>))
75        },
76        _ => Err(XlogError::Kernel(
77            "wcoj_layout_u32_recorded: input column must be owned".to_string(),
78        )),
79    }
80}
81
82fn column_u64(input: &CudaBuffer, col_idx: usize) -> Result<&TrackedCudaSlice<u64>> {
83    let col = input.column(col_idx).ok_or_else(|| {
84        XlogError::Kernel(format!(
85            "wcoj_layout_u64_recorded: column {col_idx} not found"
86        ))
87    })?;
88    match col {
89        CudaColumn::Owned(slice) => unsafe {
90            Ok(&*(slice as *const TrackedCudaSlice<u8> as *const TrackedCudaSlice<u64>))
91        },
92        _ => Err(XlogError::Kernel(
93            "wcoj_layout_u64_recorded: input column must be owned".to_string(),
94        )),
95    }
96}
97
98impl CudaKernelProvider {
99    /// Build the sorted+deduped WCOJ physical layout for a 2-column
100    /// u32 relation.
101    ///
102    /// Output: a 2-column u32 [`CudaBuffer`] sorted lexicographically
103    /// by `(col0, col1)` and deduplicated. The output is suitable for
104    /// direct consumption by [`Self::wcoj_triangle_u32_recorded`] in
105    /// any of the three slot positions (`e_xy`, `e_yz`, `e_xz`); the
106    /// caller chooses which logical relation each input represents
107    /// by the slot it passes the layout into.
108    ///
109    /// Fast-path: if the input is already strictly lex-sorted and
110    /// full-row unique, a recorded checker proves that property and
111    /// the method returns a recorded device-side clone. Otherwise it
112    /// falls back to [`Self::dedup_full_row_recorded`], which invokes
113    /// [`Self::sort_recorded`] (typed multi-column radix sort on
114    /// `(col0, col1)`) followed by an on-stream
115    /// `mark_unique_full_row_bytewise` mask + counted compaction.
116    /// Both paths are launch-recorder disciplined and preserve the
117    /// sorted+deduped output contract.
118    ///
119    /// This entry exists for two reasons:
120    ///   1. Narrowing the input contract to 2-column u32 lets the
121    ///      WCOJ-specific call site fail fast with a clear error
122    ///      rather than the more generic dedup error if the caller
123    ///      passes the wrong arity / type.
124    ///   2. Naming the WCOJ pipeline boundary makes downstream
125    ///      callers (planner / executor wiring, cert harness)
126    ///      target the WCOJ-specific layout API rather than the
127    ///      general-purpose dedup primitive — separating concerns
128    ///      that may diverge as the WCOJ stack grows.
129    ///
130    /// # Errors
131    /// * `XlogError::Kernel` if the manager has no runtime
132    ///   (`with_runtime` is required), the input is not 2-column,
133    ///   any column is not [`ScalarType::U32`] or
134    ///   [`ScalarType::Symbol`] (both share the same 4-byte
135    ///   physical layout, so the underlying sort/dedup primitives
136    ///   handle either with no kernel changes), or any inner
137    ///   sort/dedup primitive fails.
138    pub fn wcoj_layout_u32_recorded(
139        &self,
140        input: &CudaBuffer,
141        launch_stream: StreamId,
142    ) -> Result<CudaBuffer> {
143        // Manager must be runtime-backed — the inner
144        // dedup_full_row_recorded enforces the same constraint, but
145        // checking here gives a WCOJ-specific error message.
146        if self.memory().runtime().is_none() {
147            return Err(XlogError::Kernel(
148                "wcoj_layout_u32_recorded requires a runtime-backed \
149                 GpuMemoryManager (constructed via with_runtime)"
150                    .to_string(),
151            ));
152        }
153        // 2-column 4-byte-key contract: U32 or Symbol per column.
154        // Both share the same 4-byte physical layout
155        // (`ScalarType::size_bytes` == 4); the sort+dedup
156        // primitives we delegate to already accept either.
157        if input.arity() != 2 {
158            return Err(XlogError::Kernel(format!(
159                "wcoj_layout_u32_recorded: input must be 2-column, got arity {}",
160                input.arity()
161            )));
162        }
163        for col_idx in 0..2 {
164            let ty = input.schema.column_type(col_idx).ok_or_else(|| {
165                XlogError::Kernel(format!(
166                    "wcoj_layout_u32_recorded: column {} type missing",
167                    col_idx
168                ))
169            })?;
170            if !matches!(ty, ScalarType::U32 | ScalarType::Symbol) {
171                return Err(XlogError::Kernel(format!(
172                    "wcoj_layout_u32_recorded: column {} must be U32 or Symbol, got {:?}",
173                    col_idx, ty
174                )));
175            }
176        }
177        // Layout fast-path: if the input is already strictly
178        // lex-sorted AND full-row unique, we can skip the
179        // (expensive) sort + mark-unique + compact pipeline
180        // and emit a recorded device-side clone. The phase
181        // report (docs/evidence/2026-05-01-wcoj-bench-baseline/phase-timing-report.md)
182        // measured layout at 91-97% of WCOJ adaptive dispatch
183        // wall clock; this branch is the targeted overhead
184        // reduction.
185        match self.try_wcoj_layout_fast_path_u32(input, launch_stream) {
186            Ok(Some(out)) => {
187                self.record_wcoj_layout_fast_path_hit();
188                return Ok(out);
189            }
190            Ok(None) => {
191                // Empty input handled by dedup_full_row_recorded
192                // (which has its own n==0 short-circuit returning
193                // create_empty_buffer). Fall through.
194            }
195            Err(_) => {
196                // Checker failed unexpectedly. Fall through to
197                // the safe path; correctness is preserved.
198            }
199        }
200        // Fall through: the input wasn't proven sorted+unique.
201        // Delegate to the existing typed sort + full-row dedup.
202        // Both primitives are fully recorder-disciplined; the
203        // resulting CudaBuffer is sorted lex by (col0, col1) and
204        // deduplicated.
205        self.dedup_full_row_recorded(input, launch_stream)
206    }
207
208    /// Generic full-row WCOJ layout sort+dedup for relations of any
209    /// arity ≥ 2 in the 4-byte width-class (`U32`, `Symbol`,
210    /// mixable within the class).
211    ///
212    /// **Design**: this entry point leaves the existing arity-2
213    /// [`Self::wcoj_layout_u32_recorded`] is **unchanged** for
214    /// the triangle / 4-cycle / project-then-layout callers — it
215    /// retains its arity-2-specific fast-path branch. This generic
216    /// surface delegates straight to
217    /// [`Self::dedup_full_row_recorded`] for any arity ≥ 2.
218    ///
219    /// **Validation order** (`runtime → arity ≥ 2 → per-column
220    /// width-class → delegate`):
221    ///   1. Manager runtime-backed.
222    ///   2. `input.arity() >= 2`.
223    ///   3. Every column type ∈ `{U32, Symbol}` (4-byte
224    ///      width-class). Mixed `U32` + `Symbol` within one
225    ///      relation is permitted; `U64` is rejected — use
226    ///      [`Self::wcoj_layout_sort_u64_recorded`] instead.
227    ///   4. Delegate to `dedup_full_row_recorded(input, launch_stream)`.
228    ///
229    /// Stream resolution is owned by `dedup_full_row_recorded`
230    /// and is NOT in this entry point's validation list. The
231    /// `n == 0` short-circuit (returns
232    /// `create_empty_buffer(input.schema().clone())`) is also
233    /// owned downstream — single source of truth, no duplicated
234    /// empty-buffer semantics.
235    ///
236    /// **Composition**: `dedup_full_row_recorded` only — there
237    /// is no fast-path branch for arity ≥ 3 in this generic
238    /// full-row layout-sort accessor (the
239    /// existing arity-2 fast-path stays untouched and reachable
240    /// only via `wcoj_layout_u32_recorded`).
241    ///
242    /// # Errors
243    /// * `XlogError::Kernel` if the manager has no runtime
244    ///   (`with_runtime` is required).
245    /// * `XlogError::Kernel` if `input.arity() < 2`.
246    /// * `XlogError::Kernel` if any column is not `U32` /
247    ///   `Symbol`.
248    /// * Whatever `dedup_full_row_recorded` returns for
249    ///   stream-resolution / kernel-launch failures.
250    pub fn wcoj_layout_sort_u32_recorded(
251        &self,
252        input: &CudaBuffer,
253        launch_stream: StreamId,
254    ) -> Result<CudaBuffer> {
255        self.record_wcoj_layout_sort_invocation();
256        if self.memory().runtime().is_none() {
257            return Err(XlogError::Kernel(
258                "wcoj_layout_sort_u32_recorded requires a runtime-backed \
259                 GpuMemoryManager (constructed via with_runtime)"
260                    .to_string(),
261            ));
262        }
263        if input.arity() < 2 {
264            return Err(XlogError::Kernel(format!(
265                "wcoj_layout_sort_u32_recorded: input must have arity >= 2, got {}",
266                input.arity()
267            )));
268        }
269        for col_idx in 0..input.arity() {
270            let ty = input.schema.column_type(col_idx).ok_or_else(|| {
271                XlogError::Kernel(format!(
272                    "wcoj_layout_sort_u32_recorded: column {} type missing",
273                    col_idx
274                ))
275            })?;
276            if !matches!(ty, ScalarType::U32 | ScalarType::Symbol) {
277                return Err(XlogError::Kernel(format!(
278                    "wcoj_layout_sort_u32_recorded: column {} must be U32 or Symbol \
279                     (4-byte width-class), got {:?}",
280                    col_idx, ty
281                )));
282            }
283        }
284        self.dedup_full_row_recorded(input, launch_stream)
285    }
286
287    /// Evaluate `tri(X, Y, Z) :- e_xy(X,Y), e_yz(Y,Z), e_xz(X,Z)`
288    /// on already-sorted, already-deduped binary 4-byte-key
289    /// relations. See module-level docs for the full contract.
290    ///
291    /// Each column may be [`ScalarType::U32`] or
292    /// [`ScalarType::Symbol`] — both share the same 4-byte
293    /// physical layout, so the kernel reads the bits unchanged.
294    /// Cross-relation type compatibility (e.g., that Y is the
295    /// same type in `e_xy.col1` and `e_yz.col0`) is the
296    /// planner's responsibility upstream; this entry only
297    /// enforces width.
298    ///
299    /// The output schema preserves per-head-position scalar types
300    /// from the inputs:
301    ///   * `out.col0` = `e_xy.col0` type (X)
302    ///   * `out.col1` = `e_xy.col1` type (Y)
303    ///   * `out.col2` = `e_yz.col1` type (Z)
304    ///
305    /// # Errors
306    /// * `XlogError::Kernel` if the manager has no runtime
307    ///   (`with_runtime` is required), the launch stream does
308    ///   not resolve, an input is not 2-column with U32/Symbol
309    ///   columns, or any kernel launch fails.
310    pub fn wcoj_triangle_u32_recorded(
311        &self,
312        e_xy: &CudaBuffer,
313        e_yz: &CudaBuffer,
314        e_xz: &CudaBuffer,
315        launch_stream: StreamId,
316    ) -> Result<CudaBuffer> {
317        self.wcoj_triangle_hg_u32_recorded(
318            e_xy,
319            e_yz,
320            e_xz,
321            crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT,
322            launch_stream,
323        )
324    }
325
326    /// Evaluate `cyc4(W, X, Y, Z) :- e1(W,X), e2(X,Y), e3(Y,Z), e4(Z,W)`
327    /// on already-sorted, already-deduped binary 4-byte-key
328    /// relations. Structural mirror of [`Self::wcoj_triangle_u32_recorded`]
329    /// for the 4-cycle case; see that entry's contract and the
330    /// module-level docs for the shared two-phase recorder
331    /// discipline.
332    ///
333    /// Each column may be [`ScalarType::U32`] or
334    /// [`ScalarType::Symbol`] — both share the same 4-byte
335    /// physical layout, so the kernel reads the bits unchanged.
336    /// Cross-relation type compatibility (e.g., that X is the
337    /// same type in `e1.col1` and `e2.col0`) is the planner's
338    /// responsibility upstream; this entry only enforces width.
339    ///
340    /// The 4-cycle slot order is `[e1(W,X), e2(X,Y), e3(Y,Z), e4(Z,W)]`.
341    /// The output schema preserves per-head-position scalar types
342    /// from the inputs:
343    ///   * `out.col0` = `e1.col0` type (W)
344    ///   * `out.col1` = `e1.col1` type (X)
345    ///   * `out.col2` = `e2.col1` type (Y)
346    ///   * `out.col3` = `e3.col1` type (Z)
347    ///
348    /// # Errors
349    /// * `XlogError::Kernel` if the manager has no runtime
350    ///   (`with_runtime` is required), the launch stream does
351    ///   not resolve, an input is not 2-column with U32/Symbol
352    ///   columns, or any kernel launch fails.
353    pub fn wcoj_4cycle_u32_recorded(
354        &self,
355        e1: &CudaBuffer,
356        e2: &CudaBuffer,
357        e3: &CudaBuffer,
358        e4: &CudaBuffer,
359        launch_stream: StreamId,
360    ) -> Result<CudaBuffer> {
361        self.wcoj_4cycle_hg_u32_recorded(
362            e1,
363            e2,
364            e3,
365            e4,
366            crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT,
367            launch_stream,
368        )
369    }
370
371    fn logical_row_count_u32(&self, buf: &CudaBuffer) -> Result<u32> {
372        if let Some(c) = buf.cached_row_count() {
373            return Ok(c);
374        }
375        self.dtoh_scalar_untracked::<u32>(buf.num_rows_device(), 0)
376    }
377
378    /// Build the sorted+deduped WCOJ physical layout for a 2-column
379    /// U64 relation. Output: a 2-column U64 [`CudaBuffer`] sorted
380    /// lexicographically by `(col0, col1)` and deduplicated. Suitable
381    /// for direct consumption by [`Self::wcoj_triangle_u64_recorded`].
382    ///
383    /// Composition mirrors [`Self::wcoj_layout_u32_recorded`]:
384    /// already sorted+unique inputs take the recorded fast-path clone;
385    /// other inputs fall back to [`Self::dedup_full_row_recorded`],
386    /// whose U64 `sort_recorded` path ports the legacy `sort()` hi/lo
387    /// radix-pass strategy into recorded launch discipline.
388    ///
389    /// # Errors
390    /// * `XlogError::Kernel` if the manager has no runtime, the
391    ///   input is not 2-column, any column is not
392    ///   [`ScalarType::U64`], or any inner sort/dedup primitive
393    ///   fails.
394    pub fn wcoj_layout_u64_recorded(
395        &self,
396        input: &CudaBuffer,
397        launch_stream: StreamId,
398    ) -> Result<CudaBuffer> {
399        if self.memory().runtime().is_none() {
400            return Err(XlogError::Kernel(
401                "wcoj_layout_u64_recorded requires a runtime-backed \
402                 GpuMemoryManager (constructed via with_runtime)"
403                    .to_string(),
404            ));
405        }
406        if input.arity() != 2 {
407            return Err(XlogError::Kernel(format!(
408                "wcoj_layout_u64_recorded: input must be 2-column, got arity {}",
409                input.arity()
410            )));
411        }
412        for col_idx in 0..2 {
413            let ty = input.schema.column_type(col_idx).ok_or_else(|| {
414                XlogError::Kernel(format!(
415                    "wcoj_layout_u64_recorded: column {} type missing",
416                    col_idx
417                ))
418            })?;
419            if !matches!(ty, ScalarType::U64) {
420                return Err(XlogError::Kernel(format!(
421                    "wcoj_layout_u64_recorded: column {} must be U64, got {:?}",
422                    col_idx, ty
423                )));
424            }
425        }
426        // Fast-path: see u32 entry for rationale + measurement
427        // basis. Strictly lex-sorted AND full-row unique inputs
428        // skip dedup_full_row_recorded.
429        if let Ok(Some(out)) = self.try_wcoj_layout_fast_path_u64(input, launch_stream) {
430            self.record_wcoj_layout_fast_path_hit();
431            return Ok(out);
432        }
433        // dedup_full_row_recorded internally invokes sort_recorded
434        // (U64-aware after commit 1) and the bytewise mask kernel
435        // (already width-generic via col_sizes upload).
436        self.dedup_full_row_recorded(input, launch_stream)
437    }
438
439    /// Generic full-row WCOJ layout sort+dedup for relations of any
440    /// arity ≥ 2 in the 8-byte width-class (`U64` only).
441    ///
442    /// **Design**: this entry point leaves the existing arity-2
443    /// [`Self::wcoj_layout_u64_recorded`] is **unchanged** for
444    /// the existing 2-column callers — it retains its
445    /// arity-2-specific fast-path branch. This generic surface
446    /// delegates straight to [`Self::dedup_full_row_recorded`]
447    /// for any arity ≥ 2.
448    ///
449    /// Mirrors [`Self::wcoj_layout_sort_u32_recorded`]'s contract
450    /// at the 8-byte width-class — see that entry's doc for the
451    /// validation order, stream-resolution ownership, n==0
452    /// semantics, and "no fast-path for arity ≥ 3" lock.
453    ///
454    /// **Validation order** (`runtime → arity ≥ 2 → per-column
455    /// width-class → delegate`):
456    ///   1. Manager runtime-backed.
457    ///   2. `input.arity() >= 2`.
458    ///   3. Every column type = `U64`. `U32` / `Symbol` are
459    ///      rejected — use [`Self::wcoj_layout_sort_u32_recorded`]
460    ///      instead.
461    ///   4. Delegate to `dedup_full_row_recorded(input, launch_stream)`.
462    ///
463    /// # Errors
464    /// * `XlogError::Kernel` if the manager has no runtime
465    ///   (`with_runtime` is required).
466    /// * `XlogError::Kernel` if `input.arity() < 2`.
467    /// * `XlogError::Kernel` if any column is not `U64`.
468    /// * Whatever `dedup_full_row_recorded` returns for
469    ///   stream-resolution / kernel-launch failures.
470    pub fn wcoj_layout_sort_u64_recorded(
471        &self,
472        input: &CudaBuffer,
473        launch_stream: StreamId,
474    ) -> Result<CudaBuffer> {
475        self.record_wcoj_layout_sort_invocation();
476        if self.memory().runtime().is_none() {
477            return Err(XlogError::Kernel(
478                "wcoj_layout_sort_u64_recorded requires a runtime-backed \
479                 GpuMemoryManager (constructed via with_runtime)"
480                    .to_string(),
481            ));
482        }
483        if input.arity() < 2 {
484            return Err(XlogError::Kernel(format!(
485                "wcoj_layout_sort_u64_recorded: input must have arity >= 2, got {}",
486                input.arity()
487            )));
488        }
489        for col_idx in 0..input.arity() {
490            let ty = input.schema.column_type(col_idx).ok_or_else(|| {
491                XlogError::Kernel(format!(
492                    "wcoj_layout_sort_u64_recorded: column {} type missing",
493                    col_idx
494                ))
495            })?;
496            if !matches!(ty, ScalarType::U64) {
497                return Err(XlogError::Kernel(format!(
498                    "wcoj_layout_sort_u64_recorded: column {} must be U64 \
499                     (8-byte width-class), got {:?}",
500                    col_idx, ty
501                )));
502            }
503        }
504        self.dedup_full_row_recorded(input, launch_stream)
505    }
506
507    /// Evaluate `tri(X, Y, Z) :- e_xy(X,Y), e_yz(Y,Z), e_xz(X,Z)`
508    /// on already-sorted, already-deduped binary U64 relations.
509    /// Mirrors [`Self::wcoj_triangle_u32_recorded`]'s contract;
510    /// the only differences are the 8-byte join-key reads/writes
511    /// and the U64-specific count/materialize kernels. Counters
512    /// and the total reducer remain u32.
513    ///
514    /// # Errors
515    /// * `XlogError::Kernel` if the manager has no runtime, the
516    ///   launch stream does not resolve, an input is not 2-column
517    ///   with U64 columns, or any kernel launch fails.
518    pub fn wcoj_triangle_u64_recorded(
519        &self,
520        e_xy: &CudaBuffer,
521        e_yz: &CudaBuffer,
522        e_xz: &CudaBuffer,
523        launch_stream: StreamId,
524    ) -> Result<CudaBuffer> {
525        self.wcoj_triangle_hg_u64_recorded(
526            e_xy,
527            e_yz,
528            e_xz,
529            crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT,
530            launch_stream,
531        )
532    }
533
534    /// Evaluate
535    /// `cycle4(W, X, Y, Z) :- e1(W,X), e2(X,Y), e3(Y,Z), e4(Z,W)`
536    /// on already-sorted, already-deduped binary U64 relations.
537    /// Mirrors [`Self::wcoj_4cycle_u32_recorded`]'s contract; the
538    /// only differences are the 8-byte join-key reads/writes and
539    /// the U64 HG planner/count/materialize kernels. Counters and
540    /// the total reducer remain u32 (bounded by the upstream host-
541    /// side row-count guard).
542    ///
543    /// 4-cycle slot order:
544    /// `[e1(W,X), e2(X,Y), e3(Y,Z), e4(Z,W)]`.
545    ///
546    /// # Errors
547    /// * `XlogError::Kernel` if the manager has no runtime, the
548    ///   launch stream does not resolve, an input is not 2-column
549    ///   with U64 columns, or any kernel launch fails.
550    pub fn wcoj_4cycle_u64_recorded(
551        &self,
552        e1: &CudaBuffer,
553        e2: &CudaBuffer,
554        e3: &CudaBuffer,
555        e4: &CudaBuffer,
556        launch_stream: StreamId,
557    ) -> Result<CudaBuffer> {
558        self.wcoj_4cycle_hg_u64_recorded(
559            e1,
560            e2,
561            e3,
562            e4,
563            crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT,
564            launch_stream,
565        )
566    }
567}
568
569// ===============================================================
570// v0.6.2 — WCOJ layout fast-path implementation.
571//
572// Goal: when an input is already strictly lex-sorted AND full-row
573// unique, skip `dedup_full_row_recorded` (sort + mark-unique +
574// compact) and emit a recorded device-side clone instead. The
575// existing layout API surface is unchanged; the fast-path is a
576// purely additive optimization with proof-based correctness.
577//
578// Flow:
579//   1. Validate (caller already did this).
580//   2. Resolve LOGICAL row count via `logical_row_count_u32`
581//      (NOT `input.num_rows()` — that returns row_cap on
582//      compacted buffers).
583//   3. n == 0  → return Ok(None); caller falls through to the
584//      existing `dedup_full_row_recorded` n==0 short-circuit
585//      (`create_empty_buffer`). We don't mint an empty clone
586//      here; we preserve existing semantics exactly.
587//   4. n == 1  → recorded clone (trivially sorted+unique).
588//   5. n >= 2  → launch the checker kernel under a fresh
589//      `LaunchRecorder`; sync; D2H the 4-byte flag.
590//   6. Flag == 1 → recorded clone. Flag == 0 → return
591//      Ok(None); caller falls through to the dedup path.
592//
593// Strict-D2H: the 4-byte flag read uses
594// `dtoh_scalar_untracked::<u32>` (whitelisted by the strict
595// gate, same class as `wcoj_compute_total`'s d_total read).
596// ===============================================================
597
598impl CudaKernelProvider {
599    /// Try to short-circuit a u32/Symbol layout call by proving
600    /// the input is already sorted+unique. Returns:
601    ///   * `Ok(Some(out))` — fast-path hit; `out` is the layout.
602    ///   * `Ok(None)`      — fast-path missed (n==0 or proof
603    ///     failed). Caller falls through to
604    ///     `dedup_full_row_recorded`.
605    ///   * `Err(e)`        — checker pipeline error. Caller
606    ///     treats this as "fall through" to preserve correctness.
607    fn try_wcoj_layout_fast_path_u32(
608        &self,
609        input: &CudaBuffer,
610        launch_stream: StreamId,
611    ) -> Result<Option<CudaBuffer>> {
612        let runtime = self
613            .memory()
614            .runtime()
615            .ok_or_else(|| XlogError::Kernel("wcoj_layout fast-path: no runtime".to_string()))?;
616        let cu_stream = runtime
617            .stream_pool()
618            .resolve(launch_stream)
619            .ok_or_else(|| {
620                XlogError::Kernel("wcoj_layout fast-path: stream resolve".to_string())
621            })?;
622
623        // LOGICAL row count, not row_cap: compacted buffers
624        // (e.g. dedup outputs) have row_cap > logical.
625        let n = self.logical_row_count_u32(input)?;
626        if n == 0 {
627            // Preserve existing semantics: dedup_full_row_recorded's
628            // n==0 path returns create_empty_buffer(schema). Don't
629            // shadow that here.
630            return Ok(None);
631        }
632        if n == 1 {
633            // Trivially sorted+unique; skip the checker entirely.
634            return Ok(Some(self.recorded_clone_2col_4byte(
635                input,
636                n,
637                launch_stream,
638                &cu_stream,
639                runtime,
640            )?));
641        }
642
643        // n >= 2: run the checker. Output flag in u32 (4 bytes).
644        let mut flag_buf = self.memory.alloc::<u32>(1)?;
645
646        let col0 = column_u32(input, 0)?;
647        let col1 = column_u32(input, 1)?;
648
649        // Resolve the kernel before queueing launch-stream work.
650        // If the module lookup fails, `flag_buf` can drop without
651        // racing any in-flight H2D / kernel work.
652        let device = self.device.inner();
653        let kernel = device
654            .get_func(
655                WCOJ_MODULE,
656                wcoj_kernels::WCOJ_LAYOUT_CHECK_SORTED_UNIQUE_U32,
657            )
658            .ok_or_else(|| {
659                XlogError::Kernel("wcoj_layout_check_sorted_unique_u32 kernel not found".into())
660            })?;
661
662        let mut rec = LaunchRecorder::new_strict(launch_stream);
663        rec.read(input.num_rows_device());
664        rec.read_column(input.column(0).expect("col0"));
665        rec.read_column(input.column(1).expect("col1"));
666        rec.write(&flag_buf);
667        rec.preflight(runtime)
668            .map_err(|e| XlogError::Kernel(format!("wcoj_layout fast-path: preflight {e}")))?;
669
670        // Initialize flag = 1 on stream via cuMemsetD32Async-
671        // equivalent. The simplest portable path is a 4-byte
672        // host->device async copy (sequenced before the kernel
673        // by stream order). Doing it as part of the recorded
674        // window keeps the dealloc-safety chain intact.
675        let one: u32 = 1;
676        let grid = n.div_ceil(BLOCK_SIZE);
677        let queued_result: Result<()> = (|| {
678            self.htod_launch_metadata_async_copy_one(
679                &one,
680                &flag_buf,
681                &cu_stream,
682                "wcoj_layout fast-path flag init",
683            )?;
684
685            // SAFETY: 4-arg signature
686            //   wcoj_layout_check_sorted_unique_u32(
687            //     const u32* col0, const u32* col1, u32 n, u32* flag)
688            unsafe {
689                kernel
690                    .clone()
691                    .launch_on_stream(
692                        &cu_stream,
693                        LaunchConfig {
694                            grid_dim: (grid, 1, 1),
695                            block_dim: (BLOCK_SIZE, 1, 1),
696                            shared_mem_bytes: 0,
697                        },
698                        (col0, col1, n, &mut flag_buf),
699                    )
700                    .map_err(|e| {
701                        XlogError::Kernel(format!(
702                            "wcoj_layout_check_sorted_unique_u32 launch: {e}"
703                        ))
704                    })?;
705            }
706            Ok(())
707        })();
708        if let Err(e) = queued_result {
709            let _ = cu_stream.synchronize();
710            return Err(e);
711        }
712
713        if let Err(e) = rec.commit(runtime) {
714            let _ = cu_stream.synchronize();
715            return Err(XlogError::Kernel(format!(
716                "wcoj_layout fast-path: commit {e}"
717            )));
718        }
719
720        cu_stream
721            .synchronize()
722            .map_err(|e| XlogError::Kernel(format!("wcoj_layout fast-path: sync {e}")))?;
723
724        let flag_val = self.dtoh_scalar_untracked::<u32>(&flag_buf, 0)?;
725        if flag_val == 1 {
726            Ok(Some(self.recorded_clone_2col_4byte(
727                input,
728                n,
729                launch_stream,
730                &cu_stream,
731                runtime,
732            )?))
733        } else {
734            Ok(None)
735        }
736    }
737
738    /// U64 variant. Mirrors `try_wcoj_layout_fast_path_u32`.
739    fn try_wcoj_layout_fast_path_u64(
740        &self,
741        input: &CudaBuffer,
742        launch_stream: StreamId,
743    ) -> Result<Option<CudaBuffer>> {
744        let runtime = self
745            .memory()
746            .runtime()
747            .ok_or_else(|| XlogError::Kernel("wcoj_layout fast-path: no runtime".to_string()))?;
748        let cu_stream = runtime
749            .stream_pool()
750            .resolve(launch_stream)
751            .ok_or_else(|| {
752                XlogError::Kernel("wcoj_layout fast-path: stream resolve".to_string())
753            })?;
754
755        let n = self.logical_row_count_u32(input)?;
756        if n == 0 {
757            return Ok(None);
758        }
759        if n == 1 {
760            return Ok(Some(self.recorded_clone_2col_8byte(
761                input,
762                n,
763                launch_stream,
764                &cu_stream,
765                runtime,
766            )?));
767        }
768
769        let mut flag_buf = self.memory.alloc::<u32>(1)?;
770        let col0 = column_u64(input, 0)?;
771        let col1 = column_u64(input, 1)?;
772
773        let device = self.device.inner();
774        let kernel = device
775            .get_func(
776                WCOJ_MODULE,
777                wcoj_kernels::WCOJ_LAYOUT_CHECK_SORTED_UNIQUE_U64,
778            )
779            .ok_or_else(|| {
780                XlogError::Kernel("wcoj_layout_check_sorted_unique_u64 kernel not found".into())
781            })?;
782
783        let mut rec = LaunchRecorder::new_strict(launch_stream);
784        rec.read(input.num_rows_device());
785        rec.read_column(input.column(0).expect("col0"));
786        rec.read_column(input.column(1).expect("col1"));
787        rec.write(&flag_buf);
788        rec.preflight(runtime)
789            .map_err(|e| XlogError::Kernel(format!("wcoj_layout fast-path u64: preflight {e}")))?;
790
791        let one: u32 = 1;
792        let grid = n.div_ceil(BLOCK_SIZE);
793        let queued_result: Result<()> = (|| {
794            self.htod_launch_metadata_async_copy_one(
795                &one,
796                &flag_buf,
797                &cu_stream,
798                "wcoj_layout fast-path u64 flag init",
799            )?;
800
801            unsafe {
802                kernel
803                    .clone()
804                    .launch_on_stream(
805                        &cu_stream,
806                        LaunchConfig {
807                            grid_dim: (grid, 1, 1),
808                            block_dim: (BLOCK_SIZE, 1, 1),
809                            shared_mem_bytes: 0,
810                        },
811                        (col0, col1, n, &mut flag_buf),
812                    )
813                    .map_err(|e| {
814                        XlogError::Kernel(format!(
815                            "wcoj_layout_check_sorted_unique_u64 launch: {e}"
816                        ))
817                    })?;
818            }
819            Ok(())
820        })();
821        if let Err(e) = queued_result {
822            let _ = cu_stream.synchronize();
823            return Err(e);
824        }
825
826        if let Err(e) = rec.commit(runtime) {
827            let _ = cu_stream.synchronize();
828            return Err(XlogError::Kernel(format!(
829                "wcoj_layout fast-path u64: commit {e}"
830            )));
831        }
832
833        cu_stream
834            .synchronize()
835            .map_err(|e| XlogError::Kernel(format!("wcoj_layout fast-path u64: sync {e}")))?;
836
837        let flag_val = self.dtoh_scalar_untracked::<u32>(&flag_buf, 0)?;
838        if flag_val == 1 {
839            Ok(Some(self.recorded_clone_2col_8byte(
840                input,
841                n,
842                launch_stream,
843                &cu_stream,
844                runtime,
845            )?))
846        } else {
847            Ok(None)
848        }
849    }
850
851    /// Recorded device-side clone of a 2-column 4-byte-per-key
852    /// buffer, sized to `n` logical rows. Allocates fresh
853    /// columns + d_num_rows on the runtime allocator and copies
854    /// via `cuMemcpyDtoDAsync_v2` on `launch_stream` under a
855    /// `LaunchRecorder` window. NOT a view: the output buffer
856    /// owns its bytes; input lifetime independence is preserved.
857    fn recorded_clone_2col_4byte(
858        &self,
859        input: &CudaBuffer,
860        n: u32,
861        launch_stream: StreamId,
862        cu_stream: &cudarc::driver::CudaStream,
863        runtime: &std::sync::Arc<crate::device_runtime::XlogDeviceRuntime>,
864    ) -> Result<CudaBuffer> {
865        let bpc = (n as usize) * 4;
866        let out_col0 = self.memory.alloc::<u8>(bpc)?;
867        let out_col1 = self.memory.alloc::<u8>(bpc)?;
868        let out_d_num_rows = self.memory.alloc::<u32>(1)?;
869
870        let src_col0 = input.column(0).expect("col0");
871        let src_col1 = input.column(1).expect("col1");
872
873        let mut rec = LaunchRecorder::new_strict(launch_stream);
874        rec.read(input.num_rows_device());
875        rec.read_column(src_col0);
876        rec.read_column(src_col1);
877        rec.write(&out_col0);
878        rec.write(&out_col1);
879        rec.write(&out_d_num_rows);
880        rec.preflight(runtime)
881            .map_err(|e| XlogError::Kernel(format!("wcoj_layout clone 4B: preflight {e}")))?;
882
883        let queued_result: Result<()> = (|| {
884            unsafe {
885                let r0 = sys::cuMemcpyDtoDAsync_v2(
886                    *out_col0.device_ptr(),
887                    *src_col0.device_ptr(),
888                    bpc,
889                    cu_stream.cu_stream(),
890                );
891                if r0 != sys::cudaError_enum::CUDA_SUCCESS {
892                    return Err(XlogError::Kernel(format!(
893                        "wcoj_layout clone 4B: dtod col0 failed: {r0:?}"
894                    )));
895                }
896                let r1 = sys::cuMemcpyDtoDAsync_v2(
897                    *out_col1.device_ptr(),
898                    *src_col1.device_ptr(),
899                    bpc,
900                    cu_stream.cu_stream(),
901                );
902                if r1 != sys::cudaError_enum::CUDA_SUCCESS {
903                    return Err(XlogError::Kernel(format!(
904                        "wcoj_layout clone 4B: dtod col1 failed: {r1:?}"
905                    )));
906                }
907            }
908            self.htod_launch_metadata_async_copy_one(
909                &n,
910                &out_d_num_rows,
911                cu_stream,
912                "wcoj_layout clone 4B d_num_rows",
913            )?;
914            Ok(())
915        })();
916        if let Err(e) = queued_result {
917            let _ = cu_stream.synchronize();
918            return Err(e);
919        }
920
921        if let Err(e) = rec.commit(runtime) {
922            let _ = cu_stream.synchronize();
923            return Err(XlogError::Kernel(format!(
924                "wcoj_layout clone 4B: commit {e}"
925            )));
926        }
927
928        Ok(CudaBuffer::from_columns_with_host_count(
929            vec![out_col0.into(), out_col1.into()],
930            n as u64,
931            out_d_num_rows,
932            input.schema().clone(),
933            n,
934        ))
935    }
936
937    /// 8-byte-per-key sibling. Same recorder discipline.
938    fn recorded_clone_2col_8byte(
939        &self,
940        input: &CudaBuffer,
941        n: u32,
942        launch_stream: StreamId,
943        cu_stream: &cudarc::driver::CudaStream,
944        runtime: &std::sync::Arc<crate::device_runtime::XlogDeviceRuntime>,
945    ) -> Result<CudaBuffer> {
946        let bpc = (n as usize) * 8;
947        let out_col0 = self.memory.alloc::<u8>(bpc)?;
948        let out_col1 = self.memory.alloc::<u8>(bpc)?;
949        let out_d_num_rows = self.memory.alloc::<u32>(1)?;
950
951        let src_col0 = input.column(0).expect("col0");
952        let src_col1 = input.column(1).expect("col1");
953
954        let mut rec = LaunchRecorder::new_strict(launch_stream);
955        rec.read(input.num_rows_device());
956        rec.read_column(src_col0);
957        rec.read_column(src_col1);
958        rec.write(&out_col0);
959        rec.write(&out_col1);
960        rec.write(&out_d_num_rows);
961        rec.preflight(runtime)
962            .map_err(|e| XlogError::Kernel(format!("wcoj_layout clone 8B: preflight {e}")))?;
963
964        let queued_result: Result<()> = (|| {
965            unsafe {
966                let r0 = sys::cuMemcpyDtoDAsync_v2(
967                    *out_col0.device_ptr(),
968                    *src_col0.device_ptr(),
969                    bpc,
970                    cu_stream.cu_stream(),
971                );
972                if r0 != sys::cudaError_enum::CUDA_SUCCESS {
973                    return Err(XlogError::Kernel(format!(
974                        "wcoj_layout clone 8B: dtod col0 failed: {r0:?}"
975                    )));
976                }
977                let r1 = sys::cuMemcpyDtoDAsync_v2(
978                    *out_col1.device_ptr(),
979                    *src_col1.device_ptr(),
980                    bpc,
981                    cu_stream.cu_stream(),
982                );
983                if r1 != sys::cudaError_enum::CUDA_SUCCESS {
984                    return Err(XlogError::Kernel(format!(
985                        "wcoj_layout clone 8B: dtod col1 failed: {r1:?}"
986                    )));
987                }
988            }
989            self.htod_launch_metadata_async_copy_one(
990                &n,
991                &out_d_num_rows,
992                cu_stream,
993                "wcoj_layout clone 8B d_num_rows",
994            )?;
995            Ok(())
996        })();
997        if let Err(e) = queued_result {
998            let _ = cu_stream.synchronize();
999            return Err(e);
1000        }
1001
1002        if let Err(e) = rec.commit(runtime) {
1003            let _ = cu_stream.synchronize();
1004            return Err(XlogError::Kernel(format!(
1005                "wcoj_layout clone 8B: commit {e}"
1006            )));
1007        }
1008
1009        Ok(CudaBuffer::from_columns_with_host_count(
1010            vec![out_col0.into(), out_col1.into()],
1011            n as u64,
1012            out_d_num_rows,
1013            input.schema().clone(),
1014            n,
1015        ))
1016    }
1017}
1018
1019// ===============================================================
1020// General-arity clique WCOJ provider for K=5..8.
1021//
1022// Thin public methods (k=5..8 × u32/u64) delegate to a
1023// single generic helper `wcoj_clique_recorded_inner`. Width-class
1024// (4-byte = U32+Symbol mixable, 8-byte = U64) and K
1025// drive kernel-name selection and per-row element-size; otherwise
1026// the orchestration is identical: validate → upload edge-pointer
1027// arrays → count → scan → total → materialize → output.
1028//
1029// Each public entry assumes sorted+deduped input as a
1030// pre-condition (same contract as `wcoj_triangle_u32_recorded` /
1031// `wcoj_4cycle_u32_recorded`); the runtime dispatcher routes
1032// every edge through the generic full-row layout-sort accessors before
1033// invoking these entries.
1034// ===============================================================
1035
1036#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1037enum CliqueWidthClass {
1038    FourByte,  // U32 / Symbol
1039    EightByte, // U64
1040}
1041
1042impl CliqueWidthClass {
1043    fn elem_bytes(self) -> usize {
1044        match self {
1045            CliqueWidthClass::FourByte => 4,
1046            CliqueWidthClass::EightByte => 8,
1047        }
1048    }
1049    fn label(self) -> &'static str {
1050        match self {
1051            CliqueWidthClass::FourByte => "u32",
1052            CliqueWidthClass::EightByte => "u64",
1053        }
1054    }
1055    fn validate_col_type(self, ty: ScalarType) -> bool {
1056        match self {
1057            CliqueWidthClass::FourByte => matches!(ty, ScalarType::U32 | ScalarType::Symbol),
1058            CliqueWidthClass::EightByte => matches!(ty, ScalarType::U64),
1059        }
1060    }
1061}
1062
1063/// Resolve the kernel name for a given `(K, count_or_materialize, width_class)`.
1064fn clique_kernel_name(k: usize, materialize: bool, w: CliqueWidthClass) -> &'static str {
1065    match (k, materialize, w) {
1066        (5, false, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE5_COUNT_HG_U32,
1067        (5, true, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE5_MATERIALIZE_HG_U32,
1068        (5, false, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE5_COUNT_HG_U64,
1069        (5, true, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE5_MATERIALIZE_HG_U64,
1070        (6, false, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE6_COUNT_HG_U32,
1071        (6, true, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE6_MATERIALIZE_HG_U32,
1072        (6, false, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE6_COUNT_HG_U64,
1073        (6, true, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE6_MATERIALIZE_HG_U64,
1074        (7, false, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE7_COUNT_HG_U32,
1075        (7, true, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE7_MATERIALIZE_HG_U32,
1076        (7, false, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE7_COUNT_HG_U64,
1077        (7, true, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE7_MATERIALIZE_HG_U64,
1078        (8, false, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE8_COUNT_HG_U32,
1079        (8, true, CliqueWidthClass::FourByte) => wcoj_kernels::WCOJ_CLIQUE8_MATERIALIZE_HG_U32,
1080        (8, false, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE8_COUNT_HG_U64,
1081        (8, true, CliqueWidthClass::EightByte) => wcoj_kernels::WCOJ_CLIQUE8_MATERIALIZE_HG_U64,
1082        _ => panic!("clique_kernel_name: K must be 5..8, got {}", k),
1083    }
1084}
1085
1086enum CliqueLeaderMetadata {
1087    U32(WcojRelationMetadata<u32>),
1088    U64(WcojRelationMetadata<u64>),
1089}
1090
1091impl CliqueLeaderMetadata {
1092    fn total_rows_u32(&self, entry_label: &str) -> Result<u32> {
1093        let total = match self {
1094            CliqueLeaderMetadata::U32(metadata) => metadata.total,
1095            CliqueLeaderMetadata::U64(metadata) => metadata.total,
1096        };
1097        u32::try_from(total).map_err(|_| {
1098            XlogError::Kernel(format!(
1099                "{}: leader metadata total {} exceeds u32 kernel surface",
1100                entry_label, total
1101            ))
1102        })
1103    }
1104
1105    fn key_count(&self) -> u32 {
1106        match self {
1107            CliqueLeaderMetadata::U32(metadata) => metadata.key_count,
1108            CliqueLeaderMetadata::U64(metadata) => metadata.key_count,
1109        }
1110    }
1111}
1112
1113fn validate_clique_metadata_leader<'a>(
1114    k: usize,
1115    edges: &'a [&CudaBuffer],
1116    leader_edge_idx: u32,
1117    width_class: CliqueWidthClass,
1118    entry_label: &str,
1119) -> Result<&'a CudaBuffer> {
1120    if !(5..=8).contains(&k) {
1121        return Err(XlogError::Kernel(format!(
1122            "{}: k must be 5..8, got {}",
1123            entry_label, k
1124        )));
1125    }
1126    let expected_edges = k * (k - 1) / 2;
1127    if edges.len() != expected_edges {
1128        return Err(XlogError::Kernel(format!(
1129            "{}: expected {} edges (= C({}, 2)), got {}",
1130            entry_label,
1131            expected_edges,
1132            k,
1133            edges.len()
1134        )));
1135    }
1136    let leader_slot = usize::try_from(leader_edge_idx)
1137        .ok()
1138        .filter(|idx| *idx < expected_edges)
1139        .ok_or_else(|| {
1140            XlogError::Kernel(format!(
1141                "{}: leader_edge_idx {} out of range for {} edges",
1142                entry_label, leader_edge_idx, expected_edges
1143            ))
1144        })?;
1145    let leader = edges[leader_slot];
1146    if leader.arity() != 2 {
1147        return Err(XlogError::Kernel(format!(
1148            "{}: leader edge must be 2-column, got arity {}",
1149            entry_label,
1150            leader.arity()
1151        )));
1152    }
1153    let ty = leader.schema.column_type(0).ok_or_else(|| {
1154        XlogError::Kernel(format!(
1155            "{}: leader edge column 0 type missing",
1156            entry_label
1157        ))
1158    })?;
1159    if !width_class.validate_col_type(ty) {
1160        return Err(XlogError::Kernel(format!(
1161            "{}: leader edge column 0 type {:?} not in {} width-class",
1162            entry_label,
1163            ty,
1164            width_class.label()
1165        )));
1166    }
1167    Ok(leader)
1168}
1169
1170impl CudaKernelProvider {
1171    fn wcoj_clique_metadata_recorded_u32_inner(
1172        &self,
1173        k: usize,
1174        edges: &[&CudaBuffer],
1175        leader_edge_idx: u32,
1176        launch_stream: StreamId,
1177        entry_label: &str,
1178    ) -> Result<WcojRelationMetadata<u32>> {
1179        let leader = validate_clique_metadata_leader(
1180            k,
1181            edges,
1182            leader_edge_idx,
1183            CliqueWidthClass::FourByte,
1184            entry_label,
1185        )?;
1186        self.wcoj_build_metadata_u32_recorded(leader, 0, launch_stream)
1187    }
1188
1189    fn wcoj_clique_metadata_recorded_u64_inner(
1190        &self,
1191        k: usize,
1192        edges: &[&CudaBuffer],
1193        leader_edge_idx: u32,
1194        launch_stream: StreamId,
1195        entry_label: &str,
1196    ) -> Result<WcojRelationMetadata<u64>> {
1197        let leader = validate_clique_metadata_leader(
1198            k,
1199            edges,
1200            leader_edge_idx,
1201            CliqueWidthClass::EightByte,
1202            entry_label,
1203        )?;
1204        self.wcoj_build_metadata_u64_recorded(leader, 0, launch_stream)
1205    }
1206
1207    /// Build leader-edge runtime metadata for a 5-clique 4-byte-width dispatch.
1208    pub fn wcoj_clique5_metadata_recorded_u32(
1209        &self,
1210        edges: &[&CudaBuffer; 10],
1211        leader_edge_idx: u32,
1212        launch_stream: StreamId,
1213    ) -> Result<WcojRelationMetadata<u32>> {
1214        self.wcoj_clique_metadata_recorded_u32_inner(
1215            5,
1216            edges,
1217            leader_edge_idx,
1218            launch_stream,
1219            "wcoj_clique5_metadata_recorded_u32",
1220        )
1221    }
1222
1223    /// Build leader-edge runtime metadata for a 5-clique 8-byte-width dispatch.
1224    pub fn wcoj_clique5_metadata_recorded_u64(
1225        &self,
1226        edges: &[&CudaBuffer; 10],
1227        leader_edge_idx: u32,
1228        launch_stream: StreamId,
1229    ) -> Result<WcojRelationMetadata<u64>> {
1230        self.wcoj_clique_metadata_recorded_u64_inner(
1231            5,
1232            edges,
1233            leader_edge_idx,
1234            launch_stream,
1235            "wcoj_clique5_metadata_recorded_u64",
1236        )
1237    }
1238
1239    /// Build leader-edge runtime metadata for a 6-clique 4-byte-width dispatch.
1240    pub fn wcoj_clique6_metadata_recorded_u32(
1241        &self,
1242        edges: &[&CudaBuffer; 15],
1243        leader_edge_idx: u32,
1244        launch_stream: StreamId,
1245    ) -> Result<WcojRelationMetadata<u32>> {
1246        self.wcoj_clique_metadata_recorded_u32_inner(
1247            6,
1248            edges,
1249            leader_edge_idx,
1250            launch_stream,
1251            "wcoj_clique6_metadata_recorded_u32",
1252        )
1253    }
1254
1255    /// Build leader-edge runtime metadata for a 6-clique 8-byte-width dispatch.
1256    pub fn wcoj_clique6_metadata_recorded_u64(
1257        &self,
1258        edges: &[&CudaBuffer; 15],
1259        leader_edge_idx: u32,
1260        launch_stream: StreamId,
1261    ) -> Result<WcojRelationMetadata<u64>> {
1262        self.wcoj_clique_metadata_recorded_u64_inner(
1263            6,
1264            edges,
1265            leader_edge_idx,
1266            launch_stream,
1267            "wcoj_clique6_metadata_recorded_u64",
1268        )
1269    }
1270
1271    /// Generic clique provider helper. Orchestrates count
1272    /// → scan → total → materialize for K-clique on K*(K-1)/2
1273    /// 2-column edges in the given width-class.
1274    ///
1275    /// Caller pre-conditions:
1276    ///   * Manager runtime-backed (validated here too).
1277    ///   * `K ∈ {5, 6, 7, 8}` (validated; panic-free `Err` otherwise).
1278    ///   * `edges.len() == K * (K - 1) / 2`.
1279    ///   * Each edge is 2-column with all columns in `width_class`.
1280    ///   * Each edge is lex-sorted+deduped on `(col0, col1)` —
1281    ///     same contract as `wcoj_triangle_*_recorded`. The
1282    ///     runtime dispatcher routes every edge through the generic
1283    ///     full-row layout-sort accessors before
1284    ///     calling here; provider does NOT layout-sort itself.
1285    #[allow(clippy::too_many_arguments)]
1286    fn wcoj_clique_recorded_inner(
1287        &self,
1288        k: usize,
1289        edges: &[&CudaBuffer],
1290        leader_edge_idx: u32,
1291        edge_order: Option<&[u8]>,
1292        iteration_order: Option<&[u8]>,
1293        width_class: CliqueWidthClass,
1294        launch_stream: StreamId,
1295        entry_label: &str,
1296    ) -> Result<CudaBuffer> {
1297        let runtime = self.memory().runtime().ok_or_else(|| {
1298            XlogError::Kernel(format!(
1299                "{} requires a runtime-backed GpuMemoryManager (with_runtime)",
1300                entry_label
1301            ))
1302        })?;
1303        let cu_stream = runtime
1304            .stream_pool()
1305            .resolve(launch_stream)
1306            .ok_or_else(|| {
1307                XlogError::Kernel(format!(
1308                    "{}: launch_stream StreamId({}) does not resolve",
1309                    entry_label, launch_stream.0
1310                ))
1311            })?;
1312        if !(5..=8).contains(&k) {
1313            return Err(XlogError::Kernel(format!(
1314                "{}: k must be 5..8, got {}",
1315                entry_label, k
1316            )));
1317        }
1318        let expected_edges = k * (k - 1) / 2;
1319        if edges.len() != expected_edges {
1320            return Err(XlogError::Kernel(format!(
1321                "{}: expected {} edges (= C({}, 2)), got {}",
1322                entry_label,
1323                expected_edges,
1324                k,
1325                edges.len()
1326            )));
1327        }
1328        if usize::try_from(leader_edge_idx)
1329            .ok()
1330            .is_none_or(|idx| idx >= expected_edges)
1331        {
1332            return Err(XlogError::Kernel(format!(
1333                "{}: leader_edge_idx {} out of range for {} edges",
1334                entry_label, leader_edge_idx, expected_edges
1335            )));
1336        }
1337        match (edge_order, iteration_order) {
1338            (Some(edge_order), Some(iteration_order)) => {
1339                validate_clique_u8_permutation(
1340                    edge_order,
1341                    expected_edges,
1342                    "edge_order",
1343                    entry_label,
1344                )?;
1345                validate_clique_u8_permutation(iteration_order, k, "iteration_order", entry_label)?;
1346            }
1347            (None, None) => {}
1348            _ => {
1349                return Err(XlogError::Kernel(format!(
1350                    "{}: edge_order and iteration_order must both be present or both be omitted",
1351                    entry_label
1352                )));
1353            }
1354        }
1355
1356        // Validate every edge: 2-column, in width-class.
1357        for (i, buf) in edges.iter().enumerate() {
1358            if buf.arity() != 2 {
1359                return Err(XlogError::Kernel(format!(
1360                    "{}: edge[{}] must be 2-column, got arity {}",
1361                    entry_label,
1362                    i,
1363                    buf.arity()
1364                )));
1365            }
1366            for col_idx in 0..2 {
1367                let ty = buf.schema.column_type(col_idx).ok_or_else(|| {
1368                    XlogError::Kernel(format!(
1369                        "{}: edge[{}] column {} type missing",
1370                        entry_label, i, col_idx
1371                    ))
1372                })?;
1373                if !width_class.validate_col_type(ty) {
1374                    return Err(XlogError::Kernel(format!(
1375                        "{}: edge[{}] column {} type {:?} not in {} width-class",
1376                        entry_label,
1377                        i,
1378                        col_idx,
1379                        ty,
1380                        width_class.label()
1381                    )));
1382                }
1383            }
1384        }
1385
1386        // Build output schema in kernel binding order. The runtime
1387        // projects this buffer back to rule-head order when the plan
1388        // chooses a non-identity variable order.
1389        let mut head_types = Vec::with_capacity(k);
1390        let leader_slot = edge_order.map(|order| order[0] as usize).unwrap_or(0);
1391        head_types.push(edges[leader_slot].schema.column_type(0).expect("validated"));
1392        head_types.push(edges[leader_slot].schema.column_type(1).expect("validated"));
1393        for i in 2..k {
1394            let logical_edge = i - 1;
1395            let edge_slot = edge_order
1396                .map(|order| order[logical_edge] as usize)
1397                .unwrap_or(logical_edge);
1398            head_types.push(edges[edge_slot].schema.column_type(1).expect("validated"));
1399        }
1400        let out_schema = Schema::new(
1401            head_types
1402                .iter()
1403                .enumerate()
1404                .map(|(i, t)| (format!("col{}", i), *t))
1405                .collect(),
1406        );
1407
1408        let leader_slot = usize::try_from(leader_edge_idx).expect("validated");
1409        let n_leader = self.logical_row_count_u32(edges[leader_slot])?;
1410        if n_leader == 0 {
1411            return self.create_empty_buffer(out_schema);
1412        }
1413        // Paper section 5 Algorithm 1 Phase 1: histograms are maintained
1414        // alongside data and refreshed during recursive merge handling.
1415        let metadata_start = Instant::now();
1416        let leader_metadata = match width_class {
1417            CliqueWidthClass::FourByte => {
1418                CliqueLeaderMetadata::U32(self.wcoj_clique_metadata_recorded_u32_inner(
1419                    k,
1420                    edges,
1421                    leader_edge_idx,
1422                    launch_stream,
1423                    entry_label,
1424                )?)
1425            }
1426            CliqueWidthClass::EightByte => {
1427                CliqueLeaderMetadata::U64(self.wcoj_clique_metadata_recorded_u64_inner(
1428                    k,
1429                    edges,
1430                    leader_edge_idx,
1431                    launch_stream,
1432                    entry_label,
1433                )?)
1434            }
1435        };
1436        self.record_kclique_metadata_build_nanos(metadata_start.elapsed().as_nanos());
1437        let leader_work_total = leader_metadata.total_rows_u32(entry_label)?;
1438        if leader_work_total != n_leader {
1439            return Err(XlogError::Kernel(format!(
1440                "{}: leader metadata total {} does not match leader row count {}",
1441                entry_label, leader_work_total, n_leader
1442            )));
1443        }
1444        let leader_metadata_key_count = leader_metadata.key_count();
1445        if leader_metadata_key_count == 0 {
1446            return self.create_empty_buffer(out_schema);
1447        }
1448
1449        // Build host-side per-edge pointer arrays + row-count
1450        // array. These get htod'd to small device buffers that
1451        // the kernel reads as `const T* const* edge_col0` etc.
1452        let mut edge_col0_ptrs: Vec<u64> = Vec::with_capacity(expected_edges);
1453        let mut edge_col1_ptrs: Vec<u64> = Vec::with_capacity(expected_edges);
1454        let mut edge_n_host: Vec<u32> = Vec::with_capacity(expected_edges);
1455        for buf in edges.iter() {
1456            let col0 = buf.column(0).expect("validated");
1457            let col1 = buf.column(1).expect("validated");
1458            edge_col0_ptrs.push(*col0.device_ptr());
1459            edge_col1_ptrs.push(*col1.device_ptr());
1460            edge_n_host.push(self.logical_row_count_u32(buf)?);
1461        }
1462
1463        // Allocate device-side pointer arrays + row counts.
1464        let mut d_edge_col0 = self.memory.alloc::<u64>(expected_edges)?;
1465        let mut d_edge_col1 = self.memory.alloc::<u64>(expected_edges)?;
1466        let mut d_edge_n = self.memory.alloc::<u32>(expected_edges)?;
1467        let device = self.device.inner();
1468        self.htod_launch_metadata_sync_copy_into(&edge_col0_ptrs, &mut d_edge_col0)
1469            .map_err(|e| {
1470                XlogError::Kernel(format!(
1471                    "{}: htod edge_col0_ptrs failed: {}",
1472                    entry_label, e
1473                ))
1474            })?;
1475        self.htod_launch_metadata_sync_copy_into(&edge_col1_ptrs, &mut d_edge_col1)
1476            .map_err(|e| {
1477                XlogError::Kernel(format!(
1478                    "{}: htod edge_col1_ptrs failed: {}",
1479                    entry_label, e
1480                ))
1481            })?;
1482        self.htod_launch_metadata_sync_copy_into(&edge_n_host, &mut d_edge_n)
1483            .map_err(|e| {
1484                XlogError::Kernel(format!("{}: htod edge_n failed: {}", entry_label, e))
1485            })?;
1486        let d_edge_order = if let Some(edge_order) = edge_order {
1487            let mut buf = self.memory.alloc::<u8>(expected_edges)?;
1488            self.htod_launch_metadata_sync_copy_into(edge_order, &mut buf)
1489                .map_err(|e| {
1490                    XlogError::Kernel(format!("{}: htod edge_order failed: {}", entry_label, e))
1491                })?;
1492            Some(buf)
1493        } else {
1494            None
1495        };
1496        let d_iteration_order = if let Some(iteration_order) = iteration_order {
1497            let mut buf = self.memory.alloc::<u8>(k)?;
1498            self.htod_launch_metadata_sync_copy_into(iteration_order, &mut buf)
1499                .map_err(|e| {
1500                    XlogError::Kernel(format!(
1501                        "{}: htod iteration_order failed: {}",
1502                        entry_label, e
1503                    ))
1504                })?;
1505            Some(buf)
1506        } else {
1507            None
1508        };
1509
1510        // Phase 1: HG block counts + scan + total.
1511        let block_work_unit = crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT;
1512        let grid = leader_work_total.div_ceil(block_work_unit);
1513        let count_buf = self.memory.alloc::<u32>(grid as usize)?;
1514        let thread_counts_buf = self
1515            .memory
1516            .alloc::<u32>((grid as usize) * (BLOCK_SIZE as usize))?;
1517        let mut offsets_buf = self.memory.alloc::<u32>(grid as usize)?;
1518        let d_total = self.memory.alloc::<u32>(1)?;
1519
1520        let mut rec_count = LaunchRecorder::new_strict(launch_stream);
1521        for buf in edges.iter() {
1522            rec_count.read(buf.num_rows_device());
1523            rec_count.read_column(buf.column(0).expect("validated"));
1524            rec_count.read_column(buf.column(1).expect("validated"));
1525        }
1526        rec_count.read(&d_edge_col0);
1527        rec_count.read(&d_edge_col1);
1528        rec_count.read(&d_edge_n);
1529        if let Some(buf) = d_edge_order.as_ref() {
1530            rec_count.read(buf);
1531        }
1532        if let Some(buf) = d_iteration_order.as_ref() {
1533            rec_count.read(buf);
1534        }
1535        match &leader_metadata {
1536            CliqueLeaderMetadata::U32(leader_metadata) => {
1537                rec_count.read(&leader_metadata.unique_keys);
1538                rec_count.read(&leader_metadata.fan_out);
1539                rec_count.read(&leader_metadata.prefix_sum);
1540            }
1541            CliqueLeaderMetadata::U64(leader_metadata) => {
1542                rec_count.read(&leader_metadata.unique_keys);
1543                rec_count.read(&leader_metadata.fan_out);
1544                rec_count.read(&leader_metadata.prefix_sum);
1545            }
1546        }
1547        rec_count.write(&count_buf);
1548        rec_count.write(&thread_counts_buf);
1549        rec_count.write(&offsets_buf);
1550        rec_count.write(&d_total);
1551        rec_count.preflight(runtime).map_err(|e| {
1552            XlogError::Kernel(format!("{}: count preflight failed: {}", entry_label, e))
1553        })?;
1554
1555        let count_kernel = device
1556            .get_func(WCOJ_MODULE, clique_kernel_name(k, false, width_class))
1557            .ok_or_else(|| {
1558                XlogError::Kernel(format!(
1559                    "{}: count kernel '{}' not found",
1560                    entry_label,
1561                    clique_kernel_name(k, false, width_class)
1562                ))
1563            })?;
1564        let count_config = LaunchConfig {
1565            grid_dim: (grid, 1, 1),
1566            block_dim: (BLOCK_SIZE, 1, 1),
1567            shared_mem_bytes: 0,
1568        };
1569
1570        // SAFETY: kernel signature
1571        //   wcoj_clique{K}_count_*(
1572        //     const T* const* edge_col0,
1573        //     const T* const* edge_col1,
1574        //     const u32* edge_n,
1575        //     u32 leader_edge_idx,
1576        //     const u8* edge_order,
1577        //     const u8* iteration_order,
1578        //     u32 leader_count,
1579        //     const T* unique_keys,
1580        //     const u32* fan_out,
1581        //     const u32* prefix_sum,
1582        //     u32 metadata_key_count,
1583        //     u32 block_work_unit,
1584        //     u32* out_block_counts,
1585        //     u32* out_thread_counts)
1586        // Pointers all device-resident; preflight verified
1587        // cross-stream tracking. Raw params are required because
1588        // the metadata-extended ABI exceeds the tuple-launch arity.
1589        let null_order_ptr = 0_u64;
1590        let edge_order_param = match d_edge_order.as_ref() {
1591            Some(buf) => buf.as_kernel_param(),
1592            None => null_order_ptr.as_kernel_param(),
1593        };
1594        let iteration_order_param = match d_iteration_order.as_ref() {
1595            Some(buf) => buf.as_kernel_param(),
1596            None => null_order_ptr.as_kernel_param(),
1597        };
1598        unsafe {
1599            let mut params: Vec<*mut c_void> = match &leader_metadata {
1600                CliqueLeaderMetadata::U32(leader_metadata) => vec![
1601                    (&d_edge_col0).as_kernel_param(),
1602                    (&d_edge_col1).as_kernel_param(),
1603                    (&d_edge_n).as_kernel_param(),
1604                    leader_edge_idx.as_kernel_param(),
1605                    edge_order_param,
1606                    iteration_order_param,
1607                    n_leader.as_kernel_param(),
1608                    (&leader_metadata.unique_keys).as_kernel_param(),
1609                    (&leader_metadata.fan_out).as_kernel_param(),
1610                    (&leader_metadata.prefix_sum).as_kernel_param(),
1611                    leader_metadata_key_count.as_kernel_param(),
1612                    block_work_unit.as_kernel_param(),
1613                    (&count_buf).as_kernel_param(),
1614                    (&thread_counts_buf).as_kernel_param(),
1615                ],
1616                CliqueLeaderMetadata::U64(leader_metadata) => vec![
1617                    (&d_edge_col0).as_kernel_param(),
1618                    (&d_edge_col1).as_kernel_param(),
1619                    (&d_edge_n).as_kernel_param(),
1620                    leader_edge_idx.as_kernel_param(),
1621                    edge_order_param,
1622                    iteration_order_param,
1623                    n_leader.as_kernel_param(),
1624                    (&leader_metadata.unique_keys).as_kernel_param(),
1625                    (&leader_metadata.fan_out).as_kernel_param(),
1626                    (&leader_metadata.prefix_sum).as_kernel_param(),
1627                    leader_metadata_key_count.as_kernel_param(),
1628                    block_work_unit.as_kernel_param(),
1629                    (&count_buf).as_kernel_param(),
1630                    (&thread_counts_buf).as_kernel_param(),
1631                ],
1632            };
1633            count_kernel
1634                .clone()
1635                .launch_on_stream(&cu_stream, count_config, &mut params)
1636                .map_err(|e| {
1637                    XlogError::Kernel(format!(
1638                        "{}: count kernel launch failed: {}",
1639                        entry_label, e
1640                    ))
1641                })?;
1642        }
1643
1644        // dtod count → offsets (scan modifies offsets in place).
1645        let bytes_count = (grid as usize) * std::mem::size_of::<u32>();
1646        unsafe {
1647            let res = sys::cuMemcpyDtoDAsync_v2(
1648                *offsets_buf.device_ptr(),
1649                *count_buf.device_ptr(),
1650                bytes_count,
1651                cu_stream.cu_stream(),
1652            );
1653            if res != sys::cudaError_enum::CUDA_SUCCESS {
1654                return Err(XlogError::Kernel(format!(
1655                    "{}: dtod count → offsets failed: {:?}",
1656                    entry_label, res
1657                )));
1658            }
1659        }
1660
1661        // Device-side exclusive prefix-sum on offsets.
1662        self.multiblock_scan_u32_inplace_on_stream(
1663            &mut offsets_buf,
1664            grid,
1665            &cu_stream,
1666            launch_stream,
1667            runtime,
1668        )?;
1669
1670        // Compute total = counts[n-1] + offsets[n-1].
1671        let total_kernel = device
1672            .get_func(WCOJ_MODULE, wcoj_kernels::WCOJ_COMPUTE_TOTAL)
1673            .ok_or_else(|| XlogError::Kernel("wcoj_compute_total kernel not found".to_string()))?;
1674        unsafe {
1675            total_kernel
1676                .clone()
1677                .launch_on_stream(
1678                    &cu_stream,
1679                    LaunchConfig {
1680                        grid_dim: (1, 1, 1),
1681                        block_dim: (1, 1, 1),
1682                        shared_mem_bytes: 0,
1683                    },
1684                    (&count_buf, &offsets_buf, grid, &d_total),
1685                )
1686                .map_err(|e| {
1687                    XlogError::Kernel(format!("wcoj_compute_total launch failed: {}", e))
1688                })?;
1689        }
1690
1691        rec_count.commit(runtime).map_err(|e| {
1692            XlogError::Kernel(format!(
1693                "{}: count+scan+total commit failed: {}",
1694                entry_label, e
1695            ))
1696        })?;
1697
1698        cu_stream.synchronize().map_err(|e| {
1699            XlogError::Kernel(format!(
1700                "{}: stream sync after total failed: {}",
1701                entry_label, e
1702            ))
1703        })?;
1704        let total_rows = self
1705            .dtoh_scalar_untracked::<u32>(&d_total, 0)
1706            .map_err(|e| {
1707                XlogError::Kernel(format!("{}: read d_total failed: {}", entry_label, e))
1708            })?;
1709
1710        if total_rows == 0 {
1711            return self.create_empty_buffer(out_schema);
1712        }
1713
1714        // Phase 2: materialize. Allocate K output column buffers.
1715        let elem_bytes = width_class.elem_bytes();
1716        let bytes_per_col = (total_rows as usize) * elem_bytes;
1717        let mut out_col_bufs: Vec<TrackedCudaSlice<u8>> = Vec::with_capacity(k);
1718        let mut out_col_ptrs: Vec<u64> = Vec::with_capacity(k);
1719        for _ in 0..k {
1720            let buf = self.memory.alloc::<u8>(bytes_per_col)?;
1721            out_col_ptrs.push(*buf.device_ptr());
1722            out_col_bufs.push(buf);
1723        }
1724        let mut d_out_cols = self.memory.alloc::<u64>(k)?;
1725        self.htod_launch_metadata_sync_copy_into(&out_col_ptrs, &mut d_out_cols)
1726            .map_err(|e| {
1727                XlogError::Kernel(format!("{}: htod out_col_ptrs failed: {}", entry_label, e))
1728            })?;
1729        let out_d_num_rows = self.memory.alloc::<u32>(1)?;
1730
1731        // H2D output row count.
1732        self.htod_launch_metadata_async_copy_one(
1733            &total_rows,
1734            &out_d_num_rows,
1735            &cu_stream,
1736            &format!("{entry_label}: out_d_num_rows"),
1737        )?;
1738
1739        let mut rec_mat = LaunchRecorder::new_strict(launch_stream);
1740        for buf in edges.iter() {
1741            rec_mat.read(buf.num_rows_device());
1742            rec_mat.read_column(buf.column(0).expect("validated"));
1743            rec_mat.read_column(buf.column(1).expect("validated"));
1744        }
1745        rec_mat.read(&d_edge_col0);
1746        rec_mat.read(&d_edge_col1);
1747        rec_mat.read(&d_edge_n);
1748        if let Some(buf) = d_edge_order.as_ref() {
1749            rec_mat.read(buf);
1750        }
1751        if let Some(buf) = d_iteration_order.as_ref() {
1752            rec_mat.read(buf);
1753        }
1754        match &leader_metadata {
1755            CliqueLeaderMetadata::U32(leader_metadata) => {
1756                rec_mat.read(&leader_metadata.unique_keys);
1757                rec_mat.read(&leader_metadata.fan_out);
1758                rec_mat.read(&leader_metadata.prefix_sum);
1759            }
1760            CliqueLeaderMetadata::U64(leader_metadata) => {
1761                rec_mat.read(&leader_metadata.unique_keys);
1762                rec_mat.read(&leader_metadata.fan_out);
1763                rec_mat.read(&leader_metadata.prefix_sum);
1764            }
1765        }
1766        rec_mat.read(&thread_counts_buf);
1767        rec_mat.read(&offsets_buf);
1768        rec_mat.read(&d_out_cols);
1769        for buf in out_col_bufs.iter() {
1770            rec_mat.write(buf);
1771        }
1772        rec_mat.write(&out_d_num_rows);
1773        rec_mat.preflight(runtime).map_err(|e| {
1774            XlogError::Kernel(format!(
1775                "{}: materialize preflight failed: {}",
1776                entry_label, e
1777            ))
1778        })?;
1779
1780        let materialize_kernel = device
1781            .get_func(WCOJ_MODULE, clique_kernel_name(k, true, width_class))
1782            .ok_or_else(|| {
1783                XlogError::Kernel(format!(
1784                    "{}: materialize kernel '{}' not found",
1785                    entry_label,
1786                    clique_kernel_name(k, true, width_class)
1787                ))
1788            })?;
1789
1790        // SAFETY: kernel signature
1791        //   wcoj_clique{K}_materialize_*(
1792        //     const T* const* edge_col0,
1793        //     const T* const* edge_col1,
1794        //     const u32* edge_n,
1795        //     u32 leader_edge_idx,
1796        //     const u8* edge_order,
1797        //     const u8* iteration_order,
1798        //     u32 leader_count,
1799        //     const T* unique_keys,
1800        //     const u32* fan_out,
1801        //     const u32* prefix_sum,
1802        //     u32 metadata_key_count,
1803        //     u32 block_work_unit,
1804        //     const u32* thread_counts,
1805        //     const u32* block_offsets,
1806        //     u32 total_rows,
1807        //     T* const* out_cols)
1808        // Raw params are required because the metadata-extended ABI
1809        // exceeds the tuple-launch arity.
1810        let mat_config = LaunchConfig {
1811            grid_dim: (grid, 1, 1),
1812            block_dim: (BLOCK_SIZE, 1, 1),
1813            shared_mem_bytes: 0,
1814        };
1815        unsafe {
1816            let mut params: Vec<*mut c_void> = match &leader_metadata {
1817                CliqueLeaderMetadata::U32(leader_metadata) => vec![
1818                    (&d_edge_col0).as_kernel_param(),
1819                    (&d_edge_col1).as_kernel_param(),
1820                    (&d_edge_n).as_kernel_param(),
1821                    leader_edge_idx.as_kernel_param(),
1822                    edge_order_param,
1823                    iteration_order_param,
1824                    n_leader.as_kernel_param(),
1825                    (&leader_metadata.unique_keys).as_kernel_param(),
1826                    (&leader_metadata.fan_out).as_kernel_param(),
1827                    (&leader_metadata.prefix_sum).as_kernel_param(),
1828                    leader_metadata_key_count.as_kernel_param(),
1829                    block_work_unit.as_kernel_param(),
1830                    (&thread_counts_buf).as_kernel_param(),
1831                    (&offsets_buf).as_kernel_param(),
1832                    total_rows.as_kernel_param(),
1833                    (&d_out_cols).as_kernel_param(),
1834                ],
1835                CliqueLeaderMetadata::U64(leader_metadata) => vec![
1836                    (&d_edge_col0).as_kernel_param(),
1837                    (&d_edge_col1).as_kernel_param(),
1838                    (&d_edge_n).as_kernel_param(),
1839                    leader_edge_idx.as_kernel_param(),
1840                    edge_order_param,
1841                    iteration_order_param,
1842                    n_leader.as_kernel_param(),
1843                    (&leader_metadata.unique_keys).as_kernel_param(),
1844                    (&leader_metadata.fan_out).as_kernel_param(),
1845                    (&leader_metadata.prefix_sum).as_kernel_param(),
1846                    leader_metadata_key_count.as_kernel_param(),
1847                    block_work_unit.as_kernel_param(),
1848                    (&thread_counts_buf).as_kernel_param(),
1849                    (&offsets_buf).as_kernel_param(),
1850                    total_rows.as_kernel_param(),
1851                    (&d_out_cols).as_kernel_param(),
1852                ],
1853            };
1854            materialize_kernel
1855                .clone()
1856                .launch_on_stream(&cu_stream, mat_config, &mut params)
1857                .map_err(|e| {
1858                    XlogError::Kernel(format!("{}: materialize launch failed: {}", entry_label, e))
1859                })?;
1860        }
1861
1862        rec_mat.commit(runtime).map_err(|e| {
1863            XlogError::Kernel(format!("{}: materialize commit failed: {}", entry_label, e))
1864        })?;
1865
1866        let columns: Vec<CudaColumn> = out_col_bufs.into_iter().map(|b| b.into()).collect();
1867        Ok(CudaBuffer::from_columns_with_host_count(
1868            columns,
1869            total_rows as u64,
1870            out_d_num_rows,
1871            out_schema,
1872            total_rows,
1873        ))
1874    }
1875
1876    /// 5-clique WCOJ at 4-byte width-class.
1877    ///
1878    /// `edges` must contain exactly **10** 2-column buffers in
1879    /// canonical lex `(i, j)` order (i < j): `(0,1), (0,2), (0,3),
1880    /// (0,4), (1,2), (1,3), (1,4), (2,3), (2,4), (3,4)`. Each
1881    /// column may be `U32` or `Symbol` (mixable within the
1882    /// 4-byte width-class). All edges must be lex-sorted+deduped
1883    /// on `(col0, col1)` — the runtime dispatcher routes through
1884    /// `wcoj_layout_sort_u32_recorded` before calling here.
1885    pub fn wcoj_clique5_u32_recorded(
1886        &self,
1887        edges: &[&CudaBuffer; 10],
1888        launch_stream: StreamId,
1889    ) -> Result<CudaBuffer> {
1890        self.wcoj_clique_recorded_inner(
1891            5,
1892            edges,
1893            0,
1894            None,
1895            None,
1896            CliqueWidthClass::FourByte,
1897            launch_stream,
1898            "wcoj_clique5_u32_recorded",
1899        )
1900    }
1901
1902    /// 5-clique WCOJ at 4-byte width-class using plan-derived launch params.
1903    pub fn wcoj_clique5_u32_recorded_planned(
1904        &self,
1905        edges: &[&CudaBuffer; 10],
1906        leader_edge_idx: u32,
1907        edge_order: &[u8],
1908        iteration_order: &[u8],
1909        launch_stream: StreamId,
1910    ) -> Result<CudaBuffer> {
1911        self.wcoj_clique_recorded_inner(
1912            5,
1913            edges,
1914            leader_edge_idx,
1915            Some(edge_order),
1916            Some(iteration_order),
1917            CliqueWidthClass::FourByte,
1918            launch_stream,
1919            "wcoj_clique5_u32_recorded_planned",
1920        )
1921    }
1922
1923    /// 5-clique WCOJ at 8-byte width-class (U64 only).
1924    pub fn wcoj_clique5_u64_recorded(
1925        &self,
1926        edges: &[&CudaBuffer; 10],
1927        launch_stream: StreamId,
1928    ) -> Result<CudaBuffer> {
1929        self.wcoj_clique_recorded_inner(
1930            5,
1931            edges,
1932            0,
1933            None,
1934            None,
1935            CliqueWidthClass::EightByte,
1936            launch_stream,
1937            "wcoj_clique5_u64_recorded",
1938        )
1939    }
1940
1941    /// 5-clique WCOJ at 8-byte width-class using plan-derived launch params.
1942    pub fn wcoj_clique5_u64_recorded_planned(
1943        &self,
1944        edges: &[&CudaBuffer; 10],
1945        leader_edge_idx: u32,
1946        edge_order: &[u8],
1947        iteration_order: &[u8],
1948        launch_stream: StreamId,
1949    ) -> Result<CudaBuffer> {
1950        self.wcoj_clique_recorded_inner(
1951            5,
1952            edges,
1953            leader_edge_idx,
1954            Some(edge_order),
1955            Some(iteration_order),
1956            CliqueWidthClass::EightByte,
1957            launch_stream,
1958            "wcoj_clique5_u64_recorded_planned",
1959        )
1960    }
1961
1962    /// 6-clique WCOJ at 4-byte width-class.
1963    ///
1964    /// `edges` must contain exactly **15** 2-column buffers in
1965    /// canonical lex `(i, j)` order. Width-class + sort+dedup
1966    /// pre-condition match `wcoj_clique5_u32_recorded`.
1967    pub fn wcoj_clique6_u32_recorded(
1968        &self,
1969        edges: &[&CudaBuffer; 15],
1970        launch_stream: StreamId,
1971    ) -> Result<CudaBuffer> {
1972        self.wcoj_clique_recorded_inner(
1973            6,
1974            edges,
1975            0,
1976            None,
1977            None,
1978            CliqueWidthClass::FourByte,
1979            launch_stream,
1980            "wcoj_clique6_u32_recorded",
1981        )
1982    }
1983
1984    /// 6-clique WCOJ at 4-byte width-class using plan-derived launch params.
1985    pub fn wcoj_clique6_u32_recorded_planned(
1986        &self,
1987        edges: &[&CudaBuffer; 15],
1988        leader_edge_idx: u32,
1989        edge_order: &[u8],
1990        iteration_order: &[u8],
1991        launch_stream: StreamId,
1992    ) -> Result<CudaBuffer> {
1993        self.wcoj_clique_recorded_inner(
1994            6,
1995            edges,
1996            leader_edge_idx,
1997            Some(edge_order),
1998            Some(iteration_order),
1999            CliqueWidthClass::FourByte,
2000            launch_stream,
2001            "wcoj_clique6_u32_recorded_planned",
2002        )
2003    }
2004
2005    /// 6-clique WCOJ at 8-byte width-class (U64 only).
2006    pub fn wcoj_clique6_u64_recorded(
2007        &self,
2008        edges: &[&CudaBuffer; 15],
2009        launch_stream: StreamId,
2010    ) -> Result<CudaBuffer> {
2011        self.wcoj_clique_recorded_inner(
2012            6,
2013            edges,
2014            0,
2015            None,
2016            None,
2017            CliqueWidthClass::EightByte,
2018            launch_stream,
2019            "wcoj_clique6_u64_recorded",
2020        )
2021    }
2022
2023    /// 6-clique WCOJ at 8-byte width-class using plan-derived launch params.
2024    pub fn wcoj_clique6_u64_recorded_planned(
2025        &self,
2026        edges: &[&CudaBuffer; 15],
2027        leader_edge_idx: u32,
2028        edge_order: &[u8],
2029        iteration_order: &[u8],
2030        launch_stream: StreamId,
2031    ) -> Result<CudaBuffer> {
2032        self.wcoj_clique_recorded_inner(
2033            6,
2034            edges,
2035            leader_edge_idx,
2036            Some(edge_order),
2037            Some(iteration_order),
2038            CliqueWidthClass::EightByte,
2039            launch_stream,
2040            "wcoj_clique6_u64_recorded_planned",
2041        )
2042    }
2043
2044    /// 7-clique WCOJ at 4-byte width-class.
2045    pub fn wcoj_clique7_u32_recorded(
2046        &self,
2047        edges: &[&CudaBuffer; 21],
2048        launch_stream: StreamId,
2049    ) -> Result<CudaBuffer> {
2050        self.wcoj_clique_recorded_inner(
2051            7,
2052            edges,
2053            0,
2054            None,
2055            None,
2056            CliqueWidthClass::FourByte,
2057            launch_stream,
2058            "wcoj_clique7_u32_recorded",
2059        )
2060    }
2061
2062    /// 7-clique WCOJ at 4-byte width-class using plan-derived launch params.
2063    pub fn wcoj_clique7_u32_recorded_planned(
2064        &self,
2065        edges: &[&CudaBuffer; 21],
2066        leader_edge_idx: u32,
2067        edge_order: &[u8],
2068        iteration_order: &[u8],
2069        launch_stream: StreamId,
2070    ) -> Result<CudaBuffer> {
2071        self.wcoj_clique_recorded_inner(
2072            7,
2073            edges,
2074            leader_edge_idx,
2075            Some(edge_order),
2076            Some(iteration_order),
2077            CliqueWidthClass::FourByte,
2078            launch_stream,
2079            "wcoj_clique7_u32_recorded_planned",
2080        )
2081    }
2082
2083    /// 7-clique WCOJ at 8-byte width-class (U64 only).
2084    pub fn wcoj_clique7_u64_recorded(
2085        &self,
2086        edges: &[&CudaBuffer; 21],
2087        launch_stream: StreamId,
2088    ) -> Result<CudaBuffer> {
2089        self.wcoj_clique_recorded_inner(
2090            7,
2091            edges,
2092            0,
2093            None,
2094            None,
2095            CliqueWidthClass::EightByte,
2096            launch_stream,
2097            "wcoj_clique7_u64_recorded",
2098        )
2099    }
2100
2101    /// 7-clique WCOJ at 8-byte width-class using plan-derived launch params.
2102    pub fn wcoj_clique7_u64_recorded_planned(
2103        &self,
2104        edges: &[&CudaBuffer; 21],
2105        leader_edge_idx: u32,
2106        edge_order: &[u8],
2107        iteration_order: &[u8],
2108        launch_stream: StreamId,
2109    ) -> Result<CudaBuffer> {
2110        self.wcoj_clique_recorded_inner(
2111            7,
2112            edges,
2113            leader_edge_idx,
2114            Some(edge_order),
2115            Some(iteration_order),
2116            CliqueWidthClass::EightByte,
2117            launch_stream,
2118            "wcoj_clique7_u64_recorded_planned",
2119        )
2120    }
2121
2122    /// 8-clique WCOJ at 4-byte width-class.
2123    pub fn wcoj_clique8_u32_recorded(
2124        &self,
2125        edges: &[&CudaBuffer; 28],
2126        launch_stream: StreamId,
2127    ) -> Result<CudaBuffer> {
2128        self.wcoj_clique_recorded_inner(
2129            8,
2130            edges,
2131            0,
2132            None,
2133            None,
2134            CliqueWidthClass::FourByte,
2135            launch_stream,
2136            "wcoj_clique8_u32_recorded",
2137        )
2138    }
2139
2140    /// 8-clique WCOJ at 4-byte width-class using plan-derived launch params.
2141    pub fn wcoj_clique8_u32_recorded_planned(
2142        &self,
2143        edges: &[&CudaBuffer; 28],
2144        leader_edge_idx: u32,
2145        edge_order: &[u8],
2146        iteration_order: &[u8],
2147        launch_stream: StreamId,
2148    ) -> Result<CudaBuffer> {
2149        self.wcoj_clique_recorded_inner(
2150            8,
2151            edges,
2152            leader_edge_idx,
2153            Some(edge_order),
2154            Some(iteration_order),
2155            CliqueWidthClass::FourByte,
2156            launch_stream,
2157            "wcoj_clique8_u32_recorded_planned",
2158        )
2159    }
2160
2161    /// 8-clique WCOJ at 8-byte width-class (U64 only).
2162    pub fn wcoj_clique8_u64_recorded(
2163        &self,
2164        edges: &[&CudaBuffer; 28],
2165        launch_stream: StreamId,
2166    ) -> Result<CudaBuffer> {
2167        self.wcoj_clique_recorded_inner(
2168            8,
2169            edges,
2170            0,
2171            None,
2172            None,
2173            CliqueWidthClass::EightByte,
2174            launch_stream,
2175            "wcoj_clique8_u64_recorded",
2176        )
2177    }
2178
2179    /// 8-clique WCOJ at 8-byte width-class using plan-derived launch params.
2180    pub fn wcoj_clique8_u64_recorded_planned(
2181        &self,
2182        edges: &[&CudaBuffer; 28],
2183        leader_edge_idx: u32,
2184        edge_order: &[u8],
2185        iteration_order: &[u8],
2186        launch_stream: StreamId,
2187    ) -> Result<CudaBuffer> {
2188        self.wcoj_clique_recorded_inner(
2189            8,
2190            edges,
2191            leader_edge_idx,
2192            Some(edge_order),
2193            Some(iteration_order),
2194            CliqueWidthClass::EightByte,
2195            launch_stream,
2196            "wcoj_clique8_u64_recorded_planned",
2197        )
2198    }
2199
2200    /// Aggregate-fused K-clique count-by-root (u32 width-class,
2201    /// K ∈ {5, 6}). For `q(R, count(*)) :- <complete K-clique body>`
2202    /// grouped by the plan's position-0 root variable, computes the
2203    /// (root, count) row set WITHOUT materializing the clique rows.
2204    ///
2205    /// Pipeline (mirrors `wcoj_4cycle_groupby_root_count_u32_recorded`):
2206    ///   1. Layout-normalize every edge per dispatch (sorted-fast-path
2207    ///      clone when already lex-sorted + unique) — the fused path must
2208    ///      give the same guarantee as the unfused pipeline instead of
2209    ///      trusting store-buffer sortedness.
2210    ///   2. Build the leader-edge runtime metadata, htod the per-edge
2211    ///      pointer arrays + plan orders (same surface as the unfused
2212    ///      planned clique entries).
2213    ///   3. `wcoj_clique{K}_groupby_root_count_hg_u32` accumulates, per
2214    ///      leader-edge row, the row's clique completion count via
2215    ///      atomicAdd (order-insensitive, deterministic values). The
2216    ///      row's group key is the oriented leader edge's col0 — the
2217    ///      kernel's binding[0] root.
2218    ///   4. Staging (root, count) over the n_leader input rows, compact
2219    ///      count>0, reduce per root with the recorded groupby Sum.
2220    ///
2221    /// All reduction work is O(n_leader) — input-sized, never
2222    /// join-output-sized. Output schema (root: U32/Symbol, count: U64)
2223    /// matches the unfused materialize+groupby baseline.
2224    #[allow(clippy::too_many_arguments)]
2225    fn wcoj_clique_groupby_root_count_recorded_inner(
2226        &self,
2227        k: usize,
2228        edges: &[&CudaBuffer],
2229        leader_edge_idx: u32,
2230        edge_order: &[u8],
2231        iteration_order: &[u8],
2232        launch_stream: StreamId,
2233        entry_label: &str,
2234    ) -> Result<CudaBuffer> {
2235        let runtime = self.memory().runtime().ok_or_else(|| {
2236            XlogError::Kernel(format!(
2237                "{} requires a runtime-backed GpuMemoryManager (with_runtime)",
2238                entry_label
2239            ))
2240        })?;
2241        let cu_stream = runtime
2242            .stream_pool()
2243            .resolve(launch_stream)
2244            .ok_or_else(|| {
2245                XlogError::Kernel(format!(
2246                    "{}: launch_stream StreamId({}) does not resolve",
2247                    entry_label, launch_stream.0
2248                ))
2249            })?;
2250        if !matches!(k, 5 | 6) {
2251            return Err(XlogError::Kernel(format!(
2252                "{}: fused count-by-root supports k 5..6, got {}",
2253                entry_label, k
2254            )));
2255        }
2256        let expected_edges = k * (k - 1) / 2;
2257        if edges.len() != expected_edges {
2258            return Err(XlogError::Kernel(format!(
2259                "{}: expected {} edges (= C({}, 2)), got {}",
2260                entry_label,
2261                expected_edges,
2262                k,
2263                edges.len()
2264            )));
2265        }
2266        let leader_slot = usize::try_from(leader_edge_idx)
2267            .ok()
2268            .filter(|idx| *idx < expected_edges)
2269            .ok_or_else(|| {
2270                XlogError::Kernel(format!(
2271                    "{}: leader_edge_idx {} out of range for {} edges",
2272                    entry_label, leader_edge_idx, expected_edges
2273                ))
2274            })?;
2275        validate_clique_u8_permutation(edge_order, expected_edges, "edge_order", entry_label)?;
2276        validate_clique_u8_permutation(iteration_order, k, "iteration_order", entry_label)?;
2277
2278        // Layout-normalize per dispatch (commit 31b0ccf0 contract for
2279        // ALL fused group-by-root entries). Also enforces the 2-column
2280        // 4-byte width-class per edge.
2281        let mut laid_out: Vec<CudaBuffer> = Vec::with_capacity(expected_edges);
2282        for buf in edges {
2283            laid_out.push(self.wcoj_layout_u32_recorded(buf, launch_stream)?);
2284        }
2285        let edges: Vec<&CudaBuffer> = laid_out.iter().collect();
2286        let leader = edges[leader_slot];
2287
2288        let w_type = leader.schema.column_type(0).ok_or_else(|| {
2289            XlogError::Kernel(format!(
2290                "{}: leader edge column 0 type missing",
2291                entry_label
2292            ))
2293        })?;
2294        let out_schema = Schema::new(vec![
2295            ("root".to_string(), w_type),
2296            ("count".to_string(), ScalarType::U64),
2297        ]);
2298        let n_leader = self.logical_row_count_u32(leader)?;
2299        if n_leader == 0 {
2300            return self.create_empty_buffer(out_schema);
2301        }
2302
2303        let leader_metadata = self.wcoj_clique_metadata_recorded_u32_inner(
2304            k,
2305            &edges,
2306            leader_edge_idx,
2307            launch_stream,
2308            entry_label,
2309        )?;
2310        let leader_work_total = u32::try_from(leader_metadata.total).map_err(|_| {
2311            XlogError::Kernel(format!(
2312                "{}: leader metadata total {} exceeds u32 kernel surface",
2313                entry_label, leader_metadata.total
2314            ))
2315        })?;
2316        if leader_work_total != n_leader {
2317            return Err(XlogError::Kernel(format!(
2318                "{}: leader metadata total {} does not match leader row count {}",
2319                entry_label, leader_work_total, n_leader
2320            )));
2321        }
2322        if leader_metadata.key_count == 0 {
2323            return self.create_empty_buffer(out_schema);
2324        }
2325
2326        // Host-side per-edge pointer arrays + row counts, htod'd to
2327        // small device buffers (same surface as the unfused planned
2328        // clique entries).
2329        let mut edge_col0_ptrs: Vec<u64> = Vec::with_capacity(expected_edges);
2330        let mut edge_col1_ptrs: Vec<u64> = Vec::with_capacity(expected_edges);
2331        let mut edge_n_host: Vec<u32> = Vec::with_capacity(expected_edges);
2332        for buf in edges.iter() {
2333            let col0 = buf.column(0).ok_or_else(|| {
2334                XlogError::Kernel(format!("{}: edge column 0 missing", entry_label))
2335            })?;
2336            let col1 = buf.column(1).ok_or_else(|| {
2337                XlogError::Kernel(format!("{}: edge column 1 missing", entry_label))
2338            })?;
2339            edge_col0_ptrs.push(*col0.device_ptr());
2340            edge_col1_ptrs.push(*col1.device_ptr());
2341            edge_n_host.push(self.logical_row_count_u32(buf)?);
2342        }
2343        let mut d_edge_col0 = self.memory.alloc::<u64>(expected_edges)?;
2344        let mut d_edge_col1 = self.memory.alloc::<u64>(expected_edges)?;
2345        let mut d_edge_n = self.memory.alloc::<u32>(expected_edges)?;
2346        self.htod_launch_metadata_sync_copy_into(&edge_col0_ptrs, &mut d_edge_col0)
2347            .map_err(|e| {
2348                XlogError::Kernel(format!(
2349                    "{}: htod edge_col0_ptrs failed: {}",
2350                    entry_label, e
2351                ))
2352            })?;
2353        self.htod_launch_metadata_sync_copy_into(&edge_col1_ptrs, &mut d_edge_col1)
2354            .map_err(|e| {
2355                XlogError::Kernel(format!(
2356                    "{}: htod edge_col1_ptrs failed: {}",
2357                    entry_label, e
2358                ))
2359            })?;
2360        self.htod_launch_metadata_sync_copy_into(&edge_n_host, &mut d_edge_n)
2361            .map_err(|e| {
2362                XlogError::Kernel(format!("{}: htod edge_n failed: {}", entry_label, e))
2363            })?;
2364        let mut d_edge_order = self.memory.alloc::<u8>(expected_edges)?;
2365        self.htod_launch_metadata_sync_copy_into(edge_order, &mut d_edge_order)
2366            .map_err(|e| {
2367                XlogError::Kernel(format!("{}: htod edge_order failed: {}", entry_label, e))
2368            })?;
2369        let mut d_iteration_order = self.memory.alloc::<u8>(k)?;
2370        self.htod_launch_metadata_sync_copy_into(iteration_order, &mut d_iteration_order)
2371            .map_err(|e| {
2372                XlogError::Kernel(format!(
2373                    "{}: htod iteration_order failed: {}",
2374                    entry_label, e
2375                ))
2376            })?;
2377
2378        // Per-leader-row match counters, zero-initialized. Allocated as
2379        // the u8-backed column layout so the array doubles as the
2380        // staging buffer's count column after the kernel fills it.
2381        let mut row_counts = self
2382            .memory()
2383            .alloc::<u8>(n_leader as usize * std::mem::size_of::<u32>())?;
2384        self.device()
2385            .inner()
2386            .memset_zeros(&mut row_counts)
2387            .map_err(|e| {
2388                XlogError::Kernel(format!("{}: zero row counts failed: {}", entry_label, e))
2389            })?;
2390
2391        let block_work_unit = crate::wcoj_metadata::WCOJ_HG_BLOCK_WORK_UNIT_DEFAULT;
2392        let grid = leader_work_total.div_ceil(block_work_unit);
2393        let mut rec = LaunchRecorder::new_strict(launch_stream);
2394        for buf in edges.iter() {
2395            rec.read(buf.num_rows_device());
2396            rec.read_column(buf.column(0).expect("validated"));
2397            rec.read_column(buf.column(1).expect("validated"));
2398        }
2399        rec.read(&d_edge_col0);
2400        rec.read(&d_edge_col1);
2401        rec.read(&d_edge_n);
2402        rec.read(&d_edge_order);
2403        rec.read(&d_iteration_order);
2404        rec.read(&leader_metadata.unique_keys);
2405        rec.read(&leader_metadata.fan_out);
2406        rec.read(&leader_metadata.prefix_sum);
2407        rec.write(&row_counts);
2408        rec.preflight(runtime)
2409            .map_err(|e| XlogError::Kernel(format!("{}: preflight failed: {}", entry_label, e)))?;
2410
2411        let kernel_name = match k {
2412            5 => wcoj_kernels::WCOJ_CLIQUE5_GROUPBY_ROOT_COUNT_HG_U32,
2413            _ => wcoj_kernels::WCOJ_CLIQUE6_GROUPBY_ROOT_COUNT_HG_U32,
2414        };
2415        let kernel = self
2416            .device()
2417            .inner()
2418            .get_func(WCOJ_MODULE, kernel_name)
2419            .ok_or_else(|| {
2420                XlogError::Kernel(format!(
2421                    "{}: kernel '{}' not found",
2422                    entry_label, kernel_name
2423                ))
2424            })?;
2425        // SAFETY: kernel signature
2426        //   wcoj_clique{K}_groupby_root_count_hg_u32(
2427        //     const u32* const* edge_col0,
2428        //     const u32* const* edge_col1,
2429        //     const u32* edge_n,
2430        //     u32 leader_edge_idx,
2431        //     const u8* edge_order,
2432        //     const u8* iteration_order,
2433        //     u32 leader_count,
2434        //     const u32* unique_keys,
2435        //     const u32* fan_out,
2436        //     const u32* prefix_sum,
2437        //     u32 metadata_key_count,
2438        //     u32 block_work_unit,
2439        //     u32* out_row_counts)
2440        // Pointers all device-resident; preflight verified cross-stream
2441        // tracking. Raw params are required because the
2442        // metadata-extended ABI exceeds the tuple-launch arity.
2443        unsafe {
2444            let mut params: Vec<*mut c_void> = vec![
2445                (&d_edge_col0).as_kernel_param(),
2446                (&d_edge_col1).as_kernel_param(),
2447                (&d_edge_n).as_kernel_param(),
2448                leader_edge_idx.as_kernel_param(),
2449                (&d_edge_order).as_kernel_param(),
2450                (&d_iteration_order).as_kernel_param(),
2451                n_leader.as_kernel_param(),
2452                (&leader_metadata.unique_keys).as_kernel_param(),
2453                (&leader_metadata.fan_out).as_kernel_param(),
2454                (&leader_metadata.prefix_sum).as_kernel_param(),
2455                leader_metadata.key_count.as_kernel_param(),
2456                block_work_unit.as_kernel_param(),
2457                (&row_counts).as_kernel_param(),
2458            ];
2459            kernel
2460                .clone()
2461                .launch_on_stream(
2462                    &cu_stream,
2463                    LaunchConfig {
2464                        grid_dim: (grid, 1, 1),
2465                        block_dim: (BLOCK_SIZE, 1, 1),
2466                        shared_mem_bytes: 0,
2467                    },
2468                    &mut params,
2469                )
2470                .map_err(|e| {
2471                    XlogError::Kernel(format!(
2472                        "{}: groupby-count launch failed: {}",
2473                        entry_label, e
2474                    ))
2475                })?;
2476        }
2477        rec.commit(runtime)
2478            .map_err(|e| XlogError::Kernel(format!("{}: commit failed: {}", entry_label, e)))?;
2479
2480        // Staging buffer (root, count) over the n_leader input rows:
2481        // root is a device-to-device copy of the oriented leader edge's
2482        // col0; the count column is the kernel-filled array. Rows stay
2483        // lex-sorted by root.
2484        let root_src = match leader.column(0).expect("validated") {
2485            CudaColumn::Owned(slice) => slice,
2486            _ => {
2487                return Err(XlogError::Kernel(format!(
2488                    "{}: leader.col0 must be an owned CudaColumn",
2489                    entry_label
2490                )))
2491            }
2492        };
2493        let root_copy = self
2494            .memory()
2495            .alloc::<u8>(n_leader as usize * std::mem::size_of::<u32>())?;
2496        // Explicit-length copy: layout-normalized columns are allocated at
2497        // capacity, which can exceed the logical n_leader * 4 bytes a
2498        // full-slice typed copy would assert on.
2499        unsafe {
2500            let res = sys::cuMemcpyDtoD_v2(
2501                *root_copy.device_ptr(),
2502                *root_src.device_ptr(),
2503                n_leader as usize * std::mem::size_of::<u32>(),
2504            );
2505            if res != sys::cudaError_enum::CUDA_SUCCESS {
2506                return Err(XlogError::Kernel(format!(
2507                    "{}: copy root column failed: {:?}",
2508                    entry_label, res
2509                )));
2510            }
2511        }
2512        let mut d_num_rows = self.memory().alloc::<u32>(1)?;
2513        self.device()
2514            .inner()
2515            .dtod_copy(leader.num_rows_device(), &mut d_num_rows)
2516            .map_err(|e| {
2517                XlogError::Kernel(format!("{}: copy row count failed: {}", entry_label, e))
2518            })?;
2519        let staging_schema = Schema::new(vec![
2520            ("root".to_string(), w_type),
2521            ("count".to_string(), ScalarType::U32),
2522        ]);
2523        let staging = CudaBuffer::from_columns_with_host_count(
2524            vec![root_copy.into(), row_counts.into()],
2525            n_leader as u64,
2526            d_num_rows,
2527            staging_schema,
2528            n_leader,
2529        );
2530
2531        // Keep only roots with at least one completed clique, then
2532        // reduce per root. Both steps run over input-sized data.
2533        let mask = self.compare_const_mask_recorded::<u32>(
2534            &staging,
2535            1,
2536            0u32,
2537            crate::CompareOp::Gt,
2538            launch_stream,
2539        )?;
2540        let compacted =
2541            self.compact_buffer_by_device_mask_counted_recorded(&staging, &mask, launch_stream)?;
2542        self.groupby_multi_agg_recorded(
2543            &compacted,
2544            &[0],
2545            &[(1, xlog_core::AggOp::Sum)],
2546            launch_stream,
2547        )
2548    }
2549
2550    /// Fused 5-clique count-by-root at the 4-byte width-class,
2551    /// using plan-derived launch params. See
2552    /// [`Self::wcoj_clique_groupby_root_count_recorded_inner`].
2553    pub fn wcoj_clique5_groupby_root_count_u32_recorded_planned(
2554        &self,
2555        edges: &[&CudaBuffer; 10],
2556        leader_edge_idx: u32,
2557        edge_order: &[u8],
2558        iteration_order: &[u8],
2559        launch_stream: StreamId,
2560    ) -> Result<CudaBuffer> {
2561        self.wcoj_clique_groupby_root_count_recorded_inner(
2562            5,
2563            edges,
2564            leader_edge_idx,
2565            edge_order,
2566            iteration_order,
2567            launch_stream,
2568            "wcoj_clique5_groupby_root_count_u32_recorded_planned",
2569        )
2570    }
2571
2572    /// Fused 6-clique count-by-root at the 4-byte width-class,
2573    /// using plan-derived launch params. See
2574    /// [`Self::wcoj_clique_groupby_root_count_recorded_inner`].
2575    pub fn wcoj_clique6_groupby_root_count_u32_recorded_planned(
2576        &self,
2577        edges: &[&CudaBuffer; 15],
2578        leader_edge_idx: u32,
2579        edge_order: &[u8],
2580        iteration_order: &[u8],
2581        launch_stream: StreamId,
2582    ) -> Result<CudaBuffer> {
2583        self.wcoj_clique_groupby_root_count_recorded_inner(
2584            6,
2585            edges,
2586            leader_edge_idx,
2587            edge_order,
2588            iteration_order,
2589            launch_stream,
2590            "wcoj_clique6_groupby_root_count_u32_recorded_planned",
2591        )
2592    }
2593}
2594
2595fn validate_clique_u8_permutation(
2596    values: &[u8],
2597    len: usize,
2598    label: &str,
2599    entry_label: &str,
2600) -> Result<()> {
2601    if values.len() != len {
2602        return Err(XlogError::Kernel(format!(
2603            "{}: {} length {} must equal {}",
2604            entry_label,
2605            label,
2606            values.len(),
2607            len
2608        )));
2609    }
2610    let mut seen = vec![false; len];
2611    for &value in values {
2612        let idx = usize::from(value);
2613        if idx >= len {
2614            return Err(XlogError::Kernel(format!(
2615                "{}: {} value {} out of range 0..{}",
2616                entry_label, label, value, len
2617            )));
2618        }
2619        if seen[idx] {
2620            return Err(XlogError::Kernel(format!(
2621                "{}: {} duplicates value {}",
2622                entry_label, label, value
2623            )));
2624        }
2625        seen[idx] = true;
2626    }
2627    Ok(())
2628}