1use std::collections::{HashMap, HashSet};
7use std::sync::{Arc, OnceLock};
8
9#[cfg(test)]
10use xlog_core::ScalarType;
11use xlog_core::{RelId, Result, RuntimeConfig, Schema, XlogError};
12use xlog_cuda::memory::TrackedCudaSlice;
13use xlog_cuda::{CudaBuffer, CudaKernelProvider};
14#[cfg(test)]
15use xlog_ir::{CompareOp, ConstValue, Stratum};
16use xlog_ir::{ExecutionPlan, Expr, JoinType, ProjectExpr, RirNode};
17use xlog_stats::{StatsManager, StatsSnapshot};
18
19use crate::ilp_registry::{IlpRegistry, IlpTaggedResult};
20use crate::profiler::{ExecutionStats, Profiler};
21use crate::RelationStore;
22
23mod delta;
24mod epistemic_workspace;
25mod expression;
26mod join_cache;
27mod node_dispatch;
28mod recursive;
29mod rewrite;
30mod wcoj_cost_model;
31mod wcoj_dispatch;
32#[cfg(feature = "wcoj-phase-timing")]
33pub mod wcoj_phase_timing;
34pub use epistemic_workspace::{
35 EpistemicGpuBatchExecutionResult, EpistemicGpuBatchExecutionTrace,
36 EpistemicGpuCandidateGenerationTrace, EpistemicGpuCandidateValidationTrace,
37 EpistemicGpuConstraintValidationTrace, EpistemicGpuConstraintWorldViewValidationTrace,
38 EpistemicGpuExecutionResult, EpistemicGpuFinalResultMaterializationTrace,
39 EpistemicGpuFinalResultTransferTrace, EpistemicGpuFinalTupleMaterializationTrace,
40 EpistemicGpuKernelTimingTrace, EpistemicGpuMaterializationTrace,
41 EpistemicGpuModelMembershipSource, EpistemicGpuModelMembershipTrace,
42 EpistemicGpuPreparedExecution, EpistemicGpuPropagationTrace, EpistemicGpuProviderIdentity,
43 EpistemicGpuRejectionReason, EpistemicGpuRuntimeCounters, EpistemicGpuRuntimePreflight,
44 EpistemicGpuRuntimeTrace, EpistemicGpuRuntimeWcojCertification,
45 EpistemicGpuTransferBudgetTrace, EpistemicGpuWorkspace, EpistemicGpuWorkspaceCapacities,
46 EpistemicGpuWorkspaceLayout, EpistemicGpuWorkspaceResetTrace,
47 EpistemicGpuWorldViewValidationTrace,
48};
49use join_cache::JoinIndexCache;
50pub use join_cache::JoinIndexCacheStats;
51
/// Pair of optional device buffers describing a change to one relation.
///
/// Either side may be absent; how the two sides are applied is decided by the
/// code that consumes the delta (not visible in this part of the file).
pub struct RelationDelta {
    /// Buffer for the insert side of the delta, if any.
    pub insert: Option<CudaBuffer>,
    /// Buffer for the delete side of the delta, if any.
    pub delete: Option<CudaBuffer>,
}
59
60impl RelationDelta {
61 pub fn new(insert: Option<CudaBuffer>, delete: Option<CudaBuffer>) -> Self {
63 Self { insert, delete }
64 }
65}
66
/// Summary counters for a delta recompute.
///
/// NOTE(review): these fields are populated outside the visible part of the
/// file; the per-field descriptions below are inferred from the field names
/// and should be confirmed against the delta module.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct DeltaRecomputeStats {
    /// Number of relations that changed (presumed).
    pub changed_relations: usize,
    /// Whether any of the changes were deletions (presumed).
    pub has_deletes: bool,
    /// Number of SCCs affected by the change set (presumed).
    pub affected_sccs: usize,
    /// Number of SCCs that were recomputed (presumed).
    pub recomputed_sccs: usize,
    /// Number of SCCs handled incrementally (presumed).
    pub incremental_sccs: usize,
}
81
/// Counters for the common-subexpression (CSE) cache.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct CommonSubexpressionStats {
    /// Cache hits. NOTE(review): updated outside the visible part of the file.
    pub hits: u64,
    /// Cache misses. NOTE(review): updated outside the visible part of the file.
    pub misses: u64,
    /// Number of times key construction refused a node (saturating counter).
    pub unsafe_rejections: u64,
    /// Distinct rejection reason labels, in first-seen order.
    pub rejection_reasons: Vec<String>,
}
94
/// Telemetry recorded for one executed join, comparing the statistics-based
/// estimate with what the join actually produced.
#[derive(Clone, Debug, PartialEq)]
pub struct AdaptiveJoinObservation {
    /// Left input relation of the join.
    pub left_rel: RelId,
    /// Right input relation of the join.
    pub right_rel: RelId,
    /// Output row count predicted by the stats manager's join cardinality estimate.
    pub estimated_output_rows: u64,
    /// Output row count reported for the executed join.
    pub actual_output_rows: u64,
    /// Absolute difference between estimated and actual output rows.
    pub cardinality_delta_abs: u64,
    /// `estimated_output_rows / input_rows`, or `0.0` when `input_rows` is zero.
    pub estimated_selectivity: f64,
    /// `actual_output_rows / input_rows`, or `0.0` when `input_rows` is zero.
    pub actual_selectivity: f64,
    /// Absolute difference between the two selectivities.
    pub selectivity_delta_abs: f64,
    /// `heat` of the left relation's stats, `0.0` when no stats are available.
    pub left_heat: f32,
    /// `heat` of the right relation's stats, `0.0` when no stats are available.
    pub right_heat: f32,
    /// Absolute difference between the two heat values.
    pub heat_delta_abs: f32,
    /// Larger of (estimate, actual) divided by the smaller, never below `1.0`;
    /// `1.0` when both are zero and `f64::INFINITY` when exactly one is zero.
    pub misplan_ratio: f64,
}
123
/// Outcome category of an adaptive re-optimization decision.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AdaptiveReoptimizationAction {
    /// Adaptive re-optimization is turned off in the runtime configuration.
    Disabled,
    /// Enabled, but there was no join telemetry or the misplan threshold was not reached.
    Skipped,
    /// The misplan threshold was reached; the candidate plan should be tried.
    AttemptCandidate,
    /// The candidate plan ran and its outputs matched the baseline.
    Adopted,
    /// The candidate plan failed or mismatched and the baseline state was restored.
    RolledBack,
}
138
/// A decision about whether to try a candidate plan, with the inputs that led to it.
#[derive(Clone, Debug, PartialEq)]
pub struct AdaptiveReoptimizationDecision {
    /// What was decided (or, after execution, what finally happened).
    pub action: AdaptiveReoptimizationAction,
    /// Short machine-readable label for the decision, e.g. `"no_join_telemetry"`.
    pub reason: String,
    /// Largest misplan ratio among the observations considered (at least `1.0`).
    pub max_misplan_ratio: f64,
    /// Configured threshold that `max_misplan_ratio` must reach to attempt a candidate.
    pub min_misplan_ratio: f64,
}
151
/// Category of a diagnostic emitted while trying a candidate plan.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AdaptiveReoptimizationDiagnosticKind {
    /// Executing the candidate plan returned an error.
    CandidateExecutionFailed,
    /// The candidate's head relations were not equivalent to the baseline's.
    CandidateOutputMismatch,
    /// The baseline store and stats snapshots were restored after a failed attempt.
    RollbackRestoredBaseline,
}
162
/// One diagnostic entry recorded during an adaptive re-optimization attempt.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AdaptiveReoptimizationDiagnostic {
    /// Category of the diagnostic.
    pub kind: AdaptiveReoptimizationDiagnosticKind,
    /// Free-form detail: an error's text or a fixed label.
    pub message: String,
}
171
/// Running totals and last-run details for adaptive re-optimization.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct AdaptiveReoptimizationStats {
    /// Number of calls to `execute_plan_with_adaptive_candidate`.
    pub invocations: u64,
    /// Invocations that ended with a `Disabled` decision.
    pub disabled: u64,
    /// Invocations that ended with a `Skipped` decision.
    pub skipped: u64,
    /// Invocations in which the candidate plan was adopted.
    pub adopted: u64,
    /// Invocations in which the candidate was rolled back.
    pub rolled_back: u64,
    /// Decision recorded by the most recent invocation, if any.
    pub last_decision: Option<AdaptiveReoptimizationDecision>,
    /// Join observations from the most recent plan execution (the baseline's
    /// after an adaptive invocation).
    pub last_observations: Vec<AdaptiveJoinObservation>,
    /// Diagnostics from the most recent adaptive invocation; cleared at its start.
    pub diagnostics: Vec<AdaptiveReoptimizationDiagnostic>,
    /// Growth of the provider's device-to-host call counter across the most
    /// recent adaptive invocation (overwritten each time, not accumulated).
    pub data_plane_dtoh_calls: u64,
}
194
/// Structural cache key for a plan subtree eligible for common-subexpression reuse.
///
/// Built recursively from the plan node; two subtrees with equal keys are
/// treated as computing the same result.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum CommonSubexpressionKey {
    /// Scan of a stored relation at a specific store version.
    Scan {
        rel: RelId,
        // Store version of the relation when the key was built; 0 when the
        // relation name or its version is unknown.
        generation: u64,
    },
    /// Filter over `input`; the predicate is rendered to a canonical string.
    Filter {
        input: Box<CommonSubexpressionKey>,
        predicate: String,
    },
    /// Projection over `input`; each output column is rendered to a string.
    Project {
        input: Box<CommonSubexpressionKey>,
        columns: Vec<String>,
    },
    /// Inner join of two keyed inputs (other join types are never keyed).
    Join {
        left: Box<CommonSubexpressionKey>,
        right: Box<CommonSubexpressionKey>,
        left_keys: Vec<usize>,
        right_keys: Vec<usize>,
    },
    /// Union of keyed inputs, in plan order.
    Union {
        inputs: Vec<CommonSubexpressionKey>,
    },
    /// Distinct over `input` on the given key columns.
    Distinct {
        input: Box<CommonSubexpressionKey>,
        key_cols: Vec<usize>,
    },
}
223
/// Plan executor: owns the relation store, statistics, caches and counters
/// used while running an `ExecutionPlan` on a CUDA kernel provider.
pub struct Executor {
    /// Shared handle to the CUDA kernel provider used for all device work.
    provider: Arc<CudaKernelProvider>,
    /// Named relation buffers.
    store: RelationStore,
    /// Relation id -> name, filled by `register_relation`.
    rel_names: HashMap<RelId, String>,
    /// Relation name -> id, filled by `register_relation`.
    name_to_rel: HashMap<String, RelId>,
    /// Per-relation statistics, also used for join cardinality estimates.
    stats: StatsManager,
    /// Cache of join indexes; invalidated per relation when the store changes.
    join_index_cache: JoinIndexCache,
    /// Cached subtree results keyed by structural key; cleared on every store
    /// write and at the start of each plan execution.
    common_subexpression_cache: HashMap<CommonSubexpressionKey, CudaBuffer>,
    /// Counters for the common-subexpression cache.
    common_subexpression_stats: CommonSubexpressionStats,
    /// Totals and last-run details for adaptive re-optimization.
    adaptive_reoptimization_stats: AdaptiveReoptimizationStats,
    /// Join observations collected during the current plan execution.
    adaptive_join_observations: Vec<AdaptiveJoinObservation>,
    /// Runtime configuration supplied at construction.
    config: RuntimeConfig,
    /// Profiler; disabled by default, replaced by `set_profiling`.
    profiler: Profiler,
    /// ILP registry exposed through `ilp_registry` / `ilp_registry_mut`.
    ilp_registry: IlpRegistry,
    /// Most recent ILP tagged result, if any.
    ilp_last_result: Option<IlpTaggedResult>,
    // The counters below start at zero and are updated by dispatch code in
    // sibling modules that is not visible here.
    // NOTE(review): meanings are inferred from the names — confirm there.
    /// WCOJ triangle dispatch counter.
    wcoj_triangle_dispatch_count: u64,
    /// WCOJ 4-cycle dispatch counter.
    pub(super) wcoj_4cycle_dispatch_count: u64,
    /// Chain-join dispatch counter.
    pub(super) chain_dispatch_count: u64,
    /// WCOJ 5-clique dispatch counter.
    pub(super) wcoj_clique5_dispatch_count: u64,
    /// WCOJ 6-clique dispatch counter.
    pub(super) wcoj_clique6_dispatch_count: u64,
    /// WCOJ 7-clique dispatch counter.
    pub(super) wcoj_clique7_dispatch_count: u64,
    /// WCOJ 8-clique dispatch counter.
    pub(super) wcoj_clique8_dispatch_count: u64,
    /// Number of k-clique histogram refreshes.
    pub(super) kclique_histogram_refresh_count: u64,
    /// Nanoseconds attributed to k-clique histogram refreshes.
    pub(super) kclique_histogram_refresh_nanos: u128,
    /// Nested-loop dispatch counter.
    pub(super) nested_loop_dispatch_count: u64,
    /// Counter of WCOJ dispatches declined because of an error (presumed).
    pub(super) wcoj_error_decline_count: u64,
    /// WCOJ + group-by fusion dispatch counter.
    pub(super) wcoj_groupby_fusion_dispatch_count: u64,
    /// Free-join dispatch counter.
    pub(super) free_join_dispatch_count: u64,
    /// Factorized-delta dispatch counter.
    pub(super) factorized_delta_dispatch_count: u64,
    /// Stream id for WCOJ dispatch, set at most once (starts unset).
    wcoj_dispatch_stream: OnceLock<xlog_cuda::device_runtime::StreamId>,
    /// Phase timing of the last WCOJ dispatch; drained by `take_wcoj_phase_timing`.
    #[cfg(feature = "wcoj-phase-timing")]
    pub(super) last_wcoj_phase_timing:
        std::sync::Mutex<Option<wcoj_phase_timing::WcojDispatchPhaseTiming>>,
    /// Trace exposed through `last_recursive_stats_trace`.
    #[cfg(feature = "recursive-stats-trace")]
    pub(super) last_recursive_stats_trace: RecursiveStatsTrace,
}
366
/// Ordered collection of recursive-evaluation trace entries.
///
/// Only compiled with the `recursive-stats-trace` feature.
#[cfg(feature = "recursive-stats-trace")]
#[derive(Debug, Default, Clone)]
#[allow(missing_docs)]
pub struct RecursiveStatsTrace {
    // Entries in the order they were appended by the recording code, which is
    // outside the visible part of the file.
    pub entries: Vec<RecursiveStatsTraceEntry>,
}
381
/// One sample in a [`RecursiveStatsTrace`].
///
/// NOTE(review): entries are produced outside the visible part of the file;
/// the field notes below are inferred from names and types — confirm against
/// the recursive evaluation module.
#[cfg(feature = "recursive-stats-trace")]
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub struct RecursiveStatsTraceEntry {
    // Fixpoint iteration index at which the sample was taken (presumed).
    pub iteration: usize,
    // Predicate name the sample refers to (presumed).
    pub pred: String,
    // Relation id of the predicate's full relation (presumed).
    pub full_rel: RelId,
    // Relation id of the predicate's delta relation (presumed).
    pub delta_rel: RelId,
    // Row count recorded for the full relation (presumed).
    pub full_rows: u64,
    // Row count recorded for the delta relation (presumed).
    pub delta_rows: u64,
    // Phase in which the sample was taken.
    pub phase: RecursiveStatsPhase,
    // Optional binary-join estimate for the rule variant (presumed).
    pub binary_est_for_variant: Option<u64>,
}
407
/// Phase label attached to a [`RecursiveStatsTraceEntry`].
///
/// NOTE(review): the phases are assigned outside the visible part of the file;
/// confirm their exact meaning against the recursive evaluation module.
#[cfg(feature = "recursive-stats-trace")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(missing_docs)]
pub enum RecursiveStatsPhase {
    // Sample taken at seeding time (presumed).
    Seed,
    // Sample taken in "phase 2", concerning delta relations (presumed).
    Phase2Delta,
    // Sample taken in "phase 4", concerning full relations (presumed).
    Phase4Full,
}
424
425impl Executor {
    /// Creates an executor on `provider` with the default [`RuntimeConfig`].
    ///
    /// Equivalent to `new_with_config(provider, RuntimeConfig::default())`.
    pub fn new(provider: Arc<CudaKernelProvider>) -> Self {
        Self::new_with_config(provider, RuntimeConfig::default())
    }
433
434 pub fn new_with_config(provider: Arc<CudaKernelProvider>, config: RuntimeConfig) -> Self {
436 const DEFAULT_JOIN_INDEX_CACHE_BYTES: u64 = 256 * 1024 * 1024;
437 let max_index_cache_bytes =
438 (provider.memory().budget().device_bytes / 4).min(DEFAULT_JOIN_INDEX_CACHE_BYTES);
439 Self {
440 provider: provider.clone(),
441 store: RelationStore::new(provider.clone()),
442 rel_names: HashMap::new(),
443 name_to_rel: HashMap::new(),
444 stats: StatsManager::new(),
445 join_index_cache: JoinIndexCache::new(max_index_cache_bytes),
446 common_subexpression_cache: HashMap::new(),
447 common_subexpression_stats: CommonSubexpressionStats::default(),
448 adaptive_reoptimization_stats: AdaptiveReoptimizationStats::default(),
449 adaptive_join_observations: Vec::new(),
450 config,
451 profiler: Profiler::default(),
452 ilp_registry: IlpRegistry::new(),
453 ilp_last_result: None,
454 wcoj_triangle_dispatch_count: 0,
455 wcoj_4cycle_dispatch_count: 0,
456 chain_dispatch_count: 0,
457 wcoj_clique5_dispatch_count: 0,
458 wcoj_clique6_dispatch_count: 0,
459 wcoj_clique7_dispatch_count: 0,
460 wcoj_clique8_dispatch_count: 0,
461 kclique_histogram_refresh_count: 0,
462 kclique_histogram_refresh_nanos: 0,
463 nested_loop_dispatch_count: 0,
464 wcoj_error_decline_count: 0,
465 wcoj_groupby_fusion_dispatch_count: 0,
466 free_join_dispatch_count: 0,
467 factorized_delta_dispatch_count: 0,
468 wcoj_dispatch_stream: OnceLock::new(),
469 #[cfg(feature = "wcoj-phase-timing")]
470 last_wcoj_phase_timing: std::sync::Mutex::new(None),
471 #[cfg(feature = "recursive-stats-trace")]
472 last_recursive_stats_trace: RecursiveStatsTrace::default(),
473 }
474 }
475
    /// Returns the recursive-stats trace currently held by the executor.
    ///
    /// Only available with the `recursive-stats-trace` feature.
    #[cfg(feature = "recursive-stats-trace")]
    pub fn last_recursive_stats_trace(&self) -> &RecursiveStatsTrace {
        &self.last_recursive_stats_trace
    }
483
484 #[cfg(feature = "wcoj-phase-timing")]
493 pub fn take_wcoj_phase_timing(&self) -> Option<wcoj_phase_timing::WcojDispatchPhaseTiming> {
494 self.last_wcoj_phase_timing
495 .lock()
496 .ok()
497 .and_then(|mut g| g.take())
498 }
499
500 pub fn set_profiling(&mut self, enabled: bool) {
504 self.profiler = Profiler::new(enabled);
505 if enabled {
506 self.profiler
507 .set_memory_budget(self.provider.memory().budget().device_bytes);
508 }
509 }
510
    /// Reports whether the current profiler is enabled.
    pub fn is_profiling(&self) -> bool {
        self.profiler.is_enabled()
    }
515
    /// Builds an [`ExecutionStats`] report from the profiler.
    ///
    /// `total_output_rows` is forwarded unchanged to the profiler.
    pub fn execution_stats(&self, total_output_rows: u64) -> ExecutionStats {
        self.profiler.execution_stats(total_output_rows)
    }
522
    /// Shared access to the relation store.
    pub fn store(&self) -> &RelationStore {
        &self.store
    }
527
    /// Mutable access to the relation store.
    ///
    /// Writes made through this reference do not go through the executor's
    /// own `store_put` / `store_remove` helpers, so the join-index and
    /// common-subexpression caches are not invalidated by them.
    pub fn store_mut(&mut self) -> &mut RelationStore {
        &mut self.store
    }
532
    /// Mutable access to the ILP registry.
    pub fn ilp_registry_mut(&mut self) -> &mut IlpRegistry {
        &mut self.ilp_registry
    }
537
    /// Shared access to the ILP registry.
    pub fn ilp_registry(&self) -> &IlpRegistry {
        &self.ilp_registry
    }
542
    /// The most recent ILP tagged result, or `None` if there is none.
    pub fn ilp_last_result(&self) -> Option<&IlpTaggedResult> {
        self.ilp_last_result.as_ref()
    }
547
    /// Stores `buffer` under `name`.
    ///
    /// Goes through `store_put`, so the common-subexpression cache is cleared
    /// and, if `name` is a registered relation, its join indexes are invalidated.
    pub fn put_relation(&mut self, name: &str, buffer: CudaBuffer) {
        self.store_put(name, buffer);
    }
552
    /// Shared access to the statistics manager.
    pub fn stats(&self) -> &StatsManager {
        &self.stats
    }
557
    /// Returns the join-index cache's statistics as reported by the cache.
    pub fn join_index_cache_stats(&self) -> JoinIndexCacheStats {
        self.join_index_cache.stats()
    }
562
    /// Clears per-run data: the relation store, the join-index cache, the
    /// common-subexpression cache and the collected join observations.
    ///
    /// Relation registrations, statistics, the profiler, the ILP state and the
    /// dispatch counters are left untouched.
    pub fn reset_for_mc(&mut self) {
        self.store.clear();
        self.join_index_cache.clear();
        self.common_subexpression_cache.clear();
        self.adaptive_join_observations.clear();
    }
572
573 pub fn reset_for_mc_relations(
586 &mut self,
587 preserve: &[&str],
588 clear_to_empty: &[(&str, Schema)],
589 ) -> Result<()> {
590 let preserve_set: HashSet<&str> = preserve.iter().copied().collect();
591 let existing_names: Vec<String> = self.store.names().map(|s| s.to_string()).collect();
592
593 for name in &existing_names {
594 if !preserve_set.contains(name.as_str()) {
595 self.store.remove(name);
596 }
597 }
598
599 for (name, schema) in clear_to_empty {
600 let empty = self.provider.create_empty_buffer(schema.clone())?;
601 self.store.put(name, empty);
602 }
603
604 self.join_index_cache.clear();
605 self.common_subexpression_cache.clear();
606 self.adaptive_join_observations.clear();
607 Ok(())
608 }
609
    /// Resets the executor's data state for ILP use.
    ///
    /// Clears the ILP registry and last ILP result, the relation store, both
    /// caches and the join observations, and replaces the statistics manager
    /// and profiler with fresh instances. Relation registrations and dispatch
    /// counters are not touched here.
    pub fn reset_for_ilp(&mut self) {
        self.ilp_registry.clear();
        self.ilp_last_result = None;
        self.store.clear();
        self.join_index_cache.clear();
        self.common_subexpression_cache.clear();
        self.adaptive_join_observations.clear();
        self.stats = StatsManager::new();
        self.profiler = Profiler::default();
    }
626
    /// Mutable access to the statistics manager.
    pub fn stats_mut(&mut self) -> &mut StatsManager {
        &mut self.stats
    }
631
632 pub fn stats_snapshot(&self) -> StatsSnapshot {
636 let mut snapshot = self.stats.snapshot();
637 snapshot.rel_names = self
638 .rel_names
639 .iter()
640 .map(|(id, name)| (*id, name.clone()))
641 .collect();
642 snapshot
643 }
644
    /// Shared access to the common-subexpression cache counters.
    pub fn common_subexpression_stats(&self) -> &CommonSubexpressionStats {
        &self.common_subexpression_stats
    }
649
    /// Shared access to the adaptive re-optimization statistics.
    pub fn adaptive_reoptimization_stats(&self) -> &AdaptiveReoptimizationStats {
        &self.adaptive_reoptimization_stats
    }
654
    /// Computes the decision the executor would make for `observations` under
    /// its current configuration, without executing anything or updating stats.
    ///
    /// The result's action is only ever `Disabled`, `Skipped` or `AttemptCandidate`.
    pub fn replay_adaptive_reoptimization_decision(
        &self,
        observations: &[AdaptiveJoinObservation],
    ) -> AdaptiveReoptimizationDecision {
        self.adaptive_reoptimization_decision(observations)
    }
662
    /// Whether common-subexpression elimination is enabled, as resolved by the config.
    fn common_subexpression_enabled(&self) -> bool {
        self.config.resolved_common_subexpression_elimination()
    }
666
    /// Whether adaptive re-optimization is enabled, as resolved by the config.
    fn adaptive_reoptimization_enabled(&self) -> bool {
        self.config.resolved_adaptive_reoptimization()
    }
670
671 fn adaptive_reoptimization_decision(
672 &self,
673 observations: &[AdaptiveJoinObservation],
674 ) -> AdaptiveReoptimizationDecision {
675 let min_misplan_ratio = self
676 .config
677 .resolved_adaptive_reoptimization_min_misplan_ratio();
678 let max_misplan_ratio = observations
679 .iter()
680 .map(|observation| observation.misplan_ratio)
681 .fold(1.0_f64, f64::max);
682
683 if !self.adaptive_reoptimization_enabled() {
684 return AdaptiveReoptimizationDecision {
685 action: AdaptiveReoptimizationAction::Disabled,
686 reason: "adaptive_reoptimization_disabled".to_string(),
687 max_misplan_ratio,
688 min_misplan_ratio,
689 };
690 }
691
692 if observations.is_empty() {
693 return AdaptiveReoptimizationDecision {
694 action: AdaptiveReoptimizationAction::Skipped,
695 reason: "no_join_telemetry".to_string(),
696 max_misplan_ratio,
697 min_misplan_ratio,
698 };
699 }
700
701 if max_misplan_ratio >= min_misplan_ratio {
702 AdaptiveReoptimizationDecision {
703 action: AdaptiveReoptimizationAction::AttemptCandidate,
704 reason: "misplan_threshold_crossed".to_string(),
705 max_misplan_ratio,
706 min_misplan_ratio,
707 }
708 } else {
709 AdaptiveReoptimizationDecision {
710 action: AdaptiveReoptimizationAction::Skipped,
711 reason: "misplan_threshold_not_crossed".to_string(),
712 max_misplan_ratio,
713 min_misplan_ratio,
714 }
715 }
716 }
717
718 fn record_adaptive_join_observation(
719 &mut self,
720 left_rel: RelId,
721 right_rel: RelId,
722 left_keys: &[usize],
723 right_keys: &[usize],
724 input_rows: u64,
725 actual_output_rows: u64,
726 ) {
727 let estimated_output_rows = self
728 .stats
729 .estimate_join_cardinality(left_rel, right_rel, left_keys, right_keys);
730 let estimated_selectivity = if input_rows > 0 {
731 estimated_output_rows as f64 / input_rows as f64
732 } else {
733 0.0
734 };
735 let actual_selectivity = if input_rows > 0 {
736 actual_output_rows as f64 / input_rows as f64
737 } else {
738 0.0
739 };
740 let cardinality_delta_abs = estimated_output_rows.abs_diff(actual_output_rows);
741 let selectivity_delta_abs = (estimated_selectivity - actual_selectivity).abs();
742 let left_heat = self
743 .stats
744 .get_relation_stats(left_rel)
745 .map(|stats| stats.heat)
746 .unwrap_or(0.0);
747 let right_heat = self
748 .stats
749 .get_relation_stats(right_rel)
750 .map(|stats| stats.heat)
751 .unwrap_or(0.0);
752 let heat_delta_abs = (left_heat - right_heat).abs();
753 let smaller = estimated_output_rows.min(actual_output_rows);
754 let larger = estimated_output_rows.max(actual_output_rows);
755 let misplan_ratio = if smaller == 0 {
756 if larger == 0 {
757 1.0
758 } else {
759 f64::INFINITY
760 }
761 } else {
762 (larger as f64 / smaller as f64).max(1.0)
763 };
764
765 self.adaptive_join_observations
766 .push(AdaptiveJoinObservation {
767 left_rel,
768 right_rel,
769 estimated_output_rows,
770 actual_output_rows,
771 cardinality_delta_abs,
772 estimated_selectivity,
773 actual_selectivity,
774 selectivity_delta_abs,
775 left_heat,
776 right_heat,
777 heat_delta_abs,
778 misplan_ratio,
779 });
780 }
781
782 fn plan_head_names(plan: &ExecutionPlan) -> Vec<String> {
783 let mut names = Vec::new();
784 for stratum in &plan.strata {
785 for scc_id in &stratum.sccs {
786 if let Some(rules) = plan.rules_by_scc.get(*scc_id as usize) {
787 for rule in rules {
788 if !names.iter().any(|name| name == &rule.head) {
789 names.push(rule.head.clone());
790 }
791 }
792 }
793 }
794 }
795
796 if names.is_empty() {
797 for rules in &plan.rules_by_scc {
798 for rule in rules {
799 if !names.iter().any(|name| name == &rule.head) {
800 names.push(rule.head.clone());
801 }
802 }
803 }
804 }
805
806 names
807 }
808
809 fn clone_store_snapshot(&self) -> Result<HashMap<String, CudaBuffer>> {
810 let names: Vec<String> = self.store.names().map(|name| name.to_string()).collect();
811 let mut snapshot = HashMap::with_capacity(names.len());
812 for name in names {
813 if let Some(buffer) = self.store.get(&name) {
814 snapshot.insert(name, self.clone_buffer(buffer)?);
815 }
816 }
817 Ok(snapshot)
818 }
819
820 fn restore_store_snapshot(&mut self, snapshot: HashMap<String, CudaBuffer>) {
821 let snapshot_names: HashSet<String> = snapshot.keys().cloned().collect();
822 let existing_names: Vec<String> = self.store.names().map(|name| name.to_string()).collect();
823 for name in existing_names {
824 if !snapshot_names.contains(&name) {
825 self.store.remove(&name);
826 }
827 }
828 for (name, buffer) in snapshot {
829 self.store.put(&name, buffer);
830 }
831 }
832
    /// Replaces the statistics manager's contents with `snapshot`.
    ///
    /// Implemented as a clear followed by a merge of the snapshot.
    fn restore_stats_snapshot(&mut self, snapshot: &StatsSnapshot) {
        self.stats.clear();
        self.stats.merge_snapshot(snapshot);
    }
837
838 fn clone_final_plan_output(&self, plan: &ExecutionPlan) -> Result<CudaBuffer> {
839 let head_names = Self::plan_head_names(plan);
840 if let Some(name) = head_names.last() {
841 let output = self.store.get(name).ok_or_else(|| {
842 XlogError::Execution(format!("adaptive reoptimization output missing: {name}"))
843 })?;
844 return self.clone_buffer(output);
845 }
846
847 self.provider.create_empty_buffer(Schema::new(vec![]))
848 }
849
850 fn plan_outputs_match(
851 &self,
852 head_names: &[String],
853 baseline_snapshot: &HashMap<String, CudaBuffer>,
854 ) -> Result<bool> {
855 for name in head_names {
856 let Some(baseline) = baseline_snapshot.get(name) else {
857 return Ok(false);
858 };
859 let Some(candidate) = self.store.get(name) else {
860 return Ok(false);
861 };
862 if !self.buffers_gpu_set_equivalent(baseline, candidate)? {
863 return Ok(false);
864 }
865 }
866 Ok(true)
867 }
868
869 fn buffers_gpu_set_equivalent(&self, left: &CudaBuffer, right: &CudaBuffer) -> Result<bool> {
870 if left.schema() != right.schema() {
871 return Ok(false);
872 }
873 let left_rows = self.provider.device_row_count(left)?;
874 let right_rows = self.provider.device_row_count(right)?;
875 if left_rows != right_rows {
876 return Ok(false);
877 }
878
879 let left_minus_right = self.provider.diff_full_row(left, right)?;
880 if self.provider.device_row_count(&left_minus_right)? != 0 {
881 return Ok(false);
882 }
883 let right_minus_left = self.provider.diff_full_row(right, left)?;
884 Ok(self.provider.device_row_count(&right_minus_left)? == 0)
885 }
886
    /// Stores how much the provider's device-to-host call counter has grown
    /// since `before_dtoh_calls` was sampled.
    ///
    /// The value overwrites the previous one; the subtraction saturates at zero.
    fn record_adaptive_dtoh_delta(&mut self, before_dtoh_calls: u64) {
        let after_dtoh_calls = self.provider.host_transfer_stats().dtoh_calls;
        self.adaptive_reoptimization_stats.data_plane_dtoh_calls =
            after_dtoh_calls.saturating_sub(before_dtoh_calls);
    }
892
    /// Returns `true` for every node kind except `Unit` and `Scan`.
    fn is_common_subexpression_cacheable(node: &RirNode) -> bool {
        !matches!(node, RirNode::Unit | RirNode::Scan { .. })
    }
896
897 fn record_common_subexpression_rejection(&mut self, reason: &'static str) {
898 self.common_subexpression_stats.unsafe_rejections = self
899 .common_subexpression_stats
900 .unsafe_rejections
901 .saturating_add(1);
902 if !self
903 .common_subexpression_stats
904 .rejection_reasons
905 .iter()
906 .any(|seen| seen == reason)
907 {
908 self.common_subexpression_stats
909 .rejection_reasons
910 .push(reason.to_string());
911 }
912 }
913
914 fn common_subexpression_key(&mut self, node: &RirNode) -> Option<CommonSubexpressionKey> {
915 match node {
916 RirNode::Unit => None,
917 RirNode::Scan { rel } => {
918 let generation = self
919 .get_rel_name(*rel)
920 .and_then(|name| self.store.version(name))
921 .unwrap_or(0);
922 Some(CommonSubexpressionKey::Scan {
923 rel: *rel,
924 generation,
925 })
926 }
927 RirNode::Filter { input, predicate } => {
928 let input = self.common_subexpression_key(input)?;
929 Some(CommonSubexpressionKey::Filter {
930 input: Box::new(input),
931 predicate: Self::expr_cse_key(predicate),
932 })
933 }
934 RirNode::Project { input, columns } => {
935 let input = self.common_subexpression_key(input)?;
936 Some(CommonSubexpressionKey::Project {
937 input: Box::new(input),
938 columns: columns.iter().map(Self::project_expr_cse_key).collect(),
939 })
940 }
941 RirNode::Join {
942 left,
943 right,
944 left_keys,
945 right_keys,
946 join_type,
947 } => {
948 if *join_type != JoinType::Inner {
949 self.record_common_subexpression_rejection("negation_or_outer_join_boundary");
950 return None;
951 }
952 let left = self.common_subexpression_key(left)?;
953 let right = self.common_subexpression_key(right)?;
954 Some(CommonSubexpressionKey::Join {
955 left: Box::new(left),
956 right: Box::new(right),
957 left_keys: left_keys.clone(),
958 right_keys: right_keys.clone(),
959 })
960 }
961 RirNode::Union { inputs } => {
962 let mut input_keys = Vec::with_capacity(inputs.len());
963 for input in inputs {
964 input_keys.push(self.common_subexpression_key(input)?);
965 }
966 Some(CommonSubexpressionKey::Union { inputs: input_keys })
967 }
968 RirNode::Distinct { input, key_cols } => {
969 let input = self.common_subexpression_key(input)?;
970 Some(CommonSubexpressionKey::Distinct {
971 input: Box::new(input),
972 key_cols: key_cols.clone(),
973 })
974 }
975 RirNode::Diff { .. } => {
976 self.record_common_subexpression_rejection("negation_or_difference_boundary");
977 None
978 }
979 RirNode::GroupBy { .. } => {
980 self.record_common_subexpression_rejection("aggregate_boundary");
981 None
982 }
983 RirNode::Fixpoint { .. } => {
984 self.record_common_subexpression_rejection("recursive_or_mutable_boundary");
985 None
986 }
987 RirNode::TensorMaskedJoin { .. } => {
988 self.record_common_subexpression_rejection("provenance_or_tensor_boundary");
989 None
990 }
991 RirNode::MultiWayJoin { .. } | RirNode::ChainJoin { .. } => {
992 self.record_common_subexpression_rejection("specialized_dispatch_boundary");
993 None
994 }
995 }
996 }
997
998 fn expr_cse_key(expr: &Expr) -> String {
999 match expr {
1000 Expr::Column(idx) => format!("col:{idx}"),
1001 Expr::Const(value) => format!("const:{}", Self::const_cse_key(value)),
1002 Expr::Compare { left, op, right } => format!(
1003 "cmp:{}:{}:{}",
1004 Self::expr_cse_key(left),
1005 Self::compare_op_cse_key(*op),
1006 Self::expr_cse_key(right)
1007 ),
1008 Expr::And(items) => format!(
1009 "and:[{}]",
1010 items
1011 .iter()
1012 .map(Self::expr_cse_key)
1013 .collect::<Vec<_>>()
1014 .join(",")
1015 ),
1016 Expr::Or(items) => format!(
1017 "or:[{}]",
1018 items
1019 .iter()
1020 .map(Self::expr_cse_key)
1021 .collect::<Vec<_>>()
1022 .join(",")
1023 ),
1024 Expr::Not(inner) => format!("not:{}", Self::expr_cse_key(inner)),
1025 Expr::Add(left, right) => {
1026 format!(
1027 "add:{}:{}",
1028 Self::expr_cse_key(left),
1029 Self::expr_cse_key(right)
1030 )
1031 }
1032 Expr::Sub(left, right) => {
1033 format!(
1034 "sub:{}:{}",
1035 Self::expr_cse_key(left),
1036 Self::expr_cse_key(right)
1037 )
1038 }
1039 Expr::Mul(left, right) => {
1040 format!(
1041 "mul:{}:{}",
1042 Self::expr_cse_key(left),
1043 Self::expr_cse_key(right)
1044 )
1045 }
1046 Expr::Div(left, right) => {
1047 format!(
1048 "div:{}:{}",
1049 Self::expr_cse_key(left),
1050 Self::expr_cse_key(right)
1051 )
1052 }
1053 Expr::Mod(left, right) => {
1054 format!(
1055 "mod:{}:{}",
1056 Self::expr_cse_key(left),
1057 Self::expr_cse_key(right)
1058 )
1059 }
1060 Expr::Abs(inner) => format!("abs:{}", Self::expr_cse_key(inner)),
1061 Expr::Min(left, right) => {
1062 format!(
1063 "min:{}:{}",
1064 Self::expr_cse_key(left),
1065 Self::expr_cse_key(right)
1066 )
1067 }
1068 Expr::Max(left, right) => {
1069 format!(
1070 "max:{}:{}",
1071 Self::expr_cse_key(left),
1072 Self::expr_cse_key(right)
1073 )
1074 }
1075 Expr::Pow(left, right) => {
1076 format!(
1077 "pow:{}:{}",
1078 Self::expr_cse_key(left),
1079 Self::expr_cse_key(right)
1080 )
1081 }
1082 Expr::Cast(inner, ty) => format!("cast:{:?}:{}", ty, Self::expr_cse_key(inner)),
1083 Expr::Conditional {
1084 condition,
1085 then_expr,
1086 else_expr,
1087 } => format!(
1088 "if:{}:{}:{}",
1089 Self::expr_cse_key(condition),
1090 Self::expr_cse_key(then_expr),
1091 Self::expr_cse_key(else_expr)
1092 ),
1093 }
1094 }
1095
    /// Renders one projection column as a cache-key string.
    ///
    /// Plain columns become `col:<index>`; computed columns become
    /// `computed:<type debug>:<expression key>`.
    fn project_expr_cse_key(expr: &ProjectExpr) -> String {
        match expr {
            ProjectExpr::Column(idx) => format!("col:{idx}"),
            ProjectExpr::Computed(expr, ty) => {
                format!("computed:{:?}:{}", ty, Self::expr_cse_key(expr))
            }
        }
    }
1104
1105 fn const_cse_key(value: &xlog_ir::ConstValue) -> String {
1106 match value {
1107 xlog_ir::ConstValue::U32(value) => format!("u32:{value}"),
1108 xlog_ir::ConstValue::U64(value) => format!("u64:{value}"),
1109 xlog_ir::ConstValue::I32(value) => format!("i32:{value}"),
1110 xlog_ir::ConstValue::I64(value) => format!("i64:{value}"),
1111 xlog_ir::ConstValue::F32(value) => format!("f32:{:08x}", value.to_bits()),
1112 xlog_ir::ConstValue::F64(value) => format!("f64:{:016x}", value.to_bits()),
1113 xlog_ir::ConstValue::Bool(value) => format!("bool:{value}"),
1114 xlog_ir::ConstValue::Symbol(value) => format!("symbol:{value:?}"),
1115 }
1116 }
1117
1118 fn compare_op_cse_key(op: xlog_ir::CompareOp) -> &'static str {
1119 match op {
1120 xlog_ir::CompareOp::Eq => "eq",
1121 xlog_ir::CompareOp::Ne => "ne",
1122 xlog_ir::CompareOp::Lt => "lt",
1123 xlog_ir::CompareOp::Le => "le",
1124 xlog_ir::CompareOp::Gt => "gt",
1125 xlog_ir::CompareOp::Ge => "ge",
1126 }
1127 }
1128
1129 fn store_put(&mut self, name: &str, buffer: CudaBuffer) {
1130 self.common_subexpression_cache.clear();
1131 self.store.put(name, buffer);
1132 if let Some(&rel_id) = self.name_to_rel.get(name) {
1133 self.join_index_cache.invalidate_rel(rel_id);
1134 }
1135 }
1136
1137 fn store_remove(&mut self, name: &str) -> Option<CudaBuffer> {
1138 self.common_subexpression_cache.clear();
1139 if let Some(&rel_id) = self.name_to_rel.get(name) {
1140 self.join_index_cache.invalidate_rel(rel_id);
1141 }
1142 self.store.remove(name)
1143 }
1144
    /// Associates `rel_id` with `name` in both lookup directions and registers
    /// the relation with the statistics manager.
    ///
    /// Existing entries for the same id or the same name are overwritten in
    /// their respective map; the opposite-direction entry of a previous
    /// association is not removed.
    pub fn register_relation(&mut self, rel_id: RelId, name: &str) {
        self.rel_names.insert(rel_id, name.to_string());
        self.name_to_rel.insert(name.to_string(), rel_id);
        self.stats.register_relation(rel_id);
    }
