diff --git a/src/orcapod/databases/__init__.py b/src/orcapod/databases/__init__.py
index f47c734..e04f888 100644
--- a/src/orcapod/databases/__init__.py
+++ b/src/orcapod/databases/__init__.py
@@ -1,16 +1,4 @@
-# from .legacy.types import DataStore, ArrowDataStore
-# from .legacy.legacy_arrow_data_stores import MockArrowDataStore, SimpleParquetDataStore
-# from .legacy.dict_data_stores import DirDataStore, NoOpDataStore
-# from .legacy.safe_dir_data_store import SafeDirDataStore
-
-# __all__ = [
-#     "DataStore",
-#     "ArrowDataStore",
-#     "DirDataStore",
-#     "SafeDirDataStore",
-#     "NoOpDataStore",
-#     "MockArrowDataStore",
-#     "SimpleParquetDataStore",
-# ]
-
+from .no_op_database import NoOpDatabase
 from .delta_lake_databases import DeltaTableDatabase
+
+__all__ = ["NoOpDatabase", "DeltaTableDatabase"]
diff --git a/src/orcapod/databases/no_op_database.py b/src/orcapod/databases/no_op_database.py
new file mode 100644
index 0000000..d4ca9b0
--- /dev/null
+++ b/src/orcapod/databases/no_op_database.py
@@ -0,0 +1,88 @@
+import logging
+from collections.abc import Collection, Mapping
+from typing import TYPE_CHECKING, Any, Literal
+
+
+from orcapod.utils.lazy_module import LazyModule
+
+if TYPE_CHECKING:
+    import polars as pl
+    import pyarrow as pa
+else:
+    pa = LazyModule("pyarrow")
+    pl = LazyModule("polars")
+
+# Module-level logger
+logger = logging.getLogger(__name__)
+
+
+class NoOpDatabase:
+    def add_record(
+        self,
+        record_path: tuple[str, ...],
+        record_id: str,
+        record: "pa.Table",
+        skip_duplicates: bool = False,
+        flush: bool = False,
+        schema_handling: Literal["merge", "error", "coerce"] = "error",
+    ) -> None:
+        return
+
+    def add_records(
+        self,
+        record_path: tuple[str, ...],
+        records: pa.Table,
+        record_id_column: str | None = None,
+        skip_duplicates: bool = False,
+        flush: bool = False,
+        schema_handling: Literal["merge", "error", "coerce"] = "error",
+    ) -> None:
+        return
+
+    def get_all_records(
+        self,
+        record_path: tuple[str, ...],
+        record_id_column: str | None = None,
+        retrieve_pending: bool = True,
+    ) -> pa.Table | None:
+        return None
+
+    def get_records_with_column_value(
+        self,
+        record_path: tuple[str, ...],
+        column_values: Collection[tuple[str, Any]] | Mapping[str, Any],
+        record_id_column: str | None = None,
+        flush: bool = False,
+    ) -> "pa.Table | None":
+        return None
+
+    def get_record_by_id(
+        self,
+        record_path: tuple[str, ...],
+        record_id: str,
+        record_id_column: str | None = None,
+        flush: bool = False,
+    ) -> "pa.Table | None":
+        return None
+
+    def get_records_by_ids(
+        self,
+        record_path: tuple[str, ...],
+        record_ids: "Collection[str] | pl.Series | pa.Array",
+        record_id_column: str | None = None,
+        flush: bool = False,
+    ) -> "pa.Table | None":
+        return None
+
+    def flush(self) -> None:
+        """Flush all pending batches."""
+        return None
+
+    def flush_batch(self, record_path: tuple[str, ...]) -> None:
+        """
+        Flush pending batch for a specific source path.
+
+        Args:
+            record_path: Tuple of path components
+        """
+        return None
diff --git a/src/orcapod/pipeline/graph.py b/src/orcapod/pipeline/graph.py
index ddb7422..6eee0a2 100644
--- a/src/orcapod/pipeline/graph.py
+++ b/src/orcapod/pipeline/graph.py
@@ -1,9 +1,11 @@
+import warnings
 from orcapod.core.trackers import GraphTracker, Invocation
 from orcapod.pipeline.nodes import KernelNode, PodNode
 from orcapod.protocols.pipeline_protocols import Node
 from orcapod import contexts
 from orcapod.protocols import core_protocols as cp
 from orcapod.protocols import database_protocols as dbp
+from orcapod.databases import NoOpDatabase
 from typing import Any
 from collections.abc import Collection
 import os
@@ -73,7 +75,7 @@ class Pipeline(GraphTracker):
     def __init__(
         self,
         name: str | tuple[str, ...],
-        pipeline_database: dbp.ArrowDatabase,
+        pipeline_database: dbp.ArrowDatabase | None = None,
        results_database: dbp.ArrowDatabase | None = None,
         tracker_manager: cp.TrackerManager | None = None,
         data_context: str | contexts.DataContext | None = None,
@@ -84,14 +86,18 @@ def __init__(
             name = (name,)
         self.name = name
         self.pipeline_store_path_prefix = self.name
+
+        if pipeline_database is None:
+            pipeline_database = NoOpDatabase()
+            warnings.warn(
+                "No database was specified. Pipeline results will not be saved"
+            )
+
         self.results_store_path_prefix = ()
         if results_database is None:
-            if pipeline_database is None:
-                raise ValueError(
-                    "Either pipeline_database or results_database must be provided"
-                )
             results_database = pipeline_database
             self.results_store_path_prefix = self.name + ("_results",)
+
         self.pipeline_database = pipeline_database
         self.results_database = results_database
         self.nodes: dict[str, Node] = {}
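
Reviewer note: a minimal usage sketch of the new no-op backend's contract, based only on the methods added above. It assumes pyarrow is installed; the ("demo", "run1") record path and the record contents are made up for illustration.

    import pyarrow as pa

    from orcapod.databases import NoOpDatabase

    db = NoOpDatabase()
    record = pa.table({"value": [1, 2, 3]})  # arbitrary example record

    # Writes and flushes are accepted but silently discarded.
    db.add_record(("demo", "run1"), record_id="r1", record=record)
    db.flush()

    # Every read comes back empty (None), regardless of what was "written".
    assert db.get_all_records(("demo", "run1")) is None
    assert db.get_record_by_id(("demo", "run1"), record_id="r1") is None

This mirrors the change to Pipeline.__init__: when no database is supplied, the constructor now falls back to NoOpDatabase() and warns that results will not be saved, rather than raising ValueError as before.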