xlog_cuda/device_runtime/
budget.rs1use std::sync::Mutex;
70
71use super::resource::{
72 Access, AllocTag, BlockId, DeviceBlock, DeviceMemoryResource, ResourceError, ResourceResult,
73 StreamId,
74};
75
/// Mutable accounting data for a [`GlobalDeviceBudget`], kept behind its mutex.
struct BudgetState {
    /// Number of bytes currently counted against the budget's limit.
    reserved: usize,
}
81
82pub struct GlobalDeviceBudget {
84 inner: Box<dyn DeviceMemoryResource + Send + Sync>,
85 limit: usize,
86 state: Mutex<BudgetState>,
87}
88
89impl GlobalDeviceBudget {
90 pub fn new(inner: Box<dyn DeviceMemoryResource + Send + Sync>, limit: usize) -> Self {
96 let initial = inner.bytes_outstanding();
97 Self {
98 inner,
99 limit,
100 state: Mutex::new(BudgetState { reserved: initial }),
101 }
102 }
103
104 pub fn limit(&self) -> usize {
106 self.limit
107 }
108
109 pub fn reserved_bytes(&self) -> usize {
113 self.state
114 .lock()
115 .expect("GlobalDeviceBudget poisoned")
116 .reserved
117 }
118
119 pub fn remaining(&self) -> usize {
122 let state = self.state.lock().expect("GlobalDeviceBudget poisoned");
123 self.limit.saturating_sub(state.reserved)
124 }
125}
126
127impl DeviceMemoryResource for GlobalDeviceBudget {
128 fn allocate(
129 &self,
130 bytes: usize,
131 stream: StreamId,
132 tag: AllocTag,
133 ) -> ResourceResult<DeviceBlock> {
134 {
138 let mut state = self.state.lock().expect("GlobalDeviceBudget poisoned");
139 let remaining = self.limit.saturating_sub(state.reserved);
140 if bytes <= remaining {
141 state.reserved = state.reserved.saturating_add(bytes);
142 drop(state);
143 return match self.inner.allocate(bytes, stream, tag) {
144 Ok(block) => Ok(block),
145 Err(e) => {
146 let mut state = self.state.lock().expect("GlobalDeviceBudget poisoned");
147 state.reserved = state.reserved.saturating_sub(bytes);
148 Err(e)
149 }
150 };
151 }
152 if bytes > self.limit {
157 return Err(ResourceError::OutOfBudget {
158 requested: bytes,
159 remaining,
160 });
161 }
162 }
163
164 let _ = self.reap_pending();
181
182 let mut state = self.state.lock().expect("GlobalDeviceBudget poisoned");
183 let remaining = self.limit.saturating_sub(state.reserved);
184 if bytes > remaining {
185 return Err(ResourceError::OutOfBudget {
186 requested: bytes,
187 remaining,
188 });
189 }
190 state.reserved = state.reserved.saturating_add(bytes);
191 drop(state);
192
193 match self.inner.allocate(bytes, stream, tag) {
194 Ok(block) => Ok(block),
195 Err(e) => {
196 let mut state = self.state.lock().expect("GlobalDeviceBudget poisoned");
197 state.reserved = state.reserved.saturating_sub(bytes);
198 Err(e)
199 }
200 }
201 }
202
203 fn deallocate(&self, block: DeviceBlock) -> ResourceResult<()> {
204 let mut state = self.state.lock().expect("GlobalDeviceBudget poisoned");
205
206 let before = self.inner.bytes_outstanding();
207 let result = self.inner.deallocate(block);
208 let after = self.inner.bytes_outstanding();
209 let freed = before.saturating_sub(after);
210 if freed > 0 {
211 state.reserved = state.reserved.saturating_sub(freed);
212 }
213 result
214 }
215
216 fn device_ordinal(&self) -> u32 {
217 self.inner.device_ordinal()
218 }
219
220 fn bytes_outstanding(&self) -> usize {
221 self.inner.bytes_outstanding()
225 }
226
227 fn reap_pending(&self) -> ResourceResult<()> {
228 let mut state = self.state.lock().expect("GlobalDeviceBudget poisoned");
229
230 let before = self.inner.bytes_outstanding();
231 let result = self.inner.reap_pending();
232 let after = self.inner.bytes_outstanding();
233 let freed = before.saturating_sub(after);
234 if freed > 0 {
235 state.reserved = state.reserved.saturating_sub(freed);
236 }
237 result
238 }
239
240 fn record_block_use(&self, block: &DeviceBlock, use_stream: StreamId) -> ResourceResult<()> {
241 self.inner.record_block_use(block, use_stream)
246 }
247
248 fn supports_block_use_tracking(&self) -> bool {
249 self.inner.supports_block_use_tracking()
250 }
251
252 fn prepare_block_use(
253 &self,
254 block: BlockId,
255 use_stream: StreamId,
256 access: Access,
257 ) -> ResourceResult<()> {
258 self.inner.prepare_block_use(block, use_stream, access)
261 }
262
263 fn finish_block_use(
264 &self,
265 block: BlockId,
266 use_stream: StreamId,
267 access: Access,
268 ) -> ResourceResult<()> {
269 self.inner.finish_block_use(block, use_stream, access)
271 }
272}
273
#[cfg(test)]
mod tests {
    //! Unit tests for `GlobalDeviceBudget`.
    //!
    //! Tests that need real hardware open CUDA device 0 through
    //! `try_device` and return early (passing vacuously) when it is not
    //! available.

