1use crate::harness::TestContext;
18use std::collections::{HashMap, HashSet};
19use xlog_core::{ScalarType, Schema};
20use xlog_cuda::CompareOp;
21
22pub fn prop_sort_stability(
32 ctx: &TestContext,
33 keys: Vec<u32>,
34 vals: Vec<u32>,
35) -> Result<(), String> {
36 if keys.is_empty() || keys.len() != vals.len() {
37 return Ok(()); }
39
40 let schema = Schema::new(vec![
41 ("key".to_string(), ScalarType::U32),
42 ("val".to_string(), ScalarType::U32),
43 ]);
44
45 let buffer = ctx
47 .provider
48 .create_buffer_from_u32_columns(&[&keys, &vals], schema)
49 .map_err(|e| format!("Buffer creation failed: {}", e))?;
50
51 let sorted = ctx
53 .provider
54 .sort(&buffer, &[0])
55 .map_err(|e| format!("Sort failed: {}", e))?;
56
57 let sorted_keys = ctx
59 .provider
60 .download_column::<u32>(&sorted, 0)
61 .map_err(|e| format!("Download keys failed: {}", e))?;
62 let sorted_vals = ctx
63 .provider
64 .download_column::<u32>(&sorted, 1)
65 .map_err(|e| format!("Download vals failed: {}", e))?;
66
67 for i in 1..sorted_keys.len() {
69 if sorted_keys[i] < sorted_keys[i - 1] {
70 return Err(format!(
71 "Keys not sorted at index {}: {} < {}",
72 i,
73 sorted_keys[i],
74 sorted_keys[i - 1]
75 ));
76 }
77 }
78
79 let mut original_counts: HashMap<(u32, u32), usize> = HashMap::new();
81 for (&k, &v) in keys.iter().zip(vals.iter()) {
82 *original_counts.entry((k, v)).or_insert(0) += 1;
83 }
84
85 let mut sorted_counts: HashMap<(u32, u32), usize> = HashMap::new();
86 for (&k, &v) in sorted_keys.iter().zip(sorted_vals.iter()) {
87 *sorted_counts.entry((k, v)).or_insert(0) += 1;
88 }
89
90 if original_counts != sorted_counts {
91 return Err("Sort changed data - counts don't match".to_string());
92 }
93
94 Ok(())
95}
96
97pub fn prop_join_correctness(
106 ctx: &TestContext,
107 left_keys: Vec<u32>,
108 left_vals: Vec<u32>,
109 right_keys: Vec<u32>,
110 right_vals: Vec<u32>,
111) -> Result<(), String> {
112 if left_keys.len() != left_vals.len() || right_keys.len() != right_vals.len() {
113 return Ok(()); }
115
116 let left_schema = Schema::new(vec![
117 ("key".to_string(), ScalarType::U32),
118 ("lval".to_string(), ScalarType::U32),
119 ]);
120 let right_schema = Schema::new(vec![
121 ("key".to_string(), ScalarType::U32),
122 ("rval".to_string(), ScalarType::U32),
123 ]);
124
125 if left_keys.is_empty() {
127 let left_buffer = ctx
128 .provider
129 .create_empty_buffer(left_schema)
130 .map_err(|e| format!("Failed to create empty left buffer: {}", e))?;
131
132 if right_keys.is_empty() {
133 let right_buffer = ctx
134 .provider
135 .create_empty_buffer(right_schema)
136 .map_err(|e| format!("Failed to create empty right buffer: {}", e))?;
137
138 let joined = ctx
139 .provider
140 .hash_join(&left_buffer, &right_buffer, &[0], &[0])
141 .map_err(|e| format!("Join failed: {}", e))?;
142
143 if ctx.device_row_count(&joined) != 0 {
144 return Err(format!(
145 "Empty tables should produce 0 rows, got {}",
146 ctx.device_row_count(&joined)
147 ));
148 }
149 return Ok(());
150 }
151
152 let right_buffer = ctx
153 .provider
154 .create_buffer_from_u32_columns(&[&right_keys, &right_vals], right_schema)
155 .map_err(|e| format!("Failed to create right buffer: {}", e))?;
156
157 let joined = ctx
158 .provider
159 .hash_join(&left_buffer, &right_buffer, &[0], &[0])
160 .map_err(|e| format!("Join failed: {}", e))?;
161
162 if ctx.device_row_count(&joined) != 0 {
163 return Err(format!(
164 "Empty left should produce 0 rows, got {}",
165 ctx.device_row_count(&joined)
166 ));
167 }
168 return Ok(());
169 }
170
171 if right_keys.is_empty() {
172 let left_buffer = ctx
173 .provider
174 .create_buffer_from_u32_columns(&[&left_keys, &left_vals], left_schema)
175 .map_err(|e| format!("Failed to create left buffer: {}", e))?;
176 let right_buffer = ctx
177 .provider
178 .create_empty_buffer(right_schema)
179 .map_err(|e| format!("Failed to create empty right buffer: {}", e))?;
180
181 let joined = ctx
182 .provider
183 .hash_join(&left_buffer, &right_buffer, &[0], &[0])
184 .map_err(|e| format!("Join failed: {}", e))?;
185
186 if ctx.device_row_count(&joined) != 0 {
187 return Err(format!(
188 "Empty right should produce 0 rows, got {}",
189 ctx.device_row_count(&joined)
190 ));
191 }
192 return Ok(());
193 }
194
195 let left_buffer = ctx
196 .provider
197 .create_buffer_from_u32_columns(&[&left_keys, &left_vals], left_schema)
198 .map_err(|e| format!("Failed to create left buffer: {}", e))?;
199
200 let right_buffer = ctx
201 .provider
202 .create_buffer_from_u32_columns(&[&right_keys, &right_vals], right_schema)
203 .map_err(|e| format!("Failed to create right buffer: {}", e))?;
204
205 let joined = ctx
207 .provider
208 .hash_join(&left_buffer, &right_buffer, &[0], &[0])
209 .map_err(|e| format!("Join failed: {}", e))?;
210
211 let result_keys = ctx
213 .provider
214 .download_column::<u32>(&joined, 0)
215 .map_err(|e| format!("Download result keys failed: {}", e))?;
216 let result_lvals = ctx
217 .provider
218 .download_column::<u32>(&joined, 1)
219 .map_err(|e| format!("Download result lvals failed: {}", e))?;
220 let result_rvals = ctx
221 .provider
222 .download_column::<u32>(&joined, 2)
223 .map_err(|e| format!("Download result rvals failed: {}", e))?;
224
225 let mut right_by_key: HashMap<u32, Vec<u32>> = HashMap::new();
227 for (&k, &v) in right_keys.iter().zip(right_vals.iter()) {
228 right_by_key.entry(k).or_default().push(v);
229 }
230
231 let mut expected: HashMap<(u32, u32, u32), usize> = HashMap::new();
233 for (&lk, &lv) in left_keys.iter().zip(left_vals.iter()) {
234 if let Some(right_vals_for_key) = right_by_key.get(&lk) {
235 for &rv in right_vals_for_key {
236 *expected.entry((lk, lv, rv)).or_insert(0) += 1;
237 }
238 }
239 }
240
241 let mut actual: HashMap<(u32, u32, u32), usize> = HashMap::new();
243 for i in 0..result_keys.len() {
244 let tuple = (result_keys[i], result_lvals[i], result_rvals[i]);
245 *actual.entry(tuple).or_insert(0) += 1;
246 }
247
248 if expected != actual {
250 let expected_total: usize = expected.values().sum();
251 let actual_total: usize = actual.values().sum();
252
253 let mut missing: Vec<(u32, u32, u32)> = Vec::new();
254 let mut extra: Vec<(u32, u32, u32)> = Vec::new();
255
256 for (tuple, &count) in &expected {
257 let actual_count = actual.get(tuple).copied().unwrap_or(0);
258 if actual_count < count {
259 for _ in 0..(count - actual_count) {
260 missing.push(*tuple);
261 }
262 }
263 }
264
265 for (tuple, &count) in &actual {
266 let expected_count = expected.get(tuple).copied().unwrap_or(0);
267 if count > expected_count {
268 for _ in 0..(count - expected_count) {
269 extra.push(*tuple);
270 }
271 }
272 }
273
274 let mut msg = format!(
275 "Join result mismatch: expected {} rows, got {}.",
276 expected_total, actual_total
277 );
278 if !missing.is_empty() {
279 msg.push_str(&format!(" Missing: {:?}", &missing[..missing.len().min(5)]));
280 }
281 if !extra.is_empty() {
282 msg.push_str(&format!(" Extra: {:?}", &extra[..extra.len().min(5)]));
283 }
284
285 return Err(msg);
286 }
287
288 Ok(())
289}
290
291pub fn prop_filter_idempotence(
302 ctx: &TestContext,
303 data: Vec<u32>,
304 threshold: u32,
305) -> Result<(), String> {
306 if data.is_empty() {
307 return Ok(()); }
309
310 let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
311
312 let buffer = ctx
313 .provider
314 .create_buffer_from_slice::<u32>(&data, schema)
315 .map_err(|e| format!("Buffer creation failed: {}", e))?;
316
317 let filtered_once = ctx
319 .provider
320 .filter::<u32>(&buffer, 0, threshold, CompareOp::Lt)
321 .map_err(|e| format!("First filter failed: {}", e))?;
322
323 let filtered_twice = ctx
325 .provider
326 .filter::<u32>(&filtered_once, 0, threshold, CompareOp::Lt)
327 .map_err(|e| format!("Second filter failed: {}", e))?;
328
329 let once_data = ctx
331 .provider
332 .download_column::<u32>(&filtered_once, 0)
333 .map_err(|e| format!("Download first filter failed: {}", e))?;
334
335 let twice_data = ctx
336 .provider
337 .download_column::<u32>(&filtered_twice, 0)
338 .map_err(|e| format!("Download second filter failed: {}", e))?;
339
340 if once_data.len() != twice_data.len() {
342 return Err(format!(
343 "Filter idempotence violated: once={} rows, twice={} rows",
344 once_data.len(),
345 twice_data.len()
346 ));
347 }
348
349 if once_data != twice_data {
350 return Err(
351 "Filter idempotence violated: data differs after second application".to_string(),
352 );
353 }
354
355 let expected: Vec<u32> = data.iter().filter(|&&v| v < threshold).copied().collect();
357 if once_data != expected {
358 return Err(format!(
359 "First filter incorrect: expected {} rows matching, got {}",
360 expected.len(),
361 once_data.len()
362 ));
363 }
364
365 Ok(())
366}
367
368pub fn prop_dedup_determinism(
377 ctx: &TestContext,
378 keys: Vec<u32>,
379 vals: Vec<u32>,
380) -> Result<(), String> {
381 if keys.is_empty() || keys.len() != vals.len() {
382 return Ok(()); }
384
385 let schema = Schema::new(vec![
386 ("key".to_string(), ScalarType::U32),
387 ("val".to_string(), ScalarType::U32),
388 ]);
389
390 let buffer1 = ctx
392 .provider
393 .create_buffer_from_u32_columns(&[&keys, &vals], schema.clone())
394 .map_err(|e| format!("Buffer1 creation failed: {}", e))?;
395
396 let buffer2 = ctx
397 .provider
398 .create_buffer_from_u32_columns(&[&keys, &vals], schema)
399 .map_err(|e| format!("Buffer2 creation failed: {}", e))?;
400
401 let dedup1 = ctx
402 .provider
403 .dedup(&buffer1, &[0])
404 .map_err(|e| format!("First dedup failed: {}", e))?;
405
406 let dedup2 = ctx
407 .provider
408 .dedup(&buffer2, &[0])
409 .map_err(|e| format!("Second dedup failed: {}", e))?;
410
411 let keys1 = ctx
413 .provider
414 .download_column::<u32>(&dedup1, 0)
415 .map_err(|e| format!("Download dedup1 keys failed: {}", e))?;
416
417 let keys2 = ctx
418 .provider
419 .download_column::<u32>(&dedup2, 0)
420 .map_err(|e| format!("Download dedup2 keys failed: {}", e))?;
421
422 if ctx.device_row_count(&dedup1) != ctx.device_row_count(&dedup2) {
424 return Err(format!(
425 "Dedup row count differs: {} vs {}",
426 ctx.device_row_count(&dedup1),
427 ctx.device_row_count(&dedup2)
428 ));
429 }
430
431 let set1: HashSet<u32> = keys1.iter().copied().collect();
433 let set2: HashSet<u32> = keys2.iter().copied().collect();
434
435 if set1 != set2 {
436 return Err("Dedup key sets differ across runs".to_string());
437 }
438
439 let expected_unique: HashSet<u32> = keys.iter().copied().collect();
441 if set1 != expected_unique {
442 return Err(format!(
443 "Dedup result incorrect: expected {} unique keys, got {}",
444 expected_unique.len(),
445 set1.len()
446 ));
447 }
448
449 if keys1.len() != set1.len() {
451 return Err(format!(
452 "Dedup output contains duplicates: {} rows but only {} unique",
453 keys1.len(),
454 set1.len()
455 ));
456 }
457
458 Ok(())
459}
460
#[cfg(test)]
mod tests {
    //! Randomised and hand-picked inputs for the `prop_*` properties.
    //!
    //! Every test obtains its context through `get_context`; when
    //! `TestContext::new` fails the test logs a message and passes without
    //! exercising the property.

