// Source file: xlog_gpu/biokg.rs

//! Streaming biomedical graph relation loading helpers.
2
3use std::collections::BTreeMap;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::path::Path;
7
8use xlog_core::{Result, XlogError};
9
/// Supported graph stream formats for relation loading.
///
/// The variant chosen when constructing a [`StreamingGraphRelationLoader`]
/// selects which line-oriented parser is run over the input.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GraphInputFormat {
    /// One JSON object per line with `subject`, `predicate`, `object`, and
    /// optional `split` string fields.
    Jsonl,
    /// CSV with a header row naming the `subject`, `predicate`, `object`, and
    /// optional `split` columns.
    Csv,
    /// RDF N-Triples with one `subject predicate object .` statement per line.
    NTriples,
}
20
/// Bounded-memory telemetry collected while streaming rows.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BoundedMemoryTelemetry {
    /// Maximum row count accepted per loader chunk.
    pub max_chunk_rows: usize,
    /// Number of chunks observed while streaming the input, i.e. the row count
    /// divided by `max_chunk_rows`, rounded up.
    pub chunks: usize,
}
29
/// One typed biomedical graph edge row emitted by the streaming loader.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphEdgeRow {
    /// Subject node identifier.
    pub subject: String,
    /// Predicate/relation label.
    pub predicate: String,
    /// Object node identifier.
    pub object: String,
    /// Split/provenance label; the loader uses `"unspecified"` when the input
    /// carries none.
    pub split: String,
    /// Stable row hash over normalized subject, predicate, object, and split fields.
    pub row_hash: String,
}
44
45/// Provenance and histogram summary for a streamed biomedical graph relation.
46#[derive(Debug, Clone, PartialEq, Eq)]
47pub struct GraphRelationLoadReport {
48    /// Total non-empty rows read from the input stream.
49    pub total_rows: usize,
50    /// Rows that produced typed graph edges.
51    pub edge_rows: usize,
52    /// Edge count per predicate/relation label.
53    pub relation_histogram: BTreeMap<String, usize>,
54    /// Edge count per split label.
55    pub split_histogram: BTreeMap<String, usize>,
56    /// Stable per-row hashes over normalized subject, predicate, object, and split fields.
57    pub row_hashes: Vec<String>,
58    /// Streaming chunk telemetry.
59    pub bounded_memory: BoundedMemoryTelemetry,
60    /// Relation column names emitted by the loader.
61    pub relation_columns: Vec<String>,
62}
63
64/// Streaming loader for typed biomedical graph relation rows.
65#[derive(Debug, Clone)]
66pub struct StreamingGraphRelationLoader {
67    format: GraphInputFormat,
68    chunk_rows: usize,
69}
70
71impl StreamingGraphRelationLoader {
72    /// Create a loader for the given input format.
73    pub fn new(format: GraphInputFormat) -> Self {
74        Self {
75            format,
76            chunk_rows: 100_000,
77        }
78    }
79
80    /// Set the maximum number of rows accounted to a single streaming chunk.
81    pub fn with_chunk_rows(mut self, chunk_rows: usize) -> Self {
82        self.chunk_rows = chunk_rows.max(1);
83        self
84    }
85
86    /// Stream a graph file and return provenance and histogram telemetry.
87    pub fn load_path(&self, path: impl AsRef<Path>) -> Result<GraphRelationLoadReport> {
88        self.load_path_with_sink(path, |_| {})
89    }
90
91    /// Stream a graph file into a caller-provided edge sink and return telemetry.
92    pub fn load_path_with_sink(
93        &self,
94        path: impl AsRef<Path>,
95        mut sink: impl FnMut(GraphEdgeRow),
96    ) -> Result<GraphRelationLoadReport> {
97        let file = File::open(path.as_ref()).map_err(|e| {
98            XlogError::Execution(format!(
99                "Failed to open biomedical graph stream {}: {}",
100                path.as_ref().display(),
101                e
102            ))
103        })?;
104        let mut reader = BufReader::new(file);
105        match self.format {
106            GraphInputFormat::Jsonl => self.load_jsonl(&mut reader, &mut sink),
107            GraphInputFormat::Csv => self.load_csv(&mut reader, &mut sink),
108            GraphInputFormat::NTriples => self.load_ntriples(&mut reader, &mut sink),
109        }
110    }
111
112    fn empty_report(&self) -> GraphRelationLoadReport {
113        GraphRelationLoadReport {
114            total_rows: 0,
115            edge_rows: 0,
116            relation_histogram: BTreeMap::new(),
117            split_histogram: BTreeMap::new(),
118            row_hashes: Vec::new(),
119            bounded_memory: BoundedMemoryTelemetry {
120                max_chunk_rows: self.chunk_rows,
121                chunks: 0,
122            },
123            relation_columns: vec![
124                "subject".to_string(),
125                "predicate".to_string(),
126                "object".to_string(),
127            ],
128        }
129    }
130
131    fn record_edge(
132        &self,
133        report: &mut GraphRelationLoadReport,
134        subject: String,
135        predicate: String,
136        object: String,
137        split: String,
138        sink: &mut dyn FnMut(GraphEdgeRow),
139    ) {
140        let row_hash = stable_row_hash(&subject, &predicate, &object, &split);
141        report.total_rows += 1;
142        report.edge_rows += 1;
143        *report
144            .relation_histogram
145            .entry(predicate.clone())
146            .or_insert(0) += 1;
147        *report.split_histogram.entry(split.clone()).or_insert(0) += 1;
148        report.row_hashes.push(row_hash.clone());
149        report.bounded_memory.chunks = report.total_rows.div_ceil(self.chunk_rows);
150        sink(GraphEdgeRow {
151            subject,
152            predicate,
153            object,
154            split,
155            row_hash,
156        });
157    }
158
159    fn load_jsonl(
160        &self,
161        reader: &mut dyn BufRead,
162        sink: &mut dyn FnMut(GraphEdgeRow),
163    ) -> Result<GraphRelationLoadReport> {
164        let mut report = self.empty_report();
165        for line in reader.lines() {
166            let line = line.map_err(read_error)?;
167            let trimmed = line.trim();
168            if trimmed.is_empty() {
169                continue;
170            }
171            let subject = json_string_field(trimmed, "subject")?;
172            let predicate = json_string_field(trimmed, "predicate")?;
173            let object = json_string_field(trimmed, "object")?;
174            let split =
175                json_string_field(trimmed, "split").unwrap_or_else(|_| "unspecified".to_string());
176            self.record_edge(&mut report, subject, predicate, object, split, sink);
177        }
178        Ok(report)
179    }
180
181    fn load_csv(
182        &self,
183        reader: &mut dyn BufRead,
184        sink: &mut dyn FnMut(GraphEdgeRow),
185    ) -> Result<GraphRelationLoadReport> {
186        let mut report = self.empty_report();
187        let mut lines = reader.lines();
188        let header = match lines.next() {
189            Some(line) => line.map_err(read_error)?,
190            None => return Ok(report),
191        };
192        let headers: Vec<String> = header
193            .split(',')
194            .map(|item| item.trim().to_string())
195            .collect();
196        let subject_idx = csv_column(&headers, "subject")?;
197        let predicate_idx = csv_column(&headers, "predicate")?;
198        let object_idx = csv_column(&headers, "object")?;
199        let split_idx = headers.iter().position(|item| item == "split");
200
201        for line in lines {
202            let line = line.map_err(read_error)?;
203            let trimmed = line.trim();
204            if trimmed.is_empty() {
205                continue;
206            }
207            let cells: Vec<&str> = trimmed.split(',').map(str::trim).collect();
208            let split = split_idx
209                .and_then(|idx| cells.get(idx))
210                .filter(|value| !value.is_empty())
211                .copied()
212                .unwrap_or("unspecified");
213            self.record_edge(
214                &mut report,
215                csv_cell(&cells, subject_idx, "subject")?.to_string(),
216                csv_cell(&cells, predicate_idx, "predicate")?.to_string(),
217                csv_cell(&cells, object_idx, "object")?.to_string(),
218                split.to_string(),
219                sink,
220            );
221        }
222        Ok(report)
223    }
224
225    fn load_ntriples(
226        &self,
227        reader: &mut dyn BufRead,
228        sink: &mut dyn FnMut(GraphEdgeRow),
229    ) -> Result<GraphRelationLoadReport> {
230        let mut report = self.empty_report();
231        for line in reader.lines() {
232            let line = line.map_err(read_error)?;
233            let trimmed = line.trim();
234            if trimmed.is_empty() {
235                continue;
236            }
237            let parts: Vec<&str> = trimmed.split_whitespace().collect();
238            if parts.len() < 4 || parts[3] != "." {
239                return Err(XlogError::Execution(format!(
240                    "Invalid N-Triples row: {}",
241                    trimmed
242                )));
243            }
244            self.record_edge(
245                &mut report,
246                trim_iri(parts[0]).to_string(),
247                trim_iri(parts[1]).to_string(),
248                trim_iri(parts[2]).to_string(),
249                "unspecified".to_string(),
250                sink,
251            );
252        }
253        Ok(report)
254    }
255}
256
257fn json_string_field(line: &str, key: &str) -> Result<String> {
258    let needle = format!("\"{}\"", key);
259    let key_pos = line
260        .find(&needle)
261        .ok_or_else(|| XlogError::Execution(format!("Missing JSONL field {}", key)))?;
262    let after_key = &line[key_pos + needle.len()..];
263    let colon_pos = after_key
264        .find(':')
265        .ok_or_else(|| XlogError::Execution(format!("Missing ':' for JSONL field {}", key)))?;
266    let after_colon = after_key[colon_pos + 1..].trim_start();
267    if !after_colon.starts_with('"') {
268        return Err(XlogError::Execution(format!(
269            "JSONL field {} must be a string",
270            key
271        )));
272    }
273    let rest = &after_colon[1..];
274    let end = rest
275        .find('"')
276        .ok_or_else(|| XlogError::Execution(format!("Unterminated JSONL field {}", key)))?;
277    Ok(rest[..end].to_string())
278}
279
280fn csv_column(headers: &[String], name: &str) -> Result<usize> {
281    headers
282        .iter()
283        .position(|item| item == name)
284        .ok_or_else(|| XlogError::Execution(format!("Missing CSV column {}", name)))
285}
286
287fn csv_cell<'a>(cells: &'a [&str], index: usize, name: &str) -> Result<&'a str> {
288    cells
289        .get(index)
290        .copied()
291        .filter(|value| !value.is_empty())
292        .ok_or_else(|| XlogError::Execution(format!("Missing CSV value for {}", name)))
293}
294
/// Remove the angle brackets from an IRI term written as `<...>`.
///
/// The brackets are removed only when the value both starts with `<` and ends
/// with `>`. Any other term is returned unchanged, so a typed literal such as
/// `"5"^^<http://www.w3.org/2001/XMLSchema#integer>` keeps the closing bracket of
/// its datatype IRI.
fn trim_iri(value: &str) -> &str {
    value
        .strip_prefix('<')
        .and_then(|inner| inner.strip_suffix('>'))
        .unwrap_or(value)
}
298
/// Hash one edge row into a 16-digit lowercase hexadecimal string.
///
/// The value is the 64-bit FNV-1a hash of the bytes of
/// `subject|predicate|object|split`. The bytes are fed straight into the hash
/// instead of first building that joined string, which gives the same result
/// without a per-row allocation.
///
/// The `|` separator is not escaped, so fields that themselves contain `|` can
/// make two different rows hash alike. The scheme is kept as-is because changing
/// it would change every previously emitted hash.
fn stable_row_hash(subject: &str, predicate: &str, object: &str, split: &str) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0100_0000_01b3;

    let fields = [subject, predicate, object, split];
    let mut hash = FNV_OFFSET_BASIS;
    for (index, field) in fields.iter().enumerate() {
        // Every field after the first is preceded by the `|` separator.
        let separator = (index > 0).then_some(b'|');
        for byte in separator.into_iter().chain(field.bytes()) {
            hash ^= u64::from(byte);
            hash = hash.wrapping_mul(FNV_PRIME);
        }
    }
    format!("{hash:016x}")
}
307
308fn read_error(error: std::io::Error) -> XlogError {
309    XlogError::Execution(format!("Failed to read biomedical graph stream: {}", error))
310}