58 changes: 58 additions & 0 deletions .github/workflows/job-ads-attribution-dap-collector.yml
@@ -0,0 +1,58 @@
###
# This file was generated by docker-etl/ci_config.py.
# Changes should be made to job ci_job.yaml files and re-generated.
###

name: ads-attribution-dap-collector

on:
push:
branches:
- main
paths:
- 'jobs/ads-attribution-dap-collector/**'
- '.github/workflows/job-ads-attribution-dap-collector.yml'
pull_request:
paths:
- 'jobs/ads-attribution-dap-collector/**'
- '.github/workflows/job-ads-attribution-dap-collector.yml'

jobs:
build-job-ads-attribution-dap-collector:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v6

- name: Build the Docker image
# yamllint disable
run: |
docker build jobs/ads-attribution-dap-collector -t us-docker.pkg.dev/moz-fx-data-artifacts-prod/docker-etl/ads-attribution-dap-collector:latest
# yamllint enable
- name: Test Code
run: docker run us-docker.pkg.dev/moz-fx-data-artifacts-prod/docker-etl/ads-attribution-dap-collector:latest python3 -m pytest

push-job-ads-attribution-dap-collector:
runs-on: ubuntu-latest
needs: build-job-ads-attribution-dap-collector
if: github.ref == 'refs/heads/main'
steps:
- name: Checkout code
uses: actions/checkout@v6

- name: Authenticate to Google Cloud
uses: google-github-actions/auth@v2
with:
credentials_json: ${{ secrets.GCP_CREDENTIALS }}

- name: Set up Cloud SDK
uses: google-github-actions/setup-gcloud@v2

- name: Configure Docker for GCR
run: gcloud auth configure-docker

- name: Build Docker image
run: docker build jobs/ads-attribution-dap-collector/ -t gcr.io/${{ secrets.GCP_PROJECT }}/ads-attribution-dap-collector_docker_etl:latest

- name: Push to GCR
run: docker push gcr.io/${{ secrets.GCP_PROJECT }}/ads-attribution-dap-collector_docker_etl:latest
7 changes: 7 additions & 0 deletions jobs/ads-attribution-dap-collector/.dockerignore
@@ -0,0 +1,7 @@
.ci_job.yaml
.ci_workflow.yaml
.DS_Store
*.pyc
.pytest_cache/
__pycache__/
venv/
2 changes: 2 additions & 0 deletions jobs/ads-attribution-dap-collector/.flake8
@@ -0,0 +1,2 @@
[flake8]
max-line-length = 88
4 changes: 4 additions & 0 deletions jobs/ads-attribution-dap-collector/.gitignore
@@ -0,0 +1,4 @@
.DS_Store
*.pyc
__pycache__/
venv/
42 changes: 42 additions & 0 deletions jobs/ads-attribution-dap-collector/Dockerfile
@@ -0,0 +1,42 @@
FROM python:3.12
LABEL maintainer="Glenda Leonard <gleonard@mozilla.com>"
ARG HOME="/janus_build"
WORKDIR ${HOME}

RUN apt update && apt --yes install curl

RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
ENV PATH=$HOME/.cargo/bin:$PATH

# build the CLI tool
RUN git clone --depth 1 https://github.com/divviup/janus.git --branch '0.7.69'
RUN cd janus && cargo build -r -p janus_tools --bin collect

######### next stage

FROM python:3.12
LABEL maintainer="Glenda Leonard <gleonard@mozilla.com>"
# https://github.com/mozilla-services/Dockerflow/blob/master/docs/building-container.md
ARG USER_ID="10001"
ARG GROUP_ID="app"
ARG HOME="/app"
ENV HOME=${HOME}

RUN groupadd --gid ${USER_ID} ${GROUP_ID} && \
useradd --create-home --uid ${USER_ID} --gid ${GROUP_ID} --home-dir ${HOME} ${GROUP_ID}

WORKDIR ${HOME}
COPY --from=0 /janus_build/janus/target/release/collect ./

RUN pip install --upgrade pip

COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

COPY . .

RUN pip install .

# Drop root and change ownership of the application folder to the user
RUN chown -R ${USER_ID}:${GROUP_ID} ${HOME}
USER ${USER_ID}
47 changes: 47 additions & 0 deletions jobs/ads-attribution-dap-collector/README.md
@@ -0,0 +1,47 @@
# Ads Attribution DAP Collection Job

This job collects metrics from DAP (the Distributed Aggregation Protocol) and writes the results to BigQuery.

## Overview
This job is driven by a config file from a GCS bucket. Use `job_config_gcp_project`
and `job_config_bucket` to specify the file. The config file must be named
`attribution-conf.json` and a sample is available [here](https://github.com/mozilla-services/mars/tree/main/internal/gcp/storage/testdata/mars-attribution-config).
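
As a rough illustration of how the job might load that config (this is an assumption, not the job's actual loader; only `attribution-conf.json`, `job_config_gcp_project`, and `job_config_bucket` come from the description above):

```python
# Minimal sketch (assumed): fetch and parse the config file from GCS with the
# google-cloud-storage client.
import json

from google.cloud import storage


def load_job_config(job_config_gcp_project: str, job_config_bucket: str) -> dict:
    client = storage.Client(project=job_config_gcp_project)
    blob = client.bucket(job_config_bucket).blob("attribution-conf.json")
    return json.loads(blob.download_as_text())
```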

## Usage

This script is intended to be run in a Docker container.

It requires some environment variables that hold DAP credentials; the job looks for those when it
starts up. A dev script, `dev_run_docker.sh`, is included for convenience to build and run the job locally, and it
also documents those variables.
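
For orientation only, the credentials end up in the collector roughly like the sketch below. The variable names here are hypothetical; the authoritative names are documented in `dev_run_docker.sh`.

```python
# Hypothetical sketch: DAP credentials are read from the environment.
# These names are illustrative only; see dev_run_docker.sh for the real ones.
import os

bearer_token = os.environ["DAP_BEARER_TOKEN"]          # hypothetical name
hpke_config = os.environ["DAP_HPKE_CONFIG"]            # hypothetical name
hpke_private_key = os.environ["DAP_HPKE_PRIVATE_KEY"]  # hypothetical name
```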

Once the environment variables are set up, run the job with:


```sh
./dev_run_docker.sh
```
To just build the docker image, use:
```
docker build -t ads-attribution-dap-collector .
```

## Testing

First, create the job venv using:
```sh
python -m venv ./venv
source ./venv/bin/activate
pip install -r requirements.txt
```
Run tests from `/jobs/ads-attribution-dap-collector` using:
`python -m pytest`

## Linting and Formatting
```
black .
```
```
flake8 .
```
@@ -0,0 +1,190 @@
import ast
import logging
import subprocess
import re

from datetime import date, datetime, timedelta

DAP_LEADER = "https://dap-09-3.api.divviup.org"
VDAF = "histogram"
PROCESS_TIMEOUT = 1200 # 20 mins


def get_aggregated_results(
process_date: date,
batch_start: date,
batch_end: date,
task_id: str,
vdaf_length: int,
collector_duration: int,
bearer_token: str,
hpke_config: str,
hpke_private_key: str,
) -> dict | None:
aggregated_results = None
process_batch = _should_collect_batch(process_date, batch_end)

if process_batch:
# Step 4: Collect DAP results.
aggregated_results = collect_dap_result(
task_id=task_id,
vdaf_length=vdaf_length,
batch_start=batch_start,
duration=collector_duration,
bearer_token=bearer_token,
hpke_config=hpke_config,
hpke_private_key=hpke_private_key,
)

return aggregated_results


def current_batch_start(
process_date: date, partner_start_date: date, duration: int
) -> date | None:
if process_date < partner_start_date:
return None

if (
partner_start_date
<= process_date
< partner_start_date + timedelta(seconds=duration)
):
return partner_start_date

# After the first interval ...
batch_start = partner_start_date
while True:
next_start = batch_start + timedelta(seconds=duration)
# check whether process_date is the batch_end date;
# if so, we only need to go back 1 duration to get the start
if next_start + timedelta(days=-1) == process_date:
return next_start + timedelta(seconds=-duration)

# otherwise the process date is in the next interval, so we
# need to go back 2 durations to get the batch_start
if next_start > process_date:
return next_start + timedelta(seconds=-2 * duration)

batch_start = next_start


def current_batch_end(batch_start: date, duration: int) -> date:
# since the start and end dates are inclusive, the end is start + duration minus 1 day
return batch_start + timedelta(seconds=duration, days=-1)
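
# Worked example (illustrative, assuming a 7-day duration of 604800 seconds):
# with partner_start_date = 2025-01-01, batch windows start on 2025-01-01,
# 2025-01-08, 2025-01-15, ... For process_date = 2025-01-14,
# current_batch_start returns 2025-01-08 and current_batch_end returns
# 2025-01-14, so _should_collect_batch is True and that window is collected.
# For a mid-window date such as 2025-01-10, batch_end (2025-01-07) does not
# equal the process date, so no collection happens on that run.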


def _should_collect_batch(process_date, batch_end) -> bool:
return batch_end == process_date


def _correct_wraparound(num: int) -> int:
field_prime = 340282366920938462946865773367900766209
field_size = 128
cutoff = 2 ** (field_size - 1)
if num > cutoff:
return num - field_prime
return num
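
# Illustrative example (not taken from the job itself): negative aggregates
# come back from the prime field as large positive residues. A true value of
# -3 arrives as field_prime - 3 = 340282366920938462946865773367900766206,
# which is above the 2**127 cutoff, so _correct_wraparound maps it back to -3.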


def _parse_histogram(histogram_str: str) -> dict:
parsed_list = ast.literal_eval(histogram_str)
return {i: _correct_wraparound(val) for i, val in enumerate(parsed_list)}


def _parse_http_error(text: str) -> tuple[int, str, str | None] | None:
"""
Returns (status_code, status_text, error_message)
or None if the pattern is not found.
"""
ERROR_RE = re.compile(
r"HTTP response status\s+(\d+)\s+([A-Za-z ]+)(?:\s+-\s+(.*))?$"
)
match = ERROR_RE.search(text)
if not match:
return None

status_code = int(match.group(1))
status_text = match.group(2).strip()
error_message = match.group(3).strip() if match.group(3) else None
return status_code, status_text, error_message
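
# Illustrative example (hypothetical stderr text, not an actual janus
# message): _parse_http_error(
#     "Error: HTTP response status 400 Bad Request - invalid batch interval")
# returns (400, "Bad Request", "invalid batch interval").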


# DAP functions
def collect_dap_result(
task_id: str,
vdaf_length: int,
batch_start: date,
duration: int,
bearer_token: str,
hpke_config: str,
hpke_private_key: str,
) -> dict | None:
# Beware! This command string reveals secrets. Use logging only for
# debugging in local dev.

batch_start_epoch = int(
datetime.combine(batch_start, datetime.min.time()).timestamp()
)

try:
result = subprocess.run(
[
"./collect",
"--task-id",
task_id,
"--leader",
DAP_LEADER,
"--vdaf",
VDAF,
"--length",
f"{vdaf_length}",
"--authorization-bearer-token",
bearer_token,
"--batch-interval-start",
f"{batch_start_epoch}",
"--batch-interval-duration",
f"{duration}",
"--hpke-config",
hpke_config,
"--hpke-private-key",
hpke_private_key,
],
capture_output=True,
text=True,
check=True,
timeout=PROCESS_TIMEOUT,
)
for line in result.stdout.splitlines():
if line.startswith("Aggregation result:"):
entries = _parse_histogram(line[21:-1])
return entries
# Beware! Exceptions thrown by the subprocess reveal secrets.
# Log them and include traceback only for debugging in local dev.
except subprocess.CalledProcessError as e:
result = _parse_http_error(e.stderr)
if result is None:
logging.error(e)
raise Exception(
f"Collection failed for {task_id}, {e.returncode}, stderr: {e.stderr}"
) from None
else:
status_code, status_text, error_message = result
if status_code == 400:
logging.info(
f"Collection failed for {task_id}, {status_code} {status_text}"
f" {error_message}"
)
elif status_code == 404:
detail = (
error_message
if error_message is not None
else "Verify start date is not more than 14 days ago."
)
logging.info(
f"Collection failed for {task_id}, {status_code} {status_text} "
f"{detail}"
)
else:
# Surface any other HTTP error instead of silently continuing.
logging.error(e)
raise Exception(
f"Collection failed for {task_id}, {status_code} {status_text}"
f" {error_message}"
) from None
except subprocess.TimeoutExpired as e:
raise Exception(
f"Collection timed out for {task_id}, {e.timeout}, stderr: {e.stderr}"
) from None