From ccb53eef941ea7549263f1c596fca776db88961d Mon Sep 17 00:00:00 2001 From: ttt161 Date: Tue, 11 Mar 2025 10:25:46 +0300 Subject: [PATCH] TECH-61: fix last_event_id --- src/prg_worker.erl | 62 ++++++++++++++++++++++++----------------- test/prg_base_SUITE.erl | 12 +++++--- 2 files changed, 44 insertions(+), 30 deletions(-) diff --git a/src/prg_worker.erl b/src/prg_worker.erl index c6f5bfe..c19454f 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -141,29 +141,28 @@ do_process_task( State1 = maybe_restore_history(Task, State), handle_result(Result, TaskHeader, Task, Deadline, State1). -%% if task range undefined then history is full -maybe_restore_history(#{metadata := #{range := Range}}, State) when map_size(Range) =:= 0 -> - State; %% if task range is defined then need restore full history for continuation maybe_restore_history( - _, + #{metadata := #{range := Range}}, #prg_worker_state{ ns_id = NsId, ns_opts = #{storage := StorageOpts, process_step_timeout := TimeoutSec} = _NsOpts, sidecar_pid = Pid, - process = #{process_id := ProcessId} = Process + process = #{process_id := ProcessId} } = State -) -> - %% TODO +) when map_size(Range) > 0 -> Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, - {ok, #{history := FullHistory}} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, #{}), - State#prg_worker_state{process = Process#{history => FullHistory}}. + {ok, ProcessUpd} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, #{}), + State#prg_worker_state{process = ProcessUpd}; +%% if task range undefined then history is full +maybe_restore_history(_, State) -> + State. %% success result with timer handle_result( {ok, #{action := #{set_timer := Timestamp} = Action, events := Events} = Result}, TaskHeader, - #{task_id := TaskId, context := Context} = Task, + #{task_id := TaskId, context := Context}, Deadline, #prg_worker_state{ ns_id = NsId, @@ -183,18 +182,15 @@ handle_result( finished_time => Now, status => <<"finished">> }, - NewTask = maps:merge( - #{ - process_id => ProcessId, - task_type => action_to_task_type(Action), - status => create_status(Timestamp, Now), - scheduled_time => Timestamp, - context => Context, - last_retry_interval => 0, - attempts_count => 0 - }, - maps:with([metadata], Task) - ), + NewTask = #{ + process_id => ProcessId, + task_type => action_to_task_type(Action), + status => create_status(Timestamp, Now), + scheduled_time => Timestamp, + context => Context, + last_retry_interval => 0, + attempts_count => 0 + }, ok = prg_worker_sidecar:lifecycle_sink( Pid, Deadline, NsOpts, extract_task_type(TaskHeader), ProcessId ), @@ -220,7 +216,9 @@ handle_result( {ok, [ContinuationTask | _]} -> NewHistory = maps:get(history, Process) ++ Events, ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask), - State#prg_worker_state{process = ProcessUpdated#{history => NewHistory}} + State#prg_worker_state{ + process = ProcessUpdated#{history => NewHistory, last_event_id => last_event_id(NewHistory)} + } end; %% success result with undefined timer and remove action handle_result( @@ -285,7 +283,9 @@ handle_result( {ok, [ContinuationTask | _]} -> NewHistory = maps:get(history, Process) ++ Events, ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask), - State#prg_worker_state{process = ProcessUpdated#{history => NewHistory}} + State#prg_worker_state{ + process = ProcessUpdated#{history => NewHistory, last_event_id => last_event_id(NewHistory)} + } end; %% success repair with corrupted task and undefined action handle_result( @@ -341,7 +341,9 @@ handle_result( _ = maybe_reply(TaskHeader, Response), NewHistory = maps:get(history, Process) ++ Events, ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask), - State#prg_worker_state{process = ProcessUpdated#{history => NewHistory}}; + State#prg_worker_state{ + process = ProcessUpdated#{history => NewHistory, last_event_id => last_event_id(NewHistory)} + }; _ -> {ok, []} = prg_worker_sidecar:complete_and_unlock( Pid, @@ -400,7 +402,9 @@ handle_result( {ok, [ContinuationTask | _]} -> NewHistory = maps:get(history, Process) ++ Events, ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask), - State#prg_worker_state{process = ProcessUpdated#{history => NewHistory}} + State#prg_worker_state{ + process = ProcessUpdated#{history => NewHistory, last_event_id => last_event_id(NewHistory)} + } end; %% calls processing error handle_result( @@ -602,3 +606,9 @@ maybe_wait_call(undefined) -> ok; maybe_wait_call(Timeout) -> timer:sleep(Timeout). + +last_event_id([]) -> + 0; +last_event_id(History) -> + [#{event_id := Id} | _] = lists:reverse(History), + Id. diff --git a/test/prg_base_SUITE.erl b/test/prg_base_SUITE.erl index ee4dd67..e9ab647 100644 --- a/test/prg_base_SUITE.erl +++ b/test/prg_base_SUITE.erl @@ -732,15 +732,17 @@ mock_processor(simple_call_test = TestCase) -> mock_processor(simple_call_with_range_test = TestCase) -> Self = self(), MockProcessor = fun - ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> + ({init, <<"init_args">>, Process}, _Opts, _Ctx) -> + ?assertEqual(0, maps:get(last_event_id, Process)), Result = #{ events => [event(1), event(2), event(3), event(4)] }, Self ! 1, {ok, Result}; - ({call, <<"call_args">>, #{history := History} = _Process}, _Opts, _Ctx) -> + ({call, <<"call_args">>, #{history := History} = Process}, _Opts, _Ctx) -> %% call with range limit=2, offset=1 ?assertEqual(2, erlang:length(History)), + ?assertEqual(4, maps:get(last_event_id, Process)), [ #{event_id := 2}, #{event_id := 3} @@ -751,9 +753,10 @@ mock_processor(simple_call_with_range_test = TestCase) -> }, Self ! 2, {ok, Result}; - ({call, <<"call_args_back">>, #{history := History} = _Process}, _Opts, _Ctx) -> + ({call, <<"call_args_back">>, #{history := History} = Process}, _Opts, _Ctx) -> %% call with range limit=2, offset=5 direction=backward ?assertEqual(2, erlang:length(History)), + ?assertEqual(5, maps:get(last_event_id, Process)), [ #{event_id := 4}, #{event_id := 3} @@ -765,8 +768,9 @@ mock_processor(simple_call_with_range_test = TestCase) -> }, Self ! 3, {ok, Result}; - ({timeout, <<>>, #{history := History} = _Process}, _Opts, _Ctx) -> + ({timeout, <<>>, #{history := History} = Process}, _Opts, _Ctx) -> ?assertEqual(6, erlang:length(History)), + ?assertEqual(6, maps:get(last_event_id, Process)), Result = #{ events => [event(7)] },