Real-time L2 orderbook data ingestion for Kalshi and Polymarket. Captures every tick, compresses 20x, enables timestamp-accurate backtesting.
See Developer Environment for setup details.
WebSocket (1ms) → Normalize (1ms) → Kafka (5ms) → Compress (async) → Parquet (disk) → Query API (50ms)
WebSocket connectors subscribe to the Kalshi and Polymarket orderbook feeds. Every orderbook change (insert/update/delete) is captured with a nanosecond timestamp.
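A minimal connector sketch (using the third-party `websockets` package and asyncio); the endpoint URL and subscribe-message shape are placeholders, not the actual Kalshi/Polymarket protocol:

```python
# Connector sketch: the feed URL and subscribe message are illustrative
# placeholders, not the real exchange protocol.
import asyncio
import json
import time

import websockets

FEED_URL = "wss://example.invalid/orderbook"  # placeholder endpoint


async def run_connector(market_id: str, out_queue: asyncio.Queue) -> None:
    async with websockets.connect(FEED_URL) as ws:
        # Subscribe to L2 updates for one market (message shape is illustrative).
        await ws.send(json.dumps({"op": "subscribe", "market": market_id}))
        async for raw in ws:
            # Stamp arrival time in nanoseconds before any further processing.
            await out_queue.put({"recv_ts_ns": time.time_ns(), "raw": raw})
```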
Exchange-specific formats are normalized into a unified schema: prices and quantities are validated, sequence gaps are detected, and event IDs are assigned.
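A sketch of what the unified event schema and validation might look like; the field names here are assumptions, not the project's actual schema:

```python
from dataclasses import dataclass
from enum import Enum


class Side(Enum):
    BID = "bid"
    ASK = "ask"


@dataclass(frozen=True)
class OrderbookEvent:
    event_id: int      # assigned by the normalizer
    exchange: str      # "kalshi" or "polymarket"
    market_id: str
    side: Side
    price: float
    quantity: float    # 0 means the level was removed
    sequence: int      # exchange sequence number, used for gap detection
    recv_ts_ns: int    # nanosecond receive timestamp


def validate(event: OrderbookEvent, last_sequence: int | None) -> None:
    """Reject invalid prices/quantities and flag sequence gaps."""
    if event.price <= 0 or event.quantity < 0:
        raise ValueError(f"bad price/quantity: {event}")
    if last_sequence is not None and event.sequence != last_sequence + 1:
        raise RuntimeError(f"sequence gap: {last_sequence} -> {event.sequence}")
```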
Events are written to Kafka (a persistent queue), so a storage crash loses no data; Kafka retains 7 days of events for replay.
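A sketch of the Kafka write path using the kafka-python client; the topic name and broker address are assumptions:

```python
import json

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",          # assumed broker address
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    acks="all",  # wait until the write is durable on the brokers
)


def publish(event: dict) -> None:
    # Keying by market keeps each market's events ordered within one partition.
    producer.send("orderbook-events", key=event["market_id"], value=event)
```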
- Full snapshot every X minutes
- Deltas (changes only) between snapshots
- Delta encoding + Zstandard + Parquet columnar (see the sketch after this list)
- Result: 95% size reduction (20x compression)
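A minimal sketch of the snapshot/delta idea for one side of the book, modeled as a {price: quantity} dict; the representation and the Zstandard call are illustrative, not the project's actual encoding:

```python
import json

import zstandard as zstd


def diff(prev: dict[float, float], curr: dict[float, float]) -> dict[float, float]:
    """Delta from prev to curr: changed/added levels plus zero-quantity removals."""
    delta = {p: q for p, q in curr.items() if prev.get(p) != q}
    delta.update({p: 0.0 for p in prev if p not in curr})
    return delta


def apply_delta(book: dict[float, float], delta: dict[float, float]) -> dict[float, float]:
    """Reapply a delta to a book side; quantity 0 removes the level."""
    out = dict(book)
    for price, qty in delta.items():
        if qty == 0.0:
            out.pop(price, None)
        else:
            out[price] = qty
    return out


# Deltas (and periodic full snapshots) are then compressed, e.g. with Zstandard:
compressed = zstd.ZstdCompressor(level=10).compress(
    json.dumps(diff({100.0: 5.0}, {100.0: 7.0, 99.0: 2.0})).encode()
)
```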
Partitioned Parquet files: /data/{exchange}/date={YYYY-MM-DD}/market={id}/
SQLite index tracks snapshot locations and sequence ranges.
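A sketch of the storage layer: write a batch of normalized events to a partitioned Parquet file with pyarrow and record its location in the SQLite index. The exact index schema and file naming are assumptions:

```python
import sqlite3
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq


def write_batch(events: list[dict], exchange: str, date: str, market: str,
                index_db: str = "index.sqlite") -> Path:
    out_dir = Path(f"data/{exchange}/date={date}/market={market}")
    out_dir.mkdir(parents=True, exist_ok=True)
    path = out_dir / f"{events[0]['sequence']}.parquet"

    # Columnar write with Zstandard compression.
    pq.write_table(pa.Table.from_pylist(events), str(path), compression="zstd")

    # Record the segment's location and sequence range in the SQLite index.
    with sqlite3.connect(index_db) as db:
        db.execute("CREATE TABLE IF NOT EXISTS segments "
                   "(path TEXT, market TEXT, seq_start INT, seq_end INT)")
        db.execute("INSERT INTO segments VALUES (?, ?, ?, ?)",
                   (str(path), market, events[0]["sequence"], events[-1]["sequence"]))
    return path
```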
To get the orderbook at timestamp T (see the sketch after this list):
- Load nearest snapshot before T
- Apply deltas chronologically until T
- Return exact orderbook state
- Performance: 50-100ms per query
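A sketch of that replay loop; `load_snapshot_before` and `load_deltas` are hypothetical helpers backed by the SQLite index and the Parquet segments, and `apply_delta` is the same function shown in the compression sketch:

```python
def orderbook_at(market_id: str, ts_ns: int) -> dict[float, float]:
    # Hypothetical helpers: look up the index, then read the Parquet segments.
    snapshot_ts_ns, book = load_snapshot_before(market_id, ts_ns)
    for delta in load_deltas(market_id, start_ts_ns=snapshot_ts_ns, end_ts_ns=ts_ns):
        book = apply_delta(book, delta)
    return book
```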
Modular Monolith - Single process, clean boundaries, can extract later if needed.
Components:
- Connectors: Kalshi and Polymarket WebSocket clients
- Normalization: Parser and Validator for unified schema
- Storage: Orderbook State manager, Snapshots, and Deltas
- src/connectors/ - Kalshi/Polymarket WebSocket clients
- src/normalization/ - Parse & validate
- src/compression/ - Snapshot + delta logic
- src/storage/ - Parquet writer + index
- src/reconstruction/ - Query & replay
- src/main.py - Entry point
- tests/ - Unit + integration tests
- scripts/ - Ops tools (gap recovery, etc.)
- config/ - YAML configuration
- data/ - Local storage (gitignored)
- Week 1: Both connectors + Kafka + raw storage
- Week 2: Compression (snapshot+delta) + Parquet
- Week 3: Reconstruction + Query API
- Week 4: Monitoring + validation + production ready
- events_per_second: Should be consistent (100-1000/sec)
- ingestion_latency_p99: Should be < 100ms
- sequence_gaps_detected: Should be 0
- compression_ratio: Should be > 10x
- storage_growth_gb_per_day: Should be < 10 GB
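An illustrative way to check live metrics against those targets; the metric names and metrics source are assumptions:

```python
THRESHOLDS = {
    "ingestion_latency_p99_ms": lambda v: v < 100,
    "sequence_gaps_detected": lambda v: v == 0,
    "compression_ratio": lambda v: v > 10,
    "storage_growth_gb_per_day": lambda v: v < 10,
}


def failing_metrics(metrics: dict[str, float]) -> list[str]:
    """Return the names of metrics that violate their target."""
    return [name for name, ok in THRESHOLDS.items()
            if name in metrics and not ok(metrics[name])]
```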
# Get orderbook at specific timestamp
GET /api/v1/orderbook/{exchange}/{market}?timestamp={unix_ns}
# Stream for backtesting
POST /api/v1/backtest/stream
{
  "market_id": "PRES2028",
  "start": 1728700000000000000,
  "end": 1728800000000000000
}
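An example client call for the point-in-time endpoint, using `requests`; the host and port are assumptions:

```python
import requests

resp = requests.get(
    "http://localhost:8000/api/v1/orderbook/kalshi/PRES2028",  # assumed host/port
    params={"timestamp": 1728700000000000000},
    timeout=5,
)
resp.raise_for_status()
book = resp.json()  # expected: bid/ask levels at that timestamp
```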
Design Decisions
| Choice | Why |
|---|---|
| Monolith | Latency critical, small team, can split later |
| Kafka | Persistent queue, survives crashes, replay capability |
| Snapshot+Delta | Balance compression vs reconstruction speed |
| Parquet | Columnar = best compression + fast queries |
| No Redis | Not needed - backtesting queries are unique |
| No caching | Queries never repeat (different timestamps) |

Developer Environment
We use Anaconda for our Python runtime environment. Download the distribution for your platform; Linux distributions are also available for developer runtimes and virtual machines. Do not use conda in production; use a global environment there. Distribution download page: https://www.anaconda.com/download/success
Set up the conda environment for this project by running the following command from the root of the project:
conda env create -f environment.yml
# pm is the name of the environment
conda activate pm

When adding a new dependency, whether via Conda or pip, be sure to update the appropriate section in environment.yml.