diff --git a/src/prg_scanner.erl b/src/prg_scanner.erl index 593ac92..f09f98c 100644 --- a/src/prg_scanner.erl +++ b/src/prg_scanner.erl @@ -40,9 +40,15 @@ init( rescan_timeout = RescanTimeoutMs, step_timeout = StepTimeoutMs }, - _ = start_rescan_timers(RescanTimeoutMs), - _ = start_rescan_calls((RescanTimeoutMs div 3) + 100), - _ = start_zombie_collector(StepTimeoutMs), + _ = + case maps:get(worker_pool_size, Opts) > 0 of + true -> + _ = start_rescan_timers(RescanTimeoutMs), + _ = start_rescan_calls((RescanTimeoutMs div 3) + 100), + _ = start_zombie_collector(StepTimeoutMs); + false -> + skip_scanning + end, {ok, State}. handle_call(_Request, _From, State) -> diff --git a/src/prg_worker.erl b/src/prg_worker.erl index acba026..c6f5bfe 100644 --- a/src/prg_worker.erl +++ b/src/prg_worker.erl @@ -75,7 +75,8 @@ handle_cast( ) -> Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, ProcessId = maps:get(process_id, Task), - {ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId), + HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}), + {ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange), NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}), {noreply, NewState}; handle_cast( @@ -137,7 +138,26 @@ do_process_task( Ctx = maps:get(context, Task, <<>>), Request = {extract_task_type(TaskHeader), Args, Process}, Result = prg_worker_sidecar:process(Pid, Deadline, NsOpts, Request, Ctx), - handle_result(Result, TaskHeader, Task, Deadline, State). + State1 = maybe_restore_history(Task, State), + handle_result(Result, TaskHeader, Task, Deadline, State1). + +%% if task range undefined then history is full +maybe_restore_history(#{metadata := #{range := Range}}, State) when map_size(Range) =:= 0 -> + State; +%% if task range is defined then need restore full history for continuation +maybe_restore_history( + _, + #prg_worker_state{ + ns_id = NsId, + ns_opts = #{storage := StorageOpts, process_step_timeout := TimeoutSec} = _NsOpts, + sidecar_pid = Pid, + process = #{process_id := ProcessId} = Process + } = State +) -> + %% TODO + Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000, + {ok, #{history := FullHistory}} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, #{}), + State#prg_worker_state{process = Process#{history => FullHistory}}. %% success result with timer handle_result( diff --git a/src/prg_worker_sidecar.erl b/src/prg_worker_sidecar.erl index ea3f5c7..025dde8 100644 --- a/src/prg_worker_sidecar.erl +++ b/src/prg_worker_sidecar.erl @@ -27,6 +27,7 @@ -export([lifecycle_sink/5]). %% -export([get_process/5]). +-export([get_process/6]). -export([get_task/5]). -type context() :: binary(). @@ -166,7 +167,13 @@ lifecycle_sink(Pid, Deadline, #{namespace := NS} = NsOpts, TaskType, ProcessId) {ok, process()} | {error, _Reason}. get_process(Pid, _Deadline, StorageOpts, NsId, ProcessId) -> %% Timeout = Deadline - erlang:system_time(millisecond), - gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId}, infinity). + gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId, #{}}, infinity). + +-spec get_process(pid(), timestamp_ms(), storage_opts(), namespace_id(), id(), history_range()) -> + {ok, process()} | {error, _Reason}. +get_process(Pid, _Deadline, StorageOpts, NsId, ProcessId, HistoryRange) -> + %% Timeout = Deadline - erlang:system_time(millisecond), + gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId, HistoryRange}, infinity). -spec get_task(pid(), timestamp_ms(), storage_opts(), namespace_id(), task_id()) -> {ok, task()} | {error, _Reason}. @@ -230,12 +237,12 @@ handle_call( Response = do_with_retry(Fun, ?DEFAULT_DELAY), {reply, Response, State}; handle_call( - {get_process, StorageOpts, NsId, ProcessId}, + {get_process, StorageOpts, NsId, ProcessId, HistoryRange}, _From, #prg_sidecar_state{} = State ) -> Fun = fun() -> - prg_storage:get_process(StorageOpts, NsId, ProcessId) + prg_storage:get_process(internal, StorageOpts, NsId, ProcessId, HistoryRange) end, Response = do_with_retry(Fun, ?DEFAULT_DELAY), {reply, Response, State}; diff --git a/src/progressor.erl b/src/progressor.erl index 46ca3ea..efe0242 100644 --- a/src/progressor.erl +++ b/src/progressor.erl @@ -9,6 +9,7 @@ -export([init/1]). -export([call/1]). -export([repair/1]). +-export([simple_repair/1]). -export([get/1]). -export([put/1]). %% TODO @@ -26,7 +27,8 @@ id := id(), args => term(), idempotency_key => binary(), - context => binary() + context => binary(), + range => history_range() }. %% see receive blocks bellow in this module @@ -76,6 +78,20 @@ repair(Req) -> Req#{type => repair} ). +-spec simple_repair(request()) -> {ok, _Result} | {error, _Reason}. +simple_repair(Req) -> + prg_utils:pipe( + [ + fun add_ns_opts/1, + fun check_idempotency/1, + fun(Data) -> check_process_status(Data, <<"error">>) end, + fun add_task/1, + fun(Data) -> prepare_postponed(fun prg_storage:prepare_call/4, Data) end, + fun(_Data) -> {ok, ok} end + ], + Req#{type => timeout} + ). + -spec get(request()) -> {ok, _Result} | {error, _Reason}. get(Req) -> prg_utils:pipe( @@ -139,7 +155,8 @@ add_task(#{id := Id, type := Type} = Opts) -> process_id => Id, args => Args, task_type => convert_task_type(Type), - context => Context + context => Context, + metadata => #{range => maps:get(range, Opts, #{})} }, Task = make_task(maybe_add_idempotency(TaskData, maps:get(idempotency_key, Opts, undefined))), Opts#{task => Task}. @@ -183,6 +200,23 @@ prepare( Error end. +prepare_postponed( + Fun, + #{ns_opts := #{storage := StorageOpts}, ns := NsId, id := ProcessId, task := Task} = Req +) -> + TaskType = maps:get(task_type, Task), + PrepareResult = prg_utils:with_observe( + fun() -> Fun(StorageOpts, NsId, ProcessId, Task#{status => <<"waiting">>}) end, + ?PREPARING_KEY, + [erlang:atom_to_binary(NsId, utf8), TaskType] + ), + case PrepareResult of + {ok, {postpone, TaskId}} -> + Req#{task => Task#{task_id => TaskId}}; + {error, _} = Error -> + Error + end. + get_task_result(#{ ns_opts := #{storage := StorageOpts} = NsOpts, ns := NsId, idempotency_key := IdempotencyKey }) -> @@ -268,6 +302,8 @@ make_task_header(call, Ref) -> {call, {self(), Ref}}; make_task_header(repair, Ref) -> {repair, {self(), Ref}}; +make_task_header(timeout, _Ref) -> + {timeout, undefined}; make_task_header(notify, _Ref) -> {notify, undefined}. @@ -277,6 +313,8 @@ convert_task_type(call) -> <<"call">>; convert_task_type(notify) -> <<"notify">>; +convert_task_type(timeout) -> + <<"timeout">>; convert_task_type(repair) -> <<"repair">>. @@ -299,6 +337,17 @@ make_task(#{task_type := TaskType} = TaskData) when attempts_count => 0 }, maps:merge(Defaults, TaskData); +make_task(#{task_type := <<"timeout">>} = TaskData) -> + Now = erlang:system_time(second), + Defaults = #{ + %% TODO + metadata => #{<<"kind">> => <<"simple_repair">>}, + status => <<"waiting">>, + scheduled_time => Now, + last_retry_interval => 0, + attempts_count => 0 + }, + maps:merge(Defaults, TaskData); make_task(#{task_type := <<"notify">>} = TaskData) -> Now = erlang:system_time(second), Defaults = #{ @@ -326,7 +375,6 @@ check_for_run(undefined) -> <<"waiting">>; check_for_run(Pid) when is_pid(Pid) -> <<"running">>. -%% action_to_task(undefined, _ProcessId, _Ctx) -> undefined; diff --git a/test/prg_base_SUITE.erl b/test/prg_base_SUITE.erl index aad9de1..5ddd2e6 100644 --- a/test/prg_base_SUITE.erl +++ b/test/prg_base_SUITE.erl @@ -12,11 +12,13 @@ %% Tests -export([simple_timers_test/1]). -export([simple_call_test/1]). +-export([simple_call_with_range_test/1]). -export([call_replace_timer_test/1]). -export([call_unset_timer_test/1]). -export([postponed_call_test/1]). -export([postponed_call_to_suspended_process_test/1]). -export([multiple_calls_test/1]). +-export([simple_repair_after_non_retriable_error_test/1]). -export([repair_after_non_retriable_error_test/1]). -export([error_after_max_retries_test/1]). -export([repair_after_call_error_test/1]). @@ -38,11 +40,13 @@ all() -> [ simple_timers_test, simple_call_test, + simple_call_with_range_test, call_replace_timer_test, call_unset_timer_test, postponed_call_test, postponed_call_to_suspended_process_test, multiple_calls_test, + simple_repair_after_non_retriable_error_test, repair_after_non_retriable_error_test, error_after_max_retries_test, repair_after_call_error_test, @@ -126,6 +130,38 @@ simple_call_test(_C) -> unmock_processor(), ok. %% +-spec simple_call_with_range_test(_) -> _. +simple_call_with_range_test(_C) -> + %% steps: + %% 1. init -> [event1, event2, event3, event4], timer 2s + %% 2. call range limit 2 offset 1 -> [event5], timer 0s + %% 3. timeout -> [event6], undefined + _ = mock_processor(simple_call_with_range_test), + Id = gen_id(), + {ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}), + {ok, <<"response">>} = progressor:call(#{ + ns => ?NS, + id => Id, + args => <<"call_args">>, + range => #{offset => 1, limit => 2} + }), + 3 = expect_steps_counter(3), + {ok, #{ + process_id := Id, + status := <<"running">>, + history := [ + #{event_id := 1}, + #{event_id := 2}, + #{event_id := 3}, + #{event_id := 4}, + #{event_id := 5}, + #{event_id := 6} + ] + }} = progressor:get(#{ns => ?NS, id => Id}), + unmock_processor(), + ok. +%% + -spec call_replace_timer_test(_) -> _. call_replace_timer_test(_C) -> %% steps: @@ -296,6 +332,48 @@ multiple_calls_test(_C) -> unmock_processor(), ok. +-spec simple_repair_after_non_retriable_error_test(_) -> _. +simple_repair_after_non_retriable_error_test(_C) -> + %% steps: + %% 1. init -> [], timer 0s + %% 2. timeout -> {error, do_not_retry} + %% 3. timeout(via simple repair call) -> [event1], undefined + %% 4. timeout -> [event2], undefined + _ = mock_processor(simple_repair_after_non_retriable_error_test), + Id = gen_id(), + {ok, ok} = progressor:init(#{ns => ?NS, id => Id, args => <<"init_args">>}), + 2 = expect_steps_counter(2), + {ok, #{ + detail := <<"do_not_retry">>, + history := [], + process_id := Id, + status := <<"error">> + }} = progressor:get(#{ns => ?NS, id => Id}), + {ok, ok} = progressor:simple_repair(#{ns => ?NS, id => Id, context => <<"simple_repair_ctx">>}), + 4 = expect_steps_counter(4), + {ok, + #{ + process_id := Id, + status := <<"running">>, + history := [ + #{ + event_id := 1, + metadata := #{<<"format_version">> := 1}, + payload := _Pl1, + timestamp := _Ts1 + }, + #{ + event_id := 2, + metadata := #{<<"format_version">> := 1}, + payload := _Pl2, + timestamp := _Ts2 + } + ] + } = Process} = progressor:get(#{ns => ?NS, id => Id}), + false = erlang:is_map_key(detail, Process), + unmock_processor(), + ok. + -spec repair_after_non_retriable_error_test(_) -> _. repair_after_non_retriable_error_test(_C) -> %% steps: @@ -630,6 +708,39 @@ mock_processor(simple_call_test = TestCase) -> end, mock_processor(TestCase, MockProcessor); %% +mock_processor(simple_call_with_range_test = TestCase) -> + Self = self(), + MockProcessor = fun + ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> + Result = #{ + events => [event(1), event(2), event(3), event(4)] + }, + Self ! 1, + {ok, Result}; + ({call, <<"call_args">>, #{history := History} = _Process}, _Opts, _Ctx) -> + %% call with range limit=2, offset=1 + ?assertEqual(2, erlang:length(History)), + [ + #{event_id := 2}, + #{event_id := 3} + ] = History, + Result = #{ + response => <<"response">>, + events => [event(5)], + action => #{set_timer => erlang:system_time(second)} + }, + Self ! 2, + {ok, Result}; + ({timeout, <<>>, #{history := History} = _Process}, _Opts, _Ctx) -> + ?assertEqual(5, erlang:length(History)), + Result = #{ + events => [event(6)] + }, + Self ! 3, + {ok, Result} + end, + mock_processor(TestCase, MockProcessor); +%% mock_processor(call_replace_timer_test = TestCase) -> Self = self(), MockProcessor = fun @@ -774,6 +885,37 @@ mock_processor(multiple_calls_test = TestCase) -> end, mock_processor(TestCase, MockProcessor); %% +mock_processor(simple_repair_after_non_retriable_error_test = TestCase) -> + Self = self(), + MockProcessor = fun + ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> + Result = #{ + events => [], + action => #{set_timer => erlang:system_time(second)} + }, + Self ! 1, + {ok, Result}; + ({timeout, <<>>, #{history := []} = _Process}, _Opts, <<>>) -> + Self ! 2, + {error, do_not_retry}; + ({timeout, <<>>, #{history := []} = _Process}, _Opts, <<"simple_repair_ctx">>) -> + %% timeout via simple repair + Result = #{ + events => [event(1)], + action => #{set_timer => erlang:system_time(second)} + }, + Self ! 3, + {ok, Result}; + ({timeout, <<>>, #{history := History} = _Process}, _Opts, _Ctx) -> + ?assertEqual(1, erlang:length(History)), + Result = #{ + events => [event(2)] + }, + Self ! 4, + {ok, Result} + end, + mock_processor(TestCase, MockProcessor); +%% mock_processor(repair_after_non_retriable_error_test = TestCase) -> Self = self(), MockProcessor = fun