diff --git a/src/chronify/csv_utils.py b/src/chronify/csv_utils.py
new file mode 100644
index 0000000..88f11f3
--- /dev/null
+++ b/src/chronify/csv_utils.py
@@ -0,0 +1,28 @@
+"""CSV utilities for auto-detection features."""
+
+import os
+from pathlib import Path
+from typing import Optional
+
+
+def _get_file_size(path: str | Path) -> int:
+    """Return the size of *path* in bytes, or 0 if it cannot be accessed."""
+    # Deliberately uncached: file sizes change between calls, and a stale
+    # cached size would silently mislead _is_large_file.
+    try:
+        return os.path.getsize(path)
+    except OSError:  # FileNotFoundError is a subclass of OSError
+        return 0
+
+
+def _is_large_file(path: str | Path, threshold_mb: int = 50) -> bool:
+    """Return True if *path* is larger than *threshold_mb* megabytes."""
+    size_mb = _get_file_size(path) / (1024 * 1024)
+    return size_mb > threshold_mb
+
+
+def _should_use_auto_detect(auto_detect: Optional[bool] = None) -> bool:
+    """Check if auto-detection should be used: parameter > environment > False."""
+    if auto_detect is not None:
+        return auto_detect
+    return os.environ.get("CHRONIFY_AUTO_DETECT_CSV", "").lower() in ("true", "1", "yes")
diff --git a/src/chronify/store.py b/src/chronify/store.py
index c5ecc31..2b20b85 100644
--- a/src/chronify/store.py
+++ b/src/chronify/store.py
@@ -1,6 +1,7 @@
 from collections.abc import Iterable
 from pathlib import Path
 import shutil
+import os
 from typing import Any, Optional
 
 from chronify.utils.sql import make_temp_view_name
@@ -39,6 +40,7 @@
     get_duckdb_types_from_pandas,
     get_sqlalchemy_type_from_duckdb,
 )