    use super::*;
    use proptest::prelude::*;

    /// Proptest settings shared by all property tests in this module:
    /// 50 cases per test, at most 100 shrink iterations, and no on-disk
    /// failure persistence.
    fn gpu_proptest_config() -> ProptestConfig {
        ProptestConfig {
            cases: 50,
            max_shrink_iters: 100,
            failure_persistence: None,
            ..ProptestConfig::default()
        }
    }

    /// Creates a test context, or logs to stderr and returns `None` when
    /// `TestContext::new` reports an error.
    fn get_context() -> Option<TestContext> {
        match TestContext::new() {
            Ok(ctx) => Some(ctx),
            Err(e) => {
                eprintln!("Skipping property test: no CUDA device ({})", e);
                None
            }
        }
    }

    /// Runs `property` against a freshly created context and converts its
    /// error message into a proptest failure. Passes vacuously when no
    /// context is available.
    fn check_on_gpu(
        property: impl FnOnce(&TestContext) -> Result<(), String>,
    ) -> Result<(), TestCaseError> {
        match get_context() {
            Some(ctx) => property(&ctx).map_err(|e| TestCaseError::Fail(e.into())),
            None => Ok(()),
        }
    }

    /// Generates a non-empty key column with keys in `0..max_key` and up to
    /// `max_rows` rows, paired with an arbitrary value column of the same
    /// length.
    fn key_val_columns(
        max_rows: usize,
        max_key: u32,
    ) -> impl Strategy<Value = (Vec<u32>, Vec<u32>)> {
        prop::collection::vec(0..max_key, 1..=max_rows).prop_flat_map(|keys| {
            let len = keys.len();
            // `Just` hands the generated keys through unchanged while the
            // value column is sized to match.
            (Just(keys), prop::collection::vec(any::<u32>(), len..=len))
        })
    }

