1use std::collections::{BTreeSet, HashMap};
4use std::sync::Arc;
5
6use xlog_core::{symbol, RelId, Result, ScalarType, Schema, XlogError};
7use xlog_cuda::{CudaBuffer, CudaKernelProvider};
8use xlog_ir::{EpistemicExecutablePlan, ExecutionPlan};
9use xlog_logic::ast::{AggOp, PredColumn, TypeRef};
10use xlog_logic::epistemic::{
11 compile_epistemic_gpu_execution, compile_epistemic_gpu_split_execution,
12 reduce_epistemic_program_to_ordinary,
13 reduce_epistemic_program_to_ordinary_for_stratified_schema,
14 try_plan_stratified_epistemic_program, try_reduce_case_a_recursive_epistemic_program,
15 EpistemicSplitExecutablePlan,
16};
17use xlog_logic::{
18 Atom, BodyLiteral, Compiler, EpistemicLiteral, EpistemicOp, Program, Query, Rule, Term,
19};
20use xlog_runtime::executor::JoinIndexCacheStats;
21use xlog_runtime::{
22 DeltaRecomputeStats, EpistemicGpuExecutionResult, EpistemicGpuWorkspaceCapacities,
23 ExecutionStats, Executor, RelationDelta, RelationStore,
24};
25
26pub struct LogicQueryResult {
28 pub relation_name: String,
30 pub columns: Vec<String>,
32 pub sort_labels: Vec<String>,
34 pub buffer: CudaBuffer,
36}
37
38pub struct LogicEvalResult {
40 pub queries: Vec<LogicQueryResult>,
42 pub stats: Option<ExecutionStats>,
44}
45
46pub struct LogicSessionRuntime {
48 executor: Executor,
49 profiling: bool,
50}
51
52impl LogicSessionRuntime {
53 pub fn join_index_cache_stats(&self) -> JoinIndexCacheStats {
55 self.executor.join_index_cache_stats()
56 }
57
58 pub fn wcoj_dispatch_stats(&self) -> WcojDispatchStats {
60 WcojDispatchStats {
61 free_join_dispatch_count: self.executor.free_join_dispatch_count(),
62 factorized_delta_dispatch_count: self.executor.factorized_delta_dispatch_count(),
63 wcoj_groupby_fusion_dispatch_count: self.executor.wcoj_groupby_fusion_dispatch_count(),
64 wcoj_error_decline_count: self.executor.wcoj_error_decline_count(),
65 }
66 }
67}
68
69#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
72pub struct WcojDispatchStats {
73 pub free_join_dispatch_count: u64,
75 pub factorized_delta_dispatch_count: u64,
78 pub wcoj_groupby_fusion_dispatch_count: u64,
80 pub wcoj_error_decline_count: u64,
82}
83
84#[derive(Clone, Debug, Default, PartialEq)]
86pub struct DeltaPlannerTelemetry {
87 pub cache_reused: bool,
89 pub fallback_decision: String,
91 pub affected_sccs: usize,
93 pub recomputed_sccs: usize,
95 pub incremental_sccs: usize,
97 pub estimated_delta_speedup: Option<f64>,
99 pub measured_delta_speedup: Option<f64>,
101 pub planner_advice: Vec<String>,
103}
104
105impl DeltaPlannerTelemetry {
106 pub fn from_delta_report(
108 report: &LogicDeltaReport,
109 cache_reused: bool,
110 measured_micros: Option<(u64, u64)>,
111 ) -> Self {
112 let fallback_decision = if report.affected_sccs == 0 {
113 "no_op"
114 } else if report.has_deletes || report.recomputed_sccs > 0 {
115 "full_recompute_fallback"
116 } else {
117 "incremental"
118 }
119 .to_string();
120 let estimated_delta_speedup = if report.affected_sccs > 0 {
121 Some((report.affected_sccs.max(1) as f64) / (report.incremental_sccs.max(1) as f64))
122 } else {
123 None
124 };
125 let measured_delta_speedup = measured_micros.and_then(|(delta_us, full_us)| {
126 if delta_us == 0 {
127 None
128 } else {
129 Some(full_us as f64 / delta_us as f64)
130 }
131 });
132
133 let mut planner_advice = Vec::new();
134 if fallback_decision == "full_recompute_fallback" {
135 planner_advice.push(
136 "full recompute fallback selected; inspect deletes or affected SCC fanout"
137 .to_string(),
138 );
139 } else if let Some(speedup) = measured_delta_speedup {
140 if speedup >= 1.0 {
141 planner_advice.push(format!("delta path is faster by {speedup:.2}x"));
142 } else {
143 planner_advice.push(format!(
144 "full recompute may be faster; delta measured {speedup:.2}x"
145 ));
146 }
147 } else if fallback_decision == "incremental" {
148 planner_advice.push(
149 "incremental delta path selected; run equivalence timing to measure speedup"
150 .to_string(),
151 );
152 }
153
154 Self {
155 cache_reused,
156 fallback_decision,
157 affected_sccs: report.affected_sccs,
158 recomputed_sccs: report.recomputed_sccs,
159 incremental_sccs: report.incremental_sccs,
160 estimated_delta_speedup,
161 measured_delta_speedup,
162 planner_advice,
163 }
164 }
165}
166
167pub struct LogicDeltaReport {
169 pub input_delta_count: usize,
171 pub changed_relations: usize,
173 pub changed_relation_names: Vec<String>,
175 pub insert_rows: u64,
177 pub delete_rows: u64,
179 pub has_deletes: bool,
181 pub affected_sccs: usize,
183 pub recomputed_sccs: usize,
185 pub incremental_sccs: usize,
187 pub coalesced_insert_rows: u64,
189 pub coalesced_delete_rows: u64,
191 pub canceled_rows: u64,
193 pub planner_telemetry: DeltaPlannerTelemetry,
195 pub debug_trace: Vec<String>,
197}
198
199struct CoalescedRelationDeltaBatch {
200 deltas: HashMap<String, RelationDelta>,
201 input_delta_count: usize,
202 changed_relations: usize,
203 coalesced_insert_rows: u64,
204 coalesced_delete_rows: u64,
205 canceled_rows: u64,
206}
207
208#[derive(Default)]
209struct PendingRelationDelta {
210 insert: Option<CudaBuffer>,
211 delete: Option<CudaBuffer>,
212}
213
214#[derive(Clone)]
222struct StratumExecutable {
223 plan: StratumPlanKind,
226}
227
228#[derive(Clone)]
229enum StratumPlanKind {
230 Single(EpistemicExecutablePlan),
231 Split(EpistemicSplitExecutablePlan),
232 Ordinary {
240 plan: ExecutionPlan,
241 head_predicates: Vec<String>,
243 },
244}
245
246#[derive(Clone)]
247enum LogicExecutionPlan {
248 Ordinary(ExecutionPlan),
249 EpistemicWfsGpu(EpistemicWfsGpuPlan),
250 EpistemicSingle(EpistemicExecutablePlan),
251 EpistemicSplit(EpistemicSplitExecutablePlan),
252 EpistemicStratified(Vec<StratumExecutable>),
255}
256
257#[derive(Clone)]
258struct EpistemicWfsGpuPlan {
259 overapprox: WfsGpuOrdinaryPlan,
260 lower: WfsGpuOrdinaryPlan,
261 upper: WfsGpuOrdinaryPlan,
262 intensional_predicates: Vec<String>,
263 upper_fixed_names: HashMap<String, String>,
264 lower_fixed_names: HashMap<String, String>,
265 max_iterations: usize,
266}
267
268#[derive(Clone)]
269struct WfsGpuOrdinaryPlan {
270 plan: ExecutionPlan,
271 schemas: HashMap<String, Schema>,
272 rel_ids: HashMap<String, RelId>,
273}
274
275#[derive(Clone)]
281struct EpistemicProvenance {
282 reduction: &'static str,
284 literals: Vec<xlog_ir::EirEpistemicLiteral>,
286}
287
288#[derive(Clone)]
290pub struct LogicProgram {
291 program: Program,
292 plan: LogicExecutionPlan,
293 schemas: HashMap<String, Schema>,
294 rel_ids: HashMap<String, RelId>,
295 epistemic_provenance: Option<EpistemicProvenance>,
298}
299
300impl LogicProgram {
301 pub fn compile(source: &str) -> Result<Self> {
303 let program = xlog_logic::parse_program(source)?;
304 let normalized = normalize_program(program)?;
305 Self::compile_normalized_program(normalized)
306 }
307
308 fn compile_normalized_program(normalized: Program) -> Result<Self> {
309 if program_has_epistemic_literals(&normalized) {
310 return Self::compile_epistemic_program(normalized);
311 }
312 let mut compiler = Compiler::new();
313 let plan = compiler.compile_program(&normalized)?;
314 Ok(Self {
315 program: normalized,
316 plan: LogicExecutionPlan::Ordinary(plan),
317 schemas: compiler.schemas().clone(),
318 rel_ids: compiler.rel_ids().clone(),
319 epistemic_provenance: None,
320 })
321 }
322
323 fn compile_epistemic_program(normalized: Program) -> Result<Self> {
324 let provenance_literals = collect_eir_epistemic_literals(&normalized);
329 if let Some(stratified) = try_plan_stratified_epistemic_program(&normalized)? {
343 let reduced = reduce_epistemic_program_to_ordinary_for_stratified_schema(&normalized);
354 let mut schema_compiler = Compiler::new();
355 schema_compiler.compile_program(&reduced)?;
356 let mut schemas = schema_compiler.schemas().clone();
357 augment_same_name_multi_arity_schemas(&normalized, &mut schemas)?;
358
359 let mut strata = Vec::with_capacity(stratified.strata.len());
360 for stratum in &stratified.strata {
361 strata.push(StratumExecutable {
362 plan: Self::compile_stratum_plan(&stratum.program)?,
363 });
364 }
365 let plan = LogicExecutionPlan::EpistemicStratified(strata);
366 let rel_ids = epistemic_relation_ids(&plan)?;
367 return Ok(Self {
368 program: normalized,
369 plan,
370 schemas,
371 rel_ids,
372 epistemic_provenance: Some(EpistemicProvenance {
373 reduction: "stratified",
374 literals: provenance_literals,
375 }),
376 });
377 }
378
379 if let Some(case_a_reduced) = try_reduce_case_a_recursive_epistemic_program(&normalized)? {
385 let strat = xlog_logic::stratify::analyze_stratification(&case_a_reduced);
386 if !strat.non_monotone_sccs.is_empty() {
387 let wfs_plan = compile_epistemic_wfs_gpu_plan(&case_a_reduced)?;
388 let schemas = wfs_plan_combined_schemas(&wfs_plan);
389 let rel_ids = wfs_plan_combined_rel_ids(&wfs_plan)?;
390 return Ok(Self {
391 program: case_a_reduced,
392 plan: LogicExecutionPlan::EpistemicWfsGpu(wfs_plan),
393 schemas,
394 rel_ids,
395 epistemic_provenance: Some(EpistemicProvenance {
396 reduction: "wfs_gpu_recursive",
397 literals: provenance_literals,
398 }),
399 });
400 }
401 let mut compiler = Compiler::new();
402 let plan = compiler.compile_program(&case_a_reduced)?;
403 return Ok(Self {
404 program: case_a_reduced,
405 plan: LogicExecutionPlan::Ordinary(plan),
406 schemas: compiler.schemas().clone(),
407 rel_ids: compiler.rel_ids().clone(),
408 epistemic_provenance: Some(EpistemicProvenance {
409 reduction: "ordinary_recursive_modal_reduction",
410 literals: provenance_literals,
411 }),
412 });
413 }
414
415 let reduced = reduce_epistemic_program_to_ordinary(&normalized);
416 let mut schema_compiler = Compiler::new();
417 schema_compiler.compile_program(&reduced)?;
418 let mut schemas = schema_compiler.schemas().clone();
419 augment_same_name_multi_arity_schemas(&normalized, &mut schemas)?;
420
421 let plan = if epistemic_output_head_predicate_count(&normalized) > 1 {
422 LogicExecutionPlan::EpistemicSplit(compile_epistemic_gpu_split_execution(&normalized)?)
423 } else {
424 match compile_epistemic_gpu_execution(&normalized) {
425 Ok(executable) => LogicExecutionPlan::EpistemicSingle(executable),
426 Err(XlogError::UnsupportedEpistemicConstruct { construct, .. })
427 if construct == "epistemic GPU final output relation" =>
428 {
429 LogicExecutionPlan::EpistemicSplit(compile_epistemic_gpu_split_execution(
430 &normalized,
431 )?)
432 }
433 Err(err) => return Err(err),
434 }
435 };
436 let rel_ids = epistemic_relation_ids(&plan)?;
437
438 Ok(Self {
439 program: normalized,
440 plan,
441 schemas,
442 rel_ids,
443 epistemic_provenance: Some(EpistemicProvenance {
444 reduction: "epistemic_executable",
445 literals: provenance_literals,
446 }),
447 })
448 }
449
450 fn compile_stratum_plan(stratum_program: &Program) -> Result<StratumPlanKind> {
459 if let Some(case_a_reduced) =
460 try_reduce_case_a_recursive_epistemic_program(stratum_program)?
461 {
462 let mut compiler = Compiler::new();
463 let plan = compiler.compile_program(&case_a_reduced)?;
464 let head_predicates = epistemic_stratum_output_heads(stratum_program);
465 return Ok(StratumPlanKind::Ordinary {
466 plan,
467 head_predicates,
468 });
469 }
470 if epistemic_output_head_predicate_count(stratum_program) > 1 {
471 Ok(StratumPlanKind::Split(
472 compile_epistemic_gpu_split_execution(stratum_program)?,
473 ))
474 } else {
475 Ok(StratumPlanKind::Single(compile_epistemic_gpu_execution(
476 stratum_program,
477 )?))
478 }
479 }
480
481 pub fn compile_with_resolver(
493 source: &str,
494 resolver: &xlog_logic::resolver::ModuleResolver,
495 ) -> Result<Self> {
496 let program = xlog_logic::parse_program(source)?;
497
498 let merged = resolver
500 .merge_imports(program)
501 .map_err(|e| XlogError::Compilation(format!("Module resolution failed: {}", e)))?;
502
503 let normalized = normalize_program(merged)?;
504 Self::compile_normalized_program(normalized)
505 }
506
507 pub fn epistemic_plan_json(&self) -> Option<String> {
519 let gpu_plans: Vec<(String, &xlog_ir::EpistemicGpuPlan)> = match &self.plan {
520 LogicExecutionPlan::Ordinary(_) => {
527 let prov = self.epistemic_provenance.as_ref()?;
528 return Some(epistemic_provenance_summary_json(
529 "epistemic_reduced_ordinary",
530 prov,
531 None,
532 None,
533 ));
534 }
535 LogicExecutionPlan::EpistemicWfsGpu(wfs) => {
536 let prov = self.epistemic_provenance.as_ref()?;
537 return Some(epistemic_provenance_summary_json(
538 self.plan_kind_label(),
539 prov,
540 Some(wfs.max_iterations),
541 Some(wfs),
542 ));
543 }
544 LogicExecutionPlan::EpistemicSingle(plan) => {
545 vec![("single".to_string(), &plan.gpu_plan)]
546 }
547 LogicExecutionPlan::EpistemicSplit(split) => split
548 .components
549 .iter()
550 .enumerate()
551 .map(|(i, c)| (format!("split[{i}]"), &c.executable.gpu_plan))
552 .collect(),
553 LogicExecutionPlan::EpistemicStratified(strata) => {
554 let mut plans = Vec::new();
555 for (i, stratum) in strata.iter().enumerate() {
556 match &stratum.plan {
557 StratumPlanKind::Single(plan) => {
558 plans.push((format!("stratum[{i}]"), &plan.gpu_plan));
559 }
560 StratumPlanKind::Split(split) => {
561 for (j, c) in split.components.iter().enumerate() {
562 plans.push((
563 format!("stratum[{i}].split[{j}]"),
564 &c.executable.gpu_plan,
565 ));
566 }
567 }
568 StratumPlanKind::Ordinary { .. } => {}
572 }
573 }
574 plans
575 }
576 };
577 Some(epistemic_plan_summary_json(
578 self.plan_kind_label(),
579 &gpu_plans,
580 ))
581 }
582
583 fn plan_kind_label(&self) -> &'static str {
584 match &self.plan {
585 LogicExecutionPlan::Ordinary(_) => "ordinary",
586 LogicExecutionPlan::EpistemicWfsGpu(_) => "epistemic_wfs_gpu",
587 LogicExecutionPlan::EpistemicSingle(_) => "epistemic_single",
588 LogicExecutionPlan::EpistemicSplit(_) => "epistemic_split",
589 LogicExecutionPlan::EpistemicStratified(_) => "epistemic_stratified",
590 }
591 }
592
593 pub fn schema(&self, relation: &str) -> Option<&Schema> {
595 self.schemas.get(relation)
596 }
597
598 pub fn schemas(&self) -> &HashMap<String, Schema> {
600 &self.schemas
601 }
602
603 pub fn rule_provenance(&self) -> Vec<xlog_logic::RuleProvenance> {
605 xlog_logic::rule_provenance(&self.program, None)
606 }
607
608 pub fn proof_traces(&self) -> Vec<xlog_logic::QueryProofTrace> {
610 let provenance = self.rule_provenance();
611 xlog_logic::query_proof_traces(&self.program, &provenance)
612 }
613
614 pub fn create_relation_store(
616 &self,
617 provider: Arc<CudaKernelProvider>,
618 ) -> Result<RelationStore> {
619 let mut store = RelationStore::new(provider.clone());
620 for (name, schema) in &self.schemas {
621 if is_user_visible_relation(name) || is_list_helper_relation(name) {
622 store.put(name, provider.create_empty_buffer(schema.clone())?);
623 }
624 }
625 self.load_facts_into_store(provider.as_ref(), &mut store)?;
626 Ok(store)
627 }
628
629 pub fn evaluate_with_relation_store(
635 &self,
636 provider: Arc<CudaKernelProvider>,
637 relation_store: &RelationStore,
638 profiling: bool,
639 ) -> Result<LogicEvalResult> {
640 let (result, _) =
641 self.evaluate_with_relation_store_and_cache(provider, relation_store, profiling)?;
642 Ok(result)
643 }
644
645 pub fn evaluate_with_relation_store_and_cache(
647 &self,
648 provider: Arc<CudaKernelProvider>,
649 relation_store: &RelationStore,
650 profiling: bool,
651 ) -> Result<(LogicEvalResult, RelationStore)> {
652 let mut executor =
653 self.executor_from_relation_store(provider.clone(), relation_store, profiling)?;
654 executor.execute_plan(self.ordinary_plan("relation-store evaluation")?)?;
655 self.enforce_constraints(&provider, &executor)?;
656
657 let total_output_rows = self.total_query_rows(executor.store())?;
658 let stats = if profiling {
659 Some(executor.execution_stats(total_output_rows))
660 } else {
661 None
662 };
663
664 let cached_store = self.clone_relation_store(&provider, executor.store())?;
665 let result = self.logic_result_from_store(provider.as_ref(), &cached_store, stats)?;
666 Ok((result, cached_store))
667 }
668
669 pub fn create_session_runtime(
671 &self,
672 provider: Arc<CudaKernelProvider>,
673 relation_store: &RelationStore,
674 profiling: bool,
675 ) -> Result<LogicSessionRuntime> {
676 self.ordinary_plan("persistent relation session")?;
677 Ok(LogicSessionRuntime {
678 executor: self.executor_from_relation_store(provider, relation_store, profiling)?,
679 profiling,
680 })
681 }
682
683 pub fn evaluate_with_session_runtime(
685 &self,
686 provider: Arc<CudaKernelProvider>,
687 runtime: &mut LogicSessionRuntime,
688 ) -> Result<(LogicEvalResult, RelationStore)> {
689 runtime.executor.set_profiling(runtime.profiling);
690 runtime
691 .executor
692 .execute_plan(self.ordinary_plan("session runtime evaluation")?)?;
693 self.enforce_constraints(&provider, &runtime.executor)?;
694
695 let total_output_rows = self.total_query_rows(runtime.executor.store())?;
696 let stats = if runtime.profiling {
697 Some(runtime.executor.execution_stats(total_output_rows))
698 } else {
699 None
700 };
701
702 let cached_store = self.clone_relation_store(&provider, runtime.executor.store())?;
703 let result = self.logic_result_from_store(provider.as_ref(), &cached_store, stats)?;
704 Ok((result, cached_store))
705 }
706
707 pub fn evaluate_cached_relation_store(
709 &self,
710 provider: Arc<CudaKernelProvider>,
711 relation_store: &RelationStore,
712 ) -> Result<LogicEvalResult> {
713 self.logic_result_from_store(provider.as_ref(), relation_store, None)
714 }
715
716 pub fn apply_relation_deltas(
718 &self,
719 provider: Arc<CudaKernelProvider>,
720 relation_store: &mut RelationStore,
721 cached_store: &mut Option<RelationStore>,
722 deltas: HashMap<String, RelationDelta>,
723 ) -> Result<LogicDeltaReport> {
724 let insert_rows = deltas
725 .values()
726 .filter_map(|d| d.insert.as_ref())
727 .map(|b| b.num_rows())
728 .sum();
729 let delete_rows = deltas
730 .values()
731 .filter_map(|d| d.delete.as_ref())
732 .map(|b| b.num_rows())
733 .sum();
734 let cache_reused = cached_store.is_some();
735 let mut changed_relation_names = deltas.keys().cloned().collect::<Vec<_>>();
736 changed_relation_names.sort();
737
738 if cached_store.is_none() {
739 let (_, store) = self.evaluate_with_relation_store_and_cache(
740 provider.clone(),
741 relation_store,
742 false,
743 )?;
744 *cached_store = Some(store);
745 }
746
747 let store_before_delta = cached_store.as_ref().ok_or_else(|| {
748 XlogError::Execution("Missing cached relation store for delta update".to_string())
749 })?;
750 let mut executor =
751 self.executor_from_relation_store(provider.clone(), store_before_delta, false)?;
752 let delta_stats = executor
753 .apply_deltas_and_recompute(self.ordinary_plan("relation-delta recompute")?, &deltas)?;
754 self.enforce_constraints(&provider, &executor)?;
755
756 for name in deltas.keys() {
757 let updated = executor.store().get(name).ok_or_else(|| {
758 XlogError::Execution(format!(
759 "Delta relation {} missing after runtime recompute",
760 name
761 ))
762 })?;
763 relation_store.put(name, provider.clone_buffer(updated)?);
764 }
765
766 *cached_store = Some(self.clone_relation_store(&provider, executor.store())?);
767
768 let mut report = logic_delta_report(delta_stats, insert_rows, delete_rows);
769 report.changed_relation_names = changed_relation_names;
770 report.planner_telemetry =
771 DeltaPlannerTelemetry::from_delta_report(&report, cache_reused, None);
772 report.debug_trace = delta_debug_trace(&report);
773 Ok(report)
774 }
775
776 pub fn apply_relation_deltas_with_session_runtime(
778 &self,
779 provider: Arc<CudaKernelProvider>,
780 relation_store: &mut RelationStore,
781 cached_store: &mut Option<RelationStore>,
782 session_runtime: &mut Option<LogicSessionRuntime>,
783 deltas: HashMap<String, RelationDelta>,
784 ) -> Result<LogicDeltaReport> {
785 let insert_rows = deltas
786 .values()
787 .filter_map(|d| d.insert.as_ref())
788 .map(|b| b.num_rows())
789 .sum();
790 let delete_rows = deltas
791 .values()
792 .filter_map(|d| d.delete.as_ref())
793 .map(|b| b.num_rows())
794 .sum();
795 let cache_reused = session_runtime.is_some() || cached_store.is_some();
796 let mut changed_relation_names = deltas.keys().cloned().collect::<Vec<_>>();
797 changed_relation_names.sort();
798
799 if session_runtime.is_none() {
800 let seed_store: &RelationStore = match cached_store.as_ref() {
801 Some(store) => store,
802 None => &*relation_store,
803 };
804 *session_runtime =
805 Some(self.create_session_runtime(provider.clone(), seed_store, false)?);
806 }
807
808 if cached_store.is_none() {
809 let runtime = session_runtime.as_mut().ok_or_else(|| {
810 XlogError::Execution("Missing session runtime for cached evaluation".to_string())
811 })?;
812 let (_, store) = self.evaluate_with_session_runtime(provider.clone(), runtime)?;
813 *cached_store = Some(store);
814 }
815
816 let runtime = session_runtime.as_mut().ok_or_else(|| {
817 XlogError::Execution("Missing session runtime for delta update".to_string())
818 })?;
819 let delta_stats = runtime.executor.apply_deltas_and_recompute(
820 self.ordinary_plan("session relation-delta recompute")?,
821 &deltas,
822 )?;
823 self.enforce_constraints(&provider, &runtime.executor)?;
824
825 for name in deltas.keys() {
826 let updated = runtime.executor.store().get(name).ok_or_else(|| {
827 XlogError::Execution(format!(
828 "Delta relation {} missing after runtime recompute",
829 name
830 ))
831 })?;
832 relation_store.put(name, provider.clone_buffer(updated)?);
833 }
834
835 *cached_store = Some(self.clone_relation_store(&provider, runtime.executor.store())?);
836
837 let mut report = logic_delta_report(delta_stats, insert_rows, delete_rows);
838 report.changed_relation_names = changed_relation_names;
839 report.planner_telemetry =
840 DeltaPlannerTelemetry::from_delta_report(&report, cache_reused, None);
841 report.debug_trace = delta_debug_trace(&report);
842 Ok(report)
843 }
844
845 pub fn apply_relation_delta_batch(
847 &self,
848 provider: Arc<CudaKernelProvider>,
849 relation_store: &mut RelationStore,
850 cached_store: &mut Option<RelationStore>,
851 delta_batch: Vec<(String, RelationDelta)>,
852 ) -> Result<LogicDeltaReport> {
853 let coalesced = coalesce_relation_delta_batch(provider.as_ref(), delta_batch)?;
854 if coalesced.deltas.is_empty() {
855 return Ok(LogicDeltaReport {
856 input_delta_count: coalesced.input_delta_count,
857 changed_relations: 0,
858 changed_relation_names: Vec::new(),
859 insert_rows: 0,
860 delete_rows: 0,
861 has_deletes: false,
862 affected_sccs: 0,
863 recomputed_sccs: 0,
864 incremental_sccs: 0,
865 coalesced_insert_rows: 0,
866 coalesced_delete_rows: 0,
867 canceled_rows: coalesced.canceled_rows,
868 planner_telemetry: DeltaPlannerTelemetry {
869 fallback_decision: "no_op".to_string(),
870 ..DeltaPlannerTelemetry::default()
871 },
872 debug_trace: vec![format!("canceled_rows={}", coalesced.canceled_rows)],
873 });
874 }
875
876 let mut report =
877 self.apply_relation_deltas(provider, relation_store, cached_store, coalesced.deltas)?;
878 report.input_delta_count = coalesced.input_delta_count;
879 report.changed_relations = coalesced.changed_relations;
880 report.coalesced_insert_rows = coalesced.coalesced_insert_rows;
881 report.coalesced_delete_rows = coalesced.coalesced_delete_rows;
882 report.canceled_rows = coalesced.canceled_rows;
883 report.planner_telemetry = DeltaPlannerTelemetry::from_delta_report(&report, true, None);
884 report.debug_trace = delta_debug_trace(&report);
885 Ok(report)
886 }
887
888 pub fn apply_relation_delta_batch_with_session_runtime(
890 &self,
891 provider: Arc<CudaKernelProvider>,
892 relation_store: &mut RelationStore,
893 cached_store: &mut Option<RelationStore>,
894 session_runtime: &mut Option<LogicSessionRuntime>,
895 delta_batch: Vec<(String, RelationDelta)>,
896 ) -> Result<LogicDeltaReport> {
897 let coalesced = coalesce_relation_delta_batch(provider.as_ref(), delta_batch)?;
898 if coalesced.deltas.is_empty() {
899 return Ok(LogicDeltaReport {
900 input_delta_count: coalesced.input_delta_count,
901 changed_relations: 0,
902 changed_relation_names: Vec::new(),
903 insert_rows: 0,
904 delete_rows: 0,
905 has_deletes: false,
906 affected_sccs: 0,
907 recomputed_sccs: 0,
908 incremental_sccs: 0,
909 coalesced_insert_rows: 0,
910 coalesced_delete_rows: 0,
911 canceled_rows: coalesced.canceled_rows,
912 planner_telemetry: DeltaPlannerTelemetry {
913 fallback_decision: "no_op".to_string(),
914 ..DeltaPlannerTelemetry::default()
915 },
916 debug_trace: vec![format!("canceled_rows={}", coalesced.canceled_rows)],
917 });
918 }
919
920 let mut report = self.apply_relation_deltas_with_session_runtime(
921 provider,
922 relation_store,
923 cached_store,
924 session_runtime,
925 coalesced.deltas,
926 )?;
927 report.input_delta_count = coalesced.input_delta_count;
928 report.changed_relations = coalesced.changed_relations;
929 report.coalesced_insert_rows = coalesced.coalesced_insert_rows;
930 report.coalesced_delete_rows = coalesced.coalesced_delete_rows;
931 report.canceled_rows = coalesced.canceled_rows;
932 report.planner_telemetry = DeltaPlannerTelemetry::from_delta_report(&report, true, None);
933 report.debug_trace = delta_debug_trace(&report);
934 Ok(report)
935 }
936
937 pub fn evaluate(
939 &self,
940 provider: Arc<CudaKernelProvider>,
941 inputs: HashMap<String, CudaBuffer>,
942 ) -> Result<LogicEvalResult> {
943 self.evaluate_with_options(provider, inputs, false)
944 }
945
946 pub fn evaluate_with_options(
953 &self,
954 provider: Arc<CudaKernelProvider>,
955 inputs: HashMap<String, CudaBuffer>,
956 profiling: bool,
957 ) -> Result<LogicEvalResult> {
958 let mut executor = Executor::new(provider.clone());
959 executor.set_profiling(profiling);
960 for (name, rel_id) in &self.rel_ids {
961 executor.register_relation(*rel_id, name);
962 }
963
964 for (name, schema) in &self.schemas {
965 executor
966 .store_mut()
967 .put(name, provider.create_empty_buffer(schema.clone())?);
968 }
969
970 for (name, buffer) in inputs {
971 let schema = self.schemas.get(&name).ok_or_else(|| {
972 XlogError::Execution(format!(
973 "Input relation {} not declared in program schemas",
974 name
975 ))
976 })?;
977 ensure_schema_type_compatible(schema, buffer.schema()).map_err(|e| {
978 XlogError::Execution(format!("Input relation {} schema mismatch: {}", name, e))
979 })?;
980 executor.store_mut().put(&name, buffer);
981 }
982
983 self.load_facts(&provider, &mut executor)?;
984
985 if let LogicExecutionPlan::EpistemicWfsGpu(wfs_plan) = &self.plan {
986 return self.evaluate_wfs_gpu_program(provider, executor, wfs_plan, profiling);
987 }
988
989 let LogicExecutionPlan::Ordinary(plan) = &self.plan else {
990 return self.evaluate_epistemic_with_executor(executor, profiling);
991 };
992
993 executor.execute_plan(plan)?;
994
995 self.enforce_constraints(&provider, &executor)?;
996
997 let mut queries: Vec<LogicQueryResult> = Vec::with_capacity(self.program.queries.len());
998 for (i, query) in self.program.queries.iter().enumerate() {
999 let relation_name = format!("__xlog_query_{}", i);
1000 let buffer = executor.store_mut().remove(&relation_name).ok_or_else(|| {
1001 XlogError::Execution(format!(
1002 "Missing query result relation {} (compiler bug?)",
1003 relation_name
1004 ))
1005 })?;
1006
1007 let columns = query_output_vars(query);
1008 queries.push(LogicQueryResult {
1009 relation_name,
1010 sort_labels: columns.clone(),
1011 columns,
1012 buffer,
1013 });
1014 }
1015
1016 let total_output_rows: u64 = queries.iter().map(|q| q.buffer.num_rows()).sum();
1018 let stats = if profiling {
1019 Some(executor.execution_stats(total_output_rows))
1020 } else {
1021 None
1022 };
1023
1024 Ok(LogicEvalResult { queries, stats })
1025 }
1026
1027 pub fn relation_stores_query_equivalent(
1029 &self,
1030 provider: &CudaKernelProvider,
1031 left: &RelationStore,
1032 right: &RelationStore,
1033 ) -> Result<bool> {
1034 for idx in 0..self.program.queries.len() {
1035 let name = format!("__xlog_query_{}", idx);
1036 let Some(left_buffer) = left.get(&name) else {
1037 return Ok(false);
1038 };
1039 let Some(right_buffer) = right.get(&name) else {
1040 return Ok(false);
1041 };
1042 if !buffers_gpu_set_equivalent(provider, left_buffer, right_buffer)? {
1043 return Ok(false);
1044 }
1045 }
1046 Ok(true)
1047 }
1048
1049 fn executor_from_relation_store(
1050 &self,
1051 provider: Arc<CudaKernelProvider>,
1052 relation_store: &RelationStore,
1053 profiling: bool,
1054 ) -> Result<Executor> {
1055 let mut executor = Executor::new(provider.clone());
1056 executor.set_profiling(profiling);
1057 for (name, rel_id) in &self.rel_ids {
1058 executor.register_relation(*rel_id, name);
1059 }
1060
1061 for (name, schema) in &self.schemas {
1062 executor
1063 .store_mut()
1064 .put(name, provider.create_empty_buffer(schema.clone())?);
1065 }
1066
1067 for name in relation_store.names() {
1068 let buffer = relation_store.get(name).ok_or_else(|| {
1069 XlogError::Execution(format!(
1070 "Persistent relation {} disappeared during evaluation",
1071 name
1072 ))
1073 })?;
1074 let schema = self.schemas.get(name).ok_or_else(|| {
1075 XlogError::Execution(format!(
1076 "Persistent relation {} not declared in program schemas",
1077 name
1078 ))
1079 })?;
1080 ensure_schema_type_compatible(schema, buffer.schema()).map_err(|e| {
1081 XlogError::Execution(format!(
1082 "Persistent relation {} schema mismatch: {}",
1083 name, e
1084 ))
1085 })?;
1086 executor
1087 .store_mut()
1088 .put(name, provider.clone_buffer(buffer)?);
1089 }
1090
1091 Ok(executor)
1092 }
1093
1094 fn clone_relation_store(
1095 &self,
1096 provider: &Arc<CudaKernelProvider>,
1097 source: &RelationStore,
1098 ) -> Result<RelationStore> {
1099 let mut cloned = RelationStore::new(provider.clone());
1100 for name in source.names() {
1101 let buffer = source.get(name).ok_or_else(|| {
1102 XlogError::Execution(format!("Relation {} disappeared during clone", name))
1103 })?;
1104 cloned.put(name, provider.clone_buffer(buffer)?);
1105 }
1106 Ok(cloned)
1107 }
1108
1109 fn total_query_rows(&self, store: &RelationStore) -> Result<u64> {
1110 let mut total = 0;
1111 for i in 0..self.program.queries.len() {
1112 let relation_name = format!("__xlog_query_{}", i);
1113 let buffer = store.get(&relation_name).ok_or_else(|| {
1114 XlogError::Execution(format!(
1115 "Missing query result relation {} (compiler bug?)",
1116 relation_name
1117 ))
1118 })?;
1119 total += buffer.num_rows();
1120 }
1121 Ok(total)
1122 }
1123
1124 fn logic_result_from_store(
1125 &self,
1126 provider: &CudaKernelProvider,
1127 store: &RelationStore,
1128 stats: Option<ExecutionStats>,
1129 ) -> Result<LogicEvalResult> {
1130 let mut queries: Vec<LogicQueryResult> = Vec::with_capacity(self.program.queries.len());
1131 for (i, query) in self.program.queries.iter().enumerate() {
1132 let relation_name = format!("__xlog_query_{}", i);
1133 let buffer = store.get(&relation_name).ok_or_else(|| {
1134 XlogError::Execution(format!(
1135 "Missing query result relation {} (compiler bug?)",
1136 relation_name
1137 ))
1138 })?;
1139
1140 let columns = query_output_vars(query);
1141 queries.push(LogicQueryResult {
1142 relation_name,
1143 sort_labels: columns.clone(),
1144 columns,
1145 buffer: provider.clone_buffer(buffer)?,
1146 });
1147 }
1148
1149 Ok(LogicEvalResult { queries, stats })
1150 }
1151
1152 fn load_facts(&self, provider: &CudaKernelProvider, executor: &mut Executor) -> Result<()> {
1153 self.load_facts_into_store(provider, executor.store_mut())
1154 }
1155
1156 fn load_facts_into_store(
1157 &self,
1158 provider: &CudaKernelProvider,
1159 store: &mut RelationStore,
1160 ) -> Result<()> {
1161 let arities = predicate_arities(&self.program);
1162 let mut rows_by_pred: HashMap<String, Vec<&[Term]>> = HashMap::new();
1163 for fact in self.program.facts() {
1164 let pred = fact.head.predicate.as_str();
1165 let arity = fact.head.terms.len();
1166 let key = arity_qualified_name_if_needed(pred, arity, &arities);
1167 rows_by_pred.entry(key).or_default().push(&fact.head.terms);
1168 }
1169
1170 for (pred, rows) in rows_by_pred {
1171 let schema = self.schemas.get(pred.as_str()).ok_or_else(|| {
1172 XlogError::Execution(format!(
1173 "Missing inferred schema for fact predicate {}",
1174 pred
1175 ))
1176 })?;
1177
1178 if rows.iter().any(|r| r.len() != schema.arity()) {
1179 return Err(XlogError::Execution(format!(
1180 "Fact arity mismatch for {} (expected {} columns)",
1181 pred,
1182 schema.arity()
1183 )));
1184 }
1185
1186 let mut columns: Vec<Vec<u8>> = vec![Vec::new(); schema.arity()];
1187 for row in rows {
1188 for (col_idx, term) in row.iter().enumerate() {
1189 let typ = schema.column_type(col_idx).ok_or_else(|| {
1190 XlogError::Execution(format!("Missing type for column {}", col_idx))
1191 })?;
1192 push_term_bytes(&mut columns[col_idx], term, typ)?;
1193 }
1194 }
1195
1196 let fact_buf = if schema.arity() == 0 {
1197 provider.create_zero_arity_buffer(schema.clone(), 1)?
1203 } else {
1204 let slices: Vec<&[u8]> = columns.iter().map(|c| c.as_slice()).collect();
1205 provider.create_buffer_from_slices(&slices, schema.clone())?
1206 };
1207
1208 let existing = store.get(&pred).ok_or_else(|| {
1209 XlogError::Execution(format!(
1210 "Missing base relation {} while loading facts",
1211 pred
1212 ))
1213 })?;
1214
1215 let merged = provider.union(existing, &fact_buf)?;
1216 store.put(pred.as_str(), merged);
1217 }
1218
1219 Ok(())
1220 }
1221
1222 fn evaluate_wfs_gpu_program(
1223 &self,
1224 provider: Arc<CudaKernelProvider>,
1225 base_executor: Executor,
1226 wfs: &EpistemicWfsGpuPlan,
1227 profiling: bool,
1228 ) -> Result<LogicEvalResult> {
1229 let base_store = self.clone_relation_store(&provider, base_executor.store())?;
1230 let upper_executor =
1231 self.run_wfs_gpu_pass(&provider, &wfs.overapprox, &base_store, &[], profiling)?;
1232 let mut upper_store = self.clone_relation_store(&provider, upper_executor.store())?;
1233 let mut lower_store = self.clone_relation_store(&provider, &base_store)?;
1234
1235 for _ in 0..wfs.max_iterations {
1236 let upper_fixed: Vec<_> = wfs
1237 .upper_fixed_names
1238 .iter()
1239 .map(|(source, fixed)| (source.as_str(), fixed.as_str(), &upper_store))
1240 .collect();
1241 let lower_executor =
1242 self.run_wfs_gpu_pass(&provider, &wfs.lower, &base_store, &upper_fixed, profiling)?;
1243 let next_lower = self.clone_relation_store(&provider, lower_executor.store())?;
1244
1245 let lower_fixed: Vec<_> = wfs
1246 .lower_fixed_names
1247 .iter()
1248 .map(|(source, fixed)| (source.as_str(), fixed.as_str(), &next_lower))
1249 .collect();
1250 let next_upper_executor =
1251 self.run_wfs_gpu_pass(&provider, &wfs.upper, &base_store, &lower_fixed, profiling)?;
1252 let next_upper = self.clone_relation_store(&provider, next_upper_executor.store())?;
1253
1254 let lower_converged =
1255 self.wfs_gpu_stores_equivalent(&provider, wfs, &lower_store, &next_lower)?;
1256 let upper_converged =
1257 self.wfs_gpu_stores_equivalent(&provider, wfs, &upper_store, &next_upper)?;
1258 lower_store = next_lower;
1259 upper_store = next_upper;
1260 if lower_converged && upper_converged {
1261 return self.logic_result_from_store(provider.as_ref(), &lower_store, None);
1262 }
1263 }
1264
1265 Err(XlogError::ResourceExhausted {
1266 context: "GPU-backed WFS alternating fixpoint iterations".to_string(),
1267 estimated_bytes: wfs.max_iterations as u64,
1268 budget_bytes: wfs.max_iterations as u64,
1269 })
1270 }
1271
1272 fn run_wfs_gpu_pass(
1273 &self,
1274 provider: &Arc<CudaKernelProvider>,
1275 pass: &WfsGpuOrdinaryPlan,
1276 base_store: &RelationStore,
1277 fixed_relations: &[(&str, &str, &RelationStore)],
1278 profiling: bool,
1279 ) -> Result<Executor> {
1280 let mut executor = Executor::new(provider.clone());
1281 executor.set_profiling(profiling);
1282 for (name, rel_id) in &pass.rel_ids {
1283 executor.register_relation(*rel_id, name);
1284 }
1285 for (name, schema) in &pass.schemas {
1286 executor
1287 .store_mut()
1288 .put(name, provider.create_empty_buffer(schema.clone())?);
1289 }
1290 for name in base_store.names() {
1291 if pass.schemas.contains_key(name) {
1292 let buffer = base_store.get(name).ok_or_else(|| {
1293 XlogError::Execution(format!("WFS base relation {name} disappeared"))
1294 })?;
1295 executor
1296 .store_mut()
1297 .put(name, provider.clone_buffer(buffer)?);
1298 }
1299 }
1300 for &(source, fixed, source_store) in fixed_relations {
1301 let buffer =
1302 self.wfs_gpu_clone_or_empty(provider, &pass.schemas, source, source_store)?;
1303 executor.store_mut().put(fixed, buffer);
1304 }
1305 executor.execute_plan(&pass.plan)?;
1306 Ok(executor)
1307 }
1308
1309 fn wfs_gpu_clone_or_empty(
1310 &self,
1311 provider: &Arc<CudaKernelProvider>,
1312 schemas: &HashMap<String, Schema>,
1313 name: &str,
1314 store: &RelationStore,
1315 ) -> Result<CudaBuffer> {
1316 if let Some(buffer) = store.get(name) {
1317 return provider.clone_buffer(buffer);
1318 }
1319 let schema = schemas
1320 .get(name)
1321 .or_else(|| self.schemas.get(name))
1322 .ok_or_else(|| XlogError::Execution(format!("missing WFS GPU schema for {name}")))?;
1323 provider.create_empty_buffer(schema.clone())
1324 }
1325
1326 fn wfs_gpu_stores_equivalent(
1327 &self,
1328 provider: &Arc<CudaKernelProvider>,
1329 wfs: &EpistemicWfsGpuPlan,
1330 left: &RelationStore,
1331 right: &RelationStore,
1332 ) -> Result<bool> {
1333 for pred in &wfs.intensional_predicates {
1334 let left_buf = self.wfs_gpu_clone_or_empty(provider, &wfs.lower.schemas, pred, left)?;
1335 let right_buf =
1336 self.wfs_gpu_clone_or_empty(provider, &wfs.lower.schemas, pred, right)?;
1337 if !buffers_gpu_set_equivalent(provider.as_ref(), &left_buf, &right_buf)? {
1338 return Ok(false);
1339 }
1340 }
1341 Ok(true)
1342 }
1343
1344 fn ordinary_plan(&self, context: &str) -> Result<&ExecutionPlan> {
1345 match &self.plan {
1346 LogicExecutionPlan::Ordinary(plan) => Ok(plan),
1347 LogicExecutionPlan::EpistemicWfsGpu(_)
1348 | LogicExecutionPlan::EpistemicSingle(_)
1349 | LogicExecutionPlan::EpistemicSplit(_)
1350 | LogicExecutionPlan::EpistemicStratified(_) => {
1351 Err(XlogError::UnsupportedEpistemicConstruct {
1352 construct: "epistemic high-level persistent execution".to_string(),
1353 context: format!(
1354 "{context} requires an ordinary RIR plan; use evaluate/evaluate_with_options \
1355 for production epistemic GPU dispatch"
1356 ),
1357 })
1358 }
1359 }
1360 }
1361
1362 fn evaluate_epistemic_with_executor(
1363 &self,
1364 mut executor: Executor,
1365 profiling: bool,
1366 ) -> Result<LogicEvalResult> {
1367 let mut queries = Vec::new();
1368 match &self.plan {
1369 LogicExecutionPlan::EpistemicSingle(executable) => {
1370 let result = executor.execute_epistemic_gpu_execution(
1371 executable,
1372 capacities_for_epistemic_executable(executable)?,
1373 )?;
1374 result.require_runtime_dispatch_certification()?;
1375 queries.extend(epistemic_result_to_query_results(
1376 epistemic_output_relation_name(executable)?,
1377 result,
1378 ));
1379 }
1380 LogicExecutionPlan::EpistemicSplit(split) => {
1381 let executables: Vec<_> = split
1382 .components
1383 .iter()
1384 .map(|component| &component.executable)
1385 .collect();
1386 let batch = executor.execute_epistemic_gpu_execution_batch_with_trace(
1387 &executables,
1388 capacities_for_epistemic_split(split)?,
1389 )?;
1390 batch
1391 .require_trace_matches_components("xlog high-level epistemic GPU execution")?;
1392 for result in &batch.results {
1393 result.require_runtime_dispatch_certification()?;
1394 }
1395 for (component, result) in split.components.iter().zip(batch.results) {
1396 queries.extend(epistemic_result_to_query_results(
1401 epistemic_output_relation_name(&component.executable)?,
1402 result,
1403 ));
1404 }
1405 }
1406 LogicExecutionPlan::EpistemicStratified(strata) => {
1407 let queried_predicates: BTreeSet<&str> = self
1421 .program
1422 .queries
1423 .iter()
1424 .map(|query| query.atom.predicate.as_str())
1425 .collect();
1426 let stratum_count = strata.len();
1427 for (stratum_index, stratum) in strata.iter().enumerate() {
1428 let is_last = stratum_index + 1 == stratum_count;
1429 match &stratum.plan {
1430 StratumPlanKind::Single(executable) => {
1431 let result = executor.execute_epistemic_gpu_execution(
1432 executable,
1433 capacities_for_epistemic_executable(executable)?,
1434 )?;
1435 result.require_runtime_dispatch_certification()?;
1436 let primary_head = epistemic_output_relation_name(executable)?;
1437 Self::materialize_and_surface_epistemic_stratum_result(
1438 &mut executor,
1439 primary_head,
1440 result,
1441 is_last,
1442 &queried_predicates,
1443 &mut queries,
1444 )?;
1445 }
1446 StratumPlanKind::Split(split) => {
1447 let executables: Vec<_> = split
1448 .components
1449 .iter()
1450 .map(|component| &component.executable)
1451 .collect();
1452 let batch = executor.execute_epistemic_gpu_execution_batch_with_trace(
1453 &executables,
1454 capacities_for_epistemic_split(split)?,
1455 )?;
1456 batch.require_trace_matches_components(
1457 "xlog high-level stratified epistemic GPU execution",
1458 )?;
1459 for result in &batch.results {
1460 result.require_runtime_dispatch_certification()?;
1461 }
1462 let primaries: Vec<String> = split
1463 .components
1464 .iter()
1465 .map(|component| {
1466 epistemic_output_relation_name(&component.executable)
1467 })
1468 .collect::<Result<Vec<_>>>()?;
1469 for (primary_head, result) in primaries.into_iter().zip(batch.results) {
1470 Self::materialize_and_surface_epistemic_stratum_result(
1471 &mut executor,
1472 primary_head,
1473 result,
1474 is_last,
1475 &queried_predicates,
1476 &mut queries,
1477 )?;
1478 }
1479 }
1480 StratumPlanKind::Ordinary {
1481 plan,
1482 head_predicates,
1483 } => {
1484 executor.execute_plan(plan)?;
1488 for head in head_predicates {
1489 if is_last || queried_predicates.contains(head.as_str()) {
1490 let buffer =
1491 executor.store().get(head.as_str()).ok_or_else(|| {
1492 XlogError::Execution(format!(
1493 "missing stratified ordinary stratum output \
1494 relation {head}"
1495 ))
1496 })?;
1497 let cloned = executor.clone_store_relation(buffer)?;
1498 queries.push(epistemic_buffer_to_query_result(
1499 head.clone(),
1500 cloned,
1501 ));
1502 }
1503 }
1504 }
1505 }
1506 }
1507 }
1508 LogicExecutionPlan::EpistemicWfsGpu(_) => {
1509 unreachable!("GPU WFS plans are handled earlier")
1510 }
1511 LogicExecutionPlan::Ordinary(_) => {
1512 unreachable!("ordinary plans are handled earlier")
1513 }
1514 }
1515
1516 let total_output_rows: u64 = queries.iter().map(|q| q.buffer.num_rows()).sum();
1517 let stats = if profiling {
1518 Some(executor.execution_stats(total_output_rows))
1519 } else {
1520 None
1521 };
1522 Ok(LogicEvalResult { queries, stats })
1523 }
1524
1525 fn materialize_and_surface_epistemic_stratum_result(
1533 executor: &mut Executor,
1534 primary_head: String,
1535 result: EpistemicGpuExecutionResult,
1536 is_last: bool,
1537 queried_predicates: &BTreeSet<&str>,
1538 queries: &mut Vec<LogicQueryResult>,
1539 ) -> Result<()> {
1540 executor.materialize_epistemic_head_relation(&primary_head, &result.final_output)?;
1541 for (head, buffer) in &result.additional_head_outputs {
1542 executor.materialize_epistemic_head_relation(head, buffer)?;
1543 }
1544
1545 let surface_primary = is_last || queried_predicates.contains(primary_head.as_str());
1548 let additional_heads: Vec<String> = result
1549 .additional_head_outputs
1550 .iter()
1551 .map(|(head, _)| head.clone())
1552 .collect();
1553
1554 let mut all_results = epistemic_result_to_query_results(primary_head.clone(), result);
1555 all_results.retain(|query_result| {
1556 if query_result.relation_name == primary_head {
1557 surface_primary
1558 } else {
1559 is_last
1560 || (additional_heads.contains(&query_result.relation_name)
1561 && queried_predicates.contains(query_result.relation_name.as_str()))
1562 }
1563 });
1564 queries.extend(all_results);
1565 Ok(())
1566 }
1567
1568 fn enforce_constraints(
1569 &self,
1570 provider: &CudaKernelProvider,
1571 executor: &Executor,
1572 ) -> Result<()> {
1573 for i in 0..self.program.constraints.len() {
1574 let name = format!("__xlog_constraint_{}", i);
1575 let buf = executor.store().get(&name).ok_or_else(|| {
1576 XlogError::Execution(format!(
1577 "Missing constraint result relation {} (compiler bug?)",
1578 name
1579 ))
1580 })?;
1581
1582 if buf.num_rows() == 0 {
1583 continue;
1584 }
1585
1586 let rows = provider.download_column::<u32>(buf, 0).unwrap_or_default();
1587 if rows.is_empty() {
1588 continue;
1589 }
1590
1591 return Err(XlogError::Execution(format!(
1592 "Constraint {} violated: {}",
1593 i,
1594 format_constraint(&self.program.constraints[i].body)
1595 )));
1596 }
1597
1598 Ok(())
1599 }
1600}
1601
1602const DEFAULT_EPISTEMIC_MAX_MODELS_PER_REDUCTION: usize = 1024;
1603
1604fn normalize_program(program: Program) -> Result<Program> {
1605 let max_recursion = program.directives.max_recursion_depth.unwrap_or(100);
1606 let expanded = xlog_logic::expand_program_functions(&program, max_recursion)
1607 .map_err(|e| XlogError::Compilation(e.to_string()))?;
1608 let normalized = xlog_logic::normalize_meta_builtins(&expanded)?;
1609 let listed = xlog_logic::normalize_list_builtins(&normalized)?;
1610 Ok(desugar_shared_variable_epistemic_constraints(listed))
1611}
1612
1613enum WfsNegationTransform<'a> {
1614 Drop,
1615 Rename(&'a HashMap<String, String>),
1616}
1617
1618fn compile_epistemic_wfs_gpu_plan(program: &Program) -> Result<EpistemicWfsGpuPlan> {
1619 if !program.constraints.is_empty() {
1620 return Err(XlogError::UnsupportedEpistemicConstruct {
1621 construct: "GPU WFS integrity constraints".to_string(),
1622 context: "cyclic WFS execution currently supports reduced normal rules only"
1623 .to_string(),
1624 });
1625 }
1626
1627 let negated = wfs_negated_predicates(program);
1628 let upper_fixed_names = wfs_fixed_names(program, &negated, "__wfs_upper");
1629 let lower_fixed_names = wfs_fixed_names(program, &negated, "__wfs_lower");
1630
1631 let overapprox_program = wfs_transform_program(program, WfsNegationTransform::Drop)?;
1632 let lower_program =
1633 wfs_transform_program(program, WfsNegationTransform::Rename(&upper_fixed_names))?;
1634 let upper_program =
1635 wfs_transform_program(program, WfsNegationTransform::Rename(&lower_fixed_names))?;
1636
1637 Ok(EpistemicWfsGpuPlan {
1638 overapprox: compile_wfs_gpu_ordinary_plan(&overapprox_program)?,
1639 lower: compile_wfs_gpu_ordinary_plan(&lower_program)?,
1640 upper: compile_wfs_gpu_ordinary_plan(&upper_program)?,
1641 intensional_predicates: wfs_intensional_predicates(program),
1642 upper_fixed_names,
1643 lower_fixed_names,
1644 max_iterations: (program.directives.max_recursion_depth_or_default() as usize).max(1),
1645 })
1646}
1647
1648fn compile_wfs_gpu_ordinary_plan(program: &Program) -> Result<WfsGpuOrdinaryPlan> {
1649 let mut compiler = Compiler::new();
1650 let plan = compiler.compile_program(program)?;
1651 Ok(WfsGpuOrdinaryPlan {
1652 plan,
1653 schemas: compiler.schemas().clone(),
1654 rel_ids: compiler.rel_ids().clone(),
1655 })
1656}
1657
1658fn wfs_transform_program(program: &Program, negation: WfsNegationTransform<'_>) -> Result<Program> {
1659 let mut out = program.clone();
1660 out.rules = program
1661 .rules
1662 .iter()
1663 .map(|rule| {
1664 let mut rule = rule.clone();
1665 let mut body = Vec::with_capacity(rule.body.len());
1666 for lit in &rule.body {
1667 match (lit, &negation) {
1668 (BodyLiteral::Negated(_), WfsNegationTransform::Drop) => {}
1669 (BodyLiteral::Negated(atom), WfsNegationTransform::Rename(names)) => {
1670 let mut atom = atom.clone();
1671 atom.predicate = names.get(&atom.predicate).cloned().ok_or_else(|| {
1672 XlogError::Execution(format!(
1673 "missing WFS fixed relation name for {}",
1674 atom.predicate
1675 ))
1676 })?;
1677 body.push(BodyLiteral::Negated(atom));
1678 }
1679 _ => body.push(lit.clone()),
1680 }
1681 }
1682 rule.body = body;
1683 Ok(rule)
1684 })
1685 .collect::<Result<Vec<_>>>()?;
1686 if let WfsNegationTransform::Rename(names) = negation {
1687 add_wfs_fixed_predicates(&mut out, names)?;
1688 }
1689 Ok(out)
1690}
1691
1692fn add_wfs_fixed_predicates(program: &mut Program, names: &HashMap<String, String>) -> Result<()> {
1693 let existing: BTreeSet<String> = program
1694 .predicates
1695 .iter()
1696 .map(|decl| decl.name.clone())
1697 .collect();
1698 for (source, fixed) in names {
1699 if existing.contains(fixed) {
1700 return Err(XlogError::UnsupportedEpistemicConstruct {
1701 construct: "GPU WFS fixed relation name".to_string(),
1702 context: format!(
1703 "internal fixed relation {fixed} collides with a declared predicate"
1704 ),
1705 });
1706 }
1707 let Some(decl) = program.predicates.iter().find(|decl| decl.name == *source) else {
1708 return Err(XlogError::UnsupportedEpistemicConstruct {
1709 construct: "GPU WFS fixed relation schema".to_string(),
1710 context: format!(
1711 "negated predicate {source} has no declaration to type fixed relation {fixed}"
1712 ),
1713 });
1714 };
1715 let mut fixed_decl = decl.clone();
1716 fixed_decl.name = fixed.clone();
1717 program.predicates.push(fixed_decl);
1718 }
1719 Ok(())
1720}
1721
1722fn wfs_negated_predicates(program: &Program) -> BTreeSet<String> {
1723 program
1724 .rules
1725 .iter()
1726 .flat_map(|rule| &rule.body)
1727 .filter_map(|lit| match lit {
1728 BodyLiteral::Negated(atom) => Some(atom.predicate.clone()),
1729 _ => None,
1730 })
1731 .collect()
1732}
1733
1734fn wfs_intensional_predicates(program: &Program) -> Vec<String> {
1735 program
1736 .proper_rules()
1737 .map(|rule| rule.head.predicate.clone())
1738 .collect::<BTreeSet<_>>()
1739 .into_iter()
1740 .collect()
1741}
1742
1743fn wfs_fixed_names(
1744 program: &Program,
1745 predicates: &BTreeSet<String>,
1746 prefix: &str,
1747) -> HashMap<String, String> {
1748 let mut reserved: BTreeSet<String> = program
1749 .predicates
1750 .iter()
1751 .map(|decl| decl.name.clone())
1752 .collect();
1753 let mut names = HashMap::new();
1754 for pred in predicates {
1755 let mut candidate = format!("{prefix}_{pred}");
1756 if reserved.contains(&candidate) {
1757 let mut suffix = 0usize;
1758 loop {
1759 let suffixed = format!("{prefix}_{suffix}_{pred}");
1760 if !reserved.contains(&suffixed) {
1761 candidate = suffixed;
1762 break;
1763 }
1764 suffix += 1;
1765 }
1766 }
1767 reserved.insert(candidate.clone());
1768 names.insert(pred.clone(), candidate);
1769 }
1770 names
1771}
1772
1773fn wfs_plan_combined_schemas(plan: &EpistemicWfsGpuPlan) -> HashMap<String, Schema> {
1774 let mut schemas = HashMap::new();
1775 for ordinary in [&plan.overapprox, &plan.lower, &plan.upper] {
1776 for (name, schema) in &ordinary.schemas {
1777 schemas
1778 .entry(name.clone())
1779 .or_insert_with(|| schema.clone());
1780 }
1781 }
1782 schemas
1783}
1784
1785fn wfs_plan_combined_rel_ids(plan: &EpistemicWfsGpuPlan) -> Result<HashMap<String, RelId>> {
1786 let mut rel_ids = HashMap::new();
1787 for ordinary in [&plan.overapprox, &plan.lower, &plan.upper] {
1788 for (name, rel_id) in &ordinary.rel_ids {
1789 rel_ids.insert(name.clone(), *rel_id);
1790 }
1791 }
1792 Ok(rel_ids)
1793}
1794
1795fn schema_from_pred_decl(
1796 decl: &xlog_logic::ast::PredDecl,
1797 domains: &HashMap<String, ScalarType>,
1798) -> Result<Schema> {
1799 let columns = pred_columns_for_decl(decl);
1800 let resolved = columns
1801 .iter()
1802 .enumerate()
1803 .map(|(idx, column)| {
1804 let name = column.name.clone().unwrap_or_else(|| format!("c{idx}"));
1805 resolve_pred_column_type(&decl.name, idx, &column.typ, domains).map(|typ| (name, typ))
1806 })
1807 .collect::<Result<Vec<_>>>()?;
1808 Ok(Schema::new(resolved))
1809}
1810
1811fn pred_columns_for_decl(decl: &xlog_logic::ast::PredDecl) -> Vec<PredColumn> {
1812 if decl.columns.is_empty() {
1813 decl.types
1814 .iter()
1815 .cloned()
1816 .map(|typ| PredColumn { name: None, typ })
1817 .collect()
1818 } else {
1819 decl.columns.clone()
1820 }
1821}
1822
1823fn resolve_pred_column_type(
1824 predicate: &str,
1825 index: usize,
1826 typ: &TypeRef,
1827 domains: &HashMap<String, ScalarType>,
1828) -> Result<ScalarType> {
1829 match typ {
1830 TypeRef::Scalar(ty) => Ok(*ty),
1831 TypeRef::Domain(name) => domains.get(name).copied().ok_or_else(|| {
1832 XlogError::Compilation(format!(
1833 "unknown domain alias '{}' in predicate '{}' column {}",
1834 name, predicate, index
1835 ))
1836 }),
1837 TypeRef::List(_) | TypeRef::Term | TypeRef::Compound | TypeRef::PredRef => {
1838 Ok(ScalarType::U64)
1839 }
1840 }
1841}
1842
1843fn schema_from_terms(terms: &[Term]) -> Schema {
1844 let columns = terms
1845 .iter()
1846 .enumerate()
1847 .map(|(idx, term)| (format!("c{idx}"), infer_term_type(term)))
1848 .collect();
1849 Schema::new(columns)
1850}
1851
1852fn infer_term_type(term: &Term) -> ScalarType {
1853 match term {
1854 Term::Variable(_) | Term::Anonymous => ScalarType::U64,
1855 Term::Integer(value) => {
1856 if *value >= 0 && *value <= u32::MAX as i64 {
1857 ScalarType::U32
1858 } else {
1859 ScalarType::I64
1860 }
1861 }
1862 Term::Float(_) => ScalarType::F64,
1863 Term::String(_) | Term::Symbol(_) => ScalarType::Symbol,
1864 Term::List(_) | Term::Cons { .. } | Term::Compound { .. } | Term::PredRef(_) => {
1865 ScalarType::U64
1866 }
1867 Term::Aggregate(agg) => match agg.op {
1868 AggOp::Count => ScalarType::U32,
1869 AggOp::Sum => ScalarType::U64,
1870 AggOp::Min | AggOp::Max => ScalarType::U32,
1871 AggOp::LogSumExp => ScalarType::F64,
1872 },
1873 }
1874}
1875
1876fn desugar_shared_variable_epistemic_constraints(mut program: Program) -> Program {
1904 let modal_derived: BTreeSet<String> = program
1909 .rules
1910 .iter()
1911 .filter(|rule| {
1912 rule.body
1913 .iter()
1914 .any(|lit| matches!(lit, BodyLiteral::Epistemic(_)))
1915 })
1916 .map(|rule| rule.head.predicate.clone())
1917 .collect();
1918 let mut extraction_rules: Vec<Rule> = Vec::new();
1919 let mut counter = 0usize;
1920 for constraint in &mut program.constraints {
1921 let has_epistemic = constraint
1922 .body
1923 .iter()
1924 .any(|lit| matches!(lit, BodyLiteral::Epistemic(_)));
1925 if !has_epistemic || !constraint_has_shared_variable(&constraint.body) {
1926 continue;
1927 }
1928 let has_modal_derived_target = constraint.body.iter().any(|lit| {
1930 matches!(lit, BodyLiteral::Epistemic(e) if modal_derived.contains(&e.atom.predicate))
1931 });
1932 if has_modal_derived_target {
1933 continue;
1934 }
1935 let distinct = distinct_body_variables(&constraint.body);
1936 let helper = format!("__epi_join_{counter}");
1937 counter += 1;
1938 let helper_terms: Vec<Term> = distinct.iter().map(|v| Term::Variable(v.clone())).collect();
1939 let helper_body: Vec<BodyLiteral> = constraint
1940 .body
1941 .iter()
1942 .map(ordinaryize_modal_literal)
1943 .collect();
1944 extraction_rules.push(Rule {
1945 head: Atom {
1946 predicate: helper.clone(),
1947 terms: helper_terms.clone(),
1948 },
1949 body: helper_body,
1950 });
1951 constraint.body = vec![BodyLiteral::Epistemic(EpistemicLiteral {
1953 op: EpistemicOp::Know,
1954 negated: false,
1955 atom: Atom {
1956 predicate: helper,
1957 terms: helper_terms,
1958 },
1959 })];
1960 }
1961 program.rules.extend(extraction_rules);
1962 program
1963}
1964
1965fn ordinaryize_modal_literal(lit: &BodyLiteral) -> BodyLiteral {
1970 match lit {
1971 BodyLiteral::Epistemic(e) if e.negated => BodyLiteral::Negated(e.atom.clone()),
1972 BodyLiteral::Epistemic(e) => BodyLiteral::Positive(e.atom.clone()),
1973 other => other.clone(),
1974 }
1975}
1976
1977fn constraint_has_shared_variable(body: &[BodyLiteral]) -> bool {
1980 let mut counts: std::collections::BTreeMap<String, usize> = std::collections::BTreeMap::new();
1981 for lit in body {
1982 if let Some(atom) = lit.atom() {
1983 for term in &atom.terms {
1984 if let Term::Variable(name) = term {
1985 *counts.entry(name.clone()).or_insert(0) += 1;
1986 }
1987 }
1988 }
1989 }
1990 counts.values().any(|&count| count > 1)
1991}
1992
1993fn distinct_body_variables(body: &[BodyLiteral]) -> Vec<String> {
1996 let mut seen = BTreeSet::new();
1997 let mut order = Vec::new();
1998 for lit in body {
1999 if let Some(atom) = lit.atom() {
2000 for term in &atom.terms {
2001 if let Term::Variable(name) = term {
2002 if seen.insert(name.clone()) {
2003 order.push(name.clone());
2004 }
2005 }
2006 }
2007 }
2008 }
2009 order
2010}
2011
2012fn augment_same_name_multi_arity_schemas(
2013 program: &Program,
2014 schemas: &mut HashMap<String, Schema>,
2015) -> Result<()> {
2016 let arities = predicate_arities(program);
2017 let domains: HashMap<String, ScalarType> = program
2018 .domains
2019 .iter()
2020 .map(|domain| (domain.name.clone(), domain.typ))
2021 .collect();
2022
2023 for decl in &program.predicates {
2024 let Some(pred_arities) = arities.get(&decl.name) else {
2025 continue;
2026 };
2027 if pred_arities.len() <= 1 {
2028 continue;
2029 }
2030 let key = arity_qualified_name(&decl.name, pred_decl_arity(decl));
2031 schemas.insert(key, schema_from_pred_decl(decl, &domains)?);
2032 }
2033
2034 for fact in program.facts() {
2035 let pred = fact.head.predicate.as_str();
2036 let arity = fact.head.terms.len();
2037 let Some(pred_arities) = arities.get(pred) else {
2038 continue;
2039 };
2040 if pred_arities.len() <= 1 {
2041 continue;
2042 }
2043 let key = arity_qualified_name(pred, arity);
2044 schemas
2045 .entry(key)
2046 .or_insert_with(|| schema_from_terms(&fact.head.terms));
2047 }
2048
2049 for rule in &program.rules {
2050 augment_atom_schema_if_needed(&rule.head, &arities, schemas);
2051 for literal in &rule.body {
2052 match literal {
2053 BodyLiteral::Positive(atom) | BodyLiteral::Negated(atom) => {
2054 augment_atom_schema_if_needed(atom, &arities, schemas);
2055 }
2056 BodyLiteral::Epistemic(epistemic) => {
2057 augment_atom_schema_if_needed(&epistemic.atom, &arities, schemas);
2058 }
2059 BodyLiteral::Comparison(_) | BodyLiteral::IsExpr(_) | BodyLiteral::Univ(_) => {}
2060 }
2061 }
2062 }
2063
2064 for query in &program.queries {
2065 augment_atom_schema_if_needed(&query.atom, &arities, schemas);
2066 }
2067
2068 Ok(())
2069}
2070
2071fn augment_atom_schema_if_needed(
2072 atom: &Atom,
2073 arities: &HashMap<String, BTreeSet<usize>>,
2074 schemas: &mut HashMap<String, Schema>,
2075) {
2076 let Some(pred_arities) = arities.get(&atom.predicate) else {
2077 return;
2078 };
2079 if pred_arities.len() <= 1 {
2080 return;
2081 }
2082 let key = arity_qualified_name(&atom.predicate, atom.terms.len());
2083 schemas
2084 .entry(key)
2085 .or_insert_with(|| schema_from_terms(&atom.terms));
2086}
2087
2088fn predicate_arities(program: &Program) -> HashMap<String, BTreeSet<usize>> {
2089 let mut arities = HashMap::new();
2090 for decl in &program.predicates {
2091 add_predicate_arity(&mut arities, &decl.name, pred_decl_arity(decl));
2092 }
2093 for rule in &program.rules {
2094 add_predicate_arity(&mut arities, &rule.head.predicate, rule.head.terms.len());
2095 for literal in &rule.body {
2096 match literal {
2097 BodyLiteral::Positive(atom) | BodyLiteral::Negated(atom) => {
2098 add_predicate_arity(&mut arities, &atom.predicate, atom.terms.len());
2099 }
2100 BodyLiteral::Epistemic(epistemic) => {
2101 add_predicate_arity(
2102 &mut arities,
2103 &epistemic.atom.predicate,
2104 epistemic.atom.terms.len(),
2105 );
2106 }
2107 BodyLiteral::Comparison(_) | BodyLiteral::IsExpr(_) | BodyLiteral::Univ(_) => {}
2108 }
2109 }
2110 }
2111 for query in &program.queries {
2112 add_predicate_arity(&mut arities, &query.atom.predicate, query.atom.terms.len());
2113 }
2114 arities
2115}
2116
2117fn add_predicate_arity(
2118 arities: &mut HashMap<String, BTreeSet<usize>>,
2119 predicate: &str,
2120 arity: usize,
2121) {
2122 arities
2123 .entry(predicate.to_string())
2124 .or_default()
2125 .insert(arity);
2126}
2127
2128fn arity_qualified_name_if_needed(
2129 predicate: &str,
2130 arity: usize,
2131 arities: &HashMap<String, BTreeSet<usize>>,
2132) -> String {
2133 if arities.get(predicate).is_some_and(|items| items.len() > 1) {
2134 arity_qualified_name(predicate, arity)
2135 } else {
2136 predicate.to_string()
2137 }
2138}
2139
2140fn arity_qualified_name(predicate: &str, arity: usize) -> String {
2141 format!("{predicate}/{arity}")
2142}
2143
2144fn pred_decl_arity(decl: &xlog_logic::ast::PredDecl) -> usize {
2145 if decl.columns.is_empty() {
2146 decl.types.len()
2147 } else {
2148 decl.columns.len()
2149 }
2150}
2151
2152fn program_has_epistemic_literals(program: &Program) -> bool {
2153 program.rules.iter().any(|rule| {
2154 rule.body
2155 .iter()
2156 .any(|lit| matches!(lit, BodyLiteral::Epistemic(_)))
2157 }) || program.constraints.iter().any(|constraint| {
2158 constraint
2159 .body
2160 .iter()
2161 .any(|lit| matches!(lit, BodyLiteral::Epistemic(_)))
2162 })
2163}
2164
2165fn epistemic_output_head_predicate_count(program: &Program) -> usize {
2166 program
2167 .rules
2168 .iter()
2169 .filter(|rule| {
2170 rule.body
2171 .iter()
2172 .any(|lit| matches!(lit, BodyLiteral::Epistemic(_)))
2173 })
2174 .map(|rule| rule.head.predicate.as_str())
2175 .collect::<BTreeSet<_>>()
2176 .len()
2177}
2178
2179fn epistemic_stratum_output_heads(program: &Program) -> Vec<String> {
2183 program
2184 .rules
2185 .iter()
2186 .filter(|rule| {
2187 rule.body
2188 .iter()
2189 .any(|lit| matches!(lit, BodyLiteral::Epistemic(_)))
2190 })
2191 .map(|rule| rule.head.predicate.clone())
2192 .collect::<BTreeSet<_>>()
2193 .into_iter()
2194 .collect()
2195}
2196
2197fn epistemic_relation_ids(plan: &LogicExecutionPlan) -> Result<HashMap<String, RelId>> {
2198 let mut rel_ids = HashMap::new();
2199 match plan {
2200 LogicExecutionPlan::EpistemicSingle(executable) => {
2201 for (name, rel_id) in &executable.relation_ids {
2202 insert_epistemic_relation_id(&mut rel_ids, name, *rel_id)?;
2203 }
2204 }
2205 LogicExecutionPlan::EpistemicSplit(split) => {
2206 for component in &split.components {
2207 for (name, rel_id) in &component.executable.relation_ids {
2208 insert_epistemic_relation_id(&mut rel_ids, name, *rel_id)?;
2209 }
2210 }
2211 }
2212 LogicExecutionPlan::EpistemicStratified(strata) => {
2213 for stratum in strata {
2214 match &stratum.plan {
2215 StratumPlanKind::Single(executable) => {
2216 for (name, rel_id) in &executable.relation_ids {
2217 rel_ids.insert(name.clone(), *rel_id);
2221 }
2222 }
2223 StratumPlanKind::Split(split) => {
2224 for component in &split.components {
2225 for (name, rel_id) in &component.executable.relation_ids {
2226 rel_ids.insert(name.clone(), *rel_id);
2227 }
2228 }
2229 }
2230 StratumPlanKind::Ordinary { .. } => {}
2234 }
2235 }
2236 }
2237 LogicExecutionPlan::EpistemicWfsGpu(wfs) => {
2238 for plan in [&wfs.overapprox, &wfs.lower, &wfs.upper] {
2239 for (name, rel_id) in &plan.rel_ids {
2240 rel_ids.insert(name.clone(), *rel_id);
2241 }
2242 }
2243 }
2244 LogicExecutionPlan::Ordinary(_) => {}
2245 }
2246 Ok(rel_ids)
2247}
2248
2249fn insert_epistemic_relation_id(
2250 rel_ids: &mut HashMap<String, RelId>,
2251 name: &str,
2252 rel_id: RelId,
2253) -> Result<()> {
2254 if let Some(previous) = rel_ids.insert(name.to_string(), rel_id) {
2255 if previous != rel_id {
2256 return Err(XlogError::Compilation(format!(
2257 "epistemic split components assigned conflicting relation ids for {name}: \
2258 {previous:?} vs {rel_id:?}"
2259 )));
2260 }
2261 }
2262 Ok(())
2263}
2264
2265fn capacities_for_epistemic_executable(
2266 executable: &EpistemicExecutablePlan,
2267) -> Result<EpistemicGpuWorkspaceCapacities> {
2268 let literal_count = executable.gpu_plan.epistemic_literals.len();
2269 let max_candidates = 1usize.checked_shl(literal_count as u32).ok_or_else(|| {
2270 XlogError::UnsupportedEpistemicConstruct {
2271 construct: "epistemic GPU execution candidate generation".to_string(),
2272 context: format!("literal count {literal_count} exceeds target pointer width"),
2273 }
2274 })?;
2275 Ok(EpistemicGpuWorkspaceCapacities {
2276 max_candidates,
2277 max_worlds: 1,
2278 max_models_per_reduction: DEFAULT_EPISTEMIC_MAX_MODELS_PER_REDUCTION,
2279 })
2280}
2281
2282fn capacities_for_epistemic_split(
2283 split: &EpistemicSplitExecutablePlan,
2284) -> Result<EpistemicGpuWorkspaceCapacities> {
2285 let mut capacities = EpistemicGpuWorkspaceCapacities {
2286 max_candidates: 1,
2287 max_worlds: 1,
2288 max_models_per_reduction: DEFAULT_EPISTEMIC_MAX_MODELS_PER_REDUCTION,
2289 };
2290 for component in &split.components {
2291 let component_capacities = capacities_for_epistemic_executable(&component.executable)?;
2292 capacities.max_candidates = capacities
2293 .max_candidates
2294 .max(component_capacities.max_candidates);
2295 }
2296 Ok(capacities)
2297}
2298
2299fn epistemic_output_relation_name(executable: &EpistemicExecutablePlan) -> Result<String> {
2300 executable
2301 .gpu_plan
2302 .reductions
2303 .last()
2304 .map(|reduction| reduction.head_predicate.clone())
2305 .ok_or_else(|| XlogError::UnsupportedEpistemicConstruct {
2306 construct: "epistemic GPU reduced output".to_string(),
2307 context: "executable plan has no epistemic reductions".to_string(),
2308 })
2309}
2310
2311fn epistemic_buffer_to_query_result(relation_name: String, buffer: CudaBuffer) -> LogicQueryResult {
2312 let schema = buffer.schema();
2313 let columns = schema
2314 .columns
2315 .iter()
2316 .map(|(name, _)| name.clone())
2317 .collect();
2318 let sort_labels = schema.sort_labels().to_vec();
2319 LogicQueryResult {
2320 relation_name,
2321 columns,
2322 sort_labels,
2323 buffer,
2324 }
2325}
2326
2327fn epistemic_result_to_query_results(
2335 primary_relation_name: String,
2336 result: EpistemicGpuExecutionResult,
2337) -> Vec<LogicQueryResult> {
2338 let mut results = Vec::with_capacity(1 + result.additional_head_outputs.len());
2339 for (head, buffer) in result.additional_head_outputs {
2340 results.push(epistemic_buffer_to_query_result(head, buffer));
2341 }
2342 results.push(epistemic_buffer_to_query_result(
2343 primary_relation_name,
2344 result.final_output,
2345 ));
2346 results
2347}
2348
2349fn is_user_visible_relation(name: &str) -> bool {
2350 !name.starts_with("__")
2351}
2352
2353fn is_list_helper_relation(name: &str) -> bool {
2354 name.starts_with("__xlog_list_")
2355}
2356
2357fn logic_delta_report(
2358 stats: DeltaRecomputeStats,
2359 insert_rows: u64,
2360 delete_rows: u64,
2361) -> LogicDeltaReport {
2362 LogicDeltaReport {
2363 input_delta_count: stats.changed_relations,
2364 changed_relations: stats.changed_relations,
2365 changed_relation_names: Vec::new(),
2366 insert_rows,
2367 delete_rows,
2368 has_deletes: stats.has_deletes,
2369 affected_sccs: stats.affected_sccs,
2370 recomputed_sccs: stats.recomputed_sccs,
2371 incremental_sccs: stats.incremental_sccs,
2372 coalesced_insert_rows: insert_rows,
2373 coalesced_delete_rows: delete_rows,
2374 canceled_rows: 0,
2375 planner_telemetry: DeltaPlannerTelemetry::default(),
2376 debug_trace: Vec::new(),
2377 }
2378}
2379
2380fn delta_debug_trace(report: &LogicDeltaReport) -> Vec<String> {
2381 vec![
2382 format!("changed_relation_names={:?}", report.changed_relation_names),
2383 format!("affected_sccs={}", report.affected_sccs),
2384 format!("recomputed_sccs={}", report.recomputed_sccs),
2385 format!("incremental_sccs={}", report.incremental_sccs),
2386 format!("insert_rows={}", report.insert_rows),
2387 format!("delete_rows={}", report.delete_rows),
2388 format!(
2389 "planner_fallback_decision={}",
2390 report.planner_telemetry.fallback_decision
2391 ),
2392 format!(
2393 "estimated_delta_speedup={:?}",
2394 report.planner_telemetry.estimated_delta_speedup
2395 ),
2396 ]
2397}
2398
2399fn buffers_gpu_set_equivalent(
2400 provider: &CudaKernelProvider,
2401 left: &CudaBuffer,
2402 right: &CudaBuffer,
2403) -> Result<bool> {
2404 if left.schema() != right.schema() {
2405 return Ok(false);
2406 }
2407 let left_rows = provider.device_row_count(left)?;
2408 let right_rows = provider.device_row_count(right)?;
2409 if left_rows != right_rows {
2410 return Ok(false);
2411 }
2412
2413 let left_minus_right = provider.diff_full_row(left, right)?;
2414 if provider.device_row_count(&left_minus_right)? != 0 {
2415 return Ok(false);
2416 }
2417 let right_minus_left = provider.diff_full_row(right, left)?;
2418 Ok(provider.device_row_count(&right_minus_left)? == 0)
2419}
2420
2421fn coalesce_relation_delta_batch(
2422 provider: &CudaKernelProvider,
2423 delta_batch: Vec<(String, RelationDelta)>,
2424) -> Result<CoalescedRelationDeltaBatch> {
2425 let input_delta_count = delta_batch.len();
2426 let mut pending_by_relation: HashMap<String, PendingRelationDelta> = HashMap::new();
2427 let mut canceled_rows = 0u64;
2428
2429 for (name, delta) in delta_batch {
2430 let pending = pending_by_relation.entry(name).or_default();
2431 if let Some(insert) = delta.insert {
2432 merge_insert_delta(provider, pending, insert, &mut canceled_rows)?;
2433 }
2434 if let Some(delete) = delta.delete {
2435 merge_delete_delta(provider, pending, delete, &mut canceled_rows)?;
2436 }
2437 }
2438
2439 let mut deltas = HashMap::new();
2440 let mut coalesced_insert_rows = 0u64;
2441 let mut coalesced_delete_rows = 0u64;
2442 for (name, pending) in pending_by_relation {
2443 let insert = pending.insert.and_then(non_empty_buffer);
2444 let delete = pending.delete.and_then(non_empty_buffer);
2445 if insert.is_none() && delete.is_none() {
2446 continue;
2447 }
2448 coalesced_insert_rows += insert.as_ref().map(buffer_rows).unwrap_or(0);
2449 coalesced_delete_rows += delete.as_ref().map(buffer_rows).unwrap_or(0);
2450 deltas.insert(name, RelationDelta::new(insert, delete));
2451 }
2452
2453 let changed_relations = deltas.len();
2454 Ok(CoalescedRelationDeltaBatch {
2455 deltas,
2456 input_delta_count,
2457 changed_relations,
2458 coalesced_insert_rows,
2459 coalesced_delete_rows,
2460 canceled_rows,
2461 })
2462}
2463
2464fn merge_insert_delta(
2465 provider: &CudaKernelProvider,
2466 pending: &mut PendingRelationDelta,
2467 insert: CudaBuffer,
2468 canceled_rows: &mut u64,
2469) -> Result<()> {
2470 let mut incoming = provider.dedup_full_row(&insert)?;
2471 if let Some(delete) = pending.delete.take().and_then(non_empty_buffer) {
2472 let delete_before = buffer_rows(&delete);
2473 let delete_after = provider.diff_full_row(&delete, &incoming)?;
2474 let insert_after = provider.diff_full_row(&incoming, &delete)?;
2475 *canceled_rows += delete_before.saturating_sub(buffer_rows(&delete_after));
2476 pending.delete = non_empty_buffer(delete_after);
2477 incoming = insert_after;
2478 }
2479 pending.insert = merge_optional_buffer(provider, pending.insert.take(), incoming)?;
2480 Ok(())
2481}
2482
2483fn merge_delete_delta(
2484 provider: &CudaKernelProvider,
2485 pending: &mut PendingRelationDelta,
2486 delete: CudaBuffer,
2487 canceled_rows: &mut u64,
2488) -> Result<()> {
2489 let mut incoming = provider.dedup_full_row(&delete)?;
2490 if let Some(insert) = pending.insert.take().and_then(non_empty_buffer) {
2491 let insert_before = buffer_rows(&insert);
2492 let insert_after = provider.diff_full_row(&insert, &incoming)?;
2493 let delete_after = provider.diff_full_row(&incoming, &insert)?;
2494 *canceled_rows += insert_before.saturating_sub(buffer_rows(&insert_after));
2495 pending.insert = non_empty_buffer(insert_after);
2496 incoming = delete_after;
2497 }
2498 pending.delete = merge_optional_buffer(provider, pending.delete.take(), incoming)?;
2499 Ok(())
2500}
2501
2502fn merge_optional_buffer(
2503 provider: &CudaKernelProvider,
2504 existing: Option<CudaBuffer>,
2505 incoming: CudaBuffer,
2506) -> Result<Option<CudaBuffer>> {
2507 let Some(incoming) = non_empty_buffer(incoming) else {
2508 return Ok(existing.and_then(non_empty_buffer));
2509 };
2510 match existing.and_then(non_empty_buffer) {
2511 Some(existing) => provider
2512 .union_gpu(&existing, &incoming)
2513 .map(non_empty_buffer),
2514 None => Ok(Some(incoming)),
2515 }
2516}
2517
2518fn non_empty_buffer(buffer: CudaBuffer) -> Option<CudaBuffer> {
2519 if buffer.is_empty() {
2520 None
2521 } else {
2522 Some(buffer)
2523 }
2524}
2525
2526fn buffer_rows(buffer: &CudaBuffer) -> u64 {
2527 buffer
2528 .cached_row_count()
2529 .map(u64::from)
2530 .unwrap_or_else(|| buffer.num_rows())
2531}
2532
2533fn ensure_schema_type_compatible(expected: &Schema, actual: &Schema) -> Result<()> {
2534 if expected.arity() != actual.arity() {
2535 return Err(XlogError::Execution(format!(
2536 "Expected {} columns, got {}",
2537 expected.arity(),
2538 actual.arity()
2539 )));
2540 }
2541 for i in 0..expected.arity() {
2542 let exp = expected.column_type(i).ok_or_else(|| {
2543 XlogError::Execution(format!("Missing expected type for column {}", i))
2544 })?;
2545 let act = actual
2546 .column_type(i)
2547 .ok_or_else(|| XlogError::Execution(format!("Missing actual type for column {}", i)))?;
2548 if exp != act {
2549 return Err(XlogError::Execution(format!(
2550 "Column {} type mismatch: expected {:?}, got {:?}",
2551 i, exp, act
2552 )));
2553 }
2554 }
2555 Ok(())
2556}
2557
2558fn push_term_bytes(out: &mut Vec<u8>, term: &Term, typ: xlog_core::ScalarType) -> Result<()> {
2559 use xlog_core::symbol;
2560 use xlog_core::ScalarType;
2561
2562 match (typ, term) {
2563 (ScalarType::U32, Term::Integer(v)) => {
2564 let v = u32::try_from(*v)
2565 .map_err(|_| XlogError::Execution(format!("u32 out of range: {}", v)))?;
2566 out.extend_from_slice(&v.to_le_bytes());
2567 }
2568 (ScalarType::U64, Term::Integer(v)) => {
2569 let v = u64::try_from(*v)
2570 .map_err(|_| XlogError::Execution(format!("u64 out of range: {}", v)))?;
2571 out.extend_from_slice(&v.to_le_bytes());
2572 }
2573 (ScalarType::I32, Term::Integer(v)) => {
2574 let v = i32::try_from(*v)
2575 .map_err(|_| XlogError::Execution(format!("i32 out of range: {}", v)))?;
2576 out.extend_from_slice(&v.to_le_bytes());
2577 }
2578 (ScalarType::I64, Term::Integer(v)) => {
2579 out.extend_from_slice(&v.to_le_bytes());
2580 }
2581 (ScalarType::F32, Term::Float(v)) => {
2582 out.extend_from_slice(&(*v as f32).to_le_bytes());
2583 }
2584 (ScalarType::F64, Term::Float(v)) => {
2585 out.extend_from_slice(&v.to_le_bytes());
2586 }
2587 (ScalarType::F32, Term::Integer(v)) => {
2588 out.extend_from_slice(&(*v as f32).to_le_bytes());
2589 }
2590 (ScalarType::F64, Term::Integer(v)) => {
2591 out.extend_from_slice(&(*v as f64).to_le_bytes());
2592 }
2593 (ScalarType::Bool, Term::Integer(v)) => {
2594 let b = match *v {
2595 0 => 0u8,
2596 1 => 1u8,
2597 other => {
2598 return Err(XlogError::Execution(format!(
2599 "bool expects 0/1, got {}",
2600 other
2601 )));
2602 }
2603 };
2604 out.push(b);
2605 }
2606 (ScalarType::Bool, Term::Symbol(id)) => {
2607 let s = symbol::resolve(*id);
2608 if s == "true" || s == "false" {
2609 out.push(if s == "true" { 1u8 } else { 0u8 });
2610 } else {
2611 return Err(XlogError::Execution(format!(
2612 "Expected boolean symbol 'true' or 'false', got '{}'",
2613 s
2614 )));
2615 }
2616 }
2617 (ScalarType::Symbol, Term::String(s)) => {
2618 out.extend_from_slice(&symbol::intern(s).to_le_bytes());
2619 }
2620 (ScalarType::Symbol, Term::Symbol(id)) => {
2621 out.extend_from_slice(&id.to_le_bytes());
2623 }
2624 (_, Term::Variable(v)) => {
2625 return Err(XlogError::Execution(format!(
2626 "Fact cannot contain variable {}",
2627 v
2628 )));
2629 }
2630 (_, Term::Anonymous) => {
2631 return Err(XlogError::Execution(
2632 "Fact cannot contain anonymous wildcard '_'".to_string(),
2633 ));
2634 }
2635 (_, Term::Aggregate(_)) => {
2636 return Err(XlogError::Execution(
2637 "Fact cannot contain aggregate".to_string(),
2638 ));
2639 }
2640 (expected, got) => {
2641 return Err(XlogError::Execution(format!(
2642 "Type mismatch in fact: expected {:?}, got {:?}",
2643 expected, got
2644 )));
2645 }
2646 }
2647
2648 Ok(())
2649}
2650
2651fn query_output_vars(Query { atom }: &Query) -> Vec<String> {
2652 let mut out = Vec::new();
2653 let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
2654 for term in &atom.terms {
2655 for name in term.variables() {
2656 if seen.insert(name) {
2657 out.push(name.to_string());
2658 }
2659 }
2660 }
2661 out
2662}
2663
2664fn format_term(term: &Term) -> String {
2665 match term {
2666 Term::Variable(v) => v.clone(),
2667 Term::Anonymous => "_".to_string(),
2668 Term::Integer(i) => i.to_string(),
2669 Term::Float(f) => f.to_string(),
2670 Term::String(s) => format!("{:?}", s),
2671 Term::Symbol(id) => symbol::resolve(*id),
2672 Term::List(items) => format!(
2673 "[{}]",
2674 items.iter().map(format_term).collect::<Vec<_>>().join(", ")
2675 ),
2676 Term::Cons { head, tail } => format!("[{} | {}]", format_term(head), format_term(tail)),
2677 Term::Compound { functor, args } => format!(
2678 "{}({})",
2679 functor,
2680 args.iter().map(format_term).collect::<Vec<_>>().join(", ")
2681 ),
2682 Term::PredRef(name) => format!("predref({})", name),
2683 Term::Aggregate(a) => format!("{:?}({})", a.op, a.variable),
2684 }
2685}
2686
2687fn format_constraint(body: &[BodyLiteral]) -> String {
2688 let lits = body
2689 .iter()
2690 .map(|lit| match lit {
2691 BodyLiteral::Positive(a) => {
2692 let args = a
2693 .terms
2694 .iter()
2695 .map(format_term)
2696 .collect::<Vec<_>>()
2697 .join(", ");
2698 format!("{}({})", a.predicate, args)
2699 }
2700 BodyLiteral::Negated(a) => {
2701 let args = a
2702 .terms
2703 .iter()
2704 .map(format_term)
2705 .collect::<Vec<_>>()
2706 .join(", ");
2707 format!("not {}({})", a.predicate, args)
2708 }
2709 BodyLiteral::Epistemic(lit) => {
2710 let args = lit
2711 .atom
2712 .terms
2713 .iter()
2714 .map(format_term)
2715 .collect::<Vec<_>>()
2716 .join(", ");
2717 let op = match lit.op {
2718 xlog_logic::EpistemicOp::Know => "know",
2719 xlog_logic::EpistemicOp::Possible => "possible",
2720 };
2721 let prefix = if lit.negated { "not " } else { "" };
2722 format!("{prefix}{op} {}({})", lit.atom.predicate, args)
2723 }
2724 BodyLiteral::Comparison(c) => format!("{:?} {:?} {:?}", c.left, c.op, c.right),
2725 BodyLiteral::IsExpr(is) => format!("{} is {:?}", is.target, is.expr),
2726 BodyLiteral::Univ(univ) => {
2727 format!(
2728 "{} =.. {}",
2729 format_term(&univ.term),
2730 format_term(&univ.parts)
2731 )
2732 }
2733 })
2734 .collect::<Vec<_>>()
2735 .join(", ");
2736 format!(":- {}.", lits)
2737}
2738
2739fn json_escape(s: &str) -> String {
2744 let mut out = String::with_capacity(s.len() + 2);
2745 for c in s.chars() {
2746 match c {
2747 '"' => out.push_str("\\\""),
2748 '\\' => out.push_str("\\\\"),
2749 '\n' => out.push_str("\\n"),
2750 '\r' => out.push_str("\\r"),
2751 '\t' => out.push_str("\\t"),
2752 c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
2753 c => out.push(c),
2754 }
2755 }
2756 out
2757}
2758
2759fn fnv1a_64(s: &str) -> u64 {
2763 let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
2764 for b in s.as_bytes() {
2765 hash ^= *b as u64;
2766 hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
2767 }
2768 hash
2769}
2770
2771fn collect_eir_epistemic_literals(program: &Program) -> Vec<xlog_ir::EirEpistemicLiteral> {
2775 let mut lits = Vec::new();
2776 if let Ok(eir) = xlog_logic::build_eir(program) {
2777 for rule in &eir.rules {
2778 for lit in &rule.body {
2779 if let xlog_ir::EirBodyLiteral::Epistemic(e) = lit {
2780 lits.push(e.clone());
2781 }
2782 }
2783 }
2784 }
2785 lits
2786}
2787
2788fn epistemic_provenance_summary_json(
2794 plan_kind: &str,
2795 prov: &EpistemicProvenance,
2796 max_iterations: Option<usize>,
2797 wfs: Option<&EpistemicWfsGpuPlan>,
2798) -> String {
2799 let literals = prov
2800 .literals
2801 .iter()
2802 .map(epistemic_literal_json)
2803 .collect::<Vec<_>>()
2804 .join(",");
2805 let wfs_fixed_relations = wfs
2806 .map(wfs_fixed_relations_json)
2807 .unwrap_or_else(|| "null".to_string());
2808 let wfs_convergence_predicates = wfs
2809 .map(wfs_convergence_predicates_json)
2810 .unwrap_or_else(|| "null".to_string());
2811 let wfs_gpu_passes = if wfs.is_some() {
2812 "[\"overapprox\",\"lower\",\"upper\"]"
2813 } else {
2814 "null"
2815 };
2816 let host_wfs_fallback_allowed = if wfs.is_some() { "false" } else { "null" };
2817 let body = format!(
2818 "{{\"plan_kind\":\"{}\",\"reduction\":\"{}\",\
2819\"epistemic_literals\":[{}],\"units\":[],\"max_iterations\":{},\
2820\"wfs_fixed_relations\":{},\"wfs_convergence_predicates\":{},\
2821\"wfs_gpu_passes\":{},\
2822\"host_wfs_fallback_allowed\":{},\
2823\"cpu_fallback_total_zero\":true}}",
2824 json_escape(plan_kind),
2825 json_escape(prov.reduction),
2826 literals,
2827 max_iterations
2828 .map(|value| value.to_string())
2829 .unwrap_or_else(|| "null".to_string()),
2830 wfs_fixed_relations,
2831 wfs_convergence_predicates,
2832 wfs_gpu_passes,
2833 host_wfs_fallback_allowed
2834 );
2835 let plan_id = fnv1a_64(&body);
2836 format!(
2837 "{{\"plan_id\":\"epi-{:016x}\",\"plan_kind\":\"{}\",\
2838\"reduction\":\"{}\",\"epistemic_literals\":[{}],\"units\":[],\
2839\"max_iterations\":{},\"wfs_fixed_relations\":{},\
2840\"wfs_convergence_predicates\":{},\"wfs_gpu_passes\":{},\
2841\"host_wfs_fallback_allowed\":{},\
2842\"cpu_fallback_total_zero\":true}}",
2843 plan_id,
2844 json_escape(plan_kind),
2845 json_escape(prov.reduction),
2846 literals,
2847 max_iterations
2848 .map(|value| value.to_string())
2849 .unwrap_or_else(|| "null".to_string()),
2850 wfs_fixed_relations,
2851 wfs_convergence_predicates,
2852 wfs_gpu_passes,
2853 host_wfs_fallback_allowed
2854 )
2855}
2856
2857fn wfs_fixed_relations_json(wfs: &EpistemicWfsGpuPlan) -> String {
2858 let mut sources: BTreeSet<&str> = BTreeSet::new();
2859 for source in wfs.upper_fixed_names.keys() {
2860 sources.insert(source.as_str());
2861 }
2862 for source in wfs.lower_fixed_names.keys() {
2863 sources.insert(source.as_str());
2864 }
2865 let entries = sources
2866 .into_iter()
2867 .map(|source| {
2868 let upper = wfs
2869 .upper_fixed_names
2870 .get(source)
2871 .map(String::as_str)
2872 .unwrap_or("");
2873 let lower = wfs
2874 .lower_fixed_names
2875 .get(source)
2876 .map(String::as_str)
2877 .unwrap_or("");
2878 format!(
2879 "\"{}\":{{\"upper\":\"{}\",\"lower\":\"{}\"}}",
2880 json_escape(source),
2881 json_escape(upper),
2882 json_escape(lower)
2883 )
2884 })
2885 .collect::<Vec<_>>()
2886 .join(",");
2887 format!("{{{entries}}}")
2888}
2889
2890fn wfs_convergence_predicates_json(wfs: &EpistemicWfsGpuPlan) -> String {
2891 let entries = wfs
2892 .intensional_predicates
2893 .iter()
2894 .map(|pred| format!("\"{}\"", json_escape(pred)))
2895 .collect::<Vec<_>>()
2896 .join(",");
2897 format!("[{entries}]")
2898}
2899
2900fn epistemic_literal_json(lit: &xlog_ir::EirEpistemicLiteral) -> String {
2901 let op = match lit.op {
2902 xlog_ir::EirEpistemicOp::Know => "know",
2903 xlog_ir::EirEpistemicOp::Possible => "possible",
2904 };
2905 format!(
2906 "{{\"op\":\"{}\",\"negated\":{},\"predicate\":\"{}\",\"arity\":{}}}",
2907 op,
2908 lit.negated,
2909 json_escape(&lit.atom.predicate),
2910 lit.atom.arity
2911 )
2912}
2913
2914fn epistemic_gpu_plan_json(plan: &xlog_ir::EpistemicGpuPlan) -> String {
2915 let mode = match plan.mode {
2916 xlog_ir::EirEpistemicMode::G91 => "g91",
2917 xlog_ir::EirEpistemicMode::Faeel => "faeel",
2918 };
2919 let literals = plan
2920 .epistemic_literals
2921 .iter()
2922 .map(epistemic_literal_json)
2923 .collect::<Vec<_>>()
2924 .join(",");
2925 let phases = plan
2926 .required_phases
2927 .iter()
2928 .map(|p| format!("\"{:?}\"", p))
2929 .collect::<Vec<_>>()
2930 .join(",");
2931 let kernels = plan
2932 .required_kernel_phases
2933 .iter()
2934 .map(|p| format!("\"{:?}\"", p))
2935 .collect::<Vec<_>>()
2936 .join(",");
2937 let constraints = plan
2938 .constraints
2939 .iter()
2940 .map(|c| {
2941 let idx = c
2942 .literal_indices
2943 .iter()
2944 .map(|i| i.to_string())
2945 .collect::<Vec<_>>()
2946 .join(",");
2947 format!(
2948 "{{\"constraint_index\":{},\"literal_indices\":[{}]}}",
2949 c.constraint_index, idx
2950 )
2951 })
2952 .collect::<Vec<_>>()
2953 .join(",");
2954 let reductions = plan
2955 .reductions
2956 .iter()
2957 .map(|r| {
2958 format!(
2959 "{{\"rule_index\":{},\"head\":\"{}\",\"public_head_arity\":{},\"relational_body_atoms\":{}}}",
2960 r.rule_index,
2961 json_escape(&r.head_predicate),
2962 r.public_head_arity,
2963 r.relational_body_atoms
2964 )
2965 })
2966 .collect::<Vec<_>>()
2967 .join(",");
2968 let f = &plan.cpu_fallbacks;
2969 format!(
2970 "{{\"mode\":\"{}\",\"epistemic_literals\":[{}],\"required_phases\":[{}],\
2971\"required_kernel_phases\":[{}],\"constraints\":[{}],\"reductions\":[{}],\
2972\"cpu_fallbacks\":{{\"candidate_enumeration\":{},\"world_view_validation\":{},\
2973\"solver_search\":{},\"probabilistic_recompute\":{}}},\"cpu_fallback_is_zero\":{}}}",
2974 mode,
2975 literals,
2976 phases,
2977 kernels,
2978 constraints,
2979 reductions,
2980 f.candidate_enumeration,
2981 f.world_view_validation,
2982 f.solver_search,
2983 f.probabilistic_recompute,
2984 f.is_zero()
2985 )
2986}
2987
2988fn epistemic_plan_summary_json(
2989 plan_kind: &str,
2990 gpu_plans: &[(String, &xlog_ir::EpistemicGpuPlan)],
2991) -> String {
2992 let units = gpu_plans
2993 .iter()
2994 .map(|(label, plan)| {
2995 format!(
2996 "{{\"unit\":\"{}\",\"plan\":{}}}",
2997 json_escape(label),
2998 epistemic_gpu_plan_json(plan)
2999 )
3000 })
3001 .collect::<Vec<_>>()
3002 .join(",");
3003 let all_zero = gpu_plans
3004 .iter()
3005 .all(|(_, plan)| plan.cpu_fallbacks.is_zero());
3006 let body = format!(
3008 "{{\"plan_kind\":\"{}\",\"units\":[{}],\"cpu_fallback_total_zero\":{}}}",
3009 json_escape(plan_kind),
3010 units,
3011 all_zero
3012 );
3013 let plan_id = fnv1a_64(&body);
3014 format!(
3015 "{{\"plan_id\":\"epi-{:016x}\",\"plan_kind\":\"{}\",\"units\":[{}],\"cpu_fallback_total_zero\":{}}}",
3016 plan_id,
3017 json_escape(plan_kind),
3018 units,
3019 all_zero
3020 )
3021}
3022
3023#[cfg(test)]
3024mod v086_delta_coalesce_tests {
3025 use super::*;
3026 use std::collections::HashMap;
3027 use std::sync::Arc;
3028
3029 use xlog_core::{MemoryBudget, ScalarType};
3030 use xlog_cuda::{CudaDevice, GpuMemoryManager};
3031
3032 fn test_provider() -> Option<Arc<CudaKernelProvider>> {
3033 let device = Arc::new(CudaDevice::new(0).ok()?);
3034 let budget = MemoryBudget::with_limit(1024 * 1024 * 1024);
3035 let memory = Arc::new(GpuMemoryManager::new(device.clone(), budget));
3036 Some(Arc::new(CudaKernelProvider::new(device, memory).ok()?))
3037 }
3038
3039 fn test_buffer(provider: &CudaKernelProvider, rows: &[u32]) -> CudaBuffer {
3040 let schema = Schema::new(vec![("id".to_string(), ScalarType::U32)]);
3041 let bytes: Vec<u8> = rows.iter().flat_map(|v| v.to_le_bytes()).collect();
3042 let mut col = provider.memory().alloc::<u8>(bytes.len()).expect("alloc");
3043 provider
3044 .device()
3045 .inner()
3046 .htod_sync_copy_into(&bytes, &mut col)
3047 .expect("upload rows");
3048 let mut d_num_rows = provider.memory().alloc::<u32>(1).expect("alloc rows");
3049 let row_count = rows.len() as u32;
3050 provider
3051 .device()
3052 .inner()
3053 .htod_sync_copy_into(&[row_count], &mut d_num_rows)
3054 .expect("upload row count");
3055 CudaBuffer::from_columns(vec![col.into()], rows.len() as u64, d_num_rows, schema)
3056 }
3057
3058 fn read_u32(provider: &CudaKernelProvider, buffer: &CudaBuffer) -> Vec<u32> {
3059 provider
3060 .download_column::<u32>(buffer, 0)
3061 .expect("download")
3062 }
3063
3064 fn sorted_query_rows(provider: &CudaKernelProvider, result: &LogicEvalResult) -> Vec<u32> {
3065 let mut rows = read_u32(provider, &result.queries[0].buffer);
3066 rows.sort_unstable();
3067 rows
3068 }
3069
3070 #[test]
3071 fn coalesce_batch_cancels_insert_delete_pairs_on_device() {
3072 let provider = match test_provider() {
3073 Some(provider) => provider,
3074 None => {
3075 eprintln!("Skipping test: no CUDA device available");
3076 return;
3077 }
3078 };
3079
3080 let batch = vec![
3081 (
3082 "external_consumer_commit".to_string(),
3083 RelationDelta::new(Some(test_buffer(&provider, &[7, 8])), None),
3084 ),
3085 (
3086 "external_consumer_commit".to_string(),
3087 RelationDelta::new(None, Some(test_buffer(&provider, &[8]))),
3088 ),
3089 (
3090 "external_consumer_commit".to_string(),
3091 RelationDelta::new(Some(test_buffer(&provider, &[9])), None),
3092 ),
3093 ];
3094
3095 let report = coalesce_relation_delta_batch(provider.as_ref(), batch)
3096 .expect("coalesce relation delta batch");
3097 let delta = report
3098 .deltas
3099 .get("external_consumer_commit")
3100 .expect("coalesced relation");
3101 let insert = delta.insert.as_ref().expect("coalesced insert");
3102 assert_eq!(read_u32(&provider, insert), vec![7, 9]);
3103 assert!(delta.delete.as_ref().map(|b| b.is_empty()).unwrap_or(true));
3104 assert_eq!(report.input_delta_count, 3);
3105 assert_eq!(report.changed_relations, 1);
3106 assert_eq!(report.coalesced_insert_rows, 2);
3107 assert_eq!(report.coalesced_delete_rows, 0);
3108 assert_eq!(report.canceled_rows, 1);
3109 }
3110
3111 #[test]
3112 fn relation_delta_batch_updates_runtime_store_and_reports_coalesced_counts() -> Result<()> {
3113 let Some(provider) = test_provider() else {
3114 eprintln!("Skipping test: no CUDA device available");
3115 return Ok(());
3116 };
3117
3118 let source = r#"
3119 pred external_consumer_commit(u32).
3120 pred out(u32).
3121
3122 out(X) :- external_consumer_commit(X).
3123
3124 ?- out(X).
3125 "#;
3126 let program = LogicProgram::compile(source)?;
3127 let mut coalesced_store = program.create_relation_store(provider.clone())?;
3128 let mut coalesced_cache = None;
3129
3130 provider.reset_host_transfer_stats();
3131 provider.reset_d2h_transfer_count();
3132 let report = program.apply_relation_delta_batch(
3133 provider.clone(),
3134 &mut coalesced_store,
3135 &mut coalesced_cache,
3136 vec![
3137 (
3138 "external_consumer_commit".to_string(),
3139 RelationDelta::new(Some(test_buffer(&provider, &[1, 2, 3])), None),
3140 ),
3141 (
3142 "external_consumer_commit".to_string(),
3143 RelationDelta::new(None, Some(test_buffer(&provider, &[2]))),
3144 ),
3145 (
3146 "external_consumer_commit".to_string(),
3147 RelationDelta::new(Some(test_buffer(&provider, &[4])), None),
3148 ),
3149 ],
3150 )?;
3151 let transfer_stats = provider.host_transfer_stats();
3152
3153 assert_eq!(report.input_delta_count, 3);
3154 assert_eq!(report.changed_relations, 1);
3155 assert_eq!(report.insert_rows, 3);
3156 assert_eq!(report.delete_rows, 0);
3157 assert_eq!(report.coalesced_insert_rows, 3);
3158 assert_eq!(report.coalesced_delete_rows, 0);
3159 assert_eq!(report.canceled_rows, 1);
3160 assert_eq!(transfer_stats.dtoh_bytes, 0);
3161 assert_eq!(transfer_stats.dtoh_calls, 0);
3162 assert_eq!(provider.d2h_transfer_count(), 0);
3163
3164 let coalesced = program.evaluate_cached_relation_store(
3165 provider.clone(),
3166 coalesced_cache
3167 .as_ref()
3168 .expect("cached store after delta batch"),
3169 )?;
3170 let coalesced_rows = sorted_query_rows(&provider, &coalesced);
3171
3172 let mut sequential_store = program.create_relation_store(provider.clone())?;
3173 let mut sequential_cache = None;
3174 for delta in [
3175 RelationDelta::new(Some(test_buffer(&provider, &[1, 2, 3])), None),
3176 RelationDelta::new(None, Some(test_buffer(&provider, &[2]))),
3177 RelationDelta::new(Some(test_buffer(&provider, &[4])), None),
3178 ] {
3179 program.apply_relation_deltas(
3180 provider.clone(),
3181 &mut sequential_store,
3182 &mut sequential_cache,
3183 HashMap::from([("external_consumer_commit".to_string(), delta)]),
3184 )?;
3185 }
3186 let sequential = program.evaluate_cached_relation_store(
3187 provider.clone(),
3188 sequential_cache
3189 .as_ref()
3190 .expect("cached store after sequential deltas"),
3191 )?;
3192 let sequential_rows = sorted_query_rows(&provider, &sequential);
3193
3194 let mut replacement_store = program.create_relation_store(provider.clone())?;
3195 replacement_store.put(
3196 "external_consumer_commit",
3197 test_buffer(&provider, &[1, 3, 4]),
3198 );
3199 let replacement =
3200 program.evaluate_with_relation_store(provider.clone(), &replacement_store, false)?;
3201 let replacement_rows = sorted_query_rows(&provider, &replacement);
3202
3203 assert_eq!(coalesced_rows, vec![1, 3, 4]);
3204 assert_eq!(coalesced_rows, sequential_rows);
3205 assert_eq!(coalesced_rows, replacement_rows);
3206 Ok(())
3207 }
3208}