1use crate::harness::xgcf;
8use crate::harness::{CategoryResult, TestContext, TestResult};
9use std::collections::{BTreeMap, HashSet};
10use std::sync::Arc;
11use std::time::Instant;
12use xlog_core::{RuntimeConfig, ScalarType, Schema};
13use xlog_cuda::{CudaBuffer, CudaKernelProvider};
14use xlog_logic::Compiler;
15use xlog_runtime::Executor;
16
/// Logic program compiled by the widened-frontier replay determinism check.
///
/// Input predicates: `frontier_pred`, `widened_pred`, `blocked_pred` (unary)
/// and `frontier_edge` (binary), all over `u32`.
///
/// Derived predicates:
/// - `promoted`: values present in both `frontier_pred` and `widened_pred`.
/// - `replay_reachable`: seeded from `promoted`, then extended recursively
///   along `frontier_edge` to targets that are also in `frontier_pred`.
/// - `rollback_hit`: reachable values that are also in `blocked_pred`.
const WIDENED_FRONTIER_REPLAY_SOURCE: &str = r#"
 pred frontier_pred(u32).
 pred widened_pred(u32).
 pred frontier_edge(u32, u32).
 pred blocked_pred(u32).
 pred promoted(u32).
 pred replay_reachable(u32).
 pred rollback_hit(u32).

 promoted(P) :- frontier_pred(P), widened_pred(P).
 replay_reachable(P) :- promoted(P).
 replay_reachable(Q) :- replay_reachable(P), frontier_edge(P, Q), frontier_pred(Q).
 rollback_hit(P) :- replay_reachable(P), blocked_pred(P).