    /// Input for the sort property: `(keys, vals)` of equal, non-zero length.
    fn sort_data_strategy(
        max_rows: usize,
        max_key: u32,
    ) -> impl Strategy<Value = (Vec<u32>, Vec<u32>)> {
        key_val_columns(max_rows, max_key)
    }

    /// Input for the join property: `(left_keys, left_vals, right_keys,
    /// right_vals)`. Either side may be empty; the value column of each side
    /// always matches its key column in length.
    fn join_data_strategy(
        max_rows: usize,
        max_key: u32,
    ) -> impl Strategy<Value = (Vec<u32>, Vec<u32>, Vec<u32>, Vec<u32>)> {
        (
            prop::collection::vec(0..max_key, 0..=max_rows),
            prop::collection::vec(0..max_key, 0..=max_rows),
        )
            .prop_flat_map(|(left_keys, right_keys)| {
                let left_len = left_keys.len();
                let right_len = right_keys.len();
                (
                    Just(left_keys),
                    prop::collection::vec(any::<u32>(), left_len..=left_len),
                    Just(right_keys),
                    prop::collection::vec(any::<u32>(), right_len..=right_len),
                )
            })
    }

    /// Input for the filter property: a non-empty column with values in
    /// `0..max_val` and a threshold drawn from the same range.
    fn filter_data_strategy(
        max_rows: usize,
        max_val: u32,
    ) -> impl Strategy<Value = (Vec<u32>, u32)> {
        prop::collection::vec(0..max_val, 1..=max_rows)
            .prop_flat_map(move |data| (Just(data), 0..max_val))
    }

