diff --git a/include/progressor.hrl b/include/progressor.hrl index c5cdc32..c9163f4 100644 --- a/include/progressor.hrl +++ b/include/progressor.hrl @@ -8,7 +8,9 @@ aux_state => binary(), metadata => map(), history => [event()], - corrupted_by => task_id() + corrupted_by => task_id(), + range => history_range(), + last_event_id => event_id() }. -type task() :: #{ diff --git a/src/storage/postgres/prg_pg_backend.erl b/src/storage/postgres/prg_pg_backend.erl index 1a6aa9a..8f1da2d 100644 --- a/src/storage/postgres/prg_pg_backend.erl +++ b/src/storage/postgres/prg_pg_backend.erl @@ -95,7 +95,7 @@ get_process(Recipient, PgOpts, NsId, ProcessId, HistoryRange) -> {error, <<"process not found">>}; {ok, _ColumnsPr, _RowsPr} = ProcResult -> {ok, _, _} = - EventsResult = do_get_events(Connection, EventsTable, ProcessId, HistoryRange), + EventsResult = do_get_events(Connection, EventsTable, ProcessId), {ok, ProcResult, EventsResult} end end @@ -105,8 +105,9 @@ get_process(Recipient, PgOpts, NsId, ProcessId, HistoryRange) -> Error; {ok, {ok, ProcColumns, ProcRows}, {ok, EventsColumns, EventsRows}} -> [Process] = to_maps(ProcColumns, ProcRows, fun marshal_process/1), - History = to_maps(EventsColumns, EventsRows, fun marshal_event/1), - {ok, Process#{history => History}} + FullHistory = to_maps(EventsColumns, EventsRows, fun marshal_event/1), + HistoryData = create_history_data(FullHistory, HistoryRange), + {ok, maps:merge(Process, HistoryData)} end. do_get_process(Connection, Table, ProcessId) -> @@ -116,15 +117,69 @@ do_get_process(Connection, Table, ProcessId) -> [ProcessId] ). -do_get_events(Connection, EventsTable, ProcessId, HistoryRange) -> - RangeCondition = create_range_condition(HistoryRange), - SQL = "SELECT * FROM " ++ EventsTable ++ " WHERE process_id = $1 " ++ RangeCondition, +do_get_events(Connection, EventsTable, ProcessId) -> + SQL = "SELECT * FROM " ++ EventsTable ++ " WHERE process_id = $1 ORDER BY event_id ASC", epg_pool:query( Connection, SQL, [ProcessId] ). +create_history_data([], RequestedRange) -> + #{ + history => [], + range => RequestedRange, + last_event_id => 0 + }; +create_history_data([#{event_id := FirstEventId} | _] = FullHistory, #{direction := backward} = RequestedRange) -> + [#{event_id := LastEventId} | _] = lists:reverse(FullHistory), + RequestedStartPosition = maps:get(offset, RequestedRange, undefined), + RequestedLimit = maps:get(limit, RequestedRange, undefined), + StartFrom = start_position(backward, RequestedStartPosition, FirstEventId, LastEventId), + HistoryPart = lists:sublist(FullHistory, StartFrom), + Limit = applicable_limit(RequestedLimit, erlang:length(HistoryPart)), + History = lists:sublist(lists:reverse(HistoryPart), Limit), + #{ + history => History, + range => RequestedRange, + last_event_id => LastEventId + }; +create_history_data([#{event_id := FirstEventId} | _] = FullHistory, RequestedRange) -> + [#{event_id := LastEventId} | _] = lists:reverse(FullHistory), + RequestedStartPosition = maps:get(offset, RequestedRange, undefined), + RequestedLimit = maps:get(limit, RequestedRange, undefined), + StartFrom = start_position(forward, RequestedStartPosition, FirstEventId, LastEventId), + {_, HistoryPart} = lists:split(StartFrom, FullHistory), + Limit = applicable_limit(RequestedLimit, erlang:length(HistoryPart)), + History = lists:sublist(HistoryPart, Limit), + #{ + history => History, + range => RequestedRange, + last_event_id => LastEventId + }. + +start_position(forward, undefined, _FirstEventId, _LastEventId) -> + 0; +start_position(forward, RequestedStartPosition, FirstEventId, LastEventId) -> + erlang:max( + 0, + erlang:min(RequestedStartPosition, LastEventId) - FirstEventId + 1 + ); +start_position(backward, undefined, FirstEventId, LastEventId) -> + LastEventId - FirstEventId + 1; +start_position(backward, RequestedStartPosition, FirstEventId, LastEventId) -> + erlang:max( + 0, + erlang:min(RequestedStartPosition, LastEventId + 1) - FirstEventId + ). + +applicable_limit(undefined, AvailableLimit) -> + AvailableLimit; +applicable_limit(RequestedLimit, AvailableLimit) -> + erlang:min(RequestedLimit, AvailableLimit). + +%%% + -spec put_process_data( pg_opts(), namespace_id(), @@ -798,23 +853,23 @@ tables(NsId) -> events => construct_table_name(NsId, "_events") }. -create_range_condition(Range) -> - order_by(Range) ++ limit(Range) ++ offset(Range). +%create_range_condition(Range) -> +% order_by(Range) ++ limit(Range) ++ offset(Range). -order_by(#{direction := backward}) -> - " ORDER BY event_id DESC "; -order_by(_) -> - " ORDER BY event_id ASC ". +%order_by(#{direction := backward}) -> +% " ORDER BY event_id DESC "; +%order_by(_) -> +% " ORDER BY event_id ASC ". -limit(#{limit := Limit}) -> - " LIMIT " ++ integer_to_list(Limit) ++ " "; -limit(_) -> - " ". +%limit(#{limit := Limit}) -> +% " LIMIT " ++ integer_to_list(Limit) ++ " "; +%limit(_) -> +% " ". -offset(#{offset := Offset}) -> - " OFFSET " ++ integer_to_list(Offset) ++ " "; -offset(_) -> - " ". +%offset(#{offset := Offset}) -> +% " OFFSET " ++ integer_to_list(Offset) ++ " "; +%offset(_) -> +% " ". do_save_process(Connection, Table, Process) -> #{ @@ -1247,3 +1302,141 @@ get_pool(external, #{pool := BasePool} = PgOpts) -> maps:get(front_pool, PgOpts, BasePool); get_pool(scan, #{pool := BasePool} = PgOpts) -> maps:get(scan_pool, PgOpts, BasePool). + +%% + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +event(Id) -> + #{event_id => Id}. + +events(Min, Max) -> + lists:reverse( + lists:foldl( + fun(Id, Acc) -> [event(Id) | Acc] end, + [], + lists:seq(Min, Max) + ) + ). + +-spec test() -> _. + +-spec range_test() -> _. +range_test() -> + %% see mg_core_events_SUITE + Events1 = events(1, 100), + Range11 = #{offset => 5, limit => 3, direction => backward}, + ?assertEqual( + #{ + range => Range11, + last_event_id => 100, + history => [event(4), event(3), event(2)] + }, + create_history_data(Events1, Range11) + ), + Range12 = #{offset => 4, limit => 4, direction => forward}, + ?assertEqual( + #{ + range => Range12, + last_event_id => 100, + history => [event(5), event(6), event(7), event(8)] + }, + create_history_data(Events1, Range12) + ), + %% + Events2 = events(5, 10), + Range21 = #{offset => 11, limit => 1, direction => forward}, + ?assertEqual( + #{ + range => Range21, + last_event_id => 10, + history => [] + }, + create_history_data(Events2, Range21) + ), + Range22 = #{offset => 4, limit => 1, direction => backward}, + ?assertEqual( + #{ + range => Range22, + last_event_id => 10, + history => [] + }, + create_history_data(Events2, Range22) + ), + Range23 = #{offset => 1, limit => 2, direction => forward}, + ?assertEqual( + #{ + range => Range23, + last_event_id => 10, + history => [event(5), event(6)] + }, + create_history_data(Events2, Range23) + ), + Range24 = #{offset => 11, limit => 2, direction => backward}, + ?assertEqual( + #{ + range => Range24, + last_event_id => 10, + history => [event(10), event(9)] + }, + create_history_data(Events2, Range24) + ), + %% + Events3 = events(1, 8), + Range31 = #{limit => 2, direction => forward}, + ?assertEqual( + #{ + range => Range31, + last_event_id => 8, + history => [event(1), event(2)] + }, + create_history_data(Events3, Range31) + ), + Range32 = #{limit => 2, direction => backward}, + ?assertEqual( + #{ + range => Range32, + last_event_id => 8, + history => [event(8), event(7)] + }, + create_history_data(Events3, Range32) + ), + Range33 = #{offset => 5, limit => 5, direction => forward}, + ?assertEqual( + #{ + range => Range33, + last_event_id => 8, + history => [event(6), event(7), event(8)] + }, + create_history_data(Events3, Range33) + ), + Range34 = #{limit => 3, direction => forward}, + ?assertEqual( + #{ + range => Range34, + last_event_id => 8, + history => [event(1), event(2), event(3)] + }, + create_history_data(Events3, Range34) + ), + Range35 = #{offset => 6, direction => forward}, + ?assertEqual( + #{ + range => Range35, + last_event_id => 8, + history => [event(7), event(8)] + }, + create_history_data(Events3, Range35) + ), + Range36 = #{}, + ?assertEqual( + #{ + range => Range36, + last_event_id => 8, + history => [event(1), event(2), event(3), event(4), event(5), event(6), event(7), event(8)] + }, + create_history_data(Events3, Range36) + ). + +-endif. diff --git a/test/prg_base_SUITE.erl b/test/prg_base_SUITE.erl index bb97dee..ee4dd67 100644 --- a/test/prg_base_SUITE.erl +++ b/test/prg_base_SUITE.erl @@ -135,7 +135,8 @@ simple_call_with_range_test(_C) -> %% steps: %% 1. init -> [event1, event2, event3, event4], timer 2s %% 2. call range limit 2 offset 1 -> [event5], timer 0s - %% 3. timeout -> [event6], undefined + %% 2. call range limit 2 offset 5 back -> [event6], timer 0s + %% 3. timeout -> [event7], undefined _ = mock_processor(simple_call_with_range_test), Id = gen_id(), {ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}), @@ -149,7 +150,7 @@ simple_call_with_range_test(_C) -> ns => ?NS, id => Id, args => <<"call_args_back">>, - range => #{offset => 1, limit => 2, direction => backward} + range => #{offset => 5, limit => 2, direction => backward} }), 4 = expect_steps_counter(4), {ok, #{ @@ -168,6 +169,8 @@ simple_call_with_range_test(_C) -> {ok, #{ process_id := Id, status := <<"running">>, + range := #{offset := 6, direction := backward}, + last_event_id := 7, history := [ #{event_id := 5}, #{event_id := 4}, @@ -175,7 +178,7 @@ simple_call_with_range_test(_C) -> #{event_id := 2}, #{event_id := 1} ] - }} = progressor:get(#{ns => ?NS, id => Id, range => #{offset => 2, direction => backward}}), + }} = progressor:get(#{ns => ?NS, id => Id, range => #{offset => 6, direction => backward}}), unmock_processor(), ok. %% @@ -749,7 +752,7 @@ mock_processor(simple_call_with_range_test = TestCase) -> Self ! 2, {ok, Result}; ({call, <<"call_args_back">>, #{history := History} = _Process}, _Opts, _Ctx) -> - %% call with range limit=2, offset=1 direction=backward + %% call with range limit=2, offset=5 direction=backward ?assertEqual(2, erlang:length(History)), [ #{event_id := 4},