From 2139155c78b1ebcd0a99f277de1ee13fc060645a Mon Sep 17 00:00:00 2001 From: Glenda Leonard Date: Thu, 22 Jan 2026 13:52:49 -0500 Subject: [PATCH 1/8] Initial version of ads-attribution-dap-collector job --- .../job-ads-attribution-dap-collector.yml | 56 ++++ .../.dockerignore | 7 + jobs/ads-attribution-dap-collector/.flake8 | 2 + jobs/ads-attribution-dap-collector/.gitignore | 4 + jobs/ads-attribution-dap-collector/Dockerfile | 42 +++ jobs/ads-attribution-dap-collector/README.md | 47 ++++ .../ads_attribution_dap_collector/collect.py | 190 +++++++++++++ .../ads_attribution_dap_collector/main.py | 154 +++++++++++ .../ads_attribution_dap_collector/parse.py | 148 ++++++++++ .../ads_attribution_dap_collector/persist.py | 103 +++++++ .../ads_attribution_dap_collector/schema.py | 52 ++++ .../ads-attribution-dap-collector/ci_job.yaml | 46 ++++ .../dev_run_docker.sh | 13 + jobs/ads-attribution-dap-collector/pytest.ini | 3 + .../requirements.txt | 10 + jobs/ads-attribution-dap-collector/setup.py | 15 + .../tests/__init__.py | 0 .../tests/test_collect.py | 244 +++++++++++++++++ .../tests/test_mocks.py | 259 ++++++++++++++++++ .../tests/test_parse.py | 132 +++++++++ .../tests/test_persist.py | 128 +++++++++ 21 files changed, 1655 insertions(+) create mode 100644 .github/workflows/job-ads-attribution-dap-collector.yml create mode 100644 jobs/ads-attribution-dap-collector/.dockerignore create mode 100644 jobs/ads-attribution-dap-collector/.flake8 create mode 100644 jobs/ads-attribution-dap-collector/.gitignore create mode 100644 jobs/ads-attribution-dap-collector/Dockerfile create mode 100644 jobs/ads-attribution-dap-collector/README.md create mode 100644 jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/collect.py create mode 100644 jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/main.py create mode 100644 jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/parse.py create mode 100644 jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/persist.py create mode 100644 jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/schema.py create mode 100644 jobs/ads-attribution-dap-collector/ci_job.yaml create mode 100755 jobs/ads-attribution-dap-collector/dev_run_docker.sh create mode 100644 jobs/ads-attribution-dap-collector/pytest.ini create mode 100644 jobs/ads-attribution-dap-collector/requirements.txt create mode 100644 jobs/ads-attribution-dap-collector/setup.py create mode 100644 jobs/ads-attribution-dap-collector/tests/__init__.py create mode 100644 jobs/ads-attribution-dap-collector/tests/test_collect.py create mode 100644 jobs/ads-attribution-dap-collector/tests/test_mocks.py create mode 100644 jobs/ads-attribution-dap-collector/tests/test_parse.py create mode 100644 jobs/ads-attribution-dap-collector/tests/test_persist.py diff --git a/.github/workflows/job-ads-attribution-dap-collector.yml b/.github/workflows/job-ads-attribution-dap-collector.yml new file mode 100644 index 00000000..0467ffbd --- /dev/null +++ b/.github/workflows/job-ads-attribution-dap-collector.yml @@ -0,0 +1,56 @@ +### +# This file was generated by docker-etl/ci_config.py. +# Changes should be made to job ci_job.yaml files and re-generated. 
+### + +name: ads-attribution-dap-collector + +on: + push: + branches: + - main + paths: + - 'jobs/ads-attribution-dap-collector/**' + - '.github/workflows/job-ads-attribution-dap-collector.yml' + pull_request: + paths: + - 'jobs/ads-attribution-dap-collector/**' + - '.github/workflows/job-ads-attribution-dap-collector.yml' + +jobs: + build-job-ads-attribution-dap-collector: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Build Docker image + run: docker build jobs/ads-attribution-dap-collector/ -t app:build + + - name: Test Code + run: docker run app:build pytest --flake8 --black + + push-job-ads-attribution-dap-collector: + runs-on: ubuntu-latest + needs: build-job-ads-attribution-dap-collector + if: github.ref == 'refs/heads/main' + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Authenticate to Google Cloud + uses: google-github-actions/auth@v2 + with: + credentials_json: ${{ secrets.GCP_CREDENTIALS }} + + - name: Set up Cloud SDK + uses: google-github-actions/setup-gcloud@v2 + + - name: Configure Docker for GCR + run: gcloud auth configure-docker + + - name: Build Docker image + run: docker build jobs/ads-attribution-dap-collector/ -t gcr.io/$${{ secrets.GCP_PROJECT }}/ads-attribution-dap-collector_docker_etl:latest + + - name: Push to GCR + run: docker push gcr.io/$${{ secrets.GCP_PROJECT }}/ads-attribution-dap-collector_docker_etl:latest \ No newline at end of file diff --git a/jobs/ads-attribution-dap-collector/.dockerignore b/jobs/ads-attribution-dap-collector/.dockerignore new file mode 100644 index 00000000..cff5d6ab --- /dev/null +++ b/jobs/ads-attribution-dap-collector/.dockerignore @@ -0,0 +1,7 @@ +.ci_job.yaml +.ci_workflow.yaml +.DS_Store +*.pyc +.pytest_cache/ +__pycache__/ +venv/ diff --git a/jobs/ads-attribution-dap-collector/.flake8 b/jobs/ads-attribution-dap-collector/.flake8 new file mode 100644 index 00000000..2bcd70e3 --- /dev/null +++ b/jobs/ads-attribution-dap-collector/.flake8 @@ -0,0 +1,2 @@ +[flake8] +max-line-length = 88 diff --git a/jobs/ads-attribution-dap-collector/.gitignore b/jobs/ads-attribution-dap-collector/.gitignore new file mode 100644 index 00000000..2e9942c0 --- /dev/null +++ b/jobs/ads-attribution-dap-collector/.gitignore @@ -0,0 +1,4 @@ +.DS_Store +*.pyc +__pycache__/ +venv/ diff --git a/jobs/ads-attribution-dap-collector/Dockerfile b/jobs/ads-attribution-dap-collector/Dockerfile new file mode 100644 index 00000000..fddf86b9 --- /dev/null +++ b/jobs/ads-attribution-dap-collector/Dockerfile @@ -0,0 +1,42 @@ +FROM python:3.12 +LABEL maintainer="Glenda Leonard " +ARG HOME="/janus_build" +WORKDIR ${HOME} + +RUN apt update && apt --yes install curl + +RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y +ENV PATH=$HOME/.cargo/bin:$PATH + +# build the CLI tool +RUN git clone --depth 1 https://github.com/divviup/janus.git --branch '0.7.69' +RUN cd janus && cargo build -r -p janus_tools --bin collect + +######### next stage + +FROM python:3.12 +LABEL maintainer="Glenda Leonard " +# https://github.com/mozilla-services/Dockerflow/blob/master/docs/building-container.md +ARG USER_ID="10001" +ARG GROUP_ID="app" +ARG HOME="/app" +ENV HOME=${HOME} + +RUN groupadd --gid ${USER_ID} ${GROUP_ID} && \ + useradd --create-home --uid ${USER_ID} --gid ${GROUP_ID} --home-dir ${HOME} ${GROUP_ID} + +WORKDIR ${HOME} +COPY --from=0 /janus_build/janus/target/release/collect ./ + +RUN pip install --upgrade pip + +COPY requirements.txt requirements.txt +RUN pip install -r 
requirements.txt
+
+COPY . .
+
+RUN pip install .
+
+# Drop root and change ownership of the application folder to the user
+RUN chown -R ${USER_ID}:${GROUP_ID} ${HOME}
+USER ${USER_ID}
diff --git a/jobs/ads-attribution-dap-collector/README.md b/jobs/ads-attribution-dap-collector/README.md
new file mode 100644
index 00000000..f9d2864a
--- /dev/null
+++ b/jobs/ads-attribution-dap-collector/README.md
@@ -0,0 +1,47 @@
+# Ads Attribution DAP Collection Job
+
+This job collects metrics from DAP and writes the results to BigQuery.
+
+## Overview
+This job is driven by a config file from a GCS bucket. Use `job_config_gcp_project`
+and `job_config_bucket` to specify the file. The config file must be named
+`attribution-conf.json` and a sample is available [here](https://github.com/mozilla-services/mars/tree/main/internal/gcp/storage/testdata/mars-attribution-config).
+
+## Usage
+
+This script is intended to be run in a docker container.
+Build the docker image and run the job as described below.
+
+It requires setup of some environment variables that hold DAP credentials, and the job will look for those when it
+starts up. A dev script, `dev_run_docker.sh`, is included for convenience to build and run the job locally, and it
+also documents those variables.
+
+Once the environment variables are set up, run the job with:
+
+
+```sh
+./dev_run_docker.sh
+```
+To just build the docker image, use:
+```
+docker build -t ads-attribution-dap-collector .
+```
+
+## Testing
+
+First create the job venv using:
+```
+python -m venv ./venv
+source ./venv/bin/activate
+pip install -r requirements.txt
+```
+Run tests from `/jobs/ads-attribution-dap-collector` using:
+`python -m pytest`
+
+## Linting and Formatting
+```
+black .
+```
+```
+flake8 .
+```
\ No newline at end of file
diff --git a/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/collect.py b/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/collect.py
new file mode 100644
index 00000000..2d58cf08
--- /dev/null
+++ b/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/collect.py
@@ -0,0 +1,190 @@
+import ast
+import logging
+import subprocess
+import re
+
+from datetime import date, datetime, timedelta
+
+DAP_LEADER = "https://dap-09-3.api.divviup.org"
+VDAF = "histogram"
+PROCESS_TIMEOUT = 1200  # 20 mins
+
+
+def get_aggregated_results(
+    process_date: date,
+    batch_start: date,
+    batch_end: date,
+    task_id: str,
+    vdaf_length: int,
+    collector_duration: int,
+    bearer_token: str,
+    hpke_config: str,
+    hpke_private_key: str,
+) -> dict:
+    process_batch = _should_collect_batch(process_date, batch_end)
+
+    if process_batch:
+        # Step 4 Collect DAP results.
+        aggregated_results = collect_dap_result(
+            task_id=task_id,
+            vdaf_length=vdaf_length,
+            batch_start=batch_start,
+            duration=collector_duration,
+            bearer_token=bearer_token,
+            hpke_config=hpke_config,
+            hpke_private_key=hpke_private_key,
+        )
+
+        return aggregated_results
+
+
+def current_batch_start(
+    process_date: date, partner_start_date: date, duration: int
+) -> date | None:
+    if process_date < partner_start_date:
+        return None
+
+    if (
+        partner_start_date
+        <= process_date
+        < partner_start_date + timedelta(seconds=duration)
+    ):
+        return partner_start_date
+
+    # After the first interval ...
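+    # Walk forward one duration at a time. For example, with a 3-day duration
+    # starting 2026-01-01, a process_date of 2026-01-12 closes the
+    # [2026-01-10 : 2026-01-12] batch and 2026-01-10 is returned, while a
+    # mid-batch process_date such as 2026-01-11 returns the prior batch
+    # start, 2026-01-07 (see tests/test_collect.py).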
+ batch_start = partner_start_date + while True: + next_start = batch_start + timedelta(seconds=duration) + # check if the process_date is the batch_end date + # if yes we only need to go back 1 duration to get the start + if next_start + timedelta(days=-1) == process_date: + return next_start + timedelta(seconds=-duration) + + # this means the process date is in the next interval so + # need to go back 2 durations to get the batch_start + if next_start > process_date: + return next_start + timedelta(seconds=-2 * duration) + + batch_start = next_start + + +def current_batch_end(batch_start: date, duration: int) -> date: + # since the start and end dates are inclusive need to subtract 1 from duration + return batch_start + timedelta(seconds=duration, days=-1) + + +def _should_collect_batch(process_date, batch_end) -> bool: + return batch_end == process_date + + +def _correct_wraparound(num: int) -> int: + field_prime = 340282366920938462946865773367900766209 + field_size = 128 + cutoff = 2 ** (field_size - 1) + if num > cutoff: + return num - field_prime + return num + + +def _parse_histogram(histogram_str: str) -> dict: + parsed_list = ast.literal_eval(histogram_str) + return {i: _correct_wraparound(val) for i, val in enumerate(parsed_list)} + + +def _parse_http_error(text: str) -> tuple[int, str, str | None] | None: + """ + Returns (status_code, status_text, error_message) + or None if the pattern is not found. + """ + ERROR_RE = re.compile( + r"HTTP response status\s+(\d+)\s+([A-Za-z ]+)(?:\s+-\s+(.*))?$" + ) + match = ERROR_RE.search(text) + if not match: + return None + + status_code = int(match.group(1)) + status_text = match.group(2).strip() + error_message = match.group(3).strip() if match.group(3) else None + return status_code, status_text, error_message + + +# DAP functions +def collect_dap_result( + task_id: str, + vdaf_length: int, + batch_start: date, + duration: int, + bearer_token: str, + hpke_config: str, + hpke_private_key: str, +) -> dict: + # Beware! This command string reveals secrets. Use logging only for + # debugging in local dev. + + batch_start_epoch = int( + datetime.combine(batch_start, datetime.min.time()).timestamp() + ) + + try: + result = subprocess.run( + [ + "./collect", + "--task-id", + task_id, + "--leader", + DAP_LEADER, + "--vdaf", + VDAF, + "--length", + f"{vdaf_length}", + "--authorization-bearer-token", + bearer_token, + "--batch-interval-start", + f"{batch_start_epoch}", + "--batch-interval-duration", + f"{duration}", + "--hpke-config", + hpke_config, + "--hpke-private-key", + hpke_private_key, + ], + capture_output=True, + text=True, + check=True, + timeout=PROCESS_TIMEOUT, + ) + for line in result.stdout.splitlines(): + if line.startswith("Aggregation result:"): + entries = _parse_histogram(line[21:-1]) + return entries + # Beware! Exceptions thrown by the subprocess reveal secrets. + # Log them and include traceback only for debugging in local dev. + except subprocess.CalledProcessError as e: + result = _parse_http_error(e.stderr) + if result is None: + logging.error(e) + raise Exception( + f"Collection failed for {task_id}, {e.returncode}, stderr: {e.stderr}" + ) from None + else: + status_code, status_text, error_message = result + if status_code == 400: + logging.info( + f"Collection failed for {task_id}, {status_code} {status_text}" + f" {error_message}" + ) + elif status_code == 404: + detail = ( + error_message + if error_message is not None + else "Verify start date is not more than 14 days ago." 
+ ) + logging.info( + f"Collection failed for {task_id}, {status_code} {status_text} " + f"{detail}" + ) + except subprocess.TimeoutExpired as e: + raise Exception( + f"Collection timed out for {task_id}, {e.timeout}, stderr: {e.stderr}" + ) from None diff --git a/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/main.py b/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/main.py new file mode 100644 index 00000000..e6a075fe --- /dev/null +++ b/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/main.py @@ -0,0 +1,154 @@ +import click +import logging +import traceback + +from google.cloud import bigquery, storage + +from datetime import datetime + +from .parse import get_config, extract_advertisers_with_partners_and_ads +from .collect import get_aggregated_results, current_batch_start, current_batch_end +from .persist import create_bq_table_if_not_exists, create_bq_row, insert_into_bq + + +LOG_FILE_NAME = f"{datetime.now()}-ads-newtab-attribution-dap-collector.log" + + +def write_job_logs_to_bucket(gcp_project: str, config_bucket: str): + client = storage.Client(project=gcp_project) + try: + bucket = client.get_bucket(config_bucket) + blob = bucket.blob(f"logs/{LOG_FILE_NAME}") + blob.upload_from_filename(LOG_FILE_NAME) + except Exception as e: + raise Exception( + f"Failed to upload job log file: {LOG_FILE_NAME} " + f"to GCS bucket: {config_bucket} in project: {gcp_project}." + ) from e + + +@click.command() +@click.option( + "--job_config_gcp_project", + help="GCP project id for the GCS bucket containing the configuration file.", + required=True, +) +@click.option("--bq_project", help="BigQuery project id", required=True) +@click.option( + "--job_config_bucket", + help="GCS bucket where the configuration for this job can be found.", + required=True, +) +@click.option( + "--bearer_token", + envvar="DAP_BEARER_TOKEN", + help="The 'token' defined in the collector credentials.", + required=True, +) +@click.option( + "--hpke_private_key", + envvar="DAP_PRIVATE_KEY", + help="The 'private_key' defined in the collector credentials.", + required=True, +) +@click.option( + "--process_date", + type=click.DateTime(formats=["%Y-%m-%d"]), + help="Current processing date (ds)", + required=True, +) +def main( + job_config_gcp_project, + bq_project, + job_config_bucket, + bearer_token, + hpke_private_key, + process_date, +): + try: + process_date = process_date.date() + + logging.info( + f"Starting collector job with configuration from gcp project: " + f"{job_config_gcp_project} and gcs bucket: {job_config_bucket} " + f"for process date: {process_date}" + ) + + bq_client = bigquery.Client(project=bq_project) + + # Step 1 check for the table + full_table_id = create_bq_table_if_not_exists(bq_project, bq_client) + + # Step 2a Get newtab attribution config + json_config = get_config(job_config_gcp_project, job_config_bucket) + hpke_config, config = extract_advertisers_with_partners_and_ads(json_config) + + # Step 2b Get the hpke_config + for advertiser_config in config: + # Step 3 Get processing date range. + batch_start = current_batch_start( + process_date, + advertiser_config.start_date, + advertiser_config.collector_duration, + ) + if batch_start is None: + # The process_date is too early + logging.info( + f"Advertiser start_date: {advertiser_config.start_date} is after " + f"process_date: {process_date}, skipping." 
+ ) + continue + batch_end = current_batch_end( + batch_start, advertiser_config.collector_duration + ) + + aggregated_results = get_aggregated_results( + process_date=process_date, + batch_start=batch_start, + batch_end=batch_end, + task_id=advertiser_config.partner.task_id, + vdaf_length=advertiser_config.partner.length, + collector_duration=advertiser_config.collector_duration, + bearer_token=bearer_token, + hpke_config=hpke_config, + hpke_private_key=hpke_private_key, + ) + + if aggregated_results is None: + logging.info( + f"No results available for advertiser: {advertiser_config.name} " + f"with start_date: {advertiser_config.start_date} a" + f"nd process_date: {process_date}" + ) + continue + for ad_config in advertiser_config.ads: + conversion_count = aggregated_results[ad_config.index] + row = create_bq_row( + collection_start=batch_start, + collection_end=batch_end, + provider=ad_config.source, + ad_id=ad_config.ad_id, + lookback_window=advertiser_config.lookback_window, + conversion_type=advertiser_config.conversion_type, + conversion_count=conversion_count, + ) + + insert_into_bq(row, bq_client, full_table_id) + + except Exception as e: + logging.error(f"Collector job failed. Error: {e}\n{traceback.format_exc()}") + raise e + finally: + write_job_logs_to_bucket(job_config_gcp_project, job_config_bucket) + + +if __name__ == "__main__": + logging.basicConfig( + filename=LOG_FILE_NAME, + filemode="a", + format="%(asctime)s,%(msecs)03d %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level=logging.INFO, + ) + logging.getLogger().setLevel(logging.INFO) + main() diff --git a/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/parse.py b/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/parse.py new file mode 100644 index 00000000..b581e962 --- /dev/null +++ b/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/parse.py @@ -0,0 +1,148 @@ +import logging +from datetime import date +from dataclasses import dataclass +from google.cloud import storage +import json +from typing import Any +from .schema import ConfigModel +from pydantic import ValidationError +from uuid import UUID + + +CONFIG_FILE_NAME = "attribution-conf.json" + + +@dataclass(frozen=True) +class PartnerConfig: + partner_id: UUID + task_id: str + vdaf: str + bits: int + length: int + time_precision: int + default_measurement: int + + +@dataclass(frozen=True) +class AdConfig: + source: str + ad_id: int + index: int + partner_id: UUID + + +@dataclass(frozen=True) +class AdvertiserConfig: + name: str + partner_id: UUID + start_date: date + collector_duration: int + conversion_type: str + lookback_window: int + partner: PartnerConfig + ads: list[AdConfig] + + +def get_config(gcp_project: str, config_bucket: str) -> dict[str, Any]: + """Gets the attribution job's config from a file in a GCS bucket.""" + client = storage.Client(project=gcp_project) + try: + bucket = client.get_bucket(config_bucket) + blob = bucket.blob(CONFIG_FILE_NAME) + with blob.open("rt") as reader: + config: dict[str, Any] = json.load(reader) + return config + except Exception as e: + raise RuntimeError( + f"Failed to get or parse job config file: {CONFIG_FILE_NAME} " + f"from GCS bucket: {config_bucket} " + f"in project: {gcp_project}." 
+ ) from e + + +def _require_ad_id_and_source(ad_key: str) -> tuple[str, int]: + if ":" not in ad_key: + raise ValueError( + f"Skipping invalid ad key '{ad_key}': " + f"missing ':' (expected 'source:id')" # noqa: E231 + ) + + source, ad_id_str = ad_key.split(":", 1) + try: + ad_id = int(ad_id_str) + except ValueError: + raise ValueError( + f"Skipping invalid ad key '{ad_key}': ad_id '{ad_id_str}' is not an integer" + ) + + return source, ad_id + + +def extract_advertisers_with_partners_and_ads( + raw_config: dict[str, Any] +) -> tuple[str, list[AdvertiserConfig]]: + """ + Returns: (hpke_config, advertisers) + - returns both hpke_config and advertisers. + """ + + try: + cfg = ConfigModel.model_validate(raw_config) + except ValidationError as e: + raise ValueError(f"Invalid config: {e}") from None + + hpke_config = cfg.collection_config.hpke_config + + # 1) Parse the leaf ads first and key by partnerId + ads_by_partner: dict[UUID, list[AdConfig]] = {} + for ad_key, ad in cfg.ads.items(): + try: + source, ad_id = _require_ad_id_and_source(ad_key) + except ValueError as e: + logging.error(f"Skipping invalid ad key '{ad_key}':{e}") # noqa: E231 + continue + + ad_cfg = AdConfig( + source=source, + ad_id=ad_id, + index=ad.index, + partner_id=ad.partner_id, + ) + ads_by_partner.setdefault(ad.partner_id, []).append(ad_cfg) + + # 2) Add advertiser to partner+ads config + out: list[AdvertiserConfig] = [] + + for adv in cfg.advertisers: + if adv.partner_id not in cfg.partners: + raise ValueError( + f"Advertiser '{adv.name}' references unknown partner_id " + f"'{adv.partner_id}, available partners: {cfg.partners}'" + ) + + p = cfg.partners[adv.partner_id] + + partner = PartnerConfig( + partner_id=adv.partner_id, + task_id=p.task_id, + vdaf=p.vdaf, + bits=p.bits, + length=p.length, + time_precision=p.time_precision, + default_measurement=p.default_measurement, + ) + + out.append( + AdvertiserConfig( + name=adv.name, + partner_id=adv.partner_id, + start_date=adv.start_date, + collector_duration=adv.collector_duration, + lookback_window=adv.lookback_window, + conversion_type=adv.conversion_type, + partner=partner, + ads=ads_by_partner.get(adv.partner_id, []), + ) + ) + + return hpke_config, out diff --git a/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/persist.py b/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/persist.py new file mode 100644 index 00000000..277cd751 --- /dev/null +++ b/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/persist.py @@ -0,0 +1,103 @@ +from datetime import date, datetime +from typing import Any + +from google.cloud import bigquery + + +NAMESPACE = "ads_dap_derived" +TABLE_NAME = "newtab_attribution_v1" + +COLLECTOR_RESULTS_SCHEMA = [ + bigquery.SchemaField( + "collection_start", + "DATE", + mode="REQUIRED", + description="Start date of the collected time window, inclusive.", + ), + bigquery.SchemaField( + "collection_end", + "DATE", + mode="REQUIRED", + description="End date of the collected time window, inclusive.", + ), + bigquery.SchemaField( + "provider", + "STRING", + mode="REQUIRED", + description="The external service providing the ad.", + ), + bigquery.SchemaField( + "ad_id", + "INT64", + mode="REQUIRED", + description="Id of ad, unique by provider.", + ), + bigquery.SchemaField( + "lookback_window", + "INT64", + mode="REQUIRED", + description="Maximum number of days to attribute an ad.", + ), + bigquery.SchemaField( + "conversion_type", + "STRING", + mode="REQUIRED", + description="Indicates the type of conversion 
[view, click, default]", + ), + bigquery.SchemaField( + "conversion_count", + "INT64", + mode="REQUIRED", + description="Aggregated number of conversions attributed to the ad_id.", + ), + bigquery.SchemaField( + "created_timestamp", + "TIMESTAMP", + mode="REQUIRED", + description="Timestamp for when this row was created.", + ), +] + + +def create_bq_table_if_not_exists(project: str, bq_client: bigquery.Client) -> str: + data_set = f"{project}.{NAMESPACE}" + bq_client.create_dataset(data_set, exists_ok=True) + + full_table_id = f"{data_set}.{TABLE_NAME}" + table = bigquery.Table(full_table_id, schema=COLLECTOR_RESULTS_SCHEMA) + try: + bq_client.create_table(table, exists_ok=True) + return full_table_id + except Exception as e: + raise Exception(f"Failed to create BQ table: {full_table_id}") from e + + +def create_bq_row( + collection_start: date, + collection_end: date, + provider: str, + ad_id: int, + lookback_window: int, + conversion_type: str, + conversion_count: int, +) -> dict[str, Any]: + """Creates a BQ row converting date to str where required.""" + row = { + "collection_start": collection_start.isoformat(), + "collection_end": collection_end.isoformat(), + "provider": provider, + "ad_id": ad_id, + "lookback_window": lookback_window, + "conversion_type": conversion_type, + "conversion_count": conversion_count, + "created_timestamp": datetime.now().isoformat(), + } + return row + + +def insert_into_bq(row, bqclient, table_id: str): + """Inserts the results into BQ. Assumes that they are already in the right format""" + if row: + insert_res = bqclient.insert_rows_json(table=table_id, json_rows=[row]) + if len(insert_res) != 0: + raise Exception(f"Error inserting rows into {table_id}: {insert_res}") diff --git a/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/schema.py b/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/schema.py new file mode 100644 index 00000000..3d3c5e31 --- /dev/null +++ b/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/schema.py @@ -0,0 +1,52 @@ +from datetime import date +from pydantic import BaseModel, Field, ConfigDict +from typing import Literal +from uuid import UUID + + +class CollectionConfigModel(BaseModel): + model_config = ConfigDict(extra="forbid") + hpke_config: str = Field(min_length=1) + + +class AdvertiserModel(BaseModel): + model_config = ConfigDict(extra="forbid") + + name: str = Field(min_length=1) + partner_id: UUID + start_date: date + collector_duration: int = Field(gt=86399) # 1 day - 1 sec in seconds + conversion_type: Literal["view", "click", "default"] + lookback_window: int = Field(gt=0) + + +class PartnerModel(BaseModel): + model_config = ConfigDict(extra="forbid") + + task_id: str = Field(min_length=32) + vdaf: Literal["histogram", "sumvec", "sum"] + bits: int | None = None + length: int = Field(gt=0) + time_precision: int + default_measurement: int + + +class AdModel(BaseModel): + model_config = ConfigDict(extra="forbid") + + partner_id: UUID + index: int + + +class ConfigModel(BaseModel): + """ + Full config with validation. + Note: ads keys are dynamic (source:id), so they remain a dict[str, AdModel]. 
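+    Keys in `ads` look like "provider_a:1234" (ad source and numeric ad id);
+    they are split and validated in parse.py.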
+ """ + + model_config = ConfigDict(extra="forbid") + + collection_config: CollectionConfigModel + advertisers: list[AdvertiserModel] + partners: dict[UUID, PartnerModel] + ads: dict[str, AdModel] diff --git a/jobs/ads-attribution-dap-collector/ci_job.yaml b/jobs/ads-attribution-dap-collector/ci_job.yaml new file mode 100644 index 00000000..74d969e0 --- /dev/null +++ b/jobs/ads-attribution-dap-collector/ci_job.yaml @@ -0,0 +1,46 @@ + build-job-ads-attribution-dap-collector: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Check if job files changed + id: changes + uses: dorny/paths-filter@v3 + with: + filters: | + job: + - 'jobs/ads-attribution-dap-collector/**' + + - name: Build Docker image + if: steps.changes.outputs.job == 'true' || github.ref == 'refs/heads/main' || contains(github.event.head_commit.message, '[run-tests]') + run: docker build jobs/ads-attribution-dap-collector/ -t app:build + + - name: Test Code + if: steps.changes.outputs.job == 'true' || github.ref == 'refs/heads/main' || contains(github.event.head_commit.message, '[run-tests]') + run: docker run app:build pytest --flake8 --black + + push-job-ads-attribution-dap-collector: + runs-on: ubuntu-latest + needs: build-job-ads-attribution-dap-collector + if: github.ref == 'refs/heads/main' + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Authenticate to Google Cloud + uses: google-github-actions/auth@v2 + with: + credentials_json: ${{ secrets.GCP_CREDENTIALS }} + + - name: Set up Cloud SDK + uses: google-github-actions/setup-gcloud@v2 + + - name: Configure Docker for GCR + run: gcloud auth configure-docker + + - name: Build Docker image + run: docker build jobs/ads-attribution-dap-collector/ -t gcr.io/$${{ secrets.GCP_PROJECT }}/ads-attribution-dap-collector_docker_etl:latest + + - name: Push to GCR + run: docker push gcr.io/$${{ secrets.GCP_PROJECT }}/ads-attribution-dap-collector_docker_etl:latest \ No newline at end of file diff --git a/jobs/ads-attribution-dap-collector/dev_run_docker.sh b/jobs/ads-attribution-dap-collector/dev_run_docker.sh new file mode 100755 index 00000000..cfe64a81 --- /dev/null +++ b/jobs/ads-attribution-dap-collector/dev_run_docker.sh @@ -0,0 +1,13 @@ +docker build -t ads-attribution-dap-collector . 
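+
+# Assumes the DAP collector credentials and the run date are already exported in
+# the environment: DAP_BEARER_TOKEN, DAP_PRIVATE_KEY and PROCESS_DATE (YYYY-MM-DD).
+# They are passed through to the job invocation below.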
+ +docker run -it --rm \ + -v $HOME/.config/gcloud:/app/.config/gcloud \ + -e GOOGLE_CLOUD_PROJECT= \ + -e GOOGLE_APPLICATION_CREDENTIALS=.json \ + ads-attribution-dap-collector python -m ads_attribution_dap_collector.main \ + --job_config_gcp_project \ + --bq_project \ + --job_config_bucket \ + --bearer_token $DAP_BEARER_TOKEN \ + --hpke_private_key $DAP_PRIVATE_KEY \ + --process_date $PROCESS_DATE \ No newline at end of file diff --git a/jobs/ads-attribution-dap-collector/pytest.ini b/jobs/ads-attribution-dap-collector/pytest.ini new file mode 100644 index 00000000..e618d7a5 --- /dev/null +++ b/jobs/ads-attribution-dap-collector/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +testpaths = + tests diff --git a/jobs/ads-attribution-dap-collector/requirements.txt b/jobs/ads-attribution-dap-collector/requirements.txt new file mode 100644 index 00000000..128e7e93 --- /dev/null +++ b/jobs/ads-attribution-dap-collector/requirements.txt @@ -0,0 +1,10 @@ +cattrs==25.1.1 +click==8.3.1 +pydantic==2.12.5 +pytest==6.2.5 +pytest-black==0.3.11 +pytest-flake8==1.0.6 +pytz==2025.2 +google-cloud-bigquery==3.34.0 +google-cloud-storage==3.3.0 +tldextract==5.3.0 diff --git a/jobs/ads-attribution-dap-collector/setup.py b/jobs/ads-attribution-dap-collector/setup.py new file mode 100644 index 00000000..0b572eba --- /dev/null +++ b/jobs/ads-attribution-dap-collector/setup.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python + +from setuptools import setup, find_packages + +readme = open("README.md").read() + +setup( + name="ads-attribution-dap-collector-job", + version="0.1.0", + author="gleonard@mozilla.com", + packages=find_packages(include=["ads_attribution_dap_collector"]), + long_description=readme, + include_package_data=True, + license="MPL 2.0", +) diff --git a/jobs/ads-attribution-dap-collector/tests/__init__.py b/jobs/ads-attribution-dap-collector/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/jobs/ads-attribution-dap-collector/tests/test_collect.py b/jobs/ads-attribution-dap-collector/tests/test_collect.py new file mode 100644 index 00000000..8ffcec66 --- /dev/null +++ b/jobs/ads-attribution-dap-collector/tests/test_collect.py @@ -0,0 +1,244 @@ +from datetime import date + +from unittest import TestCase +from unittest.mock import patch + +from ads_attribution_dap_collector.collect import ( + current_batch_start, + current_batch_end, + _should_collect_batch, + _parse_http_error, + _correct_wraparound, + _parse_histogram, + collect_dap_result, + get_aggregated_results, +) + +from tests.test_mocks import ( + JAN_1_2026, + DURATION_3_DAYS, + DURATION_7_DAYS, + DURATION_1_DAY, + mock_dap_subprocess_success, + MOCK_TASK_ID_1, +) + + +class TestHelpers(TestCase): + def test_current_batch_start_3_day(self): + """ + batches + [2026-01-01 : 2026-01-03] + [2026-01-04 : 2026-01-06] + [2026-01-07 : 2026-01-09] + [2026-01-10 : 2026-01-12] + [2026-01-13 : 2026-01-15] + [2026-01-16 : 2026-01-18] + [2026-01-19 : 2026-01-21] + """ + + # process date at start of first batch + batch_start = current_batch_start( + process_date=date(2026, 1, 1), + partner_start_date=JAN_1_2026, + duration=DURATION_3_DAYS, + ) + self.assertEqual(batch_start, date(2026, 1, 1)) + + # process date middle of first batch + batch_start = current_batch_start( + process_date=date(2026, 1, 2), + partner_start_date=JAN_1_2026, + duration=DURATION_3_DAYS, + ) + self.assertEqual(batch_start, date(2026, 1, 1)) + + # process date end of first batch + batch_start = current_batch_start( + process_date=date(2026, 1, 3), + 
partner_start_date=JAN_1_2026, + duration=DURATION_3_DAYS, + ) + self.assertEqual(batch_start, date(2026, 1, 1)) + + # process date at start of subsequent batch + batch_start = current_batch_start( + process_date=date(2026, 1, 10), + partner_start_date=JAN_1_2026, + duration=DURATION_3_DAYS, + ) + self.assertEqual(batch_start, date(2026, 1, 7)) + + # process date middle of subsequent batch + batch_start = current_batch_start( + process_date=date(2026, 1, 11), + partner_start_date=JAN_1_2026, + duration=DURATION_3_DAYS, + ) + self.assertEqual(batch_start, date(2026, 1, 7)) + + # process date end of subsequent batch + batch_start = current_batch_start( + process_date=date(2026, 1, 12), + partner_start_date=JAN_1_2026, + duration=DURATION_3_DAYS, + ) + self.assertEqual(batch_start, date(2026, 1, 10)) + + def test_current_batch_start_1_day(self): + """ + batches + [2026-01-01 : 2026-01-01] + [2026-01-02 : 2026-01-02] + [2026-01-03 : 2026-01-03] + [2026-01-04 : 2026-01-04] + """ + + # process date before start of first batch + batch_start = current_batch_start( + process_date=date(2025, 12, 30), + partner_start_date=JAN_1_2026, + duration=DURATION_1_DAY, + ) + self.assertIsNone(batch_start) + + # process date at start of first batch + batch_start = current_batch_start( + process_date=date(2026, 1, 1), + partner_start_date=JAN_1_2026, + duration=DURATION_1_DAY, + ) + self.assertEqual(batch_start, date(2026, 1, 1)) + + # process date at start of subsequent batch + batch_start = current_batch_start( + process_date=date(2026, 1, 15), + partner_start_date=JAN_1_2026, + duration=DURATION_1_DAY, + ) + self.assertEqual(batch_start, date(2026, 1, 15)) + + def test_current_batch_end_(self): + batch_end = current_batch_end( + batch_start=date(2026, 1, 1), duration=DURATION_3_DAYS + ) + self.assertEqual(batch_end, date(2026, 1, 3)) + + batch_end = current_batch_end( + batch_start=date(2026, 1, 10), duration=DURATION_7_DAYS + ) + self.assertEqual(batch_end, date(2026, 1, 16)) + + batch_end = current_batch_end( + batch_start=date(2026, 1, 12), duration=DURATION_1_DAY + ) + self.assertEqual(batch_end, date(2026, 1, 12)) + + def test_should_collect_batch(self): + # process_date is end of batch + batch_end = current_batch_end( + batch_start=date(2026, 1, 1), duration=DURATION_3_DAYS + ) + process_date = date(2026, 1, 3) + self.assertTrue(_should_collect_batch(process_date, batch_end)) + + # process date is not end of batch + batch_end = current_batch_end( + batch_start=date(2026, 1, 1), duration=DURATION_3_DAYS + ) + process_date = date(2026, 1, 2) + self.assertFalse(_should_collect_batch(process_date, batch_end)) + + # process date is start of batch + batch_end = current_batch_end( + batch_start=date(2026, 1, 1), duration=DURATION_3_DAYS + ) + process_date = date(2026, 1, 1) + self.assertFalse(_should_collect_batch(process_date, batch_end)) + + def test_parse_http_error_400(self): + msg = ( + "HTTP response status 400 Bad Request - " + "The number of reports included in the batch is invalid." + ) + + status_code, status_text, error_message = _parse_http_error(msg) + + self.assertEqual(status_code, 400) + self.assertEqual(status_text, "Bad Request") + self.assertEqual( + error_message, "The number of reports included in the batch is invalid." 
+ ) + + def test_parse_http_error_404(self): + msg = "HTTP response status 404 Not Found" + + status_code, status_text, error_message = _parse_http_error(msg) + + self.assertEqual(status_code, 404) + self.assertEqual(status_text, "Not Found") + self.assertIsNone(error_message) + + def test_correct_wraparound(self): + wrapped = _correct_wraparound(340282366920938462946865773367900766210) + self.assertEqual(wrapped, 1) + + def test_parse_histogram(self): + histogram_string = "5,3, 6,0, 8" + parse_dict = _parse_histogram(histogram_string) + self.assertEqual(parse_dict[0], 5) + self.assertEqual(parse_dict[1], 3) + self.assertEqual(parse_dict[2], 6) + self.assertEqual(parse_dict[3], 0) + self.assertEqual(parse_dict[4], 8) + + @patch("subprocess.run", side_effect=mock_dap_subprocess_success) + def test_collect_dap_result_success(self, mock_dap_subprocess_success): + task_id = MOCK_TASK_ID_1 + collected_tasks = collect_dap_result( + task_id=task_id, + vdaf_length=4, + batch_start=date(2026, 1, 1), + duration=123, + bearer_token="token", + hpke_config="config", + hpke_private_key="private_key", + ) + self.assertEqual(1, mock_dap_subprocess_success.call_count) + self.assertEqual(len(collected_tasks), 4) + self.assertEqual(collected_tasks[1], 11) + self.assertEqual(collected_tasks[2], 22) + self.assertEqual(collected_tasks[3], 33) + + @patch("subprocess.run", side_effect=mock_dap_subprocess_success) + def test_get_aggregated_results(self, mock_dap_subprocess_success): + task_id = MOCK_TASK_ID_1 + process_date = date(2026, 1, 7) + batch_end = current_batch_end(batch_start=JAN_1_2026, duration=DURATION_7_DAYS) + + aggregated_results = get_aggregated_results( + process_date=process_date, + task_id=task_id, + vdaf_length=4, + batch_start=JAN_1_2026, + batch_end=batch_end, + collector_duration=DURATION_7_DAYS, + bearer_token="token", + hpke_config="config", + hpke_private_key="private_key", + ) + self.assertIsNotNone(aggregated_results) + + process_date = date(2026, 1, 8) + aggregated_results = get_aggregated_results( + process_date=process_date, + task_id=task_id, + vdaf_length=4, + batch_start=JAN_1_2026, + batch_end=batch_end, + collector_duration=DURATION_7_DAYS, + bearer_token="token", + hpke_config="config", + hpke_private_key="private_key", + ) + self.assertIsNone(aggregated_results) diff --git a/jobs/ads-attribution-dap-collector/tests/test_mocks.py b/jobs/ads-attribution-dap-collector/tests/test_mocks.py new file mode 100644 index 00000000..d4bf461f --- /dev/null +++ b/jobs/ads-attribution-dap-collector/tests/test_mocks.py @@ -0,0 +1,259 @@ +from google.cloud import bigquery +from collections.abc import Mapping, Sequence +from subprocess import CompletedProcess +from typing import Any +from uuid import uuid4 +from datetime import date + +JAN_1_2026 = date(2026, 1, 1) +JAN_7_2026 = date(2026, 1, 7) +JAN_15_2026 = date(2026, 1, 5) + +DURATION_3_DAYS = 259200 +DURATION_7_DAYS = 604800 +DURATION_1_DAY = 86400 + +MOCK_PARTNER_ID_1 = uuid4() +MOCK_PARTNER_ID_2 = uuid4() + +MOCK_TASK_ID_1 = "0QqFBHvuEk1_y4v4GIa9bTaa3vXXtLjsK64QeifzHp1" +MOCK_TASK_ID_2 = "0QqFBHvuEk1_y4v4GIa9bTaa3vXXtLjsK64QeifzHp2" + + +def mock_get_valid_config() -> dict[str, Any]: + return { + "collection_config": { + "hpke_config": "AQAgAAEAAQAgAjnUwz-F_tIm85OQd5dlfGqm0VhycGn2D1rkQCB4Lyk" + }, + "advertisers": [ + { + "name": "advertiser_1", + "partner_id": "295beef7-1e3b-4128-b8f8-858e12aa660a", + "start_date": "2026-01-01", + "collector_duration": 604800, + "conversion_type": "view", + "lookback_window": 7, + }, + { + "name": 
"advertiser_2", + "partner_id": "c8d55857-ab7a-470a-9853-23e303e4594d", + "start_date": "2026-01-08", + "collector_duration": 259200, + "conversion_type": "click", + "lookback_window": 14, + }, + ], + "partners": { + "295beef7-1e3b-4128-b8f8-858e12aa660a": { + "task_id": "ix_ucynIiL-tUOPDqLTI2KrhOy4j4vpnIGZKq6jlSeA", + "vdaf": "histogram", + "bits": 0, + "length": 40, + "time_precision": 60, + "default_measurement": 0, + }, + "c8d55857-ab7a-470a-9853-23e303e4594d": { + "task_id": "0QqFBHvuEk1_y4v4GIa9bTaa3vXXtLjsK64QeifzHp2", + "vdaf": "histogram", + "bits": 0, + "length": 101, + "time_precision": 3600, + "default_measurement": 100, + }, + }, + "ads": { + "provider_a:1234": { + "partner_id": "295beef7-1e3b-4128-b8f8-858e12aa660a", + "index": 9, + }, + "provider_a:5678": { + "partner_id": "295beef7-1e3b-4128-b8f8-858e12aa660a", + "index": 2, + }, + "provider_a:9876": { + "partner_id": "295beef7-1e3b-4128-b8f8-858e12aa660a", + "index": 3, + }, + "provider_b:1234": { + "partner_id": "c8d55857-ab7a-470a-9853-23e303e4594d", + "index": 1, + }, + "provider_b:5678": { + "partner_id": "c8d55857-ab7a-470a-9853-23e303e4594d", + "index": 2, + }, + }, + } + + +def mock_get_config_invalid_conversion() -> dict[str, Any]: + return { + "collection_config": { + "hpke_config": "AQAgAAEAAQAgAjnUwz-F_tIm85OQd5dlfGqm0VhycGn2D1rkQCB4Lyk" + }, + "advertisers": [ + { + "name": "advertiser_1", + "partner_id": "295beef7-1e3b-4128-b8f8-858e12aa660a", + "start_date": "2026-01-01", + "collector_duration": 604800, + "conversion_type": "viewandclick", + "lookback_window": 7, + } + ], + "partners": { + "295beef7-1e3b-4128-b8f8-858e12aa660a": { + "task_id": "ix_ucynIiL-tUOPDqLTI2KrhOy4j4vpnIGZKq6jlSeA", + "vdaf": "histogram", + "bits": 0, + "length": 40, + "time_precision": 60, + "default_measurement": 0, + } + }, + "ads": { + "provider_a:1234": { + "partner_id": "295beef7-1e3b-4128-b8f8-858e12aa660a", + "index": 9, + } + }, + } + + +def mock_get_config_invalid_duration_value() -> dict[str, Any]: + return { + "collection_config": { + "hpke_config": "AQAgAAEAAQAgAjnUwz-F_tIm85OQd5dlfGqm0VhycGn2D1rkQCB4Lyk" + }, + "advertisers": [ + { + "name": "advertiser_1", + "partner_id": "295beef7-1e3b-4128-b8f8-858e12aa660a", + "start_date": "2026-01-01", + "collector_duration": 7, + "conversion_type": "view", + "lookback_window": 7, + } + ], + "partners": { + "295beef7-1e3b-4128-b8f8-858e12aa660a": { + "task_id": "ix_ucynIiL-tUOPDqLTI2KrhOy4j4vpnIGZKq6jlSeA", + "vdaf": "histogram", + "bits": 0, + "length": 40, + "time_precision": 60, + "default_measurement": 0, + } + }, + "ads": { + "provider_a:1234": { + "partner_id": "295beef7-1e3b-4128-b8f8-858e12aa660a", + "index": 9, + } + }, + } + + +def mock_dap_subprocess_success( + args: list[str], capture_output: bool, text: bool, check: bool, timeout: int +) -> CompletedProcess: + return CompletedProcess( + args=[ + "./collect", + "--task-id", + MOCK_TASK_ID_1, + "--leader", + "https://dap-leader-url", + "--vdaf", + "histogram", + "--length", + "4", + "--authorization-bearer-token", + "ssh_secret_token", + "--batch-interval-start", + "1768780800", + "--batch-interval-duration", + "604800", + "--hpke-config", + "AQAgAAEAAQAgpdceoGiuWvIiogA8SPCdprkhWMNtLq_y0GSePI7EhXE", + "--hpke-private-key", + "ssh-secret-private-key", + ], + returncode=0, + stdout="Number of reports: 150\nInterval start: 2026-01-19 00:00:00 UTC\nInterval end: 2026-01-25 00:00:00 UTC\nInterval length: 120s\nAggregation result: [50, 11, 22, 33]\n", # noqa: E501 + stderr="", + ) + + +def mock_create_dataset(data_set: str, 
exists_ok: bool): + pass + + +def mock_create_table(table: bigquery.Table, exists_ok: bool): + pass + + +def mock_insert_rows_json(table: str, json_rows: dict) -> Sequence[Mapping]: + return [] + + +def mock_insert_rows_json_fail(table: str, json_rows: dict) -> Sequence[Mapping]: + return [ + {"key": 0, "errors": "Problem writing bucket 1 results"}, + ] + + +def mock_bq_table() -> bigquery.Table: + return bigquery.Table( + "some-gcp-project-id.ads_dap_derived.newtab_attribution_v1", + schema=[ + bigquery.SchemaField( + "collection_start", + "DATE", + mode="REQUIRED", + description="Start date of the collected time window, inclusive.", + ), + bigquery.SchemaField( + "collection_end", + "DATE", + mode="REQUIRED", + description="End date of the collected time window, inclusive.", + ), + bigquery.SchemaField( + "provider", + "STRING", + mode="REQUIRED", + description="The external service providing the ad.", + ), + bigquery.SchemaField( + "ad_id", + "INT64", + mode="REQUIRED", + description="Id of ad, unique by provider.", + ), + bigquery.SchemaField( + "lookback_window", + "INT64", + mode="REQUIRED", + description="Maximum number of days to attribute an ad.", + ), + bigquery.SchemaField( + "conversion_type", + "STRING", + mode="REQUIRED", + description="Indicates the type of conversion [view, click, default]", + ), + bigquery.SchemaField( + "conversion_count", + "INT64", + mode="REQUIRED", + description="Aggregated number of conversions attributed to the ad_id.", + ), + bigquery.SchemaField( + "created_timestamp", + "TIMESTAMP", + mode="REQUIRED", + description="Timestamp for when this row was created.", + ), + ], + ) diff --git a/jobs/ads-attribution-dap-collector/tests/test_parse.py b/jobs/ads-attribution-dap-collector/tests/test_parse.py new file mode 100644 index 00000000..ca40c23d --- /dev/null +++ b/jobs/ads-attribution-dap-collector/tests/test_parse.py @@ -0,0 +1,132 @@ +import pytest +import re + +from unittest import TestCase + +from tests.test_mocks import ( + mock_get_valid_config, + mock_get_config_invalid_conversion, + mock_get_config_invalid_duration_value, +) + +from ads_attribution_dap_collector.parse import ( + extract_advertisers_with_partners_and_ads, + _require_ad_id_and_source, +) + + +class TestHelpers(TestCase): + def test_require_ad_id_and_source(self): + with self.assertRaises(ValueError): + _require_ad_id_and_source("test_provider1234") + + with self.assertRaises(ValueError): + _require_ad_id_and_source("test_provider:1234a") + + source, ad_id = _require_ad_id_and_source("test_provider:1234") + self.assertEqual("test_provider", source) + self.assertEqual(1234, ad_id) + + def test_extract_advertisers_with_partners_and_ads(self): + json_config = mock_get_valid_config() + + hpke_config, advertiser_configs = extract_advertisers_with_partners_and_ads( + json_config + ) + + self.assertEqual(hpke_config, json_config["collection_config"]["hpke_config"]) + + self.assertEqual(len(advertiser_configs), 2) + + for i, advertiser in enumerate(advertiser_configs): + # check top level advertiser values + self.assertEqual(advertiser.name, json_config["advertisers"][i]["name"]) + self.assertEqual( + str(advertiser.partner_id), json_config["advertisers"][i]["partner_id"] + ) + self.assertEqual( + advertiser.start_date.isoformat(), + json_config["advertisers"][i]["start_date"], + ) + self.assertEqual( + advertiser.collector_duration, + json_config["advertisers"][i]["collector_duration"], + ) + self.assertEqual( + advertiser.conversion_type, + 
json_config["advertisers"][i]["conversion_type"], + ) + self.assertEqual( + advertiser.lookback_window, + json_config["advertisers"][i]["lookback_window"], + ) + + partner_config = advertiser.partner + self.assertEqual( + str(partner_config.partner_id), + json_config["advertisers"][i]["partner_id"], + ) + self.assertEqual( + partner_config.task_id, + json_config["partners"][json_config["advertisers"][i]["partner_id"]][ + "task_id" + ], + ) + self.assertEqual( + partner_config.vdaf, + json_config["partners"][json_config["advertisers"][i]["partner_id"]][ + "vdaf" + ], + ) + self.assertEqual( + partner_config.bits, + json_config["partners"][json_config["advertisers"][i]["partner_id"]][ + "bits" + ], + ) + self.assertEqual( + partner_config.length, + json_config["partners"][json_config["advertisers"][i]["partner_id"]][ + "length" + ], + ) + self.assertEqual( + partner_config.time_precision, + json_config["partners"][json_config["advertisers"][i]["partner_id"]][ + "time_precision" + ], + ) + self.assertEqual( + partner_config.default_measurement, + json_config["partners"][json_config["advertisers"][i]["partner_id"]][ + "default_measurement" + ], + ) + + # check that the parsed ads match the json + ads_config = advertiser.ads + json_ads = json_config["ads"] + for ad in ads_config: + ad_key = f"{ad.source}:{ad.ad_id}" # noqa: E231 + self.assertIn(ad_key, json_ads) + json_ad = json_ads[ad_key] + self.assertEqual(json_ad["index"], ad.index) + self.assertEqual( + str(ad.partner_id), json_config["ads"][ad_key]["partner_id"] + ) + + def test_extract_advertisers_invalid_conversion(self): + json_config = mock_get_config_invalid_conversion() + with pytest.raises( + Exception, + match=re.escape("Input should be 'view', 'click' or 'default'"), + ): + extract_advertisers_with_partners_and_ads(json_config) + + def test_extract_advertisers_invalid_duration_value(self): + json_config = mock_get_config_invalid_duration_value() + with pytest.raises( + Exception, + match=re.escape("Input should be greater than 86399"), + ): + extract_advertisers_with_partners_and_ads(json_config) diff --git a/jobs/ads-attribution-dap-collector/tests/test_persist.py b/jobs/ads-attribution-dap-collector/tests/test_persist.py new file mode 100644 index 00000000..5a03cd62 --- /dev/null +++ b/jobs/ads-attribution-dap-collector/tests/test_persist.py @@ -0,0 +1,128 @@ +import pytest +import re +from unittest import TestCase +from unittest.mock import call, patch +from tests.test_mocks import ( + JAN_1_2026, + JAN_7_2026, + mock_create_dataset, + mock_create_table, + mock_insert_rows_json, + mock_insert_rows_json_fail, + mock_bq_table, +) + +from ads_attribution_dap_collector.persist import ( + NAMESPACE, + COLLECTOR_RESULTS_SCHEMA, + create_bq_table_if_not_exists, + create_bq_row, + insert_into_bq, +) + +from google.cloud import bigquery + +CREATED_TIMESTAMP = "2026-01-20T15:56:39.003071" + + +class TestHelpers(TestCase): + @patch("google.cloud.bigquery.Table") + @patch("google.cloud.bigquery.Client") + def test_write_record_to_bigquery_success( + self, + bq_client, + bq_table, + ): + bq_client.return_value.create_dataset.side_effect = mock_create_dataset + bq_client.return_value.create_table.side_effect = mock_create_table + bq_client.return_value.insert_rows_json.side_effect = mock_insert_rows_json + + project_id = "test-project-id" + bq_test_client = bigquery.Client(project=project_id) + + full_table_id = create_bq_table_if_not_exists(project_id, bq_test_client) + + self.assertIn("ads_dap_derived", full_table_id) + 
self.assertIn("newtab_attribution_v1", full_table_id) + + bq_client.return_value.create_dataset.assert_called_once_with( + f"{project_id}.{NAMESPACE}", exists_ok=True + ) + + bq_client.assert_called_once_with(project=project_id) + bq_table.assert_called_once_with( + full_table_id, + schema=COLLECTOR_RESULTS_SCHEMA, + ) + + bq_client.return_value.create_table.assert_called_once_with( + mock_bq_table(), exists_ok=True + ) + row = create_bq_row( + collection_start=JAN_1_2026, + collection_end=JAN_7_2026, + provider="test", + ad_id=1234, + lookback_window=7, + conversion_type="default", + conversion_count=150, + ) + + # overwrite created_timestamp for test stability + row["created_timestamp"] = CREATED_TIMESTAMP + insert_into_bq(row, bq_test_client, full_table_id) + + calls = [ + call( + table=full_table_id, + json_rows=[ + { + "collection_start": "2026-01-01", + "collection_end": "2026-01-07", + "provider": "test", + "ad_id": 1234, + "lookback_window": 7, + "conversion_type": "default", + "conversion_count": 150, + "created_timestamp": CREATED_TIMESTAMP, + } + ], + ) + ] + bq_client.return_value.insert_rows_json.assert_has_calls(calls) + + # what can cause it to fail + @patch("google.cloud.bigquery.Table") + @patch("google.cloud.bigquery.Client") + def test_write_record_to_bigquery_fail_insert_row( + self, + bq_client, + bq_table, + ): + bq_client.return_value.create_dataset.side_effect = mock_create_dataset + bq_client.return_value.create_table.side_effect = mock_create_table + bq_client.return_value.insert_rows_json.side_effect = mock_insert_rows_json_fail + + project_id = "test-project-id" + bq_test_client = bigquery.Client(project=project_id) + + full_table_id = create_bq_table_if_not_exists(project_id, bq_test_client) + + row = create_bq_row( + collection_start=JAN_1_2026, + collection_end=JAN_7_2026, + provider="test", + ad_id=1234, + lookback_window=7, + conversion_type="default", + conversion_count=150, + ) + + with pytest.raises( + Exception, + match=re.escape( + "test-project-id.ads_dap_derived.newtab_attribution_v1: " + "[{'key': 0, 'errors': 'Problem writing bucket 1 results'}]" + ), + ): + insert_into_bq(row, bq_test_client, full_table_id) From 2c34e4904f0df9372d337fc8419c426588211134 Mon Sep 17 00:00:00 2001 From: Glenda Leonard Date: Thu, 22 Jan 2026 13:54:04 -0500 Subject: [PATCH 2/8] Initial version of ads-attribution-dap-collector job --- .../ads_attribution_dap_collector/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/__init__.py diff --git a/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/__init__.py b/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/__init__.py new file mode 100644 index 00000000..e69de29b From 42e9be74a884b2b88b7fa9ab7f8a1ba9cfcbedc1 Mon Sep 17 00:00:00 2001 From: Glenda Leonard Date: Thu, 22 Jan 2026 14:13:05 -0500 Subject: [PATCH 3/8] Downgraded pytest version --- jobs/ads-attribution-dap-collector/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobs/ads-attribution-dap-collector/requirements.txt b/jobs/ads-attribution-dap-collector/requirements.txt index 128e7e93..a273ceaa 100644 --- a/jobs/ads-attribution-dap-collector/requirements.txt +++ b/jobs/ads-attribution-dap-collector/requirements.txt @@ -1,7 +1,7 @@ cattrs==25.1.1 click==8.3.1 pydantic==2.12.5 -pytest==6.2.5 +pytest==6.0.2 pytest-black==0.3.11 pytest-flake8==1.0.6 pytz==2025.2 From 64160ab82cda9b54fa8d1e96d4296f3840eae94f Mon 
Sep 17 00:00:00 2001 From: Glenda Leonard Date: Thu, 22 Jan 2026 14:28:09 -0500 Subject: [PATCH 4/8] Downgraded python version --- jobs/ads-attribution-dap-collector/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jobs/ads-attribution-dap-collector/Dockerfile b/jobs/ads-attribution-dap-collector/Dockerfile index fddf86b9..06a44cbf 100644 --- a/jobs/ads-attribution-dap-collector/Dockerfile +++ b/jobs/ads-attribution-dap-collector/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.12 +FROM python:3.10 LABEL maintainer="Glenda Leonard " ARG HOME="/janus_build" WORKDIR ${HOME} @@ -14,7 +14,7 @@ RUN cd janus && cargo build -r -p janus_tools --bin collect ######### next stage -FROM python:3.12 +FROM python:3.10 LABEL maintainer="Glenda Leonard " # https://github.com/mozilla-services/Dockerflow/blob/master/docs/building-container.md ARG USER_ID="10001" From 778df84df4ba7775ad79a9bd8d4e7e46ca473b57 Mon Sep 17 00:00:00 2001 From: Glenda Leonard Date: Thu, 22 Jan 2026 14:41:08 -0500 Subject: [PATCH 5/8] Updated pytest version --- jobs/ads-attribution-dap-collector/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobs/ads-attribution-dap-collector/requirements.txt b/jobs/ads-attribution-dap-collector/requirements.txt index a273ceaa..128e7e93 100644 --- a/jobs/ads-attribution-dap-collector/requirements.txt +++ b/jobs/ads-attribution-dap-collector/requirements.txt @@ -1,7 +1,7 @@ cattrs==25.1.1 click==8.3.1 pydantic==2.12.5 -pytest==6.0.2 +pytest==6.2.5 pytest-black==0.3.11 pytest-flake8==1.0.6 pytz==2025.2 From 6f38257972c79048b5179dfad5e30908bf99ceda Mon Sep 17 00:00:00 2001 From: Glenda Leonard Date: Thu, 22 Jan 2026 15:27:26 -0500 Subject: [PATCH 6/8] Use python 3.12 and update ci_job.yml --- jobs/ads-attribution-dap-collector/Dockerfile | 4 ++-- jobs/ads-attribution-dap-collector/ci_job.yaml | 15 ++++++++------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/jobs/ads-attribution-dap-collector/Dockerfile b/jobs/ads-attribution-dap-collector/Dockerfile index 06a44cbf..fddf86b9 100644 --- a/jobs/ads-attribution-dap-collector/Dockerfile +++ b/jobs/ads-attribution-dap-collector/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.10 +FROM python:3.12 LABEL maintainer="Glenda Leonard " ARG HOME="/janus_build" WORKDIR ${HOME} @@ -14,7 +14,7 @@ RUN cd janus && cargo build -r -p janus_tools --bin collect ######### next stage -FROM python:3.10 +FROM python:3.12 LABEL maintainer="Glenda Leonard " # https://github.com/mozilla-services/Dockerflow/blob/master/docs/building-container.md ARG USER_ID="10001" diff --git a/jobs/ads-attribution-dap-collector/ci_job.yaml b/jobs/ads-attribution-dap-collector/ci_job.yaml index 74d969e0..dc58d708 100644 --- a/jobs/ads-attribution-dap-collector/ci_job.yaml +++ b/jobs/ads-attribution-dap-collector/ci_job.yaml @@ -11,14 +11,15 @@ filters: | job: - 'jobs/ads-attribution-dap-collector/**' - - - name: Build Docker image - if: steps.changes.outputs.job == 'true' || github.ref == 'refs/heads/main' || contains(github.event.head_commit.message, '[run-tests]') - run: docker build jobs/ads-attribution-dap-collector/ -t app:build - + - name: Build the Docker image + if: steps.changes.outputs.job == 'true' + # yamllint disable + run: | + docker build jobs/ads-attribution-dap-collector -t us-docker.pkg.dev/moz-fx-data-artifacts-prod/docker-etl/ads-attribution-dap-collector:latest + # yamllint enable - name: Test Code - if: steps.changes.outputs.job == 'true' || github.ref == 'refs/heads/main' || 
contains(github.event.head_commit.message, '[run-tests]') - run: docker run app:build pytest --flake8 --black + if: steps.changes.outputs.job == 'true' + run: docker run us-docker.pkg.dev/moz-fx-data-artifacts-prod/docker-etl/ads-attribution-dap-collector:latest python3 -m pytest push-job-ads-attribution-dap-collector: runs-on: ubuntu-latest From 4c49a27abdb90c3abc38d26b774e2b5344f124dd Mon Sep 17 00:00:00 2001 From: Glenda Leonard Date: Thu, 22 Jan 2026 15:31:33 -0500 Subject: [PATCH 7/8] Update job-ads-attribution-dap-collector.yml --- .../workflows/job-ads-attribution-dap-collector.yml | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/.github/workflows/job-ads-attribution-dap-collector.yml b/.github/workflows/job-ads-attribution-dap-collector.yml index 0467ffbd..420fe5d7 100644 --- a/.github/workflows/job-ads-attribution-dap-collector.yml +++ b/.github/workflows/job-ads-attribution-dap-collector.yml @@ -24,11 +24,13 @@ jobs: - name: Checkout code uses: actions/checkout@v6 - - name: Build Docker image - run: docker build jobs/ads-attribution-dap-collector/ -t app:build - + - name: Build the Docker image + # yamllint disable + run: | + docker build jobs/ads-attribution-dap-collector -t us-docker.pkg.dev/moz-fx-data-artifacts-prod/docker-etl/ads-attribution-dap-collector:latest + # yamllint enable - name: Test Code - run: docker run app:build pytest --flake8 --black + run: docker run us-docker.pkg.dev/moz-fx-data-artifacts-prod/docker-etl/ads-attribution-dap-collector:latest python3 -m pytest push-job-ads-attribution-dap-collector: runs-on: ubuntu-latest From 16fab7ca0c46f6b21696b48a2ab577b7d859660e Mon Sep 17 00:00:00 2001 From: Glenda Leonard Date: Fri, 23 Jan 2026 09:05:02 -0500 Subject: [PATCH 8/8] Renamed schema classes --- .../ads_attribution_dap_collector/parse.py | 4 ++-- .../ads_attribution_dap_collector/schema.py | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/parse.py b/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/parse.py index b581e962..34d070d1 100644 --- a/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/parse.py +++ b/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/parse.py @@ -4,7 +4,7 @@ from google.cloud import storage import json from typing import Any -from .schema import ConfigModel +from .schema import JobConfig from pydantic import ValidationError from uuid import UUID @@ -87,7 +87,7 @@ def extract_advertisers_with_partners_and_ads( """ try: - cfg = ConfigModel.model_validate(raw_config) + cfg = JobConfig.model_validate(raw_config) except ValidationError as e: raise ValueError(f"Invalid config: {e}") from None diff --git a/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/schema.py b/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/schema.py index 3d3c5e31..b4767548 100644 --- a/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/schema.py +++ b/jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/schema.py @@ -4,12 +4,12 @@ from uuid import UUID -class CollectionConfigModel(BaseModel): +class CollectionConfig(BaseModel): model_config = ConfigDict(extra="forbid") hpke_config: str = Field(min_length=1) -class AdvertiserModel(BaseModel): +class Advertiser(BaseModel): model_config = ConfigDict(extra="forbid") name: str = Field(min_length=1) @@ -20,7 +20,7 @@ class AdvertiserModel(BaseModel): lookback_window: int = Field(gt=0) -class 
PartnerModel(BaseModel): +class Partner(BaseModel): model_config = ConfigDict(extra="forbid") task_id: str = Field(min_length=32) @@ -31,14 +31,14 @@ class PartnerModel(BaseModel): default_measurement: int -class AdModel(BaseModel): +class Ad(BaseModel): model_config = ConfigDict(extra="forbid") partner_id: UUID index: int -class ConfigModel(BaseModel): +class JobConfig(BaseModel): """ Full config with validation. Note: ads keys are dynamic (source:id), so they remain a dict[str, AdModel]. @@ -46,7 +46,7 @@ class ConfigModel(BaseModel): model_config = ConfigDict(extra="forbid") - collection_config: CollectionConfigModel - advertisers: list[AdvertiserModel] - partners: dict[UUID, PartnerModel] - ads: dict[str, AdModel] + collection_config: CollectionConfig + advertisers: list[Advertiser] + partners: dict[UUID, Partner] + ads: dict[str, Ad]