From ab344e7e0b7872de082d14741ac6b07bc81ab642 Mon Sep 17 00:00:00 2001 From: anna-parker <50943381+anna-parker@users.noreply.github.com> Date: Fri, 19 Dec 2025 07:14:45 +0100 Subject: [PATCH 1/3] feat(silo-import): use orjsonl to increase speed of parsing ndjson and only check pipeline version of first record --- silo-import/pyproject.toml | 1 + silo-import/src/silo_import/decompressor.py | 28 ++++++------------- .../src/silo_import/download_manager.py | 4 +-- silo-import/src/silo_import/lineage.py | 9 ++---- silo-import/src/silo_import/runner.py | 2 +- 5 files changed, 14 insertions(+), 30 deletions(-) diff --git a/silo-import/pyproject.toml b/silo-import/pyproject.toml index 4a5586fde1..a83aba8c51 100644 --- a/silo-import/pyproject.toml +++ b/silo-import/pyproject.toml @@ -5,6 +5,7 @@ requires-python = ">=3.11" dependencies = [ "zstandard>=0.22,<0.26", "requests>=2.31,<3.0", + "orjsonl>=0.3,<1.0", ] [project.scripts] diff --git a/silo-import/src/silo_import/decompressor.py b/silo-import/src/silo_import/decompressor.py index ba2053757c..94fbf7ae8b 100644 --- a/silo-import/src/silo_import/decompressor.py +++ b/silo-import/src/silo_import/decompressor.py @@ -2,12 +2,11 @@ from __future__ import annotations -import io -import json import logging from dataclasses import dataclass from pathlib import Path +import orjsonl import zstandard logger = logging.getLogger(__name__) @@ -18,7 +17,7 @@ class NdjsonAnalysis: """Result of analyzing an NDJSON file.""" record_count: int - pipeline_versions: set[int] + pipeline_version: int | None def analyze_ndjson(path: Path) -> NdjsonAnalysis: @@ -35,30 +34,19 @@ def analyze_ndjson(path: Path) -> NdjsonAnalysis: RuntimeError: If decompression or JSON parsing fails """ record_count = 0 - pipeline_versions: set[int] = set() + pipeline_version: int | None = None decompressor = zstandard.ZstdDecompressor() try: with path.open("rb") as compressed, decompressor.stream_reader(compressed) as reader: - text_stream = io.TextIOWrapper(reader, encoding="utf-8") - for line in text_stream: - line_stripped = line.strip() - if not line_stripped: - continue + for record in orjsonl.stream(reader): record_count += 1 - try: - obj = json.loads(line_stripped) - except json.JSONDecodeError as exc: - msg = f"Invalid JSON record: {exc}" - raise RuntimeError(msg) from exc + if pipeline_version is None: + pipeline_version = record.get("metadata", {}).get("pipelineVersion") - metadata = obj.get("metadata") if isinstance(obj, dict) else None - if isinstance(metadata, dict): - pipeline_version = metadata.get("pipelineVersion") - if pipeline_version: - pipeline_versions.add(int(pipeline_version)) except zstandard.ZstdError as exc: msg = f"Failed to decompress {path}: {exc}" + logger.error(msg) raise RuntimeError(msg) from exc - return NdjsonAnalysis(record_count=record_count, pipeline_versions=pipeline_versions) + return NdjsonAnalysis(record_count=record_count, pipeline_version=pipeline_version) diff --git a/silo-import/src/silo_import/download_manager.py b/silo-import/src/silo_import/download_manager.py index 313e72b8fb..0df45cd9c1 100644 --- a/silo-import/src/silo_import/download_manager.py +++ b/silo-import/src/silo_import/download_manager.py @@ -45,7 +45,7 @@ class DownloadResult: directory: Path data_path: Path etag: str - pipeline_versions: set[int] + pipeline_version: int | None def _download_file( @@ -202,7 +202,7 @@ def download_release( directory=download_dir, data_path=data_path, etag=etag_value, - pipeline_versions=analysis.pipeline_versions, + pipeline_version=analysis.pipeline_version, ) except ( diff --git a/silo-import/src/silo_import/lineage.py b/silo-import/src/silo_import/lineage.py index bb4c5e55f4..35abbdd236 100644 --- a/silo-import/src/silo_import/lineage.py +++ b/silo-import/src/silo_import/lineage.py @@ -13,7 +13,7 @@ def update_lineage_definitions( - pipeline_versions: set[int], + pipeline_version: int | None, config: ImporterConfig, paths: ImporterPaths, ) -> None: @@ -21,17 +21,12 @@ def update_lineage_definitions( logger.info("LINEAGE_DEFINITIONS not provided; skipping lineage configuration") return - if not pipeline_versions: + if not pipeline_version: # required for dummy organisms logger.info("No pipeline version found; writing empty lineage definitions") write_text(paths.lineage_definition_file, "{}\n") return - if len(pipeline_versions) > 1: - msg = "Multiple pipeline versions found in released data" - raise RuntimeError(msg) - - pipeline_version = next(iter(pipeline_versions)) lineage_url: str | None = config.lineage_definitions.get(int(pipeline_version)) if not lineage_url: msg = f"No lineage definition URL configured for pipeline version {pipeline_version}" diff --git a/silo-import/src/silo_import/runner.py b/silo-import/src/silo_import/runner.py index 818edf49e0..9bd35ab991 100644 --- a/silo-import/src/silo_import/runner.py +++ b/silo-import/src/silo_import/runner.py @@ -79,7 +79,7 @@ def run_once(self) -> None: return try: - update_lineage_definitions(download.pipeline_versions, self.config, self.paths) + update_lineage_definitions(download.pipeline_version, self.config, self.paths) except Exception: logger.exception("Failed to download lineage definitions; cleaning up input") safe_remove(self.paths.silo_input_data_path) From 4ac56464def04eaa8c0011871cc64ef56062355c Mon Sep 17 00:00:00 2001 From: anna-parker <50943381+anna-parker@users.noreply.github.com> Date: Fri, 19 Dec 2025 07:24:26 +0100 Subject: [PATCH 2/3] testing --- silo-import/src/silo_import/decompressor.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/silo-import/src/silo_import/decompressor.py b/silo-import/src/silo_import/decompressor.py index 94fbf7ae8b..99ab9d3cdd 100644 --- a/silo-import/src/silo_import/decompressor.py +++ b/silo-import/src/silo_import/decompressor.py @@ -7,7 +7,6 @@ from pathlib import Path import orjsonl -import zstandard logger = logging.getLogger(__name__) @@ -35,16 +34,14 @@ def analyze_ndjson(path: Path) -> NdjsonAnalysis: """ record_count = 0 pipeline_version: int | None = None - decompressor = zstandard.ZstdDecompressor() try: - with path.open("rb") as compressed, decompressor.stream_reader(compressed) as reader: - for record in orjsonl.stream(reader): - record_count += 1 - if pipeline_version is None: - pipeline_version = record.get("metadata", {}).get("pipelineVersion") + for record in orjsonl.stream(path): + record_count += 1 + if pipeline_version is None: + pipeline_version = record.get("metadata", {}).get("pipelineVersion") - except zstandard.ZstdError as exc: + except Exception as exc: msg = f"Failed to decompress {path}: {exc}" logger.error(msg) raise RuntimeError(msg) from exc From 696898550d30e7ef8f39c485f24fc0034679ca9e Mon Sep 17 00:00:00 2001 From: anna-parker <50943381+anna-parker@users.noreply.github.com> Date: Fri, 19 Dec 2025 07:27:03 +0100 Subject: [PATCH 3/3] ignore --- silo-import/src/silo_import/decompressor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/silo-import/src/silo_import/decompressor.py b/silo-import/src/silo_import/decompressor.py index 99ab9d3cdd..2ae80fdbdf 100644 --- a/silo-import/src/silo_import/decompressor.py +++ b/silo-import/src/silo_import/decompressor.py @@ -39,7 +39,7 @@ def analyze_ndjson(path: Path) -> NdjsonAnalysis: for record in orjsonl.stream(path): record_count += 1 if pipeline_version is None: - pipeline_version = record.get("metadata", {}).get("pipelineVersion") + pipeline_version = record.get("metadata", {}).get("pipelineVersion") # type: ignore except Exception as exc: msg = f"Failed to decompress {path}: {exc}"