From 1406a62a47f580235b62c52dde150ee603aaeff3 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Thu, 6 Nov 2025 09:50:49 +0300 Subject: [PATCH 1/2] rework wal handling for new progressors schema --- apps/cdc_progressor/src/cdc_progressor.erl | 6 ++--- config/sys.config | 30 +--------------------- 2 files changed, 3 insertions(+), 33 deletions(-) diff --git a/apps/cdc_progressor/src/cdc_progressor.erl b/apps/cdc_progressor/src/cdc_progressor.erl index 874eeda..725bc78 100644 --- a/apps/cdc_progressor/src/cdc_progressor.erl +++ b/apps/cdc_progressor/src/cdc_progressor.erl @@ -196,8 +196,6 @@ create_publication_if_not_exists(Connection, NsID) -> "CREATE PUBLICATION " ++ PubNameEscaped ++ " FOR TABLE " ++ ProcessesTable ++ " , " ++ EventsTable ), - %% TODO delete after rework progressor - {ok, _, _} = epgsql:equery(Connection, "ALTER TABLE " ++ ProcessesTable ++ " REPLICA IDENTITY FULL"), {ok, PubName} end. @@ -265,10 +263,10 @@ handle_processes_change(NsBin, Action, Row, PrevRow, StreamConfig) -> convert_process_change(NsBin, insert, #{<<"process_id">> := ProcessID}, _PrevRow) -> cdc_prg_converter:convert_lifecycle_event(NsBin, ProcessID, init); -convert_process_change(NsBin, update, Row, PrevRow) -> +convert_process_change(NsBin, update, Row, _PrevRow) -> #{<<"process_id">> := ProcessID} = Row, CurrentStatus = maps:get(<<"status">>, Row, undefined), - PreviousStatus = maps:get(<<"status">>, PrevRow, undefined), + PreviousStatus = maps:get(<<"previous_status">>, Row, undefined), case {PreviousStatus, CurrentStatus} of {<<"running">>, <<"error">>} -> diff --git a/config/sys.config b/config/sys.config index 1f09cc6..f34a4d1 100644 --- a/config/sys.config +++ b/config/sys.config @@ -23,7 +23,7 @@ %} progressor_db => #{ "cdc_slot_default" => #{ - 'default/default' => #{ + 'namespace/subspace' => #{ kafka_client => default_kafka_client, eventsink_topic => <<"cdc_eventsink_topic">>, lifecycle_topic => <<"cdc_lifecycle_topic">> @@ -33,28 +33,6 @@ }} ]}, - {progressor, [ - {namespaces, #{ - 'default/default' => #{ - storage => #{ - client => prg_pg_backend, - options => #{pool => default_pool} - }, - processor => #{ - client => prg_echo_processor, - options => #{} - }, - notifier => #{ - client => default_kafka_client, - options => #{ - topic => <<"prg_topic">>, - lifecycle_topic => <<"prg_lifecycle_topic">> - } - } - } - }} - ]}, - {epg_connector, [ {databases, #{ progressor_db => #{ @@ -64,12 +42,6 @@ username => "progressor", password => "progressor" } - }}, - {pools, #{ - default_pool => #{ - database => progressor_db, - size => 10 - } }} ]}, From 1d8ffb959a7285f08eb16c933368d3f21a39201e Mon Sep 17 00:00:00 2001 From: ttt161 Date: Thu, 13 Nov 2025 15:17:46 +0300 Subject: [PATCH 2/2] fix issues --- apps/cdc_progressor/src/cdc_progressor.erl | 14 +++++++------- rebar.config | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/cdc_progressor/src/cdc_progressor.erl b/apps/cdc_progressor/src/cdc_progressor.erl index 725bc78..542c70a 100644 --- a/apps/cdc_progressor/src/cdc_progressor.erl +++ b/apps/cdc_progressor/src/cdc_progressor.erl @@ -233,7 +233,7 @@ parse_repl_data(ReplData, State) -> ReplData ). -parse_repl_unit({Table, Action, Row, PrevRow}, #{streams := Streams} = _State) -> +parse_repl_unit({Table, Action, Row, _PrevRow}, #{streams := Streams} = _State) -> %% see table naming convention in progressor (prg_pg_migration) [NsBin, TableType] = string:split(Table, <<"_">>, trailing), NsID = binary_to_atom(NsBin), @@ -241,7 +241,7 @@ parse_repl_unit({Table, Action, Row, PrevRow}, #{streams := Streams} = _State) - case TableType of <<"processes">> -> - handle_processes_change(NsBin, Action, Row, PrevRow, StreamConfig); + handle_processes_change(NsBin, Action, Row, StreamConfig); <<"events">> when Action =:= insert -> handle_events_insert(NsBin, Row, StreamConfig); _Other -> @@ -249,21 +249,21 @@ parse_repl_unit({Table, Action, Row, PrevRow}, #{streams := Streams} = _State) - [] end. -handle_processes_change(NsBin, Action, Row, PrevRow, StreamConfig) -> +handle_processes_change(NsBin, Action, Row, StreamConfig) -> #{kafka_client := KafkaClient, lifecycle_topic := Topic} = StreamConfig, ProcessID = maps:get(<<"process_id">>, Row, <<>>), EventKey = cdc_prg_converter:event_key(NsBin, ProcessID), - case convert_process_change(NsBin, Action, Row, PrevRow) of + case convert_process_change(NsBin, Action, Row) of [] -> []; Batch -> {KafkaClient, Topic, EventKey, Batch} end. -convert_process_change(NsBin, insert, #{<<"process_id">> := ProcessID}, _PrevRow) -> +convert_process_change(NsBin, insert, #{<<"process_id">> := ProcessID}) -> cdc_prg_converter:convert_lifecycle_event(NsBin, ProcessID, init); -convert_process_change(NsBin, update, Row, _PrevRow) -> +convert_process_change(NsBin, update, Row) -> #{<<"process_id">> := ProcessID} = Row, CurrentStatus = maps:get(<<"status">>, Row, undefined), PreviousStatus = maps:get(<<"previous_status">>, Row, undefined), @@ -277,7 +277,7 @@ convert_process_change(NsBin, update, Row, _PrevRow) -> _NoRelevantChange -> [] end; -convert_process_change(NsBin, delete, #{<<"process_id">> := ProcessID}, _PrevRow) -> +convert_process_change(NsBin, delete, #{<<"process_id">> := ProcessID}) -> cdc_prg_converter:convert_lifecycle_event(NsBin, ProcessID, remove). handle_events_insert(NsBin, Row, StreamConfig) -> diff --git a/rebar.config b/rebar.config index 1db7c20..2b2b37a 100644 --- a/rebar.config +++ b/rebar.config @@ -52,7 +52,7 @@ ]}, {test, [ {deps, [ - {progressor, {git, "https://github.com/valitydev/progressor.git", {branch, "epic/change-data-capture"}}}, + {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.13"}}}, {meck, "0.9.2"} ]}, {dialyzer, [