    /// Input for the dedup property: `(keys, vals)` of equal, non-zero length.
    fn dedup_data_strategy(
        max_rows: usize,
        max_key: u32,
    ) -> impl Strategy<Value = (Vec<u32>, Vec<u32>)> {
        key_val_columns(max_rows, max_key)
    }

    // ---- sort -----------------------------------------------------------

    proptest! {
        #![proptest_config(gpu_proptest_config())]

        #[test]
        fn test_prop_sort_stability(
            (keys, vals) in sort_data_strategy(1000, 100)
        ) {
            check_on_gpu(|ctx| prop_sort_stability(ctx, keys, vals))?;
        }

        // Only 10 distinct keys, so most rows share a key with other rows.
        #[test]
        fn test_prop_sort_stability_high_duplicates(
            (keys, vals) in sort_data_strategy(1000, 10)
        ) {
            check_on_gpu(|ctx| prop_sort_stability(ctx, keys, vals))?;
        }

        #[test]
        fn test_prop_sort_stability_large(
            (keys, vals) in sort_data_strategy(5000, 500)
        ) {
            check_on_gpu(|ctx| prop_sort_stability(ctx, keys, vals))?;
        }
    }

    // ---- join -----------------------------------------------------------

    proptest! {
        #![proptest_config(gpu_proptest_config())]

        #[test]
        fn test_prop_join_correctness(
            (lk, lv, rk, rv) in join_data_strategy(500, 100)
        ) {
            check_on_gpu(|ctx| prop_join_correctness(ctx, lk, lv, rk, rv))?;
        }

        // Only 20 distinct keys, so most keys occur on both sides.
        #[test]
        fn test_prop_join_correctness_high_overlap(
            (lk, lv, rk, rv) in join_data_strategy(500, 20)
        ) {
            check_on_gpu(|ctx| prop_join_correctness(ctx, lk, lv, rk, rv))?;
        }

        // Left keys are 0..left_size and right keys start at left_size, so
        // the two key ranges are disjoint and the join must be empty.
        #[test]
        fn test_prop_join_correctness_no_overlap(
            left_size in 1u32..500,
            right_size in 1u32..500,
        ) {
            let lk: Vec<u32> = (0..left_size).collect();
            let lv: Vec<u32> = lk.iter().map(|k| k * 10).collect();
            let rk: Vec<u32> = (left_size..left_size + right_size).collect();
            let rv: Vec<u32> = rk.iter().map(|k| k * 100).collect();

            check_on_gpu(|ctx| prop_join_correctness(ctx, lk, lv, rk, rv))?;
        }
    }

