diff --git a/pyproject.toml b/pyproject.toml index e5e1ae7..b1dd311 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,6 +71,7 @@ Source = "https://github.com/NREL/chronify" files = [ "src", ] +disable_error_code = ["no-untyped-call"] strict = true [tool.pytest.ini_options] diff --git a/src/chronify/__init__.py b/src/chronify/__init__.py index 230bac8..0ca60e7 100644 --- a/src/chronify/__init__.py +++ b/src/chronify/__init__.py @@ -21,9 +21,10 @@ from chronify.time_configs import ( AnnualTimeRange, DatetimeRange, + DatetimeRangeWithTZColumn, IndexTimeRangeNTZ, IndexTimeRangeTZ, - IndexTimeRangeLocalTime, + IndexTimeRangeWithTZColumn, RepresentativePeriodTimeNTZ, RepresentativePeriodTimeTZ, TimeBaseModel, @@ -37,7 +38,8 @@ "ConflictingInputsError", "CsvTableSchema", "DatetimeRange", - "IndexTimeRangeLocalTime", + "DatetimeRangeWithTZColumn", + "IndexTimeRangeWithTZColumn", "IndexTimeRangeNTZ", "IndexTimeRangeTZ", "InvalidOperation", diff --git a/src/chronify/annual_time_range_generator.py b/src/chronify/annual_time_range_generator.py index d8e7282..ce127bb 100644 --- a/src/chronify/annual_time_range_generator.py +++ b/src/chronify/annual_time_range_generator.py @@ -11,10 +11,13 @@ def __init__(self, model: AnnualTimeRange) -> None: super().__init__() self._model = model - def iter_timestamps(self) -> Generator[int, None, None]: + def _iter_timestamps(self) -> Generator[int, None, None]: for i in range(1, self._model.length + 1): yield i + def list_timestamps(self) -> list[int]: + return list(self._iter_timestamps()) + def list_distinct_timestamps_from_dataframe(self, df: pd.DataFrame) -> list[Any]: raise NotImplementedError diff --git a/src/chronify/column_representative_time_range_generator.py b/src/chronify/column_representative_time_range_generator.py index 74e09b3..5c24429 100644 --- a/src/chronify/column_representative_time_range_generator.py +++ b/src/chronify/column_representative_time_range_generator.py @@ -37,9 +37,12 @@ def __init__(self, model: ColumnRepresentativeBase): msg = f"No time generator for ColumnRepresentative time with time_config {type(self._model)}" raise exceptions.InvalidOperation(msg) - def iter_timestamps(self) -> Generator[tuple[int, ...], None, None]: + def _iter_timestamps(self) -> Generator[tuple[int, ...], None, None]: yield from self._handler._iter_timestamps() + def list_timestamps(self) -> list[tuple[int, ...]]: + return list(self._iter_timestamps()) + def list_distinct_timestamps_from_dataframe(self, df: pd.DataFrame) -> list[tuple[int, ...]]: return self._handler.list_distinct_timestamps_from_dataframe(df) diff --git a/src/chronify/csv_time_series_parser.py b/src/chronify/csv_time_series_parser.py index b3484eb..1364b3d 100644 --- a/src/chronify/csv_time_series_parser.py +++ b/src/chronify/csv_time_series_parser.py @@ -109,7 +109,10 @@ def __init__(self, store: Store) -> None: def _check_input_format(data_file: Path) -> None: valid_extensions = [".csv"] if data_file.suffix not in valid_extensions: - msg = f"{data_file.name} does not have a file extension in the supported extensions: {valid_extensions}" + msg = ( + f"{data_file.name} does not have a file extension in the " + f"supported extensions: {valid_extensions}" + ) raise InvalidValue(msg) @staticmethod @@ -130,7 +133,10 @@ def _ingest_data(self, data: pd.DataFrame, table_name: str, year: int, length: i def _create_schemas( csv_fmt: CsvTimeSeriesFormats, name: str, year: int, length: int ) -> tuple[PivotedTableSchema | None, TableSchema]: - """Create a PivotedTableSchema if necessary, and a TableSchema for both the time format and datetime format.""" + """ + Create a PivotedTableSchema if necessary, and a TableSchema for both + the time format and datetime format. + """ create_pivoted_schema = True pivoted_dimension_name = "hour" value_columns = [str(x) for x in range(1, 25)] @@ -166,7 +172,8 @@ def ingest_to_datetime( self, data_file: Path, table_name: str, data_year: int, length: int ) -> None: """ - Given a file of csv time series data, convert the time format to datetime timestamps + Given a file of csv time series data, convert the time format to datetime + timestamps and ingest into database """ self._check_input_format(data_file) diff --git a/src/chronify/datetime_range_generator.py b/src/chronify/datetime_range_generator.py index 7402c03..b030e68 100644 --- a/src/chronify/datetime_range_generator.py +++ b/src/chronify/datetime_range_generator.py @@ -1,48 +1,54 @@ -from datetime import datetime, timedelta +from datetime import datetime, tzinfo from typing import Generator, Optional from zoneinfo import ZoneInfo +from itertools import chain import pandas as pd from chronify.time import ( LeapDayAdjustmentType, ) -from chronify.time_configs import ( - DatetimeRange, -) -from chronify.time_utils import adjust_timestamp_by_dst_offset +from chronify.time_configs import DatetimeRanges, DatetimeRange, DatetimeRangeWithTZColumn +from chronify.time_utils import adjust_timestamp_by_dst_offset, get_tzname from chronify.time_range_generator_base import TimeRangeGeneratorBase +from chronify.exceptions import InvalidValue -class DatetimeRangeGenerator(TimeRangeGeneratorBase): - """Generates datetime ranges based on a DatetimeRange model.""" +class DatetimeRangeGeneratorBase(TimeRangeGeneratorBase): + """Base class that generates datetime ranges based on a DatetimeRange model.""" def __init__( self, - model: DatetimeRange, + model: DatetimeRanges, leap_day_adjustment: Optional[LeapDayAdjustmentType] = None, ) -> None: self._model = model self._adjustment = leap_day_adjustment or LeapDayAdjustmentType.NONE - def iter_timestamps(self) -> Generator[datetime, None, None]: + def _iter_timestamps( + self, start: Optional[datetime] = None + ) -> Generator[datetime, None, None]: + """ + if start is supplied, override self._model.start + """ + if start is None: + start = self._model.start + tz = start.tzinfo + for i in range(self._model.length): - if self._model.start_time_is_tz_naive(): + if not tz: cur = adjust_timestamp_by_dst_offset( - self._model.start + i * self._model.resolution, self._model.resolution + start + i * self._model.resolution, self._model.resolution ) else: - tz = self._model.start.tzinfo # always step in standard time - cur_utc = ( - self._model.start.astimezone(ZoneInfo("UTC")) + i * self._model.resolution - ) + cur_utc = start.astimezone(ZoneInfo("UTC")) + i * self._model.resolution cur = adjust_timestamp_by_dst_offset( cur_utc.astimezone(tz), self._model.resolution ) is_leap_year = ( - pd.Timestamp(f"{cur.year}-01-01") + timedelta(days=365) + pd.Timestamp(f"{cur.year}-01-01") + pd.Timedelta(days=365) ).year == cur.year if not is_leap_year: yield pd.Timestamp(cur) @@ -65,8 +71,89 @@ def iter_timestamps(self) -> Generator[datetime, None, None]: ): yield pd.Timestamp(cur) - def list_distinct_timestamps_from_dataframe(self, df: pd.DataFrame) -> list[datetime]: - return sorted(df[self._model.time_column].unique()) - def list_time_columns(self) -> list[str]: return self._model.list_time_columns() + + def list_distinct_timestamps_from_dataframe(self, df: pd.DataFrame) -> list[datetime]: + result = sorted(df[self._model.time_column].unique()) + if not isinstance(result[0], datetime): + result = [pd.Timestamp(x) for x in result] + return result + + +class DatetimeRangeGenerator(DatetimeRangeGeneratorBase): + """Generates datetime ranges based on a DatetimeRange model.""" + + def __init__( + self, + model: DatetimeRange, + leap_day_adjustment: Optional[LeapDayAdjustmentType] = None, + ) -> None: + super().__init__(model, leap_day_adjustment=leap_day_adjustment) + assert isinstance(self._model, DatetimeRange) + + def list_timestamps(self) -> list[datetime]: + return list(self._iter_timestamps()) + + +class DatetimeRangeGeneratorExternalTimeZone(DatetimeRangeGeneratorBase): + """Generates datetime ranges based on a DatetimeRangeWithTZColumn model. + datetime ranges will be tz-naive and can be listed by time_zone name using special class func + These ranges may be localized by the time_zone name. + # TODO: add offset as a column + """ + + def __init__( + self, + model: DatetimeRangeWithTZColumn, + leap_day_adjustment: Optional[LeapDayAdjustmentType] = None, + ) -> None: + super().__init__(model, leap_day_adjustment=leap_day_adjustment) + assert isinstance(self._model, DatetimeRangeWithTZColumn) + if self._model.get_time_zones() == []: + msg = ( + f"DatetimeRangeWithTZColumn.time_zones needs to be instantiated for " + f"DatetimeRangeGeneratorExternalTimeZone: {self._model}" + ) + raise InvalidValue(msg) + + def _list_timestamps(self, time_zone: Optional[tzinfo]) -> list[datetime]: + """always return tz-naive timestamps relative to input time_zone""" + if self._model.start_time_is_tz_naive(): + if time_zone: + start = self._model.start.replace(tzinfo=time_zone) + else: + start = None + else: + if time_zone: + start = self._model.start.astimezone(time_zone) + else: + start = self._model.start.replace(tzinfo=None) + timestamps = list(self._iter_timestamps(start=start)) + return [x.replace(tzinfo=None) for x in timestamps] + + def list_timestamps(self) -> list[datetime]: + """return ordered timestamps across all time zones in the order of the time zones.""" + dct = self.list_timestamps_by_time_zone() + return list(chain(*dct.values())) + + def list_timestamps_by_time_zone(self) -> dict[str, list[datetime]]: + """for each time zone, returns full timestamp iteration (duplicates allowed)""" + dct = {} + for tz in self._model.get_time_zones(): + tz_name = get_tzname(tz) + dct[tz_name] = self._list_timestamps(tz) + + return dct + + def list_distinct_timestamps_by_time_zone_from_dataframe( + self, df: pd.DataFrame + ) -> dict[str, list[datetime]]: + tz_col = self._model.get_time_zone_column() + t_col = self._model.time_column + df[t_col] = pd.to_datetime(df[t_col]) + df2 = df[[tz_col, t_col]].drop_duplicates() + dct = {} + for tz_name in sorted(df2[tz_col].unique()): + dct[tz_name] = sorted(df2.loc[df2[tz_col] == tz_name, t_col].tolist()) + return dct diff --git a/src/chronify/exceptions.py b/src/chronify/exceptions.py index 8326f33..8186e62 100644 --- a/src/chronify/exceptions.py +++ b/src/chronify/exceptions.py @@ -14,6 +14,10 @@ class InvalidOperation(ChronifyExceptionBase): """Raised when an invalid operation is requested.""" +class InvalidModel(ChronifyExceptionBase): + """Raised when an invalid model is passed.""" + + class InvalidParameter(ChronifyExceptionBase): """Raised when an invalid parameter is passed.""" @@ -22,6 +26,10 @@ class InvalidValue(ChronifyExceptionBase): """Raised when an invalid value is passed.""" +class MissingValue(ChronifyExceptionBase): + """Raised when an expecting value is missing.""" + + class MissingParameter(ChronifyExceptionBase): """Raised when a parameter is not found or missing.""" diff --git a/src/chronify/index_time_range_generator.py b/src/chronify/index_time_range_generator.py index c78a31f..44402eb 100644 --- a/src/chronify/index_time_range_generator.py +++ b/src/chronify/index_time_range_generator.py @@ -13,9 +13,12 @@ def __init__(self, model: IndexTimeRangeBase) -> None: super().__init__() self._model = model - def iter_timestamps(self) -> Generator[int, None, None]: + def _iter_timestamps(self) -> Generator[int, None, None]: yield from range(self._model.start, self._model.length + self._model.start) + def list_timestamps(self) -> list[int]: + return list(self._iter_timestamps()) + def list_distinct_timestamps_from_dataframe(self, df: pd.DataFrame) -> list[Any]: return sorted(df[self._model.time_column].unique()) diff --git a/src/chronify/models.py b/src/chronify/models.py index 348fba1..b770200 100644 --- a/src/chronify/models.py +++ b/src/chronify/models.py @@ -9,7 +9,7 @@ from typing_extensions import Annotated from chronify.base_models import ChronifyBaseModel -from chronify.exceptions import InvalidParameter +from chronify.exceptions import InvalidParameter, InvalidValue from chronify.time_configs import TimeConfig @@ -67,7 +67,7 @@ def check_name(cls, name: str) -> str: _check_name(name) if name.lower() == "table": msg = f"Table schema cannot use {name=}." - raise ValueError(msg) + raise InvalidValue(msg) return name @field_validator("value_column") @@ -102,7 +102,7 @@ def check_column(cls, value_columns: str) -> str: def check_time_array_id_columns(cls, value: list[str]) -> list[str]: if value: msg = f"PivotedTableSchema doesn't yet support time_array_id_columns: {value}" - raise ValueError(msg) + raise InvalidValue(msg) return value def list_columns(self) -> list[str]: @@ -124,7 +124,7 @@ def check_name(cls, name: str) -> str: _check_name(name) if name.lower() == "table": msg = f"Table schema cannot use {name=}." - raise ValueError(msg) + raise InvalidValue(msg) return name @field_validator("time_configs") @@ -239,11 +239,11 @@ def fix_data_type(cls, data: dict[str, Any]) -> dict[str, Any]: if val is None: options = sorted(_COLUMN_TYPES.keys()) + list(_DB_TYPES) msg = f"{dtype=} must be one of {options}" - raise ValueError(msg) + raise InvalidValue(msg) data["dtype"] = val() else: msg = f"dtype is an unsupported type: {type(dtype)}. It must be a str or type." - raise ValueError(msg) + raise InvalidValue(msg) return data @@ -287,4 +287,4 @@ class CsvTableSchemaSingleTimeArrayPivotedByComponent(CsvTableSchema): def _check_name(name: str) -> None: if not REGEX_NAME_REQUIREMENT.search(name): msg = f"A name can only have alphanumeric characters: {name=}" - raise ValueError(msg) + raise InvalidValue(msg) diff --git a/src/chronify/representative_time_range_generator.py b/src/chronify/representative_time_range_generator.py index 43dcaa9..9dbc269 100644 --- a/src/chronify/representative_time_range_generator.py +++ b/src/chronify/representative_time_range_generator.py @@ -28,8 +28,11 @@ def __init__(self, model: RepresentativePeriodTimeBase) -> None: case RepresentativePeriodFormat.ONE_WEEKDAY_DAY_AND_ONE_WEEKEND_DAY_PER_MONTH_BY_HOUR: self._handler = OneWeekdayDayAndWeekendDayPerMonthByHourHandler() - def iter_timestamps(self) -> Generator[NamedTuple, None, None]: - return self._handler.iter_timestamps() + def _iter_timestamps(self) -> Generator[NamedTuple, None, None]: + return self._handler._iter_timestamps() + + def list_timestamps(self) -> list[NamedTuple]: + return list(self._iter_timestamps()) def list_distinct_timestamps_from_dataframe(self, df: pd.DataFrame) -> list[Any]: columns = self._model.list_time_columns() @@ -77,7 +80,7 @@ def get_time_type(self) -> str: """Return the time type name representing the data.""" @abc.abstractmethod - def iter_timestamps(self) -> Generator[Any, None, None]: + def _iter_timestamps(self) -> Generator[Any, None, None]: """Return an iterator over all time indexes in the table. Type of the time is dependent on the class. """ @@ -97,7 +100,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: def get_time_type() -> str: return OneWeekPerMonthByHour.__name__ - def iter_timestamps(self) -> Generator[OneWeekPerMonthByHour, None, None]: + def _iter_timestamps(self) -> Generator[OneWeekPerMonthByHour, None, None]: for month in range(1, 13): for dow in range(7): for hour in range(24): @@ -123,7 +126,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: def get_time_type() -> str: return OneWeekdayDayOneWeekendDayPerMonthByHour.__name__ - def iter_timestamps(self) -> Generator[OneWeekdayDayOneWeekendDayPerMonthByHour, None, None]: + def _iter_timestamps(self) -> Generator[OneWeekdayDayOneWeekendDayPerMonthByHour, None, None]: for month in range(1, 13): for is_weekday in [False, True]: for hour in range(24): diff --git a/src/chronify/sqlalchemy/functions.py b/src/chronify/sqlalchemy/functions.py index 7203d60..6539ec8 100644 --- a/src/chronify/sqlalchemy/functions.py +++ b/src/chronify/sqlalchemy/functions.py @@ -16,7 +16,7 @@ from sqlalchemy import Connection, Engine, Selectable, text from chronify.exceptions import InvalidOperation, InvalidParameter -from chronify.time_configs import DatetimeRange, TimeBaseModel +from chronify.time_configs import DatetimeRangeBase, DatetimeRange, TimeBaseModel from chronify.utils.path_utils import check_overwrite, delete_if_exists, to_path # Copied from Pandas/Polars @@ -72,7 +72,7 @@ def write_database( def _check_one_config_per_datetime_column(configs: Sequence[TimeBaseModel]) -> None: time_col_count = Counter( - [config.time_column for config in configs if isinstance(config, DatetimeRange)] + [config.time_column for config in configs if isinstance(config, DatetimeRangeBase)] ) time_col_dup = {k: v for k, v in time_col_count.items() if v > 1} if len(time_col_dup) > 0: @@ -144,7 +144,7 @@ def _write_to_hive( ) -> None: df2 = df.copy() for config in configs: - if isinstance(config, DatetimeRange): + if isinstance(config, DatetimeRangeBase): if isinstance(df2[config.time_column].dtype, DatetimeTZDtype): # Spark doesn't like ns. That might change in the future. # Pandas might offer a better way to change from ns to us in the future. @@ -155,10 +155,16 @@ def _write_to_hive( elif isinstance(df2[config.time_column].dtype, DateTime64DType): new_dtype = "datetime64[us]" df2[config.time_column] = df2[config.time_column].astype(new_dtype) # type: ignore + else: + new_dtype = "datetime64[us]" + df2[config.time_column] = pd.to_datetime( + df2[config.time_column], utc=False, errors="raise" + ).astype(new_dtype) # type: ignore with NamedTemporaryFile(suffix=".parquet", dir=scratch_dir) as f: f.close() output = Path(f.name) + df2.to_parquet(output) atexit.register(lambda: delete_if_exists(output)) select_stmt = f"SELECT * FROM parquet.`{output}`" @@ -183,7 +189,11 @@ def _read_from_hive( query: Selectable | str, conn: Connection, config: TimeBaseModel, params: Any = None ) -> pd.DataFrame: df = pd.read_sql_query(query, conn, params=params) - if isinstance(config, DatetimeRange) and not config.start_time_is_tz_naive(): + if ( + isinstance(config, DatetimeRange) + and config.time_column in df.columns + and not config.start_time_is_tz_naive() + ): # This is tied to the fact that we set the Spark session to UTC. # Otherwise, there is confusion with the computer's local time zone. df[config.time_column] = df[config.time_column].dt.tz_localize("UTC") diff --git a/src/chronify/store.py b/src/chronify/store.py index c5ecc31..dec92f6 100644 --- a/src/chronify/store.py +++ b/src/chronify/store.py @@ -3,6 +3,7 @@ import shutil from typing import Any, Optional from chronify.utils.sql import make_temp_view_name +from datetime import tzinfo import duckdb import pandas as pd @@ -49,6 +50,7 @@ from chronify.time_configs import DatetimeRange, IndexTimeRangeBase, TimeBasedDataAdjustment from chronify.time_series_checker import check_timestamps from chronify.time_series_mapper import map_time +from chronify.time_zone_converter import TimeZoneConverter, TimeZoneConverterByColumn from chronify.utils.path_utils import check_overwrite, to_path from chronify.utils.sqlalchemy_view import create_view @@ -870,6 +872,8 @@ def map_table_time_config( config in dst_schema when it does not line up with the time config scratch_dir Directory to use for temporary writes. Default to the system's tmp filesystem. + output_file + If set, write the mapped table to this Parquet file. check_mapped_timestamps Perform time checks on the result of the mapping operation. This can be slow and is not required. @@ -940,6 +944,186 @@ def map_table_time_config( with self._engine.begin() as conn: self._schema_mgr.add_schema(conn, dst_schema) + def convert_time_zone( + self, + src_name: str, + time_zone: tzinfo | None, + scratch_dir: Optional[Path] = None, + output_file: Optional[Path] = None, + check_mapped_timestamps: bool = False, + ) -> TableSchema: + """ + Convert the time zone of the existing table represented by src_name to a new time zone + + Parameters + ---------- + src_name + Refers to the table name of the source data. + time_zone + Time zone to convert to. + scratch_dir + Directory to use for temporary writes. Default to the system's tmp filesystem. + output_file + If set, write the mapped table to this Parquet file. + check_mapped_timestamps + Perform time checks on the result of the mapping operation. This can be slow and + is not required. + + Raises + ------ + TableAlreadyExists + Raised if the dst_schema name already exists. + + Examples + -------- + >>> store = Store() + >>> start = datetime(year=2018, month=1, day=1, tzinfo=ZoneInfo("EST")) + >>> freq = timedelta(hours=1) + >>> hours_per_year = 8760 + >>> num_time_arrays = 1 + >>> df = pd.DataFrame( + ... { + ... "id": np.concat( + ... [np.repeat(i, hours_per_year) for i in range(1, 1 + num_time_arrays)] + ... ), + ... "timestamp": np.tile( + ... pd.date_range(start, periods=hours_per_year, freq="h"), num_time_arrays + ... ), + ... "value": np.random.random(hours_per_year * num_time_arrays), + ... } + ... ) + >>> schema = TableSchema( + ... name="some_data", + ... time_config=DatetimeRange( + ... time_column="timestamp", + ... start=start, + ... length=hours_per_year, + ... resolution=freq, + ... ), + ... time_array_id_columns=["id"], + ... value_column="value", + ... ) + >>> store.ingest_table(df, schema) + >>> to_time_zone = ZoneInfo("US/Mountain") + >>> dst_schema = store.convert_time_zone( + ... schema.name, to_time_zone, check_mapped_timestamps=True + ... ) + """ + + src_schema = self._schema_mgr.get_schema(src_name) + tzc = TimeZoneConverter(self._engine, self._metadata, src_schema, time_zone) + + dst_schema = tzc.generate_to_schema() + if self.has_table(dst_schema.name): + msg = dst_schema.name + raise TableAlreadyExists(msg) + + tzc.convert_time_zone( + scratch_dir=scratch_dir, + output_file=output_file, + check_mapped_timestamps=check_mapped_timestamps, + ) + + with self._engine.begin() as conn: + self._schema_mgr.add_schema(conn, dst_schema) + + return dst_schema + + def convert_time_zone_by_column( + self, + src_name: str, + time_zone_column: str, + wrap_time_allowed: bool = False, + scratch_dir: Optional[Path] = None, + output_file: Optional[Path] = None, + check_mapped_timestamps: bool = False, + ) -> TableSchema: + """ + Convert the time zone of the existing table represented by src_name to new time zone(s) defined by a column + + Parameters + ---------- + src_name + Refers to the table name of the source data. + time_zone_column + Name of the time zone column for conversion. + wrap_time_allowed + Defines whether the time column is allowed to be wrapped to reflect the same time + range as the src_name schema in tz-naive clock time + scratch_dir + Directory to use for temporary writes. Default to the system's tmp filesystem. + output_file + If set, write the mapped table to this Parquet file. + check_mapped_timestamps + Perform time checks on the result of the mapping operation. This can be slow and + is not required. + + Raises + ------ + TableAlreadyExists + Raised if the dst_schema name already exists. + + Examples + -------- + >>> store = Store() + >>> start = datetime(year=2018, month=1, day=1, tzinfo=ZoneInfo("EST")) + >>> freq = timedelta(hours=1) + >>> hours_per_year = 8760 + >>> num_time_arrays = 3 + >>> df = pd.DataFrame( + ... { + ... "id": np.concat( + ... [np.repeat(i, hours_per_year) for i in range(1, 1 + num_time_arrays)] + ... ), + ... "timestamp": np.tile( + ... pd.date_range(start, periods=hours_per_year, freq="h"), num_time_arrays + ... ), + ... "time_zone": np.repeat(["US/Eastern", "US/Mountain", "None"], hours_per_year), + ... "value": np.random.random(hours_per_year * num_time_arrays), + ... } + ... ) + >>> schema = TableSchema( + ... name="some_data", + ... time_config=DatetimeRange( + ... time_column="timestamp", + ... start=start, + ... length=hours_per_year, + ... resolution=freq, + ... ), + ... time_array_id_columns=["id"], + ... value_column="value", + ... ) + >>> store.ingest_table(df, schema) + >>> time_zone_column = "time_zone" + >>> dst_schema = store.convert_time_zone_by_column( + ... schema.name, + ... time_zone_column, + ... wrap_time_allowed=False, + ... check_mapped_timestamps=True, + ... ) + """ + + src_schema = self._schema_mgr.get_schema(src_name) + tzc = TimeZoneConverterByColumn( + self._engine, self._metadata, src_schema, time_zone_column, wrap_time_allowed + ) + + dst_schema = tzc.generate_to_schema() + if self.has_table(dst_schema.name): + msg = dst_schema.name + raise TableAlreadyExists(msg) + + tzc.convert_time_zone( + scratch_dir=scratch_dir, + output_file=output_file, + check_mapped_timestamps=check_mapped_timestamps, + ) + + with self._engine.begin() as conn: + self._schema_mgr.add_schema(conn, dst_schema) + + return dst_schema + def read_query( self, name: str, diff --git a/src/chronify/time.py b/src/chronify/time.py index 74a9164..08651ef 100644 --- a/src/chronify/time.py +++ b/src/chronify/time.py @@ -11,10 +11,11 @@ class TimeType(StrEnum): """Defines the supported time formats in the load data.""" DATETIME = "datetime" + DATETIME_TZ_COL = "datetime_tz_col" ANNUAL = "annual" INDEX_NTZ = "index_ntz" INDEX_TZ = "index_tz" - INDEX_LOCAL = "index_local" + INDEX_TZ_COL = "index_tz_col" REPRESENTATIVE_PERIOD_NTZ = "representative_period_ntz" REPRESENTATIVE_PERIOD_TZ = "representative_period_tz" YEAR_MONTH_DAY_HOUR_NTZ = "year_month_day_hour" diff --git a/src/chronify/time_configs.py b/src/chronify/time_configs.py index 9e115a0..e14709c 100644 --- a/src/chronify/time_configs.py +++ b/src/chronify/time_configs.py @@ -1,6 +1,7 @@ import abc import logging -from datetime import datetime, timedelta +from zoneinfo import ZoneInfo +from datetime import datetime, timedelta, tzinfo from typing import Union, Literal, Optional from pydantic import Field, field_validator from typing_extensions import Annotated @@ -15,7 +16,7 @@ RepresentativePeriodFormat, list_representative_time_columns, ) - +from chronify.exceptions import InvalidValue, InvalidParameter logger = logging.getLogger(__name__) @@ -58,29 +59,81 @@ def list_time_columns(self) -> list[str]: def get_time_zone_column(self) -> Optional[str]: """Return the column in the table that contains time zone or offset information.""" + @abc.abstractmethod + def get_time_zones(self) -> list[tzinfo | None]: + """Return a list of unique time zones represented by the time column(s).""" + -class DatetimeRange(TimeBaseModel): - """Defines a time range that uses Python datetime instances.""" +class DatetimeRangeBase(TimeBaseModel): + """Defines a time range base class that uses Python datetime instances.""" time_column: str = Field(description="Column in the table that represents time.") - time_type: Literal[TimeType.DATETIME] = TimeType.DATETIME - start: datetime = Field( - description="Start time of the range. If it includes a time zone, the timestamps in " - "the data must be time zone-aware." - ) length: int resolution: timedelta + start: datetime + + def list_time_columns(self) -> list[str]: + return [self.time_column] def start_time_is_tz_naive(self) -> bool: """Return True if the timestamps in the range do not have time zones.""" return self.start.tzinfo is None - def list_time_columns(self) -> list[str]: - return [self.time_column] + +class DatetimeRange(DatetimeRangeBase): + """Defines a time range with a single time zone.""" + + time_type: Literal[TimeType.DATETIME] = TimeType.DATETIME + start: datetime = Field( + description="Start time of the range. If it includes a time zone, the timestamps in " + "the data must be time zone-aware." + ) def get_time_zone_column(self) -> None: return None + def get_time_zones(self) -> list[tzinfo | None]: + return [] + + +class DatetimeRangeWithTZColumn(DatetimeRangeBase): + """Defines a time range that uses an external time zone column to interpret timestamps.""" + + time_type: Literal[TimeType.DATETIME_TZ_COL] = TimeType.DATETIME_TZ_COL + start: datetime = Field( + description=( + "Start time of the range. If tz-naive, timestamps of different time zones " + "are expected to align in clock time. If tz-aware, timestamps of different " + "time zones are expected to align in real time." + ) + ) + time_zone_column: str = Field( + description="Column in the table that has time zone or offset information." + ) + time_zones: list[tzinfo | ZoneInfo | None] = Field( + description="Unique time zones from the table." + ) + + def get_time_zone_column(self) -> str: + return self.time_zone_column + + def get_time_zones(self) -> list[tzinfo | None]: + return self.time_zones + + @field_validator("time_zones") + @classmethod + def check_duplicated_time_zones(cls, time_zones: list[tzinfo | None]) -> list[tzinfo | None]: + if len(set(time_zones)) < len(time_zones): + msg = f"DatetimeRangeWithTZColumn.time_zones has duplicates: {time_zones}" + raise InvalidValue(msg) + return time_zones + + +DatetimeRanges = Union[ + DatetimeRange, + DatetimeRangeWithTZColumn, +] + class AnnualTimeRange(TimeBaseModel): """Defines a time range that uses years as integers.""" @@ -97,6 +150,9 @@ def list_time_columns(self) -> list[str]: def get_time_zone_column(self) -> None: return None + def get_time_zones(self) -> list[tzinfo | None]: + return [] + class IndexTimeRangeBase(TimeBaseModel): """Defines a time range in the form of indexes""" @@ -130,12 +186,15 @@ class IndexTimeRangeNTZ(IndexTimeRangeBase): def check_start_timestamp(cls, start_timestamp: datetime) -> datetime: if start_timestamp.tzinfo is not None: msg = "start_timestamp must be tz-naive for IndexTimeRangeNTZ" - raise ValueError(msg) + raise InvalidValue(msg) return start_timestamp def get_time_zone_column(self) -> None: return None + def get_time_zones(self) -> list[tzinfo | None]: + return [] + class IndexTimeRangeTZ(IndexTimeRangeBase): """Index time that represents tz-aware timestamps of a single time zone. @@ -151,21 +210,24 @@ class IndexTimeRangeTZ(IndexTimeRangeBase): def check_start_timestamp(cls, start_timestamp: datetime) -> datetime: if start_timestamp.tzinfo is None: msg = "start_timestamp must be tz-aware for IndexTimeRangeTZ" - raise ValueError(msg) + raise InvalidValue(msg) return start_timestamp def get_time_zone_column(self) -> None: return None + def get_time_zones(self) -> list[tzinfo | None]: + return [] + -class IndexTimeRangeLocalTime(IndexTimeRangeBase): - """Index time that reprsents local time relative to a time zone column. +class IndexTimeRangeWithTZColumn(IndexTimeRangeBase): + """Index time that represents local time relative to a time zone column. start_timestamp is tz-naive. Used for dataset where the timeseries for all geographies start at the same clock time. """ - time_type: Literal[TimeType.INDEX_LOCAL] = TimeType.INDEX_LOCAL + time_type: Literal[TimeType.INDEX_TZ_COL] = TimeType.INDEX_TZ_COL time_zone_column: str = Field( description="Column in the table that has time zone or offset information." ) @@ -174,18 +236,21 @@ class IndexTimeRangeLocalTime(IndexTimeRangeBase): @classmethod def check_start_timestamp(cls, start_timestamp: datetime) -> datetime: if start_timestamp.tzinfo is not None: - msg = "start_timestamp must be tz-naive for IndexTimeRangeLocalTime" - raise ValueError(msg) + msg = "start_timestamp must be tz-naive for IndexTimeRangeWithTZColumn" + raise InvalidValue(msg) return start_timestamp def get_time_zone_column(self) -> str: return self.time_zone_column + def get_time_zones(self) -> list[tzinfo | None]: + return [] # Issue 57 + IndexTimeRanges = Union[ IndexTimeRangeNTZ, IndexTimeRangeTZ, - IndexTimeRangeLocalTime, + IndexTimeRangeWithTZColumn, ] @@ -206,6 +271,9 @@ class RepresentativePeriodTimeNTZ(RepresentativePeriodTimeBase): def get_time_zone_column(self) -> None: return None + def get_time_zones(self) -> list[tzinfo | None]: + return [] + class RepresentativePeriodTimeTZ(RepresentativePeriodTimeBase): """Defines a tz-aware representative time dimension that covers one full year of time.""" @@ -218,6 +286,9 @@ class RepresentativePeriodTimeTZ(RepresentativePeriodTimeBase): def get_time_zone_column(self) -> str: return self.time_zone_column + def get_time_zones(self) -> list[tzinfo | None]: + return [] # Issue 57 + class ColumnRepresentativeBase(TimeBaseModel): """Base class for time formats that use multiple integer columns to represent time.""" @@ -261,7 +332,7 @@ class YearMonthDayPeriodTimeNTZ(ColumnRepresentativeBase): def one_hour_column(cls, value: list[str]) -> list[str]: if len(value) != 1: msg = "YearMonthDayPeriodTimeNTZ requires exactly one hour column." - raise ValueError(msg) + raise InvalidParameter(msg) return value def list_time_columns(self) -> list[str]: @@ -270,6 +341,9 @@ def list_time_columns(self) -> list[str]: def get_time_zone_column(self) -> None: return None + def get_time_zones(self) -> list[tzinfo | None]: + return [] + @property def unique_timestamps_length(self) -> int: return int(self.length / 24) @@ -302,6 +376,9 @@ def list_time_columns(self) -> list[str]: def get_time_zone_column(self) -> None: return None + def get_time_zones(self) -> list[tzinfo | None]: + return [] + @classmethod def default_config(cls, length: int, year: int) -> "YearMonthDayHourTimeNTZ": return cls( @@ -325,6 +402,9 @@ def list_time_columns(self) -> list[str]: def get_time_zone_column(self) -> None: return None + def get_time_zones(self) -> list[tzinfo | None]: + return [] + @classmethod def default_config(cls, length: int, year: int) -> "MonthDayHourTimeNTZ": return cls( @@ -350,9 +430,10 @@ def default_config(cls, length: int, year: int) -> "MonthDayHourTimeNTZ": Union[ AnnualTimeRange, DatetimeRange, + DatetimeRangeWithTZColumn, IndexTimeRangeNTZ, IndexTimeRangeTZ, - IndexTimeRangeLocalTime, + IndexTimeRangeWithTZColumn, RepresentativePeriodTimeNTZ, RepresentativePeriodTimeTZ, YearMonthDayPeriodTimeNTZ, diff --git a/src/chronify/time_range_generator_base.py b/src/chronify/time_range_generator_base.py index b1065fe..a009973 100644 --- a/src/chronify/time_range_generator_base.py +++ b/src/chronify/time_range_generator_base.py @@ -1,5 +1,5 @@ import abc -from typing import Any, Generator +from typing import Any import pandas as pd @@ -8,20 +8,16 @@ class TimeRangeGeneratorBase(abc.ABC): """Base class for classes that generate time ranges.""" @abc.abstractmethod - def iter_timestamps(self) -> Generator[Any, None, None]: - """Return an iterator over all time indexes in the table. - Type of the time is dependent on the class. - """ - def list_timestamps(self) -> list[Any]: """Return a list of timestamps for a time range. Type of the timestamps depends on the class. + Note: For DatetimeRangeGeneratorExternalTimeZone class with more than one time zone, + this shows all timestamps across all time zones in the order of the time zones. Returns ------- list[Any] """ - return list(self.iter_timestamps()) @abc.abstractmethod def list_distinct_timestamps_from_dataframe(self, df: pd.DataFrame) -> list[Any]: diff --git a/src/chronify/time_range_generator_factory.py b/src/chronify/time_range_generator_factory.py index bbdf4fb..d20de0a 100644 --- a/src/chronify/time_range_generator_factory.py +++ b/src/chronify/time_range_generator_factory.py @@ -3,6 +3,7 @@ from chronify.time_configs import ( AnnualTimeRange, DatetimeRange, + DatetimeRangeWithTZColumn, IndexTimeRangeBase, RepresentativePeriodTimeBase, TimeBaseModel, @@ -10,7 +11,10 @@ ) from chronify.time import LeapDayAdjustmentType from chronify.annual_time_range_generator import AnnualTimeRangeGenerator -from chronify.datetime_range_generator import DatetimeRangeGenerator +from chronify.datetime_range_generator import ( + DatetimeRangeGenerator, + DatetimeRangeGeneratorExternalTimeZone, +) from chronify.index_time_range_generator import IndexTimeRangeGenerator from chronify.representative_time_range_generator import RepresentativePeriodTimeGenerator from chronify.time_range_generator_base import TimeRangeGeneratorBase @@ -24,6 +28,10 @@ def make_time_range_generator( match model: case DatetimeRange(): return DatetimeRangeGenerator(model, leap_day_adjustment=leap_day_adjustment) + case DatetimeRangeWithTZColumn(): + return DatetimeRangeGeneratorExternalTimeZone( + model, leap_day_adjustment=leap_day_adjustment + ) case AnnualTimeRange(): return AnnualTimeRangeGenerator(model) case IndexTimeRangeBase(): diff --git a/src/chronify/time_series_checker.py b/src/chronify/time_series_checker.py index e09f0eb..17c94c6 100644 --- a/src/chronify/time_series_checker.py +++ b/src/chronify/time_series_checker.py @@ -1,13 +1,17 @@ from sqlalchemy import Connection, Table, select, text from typing import Optional +from datetime import datetime, tzinfo import pandas as pd from chronify.exceptions import InvalidTable from chronify.models import TableSchema +from chronify.time_configs import DatetimeRangeWithTZColumn from chronify.sqlalchemy.functions import read_database from chronify.time_range_generator_factory import make_time_range_generator +from chronify.datetime_range_generator import DatetimeRangeGeneratorExternalTimeZone from chronify.time import LeapDayAdjustmentType +from chronify.time_utils import is_prevailing_time_zone def check_timestamps( @@ -44,7 +48,21 @@ def check_timestamps(self) -> None: self._check_null_consistency() self._check_expected_timestamps_by_time_array(count) + @staticmethod + def _has_prevailing_time_zone(lst: list[tzinfo | None]) -> bool: + for tz in lst: + if is_prevailing_time_zone(tz): + return True + return False + def _check_expected_timestamps(self) -> int: + """Check that the timestamps in the table match the expected timestamps.""" + if isinstance(self._time_generator, DatetimeRangeGeneratorExternalTimeZone): + return self._check_expected_timestamps_with_external_time_zone() + return self._check_expected_timestamps_datetime() + + def _check_expected_timestamps_datetime(self) -> int: + """For tz-naive or tz-aware time without external time zone column""" expected = self._time_generator.list_timestamps() time_columns = self._time_generator.list_time_columns() stmt = select(*(self._table.c[x] for x in time_columns)).distinct() @@ -52,9 +70,42 @@ def _check_expected_timestamps(self) -> int: stmt = stmt.where(self._table.c[col].is_not(None)) df = read_database(stmt, self._conn, self._schema.time_config) actual = self._time_generator.list_distinct_timestamps_from_dataframe(df) + expected = sorted(set(expected)) # drop duplicates for tz-naive prevailing time check_timestamp_lists(actual, expected) return len(expected) + def _check_expected_timestamps_with_external_time_zone(self) -> int: + """For tz-naive time with external time zone column""" + assert isinstance(self._time_generator, DatetimeRangeGeneratorExternalTimeZone) # for mypy + expected_dct = self._time_generator.list_timestamps_by_time_zone() + time_columns = self._time_generator.list_time_columns() + assert isinstance(self._schema.time_config, DatetimeRangeWithTZColumn) # for mypy + time_columns.append(self._schema.time_config.get_time_zone_column()) + stmt = select(*(self._table.c[x] for x in time_columns)).distinct() + for col in time_columns: + stmt = stmt.where(self._table.c[col].is_not(None)) + df = read_database(stmt, self._conn, self._schema.time_config) + actual_dct = self._time_generator.list_distinct_timestamps_by_time_zone_from_dataframe(df) + + if sorted(expected_dct.keys()) != sorted(actual_dct.keys()): + msg = ( + "Time zone records do not match between expected and actual from table " + f"\nexpected: {sorted(expected_dct.keys())} vs. \nactual: {sorted(actual_dct.keys())}" + ) + raise InvalidTable(msg) + + assert len(expected_dct) > 0 # for mypy + count = set() + for tz_name in expected_dct.keys(): + count.add(len(expected_dct[tz_name])) + # drops duplicates for tz-naive prevailing time + expected = sorted(set(expected_dct[tz_name])) + actual = actual_dct[tz_name] + check_timestamp_lists(actual, expected, msg_prefix=f"For {tz_name}\n") + # return len by preserving duplicates for tz-naive prevailing time + assert len(count) == 1, "Mismatch in counts among time zones" + return count.pop() + def _check_null_consistency(self) -> None: # If any time column has a NULL, all time columns must have a NULL. time_columns = self._time_generator.list_time_columns() @@ -79,6 +130,14 @@ def _check_null_consistency(self) -> None: raise InvalidTable(msg) def _check_expected_timestamps_by_time_array(self, count: int) -> None: + if isinstance( + self._time_generator, DatetimeRangeGeneratorExternalTimeZone + ) and self._has_prevailing_time_zone(self._schema.time_config.get_time_zones()): + # cannot check counts by timestamps when tz-naive prevailing time zones are present + has_tz_naive_prevailing = True + else: + has_tz_naive_prevailing = False + id_cols = ",".join(self._schema.time_array_id_columns) time_cols = ",".join(self._schema.time_config.list_time_columns()) # NULL consistency was checked above. @@ -137,7 +196,20 @@ def _check_expected_timestamps_by_time_array(self, count: int) -> None: for result in self._conn.execute(text(query)).fetchall(): distinct_count_by_ta = result[0] count_by_ta = result[1] - if not count_by_ta == count == distinct_count_by_ta: + + if has_tz_naive_prevailing and not count_by_ta == count: + id_vals = result[2:] + values = ", ".join( + f"{x}={y}" for x, y in zip(self._schema.time_array_id_columns, id_vals) + ) + msg = ( + f"The count of time values in each time array must be {count}." + f"Time array identifiers: {values}. " + f"count = {count_by_ta}" + ) + raise InvalidTable(msg) + + if not has_tz_naive_prevailing and not count_by_ta == count == distinct_count_by_ta: id_vals = result[2:] values = ", ".join( f"{x}={y}" for x, y in zip(self._schema.time_array_id_columns, id_vals) @@ -151,13 +223,16 @@ def _check_expected_timestamps_by_time_array(self, count: int) -> None: raise InvalidTable(msg) -def check_timestamp_lists(actual: list[pd.Timestamp], expected: list[pd.Timestamp]) -> None: +def check_timestamp_lists( + actual: list[pd.Timestamp] | list[datetime], + expected: list[pd.Timestamp] | list[datetime], + msg_prefix: str = "", +) -> None: match = actual == expected + msg = msg_prefix if not match: if len(actual) != len(expected): - msg = f"Mismatch number of timestamps: actual: {len(actual)} vs. expected: {len(expected)}\n" - else: - msg = "" + msg += f"Mismatch number of timestamps: actual: {len(actual)} vs. expected: {len(expected)}\n" missing = set(expected).difference(set(actual)) extra = set(actual).difference(set(expected)) msg += "Actual timestamps do not match expected timestamps. \n" diff --git a/src/chronify/time_series_mapper_base.py b/src/chronify/time_series_mapper_base.py index 073c526..127d9db 100644 --- a/src/chronify/time_series_mapper_base.py +++ b/src/chronify/time_series_mapper_base.py @@ -15,7 +15,7 @@ write_query_to_parquet, ) from chronify.models import TableSchema, MappingTableSchema -from chronify.exceptions import ConflictingInputsError +from chronify.exceptions import ConflictingInputsError, InvalidOperation from chronify.utils.sqlalchemy_table import create_table from chronify.time_series_checker import check_timestamps from chronify.time import TimeIntervalType, ResamplingOperationType, AggregationType @@ -112,7 +112,6 @@ def apply_mapping( scratch_dir=scratch_dir, ) metadata.reflect(engine, views=True) - created_tmp_view = False try: _apply_mapping( @@ -209,20 +208,15 @@ def _apply_mapping( # select_stmt.append(func.max(tval_col).label(val_col)) case _: msg = f"Unsupported {resampling_operation=}" - raise ValueError(msg) - - keys = from_schema.time_config.list_time_columns() - # check time_zone - tz_col = from_schema.time_config.get_time_zone_column() - if tz_col is not None: - keys.append(tz_col) - assert tz_col in left_table_columns, f"{tz_col} not in table={from_schema.name}" - ftz_col = "from_" + tz_col - assert ( - ftz_col in right_table_columns - ), f"{ftz_col} not in mapping table={mapping_table_name}" + raise InvalidOperation(msg) + from_keys = [x for x in right_table_columns if x.startswith("from_")] + keys = [x.removeprefix("from_") for x in from_keys] + assert set(keys).issubset( + set(left_table_columns) + ), f"Keys {keys} not in table={from_schema.name}" on_stmt = reduce(and_, (left_table.c[x] == right_table.c["from_" + x] for x in keys)) + query = select(*select_stmt).select_from(left_table).join(right_table, on_stmt) if resampling_operation: query = query.group_by(*groupby_stmt) diff --git a/src/chronify/time_series_mapper_column_representative_to_datetime.py b/src/chronify/time_series_mapper_column_representative_to_datetime.py index 166c3a2..ed549de 100644 --- a/src/chronify/time_series_mapper_column_representative_to_datetime.py +++ b/src/chronify/time_series_mapper_column_representative_to_datetime.py @@ -195,7 +195,7 @@ def _create_intermediate_ymdh_schema( def _iter_datetime(self) -> Generator[datetime, None, None]: datetime_generator = DatetimeRangeGenerator(self._to_time_config) - yield from datetime_generator.iter_timestamps() + yield from datetime_generator._iter_timestamps() def _create_ymdh_mapping( self, col_names: list[str] = ["year", "month", "day", "hour"] @@ -232,7 +232,7 @@ def mdh_from_datetime(timestamp: datetime) -> tuple[int, int, int]: return timestamp.month, timestamp.day, timestamp.hour + 1 -def generate_period_mapping(periods: pd.Series) -> pd.DataFrame: +def generate_period_mapping(periods: "pd.Series[str]") -> pd.DataFrame: unique_periods = periods.unique() mappings = [] for period_str in unique_periods: diff --git a/src/chronify/time_series_mapper_datetime.py b/src/chronify/time_series_mapper_datetime.py index d4dd392..2d70551 100644 --- a/src/chronify/time_series_mapper_datetime.py +++ b/src/chronify/time_series_mapper_datetime.py @@ -10,7 +10,11 @@ from chronify.time_series_mapper_base import TimeSeriesMapperBase, apply_mapping from chronify.time_configs import DatetimeRange, TimeBasedDataAdjustment from chronify.time_range_generator_factory import make_time_range_generator -from chronify.time_utils import roll_time_interval, wrap_timestamps, get_standard_time_zone +from chronify.time_utils import ( + rolled_interval_timestamps, + wrapped_time_timestamps, + get_standard_time_zone, +) logger = logging.getLogger(__name__) @@ -110,14 +114,19 @@ def _create_mapping(self) -> tuple[pd.DataFrame, MappingTableSchema]: ser_from = ser_from.dt.tz_convert(fm_tz_std).dt.tz_localize(to_tz) match (self._adjust_interval, self._wrap_time_allowed): case (True, _): - ser = roll_time_interval( - ser_from, - self._from_time_config.interval_type, - self._to_time_config.interval_type, - to_time_data, + ser = pd.Series( + rolled_interval_timestamps( + ser_from.tolist(), + self._from_time_config.interval_type, + self._to_time_config.interval_type, + to_time_data, + ), + index=ser_from.index, ) case (False, True): - ser = wrap_timestamps(ser_from, to_time_data) + ser = pd.Series( + wrapped_time_timestamps(ser_from.tolist(), to_time_data), index=ser_from.index + ) case (False, False): ser = ser_from @@ -131,8 +140,7 @@ def _create_mapping(self) -> tuple[pd.DataFrame, MappingTableSchema]: assert ( df[to_time_col].nunique() == self._to_time_config.length ), "to_time_col does not have the right number of timestamps" - from_time_config = self._from_time_config.model_copy() - from_time_config.time_column = from_time_col + from_time_config = self._from_time_config.model_copy(update={"time_column": from_time_col}) mapping_schema = MappingTableSchema( name="mapping_table", time_configs=[ diff --git a/src/chronify/time_series_mapper_index_time.py b/src/chronify/time_series_mapper_index_time.py index 4f664ef..34c4094 100644 --- a/src/chronify/time_series_mapper_index_time.py +++ b/src/chronify/time_series_mapper_index_time.py @@ -14,7 +14,7 @@ DatetimeRange, IndexTimeRanges, IndexTimeRangeBase, - IndexTimeRangeLocalTime, + IndexTimeRangeWithTZColumn, TimeBasedDataAdjustment, ) from chronify.time_range_generator_factory import make_time_range_generator @@ -77,7 +77,7 @@ def map_time( self.check_schema_consistency() # Convert from index time to its represented datetime - if self._from_time_config.time_type == TimeType.INDEX_LOCAL: + if self._from_time_config.time_type == TimeType.INDEX_TZ_COL: if ( self._dst_adjustment == DaylightSavingAdjustmentType.DROP_SPRING_FORWARD_DUPLICATE_FALLBACK @@ -190,8 +190,7 @@ def _create_interm_map(self) -> tuple[pd.DataFrame, MappingTableSchema, TableSch from_time_col = "from_" + self._from_time_config.time_column from_time_data = make_time_range_generator(self._from_time_config).list_timestamps() - from_time_config = self._from_time_config.model_copy() - from_time_config.time_column = from_time_col + from_time_config = self._from_time_config.model_copy(update={"time_column": from_time_col}) mapping_schema = MappingTableSchema( name="mapping_table", time_configs=[from_time_config, mapped_schema.time_config], @@ -208,7 +207,7 @@ def _create_interm_map(self) -> tuple[pd.DataFrame, MappingTableSchema, TableSch def _create_interm_map_with_time_zone( self, ) -> tuple[pd.DataFrame, MappingTableSchema, TableSchema]: - """Create mapping dataframe for converting INDEX_LOCAL time to its represented datetime""" + """Create mapping dataframe for converting INDEX_TZ_COL time to its represented datetime""" mapped_schema = self._create_intermediate_schema() assert isinstance(mapped_schema.time_config, DatetimeRange) mapped_time_col = mapped_schema.time_config.time_column @@ -217,7 +216,7 @@ def _create_interm_map_with_time_zone( from_time_data = make_time_range_generator(self._from_time_config).list_timestamps() tz_col = self._from_time_config.get_time_zone_column() - assert tz_col is not None, "Expecting a time zone column for INDEX_LOCAL" + assert tz_col is not None, "Expecting a time zone column for INDEX_TZ_COL" from_tz_col = "from_" + tz_col with self._engine.connect() as conn: @@ -225,10 +224,10 @@ def _create_interm_map_with_time_zone( stmt = select(table.c[tz_col]).distinct().where(table.c[tz_col].is_not(None)) time_zones = read_database(stmt, conn, self._from_time_config)[tz_col].to_list() - from_time_config = self._from_time_config.model_copy() - assert isinstance(from_time_config, IndexTimeRangeLocalTime) - from_time_config.time_column = from_time_col - from_time_config.time_zone_column = from_tz_col + from_time_config = self._from_time_config.model_copy( + update={"time_column": from_time_col, "time_zone_column": from_tz_col} + ) + assert isinstance(from_time_config, IndexTimeRangeWithTZColumn) df_tz = [] to_tz = self._to_time_config.start.tzinfo @@ -263,7 +262,7 @@ def _create_interm_map_with_time_zone_and_dst_adjustment( self, interpolate_fallback: bool = False, ) -> tuple[pd.DataFrame, MappingTableSchema, TableSchema]: - """Create mapping dataframe for converting INDEX_LOCAL time to its represented datetime + """Create mapping dataframe for converting INDEX_TZ_COL time to its represented datetime with time-based daylight_saving adjustment that drops the spring-forward hour and, per user input, interpolates or duplicates the fall-back hour @@ -287,7 +286,7 @@ def _create_interm_map_with_time_zone_and_dst_adjustment( df_ntz["clock_time"] = df_ntz["clock_time"].astype(str) tz_col = self._from_time_config.get_time_zone_column() - assert tz_col is not None, "Expecting a time zone column for INDEX_LOCAL" + assert tz_col is not None, "Expecting a time zone column for INDEX_TZ_COL" from_tz_col = "from_" + tz_col with self._engine.connect() as conn: @@ -295,10 +294,10 @@ def _create_interm_map_with_time_zone_and_dst_adjustment( stmt = select(table.c[tz_col]).distinct().where(table.c[tz_col].is_not(None)) time_zones = read_database(stmt, conn, self._from_time_config)[tz_col].to_list() - from_time_config = self._from_time_config.model_copy() - assert isinstance(from_time_config, IndexTimeRangeLocalTime) - from_time_config.time_column = from_time_col - from_time_config.time_zone_column = from_tz_col + from_time_config = self._from_time_config.model_copy( + update={"time_column": from_time_col, "time_zone_column": from_tz_col} + ) + assert isinstance(from_time_config, IndexTimeRangeWithTZColumn) df_tz = [] for time_zone in time_zones: diff --git a/src/chronify/time_series_mapper_representative.py b/src/chronify/time_series_mapper_representative.py index 7670e5b..9c545cd 100644 --- a/src/chronify/time_series_mapper_representative.py +++ b/src/chronify/time_series_mapper_representative.py @@ -19,7 +19,7 @@ RepresentativePeriodTimeBase, TimeBasedDataAdjustment, ) -from chronify.time_utils import shift_time_interval +from chronify.time_utils import shifted_interval_timestamps logger = logging.getLogger(__name__) @@ -93,8 +93,8 @@ def _create_mapping(self, is_tz_naive: bool) -> tuple[pd.DataFrame, MappingTable time_col = "to_" + to_time_col # Mapping works backward for representative time by shifting interval type of # to_time_config to match from_time_config before extracting time info - dft[time_col] = shift_time_interval( - dft[to_time_col], + dft[time_col] = shifted_interval_timestamps( + dft[to_time_col].tolist(), self._to_time_config.interval_type, self._from_time_config.interval_type, ) diff --git a/src/chronify/time_utils.py b/src/chronify/time_utils.py index bb7c6a4..0744c46 100644 --- a/src/chronify/time_utils.py +++ b/src/chronify/time_utils.py @@ -1,9 +1,10 @@ """Functions related to time""" import logging +from numpy.typing import NDArray import numpy as np from datetime import datetime, timedelta, timezone, tzinfo -import zoneinfo +from zoneinfo import ZoneInfo, ZoneInfoNotFoundError import pandas as pd from chronify.time import ( @@ -18,8 +19,8 @@ def adjust_timestamp_by_dst_offset(timestamp: datetime, resolution: timedelta) - """Reduce the timestamps within the daylight saving range by 1 hour. Used to ensure that a time series at daily (or lower) resolution returns each day at the same timestamp in prevailing time, an expected behavior in most standard libraries. - (e.g., ensure a time series can return 2018-03-11 00:00, 2018-03-12 00:00... - instead of 2018-03-11 00:00, 2018-03-12 01:00...) + (e.g., ensure a time series can return 2018-03-11 00:00, 2018-03-12 00:00, 2018-03-13 00:00... + instead of 2018-03-11 00:00, 2018-03-12 01:00, 2018-03-13 01:00...) """ if resolution < timedelta(hours=24): return timestamp @@ -28,36 +29,33 @@ def adjust_timestamp_by_dst_offset(timestamp: datetime, resolution: timedelta) - return timestamp - offset -def shift_time_interval( - ser: "pd.Series[pd.Timestamp]", +def shifted_interval_timestamps( + ts_list: list[datetime], from_interval_type: TimeIntervalType, to_interval_type: TimeIntervalType, -) -> "pd.Series[pd.Timestamp]": - """Shift pandas timeseries by ONE time interval based on interval type. +) -> list[datetime]: + """Shift ts_list by ONE time interval based on interval type. Example: - >>> ser = pd.Series(pd.date_range("2018-12-31 22:00", periods=4, freq="h")) - 0 2018-12-31 22:00:00 - 1 2018-12-31 23:00:00 - 2 2019-01-01 00:00:00 - 3 2019-01-01 01:00:00 - dtype: datetime64[ns] - - >>> ser2 = shift_time_interval( - ... ser, TimeIntervalType.PERIOD_BEGINNING, TimeIntervalType.PERIOD_ENDING + >>> ts_list = pd.date_range("2018-12-31 23:00", periods=3, freq="h").tolist() + [Timestamp('2018-12-31 23:00:00'), Timestamp('2019-01-01 00:00:00'), Timestamp('2019-01-01 01:00:00')] + + >>> ts_list2 = shifted_interval_timestamps( + ... ts_list, TimeIntervalType.PERIOD_BEGINNING, TimeIntervalType.PERIOD_ENDING + ... ) + [Timestamp('2019-01-01 00:00:00'), Timestamp('2019-01-01 01:00:00'), Timestamp('2019-01-01 02:00:00')] + + >>> ts_list2 = shifted_interval_timestamps( + ... ts_list, TimeIntervalType.PERIOD_ENDING, TimeIntervalType.PERIOD_BEGINNING ... ) - 0 2018-12-31 23:00:00 - 1 2019-01-01 00:00:00 - 2 2019-01-01 01:00:00 - 3 2019-01-01 02:00:00 - dtype: datetime64[ns] + [Timestamp('2018-12-31 22:00:00'), Timestamp('2018-12-31 23:00:00'), Timestamp('2019-01-01 00:00:00')] """ assert ( from_interval_type != to_interval_type ), f"from_ and to_interval_type are the same: {from_interval_type}" - arr = np.sort(ser) + arr: NDArray[np.datetime64] = np.sort(ts_list) # type: ignore freqs = set((np.roll(arr, -1) - arr)[:-1]) - assert len(freqs), f"Timeseries has more than one frequency, {freqs}" + assert len(freqs) == 1, f"Timeseries must have exactly one frequency, found: {freqs}" freq: np.timedelta64 = next(iter(freqs)) match (from_interval_type, to_interval_type): @@ -70,91 +68,119 @@ def shift_time_interval( case _: msg = f"Cannot handle from {from_interval_type} to {to_interval_type}" raise InvalidParameter(msg) - return ser + freq * mult + ts_list2 = (arr + freq * mult).tolist() + return ts_list2 # type: ignore -def wrap_timestamps( - ser: "pd.Series[pd.Timestamp]", to_timestamps: list[pd.Timestamp] -) -> "pd.Series[pd.Timestamp]": - """Wrap pandas timeseries so it stays within a list of timestamps. +def wrapped_time_timestamps( + ts_list: list[datetime], + to_timestamps: list[datetime], +) -> list[datetime]: + """Returns the replacement timestamps in order to wrap the ts_list into the to_timestamps range. Example: - >>> ser = pd.Series(pd.date_range("2018-12-31 22:00", periods=4, freq="h")) - 0 2018-12-31 22:00:00 - 1 2018-12-31 23:00:00 - 2 2019-01-01 00:00:00 - 3 2019-01-01 01:00:00 - dtype: datetime64[ns] - - >>> to_timestamps = pd.date_range("2019-01-01 00:00", periods=4, freq="h").tolist() - [Timestamp('2019-01-01 00:00:00'), Timestamp('2019-01-01 01:00:00'), Timestamp('2019-01-01 02:00:00'), Timestamp('2019-01-01 03:00:00')] - - >>> ser2 = wrap_timestamps(ser, to_timestamps) - 0 2019-01-01 02:00:00 - 1 2019-01-01 03:00:00 - 2 2019-01-01 00:00:00 - 3 2019-01-01 01:00:00 - dtype: datetime64[ns] + >>> ts_list = pd.date_range("2018-12-31 23:00", periods=3, freq="h").tolist() + [Timestamp('2018-12-31 23:00:00'), Timestamp('2019-01-01 00:00:00'), Timestamp('2019-01-01 01:00:00')] + + >>> to_timestamps = pd.date_range("2019-01-01 00:00", periods=3, freq="h").tolist() + [Timestamp('2019-01-01 00:00:00'), Timestamp('2019-01-01 01:00:00'), Timestamp('2019-01-01 02:00:00')] + + >>> ts_list2 = wrapped_time_timestamps(ts_list, to_timestamps) + [Timestamp('2019-01-01 02:00:00'), Timestamp('2019-01-01 00:00:00'), Timestamp('2019-01-01 01:00:00')] """ - arr = np.sort(np.array(to_timestamps)) - freqs = set((np.roll(arr, -1) - arr)[:-1]) - assert len(freqs), f"Timeseries has more than one frequency, {freqs}" + to_arr = np.sort(np.array(to_timestamps)) + freqs = set((np.roll(to_arr, -1) - to_arr)[:-1]) + assert len(freqs) == 1, f"Timeseries must have exactly one frequency, found: {freqs}" freq = next(iter(freqs)) - tmin, tmax = arr[0], arr[-1] + tmin, tmax = to_arr[0], to_arr[-1] tdelta = tmax - tmin + freq - ser2 = ser.copy() - lower_cond = ser < tmin + + arr = pd.Series(ts_list) # np.array is not as robust as pd.Series here + arr2 = arr.copy() + lower_cond = arr < tmin if lower_cond.sum() > 0: - ser2.loc[lower_cond] += tdelta - upper_cond = ser > tmax + arr2.loc[lower_cond] += tdelta + upper_cond = arr > tmax if upper_cond.sum() > 0: - ser2.loc[upper_cond] -= tdelta - return ser2 + arr2.loc[upper_cond] -= tdelta + ts_list2 = arr2.tolist() + return ts_list2 # type: ignore -def roll_time_interval( - ser: "pd.Series[pd.Timestamp]", +def rolled_interval_timestamps( + ts_list: list[datetime], from_interval_type: TimeIntervalType, to_interval_type: TimeIntervalType, - to_timestamps: list[pd.Timestamp], -) -> "pd.Series[pd.Timestamp]": - """Roll pandas timeseries by shifting time interval based on interval type and then - wrapping timestamps + to_timestamps: list[datetime], +) -> list[datetime]: + """Roll ts_list by shifting time interval based on interval type and then + wrapping timestamps according to to_timestamps. Example: - >>> ser = pd.Series(pd.date_range("2018-12-31 22:00", periods=4, freq="h")) - 0 2018-12-31 22:00:00 - 1 2018-12-31 23:00:00 - 2 2019-01-01 00:00:00 - 3 2019-01-01 01:00:00 - dtype: datetime64[ns] - - >>> to_timestamps = pd.date_range("2019-01-01 00:00", periods=4, freq="h").tolist() - [Timestamp('2019-01-01 00:00:00'), Timestamp('2019-01-01 01:00:00'), Timestamp('2019-01-01 02:00:00'), Timestamp('2019-01-01 03:00:00')] - - >>> ser2 = roll_time_interval( - ... ser, TimeIntervalType.PERIOD_BEGINNING, TimeIntervalType.PERIOD_ENDING, to_timestamps + >>> ts_list = pd.date_range("2019-01-01 00:00", periods=3, freq="h").tolist() # period-ending + [Timestamp('2018-12-31 23:00:00'), Timestamp('2019-01-01 00:00:00'), Timestamp('2019-01-01 01:00:00')] + + >>> to_timestamps = pd.date_range( + ... "2019-01-01 00:00", periods=3, freq="h" + ... ).tolist() # period-beginning + [Timestamp('2019-01-01 00:00:00'), Timestamp('2019-01-01 01:00:00'), Timestamp('2019-01-01 02:00:00')] + + >>> ts_list2 = rolled_interval_timestamps( + ... ts_list, + ... TimeIntervalType.PERIOD_ENDING, + ... TimeIntervalType.PERIOD_BEGINNING, + ... to_timestamps, ... ) - 0 2019-01-01 03:00:00 - 1 2019-01-01 00:00:00 - 2 2019-01-01 01:00:00 - 3 2019-01-01 02:00:00 - dtype: datetime64[ns] + [Timestamp('2019-01-01 02:00:00'), Timestamp('2019-01-01 00:00:00'), Timestamp('2019-01-01 01:00:00')] """ - ser = shift_time_interval(ser, from_interval_type, to_interval_type) - ser = wrap_timestamps(ser, to_timestamps) - return ser + ts_list2 = shifted_interval_timestamps(ts_list, from_interval_type, to_interval_type) + ts_list3 = wrapped_time_timestamps(ts_list2, to_timestamps) + return ts_list3 + + +def is_prevailing_time_zone(tz: tzinfo | None) -> bool: + """Check that tz is a prevailing time zone""" + if not tz: + return False + ts1 = datetime(year=2020, month=1, day=1, tzinfo=tz) + ts2 = datetime(year=2020, month=6, day=1, tzinfo=tz) + + return ts1.utcoffset() != ts2.utcoffset() + + +def is_standard_time_zone(tz: tzinfo | None) -> bool: + """Check that tz is a standard time zone""" + if not tz: + return False + ts1 = datetime(year=2020, month=1, day=1, tzinfo=tz) + ts2 = datetime(year=2020, month=6, day=1, tzinfo=tz) + + return ts1.utcoffset() == ts2.utcoffset() def get_standard_time_zone(tz: tzinfo | None) -> tzinfo | None: + """Get the standard time zone counterpart of tz""" ts = datetime(year=2020, month=1, day=1, tzinfo=tz) std_tz_name = ts.tzname() if not std_tz_name: return None try: - return zoneinfo.ZoneInfo(std_tz_name) - except zoneinfo.ZoneInfoNotFoundError: + return ZoneInfo(std_tz_name) + except ZoneInfoNotFoundError: utcoffset = ts.utcoffset() if not utcoffset: return None return timezone(utcoffset) + + +def get_tzname(tz: tzinfo | None) -> str: + """Get the time zone name of tz + Note: except for the tzname extracted from ZoneInfo, + tzname may not be reinstantiated into a tzinfo object + """ + if not tz: + return "None" + if isinstance(tz, ZoneInfo): + return tz.key + ts = datetime(year=2020, month=1, day=1, tzinfo=tz) + return tz.tzname(ts) # type: ignore diff --git a/src/chronify/time_zone_converter.py b/src/chronify/time_zone_converter.py new file mode 100644 index 0000000..115c112 --- /dev/null +++ b/src/chronify/time_zone_converter.py @@ -0,0 +1,401 @@ +import abc +from zoneinfo import ZoneInfo +from datetime import datetime, tzinfo +from sqlalchemy import Engine, MetaData, Table, select +from typing import Optional +from pathlib import Path +import pandas as pd + +from chronify.models import TableSchema, MappingTableSchema +from chronify.time_configs import ( + DatetimeRangeBase, + DatetimeRange, + DatetimeRangeWithTZColumn, + TimeBasedDataAdjustment, +) +from chronify.datetime_range_generator import ( + DatetimeRangeGeneratorExternalTimeZone, +) +from chronify.exceptions import InvalidParameter, MissingValue +from chronify.time_series_mapper_base import apply_mapping +from chronify.time_range_generator_factory import make_time_range_generator +from chronify.sqlalchemy.functions import read_database +from chronify.time import TimeType +from chronify.time_utils import wrapped_time_timestamps, get_tzname + + +def convert_time_zone( + engine: Engine, + metadata: MetaData, + src_schema: TableSchema, + to_time_zone: tzinfo | None, + scratch_dir: Optional[Path] = None, + output_file: Optional[Path] = None, + check_mapped_timestamps: bool = False, +) -> TableSchema: + """Convert time zone of a table to a specified time zone. + + Output timestamp is tz-naive with a new time_zone column added. + + Parameters + ---------- + engine : sqlalchemy.Engine + SQLAlchemy engine. + metadata : sqlalchemy.MetaData + SQLAlchemy metadata. + src_schema : TableSchema + Defines the source table in the database. + to_time_zone : tzinfo or None + Time zone to convert to. If None, convert to tz-naive. + scratch_dir : pathlib.Path, optional + Directory to use for temporary writes. Defaults to the system's tmp filesystem. + output_file : pathlib.Path, optional + If set, write the mapped table to this Parquet file. + check_mapped_timestamps : bool, optional + Perform time checks on the result of the mapping operation. This can be slow and + is not required. + + Returns + ------- + TableSchema + Schema of output table with converted timestamps. + """ + tzc = TimeZoneConverter(engine, metadata, src_schema, to_time_zone) + tzc.convert_time_zone( + scratch_dir=scratch_dir, + output_file=output_file, + check_mapped_timestamps=check_mapped_timestamps, + ) + + return tzc._to_schema + + +def convert_time_zone_by_column( + engine: Engine, + metadata: MetaData, + src_schema: TableSchema, + time_zone_column: str, + wrap_time_allowed: Optional[bool] = False, + scratch_dir: Optional[Path] = None, + output_file: Optional[Path] = None, + check_mapped_timestamps: bool = False, +) -> TableSchema: + """Convert time zone of a table to multiple time zones specified by a column. + Output timestamp is tz-naive, reflecting the local time relative to the time_zone_column. + + Parameters + ---------- + engine : sqlalchemy.Engine + sqlalchemy engine + metadata : sqlalchemy.MetaData + sqlalchemy metadata + src_schema : TableSchema + Defines the source table in the database. + time_zone_column : str + Column name in the source table that contains the time zone information. + wrap_time_allowed : bool + If False, the converted timestamps will be aligned with the original timestamps in real time scale + E.g. 2018-01-01 00:00 ~ 2018-12-31 23:00 in US/Eastern becomes + 2017-12-31 23:00 ~ 2018-12-31 22:00 in US/Central + If True, the converted timestamps will fit into the time range of the src_schema in tz-naive clock time + E.g. 2018-01-01 00:00 ~ 2018-12-31 23:00 in US/Eastern becomes + 2017-12-31 23:00 ~ 2018-12-31 22:00 in US/Central, which is then wrapped such that + no clock time timestamps are in 2017. The final timestamps are: + 2018-12-31 23:00, 2018-01-01 00:00 ~ 2018-12-31 22:00 in US/Central + scratch_dir : pathlib.Path, optional + Directory to use for temporary writes. Default to the system's tmp filesystem. + output_file : pathlib.Path, optional + If set, write the mapped table to this Parquet file. + check_mapped_timestamps : bool, optional + Perform time checks on the result of the mapping operation. This can be slow and + is not required. + + Returns + ------- + dst_schema : TableSchema + schema of output table with converted timestamps + """ + tzc = TimeZoneConverterByColumn( + engine, metadata, src_schema, time_zone_column, wrap_time_allowed + ) + tzc.convert_time_zone( + scratch_dir=scratch_dir, + output_file=output_file, + check_mapped_timestamps=check_mapped_timestamps, + ) + return tzc._to_schema + + +class TimeZoneConverterBase(abc.ABC): + """Base class for time zone conversion of time series data.""" + + def __init__( + self, + engine: Engine, + metadata: MetaData, + from_schema: TableSchema, + ): + self._engine = engine + self._metadata = metadata + self._check_from_schema(from_schema) + self._from_schema = from_schema + + def _check_from_schema(self, from_schema: TableSchema) -> None: + msg = "" + if not isinstance(from_schema.time_config, DatetimeRange): + msg += "Source schema does not have DatetimeRange time config. " + if ( + isinstance(from_schema.time_config, DatetimeRange) + and from_schema.time_config.start_time_is_tz_naive() + ): + msg += ( + "Source schema start_time must be timezone-aware. " + "To convert from timezone-naive to timezone-aware, " + "use the TimeSeriesMapperDatetime.map_time() method instead. " + ) + if msg != "": + raise InvalidParameter(msg) + + @abc.abstractmethod + def generate_to_schema(self) -> TableSchema: + """Generate to_schema based on from_schema""" + + @abc.abstractmethod + def convert_time_zone( + self, + scratch_dir: Optional[Path] = None, + output_file: Optional[Path] = None, + check_mapped_timestamps: bool = False, + ) -> None: + """Convert time zone of the from_schema""" + + +class TimeZoneConverter(TimeZoneConverterBase): + """Class for time zone conversion of time series data to a specified time zone.""" + + def __init__( + self, + engine: Engine, + metadata: MetaData, + from_schema: TableSchema, + to_time_zone: tzinfo | None, + ): + super().__init__(engine, metadata, from_schema) + self._to_time_zone = to_time_zone + self._to_schema = self.generate_to_schema() + + def generate_to_time_config(self) -> DatetimeRangeWithTZColumn: + assert isinstance(self._from_schema.time_config, DatetimeRange) # mypy + to_time_config = self._from_schema.time_config.model_copy() + if self._to_time_zone: + to_time_config.start = to_time_config.start.astimezone(self._to_time_zone).replace( + tzinfo=None + ) + else: + to_time_config.start = to_time_config.start.replace(tzinfo=None) + time_kwargs = to_time_config.model_dump() + time_kwargs = dict( + filter( + lambda k_v: k_v[0] in DatetimeRangeWithTZColumn.model_fields, + time_kwargs.items(), + ) + ) + time_kwargs["time_type"] = TimeType.DATETIME_TZ_COL + time_kwargs["time_zone_column"] = "time_zone" + time_kwargs["time_zones"] = [self._to_time_zone] + return DatetimeRangeWithTZColumn(**time_kwargs) + + def generate_to_schema(self) -> TableSchema: + to_time_config = self.generate_to_time_config() + id_cols = self._from_schema.time_array_id_columns + if to_time_config.time_zone_column not in id_cols: + id_cols.append(to_time_config.time_zone_column) + to_schema: TableSchema = self._from_schema.model_copy( + update={ + "name": f"{self._from_schema.name}_tz_converted", + "time_config": to_time_config, + "time_array_id_columns": id_cols, + } + ) + return to_schema + + def convert_time_zone( + self, + scratch_dir: Optional[Path] = None, + output_file: Optional[Path] = None, + check_mapped_timestamps: bool = False, + ) -> None: + df, mapping_schema = self._create_mapping() + + apply_mapping( + df, + mapping_schema, + self._from_schema, + self._to_schema, + self._engine, + self._metadata, + TimeBasedDataAdjustment(), + scratch_dir=scratch_dir, + output_file=output_file, + check_mapped_timestamps=check_mapped_timestamps, + ) + + def _create_mapping(self) -> tuple[pd.DataFrame, MappingTableSchema]: + """Create mapping dataframe for converting datetime to geography-based time zone""" + assert isinstance(self._from_schema.time_config, DatetimeRange) # mypy + time_col = self._from_schema.time_config.time_column + from_time_col = "from_" + time_col + from_time_data = make_time_range_generator(self._from_schema.time_config).list_timestamps() + to_time_generator = make_time_range_generator(self._to_schema.time_config) + assert isinstance(to_time_generator, DatetimeRangeGeneratorExternalTimeZone) # mypy + to_time_data_dct = to_time_generator.list_timestamps_by_time_zone() + + from_time_config = self._from_schema.time_config.model_copy( + update={"time_column": from_time_col} + ) + to_time_config = self._to_schema.time_config + assert isinstance(to_time_config, DatetimeRangeWithTZColumn) # mypy + tz_col = to_time_config.time_zone_column + tz_name = get_tzname(self._to_time_zone) + to_time_data = to_time_data_dct[tz_name] + df = pd.DataFrame( + { + from_time_col: from_time_data, + tz_col: tz_name, + time_col: to_time_data, + } + ) + + mapping_schema = MappingTableSchema( + name="mapping_table_gtz_conversion", + time_configs=[from_time_config, to_time_config], + ) + return df, mapping_schema + + +class TimeZoneConverterByColumn(TimeZoneConverterBase): + """Class for time zone conversion of time series data based on a time zone column.""" + + def __init__( + self, + engine: Engine, + metadata: MetaData, + from_schema: TableSchema, + time_zone_column: str, + wrap_time_allowed: Optional[bool] = False, + ): + if time_zone_column not in from_schema.time_array_id_columns: + msg = f"{time_zone_column=} is missing from {from_schema.time_array_id_columns=}" + raise MissingValue(msg) + super().__init__(engine, metadata, from_schema) + self.time_zone_column = time_zone_column + self._wrap_time_allowed = wrap_time_allowed + self._to_schema = self.generate_to_schema() + + def generate_to_time_config(self) -> DatetimeRangeBase: + assert isinstance(self._from_schema.time_config, DatetimeRange) # mypy + to_time_config = self._from_schema.time_config.model_copy() + if self._wrap_time_allowed: + to_time_config.start = to_time_config.start.replace(tzinfo=None) + time_kwargs = to_time_config.model_dump() + time_kwargs = dict( + filter( + lambda k_v: k_v[0] in DatetimeRangeWithTZColumn.model_fields, + time_kwargs.items(), + ) + ) + time_kwargs["time_type"] = TimeType.DATETIME_TZ_COL + time_kwargs["time_zone_column"] = self.time_zone_column + time_kwargs["time_zones"] = self._get_time_zones() + return DatetimeRangeWithTZColumn(**time_kwargs) + + def generate_to_schema(self) -> TableSchema: + id_cols = self._from_schema.time_array_id_columns + if "time_zone" not in id_cols: + id_cols.append("time_zone") + to_schema: TableSchema = self._from_schema.model_copy( + update={ + "name": f"{self._from_schema.name}_tz_converted", + "time_config": self.generate_to_time_config(), + "time_array_id_columns": id_cols, + } + ) + return to_schema + + def convert_time_zone( + self, + scratch_dir: Optional[Path] = None, + output_file: Optional[Path] = None, + check_mapped_timestamps: bool = False, + ) -> None: + df, mapping_schema = self._create_mapping() + + apply_mapping( + df, + mapping_schema, + self._from_schema, + self._to_schema, + self._engine, + self._metadata, + TimeBasedDataAdjustment(), + scratch_dir=scratch_dir, + output_file=output_file, + check_mapped_timestamps=check_mapped_timestamps, + ) + + def _get_time_zones(self) -> list[tzinfo | None]: + with self._engine.connect() as conn: + table = Table(self._from_schema.name, self._metadata) + stmt = ( + select(table.c[self.time_zone_column]) + .distinct() + .where(table.c[self.time_zone_column].is_not(None)) + ) + time_zones = read_database(stmt, conn, self._from_schema.time_config)[ + self.time_zone_column + ].to_list() + + time_zones = [None if tz == "None" else ZoneInfo(tz) for tz in time_zones] + return time_zones + + def _create_mapping(self) -> tuple[pd.DataFrame, MappingTableSchema]: + """Create mapping dataframe for converting datetime to column time zones""" + assert isinstance(self._from_schema.time_config, DatetimeRange) # mypy + time_col = self._from_schema.time_config.time_column + from_time_col = "from_" + time_col + from_time_data = make_time_range_generator(self._from_schema.time_config).list_timestamps() + to_time_generator = make_time_range_generator(self._to_schema.time_config) + assert isinstance(to_time_generator, DatetimeRangeGeneratorExternalTimeZone) # mypy + to_time_data_dct = to_time_generator.list_timestamps_by_time_zone() + + from_tz_col = "from_" + self.time_zone_column + from_time_config = self._from_schema.time_config.model_copy( + update={"time_column": from_time_col} + ) + to_time_config = self._to_schema.time_config + + df_tz = [] + for tz_name, time_data in to_time_data_dct.items(): + to_time_data: list[datetime] | list[pd.Timestamp] + if self._wrap_time_allowed: + # assume it is being wrapped based on the tz-naive version of the original time data + final_time_data = [x.replace(tzinfo=None) for x in from_time_data] + to_time_data = wrapped_time_timestamps(time_data, final_time_data) + else: + to_time_data = time_data + df_tz.append( + pd.DataFrame( + { + from_time_col: from_time_data, + from_tz_col: tz_name, + time_col: to_time_data, + } + ) + ) + df = pd.concat(df_tz, ignore_index=True) + + mapping_schema = MappingTableSchema( + name="mapping_table_gtz_conversion", + time_configs=[from_time_config, to_time_config], + ) + return df, mapping_schema diff --git a/src/chronify/utils/sqlalchemy_view.py b/src/chronify/utils/sqlalchemy_view.py index b7eec2f..bf4c495 100644 --- a/src/chronify/utils/sqlalchemy_view.py +++ b/src/chronify/utils/sqlalchemy_view.py @@ -53,9 +53,17 @@ def create_view( sa.event.listen( metadata, "after_create", - CreateView(name, selectable).execute_if(callable_=_view_doesnt_exist), # type: ignore + CreateView(name, selectable).execute_if( + callable_=_view_doesnt_exist # type: ignore + ), + ) + sa.event.listen( + metadata, + "before_drop", + DropView(name).execute_if( + callable_=_view_exists # type: ignore + ), ) - sa.event.listen(metadata, "before_drop", DropView(name).execute_if(callable_=_view_exists)) # type: ignore metadata.create_all(engine) metadata.reflect(engine, views=True) return view diff --git a/tests/test_mapper_datetime_to_datetime.py b/tests/test_mapper_datetime_to_datetime.py index ba9fa00..3d144e9 100644 --- a/tests/test_mapper_datetime_to_datetime.py +++ b/tests/test_mapper_datetime_to_datetime.py @@ -1,4 +1,5 @@ from zoneinfo import ZoneInfo +from datetime import tzinfo import pytest from datetime import datetime, timedelta from typing import Any @@ -14,11 +15,15 @@ from chronify.time import TimeIntervalType, MeasurementType from chronify.exceptions import ConflictingInputsError, InvalidParameter from chronify.datetime_range_generator import DatetimeRangeGenerator -from chronify.time_utils import shift_time_interval, roll_time_interval, wrap_timestamps +from chronify.time_utils import ( + shifted_interval_timestamps, + rolled_interval_timestamps, + wrapped_time_timestamps, +) def generate_datetime_data(time_config: DatetimeRange) -> pd.Series: # type: ignore - return pd.to_datetime(list(DatetimeRangeGenerator(time_config).iter_timestamps())) + return pd.to_datetime(list(DatetimeRangeGenerator(time_config)._iter_timestamps())) def generate_datetime_dataframe(schema: TableSchema) -> pd.DataFrame: @@ -31,7 +36,7 @@ def generate_datetime_dataframe(schema: TableSchema) -> pd.DataFrame: def get_datetime_schema( - year: int, tzinfo: ZoneInfo | None, interval_type: TimeIntervalType, name: str + year: int, tzinfo: tzinfo | None, interval_type: TimeIntervalType, name: str ) -> TableSchema: start = datetime(year=year, month=1, day=1, tzinfo=tzinfo) end = datetime(year=year + 1, month=1, day=1, tzinfo=tzinfo) @@ -133,19 +138,19 @@ def test_roll_time_using_shift_and_wrap() -> None: to_schema = get_datetime_schema(2024, None, TimeIntervalType.PERIOD_BEGINNING, "to_table") data = generate_datetime_data(to_schema.time_config) - df["rolled"] = roll_time_interval( - df[from_schema.time_config.time_column], + df["rolled"] = rolled_interval_timestamps( + df[from_schema.time_config.time_column].tolist(), from_schema.time_config.interval_type, to_schema.time_config.interval_type, data, ) - df["rolled2"] = shift_time_interval( - df[from_schema.time_config.time_column], + df["rolled2"] = shifted_interval_timestamps( + df[from_schema.time_config.time_column].tolist(), from_schema.time_config.interval_type, to_schema.time_config.interval_type, ) - df["rolled2"] = wrap_timestamps( - df["rolled2"], + df["rolled2"] = wrapped_time_timestamps( + df["rolled2"].tolist(), data, ) assert df["rolled"].equals(df["rolled2"]) @@ -155,7 +160,7 @@ def test_roll_time_using_shift_and_wrap() -> None: @pytest.mark.parametrize("tzinfo", [ZoneInfo("US/Eastern"), None]) def test_time_interval_shift( iter_engines: Engine, - tzinfo: ZoneInfo | None, + tzinfo: tzinfo | None, ) -> None: from_schema = get_datetime_schema( 2020, tzinfo, TimeIntervalType.PERIOD_BEGINNING, "from_table" @@ -171,7 +176,7 @@ def test_time_interval_shift( @pytest.mark.parametrize("tzinfo", [ZoneInfo("US/Eastern"), None]) def test_time_interval_shift_different_time_ranges( iter_engines: Engine, - tzinfo: ZoneInfo | None, + tzinfo: tzinfo | None, ) -> None: from_schema = get_datetime_schema( 2020, tzinfo, TimeIntervalType.PERIOD_BEGINNING, "from_table" @@ -194,7 +199,7 @@ def test_time_interval_shift_different_time_ranges( ], ) def test_time_shift_different_timezones( - iter_engines: Engine, tzinfo_tuple: tuple[ZoneInfo | None] + iter_engines: Engine, tzinfo_tuple: tuple[tzinfo | None] ) -> None: from_schema = get_datetime_schema( 2020, tzinfo_tuple[0], TimeIntervalType.PERIOD_BEGINNING, "from_table" diff --git a/tests/test_mapper_index_time_to_datetime.py b/tests/test_mapper_index_time_to_datetime.py index 87bf19d..580537e 100644 --- a/tests/test_mapper_index_time_to_datetime.py +++ b/tests/test_mapper_index_time_to_datetime.py @@ -11,7 +11,7 @@ DatetimeRange, IndexTimeRangeNTZ, IndexTimeRangeTZ, - IndexTimeRangeLocalTime, + IndexTimeRangeWithTZColumn, TimeBasedDataAdjustment, ) from chronify.exceptions import ConflictingInputsError @@ -118,7 +118,7 @@ def data_for_unaligned_time_mapping( ] ) - time_config = IndexTimeRangeLocalTime( + time_config = IndexTimeRangeWithTZColumn( start=1, length=time_array_len, start_timestamp=pd.Timestamp("2018-01-01 00:00"), diff --git a/tests/test_mapper_representative_time_to_datetime.py b/tests/test_mapper_representative_time_to_datetime.py index f1e141f..06ead60 100644 --- a/tests/test_mapper_representative_time_to_datetime.py +++ b/tests/test_mapper_representative_time_to_datetime.py @@ -1,6 +1,6 @@ from zoneinfo import ZoneInfo import pytest -from datetime import datetime, timedelta +from datetime import datetime, timedelta, tzinfo from typing import Any, Optional import pandas as pd @@ -15,10 +15,10 @@ def generate_datetime_data(time_config: DatetimeRange) -> pd.Series: - return pd.to_datetime(list(DatetimeRangeGenerator(time_config).iter_timestamps())) + return pd.to_datetime(list(DatetimeRangeGenerator(time_config)._iter_timestamps())) -def get_datetime_schema(year: int, tzinfo: ZoneInfo | None) -> TableSchema: +def get_datetime_schema(year: int, tzinfo: tzinfo | None) -> TableSchema: start = datetime(year=year, month=1, day=1, tzinfo=tzinfo) end = datetime(year=year + 1, month=1, day=1, tzinfo=tzinfo) resolution = timedelta(hours=1) diff --git a/tests/test_models.py b/tests/test_models.py index 18a05c3..480931a 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -2,6 +2,7 @@ from sqlalchemy import BigInteger, Boolean, DateTime, Double, Integer, String from chronify.models import ColumnDType, _check_name +from chronify.exceptions import InvalidValue def test_column_dtypes() -> None: @@ -12,10 +13,10 @@ def test_column_dtypes() -> None: for string_type in ("int", "bigint", "bool", "datetime", "float", "str"): ColumnDType(name="col1", dtype=string_type) - with pytest.raises(ValueError): + with pytest.raises(InvalidValue): ColumnDType(name="col1", dtype="invalid") def test_invalid_column_name() -> None: - with pytest.raises(ValueError): + with pytest.raises(InvalidValue): _check_name(name="invalid - name") diff --git a/tests/test_store.py b/tests/test_store.py index 9c5b0cc..e9204a9 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -1,7 +1,7 @@ import fileinput import gc import shutil -from datetime import datetime, timedelta +from datetime import datetime, timedelta, tzinfo from pathlib import Path from zoneinfo import ZoneInfo from itertools import chain @@ -34,7 +34,12 @@ from chronify.models import ColumnDType, CsvTableSchema, PivotedTableSchema, TableSchema from chronify.store import Store from chronify.time import TimeIntervalType, DaylightSavingAdjustmentType -from chronify.time_configs import DatetimeRange, IndexTimeRangeLocalTime, TimeBasedDataAdjustment +from chronify.time_configs import ( + DatetimeRange, + DatetimeRangeWithTZColumn, + IndexTimeRangeWithTZColumn, + TimeBasedDataAdjustment, +) from chronify.time_range_generator_factory import make_time_range_generator from chronify.time_series_checker import check_timestamp_lists from chronify.utils.sql import make_temp_view_name @@ -516,7 +521,7 @@ def test_map_index_time_to_datetime( name="generators_index", time_array_id_columns=["generator", "time_zone"], value_column="value", - time_config=IndexTimeRangeLocalTime( + time_config=IndexTimeRangeWithTZColumn( start=0, length=time_array_len, start_timestamp=pd.Timestamp(f"{year}-01-01 00:00"), @@ -779,3 +784,158 @@ def test_check_timestamps(iter_stores_by_engine: Store, one_week_per_month_by_ho store.check_timestamps(schema.name) with store.engine.begin() as conn: store.check_timestamps(schema.name, connection=conn) + + +@pytest.mark.parametrize("to_time_zone", [ZoneInfo("US/Eastern"), ZoneInfo("US/Mountain"), None]) +def test_convert_time_zone( + tmp_path, iter_stores_by_engine_no_data_ingestion: Store, to_time_zone: tzinfo | None +): + store = iter_stores_by_engine_no_data_ingestion + time_array_len = 8784 + year = 2020 + tzinfo = ZoneInfo("EST") + + src_time_config = DatetimeRange( + start=datetime(year=year, month=1, day=1, hour=0, tzinfo=tzinfo), + resolution=timedelta(hours=1), + length=time_array_len, + interval_type=TimeIntervalType.PERIOD_BEGINNING, + time_column="timestamp", + ) + + src_csv_schema = CsvTableSchema( + time_config=src_time_config, + column_dtypes=[ + ColumnDType(name="timestamp", dtype=DateTime(timezone=False)), + ColumnDType(name="gen1", dtype=Double()), + ColumnDType(name="gen2", dtype=Double()), + ColumnDType(name="gen3", dtype=Double()), + ], + value_columns=["gen1", "gen2", "gen3"], + pivoted_dimension_name="generator", + time_array_id_columns=[], + ) + rel = read_csv(GENERATOR_TIME_SERIES_FILE, src_csv_schema) + rel2 = unpivot(rel, ("gen1", "gen2", "gen3"), "generator", "value") # noqa: F841 + + src_schema = TableSchema( + name="generators_pb", + time_config=src_time_config, + time_array_id_columns=["generator"], + value_column="value", + ) + if store.engine.name == "hive": + out_file = tmp_path / "data.parquet" + rel2.to_df().to_parquet(out_file) + store.create_view_from_parquet(out_file, src_schema) + else: + store.ingest_table(rel2, src_schema) + + if tzinfo is None and store.engine.name != "sqlite": + output_file = tmp_path / "mapped_data" + else: + output_file = None + + dst_schema = store.convert_time_zone( + src_schema.name, to_time_zone, output_file=output_file, check_mapped_timestamps=True + ) + if output_file is None or store.engine.name == "sqlite": + df2 = store.read_table(dst_schema.name) + else: + df2 = pd.read_parquet(output_file) + df2["timestamp"] = pd.to_datetime(df2["timestamp"]) + assert len(df2) == time_array_len * 3 + actual = sorted(df2["timestamp"].unique()) + assert isinstance(dst_schema.time_config, DatetimeRangeWithTZColumn) + if to_time_zone: + expected_start = src_time_config.start.astimezone(to_time_zone).replace(tzinfo=None) + else: + expected_start = src_time_config.start.replace(tzinfo=None) + assert dst_schema.time_config.start == expected_start + assert pd.Timestamp(actual[0]) == dst_schema.time_config.start + expected = make_time_range_generator(dst_schema.time_config).list_timestamps() + expected = sorted(set(expected)) + check_timestamp_lists(actual, expected) + + +@pytest.mark.parametrize("wrapped_time_allowed", [False, True]) +def test_convert_time_zone_by_column( + tmp_path, iter_stores_by_engine_no_data_ingestion: Store, wrapped_time_allowed: bool +): + store = iter_stores_by_engine_no_data_ingestion + time_array_len = 8784 + year = 2020 + tzinfo = ZoneInfo("EST") + + src_time_config = DatetimeRange( + start=datetime(year=year, month=1, day=1, hour=0, tzinfo=tzinfo), + resolution=timedelta(hours=1), + length=time_array_len, + interval_type=TimeIntervalType.PERIOD_BEGINNING, + time_column="timestamp", + ) + + src_csv_schema = CsvTableSchema( + time_config=src_time_config, + column_dtypes=[ + ColumnDType(name="timestamp", dtype=DateTime(timezone=False)), + ColumnDType(name="gen1", dtype=Double()), + ColumnDType(name="gen2", dtype=Double()), + ColumnDType(name="gen3", dtype=Double()), + ], + value_columns=["gen1", "gen2", "gen3"], + pivoted_dimension_name="generator", + time_array_id_columns=[], + ) + rel = read_csv(GENERATOR_TIME_SERIES_FILE, src_csv_schema) + rel2 = unpivot(rel, ("gen1", "gen2", "gen3"), "generator", "value") # noqa: F841 + # add time_zone column + stmt = ", ".join(rel2.columns) + tz_col_stmt = "CASE WHEN generator='gen1' THEN 'US/Eastern' WHEN generator='gen2' THEN 'US/Central' ELSE 'None' END AS time_zone" + stmt += f", {tz_col_stmt}" + rel2 = rel2.project(stmt) + + src_schema = TableSchema( + name="generators_pb", + time_config=src_time_config, + time_array_id_columns=["generator", "time_zone"], + value_column="value", + ) + if store.engine.name == "hive": + out_file = tmp_path / "data.parquet" + rel2.to_df().to_parquet(out_file) + store.create_view_from_parquet(out_file, src_schema) + else: + store.ingest_table(rel2, src_schema) + + if tzinfo is None and store.engine.name != "sqlite": + output_file = tmp_path / "mapped_data" + else: + output_file = None + + dst_schema = store.convert_time_zone_by_column( + src_schema.name, + "time_zone", + output_file=output_file, + wrap_time_allowed=wrapped_time_allowed, + check_mapped_timestamps=True, + ) + if output_file is None or store.engine.name == "sqlite": + df2 = store.read_table(dst_schema.name) + else: + df2 = pd.read_parquet(output_file) + df2["timestamp"] = pd.to_datetime(df2["timestamp"]) + df_stats = df2.groupby(["time_zone"])["timestamp"].agg(["min", "max", "count"]) + assert set(df_stats["count"]) == {time_array_len} + if wrapped_time_allowed: + assert set(df_stats["min"]) == {dst_schema.time_config.start.replace(tzinfo=None)} + else: + assert (df_stats.loc["US/Eastern"] == df_stats.loc["None"]).prod() == 1 + assert df_stats.loc["US/Central", "min"] == dst_schema.time_config.start.astimezone( + ZoneInfo("US/Central") + ).replace(tzinfo=None) + assert isinstance(dst_schema.time_config, DatetimeRangeWithTZColumn) + expected_dct = make_time_range_generator(dst_schema.time_config).list_timestamps_by_time_zone() + for tz, expected in expected_dct.items(): + actual = sorted(df2.loc[df2["time_zone"] == tz, "timestamp"]) + check_timestamp_lists(actual, expected) diff --git a/tests/test_time_series_checker.py b/tests/test_time_series_checker.py index eba800c..cf05c30 100644 --- a/tests/test_time_series_checker.py +++ b/tests/test_time_series_checker.py @@ -1,4 +1,4 @@ -from datetime import datetime, timedelta +from datetime import datetime, timedelta, tzinfo from typing import Optional from zoneinfo import ZoneInfo @@ -17,42 +17,42 @@ from chronify.time_series_checker import check_timestamps -def test_valid_datetimes_with_tz(iter_engines: Engine): +def test_valid_datetimes_with_tz(iter_engines: Engine) -> None: """Valid timestamps with time zones.""" _run_test(iter_engines, *_get_inputs_for_valid_datetimes_with_tz()) -def test_valid_datetimes_without_tz(iter_engines: Engine): +def test_valid_datetimes_without_tz(iter_engines: Engine) -> None: """Valid timestamps without time zones.""" _run_test(iter_engines, *_get_inputs_for_valid_datetimes_without_tz()) -def test_invalid_datetimes(iter_engines: Engine): +def test_invalid_datetimes(iter_engines: Engine) -> None: """Timestamps do not match the schema.""" _run_test(iter_engines, *_get_inputs_for_incorrect_datetimes()) -def test_invalid_datetime_length(iter_engines: Engine): +def test_invalid_datetime_length(iter_engines: Engine) -> None: """Timestamps do not match the schema.""" _run_test(iter_engines, *_get_inputs_for_incorrect_datetime_length()) -def test_mismatched_time_array_lengths(iter_engines: Engine): +def test_mismatched_time_array_lengths(iter_engines: Engine) -> None: """Some time arrays have different lengths.""" _run_test(iter_engines, *_get_inputs_for_mismatched_time_array_lengths()) -def test_incorrect_lengths(iter_engines: Engine): +def test_incorrect_lengths(iter_engines: Engine) -> None: """All time arrays are consistent but have the wrong length.""" _run_test(iter_engines, *_get_inputs_for_incorrect_lengths()) -def test_incorrect_time_arrays(iter_engines: Engine): +def test_incorrect_time_arrays(iter_engines: Engine) -> None: """The time arrays form a complete set but are individually incorrect.""" _run_test(iter_engines, *_get_inputs_for_incorrect_time_arrays()) -def test_incorrect_time_arrays_with_duplicates(iter_engines: Engine): +def test_incorrect_time_arrays_with_duplicates(iter_engines: Engine) -> None: """The time arrays form a complete set but are individually incorrect.""" _run_test(iter_engines, *_get_inputs_for_incorrect_time_arrays_with_duplicates()) @@ -60,7 +60,7 @@ def test_incorrect_time_arrays_with_duplicates(iter_engines: Engine): def _run_test( engine: Engine, df: pd.DataFrame, - tzinfo: Optional[ZoneInfo], + tzinfo: Optional[tzinfo], length: int, message: Optional[str], ) -> None: diff --git a/tests/test_time_utils.py b/tests/test_time_utils.py new file mode 100644 index 0000000..ed52ad1 --- /dev/null +++ b/tests/test_time_utils.py @@ -0,0 +1,129 @@ +import pytest +import numpy as np +import pandas as pd +from datetime import datetime, timedelta, timezone +from zoneinfo import ZoneInfo + +from chronify.time_utils import ( + adjust_timestamp_by_dst_offset, + shifted_interval_timestamps, + wrapped_time_timestamps, + rolled_interval_timestamps, + is_prevailing_time_zone, + is_standard_time_zone, + get_standard_time_zone, + get_tzname, +) +from chronify.time import TimeIntervalType + + +def test_adjust_timestamp_by_dst_offset() -> None: + # DST-aware datetime vs standard time zone + tzs = [ZoneInfo("America/New_York"), ZoneInfo("EST")] + hours = [23, 0] + for tz, hour in zip(tzs, hours): + dt = datetime(2020, 7, 1, 0, 0, tzinfo=tz) + res = adjust_timestamp_by_dst_offset(dt, timedelta(days=1)) + assert res.hour == hour + + +def test_shifted_interval_timestamps_period_beginning_to_ending() -> None: + ser = pd.date_range("2018-12-31 22:00", periods=4, freq="h").tolist() + shifted = shifted_interval_timestamps( + ser, + TimeIntervalType.PERIOD_BEGINNING, + TimeIntervalType.PERIOD_ENDING, + ) + assert all(np.array(shifted) == np.array(ser) + pd.Timedelta(hours=1)) + + +def test_shifted_interval_timestamps_period_ending_to_beginning() -> None: + ser = pd.date_range("2018-12-31 22:00", periods=4, freq="h").tolist() + shifted = shifted_interval_timestamps( + ser, + TimeIntervalType.PERIOD_ENDING, + TimeIntervalType.PERIOD_BEGINNING, + ) + assert all(np.array(shifted) == np.array(ser) - pd.Timedelta(hours=1)) + + +def test_shifted_interval_timestamps_invalid() -> None: + ser = pd.date_range("2018-12-31 22:00", periods=4, freq="h").tolist() + with pytest.raises(Exception): + shifted_interval_timestamps( + ser, + TimeIntervalType.PERIOD_BEGINNING, + TimeIntervalType.PERIOD_BEGINNING, + ) + + +def test_wrapped_time_timestamps() -> None: + ser = pd.date_range("2018-12-31 22:00", periods=4, freq="h").tolist() + to_timestamps = pd.date_range("2019-01-01 00:00", periods=4, freq="h").tolist() + wrapped = wrapped_time_timestamps(ser, to_timestamps) + assert set(wrapped) <= set(to_timestamps) + + +def test_rolled_interval_timestamps() -> None: + ser = pd.date_range("2018-12-31 22:00", periods=4, freq="h").tolist() + to_timestamps = pd.date_range("2019-01-01 00:00", periods=4, freq="h").tolist() + rolled = rolled_interval_timestamps( + ser, + TimeIntervalType.PERIOD_BEGINNING, + TimeIntervalType.PERIOD_ENDING, + to_timestamps, + ) + assert set(rolled) <= set(to_timestamps) + + +def test_is_prevailing_time_zone() -> None: + tz = ZoneInfo("America/New_York") + assert is_prevailing_time_zone(tz) is True + assert is_prevailing_time_zone(None) is False + + +def test_is_standard_time_zone() -> None: + tz = timezone(timedelta(hours=0)) + assert is_standard_time_zone(tz) is True + assert is_standard_time_zone(None) is False + + +def test_get_standard_time_zone() -> None: + tzs = [ + ZoneInfo("America/New_York"), + ZoneInfo("EST"), + timezone(timedelta(hours=-5)), + None, + ] + stzs = [ + ZoneInfo("EST"), + ZoneInfo("EST"), + timezone(timedelta(hours=-5)), + None, + ] + for tz, stz in zip(tzs, stzs): + std_tz = get_standard_time_zone(tz) + if tz is None: + assert std_tz is None + continue + assert std_tz == stz + + +def test_get_tzname() -> None: + tzs = [ + ZoneInfo("America/New_York"), + ZoneInfo("EST"), + timezone(timedelta(hours=-5)), + None, + ] + etzs = [ + "America/New_York", + "EST", + "UTC-05:00", + "None", + ] + + for tz, etz in zip(tzs, etzs): + name = get_tzname(tz) + assert isinstance(name, str) + assert name == etz diff --git a/tests/test_time_zone_converter.py b/tests/test_time_zone_converter.py new file mode 100644 index 0000000..3b0230a --- /dev/null +++ b/tests/test_time_zone_converter.py @@ -0,0 +1,215 @@ +from zoneinfo import ZoneInfo +from datetime import datetime, timedelta, tzinfo +import numpy as np +import pytest +from typing import Any + +import pandas as pd +from sqlalchemy import Engine, MetaData + +from chronify.sqlalchemy.functions import read_database, write_database +from chronify.time_zone_converter import ( + TimeZoneConverter, + TimeZoneConverterByColumn, + convert_time_zone, + convert_time_zone_by_column, +) +from chronify.time_configs import DatetimeRange +from chronify.models import TableSchema +from chronify.time import TimeIntervalType +from chronify.datetime_range_generator import DatetimeRangeGenerator +from chronify.exceptions import InvalidParameter + + +def generate_datetime_data(time_config: DatetimeRange) -> pd.Series: # type: ignore + return pd.to_datetime(list(DatetimeRangeGenerator(time_config)._iter_timestamps())) + + +def generate_datetime_dataframe(schema: TableSchema) -> pd.DataFrame: + df = pd.DataFrame({schema.time_config.time_column: generate_datetime_data(schema.time_config)}) + + for i, x in enumerate(schema.time_array_id_columns): + df[x] = i + df[schema.value_column] = np.random.rand(len(df)) + return df + + +def generate_dataframe_with_tz_col(schema: TableSchema) -> pd.DataFrame: + df = generate_datetime_dataframe(schema).drop(columns=["id"]) + time_zones = [ + ZoneInfo("US/Eastern"), + ZoneInfo("US/Central"), + ZoneInfo("US/Mountain"), + None, + ] + time_zones = [tz.key if tz else "None" for tz in time_zones] + dfo = pd.merge( + df, pd.DataFrame({"id": range(len(time_zones)), "time_zone": time_zones}), how="cross" + ) + dfo = ( + dfo.drop(columns=["time_zone_x"]) + .rename(columns={"time_zone_y": "time_zone"}) + .reset_index() + ) + return dfo + + +def get_datetime_schema( + year: int, + tzinfo: tzinfo | None, + interval_type: TimeIntervalType, + name: str, + has_tz_col: bool = False, +) -> TableSchema: + start = datetime(year=year, month=1, day=1, tzinfo=tzinfo) + end = datetime(year=year, month=1, day=2, tzinfo=tzinfo) + resolution = timedelta(hours=1) + length = (end - start) / resolution + 1 + cols = ["id"] + cols += ["time_zone"] if has_tz_col else [] + schema = TableSchema( + name=name, + time_config=DatetimeRange( + start=start, + resolution=resolution, + length=length, + interval_type=interval_type, + time_column="timestamp", + ), + time_array_id_columns=cols, + value_column="value", + ) + return schema + + +def ingest_data( + engine: Engine, + metadata: MetaData, + df: pd.DataFrame, + schema: TableSchema, +) -> None: + with engine.begin() as conn: + write_database(df, conn, schema.name, [schema.time_config], if_table_exists="replace") + metadata.reflect(engine, views=True) + + +def get_mapped_dataframe( + engine: Engine, + table_name: str, + time_config: DatetimeRange, +) -> pd.DataFrame: + with engine.connect() as conn: + query = f"select * from {table_name}" + queried = read_database(query, conn, time_config) + queried = queried.sort_values(by=["id", "timestamp"]).reset_index(drop=True) + return queried + + +def run_conversion( + engine: Engine, + df: pd.DataFrame, + from_schema: TableSchema, + to_time_zone: tzinfo | None, +) -> None: + metadata = MetaData() + ingest_data(engine, metadata, df, from_schema) + to_schema = convert_time_zone( + engine, metadata, from_schema, to_time_zone, check_mapped_timestamps=True + ) + dfo = get_mapped_dataframe(engine, to_schema.name, to_schema.time_config) + assert df["value"].equals(dfo["value"]) + if to_time_zone is None: + expected = df["timestamp"].dt.tz_localize(None) + else: + expected = df["timestamp"].dt.tz_convert(to_time_zone).dt.tz_localize(None) + assert (dfo["timestamp"] == expected).prod() == 1 + + +def run_conversion_to_column_time_zones( + engine: Engine, + df: pd.DataFrame, + from_schema: TableSchema, + wrap_time_allowed: bool, +) -> None: + metadata = MetaData() + ingest_data(engine, metadata, df, from_schema) + to_schema = convert_time_zone_by_column( + engine, + metadata, + from_schema, + "time_zone", + wrap_time_allowed=wrap_time_allowed, + check_mapped_timestamps=True, + ) + dfo = get_mapped_dataframe(engine, to_schema.name, to_schema.time_config) + dfo = dfo[df.columns].sort_values(by="index").reset_index(drop=True) + dfo["timestamp"] = pd.to_datetime(dfo["timestamp"]) # needed for engine 2, not sure why + + assert df["value"].equals(dfo["value"]) + if wrap_time_allowed: + assert set(dfo["timestamp"].value_counts()) == {4} + expected = [x.replace(tzinfo=None) for x in sorted(set(df["timestamp"]))] + assert set(dfo["timestamp"]) == set(expected) + else: + for i in range(len(df)): + tzn = df.loc[i, "time_zone"] + if tzn == "None": + ts = df.loc[i, "timestamp"].replace(tzinfo=None) + else: + ts = df.loc[i, "timestamp"].tz_convert(ZoneInfo(tzn)).replace(tzinfo=None) + assert dfo.loc[i, "timestamp"] == ts + + +def run_conversion_with_error( + engine: Engine, + df: pd.DataFrame, + from_schema: TableSchema, + use_tz_col: bool, + error: tuple[Any, str], +) -> None: + metadata = MetaData() + ingest_data(engine, metadata, df, from_schema) + with pytest.raises(error[0], match=error[1]): + if use_tz_col: + tzc = TimeZoneConverterByColumn( + engine, metadata, from_schema, "time_zone", wrap_time_allowed=False + ) + tzc.convert_time_zone(check_mapped_timestamps=True) + else: + tzc2 = TimeZoneConverter(engine, metadata, from_schema, None) + tzc2.convert_time_zone(check_mapped_timestamps=True) + + +def test_src_table_no_time_zone(iter_engines: Engine) -> None: + from_schema = get_datetime_schema(2018, None, TimeIntervalType.PERIOD_BEGINNING, "base_table") + df = generate_datetime_dataframe(from_schema) + error = (InvalidParameter, "Source schema start_time must be timezone-aware") + run_conversion_with_error( + iter_engines, df, from_schema, False, error + ) # TODO, support tz-naive to tz-aware conversion + + +@pytest.mark.parametrize( + "to_time_zone", [None, ZoneInfo("US/Central"), ZoneInfo("America/Los_Angeles")] +) +def test_time_conversion(iter_engines: Engine, to_time_zone: tzinfo | None) -> None: + from_schema = get_datetime_schema( + 2018, ZoneInfo("US/Mountain"), TimeIntervalType.PERIOD_BEGINNING, "base_table" + ) + df = generate_datetime_dataframe(from_schema) + run_conversion(iter_engines, df, from_schema, to_time_zone) + + +@pytest.mark.parametrize("wrap_time_allowed", [False, True]) +def test_time_conversion_to_column_time_zones( + iter_engines: Engine, wrap_time_allowed: bool +) -> None: + from_schema = get_datetime_schema( + 2018, + ZoneInfo("US/Mountain"), + TimeIntervalType.PERIOD_BEGINNING, + "base_table", + has_tz_col=True, + ) + df = generate_dataframe_with_tz_col(from_schema) + run_conversion_to_column_time_zones(iter_engines, df, from_schema, wrap_time_allowed)