Skip to main content

xlog_cuda_tests/harness/
provider.rs

1//! Test context and provider setup for CUDA certification tests.
2
3use cudarc::driver::sys;
4use cudarc::driver::{DevicePtr, DevicePtrMut, DeviceRepr};
5use fs2::FileExt;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::{Arc, Mutex, OnceLock};
8use std::{fs::OpenOptions, path::Path};
9use xlog_core::{MemoryBudget, Result, XlogError};
10use xlog_cuda::device_runtime::{
11    AsyncCudaResource, DeviceMemoryResource, GlobalDeviceBudget, LogRecord, LoggingResource,
12    LoggingSink, SinkError, StreamPool, XlogDeviceRuntime,
13};
14use xlog_cuda::{CudaBuffer, CudaDevice, CudaKernelProvider, GpuMemoryManager};
15
/// Running byte totals for host↔device copies made through `TestContext`.
///
/// Both totals are atomics accessed with `SeqCst`, so they can be bumped
/// through a shared reference without any additional locking.
#[derive(Default)]
struct TransferCounters {
    /// Bytes copied host → device since the last reset.
    htod_bytes: AtomicU64,
    /// Bytes copied device → host since the last reset.
    dtoh_bytes: AtomicU64,
}

impl TransferCounters {
    /// Ordering used for every load/store/RMW on the counters.
    const ORDERING: Ordering = Ordering::SeqCst;

    /// Zero both totals (host→device first, then device→host).
    fn reset(&self) {
        for counter in [&self.htod_bytes, &self.dtoh_bytes] {
            counter.store(0, Self::ORDERING);
        }
    }

    /// Add `bytes` to the host → device total.
    fn add_htod(&self, bytes: u64) {
        Self::bump(&self.htod_bytes, bytes);
    }

    /// Add `bytes` to the device → host total.
    fn add_dtoh(&self, bytes: u64) {
        Self::bump(&self.dtoh_bytes, bytes);
    }

    /// Atomically add to one counter; `fetch_add` wraps on overflow.
    fn bump(counter: &AtomicU64, bytes: u64) {
        let _previous = counter.fetch_add(bytes, Self::ORDERING);
    }

    /// Current totals as `(htod_bytes, dtoh_bytes)`.
    fn snapshot(&self) -> (u64, u64) {
        let htod = self.htod_bytes.load(Self::ORDERING);
        let dtoh = self.dtoh_bytes.load(Self::ORDERING);
        (htod, dtoh)
    }
}
43
const GPU_TEST_LOCK_PATH: &str = "/tmp/xlog-gpu-tests.lock";

/// Take the process-wide GPU test mutex.
///
/// The mutex is created lazily on first use. If a previous holder panicked
/// and poisoned it, the poison is ignored and the guard is returned anyway,
/// because the protected value is `()` and there is no state to corrupt.
fn gpu_test_lock() -> std::sync::MutexGuard<'static, ()> {
    static GPU_MUTEX: OnceLock<Mutex<()>> = OnceLock::new();
    let mutex = GPU_MUTEX.get_or_init(|| Mutex::new(()));
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
52
53fn gpu_test_lock_file() -> Result<std::fs::File> {
54    let path = Path::new(GPU_TEST_LOCK_PATH);
55    let file = OpenOptions::new()
56        .create(true)
57        .truncate(false)
58        .read(true)
59        .write(true)
60        .open(path)
61        .map_err(|e| XlogError::Kernel(format!("Failed to open GPU test lock file: {}", e)))?;
62    file.lock_exclusive()
63        .map_err(|e| XlogError::Kernel(format!("Failed to acquire GPU test lock file: {}", e)))?;
64    Ok(file)
65}
66
67/// Sink that drops every log record. Used by the runtime-backed
68/// `TestContext` mode: the cert harness has no need for a live
69/// log stream during kernel testing, but `LoggingResource`
70/// requires a sink. Errors from the sink would propagate as
71/// allocation failures, which would mask kernel issues — drop
72/// silently here.
73struct DiscardSink;
74
75impl LoggingSink for DiscardSink {
76    fn emit(&self, _record: LogRecord) -> std::result::Result<(), SinkError> {
77        Ok(())
78    }
79}
80
/// Backend the test context uses for allocation + recorded-launch
/// dispatch.
///
/// Selected via `XLOG_USE_DEVICE_RUNTIME` at process start (see
/// `TestRuntimeBackend::from_env` for the accepted values). The default is
/// `Legacy` so the existing cert-suite evidence is unperturbed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TestRuntimeBackend {
    /// Legacy cudarc-backed `GpuMemoryManager::new`. The
    /// env-var dispatchers in `provider::sort` / `filter_by_mask`
    /// / `hash_join_v2` / etc. fall through to the legacy
    /// kernels because `provider.memory.runtime()` is `None`.
    Legacy,
    /// `XlogDeviceRuntime` with the production decorator stack
    /// (`AsyncCudaResource → LoggingResource → GlobalDeviceBudget`).
    /// `XLOG_USE_RECORDED_OPS=1` (or per-operator
    /// `XLOG_USE_RECORDED_*=1`) routes operator calls through
    /// the recorded path so cert categories that use
    /// `provider.sort` / `provider.filter_by_mask` etc. exercise
    /// the prepare/finish stream-dependency manager landed in
    /// PR #72.
    DeviceRuntime,
}

