From 25d6250e7ee57bf53de66d12f94387e588c827aa Mon Sep 17 00:00:00 2001
From: Xavier Lavallee
Date: Thu, 2 Oct 2025 22:17:57 -0400
Subject: [PATCH] Improve logging and error handling

- Prevents an issue where loading event metadata could crash ingestion.
- Changes the default logging level to improve debugging.
---
 compose.yml                                   |  3 +-
 packages/flare/bin/cron_job_ingest_events.py  | 15 ++---
 packages/flare/bin/data_store.py              |  4 ++
 packages/flare/bin/flare.py                   | 18 ++++--
 packages/flare/bin/logger.py                  |  2 +-
 .../flare/tests/bin/test_flare_wrapper.py     | 64 +++----------------
 .../flare/tests/bin/test_ingest_events.py     |  2 +-
 7 files changed, 36 insertions(+), 72 deletions(-)

diff --git a/compose.yml b/compose.yml
index 2cc913b..2695dbd 100644
--- a/compose.yml
+++ b/compose.yml
@@ -9,7 +9,8 @@ services:
     environment:
       - SPLUNK_START_ARGS=--accept-license
      - SPLUNK_PASSWORD=a_password
+      - SPLUNK_GENERAL_TERMS=--accept-sgt-current-at-splunk-com
     volumes:
       - ./output/flare:/opt/splunk/etc/apps/flare
       - ./splunk/default.yml:/tmp/defaults/default.yml