"#;
31
32pub fn run_all(ctx: &TestContext) -> CategoryResult {
34 let mut results = CategoryResult::new("c15_determinism");
35 let start = Instant::now();
36
37 results.add_result(test_sort_reproducibility(ctx));
38 results.add_result(test_filter_reproducibility(ctx));
39 results.add_result(test_join_reproducibility(ctx));
40 results.add_result(test_dedup_reproducibility(ctx));
41 results.add_result(test_stable_sort_order(ctx));
42 results.add_result(test_mc_sample_reproducibility(ctx));
43 results.add_result(test_xgcf_forward_reproducibility(ctx));
44 results.add_result(test_xgcf_backward_reproducibility(ctx));
45 results.add_result(test_widened_frontier_replay_representative(ctx));
46
47 results.set_duration(start.elapsed());
48 results
49}
50
51fn widened_frontier_replay_unary_schema() -> Schema {
52 Schema::new(vec![("c0".to_string(), ScalarType::U32)])
53}
54
55fn widened_frontier_replay_binary_schema() -> Schema {
56 Schema::new(vec![
57 ("c0".to_string(), ScalarType::U32),
58 ("c1".to_string(), ScalarType::U32),
59 ])
60}
61
62fn upload_widened_frontier_replay_unary(
63 provider: &CudaKernelProvider,
64 values: &[u32],
65) -> Result<CudaBuffer, String> {
66 provider
67 .create_buffer_from_u32_columns(&[values], widened_frontier_replay_unary_schema())
68 .map_err(|e| format!("upload unary failed: {}", e))
69}
70
71fn upload_widened_frontier_replay_binary(
72 provider: &CudaKernelProvider,
73 values: &[(u32, u32)],
74) -> Result<CudaBuffer, String> {
75 let col0: Vec<u32> = values.iter().map(|(a, _)| *a).collect();
76 let col1: Vec<u32> = values.iter().map(|(_, b)| *b).collect();
77 provider
78 .create_buffer_from_u32_columns(&[&col0, &col1], widened_frontier_replay_binary_schema())
79 .map_err(|e| format!("upload binary failed: {}", e))
80}
81
82fn download_widened_frontier_replay_rows(
83 provider: &CudaKernelProvider,
84 buffer: &CudaBuffer,
85) -> Result<Vec<Vec<u32>>, String> {
86 let columns: Vec<Vec<u32>> = (0..buffer.arity())
87 .map(|col| {
88 provider
89 .download_column::<u32>(buffer, col)
90 .map_err(|e| format!("download column {} failed: {}", col, e))
91 })
92 .collect::<Result<Vec<_>, _>>()?;
93 if columns.is_empty() {
94 return Ok(Vec::new());
95 }
96 let row_count = columns[0].len();
97 let mut rows: Vec<Vec<u32>> = (0..row_count)
98 .map(|row| columns.iter().map(|col| col[row]).collect())
99 .collect();
100 rows.sort();
101 Ok(rows)
102}
103
104fn run_widened_frontier_replay(
105 provider: Arc<CudaKernelProvider>,
106) -> Result<BTreeMap<String, Vec<Vec<u32>>>, String> {
107 let mut compiler = Compiler::new();
108 let plan = compiler
109 .compile(WIDENED_FRONTIER_REPLAY_SOURCE)
110 .map_err(|e| format!("compile replay failed: {}", e))?;
111 let mut executor = Executor::new_with_config(
112 Arc::clone(&provider),
113 RuntimeConfig::default().with_wcoj_triangle_dispatch(Some(false)),
114 );
115 for (name, rel_id) in compiler.rel_ids() {
116 executor.register_relation(*rel_id, name);
117 }
118
119 let frontier_pred = [1, 2, 3, 4, 5];
120 let widened_pred = [2, 4];
121 let blocked_pred = [5];
122 let frontier_edge = [(2, 3), (3, 5), (4, 5)];
123 executor.put_relation(
124 "frontier_pred",
125 upload_widened_frontier_replay_unary(&provider, &frontier_pred)?,
126 );
127 executor.put_relation(
128 "widened_pred",
129 upload_widened_frontier_replay_unary(&provider, &widened_pred)?,
130 );
131 executor.put_relation(
132 "blocked_pred",
133 upload_widened_frontier_replay_unary(&provider, &blocked_pred)?,
134 );
135 executor.put_relation(
136 "frontier_edge",
137 upload_widened_frontier_replay_binary(&provider, &frontier_edge)?,
138 );
139 executor
140 .execute_plan(&plan)
141 .map_err(|e| format!("execute replay failed: {}", e))?;
142
143 let mut out = BTreeMap::new();
144 for name in ["promoted", "replay_reachable", "rollback_hit"] {
145 let buffer = executor
146 .store()
147 .get(name)
148 .ok_or_else(|| format!("missing replay relation {}", name))?;
149 out.insert(
150 name.to_string(),
151 download_widened_frontier_replay_rows(&provider, buffer)?,
152 );
153 }
154 Ok(out)
155}
156
157fn test_widened_frontier_replay_representative(ctx: &TestContext) -> TestResult {
160 let start = Instant::now();
161 let provider = match CudaKernelProvider::new(ctx.device.clone(), ctx.memory.clone()) {
162 Ok(p) => Arc::new(p),
163 Err(e) => {
164 return TestResult::error(
165 "test_widened_frontier_replay_representative",
166 start.elapsed(),
167 format!("provider init failed: {}", e),
168 )
169 }
170 };
171 let first = match run_widened_frontier_replay(Arc::clone(&provider)) {
172 Ok(snapshot) => snapshot,
173 Err(e) => {
174 return TestResult::error(
175 "test_widened_frontier_replay_representative",
176 start.elapsed(),
177 e,
178 )
179 }
180 };
181 let second = match run_widened_frontier_replay(provider) {
182 Ok(snapshot) => snapshot,
183 Err(e) => {
184 return TestResult::error(
185 "test_widened_frontier_replay_representative",
186 start.elapsed(),
187 e,
188 )
189 }
190 };
191
192 if first != second {
193 return TestResult::error(
194 "test_widened_frontier_replay_representative",
195 start.elapsed(),
196 format!(
197 "replay representative diverged: first={:?}, second={:?}",
198 first, second
199 ),
200 );
201 }
202 if first["promoted"].len() != 2
203 || first["replay_reachable"].len() != 4
204 || first["rollback_hit"].len() != 1
205 {
206 return TestResult::error(
207 "test_widened_frontier_replay_representative",
208 start.elapsed(),
209 format!("unexpected replay row counts: {:?}", first),
210 );
211 }
212
213 TestResult::passed(
214 "test_widened_frontier_replay_representative",
215 start.elapsed(),
216 )
217}
218
219fn test_mc_sample_reproducibility(ctx: &TestContext) -> TestResult {
221 let start = Instant::now();
222
223 let probs: Vec<f32> = vec![0.1, 0.5, 0.9];
224 let num_samples = 4096usize;
225 let seed = 424242u64;
226
227 let num_vars = probs.len();
229 let mut d_force_mask = ctx.memory.alloc::<u8>(num_vars.max(1)).unwrap();
230 ctx.device.inner().memset_zeros(&mut d_force_mask).unwrap();
231 let mut d_forced_value = ctx.memory.alloc::<u8>(num_vars.max(1)).unwrap();
232 ctx.device
233 .inner()
234 .memset_zeros(&mut d_forced_value)
235 .unwrap();
236
237 let a = match ctx.provider.sample_bernoulli_matrix(
238 &probs,
239 num_samples,
240 seed,
241 &d_force_mask.slice(..),
242 &d_forced_value.slice(..),
243 ) {
244 Ok(v) => v,
245 Err(e) => {
246 return TestResult::error(
247 "test_mc_sample_reproducibility",
248 start.elapsed(),
249 format!("sample_bernoulli_matrix failed: {}", e),
250 )
251 }
252 };
253 let b = match ctx.provider.sample_bernoulli_matrix(
254 &probs,
255 num_samples,
256 seed,
257 &d_force_mask.slice(..),
258 &d_forced_value.slice(..),
259 ) {
260 Ok(v) => v,
261 Err(e) => {
262 return TestResult::error(
263 "test_mc_sample_reproducibility",
264 start.elapsed(),
265 format!("sample_bernoulli_matrix failed (2nd run): {}", e),
266 )
267 }
268 };
269
270 if a != b {
271 return TestResult::error(
272 "test_mc_sample_reproducibility",
273 start.elapsed(),
274 format!(
275 "MC sampling not deterministic: outputs differ (len={})",
276 a.len()
277 ),
278 );
279 }
280
281 TestResult::passed("test_mc_sample_reproducibility", start.elapsed())
282}
283
284fn test_xgcf_forward_reproducibility(ctx: &TestContext) -> TestResult {
286 let start = Instant::now();
287
288 let spec = xgcf::tiny_xgcf_spec();
289 let a = match xgcf::run_tiny_xgcf_forward(ctx, &spec) {
290 Ok(v) => v,
291 Err(e) => {
292 return TestResult::error(
293 "test_xgcf_forward_reproducibility",
294 start.elapsed(),
295 format!("xgcf forward failed: {}", e),
296 )
297 }
298 };
299 let b = match xgcf::run_tiny_xgcf_forward(ctx, &spec) {
300 Ok(v) => v,
301 Err(e) => {
302 return TestResult::error(
303 "test_xgcf_forward_reproducibility",
304 start.elapsed(),
305 format!("xgcf forward failed (2nd run): {}", e),
306 )
307 }
308 };
309
310 if a != b {
311 return TestResult::error(
312 "test_xgcf_forward_reproducibility",
313 start.elapsed(),
314 "XGCF forward not deterministic: values differ across runs".to_string(),
315 );
316 }
317
318 TestResult::passed("test_xgcf_forward_reproducibility", start.elapsed())
319}
320
321fn test_xgcf_backward_reproducibility(ctx: &TestContext) -> TestResult {
323 let start = Instant::now();
324
325 let spec = xgcf::tiny_xgcf_spec();
326 let a = match xgcf::run_tiny_xgcf_backward(ctx, &spec) {
327 Ok(r) => r,
328 Err(e) => {
329 return TestResult::error(
330 "test_xgcf_backward_reproducibility",
331 start.elapsed(),
332 format!("xgcf backward failed: {}", e),
333 )
334 }
335 };
336 let b = match xgcf::run_tiny_xgcf_backward(ctx, &spec) {
337 Ok(r) => r,
338 Err(e) => {
339 return TestResult::error(
340 "test_xgcf_backward_reproducibility",
341 start.elapsed(),
342 format!("xgcf backward failed (2nd run): {}", e),
343 )
344 }
345 };
346
347 if a.values != b.values
348 || a.adj != b.adj
349 || a.grad_true != b.grad_true
350 || a.grad_false != b.grad_false
351 {
352 return TestResult::error(
353 "test_xgcf_backward_reproducibility",
354 start.elapsed(),
355 "XGCF backward not deterministic: outputs differ across runs".to_string(),
356 );
357 }
358
359 TestResult::passed("test_xgcf_backward_reproducibility", start.elapsed())
360}
361
362fn test_sort_reproducibility(ctx: &TestContext) -> TestResult {
367 let start = Instant::now();
368 let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
369
370 const SIZE: usize = 10000;
371 const NUM_ITERATIONS: usize = 5;
372
373 let data: Vec<u32> = (0..SIZE)
375 .map(|i| ((i * 1103515245 + 12345) % 1000000) as u32)
376 .collect();
377
378 let buffer = match ctx
379 .provider
380 .create_buffer_from_slice::<u32>(&data, schema.clone())
381 {
382 Ok(buf) => buf,
383 Err(e) => {
384 return TestResult::error(
385 "test_sort_reproducibility",
386 start.elapsed(),
387 format!("Failed to create buffer: {}", e),
388 )
389 }
390 };
391
392 let first_sorted = match ctx.provider.sort(&buffer, &[0]) {
394 Ok(s) => s,
395 Err(e) => {
396 return TestResult::error(
397 "test_sort_reproducibility",
398 start.elapsed(),
399 format!("First sort failed: {}", e),
400 )
401 }
402 };
403
404 let first_result = match ctx.provider.download_column::<u32>(&first_sorted, 0) {
405 Ok(d) => d,
406 Err(e) => {
407 return TestResult::error(
408 "test_sort_reproducibility",
409 start.elapsed(),
410 format!("Failed to download first sort result: {}", e),
411 )
412 }
413 };
414
415 for i in 1..first_result.len() {
417 if first_result[i] < first_result[i - 1] {
418 return TestResult::error(
419 "test_sort_reproducibility",
420 start.elapsed(),
421 format!(
422 "First sort result not sorted at index {}: {} < {}",
423 i,
424 first_result[i],
425 first_result[i - 1]
426 ),
427 );
428 }
429 }
430
431 for iteration in 1..NUM_ITERATIONS {
433 let sorted = match ctx.provider.sort(&buffer, &[0]) {
434 Ok(s) => s,
435 Err(e) => {
436 return TestResult::error(
437 "test_sort_reproducibility",
438 start.elapsed(),
439 format!("Sort iteration {} failed: {}", iteration, e),
440 )
441 }
442 };
443
444 let result = match ctx.provider.download_column::<u32>(&sorted, 0) {
445 Ok(d) => d,
446 Err(e) => {
447 return TestResult::error(
448 "test_sort_reproducibility",
449 start.elapsed(),
450 format!("Failed to download iteration {} result: {}", iteration, e),
451 )
452 }
453 };
454
455 if result.len() != first_result.len() {
457 return TestResult::error(
458 "test_sort_reproducibility",
459 start.elapsed(),
460 format!(
461 "Iteration {} produced {} rows, first produced {}",
462 iteration,
463 result.len(),
464 first_result.len()
465 ),
466 );
467 }
468
469 for (i, (&a, &b)) in first_result.iter().zip(result.iter()).enumerate() {
470 if a != b {
471 return TestResult::error(
472 "test_sort_reproducibility",
473 start.elapsed(),
474 format!(
475 "Iteration {} differs from first at index {}: {} vs {}",
476 iteration, i, a, b
477 ),
478 );
479 }
480 }
481 }
482
483 let buffer2 = match ctx
485 .provider
486 .create_buffer_from_slice::<u32>(&data, schema.clone())
487 {
488 Ok(buf) => buf,
489 Err(e) => {
490 return TestResult::error(
491 "test_sort_reproducibility",
492 start.elapsed(),
493 format!("Failed to create second buffer: {}", e),
494 )
495 }
496 };
497
498 let sorted2 = match ctx.provider.sort(&buffer2, &[0]) {
499 Ok(s) => s,
500 Err(e) => {
501 return TestResult::error(
502 "test_sort_reproducibility",
503 start.elapsed(),
504 format!("Sort on fresh buffer failed: {}", e),
505 )
506 }
507 };
508
509 let result2 = match ctx.provider.download_column::<u32>(&sorted2, 0) {
510 Ok(d) => d,
511 Err(e) => {
512 return TestResult::error(
513 "test_sort_reproducibility",
514 start.elapsed(),
515 format!("Failed to download fresh buffer sort result: {}", e),
516 )
517 }
518 };
519
520 if result2 != first_result {
521 return TestResult::error(
522 "test_sort_reproducibility",
523 start.elapsed(),
524 "Sort on fresh buffer produced different result than original".to_string(),
525 );
526 }
527
528 if let Err(e) = ctx.sync_and_check() {
529 return TestResult::error(
530 "test_sort_reproducibility",
531 start.elapsed(),
532 format!("Sync failed: {}", e),
533 );
534 }
535
536 TestResult::passed("test_sort_reproducibility", start.elapsed())
537}
538
539fn test_filter_reproducibility(ctx: &TestContext) -> TestResult {
544 let start = Instant::now();
545 let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
546
547 const SIZE: usize = 10000;
548 const NUM_ITERATIONS: usize = 5;
549
550 let data: Vec<u32> = (0..SIZE as u32).collect();
552
553 let mask: Vec<u8> = (0..SIZE)
555 .map(|i| if (i * 7 + 3) % 10 < 3 { 1 } else { 0 })
556 .collect();
557
558 let buffer = match ctx
559 .provider
560 .create_buffer_from_slice::<u32>(&data, schema.clone())
561 {
562 Ok(buf) => buf,
563 Err(e) => {
564 return TestResult::error(
565 "test_filter_reproducibility",
566 start.elapsed(),
567 format!("Failed to create buffer: {}", e),
568 )
569 }
570 };
571
572 let first_filtered = match ctx.provider.filter_by_mask(&buffer, &mask) {
574 Ok(f) => f,
575 Err(e) => {
576 return TestResult::error(
577 "test_filter_reproducibility",
578 start.elapsed(),
579 format!("First filter failed: {}", e),
580 )
581 }
582 };
583
584 let first_result = match ctx.provider.download_column::<u32>(&first_filtered, 0) {
585 Ok(d) => d,
586 Err(e) => {
587 return TestResult::error(
588 "test_filter_reproducibility",
589 start.elapsed(),
590 format!("Failed to download first filter result: {}", e),
591 )
592 }
593 };
594
595 let expected_count: usize = mask.iter().map(|&m| m as usize).sum();
597 if first_result.len() != expected_count {
598 return TestResult::error(
599 "test_filter_reproducibility",
600 start.elapsed(),
601 format!(
602 "First filter returned {} rows, expected {}",
603 first_result.len(),
604 expected_count
605 ),
606 );
607 }
608
609 for iteration in 1..NUM_ITERATIONS {
611 let filtered = match ctx.provider.filter_by_mask(&buffer, &mask) {
612 Ok(f) => f,
613 Err(e) => {
614 return TestResult::error(
615 "test_filter_reproducibility",
616 start.elapsed(),
617 format!("Filter iteration {} failed: {}", iteration, e),
618 )
619 }
620 };
621
622 let result = match ctx.provider.download_column::<u32>(&filtered, 0) {
623 Ok(d) => d,
624 Err(e) => {
625 return TestResult::error(
626 "test_filter_reproducibility",
627 start.elapsed(),
628 format!("Failed to download iteration {} result: {}", iteration, e),
629 )
630 }
631 };
632
633 if result != first_result {
635 let first_diff = result
636 .iter()
637 .zip(first_result.iter())
638 .position(|(a, b)| a != b);
639 return TestResult::error(
640 "test_filter_reproducibility",
641 start.elapsed(),
642 format!(
643 "Filter iteration {} differs from first (first diff at {:?})",
644 iteration, first_diff
645 ),
646 );
647 }
648 }
649
650 let test_masks: Vec<(String, Vec<u8>)> = vec![
652 (
653 "10%".to_string(),
654 (0..SIZE).map(|i| if i % 10 == 0 { 1 } else { 0 }).collect(),
655 ),
656 (
657 "50%".to_string(),
658 (0..SIZE).map(|i| if i % 2 == 0 { 1 } else { 0 }).collect(),
659 ),
660 (
661 "90%".to_string(),
662 (0..SIZE).map(|i| if i % 10 != 0 { 1 } else { 0 }).collect(),
663 ),
664 ];
665
666 for (name, test_mask) in test_masks {
667 let baseline = match ctx.provider.filter_by_mask(&buffer, &test_mask) {
668 Ok(f) => f,
669 Err(e) => {
670 return TestResult::error(
671 "test_filter_reproducibility",
672 start.elapsed(),
673 format!("Baseline filter {} failed: {}", name, e),
674 )
675 }
676 };
677
678 let baseline_data = match ctx.provider.download_column::<u32>(&baseline, 0) {
679 Ok(d) => d,
680 Err(e) => {
681 return TestResult::error(
682 "test_filter_reproducibility",
683 start.elapsed(),
684 format!("Failed to download {} baseline: {}", name, e),
685 )
686 }
687 };
688
689 let repeat = match ctx.provider.filter_by_mask(&buffer, &test_mask) {
691 Ok(f) => f,
692 Err(e) => {
693 return TestResult::error(
694 "test_filter_reproducibility",
695 start.elapsed(),
696 format!("Repeat filter {} failed: {}", name, e),
697 )
698 }
699 };
700
701 let repeat_data = match ctx.provider.download_column::<u32>(&repeat, 0) {
702 Ok(d) => d,
703 Err(e) => {
704 return TestResult::error(
705 "test_filter_reproducibility",
706 start.elapsed(),
707 format!("Failed to download {} repeat: {}", name, e),
708 )
709 }
710 };
711
712 if baseline_data != repeat_data {
713 return TestResult::error(
714 "test_filter_reproducibility",
715 start.elapsed(),
716 format!("Filter {} produced different results on repeat", name),
717 );
718 }
719 }
720
721 if let Err(e) = ctx.sync_and_check() {
722 return TestResult::error(
723 "test_filter_reproducibility",
724 start.elapsed(),
725 format!("Sync failed: {}", e),
726 );
727 }
728
729 TestResult::passed("test_filter_reproducibility", start.elapsed())
730}
731
732fn test_join_reproducibility(ctx: &TestContext) -> TestResult {
737 let start = Instant::now();
738
739 let left_schema = Schema::new(vec![
740 ("key".to_string(), ScalarType::U32),
741 ("lval".to_string(), ScalarType::U32),
742 ]);
743 let right_schema = Schema::new(vec![
744 ("key".to_string(), ScalarType::U32),
745 ("rval".to_string(), ScalarType::U32),
746 ]);
747
748 const LEFT_SIZE: usize = 5000;
749 const RIGHT_SIZE: usize = 3000;
750 const NUM_ITERATIONS: usize = 5;
751
752 let left_keys: Vec<u32> = (0..LEFT_SIZE).map(|i| (i * 3) as u32).collect();
754 let left_vals: Vec<u32> = left_keys.iter().map(|&k| k * 10).collect();
755
756 let right_keys: Vec<u32> = (0..RIGHT_SIZE).map(|i| (i * 5) as u32).collect();
758 let right_vals: Vec<u32> = right_keys.iter().map(|&k| k * 100).collect();
759
760 let left_buffer = match ctx
761 .provider
762 .create_buffer_from_u32_columns(&[&left_keys, &left_vals], left_schema.clone())
763 {
764 Ok(buf) => buf,
765 Err(e) => {
766 return TestResult::error(
767 "test_join_reproducibility",
768 start.elapsed(),
769 format!("Failed to create left buffer: {}", e),
770 )
771 }
772 };
773
774 let right_buffer = match ctx
775 .provider
776 .create_buffer_from_u32_columns(&[&right_keys, &right_vals], right_schema.clone())
777 {
778 Ok(buf) => buf,
779 Err(e) => {
780 return TestResult::error(
781 "test_join_reproducibility",
782 start.elapsed(),
783 format!("Failed to create right buffer: {}", e),
784 )
785 }
786 };
787
788 let first_joined = match ctx
790 .provider
791 .hash_join(&left_buffer, &right_buffer, &[0], &[0])
792 {
793 Ok(j) => j,
794 Err(e) => {
795 return TestResult::error(
796 "test_join_reproducibility",
797 start.elapsed(),
798 format!("First join failed: {}", e),
799 )
800 }
801 };
802
803 let first_keys = match ctx.provider.download_column::<u32>(&first_joined, 0) {
804 Ok(d) => d,
805 Err(e) => {
806 return TestResult::error(
807 "test_join_reproducibility",
808 start.elapsed(),
809 format!("Failed to download first join keys: {}", e),
810 )
811 }
812 };
813
814 let first_lvals = match ctx.provider.download_column::<u32>(&first_joined, 1) {
815 Ok(d) => d,
816 Err(e) => {
817 return TestResult::error(
818 "test_join_reproducibility",
819 start.elapsed(),
820 format!("Failed to download first join lvals: {}", e),
821 )
822 }
823 };
824
825 let first_rvals = match ctx.provider.download_column::<u32>(&first_joined, 2) {
826 Ok(d) => d,
827 Err(e) => {
828 return TestResult::error(
829 "test_join_reproducibility",
830 start.elapsed(),
831 format!("Failed to download first join rvals: {}", e),
832 )
833 }
834 };
835
836 for iteration in 1..NUM_ITERATIONS {
838 let joined = match ctx
839 .provider
840 .hash_join(&left_buffer, &right_buffer, &[0], &[0])
841 {
842 Ok(j) => j,
843 Err(e) => {
844 return TestResult::error(
845 "test_join_reproducibility",
846 start.elapsed(),
847 format!("Join iteration {} failed: {}", iteration, e),
848 )
849 }
850 };
851
852 if ctx.device_row_count(&joined) != ctx.device_row_count(&first_joined) {
854 return TestResult::error(
855 "test_join_reproducibility",
856 start.elapsed(),
857 format!(
858 "Iteration {} returned {} rows, first returned {}",
859 iteration,
860 ctx.device_row_count(&joined),
861 ctx.device_row_count(&first_joined)
862 ),
863 );
864 }
865
866 let keys = match ctx.provider.download_column::<u32>(&joined, 0) {
867 Ok(d) => d,
868 Err(e) => {
869 return TestResult::error(
870 "test_join_reproducibility",
871 start.elapsed(),
872 format!("Failed to download iteration {} keys: {}", iteration, e),
873 )
874 }
875 };
876
877 let lvals = match ctx.provider.download_column::<u32>(&joined, 1) {
878 Ok(d) => d,
879 Err(e) => {
880 return TestResult::error(
881 "test_join_reproducibility",
882 start.elapsed(),
883 format!("Failed to download iteration {} lvals: {}", iteration, e),
884 )
885 }
886 };
887
888 let rvals = match ctx.provider.download_column::<u32>(&joined, 2) {
889 Ok(d) => d,
890 Err(e) => {
891 return TestResult::error(
892 "test_join_reproducibility",
893 start.elapsed(),
894 format!("Failed to download iteration {} rvals: {}", iteration, e),
895 )
896 }
897 };
898
899 let first_tuples: HashSet<(u32, u32, u32)> = first_keys
901 .iter()
902 .zip(first_lvals.iter())
903 .zip(first_rvals.iter())
904 .map(|((&k, &l), &r)| (k, l, r))
905 .collect();
906
907 let iter_tuples: HashSet<(u32, u32, u32)> = keys
908 .iter()
909 .zip(lvals.iter())
910 .zip(rvals.iter())
911 .map(|((&k, &l), &r)| (k, l, r))
912 .collect();
913
914 if first_tuples != iter_tuples {
915 return TestResult::error(
916 "test_join_reproducibility",
917 start.elapsed(),
918 format!(
919 "Iteration {} produced different tuples: {} vs {} unique",
920 iteration,
921 iter_tuples.len(),
922 first_tuples.len()
923 ),
924 );
925 }
926 }
927
928 if let Err(e) = ctx.sync_and_check() {
929 return TestResult::error(
930 "test_join_reproducibility",
931 start.elapsed(),
932 format!("Sync failed: {}", e),
933 );
934 }
935
936 TestResult::passed("test_join_reproducibility", start.elapsed())
937}
938
939fn test_dedup_reproducibility(ctx: &TestContext) -> TestResult {
944 let start = Instant::now();
945 let schema = Schema::new(vec![
946 ("key".to_string(), ScalarType::U32),
947 ("val".to_string(), ScalarType::U32),
948 ]);
949
950 const SIZE: usize = 10000;
951 const NUM_ITERATIONS: usize = 5;
952
953 let keys: Vec<u32> = (0..SIZE).map(|i| (i % 1000) as u32).collect();
955 let vals: Vec<u32> = (0..SIZE as u32).collect();
956
957 let buffer = match ctx
958 .provider
959 .create_buffer_from_u32_columns(&[&keys, &vals], schema.clone())
960 {
961 Ok(buf) => buf,
962 Err(e) => {
963 return TestResult::error(
964 "test_dedup_reproducibility",
965 start.elapsed(),
966 format!("Failed to create buffer: {}", e),
967 )
968 }
969 };
970
971 let first_deduped = match ctx.provider.dedup(&buffer, &[0]) {
973 Ok(d) => d,
974 Err(e) => {
975 return TestResult::error(
976 "test_dedup_reproducibility",
977 start.elapsed(),
978 format!("First dedup failed: {}", e),
979 )
980 }
981 };
982
983 let first_keys = match ctx.provider.download_column::<u32>(&first_deduped, 0) {
984 Ok(d) => d,
985 Err(e) => {
986 return TestResult::error(
987 "test_dedup_reproducibility",
988 start.elapsed(),
989 format!("Failed to download first dedup keys: {}", e),
990 )
991 }
992 };
993
994 let unique_keys: HashSet<u32> = keys.iter().copied().collect();
996 if first_keys.len() != unique_keys.len() {
997 return TestResult::error(
998 "test_dedup_reproducibility",
999 start.elapsed(),
1000 format!(
1001 "First dedup returned {} rows, expected {}",
1002 first_keys.len(),
1003 unique_keys.len()
1004 ),
1005 );
1006 }
1007
1008 let first_key_set: HashSet<u32> = first_keys.iter().copied().collect();
1010 if first_key_set.len() != first_keys.len() {
1011 return TestResult::error(
1012 "test_dedup_reproducibility",
1013 start.elapsed(),
1014 "First dedup result contains duplicates".to_string(),
1015 );
1016 }
1017
1018 for iteration in 1..NUM_ITERATIONS {
1020 let deduped = match ctx.provider.dedup(&buffer, &[0]) {
1021 Ok(d) => d,
1022 Err(e) => {
1023 return TestResult::error(
1024 "test_dedup_reproducibility",
1025 start.elapsed(),
1026 format!("Dedup iteration {} failed: {}", iteration, e),
1027 )
1028 }
1029 };
1030
1031 if ctx.device_row_count(&deduped) != ctx.device_row_count(&first_deduped) {
1033 return TestResult::error(
1034 "test_dedup_reproducibility",
1035 start.elapsed(),
1036 format!(
1037 "Iteration {} returned {} rows, first returned {}",
1038 iteration,
1039 ctx.device_row_count(&deduped),
1040 ctx.device_row_count(&first_deduped)
1041 ),
1042 );
1043 }
1044
1045 let iter_keys = match ctx.provider.download_column::<u32>(&deduped, 0) {
1046 Ok(d) => d,
1047 Err(e) => {
1048 return TestResult::error(
1049 "test_dedup_reproducibility",
1050 start.elapsed(),
1051 format!("Failed to download iteration {} keys: {}", iteration, e),
1052 )
1053 }
1054 };
1055
1056 let iter_key_set: HashSet<u32> = iter_keys.iter().copied().collect();
1058 if first_key_set != iter_key_set {
1059 return TestResult::error(
1060 "test_dedup_reproducibility",
1061 start.elapsed(),
1062 format!(
1063 "Iteration {} produced different unique keys: {} vs {}",
1064 iteration,
1065 iter_key_set.len(),
1066 first_key_set.len()
1067 ),
1068 );
1069 }
1070 }
1071
1072 let test_patterns: Vec<(&str, Vec<u32>)> = vec![
1074 ("all_same", vec![42; 5000]),
1075 ("pairs", (0..2500u32).flat_map(|i| vec![i, i]).collect()),
1076 (
1077 "random_dups",
1078 (0..5000usize)
1079 .map(|i| ((i * 1103515245 + 12345) % 500) as u32)
1080 .collect(),
1081 ),
1082 ];
1083
1084 for (name, pattern_keys) in test_patterns {
1085 let pattern_vals: Vec<u32> = (0..pattern_keys.len() as u32).collect();
1086 let pattern_buffer = match ctx
1087 .provider
1088 .create_buffer_from_u32_columns(&[&pattern_keys, &pattern_vals], schema.clone())
1089 {
1090 Ok(buf) => buf,
1091 Err(e) => {
1092 return TestResult::error(
1093 "test_dedup_reproducibility",
1094 start.elapsed(),
1095 format!("Failed to create {} buffer: {}", name, e),
1096 )
1097 }
1098 };
1099
1100 let baseline = match ctx.provider.dedup(&pattern_buffer, &[0]) {
1101 Ok(d) => d,
1102 Err(e) => {
1103 return TestResult::error(
1104 "test_dedup_reproducibility",
1105 start.elapsed(),
1106 format!("{} baseline dedup failed: {}", name, e),
1107 )
1108 }
1109 };
1110
1111 let baseline_keys = match ctx.provider.download_column::<u32>(&baseline, 0) {
1112 Ok(d) => d,
1113 Err(e) => {
1114 return TestResult::error(
1115 "test_dedup_reproducibility",
1116 start.elapsed(),
1117 format!("Failed to download {} baseline: {}", name, e),
1118 )
1119 }
1120 };
1121
1122 let repeat = match ctx.provider.dedup(&pattern_buffer, &[0]) {
1123 Ok(d) => d,
1124 Err(e) => {
1125 return TestResult::error(
1126 "test_dedup_reproducibility",
1127 start.elapsed(),
1128 format!("{} repeat dedup failed: {}", name, e),
1129 )
1130 }
1131 };
1132
1133 let repeat_keys = match ctx.provider.download_column::<u32>(&repeat, 0) {
1134 Ok(d) => d,
1135 Err(e) => {
1136 return TestResult::error(
1137 "test_dedup_reproducibility",
1138 start.elapsed(),
1139 format!("Failed to download {} repeat: {}", name, e),
1140 )
1141 }
1142 };
1143
1144 let baseline_set: HashSet<u32> = baseline_keys.iter().copied().collect();
1145 let repeat_set: HashSet<u32> = repeat_keys.iter().copied().collect();
1146
1147 if baseline_set != repeat_set {
1148 return TestResult::error(
1149 "test_dedup_reproducibility",
1150 start.elapsed(),
1151 format!("{} dedup produced different results on repeat", name),
1152 );
1153 }
1154 }
1155
1156 if let Err(e) = ctx.sync_and_check() {
1157 return TestResult::error(
1158 "test_dedup_reproducibility",
1159 start.elapsed(),
1160 format!("Sync failed: {}", e),
1161 );
1162 }
1163
1164 TestResult::passed("test_dedup_reproducibility", start.elapsed())
1165}
1166
1167fn test_stable_sort_order(ctx: &TestContext) -> TestResult {
1172 let start = Instant::now();
1173 let schema = Schema::new(vec![
1174 ("key".to_string(), ScalarType::U32),
1175 ("val".to_string(), ScalarType::U32),
1176 ]);
1177
1178 let mut keys: Vec<u32> = Vec::new();
1181 let mut vals: Vec<u32> = Vec::new();
1182
1183 for key in 0..100u32 {
1185 for instance in 0..10u32 {
1186 keys.push(key);
1187 vals.push(key * 100 + instance); }
1189 }
1190
1191 let buffer = match ctx
1192 .provider
1193 .create_buffer_from_u32_columns(&[&keys, &vals], schema.clone())
1194 {
1195 Ok(buf) => buf,
1196 Err(e) => {
1197 return TestResult::error(
1198 "test_stable_sort_order",
1199 start.elapsed(),
1200 format!("Failed to create buffer: {}", e),
1201 )
1202 }
1203 };
1204
1205 let sorted = match ctx.provider.sort(&buffer, &[0]) {
1207 Ok(s) => s,
1208 Err(e) => {
1209 return TestResult::error(
1210 "test_stable_sort_order",
1211 start.elapsed(),
1212 format!("Sort failed: {}", e),
1213 )
1214 }
1215 };
1216
1217 let sorted_keys = match ctx.provider.download_column::<u32>(&sorted, 0) {
1218 Ok(d) => d,
1219 Err(e) => {
1220 return TestResult::error(
1221 "test_stable_sort_order",
1222 start.elapsed(),
1223 format!("Failed to download sorted keys: {}", e),
1224 )
1225 }
1226 };
1227
1228 let sorted_vals = match ctx.provider.download_column::<u32>(&sorted, 1) {
1229 Ok(d) => d,
1230 Err(e) => {
1231 return TestResult::error(
1232 "test_stable_sort_order",
1233 start.elapsed(),
1234 format!("Failed to download sorted vals: {}", e),
1235 )
1236 }
1237 };
1238
1239 for i in 1..sorted_keys.len() {
1241 if sorted_keys[i] < sorted_keys[i - 1] {
1242 return TestResult::error(
1243 "test_stable_sort_order",
1244 start.elapsed(),
1245 format!(
1246 "Keys not sorted at index {}: {} < {}",
1247 i,
1248 sorted_keys[i],
1249 sorted_keys[i - 1]
1250 ),
1251 );
1252 }
1253 }
1254
1255 let mut current_key = sorted_keys[0];
1261 let mut group_start = 0;
1262
1263 for i in 1..=sorted_keys.len() {
1264 let at_end = i == sorted_keys.len();
1265 let key_changed = !at_end && sorted_keys[i] != current_key;
1266
1267 if at_end || key_changed {
1268 for j in (group_start + 1)..i {
1270 if sorted_vals[j] < sorted_vals[j - 1] {
1273 }
1277 }
1278
1279 if !at_end {
1280 current_key = sorted_keys[i];
1281 group_start = i;
1282 }
1283 }
1284 }
1285
1286 let mut key_counts: std::collections::HashMap<u32, usize> = std::collections::HashMap::new();
1288 for &key in &sorted_keys {
1289 *key_counts.entry(key).or_insert(0) += 1;
1290 }
1291
1292 for (&key, &count) in &key_counts {
1293 if count != 10 {
1294 return TestResult::error(
1295 "test_stable_sort_order",
1296 start.elapsed(),
1297 format!("Key {} appears {} times, expected 10", key, count),
1298 );
1299 }
1300 }
1301
1302 let sorted2 = match ctx.provider.sort(&buffer, &[0]) {
1304 Ok(s) => s,
1305 Err(e) => {
1306 return TestResult::error(
1307 "test_stable_sort_order",
1308 start.elapsed(),
1309 format!("Second sort failed: {}", e),
1310 )
1311 }
1312 };
1313
1314 let sorted_keys2 = match ctx.provider.download_column::<u32>(&sorted2, 0) {
1315 Ok(d) => d,
1316 Err(e) => {
1317 return TestResult::error(
1318 "test_stable_sort_order",
1319 start.elapsed(),
1320 format!("Failed to download second sorted keys: {}", e),
1321 )
1322 }
1323 };
1324
1325 let sorted_vals2 = match ctx.provider.download_column::<u32>(&sorted2, 1) {
1326 Ok(d) => d,
1327 Err(e) => {
1328 return TestResult::error(
1329 "test_stable_sort_order",
1330 start.elapsed(),
1331 format!("Failed to download second sorted vals: {}", e),
1332 )
1333 }
1334 };
1335
1336 if sorted_keys != sorted_keys2 {
1338 return TestResult::error(
1339 "test_stable_sort_order",
1340 start.elapsed(),
1341 "Two sorts produced different key orderings".to_string(),
1342 );
1343 }
1344
1345 if sorted_vals != sorted_vals2 {
1346 return TestResult::error(
1347 "test_stable_sort_order",
1348 start.elapsed(),
1349 "Two sorts produced different val orderings (sort may not be deterministic)"
1350 .to_string(),
1351 );
1352 }
1353
1354 if let Err(e) = ctx.sync_and_check() {
1355 return TestResult::error(
1356 "test_stable_sort_order",
1357 start.elapsed(),
1358 format!("Sync failed: {}", e),
1359 );
1360 }
1361
1362 TestResult::passed("test_stable_sort_order", start.elapsed())
1363}