impl TestRuntimeBackend {
    /// Environment variable that selects the backend.
    const ENV_VAR: &'static str = "XLOG_USE_DEVICE_RUNTIME";

    /// Resolve the backend from `XLOG_USE_DEVICE_RUNTIME`.
    ///
    /// The variable is read once and cached per process in a `OnceLock`, so
    /// every `TestContext` in a single test binary agrees on the backend even
    /// if the environment changes later. See `from_env_value` for the
    /// accepted values.
    fn from_env() -> Self {
        static CACHED: OnceLock<TestRuntimeBackend> = OnceLock::new();
        *CACHED.get_or_init(|| Self::from_env_value(std::env::var(Self::ENV_VAR).ok().as_deref()))
    }

    /// Map a raw `XLOG_USE_DEVICE_RUNTIME` value to a backend.
    ///
    /// After trimming surrounding whitespace, `"1"` or `"true"` (ASCII
    /// case-insensitive) select `DeviceRuntime`. Anything else selects
    /// `Legacy`, including an unset or non-UTF-8 variable (`None`), the
    /// empty string, `"0"` and `"false"`.
    fn from_env_value(value: Option<&str>) -> Self {
        match value.map(str::trim) {
            Some(v) if v == "1" || v.eq_ignore_ascii_case("true") => {
                TestRuntimeBackend::DeviceRuntime
            }
            _ => TestRuntimeBackend::Legacy,
        }
    }
}
118
119/// Test context providing CUDA resources for certification tests.
120pub struct TestContext {
121    // Hold a process-wide mutex for the lifetime of the context so GPU tests run serially.
122    // This prevents timing-based certification tests from producing false positives under
123    // parallel `cargo test` execution.
124    _lock: std::sync::MutexGuard<'static, ()>,
125    _file_lock: std::fs::File,
126    pub provider: CudaKernelProvider,
127    pub device: Arc<CudaDevice>,
128    pub memory: Arc<GpuMemoryManager>,
129    /// Backend selected for this context — informational, used
130    /// by tests that want to assert which stack is active.
131    backend: TestRuntimeBackend,
132    /// Held so the `XlogDeviceRuntime`'s stream pool outlives
133    /// the context when the runtime backend is active.
134    /// `None` in legacy mode.
135    _runtime: Option<Arc<XlogDeviceRuntime>>,
136    transfer: Arc<TransferCounters>,
137}
138
139/// Release-gate hardening: when `XLOG_REQUIRE_CUDA=1`, CUDA/test-context
140/// initialization failures must fail loudly instead of returning `Err`.
141/// Tests use `let Ok(ctx) = ... else { return; }` skip patterns that pass
142/// vacuously without a GPU; `scripts/validate_release_gpu.sh` exports this
143/// variable so a CPU-only machine can never satisfy the certification gate.
144pub fn enforce_cuda_required(context: &str, err: &XlogError) {
145    if std::env::var("XLOG_REQUIRE_CUDA").as_deref() == Ok("1") {
146        panic!("XLOG_REQUIRE_CUDA=1 but CUDA is unavailable ({context}): {err}");
147    }
148}
149
150impl TestContext {
151    /// Create test context with specific memory budget in bytes.
152    /// Backend is chosen by `XLOG_USE_DEVICE_RUNTIME` (default
153    /// legacy).
154    ///
155    /// When `XLOG_REQUIRE_CUDA=1`, any initialization failure panics via
156    /// [`enforce_cuda_required`] so callers' skip-on-error paths cannot turn
157    /// a missing GPU into a vacuous pass.
158    pub fn with_budget(budget_bytes: usize) -> Result<Self> {
159        let result = Self::with_budget_inner(budget_bytes);
160        if let Err(err) = &result {
161            enforce_cuda_required("TestContext::with_budget", err);
162        }
163        result
164    }
165
166    fn with_budget_inner(budget_bytes: usize) -> Result<Self> {
167        let lock = gpu_test_lock();
168        let file_lock = gpu_test_lock_file()?;
169
170        // cudarc may panic on driver init failures in restricted containers; treat as a normal error.
171        let device_count = std::panic::catch_unwind(CudaDevice::count)
172            .map_err(|_| {
173                XlogError::Kernel(
174                    "Failed to get device count: cudarc panicked during driver initialization"
175                        .to_string(),
176                )
177            })?
178            .map_err(|e| XlogError::Kernel(format!("Failed to get device count: {}", e)))?;
179
180        if device_count == 0 {
181            return Err(XlogError::Kernel("No CUDA devices available".to_string()));
182        }
183
184        let device = Arc::new(CudaDevice::new(0)?);
185        let budget = MemoryBudget::with_limit(budget_bytes as u64);
186        let backend = TestRuntimeBackend::from_env();
187        let transfer = Arc::new(TransferCounters::default());
188
189        let (memory, provider, runtime_arc) = match backend {
190            TestRuntimeBackend::Legacy => {
191                let memory = Arc::new(GpuMemoryManager::new(device.clone(), budget));
192                let provider = CudaKernelProvider::new(device.clone(), memory.clone())?;
193                (memory, provider, None)
194            }
195            TestRuntimeBackend::DeviceRuntime => {
196                // Build the same decorator stack production callers
197                // (and the integration suite under
198                // XLOG_USE_DEVICE_RUNTIME=1) use, so cert
199                // categories that touch `provider.sort` /
200                // `filter_by_mask` etc. exercise the recorded
201                // launch discipline once `XLOG_USE_RECORDED_*`
202                // is also set.
203                let pool = Arc::new(StreamPool::with_defaults(Arc::clone(&device)));
204                let async_resource: Box<dyn DeviceMemoryResource + Send + Sync> = Box::new(
205                    AsyncCudaResource::new(Arc::clone(&device), 0, Arc::clone(&pool)),
206                );
207                let logging: Box<dyn DeviceMemoryResource + Send + Sync> =
208                    Box::new(LoggingResource::new(
209                        async_resource,
210                        Arc::new(DiscardSink) as Arc<dyn LoggingSink>,
211                    ));
212                let budget_resource: Box<dyn DeviceMemoryResource + Send + Sync> =
213                    Box::new(GlobalDeviceBudget::new(logging, budget_bytes));
214                let runtime = Arc::new(XlogDeviceRuntime::with_resource(
215                    Arc::clone(&device),
216                    0,
217                    Arc::clone(&pool),
218                    budget_resource,
219                ));
220                let memory = Arc::new(GpuMemoryManager::with_runtime(
221                    Arc::clone(&device),
222                    budget,
223                    Arc::clone(&runtime),
224                ));
225                let provider =
226                    CudaKernelProvider::with_runtime(Arc::clone(&device), Arc::clone(&memory))?;
227                (memory, provider, Some(runtime))
228            }
229        };
230
231        Ok(Self {
232            _lock: lock,
233            _file_lock: file_lock,
234            provider,
235            device,
236            memory,
237            backend,
238            _runtime: runtime_arc,
239            transfer,
240        })
241    }
242
243    /// Return whether this context was built against the
244    /// runtime-backed allocator stack (vs the legacy cudarc-only
245    /// stack). Cert categories can use this to gate behavior or
246    /// diagnostics.
247    pub fn uses_device_runtime(&self) -> bool {
248        self.backend == TestRuntimeBackend::DeviceRuntime
249    }
250
251    /// Drain pending async frees on the runtime allocator, if
252    /// any. No-op for the legacy backend. Cert harnesses call
253    /// this between categories so the `GlobalDeviceBudget`
254    /// reservation bookkeeping releases bytes that have been
255    /// freed via `cuMemFreeAsync` but whose owning stream has
256    /// not yet been synchronized — without this, a long
257    /// sequence of small allocate-then-drop tests piles up
258    /// pending frees and exhausts the reservation pool even
259    /// though the GPU has plenty of free memory.
260    pub fn reap_pending(&self) {
261        if let Some(rt) = &self._runtime {
262            // Best-effort: a transient driver error during reap
263            // is recoverable on the next iteration, and the cert
264            // suite already runs categories sequentially under
265            // the harness lock — propagating an error here would
266            // tear down the entire suite for what is a
267            // bookkeeping issue, not a kernel correctness one.
268            let _ = rt.reap_pending();
269        }
270    }
271
272    /// Create test context with default 1GB memory budget.
273    pub fn new() -> Result<Self> {
274        Self::with_budget(1024 * 1024 * 1024)
275    }
276
277    /// Create test context with large 4GB memory budget for stress tests.
278    pub fn large() -> Result<Self> {
279        Self::with_budget(4 * 1024 * 1024 * 1024)
280    }
281
282    /// Force device synchronization and check for async errors.
283    pub fn sync_and_check(&self) -> Result<()> {
284        self.device
285            .inner()
286            .synchronize()
287            .map_err(|e| XlogError::Kernel(format!("Sync failed: {}", e)))?;
288        Ok(())
289    }
290
291    /// Get current memory usage in bytes.
292    pub fn memory_used(&self) -> usize {
293        self.memory.allocated_bytes() as usize
294    }
295
296    /// Get memory budget in bytes.
297    pub fn memory_budget(&self) -> usize {
298        self.memory.budget().device_bytes as usize
299    }
300
301    /// Reset host/device transfer counters (in bytes).
302    pub fn reset_transfer_counters(&self) {
303        self.transfer.reset();
304    }
305
306    /// Snapshot transfer counters (htod_bytes, dtoh_bytes).
307    pub fn transfer_counters(&self) -> (u64, u64) {
308        self.transfer.snapshot()
309    }
310
311    /// Track a synchronous host-to-device copy.
312    pub fn htod_sync_copy_into<T: DeviceRepr, Dst: DevicePtrMut<T>>(
313        &self,
314        src: &[T],
315        dst: &mut Dst,
316    ) -> Result<()> {
317        let bytes = std::mem::size_of::<T>()
318            .checked_mul(src.len())
319            .ok_or_else(|| XlogError::Kernel("htod byte count overflow".to_string()))?;
320        self.transfer.add_htod(bytes as u64);
321        self.device
322            .inner()
323            .htod_sync_copy_into(src, dst)
324            .map_err(|e| XlogError::Kernel(format!("Failed htod copy: {}", e)))?;
325        Ok(())
326    }
327
328    /// Track a synchronous device-to-host copy returning a Vec.
329    pub fn dtoh_sync_copy<T: DeviceRepr, Src: DevicePtr<T>>(&self, src: &Src) -> Result<Vec<T>> {
330        let bytes = std::mem::size_of::<T>()
331            .checked_mul(src.len())
332            .ok_or_else(|| XlogError::Kernel("dtoh byte count overflow".to_string()))?;
333        self.transfer.add_dtoh(bytes as u64);
334        self.device
335            .inner()
336            .dtoh_sync_copy(src)
337            .map_err(|e| XlogError::Kernel(format!("Failed dtoh copy: {}", e)))
338    }
339
340    /// Track a synchronous device-to-host copy into a slice.
341    pub fn dtoh_sync_copy_into<T: DeviceRepr, Src: DevicePtr<T>>(
342        &self,
343        src: &Src,
344        dst: &mut [T],
345    ) -> Result<()> {
346        let bytes = std::mem::size_of::<T>()
347            .checked_mul(dst.len())
348            .ok_or_else(|| XlogError::Kernel("dtoh byte count overflow".to_string()))?;
349        self.transfer.add_dtoh(bytes as u64);
350        self.device
351            .inner()
352            .dtoh_sync_copy_into(src, dst)
353            .map_err(|e| XlogError::Kernel(format!("Failed dtoh copy: {}", e)))?;
354        Ok(())
355    }
356
357    /// Measure GPU time (in milliseconds) for work enqueued on the device stream.
358    pub fn measure_gpu_ms<F>(&self, f: F) -> Result<f32>
359    where
360        F: FnOnce() -> Result<()>,
361    {
362        let stream = self.device.inner().stream();
363        let start = stream
364            .context()
365            .new_event(Some(sys::CUevent_flags::CU_EVENT_DEFAULT))
366            .map_err(|e| XlogError::Kernel(format!("Failed to create start event: {}", e)))?;
367        let end = stream
368            .context()
369            .new_event(Some(sys::CUevent_flags::CU_EVENT_DEFAULT))
370            .map_err(|e| XlogError::Kernel(format!("Failed to create end event: {}", e)))?;
371
372        start
373            .record(stream)
374            .map_err(|e| XlogError::Kernel(format!("Failed to record start event: {}", e)))?;
375        f()?;
376        end.record(stream)
377            .map_err(|e| XlogError::Kernel(format!("Failed to record end event: {}", e)))?;
378        self.sync_and_check()?;
379        let elapsed = start.elapsed_ms(&end).map_err(|e| {
380            XlogError::Kernel(format!("Failed to measure event elapsed time: {}", e))
381        })?;
382        Ok(elapsed)
383    }
384
385    /// Check if multi-GPU is available.
386    pub fn multi_gpu_available(&self) -> bool {
387        match std::panic::catch_unwind(CudaDevice::count) {
388            Ok(Ok(n)) => n > 1,
389            _ => false,
390        }
391    }
392
393    /// Get compute capability of current device.
394    pub fn compute_capability(&self) -> Result<(u32, u32)> {
395        use cudarc::driver::sys::CUdevice_attribute;
396
397        let major = self
398            .device
399            .inner()
400            .attribute(CUdevice_attribute::CU_DEVICE_ATTRIBUTE_COMPUTE_CAPABILITY_MAJOR)
401            .map_err(|e| {
402                XlogError::Kernel(format!("Failed to query compute capability major: {}", e))
403            })?;
404        let minor = self
405            .device
406            .inner()
407            .attribute(CUdevice_attribute::CU_DEVICE_ATTRIBUTE_COMPUTE_CAPABILITY_MINOR)
408            .map_err(|e| {
409                XlogError::Kernel(format!("Failed to query compute capability minor: {}", e))
410            })?;
411
412        let major_u32: u32 = major.try_into().map_err(|_| {
413            XlogError::Kernel(format!(
414                "Failed to convert compute capability major {} to u32",
415                major
416            ))
417        })?;
418        let minor_u32: u32 = minor.try_into().map_err(|_| {
419            XlogError::Kernel(format!(
420                "Failed to convert compute capability minor {} to u32",
421                minor
422            ))
423        })?;
424
425        Ok((major_u32, minor_u32))
426    }
427
428    /// Read device-resident row count for a buffer.
429    ///
430    /// Panics if the count cannot be read; certification tests treat this as fatal.
431    pub fn device_row_count(&self, buffer: &CudaBuffer) -> u64 {
432        let mut host_rows = [0u32];
433        self.dtoh_sync_copy_into(buffer.num_rows_device(), &mut host_rows)
434            .unwrap_or_else(|e| panic!("Failed to read device row count: {}", e));
435        host_rows[0] as u64
436    }
437}
438
#[cfg(test)]
mod tests {
    use super::*;
    use std::process::Command;

