Skip to main content

xlog_cuda/device_runtime/
logging.rs

1//! [`LoggingResource`] — telemetry decorator for any
2//! [`DeviceMemoryResource`].
3//!
4//! Wraps an inner resource and records every `allocate`,
5//! `deallocate`, and `reap_pending` call (success and failure) into
6//! a [`LoggingSink`]. The decorator is fully transparent to allocator
7//! semantics: it forwards every method, never alters the inner
8//! result, and never panics or returns an error caused by the sink
9//! itself. Logging failure is recorded into a per-resource diagnostic
10//! counter (visible via [`LoggingResource::dropped_records`]) and
11//! otherwise silenced — losing a log line must not corrupt
12//! allocation correctness.
13//!
14//! Sink design
15//! -----------
16//! [`LoggingSink`] is a tiny trait with a single `emit` method that
17//! returns `Result<(), SinkError>`. The default in-memory sink
18//! ([`InMemorySink`]) buffers records in a `Mutex<Vec<LogRecord>>`
19//! and is the test workhorse — it lets unit tests assert the exact
20//! sequence of records without filesystem dependencies. A future
21//! `CsvFileSink` or `RingBufferSink` slots in without touching the
22//! decorator.
23//!
24//! Record contents
25//! ---------------
26//! Every emitted [`LogRecord`] carries:
27//!   * `action` — Allocate / Deallocate / ReapPending
28//!   * `device_ordinal`
29//!   * `stream_id` — present for Allocate / Deallocate (the block's
30//!     alloc_stream); absent for ReapPending which spans streams.
31//!   * `ptr` / `bytes` / `tag` / `generation` — present when the
32//!     operation has a corresponding [`DeviceBlock`] reference.
//!   * `thread_id` — a 64-bit hash of the `Debug` rendering of
//!     `std::thread::current().id()`. Stable for the lifetime of the
//!     thread; it is not the runtime's own numeric thread id.
35//!   * `order_counter` — monotonic per-process u64. Strictly
36//!     increasing across all sinks, all threads, all resources.
37//!   * `timestamp_nanos` — `SystemTime::now()` since UNIX epoch in
38//!     nanoseconds. Wall-clock; not monotonic. Use `order_counter`
39//!     for ordering, `timestamp_nanos` for human-readable spans.
40//!   * `result` — Ok or short error tag string.
41//!
42//! Decorator does NOT capture the inner resource's pre-call state
43//! (e.g., bytes_outstanding before allocate) — that would require
44//! locking the inner resource an extra time and is not part of the
45//! v0.6 telemetry contract. Callers that need pre/post diffs read
46//! `bytes_outstanding` themselves.
47
48use std::fmt;
49use std::sync::atomic::{AtomicU64, Ordering};
50use std::sync::Mutex;
51use std::time::{SystemTime, UNIX_EPOCH};
52
53use super::resource::{
54    Access, AllocTag, BlockId, DeviceBlock, DeviceMemoryResource, Generation, ResourceError,
55    ResourceResult, StreamId,
56};
57
/// Which inner-resource method a [`LogRecord`] describes. Each of the
/// three logged methods has its own variant, so consumers can filter
/// on the action without parsing the result message.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LogAction {
    /// An `allocate` call.
    Allocate,
    /// A `deallocate` call.
    Deallocate,
    /// A `reap_pending` call.
    ReapPending,
}