+from chronify.csv_utils import _should_use_auto_detect, _is_large_file, _get_file_size
 from chronify.sqlalchemy.functions import (
     create_view_from_parquet,
     read_database,
@@ -349,6 +351,7 @@ def ingest_from_csv(
         src_schema: CsvTableSchema,
         dst_schema: TableSchema,
         connection: Optional[Connection] = None,
+        auto_detect: Optional[bool] = None,
     ) -> bool:
         """Ingest data from a CSV file.
 
@@ -362,6 +365,8 @@
             Defines the destination table in the database.
         connection
             Optional connection to reuse. Refer to :meth:`ingest_table` for notes.
+        auto_detect
+            Enable auto-detection.
+            If None, checks CHRONIFY_AUTO_DETECT_CSV environment variable.
 
         Returns
         -------
@@ -402,7 +407,12 @@
         --------
         ingest_from_csvs
         """
-        return self.ingest_from_csvs((path,), src_schema, dst_schema, connection=connection)
+        if _should_use_auto_detect(auto_detect) and _is_large_file(path):
+            logger.info("Processing large file %s with auto-detection optimizations", path)
+
+        return self.ingest_from_csvs(
+            (path,), src_schema, dst_schema, connection=connection, auto_detect=auto_detect
+        )
 
     def ingest_from_csvs(
         self,
@@ -410,12 +420,13 @@
         src_schema: CsvTableSchema,
         dst_schema: TableSchema,
         connection: Optional[Connection] = None,
+        auto_detect: Optional[bool] = None,
     ) -> bool:
-        """Ingest data into the table specifed by schema. If the table does not exist,
-        create it. This is faster than calling :meth:`ingest_from_csv` many times.
-        Each file is loaded into memory one at a time.
-        If any error occurs, all added data will be removed and the state of the database will
-        be the same as the original state.
+        """Ingest data from multiple CSV files.
+
+        This is faster than calling :meth:`ingest_from_csv` multiple times.
+        Each file is loaded one at a time. If any error occurs, all added data
+        will be removed and the database state will be unchanged.
 
         Parameters
         ----------
@@ -427,6 +438,8 @@
             Defines the destination table in the database.
         conn
             Optional connection to reuse. Refer to :meth:`ingest_table` for notes.
+        auto_detect
+            Enable auto-detection. If None, checks CHRONIFY_AUTO_DETECT_CSV environment variable.
Returns ------- @@ -1234,6 +1247,67 @@ def _handle_sqlite_error_case(self, name: str, connection: Optional[Connection]) with self._engine.begin() as conn: conn.execute(text(f"DROP TABLE IF EXISTS {name}")) + # Simple CSV Inspection (Always Available) + def inspect_csv( + self, path: Path | str, peek_rows: int = 5, auto_detect: Optional[bool] = None + ) -> dict[str, Any]: + """Inspect CSV file structure and provide recommendations. + + Parameters + ---------- + path + Path to CSV file + peek_rows + Number of rows to sample + auto_detect + Enable auto-detection. If None, checks CHRONIFY_AUTO_DETECT_CSV environment variable. + + Returns + ------- + dict + File information including columns, detected format, size, and recommendations + """ + import duckdb + + file_size_mb = _get_file_size(str(path)) / (1024 * 1024) if os.path.exists(path) else 0 + is_large = _is_large_file(path) + + try: + rel = duckdb.sql(f"SELECT * FROM read_csv('{path}') LIMIT {peek_rows}") + columns = rel.columns + sample_data = rel.to_df().to_dict("records") + except Exception as e: + return {"error": f"Could not read CSV: {e}"} + + auto_detect_enabled = _should_use_auto_detect(auto_detect) + recommendations = [] + if is_large: + if auto_detect_enabled: + recommendations.append("Large file detected - auto-detection optimizations active") + else: + recommendations.append( + "Consider enabling auto-detection for better large file handling" + ) + + detected_format = None + col_set = {col.lower().strip() for col in columns} + if "name" in col_set and "value" in col_set and len(col_set) == 2: + detected_format = "name_value" + elif "datetime" in col_set or "timestamp" in col_set: + detected_format = "datetime_series" + elif any(f"m{i:02d}" in col_set for i in range(1, 13)): + detected_format = "monthly_data" + + return { + "columns": columns, + "detected_format": detected_format, + "file_size_mb": round(file_size_mb, 2), + "is_large_file": is_large, + "auto_detect_enabled": auto_detect_enabled, + 
+            "sample_data": sample_data,
+            "recommendations": recommendations,
+        }
+
 
 def check_columns(
     table_columns: Iterable[str],
diff --git a/tests/test_auto_detect_csv.py b/tests/test_auto_detect_csv.py
new file mode 100644
index 0000000..f008dad
--- /dev/null
+++ b/tests/test_auto_detect_csv.py
@@ -0,0 +1,76 @@
+"""Test auto-detect CSV functionality."""
+
+import os
+import tempfile
+from collections.abc import Generator
+from pathlib import Path
+
+import pytest
+
+from chronify import Store
+from chronify.csv_utils import _should_use_auto_detect
+
+
+@pytest.fixture
+def csv_file() -> Generator[str, None, None]:
+    """Create temporary CSV file for testing."""
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f:
+        f.write("timestamp,device,value\n")
+        f.write("2020-01-01 00:00,A,100\n")
+        f.write("2020-01-01 01:00,A,200\n")
+        csv_path = f.name
+
+    yield csv_path
+
+    Path(csv_path).unlink(missing_ok=True)
+
+
+@pytest.fixture
+def clean_env() -> Generator[None, None, None]:
+    """Remove CHRONIFY_AUTO_DETECT_CSV for the test and restore it afterward."""
+    saved = os.environ.pop("CHRONIFY_AUTO_DETECT_CSV", None)
+    yield
+    os.environ.pop("CHRONIFY_AUTO_DETECT_CSV", None)
+    if saved is not None:
+        os.environ["CHRONIFY_AUTO_DETECT_CSV"] = saved
+
+
+def test_inspect_csv_default(csv_file: str) -> None:
+    """Test CSV inspection without auto-detect."""
+    store = Store()
+    result = store.inspect_csv(csv_file)
+    assert "error" not in result
+
+
+def test_inspect_csv_with_parameter(csv_file: str) -> None:
+    """Test CSV inspection with auto-detect parameter."""
+    store = Store()
+    result = store.inspect_csv(csv_file, auto_detect=True)
+    assert "error" not in result
+
+
+def test_inspect_csv_with_env_variable(csv_file: str, clean_env: None) -> None:
+    """Test CSV inspection with environment variable."""
+    os.environ["CHRONIFY_AUTO_DETECT_CSV"] = "true"
+    store = Store()
+    result = store.inspect_csv(csv_file)
+    assert "error" not in result
+
+
+def test_parameter_overrides_env(csv_file: str, clean_env: None) -> None:
+    """Test parameter override of environment variable."""
+    os.environ["CHRONIFY_AUTO_DETECT_CSV"] = "true"
+    store = Store()
+    result = store.inspect_csv(csv_file, auto_detect=False)
+    assert result["auto_detect_enabled"] is False
+
+
+def test_should_use_auto_detect(clean_env: None) -> None:
+    """Test auto-detect priority logic: parameter > environment > False."""
+    assert _should_use_auto_detect(None) is False
+    assert _should_use_auto_detect(True) is True
+    assert _should_use_auto_detect(False) is False
+
+    os.environ["CHRONIFY_AUTO_DETECT_CSV"] = "true"
+    assert _should_use_auto_detect(None) is True
+    assert _should_use_auto_detect(False) is False