Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
48dcd43
refactor multithreading into WattTimeBase
sam-watttime Feb 11, 2025
9217d5c
combine _fetch_data and _fetch_data_multithreaded methods
skoeb Feb 11, 2025
67fe635
remove test for separate _fetch_data_multithreaded method
skoeb Feb 11, 2025
26decac
remove use of Session for now, will move into separate PR
skoeb Feb 12, 2025
ae09336
track _last_request_meta in request method
jcofield Feb 13, 2025
5e11102
Test for every region in my-access in maps geojson (#40)
sam-watttime Mar 12, 2025
425c2e3
refactor multithreading into WattTimeBase (#35)
sam-watttime Mar 12, 2025
fc2899f
do to expose credentials as persistent attributes (#36)
sam-watttime Mar 12, 2025
743b522
Speed up historical forecast parsing by avoiding pd.json_normalize (#38)
sam-watttime Mar 12, 2025
65a45b3
Use sessions instead of individual connections (#37)
sam-watttime Mar 12, 2025
de8a13f
reduce max threadpool workers to appease low CPU runtimes (e.g. githu…
skoeb Mar 14, 2025
cbf6058
default to 1 cpu if os.cpu_count() is undetermined
skoeb Mar 14, 2025
2c7cd98
move multithreaded tests into their own classes to reduce setUp cost …
skoeb Mar 14, 2025
3209e24
close connections in tearDown in testing
skoeb Mar 14, 2025
81ee0dd
fix mock.patch for test_mock_register resulting in 400
skoeb Mar 14, 2025
cbf853c
restore ratelimit in base setUp tests
skoeb Mar 14, 2025
f511b4c
Merge branch 'main' into future-release
sam-watttime Apr 3, 2025
33eeef9
increase timeout from 20 to (10, 60)
skoeb Apr 3, 2025
5677dfe
comment out pytest.mark.skip, not working anyways?
skoeb Apr 3, 2025
83bc623
login before entering multithreading
skoeb Apr 3, 2025
a4d3c3b
reduce rate_limit for WattTimeHistoricalMultiThreaded test
skoeb Apr 3, 2025
6f47e43
TEMP: try 2 max_workers to see if github action passes
skoeb Apr 3, 2025
773c0a3
Revert "TEMP: try 2 max_workers to see if github action passes"
skoeb Apr 3, 2025
4af96d1
set worker_count to allow tests to pass
skoeb Apr 3, 2025
7c76b94
allow pytest to retry any failed tests 2 times
skoeb Apr 3, 2025
1bb7ec7
add pytest-rerunfailures dependency for testing
skoeb Apr 3, 2025
1681e98
reduce rate_limit used by all tests from 5 -> 1 to avoid stepping on …
skoeb Apr 3, 2025
a90f72d
Merge branch 'main' into future-release
sam-watttime May 30, 2025
ff37d7b
warning logging and handling in api
sam-watttime Oct 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions tests/test_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,18 @@ def test_mock_register(self, mock_post):
resp = self.base.register(email=os.getenv("WATTTIME_EMAIL"))
self.assertEqual(len(mock_post.call_args_list), 1)

def test_get_password(self):
    """Credentials are mandatory, and explicit arguments are exposed as attributes."""
    # Empty environment and no arguments: construction must fail.
    with mock.patch.dict(os.environ, {}, clear=True):
        with self.assertRaises(ValueError):
            WattTimeBase()

    # Empty environment but explicit arguments: the attributes reflect them.
    # The assertions stay inside the patched environment because the
    # attributes are read back from os.environ on every access.
    with mock.patch.dict(os.environ, {}, clear=True):
        client = WattTimeBase(
            username="WATTTIME_USERNAME", password="WATTTIME_PASSWORD"
        )
        self.assertEqual(client.username, "WATTTIME_USERNAME")
        self.assertEqual(client.password, "WATTTIME_PASSWORD")


class TestWattTimeHistorical(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -464,6 +476,11 @@ def test_historical_forecast_jsons_multithreaded(self):
class TestWattTimeMaps(unittest.TestCase):
def setUp(self):
    """Create the maps and my-access clients used by the tests in this class."""
    self.maps = WattTimeMaps()
    self.myaccess = WattTimeMyAccess()

def tearDown(self):
    """Close the HTTP sessions held by both clients created in setUp.

    This must be the only tearDown in the class: a second definition with
    the same name silently replaces the first, leaving a session open.
    """
    try:
        self.maps.session.close()
    finally:
        # Close the second session even if closing the first one raised.
        self.myaccess.session.close()
Expand Down Expand Up @@ -503,6 +520,22 @@ def test_region_from_loc(self):
self.assertEqual(region["region_full_name"], "Public Service Co of Colorado")
self.assertEqual(region["signal_type"], "co2_moer")

def test_my_access_in_geojson(self):
    """Regions listed by my-access and regions in the maps geojson must match.

    Checked independently for each signal type, in both directions
    (nothing missing from the geojson, nothing extra in it).
    """
    access = self.myaccess.get_access_pandas()
    for signal_type in ["co2_moer", "co2_aoer", "health_damage"]:
        # subTest lets runners that support it keep checking the remaining
        # signal types instead of stopping at the first mismatch.
        with self.subTest(signal_type=signal_type):
            access_regions = set(
                access.loc[access["signal_type"] == signal_type, "region"].unique()
            )
            maps = self.maps.get_maps_json(signal_type=signal_type)
            maps_regions = {i["properties"]["region"] for i in maps["features"]}

            missing = access_regions - maps_regions
            extra = maps_regions - access_regions

            self.assertEqual(
                missing,
                set(),
                f"Missing regions in geojson for {signal_type}: {missing}",
            )
            self.assertEqual(
                extra,
                set(),
                f"Extra regions in geojson for {signal_type}: {extra}",
            )


class TestWattTimeMarginalFuelMix(unittest.TestCase):
def setUp(self):
Expand Down
123 changes: 107 additions & 16 deletions watttime/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,57 @@
import time
import threading
import time
import logging
from collections import defaultdict
from datetime import date, datetime, timedelta, time as dt_time
from collections import defaultdict
from functools import cache
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

import pandas as pd
import requests
from dateutil.parser import parse
from pytz import UTC


class WattTimeAPIWarning:
    """A single warning message attached to the request that produced it.

    Attributes:
        url: The URL that was requested.
        params: The query parameters sent with the request.
        warning_message: The warning text.
    """

    def __init__(self, url: str, params: Dict[str, Any], warning_message: str):
        self.url = url
        self.params = params
        self.warning_message = warning_message

    def __repr__(self):
        return f"<WattTimeAPIWarning url={self.url}, params={self.params}, warning={self.warning_message}>\n"

    def to_dict(self) -> Dict[str, Any]:
        """Return the warning as a plain dict with date-like params as ISO strings.

        Returns:
            Dict[str, Any]: keys ``url``, ``params`` and ``warning_message``.
            ``date`` and ``datetime`` values inside ``params`` are converted
            with ``isoformat()``; every other value is passed through as-is.
        """

        def stringify(value: Any) -> Any:
            # datetime is a subclass of date, so one check covers both.
            if isinstance(value, date):
                return value.isoformat()
            return value

        return {
            "url": self.url,
            "params": {k: stringify(v) for k, v in self.params.items()},
            "warning_message": self.warning_message,
        }


def get_log():
    """Call ``logging.basicConfig`` (INFO level, one stream handler) and return the root logger."""
    log_format = "%(asctime)s [%(levelname)-1s] %(message)s"
    stream_handler = logging.StreamHandler()
    logging.basicConfig(
        format=log_format,
        level=logging.INFO,
        handlers=[stream_handler],
    )
    root_logger = logging.getLogger()
    return root_logger


LOG = get_log()


class WattTimeBase:
url_base = os.getenv("WATTTIME_API_URL", "https://api.watttime.org")

Expand All @@ -37,8 +75,17 @@ def __init__(
worker_count (int): The number of worker threads to use for multithreading. Default is min(10, (os.cpu_count() or 1) * 2).

"""
self.username = username or os.getenv("WATTTIME_USER")
self.password = password or os.getenv("WATTTIME_PASSWORD")

# This only applies to the current session, is not stored persistently
if username and not os.getenv("WATTTIME_USER"):
os.environ["WATTTIME_USER"] = username
if password and not os.getenv("WATTTIME_PASSWORD"):
os.environ["WATTTIME_PASSWORD"] = password

# Accessing attributes will raise exception if variables are not set
_ = self.password
_ = self.username

self.token = None
self.headers = None
self.token_valid_until = None
Expand All @@ -47,14 +94,47 @@ def __init__(
self.rate_limit = rate_limit
self._last_request_times = []
self.worker_count = worker_count
self.raised_warnings: List[WattTimeAPIWarning] = []

if self.multithreaded:
self._rate_limit_lock = (
threading.Lock()
) # prevent multiple threads from modifying _last_request_times simultaneously
self._rate_limit_condition = threading.Condition(self._rate_limit_lock)

retry_strategy = Retry(
total=3,
status_forcelist=[500, 502, 503, 504],
backoff_factor=1,
raise_on_status=False,
)

adapter = HTTPAdapter(max_retries=retry_strategy)
self.session = requests.Session()
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)

@property
def password(self):
    """Return the password read from the ``WATTTIME_PASSWORD`` env variable.

    Raises:
        ValueError: if the variable is unset or empty.
    """
    password = os.getenv("WATTTIME_PASSWORD")
    if not password:
        # Adjacent literals carry explicit separating spaces so the
        # sentences do not run together in the final message.
        raise ValueError(
            "WATTTIME_PASSWORD env variable is not set. "
            "Please set this variable, or pass in a password upon initialization, "
            "which will store it as a variable only for the current session"
        )
    return password

@property
def username(self):
    """Return the username read from the ``WATTTIME_USER`` env variable.

    Raises:
        ValueError: if the variable is unset or empty.
    """
    username = os.getenv("WATTTIME_USER")
    if not username:
        # Adjacent literals carry explicit separating spaces so the
        # sentences do not run together in the final message.
        raise ValueError(
            "WATTTIME_USER env variable is not set. "
            "Please set this variable, or pass in a username upon initialization, "
            "which will store it as a variable only for the current session"
        )
    return username

def _login(self):
"""
Expand Down Expand Up @@ -158,7 +238,7 @@ def register(self, email: str, organization: Optional[str] = None) -> None:

rsp = self.session.post(url, json=params, timeout=(10, 60))
rsp.raise_for_status()
print(
LOG.info(
f"Successfully registered {self.username}, please check {email} for a verification email"
)

Expand Down Expand Up @@ -222,10 +302,19 @@ def _make_rate_limited_request(self, url: str, params: Dict[str, Any]) -> Dict:
f"API Request Failed: {e}\nURL: {url}\nParams: {params}"
) from e

if j.get("meta", {}).get("warnings"):
print("Warnings Returned: %s | Response: %s", params, j["meta"])
meta = j.get("meta", {})
warnings = meta.get("warnings")
if warnings:
for warning_message in warnings:
warning = WattTimeAPIWarning(
url=url, params=params, warning_message=warning_message
)
self.raised_warnings.append(warning)
LOG.warning(
f"API Warning: {warning_message} | URL: {url} | Params: {params}"
)

self._last_request_meta = j.get("meta", {})
self._last_request_meta = meta

return j

Expand Down Expand Up @@ -409,7 +498,7 @@ def get_historical_csv(
start, end = self._parse_dates(start, end)
fp = out_dir / f"{region}_{signal_type}_{start.date()}_{end.date()}.csv"
df.to_csv(fp, index=False)
print(f"file written to {fp}")
LOG.info(f"file written to {fp}")


class WattTimeMyAccess(WattTimeBase):
Expand Down Expand Up @@ -479,13 +568,16 @@ def _parse_historical_forecast_json(
Returns:
pd.DataFrame: A pandas DataFrame containing the parsed historical forecast data.
"""
out = pd.DataFrame()
for json in json_list:
for entry in json.get("data", []):
_df = pd.json_normalize(entry, record_path=["forecast"])
_df = _df.assign(generated_at=pd.to_datetime(entry["generated_at"]))
out = pd.concat([out, _df], ignore_index=True)
return out
data = []
for j in json_list:
for gen_at in j["data"]:
for point_time in gen_at["forecast"]:
point_time["generated_at"] = gen_at["generated_at"]
data.append(point_time)
df = pd.DataFrame.from_records(data)
df["point_time"] = pd.to_datetime(df["point_time"])
df["generated_at"] = pd.to_datetime(df["generated_at"])
return df

def get_forecast_json(
self,
Expand Down Expand Up @@ -709,7 +801,6 @@ def get_maps_json(


class WattTimeMarginalFuelMix(WattTimeBase):

def get_fuel_mix_jsons(
self,
start: Union[str, datetime],
Expand All @@ -731,7 +822,7 @@ def get_fuel_mix_jsons(
chunks = self._get_chunks(start, end, chunk_size=timedelta(days=30))

# No model will default to the most recent model version available
if model:
if model is not None:
params["model"] = model

param_chunks = [{**params, "start": c[0], "end": c[1]} for c in chunks]
Expand Down