Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/chronify/csv_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""CSV utilities for auto-detection features."""

import os
from functools import lru_cache
from pathlib import Path
from typing import Optional


@lru_cache(maxsize=32)
def _get_file_size(path: str | Path) -> int:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed? Wouldn't it be risky? It seems like someone could change the file during a session.

try:
return os.path.getsize(path)
except (OSError, FileNotFoundError):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you return an error if the file doesn't exist?

return 0


def _is_large_file(path: str | Path, threshold_mb: int = 50) -> bool:
size_mb = _get_file_size(path) / (1024 * 1024)
return size_mb > threshold_mb


def _should_use_auto_detect(auto_detect: Optional[bool] = None) -> bool:
"""Check if auto-detection should be used: parameter > environment > False."""
if auto_detect is not None:
return auto_detect
return os.environ.get("CHRONIFY_AUTO_DETECT_CSV", "").lower() in ("true", "1", "yes")
86 changes: 80 additions & 6 deletions src/chronify/store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from collections.abc import Iterable
from pathlib import Path
import shutil
import os
from typing import Any, Optional
from chronify.utils.sql import make_temp_view_name

Expand Down Expand Up @@ -39,6 +40,7 @@
get_duckdb_types_from_pandas,
get_sqlalchemy_type_from_duckdb,
)
from chronify.csv_utils import _should_use_auto_detect, _is_large_file, _get_file_size
from chronify.sqlalchemy.functions import (
create_view_from_parquet,
read_database,
Expand Down Expand Up @@ -349,6 +351,7 @@ def ingest_from_csv(
src_schema: CsvTableSchema,
dst_schema: TableSchema,
connection: Optional[Connection] = None,
auto_detect: Optional[bool] = None,
) -> bool:
"""Ingest data from a CSV file.

