From 55caa7fab7b9243a259feaa862a28a824703eced Mon Sep 17 00:00:00 2001 From: ttt161 Date: Wed, 26 Mar 2025 09:17:01 +0300 Subject: [PATCH] TECH-26: bump epg_connector --- rebar.lock | 2 +- src/prg_worker.erl | 4 + src/storage/postgres/prg_pg_backend.erl | 284 +++++------------------- test/prg_ct_hook.erl | 3 +- 4 files changed, 64 insertions(+), 229 deletions(-) diff --git a/rebar.lock b/rebar.lock index 08148c6..50743e3 100644 --- a/rebar.lock +++ b/rebar.lock @@ -7,7 +7,7 @@ {<<"crc32cer">>,{pkg,<<"crc32cer">>,<<"0.1.11">>},2}, {<<"epg_connector">>, {git,"https://github.com/valitydev/epg_connector.git", - {ref,"35a7480b298ac4318352a03824ce06619b75f9da"}}, + {ref,"3ff44b60214aa0a9f74a659a3e06fc7de45d7f8b"}}, 0}, {<<"epgsql">>, {git,"https://github.com/epgsql/epgsql.git", diff --git a/src/prg_worker.erl b/src/prg_worker.erl index c19454f..14e5e8a 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -21,6 +21,8 @@ -record(prg_worker_state, {ns_id, ns_opts, process, sidecar_pid}). +-define(DEFAULT_RANGE, #{direction => forward}). + %%% %%% API %%% @@ -141,6 +143,8 @@ do_process_task( State1 = maybe_restore_history(Task, State), handle_result(Result, TaskHeader, Task, Deadline, State1). +maybe_restore_history(#{metadata := #{range := Range}}, State) when Range =:= ?DEFAULT_RANGE -> + State; %% if task range is defined then need restore full history for continuation maybe_restore_history( #{metadata := #{range := Range}}, diff --git a/src/storage/postgres/prg_pg_backend.erl b/src/storage/postgres/prg_pg_backend.erl index 8f1da2d..eacd86f 100644 --- a/src/storage/postgres/prg_pg_backend.erl +++ b/src/storage/postgres/prg_pg_backend.erl @@ -87,97 +87,30 @@ get_process(Recipient, PgOpts, NsId, ProcessId, HistoryRange) -> processes := ProcessesTable, events := EventsTable } = tables(NsId), + RangeCondition = create_range_condition(HistoryRange), RawResult = epg_pool:transaction( Pool, fun(Connection) -> case do_get_process(Connection, ProcessesTable, ProcessId) of {ok, _, []} -> {error, <<"process not found">>}; - {ok, _ColumnsPr, _RowsPr} = ProcResult -> + {ok, ColumnsPr, RowsPr} -> {ok, _, _} = - EventsResult = do_get_events(Connection, EventsTable, ProcessId), - {ok, ProcResult, EventsResult} + {ok, ColumnstEv, RowsEv} = do_get_events(Connection, EventsTable, ProcessId, RangeCondition), + LastEventId = get_last_event_id(Connection, EventsTable, ProcessId), + {ok, {ColumnsPr, RowsPr}, {ColumnstEv, RowsEv}, LastEventId} end end ), case RawResult of {error, _} = Error -> Error; - {ok, {ok, ProcColumns, ProcRows}, {ok, EventsColumns, EventsRows}} -> + {ok, {ProcColumns, ProcRows}, {EventsColumns, EventsRows}, LastEventId} -> [Process] = to_maps(ProcColumns, ProcRows, fun marshal_process/1), - FullHistory = to_maps(EventsColumns, EventsRows, fun marshal_event/1), - HistoryData = create_history_data(FullHistory, HistoryRange), - {ok, maps:merge(Process, HistoryData)} + History = to_maps(EventsColumns, EventsRows, fun marshal_event/1), + {ok, Process#{history => History, last_event_id => LastEventId, range => HistoryRange}} end. -do_get_process(Connection, Table, ProcessId) -> - epg_pool:query( - Connection, - "SELECT * from " ++ Table ++ " WHERE process_id = $1", - [ProcessId] - ). - -do_get_events(Connection, EventsTable, ProcessId) -> - SQL = "SELECT * FROM " ++ EventsTable ++ " WHERE process_id = $1 ORDER BY event_id ASC", - epg_pool:query( - Connection, - SQL, - [ProcessId] - ). - -create_history_data([], RequestedRange) -> - #{ - history => [], - range => RequestedRange, - last_event_id => 0 - }; -create_history_data([#{event_id := FirstEventId} | _] = FullHistory, #{direction := backward} = RequestedRange) -> - [#{event_id := LastEventId} | _] = lists:reverse(FullHistory), - RequestedStartPosition = maps:get(offset, RequestedRange, undefined), - RequestedLimit = maps:get(limit, RequestedRange, undefined), - StartFrom = start_position(backward, RequestedStartPosition, FirstEventId, LastEventId), - HistoryPart = lists:sublist(FullHistory, StartFrom), - Limit = applicable_limit(RequestedLimit, erlang:length(HistoryPart)), - History = lists:sublist(lists:reverse(HistoryPart), Limit), - #{ - history => History, - range => RequestedRange, - last_event_id => LastEventId - }; -create_history_data([#{event_id := FirstEventId} | _] = FullHistory, RequestedRange) -> - [#{event_id := LastEventId} | _] = lists:reverse(FullHistory), - RequestedStartPosition = maps:get(offset, RequestedRange, undefined), - RequestedLimit = maps:get(limit, RequestedRange, undefined), - StartFrom = start_position(forward, RequestedStartPosition, FirstEventId, LastEventId), - {_, HistoryPart} = lists:split(StartFrom, FullHistory), - Limit = applicable_limit(RequestedLimit, erlang:length(HistoryPart)), - History = lists:sublist(HistoryPart, Limit), - #{ - history => History, - range => RequestedRange, - last_event_id => LastEventId - }. - -start_position(forward, undefined, _FirstEventId, _LastEventId) -> - 0; -start_position(forward, RequestedStartPosition, FirstEventId, LastEventId) -> - erlang:max( - 0, - erlang:min(RequestedStartPosition, LastEventId) - FirstEventId + 1 - ); -start_position(backward, undefined, FirstEventId, LastEventId) -> - LastEventId - FirstEventId + 1; -start_position(backward, RequestedStartPosition, FirstEventId, LastEventId) -> - erlang:max( - 0, - erlang:min(RequestedStartPosition, LastEventId + 1) - FirstEventId - ). - -applicable_limit(undefined, AvailableLimit) -> - AvailableLimit; -applicable_limit(RequestedLimit, AvailableLimit) -> - erlang:min(RequestedLimit, AvailableLimit). - %%% -spec put_process_data( @@ -853,23 +786,58 @@ tables(NsId) -> events => construct_table_name(NsId, "_events") }. -%create_range_condition(Range) -> -% order_by(Range) ++ limit(Range) ++ offset(Range). +create_range_condition(Range) -> + after_id(Range) ++ direction(Range) ++ limit(Range). + +after_id(#{offset := After} = Range) -> + Direction = maps:get(direction, Range, forward), + " AND event_id " ++ operator(Direction) ++ integer_to_list(After) ++ " "; +after_id(_) -> + " ". + +operator(forward) -> + " > "; +operator(backward) -> + " < ". -%order_by(#{direction := backward}) -> -% " ORDER BY event_id DESC "; -%order_by(_) -> -% " ORDER BY event_id ASC ". +limit(#{limit := Limit}) -> + " LIMIT " ++ integer_to_list(Limit) ++ " "; +limit(_) -> + " ". + +direction(#{direction := backward}) -> + " ORDER BY event_id DESC "; +direction(_) -> + " ORDER BY event_id ASC ". + +do_get_process(Connection, Table, ProcessId) -> + epg_pool:query( + Connection, + "SELECT * from " ++ Table ++ " WHERE process_id = $1", + [ProcessId] + ). -%limit(#{limit := Limit}) -> -% " LIMIT " ++ integer_to_list(Limit) ++ " "; -%limit(_) -> -% " ". +do_get_events(Connection, EventsTable, ProcessId, RangeCondition) -> + SQL = "SELECT * FROM " ++ EventsTable ++ " WHERE process_id = $1 " ++ RangeCondition, + epg_pool:query( + Connection, + SQL, + [ProcessId] + ). -%offset(#{offset := Offset}) -> -% " OFFSET " ++ integer_to_list(Offset) ++ " "; -%offset(_) -> -% " ". +get_last_event_id(Connection, EventsTable, ProcessId) -> + SQL = "SELECT max(event_id) FROM " ++ EventsTable ++ " WHERE process_id = $1", + Result = epg_pool:query( + Connection, + SQL, + [ProcessId] + ), + case Result of + {ok, _, [{null}]} -> + 0; + {ok, _, [{Value}]} -> + Value + end. do_save_process(Connection, Table, Process) -> #{ @@ -1302,141 +1270,3 @@ get_pool(external, #{pool := BasePool} = PgOpts) -> maps:get(front_pool, PgOpts, BasePool); get_pool(scan, #{pool := BasePool} = PgOpts) -> maps:get(scan_pool, PgOpts, BasePool). - -%% - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). - -event(Id) -> - #{event_id => Id}. - -events(Min, Max) -> - lists:reverse( - lists:foldl( - fun(Id, Acc) -> [event(Id) | Acc] end, - [], - lists:seq(Min, Max) - ) - ). - --spec test() -> _. - --spec range_test() -> _. -range_test() -> - %% see mg_core_events_SUITE - Events1 = events(1, 100), - Range11 = #{offset => 5, limit => 3, direction => backward}, - ?assertEqual( - #{ - range => Range11, - last_event_id => 100, - history => [event(4), event(3), event(2)] - }, - create_history_data(Events1, Range11) - ), - Range12 = #{offset => 4, limit => 4, direction => forward}, - ?assertEqual( - #{ - range => Range12, - last_event_id => 100, - history => [event(5), event(6), event(7), event(8)] - }, - create_history_data(Events1, Range12) - ), - %% - Events2 = events(5, 10), - Range21 = #{offset => 11, limit => 1, direction => forward}, - ?assertEqual( - #{ - range => Range21, - last_event_id => 10, - history => [] - }, - create_history_data(Events2, Range21) - ), - Range22 = #{offset => 4, limit => 1, direction => backward}, - ?assertEqual( - #{ - range => Range22, - last_event_id => 10, - history => [] - }, - create_history_data(Events2, Range22) - ), - Range23 = #{offset => 1, limit => 2, direction => forward}, - ?assertEqual( - #{ - range => Range23, - last_event_id => 10, - history => [event(5), event(6)] - }, - create_history_data(Events2, Range23) - ), - Range24 = #{offset => 11, limit => 2, direction => backward}, - ?assertEqual( - #{ - range => Range24, - last_event_id => 10, - history => [event(10), event(9)] - }, - create_history_data(Events2, Range24) - ), - %% - Events3 = events(1, 8), - Range31 = #{limit => 2, direction => forward}, - ?assertEqual( - #{ - range => Range31, - last_event_id => 8, - history => [event(1), event(2)] - }, - create_history_data(Events3, Range31) - ), - Range32 = #{limit => 2, direction => backward}, - ?assertEqual( - #{ - range => Range32, - last_event_id => 8, - history => [event(8), event(7)] - }, - create_history_data(Events3, Range32) - ), - Range33 = #{offset => 5, limit => 5, direction => forward}, - ?assertEqual( - #{ - range => Range33, - last_event_id => 8, - history => [event(6), event(7), event(8)] - }, - create_history_data(Events3, Range33) - ), - Range34 = #{limit => 3, direction => forward}, - ?assertEqual( - #{ - range => Range34, - last_event_id => 8, - history => [event(1), event(2), event(3)] - }, - create_history_data(Events3, Range34) - ), - Range35 = #{offset => 6, direction => forward}, - ?assertEqual( - #{ - range => Range35, - last_event_id => 8, - history => [event(7), event(8)] - }, - create_history_data(Events3, Range35) - ), - Range36 = #{}, - ?assertEqual( - #{ - range => Range36, - last_event_id => 8, - history => [event(1), event(2), event(3), event(4), event(5), event(6), event(7), event(8)] - }, - create_history_data(Events3, Range36) - ). - --endif. diff --git a/test/prg_ct_hook.erl b/test/prg_ct_hook.erl index 59240eb..e3583a4 100644 --- a/test/prg_ct_hook.erl +++ b/test/prg_ct_hook.erl @@ -100,7 +100,8 @@ app_env(epg_connector) -> database => progressor_db, size => 10 } - }} + }}, + {force_garbage_collect, true} ]; app_env(brod) -> [