impl fmt::Display for LogAction {
    /// Writes the snake_case method name for this action
    /// (`allocate`, `deallocate` or `reap_pending`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match *self {
            LogAction::Allocate => "allocate",
            LogAction::Deallocate => "deallocate",
            LogAction::ReapPending => "reap_pending",
        };
        f.write_str(name)
    }
}
77
78/// Result status as recorded in a log entry. Successful operations
79/// are recorded as `Ok`; failed ones carry the error variant tag
80/// plus an optional short message. We keep the message bounded so a
81/// runaway driver-error string cannot blow up the sink buffer.
82#[derive(Clone, Debug, Eq, PartialEq)]
83pub enum LogResult {
84    Ok,
85    Err { kind: &'static str, message: String },
86}
87
88impl LogResult {
89    /// Convert a [`ResourceResult`] outcome into a `LogResult`,
90    /// truncating overlong messages to keep records bounded.
91    pub fn from_result<T>(r: &ResourceResult<T>) -> Self {
92        match r {
93            Ok(_) => LogResult::Ok,
94            Err(e) => LogResult::Err {
95                kind: classify_error(e),
96                message: truncate(format!("{}", e), 256),
97            },
98        }
99    }
100}
101
102fn classify_error(e: &ResourceError) -> &'static str {
103    match e {
104        ResourceError::OutOfBudget { .. } => "OutOfBudget",
105        ResourceError::Driver(_) => "Driver",
106        ResourceError::StreamMisuse(_) => "StreamMisuse",
107        ResourceError::UseAfterFree { .. } => "UseAfterFree",
108        ResourceError::OutOfBounds { .. } => "OutOfBounds",
109    }
110}
111
/// Shorten `s` to at most `cap` bytes of its original text and mark
/// the cut with a trailing `…`. Strings that already fit within
/// `cap` bytes are returned untouched.
///
/// The ellipsis is appended after the cut, so a shortened result can
/// be up to three bytes longer than `cap`.
fn truncate(mut s: String, cap: usize) -> String {
    if s.len() <= cap {
        return s;
    }
    // `String::truncate` panics when the index falls inside a
    // multi-byte character, and error text may contain non-ASCII
    // bytes. Search downwards from `cap` for the nearest character
    // boundary; index 0 always qualifies, so the search cannot fail.
    let cut = (0..=cap)
        .rev()
        .find(|&index| s.is_char_boundary(index))
        .unwrap_or(0);
    s.truncate(cut);
    s.push('…');
    s
}
129
130/// A single allocation-log entry. Values are owned/`Copy` so the
131/// record is `Clone + Send + Sync` and trivially serializable.
132#[derive(Clone, Debug)]
133pub struct LogRecord {
134    pub action: LogAction,
135    pub device_ordinal: u32,
136    pub stream_id: Option<StreamId>,
137    pub ptr: Option<u64>,
138    pub bytes: Option<usize>,
139    pub tag: Option<AllocTag>,
140    pub generation: Option<Generation>,
141    pub thread_id: u64,
142    pub order_counter: u64,
143    pub timestamp_nanos: u128,
144    pub result: LogResult,
145}
146
/// Source of `LogRecord::order_counter` values. There is a single
/// counter for the whole process, shared by every resource and every
/// thread, so the emission sequence can be reconstructed even when
/// several resources are composed. Numbering starts at 1.
static ORDER_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Hand out the next sequence number. Each call returns the current
/// value and advances the counter by one.
fn next_order_counter() -> u64 {
    let issued = ORDER_COUNTER.fetch_add(1, Ordering::Relaxed);
    issued
}
156
/// Wall-clock time as nanoseconds since the UNIX epoch.
///
/// Returns 0 when the system clock reports a time before the epoch,
/// so taking a timestamp can never fail.
fn now_nanos() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    }
}
163
/// Derive a `u64` identifier for the calling thread.
///
/// Stable Rust does not expose the integer inside `ThreadId`, so the
/// id's `Debug` text is hashed with 64-bit FNV-1a instead. The value
/// is the same on every call from one thread. Different threads get
/// different values unless their hashes collide.
fn current_thread_id_u64() -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x100_0000_01b3;

    let rendered = format!("{:?}", std::thread::current().id());
    rendered.bytes().fold(FNV_OFFSET_BASIS, |hash, byte| {
        (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
176
/// Error reported by a sink. [`LoggingSink::emit`] returns it when a
/// record cannot be accepted (ring buffer full, IO failure, and so
/// on). [`LoggingResource`] counts such failures in
/// [`LoggingResource::dropped_records`] and does **not** pass them
/// on to the caller of the allocator.
#[derive(Debug)]
pub enum SinkError {
    /// The sink was closed or full and turned the record away.
    Refused(String),
    /// An IO-backed sink hit an IO error; the payload describes it.
    Io(String),
}

impl fmt::Display for SinkError {
    /// Writes a fixed prefix naming the failure class, followed by
    /// the sink-provided detail text.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Refused(detail) => {
                f.write_fmt(format_args!("log sink refused record: {}", detail))
            }
            Self::Io(detail) => f.write_fmt(format_args!("log sink io error: {}", detail)),
        }
    }
}

