Skip to main content

xlog_cuda_tests/categories/
c21_hardware.rs

1//! Category 21: Hardware reliability
2//!
3//! Tests hardware error handling and reliability, including error detection,
4//! recovery after errors, stress operations, memory pressure, and sustained
5//! operation stability.
6
7use crate::harness::{CategoryResult, TestContext, TestResult};
8use std::time::Instant;
9use xlog_core::{ScalarType, Schema};
10
11/// Run all tests in this category.
12pub fn run_all(ctx: &TestContext) -> CategoryResult {
13    let mut results = CategoryResult::new("c21_hardware");
14    let start = Instant::now();
15
16    results.add_result(test_error_detection(ctx));
17    results.add_result(test_recovery_after_error(ctx));
18    results.add_result(test_stress_operations(ctx));
19    results.add_result(test_memory_pressure(ctx));
20    results.add_result(test_sustained_operation(ctx));
21
22    results.set_duration(start.elapsed());
23    results
24}
25
26/// Test 1: Verify errors are properly detected via sync_and_check.
27///
28/// Tests that the sync_and_check mechanism properly detects and reports
29/// GPU errors, and that successful operations don't generate false errors.
30fn test_error_detection(ctx: &TestContext) -> TestResult {
31    let start = Instant::now();
32    let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
33
34    // Test 1: Successful operation should not trigger error
35    let data: Vec<u32> = (0..10000u32).collect();
36
37    let buffer = match ctx
38        .provider
39        .create_buffer_from_slice::<u32>(&data, schema.clone())
40    {
41        Ok(buf) => buf,
42        Err(e) => {
43            return TestResult::error(
44                "test_error_detection",
45                start.elapsed(),
46                format!("Buffer creation failed: {}", e),
47            )
48        }
49    };
50
51    let sorted = match ctx.provider.sort(&buffer, &[0]) {
52        Ok(s) => s,
53        Err(e) => {
54            return TestResult::error(
55                "test_error_detection",
56                start.elapsed(),
57                format!("Sort failed: {}", e),
58            )
59        }
60    };
61
62    // sync_and_check should succeed for valid operations
63    if let Err(e) = ctx.sync_and_check() {
64        return TestResult::error(
65            "test_error_detection",
66            start.elapsed(),
67            format!("sync_and_check failed for valid operation: {}", e),
68        );
69    }
70
71    // Verify result is correct
72    let result = match ctx.provider.download_column::<u32>(&sorted, 0) {
73        Ok(d) => d,
74        Err(e) => {
75            return TestResult::error(
76                "test_error_detection",
77                start.elapsed(),
78                format!("Download failed: {}", e),
79            )
80        }
81    };
82
83    for (i, &val) in result.iter().enumerate() {
84        if val != i as u32 {
85            return TestResult::error(
86                "test_error_detection",
87                start.elapsed(),
88                format!(
89                    "Result incorrect at index {}: expected {}, got {}",
90                    i, i, val
91                ),
92            );
93        }
94    }
95
96    // Test 2: Multiple sync_and_check calls should all succeed
97    for i in 0..5 {
98        let check_data: Vec<u32> = (0..1000u32).map(|j| j + i * 1000).collect();
99
100        let check_buffer = match ctx
101            .provider
102            .create_buffer_from_slice::<u32>(&check_data, schema.clone())
103        {
104            Ok(buf) => buf,
105            Err(e) => {
106                return TestResult::error(
107                    "test_error_detection",
108                    start.elapsed(),
109                    format!("Check {} buffer creation failed: {}", i, e),
110                )
111            }
112        };
113
114        let _check_sorted = match ctx.provider.sort(&check_buffer, &[0]) {
115            Ok(s) => s,
116            Err(e) => {
117                return TestResult::error(
118                    "test_error_detection",
119                    start.elapsed(),
120                    format!("Check {} sort failed: {}", i, e),
121                )
122            }
123        };
124
125        if let Err(e) = ctx.sync_and_check() {
126            return TestResult::error(
127                "test_error_detection",
128                start.elapsed(),
129                format!("Check {} sync_and_check failed: {}", i, e),
130            );
131        }
132    }
133
134    // Test 3: Empty buffer operations should not cause errors
135    let empty_data: Vec<u32> = vec![];
136    let empty_buffer = match ctx
137        .provider
138        .create_buffer_from_slice::<u32>(&empty_data, schema.clone())
139    {
140        Ok(buf) => buf,
141        Err(e) => {
142            return TestResult::error(
143                "test_error_detection",
144                start.elapsed(),
145                format!("Empty buffer creation failed: {}", e),
146            )
147        }
148    };
149
150    let _empty_sorted = match ctx.provider.sort(&empty_buffer, &[0]) {
151        Ok(s) => s,
152        Err(e) => {
153            return TestResult::error(
154                "test_error_detection",
155                start.elapsed(),
156                format!("Empty sort failed: {}", e),
157            )
158        }
159    };
160
161    if let Err(e) = ctx.sync_and_check() {
162        return TestResult::error(
163            "test_error_detection",
164            start.elapsed(),
165            format!("sync_and_check failed for empty operation: {}", e),
166        );
167    }
168
169    // Test 4: Single element operations
170    let single_data: Vec<u32> = vec![42];
171    let single_buffer = match ctx
172        .provider
173        .create_buffer_from_slice::<u32>(&single_data, schema.clone())
174    {
175        Ok(buf) => buf,
176        Err(e) => {
177            return TestResult::error(
178                "test_error_detection",
179                start.elapsed(),
180                format!("Single buffer creation failed: {}", e),
181            )
182        }
183    };
184
185    let single_sorted = match ctx.provider.sort(&single_buffer, &[0]) {
186        Ok(s) => s,
187        Err(e) => {
188            return TestResult::error(
189                "test_error_detection",
190                start.elapsed(),
191                format!("Single sort failed: {}", e),
192            )
193        }
194    };
195
196    if let Err(e) = ctx.sync_and_check() {
197        return TestResult::error(
198            "test_error_detection",
199            start.elapsed(),
200            format!("sync_and_check failed for single element: {}", e),
201        );
202    }
203
204    let single_result = match ctx.provider.download_column::<u32>(&single_sorted, 0) {
205        Ok(d) => d,
206        Err(e) => {
207            return TestResult::error(
208                "test_error_detection",
209                start.elapsed(),
210                format!("Single download failed: {}", e),
211            )
212        }
213    };
214
215    if single_result != vec![42] {
216        return TestResult::error(
217            "test_error_detection",
218            start.elapsed(),
219            format!("Single result incorrect: {:?}", single_result),
220        );
221    }
222
223    if let Err(e) = ctx.sync_and_check() {
224        return TestResult::error(
225            "test_error_detection",
226            start.elapsed(),
227            format!("Final sync failed: {}", e),
228        );
229    }
230
231    TestResult::passed("test_error_detection", start.elapsed())
232}
233
234/// Test 2: Test recovery after sync_and_check detects issues.
235///
236/// Tests that the system can recover and continue operating correctly
237/// after an error or edge case is handled.
238fn test_recovery_after_error(ctx: &TestContext) -> TestResult {
239    let start = Instant::now();
240    let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
241
242    // Test 1: Recover after handling edge cases
243    // First, do a valid operation
244    let data1: Vec<u32> = (0..5000u32).collect();
245    let buffer1 = match ctx
246        .provider
247        .create_buffer_from_slice::<u32>(&data1, schema.clone())
248    {
249        Ok(buf) => buf,
250        Err(e) => {
251            return TestResult::error(
252                "test_recovery_after_error",
253                start.elapsed(),
254                format!("Buffer1 creation failed: {}", e),
255            )
256        }
257    };
258
259    let sorted1 = match ctx.provider.sort(&buffer1, &[0]) {
260        Ok(s) => s,
261        Err(e) => {
262            return TestResult::error(
263                "test_recovery_after_error",
264                start.elapsed(),
265                format!("Sort1 failed: {}", e),
266            )
267        }
268    };
269
270    if let Err(e) = ctx.sync_and_check() {
271        return TestResult::error(
272            "test_recovery_after_error",
273            start.elapsed(),
274            format!("Sync1 failed: {}", e),
275        );
276    }
277
278    // Try an edge case operation (mismatched mask size - may error or be handled)
279    let edge_data: Vec<u32> = vec![1, 2, 3, 4, 5];
280    let edge_buffer = match ctx
281        .provider
282        .create_buffer_from_slice::<u32>(&edge_data, schema.clone())
283    {
284        Ok(buf) => buf,
285        Err(e) => {
286            return TestResult::error(
287                "test_recovery_after_error",
288                start.elapsed(),
289                format!("Edge buffer creation failed: {}", e),
290            )
291        }
292    };
293
294    // This may succeed with truncation or fail - both are acceptable
295    let wrong_mask: Vec<u8> = vec![1, 0, 1]; // Wrong size
296    let _edge_result = ctx.provider.filter_by_mask(&edge_buffer, &wrong_mask);
297
298    // Sync to clear any pending errors
299    let _ = ctx.sync_and_check();
300
301    // Test 2: Verify system recovered - operations should work again
302    let data2: Vec<u32> = (0..10000u32).collect();
303    let buffer2 = match ctx
304        .provider
305        .create_buffer_from_slice::<u32>(&data2, schema.clone())
306    {
307        Ok(buf) => buf,
308        Err(e) => {
309            return TestResult::error(
310                "test_recovery_after_error",
311                start.elapsed(),
312                format!("Recovery buffer creation failed: {}", e),
313            )
314        }
315    };
316
317    let sorted2 = match ctx.provider.sort(&buffer2, &[0]) {
318        Ok(s) => s,
319        Err(e) => {
320            return TestResult::error(
321                "test_recovery_after_error",
322                start.elapsed(),
323                format!("Recovery sort failed: {}", e),
324            )
325        }
326    };
327
328    if let Err(e) = ctx.sync_and_check() {
329        return TestResult::error(
330            "test_recovery_after_error",
331            start.elapsed(),
332            format!("Recovery sync failed: {}", e),
333        );
334    }
335
336    // Verify recovery was complete
337    let result2 = match ctx.provider.download_column::<u32>(&sorted2, 0) {
338        Ok(d) => d,
339        Err(e) => {
340            return TestResult::error(
341                "test_recovery_after_error",
342                start.elapsed(),
343                format!("Recovery download failed: {}", e),
344            )
345        }
346    };
347
348    for (i, &val) in result2.iter().enumerate() {
349        if val != i as u32 {
350            return TestResult::error(
351                "test_recovery_after_error",
352                start.elapsed(),
353                format!(
354                    "Recovery result incorrect at {}: expected {}, got {}",
355                    i, i, val
356                ),
357            );
358        }
359    }
360
361    // Test 3: Multiple recovery cycles
362    for cycle in 0..3 {
363        // Valid operation
364        let valid_data: Vec<u32> = (0..1000u32).map(|j| j + cycle * 1000).collect();
365        let valid_buffer = match ctx
366            .provider
367            .create_buffer_from_slice::<u32>(&valid_data, schema.clone())
368        {
369            Ok(buf) => buf,
370            Err(e) => {
371                return TestResult::error(
372                    "test_recovery_after_error",
373                    start.elapsed(),
374                    format!("Cycle {} valid buffer failed: {}", cycle, e),
375                )
376            }
377        };
378
379        let valid_sorted = match ctx.provider.sort(&valid_buffer, &[0]) {
380            Ok(s) => s,
381            Err(e) => {
382                return TestResult::error(
383                    "test_recovery_after_error",
384                    start.elapsed(),
385                    format!("Cycle {} valid sort failed: {}", cycle, e),
386                )
387            }
388        };
389
390        if let Err(e) = ctx.sync_and_check() {
391            return TestResult::error(
392                "test_recovery_after_error",
393                start.elapsed(),
394                format!("Cycle {} valid sync failed: {}", cycle, e),
395            );
396        }
397
398        // Verify
399        let valid_result = match ctx.provider.download_column::<u32>(&valid_sorted, 0) {
400            Ok(d) => d,
401            Err(e) => {
402                return TestResult::error(
403                    "test_recovery_after_error",
404                    start.elapsed(),
405                    format!("Cycle {} valid download failed: {}", cycle, e),
406                )
407            }
408        };
409
410        if valid_result.len() != 1000 {
411            return TestResult::error(
412                "test_recovery_after_error",
413                start.elapsed(),
414                format!(
415                    "Cycle {}: expected 1000 rows, got {}",
416                    cycle,
417                    valid_result.len()
418                ),
419            );
420        }
421    }
422
423    // Test 4: Verify previous results still accessible
424    let result1 = match ctx.provider.download_column::<u32>(&sorted1, 0) {
425        Ok(d) => d,
426        Err(e) => {
427            return TestResult::error(
428                "test_recovery_after_error",
429                start.elapsed(),
430                format!("Previous result download failed: {}", e),
431            )
432        }
433    };
434
435    for (i, &val) in result1.iter().enumerate() {
436        if val != i as u32 {
437            return TestResult::error(
438                "test_recovery_after_error",
439                start.elapsed(),
440                format!(
441                    "Previous result corrupted at {}: expected {}, got {}",
442                    i, i, val
443                ),
444            );
445        }
446    }
447
448    if let Err(e) = ctx.sync_and_check() {
449        return TestResult::error(
450            "test_recovery_after_error",
451            start.elapsed(),
452            format!("Final sync failed: {}", e),
453        );
454    }
455
456    TestResult::passed("test_recovery_after_error", start.elapsed())
457}
458
459/// Test 3: Run many operations to stress hardware.
460///
461/// Tests hardware reliability under heavy load by running a large number
462/// of operations in rapid succession.
463fn test_stress_operations(ctx: &TestContext) -> TestResult {
464    let start = Instant::now();
465    let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
466
467    const NUM_OPERATIONS: usize = 200;
468    const DATA_SIZE: usize = 10000;
469
470    // Stress test: many sort operations
471    for i in 0..NUM_OPERATIONS {
472        let data: Vec<u32> = (0..DATA_SIZE)
473            .map(|j| ((j * (i + 1) * 1103515245 + 12345) % DATA_SIZE) as u32)
474            .collect();
475
476        let buffer = match ctx
477            .provider
478            .create_buffer_from_slice::<u32>(&data, schema.clone())
479        {
480            Ok(buf) => buf,
481            Err(e) => {
482                return TestResult::error(
483                    "test_stress_operations",
484                    start.elapsed(),
485                    format!("Stress sort {}: buffer creation failed: {}", i, e),
486                )
487            }
488        };
489
490        let sorted = match ctx.provider.sort(&buffer, &[0]) {
491            Ok(s) => s,
492            Err(e) => {
493                return TestResult::error(
494                    "test_stress_operations",
495                    start.elapsed(),
496                    format!("Stress sort {}: sort failed: {}", i, e),
497                )
498            }
499        };
500
501        // Periodic verification (every 20 operations)
502        if i % 20 == 0 {
503            let result = match ctx.provider.download_column::<u32>(&sorted, 0) {
504                Ok(d) => d,
505                Err(e) => {
506                    return TestResult::error(
507                        "test_stress_operations",
508                        start.elapsed(),
509                        format!("Stress sort {}: download failed: {}", i, e),
510                    )
511                }
512            };
513
514            for j in 1..result.len() {
515                if result[j] < result[j - 1] {
516                    return TestResult::error(
517                        "test_stress_operations",
518                        start.elapsed(),
519                        format!("Stress sort {}: incorrect at index {}", i, j),
520                    );
521                }
522            }
523
524            if let Err(e) = ctx.sync_and_check() {
525                return TestResult::error(
526                    "test_stress_operations",
527                    start.elapsed(),
528                    format!("Stress sort {}: sync failed: {}", i, e),
529                );
530            }
531        }
532    }
533
534    // Stress test: many filter operations
535    let filter_data: Vec<u32> = (0..DATA_SIZE as u32).collect();
536    let filter_buffer = match ctx
537        .provider
538        .create_buffer_from_slice::<u32>(&filter_data, schema.clone())
539    {
540        Ok(buf) => buf,
541        Err(e) => {
542            return TestResult::error(
543                "test_stress_operations",
544                start.elapsed(),
545                format!("Filter buffer creation failed: {}", e),
546            )
547        }
548    };
549
550    for i in 0..NUM_OPERATIONS {
551        let selectivity = (i % 10 + 1) * 10; // 10%, 20%, ..., 100%
552        let mask: Vec<u8> = (0..DATA_SIZE)
553            .map(|j| {
554                if (j * 100 / DATA_SIZE) < selectivity {
555                    1
556                } else {
557                    0
558                }
559            })
560            .collect();
561
562        let filtered = match ctx.provider.filter_by_mask(&filter_buffer, &mask) {
563            Ok(f) => f,
564            Err(e) => {
565                return TestResult::error(
566                    "test_stress_operations",
567                    start.elapsed(),
568                    format!("Stress filter {}: failed: {}", i, e),
569                )
570            }
571        };
572
573        // Periodic verification
574        if i % 20 == 0 {
575            let expected_min = (DATA_SIZE * selectivity / 100).saturating_sub(DATA_SIZE / 20);
576            let expected_max = (DATA_SIZE * selectivity / 100) + DATA_SIZE / 20 + 1;
577
578            let count = ctx.device_row_count(&filtered) as usize;
579            if count < expected_min || count > expected_max {
580                return TestResult::error(
581                    "test_stress_operations",
582                    start.elapsed(),
583                    format!(
584                        "Stress filter {}: got {} rows, expected ~{}",
585                        i,
586                        count,
587                        DATA_SIZE * selectivity / 100
588                    ),
589                );
590            }
591
592            if let Err(e) = ctx.sync_and_check() {
593                return TestResult::error(
594                    "test_stress_operations",
595                    start.elapsed(),
596                    format!("Stress filter {}: sync failed: {}", i, e),
597                );
598            }
599        }
600    }
601
602    // Stress test: mixed operations
603    let schema2 = Schema::new(vec![
604        ("key".to_string(), ScalarType::U32),
605        ("val".to_string(), ScalarType::U32),
606    ]);
607
608    for i in 0..NUM_OPERATIONS / 2 {
609        // Dedup
610        let dedup_keys: Vec<u32> = (0..DATA_SIZE).map(|j| (j % 100) as u32).collect();
611        let dedup_vals: Vec<u32> = (0..DATA_SIZE as u32).collect();
612
613        let dedup_buffer = match ctx
614            .provider
615            .create_buffer_from_u32_columns(&[&dedup_keys, &dedup_vals], schema2.clone())
616        {
617            Ok(buf) => buf,
618            Err(e) => {
619                return TestResult::error(
620                    "test_stress_operations",
621                    start.elapsed(),
622                    format!("Stress dedup {}: buffer creation failed: {}", i, e),
623                )
624            }
625        };
626
627        let deduped = match ctx.provider.dedup(&dedup_buffer, &[0]) {
628            Ok(d) => d,
629            Err(e) => {
630                return TestResult::error(
631                    "test_stress_operations",
632                    start.elapsed(),
633                    format!("Stress dedup {}: failed: {}", i, e),
634                )
635            }
636        };
637
638        if ctx.device_row_count(&deduped) != 100 {
639            return TestResult::error(
640                "test_stress_operations",
641                start.elapsed(),
642                format!(
643                    "Stress dedup {}: expected 100, got {}",
644                    i,
645                    ctx.device_row_count(&deduped)
646                ),
647            );
648        }
649
650        if i % 20 == 0 {
651            if let Err(e) = ctx.sync_and_check() {
652                return TestResult::error(
653                    "test_stress_operations",
654                    start.elapsed(),
655                    format!("Stress dedup {}: sync failed: {}", i, e),
656                );
657            }
658        }
659    }
660
661    // Final comprehensive sync
662    if let Err(e) = ctx.sync_and_check() {
663        return TestResult::error(
664            "test_stress_operations",
665            start.elapsed(),
666            format!("Final stress sync failed: {}", e),
667        );
668    }
669
670    TestResult::passed("test_stress_operations", start.elapsed())
671}
672
673/// Test 4: Test operations at high memory usage.
674///
675/// Tests system stability when operating near memory limits.
676fn test_memory_pressure(ctx: &TestContext) -> TestResult {
677    let start = Instant::now();
678    let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
679
680    let budget = ctx.memory_budget();
681
682    // Calculate size that uses significant portion of budget
683    // Each u32 is 4 bytes, aim for ~20% of budget per buffer
684    let buffer_size = (budget / 20 / 4).min(5_000_000);
685
686    if buffer_size < 1000 {
687        return TestResult::skipped(
688            "test_memory_pressure",
689            "Memory budget too small for pressure test",
690        );
691    }
692
693    // Create multiple buffers to approach memory limit
694    let mut buffers = Vec::new();
695    let mut successful_allocations = 0;
696
697    for i in 0..5 {
698        let data: Vec<u32> = (0..buffer_size)
699            .map(|j| ((j + i * buffer_size) % buffer_size) as u32)
700            .collect();
701
702        match ctx
703            .provider
704            .create_buffer_from_slice::<u32>(&data, schema.clone())
705        {
706            Ok(buf) => {
707                buffers.push(buf);
708                successful_allocations += 1;
709            }
710            Err(_) => {
711                // Memory limit reached - this is acceptable
712                break;
713            }
714        }
715    }
716
717    // We should have been able to allocate at least one buffer
718    if successful_allocations == 0 {
719        return TestResult::error(
720            "test_memory_pressure",
721            start.elapsed(),
722            "Could not allocate any buffers".to_string(),
723        );
724    }
725
726    // Perform operations on allocated buffers
727    for (i, buffer) in buffers.iter().enumerate() {
728        let sorted = match ctx.provider.sort(buffer, &[0]) {
729            Ok(s) => s,
730            Err(e) => {
731                return TestResult::error(
732                    "test_memory_pressure",
733                    start.elapsed(),
734                    format!("Buffer {}: sort failed under pressure: {}", i, e),
735                )
736            }
737        };
738
739        // Verify correctness
740        let result = match ctx.provider.download_column::<u32>(&sorted, 0) {
741            Ok(d) => d,
742            Err(e) => {
743                return TestResult::error(
744                    "test_memory_pressure",
745                    start.elapsed(),
746                    format!("Buffer {}: download failed under pressure: {}", i, e),
747                )
748            }
749        };
750
751        // Spot check sorted
752        for j in (1..result.len()).step_by(10000) {
753            if result[j] < result[j - 1] {
754                return TestResult::error(
755                    "test_memory_pressure",
756                    start.elapsed(),
757                    format!("Buffer {}: sort incorrect at index {}", i, j),
758                );
759            }
760        }
761    }
762
763    if let Err(e) = ctx.sync_and_check() {
764        return TestResult::error(
765            "test_memory_pressure",
766            start.elapsed(),
767            format!("Sync under pressure failed: {}", e),
768        );
769    }
770
771    // Test filter under pressure
772    for (i, buffer) in buffers.iter().enumerate() {
773        let mask: Vec<u8> = (0..buffer_size)
774            .map(|j| if j % 2 == 0 { 1 } else { 0 })
775            .collect();
776
777        let filtered = match ctx.provider.filter_by_mask(buffer, &mask) {
778            Ok(f) => f,
779            Err(e) => {
780                return TestResult::error(
781                    "test_memory_pressure",
782                    start.elapsed(),
783                    format!("Buffer {}: filter failed under pressure: {}", i, e),
784                )
785            }
786        };
787
788        let expected = (buffer_size + 1) / 2;
789        if ctx.device_row_count(&filtered) != expected as u64 {
790            return TestResult::error(
791                "test_memory_pressure",
792                start.elapsed(),
793                format!(
794                    "Buffer {}: filter expected {} rows, got {}",
795                    i,
796                    expected,
797                    ctx.device_row_count(&filtered)
798                ),
799            );
800        }
801    }
802
803    // Release buffers and verify operations still work
804    drop(buffers);
805
806    if let Err(e) = ctx.sync_and_check() {
807        return TestResult::error(
808            "test_memory_pressure",
809            start.elapsed(),
810            format!("Sync after release failed: {}", e),
811        );
812    }
813
814    // New operations should work after release
815    let fresh_data: Vec<u32> = (0..10000u32).collect();
816    let fresh_buffer = match ctx
817        .provider
818        .create_buffer_from_slice::<u32>(&fresh_data, schema.clone())
819    {
820        Ok(buf) => buf,
821        Err(e) => {
822            return TestResult::error(
823                "test_memory_pressure",
824                start.elapsed(),
825                format!("Fresh buffer after pressure failed: {}", e),
826            )
827        }
828    };
829
830    let fresh_sorted = match ctx.provider.sort(&fresh_buffer, &[0]) {
831        Ok(s) => s,
832        Err(e) => {
833            return TestResult::error(
834                "test_memory_pressure",
835                start.elapsed(),
836                format!("Fresh sort after pressure failed: {}", e),
837            )
838        }
839    };
840
841    if ctx.device_row_count(&fresh_sorted) != 10000 {
842        return TestResult::error(
843            "test_memory_pressure",
844            start.elapsed(),
845            format!(
846                "Fresh result after pressure: expected 10000, got {}",
847                ctx.device_row_count(&fresh_sorted)
848            ),
849        );
850    }
851
852    if let Err(e) = ctx.sync_and_check() {
853        return TestResult::error(
854            "test_memory_pressure",
855            start.elapsed(),
856            format!("Final sync failed: {}", e),
857        );
858    }
859
860    TestResult::passed("test_memory_pressure", start.elapsed())
861}
862
863/// Test 5: Long-running operations verify stability.
864///
865/// Tests hardware stability by running operations over an extended period
866/// to detect any degradation or intermittent failures.
867fn test_sustained_operation(ctx: &TestContext) -> TestResult {
868    let start = Instant::now();
869    let schema = Schema::new(vec![("val".to_string(), ScalarType::U32)]);
870
871    const SIZE: usize = 50000;
872    const DURATION_SECONDS: u64 = 5; // Run for at least 5 seconds
873
874    let deadline = start + std::time::Duration::from_secs(DURATION_SECONDS);
875    let mut operation_count = 0;
876    let mut error_count = 0;
877
878    // Create reference data
879    let reference_data: Vec<u32> = (0..SIZE)
880        .map(|i| ((i * 1103515245 + 12345) % SIZE) as u32)
881        .collect();
882
883    let mut expected_sorted = reference_data.clone();
884    expected_sorted.sort();
885
886    // Run operations until deadline
887    while Instant::now() < deadline {
888        let data: Vec<u32> = reference_data.clone();
889
890        let buffer = match ctx
891            .provider
892            .create_buffer_from_slice::<u32>(&data, schema.clone())
893        {
894            Ok(buf) => buf,
895            Err(_) => {
896                error_count += 1;
897                continue;
898            }
899        };
900
901        let sorted = match ctx.provider.sort(&buffer, &[0]) {
902            Ok(s) => s,
903            Err(_) => {
904                error_count += 1;
905                continue;
906            }
907        };
908
909        // Periodic full verification
910        if operation_count % 10 == 0 {
911            let result = match ctx.provider.download_column::<u32>(&sorted, 0) {
912                Ok(d) => d,
913                Err(_) => {
914                    error_count += 1;
915                    continue;
916                }
917            };
918
919            if result != expected_sorted {
920                return TestResult::error(
921                    "test_sustained_operation",
922                    start.elapsed(),
923                    format!(
924                        "Operation {} produced incorrect result after {:?}",
925                        operation_count,
926                        start.elapsed()
927                    ),
928                );
929            }
930
931            if let Err(e) = ctx.sync_and_check() {
932                return TestResult::error(
933                    "test_sustained_operation",
934                    start.elapsed(),
935                    format!("Sync failed at operation {}: {}", operation_count, e),
936                );
937            }
938        }
939
940        operation_count += 1;
941    }
942
943    // Should have completed many operations
944    if operation_count < 10 {
945        return TestResult::error(
946            "test_sustained_operation",
947            start.elapsed(),
948            format!(
949                "Too few operations completed: {} (expected >= 10)",
950                operation_count
951            ),
952        );
953    }
954
955    // Error rate should be very low
956    if error_count > operation_count / 100 {
957        return TestResult::error(
958            "test_sustained_operation",
959            start.elapsed(),
960            format!(
961                "High error rate: {}/{} operations failed",
962                error_count, operation_count
963            ),
964        );
965    }
966
967    // Run sustained filter operations
968    let filter_data: Vec<u32> = (0..SIZE as u32).collect();
969    let filter_buffer = match ctx
970        .provider
971        .create_buffer_from_slice::<u32>(&filter_data, schema.clone())
972    {
973        Ok(buf) => buf,
974        Err(e) => {
975            return TestResult::error(
976                "test_sustained_operation",
977                start.elapsed(),
978                format!("Sustained filter buffer failed: {}", e),
979            )
980        }
981    };
982
983    let deadline2 = Instant::now() + std::time::Duration::from_secs(DURATION_SECONDS / 2);
984    let mut filter_count = 0;
985
986    while Instant::now() < deadline2 {
987        let selectivity = (filter_count % 10 + 1) * 10;
988        let mask: Vec<u8> = (0..SIZE)
989            .map(|j| if (j * 100 / SIZE) < selectivity { 1 } else { 0 })
990            .collect();
991
992        let filtered = match ctx.provider.filter_by_mask(&filter_buffer, &mask) {
993            Ok(f) => f,
994            Err(e) => {
995                return TestResult::error(
996                    "test_sustained_operation",
997                    start.elapsed(),
998                    format!("Sustained filter {} failed: {}", filter_count, e),
999                )
1000            }
1001        };
1002
1003        // Verify row count is reasonable
1004        let expected_min = (SIZE * selectivity / 100).saturating_sub(SIZE / 10);
1005        let expected_max = (SIZE * selectivity / 100) + SIZE / 10 + 1;
1006
1007        let count = ctx.device_row_count(&filtered) as usize;
1008        if count < expected_min || count > expected_max {
1009            return TestResult::error(
1010                "test_sustained_operation",
1011                start.elapsed(),
1012                format!(
1013                    "Sustained filter {}: got {} rows, expected ~{}",
1014                    filter_count,
1015                    count,
1016                    SIZE * selectivity / 100
1017                ),
1018            );
1019        }
1020
1021        filter_count += 1;
1022    }
1023
1024    // Run sustained join operations
1025    let schema2 = Schema::new(vec![
1026        ("key".to_string(), ScalarType::U32),
1027        ("val".to_string(), ScalarType::U32),
1028    ]);
1029
1030    let left_keys: Vec<u32> = (0..1000u32).collect();
1031    let left_vals: Vec<u32> = left_keys.iter().map(|&k| k * 2).collect();
1032
1033    let right_keys: Vec<u32> = (0..500u32).map(|i| i * 2).collect();
1034    let right_vals: Vec<u32> = right_keys.iter().map(|&k| k * 3).collect();
1035
1036    let left_buffer = match ctx
1037        .provider
1038        .create_buffer_from_u32_columns(&[&left_keys, &left_vals], schema2.clone())
1039    {
1040        Ok(buf) => buf,
1041        Err(e) => {
1042            return TestResult::error(
1043                "test_sustained_operation",
1044                start.elapsed(),
1045                format!("Sustained join left buffer failed: {}", e),
1046            )
1047        }
1048    };
1049
1050    let right_buffer = match ctx
1051        .provider
1052        .create_buffer_from_u32_columns(&[&right_keys, &right_vals], schema2)
1053    {
1054        Ok(buf) => buf,
1055        Err(e) => {
1056            return TestResult::error(
1057                "test_sustained_operation",
1058                start.elapsed(),
1059                format!("Sustained join right buffer failed: {}", e),
1060            )
1061        }
1062    };
1063
1064    let deadline3 = Instant::now() + std::time::Duration::from_secs(DURATION_SECONDS / 2);
1065    let mut join_count = 0;
1066
1067    while Instant::now() < deadline3 {
1068        let joined = match ctx
1069            .provider
1070            .hash_join(&left_buffer, &right_buffer, &[0], &[0])
1071        {
1072            Ok(j) => j,
1073            Err(e) => {
1074                return TestResult::error(
1075                    "test_sustained_operation",
1076                    start.elapsed(),
1077                    format!("Sustained join {} failed: {}", join_count, e),
1078                )
1079            }
1080        };
1081
1082        if ctx.device_row_count(&joined) != 500 {
1083            return TestResult::error(
1084                "test_sustained_operation",
1085                start.elapsed(),
1086                format!(
1087                    "Sustained join {}: expected 500 rows, got {}",
1088                    join_count,
1089                    ctx.device_row_count(&joined)
1090                ),
1091            );
1092        }
1093
1094        join_count += 1;
1095    }
1096
1097    // Final verification
1098    if let Err(e) = ctx.sync_and_check() {
1099        return TestResult::error(
1100            "test_sustained_operation",
1101            start.elapsed(),
1102            format!("Final sustained sync failed: {}", e),
1103        );
1104    }
1105
1106    TestResult::passed("test_sustained_operation", start.elapsed())
1107}