Skip to main content

xlog_cuda_tests/categories/
c12_atomics.rs

1//! Category 12: Atomic Operations
2//!
3//! Tests atomic operation correctness including hash join atomics, dedup atomics,
4//! high contention scenarios, atomic counting, and concurrent atomic updates.
5
6use crate::harness::{CategoryResult, TestContext, TestResult};
7use std::collections::HashSet;
8use std::time::Instant;
9use xlog_core::{ScalarType, Schema};
10
11/// Run all tests in this category.
12pub fn run_all(ctx: &TestContext) -> CategoryResult {
13    let mut results = CategoryResult::new("c12_atomics");
14    let start = Instant::now();
15
16    results.add_result(test_hash_join_atomic_correctness(ctx));
17    results.add_result(test_dedup_atomic_correctness(ctx));
18    results.add_result(test_high_contention_join(ctx));
19    results.add_result(test_atomic_counting(ctx));
20    results.add_result(test_concurrent_atomic_updates(ctx));
21
22    results.set_duration(start.elapsed());
23    results
24}
25
26/// Test 1: Hash join uses atomic hash table operations - verify correctness.
27///
28/// Hash join builds a hash table using atomic operations to handle collisions.
29/// This test verifies that concurrent atomic insertions produce correct results.
30fn test_hash_join_atomic_correctness(ctx: &TestContext) -> TestResult {
31    let start = Instant::now();
32
33    let left_schema = Schema::new(vec![
34        ("key".to_string(), ScalarType::U32),
35        ("lval".to_string(), ScalarType::U32),
36    ]);
37    let right_schema = Schema::new(vec![
38        ("key".to_string(), ScalarType::U32),
39        ("rval".to_string(), ScalarType::U32),
40    ]);
41
42    // Test with various sizes to exercise atomic contention
43    let test_cases: Vec<(usize, usize, f64)> = vec![
44        (1000, 500, 0.5),   // 50% match rate
45        (5000, 2500, 0.5),  // 50% match rate, larger
46        (10000, 1000, 1.0), // All right keys match
47        (10000, 5000, 0.8), // 80% match rate
48    ];
49
50    for (left_size, right_size, match_rate) in test_cases {
51        // Create left table with sequential keys
52        let left_keys: Vec<u32> = (0..left_size as u32).collect();
53        let left_vals: Vec<u32> = left_keys.iter().map(|&k| k * 10).collect();
54
55        // Create right table with keys that have 'match_rate' overlap
56        let matching_right = (right_size as f64 * match_rate) as usize;
57        let mut right_keys: Vec<u32> = Vec::with_capacity(right_size);
58        let mut right_vals: Vec<u32> = Vec::with_capacity(right_size);
59
60        // First portion: matching keys
61        for i in 0..matching_right {
62            let key = (i * left_size / matching_right.max(1)) as u32;
63            right_keys.push(key);
64            right_vals.push(key * 100);
65        }
66        // Remaining: non-matching keys
67        for i in matching_right..right_size {
68            let key = (left_size as u32) + (i as u32);
69            right_keys.push(key);
70            right_vals.push(key * 100);
71        }
72
73        let left_buffer = match ctx
74            .provider
75            .create_buffer_from_u32_columns(&[&left_keys, &left_vals], left_schema.clone())
76        {
77            Ok(buf) => buf,
78            Err(e) => {
79                return TestResult::error(
80                    "test_hash_join_atomic_correctness",
81                    start.elapsed(),
82                    format!("Left {}: failed to create buffer: {}", left_size, e),
83                )
84            }
85        };
86
87        let right_buffer = match ctx
88            .provider
89            .create_buffer_from_u32_columns(&[&right_keys, &right_vals], right_schema.clone())
90        {
91            Ok(buf) => buf,
92            Err(e) => {
93                return TestResult::error(
94                    "test_hash_join_atomic_correctness",
95                    start.elapsed(),
96                    format!("Right {}: failed to create buffer: {}", right_size, e),
97                )
98            }
99        };
100
101        // Perform hash join
102        let joined = match ctx
103            .provider
104            .hash_join(&left_buffer, &right_buffer, &[0], &[0])
105        {
106            Ok(j) => j,
107            Err(e) => {
108                return TestResult::error(
109                    "test_hash_join_atomic_correctness",
110                    start.elapsed(),
111                    format!("Hash join failed ({}x{}): {}", left_size, right_size, e),
112                )
113            }
114        };
115
116        // Verify join count matches expected
117        if ctx.device_row_count(&joined) != matching_right as u64 {
118            return TestResult::error(
119                "test_hash_join_atomic_correctness",
120                start.elapsed(),
121                format!(
122                    "Join ({}x{}) returned {} rows, expected {}",
123                    left_size,
124                    right_size,
125                    ctx.device_row_count(&joined),
126                    matching_right
127                ),
128            );
129        }
130
131        // Download and verify join results
132        let joined_keys = match ctx.provider.download_column::<u32>(&joined, 0) {
133            Ok(d) => d,
134            Err(e) => {
135                return TestResult::error(
136                    "test_hash_join_atomic_correctness",
137                    start.elapsed(),
138                    format!("Failed to download joined keys: {}", e),
139                )
140            }
141        };
142
143        let joined_lvals = match ctx.provider.download_column::<u32>(&joined, 1) {
144            Ok(d) => d,
145            Err(e) => {
146                return TestResult::error(
147                    "test_hash_join_atomic_correctness",
148                    start.elapsed(),
149                    format!("Failed to download joined lvals: {}", e),
150                )
151            }
152        };
153
154        let joined_rvals = match ctx.provider.download_column::<u32>(&joined, 2) {
155            Ok(d) => d,
156            Err(e) => {
157                return TestResult::error(
158                    "test_hash_join_atomic_correctness",
159                    start.elapsed(),
160                    format!("Failed to download joined rvals: {}", e),
161                )
162            }
163        };
164
165        // Verify each joined row
166        for i in 0..ctx.device_row_count(&joined) as usize {
167            let key = joined_keys[i];
168            let lval = joined_lvals[i];
169            let rval = joined_rvals[i];
170
171            // Key must be in left table range
172            if key >= left_size as u32 {
173                return TestResult::error(
174                    "test_hash_join_atomic_correctness",
175                    start.elapsed(),
176                    format!(
177                        "Row {}: key {} is outside left table range [0, {})",
178                        i, key, left_size
179                    ),
180                );
181            }
182
183            // lval should be key * 10
184            if lval != key * 10 {
185                return TestResult::error(
186                    "test_hash_join_atomic_correctness",
187                    start.elapsed(),
188                    format!(
189                        "Row {}: lval {} doesn't match expected {} for key {}",
190                        i,
191                        lval,
192                        key * 10,
193                        key
194                    ),
195                );
196            }
197
198            // rval should be key * 100
199            if rval != key * 100 {
200                return TestResult::error(
201                    "test_hash_join_atomic_correctness",
202                    start.elapsed(),
203                    format!(
204                        "Row {}: rval {} doesn't match expected {} for key {}",
205                        i,
206                        rval,
207                        key * 100,
208                        key
209                    ),
210                );
211            }
212        }
213    }
214
215    if let Err(e) = ctx.sync_and_check() {
216        return TestResult::error(
217            "test_hash_join_atomic_correctness",
218            start.elapsed(),
219            format!("Sync failed: {}", e),
220        );
221    }
222
223    TestResult::passed("test_hash_join_atomic_correctness", start.elapsed())
224}
225
226/// Test 2: Dedup uses atomic marking - verify correctness.
227///
228/// Dedup operations use atomic operations to mark duplicates. This test
229/// verifies that concurrent atomic marking produces correct deduplication.
230fn test_dedup_atomic_correctness(ctx: &TestContext) -> TestResult {
231    let start = Instant::now();
232    let schema = Schema::new(vec![
233        ("key".to_string(), ScalarType::U32),
234        ("val".to_string(), ScalarType::U32),
235    ]);
236
237    // Test various duplicate patterns
238    let test_cases: Vec<(&str, Vec<u32>, Vec<u32>)> = vec![
239        // (name, keys, vals)
240        ("all_same", vec![42; 10000], (0..10000u32).collect()),
241        (
242            "pairs",
243            (0..5000u32).flat_map(|i| vec![i, i]).collect(),
244            (0..10000u32).collect(),
245        ),
246        (
247            "random_dups",
248            (0..10000usize)
249                .map(|i| ((i * 1103515245 + 12345) % 1000) as u32)
250                .collect(),
251            (0..10000u32).collect(),
252        ),
253        (
254            "clustered_dups",
255            (0..10000usize).map(|i| (i / 10) as u32).collect(),
256            (0..10000u32).collect(),
257        ),
258        (
259            "sparse_dups",
260            (0..10000usize)
261                .map(|i| if i % 100 == 0 { 0 } else { i as u32 })
262                .collect(),
263            (0..10000u32).collect(),
264        ),
265    ];
266
267    for (name, keys, vals) in test_cases {
268        let buffer = match ctx
269            .provider
270            .create_buffer_from_u32_columns(&[&keys, &vals], schema.clone())
271        {
272            Ok(buf) => buf,
273            Err(e) => {
274                return TestResult::error(
275                    "test_dedup_atomic_correctness",
276                    start.elapsed(),
277                    format!("Pattern {}: failed to create buffer: {}", name, e),
278                )
279            }
280        };
281
282        // Dedup by key column
283        let deduped = match ctx.provider.dedup(&buffer, &[0]) {
284            Ok(d) => d,
285            Err(e) => {
286                return TestResult::error(
287                    "test_dedup_atomic_correctness",
288                    start.elapsed(),
289                    format!("Pattern {}: dedup failed: {}", name, e),
290                )
291            }
292        };
293
294        // Calculate expected unique count
295        let unique_keys: HashSet<u32> = keys.iter().cloned().collect();
296        let expected_unique = unique_keys.len();
297
298        if ctx.device_row_count(&deduped) != expected_unique as u64 {
299            return TestResult::error(
300                "test_dedup_atomic_correctness",
301                start.elapsed(),
302                format!(
303                    "Pattern {}: dedup returned {} rows, expected {}",
304                    name,
305                    ctx.device_row_count(&deduped),
306                    expected_unique
307                ),
308            );
309        }
310
311        // Download and verify uniqueness
312        let deduped_keys = match ctx.provider.download_column::<u32>(&deduped, 0) {
313            Ok(d) => d,
314            Err(e) => {
315                return TestResult::error(
316                    "test_dedup_atomic_correctness",
317                    start.elapsed(),
318                    format!("Pattern {}: failed to download deduped keys: {}", name, e),
319                )
320            }
321        };
322
323        // Verify all keys are unique
324        let mut seen_keys: HashSet<u32> = HashSet::new();
325        for &k in &deduped_keys {
326            if !seen_keys.insert(k) {
327                return TestResult::error(
328                    "test_dedup_atomic_correctness",
329                    start.elapsed(),
330                    format!("Pattern {}: duplicate key {} in dedup result", name, k),
331                );
332            }
333        }
334
335        // Verify all original unique keys are present
336        if seen_keys != unique_keys {
337            return TestResult::error(
338                "test_dedup_atomic_correctness",
339                start.elapsed(),
340                format!("Pattern {}: dedup result missing some unique keys", name),
341            );
342        }
343    }
344
345    if let Err(e) = ctx.sync_and_check() {
346        return TestResult::error(
347            "test_dedup_atomic_correctness",
348            start.elapsed(),
349            format!("Sync failed: {}", e),
350        );
351    }
352
353    TestResult::passed("test_dedup_atomic_correctness", start.elapsed())
354}
355
356/// Test 3: Join with many hash collisions (adversarial keys).
357///
358/// Adversarial key patterns that cause hash collisions stress atomic
359/// contention in the hash table. This test uses keys designed to create
360/// maximum collision.
361fn test_high_contention_join(ctx: &TestContext) -> TestResult {
362    let start = Instant::now();
363
364    let left_schema = Schema::new(vec![
365        ("key".to_string(), ScalarType::U32),
366        ("lval".to_string(), ScalarType::U32),
367    ]);
368    let right_schema = Schema::new(vec![
369        ("key".to_string(), ScalarType::U32),
370        ("rval".to_string(), ScalarType::U32),
371    ]);
372
373    // Pattern 1: Keys that are multiples of power of 2 (common hash table size)
374    // These will likely collide if the hash table uses power-of-2 sizing
375    const SIZE: usize = 5000;
376
377    let collision_keys: Vec<u32> = (0..SIZE).map(|i| (i * 256) as u32).collect();
378    let left_vals: Vec<u32> = collision_keys.iter().map(|&k| k + 1).collect();
379    let right_vals: Vec<u32> = collision_keys.iter().map(|&k| k + 2).collect();
380
381    let left_buffer = match ctx
382        .provider
383        .create_buffer_from_u32_columns(&[&collision_keys, &left_vals], left_schema.clone())
384    {
385        Ok(buf) => buf,
386        Err(e) => {
387            return TestResult::error(
388                "test_high_contention_join",
389                start.elapsed(),
390                format!("Failed to create left buffer: {}", e),
391            )
392        }
393    };
394
395    let right_buffer = match ctx
396        .provider
397        .create_buffer_from_u32_columns(&[&collision_keys, &right_vals], right_schema.clone())
398    {
399        Ok(buf) => buf,
400        Err(e) => {
401            return TestResult::error(
402                "test_high_contention_join",
403                start.elapsed(),
404                format!("Failed to create right buffer: {}", e),
405            )
406        }
407    };
408
409    // Perform hash join with collision-prone keys
410    let joined = match ctx
411        .provider
412        .hash_join(&left_buffer, &right_buffer, &[0], &[0])
413    {
414        Ok(j) => j,
415        Err(e) => {
416            return TestResult::error(
417                "test_high_contention_join",
418                start.elapsed(),
419                format!("Hash join with collision keys failed: {}", e),
420            )
421        }
422    };
423
424    // All keys should match
425    if ctx.device_row_count(&joined) != SIZE as u64 {
426        return TestResult::error(
427            "test_high_contention_join",
428            start.elapsed(),
429            format!(
430                "Collision join returned {} rows, expected {}",
431                ctx.device_row_count(&joined),
432                SIZE
433            ),
434        );
435    }
436
437    // Download and verify
438    let joined_keys = match ctx.provider.download_column::<u32>(&joined, 0) {
439        Ok(d) => d,
440        Err(e) => {
441            return TestResult::error(
442                "test_high_contention_join",
443                start.elapsed(),
444                format!("Failed to download collision join keys: {}", e),
445            )
446        }
447    };
448
449    let joined_lvals = match ctx.provider.download_column::<u32>(&joined, 1) {
450        Ok(d) => d,
451        Err(e) => {
452            return TestResult::error(
453                "test_high_contention_join",
454                start.elapsed(),
455                format!("Failed to download collision join lvals: {}", e),
456            )
457        }
458    };
459
460    let joined_rvals = match ctx.provider.download_column::<u32>(&joined, 2) {
461        Ok(d) => d,
462        Err(e) => {
463            return TestResult::error(
464                "test_high_contention_join",
465                start.elapsed(),
466                format!("Failed to download collision join rvals: {}", e),
467            )
468        }
469    };
470
471    // Verify all key-value relationships
472    for i in 0..ctx.device_row_count(&joined) as usize {
473        let key = joined_keys[i];
474        let lval = joined_lvals[i];
475        let rval = joined_rvals[i];
476
477        if lval != key + 1 {
478            return TestResult::error(
479                "test_high_contention_join",
480                start.elapsed(),
481                format!(
482                    "Collision row {}: lval {} != key + 1 = {}",
483                    i,
484                    lval,
485                    key + 1
486                ),
487            );
488        }
489
490        if rval != key + 2 {
491            return TestResult::error(
492                "test_high_contention_join",
493                start.elapsed(),
494                format!(
495                    "Collision row {}: rval {} != key + 2 = {}",
496                    i,
497                    rval,
498                    key + 2
499                ),
500            );
501        }
502    }
503
504    // Verify all keys are present (no lost due to collisions)
505    let seen_keys: HashSet<u32> = joined_keys.iter().cloned().collect();
506    let expected_keys: HashSet<u32> = collision_keys.iter().cloned().collect();
507    if seen_keys != expected_keys {
508        return TestResult::error(
509            "test_high_contention_join",
510            start.elapsed(),
511            format!(
512                "Missing keys in collision join: expected {}, got {}",
513                expected_keys.len(),
514                seen_keys.len()
515            ),
516        );
517    }
518
519    // Pattern 2: All same key (maximum contention)
520    let same_key_left: Vec<u32> = vec![12345; 1000];
521    let same_key_left_vals: Vec<u32> = (0..1000u32).collect();
522    let same_key_right: Vec<u32> = vec![12345; 100];
523    let same_key_right_vals: Vec<u32> = (0..100u32).map(|i| i * 1000).collect();
524
525    let left_same = match ctx
526        .provider
527        .create_buffer_from_u32_columns(&[&same_key_left, &same_key_left_vals], left_schema.clone())
528    {
529        Ok(buf) => buf,
530        Err(e) => {
531            return TestResult::error(
532                "test_high_contention_join",
533                start.elapsed(),
534                format!("Failed to create same-key left buffer: {}", e),
535            )
536        }
537    };
538
539    let right_same = match ctx.provider.create_buffer_from_u32_columns(
540        &[&same_key_right, &same_key_right_vals],
541        right_schema.clone(),
542    ) {
543        Ok(buf) => buf,
544        Err(e) => {
545            return TestResult::error(
546                "test_high_contention_join",
547                start.elapsed(),
548                format!("Failed to create same-key right buffer: {}", e),
549            )
550        }
551    };
552
553    // Join with same key - should produce 1000 * 100 = 100,000 rows (Cartesian for that key)
554    let same_joined = match ctx.provider.hash_join(&left_same, &right_same, &[0], &[0]) {
555        Ok(j) => j,
556        Err(e) => {
557            return TestResult::error(
558                "test_high_contention_join",
559                start.elapsed(),
560                format!("Same-key hash join failed: {}", e),
561            )
562        }
563    };
564
565    let expected_same = 1000 * 100;
566    if ctx.device_row_count(&same_joined) != expected_same as u64 {
567        return TestResult::error(
568            "test_high_contention_join",
569            start.elapsed(),
570            format!(
571                "Same-key join returned {} rows, expected {} (Cartesian)",
572                ctx.device_row_count(&same_joined),
573                expected_same
574            ),
575        );
576    }
577
578    // Verify all joined keys are the same key
579    let same_joined_keys = match ctx.provider.download_column::<u32>(&same_joined, 0) {
580        Ok(d) => d,
581        Err(e) => {
582            return TestResult::error(
583                "test_high_contention_join",
584                start.elapsed(),
585                format!("Failed to download same-key join keys: {}", e),
586            )
587        }
588    };
589
590    for (i, &key) in same_joined_keys.iter().enumerate() {
591        if key != 12345 {
592            return TestResult::error(
593                "test_high_contention_join",
594                start.elapsed(),
595                format!("Same-key row {}: key {} != 12345", i, key),
596            );
597        }
598    }
599
600    if let Err(e) = ctx.sync_and_check() {
601        return TestResult::error(
602            "test_high_contention_join",
603            start.elapsed(),
604            format!("Sync failed: {}", e),
605        );
606    }
607
608    TestResult::passed("test_high_contention_join", start.elapsed())
609}
610
611/// Test 4: Operations that rely on atomic counting (filter scan).
612///
613/// Filter operations use atomic prefix sum / scan to compute output positions.
614/// This test verifies that atomic counting produces correct output offsets.
615fn test_atomic_counting(ctx: &TestContext) -> TestResult {
616    let start = Instant::now();
617    let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
618
619    // Test prefix_sum_mask directly (if available) and verify through filter
620    let test_cases: Vec<(usize, Box<dyn Fn(usize) -> bool>)> = vec![
621        // (size, predicate)
622        (1000, Box::new(|i| i % 2 == 0)),   // 50% selectivity
623        (10000, Box::new(|i| i % 10 == 0)), // 10% selectivity
624        (50000, Box::new(|i| i < 10000)),   // First 20%
625        (100000, Box::new(|i| i % 7 == 0)), // ~14% selectivity
626    ];
627
628    for (size, predicate) in test_cases {
629        // Create mask
630        let mask: Vec<u8> = (0..size)
631            .map(|i| if predicate(i) { 1 } else { 0 })
632            .collect();
633        let expected_count: usize = mask.iter().map(|&m| m as usize).sum();
634
635        // Test prefix_sum_mask
636        let (prefix_sums, total) = match ctx.provider.prefix_sum_mask(&mask) {
637            Ok(result) => result,
638            Err(e) => {
639                return TestResult::error(
640                    "test_atomic_counting",
641                    start.elapsed(),
642                    format!("Size {}: prefix_sum_mask failed: {}", size, e),
643                )
644            }
645        };
646
647        // Verify total count
648        if total != expected_count as u32 {
649            return TestResult::error(
650                "test_atomic_counting",
651                start.elapsed(),
652                format!(
653                    "Size {}: prefix_sum total {} != expected {}",
654                    size, total, expected_count
655                ),
656            );
657        }
658
659        // Verify prefix sum is monotonic and correct
660        let mut running_sum = 0u32;
661        for (i, &m) in mask.iter().enumerate() {
662            if prefix_sums[i] != running_sum {
663                return TestResult::error(
664                    "test_atomic_counting",
665                    start.elapsed(),
666                    format!(
667                        "Size {}: prefix_sums[{}] = {}, expected {}",
668                        size, i, prefix_sums[i], running_sum
669                    ),
670                );
671            }
672            running_sum += m as u32;
673        }
674
675        // Verify final prefix sum + last mask equals total
676        if running_sum != total {
677            return TestResult::error(
678                "test_atomic_counting",
679                start.elapsed(),
680                format!(
681                    "Size {}: final sum {} != total {}",
682                    size, running_sum, total
683                ),
684            );
685        }
686
687        // Also verify through filter operation
688        let data: Vec<u32> = (0..size as u32).collect();
689
690        let buffer = match ctx
691            .provider
692            .create_buffer_from_slice::<u32>(&data, schema.clone())
693        {
694            Ok(buf) => buf,
695            Err(e) => {
696                return TestResult::error(
697                    "test_atomic_counting",
698                    start.elapsed(),
699                    format!("Size {}: failed to create buffer: {}", size, e),
700                )
701            }
702        };
703
704        let filtered = match ctx.provider.filter_by_mask(&buffer, &mask) {
705            Ok(f) => f,
706            Err(e) => {
707                return TestResult::error(
708                    "test_atomic_counting",
709                    start.elapsed(),
710                    format!("Size {}: filter failed: {}", size, e),
711                )
712            }
713        };
714
715        if ctx.device_row_count(&filtered) != expected_count as u64 {
716            return TestResult::error(
717                "test_atomic_counting",
718                start.elapsed(),
719                format!(
720                    "Size {}: filter returned {} rows, expected {}",
721                    size,
722                    ctx.device_row_count(&filtered),
723                    expected_count
724                ),
725            );
726        }
727
728        // Verify filtered positions match prefix sums
729        let filtered_data = match ctx.provider.download_column::<u32>(&filtered, 0) {
730            Ok(d) => d,
731            Err(e) => {
732                return TestResult::error(
733                    "test_atomic_counting",
734                    start.elapsed(),
735                    format!("Size {}: failed to download filtered: {}", size, e),
736                )
737            }
738        };
739
740        let mut expected_idx = 0;
741        for (i, &m) in mask.iter().enumerate() {
742            if m == 1 {
743                if expected_idx >= filtered_data.len() {
744                    return TestResult::error(
745                        "test_atomic_counting",
746                        start.elapsed(),
747                        format!(
748                            "Size {}: filtered data too short at expected index {}",
749                            size, expected_idx
750                        ),
751                    );
752                }
753                if filtered_data[expected_idx] != i as u32 {
754                    return TestResult::error(
755                        "test_atomic_counting",
756                        start.elapsed(),
757                        format!(
758                            "Size {}: filtered[{}] = {}, expected {}",
759                            size, expected_idx, filtered_data[expected_idx], i
760                        ),
761                    );
762                }
763                expected_idx += 1;
764            }
765        }
766    }
767
768    if let Err(e) = ctx.sync_and_check() {
769        return TestResult::error(
770            "test_atomic_counting",
771            start.elapsed(),
772            format!("Sync failed: {}", e),
773        );
774    }
775
776    TestResult::passed("test_atomic_counting", start.elapsed())
777}
778
779/// Test 5: Multiple operations using atomics in sequence.
780///
781/// Tests that multiple operations using atomics can run in sequence without
782/// interference, verifying proper atomic operation cleanup between operations.
783fn test_concurrent_atomic_updates(ctx: &TestContext) -> TestResult {
784    let start = Instant::now();
785
786    let schema = Schema::new(vec![
787        ("key".to_string(), ScalarType::U32),
788        ("val".to_string(), ScalarType::U32),
789    ]);
790
791    // Run multiple atomic-using operations in sequence
792    const NUM_ITERATIONS: usize = 5;
793    const SIZE: usize = 10000;
794
795    for iteration in 0..NUM_ITERATIONS {
796        // Create data unique to this iteration
797        let keys: Vec<u32> = (0..SIZE).map(|i| (i + iteration * 1000) as u32).collect();
798        let vals: Vec<u32> = keys.iter().map(|&k| k * 10).collect();
799
800        let buffer = match ctx
801            .provider
802            .create_buffer_from_u32_columns(&[&keys, &vals], schema.clone())
803        {
804            Ok(buf) => buf,
805            Err(e) => {
806                return TestResult::error(
807                    "test_concurrent_atomic_updates",
808                    start.elapsed(),
809                    format!("Iteration {}: failed to create buffer: {}", iteration, e),
810                )
811            }
812        };
813
814        // Operation 1: Dedup (uses atomic marking)
815        let deduped = match ctx.provider.dedup(&buffer, &[0]) {
816            Ok(d) => d,
817            Err(e) => {
818                return TestResult::error(
819                    "test_concurrent_atomic_updates",
820                    start.elapsed(),
821                    format!("Iteration {}: dedup failed: {}", iteration, e),
822                )
823            }
824        };
825
826        // All keys are unique, so dedup should return same count
827        if ctx.device_row_count(&deduped) != SIZE as u64 {
828            return TestResult::error(
829                "test_concurrent_atomic_updates",
830                start.elapsed(),
831                format!(
832                    "Iteration {}: dedup returned {} rows, expected {}",
833                    iteration,
834                    ctx.device_row_count(&deduped),
835                    SIZE
836                ),
837            );
838        }
839
840        // Operation 2: Filter (uses atomic scan)
841        let mask: Vec<u8> = (0..SIZE).map(|i| if i % 3 == 0 { 1 } else { 0 }).collect();
842        let expected_filter = (SIZE + 2) / 3;
843
844        let filtered = match ctx.provider.filter_by_mask(&buffer, &mask) {
845            Ok(f) => f,
846            Err(e) => {
847                return TestResult::error(
848                    "test_concurrent_atomic_updates",
849                    start.elapsed(),
850                    format!("Iteration {}: filter failed: {}", iteration, e),
851                )
852            }
853        };
854
855        if ctx.device_row_count(&filtered) != expected_filter as u64 {
856            return TestResult::error(
857                "test_concurrent_atomic_updates",
858                start.elapsed(),
859                format!(
860                    "Iteration {}: filter returned {} rows, expected {}",
861                    iteration,
862                    ctx.device_row_count(&filtered),
863                    expected_filter
864                ),
865            );
866        }
867
868        // Operation 3: Self-join (uses atomic hash table)
869        // Join buffer with itself on key
870        let joined = match ctx.provider.hash_join(&buffer, &buffer, &[0], &[0]) {
871            Ok(j) => j,
872            Err(e) => {
873                return TestResult::error(
874                    "test_concurrent_atomic_updates",
875                    start.elapsed(),
876                    format!("Iteration {}: self-join failed: {}", iteration, e),
877                )
878            }
879        };
880
881        // Self-join should return SIZE rows (each key matches exactly once)
882        if ctx.device_row_count(&joined) != SIZE as u64 {
883            return TestResult::error(
884                "test_concurrent_atomic_updates",
885                start.elapsed(),
886                format!(
887                    "Iteration {}: self-join returned {} rows, expected {}",
888                    iteration,
889                    ctx.device_row_count(&joined),
890                    SIZE
891                ),
892            );
893        }
894
895        // Verify joined results
896        let joined_keys = match ctx.provider.download_column::<u32>(&joined, 0) {
897            Ok(d) => d,
898            Err(e) => {
899                return TestResult::error(
900                    "test_concurrent_atomic_updates",
901                    start.elapsed(),
902                    format!(
903                        "Iteration {}: failed to download join keys: {}",
904                        iteration, e
905                    ),
906                )
907            }
908        };
909
910        // Verify all original keys appear in join result
911        let joined_key_set: HashSet<u32> = joined_keys.iter().cloned().collect();
912        let original_key_set: HashSet<u32> = keys.iter().cloned().collect();
913
914        if joined_key_set != original_key_set {
915            return TestResult::error(
916                "test_concurrent_atomic_updates",
917                start.elapsed(),
918                format!(
919                    "Iteration {}: self-join keys don't match original ({} vs {})",
920                    iteration,
921                    joined_key_set.len(),
922                    original_key_set.len()
923                ),
924            );
925        }
926
927        // Operation 4: Another dedup to verify atomics reset properly
928        let deduped2 = match ctx.provider.dedup(&buffer, &[0]) {
929            Ok(d) => d,
930            Err(e) => {
931                return TestResult::error(
932                    "test_concurrent_atomic_updates",
933                    start.elapsed(),
934                    format!("Iteration {}: second dedup failed: {}", iteration, e),
935                )
936            }
937        };
938
939        if ctx.device_row_count(&deduped2) != SIZE as u64 {
940            return TestResult::error(
941                "test_concurrent_atomic_updates",
942                start.elapsed(),
943                format!(
944                    "Iteration {}: second dedup returned {} rows, expected {} (atomics may not have reset)",
945                    iteration, ctx.device_row_count(&deduped2), SIZE
946                ),
947            );
948        }
949    }
950
951    if let Err(e) = ctx.sync_and_check() {
952        return TestResult::error(
953            "test_concurrent_atomic_updates",
954            start.elapsed(),
955            format!("Sync failed: {}", e),
956        );
957    }
958
959    TestResult::passed("test_concurrent_atomic_updates", start.elapsed())
960}