Skip to main content

xlog_gpu/
logic.rs

1//! GPU-accelerated evaluation of compiled Datalog programs.
2
3use std::collections::{BTreeSet, HashMap};
4use std::sync::Arc;
5
6use xlog_core::{symbol, RelId, Result, ScalarType, Schema, XlogError};
7use xlog_cuda::{CudaBuffer, CudaKernelProvider};
8use xlog_ir::{EpistemicExecutablePlan, ExecutionPlan};
9use xlog_logic::ast::{AggOp, PredColumn, TypeRef};
10use xlog_logic::epistemic::{
11    compile_epistemic_gpu_execution, compile_epistemic_gpu_split_execution,
12    reduce_epistemic_program_to_ordinary,
13    reduce_epistemic_program_to_ordinary_for_stratified_schema,
14    try_plan_stratified_epistemic_program, try_reduce_case_a_recursive_epistemic_program,
15    EpistemicSplitExecutablePlan,
16};
17use xlog_logic::{
18    Atom, BodyLiteral, Compiler, EpistemicLiteral, EpistemicOp, Program, Query, Rule, Term,
19};
20use xlog_runtime::executor::JoinIndexCacheStats;
21use xlog_runtime::{
22    DeltaRecomputeStats, EpistemicGpuExecutionResult, EpistemicGpuWorkspaceCapacities,
23    ExecutionStats, Executor, RelationDelta, RelationStore,
24};
25
26/// Result of evaluating a single query in a Datalog program.
27pub struct LogicQueryResult {
28    /// Internal relation name (e.g. `__xlog_query_0`).
29    pub relation_name: String,
30    /// Output variable names in column order.
31    pub columns: Vec<String>,
32    /// Per-output-column sort labels in column order.
33    pub sort_labels: Vec<String>,
34    /// GPU-resident column buffer with the result tuples.
35    pub buffer: CudaBuffer,
36}
37
38/// Result of evaluating an entire Datalog program.
39pub struct LogicEvalResult {
40    /// One result per `?-` query in the source program.
41    pub queries: Vec<LogicQueryResult>,
42    /// Execution statistics (populated when profiling is enabled).
43    pub stats: Option<ExecutionStats>,
44}
45
46/// Runtime state retained by a persistent logic session.
47pub struct LogicSessionRuntime {
48    executor: Executor,
49    profiling: bool,
50}
51
52impl LogicSessionRuntime {
53    /// Return persistent hash-index cache telemetry for the retained executor.
54    pub fn join_index_cache_stats(&self) -> JoinIndexCacheStats {
55        self.executor.join_index_cache_stats()
56    }
57
58    /// Return multiway/Free-Join dispatch telemetry for the retained executor.
59    pub fn wcoj_dispatch_stats(&self) -> WcojDispatchStats {
60        WcojDispatchStats {
61            free_join_dispatch_count: self.executor.free_join_dispatch_count(),
62            factorized_delta_dispatch_count: self.executor.factorized_delta_dispatch_count(),
63            wcoj_groupby_fusion_dispatch_count: self.executor.wcoj_groupby_fusion_dispatch_count(),
64            wcoj_error_decline_count: self.executor.wcoj_error_decline_count(),
65        }
66    }
67}
68
/// Dispatch counters for the multiway / Free-Join paths of a retained session
/// executor.
///
/// The counters accumulate across every evaluation run within the session.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct WcojDispatchStats {
    /// Number of Free Join dispatches taken through the multiway plan.
    pub free_join_dispatch_count: u64,
    /// Number of factorized recursive-delta dispatches in the semi-naive
    /// fixpoint (dense bitvector or sparse hash-set route).
    pub factorized_delta_dispatch_count: u64,
    /// Number of aggregate-fused group-by-root dispatches, which never
    /// materialize join rows.
    pub wcoj_groupby_fusion_dispatch_count: u64,
    /// Number of WCOJ pipeline errors that fell back to the binary-join path.
    pub wcoj_error_decline_count: u64,
}
83
/// Planner telemetry attached to a relation-delta update in a persistent session.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct DeltaPlannerTelemetry {
    /// Whether the delta path reused an existing session/cache.
    pub cache_reused: bool,
    /// Planner decision taken for this delta update (stored as a label string).
    pub fallback_decision: String,
    /// Count of SCCs reached by the delta's dependency closure.
    pub affected_sccs: usize,
    /// Count of SCCs recomputed from scratch.
    pub recomputed_sccs: usize,
    /// Count of SCCs updated incrementally.
    pub incremental_sccs: usize,
    /// Estimated delta-versus-full-recompute speedup, if one could be derived.
    pub estimated_delta_speedup: Option<f64>,
    /// Measured delta-versus-full-recompute speedup, if both timings were supplied.
    pub measured_delta_speedup: Option<f64>,
    /// Human-readable planner hints for downstream diagnostics.
    pub planner_advice: Vec<String>,
}
104
105impl DeltaPlannerTelemetry {
106    /// Build planner telemetry from a delta report and optional timing evidence.
107    pub fn from_delta_report(
108        report: &LogicDeltaReport,
109        cache_reused: bool,
110        measured_micros: Option<(u64, u64)>,
111    ) -> Self {
112        let fallback_decision = if report.affected_sccs == 0 {
113            "no_op"
114        } else if report.has_deletes || report.recomputed_sccs > 0 {
115            "full_recompute_fallback"
116        } else {
117            "incremental"
118        }
119        .to_string();
120        let estimated_delta_speedup = if report.affected_sccs > 0 {
121            Some((report.affected_sccs.max(1) as f64) / (report.incremental_sccs.max(1) as f64))
122        } else {
123            None
124        };
125        let measured_delta_speedup = measured_micros.and_then(|(delta_us, full_us)| {
126            if delta_us == 0 {
127                None
128            } else {
129                Some(full_us as f64 / delta_us as f64)
130            }
131        });
132
133        let mut planner_advice = Vec::new();
134        if fallback_decision == "full_recompute_fallback" {
135            planner_advice.push(
136                "full recompute fallback selected; inspect deletes or affected SCC fanout"
137                    .to_string(),
138            );
139        } else if let Some(speedup) = measured_delta_speedup {
140            if speedup >= 1.0 {
141                planner_advice.push(format!("delta path is faster by {speedup:.2}x"));
142            } else {
143                planner_advice.push(format!(
144                    "full recompute may be faster; delta measured {speedup:.2}x"
145                ));
146            }
147        } else if fallback_decision == "incremental" {
148            planner_advice.push(
149                "incremental delta path selected; run equivalence timing to measure speedup"
150                    .to_string(),
151            );
152        }
153
154        Self {
155            cache_reused,
156            fallback_decision,
157            affected_sccs: report.affected_sccs,
158            recomputed_sccs: report.recomputed_sccs,
159            incremental_sccs: report.incremental_sccs,
160            estimated_delta_speedup,
161            measured_delta_speedup,
162            planner_advice,
163        }
164    }
165}
166
167/// Summary for a persistent-session relation delta update.
168pub struct LogicDeltaReport {
169    /// Number of relation delta entries supplied by the caller before coalescing.
170    pub input_delta_count: usize,
171    /// Number of changed relation names in the delta batch.
172    pub changed_relations: usize,
173    /// Changed relation names after coalescing.
174    pub changed_relation_names: Vec<String>,
175    /// Total inserted rows across all changed relations.
176    pub insert_rows: u64,
177    /// Total deleted rows across all changed relations.
178    pub delete_rows: u64,
179    /// True when at least one relation supplied delete rows.
180    pub has_deletes: bool,
181    /// Number of SCCs whose dependency closure was affected.
182    pub affected_sccs: usize,
183    /// Number of affected SCCs that were cleared and fully recomputed.
184    pub recomputed_sccs: usize,
185    /// Number of affected SCCs updated without clearing prior output.
186    pub incremental_sccs: usize,
187    /// Net insert rows after batch coalescing and insert/delete cancellation.
188    pub coalesced_insert_rows: u64,
189    /// Net delete rows after batch coalescing and insert/delete cancellation.
190    pub coalesced_delete_rows: u64,
191    /// Rows canceled because an insert and delete for the same relation matched in the batch.
192    pub canceled_rows: u64,
193    /// Planner-grade cache, fallback, and speedup telemetry.
194    pub planner_telemetry: DeltaPlannerTelemetry,
195    /// Metadata-only debug trace for the delta recompute.
196    pub debug_trace: Vec<String>,
197}
198
199struct CoalescedRelationDeltaBatch {
200    deltas: HashMap<String, RelationDelta>,
201    input_delta_count: usize,
202    changed_relations: usize,
203    coalesced_insert_rows: u64,
204    coalesced_delete_rows: u64,
205    canceled_rows: u64,
206}
207
208#[derive(Default)]
209struct PendingRelationDelta {
210    insert: Option<CudaBuffer>,
211    delete: Option<CudaBuffer>,
212}
213
214/// One stratum of a stratified epistemic plan: the epistemic head(s) it
215/// materializes plus the GPU executable plan that computes them.
216///
217/// Lower strata are executed first; their GATED head outputs are written into the
218/// relation store as base relations BEFORE higher strata run, so a higher
219/// stratum's `know`/`possible` over a lower head gates against the materialized
220/// (now-base) relation through the existing tuple-key membership filter.
221#[derive(Clone)]
222struct StratumExecutable {
223    /// The stratum's GPU plan: single-head or joint multi-head split. The gated
224    /// head relation name(s) are recovered from the plan's reductions at runtime.
225    plan: StratumPlanKind,
226}
227
228#[derive(Clone)]
229enum StratumPlanKind {
230    Single(EpistemicExecutablePlan),
231    Split(EpistemicSplitExecutablePlan),
232    /// A higher stratum that RECURSES over a lower stratum's materialized
233    /// (now-base) determined head. Once the determined head is a base relation in
234    /// the store, its `know`/`possible` modal is over an invariant relation, so the
235    /// stratum is admissible Case-A: the modal resolves to an ordinary join (no
236    /// second gate) and the recursive semi-naive engine iterates the fixpoint. The
237    /// reduced ordinary program drives an ordinary RIR plan whose head IS this
238    /// stratum's user-visible output relation.
239    Ordinary {
240        plan: ExecutionPlan,
241        /// User-visible output head predicate(s) this stratum computes.
242        head_predicates: Vec<String>,
243    },
244}
245
246#[derive(Clone)]
247enum LogicExecutionPlan {
248    Ordinary(ExecutionPlan),
249    EpistemicWfsGpu(EpistemicWfsGpuPlan),
250    EpistemicSingle(EpistemicExecutablePlan),
251    EpistemicSplit(EpistemicSplitExecutablePlan),
252    /// Stratified epistemic execution: ordered strata, each materializing its
253    /// gated head(s) into the store before the next stratum runs.
254    EpistemicStratified(Vec<StratumExecutable>),
255}
256
257#[derive(Clone)]
258struct EpistemicWfsGpuPlan {
259    overapprox: WfsGpuOrdinaryPlan,
260    lower: WfsGpuOrdinaryPlan,
261    upper: WfsGpuOrdinaryPlan,
262    intensional_predicates: Vec<String>,
263    upper_fixed_names: HashMap<String, String>,
264    lower_fixed_names: HashMap<String, String>,
265    max_iterations: usize,
266}
267
268#[derive(Clone)]
269struct WfsGpuOrdinaryPlan {
270    plan: ExecutionPlan,
271    schemas: HashMap<String, Schema>,
272    rel_ids: HashMap<String, RelId>,
273}
274
275/// Compile-time epistemic provenance, retained even when the executable plan is
276/// `Ordinary` (e.g. a Case-A recursive epistemic fixpoint whose modal literals were
277/// resolved into invariant joins). This carries the source's epistemic literals so
278/// the epistemic plan dump can emit a stable id for a recursive epistemic fixpoint that no
279/// longer carries an epistemic GPU plan.
280#[derive(Clone)]
281struct EpistemicProvenance {
282    /// How the epistemic source was reduced for execution.
283    reduction: &'static str,
284    /// Epistemic `know`/`possible` literals (with negation) seen in the source EIR.
285    literals: Vec<xlog_ir::EirEpistemicLiteral>,
286}
287
288/// A compiled Datalog program ready for GPU evaluation.
289#[derive(Clone)]
290pub struct LogicProgram {
291    program: Program,
292    plan: LogicExecutionPlan,
293    schemas: HashMap<String, Schema>,
294    rel_ids: HashMap<String, RelId>,
295    /// `Some` iff the source program contained epistemic literals (regardless of
296    /// whether the executable plan ended up epistemic or ordinary).
297    epistemic_provenance: Option<EpistemicProvenance>,
298}
299
300impl LogicProgram {
301    /// Compile a Datalog source string into a GPU-executable program.
302    pub fn compile(source: &str) -> Result<Self> {
303        let program = xlog_logic::parse_program(source)?;
304        let normalized = normalize_program(program)?;
305        Self::compile_normalized_program(normalized)
306    }
307
308    fn compile_normalized_program(normalized: Program) -> Result<Self> {
309        if program_has_epistemic_literals(&normalized) {
310            return Self::compile_epistemic_program(normalized);
311        }
312        let mut compiler = Compiler::new();
313        let plan = compiler.compile_program(&normalized)?;
314        Ok(Self {
315            program: normalized,
316            plan: LogicExecutionPlan::Ordinary(plan),
317            schemas: compiler.schemas().clone(),
318            rel_ids: compiler.rel_ids().clone(),
319            epistemic_provenance: None,
320        })
321    }
322
323    fn compile_epistemic_program(normalized: Program) -> Result<Self> {
324        // Capture epistemic provenance up front: the source-EIR modal literals are
325        // retained even when a Case-A recursive reduction lowers the program to an
326        // Ordinary executable plan, so the epistemic plan dump can still emit a stable id
327        // for a recursive epistemic fixpoint.
328        let provenance_literals = collect_eir_epistemic_literals(&normalized);
329        // Stratified epistemic execution FIRST: a modal literal ranges over an
330        // epistemically-DETERMINED derived head (`b :- know a` where `a :- know p`,
331        // `p` invariant — possibly with the higher stratum RECURSING over the
332        // determined head, e.g. `reach :- reach, know a`). Partition into strata;
333        // each is compiled through the existing epistemic OR Case-A ordinary path,
334        // and at runtime each lower stratum's GATED head is materialized into the
335        // store as a base relation before the higher stratum gates against it (via
336        // the existing tuple-key membership filter or — once the head is a materialized
337        // base relation — Case-A resolve-into-body; either way NO double-gating
338        // against a still-modal relation). Example 18's shared BASE modal `q` (EDB,
339        // not a determined derived head) returns `None` here and falls through to
340        // the joint path UNCHANGED; plain Case-A recursion over an EDB modal
341        // (`know edge`) also returns `None` and falls through to Case-A below.
342        if let Some(stratified) = try_plan_stratified_epistemic_program(&normalized)? {
343            // SCHEMA-ONLY reduction: resolve augmenting positive modals over INVARIANT
344            // *or* epistemically-DETERMINED targets into positive ordinary atoms, so an
345            // augmented head whose extra output column is bound by a modal over a
346            // multi-column determined head (`out(X) :- node(X), know r(X, Y)`, `r`
347            // determined) types its appended `Y` column from `r`'s declaration instead
348            // of failing closed as `UnsafeVariable`. This drives ONLY plan schema
349            // inference; per-stratum EXECUTION compiles below over sub-programs where
350            // the determined head is already a materialized base relation (strict
351            // invariant resolve), so no modal is ever resolved over an un-gated
352            // candidate at runtime.
353            let reduced = reduce_epistemic_program_to_ordinary_for_stratified_schema(&normalized);
354            let mut schema_compiler = Compiler::new();
355            schema_compiler.compile_program(&reduced)?;
356            let mut schemas = schema_compiler.schemas().clone();
357            augment_same_name_multi_arity_schemas(&normalized, &mut schemas)?;
358
359            let mut strata = Vec::with_capacity(stratified.strata.len());
360            for stratum in &stratified.strata {
361                strata.push(StratumExecutable {
362                    plan: Self::compile_stratum_plan(&stratum.program)?,
363                });
364            }
365            let plan = LogicExecutionPlan::EpistemicStratified(strata);
366            let rel_ids = epistemic_relation_ids(&plan)?;
367            return Ok(Self {
368                program: normalized,
369                plan,
370                schemas,
371                rel_ids,
372                epistemic_provenance: Some(EpistemicProvenance {
373                    reduction: "stratified",
374                    literals: provenance_literals,
375                }),
376            });
377        }
378
379        // Case A/B: reduce admissible recursive epistemic programs to ordinary
380        // recursion. Stratified reduced programs route through the existing ordinary
381        // semi-naive engine; non-monotone reduced SCCs route through the GPU-native
382        // WFS alternating-fixpoint plan below. Recursive shapes outside the admissible
383        // fragment still fail closed in `try_reduce_case_a_recursive_epistemic_program`.
384        if let Some(case_a_reduced) = try_reduce_case_a_recursive_epistemic_program(&normalized)? {
385            let strat = xlog_logic::stratify::analyze_stratification(&case_a_reduced);
386            if !strat.non_monotone_sccs.is_empty() {
387                let wfs_plan = compile_epistemic_wfs_gpu_plan(&case_a_reduced)?;
388                let schemas = wfs_plan_combined_schemas(&wfs_plan);
389                let rel_ids = wfs_plan_combined_rel_ids(&wfs_plan)?;
390                return Ok(Self {
391                    program: case_a_reduced,
392                    plan: LogicExecutionPlan::EpistemicWfsGpu(wfs_plan),
393                    schemas,
394                    rel_ids,
395                    epistemic_provenance: Some(EpistemicProvenance {
396                        reduction: "wfs_gpu_recursive",
397                        literals: provenance_literals,
398                    }),
399                });
400            }
401            let mut compiler = Compiler::new();
402            let plan = compiler.compile_program(&case_a_reduced)?;
403            return Ok(Self {
404                program: case_a_reduced,
405                plan: LogicExecutionPlan::Ordinary(plan),
406                schemas: compiler.schemas().clone(),
407                rel_ids: compiler.rel_ids().clone(),
408                epistemic_provenance: Some(EpistemicProvenance {
409                    reduction: "ordinary_recursive_modal_reduction",
410                    literals: provenance_literals,
411                }),
412            });
413        }
414
415        let reduced = reduce_epistemic_program_to_ordinary(&normalized);
416        let mut schema_compiler = Compiler::new();
417        schema_compiler.compile_program(&reduced)?;
418        let mut schemas = schema_compiler.schemas().clone();
419        augment_same_name_multi_arity_schemas(&normalized, &mut schemas)?;
420
421        let plan = if epistemic_output_head_predicate_count(&normalized) > 1 {
422            LogicExecutionPlan::EpistemicSplit(compile_epistemic_gpu_split_execution(&normalized)?)
423        } else {
424            match compile_epistemic_gpu_execution(&normalized) {
425                Ok(executable) => LogicExecutionPlan::EpistemicSingle(executable),
426                Err(XlogError::UnsupportedEpistemicConstruct { construct, .. })
427                    if construct == "epistemic GPU final output relation" =>
428                {
429                    LogicExecutionPlan::EpistemicSplit(compile_epistemic_gpu_split_execution(
430                        &normalized,
431                    )?)
432                }
433                Err(err) => return Err(err),
434            }
435        };
436        let rel_ids = epistemic_relation_ids(&plan)?;
437
438        Ok(Self {
439            program: normalized,
440            plan,
441            schemas,
442            rel_ids,
443            epistemic_provenance: Some(EpistemicProvenance {
444                reduction: "epistemic_executable",
445                literals: provenance_literals,
446            }),
447        })
448    }
449
450    /// Compile one stratum sub-program into its plan kind.
451    ///
452    /// A stratum whose epistemic heads gate only over invariant or
453    /// already-materialized lower-stratum relations is either an admissible Case-A
454    /// recursion (the modal resolves to an ordinary join over the now-base relation)
455    /// or a plain single/joint epistemic plan. Case-A is tried first so a recursive
456    /// higher stratum (`reach :- reach, know a`, `a` materialized base) routes
457    /// through the ordinary semi-naive engine.
458    fn compile_stratum_plan(stratum_program: &Program) -> Result<StratumPlanKind> {
459        if let Some(case_a_reduced) =
460            try_reduce_case_a_recursive_epistemic_program(stratum_program)?
461        {
462            let mut compiler = Compiler::new();
463            let plan = compiler.compile_program(&case_a_reduced)?;
464            let head_predicates = epistemic_stratum_output_heads(stratum_program);
465            return Ok(StratumPlanKind::Ordinary {
466                plan,
467                head_predicates,
468            });
469        }
470        if epistemic_output_head_predicate_count(stratum_program) > 1 {
471            Ok(StratumPlanKind::Split(
472                compile_epistemic_gpu_split_execution(stratum_program)?,
473            ))
474        } else {
475            Ok(StratumPlanKind::Single(compile_epistemic_gpu_execution(
476                stratum_program,
477            )?))
478        }
479    }
480
481    /// Compile a program with module resolution.
482    ///
483    /// This method resolves all imports using the provided resolver and merges
484    /// imported predicates, functions, and rules into the main program.
485    ///
486    /// # Arguments
487    /// * `source` - The source code of the main program
488    /// * `resolver` - A pre-loaded ModuleResolver with all dependencies resolved
489    ///
490    /// # Returns
491    /// The compiled LogicProgram with all imports merged
492    pub fn compile_with_resolver(
493        source: &str,
494        resolver: &xlog_logic::resolver::ModuleResolver,
495    ) -> Result<Self> {
496        let program = xlog_logic::parse_program(source)?;
497
498        // Merge imports from the resolver
499        let merged = resolver
500            .merge_imports(program)
501            .map_err(|e| XlogError::Compilation(format!("Module resolution failed: {}", e)))?;
502
503        let normalized = normalize_program(merged)?;
504        Self::compile_normalized_program(normalized)
505    }
506
507    /// Serialize the compiled epistemic execution plan to a JSON summary.
508    ///
509    /// Returns `None` for ordinary (non-epistemic) programs. For epistemic
510    /// programs this dumps the EIR-derived GPU plan(s): selected mode, the
511    /// epistemic `know`/`possible` literals (with negation), required GPU hot-path
512    /// phases/kernels, world-view integrity constraints, reduced-program head
513    /// summaries, the forbidden CPU-fallback counters (which must all be zero on
514    /// the accepted GPU hot path), and a deterministic plan id (a stable hash of
515    /// the canonical summary). This is the epistemic-plan/EIR dump surface:
516    /// it lets an external caller (pyxlog or CLI consumer) read the accepted
517    /// world-view structure and assert `cpu_fallback == 0` off a real run.
518    pub fn epistemic_plan_json(&self) -> Option<String> {
519        let gpu_plans: Vec<(String, &xlog_ir::EpistemicGpuPlan)> = match &self.plan {
520            // A program whose source was epistemic but whose executable plan is
521            // ordinary: this is a Case-A recursive epistemic fixpoint (modal literals
522            // resolved into invariant joins). It carries no epistemic GPU plan, but it
523            // IS GPU-clean by construction (the recursion runs on the ordinary
524            // semi-naive engine with no epistemic CPU fallback). Emit a provenance
525            // summary with a stable id so the recursive-fixpoint fixture is auditable.
526            LogicExecutionPlan::Ordinary(_) => {
527                let prov = self.epistemic_provenance.as_ref()?;
528                return Some(epistemic_provenance_summary_json(
529                    "epistemic_reduced_ordinary",
530                    prov,
531                    None,
532                    None,
533                ));
534            }
535            LogicExecutionPlan::EpistemicWfsGpu(wfs) => {
536                let prov = self.epistemic_provenance.as_ref()?;
537                return Some(epistemic_provenance_summary_json(
538                    self.plan_kind_label(),
539                    prov,
540                    Some(wfs.max_iterations),
541                    Some(wfs),
542                ));
543            }
544            LogicExecutionPlan::EpistemicSingle(plan) => {
545                vec![("single".to_string(), &plan.gpu_plan)]
546            }
547            LogicExecutionPlan::EpistemicSplit(split) => split
548                .components
549                .iter()
550                .enumerate()
551                .map(|(i, c)| (format!("split[{i}]"), &c.executable.gpu_plan))
552                .collect(),
553            LogicExecutionPlan::EpistemicStratified(strata) => {
554                let mut plans = Vec::new();
555                for (i, stratum) in strata.iter().enumerate() {
556                    match &stratum.plan {
557                        StratumPlanKind::Single(plan) => {
558                            plans.push((format!("stratum[{i}]"), &plan.gpu_plan));
559                        }
560                        StratumPlanKind::Split(split) => {
561                            for (j, c) in split.components.iter().enumerate() {
562                                plans.push((
563                                    format!("stratum[{i}].split[{j}]"),
564                                    &c.executable.gpu_plan,
565                                ));
566                            }
567                        }
568                        // Recursive/ordinary higher strata carry no epistemic GPU
569                        // plan (the modal already resolved to an ordinary join over
570                        // a materialized base); they contribute no fallback counters.
571                        StratumPlanKind::Ordinary { .. } => {}
572                    }
573                }
574                plans
575            }
576        };
577        Some(epistemic_plan_summary_json(
578            self.plan_kind_label(),
579            &gpu_plans,
580        ))
581    }
582
583    fn plan_kind_label(&self) -> &'static str {
584        match &self.plan {
585            LogicExecutionPlan::Ordinary(_) => "ordinary",
586            LogicExecutionPlan::EpistemicWfsGpu(_) => "epistemic_wfs_gpu",
587            LogicExecutionPlan::EpistemicSingle(_) => "epistemic_single",
588            LogicExecutionPlan::EpistemicSplit(_) => "epistemic_split",
589            LogicExecutionPlan::EpistemicStratified(_) => "epistemic_stratified",
590        }
591    }
592
593    /// Look up the schema for a named relation.
594    pub fn schema(&self, relation: &str) -> Option<&Schema> {
595        self.schemas.get(relation)
596    }
597
598    /// Return the full schema map (relation name to schema).
599    pub fn schemas(&self) -> &HashMap<String, Schema> {
600        &self.schemas
601    }
602
603    /// Return stable rule provenance for source-visible rules.
604    pub fn rule_provenance(&self) -> Vec<xlog_logic::RuleProvenance> {
605        xlog_logic::rule_provenance(&self.program, None)
606    }
607
608    /// Return direct proof traces for source queries.
609    pub fn proof_traces(&self) -> Vec<xlog_logic::QueryProofTrace> {
610        let provenance = self.rule_provenance();
611        xlog_logic::query_proof_traces(&self.program, &provenance)
612    }
613
614    /// Create a persistent user-visible relation store initialized with inline facts.
615    pub fn create_relation_store(
616        &self,
617        provider: Arc<CudaKernelProvider>,
618    ) -> Result<RelationStore> {
619        let mut store = RelationStore::new(provider.clone());
620        for (name, schema) in &self.schemas {
621            if is_user_visible_relation(name) || is_list_helper_relation(name) {
622                store.put(name, provider.create_empty_buffer(schema.clone())?);
623            }
624        }
625        self.load_facts_into_store(provider.as_ref(), &mut store)?;
626        Ok(store)
627    }
628
629    /// Evaluate using a persistent base relation store.
630    ///
631    /// The provided store is treated as immutable seed state. Buffers are cloned
632    /// into a fresh executor for each evaluation so repeated evaluations reuse
633    /// stored relations without mutating the persistent store itself.
634    pub fn evaluate_with_relation_store(
635        &self,
636        provider: Arc<CudaKernelProvider>,
637        relation_store: &RelationStore,
638        profiling: bool,
639    ) -> Result<LogicEvalResult> {
640        let (result, _) =
641            self.evaluate_with_relation_store_and_cache(provider, relation_store, profiling)?;
642        Ok(result)
643    }
644
645    /// Evaluate using a persistent relation store and return the complete runtime store.
646    pub fn evaluate_with_relation_store_and_cache(
647        &self,
648        provider: Arc<CudaKernelProvider>,
649        relation_store: &RelationStore,
650        profiling: bool,
651    ) -> Result<(LogicEvalResult, RelationStore)> {
652        let mut executor =
653            self.executor_from_relation_store(provider.clone(), relation_store, profiling)?;
654        executor.execute_plan(self.ordinary_plan("relation-store evaluation")?)?;
655        self.enforce_constraints(&provider, &executor)?;
656
657        let total_output_rows = self.total_query_rows(executor.store())?;
658        let stats = if profiling {
659            Some(executor.execution_stats(total_output_rows))
660        } else {
661            None
662        };
663
664        let cached_store = self.clone_relation_store(&provider, executor.store())?;
665        let result = self.logic_result_from_store(provider.as_ref(), &cached_store, stats)?;
666        Ok((result, cached_store))
667    }
668
669    /// Create retained runtime state for a persistent relation session.
670    pub fn create_session_runtime(
671        &self,
672        provider: Arc<CudaKernelProvider>,
673        relation_store: &RelationStore,
674        profiling: bool,
675    ) -> Result<LogicSessionRuntime> {
676        self.ordinary_plan("persistent relation session")?;
677        Ok(LogicSessionRuntime {
678            executor: self.executor_from_relation_store(provider, relation_store, profiling)?,
679            profiling,
680        })
681    }
682
683    /// Evaluate with retained session runtime state and return a materialized store snapshot.
684    pub fn evaluate_with_session_runtime(
685        &self,
686        provider: Arc<CudaKernelProvider>,
687        runtime: &mut LogicSessionRuntime,
688    ) -> Result<(LogicEvalResult, RelationStore)> {
689        runtime.executor.set_profiling(runtime.profiling);
690        runtime
691            .executor
692            .execute_plan(self.ordinary_plan("session runtime evaluation")?)?;
693        self.enforce_constraints(&provider, &runtime.executor)?;
694
695        let total_output_rows = self.total_query_rows(runtime.executor.store())?;
696        let stats = if runtime.profiling {
697            Some(runtime.executor.execution_stats(total_output_rows))
698        } else {
699            None
700        };
701
702        let cached_store = self.clone_relation_store(&provider, runtime.executor.store())?;
703        let result = self.logic_result_from_store(provider.as_ref(), &cached_store, stats)?;
704        Ok((result, cached_store))
705    }
706
707    /// Build query results from an already materialized runtime store.
708    pub fn evaluate_cached_relation_store(
709        &self,
710        provider: Arc<CudaKernelProvider>,
711        relation_store: &RelationStore,
712    ) -> Result<LogicEvalResult> {
713        self.logic_result_from_store(provider.as_ref(), relation_store, None)
714    }
715
716    /// Apply relation deltas to a persistent session store through the runtime delta path.
717    pub fn apply_relation_deltas(
718        &self,
719        provider: Arc<CudaKernelProvider>,
720        relation_store: &mut RelationStore,
721        cached_store: &mut Option<RelationStore>,
722        deltas: HashMap<String, RelationDelta>,
723    ) -> Result<LogicDeltaReport> {
724        let insert_rows = deltas
725            .values()
726            .filter_map(|d| d.insert.as_ref())
727            .map(|b| b.num_rows())
728            .sum();
729        let delete_rows = deltas
730            .values()
731            .filter_map(|d| d.delete.as_ref())
732            .map(|b| b.num_rows())
733            .sum();
734        let cache_reused = cached_store.is_some();
735        let mut changed_relation_names = deltas.keys().cloned().collect::<Vec<_>>();
736        changed_relation_names.sort();
737
738        if cached_store.is_none() {
739            let (_, store) = self.evaluate_with_relation_store_and_cache(
740                provider.clone(),
741                relation_store,
742                false,
743            )?;
744            *cached_store = Some(store);
745        }
746
747        let store_before_delta = cached_store.as_ref().ok_or_else(|| {
748            XlogError::Execution("Missing cached relation store for delta update".to_string())
749        })?;
750        let mut executor =
751            self.executor_from_relation_store(provider.clone(), store_before_delta, false)?;
752        let delta_stats = executor
753            .apply_deltas_and_recompute(self.ordinary_plan("relation-delta recompute")?, &deltas)?;
754        self.enforce_constraints(&provider, &executor)?;
755
756        for name in deltas.keys() {
757            let updated = executor.store().get(name).ok_or_else(|| {
758                XlogError::Execution(format!(
759                    "Delta relation {} missing after runtime recompute",
760                    name
761                ))
762            })?;
763            relation_store.put(name, provider.clone_buffer(updated)?);
764        }
765
766        *cached_store = Some(self.clone_relation_store(&provider, executor.store())?);
767
768        let mut report = logic_delta_report(delta_stats, insert_rows, delete_rows);
769        report.changed_relation_names = changed_relation_names;
770        report.planner_telemetry =
771            DeltaPlannerTelemetry::from_delta_report(&report, cache_reused, None);
772        report.debug_trace = delta_debug_trace(&report);
773        Ok(report)
774    }
775
776    /// Apply relation deltas while preserving retained session runtime state.
777    pub fn apply_relation_deltas_with_session_runtime(
778        &self,
779        provider: Arc<CudaKernelProvider>,
780        relation_store: &mut RelationStore,
781        cached_store: &mut Option<RelationStore>,
782        session_runtime: &mut Option<LogicSessionRuntime>,
783        deltas: HashMap<String, RelationDelta>,
784    ) -> Result<LogicDeltaReport> {
785        let insert_rows = deltas
786            .values()
787            .filter_map(|d| d.insert.as_ref())
788            .map(|b| b.num_rows())
789            .sum();
790        let delete_rows = deltas
791            .values()
792            .filter_map(|d| d.delete.as_ref())
793            .map(|b| b.num_rows())
794            .sum();
795        let cache_reused = session_runtime.is_some() || cached_store.is_some();
796        let mut changed_relation_names = deltas.keys().cloned().collect::<Vec<_>>();
797        changed_relation_names.sort();
798
799        if session_runtime.is_none() {
800            let seed_store: &RelationStore = match cached_store.as_ref() {
801                Some(store) => store,
802                None => &*relation_store,
803            };
804            *session_runtime =
805                Some(self.create_session_runtime(provider.clone(), seed_store, false)?);
806        }
807
808        if cached_store.is_none() {
809            let runtime = session_runtime.as_mut().ok_or_else(|| {
810                XlogError::Execution("Missing session runtime for cached evaluation".to_string())
811            })?;
812            let (_, store) = self.evaluate_with_session_runtime(provider.clone(), runtime)?;
813            *cached_store = Some(store);
814        }
815
816        let runtime = session_runtime.as_mut().ok_or_else(|| {
817            XlogError::Execution("Missing session runtime for delta update".to_string())
818        })?;
819        let delta_stats = runtime.executor.apply_deltas_and_recompute(
820            self.ordinary_plan("session relation-delta recompute")?,
821            &deltas,
822        )?;
823        self.enforce_constraints(&provider, &runtime.executor)?;
824
825        for name in deltas.keys() {
826            let updated = runtime.executor.store().get(name).ok_or_else(|| {
827                XlogError::Execution(format!(
828                    "Delta relation {} missing after runtime recompute",
829                    name
830                ))
831            })?;
832            relation_store.put(name, provider.clone_buffer(updated)?);
833        }
834
835        *cached_store = Some(self.clone_relation_store(&provider, runtime.executor.store())?);
836
837        let mut report = logic_delta_report(delta_stats, insert_rows, delete_rows);
838        report.changed_relation_names = changed_relation_names;
839        report.planner_telemetry =
840            DeltaPlannerTelemetry::from_delta_report(&report, cache_reused, None);
841        report.debug_trace = delta_debug_trace(&report);
842        Ok(report)
843    }
844
845    /// Apply an ordered batch of relation deltas after device-side coalescing.
846    pub fn apply_relation_delta_batch(
847        &self,
848        provider: Arc<CudaKernelProvider>,
849        relation_store: &mut RelationStore,
850        cached_store: &mut Option<RelationStore>,
851        delta_batch: Vec<(String, RelationDelta)>,
852    ) -> Result<LogicDeltaReport> {
853        let coalesced = coalesce_relation_delta_batch(provider.as_ref(), delta_batch)?;
854        if coalesced.deltas.is_empty() {
855            return Ok(LogicDeltaReport {
856                input_delta_count: coalesced.input_delta_count,
857                changed_relations: 0,
858                changed_relation_names: Vec::new(),
859                insert_rows: 0,
860                delete_rows: 0,
861                has_deletes: false,
862                affected_sccs: 0,
863                recomputed_sccs: 0,
864                incremental_sccs: 0,
865                coalesced_insert_rows: 0,
866                coalesced_delete_rows: 0,
867                canceled_rows: coalesced.canceled_rows,
868                planner_telemetry: DeltaPlannerTelemetry {
869                    fallback_decision: "no_op".to_string(),
870                    ..DeltaPlannerTelemetry::default()
871                },
872                debug_trace: vec![format!("canceled_rows={}", coalesced.canceled_rows)],
873            });
874        }
875
876        let mut report =
877            self.apply_relation_deltas(provider, relation_store, cached_store, coalesced.deltas)?;
878        report.input_delta_count = coalesced.input_delta_count;
879        report.changed_relations = coalesced.changed_relations;
880        report.coalesced_insert_rows = coalesced.coalesced_insert_rows;
881        report.coalesced_delete_rows = coalesced.coalesced_delete_rows;
882        report.canceled_rows = coalesced.canceled_rows;
883        report.planner_telemetry = DeltaPlannerTelemetry::from_delta_report(&report, true, None);
884        report.debug_trace = delta_debug_trace(&report);
885        Ok(report)
886    }
887
888    /// Apply an ordered batch of relation deltas while preserving session runtime state.
889    pub fn apply_relation_delta_batch_with_session_runtime(
890        &self,
891        provider: Arc<CudaKernelProvider>,
892        relation_store: &mut RelationStore,
893        cached_store: &mut Option<RelationStore>,
894        session_runtime: &mut Option<LogicSessionRuntime>,
895        delta_batch: Vec<(String, RelationDelta)>,
896    ) -> Result<LogicDeltaReport> {
897        let coalesced = coalesce_relation_delta_batch(provider.as_ref(), delta_batch)?;
898        if coalesced.deltas.is_empty() {
899            return Ok(LogicDeltaReport {
900                input_delta_count: coalesced.input_delta_count,
901                changed_relations: 0,
902                changed_relation_names: Vec::new(),
903                insert_rows: 0,
904                delete_rows: 0,
905                has_deletes: false,
906                affected_sccs: 0,
907                recomputed_sccs: 0,
908                incremental_sccs: 0,
909                coalesced_insert_rows: 0,
910                coalesced_delete_rows: 0,
911                canceled_rows: coalesced.canceled_rows,
912                planner_telemetry: DeltaPlannerTelemetry {
913                    fallback_decision: "no_op".to_string(),
914                    ..DeltaPlannerTelemetry::default()
915                },
916                debug_trace: vec![format!("canceled_rows={}", coalesced.canceled_rows)],
917            });
918        }
919
920        let mut report = self.apply_relation_deltas_with_session_runtime(
921            provider,
922            relation_store,
923            cached_store,
924            session_runtime,
925            coalesced.deltas,
926        )?;
927        report.input_delta_count = coalesced.input_delta_count;
928        report.changed_relations = coalesced.changed_relations;
929        report.coalesced_insert_rows = coalesced.coalesced_insert_rows;
930        report.coalesced_delete_rows = coalesced.coalesced_delete_rows;
931        report.canceled_rows = coalesced.canceled_rows;
932        report.planner_telemetry = DeltaPlannerTelemetry::from_delta_report(&report, true, None);
933        report.debug_trace = delta_debug_trace(&report);
934        Ok(report)
935    }
936
937    /// Evaluate the program with the given input relations (no profiling).
938    pub fn evaluate(
939        &self,
940        provider: Arc<CudaKernelProvider>,
941        inputs: HashMap<String, CudaBuffer>,
942    ) -> Result<LogicEvalResult> {
943        self.evaluate_with_options(provider, inputs, false)
944    }
945
946    /// Evaluate the program with optional profiling
947    ///
948    /// # Arguments
949    /// * `provider` - The CUDA kernel provider
950    /// * `inputs` - Input relations
951    /// * `profiling` - Whether to collect execution statistics
952    pub fn evaluate_with_options(
953        &self,
954        provider: Arc<CudaKernelProvider>,
955        inputs: HashMap<String, CudaBuffer>,
956        profiling: bool,
957    ) -> Result<LogicEvalResult> {
958        let mut executor = Executor::new(provider.clone());
959        executor.set_profiling(profiling);
960        for (name, rel_id) in &self.rel_ids {
961            executor.register_relation(*rel_id, name);
962        }
963
964        for (name, schema) in &self.schemas {
965            executor
966                .store_mut()
967                .put(name, provider.create_empty_buffer(schema.clone())?);
968        }
969
970        for (name, buffer) in inputs {
971            let schema = self.schemas.get(&name).ok_or_else(|| {
972                XlogError::Execution(format!(
973                    "Input relation {} not declared in program schemas",
974                    name
975                ))
976            })?;
977            ensure_schema_type_compatible(schema, buffer.schema()).map_err(|e| {
978                XlogError::Execution(format!("Input relation {} schema mismatch: {}", name, e))
979            })?;
980            executor.store_mut().put(&name, buffer);
981        }
982
983        self.load_facts(&provider, &mut executor)?;
984
985        if let LogicExecutionPlan::EpistemicWfsGpu(wfs_plan) = &self.plan {
986            return self.evaluate_wfs_gpu_program(provider, executor, wfs_plan, profiling);
987        }
988
989        let LogicExecutionPlan::Ordinary(plan) = &self.plan else {
990            return self.evaluate_epistemic_with_executor(executor, profiling);
991        };
992
993        executor.execute_plan(plan)?;
994
995        self.enforce_constraints(&provider, &executor)?;
996
997        let mut queries: Vec<LogicQueryResult> = Vec::with_capacity(self.program.queries.len());
998        for (i, query) in self.program.queries.iter().enumerate() {
999            let relation_name = format!("__xlog_query_{}", i);
1000            let buffer = executor.store_mut().remove(&relation_name).ok_or_else(|| {
1001                XlogError::Execution(format!(
1002                    "Missing query result relation {} (compiler bug?)",
1003                    relation_name
1004                ))
1005            })?;
1006
1007            let columns = query_output_vars(query);
1008            queries.push(LogicQueryResult {
1009                relation_name,
1010                sort_labels: columns.clone(),
1011                columns,
1012                buffer,
1013            });
1014        }
1015
1016        // Collect execution stats if profiling was enabled
1017        let total_output_rows: u64 = queries.iter().map(|q| q.buffer.num_rows()).sum();
1018        let stats = if profiling {
1019            Some(executor.execution_stats(total_output_rows))
1020        } else {
1021            None
1022        };
1023
1024        Ok(LogicEvalResult { queries, stats })
1025    }
1026
1027    /// Compare query result relations between two stores using GPU set difference.
1028    pub fn relation_stores_query_equivalent(
1029        &self,
1030        provider: &CudaKernelProvider,
1031        left: &RelationStore,
1032        right: &RelationStore,
1033    ) -> Result<bool> {
1034        for idx in 0..self.program.queries.len() {
1035            let name = format!("__xlog_query_{}", idx);
1036            let Some(left_buffer) = left.get(&name) else {
1037                return Ok(false);
1038            };
1039            let Some(right_buffer) = right.get(&name) else {
1040                return Ok(false);
1041            };
1042            if !buffers_gpu_set_equivalent(provider, left_buffer, right_buffer)? {
1043                return Ok(false);
1044            }
1045        }
1046        Ok(true)
1047    }
1048
1049    fn executor_from_relation_store(
1050        &self,
1051        provider: Arc<CudaKernelProvider>,
1052        relation_store: &RelationStore,
1053        profiling: bool,
1054    ) -> Result<Executor> {
1055        let mut executor = Executor::new(provider.clone());
1056        executor.set_profiling(profiling);
1057        for (name, rel_id) in &self.rel_ids {
1058            executor.register_relation(*rel_id, name);
1059        }
1060
1061        for (name, schema) in &self.schemas {
1062            executor
1063                .store_mut()
1064                .put(name, provider.create_empty_buffer(schema.clone())?);
1065        }
1066
1067        for name in relation_store.names() {
1068            let buffer = relation_store.get(name).ok_or_else(|| {
1069                XlogError::Execution(format!(
1070                    "Persistent relation {} disappeared during evaluation",
1071                    name
1072                ))
1073            })?;
1074            let schema = self.schemas.get(name).ok_or_else(|| {
1075                XlogError::Execution(format!(
1076                    "Persistent relation {} not declared in program schemas",
1077                    name
1078                ))
1079            })?;
1080            ensure_schema_type_compatible(schema, buffer.schema()).map_err(|e| {
1081                XlogError::Execution(format!(
1082                    "Persistent relation {} schema mismatch: {}",
1083                    name, e
1084                ))
1085            })?;
1086            executor
1087                .store_mut()
1088                .put(name, provider.clone_buffer(buffer)?);
1089        }
1090
1091        Ok(executor)
1092    }
1093
1094    fn clone_relation_store(
1095        &self,
1096        provider: &Arc<CudaKernelProvider>,
1097        source: &RelationStore,
1098    ) -> Result<RelationStore> {
1099        let mut cloned = RelationStore::new(provider.clone());
1100        for name in source.names() {
1101            let buffer = source.get(name).ok_or_else(|| {
1102                XlogError::Execution(format!("Relation {} disappeared during clone", name))
1103            })?;
1104            cloned.put(name, provider.clone_buffer(buffer)?);
1105        }
1106        Ok(cloned)
1107    }
1108
1109    fn total_query_rows(&self, store: &RelationStore) -> Result<u64> {
1110        let mut total = 0;
1111        for i in 0..self.program.queries.len() {
1112            let relation_name = format!("__xlog_query_{}", i);
1113            let buffer = store.get(&relation_name).ok_or_else(|| {
1114                XlogError::Execution(format!(
1115                    "Missing query result relation {} (compiler bug?)",
1116                    relation_name
1117                ))
1118            })?;
1119            total += buffer.num_rows();
1120        }
1121        Ok(total)
1122    }
1123
1124    fn logic_result_from_store(
1125        &self,
1126        provider: &CudaKernelProvider,
1127        store: &RelationStore,
1128        stats: Option<ExecutionStats>,
1129    ) -> Result<LogicEvalResult> {
1130        let mut queries: Vec<LogicQueryResult> = Vec::with_capacity(self.program.queries.len());
1131        for (i, query) in self.program.queries.iter().enumerate() {
1132            let relation_name = format!("__xlog_query_{}", i);
1133            let buffer = store.get(&relation_name).ok_or_else(|| {
1134                XlogError::Execution(format!(
1135                    "Missing query result relation {} (compiler bug?)",
1136                    relation_name
1137                ))
1138            })?;
1139
1140            let columns = query_output_vars(query);
1141            queries.push(LogicQueryResult {
1142                relation_name,
1143                sort_labels: columns.clone(),
1144                columns,
1145                buffer: provider.clone_buffer(buffer)?,
1146            });
1147        }
1148
1149        Ok(LogicEvalResult { queries, stats })
1150    }
1151
1152    fn load_facts(&self, provider: &CudaKernelProvider, executor: &mut Executor) -> Result<()> {
1153        self.load_facts_into_store(provider, executor.store_mut())
1154    }
1155
1156    fn load_facts_into_store(
1157        &self,
1158        provider: &CudaKernelProvider,
1159        store: &mut RelationStore,
1160    ) -> Result<()> {
1161        let arities = predicate_arities(&self.program);
1162        let mut rows_by_pred: HashMap<String, Vec<&[Term]>> = HashMap::new();
1163        for fact in self.program.facts() {
1164            let pred = fact.head.predicate.as_str();
1165            let arity = fact.head.terms.len();
1166            let key = arity_qualified_name_if_needed(pred, arity, &arities);
1167            rows_by_pred.entry(key).or_default().push(&fact.head.terms);
1168        }
1169
1170        for (pred, rows) in rows_by_pred {
1171            let schema = self.schemas.get(pred.as_str()).ok_or_else(|| {
1172                XlogError::Execution(format!(
1173                    "Missing inferred schema for fact predicate {}",
1174                    pred
1175                ))
1176            })?;
1177
1178            if rows.iter().any(|r| r.len() != schema.arity()) {
1179                return Err(XlogError::Execution(format!(
1180                    "Fact arity mismatch for {} (expected {} columns)",
1181                    pred,
1182                    schema.arity()
1183                )));
1184            }
1185
1186            let mut columns: Vec<Vec<u8>> = vec![Vec::new(); schema.arity()];
1187            for row in rows {
1188                for (col_idx, term) in row.iter().enumerate() {
1189                    let typ = schema.column_type(col_idx).ok_or_else(|| {
1190                        XlogError::Execution(format!("Missing type for column {}", col_idx))
1191                    })?;
1192                    push_term_bytes(&mut columns[col_idx], term, typ)?;
1193                }
1194            }
1195
1196            let fact_buf = if schema.arity() == 0 {
1197                // Nullary predicate: every `pred().` assertion denotes the same unit
1198                // tuple `()`, so presence is a single row. `create_buffer_from_slices`
1199                // with no column slices yields a 0-row (absent) relation, which would
1200                // make an asserted nullary fact read as false everywhere downstream
1201                // (ordinary joins and epistemic modal membership alike).
1202                provider.create_zero_arity_buffer(schema.clone(), 1)?
1203            } else {
1204                let slices: Vec<&[u8]> = columns.iter().map(|c| c.as_slice()).collect();
1205                provider.create_buffer_from_slices(&slices, schema.clone())?
1206            };
1207
1208            let existing = store.get(&pred).ok_or_else(|| {
1209                XlogError::Execution(format!(
1210                    "Missing base relation {} while loading facts",
1211                    pred
1212                ))
1213            })?;
1214
1215            let merged = provider.union(existing, &fact_buf)?;
1216            store.put(pred.as_str(), merged);
1217        }
1218
1219        Ok(())
1220    }
1221
1222    fn evaluate_wfs_gpu_program(
1223        &self,
1224        provider: Arc<CudaKernelProvider>,
1225        base_executor: Executor,
1226        wfs: &EpistemicWfsGpuPlan,
1227        profiling: bool,
1228    ) -> Result<LogicEvalResult> {
1229        let base_store = self.clone_relation_store(&provider, base_executor.store())?;
1230        let upper_executor =
1231            self.run_wfs_gpu_pass(&provider, &wfs.overapprox, &base_store, &[], profiling)?;
1232        let mut upper_store = self.clone_relation_store(&provider, upper_executor.store())?;
1233        let mut lower_store = self.clone_relation_store(&provider, &base_store)?;
1234
1235        for _ in 0..wfs.max_iterations {
1236            let upper_fixed: Vec<_> = wfs
1237                .upper_fixed_names
1238                .iter()
1239                .map(|(source, fixed)| (source.as_str(), fixed.as_str(), &upper_store))
1240                .collect();
1241            let lower_executor =
1242                self.run_wfs_gpu_pass(&provider, &wfs.lower, &base_store, &upper_fixed, profiling)?;
1243            let next_lower = self.clone_relation_store(&provider, lower_executor.store())?;
1244
1245            let lower_fixed: Vec<_> = wfs
1246                .lower_fixed_names
1247                .iter()
1248                .map(|(source, fixed)| (source.as_str(), fixed.as_str(), &next_lower))
1249                .collect();
1250            let next_upper_executor =
1251                self.run_wfs_gpu_pass(&provider, &wfs.upper, &base_store, &lower_fixed, profiling)?;
1252            let next_upper = self.clone_relation_store(&provider, next_upper_executor.store())?;
1253
1254            let lower_converged =
1255                self.wfs_gpu_stores_equivalent(&provider, wfs, &lower_store, &next_lower)?;
1256            let upper_converged =
1257                self.wfs_gpu_stores_equivalent(&provider, wfs, &upper_store, &next_upper)?;
1258            lower_store = next_lower;
1259            upper_store = next_upper;
1260            if lower_converged && upper_converged {
1261                return self.logic_result_from_store(provider.as_ref(), &lower_store, None);
1262            }
1263        }
1264
1265        Err(XlogError::ResourceExhausted {
1266            context: "GPU-backed WFS alternating fixpoint iterations".to_string(),
1267            estimated_bytes: wfs.max_iterations as u64,
1268            budget_bytes: wfs.max_iterations as u64,
1269        })
1270    }
1271
1272    fn run_wfs_gpu_pass(
1273        &self,
1274        provider: &Arc<CudaKernelProvider>,
1275        pass: &WfsGpuOrdinaryPlan,
1276        base_store: &RelationStore,
1277        fixed_relations: &[(&str, &str, &RelationStore)],
1278        profiling: bool,
1279    ) -> Result<Executor> {
1280        let mut executor = Executor::new(provider.clone());
1281        executor.set_profiling(profiling);
1282        for (name, rel_id) in &pass.rel_ids {
1283            executor.register_relation(*rel_id, name);
1284        }
1285        for (name, schema) in &pass.schemas {
1286            executor
1287                .store_mut()
1288                .put(name, provider.create_empty_buffer(schema.clone())?);
1289        }
1290        for name in base_store.names() {
1291            if pass.schemas.contains_key(name) {
1292                let buffer = base_store.get(name).ok_or_else(|| {
1293                    XlogError::Execution(format!("WFS base relation {name} disappeared"))
1294                })?;
1295                executor
1296                    .store_mut()
1297                    .put(name, provider.clone_buffer(buffer)?);
1298            }
1299        }
1300        for &(source, fixed, source_store) in fixed_relations {
1301            let buffer =
1302                self.wfs_gpu_clone_or_empty(provider, &pass.schemas, source, source_store)?;
1303            executor.store_mut().put(fixed, buffer);
1304        }
1305        executor.execute_plan(&pass.plan)?;
1306        Ok(executor)
1307    }
1308
1309    fn wfs_gpu_clone_or_empty(
1310        &self,
1311        provider: &Arc<CudaKernelProvider>,
1312        schemas: &HashMap<String, Schema>,
1313        name: &str,
1314        store: &RelationStore,
1315    ) -> Result<CudaBuffer> {
1316        if let Some(buffer) = store.get(name) {
1317            return provider.clone_buffer(buffer);
1318        }
1319        let schema = schemas
1320            .get(name)
1321            .or_else(|| self.schemas.get(name))
1322            .ok_or_else(|| XlogError::Execution(format!("missing WFS GPU schema for {name}")))?;
1323        provider.create_empty_buffer(schema.clone())
1324    }
1325
1326    fn wfs_gpu_stores_equivalent(
1327        &self,
1328        provider: &Arc<CudaKernelProvider>,
1329        wfs: &EpistemicWfsGpuPlan,
1330        left: &RelationStore,
1331        right: &RelationStore,
1332    ) -> Result<bool> {
1333        for pred in &wfs.intensional_predicates {
1334            let left_buf = self.wfs_gpu_clone_or_empty(provider, &wfs.lower.schemas, pred, left)?;
1335            let right_buf =
1336                self.wfs_gpu_clone_or_empty(provider, &wfs.lower.schemas, pred, right)?;
1337            if !buffers_gpu_set_equivalent(provider.as_ref(), &left_buf, &right_buf)? {
1338                return Ok(false);
1339            }
1340        }
1341        Ok(true)
1342    }
1343
1344    fn ordinary_plan(&self, context: &str) -> Result<&ExecutionPlan> {
1345        match &self.plan {
1346            LogicExecutionPlan::Ordinary(plan) => Ok(plan),
1347            LogicExecutionPlan::EpistemicWfsGpu(_)
1348            | LogicExecutionPlan::EpistemicSingle(_)
1349            | LogicExecutionPlan::EpistemicSplit(_)
1350            | LogicExecutionPlan::EpistemicStratified(_) => {
1351                Err(XlogError::UnsupportedEpistemicConstruct {
1352                    construct: "epistemic high-level persistent execution".to_string(),
1353                    context: format!(
1354                        "{context} requires an ordinary RIR plan; use evaluate/evaluate_with_options \
1355                         for production epistemic GPU dispatch"
1356                    ),
1357                })
1358            }
1359        }
1360    }
1361
1362    fn evaluate_epistemic_with_executor(
1363        &self,
1364        mut executor: Executor,
1365        profiling: bool,
1366    ) -> Result<LogicEvalResult> {
1367        let mut queries = Vec::new();
1368        match &self.plan {
1369            LogicExecutionPlan::EpistemicSingle(executable) => {
1370                let result = executor.execute_epistemic_gpu_execution(
1371                    executable,
1372                    capacities_for_epistemic_executable(executable)?,
1373                )?;
1374                result.require_runtime_dispatch_certification()?;
1375                queries.extend(epistemic_result_to_query_results(
1376                    epistemic_output_relation_name(executable)?,
1377                    result,
1378                ));
1379            }
1380            LogicExecutionPlan::EpistemicSplit(split) => {
1381                let executables: Vec<_> = split
1382                    .components
1383                    .iter()
1384                    .map(|component| &component.executable)
1385                    .collect();
1386                let batch = executor.execute_epistemic_gpu_execution_batch_with_trace(
1387                    &executables,
1388                    capacities_for_epistemic_split(split)?,
1389                )?;
1390                batch
1391                    .require_trace_matches_components("xlog high-level epistemic GPU execution")?;
1392                for result in &batch.results {
1393                    result.require_runtime_dispatch_certification()?;
1394                }
1395                for (component, result) in split.components.iter().zip(batch.results) {
1396                    // A JOINT-SOLVED coalesced multi-head component yields one query
1397                    // per coupled head: the primary head from `final_output` plus
1398                    // each additional head materialized against the SAME accepted
1399                    // world view. Single-head components yield exactly one query.
1400                    queries.extend(epistemic_result_to_query_results(
1401                        epistemic_output_relation_name(&component.executable)?,
1402                        result,
1403                    ));
1404                }
1405            }
1406            LogicExecutionPlan::EpistemicStratified(strata) => {
1407                // Execute strata in topological order on the SAME executor. After
1408                // each stratum, write its GATED head output(s) into the store as
1409                // base relations so the NEXT stratum's `know`/`possible` over a
1410                // lower head reads the gated extension through the existing tuple-key
1411                // membership filter (or, once the head is a materialized base
1412                // relation, Case-A resolve-into-body) — never double-gating against
1413                // a still-modal relation.
1414                //
1415                // A head is surfaced as a user-visible query result when the source
1416                // program explicitly queries it (`?- head(...)`), regardless of
1417                // which stratum produced it; otherwise only the TOP stratum's heads
1418                // are surfaced (lower-stratum heads are intermediate, materialized
1419                // for gating only).
1420                let queried_predicates: BTreeSet<&str> = self
1421                    .program
1422                    .queries
1423                    .iter()
1424                    .map(|query| query.atom.predicate.as_str())
1425                    .collect();
1426                let stratum_count = strata.len();
1427                for (stratum_index, stratum) in strata.iter().enumerate() {
1428                    let is_last = stratum_index + 1 == stratum_count;
1429                    match &stratum.plan {
1430                        StratumPlanKind::Single(executable) => {
1431                            let result = executor.execute_epistemic_gpu_execution(
1432                                executable,
1433                                capacities_for_epistemic_executable(executable)?,
1434                            )?;
1435                            result.require_runtime_dispatch_certification()?;
1436                            let primary_head = epistemic_output_relation_name(executable)?;
1437                            Self::materialize_and_surface_epistemic_stratum_result(
1438                                &mut executor,
1439                                primary_head,
1440                                result,
1441                                is_last,
1442                                &queried_predicates,
1443                                &mut queries,
1444                            )?;
1445                        }
1446                        StratumPlanKind::Split(split) => {
1447                            let executables: Vec<_> = split
1448                                .components
1449                                .iter()
1450                                .map(|component| &component.executable)
1451                                .collect();
1452                            let batch = executor.execute_epistemic_gpu_execution_batch_with_trace(
1453                                &executables,
1454                                capacities_for_epistemic_split(split)?,
1455                            )?;
1456                            batch.require_trace_matches_components(
1457                                "xlog high-level stratified epistemic GPU execution",
1458                            )?;
1459                            for result in &batch.results {
1460                                result.require_runtime_dispatch_certification()?;
1461                            }
1462                            let primaries: Vec<String> = split
1463                                .components
1464                                .iter()
1465                                .map(|component| {
1466                                    epistemic_output_relation_name(&component.executable)
1467                                })
1468                                .collect::<Result<Vec<_>>>()?;
1469                            for (primary_head, result) in primaries.into_iter().zip(batch.results) {
1470                                Self::materialize_and_surface_epistemic_stratum_result(
1471                                    &mut executor,
1472                                    primary_head,
1473                                    result,
1474                                    is_last,
1475                                    &queried_predicates,
1476                                    &mut queries,
1477                                )?;
1478                            }
1479                        }
1480                        StratumPlanKind::Ordinary {
1481                            plan,
1482                            head_predicates,
1483                        } => {
1484                            // Case-A recursive stratum over the materialized base
1485                            // determined head: the ordinary semi-naive engine writes
1486                            // the (correctly gated) head relation into the store.
1487                            executor.execute_plan(plan)?;
1488                            for head in head_predicates {
1489                                if is_last || queried_predicates.contains(head.as_str()) {
1490                                    let buffer =
1491                                        executor.store().get(head.as_str()).ok_or_else(|| {
1492                                            XlogError::Execution(format!(
1493                                                "missing stratified ordinary stratum output \
1494                                                 relation {head}"
1495                                            ))
1496                                        })?;
1497                                    let cloned = executor.clone_store_relation(buffer)?;
1498                                    queries.push(epistemic_buffer_to_query_result(
1499                                        head.clone(),
1500                                        cloned,
1501                                    ));
1502                                }
1503                            }
1504                        }
1505                    }
1506                }
1507            }
1508            LogicExecutionPlan::EpistemicWfsGpu(_) => {
1509                unreachable!("GPU WFS plans are handled earlier")
1510            }
1511            LogicExecutionPlan::Ordinary(_) => {
1512                unreachable!("ordinary plans are handled earlier")
1513            }
1514        }
1515
1516        let total_output_rows: u64 = queries.iter().map(|q| q.buffer.num_rows()).sum();
1517        let stats = if profiling {
1518            Some(executor.execution_stats(total_output_rows))
1519        } else {
1520            None
1521        };
1522        Ok(LogicEvalResult { queries, stats })
1523    }
1524
1525    /// Materialize one epistemic stratum result's GATED head(s) into the store and
1526    /// surface them as query results when appropriate.
1527    ///
1528    /// Every gated head (primary `final_output` plus joint additional heads) is
1529    /// written to the store so higher strata can gate against it. A head is added
1530    /// to `queries` when its stratum is the TOP stratum OR the source program
1531    /// explicitly queries it.
1532    fn materialize_and_surface_epistemic_stratum_result(
1533        executor: &mut Executor,
1534        primary_head: String,
1535        result: EpistemicGpuExecutionResult,
1536        is_last: bool,
1537        queried_predicates: &BTreeSet<&str>,
1538        queries: &mut Vec<LogicQueryResult>,
1539    ) -> Result<()> {
1540        executor.materialize_epistemic_head_relation(&primary_head, &result.final_output)?;
1541        for (head, buffer) in &result.additional_head_outputs {
1542            executor.materialize_epistemic_head_relation(head, buffer)?;
1543        }
1544
1545        // Collect the heads to surface: primary + additional, filtered by
1546        // top-stratum-or-explicitly-queried.
1547        let surface_primary = is_last || queried_predicates.contains(primary_head.as_str());
1548        let additional_heads: Vec<String> = result
1549            .additional_head_outputs
1550            .iter()
1551            .map(|(head, _)| head.clone())
1552            .collect();
1553
1554        let mut all_results = epistemic_result_to_query_results(primary_head.clone(), result);
1555        all_results.retain(|query_result| {
1556            if query_result.relation_name == primary_head {
1557                surface_primary
1558            } else {
1559                is_last
1560                    || (additional_heads.contains(&query_result.relation_name)
1561                        && queried_predicates.contains(query_result.relation_name.as_str()))
1562            }
1563        });
1564        queries.extend(all_results);
1565        Ok(())
1566    }
1567
1568    fn enforce_constraints(
1569        &self,
1570        provider: &CudaKernelProvider,
1571        executor: &Executor,
1572    ) -> Result<()> {
1573        for i in 0..self.program.constraints.len() {
1574            let name = format!("__xlog_constraint_{}", i);
1575            let buf = executor.store().get(&name).ok_or_else(|| {
1576                XlogError::Execution(format!(
1577                    "Missing constraint result relation {} (compiler bug?)",
1578                    name
1579                ))
1580            })?;
1581
1582            if buf.num_rows() == 0 {
1583                continue;
1584            }
1585
1586            let rows = provider.download_column::<u32>(buf, 0).unwrap_or_default();
1587            if rows.is_empty() {
1588                continue;
1589            }
1590
1591            return Err(XlogError::Execution(format!(
1592                "Constraint {} violated: {}",
1593                i,
1594                format_constraint(&self.program.constraints[i].body)
1595            )));
1596        }
1597
1598        Ok(())
1599    }
1600}
1601
/// Presumably the default cap on how many models a single epistemic reduction may
/// enumerate. NOTE(review): inferred from the name only — the consumer is outside this
/// chunk; confirm the exact semantics there.
const DEFAULT_EPISTEMIC_MAX_MODELS_PER_REDUCTION: usize = 1024;
1603
1604fn normalize_program(program: Program) -> Result<Program> {
1605    let max_recursion = program.directives.max_recursion_depth.unwrap_or(100);
1606    let expanded = xlog_logic::expand_program_functions(&program, max_recursion)
1607        .map_err(|e| XlogError::Compilation(e.to_string()))?;
1608    let normalized = xlog_logic::normalize_meta_builtins(&expanded)?;
1609    let listed = xlog_logic::normalize_list_builtins(&normalized)?;
1610    Ok(desugar_shared_variable_epistemic_constraints(listed))
1611}
1612
/// How `wfs_transform_program` rewrites the negated body literals of every rule.
enum WfsNegationTransform<'a> {
    /// Remove every negated literal (used to build the negation-free over-approximation
    /// program in `compile_epistemic_wfs_gpu_plan`).
    Drop,
    /// Replace each negated atom's predicate with its entry in this
    /// `source predicate -> fixed relation name` map.
    Rename(&'a HashMap<String, String>),
}
1617
1618fn compile_epistemic_wfs_gpu_plan(program: &Program) -> Result<EpistemicWfsGpuPlan> {
1619    if !program.constraints.is_empty() {
1620        return Err(XlogError::UnsupportedEpistemicConstruct {
1621            construct: "GPU WFS integrity constraints".to_string(),
1622            context: "cyclic WFS execution currently supports reduced normal rules only"
1623                .to_string(),
1624        });
1625    }
1626
1627    let negated = wfs_negated_predicates(program);
1628    let upper_fixed_names = wfs_fixed_names(program, &negated, "__wfs_upper");
1629    let lower_fixed_names = wfs_fixed_names(program, &negated, "__wfs_lower");
1630
1631    let overapprox_program = wfs_transform_program(program, WfsNegationTransform::Drop)?;
1632    let lower_program =
1633        wfs_transform_program(program, WfsNegationTransform::Rename(&upper_fixed_names))?;
1634    let upper_program =
1635        wfs_transform_program(program, WfsNegationTransform::Rename(&lower_fixed_names))?;
1636
1637    Ok(EpistemicWfsGpuPlan {
1638        overapprox: compile_wfs_gpu_ordinary_plan(&overapprox_program)?,
1639        lower: compile_wfs_gpu_ordinary_plan(&lower_program)?,
1640        upper: compile_wfs_gpu_ordinary_plan(&upper_program)?,
1641        intensional_predicates: wfs_intensional_predicates(program),
1642        upper_fixed_names,
1643        lower_fixed_names,
1644        max_iterations: (program.directives.max_recursion_depth_or_default() as usize).max(1),
1645    })
1646}
1647
1648fn compile_wfs_gpu_ordinary_plan(program: &Program) -> Result<WfsGpuOrdinaryPlan> {
1649    let mut compiler = Compiler::new();
1650    let plan = compiler.compile_program(program)?;
1651    Ok(WfsGpuOrdinaryPlan {
1652        plan,
1653        schemas: compiler.schemas().clone(),
1654        rel_ids: compiler.rel_ids().clone(),
1655    })
1656}
1657
1658fn wfs_transform_program(program: &Program, negation: WfsNegationTransform<'_>) -> Result<Program> {
1659    let mut out = program.clone();
1660    out.rules = program
1661        .rules
1662        .iter()
1663        .map(|rule| {
1664            let mut rule = rule.clone();
1665            let mut body = Vec::with_capacity(rule.body.len());
1666            for lit in &rule.body {
1667                match (lit, &negation) {
1668                    (BodyLiteral::Negated(_), WfsNegationTransform::Drop) => {}
1669                    (BodyLiteral::Negated(atom), WfsNegationTransform::Rename(names)) => {
1670                        let mut atom = atom.clone();
1671                        atom.predicate = names.get(&atom.predicate).cloned().ok_or_else(|| {
1672                            XlogError::Execution(format!(
1673                                "missing WFS fixed relation name for {}",
1674                                atom.predicate
1675                            ))
1676                        })?;
1677                        body.push(BodyLiteral::Negated(atom));
1678                    }
1679                    _ => body.push(lit.clone()),
1680                }
1681            }
1682            rule.body = body;
1683            Ok(rule)
1684        })
1685        .collect::<Result<Vec<_>>>()?;
1686    if let WfsNegationTransform::Rename(names) = negation {
1687        add_wfs_fixed_predicates(&mut out, names)?;
1688    }
1689    Ok(out)
1690}
1691
1692fn add_wfs_fixed_predicates(program: &mut Program, names: &HashMap<String, String>) -> Result<()> {
1693    let existing: BTreeSet<String> = program
1694        .predicates
1695        .iter()
1696        .map(|decl| decl.name.clone())
1697        .collect();
1698    for (source, fixed) in names {
1699        if existing.contains(fixed) {
1700            return Err(XlogError::UnsupportedEpistemicConstruct {
1701                construct: "GPU WFS fixed relation name".to_string(),
1702                context: format!(
1703                    "internal fixed relation {fixed} collides with a declared predicate"
1704                ),
1705            });
1706        }
1707        let Some(decl) = program.predicates.iter().find(|decl| decl.name == *source) else {
1708            return Err(XlogError::UnsupportedEpistemicConstruct {
1709                construct: "GPU WFS fixed relation schema".to_string(),
1710                context: format!(
1711                    "negated predicate {source} has no declaration to type fixed relation {fixed}"
1712                ),
1713            });
1714        };
1715        let mut fixed_decl = decl.clone();
1716        fixed_decl.name = fixed.clone();
1717        program.predicates.push(fixed_decl);
1718    }
1719    Ok(())
1720}
1721
1722fn wfs_negated_predicates(program: &Program) -> BTreeSet<String> {
1723    program
1724        .rules
1725        .iter()
1726        .flat_map(|rule| &rule.body)
1727        .filter_map(|lit| match lit {
1728            BodyLiteral::Negated(atom) => Some(atom.predicate.clone()),
1729            _ => None,
1730        })
1731        .collect()
1732}
1733
1734fn wfs_intensional_predicates(program: &Program) -> Vec<String> {
1735    program
1736        .proper_rules()
1737        .map(|rule| rule.head.predicate.clone())
1738        .collect::<BTreeSet<_>>()
1739        .into_iter()
1740        .collect()
1741}
1742
1743fn wfs_fixed_names(
1744    program: &Program,
1745    predicates: &BTreeSet<String>,
1746    prefix: &str,
1747) -> HashMap<String, String> {
1748    let mut reserved: BTreeSet<String> = program
1749        .predicates
1750        .iter()
1751        .map(|decl| decl.name.clone())
1752        .collect();
1753    let mut names = HashMap::new();
1754    for pred in predicates {
1755        let mut candidate = format!("{prefix}_{pred}");
1756        if reserved.contains(&candidate) {
1757            let mut suffix = 0usize;
1758            loop {
1759                let suffixed = format!("{prefix}_{suffix}_{pred}");
1760                if !reserved.contains(&suffixed) {
1761                    candidate = suffixed;
1762                    break;
1763                }
1764                suffix += 1;
1765            }
1766        }
1767        reserved.insert(candidate.clone());
1768        names.insert(pred.clone(), candidate);
1769    }
1770    names
1771}
1772
1773fn wfs_plan_combined_schemas(plan: &EpistemicWfsGpuPlan) -> HashMap<String, Schema> {
1774    let mut schemas = HashMap::new();
1775    for ordinary in [&plan.overapprox, &plan.lower, &plan.upper] {
1776        for (name, schema) in &ordinary.schemas {
1777            schemas
1778                .entry(name.clone())
1779                .or_insert_with(|| schema.clone());
1780        }
1781    }
1782    schemas
1783}
1784
1785fn wfs_plan_combined_rel_ids(plan: &EpistemicWfsGpuPlan) -> Result<HashMap<String, RelId>> {
1786    let mut rel_ids = HashMap::new();
1787    for ordinary in [&plan.overapprox, &plan.lower, &plan.upper] {
1788        for (name, rel_id) in &ordinary.rel_ids {
1789            rel_ids.insert(name.clone(), *rel_id);
1790        }
1791    }
1792    Ok(rel_ids)
1793}
1794
1795fn schema_from_pred_decl(
1796    decl: &xlog_logic::ast::PredDecl,
1797    domains: &HashMap<String, ScalarType>,
1798) -> Result<Schema> {
1799    let columns = pred_columns_for_decl(decl);
1800    let resolved = columns
1801        .iter()
1802        .enumerate()
1803        .map(|(idx, column)| {
1804            let name = column.name.clone().unwrap_or_else(|| format!("c{idx}"));
1805            resolve_pred_column_type(&decl.name, idx, &column.typ, domains).map(|typ| (name, typ))
1806        })
1807        .collect::<Result<Vec<_>>>()?;
1808    Ok(Schema::new(resolved))
1809}
1810
1811fn pred_columns_for_decl(decl: &xlog_logic::ast::PredDecl) -> Vec<PredColumn> {
1812    if decl.columns.is_empty() {
1813        decl.types
1814            .iter()
1815            .cloned()
1816            .map(|typ| PredColumn { name: None, typ })
1817            .collect()
1818    } else {
1819        decl.columns.clone()
1820    }
1821}
1822
1823fn resolve_pred_column_type(
1824    predicate: &str,
1825    index: usize,
1826    typ: &TypeRef,
1827    domains: &HashMap<String, ScalarType>,
1828) -> Result<ScalarType> {
1829    match typ {
1830        TypeRef::Scalar(ty) => Ok(*ty),
1831        TypeRef::Domain(name) => domains.get(name).copied().ok_or_else(|| {
1832            XlogError::Compilation(format!(
1833                "unknown domain alias '{}' in predicate '{}' column {}",
1834                name, predicate, index
1835            ))
1836        }),
1837        TypeRef::List(_) | TypeRef::Term | TypeRef::Compound | TypeRef::PredRef => {
1838            Ok(ScalarType::U64)
1839        }
1840    }
1841}
1842
1843fn schema_from_terms(terms: &[Term]) -> Schema {
1844    let columns = terms
1845        .iter()
1846        .enumerate()
1847        .map(|(idx, term)| (format!("c{idx}"), infer_term_type(term)))
1848        .collect();
1849    Schema::new(columns)
1850}
1851
1852fn infer_term_type(term: &Term) -> ScalarType {
1853    match term {
1854        Term::Variable(_) | Term::Anonymous => ScalarType::U64,
1855        Term::Integer(value) => {
1856            if *value >= 0 && *value <= u32::MAX as i64 {
1857                ScalarType::U32
1858            } else {
1859                ScalarType::I64
1860            }
1861        }
1862        Term::Float(_) => ScalarType::F64,
1863        Term::String(_) | Term::Symbol(_) => ScalarType::Symbol,
1864        Term::List(_) | Term::Cons { .. } | Term::Compound { .. } | Term::PredRef(_) => {
1865            ScalarType::U64
1866        }
1867        Term::Aggregate(agg) => match agg.op {
1868            AggOp::Count => ScalarType::U32,
1869            AggOp::Sum => ScalarType::U64,
1870            AggOp::Min | AggOp::Max => ScalarType::U32,
1871            AggOp::LogSumExp => ScalarType::F64,
1872        },
1873    }
1874}
1875
1876/// Desugar a shared-variable epistemic constraint — a constraint with at least one
1877/// epistemic literal and a variable appearing in more than one term position across the body
1878/// (the join `:- know p(X), possible q(X).`, the diagonal `:- know p(X, X).`, or the
1879/// negated-difference `:- q(X), not know p(X).`) — into an ordinary extraction rule plus a
1880/// single-occurrence modal over it:
1881///
1882/// ```text
1883///   :- BodyLit1, BodyLit2, ..., BodyLitN.
1884///        ==> __epi_join_N(Vars) :- ord(BodyLit1), ..., ord(BodyLitN).
1885///            :- know __epi_join_N(Vars).
1886/// ```
1887///
1888/// where `ord` ordinary-izes each modal literal (`know/possible r(..)` -> `r(..)`,
1889/// `not know/possible r(..)` -> `not r(..)`) and keeps non-modal literals unchanged. For a
1890/// base/EDB or purely-ordinary-derived modal target `know r == possible r == r`, so the
1891/// ordinary join `__epi_join_N` is exactly the set of variable bindings the constraint
1892/// forbids; the single-occurrence `:- know __epi_join_N(Vars)` then routes through the
1893/// existing variable-keyed world-view constraint path, which prunes the world view to empty —
1894/// with NO new kernel. Applied at the normalization choke point so BOTH the reduced ordinary
1895/// materialization and the epistemic planner observe the helper relation (an EIR-only rewrite
1896/// is accepted at planning but the helper is never materialized).
1897///
1898/// Guarded to non-modal-derived targets (where the `know == possible == ordinary`
1899/// equivalence holds); a constraint with a modal-derived target is left unchanged and falls
1900/// through to the core compiler's existing shared-variable rejection. Single-occurrence
1901/// variable-keyed modal, distinct-variable multi-literal, and ground constraints have no
1902/// repeated variable and are likewise untouched.
1903fn desugar_shared_variable_epistemic_constraints(mut program: Program) -> Program {
1904    // A predicate defined by any rule carrying an epistemic body literal is "modal-derived":
1905    // for it `know p`/`possible p` is NOT equal to the ordinary `p`, so ordinary-izing it
1906    // would be UNSOUND. Restrict the desugaring to base/EDB or purely-ordinary-derived
1907    // targets (where `know p == possible p == p`), the case for base tuple-key targets.
1908    let modal_derived: BTreeSet<String> = program
1909        .rules
1910        .iter()
1911        .filter(|rule| {
1912            rule.body
1913                .iter()
1914                .any(|lit| matches!(lit, BodyLiteral::Epistemic(_)))
1915        })
1916        .map(|rule| rule.head.predicate.clone())
1917        .collect();
1918    let mut extraction_rules: Vec<Rule> = Vec::new();
1919    let mut counter = 0usize;
1920    for constraint in &mut program.constraints {
1921        let has_epistemic = constraint
1922            .body
1923            .iter()
1924            .any(|lit| matches!(lit, BodyLiteral::Epistemic(_)));
1925        if !has_epistemic || !constraint_has_shared_variable(&constraint.body) {
1926            continue;
1927        }
1928        // Sound only when EVERY modal target is non-modal-derived (know == possible == ord).
1929        let has_modal_derived_target = constraint.body.iter().any(|lit| {
1930            matches!(lit, BodyLiteral::Epistemic(e) if modal_derived.contains(&e.atom.predicate))
1931        });
1932        if has_modal_derived_target {
1933            continue;
1934        }
1935        let distinct = distinct_body_variables(&constraint.body);
1936        let helper = format!("__epi_join_{counter}");
1937        counter += 1;
1938        let helper_terms: Vec<Term> = distinct.iter().map(|v| Term::Variable(v.clone())).collect();
1939        let helper_body: Vec<BodyLiteral> = constraint
1940            .body
1941            .iter()
1942            .map(ordinaryize_modal_literal)
1943            .collect();
1944        extraction_rules.push(Rule {
1945            head: Atom {
1946                predicate: helper.clone(),
1947                terms: helper_terms.clone(),
1948            },
1949            body: helper_body,
1950        });
1951        // Replace the whole constraint with a single-occurrence modal over the join helper.
1952        constraint.body = vec![BodyLiteral::Epistemic(EpistemicLiteral {
1953            op: EpistemicOp::Know,
1954            negated: false,
1955            atom: Atom {
1956                predicate: helper,
1957                terms: helper_terms,
1958            },
1959        })];
1960    }
1961    program.rules.extend(extraction_rules);
1962    program
1963}
1964
1965/// Replace a modal literal with its ordinary counterpart (`know/possible r` -> `r`,
1966/// `not know/possible r` -> `not r`); non-modal literals are returned unchanged. Sound for
1967/// the shared-variable constraint desugaring when the modal target is non-modal-derived,
1968/// where `know r == possible r == r`.
1969fn ordinaryize_modal_literal(lit: &BodyLiteral) -> BodyLiteral {
1970    match lit {
1971        BodyLiteral::Epistemic(e) if e.negated => BodyLiteral::Negated(e.atom.clone()),
1972        BodyLiteral::Epistemic(e) => BodyLiteral::Positive(e.atom.clone()),
1973        other => other.clone(),
1974    }
1975}
1976
1977/// True if some variable occurs in more than one atom term position across the constraint
1978/// body — the signature of a join / diagonal / negated-difference the core compiler rejects.
1979fn constraint_has_shared_variable(body: &[BodyLiteral]) -> bool {
1980    let mut counts: std::collections::BTreeMap<String, usize> = std::collections::BTreeMap::new();
1981    for lit in body {
1982        if let Some(atom) = lit.atom() {
1983            for term in &atom.terms {
1984                if let Term::Variable(name) = term {
1985                    *counts.entry(name.clone()).or_insert(0) += 1;
1986                }
1987            }
1988        }
1989    }
1990    counts.values().any(|&count| count > 1)
1991}
1992
1993/// Ordered DISTINCT variable names appearing in atom positions across the constraint body
1994/// (first-appearance order), used as the extracted helper relation's columns.
1995fn distinct_body_variables(body: &[BodyLiteral]) -> Vec<String> {
1996    let mut seen = BTreeSet::new();
1997    let mut order = Vec::new();
1998    for lit in body {
1999        if let Some(atom) = lit.atom() {
2000            for term in &atom.terms {
2001                if let Term::Variable(name) = term {
2002                    if seen.insert(name.clone()) {
2003                        order.push(name.clone());
2004                    }
2005                }
2006            }
2007        }
2008    }
2009    order
2010}
2011
2012fn augment_same_name_multi_arity_schemas(
2013    program: &Program,
2014    schemas: &mut HashMap<String, Schema>,
2015) -> Result<()> {
2016    let arities = predicate_arities(program);
2017    let domains: HashMap<String, ScalarType> = program
2018        .domains
2019        .iter()
2020        .map(|domain| (domain.name.clone(), domain.typ))
2021        .collect();
2022
2023    for decl in &program.predicates {
2024        let Some(pred_arities) = arities.get(&decl.name) else {
2025            continue;
2026        };
2027        if pred_arities.len() <= 1 {
2028            continue;
2029        }
2030        let key = arity_qualified_name(&decl.name, pred_decl_arity(decl));
2031        schemas.insert(key, schema_from_pred_decl(decl, &domains)?);
2032    }
2033
2034    for fact in program.facts() {
2035        let pred = fact.head.predicate.as_str();
2036        let arity = fact.head.terms.len();
2037        let Some(pred_arities) = arities.get(pred) else {
2038            continue;
2039        };
2040        if pred_arities.len() <= 1 {
2041            continue;
2042        }
2043        let key = arity_qualified_name(pred, arity);
2044        schemas
2045            .entry(key)
2046            .or_insert_with(|| schema_from_terms(&fact.head.terms));
2047    }
2048
2049    for rule in &program.rules {
2050        augment_atom_schema_if_needed(&rule.head, &arities, schemas);
2051        for literal in &rule.body {
2052            match literal {
2053                BodyLiteral::Positive(atom) | BodyLiteral::Negated(atom) => {
2054                    augment_atom_schema_if_needed(atom, &arities, schemas);
2055                }
2056                BodyLiteral::Epistemic(epistemic) => {
2057                    augment_atom_schema_if_needed(&epistemic.atom, &arities, schemas);
2058                }
2059                BodyLiteral::Comparison(_) | BodyLiteral::IsExpr(_) | BodyLiteral::Univ(_) => {}
2060            }
2061        }
2062    }
2063
2064    for query in &program.queries {
2065        augment_atom_schema_if_needed(&query.atom, &arities, schemas);
2066    }
2067
2068    Ok(())
2069}
2070
2071fn augment_atom_schema_if_needed(
2072    atom: &Atom,
2073    arities: &HashMap<String, BTreeSet<usize>>,
2074    schemas: &mut HashMap<String, Schema>,
2075) {
2076    let Some(pred_arities) = arities.get(&atom.predicate) else {
2077        return;
2078    };
2079    if pred_arities.len() <= 1 {
2080        return;
2081    }
2082    let key = arity_qualified_name(&atom.predicate, atom.terms.len());
2083    schemas
2084        .entry(key)
2085        .or_insert_with(|| schema_from_terms(&atom.terms));
2086}
2087
2088fn predicate_arities(program: &Program) -> HashMap<String, BTreeSet<usize>> {
2089    let mut arities = HashMap::new();
2090    for decl in &program.predicates {
2091        add_predicate_arity(&mut arities, &decl.name, pred_decl_arity(decl));
2092    }
2093    for rule in &program.rules {
2094        add_predicate_arity(&mut arities, &rule.head.predicate, rule.head.terms.len());
2095        for literal in &rule.body {
2096            match literal {
2097                BodyLiteral::Positive(atom) | BodyLiteral::Negated(atom) => {
2098                    add_predicate_arity(&mut arities, &atom.predicate, atom.terms.len());
2099                }
2100                BodyLiteral::Epistemic(epistemic) => {
2101                    add_predicate_arity(
2102                        &mut arities,
2103                        &epistemic.atom.predicate,
2104                        epistemic.atom.terms.len(),
2105                    );
2106                }
2107                BodyLiteral::Comparison(_) | BodyLiteral::IsExpr(_) | BodyLiteral::Univ(_) => {}
2108            }
2109        }
2110    }
2111    for query in &program.queries {
2112        add_predicate_arity(&mut arities, &query.atom.predicate, query.atom.terms.len());
2113    }
2114    arities
2115}
2116
/// Record that `predicate` is used with `arity`, creating its arity set on first use.
fn add_predicate_arity(
    arities: &mut HashMap<String, BTreeSet<usize>>,
    predicate: &str,
    arity: usize,
) {
    let seen = arities
        .entry(predicate.to_owned())
        .or_insert_with(BTreeSet::new);
    seen.insert(arity);
}
2127
2128fn arity_qualified_name_if_needed(
2129    predicate: &str,
2130    arity: usize,
2131    arities: &HashMap<String, BTreeSet<usize>>,
2132) -> String {
2133    if arities.get(predicate).is_some_and(|items| items.len() > 1) {
2134        arity_qualified_name(predicate, arity)
2135    } else {
2136        predicate.to_string()
2137    }
2138}
2139
/// Format `predicate` and `arity` as `predicate/arity` (e.g. `edge/2`).
fn arity_qualified_name(predicate: &str, arity: usize) -> String {
    let arity_text = arity.to_string();
    let mut qualified = String::with_capacity(predicate.len() + 1 + arity_text.len());
    qualified.push_str(predicate);
    qualified.push('/');
    qualified.push_str(&arity_text);
    qualified
}
2143
2144fn pred_decl_arity(decl: &xlog_logic::ast::PredDecl) -> usize {
2145    if decl.columns.is_empty() {
2146        decl.types.len()
2147    } else {
2148        decl.columns.len()
2149    }
2150}
2151
2152fn program_has_epistemic_literals(program: &Program) -> bool {
2153    program.rules.iter().any(|rule| {
2154        rule.body
2155            .iter()
2156            .any(|lit| matches!(lit, BodyLiteral::Epistemic(_)))
2157    }) || program.constraints.iter().any(|constraint| {
2158        constraint
2159            .body
2160            .iter()
2161            .any(|lit| matches!(lit, BodyLiteral::Epistemic(_)))
2162    })
2163}
2164
2165fn epistemic_output_head_predicate_count(program: &Program) -> usize {
2166    program
2167        .rules
2168        .iter()
2169        .filter(|rule| {
2170            rule.body
2171                .iter()
2172                .any(|lit| matches!(lit, BodyLiteral::Epistemic(_)))
2173        })
2174        .map(|rule| rule.head.predicate.as_str())
2175        .collect::<BTreeSet<_>>()
2176        .len()
2177}
2178
2179/// The user-visible output head predicate(s) of a stratum's epistemic-bearing
2180/// rules. For a recursive stratum (`reach :- reach, know a`) this is the recursive
2181/// head whose materialized relation is the stratum's output.
2182fn epistemic_stratum_output_heads(program: &Program) -> Vec<String> {
2183    program
2184        .rules
2185        .iter()
2186        .filter(|rule| {
2187            rule.body
2188                .iter()
2189                .any(|lit| matches!(lit, BodyLiteral::Epistemic(_)))
2190        })
2191        .map(|rule| rule.head.predicate.clone())
2192        .collect::<BTreeSet<_>>()
2193        .into_iter()
2194        .collect()
2195}
2196
2197fn epistemic_relation_ids(plan: &LogicExecutionPlan) -> Result<HashMap<String, RelId>> {
2198    let mut rel_ids = HashMap::new();
2199    match plan {
2200        LogicExecutionPlan::EpistemicSingle(executable) => {
2201            for (name, rel_id) in &executable.relation_ids {
2202                insert_epistemic_relation_id(&mut rel_ids, name, *rel_id)?;
2203            }
2204        }
2205        LogicExecutionPlan::EpistemicSplit(split) => {
2206            for component in &split.components {
2207                for (name, rel_id) in &component.executable.relation_ids {
2208                    insert_epistemic_relation_id(&mut rel_ids, name, *rel_id)?;
2209                }
2210            }
2211        }
2212        LogicExecutionPlan::EpistemicStratified(strata) => {
2213            for stratum in strata {
2214                match &stratum.plan {
2215                    StratumPlanKind::Single(executable) => {
2216                        for (name, rel_id) in &executable.relation_ids {
2217                            // Each stratum is a distinct sub-program compiled with a
2218                            // fresh compiler, so relation ids legitimately differ
2219                            // across strata; keep the last writer per name.
2220                            rel_ids.insert(name.clone(), *rel_id);
2221                        }
2222                    }
2223                    StratumPlanKind::Split(split) => {
2224                        for component in &split.components {
2225                            for (name, rel_id) in &component.executable.relation_ids {
2226                                rel_ids.insert(name.clone(), *rel_id);
2227                            }
2228                        }
2229                    }
2230                    // An ordinary (Case-A recursive) stratum carries no epistemic
2231                    // relation-id map; its relations are owned by its own ordinary
2232                    // RIR plan and surfaced from the store after execution.
2233                    StratumPlanKind::Ordinary { .. } => {}
2234                }
2235            }
2236        }
2237        LogicExecutionPlan::EpistemicWfsGpu(wfs) => {
2238            for plan in [&wfs.overapprox, &wfs.lower, &wfs.upper] {
2239                for (name, rel_id) in &plan.rel_ids {
2240                    rel_ids.insert(name.clone(), *rel_id);
2241                }
2242            }
2243        }
2244        LogicExecutionPlan::Ordinary(_) => {}
2245    }
2246    Ok(rel_ids)
2247}
2248
2249fn insert_epistemic_relation_id(
2250    rel_ids: &mut HashMap<String, RelId>,
2251    name: &str,
2252    rel_id: RelId,
2253) -> Result<()> {
2254    if let Some(previous) = rel_ids.insert(name.to_string(), rel_id) {
2255        if previous != rel_id {
2256            return Err(XlogError::Compilation(format!(
2257                "epistemic split components assigned conflicting relation ids for {name}: \
2258                 {previous:?} vs {rel_id:?}"
2259            )));
2260        }
2261    }
2262    Ok(())
2263}
2264
2265fn capacities_for_epistemic_executable(
2266    executable: &EpistemicExecutablePlan,
2267) -> Result<EpistemicGpuWorkspaceCapacities> {
2268    let literal_count = executable.gpu_plan.epistemic_literals.len();
2269    let max_candidates = 1usize.checked_shl(literal_count as u32).ok_or_else(|| {
2270        XlogError::UnsupportedEpistemicConstruct {
2271            construct: "epistemic GPU execution candidate generation".to_string(),
2272            context: format!("literal count {literal_count} exceeds target pointer width"),
2273        }
2274    })?;
2275    Ok(EpistemicGpuWorkspaceCapacities {
2276        max_candidates,
2277        max_worlds: 1,
2278        max_models_per_reduction: DEFAULT_EPISTEMIC_MAX_MODELS_PER_REDUCTION,
2279    })
2280}
2281
2282fn capacities_for_epistemic_split(
2283    split: &EpistemicSplitExecutablePlan,
2284) -> Result<EpistemicGpuWorkspaceCapacities> {
2285    let mut capacities = EpistemicGpuWorkspaceCapacities {
2286        max_candidates: 1,
2287        max_worlds: 1,
2288        max_models_per_reduction: DEFAULT_EPISTEMIC_MAX_MODELS_PER_REDUCTION,
2289    };
2290    for component in &split.components {
2291        let component_capacities = capacities_for_epistemic_executable(&component.executable)?;
2292        capacities.max_candidates = capacities
2293            .max_candidates
2294            .max(component_capacities.max_candidates);
2295    }
2296    Ok(capacities)
2297}
2298
2299fn epistemic_output_relation_name(executable: &EpistemicExecutablePlan) -> Result<String> {
2300    executable
2301        .gpu_plan
2302        .reductions
2303        .last()
2304        .map(|reduction| reduction.head_predicate.clone())
2305        .ok_or_else(|| XlogError::UnsupportedEpistemicConstruct {
2306            construct: "epistemic GPU reduced output".to_string(),
2307            context: "executable plan has no epistemic reductions".to_string(),
2308        })
2309}
2310
2311fn epistemic_buffer_to_query_result(relation_name: String, buffer: CudaBuffer) -> LogicQueryResult {
2312    let schema = buffer.schema();
2313    let columns = schema
2314        .columns
2315        .iter()
2316        .map(|(name, _)| name.clone())
2317        .collect();
2318    let sort_labels = schema.sort_labels().to_vec();
2319    LogicQueryResult {
2320        relation_name,
2321        columns,
2322        sort_labels,
2323        buffer,
2324    }
2325}
2326
2327/// Convert an epistemic GPU execution result into one query result per output head.
2328///
2329/// `primary_relation_name` is the primary head (from `final_output`). A
2330/// JOINT-SOLVED coalesced multi-head component also carries
2331/// `additional_head_outputs`, each materialized against the SAME accepted world
2332/// view; every coupled head becomes its own query result so `xlog run` displays
2333/// all coupled epistemic outputs.
2334fn epistemic_result_to_query_results(
2335    primary_relation_name: String,
2336    result: EpistemicGpuExecutionResult,
2337) -> Vec<LogicQueryResult> {
2338    let mut results = Vec::with_capacity(1 + result.additional_head_outputs.len());
2339    for (head, buffer) in result.additional_head_outputs {
2340        results.push(epistemic_buffer_to_query_result(head, buffer));
2341    }
2342    results.push(epistemic_buffer_to_query_result(
2343        primary_relation_name,
2344        result.final_output,
2345    ));
2346    results
2347}
2348
/// A relation is user-visible unless its name starts with the internal `__` prefix.
fn is_user_visible_relation(name: &str) -> bool {
    const INTERNAL_PREFIX: &str = "__";
    name.strip_prefix(INTERNAL_PREFIX).is_none()
}
2352
/// Whether `name` is an internal list helper relation (prefix `__xlog_list_`).
fn is_list_helper_relation(name: &str) -> bool {
    const LIST_HELPER_PREFIX: &str = "__xlog_list_";
    name.starts_with(LIST_HELPER_PREFIX)
}
2356
2357fn logic_delta_report(
2358    stats: DeltaRecomputeStats,
2359    insert_rows: u64,
2360    delete_rows: u64,
2361) -> LogicDeltaReport {
2362    LogicDeltaReport {
2363        input_delta_count: stats.changed_relations,
2364        changed_relations: stats.changed_relations,
2365        changed_relation_names: Vec::new(),
2366        insert_rows,
2367        delete_rows,
2368        has_deletes: stats.has_deletes,
2369        affected_sccs: stats.affected_sccs,
2370        recomputed_sccs: stats.recomputed_sccs,
2371        incremental_sccs: stats.incremental_sccs,
2372        coalesced_insert_rows: insert_rows,
2373        coalesced_delete_rows: delete_rows,
2374        canceled_rows: 0,
2375        planner_telemetry: DeltaPlannerTelemetry::default(),
2376        debug_trace: Vec::new(),
2377    }
2378}
2379
2380fn delta_debug_trace(report: &LogicDeltaReport) -> Vec<String> {
2381    vec![
2382        format!("changed_relation_names={:?}", report.changed_relation_names),
2383        format!("affected_sccs={}", report.affected_sccs),
2384        format!("recomputed_sccs={}", report.recomputed_sccs),
2385        format!("incremental_sccs={}", report.incremental_sccs),
2386        format!("insert_rows={}", report.insert_rows),
2387        format!("delete_rows={}", report.delete_rows),
2388        format!(
2389            "planner_fallback_decision={}",
2390            report.planner_telemetry.fallback_decision
2391        ),
2392        format!(
2393            "estimated_delta_speedup={:?}",
2394            report.planner_telemetry.estimated_delta_speedup
2395        ),
2396    ]
2397}
2398
2399fn buffers_gpu_set_equivalent(
2400    provider: &CudaKernelProvider,
2401    left: &CudaBuffer,
2402    right: &CudaBuffer,
2403) -> Result<bool> {
2404    if left.schema() != right.schema() {
2405        return Ok(false);
2406    }
2407    let left_rows = provider.device_row_count(left)?;
2408    let right_rows = provider.device_row_count(right)?;
2409    if left_rows != right_rows {
2410        return Ok(false);
2411    }
2412
2413    let left_minus_right = provider.diff_full_row(left, right)?;
2414    if provider.device_row_count(&left_minus_right)? != 0 {
2415        return Ok(false);
2416    }
2417    let right_minus_left = provider.diff_full_row(right, left)?;
2418    Ok(provider.device_row_count(&right_minus_left)? == 0)
2419}
2420
2421fn coalesce_relation_delta_batch(
2422    provider: &CudaKernelProvider,
2423    delta_batch: Vec<(String, RelationDelta)>,
2424) -> Result<CoalescedRelationDeltaBatch> {
2425    let input_delta_count = delta_batch.len();
2426    let mut pending_by_relation: HashMap<String, PendingRelationDelta> = HashMap::new();
2427    let mut canceled_rows = 0u64;
2428
2429    for (name, delta) in delta_batch {
2430        let pending = pending_by_relation.entry(name).or_default();
2431        if let Some(insert) = delta.insert {
2432            merge_insert_delta(provider, pending, insert, &mut canceled_rows)?;
2433        }
2434        if let Some(delete) = delta.delete {
2435            merge_delete_delta(provider, pending, delete, &mut canceled_rows)?;
2436        }
2437    }
2438
2439    let mut deltas = HashMap::new();
2440    let mut coalesced_insert_rows = 0u64;
2441    let mut coalesced_delete_rows = 0u64;
2442    for (name, pending) in pending_by_relation {
2443        let insert = pending.insert.and_then(non_empty_buffer);
2444        let delete = pending.delete.and_then(non_empty_buffer);
2445        if insert.is_none() && delete.is_none() {
2446            continue;
2447        }
2448        coalesced_insert_rows += insert.as_ref().map(buffer_rows).unwrap_or(0);
2449        coalesced_delete_rows += delete.as_ref().map(buffer_rows).unwrap_or(0);
2450        deltas.insert(name, RelationDelta::new(insert, delete));
2451    }
2452
2453    let changed_relations = deltas.len();
2454    Ok(CoalescedRelationDeltaBatch {
2455        deltas,
2456        input_delta_count,
2457        changed_relations,
2458        coalesced_insert_rows,
2459        coalesced_delete_rows,
2460        canceled_rows,
2461    })
2462}
2463
2464fn merge_insert_delta(
2465    provider: &CudaKernelProvider,
2466    pending: &mut PendingRelationDelta,
2467    insert: CudaBuffer,
2468    canceled_rows: &mut u64,
2469) -> Result<()> {
2470    let mut incoming = provider.dedup_full_row(&insert)?;
2471    if let Some(delete) = pending.delete.take().and_then(non_empty_buffer) {
2472        let delete_before = buffer_rows(&delete);
2473        let delete_after = provider.diff_full_row(&delete, &incoming)?;
2474        let insert_after = provider.diff_full_row(&incoming, &delete)?;
2475        *canceled_rows += delete_before.saturating_sub(buffer_rows(&delete_after));
2476        pending.delete = non_empty_buffer(delete_after);
2477        incoming = insert_after;
2478    }
2479    pending.insert = merge_optional_buffer(provider, pending.insert.take(), incoming)?;
2480    Ok(())
2481}
2482
2483fn merge_delete_delta(
2484    provider: &CudaKernelProvider,
2485    pending: &mut PendingRelationDelta,
2486    delete: CudaBuffer,
2487    canceled_rows: &mut u64,
2488) -> Result<()> {
2489    let mut incoming = provider.dedup_full_row(&delete)?;
2490    if let Some(insert) = pending.insert.take().and_then(non_empty_buffer) {
2491        let insert_before = buffer_rows(&insert);
2492        let insert_after = provider.diff_full_row(&insert, &incoming)?;
2493        let delete_after = provider.diff_full_row(&incoming, &insert)?;
2494        *canceled_rows += insert_before.saturating_sub(buffer_rows(&insert_after));
2495        pending.insert = non_empty_buffer(insert_after);
2496        incoming = delete_after;
2497    }
2498    pending.delete = merge_optional_buffer(provider, pending.delete.take(), incoming)?;
2499    Ok(())
2500}
2501
2502fn merge_optional_buffer(
2503    provider: &CudaKernelProvider,
2504    existing: Option<CudaBuffer>,
2505    incoming: CudaBuffer,
2506) -> Result<Option<CudaBuffer>> {
2507    let Some(incoming) = non_empty_buffer(incoming) else {
2508        return Ok(existing.and_then(non_empty_buffer));
2509    };
2510    match existing.and_then(non_empty_buffer) {
2511        Some(existing) => provider
2512            .union_gpu(&existing, &incoming)
2513            .map(non_empty_buffer),
2514        None => Ok(Some(incoming)),
2515    }
2516}
2517
2518fn non_empty_buffer(buffer: CudaBuffer) -> Option<CudaBuffer> {
2519    if buffer.is_empty() {
2520        None
2521    } else {
2522        Some(buffer)
2523    }
2524}
2525
2526fn buffer_rows(buffer: &CudaBuffer) -> u64 {
2527    buffer
2528        .cached_row_count()
2529        .map(u64::from)
2530        .unwrap_or_else(|| buffer.num_rows())
2531}
2532
2533fn ensure_schema_type_compatible(expected: &Schema, actual: &Schema) -> Result<()> {
2534    if expected.arity() != actual.arity() {
2535        return Err(XlogError::Execution(format!(
2536            "Expected {} columns, got {}",
2537            expected.arity(),
2538            actual.arity()
2539        )));
2540    }
2541    for i in 0..expected.arity() {
2542        let exp = expected.column_type(i).ok_or_else(|| {
2543            XlogError::Execution(format!("Missing expected type for column {}", i))
2544        })?;
2545        let act = actual
2546            .column_type(i)
2547            .ok_or_else(|| XlogError::Execution(format!("Missing actual type for column {}", i)))?;
2548        if exp != act {
2549            return Err(XlogError::Execution(format!(
2550                "Column {} type mismatch: expected {:?}, got {:?}",
2551                i, exp, act
2552            )));
2553        }
2554    }
2555    Ok(())
2556}
2557
2558fn push_term_bytes(out: &mut Vec<u8>, term: &Term, typ: xlog_core::ScalarType) -> Result<()> {
2559    use xlog_core::symbol;
2560    use xlog_core::ScalarType;
2561
2562    match (typ, term) {
2563        (ScalarType::U32, Term::Integer(v)) => {
2564            let v = u32::try_from(*v)
2565                .map_err(|_| XlogError::Execution(format!("u32 out of range: {}", v)))?;
2566            out.extend_from_slice(&v.to_le_bytes());
2567        }
2568        (ScalarType::U64, Term::Integer(v)) => {
2569            let v = u64::try_from(*v)
2570                .map_err(|_| XlogError::Execution(format!("u64 out of range: {}", v)))?;
2571            out.extend_from_slice(&v.to_le_bytes());
2572        }
2573        (ScalarType::I32, Term::Integer(v)) => {
2574            let v = i32::try_from(*v)
2575                .map_err(|_| XlogError::Execution(format!("i32 out of range: {}", v)))?;
2576            out.extend_from_slice(&v.to_le_bytes());
2577        }
2578        (ScalarType::I64, Term::Integer(v)) => {
2579            out.extend_from_slice(&v.to_le_bytes());
2580        }
2581        (ScalarType::F32, Term::Float(v)) => {
2582            out.extend_from_slice(&(*v as f32).to_le_bytes());
2583        }
2584        (ScalarType::F64, Term::Float(v)) => {
2585            out.extend_from_slice(&v.to_le_bytes());
2586        }
2587        (ScalarType::F32, Term::Integer(v)) => {
2588            out.extend_from_slice(&(*v as f32).to_le_bytes());
2589        }
2590        (ScalarType::F64, Term::Integer(v)) => {
2591            out.extend_from_slice(&(*v as f64).to_le_bytes());
2592        }
2593        (ScalarType::Bool, Term::Integer(v)) => {
2594            let b = match *v {
2595                0 => 0u8,
2596                1 => 1u8,
2597                other => {
2598                    return Err(XlogError::Execution(format!(
2599                        "bool expects 0/1, got {}",
2600                        other
2601                    )));
2602                }
2603            };
2604            out.push(b);
2605        }
2606        (ScalarType::Bool, Term::Symbol(id)) => {
2607            let s = symbol::resolve(*id);
2608            if s == "true" || s == "false" {
2609                out.push(if s == "true" { 1u8 } else { 0u8 });
2610            } else {
2611                return Err(XlogError::Execution(format!(
2612                    "Expected boolean symbol 'true' or 'false', got '{}'",
2613                    s
2614                )));
2615            }
2616        }
2617        (ScalarType::Symbol, Term::String(s)) => {
2618            out.extend_from_slice(&symbol::intern(s).to_le_bytes());
2619        }
2620        (ScalarType::Symbol, Term::Symbol(id)) => {
2621            // Symbol is already interned, just use the ID directly
2622            out.extend_from_slice(&id.to_le_bytes());
2623        }
2624        (_, Term::Variable(v)) => {
2625            return Err(XlogError::Execution(format!(
2626                "Fact cannot contain variable {}",
2627                v
2628            )));
2629        }
2630        (_, Term::Anonymous) => {
2631            return Err(XlogError::Execution(
2632                "Fact cannot contain anonymous wildcard '_'".to_string(),
2633            ));
2634        }
2635        (_, Term::Aggregate(_)) => {
2636            return Err(XlogError::Execution(
2637                "Fact cannot contain aggregate".to_string(),
2638            ));
2639        }
2640        (expected, got) => {
2641            return Err(XlogError::Execution(format!(
2642                "Type mismatch in fact: expected {:?}, got {:?}",
2643                expected, got
2644            )));
2645        }
2646    }
2647
2648    Ok(())
2649}
2650
2651fn query_output_vars(Query { atom }: &Query) -> Vec<String> {
2652    let mut out = Vec::new();
2653    let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
2654    for term in &atom.terms {
2655        for name in term.variables() {
2656            if seen.insert(name) {
2657                out.push(name.to_string());
2658            }
2659        }
2660    }
2661    out
2662}
2663
2664fn format_term(term: &Term) -> String {
2665    match term {
2666        Term::Variable(v) => v.clone(),
2667        Term::Anonymous => "_".to_string(),
2668        Term::Integer(i) => i.to_string(),
2669        Term::Float(f) => f.to_string(),
2670        Term::String(s) => format!("{:?}", s),
2671        Term::Symbol(id) => symbol::resolve(*id),
2672        Term::List(items) => format!(
2673            "[{}]",
2674            items.iter().map(format_term).collect::<Vec<_>>().join(", ")
2675        ),
2676        Term::Cons { head, tail } => format!("[{} | {}]", format_term(head), format_term(tail)),
2677        Term::Compound { functor, args } => format!(
2678            "{}({})",
2679            functor,
2680            args.iter().map(format_term).collect::<Vec<_>>().join(", ")
2681        ),
2682        Term::PredRef(name) => format!("predref({})", name),
2683        Term::Aggregate(a) => format!("{:?}({})", a.op, a.variable),
2684    }
2685}
2686
2687fn format_constraint(body: &[BodyLiteral]) -> String {
2688    let lits = body
2689        .iter()
2690        .map(|lit| match lit {
2691            BodyLiteral::Positive(a) => {
2692                let args = a
2693                    .terms
2694                    .iter()
2695                    .map(format_term)
2696                    .collect::<Vec<_>>()
2697                    .join(", ");
2698                format!("{}({})", a.predicate, args)
2699            }
2700            BodyLiteral::Negated(a) => {
2701                let args = a
2702                    .terms
2703                    .iter()
2704                    .map(format_term)
2705                    .collect::<Vec<_>>()
2706                    .join(", ");
2707                format!("not {}({})", a.predicate, args)
2708            }
2709            BodyLiteral::Epistemic(lit) => {
2710                let args = lit
2711                    .atom
2712                    .terms
2713                    .iter()
2714                    .map(format_term)
2715                    .collect::<Vec<_>>()
2716                    .join(", ");
2717                let op = match lit.op {
2718                    xlog_logic::EpistemicOp::Know => "know",
2719                    xlog_logic::EpistemicOp::Possible => "possible",
2720                };
2721                let prefix = if lit.negated { "not " } else { "" };
2722                format!("{prefix}{op} {}({})", lit.atom.predicate, args)
2723            }
2724            BodyLiteral::Comparison(c) => format!("{:?} {:?} {:?}", c.left, c.op, c.right),
2725            BodyLiteral::IsExpr(is) => format!("{} is {:?}", is.target, is.expr),
2726            BodyLiteral::Univ(univ) => {
2727                format!(
2728                    "{} =.. {}",
2729                    format_term(&univ.term),
2730                    format_term(&univ.parts)
2731                )
2732            }
2733        })
2734        .collect::<Vec<_>>()
2735        .join(", ");
2736    format!(":- {}.", lits)
2737}
2738
2739// --------------------------------------------------------------------------- //
2740// Epistemic-plan / EIR JSON dump
2741// --------------------------------------------------------------------------- //
2742
/// Escape `s` for embedding inside a JSON string literal.
///
/// Quotes and backslashes are escaped, and newline, carriage return and tab
/// use their short escapes. Every other control character below U+0020 is
/// written as `\u00XX`. All other characters pass through unchanged.
fn json_escape(s: &str) -> String {
    s.chars()
        .fold(String::with_capacity(s.len() + 2), |mut escaped, ch| {
            match ch {
                '"' => escaped.push_str("\\\""),
                '\\' => escaped.push_str("\\\\"),
                '\n' => escaped.push_str("\\n"),
                '\r' => escaped.push_str("\\r"),
                '\t' => escaped.push_str("\\t"),
                control if u32::from(control) < 0x20 => {
                    escaped.push_str(&format!("\\u{:04x}", u32::from(control)))
                }
                other => escaped.push(other),
            }
            escaped
        })
}
2758
/// Deterministic 64-bit FNV-1a hash of a string (stable across runs/processes,
/// unlike `std::hash::DefaultHasher` which is randomized). Used as the stable
/// epistemic plan id so two dumps of the same plan compare equal.
fn fnv1a_64(s: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    // FNV-1a: XOR each byte in first, then multiply (wrapping) by the prime.
    s.bytes()
        .fold(OFFSET_BASIS, |hash, byte| (hash ^ u64::from(byte)).wrapping_mul(PRIME))
}
2770
2771/// Extract every `know`/`possible` literal (with negation) from a program's EIR.
2772/// Used to retain epistemic provenance when a Case-A recursive reduction lowers the
2773/// program to an ordinary executable plan.
2774fn collect_eir_epistemic_literals(program: &Program) -> Vec<xlog_ir::EirEpistemicLiteral> {
2775    let mut lits = Vec::new();
2776    if let Ok(eir) = xlog_logic::build_eir(program) {
2777        for rule in &eir.rules {
2778            for lit in &rule.body {
2779                if let xlog_ir::EirBodyLiteral::Epistemic(e) = lit {
2780                    lits.push(e.clone());
2781                }
2782            }
2783        }
2784    }
2785    lits
2786}
2787
2788/// JSON summary for an epistemic source that reduced to a high-level recursive
2789/// execution plan without single-pass epistemic GPU candidate units. Case-A/B
2790/// stratified reductions use the ordinary semi-naive engine; cyclic negated-modal
2791/// reductions use the GPU-backed WFS alternating-fixpoint plan. In both cases the
2792/// modal literals are recorded and CPU fallback is zero by construction.
2793fn epistemic_provenance_summary_json(
2794    plan_kind: &str,
2795    prov: &EpistemicProvenance,
2796    max_iterations: Option<usize>,
2797    wfs: Option<&EpistemicWfsGpuPlan>,
2798) -> String {
2799    let literals = prov
2800        .literals
2801        .iter()
2802        .map(epistemic_literal_json)
2803        .collect::<Vec<_>>()
2804        .join(",");
2805    let wfs_fixed_relations = wfs
2806        .map(wfs_fixed_relations_json)
2807        .unwrap_or_else(|| "null".to_string());
2808    let wfs_convergence_predicates = wfs
2809        .map(wfs_convergence_predicates_json)
2810        .unwrap_or_else(|| "null".to_string());
2811    let wfs_gpu_passes = if wfs.is_some() {
2812        "[\"overapprox\",\"lower\",\"upper\"]"
2813    } else {
2814        "null"
2815    };
2816    let host_wfs_fallback_allowed = if wfs.is_some() { "false" } else { "null" };
2817    let body = format!(
2818        "{{\"plan_kind\":\"{}\",\"reduction\":\"{}\",\
2819\"epistemic_literals\":[{}],\"units\":[],\"max_iterations\":{},\
2820\"wfs_fixed_relations\":{},\"wfs_convergence_predicates\":{},\
2821\"wfs_gpu_passes\":{},\
2822\"host_wfs_fallback_allowed\":{},\
2823\"cpu_fallback_total_zero\":true}}",
2824        json_escape(plan_kind),
2825        json_escape(prov.reduction),
2826        literals,
2827        max_iterations
2828            .map(|value| value.to_string())
2829            .unwrap_or_else(|| "null".to_string()),
2830        wfs_fixed_relations,
2831        wfs_convergence_predicates,
2832        wfs_gpu_passes,
2833        host_wfs_fallback_allowed
2834    );
2835    let plan_id = fnv1a_64(&body);
2836    format!(
2837        "{{\"plan_id\":\"epi-{:016x}\",\"plan_kind\":\"{}\",\
2838\"reduction\":\"{}\",\"epistemic_literals\":[{}],\"units\":[],\
2839\"max_iterations\":{},\"wfs_fixed_relations\":{},\
2840\"wfs_convergence_predicates\":{},\"wfs_gpu_passes\":{},\
2841\"host_wfs_fallback_allowed\":{},\
2842\"cpu_fallback_total_zero\":true}}",
2843        plan_id,
2844        json_escape(plan_kind),
2845        json_escape(prov.reduction),
2846        literals,
2847        max_iterations
2848            .map(|value| value.to_string())
2849            .unwrap_or_else(|| "null".to_string()),
2850        wfs_fixed_relations,
2851        wfs_convergence_predicates,
2852        wfs_gpu_passes,
2853        host_wfs_fallback_allowed
2854    )
2855}
2856
2857fn wfs_fixed_relations_json(wfs: &EpistemicWfsGpuPlan) -> String {
2858    let mut sources: BTreeSet<&str> = BTreeSet::new();
2859    for source in wfs.upper_fixed_names.keys() {
2860        sources.insert(source.as_str());
2861    }
2862    for source in wfs.lower_fixed_names.keys() {
2863        sources.insert(source.as_str());
2864    }
2865    let entries = sources
2866        .into_iter()
2867        .map(|source| {
2868            let upper = wfs
2869                .upper_fixed_names
2870                .get(source)
2871                .map(String::as_str)
2872                .unwrap_or("");
2873            let lower = wfs
2874                .lower_fixed_names
2875                .get(source)
2876                .map(String::as_str)
2877                .unwrap_or("");
2878            format!(
2879                "\"{}\":{{\"upper\":\"{}\",\"lower\":\"{}\"}}",
2880                json_escape(source),
2881                json_escape(upper),
2882                json_escape(lower)
2883            )
2884        })
2885        .collect::<Vec<_>>()
2886        .join(",");
2887    format!("{{{entries}}}")
2888}
2889
2890fn wfs_convergence_predicates_json(wfs: &EpistemicWfsGpuPlan) -> String {
2891    let entries = wfs
2892        .intensional_predicates
2893        .iter()
2894        .map(|pred| format!("\"{}\"", json_escape(pred)))
2895        .collect::<Vec<_>>()
2896        .join(",");
2897    format!("[{entries}]")
2898}
2899
2900fn epistemic_literal_json(lit: &xlog_ir::EirEpistemicLiteral) -> String {
2901    let op = match lit.op {
2902        xlog_ir::EirEpistemicOp::Know => "know",
2903        xlog_ir::EirEpistemicOp::Possible => "possible",
2904    };
2905    format!(
2906        "{{\"op\":\"{}\",\"negated\":{},\"predicate\":\"{}\",\"arity\":{}}}",
2907        op,
2908        lit.negated,
2909        json_escape(&lit.atom.predicate),
2910        lit.atom.arity
2911    )
2912}
2913
2914fn epistemic_gpu_plan_json(plan: &xlog_ir::EpistemicGpuPlan) -> String {
2915    let mode = match plan.mode {
2916        xlog_ir::EirEpistemicMode::G91 => "g91",
2917        xlog_ir::EirEpistemicMode::Faeel => "faeel",
2918    };
2919    let literals = plan
2920        .epistemic_literals
2921        .iter()
2922        .map(epistemic_literal_json)
2923        .collect::<Vec<_>>()
2924        .join(",");
2925    let phases = plan
2926        .required_phases
2927        .iter()
2928        .map(|p| format!("\"{:?}\"", p))
2929        .collect::<Vec<_>>()
2930        .join(",");
2931    let kernels = plan
2932        .required_kernel_phases
2933        .iter()
2934        .map(|p| format!("\"{:?}\"", p))
2935        .collect::<Vec<_>>()
2936        .join(",");
2937    let constraints = plan
2938        .constraints
2939        .iter()
2940        .map(|c| {
2941            let idx = c
2942                .literal_indices
2943                .iter()
2944                .map(|i| i.to_string())
2945                .collect::<Vec<_>>()
2946                .join(",");
2947            format!(
2948                "{{\"constraint_index\":{},\"literal_indices\":[{}]}}",
2949                c.constraint_index, idx
2950            )
2951        })
2952        .collect::<Vec<_>>()
2953        .join(",");
2954    let reductions = plan
2955        .reductions
2956        .iter()
2957        .map(|r| {
2958            format!(
2959                "{{\"rule_index\":{},\"head\":\"{}\",\"public_head_arity\":{},\"relational_body_atoms\":{}}}",
2960                r.rule_index,
2961                json_escape(&r.head_predicate),
2962                r.public_head_arity,
2963                r.relational_body_atoms
2964            )
2965        })
2966        .collect::<Vec<_>>()
2967        .join(",");
2968    let f = &plan.cpu_fallbacks;
2969    format!(
2970        "{{\"mode\":\"{}\",\"epistemic_literals\":[{}],\"required_phases\":[{}],\
2971\"required_kernel_phases\":[{}],\"constraints\":[{}],\"reductions\":[{}],\
2972\"cpu_fallbacks\":{{\"candidate_enumeration\":{},\"world_view_validation\":{},\
2973\"solver_search\":{},\"probabilistic_recompute\":{}}},\"cpu_fallback_is_zero\":{}}}",
2974        mode,
2975        literals,
2976        phases,
2977        kernels,
2978        constraints,
2979        reductions,
2980        f.candidate_enumeration,
2981        f.world_view_validation,
2982        f.solver_search,
2983        f.probabilistic_recompute,
2984        f.is_zero()
2985    )
2986}
2987
2988fn epistemic_plan_summary_json(
2989    plan_kind: &str,
2990    gpu_plans: &[(String, &xlog_ir::EpistemicGpuPlan)],
2991) -> String {
2992    let units = gpu_plans
2993        .iter()
2994        .map(|(label, plan)| {
2995            format!(
2996                "{{\"unit\":\"{}\",\"plan\":{}}}",
2997                json_escape(label),
2998                epistemic_gpu_plan_json(plan)
2999            )
3000        })
3001        .collect::<Vec<_>>()
3002        .join(",");
3003    let all_zero = gpu_plans
3004        .iter()
3005        .all(|(_, plan)| plan.cpu_fallbacks.is_zero());
3006    // Canonical body (without the id) hashed for the stable plan id.
3007    let body = format!(
3008        "{{\"plan_kind\":\"{}\",\"units\":[{}],\"cpu_fallback_total_zero\":{}}}",
3009        json_escape(plan_kind),
3010        units,
3011        all_zero
3012    );
3013    let plan_id = fnv1a_64(&body);
3014    format!(
3015        "{{\"plan_id\":\"epi-{:016x}\",\"plan_kind\":\"{}\",\"units\":[{}],\"cpu_fallback_total_zero\":{}}}",
3016        plan_id,
3017        json_escape(plan_kind),
3018        units,
3019        all_zero
3020    )
3021}
3022
// Device-backed tests for relation-delta batches. They cover coalescing a batch of
// insert/delete deltas on the GPU, and applying such a batch to a compiled program's
// relation store. Each test returns early (and therefore passes) when `test_provider`
// yields `None`, so real behavior is only exercised on machines with a usable CUDA device.
#[cfg(test)]
mod v086_delta_coalesce_tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::Arc;

    use xlog_core::{MemoryBudget, ScalarType};
    use xlog_cuda::{CudaDevice, GpuMemoryManager};

    /// Opens CUDA device 0 and wraps it in a kernel provider whose memory manager gets a
    /// `1024 * 1024 * 1024` budget. That is presumably bytes (1 GiB); TODO: confirm the unit
    /// of `MemoryBudget::with_limit`.
    ///
    /// Returns `None` if either the device or the provider cannot be created. The underlying
    /// errors are discarded (`.ok()?`) so callers can treat `None` as "skip this test".
    fn test_provider() -> Option<Arc<CudaKernelProvider>> {
        let device = Arc::new(CudaDevice::new(0).ok()?);
        let budget = MemoryBudget::with_limit(1024 * 1024 * 1024);
        let memory = Arc::new(GpuMemoryManager::new(device.clone(), budget));
        Some(Arc::new(CudaKernelProvider::new(device, memory).ok()?))
    }

    /// Uploads `rows` as a single-column device buffer with schema `id: U32`.
    ///
    /// The values are uploaded as little-endian bytes. The row count is passed to
    /// `CudaBuffer::from_columns` twice: as a host `u64`, and in a one-element device `u32`
    /// allocation. Panics if any allocation or host-to-device copy fails.
    fn test_buffer(provider: &CudaKernelProvider, rows: &[u32]) -> CudaBuffer {
        let schema = Schema::new(vec![("id".to_string(), ScalarType::U32)]);
        let bytes: Vec<u8> = rows.iter().flat_map(|v| v.to_le_bytes()).collect();
        let mut col = provider.memory().alloc::<u8>(bytes.len()).expect("alloc");
        provider
            .device()
            .inner()
            .htod_sync_copy_into(&bytes, &mut col)
            .expect("upload rows");
        let mut d_num_rows = provider.memory().alloc::<u32>(1).expect("alloc rows");
        // NOTE(review): `as u32` silently truncates beyond u32::MAX rows. That is fine for
        // the tiny fixtures used here.
        let row_count = rows.len() as u32;
        provider
            .device()
            .inner()
            .htod_sync_copy_into(&[row_count], &mut d_num_rows)
            .expect("upload row count");
        CudaBuffer::from_columns(vec![col.into()], rows.len() as u64, d_num_rows, schema)
    }

    /// Downloads column 0 of `buffer` to the host as `u32` values. Panics on failure.
    fn read_u32(provider: &CudaKernelProvider, buffer: &CudaBuffer) -> Vec<u32> {
        provider
            .download_column::<u32>(buffer, 0)
            .expect("download")
    }

    /// Downloads the rows of the first query in `result` and sorts them, so comparisons
    /// do not depend on the order in which the device produced the rows.
    fn sorted_query_rows(provider: &CudaKernelProvider, result: &LogicEvalResult) -> Vec<u32> {
        let mut rows = read_u32(provider, &result.queries[0].buffer);
        rows.sort_unstable();
        rows
    }

    /// Coalesces `+[7, 8]`, `-[8]` and `+[9]` for one relation. The inserted-then-deleted
    /// `8` should cancel, leaving an insert of `[7, 9]`, a missing or empty delete buffer,
    /// and counters that reflect one canceled row.
    #[test]
    fn coalesce_batch_cancels_insert_delete_pairs_on_device() {
        let provider = match test_provider() {
            Some(provider) => provider,
            None => {
                eprintln!("Skipping test: no CUDA device available");
                return;
            }
        };

        let batch = vec![
            (
                "external_consumer_commit".to_string(),
                RelationDelta::new(Some(test_buffer(&provider, &[7, 8])), None),
            ),
            (
                "external_consumer_commit".to_string(),
                RelationDelta::new(None, Some(test_buffer(&provider, &[8]))),
            ),
            (
                "external_consumer_commit".to_string(),
                RelationDelta::new(Some(test_buffer(&provider, &[9])), None),
            ),
        ];

        let report = coalesce_relation_delta_batch(provider.as_ref(), batch)
            .expect("coalesce relation delta batch");
        let delta = report
            .deltas
            .get("external_consumer_commit")
            .expect("coalesced relation");
        let insert = delta.insert.as_ref().expect("coalesced insert");
        // NOTE(review): unlike the query comparisons in the other test, this result is not
        // sorted, so the assertion also pins row order. Confirm that the coalescer guarantees
        // this order.
        assert_eq!(read_u32(&provider, insert), vec![7, 9]);
        // Either no delete buffer at all or an empty one is acceptable.
        assert!(delta.delete.as_ref().map(|b| b.is_empty()).unwrap_or(true));
        assert_eq!(report.input_delta_count, 3);
        assert_eq!(report.changed_relations, 1);
        assert_eq!(report.coalesced_insert_rows, 2);
        assert_eq!(report.coalesced_delete_rows, 0);
        assert_eq!(report.canceled_rows, 1);
    }

    /// Applies the batch `+[1, 2, 3]`, `-[2]`, `+[4]` via `apply_relation_delta_batch`.
    /// Checks that the reported counts are right, that the apply performs no device-to-host
    /// transfers, and that querying `out` gives the same rows as two references:
    /// (a) applying the three deltas one at a time, and (b) evaluating a store populated
    /// directly with the final contents `[1, 3, 4]`.
    #[test]
    fn relation_delta_batch_updates_runtime_store_and_reports_coalesced_counts() -> Result<()> {
        let Some(provider) = test_provider() else {
            eprintln!("Skipping test: no CUDA device available");
            return Ok(());
        };

        let source = r#"
            pred external_consumer_commit(u32).
            pred out(u32).

            out(X) :- external_consumer_commit(X).

            ?- out(X).
        "#;
        let program = LogicProgram::compile(source)?;
        let mut coalesced_store = program.create_relation_store(provider.clone())?;
        let mut coalesced_cache = None;

        // Reset the transfer counters so the D2H assertions below measure only the batch apply.
        provider.reset_host_transfer_stats();
        provider.reset_d2h_transfer_count();
        let report = program.apply_relation_delta_batch(
            provider.clone(),
            &mut coalesced_store,
            &mut coalesced_cache,
            vec![
                (
                    "external_consumer_commit".to_string(),
                    RelationDelta::new(Some(test_buffer(&provider, &[1, 2, 3])), None),
                ),
                (
                    "external_consumer_commit".to_string(),
                    RelationDelta::new(None, Some(test_buffer(&provider, &[2]))),
                ),
                (
                    "external_consumer_commit".to_string(),
                    RelationDelta::new(Some(test_buffer(&provider, &[4])), None),
                ),
            ],
        )?;
        // Snapshot the stats immediately, before any evaluation below downloads results.
        let transfer_stats = provider.host_transfer_stats();

        assert_eq!(report.input_delta_count, 3);
        assert_eq!(report.changed_relations, 1);
        assert_eq!(report.insert_rows, 3);
        assert_eq!(report.delete_rows, 0);
        assert_eq!(report.coalesced_insert_rows, 3);
        assert_eq!(report.coalesced_delete_rows, 0);
        assert_eq!(report.canceled_rows, 1);
        assert_eq!(transfer_stats.dtoh_bytes, 0);
        assert_eq!(transfer_stats.dtoh_calls, 0);
        assert_eq!(provider.d2h_transfer_count(), 0);

        let coalesced = program.evaluate_cached_relation_store(
            provider.clone(),
            coalesced_cache
                .as_ref()
                .expect("cached store after delta batch"),
        )?;
        let coalesced_rows = sorted_query_rows(&provider, &coalesced);

        // Reference (a): the same deltas, applied with one `apply_relation_deltas` call each.
        let mut sequential_store = program.create_relation_store(provider.clone())?;
        let mut sequential_cache = None;
        for delta in [
            RelationDelta::new(Some(test_buffer(&provider, &[1, 2, 3])), None),
            RelationDelta::new(None, Some(test_buffer(&provider, &[2]))),
            RelationDelta::new(Some(test_buffer(&provider, &[4])), None),
        ] {
            program.apply_relation_deltas(
                provider.clone(),
                &mut sequential_store,
                &mut sequential_cache,
                HashMap::from([("external_consumer_commit".to_string(), delta)]),
            )?;
        }
        let sequential = program.evaluate_cached_relation_store(
            provider.clone(),
            sequential_cache
                .as_ref()
                .expect("cached store after sequential deltas"),
        )?;
        let sequential_rows = sorted_query_rows(&provider, &sequential);

        // Reference (b): no deltas at all; the relation is stored directly with its final contents.
        let mut replacement_store = program.create_relation_store(provider.clone())?;
        replacement_store.put(
            "external_consumer_commit",
            test_buffer(&provider, &[1, 3, 4]),
        );
        let replacement =
            program.evaluate_with_relation_store(provider.clone(), &replacement_store, false)?;
        let replacement_rows = sorted_query_rows(&provider, &replacement);

        assert_eq!(coalesced_rows, vec![1, 3, 4]);
        assert_eq!(coalesced_rows, sequential_rows);
        assert_eq!(coalesced_rows, replacement_rows);
        Ok(())
    }
}