// xlog_runtime/executor/mod.rs

//! Query executor for RIR nodes.
//!
//! The executor interprets RIR (Relational IR) nodes using the CUDA kernel
//! provider to execute GPU-accelerated relational operations.
5
6use std::collections::{HashMap, HashSet};
7use std::sync::{Arc, OnceLock};
8
9#[cfg(test)]
10use xlog_core::ScalarType;
11use xlog_core::{RelId, Result, RuntimeConfig, Schema, XlogError};
12use xlog_cuda::memory::TrackedCudaSlice;
13use xlog_cuda::{CudaBuffer, CudaKernelProvider};
14#[cfg(test)]
15use xlog_ir::{CompareOp, ConstValue, Stratum};
16use xlog_ir::{ExecutionPlan, Expr, JoinType, ProjectExpr, RirNode};
17use xlog_stats::{StatsManager, StatsSnapshot};
18
19use crate::ilp_registry::{IlpRegistry, IlpTaggedResult};
20use crate::profiler::{ExecutionStats, Profiler};
21use crate::RelationStore;
22
23mod delta;
24mod epistemic_workspace;
25mod expression;
26mod join_cache;
27mod node_dispatch;
28mod recursive;
29mod rewrite;
30mod wcoj_cost_model;
31mod wcoj_dispatch;
32#[cfg(feature = "wcoj-phase-timing")]
33pub mod wcoj_phase_timing;
34pub use epistemic_workspace::{
35    EpistemicGpuBatchExecutionResult, EpistemicGpuBatchExecutionTrace,
36    EpistemicGpuCandidateGenerationTrace, EpistemicGpuCandidateValidationTrace,
37    EpistemicGpuConstraintValidationTrace, EpistemicGpuConstraintWorldViewValidationTrace,
38    EpistemicGpuExecutionResult, EpistemicGpuFinalResultMaterializationTrace,
39    EpistemicGpuFinalResultTransferTrace, EpistemicGpuFinalTupleMaterializationTrace,
40    EpistemicGpuKernelTimingTrace, EpistemicGpuMaterializationTrace,
41    EpistemicGpuModelMembershipSource, EpistemicGpuModelMembershipTrace,
42    EpistemicGpuPreparedExecution, EpistemicGpuPropagationTrace, EpistemicGpuProviderIdentity,
43    EpistemicGpuRejectionReason, EpistemicGpuRuntimeCounters, EpistemicGpuRuntimePreflight,
44    EpistemicGpuRuntimeTrace, EpistemicGpuRuntimeWcojCertification,
45    EpistemicGpuTransferBudgetTrace, EpistemicGpuWorkspace, EpistemicGpuWorkspaceCapacities,
46    EpistemicGpuWorkspaceLayout, EpistemicGpuWorkspaceResetTrace,
47    EpistemicGpuWorldViewValidationTrace,
48};
49use join_cache::JoinIndexCache;
50pub use join_cache::JoinIndexCacheStats;
51
/// Incremental update for a base relation.
///
/// Both sides are optional, so a delta may carry inserts only, deletes only,
/// both, or neither.
pub struct RelationDelta {
    /// Tuples to insert (if any).
    pub insert: Option<CudaBuffer>,
    /// Tuples to delete (if any).
    pub delete: Option<CudaBuffer>,
}
59
impl RelationDelta {
    /// Create a new incremental update.
    ///
    /// # Arguments
    /// * `insert` - Buffer of tuples to insert, or `None` for no insertions.
    /// * `delete` - Buffer of tuples to delete, or `None` for no deletions.
    pub fn new(insert: Option<CudaBuffer>, delete: Option<CudaBuffer>) -> Self {
        Self { insert, delete }
    }
}
66
/// Runtime summary for a delta recomputation pass.
///
/// The derived `Default` yields an all-zero summary with `has_deletes` false.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct DeltaRecomputeStats {
    /// Number of base relations changed by the delta map.
    pub changed_relations: usize,
    /// True when at least one relation supplied delete rows.
    pub has_deletes: bool,
    /// Number of SCCs whose dependency closure was affected.
    pub affected_sccs: usize,
    /// Number of affected SCCs that were cleared and fully recomputed.
    pub recomputed_sccs: usize,
    /// Number of affected SCCs updated without clearing prior output.
    pub incremental_sccs: usize,
}
81
/// Runtime common subexpression elimination (CSE) telemetry.
///
/// Exposed read-only through `Executor::common_subexpression_stats`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct CommonSubexpressionStats {
    /// Number of safe subplan cache hits.
    pub hits: u64,
    /// Number of safe subplans evaluated and inserted into the cache.
    pub misses: u64,
    /// Number of subplans rejected because they cross an unsafe boundary.
    pub unsafe_rejections: u64,
    /// Rejection reason labels observed during the current executor lifetime.
    pub rejection_reasons: Vec<String>,
}
94
/// Runtime join observation used by adaptive re-optimization decisions.
///
/// Instances are produced by `Executor::record_adaptive_join_observation`;
/// the derived fields below are documented with the formulas used there.
#[derive(Clone, Debug, PartialEq)]
pub struct AdaptiveJoinObservation {
    /// Left relation ID observed at the join boundary.
    pub left_rel: RelId,
    /// Right relation ID observed at the join boundary.
    pub right_rel: RelId,
    /// Estimated output rows before the join executed, as reported by
    /// `StatsManager::estimate_join_cardinality` at recording time.
    pub estimated_output_rows: u64,
    /// Actual output rows observed after the join executed.
    pub actual_output_rows: u64,
    /// Absolute row-count delta between estimate and observation.
    pub cardinality_delta_abs: u64,
    /// Estimated selectivity before execution: estimated rows divided by the
    /// recorded input rows, or `0.0` when the input row count is zero.
    pub estimated_selectivity: f64,
    /// Actual selectivity observed after execution: actual rows divided by
    /// the recorded input rows, or `0.0` when the input row count is zero.
    pub actual_selectivity: f64,
    /// Absolute selectivity delta.
    pub selectivity_delta_abs: f64,
    /// Runtime heat for the left relation (`0.0` when no stats are recorded).
    pub left_heat: f32,
    /// Runtime heat for the right relation (`0.0` when no stats are recorded).
    pub right_heat: f32,
    /// Absolute heat delta between the join inputs.
    pub heat_delta_abs: f32,
    /// Multiplicative mis-plan ratio, always at least 1.0.
    ///
    /// Computed as the larger of (estimated, actual) rows over the smaller.
    /// It is `1.0` when both are zero and `f64::INFINITY` when exactly one of
    /// them is zero.
    pub misplan_ratio: f64,
}
123
/// Deterministic adaptive re-optimization decision action.
///
/// `Executor::adaptive_reoptimization_decision` only ever yields `Disabled`,
/// `Skipped`, or `AttemptCandidate`; `Adopted` and `RolledBack` describe the
/// outcome of a candidate attempt and are assigned elsewhere.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AdaptiveReoptimizationAction {
    /// Adaptive re-optimization is explicitly disabled.
    Disabled,
    /// Telemetry did not cross the deterministic adaptation threshold
    /// (also used when no join telemetry was captured at all).
    Skipped,
    /// Telemetry crossed the threshold and the candidate should be attempted.
    AttemptCandidate,
    /// Candidate output matched the baseline and was adopted.
    Adopted,
    /// Candidate failed or diverged and the baseline snapshot was restored.
    RolledBack,
}
138
/// Typed adaptive re-optimization decision.
#[derive(Clone, Debug, PartialEq)]
pub struct AdaptiveReoptimizationDecision {
    /// Decision action.
    pub action: AdaptiveReoptimizationAction,
    /// Stable reason label.
    ///
    /// Labels emitted by `Executor::adaptive_reoptimization_decision` are
    /// `adaptive_reoptimization_disabled`, `no_join_telemetry`,
    /// `misplan_threshold_crossed`, and `misplan_threshold_not_crossed`.
    pub reason: String,
    /// Maximum observed mis-plan ratio used by the decision
    /// (`1.0` when there were no observations).
    pub max_misplan_ratio: f64,
    /// Minimum threshold required to attempt a candidate.
    pub min_misplan_ratio: f64,
}
151
/// Typed adaptive re-optimization diagnostic kind.
///
/// Carried by [`AdaptiveReoptimizationDiagnostic`] alongside a message.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AdaptiveReoptimizationDiagnosticKind {
    /// Candidate execution returned an error.
    CandidateExecutionFailed,
    /// Candidate completed but did not produce baseline-equivalent outputs.
    CandidateOutputMismatch,
    /// Baseline relation snapshot was restored after an adverse candidate.
    RollbackRestoredBaseline,
}
162
/// Typed adaptive re-optimization diagnostic.
///
/// Collected in [`AdaptiveReoptimizationStats::diagnostics`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AdaptiveReoptimizationDiagnostic {
    /// Diagnostic kind.
    pub kind: AdaptiveReoptimizationDiagnosticKind,
    /// Stable reason label or error text.
    pub message: String,
}
171
/// Runtime adaptive re-optimization telemetry.
///
/// Exposed read-only through `Executor::adaptive_reoptimization_stats`. The
/// derived `Default` starts every counter at zero with no decision recorded.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct AdaptiveReoptimizationStats {
    /// Number of adaptive entry-point invocations.
    pub invocations: u64,
    /// Number of invocations skipped because the feature was disabled.
    pub disabled: u64,
    /// Number of enabled invocations that did not attempt a candidate.
    pub skipped: u64,
    /// Number of candidate plans adopted.
    pub adopted: u64,
    /// Number of candidate plans rolled back.
    pub rolled_back: u64,
    /// Last deterministic decision (`None` until one has been recorded).
    pub last_decision: Option<AdaptiveReoptimizationDecision>,
    /// Baseline join observations from the most recent execution.
    pub last_observations: Vec<AdaptiveJoinObservation>,
    /// Typed diagnostics emitted by the adaptive entry point.
    pub diagnostics: Vec<AdaptiveReoptimizationDiagnostic>,
    /// Tracked data-plane device-to-host calls added during the most recent adaptive path.
    pub data_plane_dtoh_calls: u64,
}
194
/// Structural key for the executor's common-subexpression cache
/// (`Executor::common_subexpression_cache`).
///
/// Keys nest recursively, so a key describes a whole subplan tree; two
/// subplans share a cache slot only when their keys compare equal.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum CommonSubexpressionKey {
    /// Scan of a stored relation.
    Scan {
        /// Relation being scanned.
        rel: RelId,
        // NOTE(review): presumably a version counter that changes whenever the
        // relation's contents change, so stale entries never match — confirm
        // against the code that builds these keys.
        generation: u64,
    },
    /// Filter applied to an input subplan.
    Filter {
        /// Key of the filtered input.
        input: Box<CommonSubexpressionKey>,
        // NOTE(review): presumably a canonical textual rendering of the
        // predicate expression — confirm.
        predicate: String,
    },
    /// Projection applied to an input subplan.
    Project {
        /// Key of the projected input.
        input: Box<CommonSubexpressionKey>,
        // NOTE(review): presumably textual renderings of the projection
        // expressions, in output order — confirm.
        columns: Vec<String>,
    },
    /// Join of two input subplans.
    Join {
        /// Key of the left input.
        left: Box<CommonSubexpressionKey>,
        /// Key of the right input.
        right: Box<CommonSubexpressionKey>,
        /// Join key column indices on the left input.
        left_keys: Vec<usize>,
        /// Join key column indices on the right input.
        right_keys: Vec<usize>,
    },
    /// Union of several input subplans.
    Union {
        /// Keys of the unioned inputs; order is significant for equality.
        inputs: Vec<CommonSubexpressionKey>,
    },
    /// Duplicate elimination over an input subplan.
    Distinct {
        /// Key of the deduplicated input.
        input: Box<CommonSubexpressionKey>,
        /// Column indices that form the distinct key.
        key_cols: Vec<usize>,
    },
}
223
/// Query executor that interprets RIR nodes using GPU kernels
///
/// The executor processes execution plans by iterating through strata and
/// executing RIR node trees. It maintains a relation store for intermediate
/// and final results.
///
/// Construct one with `Executor::new` (default `RuntimeConfig`) or
/// `Executor::new_with_config`.
///
/// # Example
///
/// ```ignore
/// use std::sync::Arc;
/// use xlog_runtime::Executor;
/// use xlog_cuda::CudaKernelProvider;
///
/// let provider = Arc::new(CudaKernelProvider::new(device, memory)?);
/// let mut executor = Executor::new(provider);
///
/// // Execute a plan
/// let result = executor.execute_plan(&plan)?;
/// ```
pub struct Executor {
    /// CUDA kernel provider for GPU operations
    provider: Arc<CudaKernelProvider>,
    /// Storage for named relations
    store: RelationStore,
    /// Mapping from RelId to relation name
    rel_names: HashMap<RelId, String>,
    /// Mapping from relation name to RelId
    name_to_rel: HashMap<String, RelId>,
    /// Runtime statistics for adaptive optimization
    stats: StatsManager,
    /// Cached build-side join indexes (adaptive indexing).
    /// Sized in `new_with_config` from the provider's device memory budget.
    join_index_cache: JoinIndexCache,
    /// Per-execution CSE cache for safe deterministic subplans.
    /// Cleared by every `reset_for_*` method.
    common_subexpression_cache: HashMap<CommonSubexpressionKey, CudaBuffer>,
    /// Runtime CSE telemetry for evidence and diagnostics.
    common_subexpression_stats: CommonSubexpressionStats,
    /// Runtime adaptive re-optimization telemetry for adoption/rollback evidence.
    adaptive_reoptimization_stats: AdaptiveReoptimizationStats,
    /// Per-plan join observations captured before `StatsManager` is updated.
    /// Cleared by every `reset_for_*` method.
    adaptive_join_observations: Vec<AdaptiveJoinObservation>,
    /// Runtime configuration
    config: RuntimeConfig,
    /// Performance profiler for --stats output
    profiler: Profiler,
    /// ILP tensor mask registry
    ilp_registry: IlpRegistry,
    /// Last ILP tagged result metadata
    ilp_last_result: Option<IlpTaggedResult>,
    /// Number of times the env-gated WCOJ triangle dispatch
    /// (`XLOG_USE_WCOJ_TRIANGLE_U32` / `RuntimeConfig::wcoj_triangle_dispatch`)
    /// produced a result and the executor installed it. Tests use this
    /// counter to assert that the WCOJ path actually fired vs. silently
    /// falling back to the binary-join chain with the same answer.
    wcoj_triangle_dispatch_count: u64,
    /// Count of times `try_dispatch_wcoj_4cycle` produced a result and the
    /// executor installed it. Tracks 4-cycle dispatches separately from triangle.
    pub(super) wcoj_4cycle_dispatch_count: u64,
    /// Count of times the chain dispatcher produced a result and the
    /// executor installed it.
    pub(super) chain_dispatch_count: u64,
    /// Count of times `try_dispatch_wcoj_clique5` produced
    /// a result and the executor installed it. Public accessor:
    /// `Executor::wcoj_clique5_dispatch_count(&self) -> u64`.
    pub(super) wcoj_clique5_dispatch_count: u64,
    /// Count of times `try_dispatch_wcoj_clique6` produced
    /// a result and the executor installed it.
    pub(super) wcoj_clique6_dispatch_count: u64,
    /// Count of times `try_dispatch_wcoj_clique7` produced
    /// a result and the executor installed it.
    pub(super) wcoj_clique7_dispatch_count: u64,
    /// Count of times `try_dispatch_wcoj_clique8` produced
    /// a result and the executor installed it.
    pub(super) wcoj_clique8_dispatch_count: u64,
    /// Number of recursive Merge-phase K-clique histogram refresh
    /// boundaries observed.
    pub(super) kclique_histogram_refresh_count: u64,
    /// Cumulative nanoseconds spent in recursive Merge-phase K-clique
    /// histogram refresh accounting.
    pub(super) kclique_histogram_refresh_nanos: u128,
    /// Count of times `execute_join` routed an inner-join to the
    /// nested-loop provider entry point
    /// (`CudaKernelProvider::nested_loop_join_v2_inner_u32_1key`)
    /// because the eligibility predicate + Cartesian-product
    /// threshold both held. Tests use this counter to assert that
    /// the nested-loop path actually fired vs. silently falling back to
    /// hash. Public accessor:
    /// `Executor::nested_loop_dispatch_count(&self) -> u64`.
    pub(super) nested_loop_dispatch_count: u64,
    /// Counts WCOJ pipeline errors (layout or kernel failures) that were
    /// converted into binary-join declines instead of propagating. Healthy
    /// dispatch keeps this at 0; nonzero values expose regressed kernels
    /// that would otherwise hide behind the silent-fallback contract.
    /// `XLOG_WCOJ_STRICT=1` propagates the error instead. Public accessor:
    /// `Executor::wcoj_error_decline_count(&self) -> u64`.
    pub(super) wcoj_error_decline_count: u64,
    /// Count of fused group-by-root count dispatches that produced the
    /// installed result. Public accessor:
    /// `Executor::wcoj_groupby_fusion_dispatch_count(&self) -> u64`.
    pub(super) wcoj_groupby_fusion_dispatch_count: u64,
    /// Count of generalized Free Join dispatches that produced the
    /// installed result. Public accessor:
    /// `Executor::free_join_dispatch_count(&self) -> u64`.
    pub(super) free_join_dispatch_count: u64,
    /// D3 — count of factorized recursive-delta dispatches that produced
    /// an installed novel set. Public accessor:
    /// `Executor::factorized_delta_dispatch_count(&self) -> u64`.
    pub(super) factorized_delta_dispatch_count: u64,
    /// Cached non-default stream for the WCOJ triangle dispatch hook.
    /// Acquired lazily on first dispatch and reused thereafter — mirrors
    /// [`xlog_cuda::CudaKernelProvider::recorded_op_stream`] for the
    /// same reason: the device-runtime
    /// [`xlog_cuda::device_runtime::StreamPool`] is grow-only with a
    /// hard cap (default 16). Acquiring per-invocation would silently
    /// drain the pool on long-lived runtimes (benchmarks, soak tests,
    /// any program with >16 matching WCOJ-eligible rules) and route
    /// subsequent dispatches through the binary-join fallback,
    /// invalidating the dispatch counter and the gate-on path.
    ///
    /// **Shared across WCOJ shapes**: triangle and 4-cycle dispatch both
    /// acquire and reuse this single stream. Renamed from
    /// `wcoj_triangle_stream` when 4-cycle dispatch landed.
    wcoj_dispatch_stream: OnceLock<xlog_cuda::device_runtime::StreamId>,
    /// Diagnostic-only: per-dispatch WCOJ triangle phase
    /// timings, populated by `try_dispatch_wcoj_triangle` when
    /// the `wcoj-phase-timing` Cargo feature is on. Read by the
    /// `wcoj_phase_report` binary in xlog-integration. Field is
    /// absent under feature-off so production builds have zero
    /// overhead. Consumed (and cleared) through
    /// `Executor::take_wcoj_phase_timing`.
    #[cfg(feature = "wcoj-phase-timing")]
    pub(super) last_wcoj_phase_timing:
        std::sync::Mutex<Option<wcoj_phase_timing::WcojDispatchPhaseTiming>>,
    /// Per-iteration recursive-SCC stats trace, populated by
    /// `execute_recursive_scc` after each delta-relation and full-relation
    /// cardinality update site. Field, types, accessor, and populating call
    /// sites are gated on the `recursive-stats-trace` Cargo feature
    /// (default OFF) so production builds carry zero trace overhead: no
    /// field, no populating call site, no symbol. The recursive stats trace
    /// test target declares this feature in its `required-features`, so it
    /// is only built when the feature is enabled.
    #[cfg(feature = "recursive-stats-trace")]
    pub(super) last_recursive_stats_trace: RecursiveStatsTrace,
}
366
/// Recursive-SCC stats trace for recursive cardinality updates.
///
/// Captures one entry per `(iteration, predicate)` boundary
/// at which `execute_recursive_scc` updates `StatsManager` for
/// a recursive predicate's `(full_rel, delta_rel)` RelIds.
/// Used by recursive stats trace tests to assert per-iteration cardinality
/// evolution and binary-join estimate evolution without intrusive
/// instrumentation.
///
/// Only compiled with the `recursive-stats-trace` Cargo feature.
#[cfg(feature = "recursive-stats-trace")]
#[derive(Debug, Default, Clone)]
#[allow(missing_docs)]
pub struct RecursiveStatsTrace {
    // Recorded boundaries; empty by default.
    pub entries: Vec<RecursiveStatsTraceEntry>,
}
381
/// One entry per `(iteration, pred)` boundary.
///
/// `iteration == 0` is the seed pass; `iteration >= 1` is the
/// fixpoint loop. `phase` distinguishes the delta-recording site
/// from the full-recording site so full-row growth assertions only
/// see snapshots where `full_rel` advanced, and delta-evolution
/// assertions only see delta-update snapshots.
#[cfg(feature = "recursive-stats-trace")]
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub struct RecursiveStatsTraceEntry {
    // 0 for the seed pass, >= 1 inside the fixpoint loop.
    pub iteration: usize,
    // Recursive predicate this boundary belongs to.
    pub pred: String,
    // RelIds of the predicate's full and delta relations.
    pub full_rel: RelId,
    pub delta_rel: RelId,
    // Row counts captured at this boundary; which of the two was freshly
    // updated depends on `phase` (see `RecursiveStatsPhase`).
    pub full_rows: u64,
    pub delta_rows: u64,
    // Recording site that produced this entry.
    pub phase: RecursiveStatsPhase,
    /// Optional binary-join estimate the cost model would use
    /// for the variant body's first binary hop. Triangle:
    /// `(delta_e1_rel, e2_rel, &[1], &[0])`. 4-cycle: same
    /// `(delta_e1_rel, e2_rel, &[1], &[0])` (slot 0 → slot 1
    /// adjacency on the X variable).
    pub binary_est_for_variant: Option<u64>,
}
407
/// Recording site that produced a [`RecursiveStatsTraceEntry`].
///
/// Only compiled with the `recursive-stats-trace` Cargo feature.
#[cfg(feature = "recursive-stats-trace")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(missing_docs)]
pub enum RecursiveStatsPhase {
    /// Seed pass — full_rel + delta_rel both updated; trace
    /// entry contains both row counts. iteration == 0.
    Seed,
    /// Fixpoint loop delta update: `delta_rel` updated while `full_rel`
    /// holds the previous iteration's value. Trace entry reports
    /// `full_rows` as the previous-iteration cardinality it sees.
    Phase2Delta,
    /// Fixpoint loop full update after merge. Trace entry reports the new
    /// full row count plus the `delta_rel` value just recorded by the
    /// delta update.
    Phase4Full,
}
424
425impl Executor {
    /// Create a new executor with the given kernel provider
    ///
    /// Equivalent to [`Self::new_with_config`] with `RuntimeConfig::default()`.
    ///
    /// # Arguments
    /// * `provider` - The CUDA kernel provider for GPU operations
    pub fn new(provider: Arc<CudaKernelProvider>) -> Self {
        Self::new_with_config(provider, RuntimeConfig::default())
    }
433
434    /// Create a new executor with the given kernel provider and runtime config
435    pub fn new_with_config(provider: Arc<CudaKernelProvider>, config: RuntimeConfig) -> Self {
436        const DEFAULT_JOIN_INDEX_CACHE_BYTES: u64 = 256 * 1024 * 1024;
437        let max_index_cache_bytes =
438            (provider.memory().budget().device_bytes / 4).min(DEFAULT_JOIN_INDEX_CACHE_BYTES);
439        Self {
440            provider: provider.clone(),
441            store: RelationStore::new(provider.clone()),
442            rel_names: HashMap::new(),
443            name_to_rel: HashMap::new(),
444            stats: StatsManager::new(),
445            join_index_cache: JoinIndexCache::new(max_index_cache_bytes),
446            common_subexpression_cache: HashMap::new(),
447            common_subexpression_stats: CommonSubexpressionStats::default(),
448            adaptive_reoptimization_stats: AdaptiveReoptimizationStats::default(),
449            adaptive_join_observations: Vec::new(),
450            config,
451            profiler: Profiler::default(),
452            ilp_registry: IlpRegistry::new(),
453            ilp_last_result: None,
454            wcoj_triangle_dispatch_count: 0,
455            wcoj_4cycle_dispatch_count: 0,
456            chain_dispatch_count: 0,
457            wcoj_clique5_dispatch_count: 0,
458            wcoj_clique6_dispatch_count: 0,
459            wcoj_clique7_dispatch_count: 0,
460            wcoj_clique8_dispatch_count: 0,
461            kclique_histogram_refresh_count: 0,
462            kclique_histogram_refresh_nanos: 0,
463            nested_loop_dispatch_count: 0,
464            wcoj_error_decline_count: 0,
465            wcoj_groupby_fusion_dispatch_count: 0,
466            free_join_dispatch_count: 0,
467            factorized_delta_dispatch_count: 0,
468            wcoj_dispatch_stream: OnceLock::new(),
469            #[cfg(feature = "wcoj-phase-timing")]
470            last_wcoj_phase_timing: std::sync::Mutex::new(None),
471            #[cfg(feature = "recursive-stats-trace")]
472            last_recursive_stats_trace: RecursiveStatsTrace::default(),
473        }
474    }
475
    /// Return the most recent recursive-SCC stats trace populated by
    /// `execute_recursive_scc`. Gated on the `recursive-stats-trace`
    /// Cargo feature; default OFF.
    ///
    /// The trace is borrowed, not taken: reading it does not clear it.
    #[cfg(feature = "recursive-stats-trace")]
    pub fn last_recursive_stats_trace(&self) -> &RecursiveStatsTrace {
        &self.last_recursive_stats_trace
    }
483
484    /// Take the most recent WCOJ triangle dispatch's per-phase
485    /// timing breakdown. Reading clears the slot — designed for
486    /// one-shot consumption by the `wcoj_phase_report` binary.
487    /// Returns `None` if no triangle has dispatched since the
488    /// last read (or since construction).
489    ///
490    /// Compiled in only with the `wcoj-phase-timing` Cargo
491    /// feature; production builds have no such method.
492    #[cfg(feature = "wcoj-phase-timing")]
493    pub fn take_wcoj_phase_timing(&self) -> Option<wcoj_phase_timing::WcojDispatchPhaseTiming> {
494        self.last_wcoj_phase_timing
495            .lock()
496            .ok()
497            .and_then(|mut g| g.take())
498    }
499
500    /// Enable or disable the performance profiler
501    ///
502    /// When enabled, execution statistics will be collected for --stats output.
503    pub fn set_profiling(&mut self, enabled: bool) {
504        self.profiler = Profiler::new(enabled);
505        if enabled {
506            self.profiler
507                .set_memory_budget(self.provider.memory().budget().device_bytes);
508        }
509    }
510
    /// Check if profiling is enabled
    ///
    /// Reports the state of the current profiler, as last set by
    /// [`Self::set_profiling`] or replaced by [`Self::reset_for_ilp`].
    pub fn is_profiling(&self) -> bool {
        self.profiler.is_enabled()
    }
515
    /// Get execution statistics
    ///
    /// Always returns an [`ExecutionStats`] value built by the profiler from
    /// `total_output_rows`; this method does not itself check whether
    /// profiling is enabled.
    // NOTE(review): what the profiler reports while disabled is decided inside
    // `Profiler::execution_stats` — confirm before relying on the contents.
    pub fn execution_stats(&self, total_output_rows: u64) -> ExecutionStats {
        self.profiler.execution_stats(total_output_rows)
    }
522
    /// Get a reference to the relation store
    ///
    /// Read-only view; use [`Self::store_mut`] or [`Self::put_relation`] to
    /// change stored relations.
    pub fn store(&self) -> &RelationStore {
        &self.store
    }
527
    /// Get a mutable reference to the relation store
    ///
    /// This hands out the store directly and touches none of the executor's
    /// caches.
    // NOTE(review): writes made through this handle bypass `put_relation`;
    // confirm callers do not need join-index invalidation.
    pub fn store_mut(&mut self) -> &mut RelationStore {
        &mut self.store
    }
532
    /// Get a mutable reference to the ILP registry.
    ///
    /// The registry is emptied by [`Self::reset_for_ilp`].
    pub fn ilp_registry_mut(&mut self) -> &mut IlpRegistry {
        &mut self.ilp_registry
    }
537
    /// Get a shared reference to the ILP registry.
    ///
    /// Read-only counterpart of [`Self::ilp_registry_mut`].
    pub fn ilp_registry(&self) -> &IlpRegistry {
        &self.ilp_registry
    }
542
    /// Get the last ILP tagged result.
    ///
    /// Returns `None` when no result is stored, which is the state after
    /// construction and after [`Self::reset_for_ilp`].
    pub fn ilp_last_result(&self) -> Option<&IlpTaggedResult> {
        self.ilp_last_result.as_ref()
    }
547
    /// Store a relation buffer and invalidate join indices.
    ///
    /// Public entry point that forwards to the internal `store_put` helper.
    // NOTE(review): the invalidation itself presumably happens inside
    // `store_put`, which is defined outside this view — confirm there.
    pub fn put_relation(&mut self, name: &str, buffer: CudaBuffer) {
        self.store_put(name, buffer);
    }
552
    /// Get a reference to the runtime statistics manager
    ///
    /// Read-only counterpart of [`Self::stats_mut`].
    pub fn stats(&self) -> &StatsManager {
        &self.stats
    }
557
    /// Return persistent join-index manager telemetry.
    ///
    /// The value is returned by value, as produced by the cache's own
    /// `stats()` call.
    pub fn join_index_cache_stats(&self) -> JoinIndexCacheStats {
        self.join_index_cache.stats()
    }
562
    /// Reset executor state for Monte Carlo sampling.
    ///
    /// Clears relation storage, the join index cache, the common-subexpression
    /// cache, and the captured adaptive join observations, while preserving
    /// relation registrations. Runtime statistics, the profiler, the ILP
    /// registry, and the dispatch counters are left untouched.
    pub fn reset_for_mc(&mut self) {
        self.store.clear();
        self.join_index_cache.clear();
        self.common_subexpression_cache.clear();
        self.adaptive_join_observations.clear();
    }
572
573    /// Targeted MC reset: preserve base/static relations and clear dynamic ones.
574    ///
575    /// Unlike [`Self::reset_for_mc`] which drops all relations, this method keeps the
576    /// relations listed in `preserve` untouched, removes every other relation,
577    /// then re-creates the relations specified in `clear_to_empty` as empty
578    /// GPU buffers with the given schemas.  The join-index cache is fully
579    /// invalidated because dynamic relations have changed.
580    ///
581    /// # Arguments
582    /// * `preserve` - Relation names to keep as-is (base/static facts).
583    /// * `clear_to_empty` - `(name, schema)` pairs for dynamic relations that
584    ///   should be present but empty after the reset.
585    pub fn reset_for_mc_relations(
586        &mut self,
587        preserve: &[&str],
588        clear_to_empty: &[(&str, Schema)],
589    ) -> Result<()> {
590        let preserve_set: HashSet<&str> = preserve.iter().copied().collect();
591        let existing_names: Vec<String> = self.store.names().map(|s| s.to_string()).collect();
592
593        for name in &existing_names {
594            if !preserve_set.contains(name.as_str()) {
595                self.store.remove(name);
596            }
597        }
598
599        for (name, schema) in clear_to_empty {
600            let empty = self.provider.create_empty_buffer(schema.clone())?;
601            self.store.put(name, empty);
602        }
603
604        self.join_index_cache.clear();
605        self.common_subexpression_cache.clear();
606        self.adaptive_join_observations.clear();
607        Ok(())
608    }
609
    /// Reset executor state for ILP attempt reuse.
    ///
    /// Clears ILP registry (masks + tagged results), relation storage,
    /// join index cache, common-subexpression cache, captured adaptive join
    /// observations, stats, and profiler. Preserves relation name
    /// registrations (rel_names, name_to_rel) since those are immutable
    /// compile artifacts.
    ///
    /// The profiler is replaced with `Profiler::default()`, so any state set
    /// through [`Self::set_profiling`] is discarded as well.
    pub fn reset_for_ilp(&mut self) {
        self.ilp_registry.clear();
        self.ilp_last_result = None;
        self.store.clear();
        self.join_index_cache.clear();
        self.common_subexpression_cache.clear();
        self.adaptive_join_observations.clear();
        self.stats = StatsManager::new();
        self.profiler = Profiler::default();
    }
626
    /// Get a mutable reference to the runtime statistics manager
    ///
    /// The manager is replaced wholesale by [`Self::reset_for_ilp`].
    pub fn stats_mut(&mut self) -> &mut StatsManager {
        &mut self.stats
    }
631
632    /// Capture a runtime statistics snapshot, including predicate name mappings.
633    ///
634    /// Use this snapshot to seed the compiler/optimizer on subsequent compilations.
635    pub fn stats_snapshot(&self) -> StatsSnapshot {
636        let mut snapshot = self.stats.snapshot();
637        snapshot.rel_names = self
638            .rel_names
639            .iter()
640            .map(|(id, name)| (*id, name.clone()))
641            .collect();
642        snapshot
643    }
644
    /// Return runtime CSE telemetry for evidence and diagnostics.
    ///
    /// None of the `reset_for_*` methods in this file clear these counters.
    pub fn common_subexpression_stats(&self) -> &CommonSubexpressionStats {
        &self.common_subexpression_stats
    }
649
    /// Return adaptive re-optimization telemetry for evidence and diagnostics.
    ///
    /// None of the `reset_for_*` methods in this file clear this telemetry.
    pub fn adaptive_reoptimization_stats(&self) -> &AdaptiveReoptimizationStats {
        &self.adaptive_reoptimization_stats
    }
654
    /// Replay the deterministic adaptive decision against captured telemetry.
    ///
    /// Public, read-only wrapper over the internal decision function: it takes
    /// `&self`, so it records nothing, and the result depends only on the
    /// supplied `observations` and this executor's configuration.
    pub fn replay_adaptive_reoptimization_decision(
        &self,
        observations: &[AdaptiveJoinObservation],
    ) -> AdaptiveReoptimizationDecision {
        self.adaptive_reoptimization_decision(observations)
    }
662
    /// Whether common-subexpression elimination is on, as resolved by the
    /// runtime configuration.
    fn common_subexpression_enabled(&self) -> bool {
        self.config.resolved_common_subexpression_elimination()
    }
666
    /// Whether adaptive re-optimization is on, as resolved by the runtime
    /// configuration.
    fn adaptive_reoptimization_enabled(&self) -> bool {
        self.config.resolved_adaptive_reoptimization()
    }
670
671    fn adaptive_reoptimization_decision(
672        &self,
673        observations: &[AdaptiveJoinObservation],
674    ) -> AdaptiveReoptimizationDecision {
675        let min_misplan_ratio = self
676            .config
677            .resolved_adaptive_reoptimization_min_misplan_ratio();
678        let max_misplan_ratio = observations
679            .iter()
680            .map(|observation| observation.misplan_ratio)
681            .fold(1.0_f64, f64::max);
682
683        if !self.adaptive_reoptimization_enabled() {
684            return AdaptiveReoptimizationDecision {
685                action: AdaptiveReoptimizationAction::Disabled,
686                reason: "adaptive_reoptimization_disabled".to_string(),
687                max_misplan_ratio,
688                min_misplan_ratio,
689            };
690        }
691
692        if observations.is_empty() {
693            return AdaptiveReoptimizationDecision {
694                action: AdaptiveReoptimizationAction::Skipped,
695                reason: "no_join_telemetry".to_string(),
696                max_misplan_ratio,
697                min_misplan_ratio,
698            };
699        }
700
701        if max_misplan_ratio >= min_misplan_ratio {
702            AdaptiveReoptimizationDecision {
703                action: AdaptiveReoptimizationAction::AttemptCandidate,
704                reason: "misplan_threshold_crossed".to_string(),
705                max_misplan_ratio,
706                min_misplan_ratio,
707            }
708        } else {
709            AdaptiveReoptimizationDecision {
710                action: AdaptiveReoptimizationAction::Skipped,
711                reason: "misplan_threshold_not_crossed".to_string(),
712                max_misplan_ratio,
713                min_misplan_ratio,
714            }
715        }
716    }
717
718    fn record_adaptive_join_observation(
719        &mut self,
720        left_rel: RelId,
721        right_rel: RelId,
722        left_keys: &[usize],
723        right_keys: &[usize],
724        input_rows: u64,
725        actual_output_rows: u64,
726    ) {
727        let estimated_output_rows = self
728            .stats
729            .estimate_join_cardinality(left_rel, right_rel, left_keys, right_keys);
730        let estimated_selectivity = if input_rows > 0 {
731            estimated_output_rows as f64 / input_rows as f64
732        } else {
733            0.0
734        };
735        let actual_selectivity = if input_rows > 0 {
736            actual_output_rows as f64 / input_rows as f64
737        } else {
738            0.0
739        };
740        let cardinality_delta_abs = estimated_output_rows.abs_diff(actual_output_rows);
741        let selectivity_delta_abs = (estimated_selectivity - actual_selectivity).abs();
742        let left_heat = self
743            .stats
744            .get_relation_stats(left_rel)
745            .map(|stats| stats.heat)
746            .unwrap_or(0.0);
747        let right_heat = self
748            .stats
749            .get_relation_stats(right_rel)
750            .map(|stats| stats.heat)
751            .unwrap_or(0.0);
752        let heat_delta_abs = (left_heat - right_heat).abs();
753        let smaller = estimated_output_rows.min(actual_output_rows);
754        let larger = estimated_output_rows.max(actual_output_rows);
755        let misplan_ratio = if smaller == 0 {
756            if larger == 0 {
757                1.0
758            } else {
759                f64::INFINITY
760            }
761        } else {
762            (larger as f64 / smaller as f64).max(1.0)
763        };
764
765        self.adaptive_join_observations
766            .push(AdaptiveJoinObservation {
767                left_rel,
768                right_rel,
769                estimated_output_rows,
770                actual_output_rows,
771                cardinality_delta_abs,
772                estimated_selectivity,
773                actual_selectivity,
774                selectivity_delta_abs,
775                left_heat,
776                right_heat,
777                heat_delta_abs,
778                misplan_ratio,
779            });
780    }
781
782    fn plan_head_names(plan: &ExecutionPlan) -> Vec<String> {
783        let mut names = Vec::new();
784        for stratum in &plan.strata {
785            for scc_id in &stratum.sccs {
786                if let Some(rules) = plan.rules_by_scc.get(*scc_id as usize) {
787                    for rule in rules {
788                        if !names.iter().any(|name| name == &rule.head) {
789                            names.push(rule.head.clone());
790                        }
791                    }
792                }
793            }
794        }
795
796        if names.is_empty() {
797            for rules in &plan.rules_by_scc {
798                for rule in rules {
799                    if !names.iter().any(|name| name == &rule.head) {
800                        names.push(rule.head.clone());
801                    }
802                }
803            }
804        }
805
806        names
807    }
808
809    fn clone_store_snapshot(&self) -> Result<HashMap<String, CudaBuffer>> {
810        let names: Vec<String> = self.store.names().map(|name| name.to_string()).collect();
811        let mut snapshot = HashMap::with_capacity(names.len());
812        for name in names {
813            if let Some(buffer) = self.store.get(&name) {
814                snapshot.insert(name, self.clone_buffer(buffer)?);
815            }
816        }
817        Ok(snapshot)
818    }
819
820    fn restore_store_snapshot(&mut self, snapshot: HashMap<String, CudaBuffer>) {
821        let snapshot_names: HashSet<String> = snapshot.keys().cloned().collect();
822        let existing_names: Vec<String> = self.store.names().map(|name| name.to_string()).collect();
823        for name in existing_names {
824            if !snapshot_names.contains(&name) {
825                self.store.remove(&name);
826            }
827        }
828        for (name, buffer) in snapshot {
829            self.store.put(&name, buffer);
830        }
831    }
832
833    fn restore_stats_snapshot(&mut self, snapshot: &StatsSnapshot) {
834        self.stats.clear();
835        self.stats.merge_snapshot(snapshot);
836    }
837
838    fn clone_final_plan_output(&self, plan: &ExecutionPlan) -> Result<CudaBuffer> {
839        let head_names = Self::plan_head_names(plan);
840        if let Some(name) = head_names.last() {
841            let output = self.store.get(name).ok_or_else(|| {
842                XlogError::Execution(format!("adaptive reoptimization output missing: {name}"))
843            })?;
844            return self.clone_buffer(output);
845        }
846
847        self.provider.create_empty_buffer(Schema::new(vec![]))
848    }
849
850    fn plan_outputs_match(
851        &self,
852        head_names: &[String],
853        baseline_snapshot: &HashMap<String, CudaBuffer>,
854    ) -> Result<bool> {
855        for name in head_names {
856            let Some(baseline) = baseline_snapshot.get(name) else {
857                return Ok(false);
858            };
859            let Some(candidate) = self.store.get(name) else {
860                return Ok(false);
861            };
862            if !self.buffers_gpu_set_equivalent(baseline, candidate)? {
863                return Ok(false);
864            }
865        }
866        Ok(true)
867    }
868
869    fn buffers_gpu_set_equivalent(&self, left: &CudaBuffer, right: &CudaBuffer) -> Result<bool> {
870        if left.schema() != right.schema() {
871            return Ok(false);
872        }
873        let left_rows = self.provider.device_row_count(left)?;
874        let right_rows = self.provider.device_row_count(right)?;
875        if left_rows != right_rows {
876            return Ok(false);
877        }
878
879        let left_minus_right = self.provider.diff_full_row(left, right)?;
880        if self.provider.device_row_count(&left_minus_right)? != 0 {
881            return Ok(false);
882        }
883        let right_minus_left = self.provider.diff_full_row(right, left)?;
884        Ok(self.provider.device_row_count(&right_minus_left)? == 0)
885    }
886
887    fn record_adaptive_dtoh_delta(&mut self, before_dtoh_calls: u64) {
888        let after_dtoh_calls = self.provider.host_transfer_stats().dtoh_calls;
889        self.adaptive_reoptimization_stats.data_plane_dtoh_calls =
890            after_dtoh_calls.saturating_sub(before_dtoh_calls);
891    }
892
893    fn is_common_subexpression_cacheable(node: &RirNode) -> bool {
894        !matches!(node, RirNode::Unit | RirNode::Scan { .. })
895    }
896
897    fn record_common_subexpression_rejection(&mut self, reason: &'static str) {
898        self.common_subexpression_stats.unsafe_rejections = self
899            .common_subexpression_stats
900            .unsafe_rejections
901            .saturating_add(1);
902        if !self
903            .common_subexpression_stats
904            .rejection_reasons
905            .iter()
906            .any(|seen| seen == reason)
907        {
908            self.common_subexpression_stats
909                .rejection_reasons
910                .push(reason.to_string());
911        }
912    }
913
914    fn common_subexpression_key(&mut self, node: &RirNode) -> Option<CommonSubexpressionKey> {
915        match node {
916            RirNode::Unit => None,
917            RirNode::Scan { rel } => {
918                let generation = self
919                    .get_rel_name(*rel)
920                    .and_then(|name| self.store.version(name))
921                    .unwrap_or(0);
922                Some(CommonSubexpressionKey::Scan {
923                    rel: *rel,
924                    generation,
925                })
926            }
927            RirNode::Filter { input, predicate } => {
928                let input = self.common_subexpression_key(input)?;
929                Some(CommonSubexpressionKey::Filter {
930                    input: Box::new(input),
931                    predicate: Self::expr_cse_key(predicate),
932                })
933            }
934            RirNode::Project { input, columns } => {
935                let input = self.common_subexpression_key(input)?;
936                Some(CommonSubexpressionKey::Project {
937                    input: Box::new(input),
938                    columns: columns.iter().map(Self::project_expr_cse_key).collect(),
939                })
940            }
941            RirNode::Join {
942                left,
943                right,
944                left_keys,
945                right_keys,
946                join_type,
947            } => {
948                if *join_type != JoinType::Inner {
949                    self.record_common_subexpression_rejection("negation_or_outer_join_boundary");
950                    return None;
951                }
952                let left = self.common_subexpression_key(left)?;
953                let right = self.common_subexpression_key(right)?;
954                Some(CommonSubexpressionKey::Join {
955                    left: Box::new(left),
956                    right: Box::new(right),
957                    left_keys: left_keys.clone(),
958                    right_keys: right_keys.clone(),
959                })
960            }
961            RirNode::Union { inputs } => {
962                let mut input_keys = Vec::with_capacity(inputs.len());
963                for input in inputs {
964                    input_keys.push(self.common_subexpression_key(input)?);
965                }
966                Some(CommonSubexpressionKey::Union { inputs: input_keys })
967            }
968            RirNode::Distinct { input, key_cols } => {
969                let input = self.common_subexpression_key(input)?;
970                Some(CommonSubexpressionKey::Distinct {
971                    input: Box::new(input),
972                    key_cols: key_cols.clone(),
973                })
974            }
975            RirNode::Diff { .. } => {
976                self.record_common_subexpression_rejection("negation_or_difference_boundary");
977                None
978            }
979            RirNode::GroupBy { .. } => {
980                self.record_common_subexpression_rejection("aggregate_boundary");
981                None
982            }
983            RirNode::Fixpoint { .. } => {
984                self.record_common_subexpression_rejection("recursive_or_mutable_boundary");
985                None
986            }
987            RirNode::TensorMaskedJoin { .. } => {
988                self.record_common_subexpression_rejection("provenance_or_tensor_boundary");
989                None
990            }
991            RirNode::MultiWayJoin { .. } | RirNode::ChainJoin { .. } => {
992                self.record_common_subexpression_rejection("specialized_dispatch_boundary");
993                None
994            }
995        }
996    }
997
998    fn expr_cse_key(expr: &Expr) -> String {
999        match expr {
1000            Expr::Column(idx) => format!("col:{idx}"),
1001            Expr::Const(value) => format!("const:{}", Self::const_cse_key(value)),
1002            Expr::Compare { left, op, right } => format!(
1003                "cmp:{}:{}:{}",
1004                Self::expr_cse_key(left),
1005                Self::compare_op_cse_key(*op),
1006                Self::expr_cse_key(right)
1007            ),
1008            Expr::And(items) => format!(
1009                "and:[{}]",
1010                items
1011                    .iter()
1012                    .map(Self::expr_cse_key)
1013                    .collect::<Vec<_>>()
1014                    .join(",")
1015            ),
1016            Expr::Or(items) => format!(
1017                "or:[{}]",
1018                items
1019                    .iter()
1020                    .map(Self::expr_cse_key)
1021                    .collect::<Vec<_>>()
1022                    .join(",")
1023            ),
1024            Expr::Not(inner) => format!("not:{}", Self::expr_cse_key(inner)),
1025            Expr::Add(left, right) => {
1026                format!(
1027                    "add:{}:{}",
1028                    Self::expr_cse_key(left),
1029                    Self::expr_cse_key(right)
1030                )
1031            }
1032            Expr::Sub(left, right) => {
1033                format!(
1034                    "sub:{}:{}",
1035                    Self::expr_cse_key(left),
1036                    Self::expr_cse_key(right)
1037                )
1038            }
1039            Expr::Mul(left, right) => {
1040                format!(
1041                    "mul:{}:{}",
1042                    Self::expr_cse_key(left),
1043                    Self::expr_cse_key(right)
1044                )
1045            }
1046            Expr::Div(left, right) => {
1047                format!(
1048                    "div:{}:{}",
1049                    Self::expr_cse_key(left),
1050                    Self::expr_cse_key(right)
1051                )
1052            }
1053            Expr::Mod(left, right) => {
1054                format!(
1055                    "mod:{}:{}",
1056                    Self::expr_cse_key(left),
1057                    Self::expr_cse_key(right)
1058                )
1059            }
1060            Expr::Abs(inner) => format!("abs:{}", Self::expr_cse_key(inner)),
1061            Expr::Min(left, right) => {
1062                format!(
1063                    "min:{}:{}",
1064                    Self::expr_cse_key(left),
1065                    Self::expr_cse_key(right)
1066                )
1067            }
1068            Expr::Max(left, right) => {
1069                format!(
1070                    "max:{}:{}",
1071                    Self::expr_cse_key(left),
1072                    Self::expr_cse_key(right)
1073                )
1074            }
1075            Expr::Pow(left, right) => {
1076                format!(
1077                    "pow:{}:{}",
1078                    Self::expr_cse_key(left),
1079                    Self::expr_cse_key(right)
1080                )
1081            }
1082            Expr::Cast(inner, ty) => format!("cast:{:?}:{}", ty, Self::expr_cse_key(inner)),
1083            Expr::Conditional {
1084                condition,
1085                then_expr,
1086                else_expr,
1087            } => format!(
1088                "if:{}:{}:{}",
1089                Self::expr_cse_key(condition),
1090                Self::expr_cse_key(then_expr),
1091                Self::expr_cse_key(else_expr)
1092            ),
1093        }
1094    }
1095
1096    fn project_expr_cse_key(expr: &ProjectExpr) -> String {
1097        match expr {
1098            ProjectExpr::Column(idx) => format!("col:{idx}"),
1099            ProjectExpr::Computed(expr, ty) => {
1100                format!("computed:{:?}:{}", ty, Self::expr_cse_key(expr))
1101            }
1102        }
1103    }
1104
1105    fn const_cse_key(value: &xlog_ir::ConstValue) -> String {
1106        match value {
1107            xlog_ir::ConstValue::U32(value) => format!("u32:{value}"),
1108            xlog_ir::ConstValue::U64(value) => format!("u64:{value}"),
1109            xlog_ir::ConstValue::I32(value) => format!("i32:{value}"),
1110            xlog_ir::ConstValue::I64(value) => format!("i64:{value}"),
1111            xlog_ir::ConstValue::F32(value) => format!("f32:{:08x}", value.to_bits()),
1112            xlog_ir::ConstValue::F64(value) => format!("f64:{:016x}", value.to_bits()),
1113            xlog_ir::ConstValue::Bool(value) => format!("bool:{value}"),
1114            xlog_ir::ConstValue::Symbol(value) => format!("symbol:{value:?}"),
1115        }
1116    }
1117
1118    fn compare_op_cse_key(op: xlog_ir::CompareOp) -> &'static str {
1119        match op {
1120            xlog_ir::CompareOp::Eq => "eq",
1121            xlog_ir::CompareOp::Ne => "ne",
1122            xlog_ir::CompareOp::Lt => "lt",
1123            xlog_ir::CompareOp::Le => "le",
1124            xlog_ir::CompareOp::Gt => "gt",
1125            xlog_ir::CompareOp::Ge => "ge",
1126        }
1127    }
1128
1129    fn store_put(&mut self, name: &str, buffer: CudaBuffer) {
1130        self.common_subexpression_cache.clear();
1131        self.store.put(name, buffer);
1132        if let Some(&rel_id) = self.name_to_rel.get(name) {
1133            self.join_index_cache.invalidate_rel(rel_id);
1134        }
1135    }
1136
1137    fn store_remove(&mut self, name: &str) -> Option<CudaBuffer> {
1138        self.common_subexpression_cache.clear();
1139        if let Some(&rel_id) = self.name_to_rel.get(name) {
1140            self.join_index_cache.invalidate_rel(rel_id);
1141        }
1142        self.store.remove(name)
1143    }
1144
1145    /// Register a relation name for a RelId
1146    ///
1147    /// This mapping is used when executing Scan nodes to look up relations
1148    /// by their RelId.
1149    ///
1150    /// # Arguments
1151    /// * `rel_id` - The relation identifier
1152    /// * `name` - The name to associate with the relation
1153    pub fn register_relation(&mut self, rel_id: RelId, name: &str) {
1154        self.rel_names.insert(rel_id, name.to_string());
1155        self.name_to_rel.insert(name.to_string(), rel_id);
1156        self.stats.register_relation(rel_id);
1157    }
1158
1159    /// Reverse-lookup a RelId by predicate name. Used by
1160    /// `execute_recursive_scc` to resolve a recursive predicate's
1161    /// full-rel RelId for `StatsManager::update_cardinality` calls at
1162    /// iteration boundaries. Returns `None` for unregistered names
1163    /// (defensive: production callers register IDB heads before
1164    /// `execute_plan`; tests that omit registration get a no-op stats
1165    /// update).
1166    fn name_to_rel_id(&self, name: &str) -> Option<RelId> {
1167        self.name_to_rel.get(name).copied()
1168    }
1169
1170    /// Get the relation name for a RelId
1171    fn get_rel_name(&self, rel_id: RelId) -> Option<&str> {
1172        self.rel_names.get(&rel_id).map(|s| s.as_str())
1173    }
1174
1175    /// Execute a baseline plan and conditionally adopt a compiler-supplied
1176    /// re-optimized candidate plan.
1177    ///
1178    /// The baseline runs through the normal [`Self::execute_plan`] path first,
1179    /// producing runtime join telemetry. If deterministic mis-plan thresholds
1180    /// fire and adaptive re-optimization is enabled, the candidate also runs
1181    /// through [`Self::execute_plan`]. Candidate outputs are compared on the GPU
1182    /// with deterministic full-row set difference; divergent or failing
1183    /// candidates roll back to the baseline relation/statistics snapshot.
1184    pub fn execute_plan_with_adaptive_candidate(
1185        &mut self,
1186        baseline_plan: &ExecutionPlan,
1187        candidate_plan: &ExecutionPlan,
1188    ) -> Result<CudaBuffer> {
1189        self.adaptive_reoptimization_stats.invocations = self
1190            .adaptive_reoptimization_stats
1191            .invocations
1192            .saturating_add(1);
1193        self.adaptive_reoptimization_stats.diagnostics.clear();
1194        let before_dtoh_calls = self.provider.host_transfer_stats().dtoh_calls;
1195
1196        self.execute_plan(baseline_plan)?;
1197        let baseline_observations = self.adaptive_join_observations.clone();
1198        self.adaptive_reoptimization_stats.last_observations = baseline_observations.clone();
1199        let decision = self.adaptive_reoptimization_decision(&baseline_observations);
1200        self.adaptive_reoptimization_stats.last_decision = Some(decision.clone());
1201
1202        match decision.action {
1203            AdaptiveReoptimizationAction::Disabled => {
1204                self.adaptive_reoptimization_stats.disabled = self
1205                    .adaptive_reoptimization_stats
1206                    .disabled
1207                    .saturating_add(1);
1208                self.record_adaptive_dtoh_delta(before_dtoh_calls);
1209                return self.clone_final_plan_output(baseline_plan);
1210            }
1211            AdaptiveReoptimizationAction::Skipped => {
1212                self.adaptive_reoptimization_stats.skipped =
1213                    self.adaptive_reoptimization_stats.skipped.saturating_add(1);
1214                self.record_adaptive_dtoh_delta(before_dtoh_calls);
1215                return self.clone_final_plan_output(baseline_plan);
1216            }
1217            AdaptiveReoptimizationAction::AttemptCandidate => {}
1218            AdaptiveReoptimizationAction::Adopted | AdaptiveReoptimizationAction::RolledBack => {
1219                unreachable!("decision replay never returns terminal adaptive actions")
1220            }
1221        }
1222
1223        let head_names = Self::plan_head_names(baseline_plan);
1224        let baseline_snapshot = self.clone_store_snapshot()?;
1225        let baseline_stats_snapshot = self.stats_snapshot();
1226
1227        if let Err(err) = self.execute_plan(candidate_plan) {
1228            self.restore_store_snapshot(baseline_snapshot);
1229            self.restore_stats_snapshot(&baseline_stats_snapshot);
1230            self.adaptive_reoptimization_stats.rolled_back = self
1231                .adaptive_reoptimization_stats
1232                .rolled_back
1233                .saturating_add(1);
1234            self.adaptive_reoptimization_stats
1235                .diagnostics
1236                .push(AdaptiveReoptimizationDiagnostic {
1237                    kind: AdaptiveReoptimizationDiagnosticKind::CandidateExecutionFailed,
1238                    message: err.to_string(),
1239                });
1240            self.adaptive_reoptimization_stats
1241                .diagnostics
1242                .push(AdaptiveReoptimizationDiagnostic {
1243                    kind: AdaptiveReoptimizationDiagnosticKind::RollbackRestoredBaseline,
1244                    message: "baseline_snapshot_restored".to_string(),
1245                });
1246            self.adaptive_reoptimization_stats.last_observations = baseline_observations;
1247            self.adaptive_reoptimization_stats.last_decision =
1248                Some(AdaptiveReoptimizationDecision {
1249                    action: AdaptiveReoptimizationAction::RolledBack,
1250                    reason: "candidate_execution_failed".to_string(),
1251                    max_misplan_ratio: decision.max_misplan_ratio,
1252                    min_misplan_ratio: decision.min_misplan_ratio,
1253                });
1254            self.record_adaptive_dtoh_delta(before_dtoh_calls);
1255            return self.clone_final_plan_output(baseline_plan);
1256        }
1257
1258        if !self.plan_outputs_match(&head_names, &baseline_snapshot)? {
1259            self.restore_store_snapshot(baseline_snapshot);
1260            self.restore_stats_snapshot(&baseline_stats_snapshot);
1261            self.adaptive_reoptimization_stats.rolled_back = self
1262                .adaptive_reoptimization_stats
1263                .rolled_back
1264                .saturating_add(1);
1265            self.adaptive_reoptimization_stats
1266                .diagnostics
1267                .push(AdaptiveReoptimizationDiagnostic {
1268                    kind: AdaptiveReoptimizationDiagnosticKind::CandidateOutputMismatch,
1269                    message: "candidate_output_mismatch".to_string(),
1270                });
1271            self.adaptive_reoptimization_stats
1272                .diagnostics
1273                .push(AdaptiveReoptimizationDiagnostic {
1274                    kind: AdaptiveReoptimizationDiagnosticKind::RollbackRestoredBaseline,
1275                    message: "baseline_snapshot_restored".to_string(),
1276                });
1277            self.adaptive_reoptimization_stats.last_observations = baseline_observations;
1278            self.adaptive_reoptimization_stats.last_decision =
1279                Some(AdaptiveReoptimizationDecision {
1280                    action: AdaptiveReoptimizationAction::RolledBack,
1281                    reason: "candidate_output_mismatch".to_string(),
1282                    max_misplan_ratio: decision.max_misplan_ratio,
1283                    min_misplan_ratio: decision.min_misplan_ratio,
1284                });
1285            self.record_adaptive_dtoh_delta(before_dtoh_calls);
1286            return self.clone_final_plan_output(baseline_plan);
1287        }
1288
1289        self.adaptive_reoptimization_stats.adopted =
1290            self.adaptive_reoptimization_stats.adopted.saturating_add(1);
1291        self.adaptive_reoptimization_stats.last_observations = baseline_observations;
1292        self.adaptive_reoptimization_stats.last_decision = Some(AdaptiveReoptimizationDecision {
1293            action: AdaptiveReoptimizationAction::Adopted,
1294            reason: "candidate_adopted".to_string(),
1295            max_misplan_ratio: decision.max_misplan_ratio,
1296            min_misplan_ratio: decision.min_misplan_ratio,
1297        });
1298        self.record_adaptive_dtoh_delta(before_dtoh_calls);
1299        self.clone_final_plan_output(candidate_plan)
1300    }
1301
1302    /// Execute a complete execution plan
1303    ///
1304    /// Iterates through strata in order, executing each one.
1305    /// Returns the result of the final query if present, or an empty buffer.
1306    ///
1307    /// # Arguments
1308    /// * `plan` - The execution plan to execute
1309    ///
1310    /// # Returns
1311    /// The result buffer from executing the plan
1312    ///
1313    /// # Errors
1314    /// Returns an error if any stratum or query execution fails
1315    pub fn execute_plan(&mut self, plan: &ExecutionPlan) -> Result<CudaBuffer> {
1316        self.adaptive_join_observations.clear();
1317        self.common_subexpression_cache.clear();
1318        // Opt-in deterministic-Datalog D2H gate. Enabled only for the
1319        // duration of this call; the provider is shared so we restore the
1320        // prior state on every exit path (including errors). This PR ships
1321        // the gate as opt-in only — known violating relational paths
1322        // (set difference, binary-join count/materialize) are scheduled for
1323        // replacement before the default flips.
1324        let gate = self.config.strict_deterministic_d2h;
1325        let prev_gate = self.provider.strict_deterministic_d2h_enabled();
1326        if gate && !prev_gate {
1327            // Only reset the violation counter when *this* call is what
1328            // engages the gate. If a caller has manually enabled the
1329            // gate to accumulate violations across a broader strict
1330            // section, we must not clobber their telemetry.
1331            self.provider.reset_deterministic_d2h_violations();
1332            self.provider.enable_strict_deterministic_d2h();
1333        }
1334        // Cloning the Arc keeps the guard independent of `self`, so the
1335        // guard can coexist with `&mut self` calls inside the strata loop.
1336        let _gate_guard = D2hGateGuard {
1337            provider: Arc::clone(&self.provider),
1338            engaged: gate,
1339            previous: prev_gate,
1340        };
1341
1342        // Execute strata in order
1343        for (idx, stratum) in plan.strata.iter().enumerate() {
1344            // Count rules and check if recursive
1345            let (num_rules, is_recursive) = stratum
1346                .sccs
1347                .iter()
1348                .map(|&scc_id| {
1349                    let rules = plan
1350                        .rules_by_scc
1351                        .get(scc_id as usize)
1352                        .map(|r| r.len())
1353                        .unwrap_or(0);
1354                    let recursive = plan
1355                        .sccs
1356                        .get(scc_id as usize)
1357                        .map(|s| s.is_recursive)
1358                        .unwrap_or(false);
1359                    (rules, recursive)
1360                })
1361                .fold((0, false), |(r, rec), (nr, nrec)| (r + nr, rec || nrec));
1362
1363            self.profiler.begin_stratum(idx, num_rules, is_recursive);
1364            self.execute_stratum_impl(stratum, plan)?;
1365
1366            // Record peak memory after stratum
1367            let mem_bytes = self.provider.memory().allocated_bytes();
1368            self.profiler.record_peak_memory(mem_bytes);
1369
1370            self.profiler.end_stratum();
1371        }
1372
1373        // Ensure all GPU work completes before returning control to callers.
1374        self.provider.device().synchronize()?;
1375        self.adaptive_reoptimization_stats.last_observations =
1376            self.adaptive_join_observations.clone();
1377
1378        // If there are no strata, return empty buffer
1379        self.provider.create_empty_buffer(Schema::new(vec![]))
1380    }
1381
1382    /// Execute a stratum (public API)
1383    ///
1384    /// This method cannot be called directly because stratum execution requires
1385    /// access to the full ExecutionPlan (for rules_by_scc mapping). Use
1386    /// `execute_plan` instead, which processes all strata with proper context.
1387    ///
1388    /// # Arguments
1389    /// * `_stratum` - The stratum (unused - see error)
1390    ///
1391    /// # Returns
1392    /// Evaluate a predicate expression for a single row
1393    #[cfg(test)]
1394    fn evaluate_predicate(
1395        expr: &Expr,
1396        columns: &[Vec<u8>],
1397        row_idx: usize,
1398        schema: &Schema,
1399    ) -> Result<bool> {
1400        match expr {
1401            Expr::Column(col_idx) => {
1402                // Interpret column value as boolean
1403                let col_type = schema.column_type(*col_idx);
1404                if let Some(ScalarType::Bool) = col_type {
1405                    Ok(columns
1406                        .get(*col_idx)
1407                        .map(|c| c.get(row_idx).copied().unwrap_or(0) != 0)
1408                        .unwrap_or(false))
1409                } else {
1410                    // Non-bool columns: check if non-zero
1411                    Ok(true)
1412                }
1413            }
1414
1415            Expr::Const(ConstValue::Bool(b)) => Ok(*b),
1416            Expr::Const(_) => Ok(true), // Non-bool constants are truthy
1417
1418            Expr::Compare { left, op, right } => {
1419                let use_float =
1420                    Self::expr_may_be_float(left, schema) || Self::expr_may_be_float(right, schema);
1421
1422                if use_float {
1423                    let left_val = Self::evaluate_expr_as_f64(left, columns, row_idx, schema)?;
1424                    let right_val = Self::evaluate_expr_as_f64(right, columns, row_idx, schema)?;
1425
1426                    Ok(match op {
1427                        CompareOp::Eq => left_val == right_val,
1428                        CompareOp::Ne => left_val != right_val,
1429                        CompareOp::Lt => left_val < right_val,
1430                        CompareOp::Le => left_val <= right_val,
1431                        CompareOp::Gt => left_val > right_val,
1432                        CompareOp::Ge => left_val >= right_val,
1433                    })
1434                } else {
1435                    let left_val = Self::evaluate_expr_as_i64(left, columns, row_idx, schema)?;
1436                    let right_val = Self::evaluate_expr_as_i64(right, columns, row_idx, schema)?;
1437
1438                    Ok(match op {
1439                        CompareOp::Eq => left_val == right_val,
1440                        CompareOp::Ne => left_val != right_val,
1441                        CompareOp::Lt => left_val < right_val,
1442                        CompareOp::Le => left_val <= right_val,
1443                        CompareOp::Gt => left_val > right_val,
1444                        CompareOp::Ge => left_val >= right_val,
1445                    })
1446                }
1447            }
1448
1449            Expr::And(exprs) => {
1450                for e in exprs {
1451                    if !Self::evaluate_predicate(e, columns, row_idx, schema)? {
1452                        return Ok(false);
1453                    }
1454                }
1455                Ok(true)
1456            }
1457
1458            Expr::Or(exprs) => {
1459                for e in exprs {
1460                    if Self::evaluate_predicate(e, columns, row_idx, schema)? {
1461                        return Ok(true);
1462                    }
1463                }
1464                Ok(false)
1465            }
1466
1467            Expr::Not(inner) => Ok(!Self::evaluate_predicate(inner, columns, row_idx, schema)?),
1468
1469            // Arithmetic expressions are not used as predicates directly
1470            Expr::Add(_, _)
1471            | Expr::Sub(_, _)
1472            | Expr::Mul(_, _)
1473            | Expr::Div(_, _)
1474            | Expr::Mod(_, _)
1475            | Expr::Abs(_)
1476            | Expr::Min(_, _)
1477            | Expr::Max(_, _)
1478            | Expr::Pow(_, _)
1479            | Expr::Cast(_, _)
1480            | Expr::Conditional { .. } => Err(XlogError::Execution(
1481                "Arithmetic expression cannot be evaluated as boolean predicate".into(),
1482            )),
1483        }
1484    }
1485
1486    #[cfg(test)]
1487    fn evaluate_expr_as_f64(
1488        expr: &Expr,
1489        columns: &[Vec<u8>],
1490        row_idx: usize,
1491        schema: &Schema,
1492    ) -> Result<f64> {
1493        match expr {
1494            Expr::Column(col_idx) => {
1495                let col_type = schema.column_type(*col_idx).unwrap_or(ScalarType::U32);
1496                let col_data = columns
1497                    .get(*col_idx)
1498                    .ok_or_else(|| XlogError::Execution(format!("Column {} not found", col_idx)))?;
1499
1500                let type_size = col_type.size_bytes();
1501                let start = row_idx * type_size;
1502
1503                Ok(match col_type {
1504                    ScalarType::F64 => {
1505                        let bytes = &col_data[start..start + 8];
1506                        f64::from_le_bytes([
1507                            bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
1508                            bytes[7],
1509                        ])
1510                    }
1511                    ScalarType::F32 => {
1512                        let bytes = &col_data[start..start + 4];
1513                        f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as f64
1514                    }
1515                    ScalarType::U32 | ScalarType::Symbol => {
1516                        let bytes = &col_data[start..start + 4];
1517                        u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as f64
1518                    }
1519                    ScalarType::I32 => {
1520                        let bytes = &col_data[start..start + 4];
1521                        i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as f64
1522                    }
1523                    ScalarType::U64 => {
1524                        let bytes = &col_data[start..start + 8];
1525                        u64::from_le_bytes([
1526                            bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
1527                            bytes[7],
1528                        ]) as f64
1529                    }
1530                    ScalarType::I64 => {
1531                        let bytes = &col_data[start..start + 8];
1532                        i64::from_le_bytes([
1533                            bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
1534                            bytes[7],
1535                        ]) as f64
1536                    }
1537                    ScalarType::Bool => col_data.get(start).copied().unwrap_or(0) as f64,
1538                })
1539            }
1540
1541            Expr::Const(val) => Ok(match val {
1542                ConstValue::U32(v) => *v as f64,
1543                ConstValue::I32(v) => *v as f64,
1544                ConstValue::U64(v) => *v as f64,
1545                ConstValue::I64(v) => *v as f64,
1546                ConstValue::Bool(b) => {
1547                    if *b {
1548                        1.0
1549                    } else {
1550                        0.0
1551                    }
1552                }
1553                ConstValue::F32(f) => *f as f64,
1554                ConstValue::F64(f) => *f,
1555                ConstValue::Symbol(_) => {
1556                    return Err(XlogError::Execution(
1557                        "Cannot evaluate Symbol constant as f64".to_string(),
1558                    ));
1559                }
1560            }),
1561
1562            Expr::Add(l, r) => Ok(Self::evaluate_expr_as_f64(l, columns, row_idx, schema)?
1563                + Self::evaluate_expr_as_f64(r, columns, row_idx, schema)?),
1564            Expr::Sub(l, r) => Ok(Self::evaluate_expr_as_f64(l, columns, row_idx, schema)?
1565                - Self::evaluate_expr_as_f64(r, columns, row_idx, schema)?),
1566            Expr::Mul(l, r) => Ok(Self::evaluate_expr_as_f64(l, columns, row_idx, schema)?
1567                * Self::evaluate_expr_as_f64(r, columns, row_idx, schema)?),
1568            Expr::Div(l, r) => {
1569                let left_val = Self::evaluate_expr_as_f64(l, columns, row_idx, schema)?;
1570                let right_val = Self::evaluate_expr_as_f64(r, columns, row_idx, schema)?;
1571                if right_val == 0.0 {
1572                    return Err(XlogError::Execution("Division by zero".to_string()));
1573                }
1574                Ok(left_val / right_val)
1575            }
1576            Expr::Mod(l, r) => {
1577                let left_val = Self::evaluate_expr_as_f64(l, columns, row_idx, schema)?;
1578                let right_val = Self::evaluate_expr_as_f64(r, columns, row_idx, schema)?;
1579                if right_val == 0.0 {
1580                    return Err(XlogError::Execution("Modulo by zero".to_string()));
1581                }
1582                Ok(left_val % right_val)
1583            }
1584            Expr::Abs(inner) => {
1585                Ok(Self::evaluate_expr_as_f64(inner, columns, row_idx, schema)?.abs())
1586            }
1587            Expr::Min(l, r) => Ok(Self::evaluate_expr_as_f64(l, columns, row_idx, schema)?
1588                .min(Self::evaluate_expr_as_f64(r, columns, row_idx, schema)?)),
1589            Expr::Max(l, r) => Ok(Self::evaluate_expr_as_f64(l, columns, row_idx, schema)?
1590                .max(Self::evaluate_expr_as_f64(r, columns, row_idx, schema)?)),
1591            Expr::Pow(base, exp) => Ok(Self::evaluate_expr_as_f64(base, columns, row_idx, schema)?
1592                .powf(Self::evaluate_expr_as_f64(exp, columns, row_idx, schema)?)),
1593            Expr::Cast(inner, target_type) => match target_type {
1594                ScalarType::F64 => Self::evaluate_expr_as_f64(inner, columns, row_idx, schema),
1595                ScalarType::F32 => {
1596                    Ok(Self::evaluate_expr_as_f64(inner, columns, row_idx, schema)? as f32 as f64)
1597                }
1598                _ => Ok(Self::evaluate_expr_as_i64(inner, columns, row_idx, schema)? as f64),
1599            },
1600
1601            _ => Err(XlogError::Execution(
1602                "Cannot evaluate compound expression as f64".to_string(),
1603            )),
1604        }
1605    }
1606
1607    /// Evaluate an expression as an i64 value
1608    #[cfg(test)]
1609    fn evaluate_expr_as_i64(
1610        expr: &Expr,
1611        columns: &[Vec<u8>],
1612        row_idx: usize,
1613        schema: &Schema,
1614    ) -> Result<i64> {
1615        match expr {
1616            Expr::Column(col_idx) => {
1617                let col_type = schema.column_type(*col_idx).unwrap_or(ScalarType::U32);
1618                let col_data = columns
1619                    .get(*col_idx)
1620                    .ok_or_else(|| XlogError::Execution(format!("Column {} not found", col_idx)))?;
1621
1622                let type_size = col_type.size_bytes();
1623                let start = row_idx * type_size;
1624
1625                Ok(match col_type {
1626                    ScalarType::U32 => {
1627                        let bytes = &col_data[start..start + 4];
1628                        u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as i64
1629                    }
1630                    ScalarType::I32 => {
1631                        let bytes = &col_data[start..start + 4];
1632                        i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as i64
1633                    }
1634                    ScalarType::U64 => {
1635                        let bytes = &col_data[start..start + 8];
1636                        u64::from_le_bytes([
1637                            bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
1638                            bytes[7],
1639                        ]) as i64
1640                    }
1641                    ScalarType::I64 => {
1642                        let bytes = &col_data[start..start + 8];
1643                        i64::from_le_bytes([
1644                            bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
1645                            bytes[7],
1646                        ])
1647                    }
1648                    ScalarType::Bool => col_data.get(start).copied().unwrap_or(0) as i64,
1649                    ScalarType::Symbol => {
1650                        let bytes = &col_data[start..start + 4];
1651                        u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as i64
1652                    }
1653                    ScalarType::F32 => {
1654                        let bytes = &col_data[start..start + 4];
1655                        let val = f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
1656                        val as i64
1657                    }
1658                    ScalarType::F64 => {
1659                        let bytes = &col_data[start..start + 8];
1660                        let val = f64::from_le_bytes([
1661                            bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
1662                            bytes[7],
1663                        ]);
1664                        val as i64
1665                    }
1666                })
1667            }
1668
1669            Expr::Const(val) => Ok(match val {
1670                ConstValue::U32(v) => *v as i64,
1671                ConstValue::I32(v) => *v as i64,
1672                ConstValue::U64(v) => *v as i64,
1673                ConstValue::I64(v) => *v,
1674                ConstValue::Bool(b) => *b as i64,
1675                ConstValue::F32(f) => *f as i64,
1676                ConstValue::F64(f) => *f as i64,
1677                ConstValue::Symbol(_) => 0,
1678            }),
1679
1680            // Arithmetic expressions - evaluate them and return the result
1681            Expr::Add(l, r) => {
1682                let left_val = Self::evaluate_expr_as_i64(l, columns, row_idx, schema)?;
1683                let right_val = Self::evaluate_expr_as_i64(r, columns, row_idx, schema)?;
1684                Ok(left_val.wrapping_add(right_val))
1685            }
1686            Expr::Sub(l, r) => {
1687                let left_val = Self::evaluate_expr_as_i64(l, columns, row_idx, schema)?;
1688                let right_val = Self::evaluate_expr_as_i64(r, columns, row_idx, schema)?;
1689                Ok(left_val.wrapping_sub(right_val))
1690            }
1691            Expr::Mul(l, r) => {
1692                let left_val = Self::evaluate_expr_as_i64(l, columns, row_idx, schema)?;
1693                let right_val = Self::evaluate_expr_as_i64(r, columns, row_idx, schema)?;
1694                Ok(left_val.wrapping_mul(right_val))
1695            }
1696            Expr::Div(l, r) => {
1697                let left_val = Self::evaluate_expr_as_i64(l, columns, row_idx, schema)?;
1698                let right_val = Self::evaluate_expr_as_i64(r, columns, row_idx, schema)?;
1699                if right_val == 0 {
1700                    return Err(XlogError::Execution("Division by zero".to_string()));
1701                }
1702                Ok(left_val / right_val)
1703            }
1704            Expr::Mod(l, r) => {
1705                let left_val = Self::evaluate_expr_as_i64(l, columns, row_idx, schema)?;
1706                let right_val = Self::evaluate_expr_as_i64(r, columns, row_idx, schema)?;
1707                if right_val == 0 {
1708                    return Err(XlogError::Execution("Modulo by zero".to_string()));
1709                }
1710                Ok(left_val % right_val)
1711            }
1712            Expr::Abs(inner) => {
1713                let val = Self::evaluate_expr_as_i64(inner, columns, row_idx, schema)?;
1714                Ok(val.abs())
1715            }
1716            Expr::Min(l, r) => {
1717                let left_val = Self::evaluate_expr_as_i64(l, columns, row_idx, schema)?;
1718                let right_val = Self::evaluate_expr_as_i64(r, columns, row_idx, schema)?;
1719                Ok(left_val.min(right_val))
1720            }
1721            Expr::Max(l, r) => {
1722                let left_val = Self::evaluate_expr_as_i64(l, columns, row_idx, schema)?;
1723                let right_val = Self::evaluate_expr_as_i64(r, columns, row_idx, schema)?;
1724                Ok(left_val.max(right_val))
1725            }
1726            Expr::Pow(base, exp) => {
1727                let base_val = Self::evaluate_expr_as_i64(base, columns, row_idx, schema)?;
1728                let exp_val = Self::evaluate_expr_as_i64(exp, columns, row_idx, schema)?;
1729                if exp_val < 0 {
1730                    Err(XlogError::Execution(
1731                        "Negative exponent in integer pow".to_string(),
1732                    ))
1733                } else if exp_val > u32::MAX as i64 {
1734                    // Exponent too large - would overflow anyway
1735                    Ok(i64::MAX)
1736                } else {
1737                    Ok(base_val.pow(exp_val as u32))
1738                }
1739            }
1740            Expr::Cast(inner, _target_type) => {
1741                // For i64 evaluation, cast is a no-op since we evaluate everything as i64
1742                Self::evaluate_expr_as_i64(inner, columns, row_idx, schema)
1743            }
1744
1745            _ => Err(XlogError::Execution(
1746                "Cannot evaluate compound expression as value".to_string(),
1747            )),
1748        }
1749    }
1750
1751    /// Get the relation name for a RelId, creating a default name if not registered
1752    fn get_or_create_rel_name(&mut self, rel_id: RelId, default: &str) -> String {
1753        if let Some(name) = self.rel_names.get(&rel_id) {
1754            name.clone()
1755        } else {
1756            self.register_relation(rel_id, default);
1757            default.to_string()
1758        }
1759    }
1760
1761    // ============== Helper methods ==============
1762
1763    /// Create an empty buffer with the given schema
1764    fn create_empty_buffer(&self, schema: Schema) -> Result<CudaBuffer> {
1765        self.provider.create_empty_buffer(schema)
1766    }
1767
1768    /// Clone a buffer (device-to-device copy)
1769    fn clone_buffer(&self, buffer: &CudaBuffer) -> Result<CudaBuffer> {
1770        if buffer.is_empty() {
1771            return self.create_empty_buffer(buffer.schema().clone());
1772        }
1773
1774        let mut result_columns = Vec::with_capacity(buffer.arity());
1775
1776        for col_idx in 0..buffer.arity() {
1777            let col_type_size = buffer
1778                .schema()
1779                .column_type(col_idx)
1780                .map(|t| t.size_bytes())
1781                .unwrap_or(4);
1782            let bytes = (buffer.num_rows() as usize) * col_type_size;
1783
1784            if let Some(src_col) = buffer.column(col_idx) {
1785                let mut dst_col = self.provider.memory().alloc::<u8>(bytes)?;
1786                if bytes > 0 {
1787                    self.provider
1788                        .device()
1789                        .inner()
1790                        .dtod_copy(src_col, &mut dst_col)
1791                        .map_err(|e| {
1792                            XlogError::Execution(format!("Failed to clone column on device: {}", e))
1793                        })?;
1794                }
1795                result_columns.push(dst_col.into());
1796            }
1797        }
1798
1799        let d_num_rows = self.clone_device_row_count(buffer)?;
1800        Ok(CudaBuffer::from_columns(
1801            result_columns,
1802            buffer.num_rows(),
1803            d_num_rows,
1804            buffer.schema().clone(),
1805        ))
1806    }
1807
1808    fn clone_device_row_count(&self, buffer: &CudaBuffer) -> Result<TrackedCudaSlice<u32>> {
1809        let mut d_num_rows = self.provider.memory().alloc::<u32>(1)?;
1810        self.provider
1811            .device()
1812            .inner()
1813            .dtod_copy(buffer.num_rows_device(), &mut d_num_rows)
1814            .map_err(|e| XlogError::Execution(format!("Failed to copy row count: {}", e)))?;
1815        Ok(d_num_rows)
1816    }
1817
1818    fn buffer_row_count(&self, buffer: &CudaBuffer) -> Result<u32> {
1819        if let Some(n) = buffer.cached_row_count() {
1820            return Ok(n);
1821        }
1822        // Metadata-only read: row counts are control-plane state, not
1823        // tuple data. Route through `dtoh_scalar_untracked` so the
1824        // metadata-vs-data-plane contract stays grepable and the
1825        // deterministic-D2H gate continues to allow it. Re-map the
1826        // provider-level `XlogError::Kernel` into `XlogError::Execution`
1827        // with the executor's historical "Failed to read row count"
1828        // context so callers see a consistent error category.
1829        let n = self
1830            .provider
1831            .dtoh_scalar_untracked::<u32>(buffer.num_rows_device(), 0)
1832            .map_err(|e| XlogError::Execution(format!("Failed to read row count: {}", e)))?;
1833        buffer.set_cached_row_count_if_unset(n);
1834        Ok(n)
1835    }
1836}
1837
1838/// RAII guard that restores the provider's deterministic-D2H gate state on
1839/// drop. Engaged only when `Executor::execute_plan` opted in via
1840/// `RuntimeConfig::strict_deterministic_d2h`.
1841struct D2hGateGuard {
1842    provider: Arc<CudaKernelProvider>,
1843    engaged: bool,
1844    previous: bool,
1845}
1846
1847impl Drop for D2hGateGuard {
1848    fn drop(&mut self) {
1849        if !self.engaged {
1850            return;
1851        }
1852        if self.previous {
1853            self.provider.enable_strict_deterministic_d2h();
1854        } else {
1855            self.provider.disable_strict_deterministic_d2h();
1856        }
1857    }
1858}
1859
1860#[cfg(test)]
1861mod tests {
1862    use super::*;
1863    use std::time::{Duration, Instant};
1864    use xlog_core::MemoryBudget;
1865    use xlog_cuda::{CudaDevice, GpuMemoryManager};
1866    use xlog_ir::{CompiledRule, RirMeta, Scc};
1867
1868    fn has_cuda_device() -> bool {
1869        // Check if CUDA device is available using CudaDevice wrapper
1870        CudaDevice::new(0).is_ok()
1871    }
1872
1873    fn create_test_executor() -> Option<Executor> {
1874        if !has_cuda_device() {
1875            return None;
1876        }
1877        let device = Arc::new(CudaDevice::new(0).ok()?);
1878        let budget = MemoryBudget::with_limit(1024 * 1024 * 1024); // 1 GB
1879        let memory = Arc::new(GpuMemoryManager::new(device.clone(), budget));
1880        let provider = Arc::new(CudaKernelProvider::new(device, memory).ok()?);
1881        Some(Executor::new(provider))
1882    }
1883
1884    fn create_test_executor_with_config(config: RuntimeConfig) -> Option<Executor> {
1885        if !has_cuda_device() {
1886            return None;
1887        }
1888        let device = Arc::new(CudaDevice::new(0).ok()?);
1889        let budget = MemoryBudget::with_limit(1024 * 1024 * 1024); // 1 GB
1890        let memory = Arc::new(GpuMemoryManager::new(device.clone(), budget));
1891        let provider = Arc::new(CudaKernelProvider::new(device, memory).ok()?);
1892        Some(Executor::new_with_config(provider, config))
1893    }
1894
1895    fn device_row_count(executor: &Executor, rows: u64) -> TrackedCudaSlice<u32> {
1896        let rows_u32 = u32::try_from(rows).expect("row count fits u32");
1897        let mut d_num_rows = executor.provider.memory().alloc::<u32>(1).expect("alloc");
1898        executor
1899            .provider
1900            .device()
1901            .inner()
1902            .htod_sync_copy_into(&[rows_u32], &mut d_num_rows)
1903            .expect("htod");
1904        d_num_rows
1905    }
1906
1907    fn create_test_buffer(executor: &Executor, data: &[u32], col_name: &str) -> CudaBuffer {
1908        let schema = Schema::new(vec![(col_name.to_string(), ScalarType::U32)]);
1909        let bytes: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();
1910
1911        let mut col = executor
1912            .provider
1913            .memory()
1914            .alloc::<u8>(bytes.len())
1915            .expect("alloc");
1916        executor
1917            .provider
1918            .device()
1919            .inner()
1920            .htod_sync_copy_into(&bytes, &mut col)
1921            .expect("htod");
1922
1923        let rows = data.len() as u64;
1924        let d_num_rows = device_row_count(executor, rows);
1925        CudaBuffer::from_columns(vec![col.into()], rows, d_num_rows, schema)
1926    }
1927
1928    fn read_buffer_u32(executor: &Executor, buffer: &CudaBuffer, col: usize) -> Vec<u32> {
1929        executor
1930            .provider
1931            .download_column::<u32>(buffer, col)
1932            .unwrap_or_default()
1933    }
1934
1935    fn buffer_row_count(executor: &Executor, buffer: &CudaBuffer) -> u32 {
1936        if let Some(n) = buffer.cached_row_count() {
1937            return n;
1938        }
1939        let mut host_rows = [0u32];
1940        executor
1941            .provider
1942            .device()
1943            .inner()
1944            .dtoh_sync_copy_into(buffer.num_rows_device(), &mut host_rows)
1945            .expect("dtoh row count");
1946        buffer.set_cached_row_count_if_unset(host_rows[0]);
1947        host_rows[0]
1948    }
1949
1950    fn to_f64_column_bytes(values: &[f64]) -> Vec<u8> {
1951        values.iter().flat_map(|v| v.to_le_bytes()).collect()
1952    }
1953
1954    fn to_f32_column_bytes(values: &[f32]) -> Vec<u8> {
1955        values.iter().flat_map(|v| v.to_le_bytes()).collect()
1956    }
1957
1958    // ============== Basic Executor Tests ==============
1959
1960    #[test]
1961    fn test_executor_creation() {
1962        let executor = match create_test_executor() {
1963            Some(e) => e,
1964            None => {
1965                eprintln!("Skipping test: no CUDA device available");
1966                return;
1967            }
1968        };
1969
1970        assert!(executor.store().is_empty());
1971    }
1972
1973    #[test]
1974    fn test_predicate_f64_comparisons() {
1975        let schema = Schema::new(vec![("x".to_string(), ScalarType::F64)]);
1976        let values = [1.0f64, 2.0, 3.0, f64::NAN];
1977        let columns = vec![to_f64_column_bytes(&values)];
1978
1979        let gt_two = Expr::Compare {
1980            left: Box::new(Expr::Column(0)),
1981            op: CompareOp::Gt,
1982            right: Box::new(Expr::Const(ConstValue::F64(2.0))),
1983        };
1984
1985        let results: Vec<bool> = (0..values.len())
1986            .map(|row| Executor::evaluate_predicate(&gt_two, &columns, row, &schema).unwrap())
1987            .collect();
1988        assert_eq!(results, vec![false, false, true, false]);
1989
1990        let eq_nan = Expr::Compare {
1991            left: Box::new(Expr::Column(0)),
1992            op: CompareOp::Eq,
1993            right: Box::new(Expr::Const(ConstValue::F64(f64::NAN))),
1994        };
1995        let results: Vec<bool> = (0..values.len())
1996            .map(|row| Executor::evaluate_predicate(&eq_nan, &columns, row, &schema).unwrap())
1997            .collect();
1998        assert_eq!(results, vec![false, false, false, false]);
1999
2000        let ne_nan = Expr::Compare {
2001            left: Box::new(Expr::Column(0)),
2002            op: CompareOp::Ne,
2003            right: Box::new(Expr::Const(ConstValue::F64(f64::NAN))),
2004        };
2005        let results: Vec<bool> = (0..values.len())
2006            .map(|row| Executor::evaluate_predicate(&ne_nan, &columns, row, &schema).unwrap())
2007            .collect();
2008        assert_eq!(results, vec![true, true, true, true]);
2009    }
2010
2011    #[test]
2012    fn test_predicate_f32_comparisons() {
2013        let schema = Schema::new(vec![("x".to_string(), ScalarType::F32)]);
2014        let values = [1.0f32, 2.0, 3.0, f32::NAN];
2015        let columns = vec![to_f32_column_bytes(&values)];
2016
2017        let le_two = Expr::Compare {
2018            left: Box::new(Expr::Column(0)),
2019            op: CompareOp::Le,
2020            right: Box::new(Expr::Const(ConstValue::F32(2.0))),
2021        };
2022
2023        let results: Vec<bool> = (0..values.len())
2024            .map(|row| Executor::evaluate_predicate(&le_two, &columns, row, &schema).unwrap())
2025            .collect();
2026        assert_eq!(results, vec![true, true, false, false]);
2027    }
2028
2029    #[test]
2030    fn test_predicate_mixed_float_int_comparisons() {
2031        let schema = Schema::new(vec![
2032            ("x".to_string(), ScalarType::F64),
2033            ("y".to_string(), ScalarType::U32),
2034        ]);
2035
2036        let x = [1.5f64, 2.0, 2.5];
2037        let y = [1u32, 2, 3];
2038        let columns = vec![
2039            to_f64_column_bytes(&x),
2040            y.iter().flat_map(|v| v.to_le_bytes()).collect(),
2041        ];
2042
2043        let x_gt_2 = Expr::Compare {
2044            left: Box::new(Expr::Column(0)),
2045            op: CompareOp::Gt,
2046            right: Box::new(Expr::Const(ConstValue::U32(2))),
2047        };
2048        let results: Vec<bool> = (0..x.len())
2049            .map(|row| Executor::evaluate_predicate(&x_gt_2, &columns, row, &schema).unwrap())
2050            .collect();
2051        assert_eq!(results, vec![false, false, true]);
2052
2053        let y_lt_2_5 = Expr::Compare {
2054            left: Box::new(Expr::Column(1)),
2055            op: CompareOp::Lt,
2056            right: Box::new(Expr::Const(ConstValue::F64(2.5))),
2057        };
2058        let results: Vec<bool> = (0..y.len())
2059            .map(|row| Executor::evaluate_predicate(&y_lt_2_5, &columns, row, &schema).unwrap())
2060            .collect();
2061        assert_eq!(results, vec![true, true, false]);
2062    }
2063
2064    #[test]
2065    fn test_register_and_get_relation() {
2066        let mut executor = match create_test_executor() {
2067            Some(e) => e,
2068            None => {
2069                eprintln!("Skipping test: no CUDA device available");
2070                return;
2071            }
2072        };
2073
2074        // Register a relation
2075        executor.register_relation(RelId(1), "test_rel");
2076
2077        // Verify mapping
2078        assert_eq!(executor.get_rel_name(RelId(1)), Some("test_rel"));
2079        assert_eq!(executor.get_rel_name(RelId(2)), None);
2080    }
2081
2082    // ============== Scan Node Tests ==============
2083
2084    #[test]
2085    fn test_execute_scan_not_found() {
2086        let mut executor = match create_test_executor() {
2087            Some(e) => e,
2088            None => {
2089                eprintln!("Skipping test: no CUDA device available");
2090                return;
2091            }
2092        };
2093
2094        executor.register_relation(RelId(1), "missing_rel");
2095
2096        let node = RirNode::Scan { rel: RelId(1) };
2097        let result = executor.execute_node(&node);
2098
2099        assert!(result.is_err());
2100    }
2101
2102    #[test]
2103    fn test_execute_scan_success() {
2104        let mut executor = match create_test_executor() {
2105            Some(e) => e,
2106            None => {
2107                eprintln!("Skipping test: no CUDA device available");
2108                return;
2109            }
2110        };
2111
2112        // Create and store a buffer
2113        let buffer = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2114        executor.store_mut().put("test_rel", buffer);
2115        executor.register_relation(RelId(1), "test_rel");
2116
2117        // Execute scan
2118        let node = RirNode::Scan { rel: RelId(1) };
2119        let result = executor.execute_node(&node);
2120
2121        assert!(result.is_ok());
2122        let result = result.unwrap();
2123        assert_eq!(buffer_row_count(&executor, &result), 5);
2124
2125        let values = read_buffer_u32(&executor, &result, 0);
2126        assert_eq!(values, vec![1, 2, 3, 4, 5]);
2127    }
2128
2129    // ============== Filter Node Tests ==============
2130
2131    #[test]
2132    fn test_execute_filter_empty_input() {
2133        let executor = match create_test_executor() {
2134            Some(e) => e,
2135            None => {
2136                eprintln!("Skipping test: no CUDA device available");
2137                return;
2138            }
2139        };
2140
2141        let schema = Schema::new(vec![("key".to_string(), ScalarType::U32)]);
2142        let empty = executor.create_empty_buffer(schema).unwrap();
2143
2144        let predicate = Expr::Const(ConstValue::Bool(true));
2145        let result = executor.execute_filter(&empty, &predicate);
2146
2147        assert!(result.is_ok());
2148        let result = result.unwrap();
2149        assert_eq!(buffer_row_count(&executor, &result), 0);
2150    }
2151
2152    #[test]
2153    fn test_execute_filter_all_match() {
2154        let executor = match create_test_executor() {
2155            Some(e) => e,
2156            None => {
2157                eprintln!("Skipping test: no CUDA device available");
2158                return;
2159            }
2160        };
2161
2162        let buffer = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2163        let predicate = Expr::Const(ConstValue::Bool(true));
2164
2165        let result = executor.execute_filter(&buffer, &predicate);
2166        assert!(result.is_ok());
2167
2168        let result = result.unwrap();
2169        assert_eq!(buffer_row_count(&executor, &result), 5);
2170    }
2171
2172    #[test]
2173    fn test_execute_filter_none_match() {
2174        let executor = match create_test_executor() {
2175            Some(e) => e,
2176            None => {
2177                eprintln!("Skipping test: no CUDA device available");
2178                return;
2179            }
2180        };
2181
2182        let buffer = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2183        let predicate = Expr::Const(ConstValue::Bool(false));
2184
2185        let result = executor.execute_filter(&buffer, &predicate);
2186        assert!(result.is_ok());
2187        let result = result.unwrap();
2188        assert_eq!(buffer_row_count(&executor, &result), 0);
2189    }
2190
2191    #[test]
2192    fn test_execute_filter_comparison() {
2193        let executor = match create_test_executor() {
2194            Some(e) => e,
2195            None => {
2196                eprintln!("Skipping test: no CUDA device available");
2197                return;
2198            }
2199        };
2200
2201        let buffer = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2202
2203        // Filter: key > 3
2204        let predicate = Expr::Compare {
2205            left: Box::new(Expr::Column(0)),
2206            op: CompareOp::Gt,
2207            right: Box::new(Expr::Const(ConstValue::U32(3))),
2208        };
2209
2210        let result = executor.execute_filter(&buffer, &predicate);
2211        assert!(result.is_ok());
2212
2213        let result = result.unwrap();
2214        assert_eq!(buffer_row_count(&executor, &result), 2);
2215
2216        let values = read_buffer_u32(&executor, &result, 0);
2217        assert_eq!(values, vec![4, 5]);
2218    }
2219
2220    #[test]
2221    fn test_execute_filter_and() {
2222        let executor = match create_test_executor() {
2223            Some(e) => e,
2224            None => {
2225                eprintln!("Skipping test: no CUDA device available");
2226                return;
2227            }
2228        };
2229
2230        let buffer = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2231
2232        // Filter: key >= 2 AND key <= 4
2233        let predicate = Expr::And(vec![
2234            Expr::Compare {
2235                left: Box::new(Expr::Column(0)),
2236                op: CompareOp::Ge,
2237                right: Box::new(Expr::Const(ConstValue::U32(2))),
2238            },
2239            Expr::Compare {
2240                left: Box::new(Expr::Column(0)),
2241                op: CompareOp::Le,
2242                right: Box::new(Expr::Const(ConstValue::U32(4))),
2243            },
2244        ]);
2245
2246        let result = executor.execute_filter(&buffer, &predicate);
2247        assert!(result.is_ok());
2248
2249        let result = result.unwrap();
2250        assert_eq!(buffer_row_count(&executor, &result), 3);
2251
2252        let values = read_buffer_u32(&executor, &result, 0);
2253        assert_eq!(values, vec![2, 3, 4]);
2254    }
2255
2256    // ============== Project Node Tests ==============
2257
2258    #[test]
2259    fn test_execute_project_empty_input() {
2260        let executor = match create_test_executor() {
2261            Some(e) => e,
2262            None => {
2263                eprintln!("Skipping test: no CUDA device available");
2264                return;
2265            }
2266        };
2267
2268        let schema = Schema::new(vec![
2269            ("a".to_string(), ScalarType::U32),
2270            ("b".to_string(), ScalarType::U32),
2271        ]);
2272        let empty = executor.create_empty_buffer(schema).unwrap();
2273
2274        let result = executor.execute_project(&empty, &[ProjectExpr::Column(0)]);
2275        assert!(result.is_ok());
2276
2277        let result = result.unwrap();
2278        assert_eq!(buffer_row_count(&executor, &result), 0);
2279        assert_eq!(result.arity(), 1);
2280    }
2281
2282    #[test]
2283    fn test_execute_project_reorder() {
2284        let executor = match create_test_executor() {
2285            Some(e) => e,
2286            None => {
2287                eprintln!("Skipping test: no CUDA device available");
2288                return;
2289            }
2290        };
2291
2292        // Create a 2-column buffer
2293        let schema = Schema::new(vec![
2294            ("a".to_string(), ScalarType::U32),
2295            ("b".to_string(), ScalarType::U32),
2296        ]);
2297
2298        let a_data: Vec<u8> = [1u32, 2, 3].iter().flat_map(|v| v.to_le_bytes()).collect();
2299        let b_data: Vec<u8> = [10u32, 20, 30]
2300            .iter()
2301            .flat_map(|v| v.to_le_bytes())
2302            .collect();
2303
2304        let mut col_a = executor
2305            .provider
2306            .memory()
2307            .alloc::<u8>(a_data.len())
2308            .unwrap();
2309        let mut col_b = executor
2310            .provider
2311            .memory()
2312            .alloc::<u8>(b_data.len())
2313            .unwrap();
2314
2315        executor
2316            .provider
2317            .device()
2318            .inner()
2319            .htod_sync_copy_into(&a_data, &mut col_a)
2320            .unwrap();
2321        executor
2322            .provider
2323            .device()
2324            .inner()
2325            .htod_sync_copy_into(&b_data, &mut col_b)
2326            .unwrap();
2327
2328        let d_num_rows = device_row_count(&executor, 3);
2329        let buffer =
2330            CudaBuffer::from_columns(vec![col_a.into(), col_b.into()], 3, d_num_rows, schema);
2331
2332        // Project: [b, a] (reverse order)
2333        let result =
2334            executor.execute_project(&buffer, &[ProjectExpr::Column(1), ProjectExpr::Column(0)]);
2335        assert!(result.is_ok());
2336
2337        let result = result.unwrap();
2338        assert_eq!(buffer_row_count(&executor, &result), 3);
2339        assert_eq!(result.arity(), 2);
2340
2341        // First column should be b's values
2342        let col0 = read_buffer_u32(&executor, &result, 0);
2343        assert_eq!(col0, vec![10, 20, 30]);
2344
2345        // Second column should be a's values
2346        let col1 = read_buffer_u32(&executor, &result, 1);
2347        assert_eq!(col1, vec![1, 2, 3]);
2348    }
2349
2350    #[test]
2351    fn test_execute_computed_projection_wiring() {
2352        // Test that ProjectExpr::Computed is handled correctly
2353        // Even if arithmetic stubs return errors, verify the flow is correct
2354        let executor = match create_test_executor() {
2355            Some(e) => e,
2356            None => {
2357                eprintln!("Skipping test: no CUDA device available");
2358                return;
2359            }
2360        };
2361
2362        // Create a 2-column buffer
2363        let schema = Schema::new(vec![
2364            ("a".to_string(), ScalarType::U32),
2365            ("b".to_string(), ScalarType::U32),
2366        ]);
2367
2368        let a_data: Vec<u8> = [10u32, 20, 30]
2369            .iter()
2370            .flat_map(|v| v.to_le_bytes())
2371            .collect();
2372        let b_data: Vec<u8> = [1u32, 2, 3].iter().flat_map(|v| v.to_le_bytes()).collect();
2373
2374        let mut col_a = executor
2375            .provider
2376            .memory()
2377            .alloc::<u8>(a_data.len())
2378            .unwrap();
2379        let mut col_b = executor
2380            .provider
2381            .memory()
2382            .alloc::<u8>(b_data.len())
2383            .unwrap();
2384
2385        executor
2386            .provider
2387            .device()
2388            .inner()
2389            .htod_sync_copy_into(&a_data, &mut col_a)
2390            .unwrap();
2391        executor
2392            .provider
2393            .device()
2394            .inner()
2395            .htod_sync_copy_into(&b_data, &mut col_b)
2396            .unwrap();
2397
2398        let d_num_rows = device_row_count(&executor, 3);
2399        let buffer =
2400            CudaBuffer::from_columns(vec![col_a.into(), col_b.into()], 3, d_num_rows, schema);
2401
2402        // Project with computed expression: a + b
2403        let add_expr = Expr::Add(Box::new(Expr::Column(0)), Box::new(Expr::Column(1)));
2404        let projections = vec![
2405            ProjectExpr::Column(0),                           // Pass through column a
2406            ProjectExpr::Computed(add_expr, ScalarType::U32), // Compute a + b
2407        ];
2408
2409        let result = executor.execute_project(&buffer, &projections);
2410
2411        // The wiring should be correct - result depends on whether CUDA arithmetic kernels are available
2412        // If available: result has 2 columns with computed values
2413        // If not available: may return error from provider stubs
2414        match result {
2415            Ok(res) => {
2416                // Wiring worked and arithmetic kernels are available
2417                assert_eq!(buffer_row_count(&executor, &res), 3);
2418                assert_eq!(res.arity(), 2);
2419
2420                // First column should be a's values (pass-through)
2421                let col0 = read_buffer_u32(&executor, &res, 0);
2422                assert_eq!(col0, vec![10, 20, 30]);
2423
2424                // Second column should be a + b = [11, 22, 33]
2425                let col1 = read_buffer_u32(&executor, &res, 1);
2426                assert_eq!(col1, vec![11, 22, 33]);
2427            }
2428            Err(e) => {
2429                // Arithmetic kernels not available - that's OK for this test
2430                // The important thing is that the wiring reached the provider
2431                let err_msg = format!("{}", e);
2432                assert!(
2433                    err_msg.contains("not implemented")
2434                        || err_msg.contains("not yet implemented")
2435                        || err_msg.contains("not supported")
2436                        || err_msg.contains("stub")
2437                        || err_msg.contains("Unsupported")
2438                        || err_msg.contains("arithmetic kernels"),
2439                    "Unexpected error: {}. Expected arithmetic kernel stub error.",
2440                    err_msg
2441                );
2442            }
2443        }
2444    }
2445
2446    // ============== Union Node Tests ==============
2447
2448    #[test]
2449    fn test_execute_union_empty_inputs() {
2450        let executor = match create_test_executor() {
2451            Some(e) => e,
2452            None => {
2453                eprintln!("Skipping test: no CUDA device available");
2454                return;
2455            }
2456        };
2457
2458        let result = executor.execute_union(&[]);
2459        assert!(result.is_ok());
2460        let result = result.unwrap();
2461        assert_eq!(buffer_row_count(&executor, &result), 0);
2462    }
2463
2464    #[test]
2465    fn test_execute_union_single_input() {
2466        let executor = match create_test_executor() {
2467            Some(e) => e,
2468            None => {
2469                eprintln!("Skipping test: no CUDA device available");
2470                return;
2471            }
2472        };
2473
2474        let buffer = create_test_buffer(&executor, &[1, 2, 3], "key");
2475
2476        let result = executor.execute_union(&[buffer]);
2477        assert!(result.is_ok());
2478
2479        let result = result.unwrap();
2480        assert_eq!(buffer_row_count(&executor, &result), 3);
2481
2482        let values = read_buffer_u32(&executor, &result, 0);
2483        assert_eq!(values, vec![1, 2, 3]);
2484    }
2485
2486    #[test]
2487    fn test_execute_union_multiple_inputs() {
2488        let executor = match create_test_executor() {
2489            Some(e) => e,
2490            None => {
2491                eprintln!("Skipping test: no CUDA device available");
2492                return;
2493            }
2494        };
2495
2496        let buffer1 = create_test_buffer(&executor, &[1, 2], "key");
2497        let buffer2 = create_test_buffer(&executor, &[3, 4], "key");
2498        let buffer3 = create_test_buffer(&executor, &[5], "key");
2499
2500        let result = executor.execute_union(&[buffer1, buffer2, buffer3]);
2501        assert!(result.is_ok());
2502
2503        let result = result.unwrap();
2504        assert_eq!(buffer_row_count(&executor, &result), 5);
2505    }
2506
2507    // ============== Distinct Node Tests ==============
2508
2509    #[test]
2510    fn test_execute_distinct_empty() {
2511        let executor = match create_test_executor() {
2512            Some(e) => e,
2513            None => {
2514                eprintln!("Skipping test: no CUDA device available");
2515                return;
2516            }
2517        };
2518
2519        let schema = Schema::new(vec![("key".to_string(), ScalarType::U32)]);
2520        let empty = executor.create_empty_buffer(schema).unwrap();
2521
2522        let result = executor.execute_distinct(&empty, &[0]);
2523        assert!(result.is_ok());
2524        let result = result.unwrap();
2525        assert_eq!(buffer_row_count(&executor, &result), 0);
2526    }
2527
2528    // ============== Diff Node Tests ==============
2529
2530    #[test]
2531    fn test_execute_diff() {
2532        let executor = match create_test_executor() {
2533            Some(e) => e,
2534            None => {
2535                eprintln!("Skipping test: no CUDA device available");
2536                return;
2537            }
2538        };
2539
2540        let left = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2541        let right = create_test_buffer(&executor, &[2, 4], "key");
2542
2543        let result = executor.execute_diff(&left, &right);
2544        assert!(result.is_ok());
2545
2546        let result = result.unwrap();
2547        assert_eq!(buffer_row_count(&executor, &result), 3);
2548
2549        let values = read_buffer_u32(&executor, &result, 0);
2550        assert_eq!(values, vec![1, 3, 5]);
2551    }
2552
2553    // ============== Fixpoint Tests ==============
2554
2555    #[test]
2556    fn test_execute_fixpoint_base_only() {
2557        // Test fixpoint with a base case that reaches fixpoint immediately
2558        // (recursive step produces nothing new)
2559        let mut executor = match create_test_executor() {
2560            Some(e) => e,
2561            None => {
2562                eprintln!("Skipping test: no CUDA device available");
2563                return;
2564            }
2565        };
2566
2567        // Create base relation
2568        let buffer = create_test_buffer(&executor, &[1, 2, 3], "key");
2569        executor.store_mut().put("base_rel", buffer);
2570        executor.register_relation(RelId(1), "base_rel");
2571
2572        // Create an empty recursive relation (simulating a recursive step that produces nothing)
2573        let empty_schema = Schema::new(vec![("key".to_string(), ScalarType::U32)]);
2574        let empty_buffer = executor.create_empty_buffer(empty_schema).unwrap();
2575        executor.store_mut().put("empty_rel", empty_buffer);
2576        executor.register_relation(RelId(4), "empty_rel");
2577
2578        // Base: scan base_rel
2579        // Recursive: scan empty_rel (produces nothing new)
2580        let base = Box::new(RirNode::Scan { rel: RelId(1) });
2581        let recursive = Box::new(RirNode::Scan { rel: RelId(4) });
2582
2583        let node = RirNode::Fixpoint {
2584            scc_id: 0,
2585            base,
2586            recursive,
2587            delta_rel: RelId(2),
2588            full_rel: RelId(3),
2589        };
2590
2591        let result = executor.execute_node(&node);
2592        assert!(result.is_ok());
2593
2594        // Should return base case since recursive produces nothing
2595        let result = result.unwrap();
2596        assert_eq!(buffer_row_count(&executor, &result), 3);
2597        let values = read_buffer_u32(&executor, &result, 0);
2598        assert_eq!(values, vec![1, 2, 3]);
2599    }
2600
2601    #[test]
2602    fn test_execute_fixpoint_empty_base() {
2603        // Test fixpoint with empty base case
2604        let mut executor = match create_test_executor() {
2605            Some(e) => e,
2606            None => {
2607                eprintln!("Skipping test: no CUDA device available");
2608                return;
2609            }
2610        };
2611
2612        // Create empty base relation
2613        let empty_schema = Schema::new(vec![("key".to_string(), ScalarType::U32)]);
2614        let empty_buffer = executor.create_empty_buffer(empty_schema.clone()).unwrap();
2615        executor.store_mut().put("empty_base", empty_buffer);
2616        executor.register_relation(RelId(1), "empty_base");
2617
2618        // Create recursive relation (won't be used since base is empty)
2619        let rec_buffer = create_test_buffer(&executor, &[4, 5, 6], "key");
2620        executor.store_mut().put("rec_rel", rec_buffer);
2621        executor.register_relation(RelId(4), "rec_rel");
2622
2623        let base = Box::new(RirNode::Scan { rel: RelId(1) });
2624        let recursive = Box::new(RirNode::Scan { rel: RelId(4) });
2625
2626        let node = RirNode::Fixpoint {
2627            scc_id: 0,
2628            base,
2629            recursive,
2630            delta_rel: RelId(2),
2631            full_rel: RelId(3),
2632        };
2633
2634        let result = executor.execute_node(&node);
2635        assert!(result.is_ok());
2636
2637        // Should return empty since base is empty
2638        let result = result.unwrap();
2639        assert_eq!(buffer_row_count(&executor, &result), 0);
2640    }
2641
2642    #[test]
2643    fn test_execute_fixpoint_one_iteration() {
2644        // Test fixpoint that converges after one iteration
2645        let mut executor = match create_test_executor() {
2646            Some(e) => e,
2647            None => {
2648                eprintln!("Skipping test: no CUDA device available");
2649                return;
2650            }
2651        };
2652
2653        // Base: [1, 2]
2654        let base_buffer = create_test_buffer(&executor, &[1, 2], "key");
2655        executor.store_mut().put("base_rel", base_buffer);
2656        executor.register_relation(RelId(1), "base_rel");
2657
2658        // Recursive produces [1, 2, 3] - after diff with R, only [3] remains
2659        let rec_buffer = create_test_buffer(&executor, &[1, 2, 3], "key");
2660        executor.store_mut().put("rec_rel", rec_buffer);
2661        executor.register_relation(RelId(4), "rec_rel");
2662
2663        // After first iteration, R = [1, 2, 3], recursive produces [1, 2, 3] again
2664        // diff([1, 2, 3], [1, 2, 3]) = empty -> fixpoint reached
2665
2666        let base = Box::new(RirNode::Scan { rel: RelId(1) });
2667        let recursive = Box::new(RirNode::Scan { rel: RelId(4) });
2668
2669        let node = RirNode::Fixpoint {
2670            scc_id: 0,
2671            base,
2672            recursive,
2673            delta_rel: RelId(2),
2674            full_rel: RelId(3),
2675        };
2676
2677        let result = executor.execute_node(&node);
2678        assert!(result.is_ok());
2679
2680        let result = result.unwrap();
2681        // Result should be [1, 2, 3]
2682        assert_eq!(buffer_row_count(&executor, &result), 3);
2683    }
2684
2685    #[test]
2686    fn test_execute_fixpoint_multiple_iterations() {
2687        // Test fixpoint that requires multiple iterations to converge
2688        // This simulates transitive closure behavior
2689        let mut executor = match create_test_executor() {
2690            Some(e) => e,
2691            None => {
2692                eprintln!("Skipping test: no CUDA device available");
2693                return;
2694            }
2695        };
2696
2697        // Base: [1]
2698        let base_buffer = create_test_buffer(&executor, &[1], "key");
2699        executor.store_mut().put("base_rel", base_buffer);
2700        executor.register_relation(RelId(1), "base_rel");
2701
2702        // For this test, we need a recursive rule that can expand
2703        // Since we can't easily simulate join-based recursion without complex setup,
2704        // we'll test a simpler case where recursive produces cumulative data
2705
2706        // Recursive relation will produce [1, 2] in first iteration,
2707        // then [1, 2, 3] in second iteration, etc.
2708        // This requires a more complex setup, so let's test the basic convergence
2709
2710        // Simplified test: recursive produces union of base with [2]
2711        // First iteration: R=[1], rec produces [1, 2] -> delta_new = [2]
2712        // Second iteration: R=[1, 2], rec produces [1, 2] -> delta_new = empty
2713        let rec_buffer = create_test_buffer(&executor, &[1, 2], "key");
2714        executor.store_mut().put("rec_rel", rec_buffer);
2715        executor.register_relation(RelId(4), "rec_rel");
2716
2717        let base = Box::new(RirNode::Scan { rel: RelId(1) });
2718        let recursive = Box::new(RirNode::Scan { rel: RelId(4) });
2719
2720        let node = RirNode::Fixpoint {
2721            scc_id: 0,
2722            base,
2723            recursive,
2724            delta_rel: RelId(2),
2725            full_rel: RelId(3),
2726        };
2727
2728        let result = executor.execute_node(&node);
2729        assert!(result.is_ok());
2730
2731        let result = result.unwrap();
2732        // Result should be union of [1] and [2] = [1, 2]
2733        assert_eq!(buffer_row_count(&executor, &result), 2);
2734    }
2735
2736    #[test]
2737    fn test_execute_fixpoint_via_node() {
2738        // Test fixpoint through execute_node to ensure the match arm works
2739        let mut executor = match create_test_executor() {
2740            Some(e) => e,
2741            None => {
2742                eprintln!("Skipping test: no CUDA device available");
2743                return;
2744            }
2745        };
2746
2747        // Create and store a base buffer
2748        let buffer = create_test_buffer(&executor, &[1, 2, 3], "key");
2749        executor.store_mut().put("base_rel", buffer);
2750        executor.register_relation(RelId(1), "base_rel");
2751
2752        // Empty recursive means immediate fixpoint
2753        let empty_schema = Schema::new(vec![("key".to_string(), ScalarType::U32)]);
2754        let empty_buffer = executor.create_empty_buffer(empty_schema).unwrap();
2755        executor.store_mut().put("empty_rel", empty_buffer);
2756        executor.register_relation(RelId(4), "empty_rel");
2757
2758        let base = Box::new(RirNode::Scan { rel: RelId(1) });
2759        let recursive = Box::new(RirNode::Scan { rel: RelId(4) });
2760
2761        let node = RirNode::Fixpoint {
2762            scc_id: 0,
2763            base,
2764            recursive,
2765            delta_rel: RelId(2),
2766            full_rel: RelId(3),
2767        };
2768
2769        let result = executor.execute_node(&node);
2770        assert!(result.is_ok());
2771
2772        let result = result.unwrap();
2773        assert_eq!(buffer_row_count(&executor, &result), 3);
2774    }
2775
2776    #[test]
2777    fn test_fixpoint_cleanup() {
2778        // Test that fixpoint properly cleans up delta and full relations
2779        let mut executor = match create_test_executor() {
2780            Some(e) => e,
2781            None => {
2782                eprintln!("Skipping test: no CUDA device available");
2783                return;
2784            }
2785        };
2786
2787        let buffer = create_test_buffer(&executor, &[1, 2], "key");
2788        executor.store_mut().put("base_rel", buffer);
2789        executor.register_relation(RelId(1), "base_rel");
2790
2791        let empty_schema = Schema::new(vec![("key".to_string(), ScalarType::U32)]);
2792        let empty_buffer = executor.create_empty_buffer(empty_schema).unwrap();
2793        executor.store_mut().put("empty_rel", empty_buffer);
2794        executor.register_relation(RelId(4), "empty_rel");
2795
2796        // Register names for delta and full relations to check cleanup
2797        executor.register_relation(RelId(2), "__delta_test");
2798        executor.register_relation(RelId(3), "__full_test");
2799
2800        let base = Box::new(RirNode::Scan { rel: RelId(1) });
2801        let recursive = Box::new(RirNode::Scan { rel: RelId(4) });
2802
2803        let node = RirNode::Fixpoint {
2804            scc_id: 0,
2805            base,
2806            recursive,
2807            delta_rel: RelId(2),
2808            full_rel: RelId(3),
2809        };
2810
2811        let result = executor.execute_node(&node);
2812        assert!(result.is_ok());
2813
2814        // After fixpoint, the delta and full relations should be cleaned up
2815        assert!(!executor.store().contains("__delta_test"));
2816        assert!(!executor.store().contains("__full_test"));
2817    }
2818
2819    // ============== Execute Plan Tests ==============
2820
2821    #[test]
2822    fn test_execute_plan_empty() {
2823        let mut executor = match create_test_executor() {
2824            Some(e) => e,
2825            None => {
2826                eprintln!("Skipping test: no CUDA device available");
2827                return;
2828            }
2829        };
2830
2831        let plan = ExecutionPlan::new(vec![]);
2832
2833        let result = executor.execute_plan(&plan);
2834        assert!(result.is_ok());
2835        let result = result.unwrap();
2836        assert_eq!(buffer_row_count(&executor, &result), 0);
2837    }
2838
2839    #[test]
2840    fn test_execute_plan_with_stratum() {
2841        let mut executor = match create_test_executor() {
2842            Some(e) => e,
2843            None => {
2844                eprintln!("Skipping test: no CUDA device available");
2845                return;
2846            }
2847        };
2848
2849        // Create input relation
2850        let buffer = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2851        executor.store_mut().put("input", buffer);
2852        executor.register_relation(RelId(1), "input");
2853
2854        // Build a simple plan
2855        let scc = Scc {
2856            id: 0,
2857            predicates: vec!["output".to_string()],
2858            is_recursive: false,
2859        };
2860
2861        let rule = CompiledRule {
2862            head: "output".to_string(),
2863            body: RirNode::Scan { rel: RelId(1) },
2864            meta: RirMeta::default(),
2865        };
2866
2867        let stratum = Stratum {
2868            id: 0,
2869            sccs: vec![0],
2870        };
2871
2872        let plan = ExecutionPlan {
2873            sccs: vec![scc],
2874            strata: vec![stratum],
2875            rules_by_scc: vec![vec![rule]],
2876            est_memory_peak: 0,
2877            rel_arities: std::collections::HashMap::new(),
2878        };
2879
2880        let result = executor.execute_plan(&plan);
2881        assert!(result.is_ok());
2882
2883        // Verify output relation was created
2884        assert!(executor.store().contains("output"));
2885        let output = executor.store().get("output").unwrap();
2886        assert_eq!(buffer_row_count(&executor, output), 5);
2887    }
2888
2889    #[test]
2890    fn test_apply_deltas_and_recompute_updates_dependents() {
2891        let mut executor = match create_test_executor() {
2892            Some(e) => e,
2893            None => {
2894                eprintln!("Skipping test: no CUDA device available");
2895                return;
2896            }
2897        };
2898
2899        let input = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2900        executor.store_mut().put("input", input);
2901        executor.register_relation(RelId(1), "input");
2902
2903        // SCC0: identity rule for input (mirrors how compiled facts appear as scan rules).
2904        // SCC1: output depends on input.
2905        let scc0 = Scc {
2906            id: 0,
2907            predicates: vec!["input".to_string()],
2908            is_recursive: false,
2909        };
2910        let scc1 = Scc {
2911            id: 1,
2912            predicates: vec!["output".to_string()],
2913            is_recursive: false,
2914        };
2915
2916        let input_rule = CompiledRule {
2917            head: "input".to_string(),
2918            body: RirNode::Scan { rel: RelId(1) },
2919            meta: RirMeta::default(),
2920        };
2921
2922        let output_rule = CompiledRule {
2923            head: "output".to_string(),
2924            body: RirNode::Filter {
2925                input: Box::new(RirNode::Scan { rel: RelId(1) }),
2926                predicate: Expr::Compare {
2927                    left: Box::new(Expr::Column(0)),
2928                    op: CompareOp::Gt,
2929                    right: Box::new(Expr::Const(ConstValue::U32(2))),
2930                },
2931            },
2932            meta: RirMeta::default(),
2933        };
2934
2935        let stratum = Stratum {
2936            id: 0,
2937            sccs: vec![0, 1],
2938        };
2939
2940        let plan = ExecutionPlan {
2941            sccs: vec![scc0, scc1],
2942            strata: vec![stratum],
2943            rules_by_scc: vec![vec![input_rule], vec![output_rule]],
2944            est_memory_peak: 0,
2945            rel_arities: std::collections::HashMap::new(),
2946        };
2947
2948        executor.execute_plan(&plan).expect("initial execute_plan");
2949        let initial_out = executor.store().get("output").expect("output missing");
2950        let initial_vals = read_buffer_u32(&executor, initial_out, 0);
2951        assert_eq!(initial_vals, vec![3, 4, 5]);
2952
2953        let delete_buf = create_test_buffer(&executor, &[5], "key");
2954        let insert_buf = create_test_buffer(&executor, &[10], "key");
2955
2956        let mut deltas = HashMap::new();
2957        deltas.insert(
2958            "input".to_string(),
2959            RelationDelta::new(Some(insert_buf), Some(delete_buf)),
2960        );
2961
2962        executor
2963            .apply_deltas_and_recompute(&plan, &deltas)
2964            .expect("apply_deltas_and_recompute");
2965
2966        let out = executor
2967            .store()
2968            .get("output")
2969            .expect("output missing after recompute");
2970        let vals = read_buffer_u32(&executor, out, 0);
2971        assert_eq!(vals, vec![3, 4, 10]);
2972    }
2973
2974    #[test]
2975    fn test_apply_deltas_and_recompute_insert_only_recomputes_anti_join() {
2976        let mut executor = match create_test_executor() {
2977            Some(e) => e,
2978            None => {
2979                eprintln!("Skipping test: no CUDA device available");
2980                return;
2981            }
2982        };
2983
2984        let lhs = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2985        executor.store_mut().put("lhs", lhs);
2986        executor.register_relation(RelId(1), "lhs");
2987
2988        let blocked = create_test_buffer(&executor, &[], "key");
2989        executor.store_mut().put("blocked", blocked);
2990        executor.register_relation(RelId(2), "blocked");
2991
2992        // SCC0: lhs identity rule
2993        // SCC1: blocked identity rule
2994        // SCC2: out = lhs \ blocked (anti-join)
2995        let scc0 = Scc {
2996            id: 0,
2997            predicates: vec!["lhs".to_string()],
2998            is_recursive: false,
2999        };
3000        let scc1 = Scc {
3001            id: 1,
3002            predicates: vec!["blocked".to_string()],
3003            is_recursive: false,
3004        };
3005        let scc2 = Scc {
3006            id: 2,
3007            predicates: vec!["out".to_string()],
3008            is_recursive: false,
3009        };
3010
3011        let lhs_rule = CompiledRule {
3012            head: "lhs".to_string(),
3013            body: RirNode::Scan { rel: RelId(1) },
3014            meta: RirMeta::default(),
3015        };
3016        let blocked_rule = CompiledRule {
3017            head: "blocked".to_string(),
3018            body: RirNode::Scan { rel: RelId(2) },
3019            meta: RirMeta::default(),
3020        };
3021        let out_rule = CompiledRule {
3022            head: "out".to_string(),
3023            body: RirNode::Join {
3024                left: Box::new(RirNode::Scan { rel: RelId(1) }),
3025                right: Box::new(RirNode::Scan { rel: RelId(2) }),
3026                left_keys: vec![0],
3027                right_keys: vec![0],
3028                join_type: JoinType::Anti,
3029            },
3030            meta: RirMeta::default(),
3031        };
3032
3033        let stratum = Stratum {
3034            id: 0,
3035            sccs: vec![0, 1, 2],
3036        };
3037
3038        let plan = ExecutionPlan {
3039            sccs: vec![scc0, scc1, scc2],
3040            strata: vec![stratum],
3041            rules_by_scc: vec![vec![lhs_rule], vec![blocked_rule], vec![out_rule]],
3042            est_memory_peak: 0,
3043            rel_arities: std::collections::HashMap::new(),
3044        };
3045
3046        executor.execute_plan(&plan).expect("initial execute_plan");
3047        let initial = executor.store().get("out").expect("out missing");
3048        let initial_vals = read_buffer_u32(&executor, initial, 0);
3049        assert_eq!(initial_vals, vec![1, 2, 3, 4, 5]);
3050
3051        // Insert into the "blocked" relation: output should shrink.
3052        let insert_buf = create_test_buffer(&executor, &[2, 4], "key");
3053        let mut deltas = HashMap::new();
3054        deltas.insert(
3055            "blocked".to_string(),
3056            RelationDelta::new(Some(insert_buf), None),
3057        );
3058
3059        executor
3060            .apply_deltas_and_recompute(&plan, &deltas)
3061            .expect("apply_deltas_and_recompute");
3062
3063        let out = executor
3064            .store()
3065            .get("out")
3066            .expect("out missing after update");
3067        let vals = read_buffer_u32(&executor, out, 0);
3068        assert_eq!(vals, vec![1, 3, 5]);
3069    }
3070
3071    // ============== RIR Node Composition Tests ==============
3072
3073    #[test]
3074    fn test_execute_filter_project_chain() {
3075        let mut executor = match create_test_executor() {
3076            Some(e) => e,
3077            None => {
3078                eprintln!("Skipping test: no CUDA device available");
3079                return;
3080            }
3081        };
3082
3083        // Create input relation
3084        let buffer = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
3085        executor.store_mut().put("input", buffer);
3086        executor.register_relation(RelId(1), "input");
3087
3088        // Build: Project(Filter(Scan))
3089        let scan = RirNode::Scan { rel: RelId(1) };
3090        let filter = RirNode::Filter {
3091            input: Box::new(scan),
3092            predicate: Expr::Compare {
3093                left: Box::new(Expr::Column(0)),
3094                op: CompareOp::Gt,
3095                right: Box::new(Expr::Const(ConstValue::U32(2))),
3096            },
3097        };
3098        let project = RirNode::Project {
3099            input: Box::new(filter),
3100            columns: vec![ProjectExpr::Column(0)],
3101        };
3102
3103        let result = executor.execute_node(&project);
3104        assert!(result.is_ok());
3105
3106        let result = result.unwrap();
3107        assert_eq!(buffer_row_count(&executor, &result), 3);
3108
3109        let values = read_buffer_u32(&executor, &result, 0);
3110        assert_eq!(values, vec![3, 4, 5]);
3111    }
3112
3113    // ============== Common Subexpression Elimination Tests ==============
3114
3115    fn duplicate_join_union_plan() -> RirNode {
3116        let join = RirNode::Join {
3117            left: Box::new(RirNode::Scan { rel: RelId(1) }),
3118            right: Box::new(RirNode::Scan { rel: RelId(2) }),
3119            left_keys: vec![0],
3120            right_keys: vec![0],
3121            join_type: JoinType::Inner,
3122        };
3123        RirNode::Union {
3124            inputs: vec![join.clone(), join],
3125        }
3126    }
3127
3128    fn seed_cse_join_fixture(executor: &mut Executor, right: &[u32]) {
3129        executor.register_relation(RelId(1), "left");
3130        executor.register_relation(RelId(2), "right");
3131        let left = create_test_buffer(executor, &[1, 2, 3, 4], "key");
3132        let right = create_test_buffer(executor, right, "key");
3133        executor.put_relation("left", left);
3134        executor.put_relation("right", right);
3135    }
3136
3137    #[test]
3138    fn test_common_subexpression_cache_reuses_duplicate_inner_join_when_enabled() {
3139        let mut executor = match create_test_executor_with_config(
3140            RuntimeConfig::default().with_common_subexpression_elimination(Some(true)),
3141        ) {
3142            Some(e) => e,
3143            None => {
3144                eprintln!("Skipping test: no CUDA device available");
3145                return;
3146            }
3147        };
3148        seed_cse_join_fixture(&mut executor, &[2, 3, 5]);
3149
3150        let result = executor
3151            .execute_node(&duplicate_join_union_plan())
3152            .expect("duplicate join union executes");
3153
3154        assert_eq!(buffer_row_count(&executor, &result), 2);
3155        let stats = executor.common_subexpression_stats();
3156        assert_eq!(stats.hits, 1);
3157        assert!(stats.misses >= 1);
3158        assert_eq!(stats.unsafe_rejections, 0);
3159    }
3160
3161    #[test]
3162    fn test_common_subexpression_off_on_preserves_output_and_records_reuse_only_when_enabled() {
3163        let mut disabled = match create_test_executor_with_config(
3164            RuntimeConfig::default().with_common_subexpression_elimination(Some(false)),
3165        ) {
3166            Some(e) => e,
3167            None => {
3168                eprintln!("Skipping test: no CUDA device available");
3169                return;
3170            }
3171        };
3172        let mut enabled = match create_test_executor_with_config(
3173            RuntimeConfig::default().with_common_subexpression_elimination(Some(true)),
3174        ) {
3175            Some(e) => e,
3176            None => {
3177                eprintln!("Skipping test: no CUDA device available");
3178                return;
3179            }
3180        };
3181        seed_cse_join_fixture(&mut disabled, &[2, 3, 5]);
3182        seed_cse_join_fixture(&mut enabled, &[2, 3, 5]);
3183        let plan = duplicate_join_union_plan();
3184
3185        disabled.provider.reset_d2h_transfer_count();
3186        enabled.provider.reset_d2h_transfer_count();
3187        let disabled_result = disabled.execute_node(&plan).expect("disabled CSE output");
3188        let enabled_result = enabled.execute_node(&plan).expect("enabled CSE output");
3189        let disabled_d2h = disabled.provider.d2h_transfer_count();
3190        let enabled_d2h = enabled.provider.d2h_transfer_count();
3191
3192        assert_eq!(
3193            read_buffer_u32(&disabled, &disabled_result, 0),
3194            read_buffer_u32(&enabled, &enabled_result, 0)
3195        );
3196        assert_eq!(enabled_d2h, disabled_d2h);
3197        assert_eq!(disabled.common_subexpression_stats().hits, 0);
3198        assert_eq!(disabled.common_subexpression_stats().misses, 0);
3199        assert_eq!(enabled.common_subexpression_stats().hits, 1);
3200    }
3201
3202    #[test]
3203    fn test_common_subexpression_cache_invalidates_on_relation_generation_change() {
3204        let mut executor = match create_test_executor_with_config(
3205            RuntimeConfig::default().with_common_subexpression_elimination(Some(true)),
3206        ) {
3207            Some(e) => e,
3208            None => {
3209                eprintln!("Skipping test: no CUDA device available");
3210                return;
3211            }
3212        };
3213        seed_cse_join_fixture(&mut executor, &[2, 3, 5]);
3214        let plan = duplicate_join_union_plan();
3215
3216        executor.execute_node(&plan).expect("first execution");
3217        assert_eq!(executor.common_subexpression_stats().hits, 1);
3218
3219        let changed_right = create_test_buffer(&executor, &[4], "key");
3220        executor.put_relation("right", changed_right);
3221        let result = executor.execute_node(&plan).expect("second execution");
3222
3223        assert_eq!(buffer_row_count(&executor, &result), 1);
3224        let stats = executor.common_subexpression_stats();
3225        assert_eq!(stats.hits, 2);
3226        assert!(stats.misses >= 2);
3227    }
3228
3229    #[test]
3230    fn test_common_subexpression_cache_rejects_unsafe_difference_boundary() {
3231        let mut executor = match create_test_executor_with_config(
3232            RuntimeConfig::default().with_common_subexpression_elimination(Some(true)),
3233        ) {
3234            Some(e) => e,
3235            None => {
3236                eprintln!("Skipping test: no CUDA device available");
3237                return;
3238            }
3239        };
3240        seed_cse_join_fixture(&mut executor, &[2, 3, 5]);
3241        let diff = RirNode::Diff {
3242            left: Box::new(RirNode::Scan { rel: RelId(1) }),
3243            right: Box::new(RirNode::Scan { rel: RelId(2) }),
3244        };
3245
3246        executor
3247            .execute_node(&RirNode::Union {
3248                inputs: vec![diff.clone(), diff],
3249            })
3250            .expect("unsafe duplicate diff still executes without CSE sharing");
3251
3252        let stats = executor.common_subexpression_stats();
3253        assert_eq!(stats.hits, 0);
3254        assert!(stats.unsafe_rejections >= 1);
3255        assert!(stats
3256            .rejection_reasons
3257            .iter()
3258            .any(|reason| reason == "negation_or_difference_boundary"));
3259    }
3260
3261    #[test]
3262    fn test_common_subexpression_key_rejects_aggregate_and_tensor_boundaries() {
3263        let mut executor = match create_test_executor_with_config(
3264            RuntimeConfig::default().with_common_subexpression_elimination(Some(true)),
3265        ) {
3266            Some(e) => e,
3267            None => {
3268                eprintln!("Skipping test: no CUDA device available");
3269                return;
3270            }
3271        };
3272        let aggregate = RirNode::GroupBy {
3273            input: Box::new(RirNode::Scan { rel: RelId(1) }),
3274            key_cols: vec![0],
3275            aggs: vec![(0, xlog_core::AggOp::Count)],
3276        };
3277        let tensor = RirNode::TensorMaskedJoin {
3278            mask_name: "W".to_string(),
3279            schema_size: 1,
3280            left_keys: vec![0],
3281            right_keys: vec![0],
3282            rel_index: vec![(RelId(1), "left".to_string())],
3283            head_rel_name: "head".to_string(),
3284            head_rel_id: RelId(3),
3285            max_active_rules: 1,
3286            head_projection: vec![0],
3287        };
3288        let chain = RirNode::ChainJoin {
3289            left: Box::new(RirNode::Scan { rel: RelId(1) }),
3290            right: Box::new(RirNode::Scan { rel: RelId(2) }),
3291            left_key: 0,
3292            right_key: 0,
3293            output_columns: vec![ProjectExpr::Column(0)],
3294            fallback: Box::new(RirNode::Join {
3295                left: Box::new(RirNode::Scan { rel: RelId(1) }),
3296                right: Box::new(RirNode::Scan { rel: RelId(2) }),
3297                left_keys: vec![0],
3298                right_keys: vec![0],
3299                join_type: JoinType::Inner,
3300            }),
3301        };
3302
3303        assert!(executor.common_subexpression_key(&aggregate).is_none());
3304        assert!(executor.common_subexpression_key(&tensor).is_none());
3305        assert!(executor.common_subexpression_key(&chain).is_none());
3306
3307        let reasons = &executor.common_subexpression_stats().rejection_reasons;
3308        assert!(reasons.iter().any(|reason| reason == "aggregate_boundary"));
3309        assert!(reasons
3310            .iter()
3311            .any(|reason| reason == "provenance_or_tensor_boundary"));
3312        assert!(reasons
3313            .iter()
3314            .any(|reason| reason == "specialized_dispatch_boundary"));
3315    }
3316
3317    // ============== Adaptive Runtime Re-Optimization Tests ==============
3318
3319    fn adaptive_scc() -> Scc {
3320        Scc {
3321            id: 0,
3322            predicates: vec!["out".to_string()],
3323            is_recursive: false,
3324        }
3325    }
3326
3327    fn adaptive_stratum() -> Stratum {
3328        Stratum {
3329            id: 0,
3330            sccs: vec![0],
3331        }
3332    }
3333
3334    fn adaptive_rule(body: RirNode) -> CompiledRule {
3335        CompiledRule {
3336            head: "out".to_string(),
3337            body,
3338            meta: RirMeta::default(),
3339        }
3340    }
3341
3342    fn adaptive_plan(body: RirNode) -> ExecutionPlan {
3343        ExecutionPlan {
3344            sccs: vec![adaptive_scc()],
3345            strata: vec![adaptive_stratum()],
3346            rules_by_scc: vec![vec![adaptive_rule(body)]],
3347            est_memory_peak: 0,
3348            rel_arities: std::collections::HashMap::new(),
3349        }
3350    }
3351
3352    fn adaptive_baseline_join_plan() -> ExecutionPlan {
3353        adaptive_plan(RirNode::Project {
3354            input: Box::new(RirNode::Join {
3355                left: Box::new(RirNode::Scan { rel: RelId(1) }),
3356                right: Box::new(RirNode::Scan { rel: RelId(2) }),
3357                left_keys: vec![0],
3358                right_keys: vec![0],
3359                join_type: JoinType::Inner,
3360            }),
3361            columns: vec![ProjectExpr::Column(0)],
3362        })
3363    }
3364
3365    fn adaptive_scan_candidate_plan(rel: RelId) -> ExecutionPlan {
3366        adaptive_plan(RirNode::Scan { rel })
3367    }
3368
3369    fn seed_adaptive_fixture(executor: &mut Executor, right: &[u32]) {
3370        executor.register_relation(RelId(1), "left");
3371        executor.register_relation(RelId(2), "right");
3372        let left = create_test_buffer(executor, &[1, 2, 3, 4, 5, 6, 7, 8], "key");
3373        let right = create_test_buffer(executor, right, "key");
3374        executor.put_relation("left", left);
3375        executor.put_relation("right", right);
3376    }
3377
3378    #[test]
3379    fn test_adaptive_reoptimization_disabled_uses_baseline_and_records_decision() {
3380        let mut executor = match create_test_executor_with_config(
3381            RuntimeConfig::default().with_adaptive_reoptimization(Some(false)),
3382        ) {
3383            Some(e) => e,
3384            None => {
3385                eprintln!("Skipping test: no CUDA device available");
3386                return;
3387            }
3388        };
3389        seed_adaptive_fixture(&mut executor, &[1, 2, 3, 4, 5, 6, 7, 8]);
3390
3391        let baseline = adaptive_baseline_join_plan();
3392        let candidate = adaptive_scan_candidate_plan(RelId(2));
3393        let result = executor
3394            .execute_plan_with_adaptive_candidate(&baseline, &candidate)
3395            .expect("disabled adaptation executes baseline");
3396
3397        assert_eq!(
3398            read_buffer_u32(&executor, &result, 0),
3399            (1..=8).collect::<Vec<_>>()
3400        );
3401        let stats = executor.adaptive_reoptimization_stats();
3402        assert_eq!(stats.disabled, 1);
3403        assert_eq!(stats.adopted, 0);
3404        assert_eq!(stats.rolled_back, 0);
3405        assert_eq!(
3406            stats.last_decision.as_ref().map(|decision| decision.action),
3407            Some(AdaptiveReoptimizationAction::Disabled)
3408        );
3409    }
3410
3411    #[test]
3412    fn test_adaptive_reoptimization_adopts_equivalent_candidate_and_records_telemetry() {
3413        let mut executor = match create_test_executor_with_config(
3414            RuntimeConfig::default().with_adaptive_reoptimization(Some(true)),
3415        ) {
3416            Some(e) => e,
3417            None => {
3418                eprintln!("Skipping test: no CUDA device available");
3419                return;
3420            }
3421        };
3422        seed_adaptive_fixture(&mut executor, &[1, 2, 3, 4, 5, 6, 7, 8]);
3423        executor.provider.reset_host_transfer_stats();
3424
3425        let baseline = adaptive_baseline_join_plan();
3426        let candidate = adaptive_scan_candidate_plan(RelId(1));
3427        let result = executor
3428            .execute_plan_with_adaptive_candidate(&baseline, &candidate)
3429            .expect("equivalent candidate is adopted");
3430
3431        assert_eq!(
3432            read_buffer_u32(&executor, &result, 0),
3433            (1..=8).collect::<Vec<_>>()
3434        );
3435        let stats = executor.adaptive_reoptimization_stats();
3436        assert_eq!(stats.adopted, 1);
3437        assert_eq!(stats.rolled_back, 0);
3438        assert_eq!(stats.last_observations.len(), 1);
3439        assert!(stats.last_observations[0].cardinality_delta_abs > 0);
3440        assert!(stats.last_observations[0].selectivity_delta_abs > 0.0);
3441        assert_eq!(stats.data_plane_dtoh_calls, 0);
3442    }
3443
3444    #[test]
3445    fn test_adaptive_reoptimization_rolls_back_bad_candidate_with_typed_diagnostic() {
3446        let mut executor = match create_test_executor_with_config(
3447            RuntimeConfig::default().with_adaptive_reoptimization(Some(true)),
3448        ) {
3449            Some(e) => e,
3450            None => {
3451                eprintln!("Skipping test: no CUDA device available");
3452                return;
3453            }
3454        };
3455        seed_adaptive_fixture(&mut executor, &[1, 2, 3, 4, 5, 6, 7, 8]);
3456
3457        let baseline = adaptive_baseline_join_plan();
3458        let bad_candidate = adaptive_scan_candidate_plan(RelId(2));
3459        executor.put_relation("right", create_test_buffer(&executor, &[99], "key"));
3460        let result = executor
3461            .execute_plan_with_adaptive_candidate(&baseline, &bad_candidate)
3462            .expect("bad candidate rolls back to baseline output");
3463
3464        assert_eq!(read_buffer_u32(&executor, &result, 0), Vec::<u32>::new());
3465        let out = executor.store().get("out").expect("rollback restored out");
3466        assert_eq!(read_buffer_u32(&executor, out, 0), Vec::<u32>::new());
3467        let stats = executor.adaptive_reoptimization_stats();
3468        assert_eq!(stats.adopted, 0);
3469        assert_eq!(stats.rolled_back, 1);
3470        assert!(stats.diagnostics.iter().any(|diagnostic| {
3471            diagnostic.kind == AdaptiveReoptimizationDiagnosticKind::CandidateOutputMismatch
3472        }));
3473    }
3474
3475    #[test]
3476    fn test_adaptive_reoptimization_decisions_are_deterministic_under_replay() {
3477        let mut executor = match create_test_executor_with_config(
3478            RuntimeConfig::default().with_adaptive_reoptimization(Some(true)),
3479        ) {
3480            Some(e) => e,
3481            None => {
3482                eprintln!("Skipping test: no CUDA device available");
3483                return;
3484            }
3485        };
3486        seed_adaptive_fixture(&mut executor, &[1, 2, 3, 4, 5, 6, 7, 8]);
3487        let baseline = adaptive_baseline_join_plan();
3488        executor
3489            .execute_plan(&baseline)
3490            .expect("baseline execution records telemetry");
3491        let observations = executor
3492            .adaptive_reoptimization_stats()
3493            .last_observations
3494            .clone();
3495
3496        let first = executor.replay_adaptive_reoptimization_decision(&observations);
3497        for _ in 0..100 {
3498            assert_eq!(
3499                executor.replay_adaptive_reoptimization_decision(&observations),
3500                first
3501            );
3502        }
3503    }
3504
3505    // ============== Persistent Hash Index Manager Tests ==============
3506
3507    fn persistent_index_join_plan() -> ExecutionPlan {
3508        adaptive_baseline_join_plan()
3509    }
3510
3511    fn persistent_index_heavy_join_plan(repetitions: usize) -> ExecutionPlan {
3512        let mut inputs = Vec::with_capacity(repetitions);
3513        for _ in 0..repetitions {
3514            inputs.push(RirNode::Project {
3515                input: Box::new(RirNode::Join {
3516                    left: Box::new(RirNode::Scan { rel: RelId(1) }),
3517                    right: Box::new(RirNode::Scan { rel: RelId(2) }),
3518                    left_keys: vec![0],
3519                    right_keys: vec![0],
3520                    join_type: JoinType::Semi,
3521                }),
3522                columns: vec![ProjectExpr::Column(0)],
3523            });
3524        }
3525        adaptive_plan(RirNode::Union { inputs })
3526    }
3527
3528    fn seed_persistent_index_fixture(executor: &mut Executor, rows: u32) {
3529        executor.register_relation(RelId(1), "left");
3530        executor.register_relation(RelId(2), "right");
3531        let values: Vec<u32> = (0..rows).collect();
3532        let left = create_test_buffer(executor, &values, "key");
3533        let right = create_test_buffer(executor, &values, "key");
3534        executor.put_relation("left", left);
3535        executor.put_relation("right", right);
3536    }
3537
3538    fn seed_persistent_index_performance_fixture(
3539        executor: &mut Executor,
3540        left_rows: u32,
3541        right_rows: u32,
3542    ) {
3543        executor.register_relation(RelId(1), "left");
3544        executor.register_relation(RelId(2), "right");
3545        let left_values: Vec<u32> = (0..left_rows).collect();
3546        let right_values: Vec<u32> = (0..right_rows).collect();
3547        let left = create_test_buffer(executor, &left_values, "key");
3548        let right = create_test_buffer(executor, &right_values, "key");
3549        executor.put_relation("left", left);
3550        executor.put_relation("right", right);
3551    }
3552
3553    fn warm_persistent_index(executor: &mut Executor, plan: &ExecutionPlan, times: usize) {
3554        for _ in 0..times {
3555            executor.execute_plan(plan).expect("persistent index plan");
3556        }
3557    }
3558
3559    fn median_duration(samples: &mut [Duration]) -> Duration {
3560        samples.sort_unstable();
3561        samples[samples.len() / 2]
3562    }
3563
3564    fn measure_persistent_index_fixture(
3565        mut executor: Executor,
3566        plan: &ExecutionPlan,
3567        warmup: usize,
3568        iterations: usize,
3569    ) -> (
3570        Duration,
3571        u64,
3572        JoinIndexCacheStats,
3573        xlog_cuda::provider::HostTransferStats,
3574    ) {
3575        let mut output_rows = None;
3576        warm_persistent_index(&mut executor, plan, warmup);
3577        executor.provider.reset_host_transfer_stats();
3578
3579        let mut samples = Vec::with_capacity(iterations);
3580        for _ in 0..iterations {
3581            let start = Instant::now();
3582            let output = executor.execute_plan(plan).expect("persistent index plan");
3583            executor
3584                .provider
3585                .device()
3586                .synchronize()
3587                .expect("sync device");
3588            samples.push(start.elapsed());
3589            output_rows = Some(if let Some(buffer) = executor.store().get("out") {
3590                executor
3591                    .buffer_row_count(buffer)
3592                    .expect("read output row count")
3593                    .into()
3594            } else {
3595                output.num_rows()
3596            });
3597        }
3598
3599        (
3600            median_duration(&mut samples),
3601            output_rows.expect("at least one measured execution"),
3602            executor.join_index_cache_stats(),
3603            executor.provider.host_transfer_stats(),
3604        )
3605    }
3606
3607    #[test]
3608    fn test_persistent_hash_index_reuses_across_repeated_session_evaluations() {
3609        let mut executor = match create_test_executor_with_config(
3610            RuntimeConfig::default().with_persistent_hash_indexes(Some(true)),
3611        ) {
3612            Some(e) => e,
3613            None => {
3614                eprintln!("Skipping test: no CUDA device available");
3615                return;
3616            }
3617        };
3618        seed_persistent_index_fixture(&mut executor, 2_500);
3619        let plan = persistent_index_join_plan();
3620        executor.provider.reset_host_transfer_stats();
3621
3622        warm_persistent_index(&mut executor, &plan, 5);
3623
3624        let stats = executor.join_index_cache_stats();
3625        let transfers = executor.provider.host_transfer_stats();
3626        assert_eq!(stats.builds, 1);
3627        assert!(stats.hits >= 1);
3628        assert_eq!(stats.stale_rejections, 0);
3629        assert_eq!(stats.entries, 1);
3630        assert!(stats.total_bytes > 0);
3631        assert_eq!(transfers.dtoh_calls, 0);
3632        assert_eq!(transfers.htod_calls, 0);
3633    }
3634
3635    #[test]
3636    fn test_persistent_hash_index_invalidates_on_relation_generation_change() {
3637        let mut executor = match create_test_executor_with_config(
3638            RuntimeConfig::default().with_persistent_hash_indexes(Some(true)),
3639        ) {
3640            Some(e) => e,
3641            None => {
3642                eprintln!("Skipping test: no CUDA device available");
3643                return;
3644            }
3645        };
3646        seed_persistent_index_fixture(&mut executor, 2_500);
3647        let plan = persistent_index_join_plan();
3648        warm_persistent_index(&mut executor, &plan, 5);
3649        assert_eq!(executor.join_index_cache_stats().entries, 1);
3650
3651        let changed_values: Vec<u32> = (10_000..12_500).collect();
3652        let changed_right = create_test_buffer(&executor, &changed_values, "key");
3653        executor.put_relation("right", changed_right);
3654
3655        let stats = executor.join_index_cache_stats();
3656        assert_eq!(stats.entries, 0);
3657        assert!(stats.invalidations >= 1);
3658    }
3659
3660    #[test]
3661    fn test_persistent_hash_index_background_build_records_requests() {
3662        let mut executor = match create_test_executor_with_config(
3663            RuntimeConfig::default()
3664                .with_persistent_hash_indexes(Some(true))
3665                .with_persistent_hash_index_background_build(Some(true)),
3666        ) {
3667            Some(e) => e,
3668            None => {
3669                eprintln!("Skipping test: no CUDA device available");
3670                return;
3671            }
3672        };
3673        seed_persistent_index_fixture(&mut executor, 2_500);
3674        let plan = persistent_index_join_plan();
3675
3676        warm_persistent_index(&mut executor, &plan, 5);
3677
3678        let stats = executor.join_index_cache_stats();
3679        assert_eq!(stats.background_build_requests, 1);
3680        assert_eq!(stats.background_builds_completed, 1);
3681        assert_eq!(stats.entries, 1);
3682    }
3683
3684    #[test]
3685    fn test_persistent_hash_index_background_build_defers_current_join_reuse() {
3686        let mut executor = match create_test_executor_with_config(
3687            RuntimeConfig::default()
3688                .with_persistent_hash_indexes(Some(true))
3689                .with_persistent_hash_index_background_build(Some(true)),
3690        ) {
3691            Some(e) => e,
3692            None => {
3693                eprintln!("Skipping test: no CUDA device available");
3694                return;
3695            }
3696        };
3697        seed_persistent_index_fixture(&mut executor, 2_500);
3698        let plan = persistent_index_join_plan();
3699
3700        let mut before_build = executor.join_index_cache_stats();
3701        let mut after_build = None;
3702        for _ in 0..5 {
3703            executor
3704                .execute_plan(&plan)
3705                .expect("background-build warm evaluation");
3706            let stats = executor.join_index_cache_stats();
3707            if stats.background_build_requests > before_build.background_build_requests {
3708                after_build = Some(stats);
3709                break;
3710            }
3711            before_build = stats;
3712        }
3713
3714        let after_first = after_build.expect("background build request observed");
3715        assert_eq!(after_first.background_build_requests, 1);
3716        assert_eq!(after_first.background_builds_completed, 1);
3717        assert_eq!(after_first.background_builds_deferred, 1);
3718        assert_eq!(
3719            after_first.hits, before_build.hits,
3720            "background build must not be consumed by the same evaluation that requested it"
3721        );
3722        assert_eq!(after_first.entries, 1);
3723
3724        executor
3725            .execute_plan(&plan)
3726            .expect("second evaluation reuses completed background index");
3727        let after_second = executor.join_index_cache_stats();
3728        assert_eq!(after_second.background_build_requests, 1);
3729        assert_eq!(after_second.background_builds_deferred, 1);
3730        assert!(after_second.hits >= 1);
3731    }
3732
3733    #[test]
3734    fn test_persistent_hash_index_performance_fixture_meets_speedup_target() {
3735        const LEFT_ROWS: u32 = 8;
3736        const RIGHT_ROWS: u32 = 8_000_000;
3737        const JOIN_REPETITIONS: usize = 1;
3738        const WARMUP: usize = 12;
3739        const ITERATIONS: usize = 9;
3740
3741        let mut cached = match create_test_executor_with_config(
3742            RuntimeConfig::default().with_persistent_hash_indexes(Some(true)),
3743        ) {
3744            Some(e) => e,
3745            None => {
3746                eprintln!("Skipping test: no CUDA device available");
3747                return;
3748            }
3749        };
3750        seed_persistent_index_performance_fixture(&mut cached, LEFT_ROWS, RIGHT_ROWS);
3751
3752        let mut uncached = match create_test_executor_with_config(
3753            RuntimeConfig::default().with_persistent_hash_indexes(Some(false)),
3754        ) {
3755            Some(e) => e,
3756            None => {
3757                eprintln!("Skipping test: no CUDA device available");
3758                return;
3759            }
3760        };
3761        seed_persistent_index_performance_fixture(&mut uncached, LEFT_ROWS, RIGHT_ROWS);
3762
3763        let plan = persistent_index_heavy_join_plan(JOIN_REPETITIONS);
3764        let (cached_median, cached_rows, cached_stats, cached_transfers) =
3765            measure_persistent_index_fixture(cached, &plan, WARMUP, ITERATIONS);
3766        let (uncached_median, uncached_rows, uncached_stats, uncached_transfers) =
3767            measure_persistent_index_fixture(uncached, &plan, WARMUP, ITERATIONS);
3768
3769        let speedup_ratio = uncached_median.as_secs_f64() / cached_median.as_secs_f64();
3770        eprintln!(
3771            "persistent_hash_index_perf left_rows={} right_rows={} join_repetitions={} warmup={} iterations={} \
3772             cached_median_sec={:.9} uncached_median_sec={:.9} speedup_ratio={:.3} \
3773             cached_output_rows={} uncached_output_rows={} cached_builds={} cached_hits={} \
3774             uncached_builds={} cached_dtoh_calls={} cached_htod_calls={}",
3775            LEFT_ROWS,
3776            RIGHT_ROWS,
3777            JOIN_REPETITIONS,
3778            WARMUP,
3779            ITERATIONS,
3780            cached_median.as_secs_f64(),
3781            uncached_median.as_secs_f64(),
3782            speedup_ratio,
3783            cached_rows,
3784            uncached_rows,
3785            cached_stats.builds,
3786            cached_stats.hits,
3787            uncached_stats.builds,
3788            cached_transfers.dtoh_calls,
3789            cached_transfers.htod_calls
3790        );
3791
3792        assert_eq!(cached_rows, uncached_rows);
3793        assert_eq!(cached_rows, LEFT_ROWS as u64);
3794        assert_eq!(cached_stats.builds, 1);
3795        assert!(cached_stats.hits >= ITERATIONS as u64);
3796        assert_eq!(uncached_stats.builds, 0);
3797        assert_eq!(cached_transfers.dtoh_calls, 0);
3798        assert_eq!(cached_transfers.htod_calls, 0);
3799        assert_eq!(uncached_transfers.dtoh_calls, 0);
3800        assert_eq!(uncached_transfers.htod_calls, 0);
3801        assert!(
3802            speedup_ratio >= 1.5,
3803            "persistent index speedup {:.3} below 1.5 target",
3804            speedup_ratio
3805        );
3806    }
3807
3808    // ============== MC Relation Reset Tests ==============
3809
3810    #[test]
3811    fn test_reset_for_mc_relations_preserves_static_and_clears_dynamic() {
3812        let mut executor = match create_test_executor() {
3813            Some(e) => e,
3814            None => {
3815                eprintln!("Skipping: no CUDA device");
3816                return;
3817            }
3818        };
3819
3820        executor.register_relation(RelId(1), "base_rel");
3821        executor.register_relation(RelId(2), "dyn_rel");
3822
3823        let schema = Schema::new(vec![("x".to_string(), ScalarType::U32)]);
3824        let base = create_test_buffer(&executor, &[1u32], "x");
3825        let dyn_buf = create_test_buffer(&executor, &[9u32], "x");
3826        executor.put_relation("base_rel", base);
3827        executor.put_relation("dyn_rel", dyn_buf);
3828
3829        executor
3830            .reset_for_mc_relations(&["base_rel"], &[("dyn_rel", schema.clone())])
3831            .unwrap();
3832
3833        assert_eq!(
3834            buffer_row_count(&executor, executor.store().get("base_rel").unwrap()),
3835            1
3836        );
3837        assert_eq!(
3838            buffer_row_count(&executor, executor.store().get("dyn_rel").unwrap()),
3839            0
3840        );
3841    }
3842}