-      - ./logs:/opt/splunk/var/log/splunk
\ No newline at end of file
+      - ./logs:/opt/splunk/var/log/splunk
diff --git a/packages/flare/bin/cron_job_ingest_events.py b/packages/flare/bin/cron_job_ingest_events.py
index 663fe47..86d4859 100644
--- a/packages/flare/bin/cron_job_ingest_events.py
+++ b/packages/flare/bin/cron_job_ingest_events.py
@@ -47,6 +47,9 @@ def main(
     ingest_full_event_data = get_ingest_full_event_data(
         storage_passwords=storage_passwords
     )
+    number_of_days_to_backfill = get_number_of_days_to_backfill(
+        storage_passwords=storage_passwords
+    )
     severities_filter = get_severities_filter(storage_passwords=storage_passwords)
     source_types_filter = get_source_types_filter(storage_passwords=storage_passwords)
 
@@ -61,9 +64,6 @@ def main(
         # for identifiers 30 days prior to the day a tenant was first configured.
         start_date = data_store.get_earliest_ingested_by_tenant(tenant_id)
         if not start_date:
-            number_of_days_to_backfill = get_number_of_days_to_backfill(
-                storage_passwords=storage_passwords
-            )
             start_date = datetime.now(timezone.utc) - timedelta(
                 days=number_of_days_to_backfill
             )
@@ -93,7 +93,7 @@ def main(
         logger.info(f"Fetched {events_fetched_count} events on tenant {tenant_id}")
         total_events_fetched_count += events_fetched_count
 
-    logger.info(f"Fetched {events_fetched_count} events across all tenants")
+    logger.info(f"Fetched {total_events_fetched_count} events across all tenants")
 
 
 def fetch_feed(
@@ -163,14 +163,13 @@ def get_tenant_ids(storage_passwords: client.StoragePasswords) -> list[int]:
     stored_tenant_ids = get_storage_password_value(
         storage_passwords=storage_passwords, password_key=PasswordKeys.TENANT_IDS.value
     )
+    tenant_ids = None
     try:
-        tenant_ids: Optional[list[int]] = (
-            json.loads(stored_tenant_ids) if stored_tenant_ids else None
-        )
+        tenant_ids = json.loads(stored_tenant_ids) if stored_tenant_ids else None
     except Exception:
         pass
 
-    if not tenant_ids:
+    if tenant_ids is None:
         raise Exception("Tenant IDs not found")
 
     return tenant_ids
diff --git a/packages/flare/bin/data_store.py b/packages/flare/bin/data_store.py
index a43e6a7..ea41c20 100644
--- a/packages/flare/bin/data_store.py
+++ b/packages/flare/bin/data_store.py
@@ -33,6 +33,10 @@ def _commit(self) -> None:
     def _sync(self) -> None:
         self._store.read(config_path)
 
+    def reset(self) -> None:
+        self._store.clear()
+        self._commit()
+
     def get_last_fetch(self) -> Optional[datetime]:
         self._sync()
         last_fetched = self._store.get(
diff --git a/packages/flare/bin/flare.py b/packages/flare/bin/flare.py
index b222914..b80da3f 100644
--- a/packages/flare/bin/flare.py
+++ b/packages/flare/bin/flare.py
@@ -78,12 +78,18 @@ def fetch_feed_events(
             self.logger.debug(event_feed)
             next_token = event_feed["next"]
             for event in event_feed["items"]:
-                if ingest_full_event_data:
-                    event = self._fetch_full_event_from_uid(
-                        uid=event["metadata"]["uid"]
-                    )
-                    time.sleep(1)  # Don't hit rate limit
-                yield (event, next_token)
+                try:
+                    if ingest_full_event_data:
+                        event = self._fetch_full_event_from_uid(
+                            uid=event["metadata"]["uid"]
+                        )
+                        time.sleep(1)  # Don't hit rate limit
+                except Exception:
+                    # There is already logging in _fetch_full_event_from_uid;
+                    # we want to continue getting the other events even if one fails.
+                    pass
+                finally:
+                    yield (event, next_token)
 
     def _fetch_event_feed_metadata(
         self,
diff --git a/packages/flare/bin/logger.py b/packages/flare/bin/logger.py
index 4a46d9b..b61c0b0 100644
--- a/packages/flare/bin/logger.py
+++ b/packages/flare/bin/logger.py
@@ -24,7 +24,7 @@ def __init__(self, *, class_name: str) -> None:
         if os.environ.get("FLARE_ENV") == "dev":
             self._logger.setLevel(logging.DEBUG)
         else:
-            self._logger.setLevel(logging.ERROR)
+            self._logger.setLevel(logging.INFO)
         formatter = logging.Formatter("%(asctime)s %(levelname)-5s %(message)s")
         handler = TimedRotatingFileHandler(
             log_filepath, when="d", interval=1, backupCount=5
diff --git a/packages/flare/tests/bin/test_flare_wrapper.py b/packages/flare/tests/bin/test_flare_wrapper.py
index a870cdf..3f5b5f7 100644
--- a/packages/flare/tests/bin/test_flare_wrapper.py
+++ b/packages/flare/tests/bin/test_flare_wrapper.py
@@ -1,4 +1,3 @@
-import pytest
 import requests_mock
 from conftest import FakeLogger
 
@@ -159,48 +158,7 @@ def test_flare_full_data_with_metadata(
     assert mock_full_event_2.called
 
 
-def test_flare_full_data_with_metadata_and_exception(
-    logger: FakeLogger,
-    disable_sleep: Any,
-) -> None:
-    with requests_mock.Mocker() as mocker:
-        mocker.register_uri(
-            "POST",
-            "https://api.flare.io/tokens/generate",
-            status_code=200,
-            json={"token": "access_token"},
-        )
-
-        tenant_resp_page_1 = {
-            "next": "some_next_value",
-            "items": [
-                {"not_metadata": {"uid": "some_uid_1"}},
-                {"metadata": {"uid": "some_uid_2"}},
-            ],
-        }
-
-        mocker.register_uri(
-            "POST",
-            "https://api.flare.io/firework/v4/events/tenant/_search",
-            status_code=200,
-            json=tenant_resp_page_1,
-        )
-
-        flare_api = FlareAPI(api_key="some_key", tenant_id=111, logger=logger)
-
-        with pytest.raises(KeyError, match="metadata"):
-            next(
-                flare_api.fetch_feed_events(
-                    next=None,
-                    start_date=None,
-                    ingest_full_event_data=True,
-                    severities=[],
-                    source_types=[],
-                )
-            )
-
-
-def test_flare_full_data_retry_exception(
+def test_flare_full_data_retry_errors(
     logger: FakeLogger,
     disable_sleep: Any,
 ) -> None:
@@ -235,19 +193,15 @@ def test_flare_full_data_retry_exception(
 
         flare_api = FlareAPI(api_key="some_key", tenant_id=111, logger=logger)
 
-        with pytest.raises(
-            Exception,
-            match="failed to fetch full event data for some_uid_1 after 3 tries",
-        ):
-            next(
-                flare_api.fetch_feed_events(
-                    next=None,
-                    start_date=None,
-                    ingest_full_event_data=True,
-                    severities=[],
-                    source_types=[],
-                )
+        next(
+            flare_api.fetch_feed_events(
+                next=None,
+                start_date=None,
+                ingest_full_event_data=True,
+                severities=[],
+                source_types=[],
             )
+        )
 
         assert logger.messages == [
             "INFO: Failed to fetch event 1/3 retries: 500 Server Error: None for url: https://api.flare.io/firework/v2/activities/some_uid_1",
diff --git a/packages/flare/tests/bin/test_ingest_events.py b/packages/flare/tests/bin/test_ingest_events.py
index f47b9a4..30f993a 100644
--- a/packages/flare/tests/bin/test_ingest_events.py
+++ b/packages/flare/tests/bin/test_ingest_events.py
@@ -139,5 +139,5 @@ def test_main_expect_normal_run(
         "INFO: Fetched 2 events on tenant 11111",
         "INFO: Fetching tenant_id=22222, next=None, start_date=FakeDatetime(1999, 12, 2, 0, 0, tzinfo=datetime.timezone.utc)",
         "INFO: Fetched 2 events on tenant 22222",
-        "INFO: Fetched 2 events across all tenants",
+        "INFO: Fetched 4 events across all tenants",
     ]