1use crate::harness::{CategoryResult, TestContext, TestResult};
7use std::time::Instant;
8use xlog_core::{ScalarType, Schema};
9
10pub fn run_all(ctx: &TestContext) -> CategoryResult {
12 let mut results = CategoryResult::new("c08_synchronization");
13 let start = Instant::now();
14
15 results.add_result(test_hash_join_atomics(ctx));
16 results.add_result(test_filter_scan_sync(ctx));
17 results.add_result(test_sort_barrier_correctness(ctx));
18 results.add_result(test_dedup_atomic_marking(ctx));
19 results.add_result(test_concurrent_operations(ctx));
20
21 results.set_duration(start.elapsed());
22 results
23}
24
25fn test_hash_join_atomics(ctx: &TestContext) -> TestResult {
30 let start = Instant::now();
31
32 let left_schema = Schema::new(vec![
33 ("key".to_string(), ScalarType::U32),
34 ("lval".to_string(), ScalarType::U32),
35 ]);
36 let right_schema = Schema::new(vec![
37 ("key".to_string(), ScalarType::U32),
38 ("rval".to_string(), ScalarType::U32),
39 ]);
40
41 const LEFT_SIZE: usize = 10000;
43 let left_keys: Vec<u32> = (0..LEFT_SIZE as u32).collect();
44 let left_vals: Vec<u32> = (0..LEFT_SIZE as u32).map(|i| i * 10).collect();
45
46 const RIGHT_SIZE: usize = 5000;
48 let right_keys: Vec<u32> = (0..RIGHT_SIZE as u32).map(|i| i * 2).collect();
49 let right_vals: Vec<u32> = (0..RIGHT_SIZE as u32).map(|i| i * 100).collect();
50
51 let left_buffer = match ctx
52 .provider
53 .create_buffer_from_u32_columns(&[&left_keys, &left_vals], left_schema.clone())
54 {
55 Ok(buf) => buf,
56 Err(e) => {
57 return TestResult::error(
58 "test_hash_join_atomics",
59 start.elapsed(),
60 format!("Failed to create left buffer: {}", e),
61 )
62 }
63 };
64
65 let right_buffer = match ctx
66 .provider
67 .create_buffer_from_u32_columns(&[&right_keys, &right_vals], right_schema.clone())
68 {
69 Ok(buf) => buf,
70 Err(e) => {
71 return TestResult::error(
72 "test_hash_join_atomics",
73 start.elapsed(),
74 format!("Failed to create right buffer: {}", e),
75 )
76 }
77 };
78
79 let joined = match ctx
81 .provider
82 .hash_join(&left_buffer, &right_buffer, &[0], &[0])
83 {
84 Ok(j) => j,
85 Err(e) => {
86 return TestResult::error(
87 "test_hash_join_atomics",
88 start.elapsed(),
89 format!("Hash join failed: {}", e),
90 )
91 }
92 };
93
94 let expected_matches = RIGHT_SIZE;
96 if ctx.device_row_count(&joined) != expected_matches as u64 {
97 return TestResult::error(
98 "test_hash_join_atomics",
99 start.elapsed(),
100 format!(
101 "Join returned {} rows, expected {}",
102 ctx.device_row_count(&joined),
103 expected_matches
104 ),
105 );
106 }
107
108 let joined_keys = match ctx.provider.download_column::<u32>(&joined, 0) {
110 Ok(d) => d,
111 Err(e) => {
112 return TestResult::error(
113 "test_hash_join_atomics",
114 start.elapsed(),
115 format!("Failed to download joined keys: {}", e),
116 )
117 }
118 };
119
120 let joined_lvals = match ctx.provider.download_column::<u32>(&joined, 1) {
121 Ok(d) => d,
122 Err(e) => {
123 return TestResult::error(
124 "test_hash_join_atomics",
125 start.elapsed(),
126 format!("Failed to download joined lvals: {}", e),
127 )
128 }
129 };
130
131 let joined_rvals = match ctx.provider.download_column::<u32>(&joined, 2) {
132 Ok(d) => d,
133 Err(e) => {
134 return TestResult::error(
135 "test_hash_join_atomics",
136 start.elapsed(),
137 format!("Failed to download joined rvals: {}", e),
138 )
139 }
140 };
141
142 for i in 0..ctx.device_row_count(&joined) as usize {
144 let key = joined_keys[i];
145 let lval = joined_lvals[i];
146 let rval = joined_rvals[i];
147
148 if key % 2 != 0 {
150 return TestResult::error(
151 "test_hash_join_atomics",
152 start.elapsed(),
153 format!("Row {}: key {} should be even", i, key),
154 );
155 }
156
157 let expected_lval = key * 10;
159 if lval != expected_lval {
160 return TestResult::error(
161 "test_hash_join_atomics",
162 start.elapsed(),
163 format!(
164 "Row {}: lval {} doesn't match expected {} for key {}",
165 i, lval, expected_lval, key
166 ),
167 );
168 }
169
170 let expected_rval = (key / 2) * 100;
172 if rval != expected_rval {
173 return TestResult::error(
174 "test_hash_join_atomics",
175 start.elapsed(),
176 format!(
177 "Row {}: rval {} doesn't match expected {} for key {}",
178 i, rval, expected_rval, key
179 ),
180 );
181 }
182 }
183
184 if let Err(e) = ctx.sync_and_check() {
185 return TestResult::error(
186 "test_hash_join_atomics",
187 start.elapsed(),
188 format!("Sync failed: {}", e),
189 );
190 }
191
192 TestResult::passed("test_hash_join_atomics", start.elapsed())
193}
194
195fn test_filter_scan_sync(ctx: &TestContext) -> TestResult {
200 let start = Instant::now();
201 let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
202
203 let test_cases: Vec<(usize, Box<dyn Fn(usize) -> bool>)> = vec![
205 (1000, Box::new(|i| i % 2 == 0)), (10000, Box::new(|i| i % 3 == 0)), (50000, Box::new(|i| i < 25000)), (100000, Box::new(|i| i % 7 == 0)), (65536, Box::new(|i| (i / 256) % 2 == 0)), ];
212
213 for (size, predicate) in test_cases {
214 let data: Vec<u32> = (0..size as u32).collect();
216
217 let buffer = match ctx
218 .provider
219 .create_buffer_from_slice::<u32>(&data, schema.clone())
220 {
221 Ok(buf) => buf,
222 Err(e) => {
223 return TestResult::error(
224 "test_filter_scan_sync",
225 start.elapsed(),
226 format!("Size {}: failed to create buffer: {}", size, e),
227 )
228 }
229 };
230
231 let mask: Vec<u8> = (0..size)
233 .map(|i| if predicate(i) { 1 } else { 0 })
234 .collect();
235 let expected_count: usize = mask.iter().map(|&m| m as usize).sum();
236
237 let filtered = match ctx.provider.filter_by_mask(&buffer, &mask) {
238 Ok(f) => f,
239 Err(e) => {
240 return TestResult::error(
241 "test_filter_scan_sync",
242 start.elapsed(),
243 format!("Size {}: filter failed: {}", size, e),
244 )
245 }
246 };
247
248 if ctx.device_row_count(&filtered) != expected_count as u64 {
250 return TestResult::error(
251 "test_filter_scan_sync",
252 start.elapsed(),
253 format!(
254 "Size {}: filter returned {} rows, expected {}",
255 size,
256 ctx.device_row_count(&filtered),
257 expected_count
258 ),
259 );
260 }
261
262 let filtered_data = match ctx.provider.download_column::<u32>(&filtered, 0) {
264 Ok(d) => d,
265 Err(e) => {
266 return TestResult::error(
267 "test_filter_scan_sync",
268 start.elapsed(),
269 format!("Size {}: failed to download: {}", size, e),
270 )
271 }
272 };
273
274 let mut expected_idx = 0;
276 for i in 0..size {
277 if predicate(i) {
278 if expected_idx >= filtered_data.len() {
279 return TestResult::error(
280 "test_filter_scan_sync",
281 start.elapsed(),
282 format!(
283 "Size {}: filtered data too short at expected index {}",
284 size, expected_idx
285 ),
286 );
287 }
288 if filtered_data[expected_idx] != i as u32 {
289 return TestResult::error(
290 "test_filter_scan_sync",
291 start.elapsed(),
292 format!(
293 "Size {}: filtered[{}] = {}, expected {}",
294 size, expected_idx, filtered_data[expected_idx], i
295 ),
296 );
297 }
298 expected_idx += 1;
299 }
300 }
301 }
302
303 if let Err(e) = ctx.sync_and_check() {
304 return TestResult::error(
305 "test_filter_scan_sync",
306 start.elapsed(),
307 format!("Sync failed: {}", e),
308 );
309 }
310
311 TestResult::passed("test_filter_scan_sync", start.elapsed())
312}
313
314fn test_sort_barrier_correctness(ctx: &TestContext) -> TestResult {
320 let start = Instant::now();
321 let schema = Schema::new(vec![
322 ("key".to_string(), ScalarType::U32),
323 ("val".to_string(), ScalarType::U32),
324 ]);
325
326 let test_patterns: Vec<(&str, Vec<u32>)> = vec![
328 ("reverse", (0..10000u32).rev().collect()),
329 (
330 "alternating",
331 (0..10000u32)
332 .map(|i| if i % 2 == 0 { i } else { 10000 - i })
333 .collect(),
334 ),
335 ("sawtooth", (0..10000u32).map(|i| i % 100).collect()),
336 (
337 "random_lcg",
338 (0..10000usize)
339 .map(|i| ((i * 1103515245 + 12345) % 10000) as u32)
340 .collect(),
341 ),
342 (
343 "blocks_reversed",
344 (0..10000u32)
345 .map(|i| {
346 let block = i / 256;
347 let offset = i % 256;
348 block * 256 + (255 - offset)
349 })
350 .collect(),
351 ),
352 ];
353
354 for (name, keys) in test_patterns {
355 let size = keys.len();
356 let vals: Vec<u32> = (0..size as u32).collect();
357
358 let buffer = match ctx
359 .provider
360 .create_buffer_from_u32_columns(&[&keys, &vals], schema.clone())
361 {
362 Ok(buf) => buf,
363 Err(e) => {
364 return TestResult::error(
365 "test_sort_barrier_correctness",
366 start.elapsed(),
367 format!("Pattern {}: failed to create buffer: {}", name, e),
368 )
369 }
370 };
371
372 let sorted = match ctx.provider.sort(&buffer, &[0]) {
374 Ok(s) => s,
375 Err(e) => {
376 return TestResult::error(
377 "test_sort_barrier_correctness",
378 start.elapsed(),
379 format!("Pattern {}: sort failed: {}", name, e),
380 )
381 }
382 };
383
384 if ctx.device_row_count(&sorted) != size as u64 {
386 return TestResult::error(
387 "test_sort_barrier_correctness",
388 start.elapsed(),
389 format!(
390 "Pattern {}: sort returned {} rows, expected {}",
391 name,
392 ctx.device_row_count(&sorted),
393 size
394 ),
395 );
396 }
397
398 let sorted_keys = match ctx.provider.download_column::<u32>(&sorted, 0) {
400 Ok(d) => d,
401 Err(e) => {
402 return TestResult::error(
403 "test_sort_barrier_correctness",
404 start.elapsed(),
405 format!("Pattern {}: failed to download sorted keys: {}", name, e),
406 )
407 }
408 };
409
410 for i in 1..size {
412 if sorted_keys[i] < sorted_keys[i - 1] {
413 return TestResult::error(
414 "test_sort_barrier_correctness",
415 start.elapsed(),
416 format!(
417 "Pattern {}: sort order incorrect at {}: {} > {}",
418 name,
419 i,
420 sorted_keys[i - 1],
421 sorted_keys[i]
422 ),
423 );
424 }
425 }
426
427 let mut original_sorted = keys.clone();
429 original_sorted.sort();
430 if sorted_keys != original_sorted {
431 return TestResult::error(
432 "test_sort_barrier_correctness",
433 start.elapsed(),
434 format!("Pattern {}: keys not preserved through sort", name),
435 );
436 }
437 }
438
439 if let Err(e) = ctx.sync_and_check() {
440 return TestResult::error(
441 "test_sort_barrier_correctness",
442 start.elapsed(),
443 format!("Sync failed: {}", e),
444 );
445 }
446
447 TestResult::passed("test_sort_barrier_correctness", start.elapsed())
448}
449
450fn test_dedup_atomic_marking(ctx: &TestContext) -> TestResult {
455 let start = Instant::now();
456 let schema = Schema::new(vec![
457 ("key".to_string(), ScalarType::U32),
458 ("val".to_string(), ScalarType::U32),
459 ]);
460
461 let test_cases: Vec<(&str, Vec<u32>, Vec<u32>)> = vec![
463 ("all_same", vec![42; 10000], (0..10000u32).collect()),
465 (
466 "pairs",
467 (0..5000u32).flat_map(|i| vec![i, i]).collect(),
468 (0..10000u32).collect(),
469 ),
470 (
471 "triples",
472 (0..3333u32)
473 .flat_map(|i| vec![i, i, i])
474 .take(9999)
475 .collect(),
476 (0..9999u32).collect(),
477 ),
478 ("no_dups", (0..10000u32).collect(), (0..10000u32).collect()),
479 (
480 "many_dups",
481 (0..10000u32).map(|i| i % 100).collect(),
482 (0..10000u32).collect(),
483 ),
484 ];
485
486 for (name, keys, vals) in test_cases {
487 let buffer = match ctx
488 .provider
489 .create_buffer_from_u32_columns(&[&keys, &vals], schema.clone())
490 {
491 Ok(buf) => buf,
492 Err(e) => {
493 return TestResult::error(
494 "test_dedup_atomic_marking",
495 start.elapsed(),
496 format!("Pattern {}: failed to create buffer: {}", name, e),
497 )
498 }
499 };
500
501 let deduped = match ctx.provider.dedup(&buffer, &[0]) {
503 Ok(d) => d,
504 Err(e) => {
505 return TestResult::error(
506 "test_dedup_atomic_marking",
507 start.elapsed(),
508 format!("Pattern {}: dedup failed: {}", name, e),
509 )
510 }
511 };
512
513 let mut unique_keys: std::collections::HashSet<u32> = std::collections::HashSet::new();
515 for &k in &keys {
516 unique_keys.insert(k);
517 }
518 let expected_unique = unique_keys.len();
519
520 if ctx.device_row_count(&deduped) != expected_unique as u64 {
521 return TestResult::error(
522 "test_dedup_atomic_marking",
523 start.elapsed(),
524 format!(
525 "Pattern {}: dedup returned {} rows, expected {}",
526 name,
527 ctx.device_row_count(&deduped),
528 expected_unique
529 ),
530 );
531 }
532
533 let deduped_keys = match ctx.provider.download_column::<u32>(&deduped, 0) {
535 Ok(d) => d,
536 Err(e) => {
537 return TestResult::error(
538 "test_dedup_atomic_marking",
539 start.elapsed(),
540 format!("Pattern {}: failed to download deduped keys: {}", name, e),
541 )
542 }
543 };
544
545 let mut seen_keys: std::collections::HashSet<u32> = std::collections::HashSet::new();
547 for &k in &deduped_keys {
548 if !seen_keys.insert(k) {
549 return TestResult::error(
550 "test_dedup_atomic_marking",
551 start.elapsed(),
552 format!("Pattern {}: duplicate key {} in dedup result", name, k),
553 );
554 }
555 }
556
557 if seen_keys != unique_keys {
559 return TestResult::error(
560 "test_dedup_atomic_marking",
561 start.elapsed(),
562 format!("Pattern {}: dedup result missing some unique keys", name),
563 );
564 }
565 }
566
567 if let Err(e) = ctx.sync_and_check() {
568 return TestResult::error(
569 "test_dedup_atomic_marking",
570 start.elapsed(),
571 format!("Sync failed: {}", e),
572 );
573 }
574
575 TestResult::passed("test_dedup_atomic_marking", start.elapsed())
576}
577
578fn test_concurrent_operations(ctx: &TestContext) -> TestResult {
583 let start = Instant::now();
584 let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
585
586 const NUM_BUFFERS: usize = 5;
588 const BUFFER_SIZE: usize = 10000;
589
590 let mut buffers = Vec::with_capacity(NUM_BUFFERS);
591 let mut original_data = Vec::with_capacity(NUM_BUFFERS);
592
593 for i in 0..NUM_BUFFERS {
594 let data: Vec<u32> = (0..BUFFER_SIZE)
596 .map(|j| ((j * (i + 1) * 7 + i * 13) % BUFFER_SIZE) as u32)
597 .collect();
598
599 let buffer = match ctx
600 .provider
601 .create_buffer_from_slice::<u32>(&data, schema.clone())
602 {
603 Ok(buf) => buf,
604 Err(e) => {
605 return TestResult::error(
606 "test_concurrent_operations",
607 start.elapsed(),
608 format!("Failed to create buffer {}: {}", i, e),
609 )
610 }
611 };
612
613 original_data.push(data);
614 buffers.push(buffer);
615 }
616
617 let mut results = Vec::with_capacity(NUM_BUFFERS);
619
620 let sorted0 = match ctx.provider.sort(&buffers[0], &[0]) {
622 Ok(s) => s,
623 Err(e) => {
624 return TestResult::error(
625 "test_concurrent_operations",
626 start.elapsed(),
627 format!("Sort of buffer 0 failed: {}", e),
628 )
629 }
630 };
631 results.push(("sort", sorted0));
632
633 let mask1: Vec<u8> = (0..BUFFER_SIZE)
635 .map(|i| if i < BUFFER_SIZE / 2 { 1 } else { 0 })
636 .collect();
637 let filtered1 = match ctx.provider.filter_by_mask(&buffers[1], &mask1) {
638 Ok(f) => f,
639 Err(e) => {
640 return TestResult::error(
641 "test_concurrent_operations",
642 start.elapsed(),
643 format!("Filter of buffer 1 failed: {}", e),
644 )
645 }
646 };
647 results.push(("filter", filtered1));
648
649 let sorted2 = match ctx.provider.sort(&buffers[2], &[0]) {
651 Ok(s) => s,
652 Err(e) => {
653 return TestResult::error(
654 "test_concurrent_operations",
655 start.elapsed(),
656 format!("Sort of buffer 2 failed: {}", e),
657 )
658 }
659 };
660 results.push(("sort", sorted2));
661
662 let mask3: Vec<u8> = (0..BUFFER_SIZE)
664 .map(|i| if i % 2 == 0 { 1 } else { 0 })
665 .collect();
666 let filtered3 = match ctx.provider.filter_by_mask(&buffers[3], &mask3) {
667 Ok(f) => f,
668 Err(e) => {
669 return TestResult::error(
670 "test_concurrent_operations",
671 start.elapsed(),
672 format!("Filter of buffer 3 failed: {}", e),
673 )
674 }
675 };
676 results.push(("filter", filtered3));
677
678 let sorted4 = match ctx.provider.sort(&buffers[4], &[0]) {
680 Ok(s) => s,
681 Err(e) => {
682 return TestResult::error(
683 "test_concurrent_operations",
684 start.elapsed(),
685 format!("Sort of buffer 4 failed: {}", e),
686 )
687 }
688 };
689 results.push(("sort", sorted4));
690
691 for &idx in &[0usize, 2, 4] {
694 let (op, ref result) = results[idx];
695 assert!(op == "sort");
696
697 if ctx.device_row_count(&result) != BUFFER_SIZE as u64 {
698 return TestResult::error(
699 "test_concurrent_operations",
700 start.elapsed(),
701 format!(
702 "Buffer {}: {} returned {} rows, expected {}",
703 idx,
704 op,
705 ctx.device_row_count(&result),
706 BUFFER_SIZE
707 ),
708 );
709 }
710
711 let sorted_data = match ctx.provider.download_column::<u32>(result, 0) {
712 Ok(d) => d,
713 Err(e) => {
714 return TestResult::error(
715 "test_concurrent_operations",
716 start.elapsed(),
717 format!("Buffer {}: failed to download: {}", idx, e),
718 )
719 }
720 };
721
722 for i in 1..BUFFER_SIZE {
724 if sorted_data[i] < sorted_data[i - 1] {
725 return TestResult::error(
726 "test_concurrent_operations",
727 start.elapsed(),
728 format!(
729 "Buffer {}: sort order incorrect at {}: {} > {}",
730 idx,
731 i,
732 sorted_data[i - 1],
733 sorted_data[i]
734 ),
735 );
736 }
737 }
738
739 let mut expected = original_data[idx].clone();
741 expected.sort();
742 if sorted_data != expected {
743 return TestResult::error(
744 "test_concurrent_operations",
745 start.elapsed(),
746 format!("Buffer {}: sorted data doesn't match expected", idx),
747 );
748 }
749 }
750
751 let (op1, ref result1) = results[1];
754 assert!(op1 == "filter");
755 if ctx.device_row_count(&result1) != (BUFFER_SIZE / 2) as u64 {
756 return TestResult::error(
757 "test_concurrent_operations",
758 start.elapsed(),
759 format!(
760 "Buffer 1: filter returned {} rows, expected {}",
761 ctx.device_row_count(&result1),
762 BUFFER_SIZE / 2
763 ),
764 );
765 }
766
767 let (op3, ref result3) = results[3];
769 assert!(op3 == "filter");
770 if ctx.device_row_count(&result3) != ((BUFFER_SIZE + 1) / 2) as u64 {
771 return TestResult::error(
772 "test_concurrent_operations",
773 start.elapsed(),
774 format!(
775 "Buffer 3: filter returned {} rows, expected {}",
776 ctx.device_row_count(&result3),
777 (BUFFER_SIZE + 1) / 2
778 ),
779 );
780 }
781
782 for i in 0..NUM_BUFFERS {
784 let current_data = match ctx.provider.download_column::<u32>(&buffers[i], 0) {
785 Ok(d) => d,
786 Err(e) => {
787 return TestResult::error(
788 "test_concurrent_operations",
789 start.elapsed(),
790 format!("Failed to verify buffer {}: {}", i, e),
791 )
792 }
793 };
794
795 if current_data != original_data[i] {
796 return TestResult::error(
797 "test_concurrent_operations",
798 start.elapsed(),
799 format!("Buffer {} was modified by operations", i),
800 );
801 }
802 }
803
804 if let Err(e) = ctx.sync_and_check() {
805 return TestResult::error(
806 "test_concurrent_operations",
807 start.elapsed(),
808 format!("Sync failed: {}", e),
809 );
810 }
811
812 TestResult::passed("test_concurrent_operations", start.elapsed())
813}