impl std::error::Error for SinkError {}
200
201/// Trait for log destinations. Implementations are required to be
202/// thread-safe (`Send + Sync`); the decorator may emit from any
203/// thread that calls into the resource.
204pub trait LoggingSink: Send + Sync {
205    /// Accept a record. Implementations must not panic on full /
206    /// closed / IO-error conditions — return [`SinkError`] instead.
207    fn emit(&self, record: LogRecord) -> Result<(), SinkError>;
208}
209
210/// In-memory sink for tests and lightweight use. Records are
211/// appended to a `Vec` under a `Mutex`. `snapshot` returns a clone
212/// so consumers can inspect without holding the mutex.
213pub struct InMemorySink {
214    records: Mutex<Vec<LogRecord>>,
215}
216
217impl InMemorySink {
218    pub fn new() -> Self {
219        Self {
220            records: Mutex::new(Vec::new()),
221        }
222    }
223
224    /// Clone the current record buffer.
225    pub fn snapshot(&self) -> Vec<LogRecord> {
226        self.records.lock().expect("InMemorySink poisoned").clone()
227    }
228
229    /// Drop all buffered records. Useful for tests that want a
230    /// clean slate between phases.
231    pub fn clear(&self) {
232        self.records.lock().expect("InMemorySink poisoned").clear();
233    }
234
235    /// Number of records currently buffered.
236    pub fn len(&self) -> usize {
237        self.records.lock().expect("InMemorySink poisoned").len()
238    }
239
240    pub fn is_empty(&self) -> bool {
241        self.len() == 0
242    }
243}
244
245impl Default for InMemorySink {
246    fn default() -> Self {
247        Self::new()
248    }
249}
250
251impl LoggingSink for InMemorySink {
252    fn emit(&self, record: LogRecord) -> Result<(), SinkError> {
253        self.records
254            .lock()
255            .expect("InMemorySink poisoned")
256            .push(record);
257        Ok(())
258    }
259}
260
/// Sink that throws every record away. Pick it when the runtime
/// stack should keep the `LoggingResource(...)` shape but nobody
/// will read the log.
///
/// [`InMemorySink`] holds on to every record for as long as the sink
/// lives, so a long-running loop that reuses one
/// `LoggingResource(InMemorySink)` grows that buffer without bound.
/// `NullSink` stores nothing: the decorator still builds each
/// `LogRecord`, but the record is dropped as soon as it is emitted.
pub struct NullSink;

impl NullSink {
    /// Create a discard sink. The type has no state.
    pub fn new() -> Self {
        NullSink
    }
}

