diff --git a/include/progressor.hrl b/include/progressor.hrl index 3c8430c..cd80fa9 100644 --- a/include/progressor.hrl +++ b/include/progressor.hrl @@ -11,7 +11,7 @@ corrupted_by => task_id(), range => history_range(), last_event_id => event_id(), - running_task => task_id() + initialization => task_id() }. -type task() :: #{ diff --git a/src/prg_storage.erl b/src/prg_storage.erl index 0232df2..787455e 100644 --- a/src/prg_storage.erl +++ b/src/prg_storage.erl @@ -11,7 +11,7 @@ -export([prepare_repair/4]). -export([put_process_data/4]). -export([process_trace/3]). --export([get_process_with_running/4]). +-export([get_process_with_initialization/4]). %% scan functions -export([search_timers/4]). @@ -70,10 +70,10 @@ put_process_data(#{client := Handler, options := HandlerOpts}, NsId, Id, Process process_trace(#{client := Handler, options := HandlerOpts}, NsId, Id) -> Handler:process_trace(HandlerOpts, NsId, Id). --spec get_process_with_running(storage_opts(), namespace_id(), id(), history_range()) -> +-spec get_process_with_initialization(storage_opts(), namespace_id(), id(), history_range()) -> {ok, process()} | {error, _Reason}. -get_process_with_running(#{client := Handler, options := HandlerOpts}, NsId, ProcessId, HistoryRange) -> - Handler:get_process_with_running(HandlerOpts, NsId, ProcessId, HistoryRange). +get_process_with_initialization(#{client := Handler, options := HandlerOpts}, NsId, ProcessId, HistoryRange) -> + Handler:get_process_with_initialization(HandlerOpts, NsId, ProcessId, HistoryRange). -spec prepare_init(storage_opts(), namespace_id(), id(), task()) -> {ok, {postpone, task_id()} | {continue, task_id()}} | {error, _Reason}. diff --git a/src/progressor.erl b/src/progressor.erl index e237545..ba21ff1 100644 --- a/src/progressor.erl +++ b/src/progressor.erl @@ -262,43 +262,32 @@ get_task_result(#{ {error, not_found} -> not_found; {error, in_progress} -> - TimeoutSec = maps:get(process_step_timeout, NsOpts, ?DEFAULT_STEP_TIMEOUT_SEC), - Timeout = TimeoutSec * 1000, - await_task_result(StorageOpts, NsId, {idempotency_key, IdempotencyKey}, Timeout, 0) + StepTimeoutSec = maps:get(process_step_timeout, NsOpts, ?DEFAULT_STEP_TIMEOUT_SEC), + StepTimeout = StepTimeoutSec * 1000, + await_task_result(StorageOpts, NsId, {idempotency_key, IdempotencyKey}, StepTimeout, 0) end. -await_task_result(_StorageOpts, _NsId, _KeyOrId, Timeout, Duration) when Duration > Timeout -> +await_task_result(_StorageOpts, _NsId, _KeyOrId, StepTimeout, Duration) when Duration > StepTimeout -> {error, <<"timeout">>}; -await_task_result(StorageOpts, NsId, KeyOrId, Timeout, Duration) -> +await_task_result(StorageOpts, NsId, KeyOrId, StepTimeout, Duration) -> case prg_storage:get_task_result(StorageOpts, NsId, KeyOrId) of {ok, Result} -> Result; {error, _} -> - timer:sleep(?TASK_REPEAT_REQUEST_TIMEOUT), + RepeatTimeout = application:get_env(progressor, task_repeat_request_timeout, ?TASK_REPEAT_REQUEST_TIMEOUT), + timer:sleep(RepeatTimeout), await_task_result( - StorageOpts, NsId, KeyOrId, Timeout, Duration + ?TASK_REPEAT_REQUEST_TIMEOUT + StorageOpts, NsId, KeyOrId, StepTimeout, Duration + RepeatTimeout ) end. -do_get( - #{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, range := HistoryRange, running_task := TaskId} = Req -) -> - %% retry clause - case prg_storage:get_process_with_running(StorageOpts, NsId, Id, HistoryRange) of - {ok, #{running_task := TaskId}} -> - %% same task_id, await task finished - timer:sleep(1000), - do_get(Req); - Result -> - %% previous task finished, return result - Result - end; do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, range := HistoryRange} = Req) -> - case prg_storage:get_process_with_running(StorageOpts, NsId, Id, HistoryRange) of - {ok, #{running_task := TaskId}} -> - %% some task running, sleep and retry - timer:sleep(1000), - do_get(Req#{running_task => TaskId}); + case prg_storage:get_process_with_initialization(StorageOpts, NsId, Id, HistoryRange) of + {ok, #{initialization := _TaskId}} -> + %% init task not finished, await and retry + Timeout = application:get_env(progressor, task_repeat_request_timeout, ?TASK_REPEAT_REQUEST_TIMEOUT), + timer:sleep(Timeout), + do_get(Req); Result -> Result end; diff --git a/src/storage/postgres/prg_pg_backend.erl b/src/storage/postgres/prg_pg_backend.erl index 861a3fa..4b6fa43 100644 --- a/src/storage/postgres/prg_pg_backend.erl +++ b/src/storage/postgres/prg_pg_backend.erl @@ -14,7 +14,7 @@ -export([prepare_repair/4]). -export([put_process_data/4]). -export([process_trace/3]). --export([get_process_with_running/4]). +-export([get_process_with_initialization/4]). %% scan functions -export([collect_zombies/3]). @@ -137,9 +137,9 @@ get_process(Recipient, PgOpts, NsId, ProcessId, HistoryRange) -> ), parse_process_info(RawResult, HistoryRange). --spec get_process_with_running(pg_opts(), namespace_id(), id(), history_range()) -> +-spec get_process_with_initialization(pg_opts(), namespace_id(), id(), history_range()) -> {ok, process()} | {error, _Reason}. -get_process_with_running(PgOpts, NsId, ProcessId, HistoryRange) -> +get_process_with_initialization(PgOpts, NsId, ProcessId, HistoryRange) -> Pool = get_pool(external, PgOpts), #{ processes := ProcessesTable, @@ -150,7 +150,7 @@ get_process_with_running(PgOpts, NsId, ProcessId, HistoryRange) -> RawResult = epg_pool:transaction( Pool, fun(Connection) -> - case do_get_process_with_running(Connection, ProcessesTable, RunningTable, ProcessId) of + case do_get_process_with_initialization(Connection, ProcessesTable, RunningTable, ProcessId) of {ok, _, []} -> {error, <<"process not found">>}; {ok, ColumnsPr, RowsPr} -> @@ -699,13 +699,13 @@ do_get_process(Connection, Table, ProcessId) -> [ProcessId] ). -do_get_process_with_running(Connection, ProcessesTable, RunningTable, ProcessId) -> +do_get_process_with_initialization(Connection, ProcessesTable, RunningTable, ProcessId) -> SQL = "SELECT" - " pr.*, rt.task_id as running_task FROM " ++ ProcessesTable ++ + " pr.*, rt.task_id as initialization FROM " ++ ProcessesTable ++ " pr " " LEFT JOIN " ++ RunningTable ++ - " rt ON pr.process_id = rt.process_id " + " rt ON pr.process_id = rt.process_id AND rt.task_type = 'init'" " WHERE pr.process_id = $1", epg_pool:query( Connection, @@ -1147,7 +1147,7 @@ marshal_process(Process) -> (<<"aux_state">>, AuxState, Acc) -> Acc#{aux_state => AuxState}; (<<"metadata">>, Meta, Acc) -> Acc#{metadata => Meta}; (<<"corrupted_by">>, CorruptedBy, Acc) -> Acc#{corrupted_by => CorruptedBy}; - (<<"running_task">>, RunningTask, Acc) -> Acc#{running_task => RunningTask}; + (<<"initialization">>, TaskId, Acc) -> Acc#{initialization => TaskId}; (_, _, Acc) -> Acc end, #{}, diff --git a/test/prg_base_SUITE.erl b/test/prg_base_SUITE.erl index 4ef6f37..7790fe4 100644 --- a/test/prg_base_SUITE.erl +++ b/test/prg_base_SUITE.erl @@ -837,7 +837,7 @@ put_process_with_remove_test(C) -> -spec task_race_condition_hack_test(_) -> _. task_race_condition_hack_test(C) -> %% steps: - %% 1. init (spawn) -> [event1], timer 3s + %% 1. init (sleep 3s) -> [event1], undefined _ = mock_processor(task_race_condition_hack_test), Id = gen_id(), erlang:spawn(fun() -> progressor:init(#{ns => ?NS(C), id => Id, args => <<"init_args">>}) end), @@ -847,6 +847,7 @@ task_race_condition_hack_test(C) -> range := #{}, history := [#{event_id := 1}], process_id := Id, + aux_state := <<"aux_state">>, last_event_id := 1 }} = progressor:get(#{ns => ?NS(C), id => Id}), ok. @@ -1261,7 +1262,8 @@ mock_processor(task_race_condition_hack_test = TestCase) -> MockProcessor = fun({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> timer:sleep(3000), Result = #{ - events => [event(1)] + events => [event(1)], + aux_state => <<"aux_state">> }, Self ! 1, {ok, Result}