Expand All @@ -362,6 +365,8 @@ def ingest_from_csv(
Defines the destination table in the database.
connection
Optional connection to reuse. Refer to :meth:`ingest_table` for notes.
auto_detect
Enable auto-detection. If None, checks CHRONIFY_AUTO_DETECT_CSV environment variable.

Returns
-------
Expand Down Expand Up @@ -402,20 +407,26 @@ def ingest_from_csv(
--------
ingest_from_csvs
"""
return self.ingest_from_csvs((path,), src_schema, dst_schema, connection=connection)
if _should_use_auto_detect(auto_detect) and _is_large_file(path):
logger.info(f"Processing large file {path} with auto-detection optimizations")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the optimization?


return self.ingest_from_csvs(
(path,), src_schema, dst_schema, connection=connection, auto_detect=auto_detect
)

def ingest_from_csvs(
self,
paths: Iterable[Path | str],
src_schema: CsvTableSchema,
dst_schema: TableSchema,
connection: Optional[Connection] = None,
auto_detect: Optional[bool] = None,
) -> bool:
"""Ingest data into the table specifed by schema. If the table does not exist,
create it. This is faster than calling :meth:`ingest_from_csv` many times.
Each file is loaded into memory one at a time.
If any error occurs, all added data will be removed and the state of the database will
be the same as the original state.
"""Ingest data from multiple CSV files.

This is faster than calling :meth:`ingest_from_csv` multiple times.
Each file is loaded one at a time. If any error occurs, all added data
will be removed and the database state will be unchanged.

Parameters
----------
Expand All @@ -427,6 +438,8 @@ def ingest_from_csvs(
Defines the destination table in the database.
conn
Optional connection to reuse. Refer to :meth:`ingest_table` for notes.
auto_detect
Enable auto-detection. If None, checks CHRONIFY_AUTO_DETECT_CSV environment variable.

Returns
-------
Expand Down Expand Up @@ -1234,6 +1247,67 @@ def _handle_sqlite_error_case(self, name: str, connection: Optional[Connection])
with self._engine.begin() as conn:
conn.execute(text(f"DROP TABLE IF EXISTS {name}"))

# Simple CSV Inspection (Always Available)
def inspect_csv(
self, path: Path | str, peek_rows: int = 5, auto_detect: Optional[bool] = None
) -> dict[str, Any]:
"""Inspect CSV file structure and provide recommendations.

Parameters
----------
path
Path to CSV file
peek_rows
Number of rows to sample
auto_detect
Enable auto-detection. If None, checks CHRONIFY_AUTO_DETECT_CSV environment variable.

Returns
-------
dict
File information including columns, detected format, size, and recommendations
"""
import duckdb
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function could reside in csv_io.py. It doesn't need self, right?


file_size_mb = _get_file_size(str(path)) / (1024 * 1024) if os.path.exists(path) else 0
is_large = _is_large_file(path)

try:
rel = duckdb.sql(f"SELECT * FROM read_csv('{path}') LIMIT {peek_rows}")
columns = rel.columns
sample_data = rel.to_df().to_dict("records")
except Exception as e:
return {"error": f"Could not read CSV: {e}"}

auto_detect_enabled = _should_use_auto_detect(auto_detect)
recommendations = []
if is_large:
if auto_detect_enabled:
recommendations.append("Large file detected - auto-detection optimizations active")
else:
recommendations.append(
"Consider enabling auto-detection for better large file handling"
)

detected_format = None
col_set = {col.lower().strip() for col in columns}
if "name" in col_set and "value" in col_set and len(col_set) == 2:
detected_format = "name_value"
elif "datetime" in col_set or "timestamp" in col_set:
detected_format = "datetime_series"
elif any(f"m{i:02d}" in col_set for i in range(1, 13)):
detected_format = "monthly_data"

return {
"columns": columns,
"detected_format": detected_format,
"file_size_mb": round(file_size_mb, 2),
"is_large_file": is_large,
"auto_detect_enabled": auto_detect_enabled,
"sample_data": sample_data,
"recommendations": recommendations,
}


def check_columns(
table_columns: Iterable[str],
Expand Down
74 changes: 74 additions & 0 deletions tests/test_auto_detect_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Test auto-detect CSV functionality."""

import os
import tempfile
from collections.abc import Generator
from pathlib import Path

import pytest

from chronify import Store
from chronify.csv_utils import _should_use_auto_detect


@pytest.fixture
def csv_file() -> Generator[str, None, None]:
"""Create temporary CSV file for testing."""
with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as f:
f.write("timestamp,device,value\n")
f.write("2020-01-01 00:00,A,100\n")
f.write("2020-01-01 01:00,A,200\n")
csv_path = f.name

yield csv_path

Path(csv_path).unlink(missing_ok=True)


@pytest.fixture
def clean_env() -> Generator[None, None, None]:
"""Ensure clean environment variable state."""
yield
if "CHRONIFY_AUTO_DETECT_CSV" in os.environ:
del os.environ["CHRONIFY_AUTO_DETECT_CSV"]


def test_inspect_csv_default(csv_file: str) -> None:
"""Test CSV inspection without auto-detect."""
store = Store()
result = store.inspect_csv(csv_file)
assert "error" not in result


def test_inspect_csv_with_parameter(csv_file: str) -> None:
"""Test CSV inspection with auto-detect parameter."""
store = Store()
result = store.inspect_csv(csv_file, auto_detect=True)
assert "error" not in result


def test_inspect_csv_with_env_variable(csv_file: str, clean_env: None) -> None:
"""Test CSV inspection with environment variable."""
os.environ["CHRONIFY_AUTO_DETECT_CSV"] = "true"
store = Store()
result = store.inspect_csv(csv_file)
assert "error" not in result


def test_parameter_overrides_env(csv_file: str, clean_env: None) -> None:
"""Test parameter override of environment variable."""
os.environ["CHRONIFY_AUTO_DETECT_CSV"] = "true"
store = Store()
result = store.inspect_csv(csv_file, auto_detect=False)
assert "error" not in result


def test_should_use_auto_detect(clean_env: None) -> None:
"""Test auto-detect priority logic."""
assert _should_use_auto_detect(None) is False
assert _should_use_auto_detect(True) is True
assert _should_use_auto_detect(False) is False

os.environ["CHRONIFY_AUTO_DETECT_CSV"] = "true"
assert _should_use_auto_detect(None) is True
assert _should_use_auto_detect(False) is False
Loading