# go-cdc-service

This project implements a Change Data Capture (CDC) pipeline in Go that:
- Reads real-time changes (INSERT, UPDATE, DELETE) from PostgreSQL via logical replication.
- Streams these changes into Kafka.
- Consumes Kafka topics and upserts events into Elasticsearch.
- Provides snapshot/backfill functionality to seed initial data.
- Persists replication state for crash-resilience.
- Offers logging via SQLite.
- Visualizes changes in Kibana.
## Table of Contents

- Prerequisites
- Setup
- Project Structure
- Components
- Running the Services
- Kibana Dashboard
- Further Enhancements
## Prerequisites

- Go >= 1.20
- Docker & Docker Compose
- Internet access to pull Docker images
## Setup

```bash
git clone https://github.com/yourusername/go-cdc-service.git
cd go-cdc-service
```

Edit `config.yaml`:
```yaml
postgres:
  host: localhost
  port: 5432
  user: cdc_user
  password: secret
  dbname: mydb
  slot_name: my_slot
  publication_name: my_publication
  tables:
    - public.users

kafka:
  brokers:
    - localhost:9092
  topic: cdc-events
  groupID: cdc-es-consumer

elasticsearch:
  addresses:
    - http://localhost:9200
  username: elastic
  password: your_elasticsearch_password
  indexPrefix: cdc_
```

Start PostgreSQL, Kafka, Elasticsearch, and Kibana:
```bash
cd Docker_Files/es-kibana
docker-compose up -d
```

```bash
cd Docker_Files/postgres-kafka
docker-compose up -d
```

These compose files start:

- PostgreSQL with logical replication enabled.
- Kafka & Zookeeper.
- Elasticsearch (single node, X-Pack security enabled).
- Kibana (for exploration).
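For the CDC service to stream changes, Postgres must run with `wal_level=logical`. A minimal compose fragment along these lines (service name, image version, and flags are illustrative; the repository's own compose file may differ):

```yaml
services:
  postgres:
    image: postgres:15
    # Logical replication is off by default; enable it at startup.
    command: ["postgres", "-c", "wal_level=logical"]
    environment:
      POSTGRES_USER: cdc_user
      POSTGRES_PASSWORD: secret
      POSTGRES_DB: mydb
    ports:
      - "5432:5432"
```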
## Project Structure

```
.
├── cmd
│   ├── main.go        # CDC service entrypoint
│   └── es_consumer    # ES consumer CLI
├── config.yaml        # Configuration file
├── internal
│   ├── wal            # Replicator & snapshot logic
│   ├── kafka          # Kafka producer
│   ├── es             # Elasticsearch client
│   ├── store          # BoltDB LSN store
│   └── pkg
│       └── logger     # File & SQLite logger
└── go.mod
```
## Components

### WAL Replicator (`internal/wal`)

- Connects to Postgres via `pgconn`.
- Creates the publication & replication slot.
- Streams WAL with `pglogrepl`.
- Parses `XLogData` into logical messages.
- Uses `snapshotTables` to backfill initial data.
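On startup the replicator issues the equivalent of the following SQL (shown for reference; the output plugin is assumed to be `pgoutput`, the plugin `pglogrepl` is most commonly paired with):

```sql
-- Publication limited to the tables listed in config.yaml
CREATE PUBLICATION my_publication FOR TABLE public.users;

-- Logical replication slot the service streams from
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');
```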
### Kafka Producer (`internal/kafka`)

- Uses `segmentio/kafka-go`.
- Publishes JSON events with a composite key (`table:pk`).
### Snapshot / Backfill

- Reads configured tables via `SELECT *`.
- Emits `"snapshot"` events to Kafka.
- Captures the current WAL LSN to avoid gaps.
### LSN Store (`internal/store`)

- Persists the last processed LSN (in BoltDB) on each standby status update.
- Resumes replication exactly where it left off.
### Logger (`internal/pkg/logger`)

- A custom `zapcore` core writes to both console and SQLite.
- Stores entries in a `logs` table for auditing.
### ES Consumer (`cmd/es_consumer`)

- Reads from a Kafka consumer group.
- Upserts events into Elasticsearch via REST.
- Uses the URL-escaped LSN as the document ID.
- Configurable index prefix (indexes named like `cdc_schema_table`).
## Running the Services

1. Start Docker:

   ```bash
   docker-compose up -d
   ```

2. Run the CDC service:

   ```bash
   go run cmd/main.go
   ```

3. Run the ES consumer:

   ```bash
   go run cmd/es_consumer/main.go
   ```
## Kibana Dashboard

- Access Kibana at http://localhost:5601.
- Create an index pattern matching `cdc_*`.
- Use Discover to explore events and build visualizations.
- Compose dashboards (e.g. inserts over time).
## Further Enhancements

- Use Avro/Protobuf & a Schema Registry.
- Implement exactly-once with Kafka transactions.
- Package as Kubernetes Operator.
- Add alerting & monitoring (Prometheus, Grafana).
- Support additional sinks (Redis, search, data warehouse).