Skip to main content

xlog_runtime/executor/
wcoj_dispatch.rs

1//! WCOJ triangle dispatch runtime hook.
2//!
3//! Wires the GPU WCOJ kernels into the executor's per-rule loop.
4//! Production callers leave `RuntimeConfig::default()` and use
5//! the stats-backed dispatch model.
6//!
7//! Override knobs (config + env, highest precedence first):
8//!
9//!   1. **Force-WCOJ** — `wcoj_triangle_dispatch=Some(true)` /
10//!      [`ENV_USE_WCOJ_TRIANGLE_U32`]. Bypasses stats decision.
11//!   2. **Explicit force-off** —
12//!      `wcoj_triangle_dispatch=Some(false)`. Used by bench
13//!      `Mode::Off` cells and any test that wants binary-join.
14//!   3. **Default**: stats-backed dispatch model.
15//!
16//! ## Recognized RIR Shape
17//!
18//! The hook now consumes [`RirNode::MultiWayJoin`], produced by
19//! [`xlog_logic::promote::promote_multiway`] after the optimizer
20//! pass in [`xlog_logic::Compiler::compile_program_with_stats_snapshot`].
21//! The promoter rewrites the canonical lowered+optimized triangle
22//! tree to a `MultiWayJoin` whose structure encodes the same
23//! semantic invariants as the earlier strict tree-pattern matcher:
24//!
25//! * `inputs` is a 3-element vec of `Scan` nodes in WCOJ slot
26//!   order `[xy, yz, xz]`.
//! * `slot_vars` has the shape `[[Some(a), Some(b)], [Some(b), Some(c)],
//!   [Some(a), Some(c)]]` with three distinct variable-class ids
//!   (canonically 0, 1, 2 for X, Y, Z).
//! * `output_columns` is `[Column(0), Column(1), Column(3)]` or
//!   `[Column(0), Column(2), Column(3)]`; both extract the certified
//!   GPU kernel's (X, Y, Z) emit order from the binary-join intermediate.
32//! * `fallback` is the post-optimizer binary-join tree, executed
33//!   verbatim when this hook declines.
34//!
35//! Anything else (rotated/computed projection, non-canonical
36//! slot_vars, non-Scan inputs, recursive SCC, missing input
37//! buffers, unsupported scalar types, mixed-width slots, or no
38//! runtime-backed memory manager) returns `Ok(None)` and the
39//! caller takes the embedded `fallback` path.
40//!
41//! Width branching: 4-byte (U32 / Symbol) inputs go to
42//! `wcoj_layout_u32_recorded` + `wcoj_triangle_hg_u32_recorded`;
43//! 8-byte (U64) inputs go to the `_u64_recorded` siblings. All
44//! three slots must share a width.
45//!
46//! ## Failure handling
47//!
48//! Per dispatch contract: "failure in helper must not corrupt store
49//! state." If the WCOJ pipeline (layout construction or kernel
50//! launch) returns an error, the hook converts it to `Ok(None)`
51//! and the caller falls back to the existing path. The store is
52//! never partially mutated; the dispatch hook only writes when the
53//! full pipeline succeeds, and the writeback is the caller's
54//! responsibility.
55//!
56//! ## Hook surface
57//!
58//! The dispatcher exposes two entry points per shape:
59//!
60//! * `try_dispatch_wcoj_*(rule)` — keyed on `&CompiledRule`,
61//!   used by the non-recursive arm in `execute_stratum_impl`.
62//! * `try_dispatch_wcoj_*_on_body(body)` — keyed on `&RirNode`,
63//!   used by the recursive arm via
64//!   `Executor::execute_wcoj_or_fallback_node` on both seeding
65//!   and per-variant evaluation. The promoter gates recursive bodies
66//!   by per-rule recursive-scan count: a single recursive scan can
67//!   promote, while two or more stay on the binary-join path.
68//!
69//! ## Out of Scope
70//!
71//! * Additional cost-model expansion.
72//! * Mixed-width admission (a triangle with both U32 and U64
73//!   slots stays on the binary-join path).
74//! * Multi-recursive WCOJ with two or more in-SCC body scans.
75
76use std::collections::HashSet;
77
78use xlog_core::{RelId, Result, ScalarType, Schema};
79use xlog_cuda::device_runtime::StreamId;
80use xlog_cuda::provider::NESTED_LOOP_TOTAL_THRESHOLD;
81use xlog_cuda::wcoj_metadata::{Wcoj4CycleRootAggValue, WcojRootAggValue};
82use xlog_cuda::CudaBuffer;
83use xlog_cuda::JoinType as CudaJoinType;
84use xlog_ir::{
85    rir::{KCliqueVariableOrder, MultiwayPlan, ProjectExpr, VariableOrder},
86    CompiledRule, RirNode,
87};
88
89use super::Executor;
90
91#[cfg(feature = "wcoj-phase-timing")]
92use std::time::Instant;
93
94/// Env variable controlling the WCOJ triangle dispatch. Treated
95/// as ON when set to `"1"` or case-insensitive `"true"`; anything
96/// else (unset, `"0"`, `"false"`, empty string, …) means OFF.
97pub const ENV_USE_WCOJ_TRIANGLE_U32: &str = "XLOG_USE_WCOJ_TRIANGLE_U32";
98
99/// Resolve the dispatch gate. Config override (set by tests)
100/// takes precedence over the env var. Production callers leave
101/// the override as `None` and configure via env.
102pub(super) fn wcoj_gate_enabled(config_override: Option<bool>) -> bool {
103    if let Some(v) = config_override {
104        return v;
105    }
106    std::env::var(ENV_USE_WCOJ_TRIANGLE_U32)
107        .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
108        .unwrap_or(false)
109}
110
111pub const ENV_WCOJ_BLOCK_WORK_UNIT: &str = "XLOG_WCOJ_BLOCK_WORK_UNIT";
112pub(super) const WCOJ_BLOCK_WORK_UNIT_DEFAULT: u32 = 1024;
113pub(super) const WCOJ_BLOCK_WORK_UNIT_MAX: u32 = 8192;
114
115pub(super) fn wcoj_block_work_unit() -> u32 {
116    match std::env::var(ENV_WCOJ_BLOCK_WORK_UNIT) {
117        Ok(raw) => match raw.trim().parse::<u32>() {
118            Ok(v @ 1..=WCOJ_BLOCK_WORK_UNIT_MAX) => v,
119            Ok(v) => {
120                eprintln!(
121                    "warning: {ENV_WCOJ_BLOCK_WORK_UNIT}={v} is outside 1..={WCOJ_BLOCK_WORK_UNIT_MAX}; \
122                     using {WCOJ_BLOCK_WORK_UNIT_DEFAULT}"
123                );
124                WCOJ_BLOCK_WORK_UNIT_DEFAULT
125            }
126            Err(_) => {
127                eprintln!(
128                    "warning: {ENV_WCOJ_BLOCK_WORK_UNIT}={raw:?} is not a u32; \
129                     using {WCOJ_BLOCK_WORK_UNIT_DEFAULT}"
130                );
131                WCOJ_BLOCK_WORK_UNIT_DEFAULT
132            }
133        },
134        Err(_) => WCOJ_BLOCK_WORK_UNIT_DEFAULT,
135    }
136}
137
138pub(super) fn wcoj_adaptive_enabled(config_override: Option<bool>) -> bool {
139    config_override.unwrap_or(true)
140}
141
142/// Kill switch for the aggregate-fused group-by-root count dispatch.
143/// Default ON (fusion enabled); set to `1`/`true` to force every
144/// GroupBy-over-triangle through the materialize+groupby path.
145pub const ENV_DISABLE_WCOJ_GROUPBY_FUSION: &str = "XLOG_DISABLE_WCOJ_GROUPBY_FUSION";
146
147pub(super) fn wcoj_groupby_fusion_disabled() -> bool {
148    std::env::var(ENV_DISABLE_WCOJ_GROUPBY_FUSION)
149        .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
150        .unwrap_or(false)
151}
152
153/// Kill switch for the generalized Free Join dispatch. Default ON
154/// (dispatch enabled); set to `1`/`true` to force every general
155/// multiway body through the embedded binary fallback.
156pub const ENV_DISABLE_FREE_JOIN: &str = "XLOG_DISABLE_FREE_JOIN";
157
158pub(super) fn free_join_disabled() -> bool {
159    std::env::var(ENV_DISABLE_FREE_JOIN)
160        .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
161        .unwrap_or(false)
162}
163
164/// Kill switch for the factorized recursive-delta dispatch. Default
165/// ON (dispatch enabled); set to `1`/`true` to force every recursive
166/// delta step through the legacy hash-join -> diff path.
167pub const ENV_DISABLE_FACTORIZED_DELTA: &str = "XLOG_DISABLE_FACTORIZED_DELTA";
168
169pub(super) fn factorized_delta_disabled() -> bool {
170    std::env::var(ENV_DISABLE_FACTORIZED_DELTA)
171        .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
172        .unwrap_or(false)
173}
174
175/// Default dense-domain dispatch cap (bitmap 32 MiB + counts 128 MiB).
176/// `XLOG_FACTORIZED_DELTA_MAX_DOMAIN` raises it up to the provider hard
177/// bound `FJ_DELTA_MAX_DOMAIN` (2^16).
178const FACTORIZED_DELTA_DEFAULT_MAX_DOMAIN: u32 = 1 << 14;
179
180fn factorized_delta_max_domain() -> u32 {
181    std::env::var("XLOG_FACTORIZED_DELTA_MAX_DOMAIN")
182        .ok()
183        .and_then(|v| v.parse::<u32>().ok())
184        .unwrap_or(FACTORIZED_DELTA_DEFAULT_MAX_DOMAIN)
185        .min(xlog_cuda::provider::FJ_DELTA_MAX_DOMAIN)
186}
187
/// Byte ceiling for the sparse route's conservative hash table; over it
/// the sparse entry declines to the legacy path.
///
/// Defaults to half of `budget_bytes` (the device budget).
/// `XLOG_FACTORIZED_DELTA_MAX_TABLE_BYTES` overrides it (tuning + tests
/// forcing the decline boundary). The override is trimmed of
/// surrounding whitespace before being parsed as a `u64`, matching the
/// tolerance `wcoj_block_work_unit` applies to its override; an unset,
/// unreadable or unparsable value falls back to the default.
fn factorized_delta_max_table_bytes(budget_bytes: u64) -> u64 {
    std::env::var("XLOG_FACTORIZED_DELTA_MAX_TABLE_BYTES")
        .ok()
        // Trim first: `u64::from_str` rejects padding, which previously
        // made a padded override silently fall back to the default.
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .unwrap_or(budget_bytes / 2)
}
198
/// Per-iteration work-floor divisor: dispatch only when the estimated
/// candidate work is at least `n_words / divisor`, protecting sparse /
/// long-chain fixpoints from the bitmap popcount+scan floor.
///
/// Reads `XLOG_FACTORIZED_DELTA_WORK_DIVISOR`, trimming surrounding
/// whitespace before parsing it as a `u64` (the same tolerance
/// `wcoj_block_work_unit` applies to its override). Zero is rejected so
/// the divisor is always at least 1; an unset, unreadable, unparsable
/// or zero value falls back to `8`.
fn factorized_delta_work_divisor() -> u64 {
    std::env::var("XLOG_FACTORIZED_DELTA_WORK_DIVISOR")
        .ok()
        // Trim first: `u64::from_str` rejects padding, which previously
        // made a padded override silently fall back to the default.
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .filter(|&divisor| divisor >= 1)
        .unwrap_or(8)
}
209
/// Per-fixpoint dispatch context for the factorized recursive delta.
/// Owned by one `execute_recursive_scc` call: caches the dense-domain
/// bound per (head, static rel) — `None` records a for-this-fixpoint
/// decline — and layout-normalized static buffers for non-recursive
/// (EDB) static sides.
///
/// `Default` yields a context with both caches empty.
#[derive(Default)]
pub(super) struct FactorizedDeltaCtx {
    /// Dense-domain bound cache. `Some(bound)` is a cached bound;
    /// `None` records that dispatch was declined for this key for the
    /// rest of the fixpoint. NOTE(review): the `String` half of the key
    /// is presumably the head relation's name — confirm against the
    /// code that populates it.
    domain_by_key: std::collections::HashMap<(String, RelId), Option<u32>>,
    /// Layout-normalized static buffers, keyed by relation plus a
    /// `usize` discriminator. NOTE(review): the meaning of the `usize`
    /// (key column? layout id?) is not visible here — confirm at the
    /// insertion site.
    static_norm_cache: std::collections::HashMap<(RelId, usize), CudaBuffer>,
}
220
221/// Diagnostics gate for WCOJ pipeline errors. By default a layout/kernel
222/// error declines to the binary-join fallback (the store is never partially
223/// mutated) but is **counted** (`Executor::wcoj_error_decline_count`) and
224/// logged to stderr, so a regressed kernel cannot silently disappear from
225/// production dispatch behind the silent-fallback contract. Set
226/// `XLOG_WCOJ_STRICT=1` to propagate the error instead (diagnostic mode).
227pub const ENV_WCOJ_STRICT: &str = "XLOG_WCOJ_STRICT";
228
229pub(super) fn wcoj_strict_errors_enabled() -> bool {
230    std::env::var(ENV_WCOJ_STRICT)
231        .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
232        .unwrap_or(false)
233}
234
235/// Convert a WCOJ pipeline error into a counted, logged decline
236/// (`Ok(None)` — caller falls back to the binary-join path), or propagate
237/// it when [`ENV_WCOJ_STRICT`] is set. Structural declines (gate off,
238/// shape mismatch, missing buffer) stay silent and do NOT go through here;
239/// this seam is only for real layout/kernel failures.
240pub(super) fn wcoj_decline_on_error(
241    counter: &mut u64,
242    stage: &str,
243    err: xlog_core::XlogError,
244) -> Result<Option<CudaBuffer>> {
245    *counter += 1;
246    if wcoj_strict_errors_enabled() {
247        return Err(err);
248    }
249    eprintln!("warning: WCOJ {stage} pipeline error; declining to binary-join fallback: {err}");
250    Ok(None)
251}
252
253/// Chain dispatcher gate. Default ON after profiler traces showed
254/// chain-shaped rules dominated evaluation time; `XLOG_WCOJ_CHAIN_ENABLE=0`
255/// or `false` disables the route for A/B measurements.
256pub const ENV_WCOJ_CHAIN_ENABLE: &str = "XLOG_WCOJ_CHAIN_ENABLE";
257
258pub(super) fn chain_dispatch_enabled() -> bool {
259    std::env::var(ENV_WCOJ_CHAIN_ENABLE)
260        .map(|v| !(v == "0" || v.eq_ignore_ascii_case("false")))
261        .unwrap_or(true)
262}
263
264// -----------------------------------------------------------------
265// 4-cycle dispatch gates.
266//
267// Width-neutral env naming: `XLOG_USE_WCOJ_4CYCLE` controls the
268// force gate across u32 / u64 / Symbol. Triangle's `_U32` suffix is
269// historical debt; we do NOT propagate that pattern to 4-cycle.
270//
271// Adaptive resolution differs from triangle: 4-cycle is **opt-in by
272// default**. Unset env + `None` config → `false`. Default-on is
273// gated on bench evidence and lives in a separate follow-up decision.
274// -----------------------------------------------------------------
275
276/// Force-gate env. `"1"` / case-insensitive `"true"` → ON.
277pub const ENV_USE_WCOJ_4CYCLE: &str = "XLOG_USE_WCOJ_4CYCLE";
278
279/// Adaptive opt-in env. Default off for explicit-only dispatch.
280pub const ENV_USE_WCOJ_4CYCLE_ADAPTIVE: &str = "XLOG_USE_WCOJ_4CYCLE_ADAPTIVE";
281
282/// Kill switch env.
283pub const ENV_DISABLE_WCOJ_4CYCLE: &str = "XLOG_DISABLE_WCOJ_4CYCLE";
284
285/// Resolve the 4-cycle force gate (config override > env > false).
286pub(super) fn wcoj_4cycle_gate_enabled(config_override: Option<bool>) -> bool {
287    if let Some(v) = config_override {
288        return v;
289    }
290    std::env::var(ENV_USE_WCOJ_4CYCLE)
291        .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
292        .unwrap_or(false)
293}
294
295/// Resolve the 4-cycle adaptive opt-in. Precedence:
296///   * `config_override = Some(b)` → `b`.
297///   * `XLOG_USE_WCOJ_4CYCLE_ADAPTIVE=1` → `true`.
298///   * Anything else (including unset) → `false`.
299///
300/// **Differs from triangle**: triangle defaults adaptive to `true`
301/// when env is unset (default-on flip after baseline evidence).
302/// 4-cycle defaults to `false` until its own baseline evidence
303/// supports a default-on flip in a follow-up slice.
304pub(super) fn wcoj_4cycle_adaptive_enabled(config_override: Option<bool>) -> bool {
305    if let Some(v) = config_override {
306        return v;
307    }
308    std::env::var(ENV_USE_WCOJ_4CYCLE_ADAPTIVE)
309        .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
310        .unwrap_or(false)
311}
312
313/// Resolve the 4-cycle kill switch (config > env > false).
314pub(super) fn wcoj_4cycle_disabled(config_override: Option<bool>) -> bool {
315    if let Some(v) = config_override {
316        return v;
317    }
318    std::env::var(ENV_DISABLE_WCOJ_4CYCLE)
319        .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
320        .unwrap_or(false)
321}
322
/// Resolved dispatch mode after consulting both gates.
///
/// NOTE(review): the code that produces and consumes this enum is not
/// visible in this part of the file; the variant notes below follow the
/// module-level description of the override knobs — confirm against the
/// resolver.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DispatchMode {
    /// Presumably the force-WCOJ path, which bypasses the stats decision.
    Force,
    /// Presumably the default stats-backed dispatch model.
    CostModel,
}
329
/// Two rel IDs and key positions extracted from a matched chain RIR.
/// Inputs are in the promoter's left/right order.
///
/// Built by `match_chain_join` from a `RirNode::ChainJoin`.
pub(super) struct ChainRirMatch {
    /// Relation scanned by the `ChainJoin`'s `left` input.
    pub rel_left: RelId,
    /// Relation scanned by the `ChainJoin`'s `right` input.
    pub rel_right: RelId,
    /// Key position on the left input; `match_chain_join` only admits
    /// values below 2.
    pub left_key: usize,
    /// Key position on the right input; `match_chain_join` only admits
    /// values below 2.
    pub right_key: usize,
    /// Clone of the `ChainJoin` node's `output_columns`.
    pub output_columns: Vec<ProjectExpr>,
}
339
340/// ChainJoin production matcher. The chain shape is encoded as a
341/// first-class `ChainJoin`; malformed non-scan inputs decline dispatch
342/// and execute the captured fallback.
343pub(super) fn match_chain_join(body: &RirNode) -> Option<ChainRirMatch> {
344    let RirNode::ChainJoin {
345        left,
346        right,
347        left_key,
348        right_key,
349        output_columns,
350        ..
351    } = body
352    else {
353        return None;
354    };
355    if *left_key >= 2 || *right_key >= 2 {
356        return None;
357    }
358    let rel_left = scan_rel(left)?;
359    let rel_right = scan_rel(right)?;
360    Some(ChainRirMatch {
361        rel_left,
362        rel_right,
363        left_key: *left_key,
364        right_key: *right_key,
365        output_columns: output_columns.clone(),
366    })
367}
368
/// Three rel IDs extracted from a matched triangle RIR. The
/// names correspond to the WCOJ kernel's slot semantics.
///
/// `match_multiway_triangle` fills the fields from the `MultiWayJoin`
/// inputs in slot order: `inputs[0]`, `inputs[1]`, `inputs[2]`.
pub(super) struct TriangleRirMatch {
    /// Rel for the (X, Y) edge — left subtree of the inner join,
    /// joined with `rel_yz` on Y. Taken from `inputs[0]`.
    pub rel_xy: RelId,
    /// Rel for the (Y, Z) edge — right subtree of the inner join.
    /// Taken from `inputs[1]`.
    pub rel_yz: RelId,
    /// Rel for the (X, Z) closing edge — right subtree of the
    /// outer join, joined with the inner join's output on (X, Z).
    /// Taken from `inputs[2]`.
    pub rel_xz: RelId,
}
381
382/// Pattern-match a `RirNode::MultiWayJoin` whose structure is the
383/// canonical triangle shape. Returns the three scan rel IDs in
384/// WCOJ slot order on a successful match; `None` for any deviation.
385///
386/// The match is intentionally strict over `inputs`, `slot_vars`,
387/// AND `output_columns`. The current triangle matcher certifies the
388/// canonical (X, Y, Z) emit order; rotated head projections,
389/// non-Scan inputs, or non-canonical variable classes decline
390/// dispatch and the caller takes the embedded `fallback` path.
391///
392/// Future matcher work must generalize in tandem with kernel
393/// generalization (4-way, n-way) — never one without the other.
394pub(super) fn match_multiway_triangle(body: &RirNode) -> Option<TriangleRirMatch> {
395    let RirNode::MultiWayJoin {
396        inputs,
397        slot_vars,
398        output_columns,
399        ..
400    } = body
401    else {
402        return None;
403    };
404    if inputs.len() != 3 {
405        return None;
406    }
407    if !slot_vars_match_canonical_triangle(slot_vars) {
408        return None;
409    }
410    if !output_columns_match_canonical_triangle(output_columns) {
411        return None;
412    }
413    let rel_xy = scan_rel(&inputs[0])?;
414    let rel_yz = scan_rel(&inputs[1])?;
415    let rel_xz = scan_rel(&inputs[2])?;
416    Some(TriangleRirMatch {
417        rel_xy,
418        rel_yz,
419        rel_xz,
420    })
421}
422
/// Check that `slot_vars` is the canonical triangle
/// `[[A, B], [B, C], [A, C]]`: exactly three slots of exactly two
/// bound variables each, with `A`, `B`, `C` pairwise distinct.
/// Rotated, dropped (`None`), or repeated variables fail the match.
fn slot_vars_match_canonical_triangle(slot_vars: &[Vec<Option<u32>>]) -> bool {
    let [xy, yz, xz] = slot_vars else {
        return false;
    };
    match (xy.as_slice(), yz.as_slice(), xz.as_slice()) {
        (&[Some(a), Some(b)], &[Some(b_again), Some(c)], &[Some(a_again), Some(c_again)]) => {
            let distinct = a != b && c != a && c != b;
            let chained = b_again == b && a_again == a && c_again == c;
            distinct && chained
        }
        _ => false,
    }
}
447
448/// Confirm `output_columns` is one of the valid head-extraction
449/// layouts. The GPU kernel writes triples in canonical
450/// `(X, Y, Z)` order; the project columns describe the
451/// binary-join-intermediate layout the head extracts from.
452///
453/// Accepted triangle output-column layouts:
454///   * `[Column(0), Column(1), Column(3)]` — Y-shared /
455///     X-shared inner pair (binary intermediate cols
456///     [X, Y, Y, Z, X, Z] / [X, Y, X, Z, Y, Z]).
457///   * `[Column(0), Column(2), Column(3)]` — Z-shared inner
458///     pair (binary intermediate cols [X, Z, Y, Z, X, Y]).
459fn output_columns_match_canonical_triangle(cols: &[ProjectExpr]) -> bool {
460    if cols.len() != 3 {
461        return false;
462    }
463    let cols_pattern = (
464        matches!(cols[0], ProjectExpr::Column(0)),
465        matches!(cols[1], ProjectExpr::Column(1)) || matches!(cols[1], ProjectExpr::Column(2)),
466        matches!(cols[2], ProjectExpr::Column(3)),
467    );
468    cols_pattern == (true, true, true)
469}
470
471// -----------------------------------------------------------------
472// 4-cycle matcher.
473//
474// Mirrors the triangle matcher with a shape-locked qualifier.
475// -----------------------------------------------------------------
476
/// Four rel IDs extracted from a matched 4-cycle RIR.
///
/// `match_multiway_4cycle` fills the fields from the `MultiWayJoin`
/// inputs in slot order, whose slot variables form the cycle
/// `[[A, B], [B, C], [C, D], [D, A]]`.
pub(super) struct FourCycleRirMatch {
    /// Relation scanned by `inputs[0]` (the `[A, B]` edge).
    pub rel_e1: RelId,
    /// Relation scanned by `inputs[1]` (the `[B, C]` edge).
    pub rel_e2: RelId,
    /// Relation scanned by `inputs[2]` (the `[C, D]` edge).
    pub rel_e3: RelId,
    /// Relation scanned by `inputs[3]` (the closing `[D, A]` edge).
    pub rel_e4: RelId,
}
484
485/// Pattern-match a `RirNode::MultiWayJoin` whose structure is the
486/// canonical 4-cycle shape. Returns the four scan rel IDs in WCOJ
487/// slot order on a successful match; `None` for any deviation.
488///
489/// The match is intentionally strict over `inputs`, `slot_vars`,
490/// AND `output_columns`. The current 4-cycle matcher certifies the
491/// canonical (W, X, Y, Z) emit order.
492pub(super) fn match_multiway_4cycle(body: &RirNode) -> Option<FourCycleRirMatch> {
493    let RirNode::MultiWayJoin {
494        inputs,
495        slot_vars,
496        output_columns,
497        ..
498    } = body
499    else {
500        return None;
501    };
502    if inputs.len() != 4 {
503        return None;
504    }
505    if !slot_vars_match_canonical_4cycle(slot_vars) {
506        return None;
507    }
508    if !output_columns_match_canonical_4cycle(output_columns) {
509        return None;
510    }
511    let rel_e1 = scan_rel(&inputs[0])?;
512    let rel_e2 = scan_rel(&inputs[1])?;
513    let rel_e3 = scan_rel(&inputs[2])?;
514    let rel_e4 = scan_rel(&inputs[3])?;
515    Some(FourCycleRirMatch {
516        rel_e1,
517        rel_e2,
518        rel_e3,
519        rel_e4,
520    })
521}
522
/// Check that `slot_vars` is the canonical 4-cycle
/// `[[A, B], [B, C], [C, D], [D, A]]`: exactly four slots of exactly
/// two bound variables each, with `A`, `B`, `C`, `D` pairwise distinct
/// and the last slot closing the cycle back onto slot 0's first variable.
fn slot_vars_match_canonical_4cycle(slot_vars: &[Vec<Option<u32>>]) -> bool {
    if slot_vars.len() != 4 || slot_vars.iter().any(|slot| slot.len() != 2) {
        return false;
    }
    // Unpack every slot into a (source, target) edge; an unbound
    // variable anywhere fails the match.
    let mut edges = [(0u32, 0u32); 4];
    for (edge, slot) in edges.iter_mut().zip(slot_vars) {
        match (slot[0], slot[1]) {
            (Some(source), Some(target)) => *edge = (source, target),
            _ => return false,
        }
    }
    let [(a, b), (b_again, c), (c_again, d), (d_again, a_again)] = edges;
    let chained = b_again == b && c_again == c && d_again == d && a_again == a;
    let distinct = a != b && c != a && c != b && d != a && d != b && d != c;
    chained && distinct
}
553
554/// Confirm `output_columns` is the certified `(W, X, Y, Z)` emit
555/// order. The GPU kernel writes quads in this order.
556/// Accepted 4-cycle output-column layouts:
557///   * `[Column(0), Column(1), Column(3), Column(5)]` —
558///     Default grouping `(WX⋈XY) + (YZ⋈ZW)`.
559///   * `[Column(5), Column(0), Column(1), Column(3)]` — Alt
560///     grouping `(XY⋈YZ) + (ZW⋈WX)` (binary intermediate
561///     col 5 = W from inner-right; (W, X, Y, Z) extracts
562///     from cols [5, 0, 1, 3]).
563fn output_columns_match_canonical_4cycle(cols: &[ProjectExpr]) -> bool {
564    if cols.len() != 4 {
565        return false;
566    }
567    let exact = |idx: usize, want: usize| matches!(cols[idx], ProjectExpr::Column(c) if c == want);
568    // Default layout.
569    let default_layout = exact(0, 0) && exact(1, 1) && exact(2, 3) && exact(3, 5);
570    // Alt layout.
571    let alt_layout = exact(0, 5) && exact(1, 0) && exact(2, 1) && exact(3, 3);
572    default_layout || alt_layout
573}
574
575/// Extract the `RelId` from a leaf `Scan` node, or `None` for
576/// any non-Scan child. The current matcher only admits Scan leaves;
577/// future matcher work may admit `Filter { Scan }` or projected
578/// scans, but always in tandem with kernel support.
579fn scan_rel(node: &RirNode) -> Option<RelId> {
580    match node {
581        RirNode::Scan { rel } => Some(*rel),
582        _ => None,
583    }
584}
585
/// Physical key width for a WCOJ-eligible binary relation at
/// the RIR-level dispatch. `FourByte` covers `U32` and `Symbol`
/// (bit-identical layout); `EightByte` covers `U64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum WcojKeyWidth {
    /// Both columns are `U32` or `Symbol` (see `scalar_wcoj_width`).
    FourByte,
    /// Both columns are `U64` (see `scalar_wcoj_width`).
    EightByte,
}
594
595/// Classify a binary [`CudaBuffer`]'s key width for WCOJ
596/// dispatch, mirroring `xlog_integration::wcoj_dispatch`'s
597/// AST-level helper. Returns `Some(width)` for 2-column buffers
598/// whose columns are both 4-byte (U32/Symbol) or both 8-byte
599/// (U64); `None` for any other arity / type combination,
600/// including mixed-width within a single buffer.
601///
602/// Cross-relation type compatibility is enforced upstream by
603/// the planner via `analyze_typed`. The executor only sees
604/// lowered RIR at this point, so this classifier is the last
605/// width-uniformity check before the GPU launch — any
606/// divergence vs. the binary-join path is caught by the
607/// wiring/cert row-set-equality tests.
608fn classify_two_col_wcoj_width(buf: &CudaBuffer) -> Option<WcojKeyWidth> {
609    if buf.arity() != 2 {
610        return None;
611    }
612    let c0 = buf.schema.column_type(0)?;
613    let c1 = buf.schema.column_type(1)?;
614    let w0 = scalar_wcoj_width(c0)?;
615    let w1 = scalar_wcoj_width(c1)?;
616    if w0 != w1 {
617        return None;
618    }
619    Some(w0)
620}
621
622fn scalar_wcoj_width(ty: xlog_core::ScalarType) -> Option<WcojKeyWidth> {
623    match ty {
624        xlog_core::ScalarType::U32 | xlog_core::ScalarType::Symbol => Some(WcojKeyWidth::FourByte),
625        xlog_core::ScalarType::U64 => Some(WcojKeyWidth::EightByte),
626        _ => None,
627    }
628}
629
630/// Convert `kernel_output_cols` (a `Vec<ProjectExpr>`) into
631/// the `Vec<usize>` permutation that
632/// `wcoj_project_output_columns_recorded` consumes. Triangle and
633/// 4-cycle kernel_output_cols entries are always
634/// `ProjectExpr::Column(_)` per the locked permutation tables in
635/// `xlog_logic::wcoj_var_ordering`; anything else is a planner bug.
636/// Derive the slot-0 joined-with slot-1 feedback pair and the
637/// underlying-relation key columns from `var_order`.
638///
639/// Returns `(rel_a, rel_b, left_keys, right_keys)` where keys
640/// are NATIVE (pre-swap) column indices on the underlying
641/// relations — `record_join_result` stores keys against native
642/// indexing. For triangle non-default leaders, slot 1 is a
643/// 2-col SWAPPED view of the underlying relation; the kernel
644/// invariant `slot0.col1 ≡ slot1.col0` holds for the views
645/// but maps to native key index 1 on BOTH sides.
646///
647/// **Rotated-feedback table**:
648///
649/// | Shape    | Leader            | (rel_a, rel_b)      | (left_keys, right_keys) |
650/// |----------|-------------------|---------------------|-------------------------|
651/// | Triangle | 0 (e_xy default)  | (slot[0], slot[1])  | [1] / [0] (no swap) |
652/// | Triangle | 1 (e_yz)          | (slot[1], slot[2])  | **[1] / [1]** (slot 1 = e_xz↔) |
653/// | Triangle | 2 (e_xz)          | (slot[2], slot[1])  | **[1] / [1]** (slot 1 = e_yz↔) |
654/// | 4-cycle  | 0..3 (rotation)   | (slot[i], slot[i+1])| [1] / [0] (no swap) |
655///
656/// Returns `None` only if `slot_rels.len() < 2` (defensive).
657fn feedback_pair_from_var_order(
658    slot_rels: &[RelId],
659    var_order: Option<&VariableOrder>,
660) -> Option<(RelId, RelId, Vec<usize>, Vec<usize>)> {
661    if slot_rels.len() < 2 {
662        return None;
663    }
664    let Some(vo) = var_order else {
665        // Default config / no rotation: canonical feedback
666        // behavior: canonical (slot_rels[0], slot_rels[1]) with
667        // keys [1] / [0].
668        return Some((slot_rels[0], slot_rels[1], vec![1], vec![0]));
669    };
670    let leader_idx = vo.leader_idx as usize;
671    match slot_rels.len() {
672        3 => {
673            // Triangle rotated-feedback table.
674            match leader_idx {
675                0 => Some((slot_rels[0], slot_rels[1], vec![1], vec![0])),
676                1 => {
677                    // Leader e_yz: slot 0 = rel_yz native, slot 1 =
678                    // rel_xz **swapped** view. Native rel_xz has Z
679                    // at col1, so [1]/[1].
680                    Some((slot_rels[1], slot_rels[2], vec![1], vec![1]))
681                }
682                2 => {
683                    // Leader e_xz: slot 0 = rel_xz native, slot 1 =
684                    // rel_yz **swapped** view. Native rel_yz has Z
685                    // at col1, so [1]/[1].
686                    Some((slot_rels[2], slot_rels[1], vec![1], vec![1]))
687                }
688                _ => None,
689            }
690        }
691        4 => {
692            // 4-cycle: rotation-only, all slots in native layout,
693            // keys [1]/[0] for every leader.
694            if leader_idx >= 4 {
695                return None;
696            }
697            let slot1_input_idx = (leader_idx + 1) % 4;
698            Some((
699                slot_rels[leader_idx],
700                slot_rels[slot1_input_idx],
701                vec![1],
702                vec![0],
703            ))
704        }
705        _ => None,
706    }
707}
708
709fn perm_indices_from_kernel_output_cols(cols: &[ProjectExpr]) -> Result<Vec<usize>> {
710    let mut out = Vec::with_capacity(cols.len());
711    for c in cols {
712        match c {
713            ProjectExpr::Column(idx) => out.push(*idx),
714            other => {
715                return Err(xlog_core::XlogError::Kernel(format!(
716                    "perm_indices_from_kernel_output_cols: \
717                     kernel_output_cols must be ProjectExpr::Column(_), got {:?}",
718                    other
719                )));
720            }
721        }
722    }
723    Ok(out)
724}
725
726/// Build the canonical triangle head schema `(X, Y, Z)`
727/// from the canonical promoter inputs. Used as the
728/// `head_schema` argument to
729/// `wcoj_project_output_columns_recorded` on the leader-ordered path.
730fn build_triangle_head_schema(buf_xy: &CudaBuffer, buf_yz: &CudaBuffer) -> Result<Schema> {
731    let x_type = buf_xy.schema.column_type(0).ok_or_else(|| {
732        xlog_core::XlogError::Kernel("build_triangle_head_schema: e_xy.col0 type missing".into())
733    })?;
734    let y_type = buf_xy.schema.column_type(1).ok_or_else(|| {
735        xlog_core::XlogError::Kernel("build_triangle_head_schema: e_xy.col1 type missing".into())
736    })?;
737    let z_type = buf_yz.schema.column_type(1).ok_or_else(|| {
738        xlog_core::XlogError::Kernel("build_triangle_head_schema: e_yz.col1 type missing".into())
739    })?;
740    Schema::new(vec![
741        ("col0".to_string(), x_type),
742        ("col1".to_string(), y_type),
743        ("col2".to_string(), z_type),
744    ])
745    .with_sort_labels(vec![
746        buf_xy
747            .schema
748            .column_sort_label(0)
749            .unwrap_or("col0")
750            .to_string(),
751        buf_xy
752            .schema
753            .column_sort_label(1)
754            .unwrap_or("col1")
755            .to_string(),
756        buf_yz
757            .schema
758            .column_sort_label(1)
759            .unwrap_or("col2")
760            .to_string(),
761    ])
762    .map_err(xlog_core::XlogError::Kernel)
763}
764
765/// Build the canonical 4-cycle head schema
766/// `(W, X, Y, Z)` from the canonical promoter inputs.
767fn build_4cycle_head_schema(
768    buf_e1: &CudaBuffer,
769    buf_e2: &CudaBuffer,
770    buf_e3: &CudaBuffer,
771) -> Result<Schema> {
772    // `[e_wx, e_xy, e_yz, e_zw]` — canonical promoter order.
773    // W = e_wx.col0, X = e_wx.col1 (= e_xy.col0), Y = e_xy.col1
774    // (= e_yz.col0), Z = e_yz.col1 (= e_zw.col0).
775    let w_type = buf_e1.schema.column_type(0).ok_or_else(|| {
776        xlog_core::XlogError::Kernel("build_4cycle_head_schema: e_wx.col0 type missing".into())
777    })?;
778    let x_type = buf_e1.schema.column_type(1).ok_or_else(|| {
779        xlog_core::XlogError::Kernel("build_4cycle_head_schema: e_wx.col1 type missing".into())
780    })?;
781    let y_type = buf_e2.schema.column_type(1).ok_or_else(|| {
782        xlog_core::XlogError::Kernel("build_4cycle_head_schema: e_xy.col1 type missing".into())
783    })?;
784    let z_type = buf_e3.schema.column_type(1).ok_or_else(|| {
785        xlog_core::XlogError::Kernel("build_4cycle_head_schema: e_yz.col1 type missing".into())
786    })?;
787    // Suppress the unused-import warning when ScalarType isn't
788    // referenced in this scope (kept here for explicitness in case
789    // a future change adds a width check).
790    let _: ScalarType = w_type;
791    Schema::new(vec![
792        ("col0".to_string(), w_type),
793        ("col1".to_string(), x_type),
794        ("col2".to_string(), y_type),
795        ("col3".to_string(), z_type),
796    ])
797    .with_sort_labels(vec![
798        buf_e1
799            .schema
800            .column_sort_label(0)
801            .unwrap_or("col0")
802            .to_string(),
803        buf_e1
804            .schema
805            .column_sort_label(1)
806            .unwrap_or("col1")
807            .to_string(),
808        buf_e2
809            .schema
810            .column_sort_label(1)
811            .unwrap_or("col2")
812            .to_string(),
813        buf_e3
814            .schema
815            .column_sort_label(1)
816            .unwrap_or("col3")
817            .to_string(),
818    ])
819    .map_err(xlog_core::XlogError::Kernel)
820}
821
822impl Executor {
823    /// Try to dispatch a single non-recursive rule through the
824    /// GPU WCOJ triangle kernel. Returns `Ok(Some(buffer))` if
825    /// the dispatch fires and produces a result; `Ok(None)`
826    /// otherwise (gate off, shape mismatch, missing buffer,
827    /// non-4-byte-key schema, missing runtime, or kernel error — every
828    /// failure mode is silent fallback).
829    ///
830    /// On `Ok(Some(_))`, the caller is responsible for installing
831    /// the buffer into the relation store via the same path the
832    /// existing binary-join branch uses.
833    pub(super) fn try_dispatch_wcoj_triangle(
834        &mut self,
835        rule: &CompiledRule,
836    ) -> Result<Option<CudaBuffer>> {
837        // Body-keyed entry. Rule-keyed callers stay
838        // byte-identical via this thin wrapper.
839        self.try_dispatch_wcoj_triangle_on_body(&rule.body)
840    }
841
842    /// Read the WCOJ output buffer's logical row count.
843    /// Returns `None` when the cache isn't populated. **Never
844    /// returns `Some(0)` for an unknown row count** — only for
845    /// an observed-empty output. The distinction matters for
846    /// `record_wcoj_feedback`: an unknown count must skip the
847    /// EMA update, not record selectivity 0.
848    fn wcoj_output_rows(buf: &CudaBuffer) -> Option<u64> {
849        // `CudaBuffer::cached_row_count` returns `Option<u32>`;
850        // widen to `u64` for the `StatsManager` API.
851        buf.cached_row_count().map(u64::from)
852    }
853
854    /// Wire successful WCOJ dispatches back into
855    /// `StatsManager` so the cardinality cost model's future
856    /// `binary_est` reads reflect observed selectivity.
857    ///
858    /// **Leader-aware routing**: the `(rel_a, rel_b, left_keys, right_keys)`
859    /// quadruple is derived from the dispatched plan's
860    /// `var_order` via `feedback_pair_from_var_order`, NOT
861    /// hardcoded:
862    ///
863    /// * `var_order = None` (default config): returns the
864    ///   canonical feedback pair, `(slot_rels[0], slot_rels[1])`
865    ///   with keys `[1] / [0]`.
866    /// * `var_order = Some(_)` (non-default leader): returns the rotated
867    ///   pair from the locked feedback table — triangle
868    ///   non-default leaders use rotated `(slot_rels[0],
869    ///   slot_rels[1])` with keys `[1] / [1]` (Z-shared edges
870    ///   in canonical layout join on col 1 of both rels);
871    ///   4-cycle is rotation-only with keys `[1] / [0]`.
872    ///
873    /// `CardinalityAwareCostModel::should_dispatch_*` still
874    /// reads via `estimate_join_cardinality` on the canonical
875    /// default-leader pair — but on a non-default-leader run
876    /// the dispatched layout's actual edge is what we observe,
877    /// and that's what gets recorded under the rotated key.
878    /// The leader-aware cost models look up rotated edges
879    /// correspondingly; the writer ↔ reader pair stays
880    /// coherent under each leader topology.
881    ///
882    /// Skips the recording when:
883    ///   * `slot_rels.len() < 2` — not enough slots for a
884    ///     binary inner pair (defensive).
885    ///   * `output_rows == None` — unknown logical row count;
886    ///     recording 0 would poison the EMA.
887    ///   * `feedback_pair_from_var_order` returns `None` — the
888    ///     leader rotation isn't in the locked feedback table
889    ///     (conservative; never write under uncertainty).
890    ///   * Any of `(rel_a, rel_b)` has missing or zero
891    ///     cardinality; unknown inputs would compute a meaningless
892    ///     `input_card_product`.
893    ///
894    /// Recording an observed-empty output (`Some(0)`) IS
895    /// correct — the EMA tightens future estimates toward zero,
896    /// so WCOJ becomes less likely on the same inputs next
897    /// call (the kernel produced nothing useful).
898    ///
899    /// The triangle / 4-cycle output is a strict subset of the
900    /// inner-join intermediate (the third / additional atoms
901    /// further filter it). The recorded selectivity is
902    /// therefore an UPPER BOUND on the true binary
903    /// selectivity, which is the correct conservative direction
904    /// for the cost model: it under-claims the WCOJ kernel's
905    /// win rather than over-claiming.
906    fn record_wcoj_feedback(
907        &mut self,
908        slot_rels: &[RelId],
909        var_order: Option<&VariableOrder>,
910        output_rows: Option<u64>,
911    ) {
912        if slot_rels.len() < 2 {
913            return;
914        }
915        let Some(out_rows) = output_rows else {
916            return;
917        };
918        // Derive the (slot 0, slot 1) feedback pair and the
919        // underlying-relation key columns from `var_order`.
920        // For `var_order = None` (default config), this returns
921        // the canonical pair + keys [1]/[0]. For Some(_), the pair may be
922        // rotated (triangle non-default leaders use rotated pair
923        // + [1]/[1] keys; 4-cycle is rotation-only [1]/[0]).
924        let Some((rel_a, rel_b, left_keys, right_keys)) =
925            feedback_pair_from_var_order(slot_rels, var_order)
926        else {
927            return;
928        };
929        let card_a = self
930            .stats
931            .get_relation_stats(rel_a)
932            .map(|s| s.cardinality)
933            .filter(|c| *c > 0);
934        let card_b = self
935            .stats
936            .get_relation_stats(rel_b)
937            .map(|s| s.cardinality)
938            .filter(|c| *c > 0);
939        let (Some(a), Some(b)) = (card_a, card_b) else {
940            return;
941        };
942        let input_rows = a.saturating_mul(b);
943        // `record_join_result` takes owned `Vec<usize>` for the
944        // key columns (signature predates this slice).
945        self.stats
946            .record_join_result(rel_a, rel_b, left_keys, right_keys, input_rows, out_rows);
947    }
948
949    /// Body-keyed entry point: same gate / pattern-match / dispatch
950    /// logic as `try_dispatch_wcoj_triangle`, keyed on `body`
951    /// rather than `&CompiledRule`. The recursive engine calls
952    /// this on the rewritten variant body (one Scan's RelId
953    /// swapped to a delta RelId); the rule-keyed wrapper above
954    /// preserves the rule-keyed surface for non-recursive callers.
955    pub(super) fn try_dispatch_wcoj_triangle_on_body(
956        &mut self,
957        body: &RirNode,
958    ) -> Result<Option<CudaBuffer>> {
959        #[cfg(feature = "wcoj-phase-timing")]
960        let wall_start = Instant::now();
961        // 1. Gate resolution. Decision tree (highest → lowest):
962        //
963        //    a. Runtime disable flag → no dispatch.
964        //    b. If `wcoj_triangle_dispatch` resolves to true
965        //       (config Some(true) or env=1) → force WCOJ.
966        //    c. Force = Some(false) → explicit off.
967        //    d. Else if stats mode resolves to true, consult
968        //       the cardinality model.
969        //    e. Else → no dispatch.
970        if self.config.wcoj_triangle_dispatch_disabled.unwrap_or(false) {
971            return Ok(None);
972        }
973        let force_override = self.config.wcoj_triangle_dispatch;
974        let force_on = wcoj_gate_enabled(force_override);
975        let mode = if force_on {
976            DispatchMode::Force
977        } else {
978            // Force-Some(false) is "explicitly off". Only when
979            // force is None or env-default-off do we consult the
980            // stats gate.
981            let force_explicit_off = matches!(force_override, Some(false));
982            if force_explicit_off {
983                return Ok(None);
984            }
985            let adaptive_override = self.config.wcoj_triangle_dispatch_adaptive;
986            if wcoj_adaptive_enabled(adaptive_override) {
987                DispatchMode::CostModel
988            } else {
989                return Ok(None);
990            }
991        };
992
993        // 2. Pattern-match the canonical-triangle MultiWayJoin.
994        let Some(matched) = match_multiway_triangle(body) else {
995            return Ok(None);
996        };
997
998        // 3. Resolve rel IDs to predicate names.
999        // get_rel_name returns Option<&str> — bind to owned String
1000        // so the borrow doesn't conflict with later &mut self uses.
1001        let name_xy = match self.get_rel_name(matched.rel_xy) {
1002            Some(s) => s.to_string(),
1003            None => return Ok(None),
1004        };
1005        let name_yz = match self.get_rel_name(matched.rel_yz) {
1006            Some(s) => s.to_string(),
1007            None => return Ok(None),
1008        };
1009        let name_xz = match self.get_rel_name(matched.rel_xz) {
1010            Some(s) => s.to_string(),
1011            None => return Ok(None),
1012        };
1013
1014        // 4. Look up input buffers + classify their key widths.
1015        // All three slots must be WCOJ-eligible AND share the
1016        // same width — mixed-width triangles fall back here so
1017        // the binary-join path handles them.
1018        let buf_xy = match self.store.get(&name_xy) {
1019            Some(b) => b,
1020            None => return Ok(None),
1021        };
1022        let buf_yz = match self.store.get(&name_yz) {
1023            Some(b) => b,
1024            None => return Ok(None),
1025        };
1026        let buf_xz = match self.store.get(&name_xz) {
1027            Some(b) => b,
1028            None => return Ok(None),
1029        };
1030        let width = match (
1031            classify_two_col_wcoj_width(buf_xy),
1032            classify_two_col_wcoj_width(buf_yz),
1033            classify_two_col_wcoj_width(buf_xz),
1034        ) {
1035            (Some(a), Some(b), Some(c)) if a == b && b == c => a,
1036            _ => return Ok(None),
1037        };
1038
1039        // 5. Resolve the cached executor WCOJ launch stream.
1040        // Acquire-once / reuse-forever (mirrors
1041        // `CudaKernelProvider::recorded_op_stream`). Acquiring
1042        // per-invocation would silently drain the
1043        // `StreamPool` (default cap 16, grow-only) on long-
1044        // lived runtimes — once exhausted, subsequent
1045        // dispatches would silently fall back to binary-join
1046        // and the dispatch counter would stop incrementing.
1047        // Without a runtime-backed manager, the recorded WCOJ
1048        // primitives can't run — fall back silently.
1049        if self.provider.memory().runtime().is_none() {
1050            return Ok(None);
1051        }
1052        let launch_stream = match self.wcoj_dispatch_stream_or_init() {
1053            Some(s) => s,
1054            None => return Ok(None),
1055        };
1056
1057        // 6. Stats-backed mode only: resolve the WCOJ cost model
1058        // on the same launch stream as the eventual GPU pipeline.
1059        #[cfg(feature = "wcoj-phase-timing")]
1060        let mut classifier_ms: f32 = 0.0;
1061        if mode == DispatchMode::CostModel {
1062            #[cfg(feature = "wcoj-phase-timing")]
1063            let cls_start = Instant::now();
1064            let model = super::wcoj_cost_model::build_wcoj_cost_model(&self.config);
1065            let slot_rels = [matched.rel_xy, matched.rel_yz, matched.rel_xz];
1066            let ctx = super::wcoj_cost_model::WcojDispatchCtx {
1067                stats: &self.stats,
1068                launch_stream,
1069                width,
1070                slot_rels: &slot_rels,
1071            };
1072            let dispatch = model.should_dispatch_triangle(&ctx);
1073            #[cfg(feature = "wcoj-phase-timing")]
1074            {
1075                classifier_ms = cls_start.elapsed().as_secs_f64() as f32 * 1000.0;
1076            }
1077            if !dispatch {
1078                return Ok(None);
1079            }
1080        }
1081
1082        // Extract var_order from the matched MultiWayJoin body. None preserves
1083        // default-leader dispatch bit-identically.
1084        let var_order_opt: Option<&VariableOrder> = match body {
1085            RirNode::MultiWayJoin { var_order, .. } => var_order.as_ref(),
1086            _ => None,
1087        };
1088
1089        // 7. Run layout + triangle. Convert any kernel error to
1090        // silent fallback per dispatch contract ("failure must not
1091        // corrupt store state"). The WCOJ helpers don't write
1092        // to the store, so an error here only loses the work
1093        // we just did — the binary-join path picks it up.
1094        #[cfg(feature = "wcoj-phase-timing")]
1095        let mut layout_times: [f32; 3] = [0.0; 3];
1096        let dispatch_result = self.run_wcoj_triangle_pipeline(
1097            buf_xy,
1098            buf_yz,
1099            buf_xz,
1100            launch_stream,
1101            width,
1102            var_order_opt,
1103            #[cfg(feature = "wcoj-phase-timing")]
1104            &mut layout_times,
1105        );
1106        match dispatch_result {
1107            Ok(buf) => {
1108                // Record observed selectivity into
1109                // StatsManager for the cardinality cost model.
1110                // The (rel_a, rel_b, left_keys, right_keys) pair
1111                // is derived from `var_order_opt` via
1112                // `feedback_pair_from_var_order`:
1113                //   * `var_order = None` (default config) →
1114                //     canonical `(rel_xy, rel_yz)` keys
1115                //     `[1]/[0]`.
1116                //   * `var_order = Some(_)` (non-default leader) →
1117                //     rotated pair per the feedback table.
1118                //     Triangle non-default leaders use rotated
1119                //     `(slot_rels[0], slot_rels[1])` with keys
1120                //     `[1]/[1]` (Z-shared edges in canonical
1121                //     layout join on col 1 of both rels).
1122                // Helper handles skip-on-missing-data and is
1123                // called BEFORE the counter increment so a
1124                // helper panic doesn't advance the counter.
1125                let output_rows = Self::wcoj_output_rows(&buf);
1126                let slot_rels = [matched.rel_xy, matched.rel_yz, matched.rel_xz];
1127                self.record_wcoj_feedback(&slot_rels, var_order_opt, output_rows);
1128                self.wcoj_triangle_dispatch_count += 1;
1129                #[cfg(feature = "wcoj-phase-timing")]
1130                {
1131                    let triangle_timing = self
1132                        .provider
1133                        .take_wcoj_triangle_phase_timing()
1134                        .unwrap_or_default();
1135                    let wall_ms = wall_start.elapsed().as_secs_f64() as f32 * 1000.0;
1136                    let timing = super::wcoj_phase_timing::WcojDispatchPhaseTiming::new(
1137                        classifier_ms,
1138                        layout_times[0],
1139                        layout_times[1],
1140                        layout_times[2],
1141                        triangle_timing,
1142                        wall_ms,
1143                    );
1144                    if let Ok(mut g) = self.last_wcoj_phase_timing.lock() {
1145                        *g = Some(timing);
1146                    }
1147                }
1148                Ok(Some(buf))
1149            }
1150            Err(err) => wcoj_decline_on_error(&mut self.wcoj_error_decline_count, "triangle", err),
1151        }
1152    }
1153
1154    /// Inner pipeline: 3× layout construction + triangle kernel.
1155    /// Split out so [`try_dispatch_wcoj_triangle`] can map any
1156    /// error to `Ok(None)` cleanly. Branches by `width` between
1157    /// the parallel u32 and u64 provider entries.
1158    ///
1159    /// Under feature `wcoj-phase-timing`, fills the optional
1160    /// `layout_times_ms` slot with `[layout_xy, layout_yz, layout_xz]`
1161    /// wall times in milliseconds. The triangle's per-phase GPU
1162    /// times are pulled from the provider via
1163    /// `take_wcoj_triangle_phase_timing` after this returns.
1164    #[allow(clippy::too_many_arguments)]
1165    fn run_wcoj_triangle_pipeline(
1166        &self,
1167        buf_xy: &CudaBuffer,
1168        buf_yz: &CudaBuffer,
1169        buf_xz: &CudaBuffer,
1170        launch_stream: StreamId,
1171        width: WcojKeyWidth,
1172        var_order: Option<&VariableOrder>,
1173        #[cfg(feature = "wcoj-phase-timing")] layout_times_ms: &mut [f32; 3],
1174    ) -> Result<CudaBuffer> {
1175        // When the cost model selected a non-default leader,
1176        // run the rotated/swapped path. Layout helper sees the
1177        // (possibly col-swapped) leader-rotated inputs; kernel
1178        // emits in (a, b, c) order; final projection helper remaps
1179        // to the canonical (X, Y, Z) head order.
1180        if let Some(vo) = var_order {
1181            return self.run_wcoj_triangle_pipeline_with_leader_order(
1182                buf_xy,
1183                buf_yz,
1184                buf_xz,
1185                launch_stream,
1186                width,
1187                vo,
1188            );
1189        }
1190        #[cfg(feature = "wcoj-phase-timing")]
1191        let mut time_layout =
1192            |f: &dyn Fn() -> Result<CudaBuffer>, slot: usize| -> Result<CudaBuffer> {
1193                let s = Instant::now();
1194                let r = f()?;
1195                layout_times_ms[slot] = s.elapsed().as_secs_f64() as f32 * 1000.0;
1196                Ok(r)
1197            };
1198        match width {
1199            WcojKeyWidth::FourByte => {
1200                #[cfg(feature = "wcoj-phase-timing")]
1201                let (layout_xy, layout_yz, layout_xz) = {
1202                    let xy = time_layout(
1203                        &|| {
1204                            self.provider
1205                                .wcoj_layout_u32_recorded(buf_xy, launch_stream)
1206                        },
1207                        0,
1208                    )?;
1209                    let yz = time_layout(
1210                        &|| {
1211                            self.provider
1212                                .wcoj_layout_u32_recorded(buf_yz, launch_stream)
1213                        },
1214                        1,
1215                    )?;
1216                    let xz = time_layout(
1217                        &|| {
1218                            self.provider
1219                                .wcoj_layout_u32_recorded(buf_xz, launch_stream)
1220                        },
1221                        2,
1222                    )?;
1223                    (xy, yz, xz)
1224                };
1225                #[cfg(not(feature = "wcoj-phase-timing"))]
1226                let layout_xy = self
1227                    .provider
1228                    .wcoj_layout_u32_recorded(buf_xy, launch_stream)?;
1229                #[cfg(not(feature = "wcoj-phase-timing"))]
1230                let layout_yz = self
1231                    .provider
1232                    .wcoj_layout_u32_recorded(buf_yz, launch_stream)?;
1233                #[cfg(not(feature = "wcoj-phase-timing"))]
1234                let layout_xz = self
1235                    .provider
1236                    .wcoj_layout_u32_recorded(buf_xz, launch_stream)?;
1237                let out = self.provider.wcoj_triangle_hg_u32_recorded(
1238                    &layout_xy,
1239                    &layout_yz,
1240                    &layout_xz,
1241                    wcoj_block_work_unit(),
1242                    launch_stream,
1243                )?;
1244                self.provider.record_wcoj_triangle_hg_dispatch();
1245                Ok(out)
1246            }
1247            WcojKeyWidth::EightByte => {
1248                #[cfg(feature = "wcoj-phase-timing")]
1249                let (layout_xy, layout_yz, layout_xz) = {
1250                    let xy = time_layout(
1251                        &|| {
1252                            self.provider
1253                                .wcoj_layout_u64_recorded(buf_xy, launch_stream)
1254                        },
1255                        0,
1256                    )?;
1257                    let yz = time_layout(
1258                        &|| {
1259                            self.provider
1260                                .wcoj_layout_u64_recorded(buf_yz, launch_stream)
1261                        },
1262                        1,
1263                    )?;
1264                    let xz = time_layout(
1265                        &|| {
1266                            self.provider
1267                                .wcoj_layout_u64_recorded(buf_xz, launch_stream)
1268                        },
1269                        2,
1270                    )?;
1271                    (xy, yz, xz)
1272                };
1273                #[cfg(not(feature = "wcoj-phase-timing"))]
1274                let layout_xy = self
1275                    .provider
1276                    .wcoj_layout_u64_recorded(buf_xy, launch_stream)?;
1277                #[cfg(not(feature = "wcoj-phase-timing"))]
1278                let layout_yz = self
1279                    .provider
1280                    .wcoj_layout_u64_recorded(buf_yz, launch_stream)?;
1281                #[cfg(not(feature = "wcoj-phase-timing"))]
1282                let layout_xz = self
1283                    .provider
1284                    .wcoj_layout_u64_recorded(buf_xz, launch_stream)?;
1285                self.provider.wcoj_triangle_u64_recorded(
1286                    &layout_xy,
1287                    &layout_yz,
1288                    &layout_xz,
1289                    launch_stream,
1290                )
1291            }
1292        }
1293    }
1294
1295    /// Pipeline for non-default leaders. Uses the permutation tables on
1296    /// `var_order` to:
1297    /// 1. Rotate canonical inputs `[buf_xy, buf_yz, buf_xz]` so the
1298    ///    leader sits at slot 0.
1299    /// 2. Apply col-swap (via `wcoj_project_2col_swap_recorded`) to
1300    ///    any non-leader slot whose `LookupPerm.swap_cols` is true.
1301    ///    Triangle e_yz / e_xz leaders need swaps; 4-cycle is
1302    ///    rotation-only (no swap entries).
1303    /// 3. Run `wcoj_layout_*_recorded` on each slot input.
1304    /// 4. Run `wcoj_triangle_*_recorded`. Kernel emits 3 columns
1305    ///    in leader's `(a, b, c)` order.
1306    /// 5. Apply `wcoj_project_output_columns_recorded` with
1307    ///    `var_order.kernel_output_cols` to re-permute the
1308    ///    kernel-direct output into the canonical head order
1309    ///    `(X, Y, Z)`.
1310    ///
1311    /// Phase timing is intentionally NOT instrumented on this path; performance
1312    /// validation for the non-default leader threshold is handled by benchmark
1313    /// evidence outside this helper.
1314    fn run_wcoj_triangle_pipeline_with_leader_order(
1315        &self,
1316        buf_xy: &CudaBuffer,
1317        buf_yz: &CudaBuffer,
1318        buf_xz: &CudaBuffer,
1319        launch_stream: StreamId,
1320        width: WcojKeyWidth,
1321        var_order: &VariableOrder,
1322    ) -> Result<CudaBuffer> {
1323        let canonical: [&CudaBuffer; 3] = [buf_xy, buf_yz, buf_xz];
1324        let slot_inputs = self.prepare_leader_inputs(&canonical, var_order, launch_stream)?;
1325        if slot_inputs.len() != 3 {
1326            return Err(xlog_core::XlogError::Kernel(
1327                "run_wcoj_triangle_pipeline_with_leader_order: prepare_leader_inputs must return 3 slots"
1328                    .to_string(),
1329            ));
1330        }
1331
1332        // Build the canonical (X, Y, Z) head schema from the
1333        // canonical promoter inputs (NOT the rotated kernel
1334        // inputs). The kernel will emit in (a, b, c) order under
1335        // the rotated leader; the final projection helper maps
1336        // back to head order using kernel_output_cols.
1337        let head_schema = build_triangle_head_schema(buf_xy, buf_yz)?;
1338        let perm = perm_indices_from_kernel_output_cols(&var_order.kernel_output_cols)?;
1339
1340        let kernel_out: CudaBuffer = match width {
1341            WcojKeyWidth::FourByte => {
1342                let l0 = self
1343                    .provider
1344                    .wcoj_layout_u32_recorded(&slot_inputs[0], launch_stream)?;
1345                let l1 = self
1346                    .provider
1347                    .wcoj_layout_u32_recorded(&slot_inputs[1], launch_stream)?;
1348                let l2 = self
1349                    .provider
1350                    .wcoj_layout_u32_recorded(&slot_inputs[2], launch_stream)?;
1351                let out = self.provider.wcoj_triangle_hg_u32_recorded(
1352                    &l0,
1353                    &l1,
1354                    &l2,
1355                    wcoj_block_work_unit(),
1356                    launch_stream,
1357                )?;
1358                self.provider.record_wcoj_triangle_hg_dispatch();
1359                out
1360            }
1361            WcojKeyWidth::EightByte => {
1362                let l0 = self
1363                    .provider
1364                    .wcoj_layout_u64_recorded(&slot_inputs[0], launch_stream)?;
1365                let l1 = self
1366                    .provider
1367                    .wcoj_layout_u64_recorded(&slot_inputs[1], launch_stream)?;
1368                let l2 = self
1369                    .provider
1370                    .wcoj_layout_u64_recorded(&slot_inputs[2], launch_stream)?;
1371                self.provider
1372                    .wcoj_triangle_u64_recorded(&l0, &l1, &l2, launch_stream)?
1373            }
1374        };
1375
1376        self.provider.wcoj_project_output_columns_recorded(
1377            &kernel_out,
1378            &perm,
1379            head_schema,
1380            launch_stream,
1381        )
1382    }
1383
1384    /// Number of times the WCOJ triangle hook produced a result
1385    /// and the executor installed it. Used by tests to assert
1386    /// that the WCOJ path actually ran (vs. silently falling
1387    /// back to the existing binary-join path with the same
1388    /// answer).
1389    pub fn wcoj_triangle_dispatch_count(&self) -> u64 {
1390        self.wcoj_triangle_dispatch_count
1391    }
1392
1393    /// Number of WCOJ pipeline errors (layout or kernel failures, across
1394    /// triangle / 4-cycle / k-clique / chain hooks) that were converted
1395    /// into binary-join declines. Healthy dispatch keeps this at 0; a
1396    /// nonzero value is the signature of a regressed WCOJ pipeline hiding
1397    /// behind the silent-fallback contract. Set `XLOG_WCOJ_STRICT=1` to
1398    /// propagate such errors instead of declining.
1399    pub fn wcoj_error_decline_count(&self) -> u64 {
1400        self.wcoj_error_decline_count
1401    }
1402
1403    /// Count of times the generalized Free Join dispatch produced
1404    /// the installed result (vs. the embedded binary fallback).
1405    pub fn free_join_dispatch_count(&self) -> u64 {
1406        self.free_join_dispatch_count
1407    }
1408
1409    /// Count of times the factorized recursive-delta dispatch produced the
1410    /// installed novel set (vs. the legacy hash-join -> diff path).
1411    pub fn factorized_delta_dispatch_count(&self) -> u64 {
1412        self.factorized_delta_dispatch_count
1413    }
1414
1415    /// Layout-normalize one factorized-delta static side key-first:
1416    /// key column 0 feeds the layout helper directly; key column 1 is
1417    /// column-swapped through the recorded projection first.
1418    fn factorized_delta_normalize_static(
1419        &self,
1420        buf: &CudaBuffer,
1421        key_col: usize,
1422        launch_stream: StreamId,
1423    ) -> Result<CudaBuffer> {
1424        if key_col == 0 {
1425            return self.provider.wcoj_layout_u32_recorded(buf, launch_stream);
1426        }
1427        let ty = |i: usize| {
1428            buf.schema().column_type(i).ok_or_else(|| {
1429                xlog_core::XlogError::Execution(format!(
1430                    "factorized-delta: static column {i} type missing"
1431                ))
1432            })
1433        };
1434        let swapped = Schema::new(vec![("k".to_string(), ty(1)?), ("v".to_string(), ty(0)?)]);
1435        let projected = self.provider.wcoj_project_output_columns_recorded(
1436            buf,
1437            &[1, 0],
1438            swapped,
1439            launch_stream,
1440        )?;
1441        self.provider
1442            .wcoj_layout_u32_recorded(&projected, launch_stream)
1443    }
1444
    /// Dispatch one semi-naive delta step through the factorized novel-set
    /// pipeline: the dense route (`fj_delta_novel_u32_recorded`) when the
    /// id domain fits `factorized_delta_max_domain()`, the sparse route
    /// (`fj_delta_sparse_novel_u32_recorded`) otherwise. Accepts the
    /// per-occurrence delta-rewritten variant body when it is a
    /// `ChainJoin` over two Scans with exactly one scanning the delta
    /// relation; the returned buffer is the head-order novel set —
    /// already diffed against `head_pred`'s stable relation and
    /// full-row deduped, so the caller may skip the legacy diff when
    /// every contribution to the head went through this path.
    ///
    /// # Parameters
    ///
    /// * `node` — the delta-rewritten rule body to inspect.
    /// * `delta_rel` — relation id of the delta occurrence.
    /// * `head_pred` — store name of the head's full relation.
    /// * `recursive_preds` — names of recursive predicates; a static side
    ///   found here is re-normalized on every call instead of cached.
    /// * `ctx` — per-fixpoint caches: `domain_by_key` (domain decision per
    ///   `(head, static)`) and `static_norm_cache` (normalized static
    ///   buffers per `(static_rel, static_key)`).
    ///
    /// # Declines (silent, `Ok(None)`)
    ///
    /// Kill switch, non-ChainJoin or non-Scan children, zero/two delta
    /// occurrences, a join key column index above 1, head projection
    /// that is not a two-column permutation of {delta carry, static
    /// value}, unresolvable relation names or missing store buffers,
    /// non-u32/Symbol or non-arity-2 schemas, no runtime-backed memory
    /// manager or dispatch stream, an id equal to `u32::MAX` (decision
    /// cached per fixpoint), an empty delta or static side, the
    /// dense-route work floor, and a sparse route that itself returns
    /// `Ok(None)` (hash table over the byte cap).
    ///
    /// # Errors
    ///
    /// Row-count lookups propagate via `?`. Pipeline errors route through
    /// [`wcoj_decline_on_error`] ("factorized-delta" stage).
    pub(super) fn try_dispatch_factorized_delta(
        &mut self,
        node: &RirNode,
        delta_rel: RelId,
        head_pred: &str,
        recursive_preds: &HashSet<String>,
        ctx: &mut FactorizedDeltaCtx,
    ) -> Result<Option<CudaBuffer>> {
        use xlog_cuda::provider::FjDeltaCols;

        if factorized_delta_disabled() {
            return Ok(None);
        }
        let RirNode::ChainJoin {
            left,
            right,
            left_key,
            right_key,
            output_columns,
            ..
        } = node
        else {
            return Ok(None);
        };
        let (RirNode::Scan { rel: left_rel }, RirNode::Scan { rel: right_rel }) =
            (left.as_ref(), right.as_ref())
        else {
            return Ok(None);
        };
        // Exactly one side scans the delta (per-occurrence variant
        // rewriting guarantees one occurrence; a delta-delta chain or
        // a chain not touching the delta both decline).
        let delta_on_left = match (*left_rel == delta_rel, *right_rel == delta_rel) {
            (true, false) => true,
            (false, true) => false,
            _ => return Ok(None),
        };
        let (delta_key, static_rel, static_key) = if delta_on_left {
            (*left_key, *right_rel, *right_key)
        } else {
            (*right_key, *left_rel, *left_key)
        };
        // Both relations are binary, so a key column must be 0 or 1; the
        // guard also keeps the `1 - key` subtractions below from
        // underflowing.
        if delta_key > 1 || static_key > 1 {
            return Ok(None);
        }
        // The non-key column of each side: the delta's carried value and
        // the static side's joined value.
        let delta_carry = 1 - delta_key;
        let static_value = 1 - static_key;

        // Head projection must be a permutation of {delta carry,
        // static value} in the combined left+right column space
        // (left columns are 0..2, right columns are 2..4).
        let (delta_off, static_off) = if delta_on_left { (0, 2) } else { (2, 0) };
        let carry_global = delta_off + delta_carry;
        let value_global = static_off + static_value;
        let [ProjectExpr::Column(out0), ProjectExpr::Column(out1)] = output_columns.as_slice()
        else {
            return Ok(None);
        };
        // `r_carry` / `r_value`: head column positions of the carry and
        // the value, respectively.
        let (r_carry, r_value) = if (*out0, *out1) == (carry_global, value_global) {
            (0, 1)
        } else if (*out0, *out1) == (value_global, carry_global) {
            (1, 0)
        } else {
            return Ok(None);
        };

        // Resolve store buffers; all three must be arity-2 u32/Symbol.
        let binary_u32_class = |buf: &CudaBuffer| {
            buf.arity() == 2
                && (0..2).all(|i| {
                    matches!(
                        buf.schema().column_type(i),
                        Some(ScalarType::U32) | Some(ScalarType::Symbol)
                    )
                })
        };
        // Names are copied to owned strings so the `get_rel_name` borrow
        // ends before the store lookups below.
        let Some(delta_name) = self.get_rel_name(delta_rel).map(str::to_string) else {
            return Ok(None);
        };
        let Some(static_name) = self.get_rel_name(static_rel).map(str::to_string) else {
            return Ok(None);
        };
        let Some(delta_buf) = self.store.get(&delta_name) else {
            return Ok(None);
        };
        let Some(static_buf) = self.store.get(&static_name) else {
            return Ok(None);
        };
        let Some(full_buf) = self.store.get(head_pred) else {
            return Ok(None);
        };
        if !binary_u32_class(delta_buf)
            || !binary_u32_class(static_buf)
            || !binary_u32_class(full_buf)
        {
            return Ok(None);
        }
        if self.provider.memory().runtime().is_none() {
            return Ok(None);
        }
        let Some(launch_stream) = self.wcoj_dispatch_stream_or_init() else {
            return Ok(None);
        };

        // Domain bound (max id + 1), computed once per (head, static)
        // per fixpoint at the first dispatch attempt (induction: every
        // derived id comes from the seeded delta, the stable head
        // relation, or the static side — exactly the iteration-1
        // sets). The in-kernel bounds check stays as the fail-closed
        // backstop.
        // `None` only when an id is u32::MAX — neither the dense
        // bitvector (domain overflow) nor the sparse hash set (the
        // forbidden (MAX,MAX) key) can pack it, so decline for the
        // whole fixpoint.
        let domain_key = (head_pred.to_string(), static_rel);
        let domain = match ctx.domain_by_key.get(&domain_key) {
            // Cached usable domain.
            Some(Some(d)) => *d,
            // Cached "undispatchable for this fixpoint" decision.
            Some(None) => return Ok(None),
            // First attempt for this key: scan both columns of all three
            // buffers for the maximum id.
            None => {
                let max_id = match self.provider.fj_delta_columns_max_u32(
                    &[
                        (delta_buf, &[0, 1][..]),
                        (static_buf, &[0, 1][..]),
                        (full_buf, &[0, 1][..]),
                    ],
                    launch_stream,
                ) {
                    Ok(m) => m,
                    // A failed scan is not cached, so a later call retries.
                    Err(err) => {
                        return wcoj_decline_on_error(
                            &mut self.wcoj_error_decline_count,
                            "factorized-delta",
                            err,
                        );
                    }
                };
                // `max_id + 1` cannot overflow: u32::MAX is mapped to `None`.
                let decided = if max_id == u32::MAX {
                    None
                } else {
                    Some(max_id + 1)
                };
                ctx.domain_by_key.insert(domain_key, decided);
                match decided {
                    Some(d) => d,
                    None => return Ok(None),
                }
            }
        };

        // An empty side yields nothing to dispatch; decline.
        let n_delta = u64::from(self.buffer_row_count(delta_buf)?);
        let n_static = u64::from(self.buffer_row_count(static_buf)?);
        if n_delta == 0 || n_static == 0 {
            return Ok(None);
        }

        // Route: dense characteristic-bitvector when the domain fits
        // the cap (default 2¹⁴, env up to 2¹⁶); sparse hash set
        // otherwise. The bitvector's popcount+scan floor over
        // n_words = domain·⌈domain/32⌉ is a domain² term, so it gates
        // ONLY the dense route — applying it to a large-domain sparse
        // step would spuriously bail every iteration.
        let dense = domain <= factorized_delta_max_domain();
        if dense {
            let n_words = u64::from(domain.div_ceil(32)) * u64::from(domain);
            // Work estimate: delta rows times the average static fan-out
            // per id (floored at 1). `domain` is at least 1 here
            // (max id + 1), so the division is safe.
            let work_est = n_delta.saturating_mul((n_static / u64::from(domain)).max(1));
            if work_est < n_words / factorized_delta_work_divisor() {
                return Ok(None);
            }
        }

        // Static side key-first layout. EDB statics are normalized
        // once per fixpoint (cached); a recursive static (non-linear
        // self-join — the stable relation itself) changes every
        // iteration and is re-normalized (it is already sorted+deduped
        // from union_gpu, so the layout fast-path applies).
        let static_is_recursive = recursive_preds.contains(&static_name);
        // Deferred-init owner for the uncached (recursive) case so that
        // `static_norm` can borrow from either it or the cache.
        let norm_owned;
        let static_norm: &CudaBuffer = if static_is_recursive {
            norm_owned =
                match self.factorized_delta_normalize_static(static_buf, static_key, launch_stream)
                {
                    Ok(b) => b,
                    Err(err) => {
                        return wcoj_decline_on_error(
                            &mut self.wcoj_error_decline_count,
                            "factorized-delta",
                            err,
                        );
                    }
                };
            &norm_owned
        } else {
            match ctx.static_norm_cache.entry((static_rel, static_key)) {
                std::collections::hash_map::Entry::Occupied(e) => &*e.into_mut(),
                std::collections::hash_map::Entry::Vacant(v) => {
                    let norm = match self.factorized_delta_normalize_static(
                        static_buf,
                        static_key,
                        launch_stream,
                    ) {
                        Ok(b) => b,
                        // Nothing is inserted on failure, so the cache
                        // never holds a partial entry.
                        Err(err) => {
                            return wcoj_decline_on_error(
                                &mut self.wcoj_error_decline_count,
                                "factorized-delta",
                                err,
                            );
                        }
                    };
                    &*v.insert(norm)
                }
            }
        };

        let cols = FjDeltaCols {
            delta_carry,
            delta_key,
            r_carry,
            r_value,
        };
        if dense {
            match self.provider.fj_delta_novel_u32_recorded(
                delta_buf,
                static_norm,
                full_buf,
                cols,
                domain,
                launch_stream,
            ) {
                Ok(novel) => {
                    // Only successful dispatches are counted.
                    self.factorized_delta_dispatch_count += 1;
                    Ok(Some(novel))
                }
                Err(err) => wcoj_decline_on_error(
                    &mut self.wcoj_error_decline_count,
                    "factorized-delta",
                    err,
                ),
            }
        } else {
            // Sparse route: cap the conservative hash table at half the
            // device budget; over that, the entry returns Ok(None) and
            // we fall back to the legacy hash-join → diff path.
            let max_table_bytes =
                factorized_delta_max_table_bytes(self.provider.memory().budget().device_bytes);
            match self.provider.fj_delta_sparse_novel_u32_recorded(
                delta_buf,
                static_norm,
                full_buf,
                cols,
                max_table_bytes,
                launch_stream,
            ) {
                Ok(Some(novel)) => {
                    self.factorized_delta_dispatch_count += 1;
                    Ok(Some(novel))
                }
                // Provider-side decline (not an error, not counted).
                Ok(None) => Ok(None),
                Err(err) => wcoj_decline_on_error(
                    &mut self.wcoj_error_decline_count,
                    "factorized-delta",
                    err,
                ),
            }
        }
    }
1729
1730    /// Dispatch a general `MultiWayJoin` (any shape WITHOUT a
1731    /// dedicated kernel) through the Free Join frontier engine. Runs
1732    /// after the triangle/4-cycle/k-clique dispatchers in
1733    /// `execute_wcoj_or_fallback_node`, and accepts ONLY nodes
1734    /// carrying `MultiwayPlan::FreeJoin` — the general promoter's
1735    /// provenance marker guaranteeing `output_columns` lives in the
1736    /// concatenated-inputs column space (dedicated promoters reorder
1737    /// `inputs` canonically, so positional interpretation of their
1738    /// nodes would permute the head). The plan is derived
1739    /// `binary2fj`-style over the node's slot order with
1740    /// earliest-node probe pushing (paper §4.1); probe keys must form
1741    /// a PREFIX of each atom's column order (flat sorted tries
1742    /// consume columns physically left-to-right) — non-prefix bodies
1743    /// decline silently to the fallback. Pipeline errors route
1744    /// through [`wcoj_decline_on_error`] ("free-join" stage).
1745    pub(super) fn try_dispatch_free_join(&mut self, node: &RirNode) -> Result<Option<CudaBuffer>> {
1746        use xlog_cuda::provider::{FjNode, FjPlan, FjSubAtom};
1747
1748        if free_join_disabled() {
1749            return Ok(None);
1750        }
1751        let RirNode::MultiWayJoin {
1752            inputs,
1753            slot_vars,
1754            output_columns,
1755            plan,
1756            ..
1757        } = node
1758        else {
1759            return Ok(None);
1760        };
1761        // Provenance gate (design §3): accept ONLY nodes the general
1762        // multiway promoter marked `MultiwayPlan::FreeJoin`. Their
1763        // construction guarantees `inputs` are the fallback's Scan
1764        // leaves in traversal order, so `output_columns` (fallback
1765        // projection space, the universal MultiWayJoin convention)
1766        // coincides with the concatenated-inputs space this
1767        // dispatcher projects from. Dedicated-shape promoters
1768        // (triangle / 4-cycle / K-clique) reorder `inputs`
1769        // canonically — interpreting their `output_columns`
1770        // positionally would permute the head — and they carry
1771        // `None` / `WcojWithPlan` / `PlannedHashRoute`, so the gate
1772        // also subsumes the dedicated-shape carve-out.
1773        if !matches!(plan, Some(MultiwayPlan::FreeJoin)) {
1774            return Ok(None);
1775        }
1776        if inputs.len() < 3 {
1777            return Ok(None);
1778        }
1779        // Resolve scans -> store buffers; all columns across all
1780        // inputs must share ONE width class — u32/Symbol or u64
1781        // (mixed widths and other types decline; the engine's flat
1782        // sorted-range tries are width-uniform per execution).
1783        let mut bufs: Vec<&CudaBuffer> = Vec::with_capacity(inputs.len());
1784        let mut all_u32 = true;
1785        let mut all_u64 = true;
1786        for input in inputs {
1787            let RirNode::Scan { rel } = input else {
1788                return Ok(None);
1789            };
1790            let name = match self.get_rel_name(*rel) {
1791                Some(s) => s.to_string(),
1792                None => return Ok(None),
1793            };
1794            let Some(buf) = self.store.get(&name) else {
1795                return Ok(None);
1796            };
1797            for i in 0..buf.arity() {
1798                match buf.schema().column_type(i) {
1799                    Some(ScalarType::U32 | ScalarType::Symbol) => all_u64 = false,
1800                    Some(ScalarType::U64) => all_u32 = false,
1801                    _ => return Ok(None),
1802                }
1803            }
1804            bufs.push(buf);
1805        }
1806        if !all_u32 && !all_u64 {
1807            return Ok(None);
1808        }
1809        // Fail-open cost-model loss veto: decline Free Join to the binary
1810        // fallback only when the cost model has full stats AND the join
1811        // is provably small (largest input below the WCOJ-worthwhile
1812        // threshold), the measured 1.7–2.0× cost-of-generality region.
1813        // Stats absent / any large input → FJ proceeds (every measured
1814        // win preserved). Inputs are all Scans (checked above).
1815        {
1816            let slot_rels: Vec<RelId> = inputs
1817                .iter()
1818                .filter_map(|i| match i {
1819                    RirNode::Scan { rel } => Some(*rel),
1820                    _ => None,
1821                })
1822                .collect();
1823            let model = super::wcoj_cost_model::build_wcoj_cost_model(&self.config);
1824            let width = if all_u32 {
1825                WcojKeyWidth::FourByte
1826            } else {
1827                WcojKeyWidth::EightByte
1828            };
1829            let ctx = super::wcoj_cost_model::WcojDispatchCtx {
1830                stats: &self.stats,
1831                launch_stream: StreamId::DEFAULT,
1832                width,
1833                slot_rels: &slot_rels,
1834            };
1835            if model.factorized_loss_veto(&ctx) {
1836                return Ok(None);
1837            }
1838        }
1839        // Dense variable ids: remap slot_vars' class ids to 0..n.
1840        let mut class_to_var: Vec<u32> = Vec::new();
1841        let mut dense = |class: u32| -> usize {
1842            match class_to_var.iter().position(|c| *c == class) {
1843                Some(i) => i,
1844                None => {
1845                    class_to_var.push(class);
1846                    class_to_var.len() - 1
1847                }
1848            }
1849        };
1850        let mut atom_vars: Vec<Vec<usize>> = Vec::with_capacity(slot_vars.len());
1851        for (i, cols) in slot_vars.iter().enumerate() {
1852            if cols.len() != bufs[i].arity() {
1853                return Ok(None);
1854            }
1855            let mut vars = Vec::with_capacity(cols.len());
1856            for c in cols {
1857                let Some(class) = c else { return Ok(None) };
1858                vars.push(dense(*class));
1859            }
1860            atom_vars.push(vars);
1861        }
1862        let num_vars = class_to_var.len();
1863        // Prefix-key-joinable order planner (decline-or-reorder).
1864        // Free Join's probe-key rule forces a left-deep prefix in COLUMN
1865        // order, so a bad atom order can materialize a large intermediate even
1866        // when the result is tiny (a measured worst case is ~3x peak vs
1867        // binary). The planner is a
1868        // safety net: it keeps the traversal order when it is already
1869        // competitive with the binary plan (every winning fixture untouched),
1870        // reorders to a better prefix-key-joinable order when one exists, or
1871        // declines to the binary fallback when none is competitive.
1872        // Cardinalities are the ground-truth row counts of the buffers we are
1873        // about to join (NOT StatsManager — always available, never activates
1874        // the loss veto on statless winners); per-pair join estimates consult
1875        // StatsManager when stats are populated. Only the CardinalityAware
1876        // model plans (SkewClassifier opt-out keeps the traversal order).
1877        let order: Vec<usize> = {
1878            let slot_rels: Vec<RelId> = inputs
1879                .iter()
1880                .filter_map(|i| match i {
1881                    RirNode::Scan { rel } => Some(*rel),
1882                    _ => None,
1883                })
1884                .collect();
1885            let cards: Vec<u64> = bufs.iter().map(|b| b.num_rows()).collect();
1886            let model = super::wcoj_cost_model::build_wcoj_cost_model(&self.config);
1887            let width = if all_u32 {
1888                WcojKeyWidth::FourByte
1889            } else {
1890                WcojKeyWidth::EightByte
1891            };
1892            let ctx = super::wcoj_cost_model::WcojDispatchCtx {
1893                stats: &self.stats,
1894                launch_stream: StreamId::DEFAULT,
1895                width,
1896                slot_rels: &slot_rels,
1897            };
1898            match model.plan_free_join_order(&ctx, &atom_vars, &cards) {
1899                super::wcoj_cost_model::FjOrderDecision::Decline => return Ok(None),
1900                super::wcoj_cost_model::FjOrderDecision::Reorder(o) => o,
1901                super::wcoj_cost_model::FjOrderDecision::KeepDefault => {
1902                    (0..atom_vars.len()).collect()
1903                }
1904            }
1905        };
1906        // binary2fj over the planned order with earliest-node probe pushing:
1907        // each atom's bound-variable PREFIX probes the earliest node
1908        // after which its keys are available; the unbound suffix covers
1909        // a new node. Repeated variables within one cover decline (the
1910        // provider's rebind check would reject them).
1911        let mut bound_at: Vec<Option<usize>> = vec![None; num_vars]; // var -> node idx
1912        let mut nodes: Vec<FjNode> = Vec::new();
1913        // Process atoms in the planned order; `i` stays the ORIGINAL input
1914        // index so `input_idx`/`bufs` indexing and the head projection
1915        // (`col_to_var`, built in original order below) remain correct — only
1916        // the prefix-materialization order changes.
1917        for &i in &order {
1918            let vars = &atom_vars[i];
1919            let split = vars.iter().take_while(|v| bound_at[**v].is_some()).count();
1920            if vars[split..].iter().any(|v| bound_at[*v].is_some()) {
1921                // A bound variable after an unbound one: the trie order
1922                // cannot consume it as a key — non-prefix body.
1923                return Ok(None);
1924            }
1925            if split > 0 {
1926                let probe = FjSubAtom {
1927                    input_idx: i,
1928                    var_positions: vars[..split].to_vec(),
1929                };
1930                if nodes.is_empty() {
1931                    return Ok(None);
1932                }
1933                let target = vars[..split]
1934                    .iter()
1935                    .map(|v| bound_at[*v].expect("prefix vars are bound"))
1936                    .max()
1937                    .expect("split > 0");
1938                nodes[target].probes.push(probe);
1939            }
1940            if split < vars.len() {
1941                let cover_vars = vars[split..].to_vec();
1942                let mut seen = HashSet::new();
1943                if !cover_vars.iter().all(|v| seen.insert(*v)) {
1944                    return Ok(None);
1945                }
1946                let k = nodes.len();
1947                for v in &cover_vars {
1948                    bound_at[*v] = Some(k);
1949                }
1950                nodes.push(FjNode {
1951                    cover: FjSubAtom {
1952                        input_idx: i,
1953                        var_positions: cover_vars,
1954                    },
1955                    probes: Vec::new(),
1956                });
1957            } else if nodes.is_empty() {
1958                return Ok(None);
1959            }
1960        }
1961        // Head projection: map join-tree output columns (concatenated
1962        // input columns in slot order) to variable ids.
1963        let mut col_to_var: Vec<usize> = Vec::new();
1964        for vars in &atom_vars {
1965            col_to_var.extend(vars.iter().copied());
1966        }
1967        let mut output_vars: Vec<usize> = Vec::with_capacity(output_columns.len());
1968        for oc in output_columns {
1969            let ProjectExpr::Column(c) = oc else {
1970                return Ok(None);
1971            };
1972            let Some(v) = col_to_var.get(*c) else {
1973                return Ok(None);
1974            };
1975            output_vars.push(*v);
1976        }
1977        let fj_plan = FjPlan {
1978            num_vars,
1979            nodes,
1980            output_vars,
1981        };
1982        if self.provider.memory().runtime().is_none() {
1983            return Ok(None);
1984        }
1985        let Some(launch_stream) = self.wcoj_dispatch_stream_or_init() else {
1986            return Ok(None);
1987        };
1988        let outcome = if all_u32 {
1989            self.provider
1990                .free_join_execute_u32_recorded(&bufs, &fj_plan, launch_stream)
1991        } else {
1992            self.provider
1993                .free_join_execute_u64_recorded(&bufs, &fj_plan, launch_stream)
1994        };
1995        match outcome {
1996            Ok(buf) => {
1997                self.free_join_dispatch_count += 1;
1998                Ok(Some(buf))
1999            }
2000            Err(err) => wcoj_decline_on_error(&mut self.wcoj_error_decline_count, "free-join", err),
2001        }
2002    }
2003
2004    /// Factorized Free Join count-by-root: fused dispatch
2005    /// for `count` aggregates over FreeJoin-marked general multiway
2006    /// bodies. The plan is derived like [`Self::try_dispatch_free_join`],
2007    /// with one refinement:
2008    /// trailing cover variables PRIVATE to their atom (single global
2009    /// occurrence, not the group key) are left unconsumed — the
2010    /// engine multiplies their live trie-range lengths instead of
2011    /// expanding the frontier (the d-representation count). Count
2012    /// semantics match the unfused pipeline exactly: the lowered
2013    /// group input is a non-deduplicating projection of the join
2014    /// output, so both paths count distinct full body bindings.
2015    /// u32/Symbol width only (the recorded groupby's engine-wide key
2016    /// support); every decline silently leaves the unfused path to
2017    /// run.
2018    fn try_dispatch_free_join_count(
2019        &mut self,
2020        node: &RirNode,
2021        group_cols: &[ProjectExpr],
2022    ) -> Result<Option<CudaBuffer>> {
2023        use xlog_cuda::provider::{FjNode, FjPlan, FjSubAtom};
2024
2025        if free_join_disabled() {
2026            return Ok(None);
2027        }
2028        let RirNode::MultiWayJoin {
2029            inputs,
2030            slot_vars,
2031            plan,
2032            ..
2033        } = node
2034        else {
2035            return Ok(None);
2036        };
2037        if !matches!(plan, Some(MultiwayPlan::FreeJoin)) {
2038            return Ok(None);
2039        }
2040        if inputs.len() < 3 {
2041            return Ok(None);
2042        }
2043        let mut bufs: Vec<&CudaBuffer> = Vec::with_capacity(inputs.len());
2044        for input in inputs {
2045            let RirNode::Scan { rel } = input else {
2046                return Ok(None);
2047            };
2048            let name = match self.get_rel_name(*rel) {
2049                Some(s) => s.to_string(),
2050                None => return Ok(None),
2051            };
2052            let Some(buf) = self.store.get(&name) else {
2053                return Ok(None);
2054            };
2055            let four_byte = (0..buf.arity()).all(|i| {
2056                matches!(
2057                    buf.schema().column_type(i),
2058                    Some(ScalarType::U32 | ScalarType::Symbol)
2059                )
2060            });
2061            if !four_byte {
2062                return Ok(None);
2063            }
2064            bufs.push(buf);
2065        }
2066        // Dense variable ids (same scheme as the materialize
2067        // dispatcher).
2068        let mut class_to_var: Vec<u32> = Vec::new();
2069        let mut dense = |class: u32| -> usize {
2070            match class_to_var.iter().position(|c| *c == class) {
2071                Some(i) => i,
2072                None => {
2073                    class_to_var.push(class);
2074                    class_to_var.len() - 1
2075                }
2076            }
2077        };
2078        let mut atom_vars: Vec<Vec<usize>> = Vec::with_capacity(slot_vars.len());
2079        for (i, cols) in slot_vars.iter().enumerate() {
2080            if cols.len() != bufs[i].arity() {
2081                return Ok(None);
2082            }
2083            let mut vars = Vec::with_capacity(cols.len());
2084            for c in cols {
2085                let Some(class) = c else { return Ok(None) };
2086                vars.push(dense(*class));
2087            }
2088            atom_vars.push(vars);
2089        }
2090        let num_vars = class_to_var.len();
2091        // Group key: the group projection's column 0 through the
2092        // concatenated-inputs column space (FreeJoin provenance).
2093        let mut col_to_var: Vec<usize> = Vec::new();
2094        for vars in &atom_vars {
2095            col_to_var.extend(vars.iter().copied());
2096        }
2097        let Some(ProjectExpr::Column(key_col)) = group_cols.first() else {
2098            return Ok(None);
2099        };
2100        let Some(&group_var) = col_to_var.get(*key_col) else {
2101            return Ok(None);
2102        };
2103        // Variable occurrence counts: single-occurrence variables are
2104        // private to their atom and (unless they key the group)
2105        // prunable as trailing covers.
2106        let mut occurrences = vec![0usize; num_vars];
2107        for vars in &atom_vars {
2108            for &v in vars {
2109                occurrences[v] += 1;
2110            }
2111        }
2112        // binary2fj with trailing-private pruning.
2113        let mut bound_at: Vec<Option<usize>> = vec![None; num_vars];
2114        let mut nodes: Vec<FjNode> = Vec::new();
2115        for (i, vars) in atom_vars.iter().enumerate() {
2116            let split = vars.iter().take_while(|v| bound_at[**v].is_some()).count();
2117            if vars[split..].iter().any(|v| bound_at[*v].is_some()) {
2118                // Non-prefix body (see the materialize dispatcher).
2119                return Ok(None);
2120            }
2121            let mut keep_end = vars.len();
2122            while keep_end > split {
2123                let v = vars[keep_end - 1];
2124                if occurrences[v] == 1 && v != group_var {
2125                    keep_end -= 1;
2126                } else {
2127                    break;
2128                }
2129            }
2130            if split == 0 && keep_end == 0 {
2131                // Fully-private atom: nothing binds or probes it
2132                // (cannot arise from the promoter — keyless joins
2133                // are rejected there — but decline defensively).
2134                return Ok(None);
2135            }
2136            if split > 0 {
2137                let probe = FjSubAtom {
2138                    input_idx: i,
2139                    var_positions: vars[..split].to_vec(),
2140                };
2141                if nodes.is_empty() {
2142                    return Ok(None);
2143                }
2144                let target = vars[..split]
2145                    .iter()
2146                    .map(|v| bound_at[*v].expect("prefix vars are bound"))
2147                    .max()
2148                    .expect("split > 0");
2149                nodes[target].probes.push(probe);
2150            }
2151            if split < keep_end {
2152                let cover_vars = vars[split..keep_end].to_vec();
2153                let mut seen = HashSet::new();
2154                if !cover_vars.iter().all(|v| seen.insert(*v)) {
2155                    return Ok(None);
2156                }
2157                let k = nodes.len();
2158                for v in &cover_vars {
2159                    bound_at[*v] = Some(k);
2160                }
2161                nodes.push(FjNode {
2162                    cover: FjSubAtom {
2163                        input_idx: i,
2164                        var_positions: cover_vars,
2165                    },
2166                    probes: Vec::new(),
2167                });
2168            } else if nodes.is_empty() {
2169                return Ok(None);
2170            }
2171        }
2172        if bound_at[group_var].is_none() {
2173            return Ok(None);
2174        }
2175        let fj_plan = FjPlan {
2176            num_vars,
2177            nodes,
2178            output_vars: vec![group_var],
2179        };
2180        if self.provider.memory().runtime().is_none() {
2181            return Ok(None);
2182        }
2183        let Some(launch_stream) = self.wcoj_dispatch_stream_or_init() else {
2184            return Ok(None);
2185        };
2186        match self
2187            .provider
2188            .free_join_count_by_root_u32_recorded(&bufs, &fj_plan, launch_stream)
2189        {
2190            Ok(buf) => {
2191                self.free_join_dispatch_count += 1;
2192                self.wcoj_groupby_fusion_dispatch_count += 1;
2193                Ok(Some(buf))
2194            }
2195            Err(err) => {
2196                wcoj_decline_on_error(&mut self.wcoj_error_decline_count, "free-join-count", err)
2197            }
2198        }
2199    }
2200
2201    /// Count of times the fused group-by-root count hook produced a
2202    /// result and the executor installed it (vs. silently falling back to
2203    /// the materialize+groupby path with the same answer).
2204    pub fn wcoj_groupby_fusion_dispatch_count(&self) -> u64 {
2205        self.wcoj_groupby_fusion_dispatch_count
2206    }
2207
2208    /// Aggregate-fused WCOJ: dispatch
2209    /// `GroupBy { Project { MultiWayJoin(triangle) }, key_cols: [0],
2210    /// aggs: [(_, Count | Sum | Min | Max)] }` through the fused
2211    /// group-by-root kernels, which never materialize the triangle rows.
2212    /// The group key column 0 is the variable-order root X in the canonical
2213    /// triangle output, the condition under which one-pass aggregate
2214    /// propagation over the variable order is sound. For Sum/Min/Max the
2215    /// aggregate value column must itself map to a triangle output variable
2216    /// (Y or Z; plain U32 on the 4-byte path, uniform U64 on the 8-byte
2217    /// path) so the kernel can read it during traversal; Count ignores the
2218    /// value column. Every structural mismatch (other
2219    /// keys/aggs, computed projections, value column not Y/Z or not U32,
2220    /// non-triangle shape, non-4-byte width, missing buffers/runtime, kill
2221    /// switch) returns `Ok(None)` — silent decline to the existing
2222    /// materialize+groupby path. Pipeline errors route through
2223    /// [`wcoj_decline_on_error`] (counted; `XLOG_WCOJ_STRICT=1` propagates).
2224    pub(super) fn try_dispatch_wcoj_groupby_root_agg(
2225        &mut self,
2226        input: &RirNode,
2227        key_cols: &[usize],
2228        aggs: &[(usize, xlog_core::AggOp)],
2229    ) -> Result<Option<CudaBuffer>> {
2230        use xlog_core::AggOp;
2231        if wcoj_groupby_fusion_disabled() {
2232            return Ok(None);
2233        }
2234        if key_cols != [0] {
2235            return Ok(None);
2236        }
2237        if aggs.len() != 1 {
2238            return Ok(None);
2239        }
2240        let (agg_col, agg_op) = aggs[0];
2241        if !matches!(agg_op, AggOp::Count | AggOp::Sum | AggOp::Min | AggOp::Max) {
2242            return Ok(None);
2243        }
2244        let RirNode::Project {
2245            input: multiway,
2246            columns,
2247        } = input
2248        else {
2249            return Ok(None);
2250        };
2251        // The group projection must contain only plain column references.
2252        if columns.is_empty() || !columns.iter().all(|c| matches!(c, ProjectExpr::Column(_))) {
2253            return Ok(None);
2254        }
2255        // Triangle and 4-cycle place the variable-order root at output
2256        // position 0 by construction, so their group key must be
2257        // Column(0). The K-clique root is plan-dependent; its branch
2258        // validates the planned root itself.
2259        let key_is_col0 = matches!(columns[0], ProjectExpr::Column(0));
2260        // For value-reading aggregates the value column must map to a
2261        // non-key join output variable the per-shape kernel can see
2262        // (triangle: Y/Z; 4-cycle: X/Y/Z). Resolve the raw output column
2263        // here (the key itself and non-column refs decline); the
2264        // per-shape mapping happens after shape match. Count never reads
2265        // the value column, so any pass-through value columns are
2266        // admissible.
2267        let agg_value_col = if matches!(agg_op, AggOp::Count) {
2268            None
2269        } else {
2270            match columns.get(agg_col) {
2271                Some(ProjectExpr::Column(c)) if *c >= 1 => Some(*c),
2272                _ => return Ok(None),
2273            }
2274        };
2275        let Some(matched) = match_multiway_triangle(multiway) else {
2276            // 4-cycle sibling of the triangle fusion (count + sum/min/max).
2277            // The 4-cycle root is output column 0 by
2278            // construction, so gate on the key here like the triangle.
2279            if key_is_col0 {
2280                if let Some(buf) =
2281                    self.try_dispatch_wcoj_groupby_root_agg_4cycle(multiway, agg_op, agg_value_col)?
2282                {
2283                    return Ok(Some(buf));
2284                }
2285            }
2286            // K-clique (K = 5, 6) count sibling. The clique root is
2287            // plan-dependent, so the helper validates the group key against
2288            // the planned root itself instead of key_is_col0. Count-only
2289            // (no fused clique sum/min/max kernels).
2290            if !matches!(agg_op, AggOp::Count) {
2291                return Ok(None);
2292            }
2293            if let Some(buf) =
2294                self.try_dispatch_wcoj_groupby_root_count_clique(multiway, columns)?
2295            {
2296                return Ok(Some(buf));
2297            }
2298            // Factorized Free Join count-by-root for
2299            // FreeJoin-marked general multiway bodies (any shape the
2300            // dedicated fused kernels above declined).
2301            return self.try_dispatch_free_join_count(multiway, columns);
2302        };
2303        // Triangle output space: col 1 = Y, col 2 = Z. Anything else
2304        // (e.g. an out-of-range ref) declines.
2305        let agg_value = match agg_value_col {
2306            None => None,
2307            Some(1) => Some(WcojRootAggValue::Y),
2308            Some(2) => Some(WcojRootAggValue::Z),
2309            Some(_) => return Ok(None),
2310        };
2311        if !key_is_col0 {
2312            return Ok(None);
2313        }
2314        let name_xy = match self.get_rel_name(matched.rel_xy) {
2315            Some(s) => s.to_string(),
2316            None => return Ok(None),
2317        };
2318        let name_yz = match self.get_rel_name(matched.rel_yz) {
2319            Some(s) => s.to_string(),
2320            None => return Ok(None),
2321        };
2322        let name_xz = match self.get_rel_name(matched.rel_xz) {
2323            Some(s) => s.to_string(),
2324            None => return Ok(None),
2325        };
2326        let buf_xy = match self.store.get(&name_xy) {
2327            Some(b) => b,
2328            None => return Ok(None),
2329        };
2330        let buf_yz = match self.store.get(&name_yz) {
2331            Some(b) => b,
2332            None => return Ok(None),
2333        };
2334        let buf_xz = match self.store.get(&name_xz) {
2335            Some(b) => b,
2336            None => return Ok(None),
2337        };
2338        let width = match (
2339            classify_two_col_wcoj_width(buf_xy),
2340            classify_two_col_wcoj_width(buf_yz),
2341            classify_two_col_wcoj_width(buf_xz),
2342        ) {
2343            (
2344                Some(WcojKeyWidth::FourByte),
2345                Some(WcojKeyWidth::FourByte),
2346                Some(WcojKeyWidth::FourByte),
2347            ) => WcojKeyWidth::FourByte,
2348            (
2349                Some(WcojKeyWidth::EightByte),
2350                Some(WcojKeyWidth::EightByte),
2351                Some(WcojKeyWidth::EightByte),
2352            ) => WcojKeyWidth::EightByte,
2353            _ => return Ok(None),
2354        };
2355        // Fail-open cost-model loss veto: decline the FUSED triangle
2356        // aggregate to the unfused materialize+groupby only when the cost
2357        // model has stats AND the triangle is provably small — the
2358        // small-case the base triangle cost model already declines, which
2359        // the fused path otherwise bypasses. Fail-open: stats absent / any
2360        // large input → fuse (every measured fused-aggregate win preserved).
2361        // The rarer
2362        // fused 4-cycle/K-clique sub-paths inherit their base shapes'
2363        // gating posture and are not separately vetoed here.
2364        {
2365            let slot_rels = [matched.rel_xy, matched.rel_yz, matched.rel_xz];
2366            let model = super::wcoj_cost_model::build_wcoj_cost_model(&self.config);
2367            let ctx = super::wcoj_cost_model::WcojDispatchCtx {
2368                stats: &self.stats,
2369                launch_stream: StreamId::DEFAULT,
2370                width,
2371                slot_rels: &slot_rels,
2372            };
2373            if model.factorized_loss_veto(&ctx) {
2374                return Ok(None);
2375            }
2376        }
2377        // Sum/Min/Max are arithmetic: on the 4-byte path the columns
2378        // supplying the value must be plain U32 (Symbol ids are not
2379        // summable/orderable data — and the unfused groupby rejects Symbol
2380        // values too, so declining keeps both paths aligned). On the
2381        // 8-byte path the width classifier already guarantees uniform U64
2382        // columns, which the u64 fused kernels consume directly.
2383        if matches!(width, WcojKeyWidth::FourByte) {
2384            match agg_value {
2385                Some(WcojRootAggValue::Y) => {
2386                    if buf_xy.schema().column_type(1) != Some(xlog_core::ScalarType::U32) {
2387                        return Ok(None);
2388                    }
2389                }
2390                Some(WcojRootAggValue::Z) => {
2391                    if buf_yz.schema().column_type(1) != Some(xlog_core::ScalarType::U32)
2392                        || buf_xz.schema().column_type(1) != Some(xlog_core::ScalarType::U32)
2393                    {
2394                        return Ok(None);
2395                    }
2396                }
2397                None => {}
2398            }
2399        }
2400        if self.provider.memory().runtime().is_none() {
2401            return Ok(None);
2402        }
2403        let Some(launch_stream) = self.wcoj_dispatch_stream_or_init() else {
2404            return Ok(None);
2405        };
2406        let result = match (agg_value, width) {
2407            (None, WcojKeyWidth::FourByte) => {
2408                self.provider.wcoj_triangle_groupby_root_count_u32_recorded(
2409                    buf_xy,
2410                    buf_yz,
2411                    buf_xz,
2412                    wcoj_block_work_unit(),
2413                    launch_stream,
2414                )
2415            }
2416            (None, WcojKeyWidth::EightByte) => {
2417                self.provider.wcoj_triangle_groupby_root_count_u64_recorded(
2418                    buf_xy,
2419                    buf_yz,
2420                    buf_xz,
2421                    wcoj_block_work_unit(),
2422                    launch_stream,
2423                )
2424            }
2425            (Some(value), WcojKeyWidth::FourByte) => {
2426                self.provider.wcoj_triangle_groupby_root_agg_u32_recorded(
2427                    buf_xy,
2428                    buf_yz,
2429                    buf_xz,
2430                    agg_op,
2431                    value,
2432                    wcoj_block_work_unit(),
2433                    launch_stream,
2434                )
2435            }
2436            // U64-key sum/min/max through the u64 fused
2437            // kernels (value columns are uniform U64 by classification).
2438            (Some(value), WcojKeyWidth::EightByte) => {
2439                self.provider.wcoj_triangle_groupby_root_agg_u64_recorded(
2440                    buf_xy,
2441                    buf_yz,
2442                    buf_xz,
2443                    agg_op,
2444                    value,
2445                    wcoj_block_work_unit(),
2446                    launch_stream,
2447                )
2448            }
2449        };
2450        match result {
2451            Ok(buf) => {
2452                self.wcoj_groupby_fusion_dispatch_count += 1;
2453                Ok(Some(buf))
2454            }
2455            Err(err) => {
2456                wcoj_decline_on_error(&mut self.wcoj_error_decline_count, "groupby-fusion", err)
2457            }
2458        }
2459    }
2460
2461    /// Aggregate-fused WCOJ, 4-cycle: dispatch the inner
2462    /// `MultiWayJoin(4-cycle)` of a count/sum/min/max-by-root aggregate
2463    /// through the fused group-by-root kernels, which never materialize
2464    /// the 4-cycle rows. Both accepted `output_columns` layouts place the
2465    /// variable-order root W at output position 0, so the caller's
2466    /// `key_cols == [0]` + `columns[0] == Column(0)` checks pin the group
2467    /// key to W — the soundness condition for one-pass aggregate
2468    /// propagation.
2469    ///
2470    /// Gating decision: the fused path
2471    /// mirrors the triangle fusion — enabled by default behind the shared
2472    /// `XLOG_DISABLE_WCOJ_GROUPBY_FUSION` kill switch (checked by the
2473    /// caller). The `XLOG_USE_WCOJ_4CYCLE*` gates govern only the
2474    /// NON-aggregate 4-cycle materialize dispatch (opt-in pending its own
2475    /// default-on evidence); they are intentionally not consulted here,
2476    /// because a declined or kill-switched fusion falls back to that
2477    /// independently-gated path (default: embedded binary fallback).
2478    ///
2479    /// Value-column mapping (same rules as the triangle): for
2480    /// Sum/Min/Max the aggregate value must map to a 4-cycle output
2481    /// variable the kernel can read during traversal — X (col 1, from
2482    /// e1.col1), Y (col 2, from e2.col1) or Z (col 3, from e3.col1) —
2483    /// with plain U32 type. Symbol values decline (the unfused groupby
2484    /// rejects them with the same value-type error, so fused and
2485    /// kill-switch runs fail identically). Count admits any pass-through
2486    /// value column and, uniquely, uniform U64 keys.
2487    ///
2488    /// Sum/Min/Max are 4-byte-only; u64-key 4-cycle sum/min/max fusion is
2489    /// deferred and declines silently. Pipeline errors route through
2490    /// [`wcoj_decline_on_error`] (counted; `XLOG_WCOJ_STRICT=1`
2491    /// propagates).
2492    fn try_dispatch_wcoj_groupby_root_agg_4cycle(
2493        &mut self,
2494        multiway: &RirNode,
2495        agg_op: xlog_core::AggOp,
2496        agg_value_col: Option<usize>,
2497    ) -> Result<Option<CudaBuffer>> {
2498        use xlog_core::AggOp;
2499        let Some(matched) = match_multiway_4cycle(multiway) else {
2500            return Ok(None);
2501        };
2502        // 4-cycle output space: col 1 = X, col 2 = Y, col 3 = Z. Anything
2503        // else (e.g. an out-of-range ref) declines.
2504        let agg_value = match agg_value_col {
2505            None => None,
2506            Some(1) => Some(Wcoj4CycleRootAggValue::X),
2507            Some(2) => Some(Wcoj4CycleRootAggValue::Y),
2508            Some(3) => Some(Wcoj4CycleRootAggValue::Z),
2509            Some(_) => return Ok(None),
2510        };
2511        let name_e1 = match self.get_rel_name(matched.rel_e1) {
2512            Some(s) => s.to_string(),
2513            None => return Ok(None),
2514        };
2515        let name_e2 = match self.get_rel_name(matched.rel_e2) {
2516            Some(s) => s.to_string(),
2517            None => return Ok(None),
2518        };
2519        let name_e3 = match self.get_rel_name(matched.rel_e3) {
2520            Some(s) => s.to_string(),
2521            None => return Ok(None),
2522        };
2523        let name_e4 = match self.get_rel_name(matched.rel_e4) {
2524            Some(s) => s.to_string(),
2525            None => return Ok(None),
2526        };
2527        let buf_e1 = match self.store.get(&name_e1) {
2528            Some(b) => b,
2529            None => return Ok(None),
2530        };
2531        let buf_e2 = match self.store.get(&name_e2) {
2532            Some(b) => b,
2533            None => return Ok(None),
2534        };
2535        let buf_e3 = match self.store.get(&name_e3) {
2536            Some(b) => b,
2537            None => return Ok(None),
2538        };
2539        let buf_e4 = match self.store.get(&name_e4) {
2540            Some(b) => b,
2541            None => return Ok(None),
2542        };
2543        let width = match (
2544            classify_two_col_wcoj_width(buf_e1),
2545            classify_two_col_wcoj_width(buf_e2),
2546            classify_two_col_wcoj_width(buf_e3),
2547            classify_two_col_wcoj_width(buf_e4),
2548        ) {
2549            (
2550                Some(WcojKeyWidth::FourByte),
2551                Some(WcojKeyWidth::FourByte),
2552                Some(WcojKeyWidth::FourByte),
2553                Some(WcojKeyWidth::FourByte),
2554            ) => WcojKeyWidth::FourByte,
2555            (
2556                Some(WcojKeyWidth::EightByte),
2557                Some(WcojKeyWidth::EightByte),
2558                Some(WcojKeyWidth::EightByte),
2559                Some(WcojKeyWidth::EightByte),
2560            ) => WcojKeyWidth::EightByte,
2561            _ => return Ok(None),
2562        };
2563        // Sum/Min/Max are 4-byte-only (u64-key 4-cycle sum/min/max fusion
2564        // is deferred and declines to materialize+groupby).
2565        if agg_value.is_some() && width != WcojKeyWidth::FourByte {
2566            return Ok(None);
2567        }
2568        // Sum/Min/Max are arithmetic: the column supplying the value must
2569        // be plain U32 (Symbol ids are not summable/orderable data — and
2570        // the unfused groupby rejects Symbol values too, so declining
2571        // keeps both paths aligned). The checked column matches the
2572        // materialized (W, X, Y, Z) baseline schema's type source
2573        // (`build_4cycle_head_schema`): X from e1.col1, Y from e2.col1,
2574        // Z from e3.col1.
2575        let value_source = match agg_value {
2576            None => None,
2577            Some(Wcoj4CycleRootAggValue::X) => Some(buf_e1),
2578            Some(Wcoj4CycleRootAggValue::Y) => Some(buf_e2),
2579            Some(Wcoj4CycleRootAggValue::Z) => Some(buf_e3),
2580        };
2581        if let Some(src) = value_source {
2582            if src.schema().column_type(1) != Some(xlog_core::ScalarType::U32) {
2583                return Ok(None);
2584            }
2585        }
2586        if self.provider.memory().runtime().is_none() {
2587            return Ok(None);
2588        }
2589        let Some(launch_stream) = self.wcoj_dispatch_stream_or_init() else {
2590            return Ok(None);
2591        };
2592        debug_assert!(
2593            agg_value.is_some() || matches!(agg_op, AggOp::Count),
2594            "non-Count aggregates resolve a value column above"
2595        );
2596        let result = match (agg_value, width) {
2597            (None, WcojKeyWidth::FourByte) => {
2598                self.provider.wcoj_4cycle_groupby_root_count_u32_recorded(
2599                    buf_e1,
2600                    buf_e2,
2601                    buf_e3,
2602                    buf_e4,
2603                    wcoj_block_work_unit(),
2604                    launch_stream,
2605                )
2606            }
2607            // U64-key count through the metadata-driven
2608            // segment reduction (the recorded groupby is U32/Symbol-key
2609            // only).
2610            (None, WcojKeyWidth::EightByte) => {
2611                self.provider.wcoj_4cycle_groupby_root_count_u64_recorded(
2612                    buf_e1,
2613                    buf_e2,
2614                    buf_e3,
2615                    buf_e4,
2616                    wcoj_block_work_unit(),
2617                    launch_stream,
2618                )
2619            }
2620            (Some(value), _) => self.provider.wcoj_4cycle_groupby_root_agg_u32_recorded(
2621                buf_e1,
2622                buf_e2,
2623                buf_e3,
2624                buf_e4,
2625                agg_op,
2626                value,
2627                wcoj_block_work_unit(),
2628                launch_stream,
2629            ),
2630        };
2631        match result {
2632            Ok(buf) => {
2633                self.wcoj_groupby_fusion_dispatch_count += 1;
2634                Ok(Some(buf))
2635            }
2636            Err(err) => wcoj_decline_on_error(
2637                &mut self.wcoj_error_decline_count,
2638                "groupby-fusion-4cycle",
2639                err,
2640            ),
2641        }
2642    }
2643
2644    /// Count of times the WCOJ 4-cycle hook
2645    /// produced a result and the executor installed it. Tracked
2646    /// separately from triangle so tests can pin which shape
2647    /// dispatched.
2648    pub fn wcoj_4cycle_dispatch_count(&self) -> u64 {
2649        self.wcoj_4cycle_dispatch_count
2650    }
2651
2652    /// Count of times a two-atom `ChainJoin` routed through the chain
2653    /// dispatcher instead of the embedded binary fallback.
2654    pub fn chain_dispatch_count(&self) -> u64 {
2655        self.chain_dispatch_count
2656    }
2657
2658    /// Count of times `execute_join` routed an inner-join
2659    /// to the nested-loop provider entry point because the
2660    /// eligibility predicate + Cartesian-product threshold both
2661    /// held. Tests use this counter to assert that the nested-loop path
2662    /// actually fired vs. silently falling back to hash with the
2663    /// same answer.
2664    pub fn nested_loop_dispatch_count(&self) -> u64 {
2665        self.nested_loop_dispatch_count
2666    }
2667
2668    /// ChainJoin dispatch. Shape match is done on the production
2669    /// `ChainJoin` emitted by the promoter.
2670    ///
2671    /// Route order:
2672    ///   1. sorted eligible U32/Symbol inputs -> sort-merge
2673    ///   2. threshold eligible U32/Symbol inputs -> nested loop
2674    ///   3. otherwise -> existing hash_join_v2 provider path
2675    ///
2676    /// The final projection uses the captured `output_columns`, so
2677    /// row semantics match `MultiWayJoin.fallback`.
2678    pub(super) fn try_dispatch_chain_on_body(
2679        &mut self,
2680        body: &RirNode,
2681    ) -> Result<Option<CudaBuffer>> {
2682        if !chain_dispatch_enabled() {
2683            return Ok(None);
2684        }
2685        let Some(matched) = match_chain_join(body) else {
2686            return Ok(None);
2687        };
2688
2689        let name_left = match self.get_rel_name(matched.rel_left) {
2690            Some(s) => s.to_string(),
2691            None => return Ok(None),
2692        };
2693        let name_right = match self.get_rel_name(matched.rel_right) {
2694            Some(s) => s.to_string(),
2695            None => return Ok(None),
2696        };
2697        let left = match self.store.get(&name_left) {
2698            Some(buf) => buf,
2699            None => return Ok(None),
2700        };
2701        let right = match self.store.get(&name_right) {
2702            Some(buf) => buf,
2703            None => return Ok(None),
2704        };
2705
2706        let num_left = self.provider.device_row_count(left)? as u64;
2707        let num_right = self.provider.device_row_count(right)? as u64;
2708        let in_threshold = num_left
2709            .checked_mul(num_right)
2710            .map(|p| p <= NESTED_LOOP_TOTAL_THRESHOLD)
2711            .unwrap_or(false);
2712        let four_byte = matches!(
2713            classify_two_col_wcoj_width(left),
2714            Some(WcojKeyWidth::FourByte)
2715        ) && matches!(
2716            classify_two_col_wcoj_width(right),
2717            Some(WcojKeyWidth::FourByte)
2718        );
2719
2720        let mut used_nested_loop = false;
2721        let joined = if four_byte {
2722            let left_sorted = self
2723                .provider
2724                .is_sorted_ascending_u32(left, matched.left_key)
2725                .unwrap_or(false);
2726            let right_sorted = self
2727                .provider
2728                .is_sorted_ascending_u32(right, matched.right_key)
2729                .unwrap_or(false);
2730            if left_sorted && right_sorted {
2731                if in_threshold {
2732                    self.provider.sort_merge_join_v2_inner_u32_1key(
2733                        left,
2734                        right,
2735                        matched.left_key,
2736                        matched.right_key,
2737                    )
2738                } else {
2739                    let capacity = usize::try_from(num_left.min(num_right)).unwrap_or(usize::MAX);
2740                    self.provider.sort_merge_join_v2_inner_u32_1key_bounded(
2741                        left,
2742                        right,
2743                        matched.left_key,
2744                        matched.right_key,
2745                        capacity,
2746                    )
2747                }
2748            } else if in_threshold {
2749                used_nested_loop = true;
2750                self.provider.nested_loop_join_v2_inner_u32_1key(
2751                    left,
2752                    right,
2753                    matched.left_key,
2754                    matched.right_key,
2755                )
2756            } else {
2757                self.provider.hash_join_v2(
2758                    left,
2759                    right,
2760                    &[matched.left_key],
2761                    &[matched.right_key],
2762                    CudaJoinType::Inner,
2763                )
2764            }
2765        } else {
2766            self.provider.hash_join_v2(
2767                left,
2768                right,
2769                &[matched.left_key],
2770                &[matched.right_key],
2771                CudaJoinType::Inner,
2772            )
2773        };
2774
2775        let joined = match joined {
2776            Ok(buf) => buf,
2777            Err(err) => {
2778                return wcoj_decline_on_error(&mut self.wcoj_error_decline_count, "chain-join", err)
2779            }
2780        };
2781        let projected = match self.execute_project(&joined, &matched.output_columns) {
2782            Ok(buf) => buf,
2783            Err(err) => {
2784                return wcoj_decline_on_error(
2785                    &mut self.wcoj_error_decline_count,
2786                    "chain-join-project",
2787                    err,
2788                )
2789            }
2790        };
2791        self.stats.record_join_result(
2792            matched.rel_left,
2793            matched.rel_right,
2794            vec![matched.left_key],
2795            vec![matched.right_key],
2796            num_left.saturating_mul(num_right),
2797            joined.num_rows(),
2798        );
2799        if used_nested_loop {
2800            self.nested_loop_dispatch_count += 1;
2801        }
2802        self.chain_dispatch_count += 1;
2803        Ok(Some(projected))
2804    }
2805
2806    /// Try to dispatch a non-recursive rule
2807    /// through the GPU 4-cycle WCOJ kernel.
2808    ///
2809    /// Decision tree (highest → lowest):
2810    ///   1. Hard kill switch (`wcoj_4cycle_dispatch_disabled` /
2811    ///      `XLOG_DISABLE_WCOJ_4CYCLE=1`) → no dispatch.
2812    ///   2. Force gate (`wcoj_4cycle_dispatch=Some(true)` /
2813    ///      `XLOG_USE_WCOJ_4CYCLE=1`) → kernel runs.
2814    ///   3. Force-Some(false) → no dispatch.
2815    ///   4. Stats opt-in (config / env, default off) →
2816    ///      cardinality model decides whether the kernel runs.
2817    ///
2818    /// Returns `Ok(Some(buffer))` on dispatch; `Ok(None)`
2819    /// silently otherwise. The caller installs the buffer or
2820    /// descends into `MultiWayJoin.fallback`.
2821    pub(super) fn try_dispatch_wcoj_4cycle(
2822        &mut self,
2823        rule: &CompiledRule,
2824    ) -> Result<Option<CudaBuffer>> {
2825        // Body-keyed entry. Rule-keyed callers stay
2826        // byte-identical via this thin wrapper.
2827        self.try_dispatch_wcoj_4cycle_on_body(&rule.body)
2828    }
2829
2830    /// Body-keyed entry point: same gate / pattern-match / dispatch
2831    /// logic as `try_dispatch_wcoj_4cycle`, keyed on `body` rather
2832    /// than `&CompiledRule`. See
2833    /// `try_dispatch_wcoj_triangle_on_body` for the rationale.
2834    pub(super) fn try_dispatch_wcoj_4cycle_on_body(
2835        &mut self,
2836        body: &RirNode,
2837    ) -> Result<Option<CudaBuffer>> {
2838        // 1. Kill switch.
2839        if wcoj_4cycle_disabled(self.config.wcoj_4cycle_dispatch_disabled) {
2840            return Ok(None);
2841        }
2842        // 2. Force gate.
2843        let force_override = self.config.wcoj_4cycle_dispatch;
2844        let force_on = wcoj_4cycle_gate_enabled(force_override);
2845        let mode = if force_on {
2846            DispatchMode::Force
2847        } else {
2848            // Force-Some(false) is explicit off — adaptive does
2849            // NOT resurrect it.
2850            if matches!(force_override, Some(false)) {
2851                return Ok(None);
2852            }
2853            let adaptive_override = self.config.wcoj_4cycle_dispatch_adaptive;
2854            if wcoj_4cycle_adaptive_enabled(adaptive_override) {
2855                DispatchMode::CostModel
2856            } else {
2857                return Ok(None);
2858            }
2859        };
2860
2861        // 3. Match the canonical 4-cycle MultiWayJoin.
2862        let Some(matched) = match_multiway_4cycle(body) else {
2863            return Ok(None);
2864        };
2865
2866        // 4. Resolve rel IDs to predicate names.
2867        let name_e1 = match self.get_rel_name(matched.rel_e1) {
2868            Some(s) => s.to_string(),
2869            None => return Ok(None),
2870        };
2871        let name_e2 = match self.get_rel_name(matched.rel_e2) {
2872            Some(s) => s.to_string(),
2873            None => return Ok(None),
2874        };
2875        let name_e3 = match self.get_rel_name(matched.rel_e3) {
2876            Some(s) => s.to_string(),
2877            None => return Ok(None),
2878        };
2879        let name_e4 = match self.get_rel_name(matched.rel_e4) {
2880            Some(s) => s.to_string(),
2881            None => return Ok(None),
2882        };
2883
2884        // 5. Look up input buffers + classify their key widths.
2885        // All four slots must share the same width.
2886        let buf_e1 = match self.store.get(&name_e1) {
2887            Some(b) => b,
2888            None => return Ok(None),
2889        };
2890        let buf_e2 = match self.store.get(&name_e2) {
2891            Some(b) => b,
2892            None => return Ok(None),
2893        };
2894        let buf_e3 = match self.store.get(&name_e3) {
2895            Some(b) => b,
2896            None => return Ok(None),
2897        };
2898        let buf_e4 = match self.store.get(&name_e4) {
2899            Some(b) => b,
2900            None => return Ok(None),
2901        };
2902        let width = match (
2903            classify_two_col_wcoj_width(buf_e1),
2904            classify_two_col_wcoj_width(buf_e2),
2905            classify_two_col_wcoj_width(buf_e3),
2906            classify_two_col_wcoj_width(buf_e4),
2907        ) {
2908            (Some(a), Some(b), Some(c), Some(d)) if a == b && b == c && c == d => a,
2909            _ => return Ok(None),
2910        };
2911
2912        // 6. Resolve the cached WCOJ launch stream (shared with
2913        // triangle dispatch; the stream resolver is now
2914        // shape-agnostic).
2915        if self.provider.memory().runtime().is_none() {
2916            return Ok(None);
2917        }
2918        let launch_stream = match self.wcoj_dispatch_stream_or_init() {
2919            Some(s) => s,
2920            None => return Ok(None),
2921        };
2922
2923        // 7. Stats-backed mode: route the decision through
2924        // the cardinality WCOJ cost model.
2925        if mode == DispatchMode::CostModel {
2926            // Factory selects per RuntimeConfig precedence.
2927            let model = super::wcoj_cost_model::build_wcoj_cost_model(&self.config);
2928            let slot_rels = [
2929                matched.rel_e1,
2930                matched.rel_e2,
2931                matched.rel_e3,
2932                matched.rel_e4,
2933            ];
2934            let ctx = super::wcoj_cost_model::WcojDispatchCtx {
2935                stats: &self.stats,
2936                launch_stream,
2937                width,
2938                slot_rels: &slot_rels,
2939            };
2940            let dispatch = model.should_dispatch_4cycle(&ctx);
2941            if !dispatch {
2942                return Ok(None);
2943            }
2944        }
2945
2946        // Extract var_order. None preserves default-leader dispatch
2947        // bit-identically.
2948        let var_order_opt: Option<&VariableOrder> = match body {
2949            RirNode::MultiWayJoin { var_order, .. } => var_order.as_ref(),
2950            _ => None,
2951        };
2952
2953        // 8. Run layout (4× per slot) + 4-cycle kernel. Failure
2954        // → silent fallback per slice contract.
2955        let dispatch_result = self.run_wcoj_4cycle_pipeline(
2956            buf_e1,
2957            buf_e2,
2958            buf_e3,
2959            buf_e4,
2960            launch_stream,
2961            width,
2962            var_order_opt,
2963        );
2964        match dispatch_result {
2965            Ok(buf) => {
2966                // Record observed selectivity.
2967                // The (rel_a, rel_b, left_keys, right_keys)
2968                // pair is derived from `var_order_opt` via
2969                // `feedback_pair_from_var_order`:
2970                //   * `var_order = None` (default config) →
2971                //     canonical `(rel_e1, rel_e2)` keys
2972                //     `[1]/[0]`.
2973                //   * `var_order = Some(_)` (non-default leader) →
2974                //     rotated pair from the feedback table. 4-cycle is
2975                //     rotation-only (every cycle edge is
2976                //     `[1]/[0]` in canonical layout), so the
2977                //     keys stay `[1]/[0]` while the pair
2978                //     itself rotates.
2979                let output_rows = Self::wcoj_output_rows(&buf);
2980                let slot_rels = [
2981                    matched.rel_e1,
2982                    matched.rel_e2,
2983                    matched.rel_e3,
2984                    matched.rel_e4,
2985                ];
2986                self.record_wcoj_feedback(&slot_rels, var_order_opt, output_rows);
2987                self.wcoj_4cycle_dispatch_count += 1;
2988                Ok(Some(buf))
2989            }
2990            Err(err) => wcoj_decline_on_error(&mut self.wcoj_error_decline_count, "4-cycle", err),
2991        }
2992    }
2993
2994    /// Inner pipeline for 4-cycle: 4× layout construction + kernel.
2995    #[allow(clippy::too_many_arguments)]
2996    fn run_wcoj_4cycle_pipeline(
2997        &self,
2998        buf_e1: &CudaBuffer,
2999        buf_e2: &CudaBuffer,
3000        buf_e3: &CudaBuffer,
3001        buf_e4: &CudaBuffer,
3002        launch_stream: StreamId,
3003        width: WcojKeyWidth,
3004        var_order: Option<&VariableOrder>,
3005    ) -> Result<CudaBuffer> {
3006        if let Some(vo) = var_order {
3007            return self.run_wcoj_4cycle_pipeline_with_leader_order(
3008                buf_e1,
3009                buf_e2,
3010                buf_e3,
3011                buf_e4,
3012                launch_stream,
3013                width,
3014                vo,
3015            );
3016        }
3017        match width {
3018            WcojKeyWidth::FourByte => {
3019                let layout_e1 = self
3020                    .provider
3021                    .wcoj_layout_u32_recorded(buf_e1, launch_stream)?;
3022                let layout_e2 = self
3023                    .provider
3024                    .wcoj_layout_u32_recorded(buf_e2, launch_stream)?;
3025                let layout_e3 = self
3026                    .provider
3027                    .wcoj_layout_u32_recorded(buf_e3, launch_stream)?;
3028                let layout_e4 = self
3029                    .provider
3030                    .wcoj_layout_u32_recorded(buf_e4, launch_stream)?;
3031                self.provider.wcoj_4cycle_u32_recorded(
3032                    &layout_e1,
3033                    &layout_e2,
3034                    &layout_e3,
3035                    &layout_e4,
3036                    launch_stream,
3037                )
3038            }
3039            WcojKeyWidth::EightByte => {
3040                let layout_e1 = self
3041                    .provider
3042                    .wcoj_layout_u64_recorded(buf_e1, launch_stream)?;
3043                let layout_e2 = self
3044                    .provider
3045                    .wcoj_layout_u64_recorded(buf_e2, launch_stream)?;
3046                let layout_e3 = self
3047                    .provider
3048                    .wcoj_layout_u64_recorded(buf_e3, launch_stream)?;
3049                let layout_e4 = self
3050                    .provider
3051                    .wcoj_layout_u64_recorded(buf_e4, launch_stream)?;
3052                self.provider.wcoj_4cycle_u64_recorded(
3053                    &layout_e1,
3054                    &layout_e2,
3055                    &layout_e3,
3056                    &layout_e4,
3057                    launch_stream,
3058                )
3059            }
3060        }
3061    }
3062
3063    /// Pipeline for non-default 4-cycle leaders. All
3064    /// 4-cycle leaders are rotation-only (no col-swap entries
3065    /// in `lookup_perms`); kernel emits in `(a, b, c, d)` order
3066    /// per the rotated leader; final projection helper remaps
3067    /// to canonical `(W, X, Y, Z)` head order.
3068    #[allow(clippy::too_many_arguments)]
3069    fn run_wcoj_4cycle_pipeline_with_leader_order(
3070        &self,
3071        buf_e1: &CudaBuffer,
3072        buf_e2: &CudaBuffer,
3073        buf_e3: &CudaBuffer,
3074        buf_e4: &CudaBuffer,
3075        launch_stream: StreamId,
3076        width: WcojKeyWidth,
3077        var_order: &VariableOrder,
3078    ) -> Result<CudaBuffer> {
3079        let canonical: [&CudaBuffer; 4] = [buf_e1, buf_e2, buf_e3, buf_e4];
3080        let slot_inputs = self.prepare_leader_inputs(&canonical, var_order, launch_stream)?;
3081        if slot_inputs.len() != 4 {
3082            return Err(xlog_core::XlogError::Kernel(
3083                "run_wcoj_4cycle_pipeline_with_leader_order: prepare_leader_inputs must return 4 slots"
3084                    .to_string(),
3085            ));
3086        }
3087
3088        let head_schema = build_4cycle_head_schema(buf_e1, buf_e2, buf_e3)?;
3089        let perm = perm_indices_from_kernel_output_cols(&var_order.kernel_output_cols)?;
3090
3091        let kernel_out: CudaBuffer = match width {
3092            WcojKeyWidth::FourByte => {
3093                let l0 = self
3094                    .provider
3095                    .wcoj_layout_u32_recorded(&slot_inputs[0], launch_stream)?;
3096                let l1 = self
3097                    .provider
3098                    .wcoj_layout_u32_recorded(&slot_inputs[1], launch_stream)?;
3099                let l2 = self
3100                    .provider
3101                    .wcoj_layout_u32_recorded(&slot_inputs[2], launch_stream)?;
3102                let l3 = self
3103                    .provider
3104                    .wcoj_layout_u32_recorded(&slot_inputs[3], launch_stream)?;
3105                self.provider
3106                    .wcoj_4cycle_u32_recorded(&l0, &l1, &l2, &l3, launch_stream)?
3107            }
3108            WcojKeyWidth::EightByte => {
3109                let l0 = self
3110                    .provider
3111                    .wcoj_layout_u64_recorded(&slot_inputs[0], launch_stream)?;
3112                let l1 = self
3113                    .provider
3114                    .wcoj_layout_u64_recorded(&slot_inputs[1], launch_stream)?;
3115                let l2 = self
3116                    .provider
3117                    .wcoj_layout_u64_recorded(&slot_inputs[2], launch_stream)?;
3118                let l3 = self
3119                    .provider
3120                    .wcoj_layout_u64_recorded(&slot_inputs[3], launch_stream)?;
3121                self.provider
3122                    .wcoj_4cycle_u64_recorded(&l0, &l1, &l2, &l3, launch_stream)?
3123            }
3124        };
3125
3126        self.provider.wcoj_project_output_columns_recorded(
3127            &kernel_out,
3128            &perm,
3129            head_schema,
3130            launch_stream,
3131        )
3132    }
3133
3134    /// Produce **owned, materialized** kernel slot inputs
3135    /// from a canonical-order input array and a `VariableOrder`.
3136    ///
3137    /// **Public** runtime helper. Production callers are
3138    /// `run_wcoj_*_pipeline_with_leader_order` (this module); runtime tests
3139    /// in `crates/xlog-runtime/tests/test_leader_input_permutation_tables.rs` invoke it
3140    /// directly to assert per-slot schema + content against a CPU
3141    /// reference. Public visibility is intentional: there is no
3142    /// other reasonable seam for tests to inspect rotation +
3143    /// col-swap behavior, and the helper has well-defined
3144    /// owned-buffer semantics that external callers can rely on.
3145    ///
3146    /// Returns a `Vec<CudaBuffer>` of length `canonical.len()` (3
3147    /// for triangle, 4 for 4-cycle). Slot 0 is the leader; slots
3148    /// 1.. follow `var_order.lookup_perms[i].input_idx` mapping.
3149    /// Triangle non-default leaders may col-swap selected slots
3150    /// per the locked permutation table; 4-cycle is rotation-only
3151    /// and rejects swap requests with a kernel error.
3152    ///
3153    /// Each returned `CudaBuffer` is owned: swapped slots are
3154    /// DtoD-copied via `wcoj_project_2col_swap_recorded`; non-
3155    /// swapped slots use the double-swap clone path below to give
3156    /// every slot a uniform owned-buffer return type.
3157    ///
3158    /// **Lifetime contract**: returned buffers are independent of
3159    /// `canonical[*]`. Callers may pass references through to
3160    /// `wcoj_layout_*_recorded` without aliasing concerns.
3161    pub fn prepare_leader_inputs(
3162        &self,
3163        canonical: &[&CudaBuffer],
3164        var_order: &VariableOrder,
3165        launch_stream: StreamId,
3166    ) -> Result<Vec<CudaBuffer>> {
3167        let n = canonical.len();
3168        if !(n == 3 || n == 4) {
3169            return Err(xlog_core::XlogError::Kernel(format!(
3170                "prepare_leader_inputs: canonical inputs must be 3 (triangle) or 4 (4-cycle), got {n}"
3171            )));
3172        }
3173        let leader_idx = var_order.leader_idx as usize;
3174        if leader_idx >= n {
3175            return Err(xlog_core::XlogError::Kernel(format!(
3176                "prepare_leader_inputs: leader_idx {leader_idx} out of range for arity {n}"
3177            )));
3178        }
3179        if var_order.lookup_perms.len() != n - 1 {
3180            return Err(xlog_core::XlogError::Kernel(format!(
3181                "prepare_leader_inputs: lookup_perms.len() = {} must equal {} (arity - 1)",
3182                var_order.lookup_perms.len(),
3183                n - 1
3184            )));
3185        }
3186        for (slot, lp) in var_order.lookup_perms.iter().enumerate() {
3187            let input_idx = lp.input_idx as usize;
3188            if input_idx >= n {
3189                return Err(xlog_core::XlogError::Kernel(format!(
3190                    "prepare_leader_inputs: lookup_perms[{slot}].input_idx {input_idx} out of range for arity {n}"
3191                )));
3192            }
3193        }
3194        // 4-cycle defense: no col-swaps allowed (locked table).
3195        if n == 4 {
3196            for lp in &var_order.lookup_perms {
3197                if lp.swap_cols {
3198                    return Err(xlog_core::XlogError::Kernel(
3199                        "prepare_leader_inputs: 4-cycle does not support col-swaps".to_string(),
3200                    ));
3201                }
3202            }
3203        }
3204
3205        // Slot 0: clone the leader via the swap helper called twice
3206        // (cancels out → owned pass-through). The simpler path for
3207        // production is just passing `canonical[leader_idx]` by
3208        // reference, but since the production callers consume the
3209        // returned `Vec<CudaBuffer>` by index, we materialize an
3210        // owned copy. Triangle leaders never have swap_cols on
3211        // their own slot; we use `wcoj_project_2col_swap_recorded`
3212        // twice to produce an owned copy with identical layout.
3213        //
3214        // For clarity and to avoid the extra DtoD: leader slot 0 is
3215        // produced by single swap-twice, lookups by either single
3216        // swap (when swap_cols) or single swap-twice (when not).
3217        //
3218        // Cost: one extra DtoD copy per slot vs. the previous
3219        // inline-references implementation. The leader-ordered path is opt-in,
3220        // and the DtoD overhead is small relative to the layout + kernel cost.
3221        let mut slots: Vec<CudaBuffer> = Vec::with_capacity(n);
3222        // Slot 0 = leader, no swap.
3223        slots.push(self.clone_buffer_via_swap(canonical[leader_idx], launch_stream)?);
3224        for lp in &var_order.lookup_perms {
3225            let src = canonical[lp.input_idx as usize];
3226            let buf = if lp.swap_cols {
3227                self.provider
3228                    .wcoj_project_2col_swap_recorded(src, launch_stream)?
3229            } else {
3230                self.clone_buffer_via_swap(src, launch_stream)?
3231            };
3232            slots.push(buf);
3233        }
3234        Ok(slots)
3235    }
3236
3237    /// Clone a 2-col `CudaBuffer` via a double-swap through the
3238    /// existing recorded helper. Two swaps cancel — the result is a
3239    /// fresh owned buffer with the same column order, schema, and
3240    /// content as `src`. Used by `prepare_leader_inputs` to give
3241    /// every slot a uniform owned-buffer return type.
3242    fn clone_buffer_via_swap(
3243        &self,
3244        src: &CudaBuffer,
3245        launch_stream: StreamId,
3246    ) -> Result<CudaBuffer> {
3247        let once = self
3248            .provider
3249            .wcoj_project_2col_swap_recorded(src, launch_stream)?;
3250        self.provider
3251            .wcoj_project_2col_swap_recorded(&once, launch_stream)
3252    }
3253
3254    /// Resolve the cached WCOJ launch stream, lazily initializing
3255    /// it on first call by acquiring one stream from the runtime
3256    /// pool. Subsequent calls reuse the same stream — mirrors
3257    /// [`xlog_cuda::CudaKernelProvider::recorded_op_stream`]
3258    /// (provider/mod.rs).
3259    ///
3260    /// **Shared across WCOJ shapes**: triangle
3261    /// and 4-cycle dispatch both go through this resolver and
3262    /// reuse the same stream. Renamed from
3263    /// `wcoj_triangle_stream_or_init` when 4-cycle dispatch
3264    /// landed.
3265    ///
3266    /// Returns `None` only when (a) the manager has no runtime,
3267    /// or (b) the very first acquisition fails (pool already
3268    /// at cap from other consumers). After that first success
3269    /// the cached id keeps resolving for the executor's lifetime.
3270    pub fn wcoj_dispatch_stream_or_init(&self) -> Option<StreamId> {
3271        if let Some(s) = self.wcoj_dispatch_stream.get() {
3272            return Some(*s);
3273        }
3274        let runtime = self.provider.memory().runtime()?;
3275        let stream = runtime.stream_pool().acquire().ok()?;
3276        let _ = self.wcoj_dispatch_stream.set(stream);
3277        self.wcoj_dispatch_stream.get().copied()
3278    }
3279}
3280
3281// ===============================================================
3282// K-clique dispatch (k = 5..8).
3283//
3284// Default-dispatch on shape match. No force / kill / adaptive
3285// knobs.
3286// Silent fallback to MultiWayJoin.fallback on dispatcher decline
3287// or kernel error.
3288//
3289// Counter accessors are public so xlog-integration
3290// certs can assert across the crate boundary.
3291// ===============================================================
3292
3293impl Executor {
3294    /// Number of times the WCOJ k=5-clique hook produced a
3295    /// result and the executor installed it. Counter does NOT
3296    /// advance on dispatcher decline / kernel-launch failure
3297    /// (silent fallback to `MultiWayJoin.fallback`).
3298    pub fn wcoj_clique5_dispatch_count(&self) -> u64 {
3299        self.wcoj_clique5_dispatch_count
3300    }
3301
3302    /// Number of times the WCOJ k=6-clique hook produced
3303    /// a result. Same observability contract as
3304    /// `wcoj_clique5_dispatch_count`.
3305    pub fn wcoj_clique6_dispatch_count(&self) -> u64 {
3306        self.wcoj_clique6_dispatch_count
3307    }
3308
3309    /// Number of times the WCOJ k=7-clique hook produced
3310    /// a result. Same observability contract as
3311    /// `wcoj_clique5_dispatch_count`.
3312    pub fn wcoj_clique7_dispatch_count(&self) -> u64 {
3313        self.wcoj_clique7_dispatch_count
3314    }
3315
3316    /// Number of times the WCOJ k=8-clique hook produced
3317    /// a result. Same observability contract as
3318    /// `wcoj_clique5_dispatch_count`.
3319    pub fn wcoj_clique8_dispatch_count(&self) -> u64 {
3320        self.wcoj_clique8_dispatch_count
3321    }
3322
3323    /// Number of recursive merge
3324    /// boundaries where K-clique metadata was marked for refresh.
3325    pub fn kclique_histogram_refresh_count(&self) -> u64 {
3326        self.kclique_histogram_refresh_count
3327    }
3328
3329    /// Cumulative recursive K-clique metadata refresh accounting
3330    /// time in nanoseconds.
3331    pub fn kclique_histogram_refresh_nanos(&self) -> u128 {
3332        self.kclique_histogram_refresh_nanos
3333    }
3334
3335    /// Try k=5-clique dispatch. Wrapper for rule-keyed
3336    /// callers (recursive engine + non-recursive scc).
3337    pub(super) fn try_dispatch_wcoj_clique5(
3338        &mut self,
3339        rule: &CompiledRule,
3340    ) -> Result<Option<CudaBuffer>> {
3341        self.try_dispatch_wcoj_clique5_on_body(&rule.body)
3342    }
3343
3344    /// Try k=6-clique dispatch.
3345    pub(super) fn try_dispatch_wcoj_clique6(
3346        &mut self,
3347        rule: &CompiledRule,
3348    ) -> Result<Option<CudaBuffer>> {
3349        self.try_dispatch_wcoj_clique6_on_body(&rule.body)
3350    }
3351
3352    /// Try k=7-clique dispatch.
3353    pub(super) fn try_dispatch_wcoj_clique7(
3354        &mut self,
3355        rule: &CompiledRule,
3356    ) -> Result<Option<CudaBuffer>> {
3357        self.try_dispatch_wcoj_clique7_on_body(&rule.body)
3358    }
3359
3360    /// Try k=8-clique dispatch.
3361    pub(super) fn try_dispatch_wcoj_clique8(
3362        &mut self,
3363        rule: &CompiledRule,
3364    ) -> Result<Option<CudaBuffer>> {
3365        self.try_dispatch_wcoj_clique8_on_body(&rule.body)
3366    }
3367
3368    /// Body-keyed k=5-clique dispatch.
3369    pub(super) fn try_dispatch_wcoj_clique5_on_body(
3370        &mut self,
3371        body: &RirNode,
3372    ) -> Result<Option<CudaBuffer>> {
3373        self.try_dispatch_wcoj_clique_k_on_body(body, 5)
3374    }
3375
3376    /// Body-keyed k=6-clique dispatch.
3377    pub(super) fn try_dispatch_wcoj_clique6_on_body(
3378        &mut self,
3379        body: &RirNode,
3380    ) -> Result<Option<CudaBuffer>> {
3381        self.try_dispatch_wcoj_clique_k_on_body(body, 6)
3382    }
3383
3384    /// Body-keyed k=7-clique dispatch.
3385    pub(super) fn try_dispatch_wcoj_clique7_on_body(
3386        &mut self,
3387        body: &RirNode,
3388    ) -> Result<Option<CudaBuffer>> {
3389        self.try_dispatch_wcoj_clique_k_on_body(body, 7)
3390    }
3391
3392    /// Body-keyed k=8-clique dispatch.
3393    pub(super) fn try_dispatch_wcoj_clique8_on_body(
3394        &mut self,
3395        body: &RirNode,
3396    ) -> Result<Option<CudaBuffer>> {
3397        self.try_dispatch_wcoj_clique_k_on_body(body, 8)
3398    }
3399
3400    /// Generic K-clique dispatch shared by k=5..8
3401    /// entries. Returns `Ok(Some(buffer))` on dispatch;
3402    /// `Ok(None)` on decline / fallback.
3403    fn try_dispatch_wcoj_clique_k_on_body(
3404        &mut self,
3405        body: &RirNode,
3406        k: usize,
3407    ) -> Result<Option<CudaBuffer>> {
3408        let expected_edges = k * (k - 1) / 2;
3409        // 1. Shape match: MultiWayJoin with inputs.len() == C(k, 2).
3410        let RirNode::MultiWayJoin {
3411            inputs,
3412            plan,
3413            var_order,
3414            ..
3415        } = body
3416        else {
3417            return Ok(None);
3418        };
3419        if matches!(plan, Some(MultiwayPlan::PlannedHashRoute { .. })) {
3420            return Ok(None);
3421        }
3422        if inputs.len() != expected_edges {
3423            return Ok(None);
3424        }
3425        let kclique = match var_order.as_ref().and_then(|order| order.kclique.as_ref()) {
3426            Some(plan) if usize::from(plan.k) == k => plan,
3427            _ => return Ok(None),
3428        };
3429        // 2. Extract RelIds from each input (must all be Scans).
3430        let mut rel_ids: Vec<RelId> = Vec::with_capacity(expected_edges);
3431        for input in inputs {
3432            let RirNode::Scan { rel } = input else {
3433                return Ok(None);
3434            };
3435            rel_ids.push(*rel);
3436        }
3437        // 3. Resolve each rel to a buffer in the relation store.
3438        let mut raw_bufs: Vec<&CudaBuffer> = Vec::with_capacity(expected_edges);
3439        for rid in &rel_ids {
3440            let name = match self.rel_names.get(rid) {
3441                Some(n) => n.clone(),
3442                None => return Ok(None),
3443            };
3444            match self.store.get(&name) {
3445                Some(b) => raw_bufs.push(b),
3446                None => return Ok(None),
3447            }
3448        }
3449        // 4. Acquire dispatch stream.
3450        let launch_stream = match self.wcoj_dispatch_stream_or_init() {
3451            Some(s) => s,
3452            None => return Ok(None),
3453        };
3454        // 5. Determine width-class from the first edge's column 0.
3455        // All edges must share the width-class; provider entries
3456        // re-validate.
3457        let first_ty = match raw_bufs[0].schema.column_type(0) {
3458            Some(t) => t,
3459            None => return Ok(None),
3460        };
3461        let is_u64 = matches!(first_ty, xlog_core::ScalarType::U64);
3462        let is_4byte = matches!(
3463            first_ty,
3464            xlog_core::ScalarType::U32 | xlog_core::ScalarType::Symbol
3465        );
3466        if !is_u64 && !is_4byte {
3467            return Ok(None);
3468        }
3469        let Some(plan_params) = kclique_dispatch_params(kclique, k) else {
3470            return Ok(None);
3471        };
3472        let head_schema = match build_kclique_head_schema(&raw_bufs, k) {
3473            Some(schema) => schema,
3474            None => return Ok(None),
3475        };
3476        let output_perm = match kclique_output_perm(kclique, k) {
3477            Some(perm) => perm,
3478            None => return Ok(None),
3479        };
3480        // 6. Orient edges according to KCliqueVariableOrder, then
3481        // layout only the plan-required physical slots through the
3482        // generic layout-sort helper. Remaining 2-column slots use
3483        // the narrower WCOJ layout entry, which preserves correctness
3484        // and can take the sorted-unique fast path.
3485        let laid_out = match self.orient_and_layout_kclique_edges(
3486            &raw_bufs,
3487            &plan_params,
3488            is_u64,
3489            launch_stream,
3490        ) {
3491            Ok(bufs) => bufs,
3492            Err(err) => {
3493                return wcoj_decline_on_error(
3494                    &mut self.wcoj_error_decline_count,
3495                    "k-clique-layout",
3496                    err,
3497                )
3498            }
3499        };
3500        // 7. Build the slice of buffer references the provider
3501        // expects.
3502        let edge_refs: Vec<&CudaBuffer> = laid_out.iter().collect();
3503        // 8. Dispatch the appropriate provider entry.
3504        let result = match (k, is_u64) {
3505            (5, false) => {
3506                let arr: &[&CudaBuffer; 10] = match edge_refs.as_slice().try_into() {
3507                    Ok(a) => a,
3508                    Err(_) => return Ok(None),
3509                };
3510                self.provider.wcoj_clique5_u32_recorded_planned(
3511                    arr,
3512                    plan_params.leader_edge_idx,
3513                    &plan_params.edge_order,
3514                    &plan_params.iteration_order,
3515                    launch_stream,
3516                )
3517            }
3518            (5, true) => {
3519                let arr: &[&CudaBuffer; 10] = match edge_refs.as_slice().try_into() {
3520                    Ok(a) => a,
3521                    Err(_) => return Ok(None),
3522                };
3523                self.provider.wcoj_clique5_u64_recorded_planned(
3524                    arr,
3525                    plan_params.leader_edge_idx,
3526                    &plan_params.edge_order,
3527                    &plan_params.iteration_order,
3528                    launch_stream,
3529                )
3530            }
3531            (6, false) => {
3532                let arr: &[&CudaBuffer; 15] = match edge_refs.as_slice().try_into() {
3533                    Ok(a) => a,
3534                    Err(_) => return Ok(None),
3535                };
3536                self.provider.wcoj_clique6_u32_recorded_planned(
3537                    arr,
3538                    plan_params.leader_edge_idx,
3539                    &plan_params.edge_order,
3540                    &plan_params.iteration_order,
3541                    launch_stream,
3542                )
3543            }
3544            (6, true) => {
3545                let arr: &[&CudaBuffer; 15] = match edge_refs.as_slice().try_into() {
3546                    Ok(a) => a,
3547                    Err(_) => return Ok(None),
3548                };
3549                self.provider.wcoj_clique6_u64_recorded_planned(
3550                    arr,
3551                    plan_params.leader_edge_idx,
3552                    &plan_params.edge_order,
3553                    &plan_params.iteration_order,
3554                    launch_stream,
3555                )
3556            }
3557            (7, false) => {
3558                let arr: &[&CudaBuffer; 21] = match edge_refs.as_slice().try_into() {
3559                    Ok(a) => a,
3560                    Err(_) => return Ok(None),
3561                };
3562                self.provider.wcoj_clique7_u32_recorded_planned(
3563                    arr,
3564                    plan_params.leader_edge_idx,
3565                    &plan_params.edge_order,
3566                    &plan_params.iteration_order,
3567                    launch_stream,
3568                )
3569            }
3570            (7, true) => {
3571                let arr: &[&CudaBuffer; 21] = match edge_refs.as_slice().try_into() {
3572                    Ok(a) => a,
3573                    Err(_) => return Ok(None),
3574                };
3575                self.provider.wcoj_clique7_u64_recorded_planned(
3576                    arr,
3577                    plan_params.leader_edge_idx,
3578                    &plan_params.edge_order,
3579                    &plan_params.iteration_order,
3580                    launch_stream,
3581                )
3582            }
3583            (8, false) => {
3584                let arr: &[&CudaBuffer; 28] = match edge_refs.as_slice().try_into() {
3585                    Ok(a) => a,
3586                    Err(_) => return Ok(None),
3587                };
3588                self.provider.wcoj_clique8_u32_recorded_planned(
3589                    arr,
3590                    plan_params.leader_edge_idx,
3591                    &plan_params.edge_order,
3592                    &plan_params.iteration_order,
3593                    launch_stream,
3594                )
3595            }
3596            (8, true) => {
3597                let arr: &[&CudaBuffer; 28] = match edge_refs.as_slice().try_into() {
3598                    Ok(a) => a,
3599                    Err(_) => return Ok(None),
3600                };
3601                self.provider.wcoj_clique8_u64_recorded_planned(
3602                    arr,
3603                    plan_params.leader_edge_idx,
3604                    &plan_params.edge_order,
3605                    &plan_params.iteration_order,
3606                    launch_stream,
3607                )
3608            }
3609            _ => return Ok(None),
3610        };
3611        // 9. On success: counter++, return Some. On error:
3612        // silent fallback (no counter advance).
3613        match result {
3614            Ok(buf) => {
3615                let buf = if output_perm.iter().copied().eq(0..output_perm.len()) {
3616                    buf
3617                } else {
3618                    self.provider.wcoj_project_output_columns_recorded(
3619                        &buf,
3620                        &output_perm,
3621                        head_schema,
3622                        launch_stream,
3623                    )?
3624                };
3625                match k {
3626                    5 => self.wcoj_clique5_dispatch_count += 1,
3627                    6 => self.wcoj_clique6_dispatch_count += 1,
3628                    7 => self.wcoj_clique7_dispatch_count += 1,
3629                    8 => self.wcoj_clique8_dispatch_count += 1,
3630                    _ => {}
3631                }
3632                Ok(Some(buf))
3633            }
3634            Err(err) => wcoj_decline_on_error(&mut self.wcoj_error_decline_count, "k-clique", err),
3635        }
3636    }
3637
3638    /// Orient edges according to a `KCliqueVariableOrder` (edge
3639    /// permutation + column swaps), then layout the plan-required
3640    /// physical slots through the generic layout-sort helper and the
3641    /// remaining 2-column slots through the narrower WCOJ layout entry
3642    /// (which preserves correctness and can take the sorted-unique fast
3643    /// path). Shared by the unfused K-clique dispatch and the fused
3644    /// count-by-root dispatch; callers wrap errors through
3645    /// [`wcoj_decline_on_error`].
3646    fn orient_and_layout_kclique_edges(
3647        &self,
3648        raw_bufs: &[&CudaBuffer],
3649        plan_params: &KCliqueDispatchParams,
3650        is_u64: bool,
3651        launch_stream: StreamId,
3652    ) -> Result<Vec<CudaBuffer>> {
3653        let mut laid_out: Vec<CudaBuffer> = Vec::with_capacity(plan_params.edge_permutation.len());
3654        for (slot, &input_idx) in plan_params.edge_permutation.iter().enumerate() {
3655            let src = raw_bufs[input_idx];
3656            let swapped = if plan_params.swap_slots.contains(&slot) {
3657                Some(
3658                    self.provider
3659                        .wcoj_project_2col_swap_recorded(src, launch_stream)?,
3660                )
3661            } else {
3662                None
3663            };
3664            let oriented = swapped.as_ref().unwrap_or(src);
3665            let res = if plan_params.required_sort_slots.contains(&slot) {
3666                if is_u64 {
3667                    self.provider
3668                        .wcoj_layout_sort_u64_recorded(oriented, launch_stream)
3669                } else {
3670                    self.provider
3671                        .wcoj_layout_sort_u32_recorded(oriented, launch_stream)
3672                }
3673            } else if is_u64 {
3674                self.provider
3675                    .wcoj_layout_u64_recorded(oriented, launch_stream)
3676            } else {
3677                self.provider
3678                    .wcoj_layout_u32_recorded(oriented, launch_stream)
3679            };
3680            laid_out.push(res?);
3681        }
3682        Ok(laid_out)
3683    }
3684
3685    /// Aggregate-fused WCOJ, K-clique count (K = 5, 6; 4-byte keys):
3686    /// dispatch the inner `MultiWayJoin(K-clique)` of a count-by-root
3687    /// aggregate through the fused group-by-root kernel, which never
3688    /// materializes the clique rows.
3689    ///
3690    /// CAREFUL — the root under `KCliqueVariableOrder` is plan-dependent
3691    /// (`variable_order[0]` + leader-edge orientation/swaps determine the
3692    /// physical root column). The fusion is sound only when the GroupBy
3693    /// key column references the head variable whose planned position is
3694    /// 0 (`variable_positions[r] == 0`); everything else declines
3695    /// silently to the embedded fallback + groupby path. K = 7/8 (no
3696    /// fused kernels), u64/mixed widths, planned-hash routes, and
3697    /// missing buffers/runtime also decline. Kill switch
3698    /// (`XLOG_DISABLE_WCOJ_GROUPBY_FUSION`) is checked by the caller.
3699    /// Pipeline errors route through [`wcoj_decline_on_error`] (counted;
3700    /// `XLOG_WCOJ_STRICT=1` propagates).
3701    fn try_dispatch_wcoj_groupby_root_count_clique(
3702        &mut self,
3703        multiway: &RirNode,
3704        group_cols: &[ProjectExpr],
3705    ) -> Result<Option<CudaBuffer>> {
3706        let RirNode::MultiWayJoin {
3707            inputs,
3708            plan,
3709            var_order,
3710            ..
3711        } = multiway
3712        else {
3713            return Ok(None);
3714        };
3715        if matches!(plan, Some(MultiwayPlan::PlannedHashRoute { .. })) {
3716            return Ok(None);
3717        }
3718        let kclique = match var_order.as_ref().and_then(|order| order.kclique.as_ref()) {
3719            Some(plan) => plan,
3720            None => return Ok(None),
3721        };
3722        let k = usize::from(kclique.k);
3723        if !matches!(k, 5 | 6) {
3724            return Ok(None);
3725        }
3726        let expected_edges = k * (k - 1) / 2;
3727        if inputs.len() != expected_edges {
3728            return Ok(None);
3729        }
3730        // Group key must be the planned position-0 root variable.
3731        let Some(ProjectExpr::Column(root_var)) = group_cols.first() else {
3732            return Ok(None);
3733        };
3734        let Some(positions) = live_kclique_variable_positions(kclique, k) else {
3735            return Ok(None);
3736        };
3737        if *root_var >= k || positions[*root_var] != 0 {
3738            return Ok(None);
3739        }
3740        // Resolve scans → buffers; only uniform 4-byte keys are fused.
3741        let mut rel_ids: Vec<RelId> = Vec::with_capacity(expected_edges);
3742        for input in inputs {
3743            let RirNode::Scan { rel } = input else {
3744                return Ok(None);
3745            };
3746            rel_ids.push(*rel);
3747        }
3748        let mut raw_bufs: Vec<&CudaBuffer> = Vec::with_capacity(expected_edges);
3749        for rid in &rel_ids {
3750            let name = match self.rel_names.get(rid) {
3751                Some(n) => n.clone(),
3752                None => return Ok(None),
3753            };
3754            match self.store.get(&name) {
3755                Some(b) => raw_bufs.push(b),
3756                None => return Ok(None),
3757            }
3758        }
3759        for buf in &raw_bufs {
3760            if classify_two_col_wcoj_width(buf) != Some(WcojKeyWidth::FourByte) {
3761                return Ok(None);
3762            }
3763        }
3764        if self.provider.memory().runtime().is_none() {
3765            return Ok(None);
3766        }
3767        let Some(launch_stream) = self.wcoj_dispatch_stream_or_init() else {
3768            return Ok(None);
3769        };
3770        let Some(plan_params) = kclique_dispatch_params(kclique, k) else {
3771            return Ok(None);
3772        };
3773        let laid_out = match self.orient_and_layout_kclique_edges(
3774            &raw_bufs,
3775            &plan_params,
3776            false,
3777            launch_stream,
3778        ) {
3779            Ok(bufs) => bufs,
3780            Err(err) => {
3781                return wcoj_decline_on_error(
3782                    &mut self.wcoj_error_decline_count,
3783                    "groupby-fusion-clique-layout",
3784                    err,
3785                )
3786            }
3787        };
3788        let edge_refs: Vec<&CudaBuffer> = laid_out.iter().collect();
3789        let result = match k {
3790            5 => {
3791                let arr: &[&CudaBuffer; 10] = match edge_refs.as_slice().try_into() {
3792                    Ok(a) => a,
3793                    Err(_) => return Ok(None),
3794                };
3795                self.provider
3796                    .wcoj_clique5_groupby_root_count_u32_recorded_planned(
3797                        arr,
3798                        plan_params.leader_edge_idx,
3799                        &plan_params.edge_order,
3800                        &plan_params.iteration_order,
3801                        launch_stream,
3802                    )
3803            }
3804            _ => {
3805                let arr: &[&CudaBuffer; 15] = match edge_refs.as_slice().try_into() {
3806                    Ok(a) => a,
3807                    Err(_) => return Ok(None),
3808                };
3809                self.provider
3810                    .wcoj_clique6_groupby_root_count_u32_recorded_planned(
3811                        arr,
3812                        plan_params.leader_edge_idx,
3813                        &plan_params.edge_order,
3814                        &plan_params.iteration_order,
3815                        launch_stream,
3816                    )
3817            }
3818        };
3819        match result {
3820            Ok(buf) => {
3821                self.wcoj_groupby_fusion_dispatch_count += 1;
3822                Ok(Some(buf))
3823            }
3824            Err(err) => wcoj_decline_on_error(
3825                &mut self.wcoj_error_decline_count,
3826                "groupby-fusion-clique",
3827                err,
3828            ),
3829        }
3830    }
3831}
3832
/// Validated launch parameters for a planned K-clique dispatch, built from
/// a `KCliqueVariableOrder` by `kclique_dispatch_params`.
#[derive(Debug)]
struct KCliqueDispatchParams {
    /// For each physical edge slot (the index), which input buffer to read.
    edge_permutation: Vec<usize>,
    /// For each logical edge (indexed by the edge index computed over
    /// planned variable positions), the physical slot that holds it.
    edge_order: Vec<u8>,
    /// Variable iteration order; `kclique_dispatch_params` fills this with
    /// the identity sequence `0..k`.
    iteration_order: Vec<u8>,
    /// Physical slot of the logical edge between planned positions 0 and 1.
    leader_edge_idx: u32,
    /// Physical slots whose two columns must be swapped before layout.
    swap_slots: HashSet<usize>,
    /// Physical slots that must go through the layout-sort helper.
    required_sort_slots: HashSet<usize>,
}
3842
3843fn kclique_dispatch_params(plan: &KCliqueVariableOrder, k: usize) -> Option<KCliqueDispatchParams> {
3844    let expected_edges = k * (k - 1) / 2;
3845    let edge_permutation = live_kclique_edge_permutation(plan, expected_edges)?;
3846    let positions = live_kclique_variable_positions(plan, k)?;
3847    let mut edge_order = vec![u8::MAX; expected_edges];
3848
3849    for (slot, &edge_idx) in edge_permutation.iter().enumerate() {
3850        let (left, right) = clique_edge_pair(edge_idx, k)?;
3851        let left_pos = positions[left];
3852        let right_pos = positions[right];
3853        let logical_edge =
3854            clique_edge_idx_runtime(left_pos.min(right_pos), left_pos.max(right_pos), k)?;
3855        edge_order[logical_edge] = u8::try_from(slot).ok()?;
3856    }
3857    if edge_order.contains(&u8::MAX) {
3858        return None;
3859    }
3860    let leader_edge_idx = u32::from(edge_order[clique_edge_idx_runtime(0, 1, k)?]);
3861    let iteration_order: Vec<u8> = (0..k)
3862        .map(|idx| u8::try_from(idx).ok())
3863        .collect::<Option<_>>()?;
3864
3865    let swap_slots: HashSet<usize> = plan
3866        .column_swaps
3867        .iter()
3868        .filter(|swap| swap.swap_cols)
3869        .map(|swap| usize::from(swap.edge_slot))
3870        .collect();
3871    if swap_slots.iter().any(|slot| *slot >= expected_edges) {
3872        return None;
3873    }
3874    let required_sort_slots: HashSet<usize> = plan
3875        .sorted_layout_requirements
3876        .edge_slots
3877        .iter()
3878        .copied()
3879        .map(usize::from)
3880        .collect();
3881    if required_sort_slots
3882        .iter()
3883        .any(|slot| *slot >= expected_edges)
3884    {
3885        return None;
3886    }
3887
3888    Some(KCliqueDispatchParams {
3889        edge_permutation,
3890        edge_order,
3891        iteration_order,
3892        leader_edge_idx,
3893        swap_slots,
3894        required_sort_slots,
3895    })
3896}
3897
3898fn live_kclique_edge_permutation(
3899    plan: &KCliqueVariableOrder,
3900    expected_edges: usize,
3901) -> Option<Vec<usize>> {
3902    let values: Vec<usize> = plan
3903        .edge_permutation
3904        .iter()
3905        .copied()
3906        .take_while(|value| *value != u8::MAX)
3907        .map(usize::from)
3908        .collect();
3909    if values.len() != expected_edges {
3910        return None;
3911    }
3912    let mut seen = vec![false; expected_edges];
3913    for &value in &values {
3914        if value >= expected_edges || seen[value] {
3915            return None;
3916        }
3917        seen[value] = true;
3918    }
3919    Some(values)
3920}
3921
3922fn live_kclique_variable_positions(plan: &KCliqueVariableOrder, k: usize) -> Option<Vec<usize>> {
3923    let mut positions = Vec::with_capacity(k);
3924    let mut seen = vec![false; k];
3925    for original_var in 0..k {
3926        let pos = usize::from(*plan.variable_positions.get(original_var)?);
3927        if pos >= k || seen[pos] {
3928            return None;
3929        }
3930        seen[pos] = true;
3931        positions.push(pos);
3932    }
3933    Some(positions)
3934}
3935
/// Index of the edge `(i, j)` in the lexicographic enumeration of all
/// pairs `i < j < k`: `(0,1), (0,2), …, (0,k-1), (1,2), …`.
///
/// Returns `None` unless `i < j < k`.
fn clique_edge_idx_runtime(i: usize, j: usize, k: usize) -> Option<usize> {
    if i >= j || j >= k {
        return None;
    }
    // Number of pairs whose first endpoint is smaller than `i`.
    let pairs_before_row = i * (k - 1) - i * i.saturating_sub(1) / 2;
    Some(pairs_before_row + (j - i - 1))
}
3942
/// Inverse of the lexicographic edge numbering: the `edge_idx`-th pair
/// `(i, j)` with `i < j < k`, or `None` when `edge_idx` is past the last
/// pair.
fn clique_edge_pair(edge_idx: usize, k: usize) -> Option<(usize, usize)> {
    (0..k)
        .flat_map(|i| ((i + 1)..k).map(move |j| (i, j)))
        .nth(edge_idx)
}
3955
3956fn build_kclique_head_schema(raw_bufs: &[&CudaBuffer], k: usize) -> Option<Schema> {
3957    let mut columns = Vec::with_capacity(k);
3958    for variable in 0..k {
3959        let (edge_idx, col_idx) = if variable == 0 {
3960            (clique_edge_idx_runtime(0, 1, k)?, 0)
3961        } else {
3962            (clique_edge_idx_runtime(0, variable, k)?, 1)
3963        };
3964        let ty = raw_bufs.get(edge_idx)?.schema.column_type(col_idx)?;
3965        columns.push((format!("col{}", variable), ty));
3966    }
3967    Some(Schema::new(columns))
3968}
3969
3970fn kclique_output_perm(plan: &KCliqueVariableOrder, k: usize) -> Option<Vec<usize>> {
3971    let positions = live_kclique_variable_positions(plan, k)?;
3972    Some(positions)
3973}
3974
3975#[cfg(test)]
3976mod tests {
3977    use std::sync::{Mutex, OnceLock};
3978
3979    use super::{
3980        chain_dispatch_enabled, match_chain_join, match_multiway_triangle, wcoj_adaptive_enabled,
3981        wcoj_gate_enabled, ENV_USE_WCOJ_TRIANGLE_U32, ENV_WCOJ_CHAIN_ENABLE,
3982    };
3983    use xlog_core::RelId;
3984    use xlog_ir::rir::ProjectExpr;
3985    use xlog_ir::RirNode;
3986
3987    fn canonical_multiway() -> RirNode {
3988        RirNode::MultiWayJoin {
3989            inputs: vec![
3990                RirNode::Scan { rel: RelId(1) },
3991                RirNode::Scan { rel: RelId(2) },
3992                RirNode::Scan { rel: RelId(3) },
3993            ],
3994            slot_vars: vec![
3995                vec![Some(0u32), Some(1)],
3996                vec![Some(1u32), Some(2)],
3997                vec![Some(0u32), Some(2)],
3998            ],
3999            output_columns: vec![
4000                ProjectExpr::Column(0),
4001                ProjectExpr::Column(1),
4002                ProjectExpr::Column(3),
4003            ],
4004            fallback: Box::new(RirNode::Unit),
4005            plan: None,
4006            var_order: None,
4007        }
4008    }
4009
4010    fn canonical_chain_join() -> RirNode {
4011        RirNode::ChainJoin {
4012            left: Box::new(RirNode::Scan { rel: RelId(1) }),
4013            right: Box::new(RirNode::Scan { rel: RelId(2) }),
4014            left_key: 1,
4015            right_key: 0,
4016            output_columns: vec![ProjectExpr::Column(0), ProjectExpr::Column(3)],
4017            fallback: Box::new(RirNode::Unit),
4018        }
4019    }
4020
4021    #[test]
4022    fn match_chain_returns_two_rels_and_keys() {
4023        let node = canonical_chain_join();
4024        let m = match_chain_join(&node).expect("must match canonical chain");
4025        assert_eq!(m.rel_left, RelId(1));
4026        assert_eq!(m.rel_right, RelId(2));
4027        assert_eq!(m.left_key, 1);
4028        assert_eq!(m.right_key, 0);
4029        assert_eq!(
4030            m.output_columns,
4031            vec![ProjectExpr::Column(0), ProjectExpr::Column(3)]
4032        );
4033    }
4034
4035    #[test]
4036    fn match_chain_rejects_non_scan_inputs() {
4037        let mut node = canonical_chain_join();
4038        if let RirNode::ChainJoin { left, .. } = &mut node {
4039            **left = RirNode::Unit;
4040        }
4041        assert!(match_chain_join(&node).is_none());
4042    }
4043
4044    #[test]
4045    fn match_chain_rejects_multiway_triangle() {
4046        let node = canonical_multiway();
4047        assert!(match_chain_join(&node).is_none());
4048    }
4049
4050    #[test]
4051    fn chain_dispatch_env_defaults_on_and_can_disable() {
4052        static ENV_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
4053        let _guard = ENV_LOCK.get_or_init(|| Mutex::new(())).lock().unwrap();
4054        let old = std::env::var(ENV_WCOJ_CHAIN_ENABLE).ok();
4055        // SAFETY: This test holds a local mutex while mutating the
4056        // process-global chain-dispatch env var, and restores it before unlock.
4057        unsafe {
4058            std::env::remove_var(ENV_WCOJ_CHAIN_ENABLE);
4059        }
4060        assert!(chain_dispatch_enabled());
4061        unsafe {
4062            std::env::set_var(ENV_WCOJ_CHAIN_ENABLE, "0");
4063        }
4064        assert!(!chain_dispatch_enabled());
4065        unsafe {
4066            std::env::set_var(ENV_WCOJ_CHAIN_ENABLE, "false");
4067        }
4068        assert!(!chain_dispatch_enabled());
4069        unsafe {
4070            std::env::set_var(ENV_WCOJ_CHAIN_ENABLE, "1");
4071        }
4072        assert!(chain_dispatch_enabled());
4073        unsafe {
4074            match old {
4075                Some(v) => std::env::set_var(ENV_WCOJ_CHAIN_ENABLE, v),
4076                None => std::env::remove_var(ENV_WCOJ_CHAIN_ENABLE),
4077            }
4078        }
4079    }
4080
4081    #[test]
4082    fn match_canonical_returns_three_rels() {
4083        let node = canonical_multiway();
4084        let m = match_multiway_triangle(&node).expect("must match canonical triangle");
4085        assert_eq!(m.rel_xy, RelId(1));
4086        assert_eq!(m.rel_yz, RelId(2));
4087        assert_eq!(m.rel_xz, RelId(3));
4088    }
4089
4090    #[test]
4091    fn match_rejects_non_multiway_body() {
4092        let node = RirNode::Scan { rel: RelId(1) };
4093        assert!(match_multiway_triangle(&node).is_none());
4094    }
4095
4096    #[test]
4097    fn match_rejects_rotated_output_columns() {
4098        let mut node = canonical_multiway();
4099        if let RirNode::MultiWayJoin { output_columns, .. } = &mut node {
4100            *output_columns = vec![
4101                ProjectExpr::Column(1),
4102                ProjectExpr::Column(0),
4103                ProjectExpr::Column(3),
4104            ];
4105        }
4106        assert!(match_multiway_triangle(&node).is_none());
4107    }
4108
4109    /// Triangle with Z-shared output_columns layout
4110    /// `[Column(0), Column(2), Column(3)]` must match. The
4111    /// matcher's output-column relaxation accepts both
4112    /// `[0, 1, 3]` (Y/X-shared) and `[0, 2, 3]` (Z-shared).
4113    #[test]
4114    fn match_accepts_z_shared_triangle_output_columns() {
4115        let mut node = canonical_multiway();
4116        if let RirNode::MultiWayJoin { output_columns, .. } = &mut node {
4117            *output_columns = vec![
4118                ProjectExpr::Column(0),
4119                ProjectExpr::Column(2),
4120                ProjectExpr::Column(3),
4121            ];
4122        }
4123        let m = match_multiway_triangle(&node)
4124            .expect("matcher must accept the Z-shared output-column layout");
4125        assert_eq!(m.rel_xy, RelId(1));
4126        assert_eq!(m.rel_yz, RelId(2));
4127        assert_eq!(m.rel_xz, RelId(3));
4128    }
4129
4130    /// Triangle output_columns `[Column(0), Column(3), Column(3)]`
4131    /// MUST be rejected — second col must be 1 or 2, not 3.
4132    #[test]
4133    fn match_rejects_invalid_triangle_output_columns() {
4134        let mut node = canonical_multiway();
4135        if let RirNode::MultiWayJoin { output_columns, .. } = &mut node {
4136            *output_columns = vec![
4137                ProjectExpr::Column(0),
4138                ProjectExpr::Column(3),
4139                ProjectExpr::Column(3),
4140            ];
4141        }
4142        assert!(match_multiway_triangle(&node).is_none());
4143    }
4144
4145    #[test]
4146    fn match_rejects_arity_mismatched_output_columns() {
4147        let mut node = canonical_multiway();
4148        if let RirNode::MultiWayJoin { output_columns, .. } = &mut node {
4149            *output_columns = vec![ProjectExpr::Column(0), ProjectExpr::Column(1)];
4150        }
4151        assert!(match_multiway_triangle(&node).is_none());
4152    }
4153
4154    #[test]
4155    fn match_rejects_malformed_slot_vars() {
4156        // [[A,B],[B,C],[A,B]] — last slot is wrong (should be [A,C]).
4157        let mut node = canonical_multiway();
4158        if let RirNode::MultiWayJoin { slot_vars, .. } = &mut node {
4159            *slot_vars = vec![
4160                vec![Some(0u32), Some(1)],
4161                vec![Some(1u32), Some(2)],
4162                vec![Some(0u32), Some(1)],
4163            ];
4164        }
4165        assert!(match_multiway_triangle(&node).is_none());
4166    }
4167
4168    #[test]
4169    fn match_rejects_repeated_var_in_slot() {
4170        let mut node = canonical_multiway();
4171        if let RirNode::MultiWayJoin { slot_vars, .. } = &mut node {
4172            // [[A, A], …] — repeated var in slot 0.
4173            *slot_vars = vec![
4174                vec![Some(0u32), Some(0)],
4175                vec![Some(1u32), Some(2)],
4176                vec![Some(0u32), Some(2)],
4177            ];
4178        }
4179        assert!(match_multiway_triangle(&node).is_none());
4180    }
4181
4182    #[test]
4183    fn match_rejects_non_scan_input() {
4184        let mut node = canonical_multiway();
4185        if let RirNode::MultiWayJoin { inputs, .. } = &mut node {
4186            inputs[0] = RirNode::Unit;
4187        }
4188        assert!(match_multiway_triangle(&node).is_none());
4189    }
4190
4191    #[test]
4192    fn match_rejects_input_arity_mismatch() {
4193        let mut node = canonical_multiway();
4194        if let RirNode::MultiWayJoin { inputs, .. } = &mut node {
4195            inputs.pop();
4196        }
4197        assert!(match_multiway_triangle(&node).is_none());
4198    }
4199
4200    fn env_lock() -> &'static Mutex<()> {
4201        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
4202        LOCK.get_or_init(|| Mutex::new(()))
4203    }
4204
4205    struct EnvSnapshot {
4206        force: Option<String>,
4207    }
4208
4209    impl EnvSnapshot {
4210        fn capture_and_clear() -> Self {
4211            let snapshot = Self {
4212                force: std::env::var(ENV_USE_WCOJ_TRIANGLE_U32).ok(),
4213            };
4214
4215            // SAFETY: The caller holds `env_lock`, serializing mutation of
4216            // this process-global WCOJ env var.
4217            unsafe {
4218                std::env::remove_var(ENV_USE_WCOJ_TRIANGLE_U32);
4219            }
4220
4221            snapshot
4222        }
4223    }
4224
4225    impl Drop for EnvSnapshot {
4226        fn drop(&mut self) {
4227            // SAFETY: The snapshot is dropped before `env_lock` is released,
4228            // so restoration is serialized even if the test body panics.
4229            unsafe {
4230                match self.force.take() {
4231                    Some(v) => std::env::set_var(ENV_USE_WCOJ_TRIANGLE_U32, v),
4232                    None => std::env::remove_var(ENV_USE_WCOJ_TRIANGLE_U32),
4233                }
4234            }
4235        }
4236    }
4237
4238    fn with_wcoj_env<R>(f: impl FnOnce() -> R) -> R {
4239        let _guard = env_lock().lock().expect("WCOJ env lock poisoned");
4240        let _snapshot = EnvSnapshot::capture_and_clear();
4241        f()
4242    }
4243
4244    fn set_env(name: &str, value: &str) {
4245        // SAFETY: Callers are inside `with_wcoj_env`, which serializes and
4246        // restores these process-global WCOJ env vars.
4247        unsafe {
4248            std::env::set_var(name, value);
4249        }
4250    }
4251
4252    #[test]
4253    fn stats_gate_defaults_on_when_env_unset() {
4254        with_wcoj_env(|| {
4255            assert!(wcoj_adaptive_enabled(None));
4256            assert!(wcoj_adaptive_enabled(Some(true)));
4257            assert!(!wcoj_adaptive_enabled(Some(false)));
4258        });
4259    }
4260
4261    #[test]
4262    fn config_controls_stats_gate() {
4263        with_wcoj_env(|| {
4264            assert!(wcoj_adaptive_enabled(Some(true)));
4265            assert!(!wcoj_adaptive_enabled(Some(false)));
4266        });
4267    }
4268
4269    #[test]
4270    fn force_resolver_config_still_overrides_env() {
4271        with_wcoj_env(|| {
4272            set_env(ENV_USE_WCOJ_TRIANGLE_U32, "1");
4273            assert!(wcoj_gate_enabled(None));
4274            assert!(!wcoj_gate_enabled(Some(false)));
4275
4276            set_env(ENV_USE_WCOJ_TRIANGLE_U32, "0");
4277            assert!(!wcoj_gate_enabled(None));
4278            assert!(wcoj_gate_enabled(Some(true)));
4279        });
4280    }
4281
4282    // -------------------------------------------------------------
4283    // WCOJ error-decline observability (counter + XLOG_WCOJ_STRICT).
4284    // -------------------------------------------------------------
4285
4286    #[test]
4287    fn error_decline_counts_and_falls_back_by_default() {
4288        with_wcoj_env(|| {
4289            let mut counter = 0u64;
4290            let err = xlog_core::XlogError::Kernel("synthetic layout failure".to_string());
4291            let out = super::wcoj_decline_on_error(&mut counter, "triangle", err)
4292                .expect("default mode must decline to the binary-join fallback, not error");
4293            assert!(out.is_none(), "decline must hand control to the fallback");
4294            assert_eq!(counter, 1, "every error decline must be counted");
4295        });
4296    }
4297
4298    #[test]
4299    fn error_decline_propagates_under_strict_env() {
4300        with_wcoj_env(|| {
4301            set_env(super::ENV_WCOJ_STRICT, "1");
4302            let mut counter = 0u64;
4303            let err = xlog_core::XlogError::Kernel("synthetic layout failure".to_string());
4304            let out = super::wcoj_decline_on_error(&mut counter, "triangle", err);
4305            // SAFETY: serialized + restored under `with_wcoj_env`'s lock.
4306            unsafe {
4307                std::env::remove_var(super::ENV_WCOJ_STRICT);
4308            }
4309            match out {
4310                Err(err) => assert!(
4311                    err.to_string().contains("synthetic layout failure"),
4312                    "strict mode must surface the original error: {err}"
4313                ),
4314                Ok(_) => panic!("XLOG_WCOJ_STRICT=1 must propagate the pipeline error"),
4315            }
4316            assert_eq!(counter, 1, "strict mode still counts the decline");
4317        });
4318    }
4319
4320    // -------------------------------------------------------------
4321    // 4-cycle env-resolver + matcher tests.
4322    // -------------------------------------------------------------
4323
4324    use super::{
4325        match_multiway_4cycle, wcoj_4cycle_adaptive_enabled, wcoj_4cycle_disabled,
4326        wcoj_4cycle_gate_enabled, ENV_DISABLE_WCOJ_4CYCLE, ENV_USE_WCOJ_4CYCLE,
4327        ENV_USE_WCOJ_4CYCLE_ADAPTIVE,
4328    };
4329
4330    struct EnvSnapshot4Cycle {
4331        force: Option<String>,
4332        adaptive: Option<String>,
4333        disable: Option<String>,
4334    }
4335
4336    impl EnvSnapshot4Cycle {
4337        fn capture_and_clear() -> Self {
4338            let snap = Self {
4339                force: std::env::var(ENV_USE_WCOJ_4CYCLE).ok(),
4340                adaptive: std::env::var(ENV_USE_WCOJ_4CYCLE_ADAPTIVE).ok(),
4341                disable: std::env::var(ENV_DISABLE_WCOJ_4CYCLE).ok(),
4342            };
4343            // SAFETY: caller holds env_lock.
4344            unsafe {
4345                std::env::remove_var(ENV_USE_WCOJ_4CYCLE);
4346                std::env::remove_var(ENV_USE_WCOJ_4CYCLE_ADAPTIVE);
4347                std::env::remove_var(ENV_DISABLE_WCOJ_4CYCLE);
4348            }
4349            snap
4350        }
4351    }
4352
4353    impl Drop for EnvSnapshot4Cycle {
4354        fn drop(&mut self) {
4355            // SAFETY: caller holds env_lock.
4356            unsafe {
4357                match self.force.take() {
4358                    Some(v) => std::env::set_var(ENV_USE_WCOJ_4CYCLE, v),
4359                    None => std::env::remove_var(ENV_USE_WCOJ_4CYCLE),
4360                }
4361                match self.adaptive.take() {
4362                    Some(v) => std::env::set_var(ENV_USE_WCOJ_4CYCLE_ADAPTIVE, v),
4363                    None => std::env::remove_var(ENV_USE_WCOJ_4CYCLE_ADAPTIVE),
4364                }
4365                match self.disable.take() {
4366                    Some(v) => std::env::set_var(ENV_DISABLE_WCOJ_4CYCLE, v),
4367                    None => std::env::remove_var(ENV_DISABLE_WCOJ_4CYCLE),
4368                }
4369            }
4370        }
4371    }
4372
4373    fn with_4cycle_env<R>(f: impl FnOnce() -> R) -> R {
4374        let _guard = env_lock().lock().expect("4-cycle env lock poisoned");
4375        let _snap = EnvSnapshot4Cycle::capture_and_clear();
4376        f()
4377    }
4378
4379    #[test]
4380    fn force_4cycle_resolver_defaults_off_when_env_unset() {
4381        with_4cycle_env(|| {
4382            assert!(!wcoj_4cycle_gate_enabled(None));
4383            assert!(wcoj_4cycle_gate_enabled(Some(true)));
4384            assert!(!wcoj_4cycle_gate_enabled(Some(false)));
4385        });
4386    }
4387
4388    #[test]
4389    fn force_4cycle_resolver_env_can_enable() {
4390        with_4cycle_env(|| {
4391            set_env(ENV_USE_WCOJ_4CYCLE, "1");
4392            assert!(wcoj_4cycle_gate_enabled(None));
4393            set_env(ENV_USE_WCOJ_4CYCLE, "true");
4394            assert!(wcoj_4cycle_gate_enabled(None));
4395            set_env(ENV_USE_WCOJ_4CYCLE, "0");
4396            assert!(!wcoj_4cycle_gate_enabled(None));
4397        });
4398    }
4399
4400    /// **Locks the 4-cycle adaptive contract**: adaptive opt-in
4401    /// defaults OFF, unlike triangle's default-on. If a future
4402    /// default flips, that change must update this test
4403    /// explicitly with bench evidence.
4404    #[test]
4405    fn adaptive_4cycle_resolver_defaults_off_when_env_unset() {
4406        with_4cycle_env(|| {
4407            assert!(
4408                !wcoj_4cycle_adaptive_enabled(None),
4409                "4-cycle adaptive must be OPT-IN by default (unlike triangle's default-on)"
4410            );
4411            assert!(wcoj_4cycle_adaptive_enabled(Some(true)));
4412            assert!(!wcoj_4cycle_adaptive_enabled(Some(false)));
4413        });
4414    }
4415
4416    #[test]
4417    fn adaptive_4cycle_resolver_env_can_enable() {
4418        with_4cycle_env(|| {
4419            set_env(ENV_USE_WCOJ_4CYCLE_ADAPTIVE, "1");
4420            assert!(wcoj_4cycle_adaptive_enabled(None));
4421            set_env(ENV_USE_WCOJ_4CYCLE_ADAPTIVE, "0");
4422            assert!(!wcoj_4cycle_adaptive_enabled(None));
4423            set_env(ENV_USE_WCOJ_4CYCLE_ADAPTIVE, "true");
4424            assert!(wcoj_4cycle_adaptive_enabled(None));
4425        });
4426    }
4427
4428    #[test]
4429    fn kill_4cycle_resolver_honors_env_and_config() {
4430        with_4cycle_env(|| {
4431            assert!(!wcoj_4cycle_disabled(None));
4432            set_env(ENV_DISABLE_WCOJ_4CYCLE, "1");
4433            assert!(wcoj_4cycle_disabled(None));
4434            assert!(!wcoj_4cycle_disabled(Some(false)));
4435            set_env(ENV_DISABLE_WCOJ_4CYCLE, "0");
4436            assert!(wcoj_4cycle_disabled(Some(true)));
4437        });
4438    }
4439
4440    fn canonical_4cycle_multiway() -> RirNode {
4441        RirNode::MultiWayJoin {
4442            inputs: vec![
4443                RirNode::Scan { rel: RelId(1) },
4444                RirNode::Scan { rel: RelId(2) },
4445                RirNode::Scan { rel: RelId(3) },
4446                RirNode::Scan { rel: RelId(4) },
4447            ],
4448            slot_vars: vec![
4449                vec![Some(0u32), Some(1)],
4450                vec![Some(1u32), Some(2)],
4451                vec![Some(2u32), Some(3)],
4452                vec![Some(3u32), Some(0)],
4453            ],
4454            output_columns: vec![
4455                ProjectExpr::Column(0),
4456                ProjectExpr::Column(1),
4457                ProjectExpr::Column(3),
4458                ProjectExpr::Column(5),
4459            ],
4460            fallback: Box::new(RirNode::Unit),
4461            plan: None,
4462            var_order: None,
4463        }
4464    }
4465
4466    #[test]
4467    fn match_4cycle_canonical_returns_four_rels() {
4468        let node = canonical_4cycle_multiway();
4469        let m = match_multiway_4cycle(&node).expect("must match canonical 4-cycle");
4470        assert_eq!(m.rel_e1, RelId(1));
4471        assert_eq!(m.rel_e2, RelId(2));
4472        assert_eq!(m.rel_e3, RelId(3));
4473        assert_eq!(m.rel_e4, RelId(4));
4474    }
4475
4476    #[test]
4477    fn match_4cycle_rejects_non_multiway() {
4478        assert!(match_multiway_4cycle(&RirNode::Scan { rel: RelId(1) }).is_none());
4479    }
4480
4481    #[test]
4482    fn match_4cycle_rejects_triangle_shape() {
4483        // Triangle is 3 inputs — 4-cycle matcher must reject.
4484        let triangle = RirNode::MultiWayJoin {
4485            inputs: vec![
4486                RirNode::Scan { rel: RelId(1) },
4487                RirNode::Scan { rel: RelId(2) },
4488                RirNode::Scan { rel: RelId(3) },
4489            ],
4490            slot_vars: vec![
4491                vec![Some(0u32), Some(1)],
4492                vec![Some(1u32), Some(2)],
4493                vec![Some(0u32), Some(2)],
4494            ],
4495            output_columns: vec![
4496                ProjectExpr::Column(0),
4497                ProjectExpr::Column(1),
4498                ProjectExpr::Column(3),
4499            ],
4500            fallback: Box::new(RirNode::Unit),
4501            plan: None,
4502            var_order: None,
4503        };
4504        assert!(match_multiway_4cycle(&triangle).is_none());
4505    }
4506
4507    #[test]
4508    fn match_4cycle_rejects_rotated_output_columns() {
4509        let mut node = canonical_4cycle_multiway();
4510        if let RirNode::MultiWayJoin { output_columns, .. } = &mut node {
4511            output_columns.swap(0, 1);
4512        }
4513        assert!(match_multiway_4cycle(&node).is_none());
4514    }
4515
4516    /// 4-cycle Alt-grouping output_columns
4517    /// `[Column(5), Column(0), Column(1), Column(3)]` must
4518    /// match. The matcher relaxation accepts both
4519    /// Default `[0, 1, 3, 5]` and Alt `[5, 0, 1, 3]`.
4520    #[test]
4521    fn match_4cycle_accepts_alt_grouping_output_columns() {
4522        let mut node = canonical_4cycle_multiway();
4523        if let RirNode::MultiWayJoin { output_columns, .. } = &mut node {
4524            *output_columns = vec![
4525                ProjectExpr::Column(5),
4526                ProjectExpr::Column(0),
4527                ProjectExpr::Column(1),
4528                ProjectExpr::Column(3),
4529            ];
4530        }
4531        let m = match_multiway_4cycle(&node)
4532            .expect("matcher must accept the Alt-grouping output-column layout");
4533        // RelIds preserved positionally from the body's
4534        // MultiWayJoin.inputs (which are in canonical
4535        // semantic order [WX, XY, YZ, ZW]).
4536        assert_eq!(m.rel_e1, RelId(1));
4537        assert_eq!(m.rel_e2, RelId(2));
4538        assert_eq!(m.rel_e3, RelId(3));
4539        assert_eq!(m.rel_e4, RelId(4));
4540    }
4541
4542    /// 4-cycle output_columns `[1, 0, 3, 5]` (only swap
4543    /// of cols 0 and 1 vs Default) must STILL be rejected —
4544    /// it's neither Default nor Alt.
4545    #[test]
4546    fn match_4cycle_rejects_invalid_output_columns() {
4547        let mut node = canonical_4cycle_multiway();
4548        if let RirNode::MultiWayJoin { output_columns, .. } = &mut node {
4549            *output_columns = vec![
4550                ProjectExpr::Column(1),
4551                ProjectExpr::Column(0),
4552                ProjectExpr::Column(3),
4553                ProjectExpr::Column(5),
4554            ];
4555        }
4556        assert!(match_multiway_4cycle(&node).is_none());
4557    }
4558
4559    #[test]
4560    fn match_4cycle_rejects_arity_mismatched_output_columns() {
4561        let mut node = canonical_4cycle_multiway();
4562        if let RirNode::MultiWayJoin { output_columns, .. } = &mut node {
4563            output_columns.pop();
4564        }
4565        assert!(match_multiway_4cycle(&node).is_none());
4566    }
4567
4568    #[test]
4569    fn match_4cycle_rejects_unclosed_cycle() {
4570        // Slot 3's second var is supposed to equal slot 0's first
4571        // var (closing the cycle). Replace with a fresh id.
4572        let mut node = canonical_4cycle_multiway();
4573        if let RirNode::MultiWayJoin { slot_vars, .. } = &mut node {
4574            slot_vars[3] = vec![Some(3), Some(99)];
4575        }
4576        assert!(match_multiway_4cycle(&node).is_none());
4577    }
4578
4579    #[test]
4580    fn match_4cycle_rejects_non_scan_input() {
4581        let mut node = canonical_4cycle_multiway();
4582        if let RirNode::MultiWayJoin { inputs, .. } = &mut node {
4583            inputs[0] = RirNode::Unit;
4584        }
4585        assert!(match_multiway_4cycle(&node).is_none());
4586    }
4587
4588    #[test]
4589    fn match_4cycle_rejects_input_arity_mismatch() {
4590        let mut node = canonical_4cycle_multiway();
4591        if let RirNode::MultiWayJoin { inputs, .. } = &mut node {
4592            inputs.push(RirNode::Scan { rel: RelId(5) });
4593        }
4594        assert!(match_multiway_4cycle(&node).is_none());
4595    }
4596}