Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/prg_scanner.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,15 @@ init(
rescan_timeout = RescanTimeoutMs,
step_timeout = StepTimeoutMs
},
_ = start_rescan_timers(RescanTimeoutMs),
_ = start_rescan_calls((RescanTimeoutMs div 3) + 100),
_ = start_zombie_collector(StepTimeoutMs),
_ =
case maps:get(worker_pool_size, Opts) > 0 of
true ->
_ = start_rescan_timers(RescanTimeoutMs),
_ = start_rescan_calls((RescanTimeoutMs div 3) + 100),
_ = start_zombie_collector(StepTimeoutMs);
false ->
skip_scanning
end,
{ok, State}.

handle_call(_Request, _From, State) ->
Expand Down
24 changes: 22 additions & 2 deletions src/prg_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ handle_cast(
) ->
Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000,
ProcessId = maps:get(process_id, Task),
{ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId),
HistoryRange = maps:get(range, maps:get(metadata, Task, #{}), #{}),
{ok, Process} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, HistoryRange),
NewState = do_process_task(TaskHeader, Task, Deadline, State#prg_worker_state{process = Process}),
{noreply, NewState};
handle_cast(
Expand Down Expand Up @@ -137,7 +138,26 @@ do_process_task(
Ctx = maps:get(context, Task, <<>>),
Request = {extract_task_type(TaskHeader), Args, Process},
Result = prg_worker_sidecar:process(Pid, Deadline, NsOpts, Request, Ctx),
handle_result(Result, TaskHeader, Task, Deadline, State).
State1 = maybe_restore_history(Task, State),
handle_result(Result, TaskHeader, Task, Deadline, State1).

%% if task range undefined then history is full
maybe_restore_history(#{metadata := #{range := Range}}, State) when map_size(Range) =:= 0 ->
State;
%% if task range is defined then need restore full history for continuation
maybe_restore_history(
_,
#prg_worker_state{
ns_id = NsId,
ns_opts = #{storage := StorageOpts, process_step_timeout := TimeoutSec} = _NsOpts,
sidecar_pid = Pid,
process = #{process_id := ProcessId} = Process
} = State
) ->
%% TODO
Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000,
{ok, #{history := FullHistory}} = prg_worker_sidecar:get_process(Pid, Deadline, StorageOpts, NsId, ProcessId, #{}),
State#prg_worker_state{process = Process#{history => FullHistory}}.

%% success result with timer
handle_result(
Expand Down
13 changes: 10 additions & 3 deletions src/prg_worker_sidecar.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
-export([lifecycle_sink/5]).
%%
-export([get_process/5]).
-export([get_process/6]).
-export([get_task/5]).

-type context() :: binary().
Expand Down Expand Up @@ -166,7 +167,13 @@ lifecycle_sink(Pid, Deadline, #{namespace := NS} = NsOpts, TaskType, ProcessId)
{ok, process()} | {error, _Reason}.
get_process(Pid, _Deadline, StorageOpts, NsId, ProcessId) ->
%% Timeout = Deadline - erlang:system_time(millisecond),
gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId}, infinity).
gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId, #{}}, infinity).

-spec get_process(pid(), timestamp_ms(), storage_opts(), namespace_id(), id(), history_range()) ->
{ok, process()} | {error, _Reason}.
get_process(Pid, _Deadline, StorageOpts, NsId, ProcessId, HistoryRange) ->
%% Timeout = Deadline - erlang:system_time(millisecond),
gen_server:call(Pid, {get_process, StorageOpts, NsId, ProcessId, HistoryRange}, infinity).

-spec get_task(pid(), timestamp_ms(), storage_opts(), namespace_id(), task_id()) ->
{ok, task()} | {error, _Reason}.
Expand Down Expand Up @@ -230,12 +237,12 @@ handle_call(
Response = do_with_retry(Fun, ?DEFAULT_DELAY),
{reply, Response, State};
handle_call(
{get_process, StorageOpts, NsId, ProcessId},
{get_process, StorageOpts, NsId, ProcessId, HistoryRange},
_From,
#prg_sidecar_state{} = State
) ->
Fun = fun() ->
prg_storage:get_process(StorageOpts, NsId, ProcessId)
prg_storage:get_process(internal, StorageOpts, NsId, ProcessId, HistoryRange)
end,
Response = do_with_retry(Fun, ?DEFAULT_DELAY),
{reply, Response, State};
Expand Down
54 changes: 51 additions & 3 deletions src/progressor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
-export([init/1]).
-export([call/1]).
-export([repair/1]).
-export([simple_repair/1]).
-export([get/1]).
-export([put/1]).
%% TODO
Expand All @@ -26,7 +27,8 @@
id := id(),
args => term(),
idempotency_key => binary(),
context => binary()
context => binary(),
range => history_range()
}.

%% see receive blocks bellow in this module
Expand Down Expand Up @@ -76,6 +78,20 @@ repair(Req) ->
Req#{type => repair}
).

-spec simple_repair(request()) -> {ok, _Result} | {error, _Reason}.
simple_repair(Req) ->
prg_utils:pipe(
[
fun add_ns_opts/1,
fun check_idempotency/1,
fun(Data) -> check_process_status(Data, <<"error">>) end,
fun add_task/1,
fun(Data) -> prepare_postponed(fun prg_storage:prepare_call/4, Data) end,
fun(_Data) -> {ok, ok} end
],
Req#{type => timeout}
).

-spec get(request()) -> {ok, _Result} | {error, _Reason}.
get(Req) ->
prg_utils:pipe(
Expand Down Expand Up @@ -139,7 +155,8 @@ add_task(#{id := Id, type := Type} = Opts) ->
process_id => Id,
args => Args,
task_type => convert_task_type(Type),
context => Context
context => Context,
metadata => #{range => maps:get(range, Opts, #{})}
},
Task = make_task(maybe_add_idempotency(TaskData, maps:get(idempotency_key, Opts, undefined))),
Opts#{task => Task}.
Expand Down Expand Up @@ -183,6 +200,23 @@ prepare(
Error
end.

prepare_postponed(
Fun,
#{ns_opts := #{storage := StorageOpts}, ns := NsId, id := ProcessId, task := Task} = Req
) ->
TaskType = maps:get(task_type, Task),
PrepareResult = prg_utils:with_observe(
fun() -> Fun(StorageOpts, NsId, ProcessId, Task#{status => <<"waiting">>}) end,
?PREPARING_KEY,
[erlang:atom_to_binary(NsId, utf8), TaskType]
),
case PrepareResult of
{ok, {postpone, TaskId}} ->
Req#{task => Task#{task_id => TaskId}};
{error, _} = Error ->
Error
end.

get_task_result(#{
ns_opts := #{storage := StorageOpts} = NsOpts, ns := NsId, idempotency_key := IdempotencyKey
}) ->
Expand Down Expand Up @@ -268,6 +302,8 @@ make_task_header(call, Ref) ->
{call, {self(), Ref}};
make_task_header(repair, Ref) ->
{repair, {self(), Ref}};
make_task_header(timeout, _Ref) ->
{timeout, undefined};
make_task_header(notify, _Ref) ->
{notify, undefined}.

Expand All @@ -277,6 +313,8 @@ convert_task_type(call) ->
<<"call">>;
convert_task_type(notify) ->
<<"notify">>;
convert_task_type(timeout) ->
<<"timeout">>;
convert_task_type(repair) ->
<<"repair">>.

Expand All @@ -299,6 +337,17 @@ make_task(#{task_type := TaskType} = TaskData) when
attempts_count => 0
},
maps:merge(Defaults, TaskData);
make_task(#{task_type := <<"timeout">>} = TaskData) ->
Now = erlang:system_time(second),
Defaults = #{
%% TODO
metadata => #{<<"kind">> => <<"simple_repair">>},
status => <<"waiting">>,
scheduled_time => Now,
last_retry_interval => 0,
attempts_count => 0
},
maps:merge(Defaults, TaskData);
make_task(#{task_type := <<"notify">>} = TaskData) ->
Now = erlang:system_time(second),
Defaults = #{
Expand Down Expand Up @@ -326,7 +375,6 @@ check_for_run(undefined) ->
<<"waiting">>;
check_for_run(Pid) when is_pid(Pid) ->
<<"running">>.
%%

action_to_task(undefined, _ProcessId, _Ctx) ->
undefined;
Expand Down
Loading
Loading