From e8fa5ab64bdf60687e2ea4ff7d2f8a40dab770c4 Mon Sep 17 00:00:00 2001 From: lixiliu <36629962+lixiliu@users.noreply.github.com> Date: Tue, 11 Mar 2025 10:17:40 -0600 Subject: [PATCH 01/11] bug fix --- src/chronify/time_series_mapper_datetime.py | 7 +++++-- tests/test_mapper_datetime_to_datetime.py | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/chronify/time_series_mapper_datetime.py b/src/chronify/time_series_mapper_datetime.py index 57103b2..335acad 100644 --- a/src/chronify/time_series_mapper_datetime.py +++ b/src/chronify/time_series_mapper_datetime.py @@ -106,7 +106,11 @@ def _create_mapping(self) -> tuple[pd.DataFrame, MappingTableSchema]: # tz-naive time does not have skips/dups, so always localize in std tz first ser_from = ser_from.dt.tz_localize(to_tz_std).dt.tz_convert(to_tz) case (False, True): - ser_from = ser_from.dt.tz_localize(to_tz) + # get standard time zone of fm_tz + year = self._from_time_config.start.year + fm_tz_std = datetime(year=year, month=1, day=1, tzinfo=fm_tz).tzname() + # convert to standard time zone of fm_tz before making it tz-naive + ser_from = ser_from.dt.tz_convert(fm_tz_std).dt.tz_localize(to_tz) match (self._adjust_interval, self._wrap_time_allowed): case (True, _): ser = roll_time_interval( @@ -126,7 +130,6 @@ def _create_mapping(self) -> tuple[pd.DataFrame, MappingTableSchema]: to_time_col: ser, } ) - assert ( df[to_time_col].nunique() == self._to_time_config.length ), "to_time_col does not have the right number of timestamps" diff --git a/tests/test_mapper_datetime_to_datetime.py b/tests/test_mapper_datetime_to_datetime.py index ba9fa00..a65671b 100644 --- a/tests/test_mapper_datetime_to_datetime.py +++ b/tests/test_mapper_datetime_to_datetime.py @@ -188,9 +188,9 @@ def test_time_interval_shift_different_time_ranges( @pytest.mark.parametrize( "tzinfo_tuple", [ - # (ZoneInfo("US/Eastern"), None), + (ZoneInfo("US/Eastern"), None), (None, ZoneInfo("EST")), - # (ZoneInfo("US/Eastern"), ZoneInfo("US/Mountain")), + (ZoneInfo("US/Eastern"), ZoneInfo("US/Mountain")), ], ) def test_time_shift_different_timezones( From f805624e44e3af28a08b0594cfed3f4810c04d9c Mon Sep 17 00:00:00 2001 From: lixiliu <36629962+lixiliu@users.noreply.github.com> Date: Tue, 11 Mar 2025 10:54:30 -0600 Subject: [PATCH 02/11] refactor for resolution --- src/chronify/time_series_mapper_datetime.py | 48 ++++++++++++++------- tests/test_mapper_datetime_to_datetime.py | 19 ++++++++ 2 files changed, 51 insertions(+), 16 deletions(-) diff --git a/src/chronify/time_series_mapper_datetime.py b/src/chronify/time_series_mapper_datetime.py index 335acad..eafc186 100644 --- a/src/chronify/time_series_mapper_datetime.py +++ b/src/chronify/time_series_mapper_datetime.py @@ -40,20 +40,22 @@ def __init__( raise InvalidParameter(msg) self._from_time_config: DatetimeRange = self._from_schema.time_config self._to_time_config: DatetimeRange = self._to_schema.time_config + self._resampling_type = self.get_resampling_type() + + def get_resampling_type(self) -> Optional[str]: + if self._from_time_config.resolution < self._to_time_config.resolution: + return "aggregation" + if self._from_time_config.resolution > self._to_time_config.resolution: + return "disaggregation" + return None def check_schema_consistency(self) -> None: """Check that from_schema can produce to_schema.""" self._check_table_columns_producibility() self._check_measurement_type_consistency() self._check_time_interval_type() - self._check_time_resolution() self._check_time_length() - def 
_check_time_resolution(self) -> None: - if self._from_time_config.resolution != self._to_time_config.resolution: - msg = "Handling of changing time resolution is not supported yet." - raise NotImplementedError(msg) - def _check_time_length(self) -> None: flen, tlen = self._from_time_config.length, self._to_time_config.length if flen != tlen and not self._wrap_time_allowed: @@ -67,13 +69,21 @@ def map_time( check_mapped_timestamps: bool = False, ) -> None: """Convert time columns with from_schema to to_schema configuration.""" + if self._resampling_type: + to_schema = self._to_schema.model_copy(deep=True) + to_schema.time_config.resolution = self._from_time_config.resolution + to_time_config = to_schema.time_config + else: + to_schema = self._to_schema + to_time_config = self._to_time_config + self.check_schema_consistency() - df, mapping_schema = self._create_mapping() + df, mapping_schema = self._create_mapping(to_time_config) apply_mapping( df, mapping_schema, self._from_schema, - self._to_schema, + to_schema, self._engine, self._metadata, self._data_adjustment, @@ -81,27 +91,33 @@ def map_time( output_file=output_file, check_mapped_timestamps=check_mapped_timestamps, ) + # if self._resampling_type == "aggregation": + # df, mapping_schema = self._create_aggregation_mapping(from_res, to_res) + # elif self._resampling_type == "disaggregation": + # df, mapping_schema = self._create_disaggregation_mapping(from_res, to_res) # TODO - add handling for changing resolution - Issue #30 - def _create_mapping(self) -> tuple[pd.DataFrame, MappingTableSchema]: + def _create_mapping( + self, to_time_config: DatetimeRange + ) -> tuple[pd.DataFrame, MappingTableSchema]: """Create mapping dataframe Handles time interval type """ from_time_col = "from_" + self._from_time_config.time_column - to_time_col = self._to_time_config.time_column + to_time_col = to_time_config.time_column from_time_data = make_time_range_generator(self._from_time_config).list_timestamps() to_time_data = make_time_range_generator( - self._to_time_config, leap_day_adjustment=self._data_adjustment.leap_day_adjustment + to_time_config, leap_day_adjustment=self._data_adjustment.leap_day_adjustment ).list_timestamps() ser_from = pd.Series(from_time_data) # If from_tz or to_tz is naive, use tz_localize fm_tz = self._from_time_config.start.tzinfo - to_tz = self._to_time_config.start.tzinfo + to_tz = to_time_config.start.tzinfo match (fm_tz is None, to_tz is None): case (True, False): # get standard time zone of to_tz - year = self._to_time_config.start.year + year = to_time_config.start.year to_tz_std = datetime(year=year, month=1, day=1, tzinfo=to_tz).tzname() # tz-naive time does not have skips/dups, so always localize in std tz first ser_from = ser_from.dt.tz_localize(to_tz_std).dt.tz_convert(to_tz) @@ -116,7 +132,7 @@ def _create_mapping(self) -> tuple[pd.DataFrame, MappingTableSchema]: ser = roll_time_interval( ser_from, self._from_time_config.interval_type, - self._to_time_config.interval_type, + to_time_config.interval_type, to_time_data, ) case (False, True): @@ -131,7 +147,7 @@ def _create_mapping(self) -> tuple[pd.DataFrame, MappingTableSchema]: } ) assert ( - df[to_time_col].nunique() == self._to_time_config.length + df[to_time_col].nunique() == to_time_config.length ), "to_time_col does not have the right number of timestamps" from_time_config = self._from_time_config.model_copy() from_time_config.time_column = from_time_col @@ -139,7 +155,7 @@ def _create_mapping(self) -> tuple[pd.DataFrame, MappingTableSchema]: 
name="mapping_table", time_configs=[ from_time_config, - self._to_time_config, + to_time_config, ], ) return df, mapping_schema diff --git a/tests/test_mapper_datetime_to_datetime.py b/tests/test_mapper_datetime_to_datetime.py index a65671b..07e379a 100644 --- a/tests/test_mapper_datetime_to_datetime.py +++ b/tests/test_mapper_datetime_to_datetime.py @@ -168,6 +168,25 @@ def test_time_interval_shift( check_time_shift_values(df, queried, from_schema.time_config, to_schema.time_config) +@pytest.mark.parametrize("tzinfo", [ZoneInfo("US/Eastern")]) # , None]) +def test_map_time_resolution( + iter_engines: Engine, + tzinfo: ZoneInfo | None, +) -> None: + from_schema = get_datetime_schema( + 2020, tzinfo, TimeIntervalType.PERIOD_BEGINNING, "from_table" + ) + df = generate_datetime_dataframe(from_schema) + to_schema = get_datetime_schema(2020, tzinfo, TimeIntervalType.PERIOD_ENDING, "to_table") + to_schema.time_config.resolution = timedelta(minutes=30) + + breakpoint() + + queried = get_mapped_results(iter_engines, df, from_schema, to_schema) + check_time_shift_timestamps(df, queried, to_schema.time_config) + check_time_shift_values(df, queried, from_schema.time_config, to_schema.time_config) + + @pytest.mark.parametrize("tzinfo", [ZoneInfo("US/Eastern"), None]) def test_time_interval_shift_different_time_ranges( iter_engines: Engine, From 2890c538cc978aa9e8f866563c61e4bc05f6593f Mon Sep 17 00:00:00 2001 From: lixiliu <36629962+lixiliu@users.noreply.github.com> Date: Tue, 11 Mar 2025 13:27:43 -0600 Subject: [PATCH 03/11] add resampling_operation param --- src/chronify/store.py | 6 ++ src/chronify/time.py | 23 ++++- src/chronify/time_series_mapper.py | 34 ++++++- src/chronify/time_series_mapper_base.py | 4 +- ...apper_column_representative_to_datetime.py | 10 ++- src/chronify/time_series_mapper_datetime.py | 86 ++++++++++++++++-- src/chronify/time_series_mapper_index_time.py | 11 ++- .../time_series_mapper_representative.py | 10 ++- tests/test_mapper_datetime_to_datetime.py | 19 ---- tests/test_mapper_index_time_to_datetime.py | 2 +- tests/test_store.py | 90 ++++++++++++++++++- 11 files changed, 255 insertions(+), 40 deletions(-) diff --git a/src/chronify/store.py b/src/chronify/store.py index cbf82fd..15bcb67 100644 --- a/src/chronify/store.py +++ b/src/chronify/store.py @@ -51,6 +51,7 @@ from chronify.time_series_mapper import map_time from chronify.utils.path_utils import check_overwrite, to_path from chronify.utils.sqlalchemy_view import create_view +from chronify.time import ResamplingOperationType class Store: @@ -845,6 +846,7 @@ def map_table_time_config( dst_schema: TableSchema, data_adjustment: Optional[TimeBasedDataAdjustment] = None, wrap_time_allowed: bool = False, + resampling_operation: Optional[ResamplingOperationType] = None, scratch_dir: Optional[Path] = None, output_file: Optional[Path] = None, check_mapped_timestamps: bool = False, @@ -864,6 +866,9 @@ def map_table_time_config( wrap_time_allowed Defines whether the time column is allowed to be wrapped according to the time config in dst_schema when it does not line up with the time config + resampling_operation + Defines the operation type for resampling when the time resolution in the source + data differs from the dst_schema scratch_dir Directory to use for temporary writes. Default to the system's tmp filesystem. 
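+        output_file
+            Optional path for writing the mapped result to a Parquet file.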
check_mapped_timestamps @@ -929,6 +934,7 @@ def map_table_time_config( dst_schema, data_adjustment=data_adjustment, wrap_time_allowed=wrap_time_allowed, + resampling_operation=resampling_operation, scratch_dir=scratch_dir, output_file=output_file, check_mapped_timestamps=check_mapped_timestamps, diff --git a/src/chronify/time.py b/src/chronify/time.py index ab57b86..74a9164 100644 --- a/src/chronify/time.py +++ b/src/chronify/time.py @@ -1,7 +1,7 @@ """Definitions related to time""" from enum import StrEnum -from typing import NamedTuple +from typing import NamedTuple, Union from zoneinfo import ZoneInfo from chronify.exceptions import InvalidParameter @@ -114,6 +114,27 @@ class MeasurementType(StrEnum): # description="Data values represent the sum of values in a time range", +class AggregationType(StrEnum): + """Operation types for resampling / aggregation""" + + SUM = "sum" + AVG = "avg" + MIN = "min" + MAX = "max" + + +class DisaggregationType(StrEnum): + """Operation types for resampling / disaggregation""" + + INTERPOLATE = "interpolate" + DUPLICATE_FFILL = "duplicate_ffill" + DUPLICATE_BFILL = "duplicate_bfill" + UNIFORM_DISAGGREGATE = "uniform_disaggregate" + + +ResamplingOperationType = Union[AggregationType, DisaggregationType] + + class TimeZone(StrEnum): """Time zones""" diff --git a/src/chronify/time_series_mapper.py b/src/chronify/time_series_mapper.py index a03a466..cbf10ca 100644 --- a/src/chronify/time_series_mapper.py +++ b/src/chronify/time_series_mapper.py @@ -17,6 +17,7 @@ TimeBasedDataAdjustment, ColumnRepresentativeBase, ) +from chronify.time import ResamplingOperationType def map_time( @@ -26,6 +27,7 @@ def map_time( to_schema: TableSchema, data_adjustment: Optional[TimeBasedDataAdjustment] = None, wrap_time_allowed: bool = False, + resampling_operation: Optional[ResamplingOperationType] = None, scratch_dir: Optional[Path] = None, output_file: Optional[Path] = None, check_mapped_timestamps: bool = False, @@ -35,7 +37,13 @@ def map_time( to_schema.time_config, DatetimeRange ): MapperRepresentativeTimeToDatetime( - engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed + engine, + metadata, + from_schema, + to_schema, + data_adjustment, + wrap_time_allowed, + resampling_operation, ).map_time( scratch_dir=scratch_dir, output_file=output_file, @@ -45,7 +53,13 @@ def map_time( to_schema.time_config, DatetimeRange ): MapperDatetimeToDatetime( - engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed + engine, + metadata, + from_schema, + to_schema, + data_adjustment, + wrap_time_allowed, + resampling_operation, ).map_time( scratch_dir=scratch_dir, output_file=output_file, @@ -55,7 +69,13 @@ def map_time( to_schema.time_config, DatetimeRange ): MapperIndexTimeToDatetime( - engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed + engine, + metadata, + from_schema, + to_schema, + data_adjustment, + wrap_time_allowed, + resampling_operation, ).map_time( scratch_dir=scratch_dir, output_file=output_file, @@ -67,7 +87,13 @@ def map_time( # No way to generate expected timestamps for YearMonthDayPeriodTimeNTZ # Is there a way to only check the output datetime timestamps? 
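+        # resampling_operation is forwarded to each mapper; it is required when the
+        # from/to time resolutions differ and must be None otherwise.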
MapperColumnRepresentativeToDatetime( - engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed + engine, + metadata, + from_schema, + to_schema, + data_adjustment, + wrap_time_allowed, + resampling_operation, ).map_time( scratch_dir=scratch_dir, output_file=output_file, diff --git a/src/chronify/time_series_mapper_base.py b/src/chronify/time_series_mapper_base.py index 5b36339..82e4d96 100644 --- a/src/chronify/time_series_mapper_base.py +++ b/src/chronify/time_series_mapper_base.py @@ -18,7 +18,7 @@ from chronify.exceptions import ConflictingInputsError from chronify.utils.sqlalchemy_table import create_table from chronify.time_series_checker import check_timestamps -from chronify.time import TimeIntervalType +from chronify.time import TimeIntervalType, ResamplingOperationType from chronify.time_configs import TimeBasedDataAdjustment @@ -33,6 +33,7 @@ def __init__( to_schema: TableSchema, data_adjustment: Optional[TimeBasedDataAdjustment] = None, wrap_time_allowed: bool = False, + resampling_operation: Optional[ResamplingOperationType] = None, ) -> None: self._engine = engine self._metadata = metadata @@ -45,6 +46,7 @@ def __init__( self._from_schema.time_config.interval_type != self._to_schema.time_config.interval_type ) + self._resampling_operation = resampling_operation @abc.abstractmethod def check_schema_consistency(self) -> None: diff --git a/src/chronify/time_series_mapper_column_representative_to_datetime.py b/src/chronify/time_series_mapper_column_representative_to_datetime.py index 18ed8cb..09f0645 100644 --- a/src/chronify/time_series_mapper_column_representative_to_datetime.py +++ b/src/chronify/time_series_mapper_column_representative_to_datetime.py @@ -20,6 +20,7 @@ from chronify.models import MappingTableSchema, TableSchema from chronify.sqlalchemy.functions import read_database, write_database from chronify.utils.sqlalchemy_table import create_table +from chronify.time import ResamplingOperationType class MapperColumnRepresentativeToDatetime(TimeSeriesMapperBase): @@ -57,9 +58,16 @@ def __init__( to_schema: TableSchema, data_adjustment: Optional[TimeBasedDataAdjustment] = None, wrap_time_allowed: bool = False, + resampling_operation: Optional[ResamplingOperationType] = None, ) -> None: super().__init__( - engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed + engine, + metadata, + from_schema, + to_schema, + data_adjustment, + wrap_time_allowed, + resampling_operation, ) if not isinstance(to_schema.time_config, DatetimeRange): diff --git a/src/chronify/time_series_mapper_datetime.py b/src/chronify/time_series_mapper_datetime.py index eafc186..8fb1957 100644 --- a/src/chronify/time_series_mapper_datetime.py +++ b/src/chronify/time_series_mapper_datetime.py @@ -12,6 +12,7 @@ from chronify.time_configs import DatetimeRange, TimeBasedDataAdjustment from chronify.time_range_generator_factory import make_time_range_generator from chronify.time_utils import roll_time_interval, wrap_timestamps +from chronify.time import ResamplingOperationType, AggregationType, DisaggregationType logger = logging.getLogger(__name__) @@ -25,9 +26,16 @@ def __init__( to_schema: TableSchema, data_adjustment: Optional[TimeBasedDataAdjustment] = None, wrap_time_allowed: bool = False, + resampling_operation: Optional[ResamplingOperationType] = None, ) -> None: super().__init__( - engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed + engine, + metadata, + from_schema, + to_schema, + data_adjustment, + wrap_time_allowed, + 
resampling_operation,
        )
        if self._from_schema == self._to_schema and self._data_adjustment is None:
            msg = f"from_schema is the same as to_schema and no data_adjustment, nothing to do.\n{self._from_schema}"
            raise InvalidParameter(msg)
@@ -54,14 +62,41 @@ def check_schema_consistency(self) -> None:
         """Check that from_schema can produce to_schema."""
         self._check_table_columns_producibility()
         self._check_measurement_type_consistency()
         self._check_time_interval_type()
-        self._check_time_length()
+        self._check_time_resolution_and_length()
+        self._check_resampling_consistency()
 
-    def _check_time_length(self) -> None:
+    def _check_time_resolution_and_length(self) -> None:
+        """Check that from_schema time resolution and length can produce to_schema counterparts."""
         flen, tlen = self._from_time_config.length, self._to_time_config.length
-        if flen != tlen and not self._wrap_time_allowed:
-            msg = f"Length must match between {self._from_schema.__class__} from_schema and {self._to_schema.__class__} to_schema. {flen} vs. {tlen} OR wrap_time_allowed must be set to True"
+        fres, tres = self._from_time_config.resolution, self._to_time_config.resolution
+        if fres == tres and flen != tlen and not self._wrap_time_allowed:
+            msg = f"For unchanging time resolution, length must match between {self._from_schema.__class__} from_schema and {self._to_schema.__class__} to_schema. {flen} vs. {tlen} OR wrap_time_allowed must be set to True"
+            raise ConflictingInputsError(msg)
+
+        # Resolutions must be multiples of each other
+        if fres < tres and tres % fres > 0:
+            msg = f"For aggregation, the time resolution in {self._to_schema.__class__} to_schema must be a multiple of that in {self._from_schema.__class__} from_schema. {tres} vs {fres}"
+            raise ConflictingInputsError(msg)
+        if fres > tres and fres % tres > 0:
+            msg = f"For disaggregation, the time resolution in {self._from_schema.__class__} from_schema must be a multiple of that in {self._to_schema.__class__} to_schema. {fres} vs {tres}"
+            raise ConflictingInputsError(msg)
+
+        # No extrapolation allowed
+        if flen * fres < tlen * tres:
+            msg = f"The product of time resolution and length in {self._from_schema.__class__} from_schema cannot be less than that in {self._to_schema.__class__} to_schema."
             raise ConflictingInputsError(msg)
 
+    def _check_resampling_consistency(self) -> None:
+        """Check resampling operation type is consistent with time resolution inputs."""
+        if self._resampling_type is None and self._resampling_operation is not None:
+            msg = f"For unchanging time resolution, {self._resampling_operation=} must be set to None."
+ raise ConflictingInputsError(msg) + + for typ, opt in [("aggregation", AggregationType), ("disaggregation", DisaggregationType)]: + if self._resampling_type == typ and not isinstance(self._resampling_operation, opt): + msg = f"{typ} detected, {self._resampling_operation=} must be set to an option from {opt}" + raise ConflictingInputsError(msg) + def map_time( self, scratch_dir: Optional[Path] = None, @@ -72,6 +105,7 @@ def map_time( if self._resampling_type: to_schema = self._to_schema.model_copy(deep=True) to_schema.time_config.resolution = self._from_time_config.resolution + to_schema.time_config.length = self._from_time_config.length to_time_config = to_schema.time_config else: to_schema = self._to_schema @@ -91,17 +125,19 @@ def map_time( output_file=output_file, check_mapped_timestamps=check_mapped_timestamps, ) - # if self._resampling_type == "aggregation": - # df, mapping_schema = self._create_aggregation_mapping(from_res, to_res) + if self._resampling_type == "aggregation": + df, mapping_schema = self._create_aggregation_mapping( + to_time_config, self._to_time_config + ) # elif self._resampling_type == "disaggregation": - # df, mapping_schema = self._create_disaggregation_mapping(from_res, to_res) + # df, mapping_schema = self._create_disaggregation_mapping(to_time_config, self._to_time_config) # TODO - add handling for changing resolution - Issue #30 def _create_mapping( self, to_time_config: DatetimeRange ) -> tuple[pd.DataFrame, MappingTableSchema]: """Create mapping dataframe - Handles time interval type + Handles time interval type but not resolution """ from_time_col = "from_" + self._from_time_config.time_column to_time_col = to_time_config.time_column @@ -159,3 +195,35 @@ def _create_mapping( ], ) return df, mapping_schema + + def _create_aggregation_mapping( + self, from_time_config: DatetimeRange, to_time_config: DatetimeRange + ) -> tuple[pd.DataFrame, MappingTableSchema]: + """Create mapping dataframe for aggregation""" + from_time_col = "from_" + from_time_config.time_column + to_time_col = to_time_config.time_column + from_time_data = make_time_range_generator( + from_time_config, leap_day_adjustment=self._data_adjustment.leap_day_adjustment + ).list_timestamps() + to_time_data = make_time_range_generator( + to_time_config, leap_day_adjustment=self._data_adjustment.leap_day_adjustment + ).list_timestamps() + df = pd.Series(from_time_data).rename(from_time_col).to_frame() + df = df.join( + pd.Series(to_time_data, index=to_time_data).rename(to_time_col), on=from_time_col + ) + limit = to_time_config.resolution / from_time_config.resolution - 1 + assert limit % 1 == 0, f"limit is not a whole number {limit}" + df[to_time_col].ffill(limit=int(limit), inplace=True) + + # mapping schema + from_time_config = from_time_config.model_copy() + from_time_config.time_column = from_time_col + mapping_schema = MappingTableSchema( + name="mapping_table", + time_configs=[ + from_time_config, + to_time_config, + ], + ) + return df, mapping_schema diff --git a/src/chronify/time_series_mapper_index_time.py b/src/chronify/time_series_mapper_index_time.py index 748e70b..63431a1 100644 --- a/src/chronify/time_series_mapper_index_time.py +++ b/src/chronify/time_series_mapper_index_time.py @@ -17,7 +17,7 @@ ) from chronify.time_range_generator_factory import make_time_range_generator from chronify.time_series_mapper_datetime import MapperDatetimeToDatetime -from chronify.time import TimeType, DaylightSavingAdjustmentType +from chronify.time import TimeType, DaylightSavingAdjustmentType, 
ResamplingOperationType from chronify.sqlalchemy.functions import read_database logger = logging.getLogger(__name__) @@ -32,9 +32,16 @@ def __init__( to_schema: TableSchema, data_adjustment: Optional[TimeBasedDataAdjustment] = None, wrap_time_allowed: bool = False, + resampling_operation: Optional[ResamplingOperationType] = None, ) -> None: super().__init__( - engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed + engine, + metadata, + from_schema, + to_schema, + data_adjustment, + wrap_time_allowed, + resampling_operation, ) self._dst_adjustment = self._data_adjustment.daylight_saving_adjustment if not isinstance(self._from_schema.time_config, IndexTimeRangeBase): diff --git a/src/chronify/time_series_mapper_representative.py b/src/chronify/time_series_mapper_representative.py index 7670e5b..5ceb7f0 100644 --- a/src/chronify/time_series_mapper_representative.py +++ b/src/chronify/time_series_mapper_representative.py @@ -20,6 +20,7 @@ TimeBasedDataAdjustment, ) from chronify.time_utils import shift_time_interval +from chronify.time import ResamplingOperationType logger = logging.getLogger(__name__) @@ -33,9 +34,16 @@ def __init__( to_schema: TableSchema, data_adjustment: Optional[TimeBasedDataAdjustment] = None, wrap_time_allowed: bool = False, + resampling_operation: Optional[ResamplingOperationType] = None, ) -> None: super().__init__( - engine, metadata, from_schema, to_schema, data_adjustment, wrap_time_allowed + engine, + metadata, + from_schema, + to_schema, + data_adjustment, + wrap_time_allowed, + resampling_operation, ) if not isinstance(from_schema.time_config, RepresentativePeriodTimeBase): msg = "source schema does not have RepresentativePeriodTimeBase time config. Use a different mapper." diff --git a/tests/test_mapper_datetime_to_datetime.py b/tests/test_mapper_datetime_to_datetime.py index 07e379a..a65671b 100644 --- a/tests/test_mapper_datetime_to_datetime.py +++ b/tests/test_mapper_datetime_to_datetime.py @@ -168,25 +168,6 @@ def test_time_interval_shift( check_time_shift_values(df, queried, from_schema.time_config, to_schema.time_config) -@pytest.mark.parametrize("tzinfo", [ZoneInfo("US/Eastern")]) # , None]) -def test_map_time_resolution( - iter_engines: Engine, - tzinfo: ZoneInfo | None, -) -> None: - from_schema = get_datetime_schema( - 2020, tzinfo, TimeIntervalType.PERIOD_BEGINNING, "from_table" - ) - df = generate_datetime_dataframe(from_schema) - to_schema = get_datetime_schema(2020, tzinfo, TimeIntervalType.PERIOD_ENDING, "to_table") - to_schema.time_config.resolution = timedelta(minutes=30) - - breakpoint() - - queried = get_mapped_results(iter_engines, df, from_schema, to_schema) - check_time_shift_timestamps(df, queried, to_schema.time_config) - check_time_shift_values(df, queried, from_schema.time_config, to_schema.time_config) - - @pytest.mark.parametrize("tzinfo", [ZoneInfo("US/Eastern"), None]) def test_time_interval_shift_different_time_ranges( iter_engines: Engine, diff --git a/tests/test_mapper_index_time_to_datetime.py b/tests/test_mapper_index_time_to_datetime.py index 735d834..d1283c3 100644 --- a/tests/test_mapper_index_time_to_datetime.py +++ b/tests/test_mapper_index_time_to_datetime.py @@ -218,7 +218,7 @@ def test_unaligned_time_mapping_without_wrap_time(iter_engines: Engine) -> None: src_df, src_schema, dst_schema = data_for_unaligned_time_mapping() error = ( ConflictingInputsError, - "Length must match between", + "For unchanging time resolution, length must match between", ) run_test( iter_engines, diff --git 
a/tests/test_store.py b/tests/test_store.py index 9c5b0cc..f8ab953 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -33,7 +33,7 @@ ) from chronify.models import ColumnDType, CsvTableSchema, PivotedTableSchema, TableSchema from chronify.store import Store -from chronify.time import TimeIntervalType, DaylightSavingAdjustmentType +from chronify.time import TimeIntervalType, DaylightSavingAdjustmentType, AggregationType from chronify.time_configs import DatetimeRange, IndexTimeRangeLocalTime, TimeBasedDataAdjustment from chronify.time_range_generator_factory import make_time_range_generator from chronify.time_series_checker import check_timestamp_lists @@ -506,6 +506,94 @@ def test_map_datetime_to_datetime( check_timestamp_lists(actual, expected) +@pytest.mark.skip() +@pytest.mark.parametrize( + "params", + [ + (ZoneInfo("EST"), timedelta(hours=1), timedelta(hours=24), AggregationType.SUM), + # (None, timedelta(hours=1)), timedelta(hours=24), AggregationType.SUM) + ], +) +def test_map_datetime_to_datetime_with_resampling( + tmp_path, iter_stores_by_engine_no_data_ingestion: Store, params +): + tzinfo, fm_res, to_res, operation = params + store = iter_stores_by_engine_no_data_ingestion + time_array_len = 8784 + year = 2020 + + src_time_config = DatetimeRange( + start=datetime(year=year, month=1, day=1, hour=0, tzinfo=tzinfo), + resolution=fm_res, + length=time_array_len, + interval_type=TimeIntervalType.PERIOD_BEGINNING, + time_column="timestamp", + ) + dst_time_config = DatetimeRange( + start=datetime(year=year, month=1, day=1, hour=1, tzinfo=tzinfo), + resolution=to_res, + length=time_array_len, + interval_type=TimeIntervalType.PERIOD_BEGINNING, + time_column="timestamp", + ) + + src_csv_schema = CsvTableSchema( + time_config=src_time_config, + column_dtypes=[ + ColumnDType(name="timestamp", dtype=DateTime(timezone=False)), + ColumnDType(name="gen1", dtype=Double()), + ColumnDType(name="gen2", dtype=Double()), + ColumnDType(name="gen3", dtype=Double()), + ], + value_columns=["gen1", "gen2", "gen3"], + pivoted_dimension_name="generator", + time_array_id_columns=[], + ) + dst_schema = TableSchema( + name="generators_pe", + time_config=dst_time_config, + time_array_id_columns=["generator"], + value_column="value", + ) + rel = read_csv(GENERATOR_TIME_SERIES_FILE, src_csv_schema) + rel2 = unpivot(rel, ("gen1", "gen2", "gen3"), "generator", "value") # noqa: F841 + + src_schema = TableSchema( + name="generators_pb", + time_config=src_time_config, + time_array_id_columns=["generator"], + value_column="value", + ) + if store.engine.name == "hive": + out_file = tmp_path / "data.parquet" + rel2.to_df().to_parquet(out_file) + store.create_view_from_parquet(out_file, src_schema) + else: + store.ingest_table(rel2, src_schema) + + if tzinfo is None and store.engine.name != "sqlite": + output_file = tmp_path / "mapped_data" + else: + output_file = None + store.map_table_time_config( + src_schema.name, + dst_schema, + resampling_operation=operation, + output_file=output_file, + check_mapped_timestamps=True, + ) + if output_file is None or store.engine.name == "sqlite": + df2 = store.read_table(dst_schema.name) + else: + df2 = pd.read_parquet(output_file) + assert len(df2) == time_array_len * 3 + actual = sorted(df2["timestamp"].unique()) + assert isinstance(src_schema.time_config, DatetimeRange) + assert actual[0] == src_schema.time_config.start + timedelta(hours=1) + expected = make_time_range_generator(dst_schema.time_config).list_timestamps() + check_timestamp_lists(actual, expected) + + def 
test_map_index_time_to_datetime( tmp_path: Path, iter_stores_by_engine_no_data_ingestion: Store ) -> None: From d04f5d6e1ab2da8a91979a25ccd6967d9b76f4f1 Mon Sep 17 00:00:00 2001 From: lixiliu <36629962+lixiliu@users.noreply.github.com> Date: Tue, 11 Mar 2025 15:55:11 -0600 Subject: [PATCH 04/11] aggregation --- src/chronify/time_series_mapper_base.py | 41 ++++++++++++++-- src/chronify/time_series_mapper_datetime.py | 48 ++++++++++++++----- src/chronify/time_series_mapper_index_time.py | 2 +- tests/test_store.py | 25 +++++----- 4 files changed, 88 insertions(+), 28 deletions(-) diff --git a/src/chronify/time_series_mapper_base.py b/src/chronify/time_series_mapper_base.py index 82e4d96..22a9688 100644 --- a/src/chronify/time_series_mapper_base.py +++ b/src/chronify/time_series_mapper_base.py @@ -6,7 +6,7 @@ import pandas as pd from loguru import logger -from sqlalchemy import Engine, MetaData, Table, select, text +from sqlalchemy import Engine, MetaData, Table, select, text, func from chronify.hive_functions import create_materialized_view from chronify.sqlalchemy.functions import ( @@ -18,7 +18,7 @@ from chronify.exceptions import ConflictingInputsError from chronify.utils.sqlalchemy_table import create_table from chronify.time_series_checker import check_timestamps -from chronify.time import TimeIntervalType, ResamplingOperationType +from chronify.time import TimeIntervalType, ResamplingOperationType, AggregationType from chronify.time_configs import TimeBasedDataAdjustment @@ -93,6 +93,7 @@ def apply_mapping( engine: Engine, metadata: MetaData, data_adjustment: TimeBasedDataAdjustment, + resampling_operation: Optional[ResamplingOperationType] = None, scratch_dir: Optional[Path] = None, output_file: Optional[Path] = None, check_mapped_timestamps: bool = False, @@ -118,6 +119,7 @@ def apply_mapping( to_schema, engine, metadata, + resampling_operation, scratch_dir, output_file, ) @@ -130,6 +132,11 @@ def apply_mapping( mapped_table = Table(to_schema.name, metadata) with engine.connect() as conn: try: + # TODO <--- + # if resampling_operation: + # with engine.connect() as conn: + # df = read_database(f"select * from {mapped_table.name}", conn, to_schema.time_config) + # breakpoint() check_timestamps( conn, mapped_table, @@ -163,6 +170,7 @@ def _apply_mapping( to_schema: TableSchema, engine: Engine, metadata: MetaData, + resampling_operation: Optional[ResamplingOperationType] = None, scratch_dir: Optional[Path] = None, output_file: Optional[Path] = None, ) -> None: @@ -177,13 +185,32 @@ def _apply_mapping( set(from_schema.list_columns()) ) + val_col = to_schema.value_column # from left_table final_cols = set(to_schema.list_columns()).union(left_table_pass_thru_columns) right_cols = set(right_table_columns).intersection(final_cols) - left_cols = final_cols - right_cols + left_cols = final_cols - right_cols - {val_col} select_stmt = [left_table.c[x] for x in left_cols] select_stmt += [right_table.c[x] for x in right_cols] + tval_col = left_table.c[val_col] + if not resampling_operation: + select_stmt.append(tval_col) + else: + groupby_stmt = select_stmt.copy() + match resampling_operation: + case AggregationType.SUM: + select_stmt.append(func.sum(tval_col).label(val_col)) + case AggregationType.AVG: + select_stmt.append(func.avg(tval_col).label(val_col)) + case AggregationType.MIN: + select_stmt.append(func.min(tval_col).label(val_col)) + case AggregationType.MAX: + select_stmt.append(func.max(tval_col).label(val_col)) + case _: + msg = f"Unsupported {resampling_operation=}" + raise 
ValueError(msg)
+
         keys = from_schema.time_config.list_time_columns()
         # check time_zone
         tz_col = from_schema.time_config.get_time_zone_column()
@@ -197,6 +224,14 @@
         on_stmt = reduce(and_, (left_table.c[x] == right_table.c["from_" + x] for x in keys))
         query = select(*select_stmt).select_from(left_table).join(right_table, on_stmt)
+        if resampling_operation:
+            query = query.group_by(*groupby_stmt)
+
+        # TODO <---
+        # with engine.connect() as conn:
+        #     df_map = read_database(f"select * from {mapping_table_name}", conn, to_schema.time_config)
+        #     df = read_database(query, conn, to_schema.time_config)
+        #     breakpoint()
 
         if output_file is not None:
             write_query_to_parquet(engine, str(query), output_file, overwrite=True)
diff --git a/src/chronify/time_series_mapper_datetime.py b/src/chronify/time_series_mapper_datetime.py
index 8fb1957..3099d6d 100644
--- a/src/chronify/time_series_mapper_datetime.py
+++ b/src/chronify/time_series_mapper_datetime.py
@@ -70,20 +70,20 @@ def _check_time_resolution_and_length(self) -> None:
         """Check that from_schema time resolution and length can produce to_schema counterparts."""
         flen, tlen = self._from_time_config.length, self._to_time_config.length
         fres, tres = self._from_time_config.resolution, self._to_time_config.resolution
         if fres == tres and flen != tlen and not self._wrap_time_allowed:
-            msg = f"For unchanging time resolution, length must match between {self._from_schema.__class__} from_schema and {self._to_schema.__class__} to_schema. {flen} vs. {tlen} OR wrap_time_allowed must be set to True"
+            msg = f"For unchanging time resolution, length must match between from_time_config and to_time_config. {flen} vs. {tlen} OR wrap_time_allowed must be set to True"
             raise ConflictingInputsError(msg)
 
         # Resolutions must be multiples of each other
-        if fres < tres and tres % fres > 0:
-            msg = f"For aggregation, the time resolution in {self._to_schema.__class__} to_schema must be a multiple of that in {self._from_schema.__class__} from_schema. {tres} vs {fres}"
+        if fres < tres and tres.total_seconds() % fres.total_seconds() > 0:
+            msg = f"For aggregation, the time resolution in to_time_config must be a multiple of that in from_time_config. {tres} vs {fres}"
             raise ConflictingInputsError(msg)
-        if fres > tres and fres % tres > 0:
-            msg = f"For disaggregation, the time resolution in {self._from_schema.__class__} from_schema must be a multiple of that in {self._to_schema.__class__} to_schema. {fres} vs {tres}"
+        if fres > tres and fres.total_seconds() % tres.total_seconds() > 0:
+            msg = f"For disaggregation, the time resolution in from_time_config must be a multiple of that in to_time_config. {fres} vs {tres}"
             raise ConflictingInputsError(msg)
 
         # No extrapolation allowed
-        if flen * fres < tlen * tres:
-            msg = f"The product of time resolution and length in {self._from_schema.__class__} from_schema cannot be less than that in {self._to_schema.__class__} to_schema."
+        if flen * fres.total_seconds() < tlen * tres.total_seconds():
+            msg = "The product of time resolution and length in from_time_config cannot be less than that in to_time_config."
raise ConflictingInputsError(msg) def _check_resampling_consistency(self) -> None: @@ -95,6 +95,18 @@ def _check_resampling_consistency(self) -> None: msg = f"{typ} detected, {self._resampling_operation=} must be set to an option from {opt}" raise ConflictingInputsError(msg) + def _create_intermediate_schema(self) -> TableSchema: + """Create the intermediate table schema for all time processing except resampling, + by producing a version of to_schema with the same time resolution and length as from_schema + """ + schema_kwargs = self._to_schema.model_dump() + schema_kwargs["time_config"]["resolution"] = self._from_time_config.resolution + schema_kwargs["time_config"]["length"] = self._from_time_config.length + schema_kwargs["name"] += "_intermediate" + schema = TableSchema(**schema_kwargs) + + return schema + def map_time( self, scratch_dir: Optional[Path] = None, @@ -103,9 +115,7 @@ def map_time( ) -> None: """Convert time columns with from_schema to to_schema configuration.""" if self._resampling_type: - to_schema = self._to_schema.model_copy(deep=True) - to_schema.time_config.resolution = self._from_time_config.resolution - to_schema.time_config.length = self._from_time_config.length + to_schema = self._create_intermediate_schema() to_time_config = to_schema.time_config else: to_schema = self._to_schema @@ -125,10 +135,24 @@ def map_time( output_file=output_file, check_mapped_timestamps=check_mapped_timestamps, ) + if self._resampling_type == "aggregation": df, mapping_schema = self._create_aggregation_mapping( to_time_config, self._to_time_config ) + apply_mapping( + df, + mapping_schema, + to_schema, + self._to_schema, + self._engine, + self._metadata, + TimeBasedDataAdjustment(), + resampling_operation=self._resampling_operation, + scratch_dir=scratch_dir, + output_file=output_file, + check_mapped_timestamps=check_mapped_timestamps, + ) # elif self._resampling_type == "disaggregation": # df, mapping_schema = self._create_disaggregation_mapping(to_time_config, self._to_time_config) # TODO - add handling for changing resolution - Issue #30 @@ -214,13 +238,13 @@ def _create_aggregation_mapping( ) limit = to_time_config.resolution / from_time_config.resolution - 1 assert limit % 1 == 0, f"limit is not a whole number {limit}" - df[to_time_col].ffill(limit=int(limit), inplace=True) + df[to_time_col] = df[to_time_col].ffill(limit=int(limit)) # mapping schema from_time_config = from_time_config.model_copy() from_time_config.time_column = from_time_col mapping_schema = MappingTableSchema( - name="mapping_table", + name="aggregation_table", time_configs=[ from_time_config, to_time_config, diff --git a/src/chronify/time_series_mapper_index_time.py b/src/chronify/time_series_mapper_index_time.py index 63431a1..ca5df60 100644 --- a/src/chronify/time_series_mapper_index_time.py +++ b/src/chronify/time_series_mapper_index_time.py @@ -69,7 +69,7 @@ def _check_time_resolution(self) -> None: def _check_time_length(self) -> None: flen, tlen = self._from_time_config.length, self._to_time_config.length if flen != tlen and not self._wrap_time_allowed: - msg = f"Length must match between {self._from_schema.__class__} from_schema and {self._to_schema.__class__} to_schema. {flen} vs. {tlen} OR wrap_time_allowed must be set to True" + msg = f"Length must match between from_time_config and to_time_config. {flen} vs. 
{tlen} OR wrap_time_allowed must be set to True" raise ConflictingInputsError(msg) def map_time( diff --git a/tests/test_store.py b/tests/test_store.py index f8ab953..8387f4d 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -506,33 +506,35 @@ def test_map_datetime_to_datetime( check_timestamp_lists(actual, expected) -@pytest.mark.skip() +@pytest.mark.parametrize("tzinfo", [ZoneInfo("EST")]) # , None]) @pytest.mark.parametrize( "params", [ - (ZoneInfo("EST"), timedelta(hours=1), timedelta(hours=24), AggregationType.SUM), - # (None, timedelta(hours=1)), timedelta(hours=24), AggregationType.SUM) + (timedelta(hours=1), timedelta(hours=24), AggregationType.SUM), ], ) def test_map_datetime_to_datetime_with_resampling( - tmp_path, iter_stores_by_engine_no_data_ingestion: Store, params + tmp_path, iter_stores_by_engine_no_data_ingestion: Store, tzinfo, params ): - tzinfo, fm_res, to_res, operation = params + fm_res, to_res, operation = params store = iter_stores_by_engine_no_data_ingestion - time_array_len = 8784 year = 2020 + start = datetime(year=year, month=1, day=1, hour=0, tzinfo=tzinfo) + end = datetime(year=year + 1, month=1, day=1, hour=0, tzinfo=tzinfo) + fm_time_array_len = (end - start) / fm_res + to_time_array_len = (end - start) / to_res src_time_config = DatetimeRange( - start=datetime(year=year, month=1, day=1, hour=0, tzinfo=tzinfo), + start=start, resolution=fm_res, - length=time_array_len, + length=fm_time_array_len, interval_type=TimeIntervalType.PERIOD_BEGINNING, time_column="timestamp", ) dst_time_config = DatetimeRange( - start=datetime(year=year, month=1, day=1, hour=1, tzinfo=tzinfo), + start=start, resolution=to_res, - length=time_array_len, + length=to_time_array_len, interval_type=TimeIntervalType.PERIOD_BEGINNING, time_column="timestamp", ) @@ -586,10 +588,9 @@ def test_map_datetime_to_datetime_with_resampling( df2 = store.read_table(dst_schema.name) else: df2 = pd.read_parquet(output_file) - assert len(df2) == time_array_len * 3 + assert len(df2) == to_time_array_len * 3 actual = sorted(df2["timestamp"].unique()) assert isinstance(src_schema.time_config, DatetimeRange) - assert actual[0] == src_schema.time_config.start + timedelta(hours=1) expected = make_time_range_generator(dst_schema.time_config).list_timestamps() check_timestamp_lists(actual, expected) From d552014f2c124b09cb4d130374fef0af6b51e14c Mon Sep 17 00:00:00 2001 From: lixiliu <36629962+lixiliu@users.noreply.github.com> Date: Tue, 11 Mar 2025 22:36:20 -0600 Subject: [PATCH 05/11] working for datetime --- src/chronify/time_series_mapper_base.py | 5 +- ...apper_column_representative_to_datetime.py | 7 +- src/chronify/time_series_mapper_datetime.py | 122 ++++++++++++++++-- src/chronify/time_series_mapper_index_time.py | 3 + .../time_series_mapper_representative.py | 1 + tests/test_store.py | 40 +++++- 6 files changed, 160 insertions(+), 18 deletions(-) diff --git a/src/chronify/time_series_mapper_base.py b/src/chronify/time_series_mapper_base.py index 22a9688..d61821a 100644 --- a/src/chronify/time_series_mapper_base.py +++ b/src/chronify/time_series_mapper_base.py @@ -194,6 +194,8 @@ def _apply_mapping( select_stmt += [right_table.c[x] for x in right_cols] tval_col = left_table.c[val_col] + if "factor" in right_table_columns: + tval_col *= right_table.c["factor"] if not resampling_operation: select_stmt.append(tval_col) else: @@ -227,7 +229,8 @@ def _apply_mapping( if resampling_operation: query = query.group_by(*groupby_stmt) - # TODO <--- + # # TODO <--- + # from chronify.sqlalchemy.functions 
import read_database # with engine.connect() as conn: # df_map = read_database(f"select * from {mapping_table_name}", conn, to_schema.time_config) # df = read_database(query, conn, to_schema.time_config) diff --git a/src/chronify/time_series_mapper_column_representative_to_datetime.py b/src/chronify/time_series_mapper_column_representative_to_datetime.py index 09f0645..5d03015 100644 --- a/src/chronify/time_series_mapper_column_representative_to_datetime.py +++ b/src/chronify/time_series_mapper_column_representative_to_datetime.py @@ -113,9 +113,10 @@ def map_time( self._engine, self._metadata, self._data_adjustment, - scratch_dir, - output_file, - check_mapped_timestamps, + resampling_operation=self._resampling_operation, + scratch_dir=scratch_dir, + output_file=output_file, + check_mapped_timestamps=check_mapped_timestamps, ) if drop_table: diff --git a/src/chronify/time_series_mapper_datetime.py b/src/chronify/time_series_mapper_datetime.py index 3099d6d..9ae9b01 100644 --- a/src/chronify/time_series_mapper_datetime.py +++ b/src/chronify/time_series_mapper_datetime.py @@ -2,6 +2,7 @@ from pathlib import Path from typing import Optional from datetime import datetime +import numpy as np import pandas as pd from sqlalchemy import Engine, MetaData @@ -12,7 +13,12 @@ from chronify.time_configs import DatetimeRange, TimeBasedDataAdjustment from chronify.time_range_generator_factory import make_time_range_generator from chronify.time_utils import roll_time_interval, wrap_timestamps -from chronify.time import ResamplingOperationType, AggregationType, DisaggregationType +from chronify.time import ( + ResamplingOperationType, + AggregationType, + DisaggregationType, + LeapDayAdjustmentType, +) logger = logging.getLogger(__name__) @@ -131,15 +137,26 @@ def map_time( self._engine, self._metadata, self._data_adjustment, + resampling_operation=None, scratch_dir=scratch_dir, output_file=output_file, check_mapped_timestamps=check_mapped_timestamps, ) - if self._resampling_type == "aggregation": - df, mapping_schema = self._create_aggregation_mapping( - to_time_config, self._to_time_config - ) + if self._resampling_type: + if self._resampling_type == "aggregation": + df, mapping_schema = self._create_aggregation_mapping( + to_time_config, self._to_time_config, self._data_adjustment.leap_day_adjustment + ) + resampling_operation = self._resampling_operation + + else: + df, resampling_operation, mapping_schema = self._create_disaggregation_mapping( + to_time_config, + self._to_time_config, + self._data_adjustment.leap_day_adjustment, + self._resampling_operation, + ) apply_mapping( df, mapping_schema, @@ -148,13 +165,11 @@ def map_time( self._engine, self._metadata, TimeBasedDataAdjustment(), - resampling_operation=self._resampling_operation, + resampling_operation=resampling_operation, scratch_dir=scratch_dir, output_file=output_file, check_mapped_timestamps=check_mapped_timestamps, ) - # elif self._resampling_type == "disaggregation": - # df, mapping_schema = self._create_disaggregation_mapping(to_time_config, self._to_time_config) # TODO - add handling for changing resolution - Issue #30 def _create_mapping( @@ -220,17 +235,20 @@ def _create_mapping( ) return df, mapping_schema + @staticmethod def _create_aggregation_mapping( - self, from_time_config: DatetimeRange, to_time_config: DatetimeRange + from_time_config: DatetimeRange, + to_time_config: DatetimeRange, + leap_day_adjustment: LeapDayAdjustmentType, ) -> tuple[pd.DataFrame, MappingTableSchema]: """Create mapping dataframe for aggregation""" 
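+        # The join below matches each coarse (to) timestamp to the fine (from) timestamp
+        # that starts its window; forward-filling with limit = to_res / from_res - 1 then
+        # assigns the remaining fine timestamps in that window to the same coarse timestamp.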
from_time_col = "from_" + from_time_config.time_column
         to_time_col = to_time_config.time_column
         from_time_data = make_time_range_generator(
-            from_time_config, leap_day_adjustment=self._data_adjustment.leap_day_adjustment
+            from_time_config, leap_day_adjustment=leap_day_adjustment
         ).list_timestamps()
         to_time_data = make_time_range_generator(
-            to_time_config, leap_day_adjustment=self._data_adjustment.leap_day_adjustment
+            to_time_config, leap_day_adjustment=leap_day_adjustment
         ).list_timestamps()
         df = pd.Series(from_time_data).rename(from_time_col).to_frame()
         df = df.join(
             pd.Series(to_time_data, index=to_time_data).rename(to_time_col), on=from_time_col
         )
         limit = to_time_config.resolution / from_time_config.resolution - 1
         assert limit % 1 == 0, f"limit is not a whole number {limit}"
         df[to_time_col] = df[to_time_col].ffill(limit=int(limit))
 
         # mapping schema
         from_time_config = from_time_config.model_copy()
         from_time_config.time_column = from_time_col
         mapping_schema = MappingTableSchema(
             name="aggregation_table",
             time_configs=[
                 from_time_config,
                 to_time_config,
             ],
         )
         return df, mapping_schema
+
+    @staticmethod
+    def _create_disaggregation_mapping(
+        from_time_config: DatetimeRange,
+        to_time_config: DatetimeRange,
+        leap_day_adjustment: LeapDayAdjustmentType,
+        resampling_operation: DisaggregationType,
+    ) -> tuple[pd.DataFrame, Optional[AggregationType], MappingTableSchema]:
+        """Create mapping dataframe for disaggregation"""
+        from_time_col = "from_" + from_time_config.time_column
+        to_time_col = to_time_config.time_column
+        from_time_data = make_time_range_generator(
+            from_time_config, leap_day_adjustment=leap_day_adjustment
+        ).list_timestamps()
+        to_time_data = make_time_range_generator(
+            to_time_config, leap_day_adjustment=leap_day_adjustment
+        ).list_timestamps()
+
+        df = pd.Series(to_time_data).rename(to_time_col).to_frame()
+        df = df.join(
+            pd.Series(from_time_data, index=from_time_data).rename(from_time_col), on=to_time_col
+        )
+        limit = from_time_config.resolution / to_time_config.resolution - 1
+        assert limit % 1 == 0, f"limit is not a whole number {limit}"
+        limit = int(limit)
+
+        match resampling_operation:
+            case DisaggregationType.DUPLICATE_FFILL:
+                df[from_time_col] = df[from_time_col].ffill(limit=limit)
+                # floor: cap rows below the from_time_config start time at start time
+                if df[from_time_col].isna().sum() > 0:
+                    assert df[from_time_col].isna().sum() <= limit
+                    df[from_time_col] = df[from_time_col].bfill(limit=limit)
+                aggregation_operation = None
+            case DisaggregationType.DUPLICATE_BFILL:
+                df[from_time_col] = df[from_time_col].bfill(limit=limit)
+                # ceiling: cap rows beyond the from_time_config end time at end time
+                if df[from_time_col].isna().sum() > 0:
+                    assert df[from_time_col].isna().sum() <= limit
+                    df[from_time_col] = df[from_time_col].ffill(limit=limit)
+                aggregation_operation = None
+            case DisaggregationType.INTERPOLATE:
+                df.loc[~df[from_time_col].isna(), "factor"] = 1
+                df["lb"] = df[from_time_col].ffill(limit=limit).where(df[from_time_col].isna())
+                df["lb_factor"] = 1 + (df["lb"] - df["timestamp"]).div(from_time_config.resolution)
+                df["ub"] = df[from_time_col].bfill(limit=limit).where(df[from_time_col].isna())
+                df["ub_factor"] = 1 + (df["timestamp"] - df["ub"]).div(from_time_config.resolution)
+                # capping: if a row does not have both lb and ub, cannot interpolate, set factor to 1
+                for fact_col in ["lb_factor", "ub_factor"]:
+                    cond = ~(df[fact_col].where(df["lb"].isna() | df["ub"].isna()).isna())
+                    df.loc[cond, fact_col] = 1
+                # e.g., hourly -> 20-minute: the to-timestamp 00:20 gets lb = 00:00 with
+                # lb_factor = 1 - 20/60 = 2/3 and ub = 01:00 with ub_factor = 1 - 40/60 = 1/3;
+                # each to-timestamp's factors sum to 1, so a factor-weighted SUM interpolates linearly
+                lst = []
+                for ts_col, fact_col in zip(
+                    [from_time_col, "lb", "ub"], ["factor", "lb_factor", "ub_factor"]
+                ):
+                    lst.append(
+                        df.loc[~df[ts_col].isna(), [to_time_col, ts_col, fact_col]].rename(
+                            columns={ts_col: from_time_col, fact_col: "factor"}
+                        )
+                    )
+                df = pd.concat(lst).sort_values(by=[to_time_col], ignore_index=True)
+                assert df.groupby(to_time_col)["factor"].sum().unique().round(3) == np.array([1])
+                aggregation_operation = AggregationType.SUM
+            case 
DisaggregationType.UNIFORM_DISAGGREGATE: + df[from_time_col] = df[from_time_col].ffill(limit=int(limit)) + df["factor"] = to_time_config.resolution / from_time_config.resolution + aggregation_operation = AggregationType.SUM + case _: + msg = f"Unsupported disaggregation {resampling_operation=}" + raise ValueError(msg) + + # mapping schema + from_time_config = from_time_config.model_copy() + from_time_config.time_column = from_time_col + mapping_schema = MappingTableSchema( + name="disaggregation_table", + time_configs=[ + from_time_config, + to_time_config, + ], + ) + return df, aggregation_operation, mapping_schema diff --git a/src/chronify/time_series_mapper_index_time.py b/src/chronify/time_series_mapper_index_time.py index ca5df60..bbcbd3e 100644 --- a/src/chronify/time_series_mapper_index_time.py +++ b/src/chronify/time_series_mapper_index_time.py @@ -116,7 +116,9 @@ def map_time( self._engine, self._metadata, TimeBasedDataAdjustment(), + resampling_operation=None, scratch_dir=scratch_dir, + output_file=None, check_mapped_timestamps=False, ) @@ -128,6 +130,7 @@ def map_time( self._to_schema, self._data_adjustment, self._wrap_time_allowed, + resampling_operation=self._resampling_operation, ).map_time( scratch_dir=scratch_dir, output_file=output_file, diff --git a/src/chronify/time_series_mapper_representative.py b/src/chronify/time_series_mapper_representative.py index 5ceb7f0..d165ad1 100644 --- a/src/chronify/time_series_mapper_representative.py +++ b/src/chronify/time_series_mapper_representative.py @@ -80,6 +80,7 @@ def map_time( self._engine, self._metadata, self._data_adjustment, + resampling_operation=self._resampling_operation, scratch_dir=scratch_dir, output_file=output_file, check_mapped_timestamps=check_mapped_timestamps, diff --git a/tests/test_store.py b/tests/test_store.py index 8387f4d..1ae805f 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -33,7 +33,12 @@ ) from chronify.models import ColumnDType, CsvTableSchema, PivotedTableSchema, TableSchema from chronify.store import Store -from chronify.time import TimeIntervalType, DaylightSavingAdjustmentType, AggregationType +from chronify.time import ( + TimeIntervalType, + DaylightSavingAdjustmentType, + AggregationType, + DisaggregationType, +) from chronify.time_configs import DatetimeRange, IndexTimeRangeLocalTime, TimeBasedDataAdjustment from chronify.time_range_generator_factory import make_time_range_generator from chronify.time_series_checker import check_timestamp_lists @@ -506,14 +511,21 @@ def test_map_datetime_to_datetime( check_timestamp_lists(actual, expected) -@pytest.mark.parametrize("tzinfo", [ZoneInfo("EST")]) # , None]) +@pytest.mark.parametrize("tzinfo", [ZoneInfo("EST")]) # [ZoneInfo("EST"), None]) @pytest.mark.parametrize( "params", [ (timedelta(hours=1), timedelta(hours=24), AggregationType.SUM), + (timedelta(hours=1), timedelta(hours=24), AggregationType.AVG), + (timedelta(hours=1), timedelta(hours=24), AggregationType.MIN), + (timedelta(hours=1), timedelta(hours=24), AggregationType.MAX), + (timedelta(hours=1), timedelta(minutes=20), DisaggregationType.DUPLICATE_FFILL), + (timedelta(hours=1), timedelta(minutes=20), DisaggregationType.DUPLICATE_BFILL), + (timedelta(hours=1), timedelta(minutes=20), DisaggregationType.UNIFORM_DISAGGREGATE), + (timedelta(hours=1), timedelta(minutes=20), DisaggregationType.INTERPOLATE), ], ) -def test_map_datetime_to_datetime_with_resampling( +def test_map_datetime_to_datetime_with_resampling( # noqa: C901 tmp_path, iter_stores_by_engine_no_data_ingestion: 
Store, tzinfo, params
 ):
     fm_res, to_res, operation = params
@@ -594,6 +606,28 @@ def test_map_datetime_to_datetime_with_resampling(
     expected = make_time_range_generator(dst_schema.time_config).list_timestamps()
     check_timestamp_lists(actual, expected)
 
+    if isinstance(operation, AggregationType):
+        val = df2[df2["generator"] == "gen1"].sort_values(by="timestamp").iloc[0, 2]
+        if operation == AggregationType.SUM:
+            assert val == (1 + 24) * 24 / 2
+        if operation == AggregationType.AVG:
+            assert val == (1 + 24) * 24 / 2 / 24
+        if operation == AggregationType.MIN:
+            assert val == 1
+        if operation == AggregationType.MAX:
+            assert val == 24
+
+    if isinstance(operation, DisaggregationType):
+        val = df2[df2["generator"] == "gen1"].sort_values(by="timestamp").iloc[1, 2].round(3)
+        if operation == DisaggregationType.DUPLICATE_FFILL:
+            assert val == 1
+        if operation == DisaggregationType.DUPLICATE_BFILL:
+            assert val == 2
+        if operation == DisaggregationType.INTERPOLATE:
+            assert val == 1.333
+        if operation == DisaggregationType.UNIFORM_DISAGGREGATE:
+            assert val == 0.333
+
 
 def test_map_index_time_to_datetime(
     tmp_path: Path, iter_stores_by_engine_no_data_ingestion: Store

From 36fd1086c99624aeb0ede8901b590c5826f11ebf Mon Sep 17 00:00:00 2001
From: lixiliu <36629962+lixiliu@users.noreply.github.com>
Date: Thu, 13 Mar 2025 13:19:52 -0600
Subject: [PATCH 06/11] minor fix

---
 src/chronify/time_series_mapper_base.py     | 7 ++++---
 src/chronify/time_series_mapper_datetime.py | 6 +++---
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/src/chronify/time_series_mapper_base.py b/src/chronify/time_series_mapper_base.py
index d61821a..6edcf37 100644
--- a/src/chronify/time_series_mapper_base.py
+++ b/src/chronify/time_series_mapper_base.py
@@ -119,9 +119,9 @@ def apply_mapping(
         to_schema,
         engine,
         metadata,
-        resampling_operation,
-        scratch_dir,
-        output_file,
+        resampling_operation=resampling_operation,
+        scratch_dir=scratch_dir,
+        output_file=output_file,
     )
     if check_mapped_timestamps:
         if output_file is not None:
@@ -233,6 +233,7 @@ def _apply_mapping(
     # from chronify.sqlalchemy.functions import read_database
     # with engine.connect() as conn:
    #     df_map = read_database(f"select * from {mapping_table_name}", conn, to_schema.time_config)
+    #     dfi = read_database(f"select * from {from_schema.name}", conn, from_schema.time_config)
     #     df = read_database(query, conn, to_schema.time_config)
     # breakpoint()
 
diff --git a/src/chronify/time_series_mapper_datetime.py b/src/chronify/time_series_mapper_datetime.py
index 9ae9b01..c1fb884 100644
--- a/src/chronify/time_series_mapper_datetime.py
+++ b/src/chronify/time_series_mapper_datetime.py
@@ -139,7 +139,7 @@ def map_time(
             self._data_adjustment,
             resampling_operation=None,
             scratch_dir=scratch_dir,
-            output_file=output_file,
+            output_file=None,
             check_mapped_timestamps=check_mapped_timestamps,
         )
 
@@ -313,9 +313,9 @@ def _create_disaggregation_mapping(
             case DisaggregationType.INTERPOLATE:
                 df.loc[~df[from_time_col].isna(), "factor"] = 1
                 df["lb"] = df[from_time_col].ffill(limit=limit).where(df[from_time_col].isna())
-                df["lb_factor"] = 1 + (df["lb"] - df["timestamp"]).div(from_time_config.resolution)
+                df["lb_factor"] = 1 + (df["lb"] - df[to_time_col]).div(from_time_config.resolution)
                 df["ub"] = df[from_time_col].bfill(limit=limit).where(df[from_time_col].isna())
-                df["ub_factor"] = 1 + (df["timestamp"] - df["ub"]).div(from_time_config.resolution)
+                df["ub_factor"] = 1 + (df[to_time_col] - df["ub"]).div(from_time_config.resolution)
                 # capping: if a row does not have both lb and ub, it cannot be interpolated; set factor to 1
                 for fact_col in ["lb_factor", "ub_factor"]:
                     cond = ~(df[fact_col].where(df["lb"].isna() | df["ub"].isna()).isna())

From 27855931ed2bb90be80f47f9f65e7d6cae55831c Mon Sep 17 00:00:00 2001
From: lixiliu <36629962+lixiliu@users.noreply.github.com>
Date: Thu, 13 Mar 2025 17:06:27 -0600
Subject: [PATCH 07/11] fix storage of intermediate output

---
 src/chronify/store.py                       | 2 +-
 src/chronify/time_series_mapper_datetime.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/chronify/store.py b/src/chronify/store.py
index 15bcb67..dc00a88 100644
--- a/src/chronify/store.py
+++ b/src/chronify/store.py
@@ -936,7 +936,7 @@ def map_table_time_config(
             wrap_time_allowed=wrap_time_allowed,
             resampling_operation=resampling_operation,
             scratch_dir=scratch_dir,
-            output_file=output_file,
+            output_file=Path(output_file),
             check_mapped_timestamps=check_mapped_timestamps,
         )
         with self._engine.begin() as conn:
diff --git a/src/chronify/time_series_mapper_datetime.py b/src/chronify/time_series_mapper_datetime.py
index c1fb884..f6df42e 100644
--- a/src/chronify/time_series_mapper_datetime.py
+++ b/src/chronify/time_series_mapper_datetime.py
@@ -139,7 +139,7 @@ def map_time(
             self._data_adjustment,
             resampling_operation=None,
             scratch_dir=scratch_dir,
-            output_file=None,
+            output_file=None if self._resampling_type else output_file,
             check_mapped_timestamps=check_mapped_timestamps,
         )
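A note on the interpolation factors corrected in PATCH 06 above: each target timestamp that falls between two source timestamps gets a pair of weights that shrink linearly with distance from the bracketing values. A minimal sketch of that arithmetic on a toy frame (hourly data disaggregated to 30-minute steps; the column names here are illustrative, not chronify's actual schema):

    import pandas as pd

    # Hypothetical example: hourly source values disaggregated to 30-minute steps.
    res_from = pd.Timedelta("1h")
    to_times = pd.date_range("2020-01-01 00:00", periods=4, freq="30min")
    from_times = to_times.floor("1h").where(to_times.minute == 0)

    df = pd.DataFrame({"to_time": to_times, "from_time": from_times})
    # lb/ub bracket each in-between timestamp with the nearest known source values.
    df["lb"] = df["from_time"].ffill().where(df["from_time"].isna())
    df["ub"] = df["from_time"].bfill().where(df["from_time"].isna())
    # Weights fall off linearly with distance from the bracketing timestamps.
    df["lb_factor"] = 1 + (df["lb"] - df["to_time"]).div(res_from)
    df["ub_factor"] = 1 + (df["to_time"] - df["ub"]).div(res_from)
    print(df)  # 00:30 gets lb_factor == ub_factor == 0.5; 01:30 has no ub (the capped case)

The last row shows why the capping step in the patch exists: a timestamp past the final source value has a lower bound but no upper bound, so it cannot be interpolated.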
From 527faeea2112813fa4d220166cc22b830cd4e981 Mon Sep 17 00:00:00 2001
From: lixiliu <36629962+lixiliu@users.noreply.github.com>
Date: Fri, 14 Mar 2025 09:57:20 -0600
Subject: [PATCH 08/11] Fix path issues

---
 src/chronify/sqlalchemy/functions.py    | 3 ++-
 src/chronify/store.py                   | 2 +-
 src/chronify/time_series_mapper_base.py | 3 +++
 3 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/chronify/sqlalchemy/functions.py b/src/chronify/sqlalchemy/functions.py
index 32e690c..2ca9fc6 100644
--- a/src/chronify/sqlalchemy/functions.py
+++ b/src/chronify/sqlalchemy/functions.py
@@ -17,7 +17,7 @@
 from chronify.exceptions import InvalidOperation, InvalidParameter
 from chronify.time_configs import DatetimeRange, TimeBaseModel
-from chronify.utils.path_utils import check_overwrite, delete_if_exists
+from chronify.utils.path_utils import check_overwrite, delete_if_exists, to_path
 
 # Copied from Pandas/Polars
 DbWriteMode: TypeAlias = Literal["replace", "append", "fail"]
@@ -224,6 +224,7 @@ def write_query_to_parquet(
     partition_columns: Optional[list[str]] = None,
 ) -> None:
     """Write the query to a Parquet file."""
+    output_file = to_path(output_file)
     check_overwrite(output_file, overwrite)
     match engine.name:
         case "duckdb":
diff --git a/src/chronify/store.py b/src/chronify/store.py
index dc00a88..15bcb67 100644
--- a/src/chronify/store.py
+++ b/src/chronify/store.py
@@ -936,7 +936,7 @@ def map_table_time_config(
             wrap_time_allowed=wrap_time_allowed,
             resampling_operation=resampling_operation,
             scratch_dir=scratch_dir,
-            output_file=Path(output_file),
+            output_file=output_file,
             check_mapped_timestamps=check_mapped_timestamps,
         )
         with self._engine.begin() as conn:
diff --git a/src/chronify/time_series_mapper_base.py b/src/chronify/time_series_mapper_base.py
index 6edcf37..c886c6b 100644
--- a/src/chronify/time_series_mapper_base.py
+++ b/src/chronify/time_series_mapper_base.py
@@ -20,6 +20,7 @@
 from chronify.time_series_checker import check_timestamps
 from chronify.time import TimeIntervalType, ResamplingOperationType, AggregationType
 from chronify.time_configs import TimeBasedDataAdjustment
+from chronify.utils.path_utils import to_path
 
 
 class TimeSeriesMapperBase(abc.ABC):
@@ -125,6 +126,7 @@ def apply_mapping(
     )
     if check_mapped_timestamps:
         if output_file is not None:
+            output_file = to_path(output_file)
             with engine.begin() as conn:
                 create_view_from_parquet(conn, to_schema.name, output_file)
             metadata.reflect(engine, views=True)
@@ -238,6 +240,7 @@ def _apply_mapping(
     #     breakpoint()
 
     if output_file is not None:
+        output_file = to_path(output_file)
         write_query_to_parquet(engine, str(query), output_file, overwrite=True)
         return
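PATCH 08 lets output_file arrive as either a str or a Path and normalizes it at each boundary with to_path. The helper's body is not shown in this series; the sketch below only mirrors what the call sites appear to require (the name normalize_output_file and its body are assumptions for illustration, not chronify's actual code):

    from pathlib import Path
    from typing import Union

    def normalize_output_file(output_file: Union[str, Path]) -> Path:
        # Accept either type at the public API, hand Path-only code a real Path.
        return Path(output_file)

    assert normalize_output_file("out/mapped.parquet") == Path("out/mapped.parquet")

Coercing once at the boundary is what allows the earlier Path(output_file) call in store.py to be reverted in this patch.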
From 5baaa6b9d9428660beb39aa7fc13eae88b2d24a3 Mon Sep 17 00:00:00 2001
From: lixiliu <36629962+lixiliu@users.noreply.github.com>
Date: Tue, 22 Apr 2025 23:15:33 -0600
Subject: [PATCH 09/11] add get_standard_time_zone

---
 src/chronify/time_series_mapper_datetime.py | 14 +++++++-------
 src/chronify/time_utils.py                  | 19 ++++++++++++++++++-
 2 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/src/chronify/time_series_mapper_datetime.py b/src/chronify/time_series_mapper_datetime.py
index f6df42e..9ebf86e 100644
--- a/src/chronify/time_series_mapper_datetime.py
+++ b/src/chronify/time_series_mapper_datetime.py
@@ -12,7 +12,7 @@
 from chronify.time_series_mapper_base import TimeSeriesMapperBase, apply_mapping
 from chronify.time_configs import DatetimeRange, TimeBasedDataAdjustment
 from chronify.time_range_generator_factory import make_time_range_generator
-from chronify.time_utils import roll_time_interval, wrap_timestamps
+from chronify.time_utils import roll_time_interval, wrap_timestamps, get_standard_time_zone
 from chronify.time import (
     ResamplingOperationType,
     AggregationType,
@@ -192,14 +192,12 @@ def _create_mapping(
     match (fm_tz is None, to_tz is None):
         case (True, False):
             # get standard time zone of to_tz
-            year = to_time_config.start.year
-            to_tz_std = datetime(year=year, month=1, day=1, tzinfo=to_tz).tzname()
+            to_tz_std = get_standard_time_zone(to_tz)
             # tz-naive time does not have skips/dups, so always localize in std tz first
             ser_from = ser_from.dt.tz_localize(to_tz_std).dt.tz_convert(to_tz)
         case (False, True):
             # get standard time zone of fm_tz
-            year = self._from_time_config.start.year
-            fm_tz_std = datetime(year=year, month=1, day=1, tzinfo=fm_tz).tzname()
+            fm_tz_std = get_standard_time_zone(fm_tz)
             # convert to standard time zone of fm_tz before making it tz-naive
             ser_from = ser_from.dt.tz_convert(fm_tz_std).dt.tz_localize(to_tz)
     match (self._adjust_interval, self._wrap_time_allowed):
@@ -255,7 +253,8 @@ def _create_aggregation_mapping(
         pd.Series(to_time_data, index=to_time_data).rename(to_time_col), on=from_time_col
     )
     limit = to_time_config.resolution / from_time_config.resolution - 1
-    assert limit % 1 == 0, f"limit is not a whole number {limit}"
+    assert (limit % 1 == 0) and (limit > 0), f"limit must be a positive integer, got {limit}"
+    limit = int(limit)
     df[to_time_col] = df[to_time_col].ffill(limit=int(limit))
 
     # mapping schema
@@ -292,7 +291,7 @@ def _create_disaggregation_mapping(
         pd.Series(from_time_data, index=from_time_data).rename(from_time_col), on=to_time_col
     )
     limit = from_time_config.resolution / to_time_config.resolution - 1
-    assert limit % 1 == 0, f"limit is not a whole number {limit}"
+    assert (limit % 1 == 0) and (limit > 0), f"limit must be a positive integer, got {limit}"
     limit = int(limit)
 
     match resampling_operation:
@@ -350,4 +349,5 @@ def _create_disaggregation_mapping(
             to_time_config,
         ],
     )
+    if resampling_operation == DisaggregationType.INTERPOLATE:
     return df, aggregation_operation, mapping_schema
diff --git a/src/chronify/time_utils.py b/src/chronify/time_utils.py
index c55e7a6..633436c 100644
--- a/src/chronify/time_utils.py
+++ b/src/chronify/time_utils.py
@@ -2,7 +2,8 @@
 import logging
 
 import numpy as np
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
+import zoneinfo
 import pandas as pd
 
 from chronify.time import (
@@ -143,3 +144,19 @@ def roll_time_interval(
         ser = shift_time_interval(ser, from_interval_type, to_interval_type)
     ser = wrap_timestamps(ser, to_timestamps)
     return ser
+
+
+def get_standard_time_zone(tz: zoneinfo.ZoneInfo) -> zoneinfo.ZoneInfo | timezone | None:
+    ts = datetime(year=2020, month=1, day=1, tzinfo=tz)
+    std_tz_name = ts.tzname()
+    if not std_tz_name:
+        return None
+    try:
+        std_tz = zoneinfo.ZoneInfo(std_tz_name)
+    except zoneinfo._common.ZoneInfoNotFoundError:
+        utcoffset = ts.utcoffset()
+        if not utcoffset:
+            return None
+        std_tz = timezone(utcoffset)
+
+    return std_tz

From c960f589c8b87898391df12a771122565f986821 Mon Sep 17 00:00:00 2001
From: lixiliu <36629962+lixiliu@users.noreply.github.com>
Date: Tue, 22 Apr 2025 23:19:49 -0600
Subject: [PATCH 10/11] remove false line

---
 src/chronify/time_series_mapper_datetime.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/chronify/time_series_mapper_datetime.py b/src/chronify/time_series_mapper_datetime.py
index 9ebf86e..e2ec1bf 100644
--- a/src/chronify/time_series_mapper_datetime.py
+++ b/src/chronify/time_series_mapper_datetime.py
@@ -349,5 +349,4 @@ def _create_disaggregation_mapping(
             to_time_config,
         ],
     )
-    if resampling_operation == DisaggregationType.INTERPOLATE:
     return df, aggregation_operation, mapping_schema
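get_standard_time_zone, added in PATCH 09 and cleaned up in the final patch below, leans on a standard-library behavior: anchored at January 1, a zone reports its standard (non-DST) abbreviation, which often resolves back through ZoneInfo. A quick standalone check of the behavior the function depends on (2020-01-01 is the same anchor the patch uses):

    from datetime import datetime
    from zoneinfo import ZoneInfo

    ts = datetime(2020, 1, 1, tzinfo=ZoneInfo("US/Eastern"))
    print(ts.tzname())            # EST -- January is outside US daylight saving time
    print(ZoneInfo(ts.tzname()))  # "EST" is itself a valid IANA key, so no fallback needed

    # Some zones report an abbreviation that is not an IANA key, which is
    # why the function falls back to a fixed offset via timezone(ts.utcoffset()).
    ts2 = datetime(2020, 1, 1, tzinfo=ZoneInfo("Asia/Kolkata"))
    print(ts2.tzname())           # IST -- raises ZoneInfoNotFoundError if looked up

Note the January 1 anchor assumes northern-hemisphere-style DST; for southern-hemisphere zones January falls inside DST, which is a limitation of this heuristic rather than a bug in the patch.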
From 42a28187f03c5d9af5f85698304aa8aac8a06d99 Mon Sep 17 00:00:00 2001
From: lixiliu <36629962+lixiliu@users.noreply.github.com>
Date: Fri, 25 Apr 2025 13:21:01 -0600
Subject: [PATCH 11/11] bug fix

---
 src/chronify/time_series_mapper_base.py     |  6 +++---
 src/chronify/time_series_mapper_datetime.py |  2 +-
 src/chronify/time_utils.py                  | 12 +++++-------
 3 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/src/chronify/time_series_mapper_base.py b/src/chronify/time_series_mapper_base.py
index c886c6b..af3edd2 100644
--- a/src/chronify/time_series_mapper_base.py
+++ b/src/chronify/time_series_mapper_base.py
@@ -2,7 +2,7 @@
 from functools import reduce
 from operator import and_
 from pathlib import Path
-from typing import Optional
+from typing import Any, Optional
 
 import pandas as pd
 from loguru import logger
@@ -192,12 +192,12 @@ def _apply_mapping(
     right_cols = set(right_table_columns).intersection(final_cols)
     left_cols = final_cols - right_cols - {val_col}
 
-    select_stmt = [left_table.c[x] for x in left_cols]
+    select_stmt: list[Any] = [left_table.c[x] for x in left_cols]
     select_stmt += [right_table.c[x] for x in right_cols]
 
     tval_col = left_table.c[val_col]
     if "factor" in right_table_columns:
-        tval_col *= right_table.c["factor"]
+        tval_col *= right_table.c["factor"]  # type: ignore
     if not resampling_operation:
         select_stmt.append(tval_col)
     else:
diff --git a/src/chronify/time_series_mapper_datetime.py b/src/chronify/time_series_mapper_datetime.py
index e2ec1bf..a91cbe2 100644
--- a/src/chronify/time_series_mapper_datetime.py
+++ b/src/chronify/time_series_mapper_datetime.py
@@ -211,7 +211,7 @@ def _create_mapping(
         case (False, True):
             ser = wrap_timestamps(ser_from, to_time_data)
         case (False, False):
-            ser = pd.Series(to_time_data)
+            ser = ser_from
 
     df = pd.DataFrame(
         {
diff --git a/src/chronify/time_utils.py b/src/chronify/time_utils.py
index 633436c..bb7c6a4 100644
--- a/src/chronify/time_utils.py
+++ b/src/chronify/time_utils.py
@@ -2,7 +2,7 @@
 import logging
 
 import numpy as np
-from datetime import datetime, timedelta, timezone
+from datetime import datetime, timedelta, timezone, tzinfo
 import zoneinfo
 import pandas as pd
 
 from chronify.time import (
@@ -146,17 +146,15 @@ def roll_time_interval(
     return ser
 
 
-def get_standard_time_zone(tz: zoneinfo.ZoneInfo) -> zoneinfo.ZoneInfo | timezone | None:
+def get_standard_time_zone(tz: tzinfo | None) -> tzinfo | None:
     ts = datetime(year=2020, month=1, day=1, tzinfo=tz)
     std_tz_name = ts.tzname()
     if not std_tz_name:
         return None
     try:
-        std_tz = zoneinfo.ZoneInfo(std_tz_name)
-    except zoneinfo._common.ZoneInfoNotFoundError:
+        return zoneinfo.ZoneInfo(std_tz_name)
+    except zoneinfo.ZoneInfoNotFoundError:
         utcoffset = ts.utcoffset()
         if not utcoffset:
             return None
-        std_tz = timezone(utcoffset)
-
-    return std_tz
+    return timezone(utcoffset)
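The `ser = ser_from` fix in PATCH 11 is about row pairing: in the tz-aware-to-tz-aware case, the source timestamps already denote the same instants as the target timestamps, so reusing ser_from keeps each mapping row internally consistent instead of relying on pd.Series(to_time_data) happening to arrive in the same order. A toy illustration of why the instants line up (simplified columns, not the mapper's actual mapping schema):

    import pandas as pd

    # Source: hourly US/Eastern; target: the same instants viewed in US/Mountain.
    from_times = pd.Series(pd.date_range("2020-01-01", periods=3, freq="h", tz="US/Eastern"))
    to_times = from_times.dt.tz_convert("US/Mountain")

    mapping = pd.DataFrame({"from_timestamp": from_times, "timestamp": to_times})
    # tz-aware equality compares instants, so every row pairs the same moment
    # under two labels -- which is exactly what the mapping join needs.
    assert (mapping["from_timestamp"] == mapping["timestamp"]).all()
    print(mapping)

Building the target column from the source series rather than from an independently generated list makes the pairing hold even if the two generators enumerate timestamps in different orders.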