impl Default for NullSink {
    /// Same as [`NullSink::new`].
    fn default() -> Self {
        Self::new()
    }
}
289
290impl LoggingSink for NullSink {
291    fn emit(&self, _record: LogRecord) -> Result<(), SinkError> {
292        Ok(())
293    }
294}
295
296/// Telemetry decorator for [`DeviceMemoryResource`].
297pub struct LoggingResource {
298    inner: Box<dyn DeviceMemoryResource + Send + Sync>,
299    sink: std::sync::Arc<dyn LoggingSink>,
300    /// Count of records the sink refused or errored on. Surfaced as
301    /// a diagnostic for callers that want to detect telemetry loss
302    /// without halting the data-plane.
303    dropped_records: AtomicU64,
304}
305
306impl LoggingResource {
307    /// Wrap `inner` with `sink`. Records are emitted on every public
308    /// call; sink failures are counted in `dropped_records`.
309    pub fn new(
310        inner: Box<dyn DeviceMemoryResource + Send + Sync>,
311        sink: std::sync::Arc<dyn LoggingSink>,
312    ) -> Self {
313        Self {
314            inner,
315            sink,
316            dropped_records: AtomicU64::new(0),
317        }
318    }
319
320    /// Total records the sink has refused. A nonzero value means
321    /// telemetry was lost; allocator semantics are unaffected.
322    pub fn dropped_records(&self) -> u64 {
323        self.dropped_records.load(Ordering::Relaxed)
324    }
325
326    fn emit(&self, record: LogRecord) {
327        if self.sink.emit(record).is_err() {
328            self.dropped_records.fetch_add(1, Ordering::Relaxed);
329        }
330    }
331}
332
333impl DeviceMemoryResource for LoggingResource {
334    fn allocate(
335        &self,
336        bytes: usize,
337        stream: StreamId,
338        tag: AllocTag,
339    ) -> ResourceResult<DeviceBlock> {
340        let result = self.inner.allocate(bytes, stream, tag);
341        let (ptr, gen, recorded_bytes) = match &result {
342            Ok(b) => (Some(b.ptr), Some(b.generation), Some(b.bytes)),
343            Err(_) => (None, None, Some(bytes)),
344        };
345        self.emit(LogRecord {
346            action: LogAction::Allocate,
347            device_ordinal: self.inner.device_ordinal(),
348            stream_id: Some(stream),
349            ptr,
350            bytes: recorded_bytes,
351            tag: Some(tag),
352            generation: gen,
353            thread_id: current_thread_id_u64(),
354            order_counter: next_order_counter(),
355            timestamp_nanos: now_nanos(),
356            result: LogResult::from_result(&result),
357        });
358        result
359    }
360
361    fn deallocate(&self, block: DeviceBlock) -> ResourceResult<()> {
362        // Capture identifying fields before the move into inner —
363        // on success the block is consumed and we still need them
364        // for the log record.
365        let ptr = block.ptr;
366        let bytes = block.bytes;
367        let tag = block.tag;
368        let gen = block.generation;
369        let stream = block.alloc_stream;
370        let dev = block.device_ordinal;
371
372        let result = self.inner.deallocate(block);
373        self.emit(LogRecord {
374            action: LogAction::Deallocate,
375            device_ordinal: dev,
376            stream_id: Some(stream),
377            ptr: Some(ptr),
378            bytes: Some(bytes),
379            tag: Some(tag),
380            generation: Some(gen),
381            thread_id: current_thread_id_u64(),
382            order_counter: next_order_counter(),
383            timestamp_nanos: now_nanos(),
384            result: LogResult::from_result(&result),
385        });
386        result
387    }
388
389    fn device_ordinal(&self) -> u32 {
390        self.inner.device_ordinal()
391    }
392
393    fn bytes_outstanding(&self) -> usize {
394        self.inner.bytes_outstanding()
395    }
396
397    fn reap_pending(&self) -> ResourceResult<()> {
398        let result = self.inner.reap_pending();
399        self.emit(LogRecord {
400            action: LogAction::ReapPending,
401            device_ordinal: self.inner.device_ordinal(),
402            stream_id: None,
403            ptr: None,
404            bytes: None,
405            tag: None,
406            generation: None,
407            thread_id: current_thread_id_u64(),
408            order_counter: next_order_counter(),
409            timestamp_nanos: now_nanos(),
410            result: LogResult::from_result(&result),
411        });
412        result
413    }
414
415    fn record_block_use(&self, block: &DeviceBlock, use_stream: StreamId) -> ResourceResult<()> {
416        // Pass-through to the inner stream-ordered backend.
417        // No log record emitted: the launch builder calls this
418        // potentially many times per launch; recording each
419        // would balloon the log without telling the consumer
420        // anything they couldn't infer from Allocate/Deallocate
421        // pairs. If a future need arises, add a LogAction::Use
422        // variant rather than tagging this onto the existing
423        // shape.
424        self.inner.record_block_use(block, use_stream)
425    }
426
427    fn supports_block_use_tracking(&self) -> bool {
428        self.inner.supports_block_use_tracking()
429    }
430
431    fn prepare_block_use(
432        &self,
433        block: BlockId,
434        use_stream: StreamId,
435        access: Access,
436    ) -> ResourceResult<()> {
437        // Pass-through: same rationale as record_block_use.
438        // Pre-launch waits are an inner-backend concern.
439        self.inner.prepare_block_use(block, use_stream, access)
440    }
441
442    fn finish_block_use(
443        &self,
444        block: BlockId,
445        use_stream: StreamId,
446        access: Access,
447    ) -> ResourceResult<()> {
448        // Pass-through: see prepare_block_use rationale.
449        self.inner.finish_block_use(block, use_stream, access)
450    }
451}
452
#[cfg(test)]
mod tests {
    use super::super::direct::DirectCudaResource;
    use super::super::resource::BlockState;
    use super::*;
    use std::sync::Arc;

    use crate::CudaDevice;