    // ---- filter ---------------------------------------------------------

    proptest! {
        #![proptest_config(gpu_proptest_config())]

        #[test]
        fn test_prop_filter_idempotence(
            (data, threshold) in filter_data_strategy(1000, 1000)
        ) {
            check_on_gpu(|ctx| prop_filter_idempotence(ctx, data, threshold))?;
        }

        // All values are below 100, so a threshold of 1000 keeps every row.
        #[test]
        fn test_prop_filter_idempotence_all_pass(
            data in prop::collection::vec(0u32..100, 1..1000)
        ) {
            check_on_gpu(|ctx| prop_filter_idempotence(ctx, data, 1000))?;
        }

        // All values are at least 100, so a threshold of 50 keeps no row.
        #[test]
        fn test_prop_filter_idempotence_none_pass(
            data in prop::collection::vec(100u32..1000, 1..1000)
        ) {
            check_on_gpu(|ctx| prop_filter_idempotence(ctx, data, 50))?;
        }
    }

    // ---- dedup ----------------------------------------------------------

    proptest! {
        #![proptest_config(gpu_proptest_config())]

        #[test]
        fn test_prop_dedup_determinism(
            (keys, vals) in dedup_data_strategy(1000, 100)
        ) {
            check_on_gpu(|ctx| prop_dedup_determinism(ctx, keys, vals))?;
        }

        // Every row carries the same key; values are 0, 1, 2, ...
        #[test]
        fn test_prop_dedup_determinism_all_same(
            size in 1usize..1000,
            key in any::<u32>(),
        ) {
            let keys = vec![key; size];
            let vals: Vec<u32> = (0u32..).take(size).collect();
            check_on_gpu(|ctx| prop_dedup_determinism(ctx, keys, vals))?;
        }

        // Keys are 0..size, so no key repeats.
        #[test]
        fn test_prop_dedup_determinism_all_unique(
            size in 1u32..1000,
        ) {
            let keys: Vec<u32> = (0..size).collect();
            let vals = keys.clone();
            check_on_gpu(|ctx| prop_dedup_determinism(ctx, keys, vals))?;
        }
    }