1158
    /// Looks up the relation id registered for `name`, if any.
    fn name_to_rel_id(&self, name: &str) -> Option<RelId> {
        self.name_to_rel.get(name).copied()
    }
1169
    /// Looks up the name registered for `rel_id`, if any.
    fn get_rel_name(&self, rel_id: RelId) -> Option<&str> {
        self.rel_names.get(&rel_id).map(|s| s.as_str())
    }
1174
1175 pub fn execute_plan_with_adaptive_candidate(
1185 &mut self,
1186 baseline_plan: &ExecutionPlan,
1187 candidate_plan: &ExecutionPlan,
1188 ) -> Result<CudaBuffer> {
1189 self.adaptive_reoptimization_stats.invocations = self
1190 .adaptive_reoptimization_stats
1191 .invocations
1192 .saturating_add(1);
1193 self.adaptive_reoptimization_stats.diagnostics.clear();
1194 let before_dtoh_calls = self.provider.host_transfer_stats().dtoh_calls;
1195
1196 self.execute_plan(baseline_plan)?;
1197 let baseline_observations = self.adaptive_join_observations.clone();
1198 self.adaptive_reoptimization_stats.last_observations = baseline_observations.clone();
1199 let decision = self.adaptive_reoptimization_decision(&baseline_observations);
1200 self.adaptive_reoptimization_stats.last_decision = Some(decision.clone());
1201
1202 match decision.action {
1203 AdaptiveReoptimizationAction::Disabled => {
1204 self.adaptive_reoptimization_stats.disabled = self
1205 .adaptive_reoptimization_stats
1206 .disabled
1207 .saturating_add(1);
1208 self.record_adaptive_dtoh_delta(before_dtoh_calls);
1209 return self.clone_final_plan_output(baseline_plan);
1210 }
1211 AdaptiveReoptimizationAction::Skipped => {
1212 self.adaptive_reoptimization_stats.skipped =
1213 self.adaptive_reoptimization_stats.skipped.saturating_add(1);
1214 self.record_adaptive_dtoh_delta(before_dtoh_calls);
1215 return self.clone_final_plan_output(baseline_plan);
1216 }
1217 AdaptiveReoptimizationAction::AttemptCandidate => {}
1218 AdaptiveReoptimizationAction::Adopted | AdaptiveReoptimizationAction::RolledBack => {
1219 unreachable!("decision replay never returns terminal adaptive actions")
1220 }
1221 }
1222
1223 let head_names = Self::plan_head_names(baseline_plan);
1224 let baseline_snapshot = self.clone_store_snapshot()?;
1225 let baseline_stats_snapshot = self.stats_snapshot();
1226
1227 if let Err(err) = self.execute_plan(candidate_plan) {
1228 self.restore_store_snapshot(baseline_snapshot);
1229 self.restore_stats_snapshot(&baseline_stats_snapshot);
1230 self.adaptive_reoptimization_stats.rolled_back = self
1231 .adaptive_reoptimization_stats
1232 .rolled_back
1233 .saturating_add(1);
1234 self.adaptive_reoptimization_stats
1235 .diagnostics
1236 .push(AdaptiveReoptimizationDiagnostic {
1237 kind: AdaptiveReoptimizationDiagnosticKind::CandidateExecutionFailed,
1238 message: err.to_string(),
1239 });
1240 self.adaptive_reoptimization_stats
1241 .diagnostics
1242 .push(AdaptiveReoptimizationDiagnostic {
1243 kind: AdaptiveReoptimizationDiagnosticKind::RollbackRestoredBaseline,
1244 message: "baseline_snapshot_restored".to_string(),
1245 });
1246 self.adaptive_reoptimization_stats.last_observations = baseline_observations;
1247 self.adaptive_reoptimization_stats.last_decision =
1248 Some(AdaptiveReoptimizationDecision {
1249 action: AdaptiveReoptimizationAction::RolledBack,
1250 reason: "candidate_execution_failed".to_string(),
1251 max_misplan_ratio: decision.max_misplan_ratio,
1252 min_misplan_ratio: decision.min_misplan_ratio,
1253 });
1254 self.record_adaptive_dtoh_delta(before_dtoh_calls);
1255 return self.clone_final_plan_output(baseline_plan);
1256 }
1257
1258 if !self.plan_outputs_match(&head_names, &baseline_snapshot)? {
1259 self.restore_store_snapshot(baseline_snapshot);
1260 self.restore_stats_snapshot(&baseline_stats_snapshot);
1261 self.adaptive_reoptimization_stats.rolled_back = self
1262 .adaptive_reoptimization_stats
1263 .rolled_back
1264 .saturating_add(1);
1265 self.adaptive_reoptimization_stats
1266 .diagnostics
1267 .push(AdaptiveReoptimizationDiagnostic {
1268 kind: AdaptiveReoptimizationDiagnosticKind::CandidateOutputMismatch,
1269 message: "candidate_output_mismatch".to_string(),
1270 });
1271 self.adaptive_reoptimization_stats
1272 .diagnostics
1273 .push(AdaptiveReoptimizationDiagnostic {
1274 kind: AdaptiveReoptimizationDiagnosticKind::RollbackRestoredBaseline,
1275 message: "baseline_snapshot_restored".to_string(),
1276 });
1277 self.adaptive_reoptimization_stats.last_observations = baseline_observations;
1278 self.adaptive_reoptimization_stats.last_decision =
1279 Some(AdaptiveReoptimizationDecision {
1280 action: AdaptiveReoptimizationAction::RolledBack,
1281 reason: "candidate_output_mismatch".to_string(),
1282 max_misplan_ratio: decision.max_misplan_ratio,
1283 min_misplan_ratio: decision.min_misplan_ratio,
1284 });
1285 self.record_adaptive_dtoh_delta(before_dtoh_calls);
1286 return self.clone_final_plan_output(baseline_plan);
1287 }
1288
1289 self.adaptive_reoptimization_stats.adopted =
1290 self.adaptive_reoptimization_stats.adopted.saturating_add(1);
1291 self.adaptive_reoptimization_stats.last_observations = baseline_observations;
1292 self.adaptive_reoptimization_stats.last_decision = Some(AdaptiveReoptimizationDecision {
1293 action: AdaptiveReoptimizationAction::Adopted,
1294 reason: "candidate_adopted".to_string(),
1295 max_misplan_ratio: decision.max_misplan_ratio,
1296 min_misplan_ratio: decision.min_misplan_ratio,
1297 });
1298 self.record_adaptive_dtoh_delta(before_dtoh_calls);
1299 self.clone_final_plan_output(candidate_plan)
1300 }
1301
1302 pub fn execute_plan(&mut self, plan: &ExecutionPlan) -> Result<CudaBuffer> {
1316 self.adaptive_join_observations.clear();
1317 self.common_subexpression_cache.clear();
1318 let gate = self.config.strict_deterministic_d2h;
1325 let prev_gate = self.provider.strict_deterministic_d2h_enabled();
1326 if gate && !prev_gate {
1327 self.provider.reset_deterministic_d2h_violations();
1332 self.provider.enable_strict_deterministic_d2h();
1333 }
1334 let _gate_guard = D2hGateGuard {
1337 provider: Arc::clone(&self.provider),
1338 engaged: gate,
1339 previous: prev_gate,
1340 };
1341
1342 for (idx, stratum) in plan.strata.iter().enumerate() {
1344 let (num_rules, is_recursive) = stratum
1346 .sccs
1347 .iter()
1348 .map(|&scc_id| {
1349 let rules = plan
1350 .rules_by_scc
1351 .get(scc_id as usize)
1352 .map(|r| r.len())
1353 .unwrap_or(0);
1354 let recursive = plan
1355 .sccs
1356 .get(scc_id as usize)
1357 .map(|s| s.is_recursive)
1358 .unwrap_or(false);
1359 (rules, recursive)
1360 })
1361 .fold((0, false), |(r, rec), (nr, nrec)| (r + nr, rec || nrec));
1362
1363 self.profiler.begin_stratum(idx, num_rules, is_recursive);
1364 self.execute_stratum_impl(stratum, plan)?;
1365
1366 let mem_bytes = self.provider.memory().allocated_bytes();
1368 self.profiler.record_peak_memory(mem_bytes);
1369
1370 self.profiler.end_stratum();
1371 }
1372
1373 self.provider.device().synchronize()?;
1375 self.adaptive_reoptimization_stats.last_observations =
1376 self.adaptive_join_observations.clone();
1377
1378 self.provider.create_empty_buffer(Schema::new(vec![]))
1380 }
1381
1382 #[cfg(test)]
1394 fn evaluate_predicate(
1395 expr: &Expr,
1396 columns: &[Vec<u8>],
1397 row_idx: usize,
1398 schema: &Schema,
1399 ) -> Result<bool> {
1400 match expr {
1401 Expr::Column(col_idx) => {
1402 let col_type = schema.column_type(*col_idx);
1404 if let Some(ScalarType::Bool) = col_type {
1405 Ok(columns
1406 .get(*col_idx)
1407 .map(|c| c.get(row_idx).copied().unwrap_or(0) != 0)
1408 .unwrap_or(false))
1409 } else {
1410 Ok(true)
1412 }
1413 }
1414
1415 Expr::Const(ConstValue::Bool(b)) => Ok(*b),
1416 Expr::Const(_) => Ok(true), Expr::Compare { left, op, right } => {
1419 let use_float =
1420 Self::expr_may_be_float(left, schema) || Self::expr_may_be_float(right, schema);
1421
1422 if use_float {
1423 let left_val = Self::evaluate_expr_as_f64(left, columns, row_idx, schema)?;
1424 let right_val = Self::evaluate_expr_as_f64(right, columns, row_idx, schema)?;
1425
1426 Ok(match op {
1427 CompareOp::Eq => left_val == right_val,
1428 CompareOp::Ne => left_val != right_val,
1429 CompareOp::Lt => left_val < right_val,
1430 CompareOp::Le => left_val <= right_val,
1431 CompareOp::Gt => left_val > right_val,
1432 CompareOp::Ge => left_val >= right_val,
1433 })
1434 } else {
1435 let left_val = Self::evaluate_expr_as_i64(left, columns, row_idx, schema)?;
1436 let right_val = Self::evaluate_expr_as_i64(right, columns, row_idx, schema)?;
1437
1438 Ok(match op {
1439 CompareOp::Eq => left_val == right_val,
1440 CompareOp::Ne => left_val != right_val,
1441 CompareOp::Lt => left_val < right_val,
1442 CompareOp::Le => left_val <= right_val,
1443 CompareOp::Gt => left_val > right_val,
1444 CompareOp::Ge => left_val >= right_val,
1445 })
1446 }
1447 }
1448
1449 Expr::And(exprs) => {
1450 for e in exprs {
1451 if !Self::evaluate_predicate(e, columns, row_idx, schema)? {
1452 return Ok(false);
1453 }
1454 }
1455 Ok(true)
1456 }
1457
1458 Expr::Or(exprs) => {
1459 for e in exprs {
1460 if Self::evaluate_predicate(e, columns, row_idx, schema)? {
1461 return Ok(true);
1462 }
1463 }
1464 Ok(false)
1465 }
1466
1467 Expr::Not(inner) => Ok(!Self::evaluate_predicate(inner, columns, row_idx, schema)?),
1468
1469 Expr::Add(_, _)
1471 | Expr::Sub(_, _)
1472 | Expr::Mul(_, _)
1473 | Expr::Div(_, _)
1474 | Expr::Mod(_, _)
1475 | Expr::Abs(_)
1476 | Expr::Min(_, _)
1477 | Expr::Max(_, _)
1478 | Expr::Pow(_, _)
1479 | Expr::Cast(_, _)
1480 | Expr::Conditional { .. } => Err(XlogError::Execution(
1481 "Arithmetic expression cannot be evaluated as boolean predicate".into(),
1482 )),
1483 }
1484 }
1485
1486 #[cfg(test)]
1487 fn evaluate_expr_as_f64(
1488 expr: &Expr,
1489 columns: &[Vec<u8>],
1490 row_idx: usize,
1491 schema: &Schema,
1492 ) -> Result<f64> {
1493 match expr {
1494 Expr::Column(col_idx) => {
1495 let col_type = schema.column_type(*col_idx).unwrap_or(ScalarType::U32);
1496 let col_data = columns
1497 .get(*col_idx)
1498 .ok_or_else(|| XlogError::Execution(format!("Column {} not found", col_idx)))?;
1499
1500 let type_size = col_type.size_bytes();
1501 let start = row_idx * type_size;
1502
1503 Ok(match col_type {
1504 ScalarType::F64 => {
1505 let bytes = &col_data[start..start + 8];
1506 f64::from_le_bytes([
1507 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
1508 bytes[7],
1509 ])
1510 }
1511 ScalarType::F32 => {
1512 let bytes = &col_data[start..start + 4];
1513 f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as f64
1514 }
1515 ScalarType::U32 | ScalarType::Symbol => {
1516 let bytes = &col_data[start..start + 4];
1517 u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as f64
1518 }
1519 ScalarType::I32 => {
1520 let bytes = &col_data[start..start + 4];
1521 i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as f64
1522 }
1523 ScalarType::U64 => {
1524 let bytes = &col_data[start..start + 8];
1525 u64::from_le_bytes([
1526 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
1527 bytes[7],
1528 ]) as f64
1529 }
1530 ScalarType::I64 => {
1531 let bytes = &col_data[start..start + 8];
1532 i64::from_le_bytes([
1533 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
1534 bytes[7],
1535 ]) as f64
1536 }
1537 ScalarType::Bool => col_data.get(start).copied().unwrap_or(0) as f64,
1538 })
1539 }
1540
1541 Expr::Const(val) => Ok(match val {
1542 ConstValue::U32(v) => *v as f64,
1543 ConstValue::I32(v) => *v as f64,
1544 ConstValue::U64(v) => *v as f64,
1545 ConstValue::I64(v) => *v as f64,
1546 ConstValue::Bool(b) => {
1547 if *b {
1548 1.0
1549 } else {
1550 0.0
1551 }
1552 }
1553 ConstValue::F32(f) => *f as f64,
1554 ConstValue::F64(f) => *f,
1555 ConstValue::Symbol(_) => {
1556 return Err(XlogError::Execution(
1557 "Cannot evaluate Symbol constant as f64".to_string(),
1558 ));
1559 }
1560 }),
1561
1562 Expr::Add(l, r) => Ok(Self::evaluate_expr_as_f64(l, columns, row_idx, schema)?
1563 + Self::evaluate_expr_as_f64(r, columns, row_idx, schema)?),
1564 Expr::Sub(l, r) => Ok(Self::evaluate_expr_as_f64(l, columns, row_idx, schema)?
1565 - Self::evaluate_expr_as_f64(r, columns, row_idx, schema)?),
1566 Expr::Mul(l, r) => Ok(Self::evaluate_expr_as_f64(l, columns, row_idx, schema)?
1567 * Self::evaluate_expr_as_f64(r, columns, row_idx, schema)?),
1568 Expr::Div(l, r) => {
1569 let left_val = Self::evaluate_expr_as_f64(l, columns, row_idx, schema)?;
1570 let right_val = Self::evaluate_expr_as_f64(r, columns, row_idx, schema)?;
1571 if right_val == 0.0 {
1572 return Err(XlogError::Execution("Division by zero".to_string()));
1573 }
1574 Ok(left_val / right_val)
1575 }
1576 Expr::Mod(l, r) => {
1577 let left_val = Self::evaluate_expr_as_f64(l, columns, row_idx, schema)?;
1578 let right_val = Self::evaluate_expr_as_f64(r, columns, row_idx, schema)?;
1579 if right_val == 0.0 {
1580 return Err(XlogError::Execution("Modulo by zero".to_string()));
1581 }
1582 Ok(left_val % right_val)
1583 }
1584 Expr::Abs(inner) => {
1585 Ok(Self::evaluate_expr_as_f64(inner, columns, row_idx, schema)?.abs())
1586 }
1587 Expr::Min(l, r) => Ok(Self::evaluate_expr_as_f64(l, columns, row_idx, schema)?
1588 .min(Self::evaluate_expr_as_f64(r, columns, row_idx, schema)?)),
1589 Expr::Max(l, r) => Ok(Self::evaluate_expr_as_f64(l, columns, row_idx, schema)?
1590 .max(Self::evaluate_expr_as_f64(r, columns, row_idx, schema)?)),
1591 Expr::Pow(base, exp) => Ok(Self::evaluate_expr_as_f64(base, columns, row_idx, schema)?
1592 .powf(Self::evaluate_expr_as_f64(exp, columns, row_idx, schema)?)),
1593 Expr::Cast(inner, target_type) => match target_type {
1594 ScalarType::F64 => Self::evaluate_expr_as_f64(inner, columns, row_idx, schema),
1595 ScalarType::F32 => {
1596 Ok(Self::evaluate_expr_as_f64(inner, columns, row_idx, schema)? as f32 as f64)
1597 }
1598 _ => Ok(Self::evaluate_expr_as_i64(inner, columns, row_idx, schema)? as f64),
1599 },
1600
1601 _ => Err(XlogError::Execution(
1602 "Cannot evaluate compound expression as f64".to_string(),
1603 )),
1604 }
1605 }
1606
1607 #[cfg(test)]
1609 fn evaluate_expr_as_i64(
1610 expr: &Expr,
1611 columns: &[Vec<u8>],
1612 row_idx: usize,
1613 schema: &Schema,
1614 ) -> Result<i64> {
1615 match expr {
1616 Expr::Column(col_idx) => {
1617 let col_type = schema.column_type(*col_idx).unwrap_or(ScalarType::U32);
1618 let col_data = columns
1619 .get(*col_idx)
1620 .ok_or_else(|| XlogError::Execution(format!("Column {} not found", col_idx)))?;
1621
1622 let type_size = col_type.size_bytes();
1623 let start = row_idx * type_size;
1624
1625 Ok(match col_type {
1626 ScalarType::U32 => {
1627 let bytes = &col_data[start..start + 4];
1628 u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as i64
1629 }
1630 ScalarType::I32 => {
1631 let bytes = &col_data[start..start + 4];
1632 i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as i64
1633 }
1634 ScalarType::U64 => {
1635 let bytes = &col_data[start..start + 8];
1636 u64::from_le_bytes([
1637 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
1638 bytes[7],
1639 ]) as i64
1640 }
1641 ScalarType::I64 => {
1642 let bytes = &col_data[start..start + 8];
1643 i64::from_le_bytes([
1644 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
1645 bytes[7],
1646 ])
1647 }
1648 ScalarType::Bool => col_data.get(start).copied().unwrap_or(0) as i64,
1649 ScalarType::Symbol => {
1650 let bytes = &col_data[start..start + 4];
1651 u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as i64
1652 }
1653 ScalarType::F32 => {
1654 let bytes = &col_data[start..start + 4];
1655 let val = f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
1656 val as i64
1657 }
1658 ScalarType::F64 => {
1659 let bytes = &col_data[start..start + 8];
1660 let val = f64::from_le_bytes([
1661 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
1662 bytes[7],
1663 ]);
1664 val as i64
1665 }
1666 })
1667 }
1668
1669 Expr::Const(val) => Ok(match val {
1670 ConstValue::U32(v) => *v as i64,
1671 ConstValue::I32(v) => *v as i64,
1672 ConstValue::U64(v) => *v as i64,
1673 ConstValue::I64(v) => *v,
1674 ConstValue::Bool(b) => *b as i64,
1675 ConstValue::F32(f) => *f as i64,
1676 ConstValue::F64(f) => *f as i64,
1677 ConstValue::Symbol(_) => 0,
1678 }),
1679
1680 Expr::Add(l, r) => {
1682 let left_val = Self::evaluate_expr_as_i64(l, columns, row_idx, schema)?;
1683 let right_val = Self::evaluate_expr_as_i64(r, columns, row_idx, schema)?;
1684 Ok(left_val.wrapping_add(right_val))
1685 }
1686 Expr::Sub(l, r) => {
1687 let left_val = Self::evaluate_expr_as_i64(l, columns, row_idx, schema)?;
1688 let right_val = Self::evaluate_expr_as_i64(r, columns, row_idx, schema)?;
1689 Ok(left_val.wrapping_sub(right_val))
1690 }
1691 Expr::Mul(l, r) => {
1692 let left_val = Self::evaluate_expr_as_i64(l, columns, row_idx, schema)?;
1693 let right_val = Self::evaluate_expr_as_i64(r, columns, row_idx, schema)?;
1694 Ok(left_val.wrapping_mul(right_val))
1695 }
1696 Expr::Div(l, r) => {
1697 let left_val = Self::evaluate_expr_as_i64(l, columns, row_idx, schema)?;
1698 let right_val = Self::evaluate_expr_as_i64(r, columns, row_idx, schema)?;
1699 if right_val == 0 {
1700 return Err(XlogError::Execution("Division by zero".to_string()));
1701 }
1702 Ok(left_val / right_val)
1703 }
1704 Expr::Mod(l, r) => {
1705 let left_val = Self::evaluate_expr_as_i64(l, columns, row_idx, schema)?;
1706 let right_val = Self::evaluate_expr_as_i64(r, columns, row_idx, schema)?;
1707 if right_val == 0 {
1708 return Err(XlogError::Execution("Modulo by zero".to_string()));
1709 }
1710 Ok(left_val % right_val)
1711 }
1712 Expr::Abs(inner) => {
1713 let val = Self::evaluate_expr_as_i64(inner, columns, row_idx, schema)?;
1714 Ok(val.abs())
1715 }
1716 Expr::Min(l, r) => {
1717 let left_val = Self::evaluate_expr_as_i64(l, columns, row_idx, schema)?;
1718 let right_val = Self::evaluate_expr_as_i64(r, columns, row_idx, schema)?;
1719 Ok(left_val.min(right_val))
1720 }
1721 Expr::Max(l, r) => {
1722 let left_val = Self::evaluate_expr_as_i64(l, columns, row_idx, schema)?;
1723 let right_val = Self::evaluate_expr_as_i64(r, columns, row_idx, schema)?;
1724 Ok(left_val.max(right_val))
1725 }
1726 Expr::Pow(base, exp) => {
1727 let base_val = Self::evaluate_expr_as_i64(base, columns, row_idx, schema)?;
1728 let exp_val = Self::evaluate_expr_as_i64(exp, columns, row_idx, schema)?;
1729 if exp_val < 0 {
1730 Err(XlogError::Execution(
1731 "Negative exponent in integer pow".to_string(),
1732 ))
1733 } else if exp_val > u32::MAX as i64 {
1734 Ok(i64::MAX)
1736 } else {
1737 Ok(base_val.pow(exp_val as u32))
1738 }
1739 }
1740 Expr::Cast(inner, _target_type) => {
1741 Self::evaluate_expr_as_i64(inner, columns, row_idx, schema)
1743 }
1744
1745 _ => Err(XlogError::Execution(
1746 "Cannot evaluate compound expression as value".to_string(),
1747 )),
1748 }
1749 }
1750
1751 fn get_or_create_rel_name(&mut self, rel_id: RelId, default: &str) -> String {
1753 if let Some(name) = self.rel_names.get(&rel_id) {
1754 name.clone()
1755 } else {
1756 self.register_relation(rel_id, default);
1757 default.to_string()
1758 }
1759 }
1760
    /// Creates an empty buffer for `schema` by delegating to the provider's
    /// `create_empty_buffer`; any provider error is returned unchanged.
    fn create_empty_buffer(&self, schema: Schema) -> Result<CudaBuffer> {
        self.provider.create_empty_buffer(schema)
    }
