diff --git a/include/progressor.hrl b/include/progressor.hrl index 5e33e9b..c5cdc32 100644 --- a/include/progressor.hrl +++ b/include/progressor.hrl @@ -104,7 +104,8 @@ -type history_range() :: #{ offset => non_neg_integer(), - limit => pos_integer() + limit => pos_integer(), + direction => forward | backward }. -type process_result() :: diff --git a/src/progressor.erl b/src/progressor.erl index efe0242..bb1c864 100644 --- a/src/progressor.erl +++ b/src/progressor.erl @@ -246,6 +246,8 @@ await_task_result(StorageOpts, NsId, KeyOrId, Timeout, Duration) -> do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, args := HistoryRange}) -> prg_storage:get_process(external, StorageOpts, NsId, Id, HistoryRange); +do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, range := HistoryRange}) -> + prg_storage:get_process(external, StorageOpts, NsId, Id, HistoryRange); do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId}) -> prg_storage:get_process(external, StorageOpts, NsId, Id, #{}). diff --git a/src/storage/postgres/prg_pg_backend.erl b/src/storage/postgres/prg_pg_backend.erl index 7c39694..1a6aa9a 100644 --- a/src/storage/postgres/prg_pg_backend.erl +++ b/src/storage/postgres/prg_pg_backend.erl @@ -79,37 +79,52 @@ get_process_status(PgOpts, NsId, Id) -> [{Status}] -> {ok, Status} end. --spec get_process(recipient(), pg_opts(), namespace_id(), id(), history_range()) -> {ok, process()}. +-spec get_process(recipient(), pg_opts(), namespace_id(), id(), history_range()) -> + {ok, process()} | {error, _Reason}. get_process(Recipient, PgOpts, NsId, ProcessId, HistoryRange) -> Pool = get_pool(Recipient, PgOpts), - EventsTable = construct_table_name(NsId, "_events"), - ProcessesTable = construct_table_name(NsId, "_processes"), - RangeCondition = create_range_condition(HistoryRange), - %% TODO optimize request - {ok, Columns, Rows} = epg_pool:query( + #{ + processes := ProcessesTable, + events := EventsTable + } = tables(NsId), + RawResult = epg_pool:transaction( Pool, - "SELECT pr.process_id, pr.status, pr.detail, pr.aux_state, pr.metadata as p_meta, pr.corrupted_by," - " ev.event_id, ev.timestamp, ev.metadata, ev.payload " - "FROM " - " (SELECT * FROM " ++ ProcessesTable ++ - " WHERE process_id = $1) AS pr " - " LEFT JOIN (SELECT * FROM " ++ EventsTable ++ RangeCondition ++ - " ORDER BY event_id ASC) AS ev " - " ON ev.process_id = pr.process_id ", - [ProcessId] + fun(Connection) -> + case do_get_process(Connection, ProcessesTable, ProcessId) of + {ok, _, []} -> + {error, <<"process not found">>}; + {ok, _ColumnsPr, _RowsPr} = ProcResult -> + {ok, _, _} = + EventsResult = do_get_events(Connection, EventsTable, ProcessId, HistoryRange), + {ok, ProcResult, EventsResult} + end + end ), - case Rows of - [] -> - {error, <<"process not found">>}; - [Head | _] -> - [Proc] = to_maps(Columns, [Head], fun marshal_process/1), - Events = lists:filter( - fun(Rec) -> is_map_key(event_id, Rec) end, - to_maps(Columns, Rows, fun marshal_event/1) - ), - {ok, Proc#{history => Events}} + case RawResult of + {error, _} = Error -> + Error; + {ok, {ok, ProcColumns, ProcRows}, {ok, EventsColumns, EventsRows}} -> + [Process] = to_maps(ProcColumns, ProcRows, fun marshal_process/1), + History = to_maps(EventsColumns, EventsRows, fun marshal_event/1), + {ok, Process#{history => History}} end. +do_get_process(Connection, Table, ProcessId) -> + epg_pool:query( + Connection, + "SELECT * from " ++ Table ++ " WHERE process_id = $1", + [ProcessId] + ). + +do_get_events(Connection, EventsTable, ProcessId, HistoryRange) -> + RangeCondition = create_range_condition(HistoryRange), + SQL = "SELECT * FROM " ++ EventsTable ++ " WHERE process_id = $1 " ++ RangeCondition, + epg_pool:query( + Connection, + SQL, + [ProcessId] + ). + -spec put_process_data( pg_opts(), namespace_id(), @@ -783,13 +798,22 @@ tables(NsId) -> events => construct_table_name(NsId, "_events") }. -create_range_condition(#{offset := Offset, limit := Limit}) -> - " WHERE event_id > " ++ integer_to_list(Offset) ++ " AND event_id <= " ++ integer_to_list(Offset + Limit) ++ " "; -create_range_condition(#{offset := Offset}) -> - " WHERE event_id > " ++ integer_to_list(Offset) ++ " "; -create_range_condition(#{limit := Limit}) -> - " WHERE event_id <= " ++ integer_to_list(Limit) ++ " "; -create_range_condition(_) -> +create_range_condition(Range) -> + order_by(Range) ++ limit(Range) ++ offset(Range). + +order_by(#{direction := backward}) -> + " ORDER BY event_id DESC "; +order_by(_) -> + " ORDER BY event_id ASC ". + +limit(#{limit := Limit}) -> + " LIMIT " ++ integer_to_list(Limit) ++ " "; +limit(_) -> + " ". + +offset(#{offset := Offset}) -> + " OFFSET " ++ integer_to_list(Offset) ++ " "; +offset(_) -> " ". do_save_process(Connection, Table, Process) -> @@ -1192,7 +1216,7 @@ marshal_process(Process) -> (<<"status">>, Status, Acc) -> Acc#{status => Status}; (<<"detail">>, Detail, Acc) -> Acc#{detail => Detail}; (<<"aux_state">>, AuxState, Acc) -> Acc#{aux_state => AuxState}; - (<<"p_meta">>, Meta, Acc) -> Acc#{metadata => Meta}; + (<<"metadata">>, Meta, Acc) -> Acc#{metadata => Meta}; (<<"corrupted_by">>, CorruptedBy, Acc) -> Acc#{corrupted_by => CorruptedBy}; (_, _, Acc) -> Acc end, diff --git a/test/prg_base_SUITE.erl b/test/prg_base_SUITE.erl index 5ddd2e6..bb97dee 100644 --- a/test/prg_base_SUITE.erl +++ b/test/prg_base_SUITE.erl @@ -145,7 +145,13 @@ simple_call_with_range_test(_C) -> args => <<"call_args">>, range => #{offset => 1, limit => 2} }), - 3 = expect_steps_counter(3), + {ok, <<"response">>} = progressor:call(#{ + ns => ?NS, + id => Id, + args => <<"call_args_back">>, + range => #{offset => 1, limit => 2, direction => backward} + }), + 4 = expect_steps_counter(4), {ok, #{ process_id := Id, status := <<"running">>, @@ -155,9 +161,21 @@ simple_call_with_range_test(_C) -> #{event_id := 3}, #{event_id := 4}, #{event_id := 5}, - #{event_id := 6} + #{event_id := 6}, + #{event_id := 7} ] }} = progressor:get(#{ns => ?NS, id => Id}), + {ok, #{ + process_id := Id, + status := <<"running">>, + history := [ + #{event_id := 5}, + #{event_id := 4}, + #{event_id := 3}, + #{event_id := 2}, + #{event_id := 1} + ] + }} = progressor:get(#{ns => ?NS, id => Id, range => #{offset => 2, direction => backward}}), unmock_processor(), ok. %% @@ -726,17 +744,30 @@ mock_processor(simple_call_with_range_test = TestCase) -> ] = History, Result = #{ response => <<"response">>, - events => [event(5)], - action => #{set_timer => erlang:system_time(second)} + events => [event(5)] }, Self ! 2, {ok, Result}; - ({timeout, <<>>, #{history := History} = _Process}, _Opts, _Ctx) -> - ?assertEqual(5, erlang:length(History)), + ({call, <<"call_args_back">>, #{history := History} = _Process}, _Opts, _Ctx) -> + %% call with range limit=2, offset=1 direction=backward + ?assertEqual(2, erlang:length(History)), + [ + #{event_id := 4}, + #{event_id := 3} + ] = History, Result = #{ - events => [event(6)] + response => <<"response">>, + events => [event(6)], + action => #{set_timer => erlang:system_time(second)} }, Self ! 3, + {ok, Result}; + ({timeout, <<>>, #{history := History} = _Process}, _Opts, _Ctx) -> + ?assertEqual(6, erlang:length(History)), + Result = #{ + events => [event(7)] + }, + Self ! 4, {ok, Result} end, mock_processor(TestCase, MockProcessor);