diff --git a/src/chronify/sqlalchemy/functions.py b/src/chronify/sqlalchemy/functions.py index 32e690c..2ca9fc6 100644 --- a/src/chronify/sqlalchemy/functions.py +++ b/src/chronify/sqlalchemy/functions.py @@ -17,7 +17,7 @@ from chronify.exceptions import InvalidOperation, InvalidParameter from chronify.time_configs import DatetimeRange, TimeBaseModel -from chronify.utils.path_utils import check_overwrite, delete_if_exists +from chronify.utils.path_utils import check_overwrite, delete_if_exists, to_path # Copied from Pandas/Polars DbWriteMode: TypeAlias = Literal["replace", "append", "fail"] @@ -224,6 +224,7 @@ def write_query_to_parquet( partition_columns: Optional[list[str]] = None, ) -> None: """Write the query to a Parquet file.""" + output_file = to_path(output_file) check_overwrite(output_file, overwrite) match engine.name: case "duckdb": diff --git a/src/chronify/store.py b/src/chronify/store.py index cbf82fd..15bcb67 100644 --- a/src/chronify/store.py +++ b/src/chronify/store.py @@ -51,6 +51,7 @@ from chronify.time_series_mapper import map_time from chronify.utils.path_utils import check_overwrite, to_path from chronify.utils.sqlalchemy_view import create_view +from chronify.time import ResamplingOperationType class Store: @@ -845,6 +846,7 @@ def map_table_time_config( dst_schema: TableSchema, data_adjustment: Optional[TimeBasedDataAdjustment] = None, wrap_time_allowed: bool = False, + resampling_operation: Optional[ResamplingOperationType] = None, scratch_dir: Optional[Path] = None, output_file: Optional[Path] = None, check_mapped_timestamps: bool = False, @@ -864,6 +866,9 @@ def map_table_time_config( wrap_time_allowed Defines whether the time column is allowed to be wrapped according to the time config in dst_schema when it does not line up with the time config + resampling_operation + Defines the operation type for resampling when the time resolution in the source + data differs from the dst_schema scratch_dir Directory to use for temporary writes. Default to the system's tmp filesystem. 
check_mapped_timestamps @@ -929,6 +934,7 @@ def map_table_time_config( dst_schema, data_adjustment=data_adjustment, wrap_time_allowed=wrap_time_allowed, + resampling_operation=resampling_operation, scratch_dir=scratch_dir, output_file=output_file, check_mapped_timestamps=check_mapped_timestamps, diff --git a/src/chronify/time.py b/src/chronify/time.py index ab57b86..74a9164 100644 --- a/src/chronify/time.py +++ b/src/chronify/time.py @@ -1,7 +1,7 @@ """Definitions related to time""" from enum import StrEnum -from typing import NamedTuple +from typing import NamedTuple, Union from zoneinfo import ZoneInfo from chronify.exceptions import InvalidParameter @@ -114,6 +114,27 @@ class MeasurementType(StrEnum): # description="Data values represent the sum of values in a time range", +class AggregationType(StrEnum): + """Operation types for resampling / aggregation""" + + SUM = "sum" + AVG = "avg" + MIN = "min" + MAX = "max" + + +class DisaggregationType(StrEnum): + """Operation types for resampling / disaggregation""" + + INTERPOLATE = "interpolate" + DUPLICATE_FFILL = "duplicate_ffill" + DUPLICATE_BFILL = "duplicate_bfill" + UNIFORM_DISAGGREGATE = "uniform_disaggregate" + + +ResamplingOperationType = Union[AggregationType, DisaggregationType] + + class TimeZone(StrEnum): """Time zones""" diff --git a/src/chronify/time_series_mapper.py b/src/chronify/time_series_mapper.py index a03a466..cbf10ca 100644 --- a/src/chronify/time_series_mapper.py +++ b/src/chronify/time_series_mapper.py @@ -17,6 +17,7 @@ TimeBasedDataAdjustment, ColumnRepresentativeBase, ) +from chronify.time import ResamplingOperationType def map_time( @@ -26,6 +27,7 @@ def map_time( to_schema: TableSchema, data_adjustment: Optional[TimeBasedDataAdjustment] = None, wrap_time_allowed: bool = False, + resampling_operation: Optional[ResamplingOperationType] = None, scratch_dir: Optional[Path] = None, output_file: Optional[Path] = None, check_mapped_timestamps: bool = False, @@ -35,7 +37,13 @@ def map_time( to_schema.time_config, DatetimeRange ): MapperRepresentativeTimeToDatetime( - engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed + engine, + metadata, + from_schema, + to_schema, + data_adjustment, + wrap_time_allowed, + resampling_operation, ).map_time( scratch_dir=scratch_dir, output_file=output_file, @@ -45,7 +53,13 @@ def map_time( to_schema.time_config, DatetimeRange ): MapperDatetimeToDatetime( - engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed + engine, + metadata, + from_schema, + to_schema, + data_adjustment, + wrap_time_allowed, + resampling_operation, ).map_time( scratch_dir=scratch_dir, output_file=output_file, @@ -55,7 +69,13 @@ def map_time( to_schema.time_config, DatetimeRange ): MapperIndexTimeToDatetime( - engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed + engine, + metadata, + from_schema, + to_schema, + data_adjustment, + wrap_time_allowed, + resampling_operation, ).map_time( scratch_dir=scratch_dir, output_file=output_file, @@ -67,7 +87,13 @@ def map_time( # No way to generate expected timestamps for YearMonthDayPeriodTimeNTZ # Is there a way to only check the output datetime timestamps? 
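For orientation, a hypothetical caller-side sketch (not part of this diff) of the new `resampling_operation` argument added above: it downsamples an already-ingested hourly table to daily sums. The `store` instance and the `generators_hourly` table name are assumed to exist; the schema classes mirror the ones exercised in `tests/test_store.py` below.

```python
# Hypothetical usage sketch: aggregate an hourly table to daily sums.
from datetime import datetime, timedelta

from chronify.models import TableSchema
from chronify.time import AggregationType, TimeIntervalType
from chronify.time_configs import DatetimeRange

hourly = DatetimeRange(  # config the source table was ingested with
    start=datetime(2020, 1, 1),
    resolution=timedelta(hours=1),
    length=8784,  # 2020 is a leap year
    interval_type=TimeIntervalType.PERIOD_BEGINNING,
    time_column="timestamp",
)
daily = DatetimeRange(
    start=datetime(2020, 1, 1),
    resolution=timedelta(hours=24),
    length=366,
    interval_type=TimeIntervalType.PERIOD_BEGINNING,
    time_column="timestamp",
)
dst_schema = TableSchema(
    name="generators_daily",
    time_config=daily,
    time_array_id_columns=["generator"],
    value_column="value",
)
# `store` is assumed to be an existing chronify.Store holding a "generators_hourly"
# table that uses the `hourly` config; SUM collapses the 24 hourly values mapped to each day.
store.map_table_time_config(
    "generators_hourly",
    dst_schema,
    resampling_operation=AggregationType.SUM,
    check_mapped_timestamps=True,
)
```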
MapperColumnRepresentativeToDatetime( - engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed + engine, + metadata, + from_schema, + to_schema, + data_adjustment, + wrap_time_allowed, + resampling_operation, ).map_time( scratch_dir=scratch_dir, output_file=output_file, diff --git a/src/chronify/time_series_mapper_base.py b/src/chronify/time_series_mapper_base.py index 5b36339..af3edd2 100644 --- a/src/chronify/time_series_mapper_base.py +++ b/src/chronify/time_series_mapper_base.py @@ -2,11 +2,11 @@ from functools import reduce from operator import and_ from pathlib import Path -from typing import Optional +from typing import Any, Optional import pandas as pd from loguru import logger -from sqlalchemy import Engine, MetaData, Table, select, text +from sqlalchemy import Engine, MetaData, Table, select, text, func from chronify.hive_functions import create_materialized_view from chronify.sqlalchemy.functions import ( @@ -18,8 +18,9 @@ from chronify.exceptions import ConflictingInputsError from chronify.utils.sqlalchemy_table import create_table from chronify.time_series_checker import check_timestamps -from chronify.time import TimeIntervalType +from chronify.time import TimeIntervalType, ResamplingOperationType, AggregationType from chronify.time_configs import TimeBasedDataAdjustment +from chronify.utils.path_utils import to_path class TimeSeriesMapperBase(abc.ABC): @@ -33,6 +34,7 @@ def __init__( to_schema: TableSchema, data_adjustment: Optional[TimeBasedDataAdjustment] = None, wrap_time_allowed: bool = False, + resampling_operation: Optional[ResamplingOperationType] = None, ) -> None: self._engine = engine self._metadata = metadata @@ -45,6 +47,7 @@ def __init__( self._from_schema.time_config.interval_type != self._to_schema.time_config.interval_type ) + self._resampling_operation = resampling_operation @abc.abstractmethod def check_schema_consistency(self) -> None: @@ -91,6 +94,7 @@ def apply_mapping( engine: Engine, metadata: MetaData, data_adjustment: TimeBasedDataAdjustment, + resampling_operation: Optional[ResamplingOperationType] = None, scratch_dir: Optional[Path] = None, output_file: Optional[Path] = None, check_mapped_timestamps: bool = False, @@ -116,11 +120,13 @@ def apply_mapping( to_schema, engine, metadata, - scratch_dir, - output_file, + resampling_operation=resampling_operation, + scratch_dir=scratch_dir, + output_file=output_file, ) if check_mapped_timestamps: if output_file is not None: + output_file = to_path(output_file) with engine.begin() as conn: create_view_from_parquet(conn, to_schema.name, output_file) metadata.reflect(engine, views=True) @@ -128,6 +134,11 @@ def apply_mapping( mapped_table = Table(to_schema.name, metadata) with engine.connect() as conn: try: + # TODO <--- + # if resampling_operation: + # with engine.connect() as conn: + # df = read_database(f"select * from {mapped_table.name}", conn, to_schema.time_config) + # breakpoint() check_timestamps( conn, mapped_table, @@ -161,6 +172,7 @@ def _apply_mapping( to_schema: TableSchema, engine: Engine, metadata: MetaData, + resampling_operation: Optional[ResamplingOperationType] = None, scratch_dir: Optional[Path] = None, output_file: Optional[Path] = None, ) -> None: @@ -175,13 +187,34 @@ def _apply_mapping( set(from_schema.list_columns()) ) + val_col = to_schema.value_column # from left_table final_cols = set(to_schema.list_columns()).union(left_table_pass_thru_columns) right_cols = set(right_table_columns).intersection(final_cols) - left_cols = final_cols - right_cols + left_cols = 
final_cols - right_cols - {val_col} - select_stmt = [left_table.c[x] for x in left_cols] + select_stmt: list[Any] = [left_table.c[x] for x in left_cols] select_stmt += [right_table.c[x] for x in right_cols] + tval_col = left_table.c[val_col] + if "factor" in right_table_columns: + tval_col *= right_table.c["factor"] # type: ignore + if not resampling_operation: + select_stmt.append(tval_col) + else: + groupby_stmt = select_stmt.copy() + match resampling_operation: + case AggregationType.SUM: + select_stmt.append(func.sum(tval_col).label(val_col)) + case AggregationType.AVG: + select_stmt.append(func.avg(tval_col).label(val_col)) + case AggregationType.MIN: + select_stmt.append(func.min(tval_col).label(val_col)) + case AggregationType.MAX: + select_stmt.append(func.max(tval_col).label(val_col)) + case _: + msg = f"Unsupported {resampling_operation=}" + raise ValueError(msg) + keys = from_schema.time_config.list_time_columns() # check time_zone tz_col = from_schema.time_config.get_time_zone_column() @@ -195,8 +228,19 @@ def _apply_mapping( on_stmt = reduce(and_, (left_table.c[x] == right_table.c["from_" + x] for x in keys)) query = select(*select_stmt).select_from(left_table).join(right_table, on_stmt) + if resampling_operation: + query = query.group_by(*groupby_stmt) + + # # TODO <--- + # from chronify.sqlalchemy.functions import read_database + # with engine.connect() as conn: + # df_map = read_database(f"select * from {mapping_table_name}", conn, to_schema.time_config) + # dfi = read_database(f"select * from {from_schema.name}", conn, from_schema.time_config) + # df = read_database(query, conn, to_schema.time_config) + # breakpoint() if output_file is not None: + output_file = to_path(output_file) write_query_to_parquet(engine, str(query), output_file, overwrite=True) return diff --git a/src/chronify/time_series_mapper_column_representative_to_datetime.py b/src/chronify/time_series_mapper_column_representative_to_datetime.py index 18ed8cb..5d03015 100644 --- a/src/chronify/time_series_mapper_column_representative_to_datetime.py +++ b/src/chronify/time_series_mapper_column_representative_to_datetime.py @@ -20,6 +20,7 @@ from chronify.models import MappingTableSchema, TableSchema from chronify.sqlalchemy.functions import read_database, write_database from chronify.utils.sqlalchemy_table import create_table +from chronify.time import ResamplingOperationType class MapperColumnRepresentativeToDatetime(TimeSeriesMapperBase): @@ -57,9 +58,16 @@ def __init__( to_schema: TableSchema, data_adjustment: Optional[TimeBasedDataAdjustment] = None, wrap_time_allowed: bool = False, + resampling_operation: Optional[ResamplingOperationType] = None, ) -> None: super().__init__( - engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed + engine, + metadata, + from_schema, + to_schema, + data_adjustment, + wrap_time_allowed, + resampling_operation, ) if not isinstance(to_schema.time_config, DatetimeRange): @@ -105,9 +113,10 @@ def map_time( self._engine, self._metadata, self._data_adjustment, - scratch_dir, - output_file, - check_mapped_timestamps, + resampling_operation=self._resampling_operation, + scratch_dir=scratch_dir, + output_file=output_file, + check_mapped_timestamps=check_mapped_timestamps, ) if drop_table: diff --git a/src/chronify/time_series_mapper_datetime.py b/src/chronify/time_series_mapper_datetime.py index 57103b2..a91cbe2 100644 --- a/src/chronify/time_series_mapper_datetime.py +++ b/src/chronify/time_series_mapper_datetime.py @@ -2,6 +2,7 @@ from pathlib import Path 
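The query built in `_apply_mapping` above is easier to see outside the diff. Below is a self-contained SQLAlchemy sketch of the same shape, using toy table and column names rather than chronify's real schemas: join the data table to the mapping table on the `from_`-prefixed time column, optionally scale by `factor`, then aggregate the value column grouped by the pass-through and destination columns.

```python
# Self-contained sketch (toy names) of the join + factor + GROUP BY shape that
# _apply_mapping builds when a resampling_operation is given.
from sqlalchemy import Column, DateTime, Double, MetaData, String, Table, func, select

metadata = MetaData()
data = Table(  # stand-in for the source table (left_table)
    "data",
    metadata,
    Column("timestamp", DateTime),  # source time column
    Column("generator", String),    # pass-through id column
    Column("value", Double),        # value column
)
mapping = Table(  # stand-in for the mapping table (right_table)
    "mapping_table",
    metadata,
    Column("from_timestamp", DateTime),  # join key: "from_" + source time column
    Column("day", DateTime),             # destination time column
    Column("factor", Double),            # optional scaling column
)

tval = data.c["value"] * mapping.c["factor"]          # scale before aggregating
group_cols = [data.c["generator"], mapping.c["day"]]  # pass-through + destination columns
query = (
    select(*group_cols, func.sum(tval).label("value"))  # AggregationType.SUM case
    .select_from(data)
    .join(mapping, data.c["timestamp"] == mapping.c["from_timestamp"])
    .group_by(*group_cols)
)
print(query)  # SELECT data.generator, mapping_table.day, sum(...) ... GROUP BY ...
```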
from typing import Optional from datetime import datetime +import numpy as np import pandas as pd from sqlalchemy import Engine, MetaData @@ -11,7 +12,13 @@ from chronify.time_series_mapper_base import TimeSeriesMapperBase, apply_mapping from chronify.time_configs import DatetimeRange, TimeBasedDataAdjustment from chronify.time_range_generator_factory import make_time_range_generator -from chronify.time_utils import roll_time_interval, wrap_timestamps +from chronify.time_utils import roll_time_interval, wrap_timestamps, get_standard_time_zone +from chronify.time import ( + ResamplingOperationType, + AggregationType, + DisaggregationType, + LeapDayAdjustmentType, +) logger = logging.getLogger(__name__) @@ -25,9 +32,16 @@ def __init__( to_schema: TableSchema, data_adjustment: Optional[TimeBasedDataAdjustment] = None, wrap_time_allowed: bool = False, + resampling_operation: Optional[ResamplingOperationType] = None, ) -> None: super().__init__( - engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed + engine, + metadata, + from_schema, + to_schema, + data_adjustment, + wrap_time_allowed, + resampling_operation, ) if self._from_schema == self._to_schema and self._data_adjustment is None: msg = f"from_schema is the same as to_schema and no data_adjustment, nothing to do.\n{self._from_schema}" @@ -40,26 +54,65 @@ def __init__( raise InvalidParameter(msg) self._from_time_config: DatetimeRange = self._from_schema.time_config self._to_time_config: DatetimeRange = self._to_schema.time_config + self._resampling_type = self.get_resampling_type() + + def get_resampling_type(self) -> Optional[str]: + if self._from_time_config.resolution < self._to_time_config.resolution: + return "aggregation" + if self._from_time_config.resolution > self._to_time_config.resolution: + return "disaggregation" + return None def check_schema_consistency(self) -> None: """Check that from_schema can produce to_schema.""" self._check_table_columns_producibility() self._check_measurement_type_consistency() self._check_time_interval_type() - self._check_time_resolution() - self._check_time_length() - - def _check_time_resolution(self) -> None: - if self._from_time_config.resolution != self._to_time_config.resolution: - msg = "Handling of changing time resolution is not supported yet." - raise NotImplementedError(msg) + self._check_time_resolution_and_length() + self._check_resampling_consistency() - def _check_time_length(self) -> None: + def _check_time_resolution_and_length(self) -> None: + """Check that from_schema time resolution and length can produce to_schema counterparts.""" flen, tlen = self._from_time_config.length, self._to_time_config.length - if flen != tlen and not self._wrap_time_allowed: - msg = f"Length must match between {self._from_schema.__class__} from_schema and {self._to_schema.__class__} to_schema. {flen} vs. {tlen} OR wrap_time_allowed must be set to True" + fres, tres = self._from_time_config.resolution, self._to_time_config.resolution + if fres == tres and flen != tlen and not self._wrap_time_allowed: + msg = f"For unchanging time resolution, length must match between from_time_config and to_time_config. {flen} vs. {tlen} OR wrap_time_allowed must be set to True" + raise ConflictingInputsError(msg) + + # Resolutions must be multiples of each other + if fres < tres and tres.total_seconds() % fres.total_seconds() > 0: + msg = f"For aggregation, the time resolution in to_time_config must be a multiple of that in from_time_config. {tres} vs {fres}" + raise ConflictingInputsError(msg) + if fres > tres and fres.total_seconds() % tres.total_seconds() > 0: + msg = f"For disaggregation, the time resolution in from_time_config must be a multiple of that in to_time_config. {fres} vs {tres}" + raise ConflictingInputsError(msg) + + # No extrapolation allowed + if flen * fres.total_seconds() < tlen * tres.total_seconds(): + msg = "The product of time resolution and length in from_time_config cannot be less than that in to_time_config." raise ConflictingInputsError(msg) + def _check_resampling_consistency(self) -> None: + """Check resampling operation type is consistent with time resolution inputs.""" + if self._resampling_type is None and self._resampling_operation is not None: + msg = f"For unchanging time resolution, {self._resampling_operation=} must be set to None." + raise ConflictingInputsError(msg) + + for typ, opt in [("aggregation", AggregationType), ("disaggregation", DisaggregationType)]: + if self._resampling_type == typ and not isinstance(self._resampling_operation, opt): + msg = f"{typ} detected, {self._resampling_operation=} must be set to an option from {opt}" + raise ConflictingInputsError(msg) + + def _create_intermediate_schema(self) -> TableSchema: + """Create the intermediate table schema for all time processing except resampling, + by producing a version of to_schema with the same time resolution and length as from_schema + """ + schema_kwargs = self._to_schema.model_dump() + schema_kwargs["time_config"]["resolution"] = self._from_time_config.resolution + schema_kwargs["time_config"]["length"] = self._from_time_config.length + schema_kwargs["name"] += "_intermediate" + schema = TableSchema(**schema_kwargs) + + return schema + def map_time( self, scratch_dir: Optional[Path] = None, @@ -67,58 +120,98 @@ def map_time( check_mapped_timestamps: bool = False, ) -> None: """Convert time columns with from_schema to to_schema configuration.""" + if self._resampling_type: + to_schema = self._create_intermediate_schema() + to_time_config = to_schema.time_config + else: + to_schema = self._to_schema + to_time_config = self._to_time_config + self.check_schema_consistency() - df, mapping_schema = self._create_mapping() + df, mapping_schema = self._create_mapping(to_time_config) apply_mapping( df, mapping_schema, self._from_schema, - self._to_schema, + to_schema, self._engine, self._metadata, self._data_adjustment, + resampling_operation=None, scratch_dir=scratch_dir, - output_file=output_file, + output_file=None if self._resampling_type else output_file, check_mapped_timestamps=check_mapped_timestamps, ) + + if self._resampling_type: + if self._resampling_type == "aggregation": + df, mapping_schema = self._create_aggregation_mapping( + to_time_config, self._to_time_config, self._data_adjustment.leap_day_adjustment + ) + resampling_operation = self._resampling_operation + + else: + df, resampling_operation, mapping_schema = self._create_disaggregation_mapping( + to_time_config, + self._to_time_config, + self._data_adjustment.leap_day_adjustment, + self._resampling_operation, + ) + apply_mapping( + df, + mapping_schema, + to_schema, + self._to_schema, + self._engine, + self._metadata, + TimeBasedDataAdjustment(), + resampling_operation=resampling_operation, + scratch_dir=scratch_dir, + output_file=output_file, + check_mapped_timestamps=check_mapped_timestamps, + ) # TODO - add handling for changing resolution - Issue #30 - def _create_mapping(self) -> tuple[pd.DataFrame, MappingTableSchema]: + def _create_mapping( + self, to_time_config: DatetimeRange + ) -> tuple[pd.DataFrame,
MappingTableSchema]: """Create mapping dataframe - Handles time interval type + Handles time interval type but not resolution """ from_time_col = "from_" + self._from_time_config.time_column - to_time_col = self._to_time_config.time_column + to_time_col = to_time_config.time_column from_time_data = make_time_range_generator(self._from_time_config).list_timestamps() to_time_data = make_time_range_generator( - self._to_time_config, leap_day_adjustment=self._data_adjustment.leap_day_adjustment + to_time_config, leap_day_adjustment=self._data_adjustment.leap_day_adjustment ).list_timestamps() ser_from = pd.Series(from_time_data) # If from_tz or to_tz is naive, use tz_localize fm_tz = self._from_time_config.start.tzinfo - to_tz = self._to_time_config.start.tzinfo + to_tz = to_time_config.start.tzinfo match (fm_tz is None, to_tz is None): case (True, False): # get standard time zone of to_tz - year = self._to_time_config.start.year - to_tz_std = datetime(year=year, month=1, day=1, tzinfo=to_tz).tzname() + to_tz_std = get_standard_time_zone(to_tz) # tz-naive time does not have skips/dups, so always localize in std tz first ser_from = ser_from.dt.tz_localize(to_tz_std).dt.tz_convert(to_tz) case (False, True): - ser_from = ser_from.dt.tz_localize(to_tz) + # get standard time zone of fm_tz + fm_tz_std = get_standard_time_zone(fm_tz) + # convert to standard time zone of fm_tz before making it tz-naive + ser_from = ser_from.dt.tz_convert(fm_tz_std).dt.tz_localize(to_tz) match (self._adjust_interval, self._wrap_time_allowed): case (True, _): ser = roll_time_interval( ser_from, self._from_time_config.interval_type, - self._to_time_config.interval_type, + to_time_config.interval_type, to_time_data, ) case (False, True): ser = wrap_timestamps(ser_from, to_time_data) case (False, False): - ser = pd.Series(to_time_data) + ser = ser_from df = pd.DataFrame( { @@ -126,9 +219,8 @@ def _create_mapping(self) -> tuple[pd.DataFrame, MappingTableSchema]: to_time_col: ser, } ) - assert ( - df[to_time_col].nunique() == self._to_time_config.length + df[to_time_col].nunique() == to_time_config.length ), "to_time_col does not have the right number of timestamps" from_time_config = self._from_time_config.model_copy() from_time_config.time_column = from_time_col @@ -136,7 +228,125 @@ def _create_mapping(self) -> tuple[pd.DataFrame, MappingTableSchema]: name="mapping_table", time_configs=[ from_time_config, - self._to_time_config, + to_time_config, + ], + ) + return df, mapping_schema + + @staticmethod + def _create_aggregation_mapping( + from_time_config: DatetimeRange, + to_time_config: DatetimeRange, + leap_day_adjustment: LeapDayAdjustmentType, + ) -> tuple[pd.DataFrame, MappingTableSchema]: + """Create mapping dataframe for aggregation""" + from_time_col = "from_" + from_time_config.time_column + to_time_col = to_time_config.time_column + from_time_data = make_time_range_generator( + from_time_config, leap_day_adjustment=leap_day_adjustment + ).list_timestamps() + to_time_data = make_time_range_generator( + to_time_config, leap_day_adjustment=leap_day_adjustment + ).list_timestamps() + df = pd.Series(from_time_data).rename(from_time_col).to_frame() + df = df.join( + pd.Series(to_time_data, index=to_time_data).rename(to_time_col), on=from_time_col + ) + limit = to_time_config.resolution / from_time_config.resolution - 1 + assert (limit % 1 == 0) and (limit > 0), f"limit must be an integer, {limit}" + limit = int(limit) + df[to_time_col] = df[to_time_col].ffill(limit=int(limit)) + + # mapping schema + from_time_config = 
from_time_config.model_copy() + from_time_config.time_column = from_time_col + mapping_schema = MappingTableSchema( + name="aggregation_table", + time_configs=[ + from_time_config, + to_time_config, ], ) return df, mapping_schema + + @staticmethod + def _create_disaggregation_mapping( + from_time_config: DatetimeRange, + to_time_config: DatetimeRange, + leap_day_adjustment: LeapDayAdjustmentType, + resampling_operation: DisaggregationType, + ) -> tuple[pd.DataFrame, AggregationType, MappingTableSchema]: + """Create mapping dataframe for disaggregation""" + from_time_col = "from_" + from_time_config.time_column + to_time_col = to_time_config.time_column + from_time_data = make_time_range_generator( + from_time_config, leap_day_adjustment=leap_day_adjustment + ).list_timestamps() + to_time_data = make_time_range_generator( + to_time_config, leap_day_adjustment=leap_day_adjustment + ).list_timestamps() + + df = pd.Series(to_time_data).rename(to_time_col).to_frame() + df = df.join( + pd.Series(from_time_data, index=from_time_data).rename(from_time_col), on=to_time_col + ) + limit = from_time_config.resolution / to_time_config.resolution - 1 + assert (limit % 1 == 0) and (limit > 0), f"limit must be an integer, {limit}" + limit = int(limit) + + match resampling_operation: + case DisaggregationType.DUPLICATE_FFILL: + df[from_time_col] = df[from_time_col].ffill(limit=limit) + # floor: cap rows below the from_time_config start time at start time + if df[from_time_col].isna().sum() > 0: + assert df[from_time_col].isna().sum() <= limit + df[from_time_col] = df[from_time_col].bfill(limit=limit) + aggregation_operation = None + case DisaggregationType.DUPLICATE_BFILL: + df[from_time_col] = df[from_time_col].bfill(limit=limit) + # ceiling: cap rows beyond the from_time_config end time at end time + if df[from_time_col].isna().sum() > 0: + assert df[from_time_col].isna().sum() <= limit + df[from_time_col] = df[from_time_col].ffill(limit=limit) + aggregation_operation = None + case DisaggregationType.INTERPOLATE: + df.loc[~df[from_time_col].isna(), "factor"] = 1 + df["lb"] = df[from_time_col].ffill(limit=limit).where(df[from_time_col].isna()) + df["lb_factor"] = 1 + (df["lb"] - df[to_time_col]).div(from_time_config.resolution) + df["ub"] = df[from_time_col].bfill(limit=limit).where(df[from_time_col].isna()) + df["ub_factor"] = 1 + (df[to_time_col] - df["ub"]).div(from_time_config.resolution) + # capping: if a row do not have both lb and ub, cannot interpolate, set factor to 1 + for fact_col in ["lb_factor", "ub_factor"]: + cond = ~(df[fact_col].where(df["lb"].isna() | df["ub"].isna()).isna()) + df.loc[cond, fact_col] = 1 + lst = [] + for ts_col, fact_col in zip( + [from_time_col, "lb", "ub"], ["factor", "lb_factor", "ub_factor"] + ): + lst.append( + df.loc[~df[ts_col].isna(), [to_time_col, ts_col, fact_col]].rename( + columns={ts_col: from_time_col, fact_col: "factor"} + ) + ) + df = pd.concat(lst).sort_values(by=[to_time_col], ignore_index=True) + assert df.groupby(to_time_col)["factor"].sum().unique().round(3) == np.array([1]) + aggregation_operation = AggregationType.SUM + case DisaggregationType.UNIFORM_DISAGGREGATE: + df[from_time_col] = df[from_time_col].ffill(limit=int(limit)) + df["factor"] = to_time_config.resolution / from_time_config.resolution + aggregation_operation = AggregationType.SUM + case _: + msg = f"Unsupported disaggregation {resampling_operation=}" + raise ValueError(msg) + + # mapping schema + from_time_config = from_time_config.model_copy() + from_time_config.time_column = 
from_time_col + mapping_schema = MappingTableSchema( + name="disaggregation_table", + time_configs=[ + from_time_config, + to_time_config, + ], + ) + return df, aggregation_operation, mapping_schema diff --git a/src/chronify/time_series_mapper_index_time.py b/src/chronify/time_series_mapper_index_time.py index 748e70b..bbcbd3e 100644 --- a/src/chronify/time_series_mapper_index_time.py +++ b/src/chronify/time_series_mapper_index_time.py @@ -17,7 +17,7 @@ ) from chronify.time_range_generator_factory import make_time_range_generator from chronify.time_series_mapper_datetime import MapperDatetimeToDatetime -from chronify.time import TimeType, DaylightSavingAdjustmentType +from chronify.time import TimeType, DaylightSavingAdjustmentType, ResamplingOperationType from chronify.sqlalchemy.functions import read_database logger = logging.getLogger(__name__) @@ -32,9 +32,16 @@ def __init__( to_schema: TableSchema, data_adjustment: Optional[TimeBasedDataAdjustment] = None, wrap_time_allowed: bool = False, + resampling_operation: Optional[ResamplingOperationType] = None, ) -> None: super().__init__( - engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed + engine, + metadata, + from_schema, + to_schema, + data_adjustment, + wrap_time_allowed, + resampling_operation, ) self._dst_adjustment = self._data_adjustment.daylight_saving_adjustment if not isinstance(self._from_schema.time_config, IndexTimeRangeBase): @@ -62,7 +69,7 @@ def _check_time_resolution(self) -> None: def _check_time_length(self) -> None: flen, tlen = self._from_time_config.length, self._to_time_config.length if flen != tlen and not self._wrap_time_allowed: - msg = f"Length must match between {self._from_schema.__class__} from_schema and {self._to_schema.__class__} to_schema. {flen} vs. {tlen} OR wrap_time_allowed must be set to True" + msg = f"Length must match between from_time_config and to_time_config. {flen} vs. {tlen} OR wrap_time_allowed must be set to True" raise ConflictingInputsError(msg) def map_time( @@ -109,7 +116,9 @@ def map_time( self._engine, self._metadata, TimeBasedDataAdjustment(), + resampling_operation=None, scratch_dir=scratch_dir, + output_file=None, check_mapped_timestamps=False, ) @@ -121,6 +130,7 @@ def map_time( self._to_schema, self._data_adjustment, self._wrap_time_allowed, + resampling_operation=self._resampling_operation, ).map_time( scratch_dir=scratch_dir, output_file=output_file, diff --git a/src/chronify/time_series_mapper_representative.py b/src/chronify/time_series_mapper_representative.py index 7670e5b..d165ad1 100644 --- a/src/chronify/time_series_mapper_representative.py +++ b/src/chronify/time_series_mapper_representative.py @@ -20,6 +20,7 @@ TimeBasedDataAdjustment, ) from chronify.time_utils import shift_time_interval +from chronify.time import ResamplingOperationType logger = logging.getLogger(__name__) @@ -33,9 +34,16 @@ def __init__( to_schema: TableSchema, data_adjustment: Optional[TimeBasedDataAdjustment] = None, wrap_time_allowed: bool = False, + resampling_operation: Optional[ResamplingOperationType] = None, ) -> None: super().__init__( - engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed + engine, + metadata, + from_schema, + to_schema, + data_adjustment, + wrap_time_allowed, + resampling_operation, ) if not isinstance(from_schema.time_config, RepresentativePeriodTimeBase): msg = "source schema does not have RepresentativePeriodTimeBase time config. Use a different mapper." 
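A toy pandas reconstruction of the aggregation mapping table produced by `_create_aggregation_mapping` above (illustrative timestamps and column names only): each daily timestamp is forward-filled over the following `to_resolution / from_resolution - 1` hourly rows, and `_apply_mapping` then joins this table to the data and applies the requested `AggregationType`.

```python
# Toy reconstruction of the hourly -> daily mapping table (illustrative values).
import pandas as pd

from_time = pd.date_range("2020-01-01", periods=48, freq="1h")  # hourly source timestamps
to_time = pd.date_range("2020-01-01", periods=2, freq="24h")    # daily destination timestamps

df = pd.Series(from_time).rename("from_timestamp").to_frame()
# Hourly rows that coincide with a daily timestamp pick it up; the rest start as NaT.
df = df.join(pd.Series(to_time, index=to_time).rename("timestamp"), on="from_timestamp")
# limit = to_resolution / from_resolution - 1 = 23: fill the remaining hours of each day.
df["timestamp"] = df["timestamp"].ffill(limit=23)
print(df.groupby("timestamp").size())  # 24 hourly rows map onto each daily timestamp
```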
@@ -72,6 +80,7 @@ def map_time( self._engine, self._metadata, self._data_adjustment, + resampling_operation=self._resampling_operation, scratch_dir=scratch_dir, output_file=output_file, check_mapped_timestamps=check_mapped_timestamps, diff --git a/src/chronify/time_utils.py b/src/chronify/time_utils.py index c55e7a6..bb7c6a4 100644 --- a/src/chronify/time_utils.py +++ b/src/chronify/time_utils.py @@ -2,7 +2,8 @@ import logging import numpy as np -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone, tzinfo +import zoneinfo import pandas as pd from chronify.time import ( @@ -143,3 +144,17 @@ def roll_time_interval( ser = shift_time_interval(ser, from_interval_type, to_interval_type) ser = wrap_timestamps(ser, to_timestamps) return ser + + +def get_standard_time_zone(tz: tzinfo | None) -> tzinfo | None: + ts = datetime(year=2020, month=1, day=1, tzinfo=tz) + std_tz_name = ts.tzname() + if not std_tz_name: + return None + try: + return zoneinfo.ZoneInfo(std_tz_name) + except zoneinfo.ZoneInfoNotFoundError: + utcoffset = ts.utcoffset() + if not utcoffset: + return None + return timezone(utcoffset) diff --git a/tests/test_mapper_datetime_to_datetime.py b/tests/test_mapper_datetime_to_datetime.py index ba9fa00..a65671b 100644 --- a/tests/test_mapper_datetime_to_datetime.py +++ b/tests/test_mapper_datetime_to_datetime.py @@ -188,9 +188,9 @@ def test_time_interval_shift_different_time_ranges( @pytest.mark.parametrize( "tzinfo_tuple", [ - # (ZoneInfo("US/Eastern"), None), + (ZoneInfo("US/Eastern"), None), (None, ZoneInfo("EST")), - # (ZoneInfo("US/Eastern"), ZoneInfo("US/Mountain")), + (ZoneInfo("US/Eastern"), ZoneInfo("US/Mountain")), ], ) def test_time_shift_different_timezones( diff --git a/tests/test_mapper_index_time_to_datetime.py b/tests/test_mapper_index_time_to_datetime.py index 735d834..d1283c3 100644 --- a/tests/test_mapper_index_time_to_datetime.py +++ b/tests/test_mapper_index_time_to_datetime.py @@ -218,7 +218,7 @@ def test_unaligned_time_mapping_without_wrap_time(iter_engines: Engine) -> None: src_df, src_schema, dst_schema = data_for_unaligned_time_mapping() error = ( ConflictingInputsError, - "Length must match between", + "For unchanging time resolution, length must match between", ) run_test( iter_engines, diff --git a/tests/test_store.py b/tests/test_store.py index 9c5b0cc..1ae805f 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -33,7 +33,12 @@ ) from chronify.models import ColumnDType, CsvTableSchema, PivotedTableSchema, TableSchema from chronify.store import Store -from chronify.time import TimeIntervalType, DaylightSavingAdjustmentType +from chronify.time import ( + TimeIntervalType, + DaylightSavingAdjustmentType, + AggregationType, + DisaggregationType, +) from chronify.time_configs import DatetimeRange, IndexTimeRangeLocalTime, TimeBasedDataAdjustment from chronify.time_range_generator_factory import make_time_range_generator from chronify.time_series_checker import check_timestamp_lists @@ -506,6 +511,124 @@ def test_map_datetime_to_datetime( check_timestamp_lists(actual, expected) +@pytest.mark.parametrize("tzinfo", [ZoneInfo("EST")]) # [ZoneInfo("EST"), None]) +@pytest.mark.parametrize( + "params", + [ + (timedelta(hours=1), timedelta(hours=24), AggregationType.SUM), + (timedelta(hours=1), timedelta(hours=24), AggregationType.AVG), + (timedelta(hours=1), timedelta(hours=24), AggregationType.MIN), + (timedelta(hours=1), timedelta(hours=24), AggregationType.MAX), + (timedelta(hours=1), timedelta(minutes=20), 
DisaggregationType.DUPLICATE_FFILL), + (timedelta(hours=1), timedelta(minutes=20), DisaggregationType.DUPLICATE_BFILL), + (timedelta(hours=1), timedelta(minutes=20), DisaggregationType.UNIFORM_DISAGGREGATE), + (timedelta(hours=1), timedelta(minutes=20), DisaggregationType.INTERPOLATE), + ], +) +def test_map_datetime_to_datetime_with_resampling( # noqa: C901 + tmp_path, iter_stores_by_engine_no_data_ingestion: Store, tzinfo, params +): + fm_res, to_res, operation = params + store = iter_stores_by_engine_no_data_ingestion + year = 2020 + start = datetime(year=year, month=1, day=1, hour=0, tzinfo=tzinfo) + end = datetime(year=year + 1, month=1, day=1, hour=0, tzinfo=tzinfo) + fm_time_array_len = (end - start) / fm_res + to_time_array_len = (end - start) / to_res + + src_time_config = DatetimeRange( + start=start, + resolution=fm_res, + length=fm_time_array_len, + interval_type=TimeIntervalType.PERIOD_BEGINNING, + time_column="timestamp", + ) + dst_time_config = DatetimeRange( + start=start, + resolution=to_res, + length=to_time_array_len, + interval_type=TimeIntervalType.PERIOD_BEGINNING, + time_column="timestamp", + ) + + src_csv_schema = CsvTableSchema( + time_config=src_time_config, + column_dtypes=[ + ColumnDType(name="timestamp", dtype=DateTime(timezone=False)), + ColumnDType(name="gen1", dtype=Double()), + ColumnDType(name="gen2", dtype=Double()), + ColumnDType(name="gen3", dtype=Double()), + ], + value_columns=["gen1", "gen2", "gen3"], + pivoted_dimension_name="generator", + time_array_id_columns=[], + ) + dst_schema = TableSchema( + name="generators_pe", + time_config=dst_time_config, + time_array_id_columns=["generator"], + value_column="value", + ) + rel = read_csv(GENERATOR_TIME_SERIES_FILE, src_csv_schema) + rel2 = unpivot(rel, ("gen1", "gen2", "gen3"), "generator", "value") # noqa: F841 + + src_schema = TableSchema( + name="generators_pb", + time_config=src_time_config, + time_array_id_columns=["generator"], + value_column="value", + ) + if store.engine.name == "hive": + out_file = tmp_path / "data.parquet" + rel2.to_df().to_parquet(out_file) + store.create_view_from_parquet(out_file, src_schema) + else: + store.ingest_table(rel2, src_schema) + + if tzinfo is None and store.engine.name != "sqlite": + output_file = tmp_path / "mapped_data" + else: + output_file = None + store.map_table_time_config( + src_schema.name, + dst_schema, + resampling_operation=operation, + output_file=output_file, + check_mapped_timestamps=True, + ) + if output_file is None or store.engine.name == "sqlite": + df2 = store.read_table(dst_schema.name) + else: + df2 = pd.read_parquet(output_file) + assert len(df2) == to_time_array_len * 3 + actual = sorted(df2["timestamp"].unique()) + assert isinstance(src_schema.time_config, DatetimeRange) + expected = make_time_range_generator(dst_schema.time_config).list_timestamps() + check_timestamp_lists(actual, expected) + + if isinstance(operation, AggregationType): + val = df2[df2["generator"] == "gen1"].sort_values(by="timestamp").iloc[0, 2] + if operation == AggregationType.SUM: + assert val == (1 + 24) * 24 / 2 + if operation == AggregationType.AVG: + assert val == (1 + 24) * 24 / 2 / 24 + if operation == AggregationType.MIN: + assert val == 1 + if operation == AggregationType.MAX: + assert val == 24 + + if isinstance(operation, DisaggregationType): + val = df2[df2["generator"] == "gen1"].sort_values(by="timestamp").iloc[1, 2].round(3) + if operation == DisaggregationType.DUPLICATE_FFILL: + assert val == 1 + if operation == DisaggregationType.DUPLICATE_BFILL: + 
assert val == 2 + if operation == DisaggregationType.INTERPOLATE: + assert val == 1.333 + if operation == DisaggregationType.UNIFORM_DISAGGREGATE: + assert val == 0.333 + + def test_map_index_time_to_datetime( tmp_path: Path, iter_stores_by_engine_no_data_ingestion: Store ) -> None:
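As a sanity check on the `INTERPOLATE` assertion (`val == 1.333`), the factor arithmetic the disaggregation mapping yields for the second 20-minute timestamp (00:20), assuming gen1's first two hourly values are 1 and 2 (which the other assertions imply):

```python
# Illustrative check of the expected INTERPOLATE value (assumed gen1 values 1, 2, ...).
from datetime import timedelta

fres = timedelta(hours=1)        # source resolution
offset = timedelta(minutes=20)   # 00:20, the second destination timestamp
lb_factor = 1 + (timedelta(0) - offset) / fres        # weight on the 00:00 value -> 2/3
ub_factor = 1 + (offset - timedelta(hours=1)) / fres  # weight on the 01:00 value -> 1/3
value = 1 * lb_factor + 2 * ub_factor  # 1 and 2 are the assumed bounding hourly values
print(round(value, 3))  # 1.333, matching the assertion above
```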