    use super::super::async_resource::AsyncCudaResource;
    use super::super::direct::DirectCudaResource;
    use super::super::resource::{BlockState, Generation};
    use super::super::stream_pool::StreamPool;
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use crate::CudaDevice;

    /// Opens CUDA device 0, or yields `None` when that fails.
    fn try_device() -> Option<Arc<CudaDevice>> {
        match CudaDevice::new(0) {
            Ok(device) => Some(Arc::new(device)),
            Err(_) => None,
        }
    }

    /// Builds a budget of `limit` bytes over a `DirectCudaResource` on
    /// `device`.
    fn direct_budget(device: &Arc<CudaDevice>, limit: usize) -> GlobalDeviceBudget {
        let inner = DirectCudaResource::new(Arc::clone(device), 0);
        GlobalDeviceBudget::new(Box::new(inner), limit)
    }

    /// Test double whose `allocate` always fails with a driver error, used
    /// to exercise the reservation rollback without touching a device.
    struct AlwaysFailAllocResource {
        ord: u32,
        outstanding: AtomicUsize,
    }

    impl AlwaysFailAllocResource {
        fn new(ord: u32) -> Self {
            let outstanding = AtomicUsize::new(0);
            Self { ord, outstanding }
        }
    }

    impl DeviceMemoryResource for AlwaysFailAllocResource {
        fn allocate(
            &self,
            _bytes: usize,
            _stream: StreamId,
            _tag: AllocTag,
        ) -> ResourceResult<DeviceBlock> {
            Err(ResourceError::Driver("inner always fails".into()))
        }

        fn deallocate(&self, _block: DeviceBlock) -> ResourceResult<()> {
            Ok(())
        }

        fn device_ordinal(&self) -> u32 {
            self.ord
        }

        fn bytes_outstanding(&self) -> usize {
            self.outstanding.load(Ordering::Relaxed)
        }
    }

    #[test]
    fn allocate_within_limit_succeeds_and_updates_reserved() {
        const LIMIT: usize = 64 * 1024;
        const REQUEST: usize = 2048;

        let device = match try_device() {
            Some(device) => device,
            None => return,
        };
        let budget = direct_budget(&device, LIMIT);

        let block = budget
            .allocate(REQUEST, StreamId::DEFAULT, AllocTag("budget-success"))
            .expect("alloc within limit");
        assert_eq!(budget.reserved_bytes(), REQUEST);
        assert_eq!(budget.remaining(), LIMIT - REQUEST);
        assert_eq!(budget.bytes_outstanding(), REQUEST);

        budget.deallocate(block).expect("dealloc");
        assert_eq!(budget.reserved_bytes(), 0);
        assert_eq!(budget.bytes_outstanding(), 0);
    }

    #[test]
    fn allocate_at_exact_limit_succeeds_then_next_byte_rejected() {
        const LIMIT: usize = 4096;

        let device = match try_device() {
            Some(device) => device,
            None => return,
        };
        let budget = direct_budget(&device, LIMIT);

        let block = budget
            .allocate(LIMIT, StreamId::DEFAULT, AllocTag::UNTAGGED)
            .expect("alloc at exact limit");
        assert_eq!(budget.reserved_bytes(), LIMIT);
        assert_eq!(budget.remaining(), 0);

        // One more byte must be refused without disturbing the reservation.
        let err = budget.allocate(1, StreamId::DEFAULT, AllocTag::UNTAGGED);
        let rejected = matches!(
            err,
            Err(ResourceError::OutOfBudget {
                requested: 1,
                remaining: 0
            })
        );
        assert!(rejected, "expected OutOfBudget {{1,0}}, got {:?}", err);
        assert_eq!(budget.reserved_bytes(), LIMIT);

        budget.deallocate(block).expect("dealloc");
        assert_eq!(budget.reserved_bytes(), 0);
    }

    #[test]
    fn over_limit_alloc_returns_out_of_budget_with_correct_remaining() {
        let device = match try_device() {
            Some(device) => device,
            None => return,
        };
        let budget = direct_budget(&device, 1024);

        let block = budget
            .allocate(768, StreamId::DEFAULT, AllocTag::UNTAGGED)
            .expect("first alloc");
        assert_eq!(budget.remaining(), 256);

        let err = budget.allocate(512, StreamId::DEFAULT, AllocTag::UNTAGGED);
        let rejected = matches!(
            err,
            Err(ResourceError::OutOfBudget {
                requested: 512,
                remaining: 256
            })
        );
        assert!(rejected, "expected OutOfBudget {{512,256}}, got {:?}", err);

        budget.deallocate(block).expect("dealloc");
    }

    #[test]
    fn failed_inner_allocation_rolls_back_reservation() {
        const LIMIT: usize = 1024 * 1024;

        let budget = GlobalDeviceBudget::new(Box::new(AlwaysFailAllocResource::new(0)), LIMIT);
        assert_eq!(budget.reserved_bytes(), 0);

        let err = budget.allocate(2048, StreamId::DEFAULT, AllocTag::UNTAGGED);
        assert!(matches!(err, Err(ResourceError::Driver(_))));
        // The failed inner call must leave the budget exactly as it was.
        assert_eq!(budget.reserved_bytes(), 0);
        assert_eq!(budget.remaining(), LIMIT);
    }

    #[test]
    fn deallocate_releases_budget_immediately_for_synchronous_inner() {
        const REQUEST: usize = 8 * 1024;

        let device = match try_device() {
            Some(device) => device,
            None => return,
        };
        let budget = direct_budget(&device, 16 * 1024);

        let block = budget
            .allocate(REQUEST, StreamId::DEFAULT, AllocTag::UNTAGGED)
            .expect("alloc");
        assert_eq!(budget.reserved_bytes(), REQUEST);

        budget.deallocate(block).expect("dealloc");
        assert_eq!(
            budget.reserved_bytes(),
            0,
            "synchronous inner releases budget at deallocate"
        );

        budget.reap_pending().expect("reap noop");
        assert_eq!(budget.reserved_bytes(), 0);
    }

    #[test]
    fn deallocate_holds_budget_for_async_inner_until_reap_pending() {
        const REQUEST: usize = 4096;

        let device = match try_device() {
            Some(device) => device,
            None => return,
        };
        let pool = Arc::new(StreamPool::with_defaults(Arc::clone(&device)));
        let inner = AsyncCudaResource::new(Arc::clone(&device), 0, Arc::clone(&pool));
        let budget = GlobalDeviceBudget::new(Box::new(inner), 32 * 1024);

        let block = budget
            .allocate(REQUEST, StreamId::DEFAULT, AllocTag("budget-async"))
            .expect("alloc");
        assert_eq!(budget.reserved_bytes(), REQUEST);

        budget.deallocate(block).expect("dealloc");
        assert_eq!(
            budget.reserved_bytes(),
            REQUEST,
            "async inner: budget must stay reserved until reap_pending drains pending free"
        );
        assert_eq!(budget.bytes_outstanding(), REQUEST);

        budget.reap_pending().expect("reap");
        assert_eq!(
            budget.reserved_bytes(),
            0,
            "async inner: reap_pending releases the pending bytes"
        );
        assert_eq!(budget.bytes_outstanding(), 0);
    }

    #[test]
    fn deallocate_unknown_block_does_not_release_budget() {
        const REQUEST: usize = 2048;

        let device = match try_device() {
            Some(device) => device,
            None => return,
        };
        let budget = direct_budget(&device, 16 * 1024);

        let block = budget
            .allocate(REQUEST, StreamId::DEFAULT, AllocTag::UNTAGGED)
            .expect("alloc");
        assert_eq!(budget.reserved_bytes(), REQUEST);

        // A block the inner resource never handed out.
        let bogus = DeviceBlock {
            ptr: 0xfeed_face,
            device_ordinal: 0,
            alloc_stream: StreamId::DEFAULT,
            bytes: 1024,
            align: 1,
            tag: AllocTag::UNTAGGED,
            generation: Generation::next(),
            state: BlockState::Live,
        };
        let res = budget.deallocate(bogus);
        assert!(matches!(res, Err(ResourceError::UseAfterFree { .. })));
        assert_eq!(
            budget.reserved_bytes(),
            REQUEST,
            "bogus dealloc must not release budget"
        );

        budget.deallocate(block).expect("real dealloc");
        assert_eq!(budget.reserved_bytes(), 0);
    }

    #[test]
    fn forwards_device_ordinal() {
        let budget = GlobalDeviceBudget::new(Box::new(AlwaysFailAllocResource::new(7)), 1024);
        assert_eq!(budget.device_ordinal(), 7);
    }
}