1use std::collections::BTreeMap;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::path::Path;
7
8use xlog_core::{Result, XlogError};
9
/// On-disk encodings the streaming graph loader knows how to read.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GraphInputFormat {
    /// One JSON object per line with string fields `subject`, `predicate`,
    /// `object` and an optional `split`.
    Jsonl,
    /// Comma-separated rows preceded by a header naming the `subject`,
    /// `predicate` and `object` columns (and optionally `split`).
    Csv,
    /// One whitespace-separated `subject predicate object .` statement per line.
    NTriples,
}
20
/// Chunking figures reported alongside a load.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BoundedMemoryTelemetry {
    /// Configured number of rows per chunk.
    pub max_chunk_rows: usize,
    /// Number of chunks needed to hold the rows seen so far
    /// (row count divided by `max_chunk_rows`, rounded up).
    pub chunks: usize,
}
29
/// A single edge handed to the caller's sink while a file is being read.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphEdgeRow {
    /// Source node of the edge.
    pub subject: String,
    /// Relation name; also the key of the relation histogram.
    pub predicate: String,
    /// Target node (or value) of the edge.
    pub object: String,
    /// Dataset split label; the loaders use `"unspecified"` when the input
    /// does not provide one.
    pub split: String,
    /// Hex digest computed from subject, predicate, object and split.
    pub row_hash: String,
}
44
45#[derive(Debug, Clone, PartialEq, Eq)]
47pub struct GraphRelationLoadReport {
48 pub total_rows: usize,
50 pub edge_rows: usize,
52 pub relation_histogram: BTreeMap<String, usize>,
54 pub split_histogram: BTreeMap<String, usize>,
56 pub row_hashes: Vec<String>,
58 pub bounded_memory: BoundedMemoryTelemetry,
60 pub relation_columns: Vec<String>,
62}
63
64#[derive(Debug, Clone)]
66pub struct StreamingGraphRelationLoader {
67 format: GraphInputFormat,
68 chunk_rows: usize,
69}
70
71impl StreamingGraphRelationLoader {
72 pub fn new(format: GraphInputFormat) -> Self {
74 Self {
75 format,
76 chunk_rows: 100_000,
77 }
78 }
79
80 pub fn with_chunk_rows(mut self, chunk_rows: usize) -> Self {
82 self.chunk_rows = chunk_rows.max(1);
83 self
84 }
85
86 pub fn load_path(&self, path: impl AsRef<Path>) -> Result<GraphRelationLoadReport> {
88 self.load_path_with_sink(path, |_| {})
89 }
90
91 pub fn load_path_with_sink(
93 &self,
94 path: impl AsRef<Path>,
95 mut sink: impl FnMut(GraphEdgeRow),
96 ) -> Result<GraphRelationLoadReport> {
97 let file = File::open(path.as_ref()).map_err(|e| {
98 XlogError::Execution(format!(
99 "Failed to open biomedical graph stream {}: {}",
100 path.as_ref().display(),
101 e
102 ))
103 })?;
104 let mut reader = BufReader::new(file);
105 match self.format {
106 GraphInputFormat::Jsonl => self.load_jsonl(&mut reader, &mut sink),
107 GraphInputFormat::Csv => self.load_csv(&mut reader, &mut sink),
108 GraphInputFormat::NTriples => self.load_ntriples(&mut reader, &mut sink),
109 }
110 }
111
112 fn empty_report(&self) -> GraphRelationLoadReport {
113 GraphRelationLoadReport {
114 total_rows: 0,
115 edge_rows: 0,
116 relation_histogram: BTreeMap::new(),
117 split_histogram: BTreeMap::new(),
118 row_hashes: Vec::new(),
119 bounded_memory: BoundedMemoryTelemetry {
120 max_chunk_rows: self.chunk_rows,
121 chunks: 0,
122 },
123 relation_columns: vec![
124 "subject".to_string(),
125 "predicate".to_string(),
126 "object".to_string(),
127 ],
128 }
129 }
130
131 fn record_edge(
132 &self,
133 report: &mut GraphRelationLoadReport,
134 subject: String,
135 predicate: String,
136 object: String,
137 split: String,
138 sink: &mut dyn FnMut(GraphEdgeRow),
139 ) {
140 let row_hash = stable_row_hash(&subject, &predicate, &object, &split);
141 report.total_rows += 1;
142 report.edge_rows += 1;
143 *report
144 .relation_histogram
145 .entry(predicate.clone())
146 .or_insert(0) += 1;
147 *report.split_histogram.entry(split.clone()).or_insert(0) += 1;
148 report.row_hashes.push(row_hash.clone());
149 report.bounded_memory.chunks = report.total_rows.div_ceil(self.chunk_rows);
150 sink(GraphEdgeRow {
151 subject,
152 predicate,
153 object,
154 split,
155 row_hash,
156 });
157 }
158
159 fn load_jsonl(
160 &self,
161 reader: &mut dyn BufRead,
162 sink: &mut dyn FnMut(GraphEdgeRow),
163 ) -> Result<GraphRelationLoadReport> {
164 let mut report = self.empty_report();
165 for line in reader.lines() {
166 let line = line.map_err(read_error)?;
167 let trimmed = line.trim();
168 if trimmed.is_empty() {
169 continue;
170 }
171 let subject = json_string_field(trimmed, "subject")?;
172 let predicate = json_string_field(trimmed, "predicate")?;
173 let object = json_string_field(trimmed, "object")?;
174 let split =
175 json_string_field(trimmed, "split").unwrap_or_else(|_| "unspecified".to_string());
176 self.record_edge(&mut report, subject, predicate, object, split, sink);
177 }
178 Ok(report)
179 }
180
181 fn load_csv(
182 &self,
183 reader: &mut dyn BufRead,
184 sink: &mut dyn FnMut(GraphEdgeRow),
185 ) -> Result<GraphRelationLoadReport> {
186 let mut report = self.empty_report();
187 let mut lines = reader.lines();
188 let header = match lines.next() {
189 Some(line) => line.map_err(read_error)?,
190 None => return Ok(report),
191 };
192 let headers: Vec<String> = header
193 .split(',')
194 .map(|item| item.trim().to_string())
195 .collect();
196 let subject_idx = csv_column(&headers, "subject")?;
197 let predicate_idx = csv_column(&headers, "predicate")?;
198 let object_idx = csv_column(&headers, "object")?;
199 let split_idx = headers.iter().position(|item| item == "split");
200
201 for line in lines {
202 let line = line.map_err(read_error)?;
203 let trimmed = line.trim();
204 if trimmed.is_empty() {
205 continue;
206 }
207 let cells: Vec<&str> = trimmed.split(',').map(str::trim).collect();
208 let split = split_idx
209 .and_then(|idx| cells.get(idx))
210 .filter(|value| !value.is_empty())
211 .copied()
212 .unwrap_or("unspecified");
213 self.record_edge(
214 &mut report,
215 csv_cell(&cells, subject_idx, "subject")?.to_string(),
216 csv_cell(&cells, predicate_idx, "predicate")?.to_string(),
217 csv_cell(&cells, object_idx, "object")?.to_string(),
218 split.to_string(),
219 sink,
220 );
221 }
222 Ok(report)
223 }
224
225 fn load_ntriples(
226 &self,
227 reader: &mut dyn BufRead,
228 sink: &mut dyn FnMut(GraphEdgeRow),
229 ) -> Result<GraphRelationLoadReport> {
230 let mut report = self.empty_report();
231 for line in reader.lines() {
232 let line = line.map_err(read_error)?;
233 let trimmed = line.trim();
234 if trimmed.is_empty() {
235 continue;
236 }
237 let parts: Vec<&str> = trimmed.split_whitespace().collect();
238 if parts.len() < 4 || parts[3] != "." {
239 return Err(XlogError::Execution(format!(
240 "Invalid N-Triples row: {}",
241 trimmed
242 )));
243 }
244 self.record_edge(
245 &mut report,
246 trim_iri(parts[0]).to_string(),
247 trim_iri(parts[1]).to_string(),
248 trim_iri(parts[2]).to_string(),
249 "unspecified".to_string(),
250 sink,
251 );
252 }
253 Ok(report)
254 }
255}
256
257fn json_string_field(line: &str, key: &str) -> Result<String> {
258 let needle = format!("\"{}\"", key);
259 let key_pos = line
260 .find(&needle)
261 .ok_or_else(|| XlogError::Execution(format!("Missing JSONL field {}", key)))?;
262 let after_key = &line[key_pos + needle.len()..];
263 let colon_pos = after_key
264 .find(':')
265 .ok_or_else(|| XlogError::Execution(format!("Missing ':' for JSONL field {}", key)))?;
266 let after_colon = after_key[colon_pos + 1..].trim_start();
267 if !after_colon.starts_with('"') {
268 return Err(XlogError::Execution(format!(
269 "JSONL field {} must be a string",
270 key
271 )));
272 }
273 let rest = &after_colon[1..];
274 let end = rest
275 .find('"')
276 .ok_or_else(|| XlogError::Execution(format!("Unterminated JSONL field {}", key)))?;
277 Ok(rest[..end].to_string())
278}
279
280fn csv_column(headers: &[String], name: &str) -> Result<usize> {
281 headers
282 .iter()
283 .position(|item| item == name)
284 .ok_or_else(|| XlogError::Execution(format!("Missing CSV column {}", name)))
285}
286
287fn csv_cell<'a>(cells: &'a [&str], index: usize, name: &str) -> Result<&'a str> {
288 cells
289 .get(index)
290 .copied()
291 .filter(|value| !value.is_empty())
292 .ok_or_else(|| XlogError::Execution(format!("Missing CSV value for {}", name)))
293}
294
/// Removes one pair of enclosing angle brackets from an IRI term.
///
/// The brackets are stripped only when `value` both starts with `<` and ends
/// with `>`. Any other input is returned unchanged, so a typed literal such as
/// `"5"^^<http://example.org/int>` keeps the closing bracket of its datatype
/// and blank-node labels pass through untouched.
fn trim_iri(value: &str) -> &str {
    value
        .strip_prefix('<')
        .and_then(|inner| inner.strip_suffix('>'))
        .unwrap_or(value)
}
298
/// Computes a 64-bit FNV-1a digest of `subject|predicate|object|split`.
///
/// The result is rendered as 16 lowercase hexadecimal digits, zero-padded.
fn stable_row_hash(subject: &str, predicate: &str, object: &str, split: &str) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const FNV_PRIME: u64 = 0x100000001b3;

    let joined = [subject, predicate, object, split].join("|");
    let hash = joined.bytes().fold(FNV_OFFSET_BASIS, |state, byte| {
        (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });
    format!("{hash:016x}")
}
307
308fn read_error(error: std::io::Error) -> XlogError {
309 XlogError::Execution(format!("Failed to read biomedical graph stream: {}", error))
310}