1use cudarc::driver::sys;
4use cudarc::driver::{DevicePtr, DevicePtrMut, DeviceRepr};
5use fs2::FileExt;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::{Arc, Mutex, OnceLock};
8use std::{fs::OpenOptions, path::Path};
9use xlog_core::{MemoryBudget, Result, XlogError};
10use xlog_cuda::device_runtime::{
11 AsyncCudaResource, DeviceMemoryResource, GlobalDeviceBudget, LogRecord, LoggingResource,
12 LoggingSink, SinkError, StreamPool, XlogDeviceRuntime,
13};
14use xlog_cuda::{CudaBuffer, CudaDevice, CudaKernelProvider, GpuMemoryManager};
15
/// Running totals of host<->device traffic issued through the `TestContext`
/// copy helpers, in bytes.
///
/// Every access uses `Ordering::SeqCst`.
#[derive(Default)]
struct TransferCounters {
    /// Bytes copied host -> device.
    htod_bytes: AtomicU64,
    /// Bytes copied device -> host.
    dtoh_bytes: AtomicU64,
}

impl TransferCounters {
    /// Sets both counters back to zero (host->device first, then device->host).
    fn reset(&self) {
        for counter in [&self.htod_bytes, &self.dtoh_bytes] {
            counter.store(0, Ordering::SeqCst);
        }
    }

    /// Adds `bytes` to the host -> device total.
    fn add_htod(&self, bytes: u64) {
        Self::bump(&self.htod_bytes, bytes);
    }

    /// Adds `bytes` to the device -> host total.
    fn add_dtoh(&self, bytes: u64) {
        Self::bump(&self.dtoh_bytes, bytes);
    }

    /// Shared increment used by both directions (wraps on overflow, like `fetch_add`).
    fn bump(counter: &AtomicU64, bytes: u64) {
        counter.fetch_add(bytes, Ordering::SeqCst);
    }

    /// Returns `(htod_bytes, dtoh_bytes)` as currently recorded.
    fn snapshot(&self) -> (u64, u64) {
        let htod = self.htod_bytes.load(Ordering::SeqCst);
        let dtoh = self.dtoh_bytes.load(Ordering::SeqCst);
        (htod, dtoh)
    }
}
43
/// Path of the lock file that `gpu_test_lock_file` opens (creating it if
/// missing) and locks exclusively. Holding this lock keeps GPU tests running in
/// other processes from overlapping with this one; `gpu_test_lock` only covers
/// threads inside the current process.
const GPU_TEST_LOCK_PATH: &str = "/tmp/xlog-gpu-tests.lock";
45
/// Acquires the process-wide mutex that serializes GPU tests within this process.
///
/// A poisoned mutex (a previous holder panicked, e.g. a failing test) is
/// recovered instead of propagated, so one failing test does not make every
/// later GPU test fail as well.
fn gpu_test_lock() -> std::sync::MutexGuard<'static, ()> {
    static GPU_MUTEX: OnceLock<Mutex<()>> = OnceLock::new();
    let mutex = GPU_MUTEX.get_or_init(Mutex::default);
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
52
53fn gpu_test_lock_file() -> Result<std::fs::File> {
54 let path = Path::new(GPU_TEST_LOCK_PATH);
55 let file = OpenOptions::new()
56 .create(true)
57 .truncate(false)
58 .read(true)
59 .write(true)
60 .open(path)
61 .map_err(|e| XlogError::Kernel(format!("Failed to open GPU test lock file: {}", e)))?;
62 file.lock_exclusive()
63 .map_err(|e| XlogError::Kernel(format!("Failed to acquire GPU test lock file: {}", e)))?;
64 Ok(file)
65}
66
67struct DiscardSink;
74
75impl LoggingSink for DiscardSink {
76 fn emit(&self, _record: LogRecord) -> std::result::Result<(), SinkError> {
77 Ok(())
78 }
79}
80
/// Which allocation stack a `TestContext` is built on.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TestRuntimeBackend {
    /// Plain `GpuMemoryManager::new` + `CudaKernelProvider::new`.
    Legacy,
    /// `XlogDeviceRuntime`-backed memory manager and kernel provider.
    DeviceRuntime,
}