1767
1768 fn clone_buffer(&self, buffer: &CudaBuffer) -> Result<CudaBuffer> {
1770 if buffer.is_empty() {
1771 return self.create_empty_buffer(buffer.schema().clone());
1772 }
1773
1774 let mut result_columns = Vec::with_capacity(buffer.arity());
1775
1776 for col_idx in 0..buffer.arity() {
1777 let col_type_size = buffer
1778 .schema()
1779 .column_type(col_idx)
1780 .map(|t| t.size_bytes())
1781 .unwrap_or(4);
1782 let bytes = (buffer.num_rows() as usize) * col_type_size;
1783
1784 if let Some(src_col) = buffer.column(col_idx) {
1785 let mut dst_col = self.provider.memory().alloc::<u8>(bytes)?;
1786 if bytes > 0 {
1787 self.provider
1788 .device()
1789 .inner()
1790 .dtod_copy(src_col, &mut dst_col)
1791 .map_err(|e| {
1792 XlogError::Execution(format!("Failed to clone column on device: {}", e))
1793 })?;
1794 }
1795 result_columns.push(dst_col.into());
1796 }
1797 }
1798
1799 let d_num_rows = self.clone_device_row_count(buffer)?;
1800 Ok(CudaBuffer::from_columns(
1801 result_columns,
1802 buffer.num_rows(),
1803 d_num_rows,
1804 buffer.schema().clone(),
1805 ))
1806 }
1807
1808 fn clone_device_row_count(&self, buffer: &CudaBuffer) -> Result<TrackedCudaSlice<u32>> {
1809 let mut d_num_rows = self.provider.memory().alloc::<u32>(1)?;
1810 self.provider
1811 .device()
1812 .inner()
1813 .dtod_copy(buffer.num_rows_device(), &mut d_num_rows)
1814 .map_err(|e| XlogError::Execution(format!("Failed to copy row count: {}", e)))?;
1815 Ok(d_num_rows)
1816 }
1817
1818 fn buffer_row_count(&self, buffer: &CudaBuffer) -> Result<u32> {
1819 if let Some(n) = buffer.cached_row_count() {
1820 return Ok(n);
1821 }
1822 let n = self
1830 .provider
1831 .dtoh_scalar_untracked::<u32>(buffer.num_rows_device(), 0)
1832 .map_err(|e| XlogError::Execution(format!("Failed to read row count: {}", e)))?;
1833 buffer.set_cached_row_count_if_unset(n);
1834 Ok(n)
1835 }
1836}
1837
/// Drop guard that restores the provider's strict deterministic D2H gate.
///
/// `execute_plan` creates one after it has (possibly) enabled the gate; when
/// the guard is dropped its `Drop` impl re-applies the `previous` state, but
/// only if `engaged` is set.
struct D2hGateGuard {
    /// Provider whose gate is restored on drop.
    provider: Arc<CudaKernelProvider>,
    /// Whether the guard should act on drop; `execute_plan` sets this from
    /// `config.strict_deterministic_d2h`.
    engaged: bool,
    /// Gate state observed on the provider before `execute_plan` changed it.
    previous: bool,
}
1846
1847impl Drop for D2hGateGuard {
1848 fn drop(&mut self) {
1849 if !self.engaged {
1850 return;
1851 }
1852 if self.previous {
1853 self.provider.enable_strict_deterministic_d2h();
1854 } else {
1855 self.provider.disable_strict_deterministic_d2h();
1856 }
1857 }
1858}
1859
1860#[cfg(test)]
1861mod tests {
1862 use super::*;
1863 use std::time::{Duration, Instant};
1864 use xlog_core::MemoryBudget;
1865 use xlog_cuda::{CudaDevice, GpuMemoryManager};
1866 use xlog_ir::{CompiledRule, RirMeta, Scc};
1867
1868 fn has_cuda_device() -> bool {
1869 CudaDevice::new(0).is_ok()
1871 }
1872
1873 fn create_test_executor() -> Option<Executor> {
1874 if !has_cuda_device() {
1875 return None;
1876 }
1877 let device = Arc::new(CudaDevice::new(0).ok()?);
1878 let budget = MemoryBudget::with_limit(1024 * 1024 * 1024); let memory = Arc::new(GpuMemoryManager::new(device.clone(), budget));
1880 let provider = Arc::new(CudaKernelProvider::new(device, memory).ok()?);
1881 Some(Executor::new(provider))
1882 }
1883
1884 fn create_test_executor_with_config(config: RuntimeConfig) -> Option<Executor> {
1885 if !has_cuda_device() {
1886 return None;
1887 }
1888 let device = Arc::new(CudaDevice::new(0).ok()?);
1889 let budget = MemoryBudget::with_limit(1024 * 1024 * 1024); let memory = Arc::new(GpuMemoryManager::new(device.clone(), budget));
1891 let provider = Arc::new(CudaKernelProvider::new(device, memory).ok()?);
1892 Some(Executor::new_with_config(provider, config))
1893 }
1894
1895 fn device_row_count(executor: &Executor, rows: u64) -> TrackedCudaSlice<u32> {
1896 let rows_u32 = u32::try_from(rows).expect("row count fits u32");
1897 let mut d_num_rows = executor.provider.memory().alloc::<u32>(1).expect("alloc");
1898 executor
1899 .provider
1900 .device()
1901 .inner()
1902 .htod_sync_copy_into(&[rows_u32], &mut d_num_rows)
1903 .expect("htod");
1904 d_num_rows
1905 }
1906
1907 fn create_test_buffer(executor: &Executor, data: &[u32], col_name: &str) -> CudaBuffer {
1908 let schema = Schema::new(vec![(col_name.to_string(), ScalarType::U32)]);
1909 let bytes: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();
1910
1911 let mut col = executor
1912 .provider
1913 .memory()
1914 .alloc::<u8>(bytes.len())
1915 .expect("alloc");
1916 executor
1917 .provider
1918 .device()
1919 .inner()
1920 .htod_sync_copy_into(&bytes, &mut col)
1921 .expect("htod");
1922
1923 let rows = data.len() as u64;
1924 let d_num_rows = device_row_count(executor, rows);
1925 CudaBuffer::from_columns(vec![col.into()], rows, d_num_rows, schema)
1926 }
1927
1928 fn read_buffer_u32(executor: &Executor, buffer: &CudaBuffer, col: usize) -> Vec<u32> {
1929 executor
1930 .provider
1931 .download_column::<u32>(buffer, col)
1932 .unwrap_or_default()
1933 }
1934
1935 fn buffer_row_count(executor: &Executor, buffer: &CudaBuffer) -> u32 {
1936 if let Some(n) = buffer.cached_row_count() {
1937 return n;
1938 }
1939 let mut host_rows = [0u32];
1940 executor
1941 .provider
1942 .device()
1943 .inner()
1944 .dtoh_sync_copy_into(buffer.num_rows_device(), &mut host_rows)
1945 .expect("dtoh row count");
1946 buffer.set_cached_row_count_if_unset(host_rows[0]);
1947 host_rows[0]
1948 }
1949
1950 fn to_f64_column_bytes(values: &[f64]) -> Vec<u8> {
1951 values.iter().flat_map(|v| v.to_le_bytes()).collect()
1952 }
1953
1954 fn to_f32_column_bytes(values: &[f32]) -> Vec<u8> {
1955 values.iter().flat_map(|v| v.to_le_bytes()).collect()
1956 }
1957
1958 #[test]
1961 fn test_executor_creation() {
1962 let executor = match create_test_executor() {
1963 Some(e) => e,
1964 None => {
1965 eprintln!("Skipping test: no CUDA device available");
1966 return;
1967 }
1968 };
1969
1970 assert!(executor.store().is_empty());
1971 }
1972
1973 #[test]
1974 fn test_predicate_f64_comparisons() {
1975 let schema = Schema::new(vec![("x".to_string(), ScalarType::F64)]);
1976 let values = [1.0f64, 2.0, 3.0, f64::NAN];
1977 let columns = vec![to_f64_column_bytes(&values)];
1978
1979 let gt_two = Expr::Compare {
1980 left: Box::new(Expr::Column(0)),
1981 op: CompareOp::Gt,
1982 right: Box::new(Expr::Const(ConstValue::F64(2.0))),
1983 };
1984
1985 let results: Vec<bool> = (0..values.len())
1986 .map(|row| Executor::evaluate_predicate(>_two, &columns, row, &schema).unwrap())
1987 .collect();
1988 assert_eq!(results, vec![false, false, true, false]);
1989
1990 let eq_nan = Expr::Compare {
1991 left: Box::new(Expr::Column(0)),
1992 op: CompareOp::Eq,
1993 right: Box::new(Expr::Const(ConstValue::F64(f64::NAN))),
1994 };
1995 let results: Vec<bool> = (0..values.len())
1996 .map(|row| Executor::evaluate_predicate(&eq_nan, &columns, row, &schema).unwrap())
1997 .collect();
1998 assert_eq!(results, vec![false, false, false, false]);
1999
2000 let ne_nan = Expr::Compare {
2001 left: Box::new(Expr::Column(0)),
2002 op: CompareOp::Ne,
2003 right: Box::new(Expr::Const(ConstValue::F64(f64::NAN))),
2004 };
2005 let results: Vec<bool> = (0..values.len())
2006 .map(|row| Executor::evaluate_predicate(&ne_nan, &columns, row, &schema).unwrap())
2007 .collect();
2008 assert_eq!(results, vec![true, true, true, true]);
2009 }
2010
2011 #[test]
2012 fn test_predicate_f32_comparisons() {
2013 let schema = Schema::new(vec![("x".to_string(), ScalarType::F32)]);
2014 let values = [1.0f32, 2.0, 3.0, f32::NAN];
2015 let columns = vec![to_f32_column_bytes(&values)];
2016
2017 let le_two = Expr::Compare {
2018 left: Box::new(Expr::Column(0)),
2019 op: CompareOp::Le,
2020 right: Box::new(Expr::Const(ConstValue::F32(2.0))),
2021 };
2022
2023 let results: Vec<bool> = (0..values.len())
2024 .map(|row| Executor::evaluate_predicate(&le_two, &columns, row, &schema).unwrap())
2025 .collect();
2026 assert_eq!(results, vec![true, true, false, false]);
2027 }
2028
2029 #[test]
2030 fn test_predicate_mixed_float_int_comparisons() {
2031 let schema = Schema::new(vec![
2032 ("x".to_string(), ScalarType::F64),
2033 ("y".to_string(), ScalarType::U32),
2034 ]);
2035
2036 let x = [1.5f64, 2.0, 2.5];
2037 let y = [1u32, 2, 3];
2038 let columns = vec![
2039 to_f64_column_bytes(&x),
2040 y.iter().flat_map(|v| v.to_le_bytes()).collect(),
2041 ];
2042
2043 let x_gt_2 = Expr::Compare {
2044 left: Box::new(Expr::Column(0)),
2045 op: CompareOp::Gt,
2046 right: Box::new(Expr::Const(ConstValue::U32(2))),
2047 };
2048 let results: Vec<bool> = (0..x.len())
2049 .map(|row| Executor::evaluate_predicate(&x_gt_2, &columns, row, &schema).unwrap())
2050 .collect();
2051 assert_eq!(results, vec![false, false, true]);
2052
2053 let y_lt_2_5 = Expr::Compare {
2054 left: Box::new(Expr::Column(1)),
2055 op: CompareOp::Lt,
2056 right: Box::new(Expr::Const(ConstValue::F64(2.5))),
2057 };
2058 let results: Vec<bool> = (0..y.len())
2059 .map(|row| Executor::evaluate_predicate(&y_lt_2_5, &columns, row, &schema).unwrap())
2060 .collect();
2061 assert_eq!(results, vec![true, true, false]);
2062 }
2063
2064 #[test]
2065 fn test_register_and_get_relation() {
2066 let mut executor = match create_test_executor() {
2067 Some(e) => e,
2068 None => {
2069 eprintln!("Skipping test: no CUDA device available");
2070 return;
2071 }
2072 };
2073
2074 executor.register_relation(RelId(1), "test_rel");
2076
2077 assert_eq!(executor.get_rel_name(RelId(1)), Some("test_rel"));
2079 assert_eq!(executor.get_rel_name(RelId(2)), None);
2080 }
2081
2082 #[test]
2085 fn test_execute_scan_not_found() {
2086 let mut executor = match create_test_executor() {
2087 Some(e) => e,
2088 None => {
2089 eprintln!("Skipping test: no CUDA device available");
2090 return;
2091 }
2092 };
2093
2094 executor.register_relation(RelId(1), "missing_rel");
2095
2096 let node = RirNode::Scan { rel: RelId(1) };
2097 let result = executor.execute_node(&node);
2098
2099 assert!(result.is_err());
2100 }
2101
2102 #[test]
2103 fn test_execute_scan_success() {
2104 let mut executor = match create_test_executor() {
2105 Some(e) => e,
2106 None => {
2107 eprintln!("Skipping test: no CUDA device available");
2108 return;
2109 }
2110 };
2111
2112 let buffer = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2114 executor.store_mut().put("test_rel", buffer);
2115 executor.register_relation(RelId(1), "test_rel");
2116
2117 let node = RirNode::Scan { rel: RelId(1) };
2119 let result = executor.execute_node(&node);
2120
2121 assert!(result.is_ok());
2122 let result = result.unwrap();
2123 assert_eq!(buffer_row_count(&executor, &result), 5);
2124
2125 let values = read_buffer_u32(&executor, &result, 0);
2126 assert_eq!(values, vec![1, 2, 3, 4, 5]);
2127 }
2128
2129 #[test]
2132 fn test_execute_filter_empty_input() {
2133 let executor = match create_test_executor() {
2134 Some(e) => e,
2135 None => {
2136 eprintln!("Skipping test: no CUDA device available");
2137 return;
2138 }
2139 };
2140
2141 let schema = Schema::new(vec![("key".to_string(), ScalarType::U32)]);
2142 let empty = executor.create_empty_buffer(schema).unwrap();
2143
2144 let predicate = Expr::Const(ConstValue::Bool(true));
2145 let result = executor.execute_filter(&empty, &predicate);
2146
2147 assert!(result.is_ok());
2148 let result = result.unwrap();
2149 assert_eq!(buffer_row_count(&executor, &result), 0);
2150 }
2151
2152 #[test]
2153 fn test_execute_filter_all_match() {
2154 let executor = match create_test_executor() {
2155 Some(e) => e,
2156 None => {
2157 eprintln!("Skipping test: no CUDA device available");
2158 return;
2159 }
2160 };
2161
2162 let buffer = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2163 let predicate = Expr::Const(ConstValue::Bool(true));
2164
2165 let result = executor.execute_filter(&buffer, &predicate);
2166 assert!(result.is_ok());
2167
2168 let result = result.unwrap();
2169 assert_eq!(buffer_row_count(&executor, &result), 5);
2170 }
2171
2172 #[test]
2173 fn test_execute_filter_none_match() {
2174 let executor = match create_test_executor() {
2175 Some(e) => e,
2176 None => {
2177 eprintln!("Skipping test: no CUDA device available");
2178 return;
2179 }
2180 };
2181
2182 let buffer = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2183 let predicate = Expr::Const(ConstValue::Bool(false));
2184
2185 let result = executor.execute_filter(&buffer, &predicate);
2186 assert!(result.is_ok());
2187 let result = result.unwrap();
2188 assert_eq!(buffer_row_count(&executor, &result), 0);
2189 }
2190
2191 #[test]
2192 fn test_execute_filter_comparison() {
2193 let executor = match create_test_executor() {
2194 Some(e) => e,
2195 None => {
2196 eprintln!("Skipping test: no CUDA device available");
2197 return;
2198 }
2199 };
2200
2201 let buffer = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2202
2203 let predicate = Expr::Compare {
2205 left: Box::new(Expr::Column(0)),
2206 op: CompareOp::Gt,
2207 right: Box::new(Expr::Const(ConstValue::U32(3))),
2208 };
2209
2210 let result = executor.execute_filter(&buffer, &predicate);
2211 assert!(result.is_ok());
2212
2213 let result = result.unwrap();
2214 assert_eq!(buffer_row_count(&executor, &result), 2);
2215
2216 let values = read_buffer_u32(&executor, &result, 0);
2217 assert_eq!(values, vec![4, 5]);
2218 }
2219
2220 #[test]
2221 fn test_execute_filter_and() {
2222 let executor = match create_test_executor() {
2223 Some(e) => e,
2224 None => {
2225 eprintln!("Skipping test: no CUDA device available");
2226 return;
2227 }
2228 };
2229
2230 let buffer = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2231
2232 let predicate = Expr::And(vec![
2234 Expr::Compare {
2235 left: Box::new(Expr::Column(0)),
2236 op: CompareOp::Ge,
2237 right: Box::new(Expr::Const(ConstValue::U32(2))),
2238 },
2239 Expr::Compare {
2240 left: Box::new(Expr::Column(0)),
2241 op: CompareOp::Le,
2242 right: Box::new(Expr::Const(ConstValue::U32(4))),
2243 },
2244 ]);
2245
2246 let result = executor.execute_filter(&buffer, &predicate);
2247 assert!(result.is_ok());
2248
2249 let result = result.unwrap();
2250 assert_eq!(buffer_row_count(&executor, &result), 3);
2251
2252 let values = read_buffer_u32(&executor, &result, 0);
2253 assert_eq!(values, vec![2, 3, 4]);
2254 }
2255
2256 #[test]
2259 fn test_execute_project_empty_input() {
2260 let executor = match create_test_executor() {
2261 Some(e) => e,
2262 None => {
2263 eprintln!("Skipping test: no CUDA device available");
2264 return;
2265 }
2266 };
2267
2268 let schema = Schema::new(vec![
2269 ("a".to_string(), ScalarType::U32),
2270 ("b".to_string(), ScalarType::U32),
2271 ]);
2272 let empty = executor.create_empty_buffer(schema).unwrap();
2273
2274 let result = executor.execute_project(&empty, &[ProjectExpr::Column(0)]);
2275 assert!(result.is_ok());
2276
2277 let result = result.unwrap();
2278 assert_eq!(buffer_row_count(&executor, &result), 0);
2279 assert_eq!(result.arity(), 1);
2280 }
2281
2282 #[test]
2283 fn test_execute_project_reorder() {
2284 let executor = match create_test_executor() {
2285 Some(e) => e,
2286 None => {
2287 eprintln!("Skipping test: no CUDA device available");
2288 return;
2289 }
2290 };
2291
2292 let schema = Schema::new(vec![
2294 ("a".to_string(), ScalarType::U32),
2295 ("b".to_string(), ScalarType::U32),
2296 ]);
2297
2298 let a_data: Vec<u8> = [1u32, 2, 3].iter().flat_map(|v| v.to_le_bytes()).collect();
2299 let b_data: Vec<u8> = [10u32, 20, 30]
2300 .iter()
2301 .flat_map(|v| v.to_le_bytes())
2302 .collect();
2303
2304 let mut col_a = executor
2305 .provider
2306 .memory()
2307 .alloc::<u8>(a_data.len())
2308 .unwrap();
2309 let mut col_b = executor
2310 .provider
2311 .memory()
2312 .alloc::<u8>(b_data.len())
2313 .unwrap();
2314
2315 executor
2316 .provider
2317 .device()
2318 .inner()
2319 .htod_sync_copy_into(&a_data, &mut col_a)
2320 .unwrap();
2321 executor
2322 .provider
2323 .device()
2324 .inner()
2325 .htod_sync_copy_into(&b_data, &mut col_b)
2326 .unwrap();
2327
2328 let d_num_rows = device_row_count(&executor, 3);
2329 let buffer =
2330 CudaBuffer::from_columns(vec![col_a.into(), col_b.into()], 3, d_num_rows, schema);
2331
2332 let result =
2334 executor.execute_project(&buffer, &[ProjectExpr::Column(1), ProjectExpr::Column(0)]);
2335 assert!(result.is_ok());
2336
2337 let result = result.unwrap();
2338 assert_eq!(buffer_row_count(&executor, &result), 3);
2339 assert_eq!(result.arity(), 2);
2340
2341 let col0 = read_buffer_u32(&executor, &result, 0);
2343 assert_eq!(col0, vec![10, 20, 30]);
2344
2345 let col1 = read_buffer_u32(&executor, &result, 1);
2347 assert_eq!(col1, vec![1, 2, 3]);
2348 }
2349
2350 #[test]
2351 fn test_execute_computed_projection_wiring() {
2352 let executor = match create_test_executor() {
2355 Some(e) => e,
2356 None => {
2357 eprintln!("Skipping test: no CUDA device available");
2358 return;
2359 }
2360 };
2361
2362 let schema = Schema::new(vec![
2364 ("a".to_string(), ScalarType::U32),
2365 ("b".to_string(), ScalarType::U32),
2366 ]);
2367
2368 let a_data: Vec<u8> = [10u32, 20, 30]
2369 .iter()
2370 .flat_map(|v| v.to_le_bytes())
2371 .collect();
2372 let b_data: Vec<u8> = [1u32, 2, 3].iter().flat_map(|v| v.to_le_bytes()).collect();
2373
2374 let mut col_a = executor
2375 .provider
2376 .memory()
2377 .alloc::<u8>(a_data.len())
2378 .unwrap();
2379 let mut col_b = executor
2380 .provider
2381 .memory()
2382 .alloc::<u8>(b_data.len())
2383 .unwrap();
2384
2385 executor
2386 .provider
2387 .device()
2388 .inner()
2389 .htod_sync_copy_into(&a_data, &mut col_a)
2390 .unwrap();
2391 executor
2392 .provider
2393 .device()
2394 .inner()
2395 .htod_sync_copy_into(&b_data, &mut col_b)
2396 .unwrap();
2397
2398 let d_num_rows = device_row_count(&executor, 3);
2399 let buffer =
2400 CudaBuffer::from_columns(vec![col_a.into(), col_b.into()], 3, d_num_rows, schema);
2401
2402 let add_expr = Expr::Add(Box::new(Expr::Column(0)), Box::new(Expr::Column(1)));
2404 let projections = vec![
2405 ProjectExpr::Column(0), ProjectExpr::Computed(add_expr, ScalarType::U32), ];
2408
2409 let result = executor.execute_project(&buffer, &projections);
2410
2411 match result {
2415 Ok(res) => {
2416 assert_eq!(buffer_row_count(&executor, &res), 3);
2418 assert_eq!(res.arity(), 2);
2419
2420 let col0 = read_buffer_u32(&executor, &res, 0);
2422 assert_eq!(col0, vec![10, 20, 30]);
2423
2424 let col1 = read_buffer_u32(&executor, &res, 1);
2426 assert_eq!(col1, vec![11, 22, 33]);
2427 }
2428 Err(e) => {
2429 let err_msg = format!("{}", e);
2432 assert!(
2433 err_msg.contains("not implemented")
2434 || err_msg.contains("not yet implemented")
2435 || err_msg.contains("not supported")
2436 || err_msg.contains("stub")
2437 || err_msg.contains("Unsupported")
2438 || err_msg.contains("arithmetic kernels"),
2439 "Unexpected error: {}. Expected arithmetic kernel stub error.",
2440 err_msg
2441 );
2442 }
2443 }
2444 }
2445
2446 #[test]
2449 fn test_execute_union_empty_inputs() {
2450 let executor = match create_test_executor() {
2451 Some(e) => e,
2452 None => {
2453 eprintln!("Skipping test: no CUDA device available");
2454 return;
2455 }
2456 };
2457
2458 let result = executor.execute_union(&[]);
2459 assert!(result.is_ok());
2460 let result = result.unwrap();
2461 assert_eq!(buffer_row_count(&executor, &result), 0);
2462 }
2463
2464 #[test]
2465 fn test_execute_union_single_input() {
2466 let executor = match create_test_executor() {
2467 Some(e) => e,
2468 None => {
2469 eprintln!("Skipping test: no CUDA device available");
2470 return;
2471 }
2472 };
2473
2474 let buffer = create_test_buffer(&executor, &[1, 2, 3], "key");
2475
2476 let result = executor.execute_union(&[buffer]);
2477 assert!(result.is_ok());
2478
2479 let result = result.unwrap();
2480 assert_eq!(buffer_row_count(&executor, &result), 3);
2481
2482 let values = read_buffer_u32(&executor, &result, 0);
2483 assert_eq!(values, vec![1, 2, 3]);
2484 }
2485
2486 #[test]
2487 fn test_execute_union_multiple_inputs() {
2488 let executor = match create_test_executor() {
2489 Some(e) => e,
2490 None => {
2491 eprintln!("Skipping test: no CUDA device available");
2492 return;
2493 }
2494 };
2495
2496 let buffer1 = create_test_buffer(&executor, &[1, 2], "key");
2497 let buffer2 = create_test_buffer(&executor, &[3, 4], "key");
2498 let buffer3 = create_test_buffer(&executor, &[5], "key");
2499
2500 let result = executor.execute_union(&[buffer1, buffer2, buffer3]);
2501 assert!(result.is_ok());
2502
2503 let result = result.unwrap();
2504 assert_eq!(buffer_row_count(&executor, &result), 5);
2505 }
2506
2507 #[test]
2510 fn test_execute_distinct_empty() {
2511 let executor = match create_test_executor() {
2512 Some(e) => e,
2513 None => {
2514 eprintln!("Skipping test: no CUDA device available");
2515 return;
2516 }
2517 };
2518
2519 let schema = Schema::new(vec![("key".to_string(), ScalarType::U32)]);
2520 let empty = executor.create_empty_buffer(schema).unwrap();
2521
2522 let result = executor.execute_distinct(&empty, &[0]);
2523 assert!(result.is_ok());
2524 let result = result.unwrap();
2525 assert_eq!(buffer_row_count(&executor, &result), 0);
2526 }
2527
2528 #[test]
2531 fn test_execute_diff() {
2532 let executor = match create_test_executor() {
2533 Some(e) => e,
2534 None => {
2535 eprintln!("Skipping test: no CUDA device available");
2536 return;
2537 }
2538 };
2539
2540 let left = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2541 let right = create_test_buffer(&executor, &[2, 4], "key");
2542
2543 let result = executor.execute_diff(&left, &right);
2544 assert!(result.is_ok());
2545
2546 let result = result.unwrap();
2547 assert_eq!(buffer_row_count(&executor, &result), 3);
2548
2549 let values = read_buffer_u32(&executor, &result, 0);
2550 assert_eq!(values, vec![1, 3, 5]);
2551 }
2552
2553 #[test]
2556 fn test_execute_fixpoint_base_only() {
2557 let mut executor = match create_test_executor() {
2560 Some(e) => e,
2561 None => {
2562 eprintln!("Skipping test: no CUDA device available");
2563 return;
2564 }
2565 };
2566
2567 let buffer = create_test_buffer(&executor, &[1, 2, 3], "key");
2569 executor.store_mut().put("base_rel", buffer);
2570 executor.register_relation(RelId(1), "base_rel");
2571
2572 let empty_schema = Schema::new(vec![("key".to_string(), ScalarType::U32)]);
2574 let empty_buffer = executor.create_empty_buffer(empty_schema).unwrap();
2575 executor.store_mut().put("empty_rel", empty_buffer);
2576 executor.register_relation(RelId(4), "empty_rel");
2577
2578 let base = Box::new(RirNode::Scan { rel: RelId(1) });
2581 let recursive = Box::new(RirNode::Scan { rel: RelId(4) });
2582
2583 let node = RirNode::Fixpoint {
2584 scc_id: 0,
2585 base,
2586 recursive,
2587 delta_rel: RelId(2),
2588 full_rel: RelId(3),
2589 };
2590
2591 let result = executor.execute_node(&node);
2592 assert!(result.is_ok());
2593
2594 let result = result.unwrap();
2596 assert_eq!(buffer_row_count(&executor, &result), 3);
2597 let values = read_buffer_u32(&executor, &result, 0);
2598 assert_eq!(values, vec![1, 2, 3]);
2599 }
2600
2601 #[test]
2602 fn test_execute_fixpoint_empty_base() {
2603 let mut executor = match create_test_executor() {
2605 Some(e) => e,
2606 None => {
2607 eprintln!("Skipping test: no CUDA device available");
2608 return;
2609 }
2610 };
2611
2612 let empty_schema = Schema::new(vec![("key".to_string(), ScalarType::U32)]);
2614 let empty_buffer = executor.create_empty_buffer(empty_schema.clone()).unwrap();
2615 executor.store_mut().put("empty_base", empty_buffer);
2616 executor.register_relation(RelId(1), "empty_base");
2617
2618 let rec_buffer = create_test_buffer(&executor, &[4, 5, 6], "key");
2620 executor.store_mut().put("rec_rel", rec_buffer);
2621 executor.register_relation(RelId(4), "rec_rel");
2622
2623 let base = Box::new(RirNode::Scan { rel: RelId(1) });
2624 let recursive = Box::new(RirNode::Scan { rel: RelId(4) });
2625
2626 let node = RirNode::Fixpoint {
2627 scc_id: 0,
2628 base,
2629 recursive,
2630 delta_rel: RelId(2),
2631 full_rel: RelId(3),
2632 };
2633
2634 let result = executor.execute_node(&node);
2635 assert!(result.is_ok());
2636
2637 let result = result.unwrap();
2639 assert_eq!(buffer_row_count(&executor, &result), 0);
2640 }
2641
2642 #[test]
2643 fn test_execute_fixpoint_one_iteration() {
2644 let mut executor = match create_test_executor() {
2646 Some(e) => e,
2647 None => {
2648 eprintln!("Skipping test: no CUDA device available");
2649 return;
2650 }
2651 };
2652
2653 let base_buffer = create_test_buffer(&executor, &[1, 2], "key");
2655 executor.store_mut().put("base_rel", base_buffer);
2656 executor.register_relation(RelId(1), "base_rel");
2657
2658 let rec_buffer = create_test_buffer(&executor, &[1, 2, 3], "key");
2660 executor.store_mut().put("rec_rel", rec_buffer);
2661 executor.register_relation(RelId(4), "rec_rel");
2662
2663 let base = Box::new(RirNode::Scan { rel: RelId(1) });
2667 let recursive = Box::new(RirNode::Scan { rel: RelId(4) });
2668
2669 let node = RirNode::Fixpoint {
2670 scc_id: 0,
2671 base,
2672 recursive,
2673 delta_rel: RelId(2),
2674 full_rel: RelId(3),
2675 };
2676
2677 let result = executor.execute_node(&node);
2678 assert!(result.is_ok());
2679
2680 let result = result.unwrap();
2681 assert_eq!(buffer_row_count(&executor, &result), 3);
2683 }
2684
2685 #[test]
2686 fn test_execute_fixpoint_multiple_iterations() {
2687 let mut executor = match create_test_executor() {
2690 Some(e) => e,
2691 None => {
2692 eprintln!("Skipping test: no CUDA device available");
2693 return;
2694 }
2695 };
2696
2697 let base_buffer = create_test_buffer(&executor, &[1], "key");
2699 executor.store_mut().put("base_rel", base_buffer);
2700 executor.register_relation(RelId(1), "base_rel");
2701
2702 let rec_buffer = create_test_buffer(&executor, &[1, 2], "key");
2714 executor.store_mut().put("rec_rel", rec_buffer);
2715 executor.register_relation(RelId(4), "rec_rel");
2716
2717 let base = Box::new(RirNode::Scan { rel: RelId(1) });
2718 let recursive = Box::new(RirNode::Scan { rel: RelId(4) });
2719
2720 let node = RirNode::Fixpoint {
2721 scc_id: 0,
2722 base,
2723 recursive,
2724 delta_rel: RelId(2),
2725 full_rel: RelId(3),
2726 };
2727
2728 let result = executor.execute_node(&node);
2729 assert!(result.is_ok());
2730
2731 let result = result.unwrap();
2732 assert_eq!(buffer_row_count(&executor, &result), 2);
2734 }
2735
2736 #[test]
2737 fn test_execute_fixpoint_via_node() {
2738 let mut executor = match create_test_executor() {
2740 Some(e) => e,
2741 None => {
2742 eprintln!("Skipping test: no CUDA device available");
2743 return;
2744 }
2745 };
2746
2747 let buffer = create_test_buffer(&executor, &[1, 2, 3], "key");
2749 executor.store_mut().put("base_rel", buffer);
2750 executor.register_relation(RelId(1), "base_rel");
2751
2752 let empty_schema = Schema::new(vec![("key".to_string(), ScalarType::U32)]);
2754 let empty_buffer = executor.create_empty_buffer(empty_schema).unwrap();
2755 executor.store_mut().put("empty_rel", empty_buffer);
2756 executor.register_relation(RelId(4), "empty_rel");
2757
2758 let base = Box::new(RirNode::Scan { rel: RelId(1) });
2759 let recursive = Box::new(RirNode::Scan { rel: RelId(4) });
2760
2761 let node = RirNode::Fixpoint {
2762 scc_id: 0,
2763 base,
2764 recursive,
2765 delta_rel: RelId(2),
2766 full_rel: RelId(3),
2767 };
2768
2769 let result = executor.execute_node(&node);
2770 assert!(result.is_ok());
2771
2772 let result = result.unwrap();
2773 assert_eq!(buffer_row_count(&executor, &result), 3);
2774 }
2775
2776 #[test]
2777 fn test_fixpoint_cleanup() {
2778 let mut executor = match create_test_executor() {
2780 Some(e) => e,
2781 None => {
2782 eprintln!("Skipping test: no CUDA device available");
2783 return;
2784 }
2785 };
2786
2787 let buffer = create_test_buffer(&executor, &[1, 2], "key");
2788 executor.store_mut().put("base_rel", buffer);
2789 executor.register_relation(RelId(1), "base_rel");
2790
2791 let empty_schema = Schema::new(vec![("key".to_string(), ScalarType::U32)]);
2792 let empty_buffer = executor.create_empty_buffer(empty_schema).unwrap();
2793 executor.store_mut().put("empty_rel", empty_buffer);
2794 executor.register_relation(RelId(4), "empty_rel");
2795
2796 executor.register_relation(RelId(2), "__delta_test");
2798 executor.register_relation(RelId(3), "__full_test");
2799
2800 let base = Box::new(RirNode::Scan { rel: RelId(1) });
2801 let recursive = Box::new(RirNode::Scan { rel: RelId(4) });
2802
2803 let node = RirNode::Fixpoint {
2804 scc_id: 0,
2805 base,
2806 recursive,
2807 delta_rel: RelId(2),
2808 full_rel: RelId(3),
2809 };
2810
2811 let result = executor.execute_node(&node);
2812 assert!(result.is_ok());
2813
2814 assert!(!executor.store().contains("__delta_test"));
2816 assert!(!executor.store().contains("__full_test"));
2817 }
2818
2819 #[test]
2822 fn test_execute_plan_empty() {
2823 let mut executor = match create_test_executor() {
2824 Some(e) => e,
2825 None => {
2826 eprintln!("Skipping test: no CUDA device available");
2827 return;
2828 }
2829 };
2830
2831 let plan = ExecutionPlan::new(vec![]);
2832
2833 let result = executor.execute_plan(&plan);
2834 assert!(result.is_ok());
2835 let result = result.unwrap();
2836 assert_eq!(buffer_row_count(&executor, &result), 0);
2837 }
2838
2839 #[test]
2840 fn test_execute_plan_with_stratum() {
2841 let mut executor = match create_test_executor() {
2842 Some(e) => e,
2843 None => {
2844 eprintln!("Skipping test: no CUDA device available");
2845 return;
2846 }
2847 };
2848
2849 let buffer = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2851 executor.store_mut().put("input", buffer);
2852 executor.register_relation(RelId(1), "input");
2853
2854 let scc = Scc {
2856 id: 0,
2857 predicates: vec!["output".to_string()],
2858 is_recursive: false,
2859 };
2860
2861 let rule = CompiledRule {
2862 head: "output".to_string(),
2863 body: RirNode::Scan { rel: RelId(1) },
2864 meta: RirMeta::default(),
2865 };
2866
2867 let stratum = Stratum {
2868 id: 0,
2869 sccs: vec![0],
2870 };
2871
2872 let plan = ExecutionPlan {
2873 sccs: vec![scc],
2874 strata: vec![stratum],
2875 rules_by_scc: vec![vec![rule]],
2876 est_memory_peak: 0,
2877 rel_arities: std::collections::HashMap::new(),
2878 };
2879
2880 let result = executor.execute_plan(&plan);
2881 assert!(result.is_ok());
2882
2883 assert!(executor.store().contains("output"));
2885 let output = executor.store().get("output").unwrap();
2886 assert_eq!(buffer_row_count(&executor, output), 5);
2887 }
2888
2889 #[test]
2890 fn test_apply_deltas_and_recompute_updates_dependents() {
2891 let mut executor = match create_test_executor() {
2892 Some(e) => e,
2893 None => {
2894 eprintln!("Skipping test: no CUDA device available");
2895 return;
2896 }
2897 };
2898
2899 let input = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2900 executor.store_mut().put("input", input);
2901 executor.register_relation(RelId(1), "input");
2902
2903 let scc0 = Scc {
2906 id: 0,
2907 predicates: vec!["input".to_string()],
2908 is_recursive: false,
2909 };
2910 let scc1 = Scc {
2911 id: 1,
2912 predicates: vec!["output".to_string()],
2913 is_recursive: false,
2914 };
2915
2916 let input_rule = CompiledRule {
2917 head: "input".to_string(),
2918 body: RirNode::Scan { rel: RelId(1) },
2919 meta: RirMeta::default(),
2920 };
2921
2922 let output_rule = CompiledRule {
2923 head: "output".to_string(),
2924 body: RirNode::Filter {
2925 input: Box::new(RirNode::Scan { rel: RelId(1) }),
2926 predicate: Expr::Compare {
2927 left: Box::new(Expr::Column(0)),
2928 op: CompareOp::Gt,
2929 right: Box::new(Expr::Const(ConstValue::U32(2))),
2930 },
2931 },
2932 meta: RirMeta::default(),
2933 };
2934
2935 let stratum = Stratum {
2936 id: 0,
2937 sccs: vec![0, 1],
2938 };
2939
2940 let plan = ExecutionPlan {
2941 sccs: vec![scc0, scc1],
2942 strata: vec![stratum],
2943 rules_by_scc: vec![vec![input_rule], vec![output_rule]],
2944 est_memory_peak: 0,
2945 rel_arities: std::collections::HashMap::new(),
2946 };
2947
2948 executor.execute_plan(&plan).expect("initial execute_plan");
2949 let initial_out = executor.store().get("output").expect("output missing");
2950 let initial_vals = read_buffer_u32(&executor, initial_out, 0);
2951 assert_eq!(initial_vals, vec![3, 4, 5]);
2952
2953 let delete_buf = create_test_buffer(&executor, &[5], "key");
2954 let insert_buf = create_test_buffer(&executor, &[10], "key");
2955
2956 let mut deltas = HashMap::new();
2957 deltas.insert(
2958 "input".to_string(),
2959 RelationDelta::new(Some(insert_buf), Some(delete_buf)),
2960 );
2961
2962 executor
2963 .apply_deltas_and_recompute(&plan, &deltas)
2964 .expect("apply_deltas_and_recompute");
2965
2966 let out = executor
2967 .store()
2968 .get("output")
2969 .expect("output missing after recompute");
2970 let vals = read_buffer_u32(&executor, out, 0);
2971 assert_eq!(vals, vec![3, 4, 10]);
2972 }
2973
2974 #[test]
2975 fn test_apply_deltas_and_recompute_insert_only_recomputes_anti_join() {
2976 let mut executor = match create_test_executor() {
2977 Some(e) => e,
2978 None => {
2979 eprintln!("Skipping test: no CUDA device available");
2980 return;
2981 }
2982 };
2983
2984 let lhs = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
2985 executor.store_mut().put("lhs", lhs);
2986 executor.register_relation(RelId(1), "lhs");
2987
2988 let blocked = create_test_buffer(&executor, &[], "key");
2989 executor.store_mut().put("blocked", blocked);
2990 executor.register_relation(RelId(2), "blocked");
2991
2992 let scc0 = Scc {
2996 id: 0,
2997 predicates: vec!["lhs".to_string()],
2998 is_recursive: false,
2999 };
3000 let scc1 = Scc {
3001 id: 1,
3002 predicates: vec!["blocked".to_string()],
3003 is_recursive: false,
3004 };
3005 let scc2 = Scc {
3006 id: 2,
3007 predicates: vec!["out".to_string()],
3008 is_recursive: false,
3009 };
3010
3011 let lhs_rule = CompiledRule {
3012 head: "lhs".to_string(),
3013 body: RirNode::Scan { rel: RelId(1) },
3014 meta: RirMeta::default(),
3015 };
3016 let blocked_rule = CompiledRule {
3017 head: "blocked".to_string(),
3018 body: RirNode::Scan { rel: RelId(2) },
3019 meta: RirMeta::default(),
3020 };
3021 let out_rule = CompiledRule {
3022 head: "out".to_string(),
3023 body: RirNode::Join {
3024 left: Box::new(RirNode::Scan { rel: RelId(1) }),
3025 right: Box::new(RirNode::Scan { rel: RelId(2) }),
3026 left_keys: vec![0],
3027 right_keys: vec![0],
3028 join_type: JoinType::Anti,
3029 },
3030 meta: RirMeta::default(),
3031 };
3032
3033 let stratum = Stratum {
3034 id: 0,
3035 sccs: vec![0, 1, 2],
3036 };
3037
3038 let plan = ExecutionPlan {
3039 sccs: vec![scc0, scc1, scc2],
3040 strata: vec![stratum],
3041 rules_by_scc: vec![vec![lhs_rule], vec![blocked_rule], vec![out_rule]],
3042 est_memory_peak: 0,
3043 rel_arities: std::collections::HashMap::new(),
3044 };
3045
3046 executor.execute_plan(&plan).expect("initial execute_plan");
3047 let initial = executor.store().get("out").expect("out missing");
3048 let initial_vals = read_buffer_u32(&executor, initial, 0);
3049 assert_eq!(initial_vals, vec![1, 2, 3, 4, 5]);
3050
3051 let insert_buf = create_test_buffer(&executor, &[2, 4], "key");
3053 let mut deltas = HashMap::new();
3054 deltas.insert(
3055 "blocked".to_string(),
3056 RelationDelta::new(Some(insert_buf), None),
3057 );
3058
3059 executor
3060 .apply_deltas_and_recompute(&plan, &deltas)
3061 .expect("apply_deltas_and_recompute");
3062
3063 let out = executor
3064 .store()
3065 .get("out")
3066 .expect("out missing after update");
3067 let vals = read_buffer_u32(&executor, out, 0);
3068 assert_eq!(vals, vec![1, 3, 5]);
3069 }
3070
3071 #[test]
3074 fn test_execute_filter_project_chain() {
3075 let mut executor = match create_test_executor() {
3076 Some(e) => e,
3077 None => {
3078 eprintln!("Skipping test: no CUDA device available");
3079 return;
3080 }
3081 };
3082
3083 let buffer = create_test_buffer(&executor, &[1, 2, 3, 4, 5], "key");
3085 executor.store_mut().put("input", buffer);
3086 executor.register_relation(RelId(1), "input");
3087
3088 let scan = RirNode::Scan { rel: RelId(1) };
3090 let filter = RirNode::Filter {
3091 input: Box::new(scan),
3092 predicate: Expr::Compare {
3093 left: Box::new(Expr::Column(0)),
3094 op: CompareOp::Gt,
3095 right: Box::new(Expr::Const(ConstValue::U32(2))),
3096 },
3097 };
3098 let project = RirNode::Project {
3099 input: Box::new(filter),
3100 columns: vec![ProjectExpr::Column(0)],
3101 };
3102
3103 let result = executor.execute_node(&project);
3104 assert!(result.is_ok());
3105
3106 let result = result.unwrap();
3107 assert_eq!(buffer_row_count(&executor, &result), 3);
3108
3109 let values = read_buffer_u32(&executor, &result, 0);
3110 assert_eq!(values, vec![3, 4, 5]);
3111 }
3112
3113 fn duplicate_join_union_plan() -> RirNode {
3116 let join = RirNode::Join {
3117 left: Box::new(RirNode::Scan { rel: RelId(1) }),
3118 right: Box::new(RirNode::Scan { rel: RelId(2) }),
3119 left_keys: vec![0],
3120 right_keys: vec![0],
3121 join_type: JoinType::Inner,
3122 };
3123 RirNode::Union {
3124 inputs: vec![join.clone(), join],
3125 }
3126 }
3127
3128 fn seed_cse_join_fixture(executor: &mut Executor, right: &[u32]) {
3129 executor.register_relation(RelId(1), "left");
3130 executor.register_relation(RelId(2), "right");
3131 let left = create_test_buffer(executor, &[1, 2, 3, 4], "key");
3132 let right = create_test_buffer(executor, right, "key");
3133 executor.put_relation("left", left);
3134 executor.put_relation("right", right);
3135 }
3136
3137 #[test]
3138 fn test_common_subexpression_cache_reuses_duplicate_inner_join_when_enabled() {
3139 let mut executor = match create_test_executor_with_config(
3140 RuntimeConfig::default().with_common_subexpression_elimination(Some(true)),
3141 ) {
3142 Some(e) => e,
3143 None => {
3144 eprintln!("Skipping test: no CUDA device available");
3145 return;
3146 }
3147 };
3148 seed_cse_join_fixture(&mut executor, &[2, 3, 5]);
3149
3150 let result = executor
3151 .execute_node(&duplicate_join_union_plan())
3152 .expect("duplicate join union executes");
3153
3154 assert_eq!(buffer_row_count(&executor, &result), 2);
3155 let stats = executor.common_subexpression_stats();
3156 assert_eq!(stats.hits, 1);
3157 assert!(stats.misses >= 1);
3158 assert_eq!(stats.unsafe_rejections, 0);
3159 }
3160
3161 #[test]
3162 fn test_common_subexpression_off_on_preserves_output_and_records_reuse_only_when_enabled() {
3163 let mut disabled = match create_test_executor_with_config(
3164 RuntimeConfig::default().with_common_subexpression_elimination(Some(false)),
3165 ) {
3166 Some(e) => e,
3167 None => {
3168 eprintln!("Skipping test: no CUDA device available");
3169 return;
3170 }
3171 };
3172 let mut enabled = match create_test_executor_with_config(
3173 RuntimeConfig::default().with_common_subexpression_elimination(Some(true)),
3174 ) {
3175 Some(e) => e,
3176 None => {
3177 eprintln!("Skipping test: no CUDA device available");
3178 return;
3179 }
3180 };
3181 seed_cse_join_fixture(&mut disabled, &[2, 3, 5]);
3182 seed_cse_join_fixture(&mut enabled, &[2, 3, 5]);
3183 let plan = duplicate_join_union_plan();
3184
3185 disabled.provider.reset_d2h_transfer_count();
3186 enabled.provider.reset_d2h_transfer_count();
3187 let disabled_result = disabled.execute_node(&plan).expect("disabled CSE output");
3188 let enabled_result = enabled.execute_node(&plan).expect("enabled CSE output");
3189 let disabled_d2h = disabled.provider.d2h_transfer_count();
3190 let enabled_d2h = enabled.provider.d2h_transfer_count();
3191
3192 assert_eq!(
3193 read_buffer_u32(&disabled, &disabled_result, 0),
3194 read_buffer_u32(&enabled, &enabled_result, 0)
3195 );
3196 assert_eq!(enabled_d2h, disabled_d2h);
3197 assert_eq!(disabled.common_subexpression_stats().hits, 0);
3198 assert_eq!(disabled.common_subexpression_stats().misses, 0);
3199 assert_eq!(enabled.common_subexpression_stats().hits, 1);
3200 }
3201
3202 #[test]
3203 fn test_common_subexpression_cache_invalidates_on_relation_generation_change() {
3204 let mut executor = match create_test_executor_with_config(
3205 RuntimeConfig::default().with_common_subexpression_elimination(Some(true)),
3206 ) {
3207 Some(e) => e,
3208 None => {
3209 eprintln!("Skipping test: no CUDA device available");
3210 return;
3211 }
3212 };
3213 seed_cse_join_fixture(&mut executor, &[2, 3, 5]);
3214 let plan = duplicate_join_union_plan();
3215
3216 executor.execute_node(&plan).expect("first execution");
3217 assert_eq!(executor.common_subexpression_stats().hits, 1);
3218
3219 let changed_right = create_test_buffer(&executor, &[4], "key");
3220 executor.put_relation("right", changed_right);
3221 let result = executor.execute_node(&plan).expect("second execution");
3222
3223 assert_eq!(buffer_row_count(&executor, &result), 1);
3224 let stats = executor.common_subexpression_stats();
3225 assert_eq!(stats.hits, 2);
3226 assert!(stats.misses >= 2);
3227 }
3228
3229 #[test]
3230 fn test_common_subexpression_cache_rejects_unsafe_difference_boundary() {
3231 let mut executor = match create_test_executor_with_config(
3232 RuntimeConfig::default().with_common_subexpression_elimination(Some(true)),
3233 ) {
3234 Some(e) => e,
3235 None => {
3236 eprintln!("Skipping test: no CUDA device available");
3237 return;
3238 }
3239 };
3240 seed_cse_join_fixture(&mut executor, &[2, 3, 5]);
3241 let diff = RirNode::Diff {
3242 left: Box::new(RirNode::Scan { rel: RelId(1) }),
3243 right: Box::new(RirNode::Scan { rel: RelId(2) }),
3244 };
3245
3246 executor
3247 .execute_node(&RirNode::Union {
3248 inputs: vec![diff.clone(), diff],
3249 })
3250 .expect("unsafe duplicate diff still executes without CSE sharing");
3251
3252 let stats = executor.common_subexpression_stats();
3253 assert_eq!(stats.hits, 0);
3254 assert!(stats.unsafe_rejections >= 1);
3255 assert!(stats
3256 .rejection_reasons
3257 .iter()
3258 .any(|reason| reason == "negation_or_difference_boundary"));
3259 }
3260
3261 #[test]
3262 fn test_common_subexpression_key_rejects_aggregate_and_tensor_boundaries() {
3263 let mut executor = match create_test_executor_with_config(
3264 RuntimeConfig::default().with_common_subexpression_elimination(Some(true)),
3265 ) {
3266 Some(e) => e,
3267 None => {
3268 eprintln!("Skipping test: no CUDA device available");
3269 return;
3270 }
3271 };
3272 let aggregate = RirNode::GroupBy {
3273 input: Box::new(RirNode::Scan { rel: RelId(1) }),
3274 key_cols: vec![0],
3275 aggs: vec![(0, xlog_core::AggOp::Count)],
3276 };
3277 let tensor = RirNode::TensorMaskedJoin {
3278 mask_name: "W".to_string(),
3279 schema_size: 1,
3280 left_keys: vec![0],
3281 right_keys: vec![0],
3282 rel_index: vec![(RelId(1), "left".to_string())],
3283 head_rel_name: "head".to_string(),
3284 head_rel_id: RelId(3),
3285 max_active_rules: 1,
3286 head_projection: vec![0],
3287 };
3288 let chain = RirNode::ChainJoin {
3289 left: Box::new(RirNode::Scan { rel: RelId(1) }),
3290 right: Box::new(RirNode::Scan { rel: RelId(2) }),
3291 left_key: 0,
3292 right_key: 0,
3293 output_columns: vec![ProjectExpr::Column(0)],
3294 fallback: Box::new(RirNode::Join {
3295 left: Box::new(RirNode::Scan { rel: RelId(1) }),
3296 right: Box::new(RirNode::Scan { rel: RelId(2) }),
3297 left_keys: vec![0],
3298 right_keys: vec![0],
3299 join_type: JoinType::Inner,
3300 }),
3301 };
3302
3303 assert!(executor.common_subexpression_key(&aggregate).is_none());
3304 assert!(executor.common_subexpression_key(&tensor).is_none());
3305 assert!(executor.common_subexpression_key(&chain).is_none());
3306
3307 let reasons = &executor.common_subexpression_stats().rejection_reasons;
3308 assert!(reasons.iter().any(|reason| reason == "aggregate_boundary"));
3309 assert!(reasons
3310 .iter()
3311 .any(|reason| reason == "provenance_or_tensor_boundary"));
3312 assert!(reasons
3313 .iter()
3314 .any(|reason| reason == "specialized_dispatch_boundary"));
3315 }
3316
3317 fn adaptive_scc() -> Scc {
3320 Scc {
3321 id: 0,
3322 predicates: vec!["out".to_string()],
3323 is_recursive: false,
3324 }
3325 }
3326
3327 fn adaptive_stratum() -> Stratum {
3328 Stratum {
3329 id: 0,
3330 sccs: vec![0],
3331 }
3332 }
3333
3334 fn adaptive_rule(body: RirNode) -> CompiledRule {
3335 CompiledRule {
3336 head: "out".to_string(),
3337 body,
3338 meta: RirMeta::default(),
3339 }
3340 }
3341
3342 fn adaptive_plan(body: RirNode) -> ExecutionPlan {
3343 ExecutionPlan {
3344 sccs: vec![adaptive_scc()],
3345 strata: vec![adaptive_stratum()],
3346 rules_by_scc: vec![vec![adaptive_rule(body)]],
3347 est_memory_peak: 0,
3348 rel_arities: std::collections::HashMap::new(),
3349 }
3350 }
3351
3352 fn adaptive_baseline_join_plan() -> ExecutionPlan {
3353 adaptive_plan(RirNode::Project {
3354 input: Box::new(RirNode::Join {
3355 left: Box::new(RirNode::Scan { rel: RelId(1) }),
3356 right: Box::new(RirNode::Scan { rel: RelId(2) }),
3357 left_keys: vec![0],
3358 right_keys: vec![0],
3359 join_type: JoinType::Inner,
3360 }),
3361 columns: vec![ProjectExpr::Column(0)],
3362 })
3363 }
3364
3365 fn adaptive_scan_candidate_plan(rel: RelId) -> ExecutionPlan {
3366 adaptive_plan(RirNode::Scan { rel })
3367 }
3368
3369 fn seed_adaptive_fixture(executor: &mut Executor, right: &[u32]) {
3370 executor.register_relation(RelId(1), "left");
3371 executor.register_relation(RelId(2), "right");
3372 let left = create_test_buffer(executor, &[1, 2, 3, 4, 5, 6, 7, 8], "key");
3373 let right = create_test_buffer(executor, right, "key");
3374 executor.put_relation("left", left);
3375 executor.put_relation("right", right);
3376 }
3377
3378 #[test]
3379 fn test_adaptive_reoptimization_disabled_uses_baseline_and_records_decision() {
3380 let mut executor = match create_test_executor_with_config(
3381 RuntimeConfig::default().with_adaptive_reoptimization(Some(false)),
3382 ) {
3383 Some(e) => e,
3384 None => {
3385 eprintln!("Skipping test: no CUDA device available");
3386 return;
3387 }
3388 };
3389 seed_adaptive_fixture(&mut executor, &[1, 2, 3, 4, 5, 6, 7, 8]);
3390
3391 let baseline = adaptive_baseline_join_plan();
3392 let candidate = adaptive_scan_candidate_plan(RelId(2));
3393 let result = executor
3394 .execute_plan_with_adaptive_candidate(&baseline, &candidate)
3395 .expect("disabled adaptation executes baseline");
3396
3397 assert_eq!(
3398 read_buffer_u32(&executor, &result, 0),
3399 (1..=8).collect::<Vec<_>>()
3400 );
3401 let stats = executor.adaptive_reoptimization_stats();
3402 assert_eq!(stats.disabled, 1);
3403 assert_eq!(stats.adopted, 0);
3404 assert_eq!(stats.rolled_back, 0);
3405 assert_eq!(
3406 stats.last_decision.as_ref().map(|decision| decision.action),
3407 Some(AdaptiveReoptimizationAction::Disabled)
3408 );
3409 }
3410
3411 #[test]
3412 fn test_adaptive_reoptimization_adopts_equivalent_candidate_and_records_telemetry() {
3413 let mut executor = match create_test_executor_with_config(
3414 RuntimeConfig::default().with_adaptive_reoptimization(Some(true)),
3415 ) {
3416 Some(e) => e,
3417 None => {
3418 eprintln!("Skipping test: no CUDA device available");
3419 return;
3420 }
3421 };
3422 seed_adaptive_fixture(&mut executor, &[1, 2, 3, 4, 5, 6, 7, 8]);
3423 executor.provider.reset_host_transfer_stats();
3424
3425 let baseline = adaptive_baseline_join_plan();
3426 let candidate = adaptive_scan_candidate_plan(RelId(1));
3427 let result = executor
3428 .execute_plan_with_adaptive_candidate(&baseline, &candidate)
3429 .expect("equivalent candidate is adopted");
3430
3431 assert_eq!(
3432 read_buffer_u32(&executor, &result, 0),
3433 (1..=8).collect::<Vec<_>>()
3434 );
3435 let stats = executor.adaptive_reoptimization_stats();
3436 assert_eq!(stats.adopted, 1);
3437 assert_eq!(stats.rolled_back, 0);
3438 assert_eq!(stats.last_observations.len(), 1);
3439 assert!(stats.last_observations[0].cardinality_delta_abs > 0);
3440 assert!(stats.last_observations[0].selectivity_delta_abs > 0.0);
3441 assert_eq!(stats.data_plane_dtoh_calls, 0);
3442 }
3443
3444 #[test]
3445 fn test_adaptive_reoptimization_rolls_back_bad_candidate_with_typed_diagnostic() {
3446 let mut executor = match create_test_executor_with_config(
3447 RuntimeConfig::default().with_adaptive_reoptimization(Some(true)),
3448 ) {
3449 Some(e) => e,
3450 None => {
3451 eprintln!("Skipping test: no CUDA device available");
3452 return;
3453 }
3454 };
3455 seed_adaptive_fixture(&mut executor, &[1, 2, 3, 4, 5, 6, 7, 8]);
3456
3457 let baseline = adaptive_baseline_join_plan();
3458 let bad_candidate = adaptive_scan_candidate_plan(RelId(2));
3459 executor.put_relation("right", create_test_buffer(&executor, &[99], "key"));
3460 let result = executor
3461 .execute_plan_with_adaptive_candidate(&baseline, &bad_candidate)
3462 .expect("bad candidate rolls back to baseline output");
3463
3464 assert_eq!(read_buffer_u32(&executor, &result, 0), Vec::<u32>::new());
3465 let out = executor.store().get("out").expect("rollback restored out");
3466 assert_eq!(read_buffer_u32(&executor, out, 0), Vec::<u32>::new());
3467 let stats = executor.adaptive_reoptimization_stats();
3468 assert_eq!(stats.adopted, 0);
3469 assert_eq!(stats.rolled_back, 1);
3470 assert!(stats.diagnostics.iter().any(|diagnostic| {
3471 diagnostic.kind == AdaptiveReoptimizationDiagnosticKind::CandidateOutputMismatch
3472 }));
3473 }
3474
3475 #[test]
3476 fn test_adaptive_reoptimization_decisions_are_deterministic_under_replay() {
3477 let mut executor = match create_test_executor_with_config(
3478 RuntimeConfig::default().with_adaptive_reoptimization(Some(true)),
3479 ) {
3480 Some(e) => e,
3481 None => {
3482 eprintln!("Skipping test: no CUDA device available");
3483 return;
3484 }
3485 };
3486 seed_adaptive_fixture(&mut executor, &[1, 2, 3, 4, 5, 6, 7, 8]);
3487 let baseline = adaptive_baseline_join_plan();
3488 executor
3489 .execute_plan(&baseline)
3490 .expect("baseline execution records telemetry");
3491 let observations = executor
3492 .adaptive_reoptimization_stats()
3493 .last_observations
3494 .clone();
3495
3496 let first = executor.replay_adaptive_reoptimization_decision(&observations);
3497 for _ in 0..100 {
3498 assert_eq!(
3499 executor.replay_adaptive_reoptimization_decision(&observations),
3500 first
3501 );
3502 }
3503 }
3504
3505 fn persistent_index_join_plan() -> ExecutionPlan {
3508 adaptive_baseline_join_plan()
3509 }
3510
3511 fn persistent_index_heavy_join_plan(repetitions: usize) -> ExecutionPlan {
3512 let mut inputs = Vec::with_capacity(repetitions);
3513 for _ in 0..repetitions {
3514 inputs.push(RirNode::Project {
3515 input: Box::new(RirNode::Join {
3516 left: Box::new(RirNode::Scan { rel: RelId(1) }),
3517 right: Box::new(RirNode::Scan { rel: RelId(2) }),
3518 left_keys: vec![0],
3519 right_keys: vec![0],
3520 join_type: JoinType::Semi,
3521 }),
3522 columns: vec![ProjectExpr::Column(0)],
3523 });
3524 }
3525 adaptive_plan(RirNode::Union { inputs })
3526 }
3527
3528 fn seed_persistent_index_fixture(executor: &mut Executor, rows: u32) {
3529 executor.register_relation(RelId(1), "left");
3530 executor.register_relation(RelId(2), "right");
3531 let values: Vec<u32> = (0..rows).collect();
3532 let left = create_test_buffer(executor, &values, "key");
3533 let right = create_test_buffer(executor, &values, "key");
3534 executor.put_relation("left", left);
3535 executor.put_relation("right", right);
3536 }
3537
3538 fn seed_persistent_index_performance_fixture(
3539 executor: &mut Executor,
3540 left_rows: u32,
3541 right_rows: u32,
3542 ) {
3543 executor.register_relation(RelId(1), "left");
3544 executor.register_relation(RelId(2), "right");
3545 let left_values: Vec<u32> = (0..left_rows).collect();
3546 let right_values: Vec<u32> = (0..right_rows).collect();
3547 let left = create_test_buffer(executor, &left_values, "key");
3548 let right = create_test_buffer(executor, &right_values, "key");
3549 executor.put_relation("left", left);
3550 executor.put_relation("right", right);
3551 }
3552
3553 fn warm_persistent_index(executor: &mut Executor, plan: &ExecutionPlan, times: usize) {
3554 for _ in 0..times {
3555 executor.execute_plan(plan).expect("persistent index plan");
3556 }
3557 }
3558
3559 fn median_duration(samples: &mut [Duration]) -> Duration {
3560 samples.sort_unstable();
3561 samples[samples.len() / 2]
3562 }
3563
3564 fn measure_persistent_index_fixture(
3565 mut executor: Executor,
3566 plan: &ExecutionPlan,
3567 warmup: usize,
3568 iterations: usize,
3569 ) -> (
3570 Duration,
3571 u64,
3572 JoinIndexCacheStats,
3573 xlog_cuda::provider::HostTransferStats,
3574 ) {
3575 let mut output_rows = None;
3576 warm_persistent_index(&mut executor, plan, warmup);
3577 executor.provider.reset_host_transfer_stats();
3578
3579 let mut samples = Vec::with_capacity(iterations);
3580 for _ in 0..iterations {
3581 let start = Instant::now();
3582 let output = executor.execute_plan(plan).expect("persistent index plan");
3583 executor
3584 .provider
3585 .device()
3586 .synchronize()
3587 .expect("sync device");
3588 samples.push(start.elapsed());
3589 output_rows = Some(if let Some(buffer) = executor.store().get("out") {
3590 executor
3591 .buffer_row_count(buffer)
3592 .expect("read output row count")
3593 .into()
3594 } else {
3595 output.num_rows()
3596 });
3597 }
3598
3599 (
3600 median_duration(&mut samples),
3601 output_rows.expect("at least one measured execution"),
3602 executor.join_index_cache_stats(),
3603 executor.provider.host_transfer_stats(),
3604 )
3605 }
3606
3607 #[test]
3608 fn test_persistent_hash_index_reuses_across_repeated_session_evaluations() {
3609 let mut executor = match create_test_executor_with_config(
3610 RuntimeConfig::default().with_persistent_hash_indexes(Some(true)),
3611 ) {
3612 Some(e) => e,
3613 None => {
3614 eprintln!("Skipping test: no CUDA device available");
3615 return;
3616 }
3617 };
3618 seed_persistent_index_fixture(&mut executor, 2_500);
3619 let plan = persistent_index_join_plan();
3620 executor.provider.reset_host_transfer_stats();
3621
3622 warm_persistent_index(&mut executor, &plan, 5);
3623
3624 let stats = executor.join_index_cache_stats();
3625 let transfers = executor.provider.host_transfer_stats();
3626 assert_eq!(stats.builds, 1);
3627 assert!(stats.hits >= 1);
3628 assert_eq!(stats.stale_rejections, 0);
3629 assert_eq!(stats.entries, 1);
3630 assert!(stats.total_bytes > 0);
3631 assert_eq!(transfers.dtoh_calls, 0);
3632 assert_eq!(transfers.htod_calls, 0);
3633 }
3634
3635 #[test]
3636 fn test_persistent_hash_index_invalidates_on_relation_generation_change() {
3637 let mut executor = match create_test_executor_with_config(
3638 RuntimeConfig::default().with_persistent_hash_indexes(Some(true)),
3639 ) {
3640 Some(e) => e,
3641 None => {
3642 eprintln!("Skipping test: no CUDA device available");
3643 return;
3644 }
3645 };
3646 seed_persistent_index_fixture(&mut executor, 2_500);
3647 let plan = persistent_index_join_plan();
3648 warm_persistent_index(&mut executor, &plan, 5);
3649 assert_eq!(executor.join_index_cache_stats().entries, 1);
3650
3651 let changed_values: Vec<u32> = (10_000..12_500).collect();
3652 let changed_right = create_test_buffer(&executor, &changed_values, "key");
3653 executor.put_relation("right", changed_right);
3654
3655 let stats = executor.join_index_cache_stats();
3656 assert_eq!(stats.entries, 0);
3657 assert!(stats.invalidations >= 1);
3658 }
3659
3660 #[test]
3661 fn test_persistent_hash_index_background_build_records_requests() {
3662 let mut executor = match create_test_executor_with_config(
3663 RuntimeConfig::default()
3664 .with_persistent_hash_indexes(Some(true))
3665 .with_persistent_hash_index_background_build(Some(true)),
3666 ) {
3667 Some(e) => e,
3668 None => {
3669 eprintln!("Skipping test: no CUDA device available");
3670 return;
3671 }
3672 };
3673 seed_persistent_index_fixture(&mut executor, 2_500);
3674 let plan = persistent_index_join_plan();
3675
3676 warm_persistent_index(&mut executor, &plan, 5);
3677
3678 let stats = executor.join_index_cache_stats();
3679 assert_eq!(stats.background_build_requests, 1);
3680 assert_eq!(stats.background_builds_completed, 1);
3681 assert_eq!(stats.entries, 1);
3682 }
3683
3684 #[test]
3685 fn test_persistent_hash_index_background_build_defers_current_join_reuse() {
3686 let mut executor = match create_test_executor_with_config(
3687 RuntimeConfig::default()
3688 .with_persistent_hash_indexes(Some(true))
3689 .with_persistent_hash_index_background_build(Some(true)),
3690 ) {
3691 Some(e) => e,
3692 None => {
3693 eprintln!("Skipping test: no CUDA device available");
3694 return;
3695 }
3696 };
3697 seed_persistent_index_fixture(&mut executor, 2_500);
3698 let plan = persistent_index_join_plan();
3699
3700 let mut before_build = executor.join_index_cache_stats();
3701 let mut after_build = None;
3702 for _ in 0..5 {
3703 executor
3704 .execute_plan(&plan)
3705 .expect("background-build warm evaluation");
3706 let stats = executor.join_index_cache_stats();
3707 if stats.background_build_requests > before_build.background_build_requests {
3708 after_build = Some(stats);
3709 break;
3710 }
3711 before_build = stats;
3712 }
3713
3714 let after_first = after_build.expect("background build request observed");
3715 assert_eq!(after_first.background_build_requests, 1);
3716 assert_eq!(after_first.background_builds_completed, 1);
3717 assert_eq!(after_first.background_builds_deferred, 1);
3718 assert_eq!(
3719 after_first.hits, before_build.hits,
3720 "background build must not be consumed by the same evaluation that requested it"
3721 );
3722 assert_eq!(after_first.entries, 1);
3723
3724 executor
3725 .execute_plan(&plan)
3726 .expect("second evaluation reuses completed background index");
3727 let after_second = executor.join_index_cache_stats();
3728 assert_eq!(after_second.background_build_requests, 1);
3729 assert_eq!(after_second.background_builds_deferred, 1);
3730 assert!(after_second.hits >= 1);
3731 }
3732
3733 #[test]
3734 fn test_persistent_hash_index_performance_fixture_meets_speedup_target() {
3735 const LEFT_ROWS: u32 = 8;
3736 const RIGHT_ROWS: u32 = 8_000_000;
3737 const JOIN_REPETITIONS: usize = 1;
3738 const WARMUP: usize = 12;
3739 const ITERATIONS: usize = 9;
3740
3741 let mut cached = match create_test_executor_with_config(
3742 RuntimeConfig::default().with_persistent_hash_indexes(Some(true)),
3743 ) {
3744 Some(e) => e,
3745 None => {
3746 eprintln!("Skipping test: no CUDA device available");
3747 return;
3748 }
3749 };
3750 seed_persistent_index_performance_fixture(&mut cached, LEFT_ROWS, RIGHT_ROWS);
3751
3752 let mut uncached = match create_test_executor_with_config(
3753 RuntimeConfig::default().with_persistent_hash_indexes(Some(false)),
3754 ) {
3755 Some(e) => e,
3756 None => {
3757 eprintln!("Skipping test: no CUDA device available");
3758 return;
3759 }
3760 };
3761 seed_persistent_index_performance_fixture(&mut uncached, LEFT_ROWS, RIGHT_ROWS);
3762
3763 let plan = persistent_index_heavy_join_plan(JOIN_REPETITIONS);
3764 let (cached_median, cached_rows, cached_stats, cached_transfers) =
3765 measure_persistent_index_fixture(cached, &plan, WARMUP, ITERATIONS);
3766 let (uncached_median, uncached_rows, uncached_stats, uncached_transfers) =
3767 measure_persistent_index_fixture(uncached, &plan, WARMUP, ITERATIONS);
3768
3769 let speedup_ratio = uncached_median.as_secs_f64() / cached_median.as_secs_f64();
3770 eprintln!(
3771 "persistent_hash_index_perf left_rows={} right_rows={} join_repetitions={} warmup={} iterations={} \
3772 cached_median_sec={:.9} uncached_median_sec={:.9} speedup_ratio={:.3} \
3773 cached_output_rows={} uncached_output_rows={} cached_builds={} cached_hits={} \
3774 uncached_builds={} cached_dtoh_calls={} cached_htod_calls={}",
3775 LEFT_ROWS,
3776 RIGHT_ROWS,
3777 JOIN_REPETITIONS,
3778 WARMUP,
3779 ITERATIONS,
3780 cached_median.as_secs_f64(),
3781 uncached_median.as_secs_f64(),
3782 speedup_ratio,
3783 cached_rows,
3784 uncached_rows,
3785 cached_stats.builds,
3786 cached_stats.hits,
3787 uncached_stats.builds,
3788 cached_transfers.dtoh_calls,
3789 cached_transfers.htod_calls
3790 );
3791
3792 assert_eq!(cached_rows, uncached_rows);
3793 assert_eq!(cached_rows, LEFT_ROWS as u64);
3794 assert_eq!(cached_stats.builds, 1);
3795 assert!(cached_stats.hits >= ITERATIONS as u64);
3796 assert_eq!(uncached_stats.builds, 0);
3797 assert_eq!(cached_transfers.dtoh_calls, 0);
3798 assert_eq!(cached_transfers.htod_calls, 0);
3799 assert_eq!(uncached_transfers.dtoh_calls, 0);
3800 assert_eq!(uncached_transfers.htod_calls, 0);
3801 assert!(
3802 speedup_ratio >= 1.5,
3803 "persistent index speedup {:.3} below 1.5 target",
3804 speedup_ratio
3805 );
3806 }
3807
3808 #[test]
3811 fn test_reset_for_mc_relations_preserves_static_and_clears_dynamic() {
3812 let mut executor = match create_test_executor() {
3813 Some(e) => e,
3814 None => {
3815 eprintln!("Skipping: no CUDA device");
3816 return;
3817 }
3818 };
3819
3820 executor.register_relation(RelId(1), "base_rel");
3821 executor.register_relation(RelId(2), "dyn_rel");
3822
3823 let schema = Schema::new(vec![("x".to_string(), ScalarType::U32)]);
3824 let base = create_test_buffer(&executor, &[1u32], "x");
3825 let dyn_buf = create_test_buffer(&executor, &[9u32], "x");
3826 executor.put_relation("base_rel", base);
3827 executor.put_relation("dyn_rel", dyn_buf);
3828
3829 executor
3830 .reset_for_mc_relations(&["base_rel"], &[("dyn_rel", schema.clone())])
3831 .unwrap();
3832
3833 assert_eq!(
3834 buffer_row_count(&executor, executor.store().get("base_rel").unwrap()),
3835 1
3836 );
3837 assert_eq!(
3838 buffer_row_count(&executor, executor.store().get("dyn_rel").unwrap()),
3839 0
3840 );
3841 }
3842}