epg_connector is a high-performance Erlang application that provides PostgreSQL connection pooling and logical replication support. It offers a robust foundation for building scalable database-driven applications with real-time data synchronization capabilities.
- Features
- Prerequisites
- Installation
- Configuration
- Usage
- Logical Replication Protocol
- Data Types Support
- Examples
- License
- 🚀 High-performance PostgreSQL connection pooling - Efficient connection management with configurable pool sizes
- 📡 PostgreSQL Logical Replication - Real-time data streaming using PostgreSQL's logical replication protocol
- 🔄 pgoutput Protocol Support - Built-in decoder for PostgreSQL's native logical replication output plugin
- 📊 Comprehensive Data Type Support - Handles all PostgreSQL data types including arrays, JSON, timestamps, and custom types
- ⚙️ Highly Configurable - Flexible configuration for pools, databases, and replication settings
- 🔌 Easy Integration - Simple API for existing Erlang/OTP applications
- 🛡️ Robust Error Handling - Comprehensive error handling and logging for production environments
Before you begin, ensure you have the following:
- Erlang/OTP 21 or later
- Rebar3 (build tool for Erlang)
- PostgreSQL 10+ with logical replication enabled
- Replication slot configured in PostgreSQL (for logical replication)
Add epg_connector to your rebar.config dependencies:
{deps, [
{epg_connector, {git, "https://github.com/your-repo/epg_connector.git", {tag, "1.0.0"}}}
]}.Then run:
$ rebar3 get-deps
$ rebar3 compile

Configure your databases and connection pools in your sys.config file:
{epg_connector, [
{databases, #{
main_db => #{
host => "127.0.0.1",
port => 5432,
database => "myapp_production",
username => "postgres",
password => "postgres"
},
read_db => #{
host => "replica.example.com",
port => 5432,
database => "myapp_production",
username => "readonly_user",
password => "readonly_pass"
}
}},
{pools, #{
main_pool => #{
database => main_db,
size => {20, 50}
},
readonly_pool => #{
database => read_db,
size => 10
}
}}
]}.

- Start the application:
via *_app.src file
{applications, [kernel, stdlib, epgsql, epg_connector]}.

- Use connection pools in your code:
-module(user_service).
-export([get_user/1, create_user/2]).