impl TestRuntimeBackend {
    /// Reads `XLOG_USE_DEVICE_RUNTIME` once per process and caches the result.
    ///
    /// The values `1`, `true`, `TRUE` and `True` select
    /// `TestRuntimeBackend::DeviceRuntime`. Any other value, an unset variable,
    /// or non-UTF-8 content selects `TestRuntimeBackend::Legacy`.
    fn from_env() -> Self {
        static SELECTED: OnceLock<TestRuntimeBackend> = OnceLock::new();
        *SELECTED.get_or_init(|| {
            const TRUTHY: [&str; 4] = ["1", "true", "TRUE", "True"];
            let enabled = std::env::var("XLOG_USE_DEVICE_RUNTIME")
                .map(|value| TRUTHY.contains(&value.as_str()))
                .unwrap_or(false);
            if enabled {
                TestRuntimeBackend::DeviceRuntime
            } else {
                TestRuntimeBackend::Legacy
            }
        })
    }
}
118
119pub struct TestContext {
121 _lock: std::sync::MutexGuard<'static, ()>,
125 _file_lock: std::fs::File,
126 pub provider: CudaKernelProvider,
127 pub device: Arc<CudaDevice>,
128 pub memory: Arc<GpuMemoryManager>,
129 backend: TestRuntimeBackend,
132 _runtime: Option<Arc<XlogDeviceRuntime>>,
136 transfer: Arc<TransferCounters>,
137}
138
139pub fn enforce_cuda_required(context: &str, err: &XlogError) {
145 if std::env::var("XLOG_REQUIRE_CUDA").as_deref() == Ok("1") {
146 panic!("XLOG_REQUIRE_CUDA=1 but CUDA is unavailable ({context}): {err}");
147 }
148}
149
150impl TestContext {
151 pub fn with_budget(budget_bytes: usize) -> Result<Self> {
159 let result = Self::with_budget_inner(budget_bytes);
160 if let Err(err) = &result {
161 enforce_cuda_required("TestContext::with_budget", err);
162 }
163 result
164 }
165
166 fn with_budget_inner(budget_bytes: usize) -> Result<Self> {
167 let lock = gpu_test_lock();
168 let file_lock = gpu_test_lock_file()?;
169
170 let device_count = std::panic::catch_unwind(CudaDevice::count)
172 .map_err(|_| {
173 XlogError::Kernel(
174 "Failed to get device count: cudarc panicked during driver initialization"
175 .to_string(),
176 )
177 })?
178 .map_err(|e| XlogError::Kernel(format!("Failed to get device count: {}", e)))?;
179
180 if device_count == 0 {
181 return Err(XlogError::Kernel("No CUDA devices available".to_string()));
182 }
183
184 let device = Arc::new(CudaDevice::new(0)?);
185 let budget = MemoryBudget::with_limit(budget_bytes as u64);
186 let backend = TestRuntimeBackend::from_env();
187 let transfer = Arc::new(TransferCounters::default());
188
189 let (memory, provider, runtime_arc) = match backend {
190 TestRuntimeBackend::Legacy => {
191 let memory = Arc::new(GpuMemoryManager::new(device.clone(), budget));
192 let provider = CudaKernelProvider::new(device.clone(), memory.clone())?;
193 (memory, provider, None)
194 }
195 TestRuntimeBackend::DeviceRuntime => {
196 let pool = Arc::new(StreamPool::with_defaults(Arc::clone(&device)));
204 let async_resource: Box<dyn DeviceMemoryResource + Send + Sync> = Box::new(
205 AsyncCudaResource::new(Arc::clone(&device), 0, Arc::clone(&pool)),
206 );
207 let logging: Box<dyn DeviceMemoryResource + Send + Sync> =
208 Box::new(LoggingResource::new(
209 async_resource,
210 Arc::new(DiscardSink) as Arc<dyn LoggingSink>,
211 ));
212 let budget_resource: Box<dyn DeviceMemoryResource + Send + Sync> =
213 Box::new(GlobalDeviceBudget::new(logging, budget_bytes));
214 let runtime = Arc::new(XlogDeviceRuntime::with_resource(
215 Arc::clone(&device),
216 0,
217 Arc::clone(&pool),
218 budget_resource,
219 ));
220 let memory = Arc::new(GpuMemoryManager::with_runtime(
221 Arc::clone(&device),
222 budget,
223 Arc::clone(&runtime),
224 ));
225 let provider =
226 CudaKernelProvider::with_runtime(Arc::clone(&device), Arc::clone(&memory))?;
227 (memory, provider, Some(runtime))
228 }
229 };
230
231 Ok(Self {
232 _lock: lock,
233 _file_lock: file_lock,
234 provider,
235 device,
236 memory,
237 backend,
238 _runtime: runtime_arc,
239 transfer,
240 })
241 }
242
243 pub fn uses_device_runtime(&self) -> bool {
248 self.backend == TestRuntimeBackend::DeviceRuntime
249 }
250
251 pub fn reap_pending(&self) {
261 if let Some(rt) = &self._runtime {
262 let _ = rt.reap_pending();
269 }
270 }
271
272 pub fn new() -> Result<Self> {
274 Self::with_budget(1024 * 1024 * 1024)
275 }
276
277 pub fn large() -> Result<Self> {
279 Self::with_budget(4 * 1024 * 1024 * 1024)
280 }
281
282 pub fn sync_and_check(&self) -> Result<()> {
284 self.device
285 .inner()
286 .synchronize()
287 .map_err(|e| XlogError::Kernel(format!("Sync failed: {}", e)))?;
288 Ok(())
289 }
290
291 pub fn memory_used(&self) -> usize {
293 self.memory.allocated_bytes() as usize
294 }
295
296 pub fn memory_budget(&self) -> usize {
298 self.memory.budget().device_bytes as usize
299 }
300
301 pub fn reset_transfer_counters(&self) {
303 self.transfer.reset();
304 }
305
306 pub fn transfer_counters(&self) -> (u64, u64) {
308 self.transfer.snapshot()
309 }
310
311 pub fn htod_sync_copy_into<T: DeviceRepr, Dst: DevicePtrMut<T>>(
313 &self,
314 src: &[T],
315 dst: &mut Dst,
316 ) -> Result<()> {
317 let bytes = std::mem::size_of::<T>()
318 .checked_mul(src.len())
319 .ok_or_else(|| XlogError::Kernel("htod byte count overflow".to_string()))?;
320 self.transfer.add_htod(bytes as u64);
321 self.device
322 .inner()
323 .htod_sync_copy_into(src, dst)
324 .map_err(|e| XlogError::Kernel(format!("Failed htod copy: {}", e)))?;
325 Ok(())
326 }
327
328 pub fn dtoh_sync_copy<T: DeviceRepr, Src: DevicePtr<T>>(&self, src: &Src) -> Result<Vec<T>> {
330 let bytes = std::mem::size_of::<T>()
331 .checked_mul(src.len())
332 .ok_or_else(|| XlogError::Kernel("dtoh byte count overflow".to_string()))?;
333 self.transfer.add_dtoh(bytes as u64);
334 self.device
335 .inner()
336 .dtoh_sync_copy(src)
337 .map_err(|e| XlogError::Kernel(format!("Failed dtoh copy: {}", e)))
338 }
339
340 pub fn dtoh_sync_copy_into<T: DeviceRepr, Src: DevicePtr<T>>(
342 &self,
343 src: &Src,
344 dst: &mut [T],
345 ) -> Result<()> {
346 let bytes = std::mem::size_of::<T>()
347 .checked_mul(dst.len())
348 .ok_or_else(|| XlogError::Kernel("dtoh byte count overflow".to_string()))?;
349 self.transfer.add_dtoh(bytes as u64);
350 self.device
351 .inner()
352 .dtoh_sync_copy_into(src, dst)
353 .map_err(|e| XlogError::Kernel(format!("Failed dtoh copy: {}", e)))?;
354 Ok(())
355 }
356
357 pub fn measure_gpu_ms<F>(&self, f: F) -> Result<f32>
359 where
360 F: FnOnce() -> Result<()>,
361 {
362 let stream = self.device.inner().stream();
363 let start = stream
364 .context()
365 .new_event(Some(sys::CUevent_flags::CU_EVENT_DEFAULT))
366 .map_err(|e| XlogError::Kernel(format!("Failed to create start event: {}", e)))?;
367 let end = stream
368 .context()
369 .new_event(Some(sys::CUevent_flags::CU_EVENT_DEFAULT))
370 .map_err(|e| XlogError::Kernel(format!("Failed to create end event: {}", e)))?;
371
372 start
373 .record(stream)
374 .map_err(|e| XlogError::Kernel(format!("Failed to record start event: {}", e)))?;
375 f()?;
376 end.record(stream)
377 .map_err(|e| XlogError::Kernel(format!("Failed to record end event: {}", e)))?;
378 self.sync_and_check()?;
379 let elapsed = start.elapsed_ms(&end).map_err(|e| {
380 XlogError::Kernel(format!("Failed to measure event elapsed time: {}", e))
381 })?;
382 Ok(elapsed)
383 }
384
385 pub fn multi_gpu_available(&self) -> bool {
387 match std::panic::catch_unwind(CudaDevice::count) {
388 Ok(Ok(n)) => n > 1,
389 _ => false,
390 }
391 }
392
393 pub fn compute_capability(&self) -> Result<(u32, u32)> {
395 use cudarc::driver::sys::CUdevice_attribute;
396
397 let major = self
398 .device
399 .inner()
400 .attribute(CUdevice_attribute::CU_DEVICE_ATTRIBUTE_COMPUTE_CAPABILITY_MAJOR)
401 .map_err(|e| {
402 XlogError::Kernel(format!("Failed to query compute capability major: {}", e))
403 })?;
404 let minor = self
405 .device
406 .inner()
407 .attribute(CUdevice_attribute::CU_DEVICE_ATTRIBUTE_COMPUTE_CAPABILITY_MINOR)
408 .map_err(|e| {
409 XlogError::Kernel(format!("Failed to query compute capability minor: {}", e))
410 })?;
411
412 let major_u32: u32 = major.try_into().map_err(|_| {
413 XlogError::Kernel(format!(
414 "Failed to convert compute capability major {} to u32",
415 major
416 ))
417 })?;
418 let minor_u32: u32 = minor.try_into().map_err(|_| {
419 XlogError::Kernel(format!(
420 "Failed to convert compute capability minor {} to u32",
421 minor
422 ))
423 })?;
424
425 Ok((major_u32, minor_u32))
426 }
427
428 pub fn device_row_count(&self, buffer: &CudaBuffer) -> u64 {
432 let mut host_rows = [0u32];
433 self.dtoh_sync_copy_into(buffer.num_rows_device(), &mut host_rows)
434 .unwrap_or_else(|e| panic!("Failed to read device row count: {}", e));
435 host_rows[0] as u64
436 }
437}
438
#[cfg(test)]
mod tests {
    use super::*;
    use std::process::Command;

