Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/progressor.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
corrupted_by => task_id(),
range => history_range(),
last_event_id => event_id(),
running_task => task_id()
initialization => task_id()
}.

-type task() :: #{
Expand Down
8 changes: 4 additions & 4 deletions src/prg_storage.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
-export([prepare_repair/4]).
-export([put_process_data/4]).
-export([process_trace/3]).
-export([get_process_with_running/4]).
-export([get_process_with_initialization/4]).

%% scan functions
-export([search_timers/4]).
Expand Down Expand Up @@ -70,10 +70,10 @@ put_process_data(#{client := Handler, options := HandlerOpts}, NsId, Id, Process
process_trace(#{client := Handler, options := HandlerOpts}, NsId, Id) ->
Handler:process_trace(HandlerOpts, NsId, Id).

-spec get_process_with_running(storage_opts(), namespace_id(), id(), history_range()) ->
-spec get_process_with_initialization(storage_opts(), namespace_id(), id(), history_range()) ->
{ok, process()} | {error, _Reason}.
get_process_with_running(#{client := Handler, options := HandlerOpts}, NsId, ProcessId, HistoryRange) ->
Handler:get_process_with_running(HandlerOpts, NsId, ProcessId, HistoryRange).
get_process_with_initialization(#{client := Handler, options := HandlerOpts}, NsId, ProcessId, HistoryRange) ->
Handler:get_process_with_initialization(HandlerOpts, NsId, ProcessId, HistoryRange).

-spec prepare_init(storage_opts(), namespace_id(), id(), task()) ->
{ok, {postpone, task_id()} | {continue, task_id()}} | {error, _Reason}.
Expand Down
39 changes: 14 additions & 25 deletions src/progressor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -262,43 +262,32 @@ get_task_result(#{
{error, not_found} ->
not_found;
{error, in_progress} ->
TimeoutSec = maps:get(process_step_timeout, NsOpts, ?DEFAULT_STEP_TIMEOUT_SEC),
Timeout = TimeoutSec * 1000,
await_task_result(StorageOpts, NsId, {idempotency_key, IdempotencyKey}, Timeout, 0)
StepTimeoutSec = maps:get(process_step_timeout, NsOpts, ?DEFAULT_STEP_TIMEOUT_SEC),
StepTimeout = StepTimeoutSec * 1000,
await_task_result(StorageOpts, NsId, {idempotency_key, IdempotencyKey}, StepTimeout, 0)
end.

await_task_result(_StorageOpts, _NsId, _KeyOrId, Timeout, Duration) when Duration > Timeout ->
await_task_result(_StorageOpts, _NsId, _KeyOrId, StepTimeout, Duration) when Duration > StepTimeout ->
{error, <<"timeout">>};
await_task_result(StorageOpts, NsId, KeyOrId, Timeout, Duration) ->
await_task_result(StorageOpts, NsId, KeyOrId, StepTimeout, Duration) ->
case prg_storage:get_task_result(StorageOpts, NsId, KeyOrId) of
{ok, Result} ->
Result;
{error, _} ->
timer:sleep(?TASK_REPEAT_REQUEST_TIMEOUT),
RepeatTimeout = application:get_env(progressor, task_repeat_request_timeout, ?TASK_REPEAT_REQUEST_TIMEOUT),
timer:sleep(RepeatTimeout),
await_task_result(
StorageOpts, NsId, KeyOrId, Timeout, Duration + ?TASK_REPEAT_REQUEST_TIMEOUT
StorageOpts, NsId, KeyOrId, StepTimeout, Duration + RepeatTimeout
)
end.

do_get(
#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, range := HistoryRange, running_task := TaskId} = Req
) ->
%% retry clause
case prg_storage:get_process_with_running(StorageOpts, NsId, Id, HistoryRange) of
{ok, #{running_task := TaskId}} ->
%% same task_id, await task finished
timer:sleep(1000),
do_get(Req);
Result ->
%% previous task finished, return result
Result
end;
do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, range := HistoryRange} = Req) ->
case prg_storage:get_process_with_running(StorageOpts, NsId, Id, HistoryRange) of
{ok, #{running_task := TaskId}} ->
%% some task running, sleep and retry
timer:sleep(1000),
do_get(Req#{running_task => TaskId});
case prg_storage:get_process_with_initialization(StorageOpts, NsId, Id, HistoryRange) of
{ok, #{initialization := _TaskId}} ->
%% init task not finished, await and retry
Timeout = application:get_env(progressor, task_repeat_request_timeout, ?TASK_REPEAT_REQUEST_TIMEOUT),
timer:sleep(Timeout),
do_get(Req);
Result ->
Result
end;
Expand Down
16 changes: 8 additions & 8 deletions src/storage/postgres/prg_pg_backend.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
-export([prepare_repair/4]).
-export([put_process_data/4]).
-export([process_trace/3]).
-export([get_process_with_running/4]).
-export([get_process_with_initialization/4]).

%% scan functions
-export([collect_zombies/3]).
Expand Down Expand Up @@ -137,9 +137,9 @@ get_process(Recipient, PgOpts, NsId, ProcessId, HistoryRange) ->
),
parse_process_info(RawResult, HistoryRange).

-spec get_process_with_running(pg_opts(), namespace_id(), id(), history_range()) ->
-spec get_process_with_initialization(pg_opts(), namespace_id(), id(), history_range()) ->
{ok, process()} | {error, _Reason}.
get_process_with_running(PgOpts, NsId, ProcessId, HistoryRange) ->
get_process_with_initialization(PgOpts, NsId, ProcessId, HistoryRange) ->
Pool = get_pool(external, PgOpts),
#{
processes := ProcessesTable,
Expand All @@ -150,7 +150,7 @@ get_process_with_running(PgOpts, NsId, ProcessId, HistoryRange) ->
RawResult = epg_pool:transaction(
Pool,
fun(Connection) ->
case do_get_process_with_running(Connection, ProcessesTable, RunningTable, ProcessId) of
case do_get_process_with_initialization(Connection, ProcessesTable, RunningTable, ProcessId) of
{ok, _, []} ->
{error, <<"process not found">>};
{ok, ColumnsPr, RowsPr} ->
Expand Down Expand Up @@ -699,13 +699,13 @@ do_get_process(Connection, Table, ProcessId) ->
[ProcessId]
).

do_get_process_with_running(Connection, ProcessesTable, RunningTable, ProcessId) ->
do_get_process_with_initialization(Connection, ProcessesTable, RunningTable, ProcessId) ->
SQL =
"SELECT"
" pr.*, rt.task_id as running_task FROM " ++ ProcessesTable ++
" pr.*, rt.task_id as initialization FROM " ++ ProcessesTable ++
" pr "
" LEFT JOIN " ++ RunningTable ++
" rt ON pr.process_id = rt.process_id "
" rt ON pr.process_id = rt.process_id AND rt.task_type = 'init'"
" WHERE pr.process_id = $1",
epg_pool:query(
Connection,
Expand Down Expand Up @@ -1147,7 +1147,7 @@ marshal_process(Process) ->
(<<"aux_state">>, AuxState, Acc) -> Acc#{aux_state => AuxState};
(<<"metadata">>, Meta, Acc) -> Acc#{metadata => Meta};
(<<"corrupted_by">>, CorruptedBy, Acc) -> Acc#{corrupted_by => CorruptedBy};
(<<"running_task">>, RunningTask, Acc) -> Acc#{running_task => RunningTask};
(<<"initialization">>, TaskId, Acc) -> Acc#{initialization => TaskId};
(_, _, Acc) -> Acc
end,
#{},
Expand Down
6 changes: 4 additions & 2 deletions test/prg_base_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ put_process_with_remove_test(C) ->
-spec task_race_condition_hack_test(_) -> _.
task_race_condition_hack_test(C) ->
%% steps:
%% 1. init (spawn) -> [event1], timer 3s
%% 1. init (sleep 3s) -> [event1], undefined
_ = mock_processor(task_race_condition_hack_test),
Id = gen_id(),
erlang:spawn(fun() -> progressor:init(#{ns => ?NS(C), id => Id, args => <<"init_args">>}) end),
Expand All @@ -847,6 +847,7 @@ task_race_condition_hack_test(C) ->
range := #{},
history := [#{event_id := 1}],
process_id := Id,
aux_state := <<"aux_state">>,
last_event_id := 1
}} = progressor:get(#{ns => ?NS(C), id => Id}),
ok.
Expand Down Expand Up @@ -1261,7 +1262,8 @@ mock_processor(task_race_condition_hack_test = TestCase) ->
MockProcessor = fun({init, <<"init_args">>, _Process}, _Opts, _Ctx) ->
timer:sleep(3000),
Result = #{
events => [event(1)]
events => [event(1)],
aux_state => <<"aux_state">>
},
Self ! 1,
{ok, Result}
Expand Down
Loading