get_user(UserId) ->
    epg_pool:with(readonly_pool, fun(Connection) ->
        Query = "SELECT id, name, email FROM users WHERE id = $1",
        case epgsql:equery(Connection, Query, [UserId]) of
            {ok, _Columns, [{Id, Name, Email}]} ->
                {ok, #{id => Id, name => Name, email => Email}};
            {ok, _Columns, []} ->
                {error, not_found}
        end
    end).

create_user(Name, Email) ->
    epg_pool:query(
        main_pool,
        "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
        [Name, Email]
    ).
- Implement the replication callback behavior:
-module(user_replication_handler).
-behaviour(epg_wal_reader).
-export([handle_replication_data/2, handle_replication_stop/2]).

handle_replication_data(_Ref, Changes) ->
    lists:foreach(fun process_change/1, Changes),
    ok.

handle_replication_stop(_Ref, ReplicationSlot) ->
    logger:info("Replication stopped for slot: ~p", [ReplicationSlot]),
    ok.

process_change({<<"users">>, insert, UserData, _}) ->
    logger:info("New user created: ~p", [UserData]);
process_change({<<"users">>, update, UserData, _OldUserData}) ->
    logger:info("User updated: ~p", [UserData]);
process_change({<<"users">>, delete, UserData, _}) ->
    logger:info("User deleted: ~p", [UserData]);
process_change({TableName, Operation, Data, _OldData}) ->
    logger:info("Change in table ~s: ~p ~p", [TableName, Operation, Data]).
- Start logical replication:
DbOpts = #{
    host => "localhost",
    port => 5432,
    database => "myapp",
    username => "replication_user",
    password => "replication_pass",
    replication => "database"
},
epg_wal_reader:subscribe(
    user_replication_handler,   % Callback module
    DbOpts,                     % Database connection options
    "myapp_slot",               % Replication slot name
    ["user_changes"],           % Publications to subscribe to
    #{slot_type => temporary}   % Replication slot options (temporary | persistent)
).
epg_connector implements PostgreSQL's logical replication protocol with support for:
- pgoutput plugin - Native PostgreSQL logical replication output format
- Real-time streaming - Continuous WAL (Write-Ahead Log) data streaming
- Transaction boundaries - BEGIN/COMMIT message handling
- Schema information - Automatic relation metadata decoding
- Data type conversion - Automatic conversion of PostgreSQL types to Erlang terms
- BEGIN - Transaction start
- COMMIT - Transaction commit
- INSERT - Row insertion
- UPDATE - Row update (with old/new values)
- DELETE - Row deletion
- RELATION - Table schema information
- TYPE - Custom type information
- TRUNCATE - Table truncation
The connector supports all major PostgreSQL data types:
- Integers: int2, int4, int8
- Floating Point: float4, float8
- Text: text, varchar, char, bpchar
- Binary: bytea
- Boolean: bool
- Date: date
- Time: time, timetz
- Timestamp: timestamp, timestamptz
- Interval: interval
- JSON: json, jsonb
- UUID: uuid
- Arrays: All array types (e.g., int4[], text[], jsonb[][][])
- Network: inet, cidr, macaddr
- Geometric: point
- Range Types: int4range, int8range, tsrange, tstzrange
Some PostgreSQL types are not automatically decoded and are returned as text representation (binary strings):
- Extended types: cidr, inet, macaddr, macaddr8
- Geometric types: point - returned as text (e.g., <<"(1,2)">>)
- Range types: int4range, int8range, tsrange, tstzrange, daterange
- Advanced types: hstore, geometry, interval
- Custom types: User-defined types and enums - returned as text
These types can be parsed manually in your application logic if needed.
-module(order_sync).
-behaviour(epg_wal_reader).
-export([start/0, handle_replication_data/2, handle_replication_stop/2]).
start() ->
DbOpts = #{
host => "production-db.example.com",
port => 5432,
database => "ecommerce",
username => "repl_user",
password => "secure_password",
replication => "database"
},
Options = #{slot_type => persistent},
epg_wal_reader:subscribe(
{?MODULE, self()},
DbOpts,
"order_replication_slot",
["order_events", "inventory_changes"],
Options
).
handle_replication_data(_Ref, Changes) ->
ProcessedChanges = lists:map(fun transform_change/1, Changes),
send_to_analytics_service(ProcessedChanges),
update_cache(ProcessedChanges),
ok.
handle_replication_stop(_Ref, SlotName) ->
logger:warning("Replication stopped for slot: ~s", [SlotName]),
% Implement reconnection logic here
ok.
transform_change({<<"orders">>, insert, OrderData, _}) ->
#{
event_type => order_created,
table => <<"orders">>,
order_id => maps:get(<<"id">>, OrderData),
customer_id => maps:get(<<"customer_id">>, OrderData),
amount => maps:get(<<"total_amount">>, OrderData),
timestamp => os:timestamp()
};
transform_change({<<"orders">>, update, OrderData, _OldOrderData}) ->
#{
event_type => order_updated,
table => <<"orders">>,
order_id => maps:get(<<"id">>, OrderData),
status => maps:get(<<"status">>, OrderData),
timestamp => os:timestamp()
};
transform_change({<<"inventory">>, Operation, Data, _OldData}) ->
#{
event_type => inventory_change,
table => <<"inventory">>,
operation => Operation,
data => Data,
timestamp => os:timestamp()
};
transform_change({TableName, Operation, Data, _OldData}) ->
#{
event_type => generic_change,
table => TableName,
operation => Operation,
data => Data,
timestamp => os:timestamp()
}.$ git clone https://github.com/your-repo/epg_connector.git
$ cd epg_connector
$ rebar3 get-deps
$ rebar3 compile
$ rebar3 ct  # Run tests

This project is licensed under the MIT License - see the LICENSE file for details.