    /// While a `TestContext` is alive, another process must not be able to
    /// take the GPU lock file. `flock -n` exits non-zero when the lock is
    /// already held.
    #[test]
    fn test_gpu_test_lock_file_is_exclusive() {
        let Ok(ctx) = TestContext::new() else {
            eprintln!("Skipping test: no CUDA device available");
            return;
        };

        let probe = Command::new("flock")
            .args(["-n", GPU_TEST_LOCK_PATH, "-c", "true"])
            .status();

        if let Ok(exit) = probe {
            assert!(
                !exit.success(),
                "GPU test lock file should be held exclusively while TestContext is alive"
            );
        } else {
            eprintln!("Skipping test: flock command not available");
        }

        // Keep the context (and therefore its locks) alive until after the probe.
        drop(ctx);
    }
}
474
/// Define a `#[test]` that requires a CUDA device. It panics if one is not available.
///
/// `gpu_test!(name, body)` expands to a `#[test] fn name()`. That function
/// builds a default `$crate::harness::TestContext` via `TestContext::new()`
/// and calls `body(&ctx)`, where `body` is typically a closure taking
/// `&TestContext`. A trailing comma after `body` is accepted.
///
/// # Panics
/// The generated test panics with "CUDA device required for this test" if
/// `TestContext::new()` returns an error.
#[macro_export]
macro_rules! gpu_test {
    ($name:ident, $body:expr $(,)?) => {
        #[test]
        fn $name() {
            let ctx =
                $crate::harness::TestContext::new().expect("CUDA device required for this test");
            $body(&ctx);
        }
    };
}
487
/// Define a `#[test]` that needs a CUDA device but skips when none is available.
///
/// `gpu_test_skip!(name, body)` expands to a `#[test] fn name()` that calls
/// `$crate::harness::TestContext::new()`:
/// - on success, it calls `body(&ctx)`;
/// - on error, it prints "Skipping <name>: no CUDA device" to stderr and
///   returns without running `body`.
///
/// A trailing comma after `body` is accepted.
#[macro_export]
macro_rules! gpu_test_skip {
    ($name:ident, $body:expr $(,)?) => {
        #[test]
        fn $name() {
            let ctx = match $crate::harness::TestContext::new() {
                Ok(ctx) => ctx,
                Err(_) => {
                    eprintln!("Skipping {}: no CUDA device", stringify!($name));
                    return;
                }
            };
            $body(&ctx);
        }
    };
}