Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 36 additions & 26 deletions src/prg_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -141,29 +141,28 @@ do_process_task(
State1 = maybe_restore_history(Task, State),
handle_result(Result, TaskHeader, Task, Deadline, State1).

%% if task range undefined then history is full
maybe_restore_history(#{metadata := #{range := Range}}, State) when map_size(Range) =:= 0 ->
State;
%% if task range is defined then need restore full history for continuation
maybe_restore_history(
_,
#{metadata := #{range := Range}},
#prg_worker_state{
ns_id = NsId,
ns_opts = #{storage := StorageOpts, process_step_timeout := TimeoutSec} = _NsOpts,
sidecar_pid = Pid,
process = #{process_id := ProcessId} = Process
process = #{process_id := ProcessId}
} = State
) ->
%% TODO
) when map_size(Range) > 0 ->
Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000,
{ok, #{history := FullHistory}} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, #{}),
State#prg_worker_state{process = Process#{history => FullHistory}}.
{ok, ProcessUpd} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, #{}),
State#prg_worker_state{process = ProcessUpd};
%% if task range undefined then history is full
maybe_restore_history(_, State) ->
State.

%% success result with timer
handle_result(
{ok, #{action := #{set_timer := Timestamp} = Action, events := Events} = Result},
TaskHeader,
#{task_id := TaskId, context := Context} = Task,
#{task_id := TaskId, context := Context},
Deadline,
#prg_worker_state{
ns_id = NsId,
Expand All @@ -183,18 +182,15 @@ handle_result(
finished_time => Now,
status => <<"finished">>
},
NewTask = maps:merge(
#{
process_id => ProcessId,
task_type => action_to_task_type(Action),
status => create_status(Timestamp, Now),
scheduled_time => Timestamp,
context => Context,
last_retry_interval => 0,
attempts_count => 0
},
maps:with([metadata], Task)
),
NewTask = #{
process_id => ProcessId,
task_type => action_to_task_type(Action),
status => create_status(Timestamp, Now),
scheduled_time => Timestamp,
context => Context,
last_retry_interval => 0,
attempts_count => 0
},
ok = prg_worker_sidecar:lifecycle_sink(
Pid, Deadline, NsOpts, extract_task_type(TaskHeader), ProcessId
),
Expand All @@ -220,7 +216,9 @@ handle_result(
{ok, [ContinuationTask | _]} ->
NewHistory = maps:get(history, Process) ++ Events,
ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask),
State#prg_worker_state{process = ProcessUpdated#{history => NewHistory}}
State#prg_worker_state{
process = ProcessUpdated#{history => NewHistory, last_event_id => last_event_id(NewHistory)}
}
end;
%% success result with undefined timer and remove action
handle_result(
Expand Down Expand Up @@ -285,7 +283,9 @@ handle_result(
{ok, [ContinuationTask | _]} ->
NewHistory = maps:get(history, Process) ++ Events,
ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask),
State#prg_worker_state{process = ProcessUpdated#{history => NewHistory}}
State#prg_worker_state{
process = ProcessUpdated#{history => NewHistory, last_event_id => last_event_id(NewHistory)}
}
end;
%% success repair with corrupted task and undefined action
handle_result(
Expand Down Expand Up @@ -341,7 +341,9 @@ handle_result(
_ = maybe_reply(TaskHeader, Response),
NewHistory = maps:get(history, Process) ++ Events,
ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask),
State#prg_worker_state{process = ProcessUpdated#{history => NewHistory}};
State#prg_worker_state{
process = ProcessUpdated#{history => NewHistory, last_event_id => last_event_id(NewHistory)}
};
_ ->
{ok, []} = prg_worker_sidecar:complete_and_unlock(
Pid,
Expand Down Expand Up @@ -400,7 +402,9 @@ handle_result(
{ok, [ContinuationTask | _]} ->
NewHistory = maps:get(history, Process) ++ Events,
ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask),
State#prg_worker_state{process = ProcessUpdated#{history => NewHistory}}
State#prg_worker_state{
process = ProcessUpdated#{history => NewHistory, last_event_id => last_event_id(NewHistory)}
}
end;
%% calls processing error
handle_result(
Expand Down Expand Up @@ -602,3 +606,9 @@ maybe_wait_call(undefined) ->
ok;
maybe_wait_call(Timeout) ->
timer:sleep(Timeout).

last_event_id([]) ->
0;
last_event_id(History) ->
[#{event_id := Id} | _] = lists:reverse(History),
Id.
12 changes: 8 additions & 4 deletions test/prg_base_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -732,15 +732,17 @@ mock_processor(simple_call_test = TestCase) ->
mock_processor(simple_call_with_range_test = TestCase) ->
Self = self(),
MockProcessor = fun
({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
({init, <<"init_args">>, Process}, _Opts, _Ctx) ->
?assertEqual(0, maps:get(last_event_id, Process)),
Result = #{
events => [event(1), event(2), event(3), event(4)]
},
Self ! 1,
{ok, Result};
({call, <<"call_args">>, #{history := History} = _Process}, _Opts, _Ctx) ->
({call, <<"call_args">>, #{history := History} = Process}, _Opts, _Ctx) ->
%% call with range limit=2, offset=1
?assertEqual(2, erlang:length(History)),
?assertEqual(4, maps:get(last_event_id, Process)),
[
#{event_id := 2},
#{event_id := 3}
Expand All @@ -751,9 +753,10 @@ mock_processor(simple_call_with_range_test = TestCase) ->
},
Self ! 2,
{ok, Result};
({call, <<"call_args_back">>, #{history := History} = _Process}, _Opts, _Ctx) ->
({call, <<"call_args_back">>, #{history := History} = Process}, _Opts, _Ctx) ->
%% call with range limit=2, offset=5 direction=backward
?assertEqual(2, erlang:length(History)),
?assertEqual(5, maps:get(last_event_id, Process)),
[
#{event_id := 4},
#{event_id := 3}
Expand All @@ -765,8 +768,9 @@ mock_processor(simple_call_with_range_test = TestCase) ->
},
Self ! 3,
{ok, Result};
({timeout, <<>>, #{history := History} = _Process}, _Opts, _Ctx) ->
({timeout, <<>>, #{history := History} = Process}, _Opts, _Ctx) ->
?assertEqual(6, erlang:length(History)),
?assertEqual(6, maps:get(last_event_id, Process)),
Result = #{
events => [event(7)]
},
Expand Down
Loading