    /// Open CUDA device 0. Returns `None` when that fails, in which
    /// case the device-backed tests return early instead of failing.
    fn try_device() -> Option<Arc<CudaDevice>> {
        match CudaDevice::new(0) {
            Ok(device) => Some(Arc::new(device)),
            Err(_) => None,
        }
    }

    /// Wrap `inner` in a `LoggingResource` that writes to a fresh
    /// `InMemorySink`, and hand back both so the test can read the
    /// recorded entries.
    fn with_memory_sink<R>(inner: R) -> (LoggingResource, Arc<InMemorySink>)
    where
        R: DeviceMemoryResource + Send + Sync + 'static,
    {
        let sink = Arc::new(InMemorySink::new());
        let resource = LoggingResource::new(Box::new(inner), sink.clone());
        (resource, sink)
    }

    /// Error tag of a logged result, or `None` for a success.
    fn error_kind(result: &LogResult) -> Option<&'static str> {
        match result {
            LogResult::Err { kind, .. } => Some(*kind),
            LogResult::Ok => None,
        }
    }

    #[test]
    fn pass_through_alloc_dealloc_emits_two_ok_records() {
        let Some(device) = try_device() else {
            return;
        };
        let (r, sink) = with_memory_sink(DirectCudaResource::new(Arc::clone(&device), 0));

        let block = r
            .allocate(1024, StreamId::DEFAULT, AllocTag("logging-test"))
            .expect("alloc");
        assert_eq!(block.bytes, 1024);
        assert_eq!(block.state, BlockState::Live);
        r.deallocate(block).expect("dealloc");

        let recs = sink.snapshot();
        assert_eq!(recs.len(), 2, "expected 2 records, got {:?}", recs);
        let (alloc, dealloc) = (&recs[0], &recs[1]);

        assert_eq!(alloc.action, LogAction::Allocate);
        assert_eq!(alloc.result, LogResult::Ok);
        assert_eq!(alloc.bytes, Some(1024));
        assert_eq!(alloc.stream_id, Some(StreamId::DEFAULT));
        assert!(alloc.ptr.is_some());

        assert_eq!(dealloc.action, LogAction::Deallocate);
        assert_eq!(dealloc.result, LogResult::Ok);
        assert_eq!(dealloc.ptr, alloc.ptr);
        assert_eq!(dealloc.generation, alloc.generation);
    }

    #[test]
    fn order_counter_strictly_increases_across_records() {
        let Some(device) = try_device() else {
            return;
        };
        let (r, sink) = with_memory_sink(DirectCudaResource::new(Arc::clone(&device), 0));

        for _ in 0..4 {
            let b = r
                .allocate(64, StreamId::DEFAULT, AllocTag::UNTAGGED)
                .expect("alloc");
            r.deallocate(b).expect("dealloc");
        }
        r.reap_pending().expect("reap");

        let recs = sink.snapshot();
        assert_eq!(recs.len(), 9); // 4*alloc + 4*dealloc + 1 reap
        let mut previous = 0u64;
        for current in recs.iter().map(|rec| rec.order_counter) {
            assert!(
                current > previous,
                "order_counter must strictly increase: prev={}, now={}",
                previous,
                current
            );
            previous = current;
        }
    }

    #[test]
    fn failed_alloc_records_error_result() {
        let Some(device) = try_device() else {
            return;
        };
        let (r, sink) = with_memory_sink(DirectCudaResource::new(Arc::clone(&device), 0));

        // Zero-byte alloc fails per resource contract.
        let _ = r.allocate(0, StreamId::DEFAULT, AllocTag::UNTAGGED);
        let recs = sink.snapshot();
        assert_eq!(recs.len(), 1);
        let rec = &recs[0];
        assert_eq!(rec.action, LogAction::Allocate);
        assert_eq!(error_kind(&rec.result), Some("Driver"));
        // Failed allocs still record the requested byte count.
        assert_eq!(rec.bytes, Some(0));
        assert!(rec.ptr.is_none());
        assert!(rec.generation.is_none());
    }

    #[test]
    fn failed_dealloc_records_error_result() {
        let Some(device) = try_device() else {
            return;
        };
        let (r, sink) = with_memory_sink(DirectCudaResource::new(Arc::clone(&device), 0));

        // A block the resource never handed out.
        let bogus = DeviceBlock {
            ptr: 0xdead_beef,
            device_ordinal: 0,
            alloc_stream: StreamId::DEFAULT,
            bytes: 16,
            align: 1,
            tag: AllocTag::UNTAGGED,
            generation: Generation::next(),
            state: BlockState::Live,
        };
        let res = r.deallocate(bogus);
        assert!(res.is_err());
        let recs = sink.snapshot();
        assert_eq!(recs.len(), 1);
        let rec = &recs[0];
        assert_eq!(rec.action, LogAction::Deallocate);
        assert_eq!(error_kind(&rec.result), Some("UseAfterFree"));
        assert_eq!(rec.ptr, Some(0xdead_beef));
    }

    #[test]
    fn reap_pending_emits_record() {
        let Some(device) = try_device() else {
            return;
        };
        let (r, sink) = with_memory_sink(DirectCudaResource::new(Arc::clone(&device), 0));

        r.reap_pending().expect("reap");
        let recs = sink.snapshot();
        assert_eq!(recs.len(), 1);
        let rec = &recs[0];
        assert_eq!(rec.action, LogAction::ReapPending);
        assert_eq!(rec.result, LogResult::Ok);
        assert!(rec.stream_id.is_none());
        assert!(rec.ptr.is_none());
    }

    #[test]
    fn sink_failure_increments_dropped_records_but_does_not_break_alloc() {
        // A sink that rejects everything. Checks that
        // LoggingResource keeps sink errors to itself and counts
        // each lost record.
        struct RefuseAllSink;
        impl LoggingSink for RefuseAllSink {
            fn emit(&self, _r: LogRecord) -> Result<(), SinkError> {
                Err(SinkError::Refused("test sink refuses all".into()))
            }
        }

        let Some(device) = try_device() else {
            return;
        };
        let inner = Box::new(DirectCudaResource::new(Arc::clone(&device), 0));
        let r = LoggingResource::new(inner, Arc::new(RefuseAllSink));

        // Allocator semantics are still correct.
        let block = r
            .allocate(128, StreamId::DEFAULT, AllocTag("refuse-test"))
            .expect("alloc must succeed even when sink refuses");
        r.deallocate(block).expect("dealloc must succeed too");
        // Two refused records (alloc + dealloc).
        assert_eq!(r.dropped_records(), 2);
    }

    #[test]
    fn forwards_bytes_outstanding_and_device_ordinal() {
        let Some(device) = try_device() else {
            return;
        };
        let (r, _sink) = with_memory_sink(DirectCudaResource::new(Arc::clone(&device), 3));

        assert_eq!(r.device_ordinal(), 3);
        assert_eq!(r.bytes_outstanding(), 0);
    }

    #[test]
    fn truncate_handles_non_ascii_at_cap_without_panicking() {
        // A bare `String::truncate(cap)` panics when `cap` lands in
        // the middle of a multi-byte UTF-8 sequence. The helper has
        // to step back to the previous boundary instead, otherwise
        // the logging path could panic.
        // "héllo" is h, é (2 bytes), l, l, o = 6 bytes, so a cap of
        // 2 points into the middle of é.
        let out = truncate(String::from("héllo"), 2);
        assert!(out.starts_with('h'));
        assert!(out.ends_with('…'));

        // Cap larger than string is a no-op.
        assert_eq!(truncate(String::from("ok"), 100), "ok");

        // Cap = 0 produces just the ellipsis.
        assert_eq!(truncate(String::from("héllo"), 0), "…");

        // A cap that already sits on a boundary cuts exactly there.
        assert_eq!(truncate(String::from("abcdefgh"), 3), "abc…");
    }

    #[test]
    fn null_sink_accepts_records_without_retention() {
        let sink = NullSink::new();
        let rec = LogRecord {
            action: LogAction::Allocate,
            device_ordinal: 0,
            stream_id: Some(StreamId::DEFAULT),
            ptr: Some(0xdead_beef),
            bytes: Some(64),
            tag: Some(AllocTag::UNTAGGED),
            generation: Some(Generation::next()),
            thread_id: 0,
            order_counter: 1,
            timestamp_nanos: 0,
            result: LogResult::Ok,
        };
        // NullSink exposes nothing to inspect afterwards; all that
        // can be checked is that emit reports success.
        sink.emit(rec).expect("NullSink never errors");
    }
}