diff --git a/.bumpversion.toml b/.bumpversion.toml index 008d20d..3be582c 100644 --- a/.bumpversion.toml +++ b/.bumpversion.toml @@ -1,5 +1,5 @@ [tool.bumpversion] -current_version = "0.15.1" +current_version = "0.15.2" parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)(?:-(?P<pre_l>rc)(?P<pre_n>0|[1-9]\\d*))?" diff --git a/pyproject.toml b/pyproject.toml index 198c9d3..c33fe56 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "otf-api" -version = "0.15.1" +version = "0.15.2" description = "Python OrangeTheory Fitness API Client" authors = [{ name = "Jessica Smith", email = "j.smith.git1@gmail.com" }] requires-python = ">=3.11" @@ -30,6 +30,7 @@ dependencies = [ "diskcache>=5.6.3", "platformdirs>=4.3.6", "packaging>=24.2", + "coloredlogs>=15.0.1", ] [project.urls] diff --git a/source/conf.py b/source/conf.py index e646ff9..c07eb49 100644 --- a/source/conf.py +++ b/source/conf.py @@ -14,7 +14,7 @@ project = "OrangeTheory API" copyright = "2025, Jessica Smith" author = "Jessica Smith" -release = "0.15.1" +release = "0.15.2" # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration diff --git a/src/otf_api/__init__.py b/src/otf_api/__init__.py index 27bf347..8a076b3 100644 --- a/src/otf_api/__init__.py +++ b/src/otf_api/__init__.py @@ -7,13 +7,15 @@ import logging import os +import coloredlogs + from otf_api import models from otf_api.api import Otf from otf_api.auth import OtfUser LOG_LEVEL = os.getenv("OTF_LOG_LEVEL", "INFO").upper() -LOG_LEVEL_NUM = getattr(logging, LOG_LEVEL, logging.INFO) -LOG_FMT = "{asctime} - {module}.{funcName}:{lineno} - {levelname} - {message}" + +LOG_FMT = "%(asctime)s - %(module)s.%(funcName)s:%(lineno)d - %(levelname)s - %(message)s" DATE_FMT = "%Y-%m-%d %H:%M:%S%z" @@ -24,19 +26,28 @@ def _setup_logging() -> None: return # Already set up # 2) Set the logger level to INFO (or whatever you need). 
- logger.setLevel(LOG_LEVEL_NUM) + logger.setLevel(LOG_LEVEL) # 3) Create a handler (e.g., console) and set its formatter. handler = logging.StreamHandler() - handler.setFormatter(logging.Formatter(fmt=LOG_FMT, datefmt=DATE_FMT, style="{")) + handler.setFormatter(logging.Formatter(fmt=LOG_FMT, datefmt=DATE_FMT, style="%")) # 4) Add this handler to your package logger. logger.addHandler(handler) + coloredlogs.install( + level=LOG_LEVEL, + logger=logger, + fmt=LOG_FMT, + datefmt=DATE_FMT, + style="%", + isatty=True, # Use colored output only if the output is a terminal + ) + _setup_logging() -__version__ = "0.15.1" +__version__ = "0.15.2" __all__ = ["Otf", "OtfUser", "models"] diff --git a/src/otf_api/api/bookings/booking_api.py b/src/otf_api/api/bookings/booking_api.py index 394b9c3..3983651 100644 --- a/src/otf_api/api/bookings/booking_api.py +++ b/src/otf_api/api/bookings/booking_api.py @@ -8,19 +8,19 @@ from otf_api import exceptions as exc from otf_api import models from otf_api.api import utils -from otf_api.api.client import OtfClient from otf_api.models.bookings import HISTORICAL_BOOKING_STATUSES, ClassFilter from .booking_client import BookingClient if typing.TYPE_CHECKING: from otf_api import Otf + from otf_api.api.client import OtfClient LOGGER = getLogger(__name__) class BookingApi: - def __init__(self, otf: "Otf", otf_client: OtfClient): + def __init__(self, otf: "Otf", otf_client: "OtfClient"): """Initialize the Booking API client. Args: @@ -30,6 +30,39 @@ def __init__(self, otf: "Otf", otf_client: OtfClient): self.otf = otf self.client = BookingClient(otf_client) + def _get_all_bookings_new( + self, exclude_cancelled: bool = True, remove_duplicates: bool = True + ) -> list[models.BookingV2]: + """Get bookings from the new endpoint with no date filters. + + This is marked as private to avoid random users calling it. + Useful for testing and validating models. + + Args: + exclude_cancelled (bool): Whether to exclude cancelled bookings. Default is True. 
+ remove_duplicates (bool): Whether to remove duplicate bookings. Default is True. + + Returns: + list[BookingV2]: List of bookings that match the search criteria. + """ + start_date = pendulum.datetime(1970, 1, 1) + end_date = pendulum.today().start_of("day").add(days=45) + return self.get_bookings_new(start_date, end_date, exclude_cancelled, remove_duplicates) + + def _get_all_bookings_new_by_date(self) -> dict[datetime, models.BookingV2]: + """Get all bookings from the new endpoint by date. + + This is marked as private to avoid random users calling it. + Useful for testing and validating models. + + Returns: + dict[datetime, BookingV2]: Dictionary of bookings by date. + """ + start_date = pendulum.datetime(1970, 1, 1) + end_date = pendulum.today().start_of("day").add(days=45) + bookings = self.get_bookings_new_by_date(start_date, end_date) + return bookings + def get_bookings_new( self, start_date: datetime | date | str | None = None, @@ -79,6 +112,7 @@ def get_bookings_new( bookings_resp = self.client.get_bookings_new( ends_before=end_date, starts_after=start_date, include_canceled=include_canceled, expand=expand ) + LOGGER.debug("Found %d bookings between %s and %s", len(bookings_resp), start_date, end_date) # filter out bookings with ids that start with "no-booking-id" # no idea what these are, but I am praying for the poor sap stuck with maintaining OTF's data model @@ -89,7 +123,7 @@ def get_bookings_new( try: results.append(models.BookingV2.create(**b, api=self.otf)) except ValueError as e: - LOGGER.warning(f"Failed to create BookingV2 from response: {e}. Booking data:\n{b}") + LOGGER.error("Failed to create BookingV2 from response: %s. Booking data:\n%s", e, b) continue if not remove_duplicates: @@ -112,6 +146,9 @@ def _deduplicate_bookings( list[BookingV2]: The deduplicated list of bookings. 
""" # remove duplicates by class_id, keeping the one with the most recent updated_at timestamp + + orig_count = len(results) + seen_classes: dict[str, models.BookingV2] = {} for booking in results: @@ -127,11 +164,20 @@ def _deduplicate_bookings( "this is unexpected behavior." ) if booking.updated_at > existing_booking.updated_at: + LOGGER.debug( + "Replacing existing booking for class_id %s with more recent booking %s", class_id, booking + ) seen_classes[class_id] = booking results = list(seen_classes.values()) results = sorted(results, key=lambda x: x.starts_at) + new_count = len(results) + diff = orig_count - new_count + + if diff: + LOGGER.debug("Removed %d duplicate bookings, returning %d unique bookings", diff, new_count) + return results def get_bookings_new_by_date( @@ -615,36 +661,3 @@ def rate_class( if e.response.status_code == 403: raise exc.AlreadyRatedError(f"Workout {performance_summary_id} is already rated.") from None raise - - def _get_all_bookings_new( - self, exclude_cancelled: bool = True, remove_duplicates: bool = True - ) -> list[models.BookingV2]: - """Get bookings from the new endpoint with no date filters. - - This is marked as private to avoid random users calling it. - Useful for testing and validating models. - - Args: - exclude_cancelled (bool): Whether to exclude cancelled bookings. Default is True. - remove_duplicates (bool): Whether to remove duplicate bookings. Default is True. - - Returns: - list[BookingV2]: List of bookings that match the search criteria. - """ - start_date = pendulum.datetime(1970, 1, 1) - end_date = pendulum.today().start_of("day").add(days=45) - return self.get_bookings_new(start_date, end_date, exclude_cancelled, remove_duplicates) - - def _get_all_bookings_new_by_date(self) -> dict[datetime, models.BookingV2]: - """Get all bookings from the new endpoint by date. - - This is marked as private to avoid random users calling it. - Useful for testing and validating models. 
- - Returns: - dict[datetime, BookingV2]: Dictionary of bookings by date. - """ - start_date = pendulum.datetime(1970, 1, 1) - end_date = pendulum.today().start_of("day").add(days=45) - bookings = self.get_bookings_new_by_date(start_date, end_date) - return bookings diff --git a/src/otf_api/api/client.py b/src/otf_api/api/client.py index 3acb5d5..11d5b13 100644 --- a/src/otf_api/api/client.py +++ b/src/otf_api/api/client.py @@ -1,4 +1,6 @@ import atexit +import json +import os import re from json import JSONDecodeError from logging import getLogger @@ -47,6 +49,7 @@ def __init__(self, user: OtfUser | None = None): self.session = httpx.Client( headers=HEADERS, auth=self.user.httpx_auth, timeout=httpx.Timeout(20.0, connect=60.0) ) + self.log_raw_response = os.getenv("OTF_LOG_RAW_RESPONSE", "false").lower() == "true" atexit.register(self.session.close) def __getstate__(self): @@ -110,7 +113,7 @@ def do( """ full_url = str(URL.build(scheme="https", host=base_url, path=path)) request = self._build_request(method, full_url, params, headers, **kwargs) - LOGGER.debug(f"Making {method!r} request to '{full_url}', params: {params}, headers: {headers}") + LOGGER.debug("Making %r request to '%s'", method, str(request.url)) try: response = self.session.send(request) @@ -158,10 +161,14 @@ def _map_http_error( if error_code == "602": raise exc.OutsideSchedulingWindowError("Class is outside scheduling window") - msg = f"HTTP error {error.response.status_code} for {request.method} {request.url}" - LOGGER.error(msg) + LOGGER.error("HTTP error %s for %s %s", response.status_code, request.method, request.url) error_cls = exc.RetryableOtfRequestError if response.status_code >= 500 else exc.OtfRequestError - raise error_cls(message=msg, original_exception=error, request=request, response=response) + raise error_cls( + message=f"HTTP error {response.status_code} for {request.method} {request.url}", + original_exception=error, + request=request, + response=response, + ) def 
_handle_transport_error(self, error: Exception, request: httpx.Request) -> None: """Handle transport errors during API requests. @@ -177,7 +184,7 @@ def _handle_transport_error(self, error: Exception, request: httpx.Request) -> N url = request.url if not isinstance(error, httpx.HTTPStatusError): - LOGGER.exception(f"Unexpected error during {method!r} {url!r}: {type(error).__name__} - {error}") + LOGGER.exception("Unexpected error during %r %r: %s - %s", method, url, type(error).__name__, error) return json_data = get_json_from_response(error.response) @@ -190,7 +197,7 @@ def _map_logical_error(self, data: dict, response: httpx.Response, request: http data_status: int | None = data.get("Status") or data.get("status") or None if isinstance(data, dict) and isinstance(data_status, int) and not 200 <= data_status <= 299: - LOGGER.error(f"API returned error: {data}") + LOGGER.error("API returned error: %s", data) raise exc.OtfRequestError("Bad API response", None, response=response, request=request) raise exc.OtfRequestError( @@ -202,17 +209,20 @@ def _handle_response(self, method: str, response: httpx.Response, request: httpx if method == "GET": raise exc.OtfRequestError("Empty response", None, response=response, request=request) - LOGGER.debug(f"No content returned from {method} {response.url}") + LOGGER.debug("No content returned from %s %s", method, response.url) return None try: json_data = response.json() except JSONDecodeError as e: - LOGGER.error(f"Invalid JSON: {e}") - LOGGER.error(f"Response content: {response.text}") + LOGGER.error("Invalid JSON: %s", e) + LOGGER.error("Response content: %s", response.text) raise if is_error_response(json_data): self._map_logical_error(json_data, response, request) + if self.log_raw_response: + LOGGER.debug("Response from %s %s: %s", method, response.url, json.dumps(json_data, indent=4)) + return json_data diff --git a/src/otf_api/api/members/member_api.py b/src/otf_api/api/members/member_api.py index d894989..4b3189c 100644 
--- a/src/otf_api/api/members/member_api.py +++ b/src/otf_api/api/members/member_api.py @@ -3,18 +3,18 @@ from typing import Any from otf_api import models -from otf_api.api.client import OtfClient from .member_client import MemberClient if typing.TYPE_CHECKING: from otf_api import Otf + from otf_api.api.client import OtfClient LOGGER = getLogger(__name__) class MemberApi: - def __init__(self, otf: "Otf", otf_client: OtfClient): + def __init__(self, otf: "Otf", otf_client: "OtfClient"): """Initialize the Member API client. Args: diff --git a/src/otf_api/api/studios/studio_api.py b/src/otf_api/api/studios/studio_api.py index e1a984b..23c0bc0 100644 --- a/src/otf_api/api/studios/studio_api.py +++ b/src/otf_api/api/studios/studio_api.py @@ -4,18 +4,18 @@ from otf_api import exceptions as exc from otf_api import models from otf_api.api import utils -from otf_api.api.client import OtfClient from .studio_client import StudioClient if typing.TYPE_CHECKING: from otf_api import Otf + from otf_api.api.client import OtfClient LOGGER = getLogger(__name__) class StudioApi: - def __init__(self, otf: "Otf", otf_client: OtfClient): + def __init__(self, otf: "Otf", otf_client: "OtfClient"): """Initialize the Studio API client. Args: @@ -25,6 +25,51 @@ def __init__(self, otf: "Otf", otf_client: OtfClient): self.otf = otf self.client = StudioClient(otf_client) + def _get_all_studios(self) -> list[models.StudioDetail]: + """Gets all studios. Marked as private to avoid random users calling it. + + Useful for testing and validating models. + + Returns: + list[StudioDetail]: List of studios that match the search criteria. 
+ """ + # long/lat being None will cause the endpoint to return all studios + results = self.client.get_studios_by_geo(None, None) + + studios: list[models.StudioDetail] = [] + for studio in results: + try: + studios.append(models.StudioDetail.create(**studio, api=self.otf)) + except ValueError as e: + LOGGER.error(f"Failed to create StudioDetail for studio {studio}: {e}") + continue + + return studios + + def _get_studio_detail_threaded(self, studio_uuids: list[str]) -> dict[str, models.StudioDetail]: + """Get detailed information about multiple studios in a threaded manner. + + This is used to improve performance when fetching details for multiple studios at once. + This method is on the Otf class because StudioDetail is a model that requires the API instance. + + Args: + studio_uuids (list[str]): List of studio UUIDs to get details for. + + Returns: + dict[str, StudioDetail]: A dictionary mapping studio UUIDs to their detailed information. + """ + studio_dicts = self.client.get_studio_detail_threaded(studio_uuids) + + studios: dict[str, models.StudioDetail] = {} + for studio_uuid, studio in studio_dicts.items(): + try: + studios[studio_uuid] = models.StudioDetail.create(**studio, api=self.otf) + except ValueError as e: + LOGGER.error(f"Failed to create StudioDetail for studio {studio_uuid}: {e}") + continue + + return studios + def get_favorite_studios(self) -> list[models.StudioDetail]: """Get the member's favorite studios. @@ -159,48 +204,3 @@ def search_studios_by_geo( continue return studios - - def _get_all_studios(self) -> list[models.StudioDetail]: - """Gets all studios. Marked as private to avoid random users calling it. - - Useful for testing and validating models. - - Returns: - list[StudioDetail]: List of studios that match the search criteria. 
- """ - # long/lat being None will cause the endpoint to return all studios - results = self.client.get_studios_by_geo(None, None) - - studios: list[models.StudioDetail] = [] - for studio in results: - try: - studios.append(models.StudioDetail.create(**studio, api=self.otf)) - except ValueError as e: - LOGGER.error(f"Failed to create StudioDetail for studio {studio}: {e}") - continue - - return studios - - def _get_studio_detail_threaded(self, studio_uuids: list[str]) -> dict[str, models.StudioDetail]: - """Get detailed information about multiple studios in a threaded manner. - - This is used to improve performance when fetching details for multiple studios at once. - This method is on the Otf class because StudioDetail is a model that requires the API instance. - - Args: - studio_uuids (list[str]): List of studio UUIDs to get details for. - - Returns: - dict[str, StudioDetail]: A dictionary mapping studio UUIDs to their detailed information. - """ - studio_dicts = self.client.get_studio_detail_threaded(studio_uuids) - - studios: dict[str, models.StudioDetail] = {} - for studio_uuid, studio in studio_dicts.items(): - try: - studios[studio_uuid] = models.StudioDetail.create(**studio, api=self.otf) - except ValueError as e: - LOGGER.error(f"Failed to create StudioDetail for studio {studio_uuid}: {e}") - continue - - return studios diff --git a/src/otf_api/api/workouts/workout_api.py b/src/otf_api/api/workouts/workout_api.py index 7247829..e63159e 100644 --- a/src/otf_api/api/workouts/workout_api.py +++ b/src/otf_api/api/workouts/workout_api.py @@ -9,18 +9,18 @@ from otf_api import exceptions as exc from otf_api import models from otf_api.api import utils -from otf_api.api.client import OtfClient from .workout_client import WorkoutClient if typing.TYPE_CHECKING: from otf_api import Otf + from otf_api.api.client import OtfClient LOGGER = getLogger(__name__) class WorkoutApi: - def __init__(self, otf: "Otf", otf_client: OtfClient): + def __init__(self, otf: "Otf", 
otf_client: "OtfClient"): """Initialize the Workout API client. Args: @@ -262,7 +262,7 @@ def get_workouts( bookings = self.otf.bookings.get_bookings_new( start_dtme, end_dtme, exclude_cancelled=True, remove_duplicates=True ) - bookings_dict = {b.workout.id: b for b in bookings if b.workout} + bookings_dict = self._filter_bookings_for_workouts(bookings) perf_summaries_dict = self.client.get_perf_summaries_threaded(list(bookings_dict.keys())) telemetry_dict = self.client.get_telemetry_threaded(list(perf_summaries_dict.keys()), max_data_points) @@ -279,12 +279,74 @@ def get_workouts( api=self.otf, ) workouts.append(workout) - except ValueError as e: - LOGGER.error(f"Failed to create Workout for performance summary {perf_id}: {e}") - continue + except ValueError: + LOGGER.exception("Failed to create Workout for performance summary %s", perf_id) + + LOGGER.debug("Returning %d workouts", len(workouts)) return workouts + def _filter_bookings_for_workouts(self, bookings: list[models.BookingV2]) -> dict[str, models.BookingV2]: + """Filter bookings to only those that have a workout and are not in the future. + + This is being pulled out of `get_workouts` to add more robust logging and error handling. + + Args: + bookings (list[BookingV2]): The list of bookings to filter. + + Returns: + dict[str, BookingV2]: A dictionary mapping workout IDs to bookings that have workouts. 
+ """ + future_bookings = [b for b in bookings if b.starts_at and b.starts_at > pendulum.now().naive()] + missing_workouts = [b for b in bookings if not b.workout and b not in future_bookings] + LOGGER.debug("Found %d future bookings and %d missing workouts", len(future_bookings), len(missing_workouts)) + + if future_bookings: + for booking in future_bookings: + LOGGER.warning( + "Booking %s for class '%s' (class_uuid=%s) is in the future, filtering out.", + booking.booking_id, + booking.otf_class, + booking.class_uuid or "Unknown", + ) + + if missing_workouts: + for booking in missing_workouts: + LOGGER.warning( + "Booking %s for class '%s' (class_uuid=%s) is missing a workout, filtering out.", + booking.booking_id, + booking.otf_class, + booking.class_uuid or "Unknown", + ) + + bookings_dict = { + b.workout.id: b for b in bookings if b.workout and b not in future_bookings and b not in missing_workouts + } + + LOGGER.debug("Filtered bookings to %d valid bookings for workouts mapping", len(bookings_dict)) + + return bookings_dict + + def get_lifetime_workouts(self) -> list[models.Workout]: + """Get the member's lifetime workouts. + + This is a convenience method that calls `get_workouts` with no date range. + + Returns: + list[Workout]: The member's lifetime workouts. + + Raises: + ResourceNotFoundError: If the member's created date is not set, as we cannot determine the start date for + the workouts. 
+ """ + if not self.otf.member.created_date: + raise exc.ResourceNotFoundError("Member created date not found, cannot get lifetime workouts.") + + start_date = self.otf.member.created_date.date() + end_date = pendulum.tomorrow().date() + + return self.get_workouts(start_date=start_date, end_date=end_date) + def rate_class_from_workout( self, workout: models.Workout, diff --git a/src/otf_api/api/workouts/workout_client.py b/src/otf_api/api/workouts/workout_client.py index 0e2a7e7..e4828ad 100644 --- a/src/otf_api/api/workouts/workout_client.py +++ b/src/otf_api/api/workouts/workout_client.py @@ -1,9 +1,12 @@ from concurrent.futures import ThreadPoolExecutor from functools import partial +from logging import getLogger from typing import Any from otf_api.api.client import API_IO_BASE_URL, API_TELEMETRY_BASE_URL, CACHE, OtfClient +LOGGER = getLogger(__name__) + class WorkoutClient: """Client for retrieving workout and performance data from the OTF API. @@ -98,7 +101,12 @@ def get_perf_summary_to_class_uuid_mapping(self) -> dict[str, str | None]: dict[str, str | None]: A dictionary mapping performance summary IDs to class UUIDs. """ perf_summaries = self.get_performance_summaries()["items"] - return {item["id"]: item["class"].get("ot_base_class_uuid") for item in perf_summaries} + LOGGER.debug("Retrieved %d performance summaries for mapping", len(perf_summaries)) + + perf_summary_dict = {item["id"]: item["class"].get("ot_base_class_uuid") for item in perf_summaries} + + LOGGER.debug("Created performance summary to class UUID mapping with %d entries", len(perf_summary_dict)) + return perf_summary_dict def get_perf_summaries_threaded(self, performance_summary_ids: list[str]) -> dict[str, dict[str, Any]]: """Get performance summaries in a ThreadPoolExecutor, to speed up the process. 
@@ -112,7 +120,13 @@ def get_perf_summaries_threaded(self, performance_summary_ids: list[str]) -> dic with ThreadPoolExecutor(max_workers=10) as pool: perf_summaries = pool.map(self.get_performance_summary, performance_summary_ids) - perf_summaries_dict = {perf_summary["id"]: perf_summary for perf_summary in perf_summaries} + perf_summaries_list = list(perf_summaries) + LOGGER.debug("Retrieved %d performance summaries in threaded mode", len(perf_summaries_list)) + + perf_summaries_dict = {perf_summary["id"]: perf_summary for perf_summary in perf_summaries_list} + + LOGGER.debug("Returning %d performance summaries", len(perf_summaries_dict)) + return perf_summaries_dict def get_telemetry_threaded( @@ -130,7 +144,15 @@ def get_telemetry_threaded( partial_fn = partial(self.get_telemetry, max_data_points=max_data_points) with ThreadPoolExecutor(max_workers=10) as pool: telemetry = pool.map(partial_fn, performance_summary_ids) - telemetry_dict = {perf_summary["classHistoryUuid"]: perf_summary for perf_summary in telemetry} + + telemetry_list = list(telemetry) + + LOGGER.debug("Retrieved %d telemetry records in threaded mode", len(telemetry_list)) + + telemetry_dict = {perf_summary["classHistoryUuid"]: perf_summary for perf_summary in telemetry_list} + + LOGGER.debug("Returning %d telemetry records", len(telemetry_dict)) + return telemetry_dict def get_aspire_data(self, datetime: str | None, unit: str | None) -> dict: diff --git a/src/otf_api/models/workouts/performance_summary.py b/src/otf_api/models/workouts/performance_summary.py index 2d90f73..f8c3eb6 100644 --- a/src/otf_api/models/workouts/performance_summary.py +++ b/src/otf_api/models/workouts/performance_summary.py @@ -1,9 +1,12 @@ -from datetime import time +from logging import getLogger +from typing import Any -from pydantic import AliasPath, Field, field_validator +from pydantic import AliasPath, Field from otf_api.models.base import OtfItemBase +LOGGER = getLogger(__name__) + class 
ZoneTimeMinutes(OtfItemBase): gray: int @@ -22,41 +25,17 @@ class HeartRate(OtfItemBase): class PerformanceMetric(OtfItemBase): - display_value: time | float | None + display_value: Any display_unit: str - metric_value: float + metric_value: float | int = Field( + coerce_numbers_to_str=True, + description="The raw value of the metric, as a float or int. When time this reflects seconds.", + ) def __str__(self) -> str: """Return a string representation of the PerformanceMetric.""" return f"{self.display_value} {self.display_unit}" - @field_validator("display_value", mode="before") - @classmethod - def convert_to_time_format(cls, value: str | None | float | int) -> time | float | None: - """Convert display_value to a time object if it is in the format of HH:MM:SS or MM:SS. - - Args: - value (str | None | float | int): The value to convert. - - Returns: - time | float: The converted value, or the original value if it is not in the expected format. - """ - if not value: - return None - - if isinstance(value, float | int): - return value - - if isinstance(value, str) and ":" in value: - if value.count(":") == 1: - minutes, seconds = value.split(":") - return time(minute=int(minutes), second=int(seconds)) - if value.count(":") == 2: - hours, minutes, seconds = value.split(":") - return time(hour=int(hours), minute=int(minutes), second=int(seconds)) - - return value # type: ignore - class BaseEquipment(OtfItemBase): avg_pace: PerformanceMetric diff --git a/uv.lock b/uv.lock index 0597441..33e1e28 100644 --- a/uv.lock +++ b/uv.lock @@ -316,6 +316,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, ] +[[package]] +name = "coloredlogs" +version = "15.0.1" +source = { registry = "https://pypi.org/simple" } +dependencies = 
[ + { name = "humanfriendly" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/cc/c7/eed8f27100517e8c0e6b923d5f0845d0cb99763da6fdee00478f91db7325/coloredlogs-15.0.1.tar.gz", hash = "sha256:7c991aa71a4577af2f82600d8f8f3a89f936baeaf9b50a9c197da014e5bf16b0", size = 278520, upload-time = "2021-06-11T10:22:45.202Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a7/06/3d6badcf13db419e25b07041d9c7b4a2c331d3f4e7134445ec5df57714cd/coloredlogs-15.0.1-py2.py3-none-any.whl", hash = "sha256:612ee75c546f53e92e70049c9dbfcc18c935a2b9a53b66085ce9ef6a6e5c0934", size = 46018, upload-time = "2021-06-11T10:22:42.561Z" }, +] + [[package]] name = "coverage" version = "7.6.10" @@ -521,6 +533,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" }, ] +[[package]] +name = "humanfriendly" +version = "10.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pyreadline3", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/cc/3f/2c29224acb2e2df4d2046e4c73ee2662023c58ff5b113c4c1adac0886c43/humanfriendly-10.0.tar.gz", hash = "sha256:6b0b831ce8f15f7300721aa49829fc4e83921a9a301cc7f606be6686a2288ddc", size = 360702, upload-time = "2021-09-17T21:40:43.31Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f0/0f/310fb31e39e2d734ccaa2c0fb981ee41f7bd5056ce9bc29b2248bd569169/humanfriendly-10.0-py2.py3-none-any.whl", hash = "sha256:1697e1a8a8f550fd43c2865cd84542fc175a61dcb779b6fee18cf6b6ccba1477", size = 86794, upload-time = "2021-09-17T21:40:39.897Z" }, +] + [[package]] name = "humanize" version = "4.11.0" @@ -869,11 +893,12 @@ wheels = [ [[package]] name = "otf-api" -version = "0.15.1" +version = "0.15.2" source = { editable = "." 
} dependencies = [ { name = "attrs" }, { name = "cachetools" }, + { name = "coloredlogs" }, { name = "diskcache" }, { name = "httpx" }, { name = "humanize" }, @@ -920,6 +945,7 @@ docs = [ requires-dist = [ { name = "attrs", specifier = ">=24.3.0,<25" }, { name = "cachetools", specifier = ">=5.5.0" }, + { name = "coloredlogs", specifier = ">=15.0.1" }, { name = "diskcache", specifier = ">=5.6.3" }, { name = "httpx", specifier = ">=0.27.2" }, { name = "humanize", specifier = ">=4.9.0,<5" }, @@ -1273,6 +1299,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/bd/24/12818598c362d7f300f18e74db45963dbcb85150324092410c8b49405e42/pyproject_hooks-1.2.0-py3-none-any.whl", hash = "sha256:9e5c6bfa8dcc30091c74b0cf803c81fdd29d94f01992a7707bc97babb1141913", size = 10216, upload-time = "2024-09-29T09:24:11.978Z" }, ] +[[package]] +name = "pyreadline3" +version = "3.5.4" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/0f/49/4cea918a08f02817aabae639e3d0ac046fef9f9180518a3ad394e22da148/pyreadline3-3.5.4.tar.gz", hash = "sha256:8d57d53039a1c75adba8e50dd3d992b28143480816187ea5efbd5c78e6c885b7", size = 99839, upload-time = "2024-09-19T02:40:10.062Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5a/dc/491b7661614ab97483abf2056be1deee4dc2490ecbf7bff9ab5cdbac86e1/pyreadline3-3.5.4-py3-none-any.whl", hash = "sha256:eaf8e6cc3c49bcccf145fc6067ba8643d1df34d604a1ec0eccbf7a18e6d3fae6", size = 83178, upload-time = "2024-09-19T02:40:08.598Z" }, +] + [[package]] name = "pytest" version = "8.2.2"