    // ---- fixed edge cases -----------------------------------------------

    #[test]
    fn test_sort_stability_single_element() {
        if let Some(ctx) = get_context() {
            prop_sort_stability(&ctx, vec![42], vec![100]).unwrap();
        }
    }

    #[test]
    fn test_sort_stability_all_equal_keys() {
        if let Some(ctx) = get_context() {
            let keys = vec![5; 100];
            let vals: Vec<u32> = (0..100).collect();
            prop_sort_stability(&ctx, keys, vals).unwrap();
        }
    }

    #[test]
    fn test_join_correctness_empty_tables() {
        if let Some(ctx) = get_context() {
            prop_join_correctness(&ctx, vec![], vec![], vec![], vec![]).unwrap();
        }
    }

    #[test]
    fn test_join_correctness_single_match() {
        if let Some(ctx) = get_context() {
            prop_join_correctness(&ctx, vec![1], vec![10], vec![1], vec![100]).unwrap();
        }
    }

    #[test]
    fn test_filter_idempotence_single_element() {
        if let Some(ctx) = get_context() {
            // One value below the threshold, one above it.
            prop_filter_idempotence(&ctx, vec![50], 100).unwrap();
            prop_filter_idempotence(&ctx, vec![150], 100).unwrap();
        }
    }

    #[test]
    fn test_dedup_determinism_single_element() {
        if let Some(ctx) = get_context() {
            prop_dedup_determinism(&ctx, vec![42], vec![100]).unwrap();
        }
    }
}