src/chronify/sqlalchemy/functions.py (2 additions & 1 deletion)
@@ -17,7 +17,7 @@

 from chronify.exceptions import InvalidOperation, InvalidParameter
 from chronify.time_configs import DatetimeRange, TimeBaseModel
-from chronify.utils.path_utils import check_overwrite, delete_if_exists
+from chronify.utils.path_utils import check_overwrite, delete_if_exists, to_path

 # Copied from Pandas/Polars
 DbWriteMode: TypeAlias = Literal["replace", "append", "fail"]
@@ -224,6 +224,7 @@ def write_query_to_parquet(
     partition_columns: Optional[list[str]] = None,
 ) -> None:
     """Write the query to a Parquet file."""
+    output_file = to_path(output_file)
     check_overwrite(output_file, overwrite)
     match engine.name:
         case "duckdb":
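For readers outside the repo: to_path is presumably a small normalizer that accepts str | Path and returns a pathlib.Path before any filesystem checks run. A minimal sketch under that assumption (the real helper in chronify.utils.path_utils may do more):

    from pathlib import Path

    def to_path(path: str | Path) -> Path:
        """Assumed behavior: coerce a string (or an existing Path) into a pathlib.Path."""
        return Path(path)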
src/chronify/store.py (6 additions & 0 deletions)
@@ -51,6 +51,7 @@
 from chronify.time_series_mapper import map_time
 from chronify.utils.path_utils import check_overwrite, to_path
 from chronify.utils.sqlalchemy_view import create_view
+from chronify.time import ResamplingOperationType


 class Store:
@@ -845,6 +846,7 @@ def map_table_time_config(
         dst_schema: TableSchema,
         data_adjustment: Optional[TimeBasedDataAdjustment] = None,
         wrap_time_allowed: bool = False,
+        resampling_operation: Optional[ResamplingOperationType] = None,
         scratch_dir: Optional[Path] = None,
         output_file: Optional[Path] = None,
         check_mapped_timestamps: bool = False,
@@ -864,6 +866,9 @@
         wrap_time_allowed
             Defines whether the time column is allowed to be wrapped according to the time
             config in dst_schema when it does not line up with that time config
+        resampling_operation
+            Defines the operation type for resampling when the time resolution of the source
+            data differs from that of dst_schema
         scratch_dir
             Directory to use for temporary writes. Defaults to the system's tmp filesystem.
         check_mapped_timestamps
@@ -929,6 +934,7 @@ def map_table_time_config(
             dst_schema,
             data_adjustment=data_adjustment,
             wrap_time_allowed=wrap_time_allowed,
+            resampling_operation=resampling_operation,
             scratch_dir=scratch_dir,
             output_file=output_file,
             check_mapped_timestamps=check_mapped_timestamps,
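A hedged usage sketch of the new Store API parameter. The table name, the schemas, and the exact positional layout of map_table_time_config are illustrative assumptions, not taken from this diff; only the resampling_operation keyword is:

    from chronify.time import AggregationType

    # store is an existing chronify Store; dst_schema is a TableSchema describing
    # a coarser time resolution than the source table's schema.
    store.map_table_time_config(
        "generation_15min",  # hypothetical source table name
        dst_schema,
        resampling_operation=AggregationType.SUM,  # sum fine values into each coarser interval
        check_mapped_timestamps=True,
    )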
src/chronify/time.py (22 additions & 1 deletion)
@@ -1,7 +1,7 @@
"""Definitions related to time"""

from enum import StrEnum
from typing import NamedTuple
from typing import NamedTuple, Union
from zoneinfo import ZoneInfo

from chronify.exceptions import InvalidParameter
@@ -114,6 +114,27 @@ class MeasurementType(StrEnum):
     # description="Data values represent the sum of values in a time range",


+class AggregationType(StrEnum):
+    """Operation types for resampling / aggregation"""
+
+    SUM = "sum"
+    AVG = "avg"
+    MIN = "min"
+    MAX = "max"
+
+
+class DisaggregationType(StrEnum):
+    """Operation types for resampling / disaggregation"""
+
+    INTERPOLATE = "interpolate"
+    DUPLICATE_FFILL = "duplicate_ffill"
+    DUPLICATE_BFILL = "duplicate_bfill"
+    UNIFORM_DISAGGREGATE = "uniform_disaggregate"
+
+
+ResamplingOperationType = Union[AggregationType, DisaggregationType]
+
+
 class TimeZone(StrEnum):
     """Time zones"""

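Because ResamplingOperationType is a plain Union, downstream code can accept either enum and branch with isinstance. A minimal self-contained sketch using only the definitions added above:

    from chronify.time import AggregationType, DisaggregationType, ResamplingOperationType

    def describe(op: ResamplingOperationType) -> str:
        # Aggregation collapses many fine-resolution rows into one coarse row;
        # disaggregation expands one coarse row into many fine rows.
        if isinstance(op, AggregationType):
            return f"aggregate with {op.value}"
        return f"disaggregate with {op.value}"

    print(describe(AggregationType.AVG))             # -> aggregate with avg
    print(describe(DisaggregationType.INTERPOLATE))  # -> disaggregate with interpolate

Note that only the AggregationType members are wired into the SQL builder in this PR (see _apply_mapping below); a DisaggregationType value currently falls through to the case _ branch and raises ValueError.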
src/chronify/time_series_mapper.py (30 additions & 4 deletions)
@@ -17,6 +17,7 @@
     TimeBasedDataAdjustment,
     ColumnRepresentativeBase,
 )
+from chronify.time import ResamplingOperationType


 def map_time(
@@ -26,6 +27,7 @@ def map_time(
     to_schema: TableSchema,
     data_adjustment: Optional[TimeBasedDataAdjustment] = None,
     wrap_time_allowed: bool = False,
+    resampling_operation: Optional[ResamplingOperationType] = None,
     scratch_dir: Optional[Path] = None,
     output_file: Optional[Path] = None,
     check_mapped_timestamps: bool = False,
@@ -35,7 +37,13 @@
         to_schema.time_config, DatetimeRange
     ):
         MapperRepresentativeTimeToDatetime(
-            engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed
+            engine,
+            metadata,
+            from_schema,
+            to_schema,
+            data_adjustment,
+            wrap_time_allowed,
+            resampling_operation,
         ).map_time(
             scratch_dir=scratch_dir,
             output_file=output_file,
@@ -45,7 +53,13 @@
         to_schema.time_config, DatetimeRange
     ):
         MapperDatetimeToDatetime(
-            engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed
+            engine,
+            metadata,
+            from_schema,
+            to_schema,
+            data_adjustment,
+            wrap_time_allowed,
+            resampling_operation,
         ).map_time(
             scratch_dir=scratch_dir,
             output_file=output_file,
@@ -55,7 +69,13 @@
         to_schema.time_config, DatetimeRange
     ):
         MapperIndexTimeToDatetime(
-            engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed
+            engine,
+            metadata,
+            from_schema,
+            to_schema,
+            data_adjustment,
+            wrap_time_allowed,
+            resampling_operation,
         ).map_time(
             scratch_dir=scratch_dir,
             output_file=output_file,
@@ -67,7 +87,13 @@
         # No way to generate expected timestamps for YearMonthDayPeriodTimeNTZ
         # Is there a way to only check the output datetime timestamps?
         MapperColumnRepresentativeToDatetime(
-            engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed
+            engine,
+            metadata,
+            from_schema,
+            to_schema,
+            data_adjustment,
+            wrap_time_allowed,
+            resampling_operation,
         ).map_time(
             scratch_dir=scratch_dir,
             output_file=output_file,
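A hedged sketch of calling map_time directly with the new argument. The leading parameters (engine, metadata, from_schema) are hidden in this hunk and assumed from the mapper calls above; from_schema and to_schema are assumed TableSchema objects whose DatetimeRange configs differ only in resolution:

    from sqlalchemy import MetaData, create_engine
    from chronify.time import AggregationType
    from chronify.time_series_mapper import map_time

    engine = create_engine("duckdb:///:memory:")  # assumes the duckdb-engine dialect is installed
    metadata = MetaData()

    map_time(
        engine,
        metadata,
        from_schema,  # e.g., 15-minute source data
        to_schema,    # e.g., hourly destination schema
        resampling_operation=AggregationType.AVG,  # average fine values into each hour
        check_mapped_timestamps=True,
    )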
src/chronify/time_series_mapper_base.py (51 additions & 7 deletions)
@@ -2,11 +2,11 @@
 from functools import reduce
 from operator import and_
 from pathlib import Path
-from typing import Optional
+from typing import Any, Optional

 import pandas as pd
 from loguru import logger
-from sqlalchemy import Engine, MetaData, Table, select, text
+from sqlalchemy import Engine, MetaData, Table, select, text, func
 from chronify.hive_functions import create_materialized_view

 from chronify.sqlalchemy.functions import (
@@ -18,8 +18,9 @@
 from chronify.exceptions import ConflictingInputsError
 from chronify.utils.sqlalchemy_table import create_table
 from chronify.time_series_checker import check_timestamps
-from chronify.time import TimeIntervalType
+from chronify.time import TimeIntervalType, ResamplingOperationType, AggregationType
 from chronify.time_configs import TimeBasedDataAdjustment
+from chronify.utils.path_utils import to_path


 class TimeSeriesMapperBase(abc.ABC):
@@ -33,6 +34,7 @@ def __init__(
         to_schema: TableSchema,
         data_adjustment: Optional[TimeBasedDataAdjustment] = None,
         wrap_time_allowed: bool = False,
+        resampling_operation: Optional[ResamplingOperationType] = None,
     ) -> None:
         self._engine = engine
         self._metadata = metadata
@@ -45,6 +47,7 @@
             self._from_schema.time_config.interval_type
             != self._to_schema.time_config.interval_type
         )
+        self._resampling_operation = resampling_operation

     @abc.abstractmethod
     def check_schema_consistency(self) -> None:
@@ -91,6 +94,7 @@ def apply_mapping(
     engine: Engine,
     metadata: MetaData,
     data_adjustment: TimeBasedDataAdjustment,
+    resampling_operation: Optional[ResamplingOperationType] = None,
     scratch_dir: Optional[Path] = None,
     output_file: Optional[Path] = None,
     check_mapped_timestamps: bool = False,
@@ -116,18 +120,25 @@
         to_schema,
         engine,
         metadata,
-        scratch_dir,
-        output_file,
+        resampling_operation=resampling_operation,
+        scratch_dir=scratch_dir,
+        output_file=output_file,
     )
     if check_mapped_timestamps:
         if output_file is not None:
+            output_file = to_path(output_file)
             with engine.begin() as conn:
                 create_view_from_parquet(conn, to_schema.name, output_file)
             metadata.reflect(engine, views=True)
             created_tmp_view = True
         mapped_table = Table(to_schema.name, metadata)
         with engine.connect() as conn:
             try:
+                # TODO <---
+                # if resampling_operation:
+                #     with engine.connect() as conn:
+                #         df = read_database(f"select * from {mapped_table.name}", conn, to_schema.time_config)
+                #         breakpoint()
                 check_timestamps(
                     conn,
                     mapped_table,
@@ -161,6 +172,7 @@ def _apply_mapping(
     to_schema: TableSchema,
     engine: Engine,
     metadata: MetaData,
+    resampling_operation: Optional[ResamplingOperationType] = None,
     scratch_dir: Optional[Path] = None,
     output_file: Optional[Path] = None,
 ) -> None:
@@ -175,13 +187,34 @@
         set(from_schema.list_columns())
     )

+    val_col = to_schema.value_column  # from left_table
     final_cols = set(to_schema.list_columns()).union(left_table_pass_thru_columns)
     right_cols = set(right_table_columns).intersection(final_cols)
-    left_cols = final_cols - right_cols
+    left_cols = final_cols - right_cols - {val_col}

-    select_stmt = [left_table.c[x] for x in left_cols]
+    select_stmt: list[Any] = [left_table.c[x] for x in left_cols]
     select_stmt += [right_table.c[x] for x in right_cols]

+    tval_col = left_table.c[val_col]
+    if "factor" in right_table_columns:
+        tval_col *= right_table.c["factor"]  # type: ignore
+    if not resampling_operation:
+        select_stmt.append(tval_col.label(val_col))
+    else:
+        groupby_stmt = select_stmt.copy()
+        match resampling_operation:
+            case AggregationType.SUM:
+                select_stmt.append(func.sum(tval_col).label(val_col))
+            case AggregationType.AVG:
+                select_stmt.append(func.avg(tval_col).label(val_col))
+            case AggregationType.MIN:
+                select_stmt.append(func.min(tval_col).label(val_col))
+            case AggregationType.MAX:
+                select_stmt.append(func.max(tval_col).label(val_col))
+            case _:
+                msg = f"Unsupported {resampling_operation=}"
+                raise ValueError(msg)

     keys = from_schema.time_config.list_time_columns()
     # check time_zone
     tz_col = from_schema.time_config.get_time_zone_column()
@@ -195,8 +228,19 @@

     on_stmt = reduce(and_, (left_table.c[x] == right_table.c["from_" + x] for x in keys))
     query = select(*select_stmt).select_from(left_table).join(right_table, on_stmt)
+    if resampling_operation:
+        query = query.group_by(*groupby_stmt)

+    # # TODO <---
+    # from chronify.sqlalchemy.functions import read_database
+    # with engine.connect() as conn:
+    #     df_map = read_database(f"select * from {mapping_table_name}", conn, to_schema.time_config)
+    #     dfi = read_database(f"select * from {from_schema.name}", conn, from_schema.time_config)
+    #     df = read_database(query, conn, to_schema.time_config)
+    #     breakpoint()
+
     if output_file is not None:
+        output_file = to_path(output_file)
         write_query_to_parquet(engine, str(query), output_file, overwrite=True)
         return

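For intuition about the statement _apply_mapping now builds, here is a self-contained sketch with illustrative tables (not chronify's real schemas) showing the factor multiplication plus SUM/GROUP BY shape; printing the query renders the generated SQL:

    from sqlalchemy import Column, Float, MetaData, String, Table, func, select

    metadata = MetaData()
    left = Table(  # source data: one row per (id, from_timestamp)
        "from_table",
        metadata,
        Column("id", String),
        Column("from_timestamp", String),
        Column("value", Float),
    )
    right = Table(  # mapping: from_timestamp -> coarser timestamp, plus a scaling factor
        "map_table",
        metadata,
        Column("from_timestamp", String),
        Column("timestamp", String),
        Column("factor", Float),
    )

    tval = left.c["value"] * right.c["factor"]
    groupby_cols = [left.c["id"], right.c["timestamp"]]
    query = (
        select(*groupby_cols, func.sum(tval).label("value"))
        .select_from(left)
        .join(right, left.c["from_timestamp"] == right.c["from_timestamp"])
        .group_by(*groupby_cols)
    )
    print(query)  # SELECT ..., sum(from_table.value * map_table.factor) AS value ... GROUP BY ...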
(file header not captured in this view; the hunks below are from the module defining MapperColumnRepresentativeToDatetime)
@@ -20,6 +20,7 @@
 from chronify.models import MappingTableSchema, TableSchema
 from chronify.sqlalchemy.functions import read_database, write_database
 from chronify.utils.sqlalchemy_table import create_table
+from chronify.time import ResamplingOperationType


 class MapperColumnRepresentativeToDatetime(TimeSeriesMapperBase):
@@ -57,9 +58,16 @@ def __init__(
         to_schema: TableSchema,
         data_adjustment: Optional[TimeBasedDataAdjustment] = None,
         wrap_time_allowed: bool = False,
+        resampling_operation: Optional[ResamplingOperationType] = None,
     ) -> None:
         super().__init__(
-            engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed
+            engine,
+            metadata,
+            from_schema,
+            to_schema,
+            data_adjustment,
+            wrap_time_allowed,
+            resampling_operation,
         )

         if not isinstance(to_schema.time_config, DatetimeRange):
@@ -105,9 +113,10 @@ def map_time(
             self._engine,
             self._metadata,
             self._data_adjustment,
-            scratch_dir,
-            output_file,
-            check_mapped_timestamps,
+            resampling_operation=self._resampling_operation,
+            scratch_dir=scratch_dir,
+            output_file=output_file,
+            check_mapped_timestamps=check_mapped_timestamps,
         )

         if drop_table: