From e55f25422ae782c0ad26a750f5f1bc83d7ff13d8 Mon Sep 17 00:00:00 2001
From: Haorui Zhou
Date: Tue, 30 Dec 2025 21:58:04 -0800
Subject: [PATCH 1/3] refactor: remove telegraf service and switch to direct influxdb write

---
 README.md                                  |  21 ++---
 dev-utils/ci/stack-smoke-test.sh           |   1 -
 docs/containers/influxdb2-legacy-build.md  |   4 +-
 docs/containers/influxdb3.md               |   2 +-
 docs/containers/startup-data-loader.md     |   7 +-
 docs/containers/telegraf.md                |  23 -----
 docs/docker-compose.md                     |  11 +--
 installer/README.md                        |   6 +-
 installer/docker-compose.yml               |  28 +-----
 installer/startup-data-loader/Dockerfile   |   7 +-
 installer/startup-data-loader/README.md    |   7 +-
 installer/startup-data-loader/load_data.py | 101 ++++++++------------
 installer/telegraf.conf                    |  84 -----------------
 13 files changed, 68 insertions(+), 234 deletions(-)
 delete mode 100644 docs/containers/telegraf.md
 delete mode 100644 installer/telegraf.conf

diff --git a/README.md b/README.md
index c722b25..55a3338 100644
--- a/README.md
+++ b/README.md
@@ -34,18 +34,17 @@ All services share a bridge network named `datalink` and rely on the admin token
 
 ## System overview
 
 The compose stack deploys nine cooperating containers:
 
 1. **InfluxDB 3** – Time-series database seeded with a tiny example dataset.
 2. **InfluxDB 3 Explorer** – Web UI for browsing and querying telemetry.
-3. **Telegraf** – Reads the importer’s line protocol output and forwards metrics to InfluxDB.
-4. **Grafana** – Pre-provisioned dashboards that visualise the stored telemetry. Load your own dashboard provisioning files into `installer/grafana/dashboards/`.
-5. **Sandbox** - *Under active development.* Connecting InfluxDB3 with LLM for natural language queries and analysis.
-6. **Slack bot d.b.a. Lappy** – Optional automation/notification bot for race ops.
-7. **Lap analysis app** – *Under active development.* Dash-based location data visualiser and lap timer. (Useful if GPS data is available.)
-8. **Startup data loader** – Seeds the database on boot with sample CAN frames.
-9. **File uploader** – Streams uploaded CSV logs into InfluxDB using the shared DBC file.
-10. **Data downloader** - Scans InfluxDB periodically, visual SQL query builder, and CSV export service.
+3. **Grafana** – Pre-provisioned dashboards that visualise the stored telemetry. Load your own dashboard provisioning files into `installer/grafana/dashboards/`.
+4. **Sandbox** – *Under active development.* Connects InfluxDB 3 to an LLM for natural-language queries and analysis.
+5. **Slack bot d.b.a. Lappy** – Optional automation/notification bot for race ops.
+6. **Lap analysis app** – *Under active development.* Dash-based location data visualiser and lap timer. (Useful if GPS data is available.)
+7. **Startup data loader** – Seeds the database on boot with sample CAN frames.
+8. **File uploader** – Streams uploaded CSV logs into InfluxDB using the shared DBC file.
+9. **Data downloader** – Scans InfluxDB periodically, provides a visual SQL query builder, and exports CSVs.
 
 Detailed documentation for each service is available in `docs/containers/`.
 
@@ -80,7 +79,7 @@ https://github.com/Western-Formula-Racing/daq-2023
 
 We also want to acknowledge the open-source tools and libraries that make this project possible. Key components include:
 * Docker / Docker Compose for containerisation
-* InfluxDB 3 and Telegraf for time-series storage and ingestion
+* InfluxDB 3 for time-series storage
 * Grafana for visualisation
 * Python open-source packages (NumPy, Pandas, Requests, etc.) used throughout the stack
 
@@ -97,4 +96,4 @@ If you’re interested in our team’s broader engineering projects, here are so
 
 ## License
 
-AGPL-3.0 License. See LICENSE file for details.
+AGPL-3.0 License. See LICENSE file for details.
\ No newline at end of file
diff --git a/dev-utils/ci/stack-smoke-test.sh b/dev-utils/ci/stack-smoke-test.sh
index 00e012f..42f5dc4 100755
--- a/dev-utils/ci/stack-smoke-test.sh
+++ b/dev-utils/ci/stack-smoke-test.sh
@@ -57,7 +57,6 @@ trap 'cleanup $?' EXIT
 ENABLED_SERVICES=(
   influxdb3
   influxdb3-explorer
-  telegraf
   grafana
   frontend
   lap-detector
diff --git a/docs/containers/influxdb2-legacy-build.md b/docs/containers/influxdb2-legacy-build.md
index 19e065d..87a01d7 100644
--- a/docs/containers/influxdb2-legacy-build.md
+++ b/docs/containers/influxdb2-legacy-build.md
@@ -17,4 +17,6 @@ docker run -d --name influxdb2 \
   influxdb:2
 ```
 
-For current deployments use the Docker Compose stack under `installer/`, which provisions InfluxDB 3 along with Grafana, Telegraf, and the rest of the telemetry tooling.
+For current deployments use the Docker Compose stack under `installer/`, which provisions InfluxDB 3 along with Grafana and the rest of the telemetry tooling.
+
+```
\ No newline at end of file
diff --git a/docs/containers/influxdb3.md b/docs/containers/influxdb3.md
index d8300b5..a9fac23 100644
--- a/docs/containers/influxdb3.md
+++ b/docs/containers/influxdb3.md
@@ -31,4 +31,4 @@ Data is stored in the `influxdb3-data` Docker volume. Removing the volume (`dock
 
 ## Related services
 
 - **Startup data loader** seeds the bucket with the example dataset on first run.
-- **Grafana**, **telegraf**, **file-uploader**, and **slackbot** authenticate using `INFLUXDB_ADMIN_TOKEN`.
+- **Grafana**, **file-uploader**, and **slackbot** authenticate using `INFLUXDB_ADMIN_TOKEN`.
\ No newline at end of file
diff --git a/docs/containers/startup-data-loader.md b/docs/containers/startup-data-loader.md
index 2b553db..cdf045e 100644
--- a/docs/containers/startup-data-loader.md
+++ b/docs/containers/startup-data-loader.md
@@ -6,15 +6,14 @@ The startup data loader seeds InfluxDB 3 with a small, deterministic dataset on
 
 - Loads CSV files from `/data` (mounted from `installer/startup-data-loader/data/`; copy `2024-01-01-00-00-00.csv.md` to a `.csv` file for the bundled sample).
 - Uses `example.dbc` to decode CAN frames into human-readable metrics.
-- Generates line protocol for Telegraf and optionally writes directly to InfluxDB.
+- Writes decoded metrics directly to InfluxDB.
 
 ## Environment variables
 
 | Variable | Description | Default |
 | --- | --- | --- |
-| `INFLUXDB_TOKEN` | Token used for direct writes when `BACKFILL=1`. | `dev-influxdb-admin-token` |
+| `INFLUXDB_TOKEN` | Token used for direct writes. | `dev-influxdb-admin-token` |
 | `INFLUXDB_URL` | Target InfluxDB endpoint. | `http://influxdb3:8181` |
-| `BACKFILL` | Set to `1` to stream directly into InfluxDB; otherwise only emits line protocol. | `1` |
 
 ## Extending the dataset
 
@@ -26,4 +25,4 @@
 
 - Logs are available via `docker compose logs -f startup-data-loader`.
 - Progress is tracked in `/app/load_data_progress.json` inside the container.
-- The importer supports resuming partially processed files; remove the progress file to force a clean run.
+- The importer supports resuming partially processed files; remove the progress file to force a clean run.
\ No newline at end of file
diff --git a/docs/containers/telegraf.md b/docs/containers/telegraf.md
deleted file mode 100644
index 4a32102..0000000
--- a/docs/containers/telegraf.md
+++ /dev/null
@@ -1,23 +0,0 @@
-# Telegraf
-
-Telegraf consumes the CAN metrics generated by the startup data loader and forwards them to InfluxDB 3 using line protocol.
-
-## Responsibilities
-
-- Watches `/var/lib/telegraf/can_metrics.out` for new line protocol files generated by the importer.
-- Writes data to the `WFR25` bucket in InfluxDB 3.
-
-## Configuration
-
-| Variable | Description | Default |
-| --- | --- | --- |
-| `INFLUXDB_URL` | Target database URL. | `http://influxdb3:8181` |
-| `INFLUXDB_TOKEN` | Token with write access to the target bucket. | `dev-influxdb-admin-token` |
-
-The configuration file lives at `installer/telegraf.conf` and supports environment variable substitution for the URL and token.
-
-## Debugging tips
-
-- Tail Telegraf logs: `docker compose logs -f telegraf`.
-- Inspect generated line protocol: `docker compose exec telegraf tail -f /var/lib/telegraf/can_metrics.out`.
-- Validate that the token grants write access by running `docker compose exec telegraf env | grep INFLUXDB`.
diff --git a/docs/docker-compose.md b/docs/docker-compose.md
index ede61c3..9a7772c 100644
--- a/docs/docker-compose.md
+++ b/docs/docker-compose.md
@@ -5,10 +5,10 @@ The `installer/docker-compose.yml` file orchestrates the complete DAQ telemetry
 
 ## High-level architecture
 
 ```text
-┌────────────┐      ┌──────────────────┐      ┌────────────┐
-│  Startup   │      │     Telegraf     │      │ InfluxDB 3 │
-│ data loader├─────▶│ (line protocol)  ├─────▶│ + Explorer │
-└────────────┘      └──────────────────┘      └────────────┘
+┌────────────┐                                ┌────────────┐
+│  Startup   │                                │ InfluxDB 3 │
+│ data loader├───────────────────────────────▶│ + Explorer │
+└────────────┘                                └────────────┘
       │                                              │
       │                                              ▼
       │                                   ┌─────────────────────┐
@@ -29,7 +29,6 @@ All containers join the `datalink` bridge network, enabling them to communicate
 | `influxdb3-data` | `influxdb3` | Persists InfluxDB 3 metadata and stored telemetry. |
 | `influxdb3-explorer-db` | `influxdb3-explorer` | Keeps explorer UI preferences. |
 | `grafana-storage` | `grafana` | Stores dashboards, plugins, and Grafana state. |
-| `telegraf-data` | `telegraf`, `startup-data-loader` | Shared folder for generated line protocol and importer progress. |
 
 Remove volumes with `docker compose down -v` if you need a clean slate.
 
@@ -66,4 +65,4 @@
 docker compose logs -f startup-data-loader
 docker compose exec influxdb3 /bin/sh
 ```
 
-For detailed service documentation, browse the files under [`docs/containers/`](containers/).
+For detailed service documentation, browse the files under [`docs/containers/`](containers/).
\ No newline at end of file
diff --git a/installer/README.md b/installer/README.md
index 4bb738d..44d785a 100644
--- a/installer/README.md
+++ b/installer/README.md
@@ -67,7 +67,6 @@ All secrets and tokens are defined in `.env`. The defaults provided in `.env.exa
 | `influxdb3` | `9000` (mapped to `8181` internally) | Core time-series database. Initialised with the admin token from `.env`. |
 | `influxdb3-explorer` | `8888` | Lightweight UI for browsing data in InfluxDB 3. |
 | `data-downloader` | `3000` | Periodically downloads CAN CSV archives from the DAQ server. Visual SQL query builder included. |
-| `telegraf` | n/a | Collects CAN metrics produced by the importer and forwards them to InfluxDB. |
 | `grafana` | `8087` | Visualises telemetry with pre-provisioned dashboards. |
 | `slackbot` | n/a | Socket-mode Slack bot for notifications and automation (optional). |
 | `lap-detector` | `8050` | Dash-based lap analysis web application. |
@@ -82,16 +81,15 @@ All secrets and tokens are defined in `.env`. The defaults provided in `.env.exa
 ## Observability
 
 - Grafana dashboards are provisioned automatically from `grafana/dashboards/` and use the datasource in `grafana/provisioning/datasources/`.
-- Telegraf writes processed metrics to `/var/lib/telegraf/can_metrics.out` before forwarding them to InfluxDB. Inspect this file inside the container for debugging (`docker compose exec telegraf tail -f /var/lib/telegraf/can_metrics.out`).
 
 ## Troubleshooting tips
 
 - **Service fails to connect to InfluxDB** – Confirm the token in `.env` matches `influxdb3-admin-token.json`. Regenerate the volumes with `docker compose down -v` if you rotate credentials.
-- **Re-import sample data** – Remove the `telegraf-data` volume and rerun the stack.
+- **Re-import sample data** – Run `docker compose down -v` and restart the stack to re-trigger the data loader.
 - **Slack services are optional** – Leave Slack variables empty or set `ENABLE_SLACK=false` to skip starting the bot during development.
 
 ## Next steps
 
 - Replace the example dataset and `example.dbc` file with production equivalents once you are ready to ingest real telemetry.
 - Update the Grafana dashboards under `grafana/dashboards/` to match your data model.
-- Review each service’s README in its respective directory for implementation details.
+- Review each service’s README in its respective directory for implementation details.
\ No newline at end of file
diff --git a/installer/docker-compose.yml b/installer/docker-compose.yml
index a72f412..1242fac 100644
--- a/installer/docker-compose.yml
+++ b/installer/docker-compose.yml
@@ -6,7 +6,6 @@ volumes:
   grafana-storage:
   influxdb3-data:
   influxdb3-explorer-db:
-  telegraf-data:
 
 services:
   influxdb3:
@@ -57,24 +56,6 @@ services:
     depends_on:
       - influxdb3
 
-  telegraf:
-    image: telegraf:1.30
-    container_name: telegraf
-    restart: unless-stopped
-    depends_on:
-      - influxdb3
-    environment:
-      INFLUXDB_TOKEN: "${INFLUXDB_ADMIN_TOKEN:-apiv3_dev-influxdb-admin-token}"
-      INFLUXDB_URL: "${INFLUXDB_URL:-http://influxdb3:8181}"
-    volumes:
-      - ./telegraf.conf:/etc/telegraf/telegraf.conf:ro
-      - telegraf-data:/var/lib/telegraf
-    networks:
-      - datalink
-    command: >
-      sh -c "touch /var/lib/telegraf/can_metrics.out &&
-             telegraf --config /etc/telegraf/telegraf.conf"
-
   grafana:
     image: grafana/grafana
     container_name: grafana
@@ -148,7 +129,7 @@ services:
     environment:
       INFLUXDB_TOKEN: "${INFLUXDB_ADMIN_TOKEN:-apiv3_dev-influxdb-admin-token}"
      INFLUXDB_URL: "${INFLUXDB_URL:-http://influxdb3:8181}"
-      BACKFILL: "1"
+      # Removed BACKFILL env var as it's now the default/only mode
       CSV_RESTART_INTERVAL: "${CSV_RESTART_INTERVAL:-10}"
       # Fixed container path, host path remains flexible
       DBC_FILE_PATH: "/installer/example.dbc"
@@ -156,7 +137,6 @@ services:
       # Host can name DBC file anything they want
       - ./${DBC_FILE_PATH:-example.dbc}:/installer/example.dbc:ro
       - ./startup-data-loader/data:/data:ro
-      - telegraf-data:/var/lib/telegraf
     working_dir: /app
     networks:
       - datalink
@@ -165,9 +145,7 @@ services:
     command: >
       sh -c "echo 'Waiting additional 5 seconds for InfluxDB 3 to stabilize...' &&
             sleep 5 &&
-            echo 'Creating telegraf input file...' &&
-            python load_data.py &&
-            echo 'Telegraf input file created successfully'"
+            python load_data.py"
 
 
   file-uploader:
@@ -245,4 +223,4 @@
       - datalink
     depends_on:
       data-downloader-api:
-        condition: service_started
+        condition: service_started
\ No newline at end of file
diff --git a/installer/startup-data-loader/Dockerfile b/installer/startup-data-loader/Dockerfile
index 71f9d44..ddedc88 100644
--- a/installer/startup-data-loader/Dockerfile
+++ b/installer/startup-data-loader/Dockerfile
@@ -15,11 +15,8 @@ RUN pip install --no-cache-dir -r requirements.txt
 # Keep the build context small by only copying the files needed to run the loader.
 COPY load_data.py requirements.txt ./
 
-# Create Telegraf output directory
-RUN mkdir -p /var/lib/telegraf
-
 # Make the script executable
 RUN chmod +x load_data.py
 
-# Run the Telegraf/direct-write loader
-CMD ["python", "load_data.py"]
+# Run the direct-write loader
+CMD ["python", "load_data.py"]
\ No newline at end of file
diff --git a/installer/startup-data-loader/README.md b/installer/startup-data-loader/README.md
index e4ce934..3ccef81 100644
--- a/installer/startup-data-loader/README.md
+++ b/installer/startup-data-loader/README.md
@@ -7,7 +7,7 @@ This container pre-loads InfluxDB 3 with small CAN datasets whenever the compose
 1. Waits for InfluxDB 3 to pass its health check.
 2. Reads CSV files from the mounted `/data` directory (copy `2024-01-01-00-00-00.csv.md` to `2024-01-01-00-00-00.csv` for the sample dataset).
 3. Uses the shared `/installer/example.dbc` file (or the path specified by `DBC_FILE_PATH`) to decode each CAN frame into human-readable signals.
-4. Writes the decoded metrics to InfluxDB 3 (`WFR25` bucket, `WFR` organisation) and emits line protocol for Telegraf.
+4. Writes the decoded metrics directly to InfluxDB 3 (`WFR25` bucket, `WFR` organisation).
 5. Exits once all files finish processing.
 
 ## CSV format
 
 ```
 relative_ms,protocol,can_id,byte0,byte1,byte2,byte3,byte4,byte5,byte6,byte7
 ```
 
 | Variable | Description |
 | --- | --- |
-| `INFLUXDB_TOKEN` | Token used for direct writes when `BACKFILL=1` (injected from `.env`). |
+| `INFLUXDB_TOKEN` | Token used for direct writes (injected from `.env`). |
 | `INFLUXDB_URL` | URL for the InfluxDB 3 instance (defaults to `http://influxdb3:8181`). |
-| `BACKFILL` | Set to `1` to stream directly into InfluxDB; set to `0` to only generate line protocol for Telegraf. |
 | `CSV_RESTART_INTERVAL` | Number of CSV files to process before the loader re-execs itself (defaults to `10`; set to `0` to disable). |
 
 ## Adding real data
 
 Check progress with:
 
 ```bash
 docker compose logs -f startup-data-loader
 ```
 
-The loader also tracks its state in `load_data_progress.json` inside the container so that it can resume large imports after interruptions.
+The loader also tracks its state in `load_data_progress.json` inside the container so that it can resume large imports after interruptions.
\ No newline at end of file
diff --git a/installer/startup-data-loader/load_data.py b/installer/startup-data-loader/load_data.py
index c8c5023..b092b2b 100644
--- a/installer/startup-data-loader/load_data.py
+++ b/installer/startup-data-loader/load_data.py
@@ -1,18 +1,17 @@
 #!/usr/bin/env python3
 
-# To run locally: BACKFILL=1 python3 load_data.py
+# To run locally: python3 load_data.py
 
 # Options
 
 # Add 5 second pause between files
 # INTER_FILE_DELAY=5 python3 load_data.py
 
 # Combine both delays
-# BACKFILL=1 BATCH_DELAY=0.05 INTER_FILE_DELAY=10 python3 load_data.py
+# BATCH_DELAY=0.05 INTER_FILE_DELAY=10 python3 load_data.py
 
 """
 WFR DAQ System - Startup Data Loader
 
-- Default: writes metrics in InfluxDB line protocol format to a Telegraf file
-- BACKFILL=1: writes directly to InfluxDB (fast bulk load)
+- Writes metrics directly to InfluxDB (fast bulk load)
 - Supports resume after interrupt using JSON state file
 """
 
@@ -44,9 +43,6 @@ def _env_int(var_name: str, default: int) -> int:
         print(f"⚠️ Invalid value '{raw_value}' for {var_name}; using default {default}")
         return default
 
-# Output file - use local file if telegraf directory doesn't exist
-OUTPUT_FILE = "can_metrics.out" if not os.path.exists("/var/lib/telegraf") else "/var/lib/telegraf/can_metrics.out"
-
 # Progress state file
 PROGRESS_FILE = "load_data_progress.json"
 # InfluxDB direct write config
@@ -58,9 +54,6 @@ def _env_int(var_name: str, default: int) -> int:
 DBC_FILENAME = "example.dbc"
 INSTALLER_ROOT = Path(__file__).resolve().parent.parent
 
-# Mode switch
-BACKFILL_MODE = os.getenv("BACKFILL", "0") == "1"
-
 # Performance tuning - delays to reduce server load
 BATCH_DELAY = float(os.getenv("BATCH_DELAY", "0.0"))  # Delay after each batch write (seconds)
 INTER_FILE_DELAY = float(os.getenv("INTER_FILE_DELAY", "0.0"))  # Delay between files (seconds)
@@ -207,9 +200,8 @@ def _resolve_dbc_path() -> Path:
 
 
 class CANLineProtocolWriter:
-    def __init__(self, output_path: str, batch_size: int = 1000, progress_state: Optional[ProgressState] = None):
+    def __init__(self, batch_size: int = 1000, progress_state: Optional[ProgressState] = None):
         self.batch_size = batch_size
-        self.output_path = output_path
         self.org = "WFR"
         self.tz_toronto = ZoneInfo("America/Toronto")
         self.progress_state = progress_state or ProgressState.load()
@@ -219,38 +211,31 @@ def __init__(self, output_path: str, batch_size: int = 1000, progress_state: Opt
         self.db = cantools.database.load_file(str(dbc_path))
         print(f"📁 Loaded DBC file: {dbc_path}")
 
-        # Influx client setup (only if in backfill mode)
-        if BACKFILL_MODE:
-            # Adjust batch size and flush interval based on BATCH_DELAY
-            influx_batch_size = 50000
-            influx_flush_interval = 10_000
-
-            if BATCH_DELAY > 0:
-                # If user adds delay, we can use larger batches
-                influx_batch_size = min(100000, int(50000 * (1 + BATCH_DELAY)))
-                influx_flush_interval = min(30_000, int(10_000 * (1 + BATCH_DELAY)))
-
-            self.client = InfluxDBClient(
-                url=INFLUX_URL,
-                token=INFLUX_TOKEN,
-                org=INFLUX_ORG
-            )
-            self.write_api = self.client.write_api(
-                write_options=WriteOptions(
-                    batch_size=influx_batch_size,
-                    flush_interval=influx_flush_interval,
-                    jitter_interval=2000,
-                    retry_interval=5000
-                )
+        # Influx client setup
+        # Adjust batch size and flush interval based on BATCH_DELAY
+        influx_batch_size = 50000
+        influx_flush_interval = 10_000
+
+        if BATCH_DELAY > 0:
+            # If user adds delay, we can use larger batches
+            influx_batch_size = min(100000, int(50000 * (1 + BATCH_DELAY)))
+            influx_flush_interval = min(30_000, int(10_000 * (1 + BATCH_DELAY)))
+
+        self.client = InfluxDBClient(
+            url=INFLUX_URL,
+            token=INFLUX_TOKEN,
+            org=INFLUX_ORG
+        )
+        self.write_api = self.client.write_api(
+            write_options=WriteOptions(
+                batch_size=influx_batch_size,
+                flush_interval=influx_flush_interval,
+                jitter_interval=2000,
+                retry_interval=5000
             )
-            print(f"⚙️ InfluxDB batch_size={influx_batch_size}, flush_interval={influx_flush_interval}ms")
-        else:
-            # Clear or create output file for Telegraf only if starting fresh
-            if not self.progress_state.completed_files and not self.progress_state.file_progress:
-                with open(self.output_path, "w") as f:
-                    pass
-            self.client = None
-            self.write_api = None
+        )
+        print(f"⚙️ InfluxDB batch_size={influx_batch_size}, flush_interval={influx_flush_interval}ms")
+
 
     def count_total_messages(self, file: IO[bytes], is_csv: bool = True) -> int:
         total = 0
@@ -283,7 +268,7 @@ def _escape_tag_value(self, val: str) -> str:
     def _format_line_protocol(self, measurement: str, tags: dict, fields: dict, timestamp: int) -> str:
         tags_str = ",".join(f"{self._escape_tag_value(k)}={self._escape_tag_value(v)}" for k, v in tags.items())
         fields_str = ",".join(
-            f"{self._escape_tag_value(k)}={v}" if isinstance(v, (int, float)) else f'{self._escape_tag_value(k)}="{v}"'
+            f"{self._escape_tag_value(k)}={v}" if isinstance(v, (int, float)) else f'{self._escape_tag_value(k)}"{v}"'
             for k, v in fields.items())
         line = f"{measurement},{tags_str} {fields_str} {timestamp}"
         return line
@@ -383,11 +368,7 @@ async def stream_csv(self, file: IO[bytes], csv_path: str, csv_filename: str,
                 current_row += 1
 
                 if len(batch_lines) >= self.batch_size:
-                    if BACKFILL_MODE:
-                        self.write_api.write(bucket=INFLUX_BUCKET, org=INFLUX_ORG, record=batch_lines)
-                    else:
-                        with open(self.output_path, "a") as out_file:
-                            out_file.write("\n".join(batch_lines) + "\n")
+                    self.write_api.write(bucket=INFLUX_BUCKET, org=INFLUX_ORG, record=batch_lines)
 
                     file_progress.processed_rows = current_row
                     file_progress.last_update = time.time()
@@ -405,16 +386,10 @@ async def stream_csv(self, file: IO[bytes], csv_path: str, csv_filename: str,
                 # Configurable delay to reduce server load
                 if BATCH_DELAY > 0:
                     await asyncio.sleep(BATCH_DELAY)
-                elif not BACKFILL_MODE:
-                    await asyncio.sleep(0.1)  # let Telegraf catch up
 
         # Write remaining batch
         if batch_lines:
-            if BACKFILL_MODE:
-                self.write_api.write(bucket=INFLUX_BUCKET, org=INFLUX_ORG, record=batch_lines)
-            else:
-                with open(self.output_path, "a") as out_file:
-                    out_file.write("\n".join(batch_lines) + "\n")
+            self.write_api.write(bucket=INFLUX_BUCKET, org=INFLUX_ORG, record=batch_lines)
 
             file_progress.processed_rows = current_row
             if on_progress:
                 on_progress(current_row, total_rows)
@@ -431,8 +406,7 @@ async def stream_csv(self, file: IO[bytes], csv_path: str, csv_filename: str,
             except:
                 pass
 
-        mode_str = "InfluxDB Direct" if BACKFILL_MODE else "Telegraf File"
-        print(f"\n✅ Processed {current_row:,} rows using {mode_str} mode")
+        print(f"\n✅ Processed {current_row:,} rows using InfluxDB Direct mode")
 
 
 def make_progress_callback(file_path: str):
@@ -446,8 +420,7 @@ def callback(processed: int, total: int):
 
 
 async def load_startup_data():
-    mode_str = "InfluxDB Direct (BACKFILL)" if BACKFILL_MODE else "Telegraf File"
-    print(f"🚀 WFR DAQ System - Startup Data Loader [{mode_str}]")
+    print(f"🚀 WFR DAQ System - Startup Data Loader [InfluxDB Direct]")
     print("=" * 60)
     restart_interval = CSV_RESTART_INTERVAL if CSV_RESTART_INTERVAL > 0 else None
     if restart_interval:
@@ -495,7 +468,6 @@ async def load_startup_data():
 
     try:
         writer = CANLineProtocolWriter(
- output_path=OUTPUT_FILE, batch_size=1000, progress_state=progress_state ) @@ -564,12 +536,11 @@ def main(): try: success = asyncio.run(load_startup_data()) elapsed = time.time() - start_time - mode_str = "InfluxDB Direct" if BACKFILL_MODE else "Telegraf Loader" if success: - print(f"\n🏁 Data writing completed in {elapsed:.2f} seconds ({mode_str})") + print(f"\n🏁 Data writing completed in {elapsed:.2f} seconds (InfluxDB Direct)") sys.exit(0) else: - print(f"\n💥 Data writing failed after {elapsed:.2f} seconds ({mode_str})") + print(f"\n💥 Data writing failed after {elapsed:.2f} seconds (InfluxDB Direct)") sys.exit(1) except KeyboardInterrupt: @@ -582,4 +553,4 @@ def main(): if __name__ == "__main__": - main() + main() \ No newline at end of file diff --git a/installer/telegraf.conf b/installer/telegraf.conf deleted file mode 100644 index 051179f..0000000 --- a/installer/telegraf.conf +++ /dev/null @@ -1,84 +0,0 @@ - -# Global Agent Configuration -[agent] - ## Don't add hostname as a tag (saves storage space) - omit_hostname = true - - ## Default data collection interval for all inputs - interval = "10s" - ## Rounds collection interval to 'interval' - round_interval = true - ## Telegraf will send metrics to outputs in batches of at most - ## metric_batch_size metrics. - - # WE NEED TO TEST THESE METRICS FOR LOCAL AND AWS - - metric_batch_size = 10000 - ## Maximum number of unwritten metrics per output. - metric_buffer_limit = 100000000 - ## Collection jitter is used to jitter the collection by a random amount. - collection_jitter = "0s" - ## Default flushing interval for all outputs. - flush_interval = "2s" - ## Jitter the flush interval by a random amount. - flush_jitter = "0s" - -[[inputs.file]] - files = ["/var/lib/telegraf/can_metrics.out"] - data_format = "influx" - -# Accept metrics over InfluxDB 2.x HTTP API -[[inputs.influxdb_v2_listener]] - ## Address and port to host InfluxDB listener on - ## (Double check the port. Could be 9999 if using OSS Beta) - service_address = ":8181" - - ## Maximum undelivered metrics before rate limit kicks in. - ## When the rate limit kicks in, HTTP status 429 will be returned. - ## 0 disables rate limiting - # max_undelivered_metrics = 0 - - ## Maximum duration before timing out read of the request - # read_timeout = "10s" - ## Maximum duration before timing out write of the response - # write_timeout = "10s" - - ## Maximum allowed HTTP request body size in bytes. - ## 0 means to use the default of 32MiB. - # max_body_size = "32MiB" - - ## Optional tag to determine the bucket. - ## If the write has a bucket in the query string then it will be kept in this tag name. - ## This tag can be used in downstream outputs. - ## The default value of nothing means it will be off and the database will not be recorded. - # bucket_tag = "" - - ## Set one or more allowed client CA certificate file names to - ## enable mutually authenticated TLS connections - # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] - - ## Add service certificate and key - # tls_cert = "/etc/telegraf/cert.pem" - # tls_key = "/etc/telegraf/key.pem" - - ## Optional token to accept for HTTP authentication. - ## You probably want to make sure you have TLS configured above for this. - # token = "some-long-shared-secret-token" - - ## Influx line protocol parser - ## 'internal' is the default. 'upstream' is a newer parser that is faster - ## and more memory efficient. 
- # parser_type = "internal" - -[[outputs.influxdb_v2]] -urls = ["${INFLUXDB_URL}"] -token = "${INFLUXDB_TOKEN}" -organization = "WFR" -bucket = "WFR25" -## Timeout for HTTP writes -timeout = "30s" -## HTTP Content-Encoding for write request body, can be set to "gzip" to -## compress body or "identity" to apply no encoding. -content_encoding = "gzip" -## Enable or disable uint support for writing uints influxdb 2.0 -influx_uint_support = true \ No newline at end of file From 5516fdc2f527ff2e4f6e6e27a37050183b64c781 Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Wed, 31 Dec 2025 14:30:56 -0800 Subject: [PATCH 2/3] Update installer/startup-data-loader/load_data.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- installer/startup-data-loader/load_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/installer/startup-data-loader/load_data.py b/installer/startup-data-loader/load_data.py index b092b2b..7d8e762 100644 --- a/installer/startup-data-loader/load_data.py +++ b/installer/startup-data-loader/load_data.py @@ -268,7 +268,7 @@ def _escape_tag_value(self, val: str) -> str: def _format_line_protocol(self, measurement: str, tags: dict, fields: dict, timestamp: int) -> str: tags_str = ",".join(f"{self._escape_tag_value(k)}={self._escape_tag_value(v)}" for k, v in tags.items()) fields_str = ",".join( - f"{self._escape_tag_value(k)}={v}" if isinstance(v, (int, float)) else f'{self._escape_tag_value(k)}"{v}"' + f"{self._escape_tag_value(k)}={v}" if isinstance(v, (int, float)) else f'{self._escape_tag_value(k)}="{v}"' for k, v in fields.items()) line = f"{measurement},{tags_str} {fields_str} {timestamp}" return line From 430cee678e393e15ca1cdb3d278d9094c9d0d253 Mon Sep 17 00:00:00 2001 From: Haorui Zhou Date: Wed, 31 Dec 2025 14:33:49 -0800 Subject: [PATCH 3/3] Fix formatting in InfluxDB legacy build docs --- docs/containers/influxdb2-legacy-build.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/containers/influxdb2-legacy-build.md b/docs/containers/influxdb2-legacy-build.md index 87a01d7..756a6ac 100644 --- a/docs/containers/influxdb2-legacy-build.md +++ b/docs/containers/influxdb2-legacy-build.md @@ -17,6 +17,4 @@ docker run -d --name influxdb2 \ influxdb:2 ``` -For current deployments use the Docker Compose stack under `installer/`, which provisions InfluxDB 3 along with Grafana and the rest of the telemetry tooling. - -``` \ No newline at end of file +For current deployments use the Docker Compose stack under `installer/`, which provisions InfluxDB 3 along with Grafana and the rest of the telemetry tooling. \ No newline at end of file
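
A quick way to sanity-check the new direct-write path after applying this series: the loader's behaviour now reduces to the pattern sketched below. This is a minimal, hedged reconstruction rather than code lifted from the patches; it assumes the `influxdb-client` and `cantools` PyPI packages, the stack's `WFR` organisation and `WFR25` bucket, and placeholder values for the URL, token, CAN frame, and timestamp.

```python
# Minimal sketch of the direct-write path; the URL, token, frame ID, payload,
# and timestamp below are placeholders, not values taken from the patches.
import os

import cantools
from influxdb_client import InfluxDBClient, WriteOptions

INFLUX_URL = os.getenv("INFLUXDB_URL", "http://localhost:9000")  # assumed host port
INFLUX_TOKEN = os.getenv("INFLUXDB_TOKEN", "dev-influxdb-admin-token")

# Decode one CAN frame into named signals using the shared DBC file.
db = cantools.database.load_file("example.dbc")
signals = db.decode_message(0x123, bytes(8))  # hypothetical frame ID and payload

# Render the numeric signals as a single line-protocol record.
fields = ",".join(
    f"{name}={value}"
    for name, value in signals.items()
    if isinstance(value, (int, float))
)
record = f"can,source=sketch {fields} 1704067200000000000"  # placeholder ns timestamp

# Batched write, mirroring the WriteOptions the loader now configures itself.
with InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org="WFR") as client:
    write_api = client.write_api(
        write_options=WriteOptions(batch_size=50_000, flush_interval=10_000)
    )
    write_api.write(bucket="WFR25", org="WFR", record=[record])
    write_api.close()  # flush buffered batches before the client shuts down
```

With Telegraf gone, batching and backpressure live in `WriteOptions` inside the loader process instead of in a separate agent, which is what lets the series drop the `telegraf-data` volume and the line-protocol hand-off file entirely.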