    /// While a `TestContext` is alive, another process must not be able to
    /// take the GPU test lock file.
    #[test]
    fn test_gpu_test_lock_file_is_exclusive() {
        let ctx = match TestContext::new() {
            Ok(ctx) => ctx,
            Err(_) => {
                eprintln!("Skipping test: no CUDA device available");
                return;
            }
        };

        // With `-n`, flock(1) exits with status 1 when the lock is already held
        // (util-linux's default conflict exit code). Other non-zero codes mean
        // flock failed for an unrelated reason (bad usage, file not openable, ...)
        // and prove nothing about the lock, so require exactly 1.
        let status = Command::new("flock")
            .arg("-n")
            .arg(GPU_TEST_LOCK_PATH)
            .arg("-c")
            .arg("true")
            .status();

        match status {
            Ok(status) => assert_eq!(
                status.code(),
                Some(1),
                "GPU test lock file should be held exclusively while TestContext is alive"
            ),
            Err(_) => {
                eprintln!("Skipping test: flock command not available");
            }
        }

        drop(ctx);
    }
}
474
/// Defines a `#[test]` function `$name` that builds a default
/// `harness::TestContext` and calls `$body(&ctx)`.
///
/// If the context cannot be created (e.g. no CUDA device), the test panics
/// instead of being skipped; use `gpu_test_skip!` for the skipping variant.
#[macro_export]
macro_rules! gpu_test {
    ($name:ident, $body:expr) => {
        #[test]
        fn $name() {
            let ctx: $crate::harness::TestContext = $crate::harness::TestContext::new()
                .expect("CUDA device required for this test");
            $body(&ctx);
        }
    };
}
487
/// Defines a `#[test]` function `$name` that calls `$body(&ctx)` when a
/// default `harness::TestContext` can be built.
///
/// Otherwise it prints a skip notice and returns. `TestContext::new` itself
/// still panics when `XLOG_REQUIRE_CUDA=1`.
#[macro_export]
macro_rules! gpu_test_skip {
    ($name:ident, $body:expr) => {
        #[test]
        fn $name() {
            let Ok(ctx) = $crate::harness::TestContext::new() else {
                eprintln!("Skipping {}: no CUDA device", stringify!($name));
                return;
            };
            $body(&ctx);
        }
    };
}
504}