diff --git a/.github/settings.yml b/.github/settings.yml
new file mode 100644
index 0000000..9267e7d
--- /dev/null
+++ b/.github/settings.yml
@@ -0,0 +1,2 @@
+# These settings are synced to GitHub by https://probot.github.io/apps/settings/
+_extends: .github
diff --git a/.github/workflows/erlang-checks.yaml b/.github/workflows/erlang-checks.yaml
new file mode 100644
index 0000000..7d1512b
--- /dev/null
+++ b/.github/workflows/erlang-checks.yaml
@@ -0,0 +1,41 @@
+name: Erlang CI Checks
+
+on:
+  push:
+    branches:
+      - 'master'
+      - 'epic/**'
+  pull_request:
+    branches: ['**']
+
+jobs:
+  setup:
+    name: Load .env
+    runs-on: ubuntu-latest
+    outputs:
+      otp-version: ${{ steps.otp-version.outputs.version }}
+      rebar-version: ${{ steps.rebar-version.outputs.version }}
+      thrift-version: ${{ steps.thrift-version.outputs.version }}
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v4
+      - run: grep -v '^#' .env >> $GITHUB_ENV
+      - id: otp-version
+        run: echo "version=$OTP_VERSION" >> "$GITHUB_OUTPUT"
+      - id: rebar-version
+        run: echo "version=$REBAR_VERSION" >> "$GITHUB_OUTPUT"
+      - id: thrift-version
+        run: echo "version=$THRIFT_VERSION" >> "$GITHUB_OUTPUT"
+
+  run:
+    name: Run checks
+    needs: setup
+    uses: valitydev/erlang-workflows/.github/workflows/erlang-parallel-build.yml@v1.0.17
+    with:
+      otp-version: ${{ needs.setup.outputs.otp-version }}
+      rebar-version: ${{ needs.setup.outputs.rebar-version }}
+      use-thrift: true
+      thrift-version: ${{ needs.setup.outputs.thrift-version }}
+      run-ct-with-compose: true
+      use-coveralls: true
+      upload-coverage: false
diff --git a/.github/workflows/tag-action.yml b/.github/workflows/tag-action.yml
new file mode 100644
index 0000000..ec3faf4
--- /dev/null
+++ b/.github/workflows/tag-action.yml
@@ -0,0 +1,18 @@
+name: Create Tag
+
+on:
+  push:
+    branches:
+      - master
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout Repo
+        uses: actions/checkout@v3
+
+      - uses: valitydev/action-tagger@v2
+        with:
+          github-token: ${{ secrets.GITHUB_TOKEN }}
+          with-v: true
diff --git a/Makefile b/Makefile
index cd84254..7345cae 100644
--- a/Makefile
+++ b/Makefile
@@ -92,9 +92,6 @@ check-format:
 dialyze:
 	$(REBAR) as test dialyzer
 
-release:
-	$(REBAR) as prod release
-
 eunit:
 	$(REBAR) eunit --cover
diff --git a/config/sys.config b/config/sys.config
index 0a9afdc..7dba5d6 100644
--- a/config/sys.config
+++ b/config/sys.config
@@ -14,9 +14,11 @@
         },
         %% optional
         retry_policy => #{
-            initial_timeout => 3, %% seconds
+            %% seconds
+            initial_timeout => 3,
             backoff_coefficient => 1.2,
-            max_timeout => 180, %% seconds
+            %% seconds
+            max_timeout => 180,
             max_attempts => 2,
             non_retryable_errors => [
                 some_reason,
@@ -26,11 +28,15 @@
             ]
         },
         %% optional, default process_step_timeout div 2
-        task_scan_timeout => 10, %% seconds
+
+        %% seconds
+        task_scan_timeout => 10,
         %% optional, default 100
         worker_pool_size => 200,
         %% optional, default 60 sec
-        process_step_timeout => 30 %% seconds
+
+        %% seconds
+        process_step_timeout => 30
     }},
     {namespaces, #{
@@ -95,5 +101,4 @@
             ]}
         ]}
     ]}
-].
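The retry_policy above reads as a geometric backoff schedule: each failed attempt waits roughly initial_timeout * backoff_coefficient^N seconds, bounded by max_timeout and max_attempts. A minimal sketch of that arithmetic under that assumption — the authoritative gate is check_retryable/is_retryable in src/prg_worker.erl further down, and retry_schedule/1 here is a hypothetical helper, not part of the library:

    %% Hypothetical helper: list the retry intervals (seconds) a
    %% retry_policy map allows. Comparing numbers against the atom
    %% 'infinity' relies on Erlang term order (number < atom), the same
    %% trick is_retryable/5 uses for absent max_timeout/max_attempts.
    retry_schedule(#{initial_timeout := Init, backoff_coefficient := Coef} = Policy) ->
        MaxTimeout = maps:get(max_timeout, Policy, infinity),
        MaxAttempts = maps:get(max_attempts, Policy, infinity),
        [
            round(Init * math:pow(Coef, N))
         || N <- lists:seq(0, 9),
            N < MaxAttempts,
            Init * math:pow(Coef, N) < MaxTimeout
        ].

With the values above (initial_timeout 3, coefficient 1.2, max_timeout 180, max_attempts 2) this yields [3, 4]: two attempts, after which the task is completed as an error.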
diff --git a/elvis.config b/elvis.config
new file mode 100644
index 0000000..8472abf
--- /dev/null
+++ b/elvis.config
@@ -0,0 +1,70 @@
+[
+    {elvis, [
+        {verbose, true},
+        {config, [
+            #{
+                dirs => ["src", "src/**", "include", "test"],
+                filter => "*.erl",
+                ruleset => erl_files,
+                rules => [
+                    {elvis_text_style, line_length, #{limit => 120, skip_comments => false}},
+                    {elvis_text_style, no_tabs},
+                    {elvis_text_style, no_trailing_whitespace},
+                    {elvis_style, macro_module_names},
+                    {elvis_style, operator_spaces, #{rules => [{right, ","}, {right, "++"}, {left, "++"}]}},
+                    {elvis_style, nesting_level, #{level => 4}},
+                    {elvis_style, no_if_expression},
+                    %% FIXME Implement appropriate behaviours
+                    {elvis_style, invalid_dynamic_call, #{
+                        ignore => [prg_storage, prg_worker_sidecar]
+                    }},
+                    {elvis_style, used_ignored_variable},
+                    {elvis_style, no_behavior_info},
+                    {elvis_style, module_naming_convention, #{regex => "^[a-z]([a-z0-9]*_?)*(_SUITE)?$"}},
+                    {elvis_style, function_naming_convention, #{regex => "^[a-z]([a-z0-9]*_?)*$"}},
+                    %% FIXME State naming convention or leave modules ignored
+                    {elvis_style, state_record_and_type, #{
+                        ignore => [prg_scanner, prg_scheduler, prg_worker, prg_worker_sidecar]
+                    }},
+                    {elvis_style, no_spec_with_records},
+                    {elvis_style, no_debug_call, #{}},
+                    {elvis_style, export_used_types, disable},
+                    %% FIXME Maybe refactor code blocks
+                    {elvis_style, dont_repeat_yourself, #{
+                        ignore => [prg_worker, prg_worker_sidecar, progressor_app, prg_pg_backend, prg_base_SUITE]
+                    }}
+                ]
+            },
+            #{
+                dirs => ["."],
+                filter => "Makefile",
+                ruleset => makefiles
+            },
+            #{
+                dirs => ["."],
+                filter => "elvis.config",
+                ruleset => elvis_config
+            },
+            #{
+                dirs => ["."],
+                filter => "rebar.config",
+                ruleset => rebar_config,
+                rules => [
+                    {elvis_text_style, line_length, #{limit => 120, skip_comments => false}},
+                    {elvis_text_style, no_tabs},
+                    {elvis_text_style, no_trailing_whitespace},
+                    {elvis_project, no_branch_deps, disable}
+                ]
+            },
+            #{
+                dirs => ["src"],
+                filter => "*.app.src",
+                rules => [
+                    {elvis_text_style, line_length, #{limit => 120, skip_comments => false}},
+                    {elvis_text_style, no_tabs},
+                    {elvis_text_style, no_trailing_whitespace}
+                ]
+            }
+        ]}
+    ]}
+].
diff --git a/include/progressor.hrl b/include/progressor.hrl
index c4b2b29..5e33e9b 100644
--- a/include/progressor.hrl
+++ b/include/progressor.hrl
@@ -1,4 +1,3 @@
-
 %%%
 %%% Base entities
 %%%
@@ -137,7 +136,8 @@
 -define(DEFAULT_STEP_TIMEOUT_SEC, 60).
 
 -define(DEFAULT_RETRY_POLICY, #{
-    initial_timeout => 5, %% second
+    %% second
+    initial_timeout => 5,
     backoff_coefficient => 1.0,
     max_attempts => 3
 }).
diff --git a/rebar.config b/rebar.config
index 5333c6a..cf6136f 100644
--- a/rebar.config
+++ b/rebar.config
@@ -3,24 +3,27 @@
     {brod, "4.3.2"},
     {prometheus, "4.11.0"},
     {recon, "2.5.6"},
-    {thrift, {git, "https://github.com/valitydev/thrift_erlang.git", {branch, "master"}}},
+    {thrift, {git, "https://github.com/valitydev/thrift_erlang.git", {tag, "v1.0.0"}}},
     {mg_proto, {git, "https://github.com/valitydev/machinegun-proto.git", {branch, "master"}}},
     {epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {branch, "master"}}}
 ]}.
 
-{relx, [
-    {release,
-        {progressor, "0.1"},
-        [progressor, brod, epg_connector],
-        [
-            {dev_mode, false},
-            {include_erts, false},
-            {include_src, false},
-            {sys_config, "config/sys.config"},
-            {vm_args, "config/vm.args"},
-            {extended_start_script, true}
-        ]
-    }
+{xref_checks, [
+    % mandatory
+    undefined_function_calls,
+    undefined_functions,
+    deprecated_functions_calls,
+    deprecated_functions
+]}.
+
+{dialyzer, [
+    {warnings, [
+        % mandatory
+        unmatched_returns,
+        error_handling,
+        unknown
+    ]},
+    {plt_apps, all_deps}
 ]}.
 
 {shell, [
@@ -36,6 +39,17 @@
     {test, [
         {deps, [
             {meck, "0.9.2"}
-        ]}
+        ]},
+        {dialyzer, [{plt_extra_apps, [eunit, common_test, runtime_tools, meck]}]}
     ]}
 ]}.
+
+{project_plugins, [
+    {erlfmt, "1.5.0"},
+    {rebar3_lint, "3.2.6"}
+]}.
+
+{erlfmt, [
+    {print_width, 120},
+    {files, ["{src,include,test}/*.{hrl,erl}", "src/**/*.erl", "rebar.config", "elvis.config", "config/sys.config"]}
+]}.
diff --git a/src/prg_namespace_sup.erl b/src/prg_namespace_sup.erl
index 26150fd..0d8070a 100644
--- a/src/prg_namespace_sup.erl
+++ b/src/prg_namespace_sup.erl
@@ -14,7 +14,8 @@
 %%% API functions
 %%%===================================================================
 
--spec(start_link({namespace_id(), namespace_opts()}) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
+-spec start_link({namespace_id(), namespace_opts()}) ->
+    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
 start_link({NsId, #{storage := StorageOpts}} = NS) ->
     ok = prg_storage:db_init(StorageOpts, NsId),
     RegName = prg_utils:registered_name(NsId, "_namespace_sup"),
@@ -27,9 +28,11 @@ start_link({NsId, #{storage := StorageOpts}} = NS) ->
 init({NsId, _NsOpts} = NS) ->
     MaxRestarts = 1000,
     MaxSecondsBetweenRestarts = 3600,
-    SupFlags = #{strategy => one_for_all,
+    SupFlags = #{
+        strategy => one_for_all,
         intensity => MaxRestarts,
-        period => MaxSecondsBetweenRestarts},
+        period => MaxSecondsBetweenRestarts
+    },
     SchedulerSpec = #{
         id => prg_utils:registered_name(NsId, "_scheduler"),
         start => {prg_scheduler, start_link, [NS]}
diff --git a/src/prg_notifier.erl b/src/prg_notifier.erl
index af54b71..864eb87 100644
--- a/src/prg_notifier.erl
+++ b/src/prg_notifier.erl
@@ -27,7 +27,8 @@ event_sink(
 event_sink(_NsOpts, _ID, _Events) ->
     ok.
 
--spec lifecycle_sink(namespace_opts(), task_t() | {error, _Reason}, id()) -> ok | {error, _Reason} | no_return().
+-spec lifecycle_sink(namespace_opts(), task_t() | {error, _Reason}, id()) ->
+    ok | {error, _Reason} | no_return().
 lifecycle_sink(
     #{
         namespace := NS,
@@ -54,7 +55,8 @@ encode(Encoder, NS, ID, Events) ->
         #{
             key => event_key(NS, ID),
             value => Encoder(NS, ID, Event)
-        } || Event <- Events
+        }
+     || Event <- Events
     ].
 
 produce(Client, Topic, PartitionKey, Batch) ->
diff --git a/src/prg_scanner.erl b/src/prg_scanner.erl
index 83d9ab7..593ac92 100644
--- a/src/prg_scanner.erl
+++ b/src/prg_scanner.erl
@@ -3,8 +3,14 @@
 -behaviour(gen_server).
 
 -export([start_link/1]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
-    code_change/3]).
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
 
 -record(prg_scanner_state, {ns_id, ns_opts, rescan_timeout, step_timeout}).
 
@@ -23,8 +29,10 @@ start_link({NsId, _NsOpts} = NS) ->
     RegName = prg_utils:registered_name(NsId, "_scanner"),
     gen_server:start_link({local, RegName}, ?MODULE, NS, []).
 
-init({NsId, #{task_scan_timeout := RescanTimeoutSec, process_step_timeout := StepTimeoutSec} = Opts}) ->
-    RescanTimeoutMs = RescanTimeoutSec * 1000,
+init(
+    {NsId, #{task_scan_timeout := RescanTimeoutSec, process_step_timeout := StepTimeoutSec} = Opts}
+) ->
+    RescanTimeoutMs = RescanTimeoutSec * 1000,
     StepTimeoutMs = StepTimeoutSec * 1000,
     State = #prg_scanner_state{
         ns_id = NsId,
@@ -40,12 +48,12 @@ init({NsId, #{task_scan_timeout := RescanTimeoutSec, process_step_timeout := Ste
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 
-handle_cast(_Request, State = #prg_scanner_state{}) ->
+handle_cast(_Request, #prg_scanner_state{} = State) ->
     {noreply, State}.
 
 handle_info(
     {timeout, _TimerRef, rescan_timers},
-    State = #prg_scanner_state{ns_id = NsId, ns_opts = NsOpts, rescan_timeout = RescanTimeout}
+    #prg_scanner_state{ns_id = NsId, ns_opts = NsOpts, rescan_timeout = RescanTimeout} = State
 ) ->
     case prg_scheduler:count_workers(NsId) of
         0 ->
@@ -55,15 +63,18 @@ handle_info(
             Calls = search_calls(N, NsId, NsOpts),
             Timers = search_timers(N - erlang:length(Calls), NsId, NsOpts),
             Tasks = Calls ++ Timers,
-            lists:foreach(fun(#{task_type := Type} = Task) ->
-                ok = prg_scheduler:push_task(NsId, header(Type), Task)
-            end, Tasks)
+            lists:foreach(
+                fun(#{task_type := Type} = Task) ->
+                    ok = prg_scheduler:push_task(NsId, header(Type), Task)
+                end,
+                Tasks
+            )
     end,
     _ = start_rescan_timers(RescanTimeout),
     {noreply, State};
 handle_info(
     {timeout, _TimerRef, rescan_calls},
-    State = #prg_scanner_state{ns_id = NsId, ns_opts = NsOpts, rescan_timeout = RescanTimeout}
+    #prg_scanner_state{ns_id = NsId, ns_opts = NsOpts, rescan_timeout = RescanTimeout} = State
 ) ->
     case prg_scheduler:count_workers(NsId) of
         0 ->
@@ -71,26 +82,29 @@ handle_info(
             %% all workers is busy
             skip;
         N ->
             Calls = search_calls(N, NsId, NsOpts),
-            lists:foreach(fun(#{task_type := Type} = Task) ->
-                ok = prg_scheduler:push_task(NsId, header(Type), Task)
-            end, Calls)
+            lists:foreach(
+                fun(#{task_type := Type} = Task) ->
+                    ok = prg_scheduler:push_task(NsId, header(Type), Task)
+                end,
+                Calls
+            )
     end,
     _ = start_rescan_calls((RescanTimeout div 3) + 1),
     {noreply, State};
 handle_info(
     {timeout, _TimerRef, collect_zombie},
-    State = #prg_scanner_state{ns_id = NsId, ns_opts = NsOpts, step_timeout = StepTimeout}
+    #prg_scanner_state{ns_id = NsId, ns_opts = NsOpts, step_timeout = StepTimeout} = State
 ) ->
     ok = collect_zombie(NsId, NsOpts),
     _ = start_zombie_collector(StepTimeout),
     {noreply, State};
-handle_info(_Info, State = #prg_scanner_state{}) ->
+handle_info(_Info, #prg_scanner_state{} = State) ->
     {noreply, State}.
 
-terminate(_Reason, _State = #prg_scanner_state{}) ->
+terminate(_Reason, #prg_scanner_state{} = _State) ->
     ok.
 
-code_change(_OldVsn, State = #prg_scanner_state{}, _Extra) ->
+code_change(_OldVsn, #prg_scanner_state{} = State, _Extra) ->
     {ok, State}.
 
 %%%===================================================================
@@ -120,12 +134,15 @@ search_timers(
 ) ->
     Fun = fun() ->
         try
-            prg_storage:search_timers(StorageOpts, NsId, TimeoutSec + ScanTimeoutSec, FreeWorkersCount) of
-            Result when is_list(Result) ->
-                Result;
-            Unexpected ->
-                logger:error("search timers error: ~p", [Unexpected]),
-                []
+            prg_storage:search_timers(
+                StorageOpts, NsId, TimeoutSec + ScanTimeoutSec, FreeWorkersCount
+            )
+        of
+            Result when is_list(Result) ->
+                Result;
+            Unexpected ->
+                logger:error("search timers error: ~p", [Unexpected]),
+                []
         catch
             Class:Reason:Trace ->
                 logger:error("search timers exception: ~p", [[Class, Reason, Trace]]),
@@ -163,7 +180,8 @@ collect_zombie(
     }
 ) ->
     Fun = fun() ->
-        try prg_storage:collect_zombies(StorageOpts, NsId, TimeoutSec + ScanTimeoutSec)
+        try
+            prg_storage:collect_zombies(StorageOpts, NsId, TimeoutSec + ScanTimeoutSec)
         catch
             Class:Reason:Trace ->
                 logger:error("zombie collection exception: ~p", [[Class, Reason, Trace]])
diff --git a/src/prg_scheduler.erl b/src/prg_scheduler.erl
index 9da91ca..4a95b54 100644
--- a/src/prg_scheduler.erl
+++ b/src/prg_scheduler.erl
@@ -5,8 +5,14 @@
 -behaviour(gen_server).
 
 -export([start_link/1]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
-    code_change/3]).
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
 
 %% API
 -export([push_task/3]).
@@ -89,19 +95,23 @@ handle_call({pop_task, Worker}, _From, State) ->
 handle_call(count_workers, _From, #prg_scheduler_state{free_workers = Workers} = State) ->
     {reply, queue:len(Workers), State};
 handle_call(
-    {capture_worker, Owner}, _From,
+    {capture_worker, Owner},
+    _From,
     #prg_scheduler_state{owners = Owners} = State
 ) when erlang:is_map_key(Owner, Owners) ->
     {reply, {error, nested_capture}, State};
 handle_call(
-    {capture_worker, Owner}, _From,
+    {capture_worker, Owner},
+    _From,
     #prg_scheduler_state{owners = Owners} = State
 ) ->
     case queue:out(State#prg_scheduler_state.free_workers) of
         {{value, Worker}, NewWorkers} ->
             MRef = erlang:monitor(process, Owner),
             NewOwners = Owners#{Owner => {MRef, Worker}},
-            {reply, {ok, Worker}, State#prg_scheduler_state{free_workers = NewWorkers, owners = NewOwners}};
+            {reply, {ok, Worker}, State#prg_scheduler_state{
+                free_workers = NewWorkers, owners = NewOwners
+            }};
         {empty, _} ->
             {reply, {error, not_found}, State}
     end;
@@ -114,7 +124,7 @@ handle_call({continuation_task, Task}, _From, State) ->
         {empty, _} ->
             {reply, ok, State}
     end;
-handle_call(_Request, _From, State = #prg_scheduler_state{}) ->
+handle_call(_Request, _From, #prg_scheduler_state{} = State) ->
     {reply, ok, State}.
 
 handle_cast({push_task, TaskHeader, Task}, State) ->
@@ -122,7 +132,7 @@
     NewState = do_push_task(TaskHeader, Task, State),
     {noreply, NewState};
 handle_cast(
     {return_worker, Owner, Worker},
-    State = #prg_scheduler_state{free_workers = Workers, owners = Owners}
+    #prg_scheduler_state{free_workers = Workers, owners = Owners} = State
 ) ->
     case maps:get(Owner, Owners, undefined) of
         undefined ->
@@ -135,40 +145,41 @@ handle_cast(
             {noreply, State#prg_scheduler_state{free_workers = NewWorkers, owners = NewOwners}};
 handle_cast(
     {release_worker, Owner, Worker},
-    State = #prg_scheduler_state{owners = Owners}
+    #prg_scheduler_state{owners = Owners} = State
 ) ->
-    NewState = case maps:get(Owner, Owners, undefined) of
-        undefined ->
-            State;
-        {Ref, Worker} ->
-            _ = erlang:demonitor(Ref),
-            State#prg_scheduler_state{owners = maps:without([Owner], Owners)}
-    end,
+    NewState =
+        case maps:get(Owner, Owners, undefined) of
+            undefined ->
+                State;
+            {Ref, Worker} ->
+                _ = erlang:demonitor(Ref),
+                State#prg_scheduler_state{owners = maps:without([Owner], Owners)}
+        end,
     {noreply, NewState};
-handle_cast(_Request, State = #prg_scheduler_state{}) ->
+handle_cast(_Request, #prg_scheduler_state{} = State) ->
     {noreply, State}.
 
 handle_info(
     {'DOWN', _Ref, process, Pid, _Info},
-    State = #prg_scheduler_state{owners = Owners}
+    #prg_scheduler_state{owners = Owners} = State
 ) when erlang:is_map_key(Pid, Owners) ->
     {noreply, State#prg_scheduler_state{owners = maps:without([Pid], Owners)}};
 handle_info(
     {'DOWN', _Ref, process, Pid, _Info},
-    State = #prg_scheduler_state{wrk_monitors = WrkMonitors, ns_id = NsId}
+    #prg_scheduler_state{wrk_monitors = WrkMonitors, ns_id = NsId} = State
 ) when erlang:is_map_key(Pid, WrkMonitors) ->
     WorkerSup = prg_utils:registered_name(NsId, "_worker_sup"),
     {ok, NewWrk} = supervisor:start_child(WorkerSup, []),
     MRef = erlang:monitor(process, NewWrk),
     NewWrkMonitors = maps:put(NewWrk, MRef, maps:without([Pid], WrkMonitors)),
     {noreply, State#prg_scheduler_state{wrk_monitors = NewWrkMonitors}};
-handle_info(_Info, State = #prg_scheduler_state{}) ->
+handle_info(_Info, #prg_scheduler_state{} = State) ->
     {noreply, State}.
 
-terminate(_Reason, _State = #prg_scheduler_state{}) ->
+terminate(_Reason, #prg_scheduler_state{} = _State) ->
     ok.
 
-code_change(_OldVsn, State = #prg_scheduler_state{}, _Extra) ->
+code_change(_OldVsn, #prg_scheduler_state{} = State, _Extra) ->
     {ok, State}.
 
 %%%===================================================================
@@ -178,11 +189,15 @@ code_change(_OldVsn, State = #prg_scheduler_state{}, _Extra) ->
 start_workers(NsId, NsOpts) ->
     WorkerPoolSize = maps:get(worker_pool_size, NsOpts, ?DEFAULT_WORKER_POOL_SIZE),
     WorkerSup = prg_utils:registered_name(NsId, "_worker_sup"),
-    lists:foldl(fun(_N, Acc) ->
-        {ok, Pid} = supervisor:start_child(WorkerSup, []),
-        MRef = erlang:monitor(process, Pid),
-        Acc#{Pid => MRef}
-    end, #{}, lists:seq(1, WorkerPoolSize)).
+    lists:foldl(
+        fun(_N, Acc) ->
+            {ok, Pid} = supervisor:start_child(WorkerSup, []),
+            MRef = erlang:monitor(process, Pid),
+            Acc#{Pid => MRef}
+        end,
+        #{},
+        lists:seq(1, WorkerPoolSize)
+    ).
 
 do_push_task(TaskHeader, Task, State) ->
     FreeWorkers = State#prg_scheduler_state.free_workers,
@@ -197,7 +212,7 @@ do_push_task(TaskHeader, Task, State) ->
             State#prg_scheduler_state{
                 ready = queue:in({TaskHeader, Task}, OldReady)
             }
-    end. 
+    end.
 
 header() ->
     header(<<"timeout">>).
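The worker-leasing protocol reformatted above (capture_worker / return_worker / release_worker) pairs a monitor with each lease. A rough usage sketch, assuming the scheduler is registered under prg_utils:registered_name(NsId, "_scheduler") as its supervisor child id suggests; with_captured_worker/2 is hypothetical, and real callers would go through prg_scheduler's exported API rather than raw gen_server messages:

    %% Sketch: lease a worker from namespace NsId's scheduler, use it,
    %% and hand it back. {error, nested_capture} means the caller already
    %% holds a worker; {error, not_found} means none are free.
    with_captured_worker(NsId, UseFun) ->
        Scheduler = prg_utils:registered_name(NsId, "_scheduler"),
        case gen_server:call(Scheduler, {capture_worker, self()}) of
            {ok, Worker} ->
                try
                    UseFun(Worker)
                after
                    gen_server:cast(Scheduler, {return_worker, self(), Worker})
                end;
            {error, _} = Error ->
                Error
        end.

If the owner crashes instead of returning the worker, the monitor taken in capture_worker fires the 'DOWN' clause above and the stale lease is dropped.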
diff --git a/src/prg_storage.erl b/src/prg_storage.erl
index a8927b8..29ae207 100644
--- a/src/prg_storage.erl
+++ b/src/prg_storage.erl
@@ -49,8 +49,12 @@ get_task_result(#{client := Handler, options := HandlerOpts}, NsId, KeyOrId) ->
 get_process_status(#{client := Handler, options := HandlerOpts}, NsId, Id) ->
     Handler:get_process_status(HandlerOpts, NsId, Id).
 
--spec put_process_data(storage_opts(), namespace_id(), id(),
-    #{process := process(), init_task => task(), active_task => task()}) ->
+-spec put_process_data(
+    storage_opts(),
+    namespace_id(),
+    id(),
+    #{process := process(), init_task => task(), active_task => task()}
+) ->
     {ok, _Result} | {error, _Reason}.
 put_process_data(#{client := Handler, options := HandlerOpts}, NsId, Id, ProcessData) ->
     Handler:put_process_data(HandlerOpts, NsId, Id, ProcessData).
@@ -91,14 +95,20 @@ collect_zombies(#{client := Handler, options := HandlerOpts}, NsId, Timeout) ->
 %% Worker functions
 %%%%%%%%%%%%%%%%%%%
 
--spec complete_and_continue(storage_opts(), namespace_id(), task_result(), process(), [event()], task()) ->
+-spec complete_and_continue(
+    storage_opts(), namespace_id(), task_result(), process(), [event()], task()
+) ->
     {ok, [task()]}.
-complete_and_continue(#{client := Handler, options := HandlerOpts}, NsId, TaskResult, Process, Events, NextTask) ->
+complete_and_continue(
+    #{client := Handler, options := HandlerOpts}, NsId, TaskResult, Process, Events, NextTask
+) ->
     Handler:complete_and_continue(HandlerOpts, NsId, TaskResult, Process, Events, NextTask).
 
 -spec complete_and_suspend(storage_opts(), namespace_id(), task_result(), process(), [event()]) ->
     {ok, [task()]}.
-complete_and_suspend(#{client := Handler, options := HandlerOpts}, NsId, TaskResult, Process, Events) ->
+complete_and_suspend(
+    #{client := Handler, options := HandlerOpts}, NsId, TaskResult, Process, Events
+) ->
     Handler:complete_and_suspend(HandlerOpts, NsId, TaskResult, Process, Events).
 
 -spec complete_and_error(storage_opts(), namespace_id(), task_result(), process()) -> ok.
@@ -107,7 +117,9 @@ complete_and_error(#{client := Handler, options := HandlerOpts}, NsId, TaskResul
 
 -spec complete_and_unlock(storage_opts(), namespace_id(), task_result(), process(), [event()]) ->
     {ok, [task()]}.
-complete_and_unlock(#{client := Handler, options := HandlerOpts}, NsId, TaskResult, Process, Events) ->
+complete_and_unlock(
+    #{client := Handler, options := HandlerOpts}, NsId, TaskResult, Process, Events
+) ->
     Handler:complete_and_unlock(HandlerOpts, NsId, TaskResult, Process, Events).
 
 -spec remove_process(storage_opts(), namespace_id(), id()) -> ok | no_return().
@@ -122,7 +134,8 @@ remove_process(#{client := Handler, options := HandlerOpts}, NsId, ProcessId) ->
 get_task(StorageOpts, NsId, TaskId) ->
     get_task(internal, StorageOpts, NsId, TaskId).
 
--spec get_task(recipient(), storage_opts(), namespace_id(), task_id()) -> {ok, task()} | {error, _Reason}.
+-spec get_task(recipient(), storage_opts(), namespace_id(), task_id()) ->
+    {ok, task()} | {error, _Reason}.
 get_task(Recipient, #{client := Handler, options := HandlerOpts}, NsId, TaskId) ->
     Handler:get_task(Recipient, HandlerOpts, NsId, TaskId).
diff --git a/src/prg_utils.erl b/src/prg_utils.erl
index 5915531..3f7304a 100644
--- a/src/prg_utils.erl
+++ b/src/prg_utils.erl
@@ -18,8 +18,7 @@ registered_name(BaseAtom, PostfixStr) ->
 pipe([], Result) -> Result;
 pipe(_Funs, {error, _} = Error) -> Error;
 pipe(_Funs, {break, Result}) -> Result;
-pipe([F | Rest], Acc) ->
-    pipe(Rest, F(Acc)).
+pipe([F | Rest], Acc) -> pipe(Rest, F(Acc)).
 
 -spec format(term()) -> binary().
 format(Term) when is_binary(Term) ->
@@ -50,7 +49,7 @@ with_observe(Fun, MetricType, MetricKey, Labels) ->
     {DurationMicro, Result} = timer:tc(Fun),
     DurationMs = DurationMicro div 1000,
     logger:debug("metric: ~p, labels: ~p, value: ~p", [MetricKey, Labels, DurationMs]),
-    collect(MetricType, MetricKey, Labels, DurationMs),
+    ok = collect(MetricType, MetricKey, Labels, DurationMs),
     Result.
 
 collect(histogram, MetricKey, Labels, Value) ->
diff --git a/src/prg_worker.erl b/src/prg_worker.erl
index 4271fdb..acba026 100644
--- a/src/prg_worker.erl
+++ b/src/prg_worker.erl
@@ -5,8 +5,14 @@
 -include("progressor.hrl").
 
 -export([start_link/2]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
-    code_change/3]).
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
 -export([handle_continue/2]).
 
 -export([process_task/3]).
@@ -39,12 +45,14 @@ start_link(NsId, NsOpts) ->
     gen_server:start_link(?MODULE, [NsId, NsOpts], []).
 
 init([NsId, NsOpts]) ->
-    {ok, #prg_worker_state{
-        ns_id = NsId,
-        ns_opts = NsOpts
-    }, {continue, do_start}}.
+    {ok,
+        #prg_worker_state{
+            ns_id = NsId,
+            ns_opts = NsOpts
+        },
+        {continue, do_start}}.
 
-handle_continue(do_start, State = #prg_worker_state{ns_id = NsId}) ->
+handle_continue(do_start, #prg_worker_state{ns_id = NsId} = State) ->
     {ok, Pid} = prg_worker_sidecar:start_link(),
     case prg_scheduler:pop_task(NsId, self()) of
         {TaskHeader, Task} ->
@@ -54,16 +62,16 @@ handle_continue(do_start, State = #prg_worker_state{ns_id = NsId}) ->
     end,
     {noreply, State#prg_worker_state{sidecar_pid = Pid}}.
 
-handle_call(_Request, _From, State = #prg_worker_state{}) ->
+handle_call(_Request, _From, #prg_worker_state{} = State) ->
     {reply, ok, State}.
 
 handle_cast(
     {process_task, TaskHeader, Task},
-    State = #prg_worker_state{
+    #prg_worker_state{
         ns_id = NsId,
         ns_opts = #{storage := StorageOpts, process_step_timeout := TimeoutSec} = _NsOpts,
         sidecar_pid = Pid
-    }
+    } = State
 ) ->
     Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000,
     ProcessId = maps:get(process_id, Task),
@@ -72,9 +80,9 @@ handle_cast(
     {noreply, NewState};
 handle_cast(
     {continuation_task, TaskHeader, Task},
-    State = #prg_worker_state{
+    #prg_worker_state{
         ns_opts = #{process_step_timeout := TimeoutSec}
-    }
+    } = State
 ) ->
     Deadline = erlang:system_time(millisecond) + TimeoutSec * 1000,
     NewState = do_process_task(TaskHeader, Task, Deadline, State),
@@ -85,13 +93,13 @@ handle_cast(next_task, #prg_worker_state{sidecar_pid = CurrentPid}) ->
     true = erlang:exit(CurrentPid, kill),
     exit(normal).
 
-handle_info(_Info, State = #prg_worker_state{}) ->
+handle_info(_Info, #prg_worker_state{} = State) ->
     {noreply, State}.
 
-terminate(_Reason, _State = #prg_worker_state{}) ->
+terminate(_Reason, #prg_worker_state{} = _State) ->
     ok.
 
-code_change(_OldVsn, State = #prg_worker_state{}, _Extra) ->
+code_change(_OldVsn, #prg_worker_state{} = State, _Extra) ->
     {ok, State}.
 
 %%%===================================================================
@@ -103,12 +111,12 @@ do_process_task(
     _TaskHeader,
     #{task_type := <<"remove">>} = _Task,
     Deadline,
-    State = #prg_worker_state{
+    #prg_worker_state{
         ns_id = NsId,
         ns_opts = #{storage := StorageOpts} = NsOpts,
         process = #{process_id := ProcessId} = _Process,
         sidecar_pid = Pid
-    }
+    } = State
 ) ->
     ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, remove, ProcessId),
     ok = prg_worker_sidecar:remove_process(Pid, Deadline, StorageOpts, NsId, ProcessId),
@@ -118,12 +126,12 @@ do_process_task(
     TaskHeader,
     Task,
     Deadline,
-    State = #prg_worker_state{
+    #prg_worker_state{
         ns_id = _NsId,
         ns_opts = NsOpts,
         process = Process,
         sidecar_pid = Pid
-    }
+    } = State
 ) ->
     Args = maps:get(args, Task, <<>>),
     Ctx = maps:get(context, Task, <<>>),
@@ -137,15 +145,17 @@ handle_result(
     TaskHeader,
     #{task_id := TaskId, context := Context} = Task,
     Deadline,
-    State = #prg_worker_state{
+    #prg_worker_state{
         ns_id = NsId,
         ns_opts = #{storage := StorageOpts} = NsOpts,
         process = #{process_id := ProcessId} = Process,
         sidecar_pid = Pid
-    }
+    } = State
 ) ->
     Now = erlang:system_time(second),
-    ProcessUpdated = update_process(maps:without([detail,corrupted_by], Process#{status => <<"running">>}), Result),
+    ProcessUpdated = update_process(
+        maps:without([detail, corrupted_by], Process#{status => <<"running">>}), Result
+    ),
     Response = response(maps:get(response, Result, undefined)),
     TaskResult = #{
         task_id => TaskId,
@@ -165,13 +175,23 @@ handle_result(
         },
         maps:with([metadata], Task)
     ),
-    ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, extract_task_type(TaskHeader), ProcessId),
+    ok = prg_worker_sidecar:lifecycle_sink(
+        Pid, Deadline, NsOpts, extract_task_type(TaskHeader), ProcessId
+    ),
     ok = prg_worker_sidecar:event_sink(Pid, Deadline, NsOpts, ProcessId, Events),
     %% just for tests
     ok = maybe_wait_call(application:get_env(progressor, call_wait_timeout, undefined)),
     %%
-    SaveResult = prg_worker_sidecar:complete_and_continue(Pid, Deadline, StorageOpts, NsId, TaskResult,
-        ProcessUpdated, Events, NewTask),
+    SaveResult = prg_worker_sidecar:complete_and_continue(
+        Pid,
+        Deadline,
+        StorageOpts,
+        NsId,
+        TaskResult,
+        ProcessUpdated,
+        Events,
+        NewTask
+    ),
     _ = maybe_reply(TaskHeader, Response),
     case SaveResult of
         {ok, []} ->
@@ -182,19 +202,18 @@ handle_result(
             ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask),
             State#prg_worker_state{process = ProcessUpdated#{history => NewHistory}}
     end;
-
 %% success result with undefined timer and remove action
 handle_result(
     {ok, #{action := #{remove := true}} = Result},
     TaskHeader,
     _Task,
     Deadline,
-    State = #prg_worker_state{
+    #prg_worker_state{
         ns_id = NsId,
         ns_opts = #{storage := StorageOpts} = NsOpts,
         process = #{process_id := ProcessId} = _Process,
         sidecar_pid = Pid
-    }
+    } = State
 ) ->
     Response = response(maps:get(response, Result, undefined)),
     ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, remove, ProcessId),
@@ -202,23 +221,26 @@ handle_result(
     _ = maybe_reply(TaskHeader, Response),
     ok = next_task(self()),
     State#prg_worker_state{process = undefined};
-
 %% success result with unset_timer action
 handle_result(
     {ok, #{events := Events, action := unset_timer} = Result},
     TaskHeader,
     #{task_id := TaskId} = _Task,
     Deadline,
-    State = #prg_worker_state{
+    #prg_worker_state{
         ns_id = NsId,
         ns_opts = #{storage := StorageOpts} = NsOpts,
         process = #{process_id := ProcessId} = Process,
         sidecar_pid = Pid
-    }
+    } = State
 ) ->
-    ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, extract_task_type(TaskHeader), ProcessId),
+    ok = prg_worker_sidecar:lifecycle_sink(
+        Pid, Deadline, NsOpts, extract_task_type(TaskHeader), ProcessId
+    ),
     ok = prg_worker_sidecar:event_sink(Pid, Deadline, NsOpts, ProcessId, Events),
-    ProcessUpdated = update_process(maps:without([detail,corrupted_by], Process#{status => <<"running">>}), Result),
+    ProcessUpdated = update_process(
+        maps:without([detail, corrupted_by], Process#{status => <<"running">>}), Result
+    ),
     Response = response(maps:get(response, Result, undefined)),
     TaskResult = #{
         task_id => TaskId,
@@ -226,8 +248,15 @@ handle_result(
         finished_time => erlang:system_time(second),
         status => <<"finished">>
     },
-    SaveResult = prg_worker_sidecar:complete_and_suspend(Pid, Deadline, StorageOpts, NsId, TaskResult,
-        ProcessUpdated, Events),
+    SaveResult = prg_worker_sidecar:complete_and_suspend(
+        Pid,
+        Deadline,
+        StorageOpts,
+        NsId,
+        TaskResult,
+        ProcessUpdated,
+        Events
+    ),
     _ = maybe_reply(TaskHeader, Response),
     case SaveResult of
         {ok, []} ->
@@ -238,24 +267,27 @@ handle_result(
             ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask),
             State#prg_worker_state{process = ProcessUpdated#{history => NewHistory}}
     end;
-
 %% success repair with corrupted task and undefined action
 handle_result(
     {ok, #{events := Events} = Result},
     {repair, _} = TaskHeader,
     #{task_id := TaskId} = _Task,
     Deadline,
-    State = #prg_worker_state{
+    #prg_worker_state{
         ns_id = NsId,
         ns_opts = #{storage := StorageOpts} = NsOpts,
         process = #{process_id := ProcessId, corrupted_by := ErrorTaskId} = Process,
         sidecar_pid = Pid
-    }
+    } = State
 ) ->
     Now = erlang:system_time(second),
-    ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, extract_task_type(TaskHeader), ProcessId),
+    ok = prg_worker_sidecar:lifecycle_sink(
+        Pid, Deadline, NsOpts, extract_task_type(TaskHeader), ProcessId
+    ),
     ok = prg_worker_sidecar:event_sink(Pid, Deadline, NsOpts, ProcessId, Events),
-    ProcessUpdated = update_process(maps:without([detail,corrupted_by], Process#{status => <<"running">>}), Result),
+    ProcessUpdated = update_process(
+        maps:without([detail, corrupted_by], Process#{status => <<"running">>}), Result
+    ),
     Response = response(maps:get(response, Result, undefined)),
     TaskResult = #{
         task_id => TaskId,
@@ -267,43 +299,63 @@ handle_result(
     case ErrorTask of
         #{task_type := Type} when Type =:= <<"timeout">>; Type =:= <<"remove">> ->
             %% machinegun legacy behaviour
-            NewTask0 = maps:with([process_id, task_type, scheduled_time, args, metadata, context], ErrorTask),
+            NewTask0 = maps:with(
+                [process_id, task_type, scheduled_time, args, metadata, context], ErrorTask
+            ),
             NewTask = NewTask0#{
                 status => <<"running">>,
                 running_time => Now,
                 last_retry_interval => 0,
                 attempts_count => 0
             },
-            {ok, [ContinuationTask | _]} = prg_worker_sidecar:complete_and_continue(Pid, Deadline, StorageOpts, NsId,
-                TaskResult, ProcessUpdated, Events, NewTask),
+            {ok, [ContinuationTask | _]} = prg_worker_sidecar:complete_and_continue(
+                Pid,
+                Deadline,
+                StorageOpts,
+                NsId,
+                TaskResult,
+                ProcessUpdated,
+                Events,
+                NewTask
+            ),
             _ = maybe_reply(TaskHeader, Response),
             NewHistory = maps:get(history, Process) ++ Events,
             ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask),
             State#prg_worker_state{process = ProcessUpdated#{history => NewHistory}};
         _ ->
-            {ok, []} = prg_worker_sidecar:complete_and_unlock(Pid, Deadline, StorageOpts, NsId, TaskResult,
-                ProcessUpdated, Events),
+            {ok, []} = prg_worker_sidecar:complete_and_unlock(
+                Pid,
+                Deadline,
+                StorageOpts,
+                NsId,
+                TaskResult,
+                ProcessUpdated,
+                Events
+            ),
             _ = maybe_reply(TaskHeader, Response),
             ok = next_task(self()),
             State#prg_worker_state{process = undefined}
     end;
-
 %% success result with undefined action
 handle_result(
     {ok, #{events := Events} = Result},
     TaskHeader,
     #{task_id := TaskId} = _Task,
     Deadline,
-    State = #prg_worker_state{
+    #prg_worker_state{
         ns_id = NsId,
         ns_opts = #{storage := StorageOpts} = NsOpts,
         process = #{process_id := ProcessId} = Process,
         sidecar_pid = Pid
-    }
+    } = State
 ) ->
-    ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, extract_task_type(TaskHeader), ProcessId),
+    ok = prg_worker_sidecar:lifecycle_sink(
+        Pid, Deadline, NsOpts, extract_task_type(TaskHeader), ProcessId
+    ),
     ok = prg_worker_sidecar:event_sink(Pid, Deadline, NsOpts, ProcessId, Events),
-    ProcessUpdated = update_process(maps:without([detail,corrupted_by], Process#{status => <<"running">>}), Result),
+    ProcessUpdated = update_process(
+        maps:without([detail, corrupted_by], Process#{status => <<"running">>}), Result
+    ),
     Response = response(maps:get(response, Result, undefined)),
     TaskResult = #{
         task_id => TaskId,
@@ -311,8 +363,15 @@ handle_result(
         finished_time => erlang:system_time(second),
         status => <<"finished">>
     },
-    SaveResult = prg_worker_sidecar:complete_and_unlock(Pid, Deadline, StorageOpts, NsId, TaskResult,
-        ProcessUpdated, Events),
+    SaveResult = prg_worker_sidecar:complete_and_unlock(
+        Pid,
+        Deadline,
+        StorageOpts,
+        NsId,
+        TaskResult,
+        ProcessUpdated,
+        Events
+    ),
     _ = maybe_reply(TaskHeader, Response),
     case SaveResult of
         {ok, []} ->
@@ -323,56 +382,59 @@ handle_result(
             ok = continuation_task(self(), create_header(ContinuationTask), ContinuationTask),
             State#prg_worker_state{process = ProcessUpdated#{history => NewHistory}}
     end;
-
 %% calls processing error
 handle_result(
     {error, Reason} = Response,
     {TaskType, _} = TaskHeader,
     #{task_id := TaskId} = _Task,
     Deadline,
-    State = #prg_worker_state{
+    #prg_worker_state{
         ns_id = NsId,
         ns_opts = #{storage := StorageOpts} = NsOpts,
         process = #{process_id := ProcessId} = Process,
         sidecar_pid = Pid
-    }
+    } = State
 ) when TaskType =:= init; TaskType =:= call; TaskType =:= notify; TaskType =:= repair ->
-    ProcessUpdated = case TaskType of
-        repair ->
-            Process;
-        _ ->
-            Detail = prg_utils:format(Reason),
-            ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, {error, Detail}, ProcessId),
-            Process#{status => <<"error">>, detail => Detail}
-    end,
+    ProcessUpdated =
+        case TaskType of
+            repair ->
+                Process;
+            _ ->
+                Detail = prg_utils:format(Reason),
+                ok = prg_worker_sidecar:lifecycle_sink(
+                    Pid, Deadline, NsOpts, {error, Detail}, ProcessId
+                ),
+                Process#{status => <<"error">>, detail => Detail}
+        end,
     TaskResult = #{
         task_id => TaskId,
         response => term_to_binary(Response),
         finished_time => erlang:system_time(second),
         status => <<"error">>
     },
-    ok = prg_worker_sidecar:complete_and_error(Pid, Deadline, StorageOpts, NsId, TaskResult, ProcessUpdated),
+    ok = prg_worker_sidecar:complete_and_error(
+        Pid, Deadline, StorageOpts, NsId, TaskResult, ProcessUpdated
+    ),
     _ = maybe_reply(TaskHeader, Response),
     ok = next_task(self()),
     State#prg_worker_state{process = undefined};
-
 %% timeout/remove processing error
 handle_result(
     {error, Reason} = Response,
     {TaskType, _} = TaskHeader,
     #{task_id := TaskId} = Task,
     Deadline,
-    State = #prg_worker_state{
+    #prg_worker_state{
         ns_id = NsId,
         ns_opts = #{storage := StorageOpts, retry_policy := RetryPolicy} = NsOpts,
         process = #{process_id := ProcessId} = Process,
         sidecar_pid = Pid
-    }
+    } = State
 ) when TaskType =:= timeout; TaskType =:= remove ->
     TaskResult = #{
         task_id => TaskId,
@@ -380,25 +442,42 @@ handle_result(
         finished_time => erlang:system_time(second),
         status => <<"error">>
     },
-    case check_retryable(TaskHeader, Task, RetryPolicy, Reason) of
-        not_retryable ->
-            Detail = prg_utils:format(Reason),
-            ProcessUpdated = Process#{status => <<"error">>, detail => Detail, corrupted_by => TaskId},
-            ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, {error, Detail}, ProcessId),
-            ok = prg_worker_sidecar:complete_and_error(Pid, Deadline, StorageOpts, NsId, TaskResult, ProcessUpdated);
-        NewTask ->
-            {ok, _} = prg_worker_sidecar:complete_and_continue(Pid, Deadline, StorageOpts, NsId,
-                TaskResult, Process, [], NewTask)
-    end,
+    _ =
+        case check_retryable(TaskHeader, Task, RetryPolicy, Reason) of
+            not_retryable ->
+                Detail = prg_utils:format(Reason),
+                ProcessUpdated = Process#{
+                    status => <<"error">>, detail => Detail, corrupted_by => TaskId
+                },
+                ok = prg_worker_sidecar:lifecycle_sink(Pid, Deadline, NsOpts, {error, Detail}, ProcessId),
+                ok = prg_worker_sidecar:complete_and_error(
+                    Pid, Deadline, StorageOpts, NsId, TaskResult, ProcessUpdated
+                );
+            NewTask ->
+                {ok, _} = prg_worker_sidecar:complete_and_continue(
+                    Pid,
+                    Deadline,
+                    StorageOpts,
+                    NsId,
+                    TaskResult,
+                    Process,
+                    [],
+                    NewTask
+                )
+        end,
     ok = next_task(self()),
     State#prg_worker_state{process = undefined}.
 
 update_process(Process, Result) ->
-    maps:fold(fun
-        (metadata, Meta, Acc) -> Acc#{metadata => Meta};
-        (aux_state, AuxState, Acc) -> Acc#{aux_state => AuxState};
-        (_, _, Acc) -> Acc
-    end, Process, Result).
+    maps:fold(
+        fun
+            (metadata, Meta, Acc) -> Acc#{metadata => Meta};
+            (aux_state, AuxState, Acc) -> Acc#{aux_state => AuxState};
+            (_, _, Acc) -> Acc
+        end,
+        Process,
+        Result
+    ).
 
 -spec maybe_reply(task_header(), term()) -> term().
 maybe_reply({_, undefined}, _) ->
@@ -427,7 +506,16 @@ check_retryable(TaskHeader, #{last_retry_interval := LastInterval} = Task, Retry
     case is_retryable(Error, TaskHeader, RetryPolicy, Timeout, Attempts) of
         true ->
             maps:with(
-                [process_id, task_type, status, scheduled_time, args, last_retry_interval, attempts_count, metadata],
+                [
+                    process_id,
+                    task_type,
+                    status,
+                    scheduled_time,
+                    args,
+                    last_retry_interval,
+                    attempts_count,
+                    metadata
+                ],
                 Task#{
                     status => <<"waiting">>,
                     scheduled_time => Now + Timeout,
@@ -458,7 +546,6 @@ is_retryable(
     Timeout < maps:get(max_timeout, RetryPolicy, infinity) andalso
         Attempts < maps:get(max_attempts, RetryPolicy, infinity) andalso
         not lists:any(fun(E) -> Error =:= E end, maps:get(non_retryable_errors, RetryPolicy, []));
-
 is_retryable({exception, _, _} = _Error, _TaskHeader, _RetryPolicy, _Timeout, _Attempts) ->
     false;
 is_retryable(Error, {timeout, undefined}, RetryPolicy, Timeout, Attempts) ->
diff --git a/src/prg_worker_sidecar.erl b/src/prg_worker_sidecar.erl
index f7e1e5f..ea3f5c7 100644
--- a/src/prg_worker_sidecar.erl
+++ b/src/prg_worker_sidecar.erl
@@ -5,8 +5,14 @@
 -include("progressor.hrl").
 
 -export([start_link/0]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
-    code_change/3]).
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
 
 %% Processor functions wrapper
 -export([process/5]).
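A worked pass through the retry gate above, under the ?DEFAULT_RETRY_POLICY from include/progressor.hrl (initial_timeout 5, backoff_coefficient 1.0, max_attempts 3); the attempt numbers are illustrative:

    %% is_retryable/5 admits a retry while
    %%     Timeout  < max_timeout   (absent => the atom 'infinity', above
    %%                               any number in Erlang term order)
    %%     Attempts < max_attempts
    %% and Reason is not listed in non_retryable_errors.
    %% With coefficient 1.0 the interval stays at 5 s:
    %%   attempt 1: 5 < infinity andalso 1 < 3 -> retry in 5 s
    %%   attempt 2: 5 < infinity andalso 2 < 3 -> retry in 5 s
    %%   attempt 3: 5 < infinity andalso 3 < 3 -> false: the task is
    %%   completed as an error and the process is marked
    %%   status => <<"error">>, corrupted_by => TaskId (see handle_result).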
@@ -68,7 +74,9 @@ complete_and_continue(Pid, _Deadline, StorageOpts, NsId, TaskResult, Process, Ev
             infinity
         )
     end,
-    prg_utils:with_observe(Fun, ?COMPLETION_KEY, [erlang:atom_to_list(NsId), "complete_and_continue"]).
+    prg_utils:with_observe(Fun, ?COMPLETION_KEY, [
+        erlang:atom_to_list(NsId), "complete_and_continue"
+    ]).
 
 -spec complete_and_suspend(
     pid(),
@@ -84,7 +92,8 @@ complete_and_suspend(Pid, _Deadline, StorageOpts, NsId, TaskResult, Process, Eve
     Fun = fun() ->
         gen_server:call(
             Pid,
-            {complete_and_suspend, StorageOpts, NsId, TaskResult, Process, Events}, infinity
+            {complete_and_suspend, StorageOpts, NsId, TaskResult, Process, Events},
+            infinity
         )
     end,
     prg_utils:with_observe(Fun, ?COMPLETION_KEY, [erlang:atom_to_list(NsId), "complete_and_suspend"]).
@@ -103,19 +112,23 @@ complete_and_unlock(Pid, _Deadline, StorageOpts, NsId, TaskResult, Process, Even
     Fun = fun() ->
         gen_server:call(
             Pid,
-            {complete_and_unlock, StorageOpts, NsId, TaskResult, Process, Events}, infinity
+            {complete_and_unlock, StorageOpts, NsId, TaskResult, Process, Events},
+            infinity
         )
     end,
     prg_utils:with_observe(Fun, ?COMPLETION_KEY, [erlang:atom_to_list(NsId), "complete_and_unlock"]).
 
--spec complete_and_error(pid(), timestamp_ms(), storage_opts(), namespace_id(), task_result(), process()) ->
+-spec complete_and_error(
+    pid(), timestamp_ms(), storage_opts(), namespace_id(), task_result(), process()
+) ->
     ok | no_return().
 complete_and_error(Pid, _Deadline, StorageOpts, NsId, TaskResult, Process) ->
     %% Timeout = Deadline - erlang:system_time(millisecond),
     Fun = fun() ->
         gen_server:call(
             Pid,
-            {complete_and_error, StorageOpts, NsId, TaskResult, Process}, infinity
+            {complete_and_error, StorageOpts, NsId, TaskResult, Process},
+            infinity
         )
     end,
     prg_utils:with_observe(Fun, ?COMPLETION_KEY, [erlang:atom_to_list(NsId), "complete_and_unlock"]).
@@ -132,21 +145,21 @@ remove_process(Pid, _Deadline, StorageOpts, NsId, ProcessId) ->
 
 %% notifier wrappers
 -spec event_sink(pid(), timestamp_ms(), namespace_opts(), id(), [event()]) -> ok | no_return().
-event_sink(Pid, Deadline, #{namespace := Ns} = NsOpts, ProcessId, Events) ->
+event_sink(Pid, Deadline, #{namespace := NS} = NsOpts, ProcessId, Events) ->
     Timeout = Deadline - erlang:system_time(millisecond),
     Fun = fun() ->
         gen_server:call(Pid, {event_sink, NsOpts, ProcessId, Events}, Timeout)
     end,
-    prg_utils:with_observe(Fun, ?NOTIFICATION_KEY, [Ns, "event_sink"]).
+    prg_utils:with_observe(Fun, ?NOTIFICATION_KEY, [NS, "event_sink"]).
 
 -spec lifecycle_sink(pid(), timestamp_ms(), namespace_opts(), task_t() | {error, _Reason}, id()) ->
     ok | no_return().
-lifecycle_sink(Pid, Deadline, #{namespace := Ns} = NsOpts, TaskType, ProcessId) ->
+lifecycle_sink(Pid, Deadline, #{namespace := NS} = NsOpts, TaskType, ProcessId) ->
     Timeout = Deadline - erlang:system_time(millisecond),
     Fun = fun() ->
         gen_server:call(Pid, {lifecycle_sink, NsOpts, TaskType, ProcessId}, Timeout)
     end,
-    prg_utils:with_observe(Fun, ?NOTIFICATION_KEY, [Ns, "lifecycle_sink"]).
+    prg_utils:with_observe(Fun, ?NOTIFICATION_KEY, [NS, "lifecycle_sink"]).
 
 %%
 -spec get_process(pid(), timestamp_ms(), storage_opts(), namespace_id(), id()) ->
@@ -179,12 +192,14 @@ handle_call(
         Ctx
     },
     _From,
-    State = #prg_sidecar_state{}
+    #prg_sidecar_state{} = State
 ) ->
     Response =
         try Handler:process(Request, Options, Ctx) of
-            {ok, _Result} = OK -> OK;
-            {error, _Reason} = ERR -> ERR;
+            {ok, _Result} = OK ->
+                OK;
+            {error, _Reason} = ERR ->
+                ERR;
             Unsupported ->
                 logger:error("processor unexpected result: ~p", [Unsupported]),
                 {error, <<"unsupported_result">>}
@@ -194,104 +209,95 @@ handle_call(
             {error, {exception, Class, Term}}
     end,
     {reply, Response, State};
-
 handle_call(
     {complete_and_continue, StorageOpts, NsId, TaskResult, Process, Events, Task},
     _From,
-    State = #prg_sidecar_state{}
+    #prg_sidecar_state{} = State
 ) ->
     Fun = fun() ->
         prg_storage:complete_and_continue(StorageOpts, NsId, TaskResult, Process, Events, Task)
     end,
     Response = do_with_retry(Fun, ?DEFAULT_DELAY),
     {reply, Response, State};
-
 handle_call(
     {remove_process, StorageOpts, NsId, ProcessId},
     _From,
-    State = #prg_sidecar_state{}
+    #prg_sidecar_state{} = State
 ) ->
     Fun = fun() ->
         prg_storage:remove_process(StorageOpts, NsId, ProcessId)
     end,
     Response = do_with_retry(Fun, ?DEFAULT_DELAY),
     {reply, Response, State};
-
 handle_call(
     {get_process, StorageOpts, NsId, ProcessId},
     _From,
-    State = #prg_sidecar_state{}
+    #prg_sidecar_state{} = State
 ) ->
     Fun = fun() ->
         prg_storage:get_process(StorageOpts, NsId, ProcessId)
     end,
     Response = do_with_retry(Fun, ?DEFAULT_DELAY),
     {reply, Response, State};
-
 handle_call(
     {get_task, StorageOpts, NsId, TaskId},
     _From,
-    State = #prg_sidecar_state{}
+    #prg_sidecar_state{} = State
 ) ->
     Fun = fun() ->
         prg_storage:get_task(StorageOpts, NsId, TaskId)
     end,
     Response = do_with_retry(Fun, ?DEFAULT_DELAY),
     {reply, Response, State};
-
 handle_call(
     {complete_and_suspend, StorageOpts, NsId, TaskResult, Process, Events},
     _From,
-    State = #prg_sidecar_state{}
+    #prg_sidecar_state{} = State
 ) ->
     Fun = fun() ->
         prg_storage:complete_and_suspend(StorageOpts, NsId, TaskResult, Process, Events)
     end,
     Response = do_with_retry(Fun, ?DEFAULT_DELAY),
     {reply, Response, State};
-
 handle_call(
     {complete_and_unlock, StorageOpts, NsId, TaskResult, Process, Events},
     _From,
-    State = #prg_sidecar_state{}
+    #prg_sidecar_state{} = State
 ) ->
     Fun = fun() ->
         prg_storage:complete_and_unlock(StorageOpts, NsId, TaskResult, Process, Events)
     end,
     Response = do_with_retry(Fun, ?DEFAULT_DELAY),
     {reply, Response, State};
-
 handle_call(
     {complete_and_error, StorageOpts, NsId, TaskResult, Process},
     _From,
-    State = #prg_sidecar_state{}
+    #prg_sidecar_state{} = State
 ) ->
     Fun = fun() ->
         prg_storage:complete_and_error(StorageOpts, NsId, TaskResult, Process)
     end,
     Response = do_with_retry(Fun, ?DEFAULT_DELAY),
     {reply, Response, State};
-
 handle_call({event_sink, NsOpts, ProcessId, Events}, _From, State) ->
     Fun = fun() ->
         prg_notifier:event_sink(NsOpts, ProcessId, Events)
     end,
     Response = do_with_retry(Fun, ?DEFAULT_DELAY),
     {reply, Response, State};
-
 handle_call({lifecycle_sink, NsOpts, TaskType, ProcessId}, _From, State) ->
     Fun = fun() ->
         prg_notifier:lifecycle_sink(NsOpts, TaskType, ProcessId)
     end,
     Response = do_with_retry(Fun, ?DEFAULT_DELAY),
     {reply, Response, State}.
 
-handle_cast(_Request, State = #prg_sidecar_state{}) ->
+handle_cast(_Request, #prg_sidecar_state{} = State) ->
     {noreply, State}.
 
-handle_info(_Info, State = #prg_sidecar_state{}) ->
+handle_info(_Info, #prg_sidecar_state{} = State) ->
     {noreply, State}.
 
-terminate(_Reason, _State = #prg_sidecar_state{}) ->
+terminate(_Reason, #prg_sidecar_state{} = _State) ->
     ok.
 
-code_change(_OldVsn, State = #prg_sidecar_state{}, _Extra) ->
+code_change(_OldVsn, #prg_sidecar_state{} = State, _Extra) ->
     {ok, State}.
 
 %%%===================================================================
diff --git a/src/prg_worker_sup.erl b/src/prg_worker_sup.erl
index b59aacf..2fb3ae6 100644
--- a/src/prg_worker_sup.erl
+++ b/src/prg_worker_sup.erl
@@ -12,7 +12,7 @@
 %%% API functions
 %%%===================================================================
 
--spec(start_link(term()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
+-spec start_link(term()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
 start_link({NsId, _} = NS) ->
     RegName = prg_utils:registered_name(NsId, "_worker_sup"),
     supervisor:start_link({local, RegName}, ?MODULE, NS).
diff --git a/src/progressor.app.src b/src/progressor.app.src
index b1602b5..d713868 100644
--- a/src/progressor.app.src
+++ b/src/progressor.app.src
@@ -1,21 +1,21 @@
-{application, progressor,
-    [{description, "An OTP application"},
-        {vsn, "0.1.0"},
-        {registered, []},
-        {mod, {progressor_app, []}},
-        {applications,
-            [kernel,
-                stdlib,
-                jsx,
-                prometheus,
-                epg_connector,
-                thrift,
-                mg_proto,
-                brod
-            ]},
-        {env,[]},
-        {modules, []},
+{application, progressor, [
+    {description, "An OTP application"},
+    {vsn, "0.1.0"},
+    {registered, []},
+    {mod, {progressor_app, []}},
+    {applications, [
+        kernel,
+        stdlib,
+        jsx,
+        prometheus,
+        epg_connector,
+        thrift,
+        mg_proto,
+        brod
+    ]},
+    {env, []},
+    {modules, []},
 
-    {licenses, ["MIT"]},
-    {links, []}
-    ]}.
+    {licenses, ["MIT"]},
+    {links, []}
+]}.
diff --git a/src/progressor.erl b/src/progressor.erl
index a5ad987..46ca3ea 100644
--- a/src/progressor.erl
+++ b/src/progressor.erl
@@ -37,58 +37,76 @@ reply(Pid, Msg) ->
 %% API
 -spec init(request()) -> {ok, _Result} | {error, _Reason}.
 init(Req) ->
-    prg_utils:pipe([
-        fun add_ns_opts/1,
-        fun check_idempotency/1,
-        fun add_task/1,
-        fun(Data) -> prepare(fun prg_storage:prepare_init/4, Data) end,
-        fun process_call/1
-    ], Req#{type => init}).
+    prg_utils:pipe(
+        [
+            fun add_ns_opts/1,
+            fun check_idempotency/1,
+            fun add_task/1,
+            fun(Data) -> prepare(fun prg_storage:prepare_init/4, Data) end,
+            fun process_call/1
+        ],
+        Req#{type => init}
+    ).
 
 -spec call(request()) -> {ok, _Result} | {error, _Reason}.
 call(Req) ->
-    prg_utils:pipe([
-        fun add_ns_opts/1,
-        fun check_idempotency/1,
-        fun(Data) -> check_process_status(Data, <<"running">>) end,
-        fun add_task/1,
-        fun(Data) -> prepare(fun prg_storage:prepare_call/4, Data) end,
-        fun process_call/1
-    ], Req#{type => call}).
+    prg_utils:pipe(
+        [
+            fun add_ns_opts/1,
+            fun check_idempotency/1,
+            fun(Data) -> check_process_status(Data, <<"running">>) end,
+            fun add_task/1,
+            fun(Data) -> prepare(fun prg_storage:prepare_call/4, Data) end,
+            fun process_call/1
+        ],
+        Req#{type => call}
+    ).
 
 -spec repair(request()) -> {ok, _Result} | {error, _Reason}.
 repair(Req) ->
-    prg_utils:pipe([
-        fun add_ns_opts/1,
-        fun check_idempotency/1,
-        fun(Data) -> check_process_status(Data, <<"error">>) end,
-        fun add_task/1,
-        fun(Data) -> prepare(fun prg_storage:prepare_repair/4, Data) end,
-        fun process_call/1
-    ], Req#{type => repair}).
+    prg_utils:pipe(
+        [
+            fun add_ns_opts/1,
+            fun check_idempotency/1,
+            fun(Data) -> check_process_status(Data, <<"error">>) end,
+            fun add_task/1,
+            fun(Data) -> prepare(fun prg_storage:prepare_repair/4, Data) end,
+            fun process_call/1
+        ],
+        Req#{type => repair}
+    ).
 
 -spec get(request()) -> {ok, _Result} | {error, _Reason}.
 get(Req) ->
-    prg_utils:pipe([
-        fun add_ns_opts/1,
-        fun do_get/1
-    ], Req).
+    prg_utils:pipe(
+        [
+            fun add_ns_opts/1,
+            fun do_get/1
+        ],
+        Req
+    ).
 
 -spec put(request()) -> {ok, _Result} | {error, _Reason}.
 put(Req) ->
-    prg_utils:pipe([
-        fun add_ns_opts/1,
-        fun do_put/1
-    ], Req).
+    prg_utils:pipe(
+        [
+            fun add_ns_opts/1,
+            fun do_put/1
+        ],
+        Req
+    ).
 
 %-ifdef(TEST).
 
 -spec cleanup(_) -> _.
 cleanup(Opts) ->
-    prg_utils:pipe([
-        fun add_ns_opts/1,
-        fun cleanup_storage/1
-    ], Opts).
+    prg_utils:pipe(
+        [
+            fun add_ns_opts/1,
+            fun cleanup_storage/1
+        ],
+        Opts
+    ).
 
 cleanup_storage(#{ns := NsId, ns_opts := #{storage := StorageOpts}}) ->
     ok = prg_storage:cleanup(StorageOpts, NsId).
@@ -126,14 +144,20 @@ add_task(#{id := Id, type := Type} = Opts) ->
     Task = make_task(maybe_add_idempotency(TaskData, maps:get(idempotency_key, Opts, undefined))),
     Opts#{task => Task}.
 
-check_process_status(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId} = Opts, ExpectedStatus) ->
+check_process_status(
+    #{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId} = Opts, ExpectedStatus
+) ->
     case prg_storage:get_process_status(StorageOpts, NsId, Id) of
         {ok, ExpectedStatus} -> Opts;
         {ok, OtherStatus} -> {error, <<"process is ", OtherStatus/binary>>};
         {error, _} = Error -> Error
     end.
 
-prepare(Fun, #{ns_opts := #{storage := StorageOpts} = NsOpts, ns := NsId, id := ProcessId, task := Task} = Req) ->
+prepare(
+    Fun,
+    #{ns_opts := #{storage := StorageOpts} = NsOpts, ns := NsId, id := ProcessId, task := Task} =
+        Req
+) ->
     Worker = capture_worker(NsId),
     TaskStatus = check_for_run(Worker),
     TaskType = maps:get(task_type, Task),
@@ -159,7 +183,9 @@ prepare(Fun, #{ns_opts := #{storage := StorageOpts} = NsOpts, ns := NsId, id :=
         Error
     end.
 
-get_task_result(#{ns_opts := #{storage := StorageOpts} = NsOpts, ns := NsId, idempotency_key := IdempotencyKey}) ->
+get_task_result(#{
+    ns_opts := #{storage := StorageOpts} = NsOpts, ns := NsId, idempotency_key := IdempotencyKey
+}) ->
     case prg_storage:get_task_result(StorageOpts, NsId, {idempotency_key, IdempotencyKey}) of
         {ok, Result} ->
             Result;
@@ -179,7 +205,9 @@ await_task_result(StorageOpts, NsId, KeyOrId, Timeout, Duration) ->
             Result;
         {error, _} ->
             timer:sleep(?TASK_REPEAT_REQUEST_TIMEOUT),
-            await_task_result(StorageOpts, NsId, KeyOrId, Timeout, Duration + ?TASK_REPEAT_REQUEST_TIMEOUT)
+            await_task_result(
+                StorageOpts, NsId, KeyOrId, Timeout, Duration + ?TASK_REPEAT_REQUEST_TIMEOUT
+            )
     end.
 
 do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, args := HistoryRange}) ->
@@ -187,7 +215,14 @@ do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, args := His
 do_get(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId}) ->
     prg_storage:get_process(external, StorageOpts, NsId, Id, #{}).
 
-do_put(#{ns_opts := #{storage := StorageOpts}, id := Id, ns := NsId, args := #{process := Process} = Args}= Opts) ->
+do_put(
+    #{
+        ns_opts := #{storage := StorageOpts},
+        id := Id,
+        ns := NsId,
+        args := #{process := Process} = Args
+    } = Opts
+) ->
     #{
         process_id := ProcessId
     } = Process,
@@ -298,10 +333,11 @@ action_to_task(undefined, _ProcessId, _Ctx) ->
 action_to_task(unset_timer, _ProcessId, _Ctx) ->
     undefined;
 action_to_task(#{set_timer := Timestamp} = Action, ProcessId, Context) ->
-    TaskType = case maps:get(remove, Action, false) of
-        true -> <<"remove">>;
-        false -> <<"timeout">>
-    end,
+    TaskType =
+        case maps:get(remove, Action, false) of
+            true -> <<"remove">>;
+            false -> <<"timeout">>
+        end,
     #{
         process_id => ProcessId,
         task_type => TaskType,
diff --git a/src/progressor_app.erl b/src/progressor_app.erl
index bd22a36..6176cdf 100644
--- a/src/progressor_app.erl
+++ b/src/progressor_app.erl
@@ -19,7 +19,6 @@ stop(_State) ->
 %% internal functions
 
 create_metrics() ->
-
     _ = prometheus_histogram:new([
         {name, progressor_calls_scanning_duration_ms},
         {help, "Calls (call, repair) scanning durations in millisecond"},
diff --git a/src/progressor_sup.erl b/src/progressor_sup.erl
index ce978a4..9991c87 100644
--- a/src/progressor_sup.erl
+++ b/src/progressor_sup.erl
@@ -14,9 +14,11 @@ start_link() ->
     supervisor:start_link({local, ?SERVER}, ?MODULE, []).
 
 init([]) ->
-    SupFlags = #{strategy => one_for_one,
+    SupFlags = #{
+        strategy => one_for_one,
         intensity => 0,
-        period => 1},
+        period => 1
+    },
     ChildSpecs = maps:fold(
         fun(ID, NsOpts, Acc) ->
             FullOpts = prg_utils:make_ns_opts(ID, NsOpts),
@@ -26,7 +28,8 @@ init([]) ->
                     id => ID,
                     start => {prg_namespace_sup, start_link, [NS]},
                     type => supervisor
-                } | Acc
+                }
+                | Acc
             ]
         end,
         [],
diff --git a/src/storage/postgres/prg_pg_backend.erl b/src/storage/postgres/prg_pg_backend.erl
index 9b27053..7c39694 100644
--- a/src/storage/postgres/prg_pg_backend.erl
+++ b/src/storage/postgres/prg_pg_backend.erl
@@ -36,7 +36,8 @@
 
 -type pg_opts() :: #{pool := atom()}.
 
--define(PROTECT_TIMEOUT, 5). %% second
+%% second
+-define(PROTECT_TIMEOUT, 5).
 
 -spec get_task_result(pg_opts(), namespace_id(), {task_id | idempotency_key, binary()}) ->
     {ok, term()} | {error, _Reason}.
@@ -90,9 +91,11 @@ get_process(Recipient, PgOpts, NsId, ProcessId, HistoryRange) ->
         "SELECT pr.process_id, pr.status, pr.detail, pr.aux_state, pr.metadata as p_meta, pr.corrupted_by,"
         " ev.event_id, ev.timestamp, ev.metadata, ev.payload "
         "FROM "
-        " (SELECT * FROM " ++ ProcessesTable ++ " WHERE process_id = $1) AS pr "
-        " LEFT JOIN (SELECT * FROM " ++ EventsTable ++ RangeCondition ++ " ORDER BY event_id ASC) AS ev "
-        " ON ev.process_id = pr.process_id ",
+        " (SELECT * FROM " ++ ProcessesTable ++
+            " WHERE process_id = $1) AS pr "
+            " LEFT JOIN (SELECT * FROM " ++ EventsTable ++ RangeCondition ++
+            " ORDER BY event_id ASC) AS ev "
+            " ON ev.process_id = pr.process_id ",
         [ProcessId]
     ),
     case Rows of
@@ -107,7 +110,10 @@ get_process(Recipient, PgOpts, NsId, ProcessId, HistoryRange) ->
         {ok, Proc#{history => Events}}
     end.
 
--spec put_process_data(pg_opts(), namespace_id(), id(),
+-spec put_process_data(
+    pg_opts(),
+    namespace_id(),
+    id(),
     #{process := process(), init_task := task(), active_task => task() | undefined}
 ) ->
     {ok, _Result} | {error, _Reason}.
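The pg_opts() type above (#{pool := atom()}) is what arrives as HandlerOpts through the #{client := Handler, options := HandlerOpts} dispatch in src/prg_storage.erl. A sketch of the matching storage entry in a namespace's sys.config — the pool name progressor_pool is an assumed epg_connector pool, not something this diff defines:

    %% Hypothetical namespace storage clause: every prg_storage call is
    %% dispatched as Handler:Fun(HandlerOpts, NsId, ...), so this routes
    %% storage to the Postgres backend with #{pool => progressor_pool}
    %% as HandlerOpts.
    storage => #{
        client => prg_pg_backend,
        options => #{pool => progressor_pool}
    }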
@@ -132,9 +138,12 @@ put_process_data(PgOpts, NsId, ProcessId, ProcessData) ->
             case do_save_process(Connection, ProcessesTable, Process) of
                 {ok, _} ->
                     {ok, _, _, [{InitTaskId}]} = do_save_task(Connection, TaskTable, InitTask),
-                    lists:foreach(fun(Ev) ->
-                        {ok, _} = do_save_event(Connection, EventsTable, ProcessId, InitTaskId, Ev)
-                    end, Events),
+                    lists:foreach(
+                        fun(Ev) ->
+                            {ok, _} = do_save_event(Connection, EventsTable, ProcessId, InitTaskId, Ev)
+                        end,
+                        Events
+                    ),
                     ok = maybe_schedule_task(Connection, TaskTable, ScheduleTable, ActiveTask),
                     {ok, ok};
                 {error, #error{codename = unique_violation}} ->
@@ -156,11 +165,15 @@ remove_process(PgOpts, NsId, ProcessId) ->
     epg_pool:transaction(
         Pool,
         fun(Connection) ->
-            {ok, _R} = epg_pool:query(Connection, "DELETE FROM " ++ RunningTable ++ " WHERE process_id = $1", [ProcessId]),
-            {ok, _S} = epg_pool:query(Connection, "DELETE FROM " ++ ScheduleTable ++ " WHERE process_id = $1", [ProcessId]),
-            {ok, _E} = epg_pool:query(Connection, "DELETE FROM " ++ EventsTable ++ " WHERE process_id = $1", [ProcessId]),
+            {ok, _R} =
+                epg_pool:query(Connection, "DELETE FROM " ++ RunningTable ++ " WHERE process_id = $1", [ProcessId]),
+            {ok, _S} =
+                epg_pool:query(Connection, "DELETE FROM " ++ ScheduleTable ++ " WHERE process_id = $1", [ProcessId]),
+            {ok, _E} =
+                epg_pool:query(Connection, "DELETE FROM " ++ EventsTable ++ " WHERE process_id = $1", [ProcessId]),
             {ok, _T} = epg_pool:query(Connection, "DELETE FROM " ++ TaskTable ++ " WHERE process_id = $1", [ProcessId]),
-            {ok, _P} = epg_pool:query(Connection, "DELETE FROM " ++ ProcessesTable ++ " WHERE process_id = $1", [ProcessId])
+            {ok, _P} =
+                epg_pool:query(Connection, "DELETE FROM " ++ ProcessesTable ++ " WHERE process_id = $1", [ProcessId])
         end
     ),
     ok.
@@ -182,13 +195,20 @@ collect_zombies(PgOpts, NsId, Timeout) ->
             {ok, _, _} = epg_pool:query(
                 Connection,
                 "WITH zombie_tasks as ("
-                " DELETE FROM " ++ RunningTable ++ " WHERE running_time < $1 "
-                " RETURNING process_id, task_id"
-                " ), "
-                " t1 AS (UPDATE " ++ TaskTable ++ " SET status = 'cancelled' WHERE process_id IN (SELECT process_id FROM zombie_tasks))"
-                " t2 AS (UPDATE " ++ TaskTable ++ " SET status = 'error', finished_time = $2 WHERE task_id IN (SELECT task_id FROM zombie_tasks))"
-                "MERGE INTO " ++ ProcessesTable ++ " AS pt USING zombie_tasks AS zt ON pt.process_id = zt.process_id "
-                " WHEN MATCHED THEN UPDATE SET status = 'error', detail = 'zombie detected', corrupted_by = zt.task_id",
+                " DELETE FROM " ++ RunningTable ++
+                    " WHERE running_time < $1 "
+                    " RETURNING process_id, task_id"
+                    " ), "
+                    " t1 AS (UPDATE " ++ TaskTable ++
+                    " SET status = 'cancelled' WHERE process_id IN (SELECT process_id FROM zombie_tasks)),"
+                    " t2 AS (UPDATE " ++ TaskTable ++
+                    " SET status = 'error', finished_time = $2 WHERE task_id IN (SELECT task_id FROM zombie_tasks))"
+                    "MERGE INTO " ++ ProcessesTable ++
+                    " AS pt USING zombie_tasks AS zt ON pt.process_id = zt.process_id "
+                    " WHEN MATCHED THEN UPDATE SET"
+                    " status = 'error',"
+                    " detail = 'zombie detected',"
+                    " corrupted_by = zt.task_id",
                 [TsBackward, Now]
             )
         end
@@ -205,27 +225,34 @@ search_timers(PgOpts, NsId, _Timeout, Limit) ->
     NowSec = erlang:system_time(second),
     Now = unixtime_to_datetime(NowSec),
     NowText = unixtime_to_text(NowSec),
-    {ok, _, Columns, Rows} = _Res = epg_pool:transaction(
-        Pool,
-        fun(Connection) ->
-            {ok, _, _, _} = epg_pool:query(
-                Connection,
-                "WITH tasks_for_run as("
-                " DELETE FROM " ++ ScheduleTable ++ " WHERE task_id IN "
-                " (SELECT task_id FROM " ++ ScheduleTable ++ " WHERE status = 'waiting' AND scheduled_time <= $1 "
-                " AND task_type IN ('timeout', 'remove') AND process_id NOT IN (SELECT process_id FROM " ++ RunningTable ++ " )"
-                " ORDER BY scheduled_time ASC LIMIT $3)"
-                " RETURNING task_id, process_id, task_type, 'running'::task_status as status, scheduled_time, "
-                " TO_TIMESTAMP($2, 'YYYY-MM-DD HH24:MI:SS') as running_time, args, metadata, "
-                " last_retry_interval, attempts_count, context"
-                " ) "
-                "INSERT INTO " ++ RunningTable ++
-                " (task_id, process_id, task_type, status, scheduled_time, running_time, args, metadata, last_retry_interval, attempts_count, context) "
-                " SELECT * FROM tasks_for_run RETURNING *",
-                [Now, NowText, Limit]
-            )
-        end
-    ),
+    {ok, _, Columns, Rows} =
+        _Res = epg_pool:transaction(
+            Pool,
+            fun(Connection) ->
+                {ok, _, _, _} = epg_pool:query(
+                    Connection,
+                    "WITH tasks_for_run as("
+                    " DELETE FROM " ++ ScheduleTable ++
+                        " WHERE task_id IN "
+                        " (SELECT task_id FROM " ++ ScheduleTable ++
+                        " WHERE status = 'waiting' AND scheduled_time <= $1 "
+                        " AND task_type IN ('timeout', 'remove') AND process_id NOT IN (SELECT process_id FROM " ++
+                        RunningTable ++
+                        " )"
+                        " ORDER BY scheduled_time ASC LIMIT $3)"
+                        " RETURNING"
+                        " task_id, process_id, task_type, 'running'::task_status as status, scheduled_time, "
+                        " TO_TIMESTAMP($2, 'YYYY-MM-DD HH24:MI:SS') as running_time, args, metadata, "
+                        " last_retry_interval, attempts_count, context"
+                        " ) "
+                        "INSERT INTO " ++ RunningTable ++
+                        " (task_id, process_id, task_type, status, scheduled_time, running_time,"
+                        " args, metadata, last_retry_interval, attempts_count, context) "
+                        " SELECT * FROM tasks_for_run RETURNING *",
+                    [Now, NowText, Limit]
+                )
+            end
+        ),
     to_maps(Columns, Rows, fun marshal_task/1).
 
 -spec search_calls(pg_opts(), namespace_id(), pos_integer()) -> [task()].
@@ -243,18 +270,22 @@ search_calls(PgOpts, NsId, Limit) -> {ok, _, _, _} = epg_pool:query( Connection, "WITH tasks_for_run as(" - " DELETE FROM " ++ ScheduleTable ++ " WHERE task_id IN " - " (SELECT min(task_id) FROM " ++ ScheduleTable ++ " WHERE status = 'waiting' AND task_type IN ('init', 'call', 'repair')" - " AND process_id NOT IN (SELECT process_id FROM " ++ RunningTable ++ " )" - " GROUP BY process_id ORDER BY min ASC LIMIT $2" - " ) " - " RETURNING task_id, process_id, task_type, 'running'::task_status as status, scheduled_time, " - " TO_TIMESTAMP($1, 'YYYY-MM-DD HH24:MI:SS') as running_time, args, metadata, " - " last_retry_interval, attempts_count, context" - " ) " - "INSERT INTO " ++ RunningTable ++ - " (task_id, process_id, task_type, status, scheduled_time, running_time, args, metadata, last_retry_interval, attempts_count, context) " - " SELECT * FROM tasks_for_run RETURNING *", + " DELETE FROM " ++ ScheduleTable ++ + " WHERE task_id IN " + " (SELECT min(task_id) FROM " ++ ScheduleTable ++ + " WHERE status = 'waiting' AND task_type IN ('init', 'call', 'repair')" + " AND process_id NOT IN (SELECT process_id FROM " ++ RunningTable ++ + " )" + " GROUP BY process_id ORDER BY min ASC LIMIT $2" + " ) " + " RETURNING task_id, process_id, task_type, 'running'::task_status as status, scheduled_time, " + " TO_TIMESTAMP($1, 'YYYY-MM-DD HH24:MI:SS') as running_time, args, metadata, " + " last_retry_interval, attempts_count, context" + " ) " + "INSERT INTO " ++ RunningTable ++ + " (task_id, process_id, task_type, status, scheduled_time, running_time, args," + " metadata, last_retry_interval, attempts_count, context) " + " SELECT * FROM tasks_for_run RETURNING *", [Now, Limit] ) end @@ -322,7 +353,8 @@ prepare_call(PgOpts, NsId, ProcessId, #{status := <<"running">>} = Task) -> {ok, _BlockedTaskId} = do_block_timer(C, ScheduleTable, ProcessId), case do_save_running(C, RunningTable, Task#{task_id => TaskId}) of {ok, _, _, []} -> - {ok, _, _, _} = do_save_schedule(C, ScheduleTable, Task#{task_id => TaskId, status => <<"waiting">>}), + {ok, _, _, _} = + do_save_schedule(C, ScheduleTable, Task#{task_id => TaskId, status => <<"waiting">>}), {ok, {postpone, TaskId}}; {ok, _, _, [{TaskId}]} -> {ok, {continue, TaskId}} @@ -387,12 +419,19 @@ complete_and_continue(PgOpts, NsId, TaskResult, Process, Events, NextTask) -> fun(Connection) -> {ok, _} = do_update_process(Connection, ProcessesTable, Process), %% TODO implement via batch execute - lists:foreach(fun(Ev) -> - {ok, _} = do_save_event(Connection, EventsTable, ProcessId, TaskId, Ev) - end, Events), + lists:foreach( + fun(Ev) -> + {ok, _} = do_save_event(Connection, EventsTable, ProcessId, TaskId, Ev) + end, + Events + ), {ok, _, _} = do_cancel_timer(Connection, TaskTable, ScheduleTable, ProcessId), {ok, _, _, [{NextTaskId}]} = do_save_task(Connection, TaskTable, NextTask), - case do_complete_task(Connection, TaskTable, ScheduleTable, RunningTable, TaskResult#{process_id => ProcessId}) of + case + do_complete_task(Connection, TaskTable, ScheduleTable, RunningTable, TaskResult#{ + process_id => ProcessId + }) + of {ok, _, _, []} -> %% continuation call not exist case NextTask of @@ -442,9 +481,12 @@ complete_and_suspend(PgOpts, NsId, TaskResult, Process, Events) -> Pool, fun(Connection) -> {ok, _} = do_update_process(Connection, ProcessesTable, Process), - lists:foreach(fun(Ev) -> - {ok, _} = do_save_event(Connection, EventsTable, ProcessId, TaskId, Ev) - end, Events), + lists:foreach( + fun(Ev) -> + {ok, _} = do_save_event(Connection, EventsTable, 
ProcessId, TaskId, Ev)
+                end,
+                Events
+            ),
             {ok, _, _} = do_cancel_timer(Connection, TaskTable, ScheduleTable, ProcessId),
             do_complete_task(Connection, TaskTable, ScheduleTable, RunningTable, TaskResult#{process_id => ProcessId})
         end
@@ -466,7 +508,7 @@ complete_and_error(PgOpts, NsId, TaskResult, Process) ->
         fun(Connection) ->
             {ok, 1} = do_update_process(Connection, ProcessesTable, Process),
             {ok, _, _} = do_cancel_timer(Connection, TaskTable, ScheduleTable, ProcessId),
-            {ok, _, _} = do_cancel_calls(Connection, TaskTable, ScheduleTable, ProcessId), 
+            {ok, _, _} = do_cancel_calls(Connection, TaskTable, ScheduleTable, ProcessId),
             {ok, _, _, _} = do_complete_task(
                 Connection,
                 TaskTable,
@@ -496,9 +538,12 @@ complete_and_unlock(PgOpts, NsId, TaskResult, Process, Events) ->
         Pool,
         fun(Connection) ->
             {ok, _} = do_update_process(Connection, ProcessesTable, Process),
-            lists:foreach(fun(Ev) ->
-                {ok, _} = do_save_event(Connection, EventsTable, ProcessId, TaskId, Ev)
-            end, Events),
+            lists:foreach(
+                fun(Ev) ->
+                    {ok, _} = do_save_event(Connection, EventsTable, ProcessId, TaskId, Ev)
+                end,
+                Events
+            ),
             Completion = do_complete_task(
                 Connection,
                 TaskTable,
@@ -506,13 +551,14 @@ complete_and_unlock(PgOpts, NsId, TaskResult, Process, Events) ->
                 RunningTable,
                 TaskResult#{process_id => ProcessId}
             ),
-            case Completion of
-                {ok, _, _, []} ->
-                    {ok, _} = do_unlock_timer(Connection, ScheduleTable, ProcessId);
-                {ok, _, _Col, _Row} ->
-                    %% if postponed call exists then timer remain blocked
-                    do_nothing
-            end,
+            _ =
+                case Completion of
+                    {ok, _, _, []} ->
+                        {ok, _} = do_unlock_timer(Connection, ScheduleTable, ProcessId);
+                    {ok, _, _Col, _Row} ->
+                        %% if a postponed call exists, the timer remains blocked
+                        do_nothing
+                end,
             Completion
         end
     ),
@@ -535,133 +581,145 @@ db_init(#{pool := Pool}, NsId) ->
                 Connection,
                 "select exists (select 1 from pg_type where typname = 'process_status')"
             ),
-            case IsProcessStatusExists of
-                true ->
-                    ok;
-                false ->
-                    {ok, _, _} = epg_pool:query(
-                        Connection,
-                        "CREATE TYPE process_status AS ENUM ('running', 'error')"
-                    )
-            end,
+            _ =
+                case IsProcessStatusExists of
+                    true ->
+                        ok;
+                    false ->
+                        {ok, _, _} = epg_pool:query(
+                            Connection,
+                            "CREATE TYPE process_status AS ENUM ('running', 'error')"
+                        )
+                end,
             %% create type task_status if not exists
             {ok, _, [{IsTaskStatusExists}]} = epg_pool:query(
                 Connection,
                 "select exists (select 1 from pg_type where typname = 'task_status')"
             ),
-            case IsTaskStatusExists of
-                true ->
-                    ok;
-                false ->
-                    {ok, _, _} = epg_pool:query(
-                        Connection,
-                        "CREATE TYPE task_status AS ENUM "
-                        "('waiting', 'running', 'blocked', 'error', 'finished', 'cancelled')"
-                    )
-            end,
+            _ =
+                case IsTaskStatusExists of
+                    true ->
+                        ok;
+                    false ->
+                        {ok, _, _} = epg_pool:query(
+                            Connection,
+                            "CREATE TYPE task_status AS ENUM "
+                            "('waiting', 'running', 'blocked', 'error', 'finished', 'cancelled')"
+                        )
+                end,
             %% create type task_type if not exists
             {ok, _, [{IsTaskTypeExists}]} = epg_pool:query(
                 Connection,
                 "select exists (select 1 from pg_type where typname = 'task_type')"
            ),
-            case IsTaskTypeExists of
-                true ->
-                    ok;
-                false ->
-                    {ok, _, _} = epg_pool:query(
-                        Connection,
-                        "CREATE TYPE task_type AS ENUM ('init', 'timeout', 'call', 'notify', 'repair', 'remove')"
-                    )
-            end,
+            _ =
+                case IsTaskTypeExists of
+                    true ->
+                        ok;
+                    false ->
+                        {ok, _, _} = epg_pool:query(
+                            Connection,
+                            "CREATE TYPE task_type AS ENUM ('init', 'timeout', 'call', 'notify', 'repair', 'remove')"
+                        )
+                end,
             %% create processes table
             {ok, _, _} = epg_pool:query(
                 Connection,
-                "CREATE TABLE IF NOT EXISTS 
" ++ ProcessesTable ++ " (" - "process_id VARCHAR(80) PRIMARY KEY, " - "status process_status NOT NULL, " - "detail TEXT, " - "aux_state BYTEA, " - "created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), " - "metadata JSONB)" + "CREATE TABLE IF NOT EXISTS " ++ ProcessesTable ++ + " (" + "process_id VARCHAR(80) PRIMARY KEY, " + "status process_status NOT NULL, " + "detail TEXT, " + "aux_state BYTEA, " + "created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), " + "metadata JSONB)" ), %% create tasks table {ok, _, _} = epg_pool:query( Connection, - "CREATE TABLE IF NOT EXISTS " ++ TaskTable ++ " (" - "task_id BIGSERIAL PRIMARY KEY, " - "process_id VARCHAR(80) NOT NULL, " - "task_type task_type NOT NULL, " - "status task_status NOT NULL, " - "scheduled_time TIMESTAMP WITH TIME ZONE NOT NULL, " - "running_time TIMESTAMP WITH TIME ZONE, " - "finished_time TIMESTAMP WITH TIME ZONE, " - "args BYTEA, " - "metadata JSONB, " - "idempotency_key VARCHAR(80) UNIQUE, " - "response BYTEA, " - "blocked_task BIGINT REFERENCES " ++ TaskTable ++ " (task_id), " - "last_retry_interval INTEGER NOT NULL, " - "attempts_count SMALLINT NOT NULL, " - "context BYTEA, " - "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++ " (process_id))" + "CREATE TABLE IF NOT EXISTS " ++ TaskTable ++ + " (" + "task_id BIGSERIAL PRIMARY KEY, " + "process_id VARCHAR(80) NOT NULL, " + "task_type task_type NOT NULL, " + "status task_status NOT NULL, " + "scheduled_time TIMESTAMP WITH TIME ZONE NOT NULL, " + "running_time TIMESTAMP WITH TIME ZONE, " + "finished_time TIMESTAMP WITH TIME ZONE, " + "args BYTEA, " + "metadata JSONB, " + "idempotency_key VARCHAR(80) UNIQUE, " + "response BYTEA, " + "blocked_task BIGINT REFERENCES " ++ TaskTable ++ + " (task_id), " + "last_retry_interval INTEGER NOT NULL, " + "attempts_count SMALLINT NOT NULL, " + "context BYTEA, " + "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++ " (process_id))" ), %% create constraint for process error cause {ok, _, _} = epg_pool:query( Connection, "ALTER TABLE " ++ ProcessesTable ++ - " ADD COLUMN IF NOT EXISTS corrupted_by BIGINT REFERENCES " ++ TaskTable ++ "(task_id)" + " ADD COLUMN IF NOT EXISTS corrupted_by BIGINT REFERENCES " ++ TaskTable ++ "(task_id)" ), %% create schedule table {ok, _, _} = epg_pool:query( Connection, - "CREATE TABLE IF NOT EXISTS " ++ ScheduleTable ++ " (" - "task_id BIGINT PRIMARY KEY, " - "process_id VARCHAR(80) NOT NULL, " - "task_type task_type NOT NULL, " - "status task_status NOT NULL, " - "scheduled_time TIMESTAMP WITH TIME ZONE NOT NULL, " - "args BYTEA, " - "metadata JSONB, " - "last_retry_interval INTEGER NOT NULL, " - "attempts_count SMALLINT NOT NULL, " - "context BYTEA, " - "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++ " (process_id), " - "FOREIGN KEY (task_id) REFERENCES " ++ TaskTable ++ " (task_id))" + "CREATE TABLE IF NOT EXISTS " ++ ScheduleTable ++ + " (" + "task_id BIGINT PRIMARY KEY, " + "process_id VARCHAR(80) NOT NULL, " + "task_type task_type NOT NULL, " + "status task_status NOT NULL, " + "scheduled_time TIMESTAMP WITH TIME ZONE NOT NULL, " + "args BYTEA, " + "metadata JSONB, " + "last_retry_interval INTEGER NOT NULL, " + "attempts_count SMALLINT NOT NULL, " + "context BYTEA, " + "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++ + " (process_id), " + "FOREIGN KEY (task_id) REFERENCES " ++ TaskTable ++ " (task_id))" ), %% create running table {ok, _, _} = epg_pool:query( Connection, - "CREATE TABLE IF NOT EXISTS " ++ RunningTable ++ " (" - "process_id VARCHAR(80) 
PRIMARY KEY, " - "task_id BIGINT NOT NULL, " - "task_type task_type NOT NULL, " - "status task_status NOT NULL, " - "scheduled_time TIMESTAMP WITH TIME ZONE NOT NULL, " - "running_time TIMESTAMP WITH TIME ZONE NOT NULL, " - "args BYTEA, " - "metadata JSONB, " - "last_retry_interval INTEGER NOT NULL, " - "attempts_count SMALLINT NOT NULL, " - "context BYTEA, " - "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++ " (process_id), " - "FOREIGN KEY (task_id) REFERENCES " ++ TaskTable ++ " (task_id))" + "CREATE TABLE IF NOT EXISTS " ++ RunningTable ++ + " (" + "process_id VARCHAR(80) PRIMARY KEY, " + "task_id BIGINT NOT NULL, " + "task_type task_type NOT NULL, " + "status task_status NOT NULL, " + "scheduled_time TIMESTAMP WITH TIME ZONE NOT NULL, " + "running_time TIMESTAMP WITH TIME ZONE NOT NULL, " + "args BYTEA, " + "metadata JSONB, " + "last_retry_interval INTEGER NOT NULL, " + "attempts_count SMALLINT NOT NULL, " + "context BYTEA, " + "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++ + " (process_id), " + "FOREIGN KEY (task_id) REFERENCES " ++ TaskTable ++ " (task_id))" ), %% create events table {ok, _, _} = epg_pool:query( Connection, - "CREATE TABLE IF NOT EXISTS " ++ EventsTable ++ " (" - "process_id VARCHAR(80) NOT NULL, " - "task_id BIGINT NOT NULL, " - "event_id SMALLINT NOT NULL, " - "timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(), " - "metadata JSONB, " - "payload BYTEA NOT NULL, " - "PRIMARY KEY (process_id, event_id), " - "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++ " (process_id), " - "FOREIGN KEY (task_id) REFERENCES " ++ TaskTable ++ " (task_id))" + "CREATE TABLE IF NOT EXISTS " ++ EventsTable ++ + " (" + "process_id VARCHAR(80) NOT NULL, " + "task_id BIGINT NOT NULL, " + "event_id SMALLINT NOT NULL, " + "timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(), " + "metadata JSONB, " + "payload BYTEA NOT NULL, " + "PRIMARY KEY (process_id, event_id), " + "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++ + " (process_id), " + "FOREIGN KEY (task_id) REFERENCES " ++ TaskTable ++ " (task_id))" ), %% create indexes {ok, _, _} = epg_pool:query( @@ -680,8 +738,8 @@ db_init(#{pool := Pool}, NsId) -> Connection, "CREATE INDEX IF NOT EXISTS task_idx on " ++ RunningTable ++ " USING HASH (task_id)" ) - - end), + end + ), ok. %-ifdef(TEST). @@ -777,13 +835,26 @@ do_save_task(Connection, Table, Task, Returning) -> Context = maps:get(context, Task, <<>>), epg_pool:query( Connection, - "INSERT INTO " ++ Table ++ " " - " (process_id, task_type, status, scheduled_time, running_time, finished_time, args, " - " metadata, idempotency_key, blocked_task, response, last_retry_interval, attempts_count, context)" - "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) RETURNING " ++ Returning, + "INSERT INTO " ++ Table ++ + " " + " (process_id, task_type, status, scheduled_time, running_time, finished_time, args, " + " metadata, idempotency_key, blocked_task, response, last_retry_interval, attempts_count, context)" + "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) RETURNING " ++ Returning, [ - ProcessId, TaskType, Status, ScheduledTs, RunningTs, FinishedTs, Args, - json_encode(MetaData), IdempotencyKey, BlockedTask, Response, LastRetryInterval, AttemptsCount, Context + ProcessId, + TaskType, + Status, + ScheduledTs, + RunningTs, + FinishedTs, + Args, + json_encode(MetaData), + IdempotencyKey, + BlockedTask, + Response, + LastRetryInterval, + AttemptsCount, + Context ] ). 
%% @@ -807,14 +878,24 @@ do_save_running(Connection, Table, Task, Returning) -> Context = maps:get(context, Task, <<>>), epg_pool:query( Connection, - "INSERT INTO " ++ Table ++ " " - " (task_id, process_id, task_type, status, scheduled_time, running_time, " - " args, metadata, last_retry_interval, attempts_count, context)" - "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) " - " ON CONFLICT (process_id) DO NOTHING RETURNING " ++ Returning, + "INSERT INTO " ++ Table ++ + " " + " (task_id, process_id, task_type, status, scheduled_time, running_time, " + " args, metadata, last_retry_interval, attempts_count, context)" + "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) " + " ON CONFLICT (process_id) DO NOTHING RETURNING " ++ Returning, [ - TaskId, ProcessId, TaskType, Status, unixtime_to_datetime(ScheduledTs), unixtime_to_datetime(RunningTs), - Args, json_encode(MetaData), LastRetryInterval, AttemptsCount, Context + TaskId, + ProcessId, + TaskType, + Status, + unixtime_to_datetime(ScheduledTs), + unixtime_to_datetime(RunningTs), + Args, + json_encode(MetaData), + LastRetryInterval, + AttemptsCount, + Context ] ). @@ -836,12 +917,22 @@ do_save_schedule(Connection, Table, Task, Returning) -> Context = maps:get(context, Task, <<>>), epg_pool:query( Connection, - "INSERT INTO " ++ Table ++ " " - " (task_id, process_id, task_type, status, scheduled_time, args, metadata, last_retry_interval, attempts_count, context)" - "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING " ++ Returning, + "INSERT INTO " ++ Table ++ + " " + " (task_id, process_id, task_type, status, scheduled_time, args," + " metadata, last_retry_interval, attempts_count, context)" + "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING " ++ Returning, [ - TaskId, ProcessId, TaskType, Status, unixtime_to_datetime(ScheduledTs), Args, - json_encode(MetaData), LastRetryInterval, AttemptsCount, Context + TaskId, + ProcessId, + TaskType, + Status, + unixtime_to_datetime(ScheduledTs), + Args, + json_encode(MetaData), + LastRetryInterval, + AttemptsCount, + Context ] ). @@ -876,8 +967,9 @@ do_update_process(Connection, ProcessesTable, Process) -> CorruptedBy = maps:get(corrupted_by, Process, null), epg_pool:query( Connection, - "UPDATE " ++ ProcessesTable ++ " SET status = $1, detail = $2, aux_state = $3, metadata = $4, corrupted_by = $5 " - "WHERE process_id = $6", + "UPDATE " ++ ProcessesTable ++ + " SET status = $1, detail = $2, aux_state = $3, metadata = $4, corrupted_by = $5 " + "WHERE process_id = $6", [Status, Detail, AuxState, json_encode(MetaData), CorruptedBy, ProcessId] ). @@ -890,8 +982,9 @@ do_save_event(Connection, EventsTable, ProcessId, TaskId, Event) -> MetaData = maps:get(metadata, Event, null), epg_pool:query( Connection, - "INSERT INTO " ++ EventsTable ++ " (process_id, task_id, event_id, timestamp, payload, metadata) " - "VALUES ($1, $2, $3, $4, $5, $6)", + "INSERT INTO " ++ EventsTable ++ + " (process_id, task_id, event_id, timestamp, payload, metadata) " + "VALUES ($1, $2, $3, $4, $5, $6)", [ProcessId, TaskId, EventId, unixtime_to_datetime(EventTs), Payload, json_encode(MetaData)] ). 
@@ -906,9 +999,10 @@ do_complete_task(Connection, TaskTable, ScheduleTable, RunningTable, TaskResult) {ok, _} = epg_pool:query( Connection, "WITH deleted AS(" - " DELETE FROM " ++ RunningTable ++ " WHERE process_id = $4" - " )" - "UPDATE " ++ TaskTable ++ " SET status = $1, response = $2, finished_time = $3 WHERE task_id = $5", + " DELETE FROM " ++ RunningTable ++ + " WHERE process_id = $4" + " )" + "UPDATE " ++ TaskTable ++ " SET status = $1, response = $2, finished_time = $3 WHERE task_id = $5", [Status, Response, unixtime_to_datetime(FinishedTime), ProcessId, TaskId] ), case Status of @@ -921,15 +1015,18 @@ do_complete_task(Connection, TaskTable, ScheduleTable, RunningTable, TaskResult) epg_pool:query( Connection, "WITH postponed_tasks AS (" - " DELETE FROM " ++ ScheduleTable ++ " WHERE task_id = " - " (SELECT min(task_id) FROM " ++ ScheduleTable ++ - " WHERE process_id = $1 AND status = 'waiting' AND task_type IN ('call', 'repair')) " - " RETURNING task_id, process_id, task_type, 'running'::task_status as status, scheduled_time, " - " TO_TIMESTAMP($2, 'YYYY-MM-DD HH24:MI:SS') as running_time, args, metadata, " - " last_retry_interval, attempts_count, context" - " ) " - "INSERT INTO " ++ RunningTable ++ " (task_id, process_id, task_type, status, scheduled_time, running_time, args, " - " metadata, last_retry_interval, attempts_count, context) SELECT * FROM postponed_tasks RETURNING *", + " DELETE FROM " ++ ScheduleTable ++ + " WHERE task_id = " + " (SELECT min(task_id) FROM " ++ ScheduleTable ++ + " WHERE process_id = $1 AND status = 'waiting' AND task_type IN ('call', 'repair')) " + " RETURNING task_id, process_id, task_type, 'running'::task_status as status, scheduled_time, " + " TO_TIMESTAMP($2, 'YYYY-MM-DD HH24:MI:SS') as running_time, args, metadata, " + " last_retry_interval, attempts_count, context" + " ) " + "INSERT INTO " ++ RunningTable ++ + " (task_id, process_id, task_type, status, scheduled_time, running_time, args, " + " metadata, last_retry_interval, attempts_count, context)" + " SELECT * FROM postponed_tasks RETURNING *", [ProcessId, RunningTime] ) end. @@ -937,8 +1034,9 @@ do_complete_task(Connection, TaskTable, ScheduleTable, RunningTable, TaskResult) do_block_timer(Connection, ScheduleTable, ProcessId) -> {ok, _, _Columns, Rows} = epg_pool:query( Connection, - "UPDATE " ++ ScheduleTable ++ " SET status = 'blocked' WHERE task_type IN ('timeout', 'remove') AND " - "process_id = $1 AND status = 'waiting' RETURNING task_id", + "UPDATE " ++ ScheduleTable ++ + " SET status = 'blocked' WHERE task_type IN ('timeout', 'remove') AND " + "process_id = $1 AND status = 'waiting' RETURNING task_id", [ProcessId] ), case Rows of @@ -949,8 +1047,9 @@ do_block_timer(Connection, ScheduleTable, ProcessId) -> do_unlock_timer(Connection, ScheduleTable, ProcessId) -> epg_pool:query( Connection, - "UPDATE " ++ ScheduleTable ++ " SET status = 'waiting' " - "WHERE process_id = $1 AND status = 'blocked'", + "UPDATE " ++ ScheduleTable ++ + " SET status = 'waiting' " + "WHERE process_id = $1 AND status = 'blocked'", [ProcessId] ). 
@@ -958,11 +1057,13 @@ do_cancel_timer(Connection, TaskTable, ScheduleTable, ProcessId) -> epg_pool:query( Connection, "WITH deleted_tasks as(" - " DELETE FROM " ++ ScheduleTable ++ " WHERE process_id = $1 AND task_type IN ('timeout', 'remove') " - " AND (status = 'waiting' OR status = 'blocked') RETURNING task_id" - " ) " - "MERGE INTO " ++ TaskTable ++ " as tt USING deleted_tasks as dt ON tt.task_id = dt.task_id " - " WHEN MATCHED THEN UPDATE SET status = 'cancelled'", + " DELETE FROM " ++ ScheduleTable ++ + " WHERE process_id = $1 AND task_type IN ('timeout', 'remove') " + " AND (status = 'waiting' OR status = 'blocked') RETURNING task_id" + " ) " + "MERGE INTO " ++ TaskTable ++ + " as tt USING deleted_tasks as dt ON tt.task_id = dt.task_id " + " WHEN MATCHED THEN UPDATE SET status = 'cancelled'", [ProcessId] ). @@ -970,11 +1071,13 @@ do_cancel_calls(Connection, TaskTable, ScheduleTable, ProcessId) -> epg_pool:query( Connection, "WITH deleted_tasks as(" - " DELETE FROM " ++ ScheduleTable ++ " WHERE process_id = $1 AND task_type NOT IN ('timeout', 'remove') AND status = 'waiting' " - " RETURNING task_id" - " ) " - "MERGE INTO " ++ TaskTable ++ " as tt USING deleted_tasks as dt ON tt.task_id = dt.task_id " - " WHEN MATCHED THEN UPDATE SET status = 'cancelled'", + " DELETE FROM " ++ ScheduleTable ++ + " WHERE process_id = $1 AND task_type NOT IN ('timeout', 'remove') AND status = 'waiting' " + " RETURNING task_id" + " ) " + "MERGE INTO " ++ TaskTable ++ + " as tt USING deleted_tasks as dt ON tt.task_id = dt.task_id " + " WHEN MATCHED THEN UPDATE SET status = 'cancelled'", [ProcessId] ). @@ -1032,11 +1135,16 @@ unixtime_to_text(TimestampSec) -> {Hour, Minute, Seconds} } = calendar:gregorian_seconds_to_datetime(TimestampSec + ?EPOCH_DIFF), << - (integer_to_binary(Year))/binary, "-", - (maybe_add_zero(Month))/binary, "-", - (maybe_add_zero(Day))/binary, " ", - (maybe_add_zero(Hour))/binary, "-", - (maybe_add_zero(Minute))/binary, "-", + (integer_to_binary(Year))/binary, + "-", + (maybe_add_zero(Month))/binary, + "-", + (maybe_add_zero(Day))/binary, + " ", + (maybe_add_zero(Hour))/binary, + "-", + (maybe_add_zero(Minute))/binary, + "-", (maybe_add_zero(Seconds))/binary >>. @@ -1053,48 +1161,60 @@ json_encode(MetaData) -> %% Marshalling marshal_task(Task) -> - maps:fold(fun - (_, null, Acc) -> Acc; - (<<"task_id">>, TaskId, Acc) -> Acc#{task_id => TaskId}; - (<<"process_id">>, ProcessId, Acc) -> Acc#{process_id => ProcessId}; - (<<"task_type">>, TaskType, Acc) -> Acc#{task_type => TaskType}; - (<<"status">>, Status, Acc) -> Acc#{status => Status}; - (<<"scheduled_time">>, Ts, Acc) -> Acc#{scheduled_time => Ts}; - (<<"running_time">>, Ts, Acc) -> Acc#{running_time => Ts}; - (<<"args">>, Args, Acc) -> Acc#{args => Args}; - (<<"metadata">>, MetaData, Acc) -> Acc#{metadata => MetaData}; - (<<"idempotency_key">>, IdempotencyKey, Acc) -> Acc#{idempotency_key => IdempotencyKey}; - (<<"response">>, Response, Acc) -> Acc#{response => Response}; - (<<"blocked_task">>, BlockedTaskId, Acc) -> Acc#{blocked_task => BlockedTaskId}; - (<<"last_retry_interval">>, LastRetryInterval, Acc) -> Acc#{last_retry_interval => LastRetryInterval}; - (<<"attempts_count">>, AttemptsCount, Acc) -> Acc#{attempts_count => AttemptsCount}; - (<<"context">>, Context, Acc) -> Acc#{context => Context}; - (_, _, Acc) -> Acc - end, #{}, Task). 
+ maps:fold( + fun + (_, null, Acc) -> Acc; + (<<"task_id">>, TaskId, Acc) -> Acc#{task_id => TaskId}; + (<<"process_id">>, ProcessId, Acc) -> Acc#{process_id => ProcessId}; + (<<"task_type">>, TaskType, Acc) -> Acc#{task_type => TaskType}; + (<<"status">>, Status, Acc) -> Acc#{status => Status}; + (<<"scheduled_time">>, Ts, Acc) -> Acc#{scheduled_time => Ts}; + (<<"running_time">>, Ts, Acc) -> Acc#{running_time => Ts}; + (<<"args">>, Args, Acc) -> Acc#{args => Args}; + (<<"metadata">>, MetaData, Acc) -> Acc#{metadata => MetaData}; + (<<"idempotency_key">>, IdempotencyKey, Acc) -> Acc#{idempotency_key => IdempotencyKey}; + (<<"response">>, Response, Acc) -> Acc#{response => Response}; + (<<"blocked_task">>, BlockedTaskId, Acc) -> Acc#{blocked_task => BlockedTaskId}; + (<<"last_retry_interval">>, LastRetryInterval, Acc) -> Acc#{last_retry_interval => LastRetryInterval}; + (<<"attempts_count">>, AttemptsCount, Acc) -> Acc#{attempts_count => AttemptsCount}; + (<<"context">>, Context, Acc) -> Acc#{context => Context}; + (_, _, Acc) -> Acc + end, + #{}, + Task + ). marshal_process(Process) -> - maps:fold(fun - (_, null, Acc) -> Acc; - (<<"process_id">>, ProcessId, Acc) -> Acc#{process_id => ProcessId}; - (<<"status">>, Status, Acc) -> Acc#{status => Status}; - (<<"detail">>, Detail, Acc) -> Acc#{detail => Detail}; - (<<"aux_state">>, AuxState, Acc) -> Acc#{aux_state => AuxState}; - (<<"p_meta">>, Meta, Acc) -> Acc#{metadata => Meta}; - (<<"corrupted_by">>, CorruptedBy, Acc) -> Acc#{corrupted_by => CorruptedBy}; - (_, _, Acc) -> Acc - end, #{}, Process). + maps:fold( + fun + (_, null, Acc) -> Acc; + (<<"process_id">>, ProcessId, Acc) -> Acc#{process_id => ProcessId}; + (<<"status">>, Status, Acc) -> Acc#{status => Status}; + (<<"detail">>, Detail, Acc) -> Acc#{detail => Detail}; + (<<"aux_state">>, AuxState, Acc) -> Acc#{aux_state => AuxState}; + (<<"p_meta">>, Meta, Acc) -> Acc#{metadata => Meta}; + (<<"corrupted_by">>, CorruptedBy, Acc) -> Acc#{corrupted_by => CorruptedBy}; + (_, _, Acc) -> Acc + end, + #{}, + Process + ). marshal_event(Event) -> - maps:fold(fun - (_, null, Acc) -> Acc; - (<<"process_id">>, ProcessId, Acc) -> Acc#{process_id => ProcessId}; - (<<"task_id">>, TaskId, Acc) -> Acc#{task_id => TaskId}; - (<<"event_id">>, EventId, Acc) -> Acc#{event_id => EventId}; - (<<"timestamp">>, Ts, Acc) -> Acc#{timestamp => Ts}; - (<<"metadata">>, MetaData, Acc) -> Acc#{metadata => MetaData}; - (<<"payload">>, Payload, Acc) -> Acc#{payload => Payload}; - (_, _, Acc) -> Acc - end, #{}, Event). + maps:fold( + fun + (_, null, Acc) -> Acc; + (<<"process_id">>, ProcessId, Acc) -> Acc#{process_id => ProcessId}; + (<<"task_id">>, TaskId, Acc) -> Acc#{task_id => TaskId}; + (<<"event_id">>, EventId, Acc) -> Acc#{event_id => EventId}; + (<<"timestamp">>, Ts, Acc) -> Acc#{timestamp => Ts}; + (<<"metadata">>, MetaData, Acc) -> Acc#{metadata => MetaData}; + (<<"payload">>, Payload, Acc) -> Acc#{payload => Payload}; + (_, _, Acc) -> Acc + end, + #{}, + Event + ). %% get_pool(internal, #{pool := Pool}) -> diff --git a/test/prg_base_SUITE.erl b/test/prg_base_SUITE.erl index 14e756e..aad9de1 100644 --- a/test/prg_base_SUITE.erl +++ b/test/prg_base_SUITE.erl @@ -34,23 +34,24 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok. 
-all() -> [ - simple_timers_test, - simple_call_test, - call_replace_timer_test, - call_unset_timer_test, - postponed_call_test, - postponed_call_to_suspended_process_test, - multiple_calls_test, - repair_after_non_retriable_error_test, - error_after_max_retries_test, - repair_after_call_error_test, - remove_by_timer_test, - remove_without_timer_test, - put_process_test, - put_process_with_timeout_test, - put_process_with_remove_test -]. +all() -> + [ + simple_timers_test, + simple_call_test, + call_replace_timer_test, + call_unset_timer_test, + postponed_call_test, + postponed_call_to_suspended_process_test, + multiple_calls_test, + repair_after_non_retriable_error_test, + error_after_max_retries_test, + repair_after_call_error_test, + remove_by_timer_test, + remove_without_timer_test, + put_process_test, + put_process_with_timeout_test, + put_process_with_remove_test + ]. -spec simple_timers_test(_) -> _. simple_timers_test(_C) -> @@ -314,24 +315,25 @@ repair_after_non_retriable_error_test(_C) -> }} = progressor:get(#{ns => ?NS, id => Id}), {ok, ok} = progressor:repair(#{ns => ?NS, id => Id, args => <<"repair_args">>}), 4 = expect_steps_counter(4), - {ok, #{ - process_id := Id, - status := <<"running">>, - history := [ - #{ - event_id := 1, - metadata := #{<<"format_version">> := 1}, - payload := _Pl1, - timestamp := _Ts1 - }, - #{ - event_id := 2, - metadata := #{<<"format_version">> := 1}, - payload := _Pl2, - timestamp := _Ts2 - } - ] - } = Process} = progressor:get(#{ns => ?NS, id => Id}), + {ok, + #{ + process_id := Id, + status := <<"running">>, + history := [ + #{ + event_id := 1, + metadata := #{<<"format_version">> := 1}, + payload := _Pl1, + timestamp := _Ts1 + }, + #{ + event_id := 2, + metadata := #{<<"format_version">> := 1}, + payload := _Pl2, + timestamp := _Ts2 + } + ] + } = Process} = progressor:get(#{ns => ?NS, id => Id}), false = erlang:is_map_key(detail, Process), unmock_processor(), ok. @@ -375,7 +377,9 @@ repair_after_call_error_test(_C) -> process_id := Id, status := <<"error">> }} = progressor:get(#{ns => ?NS, id => Id}), - {error, <<"repair_error">>} = progressor:repair(#{ns => ?NS, id => Id, args => <<"bad_repair_args">>}), + {error, <<"repair_error">>} = progressor:repair(#{ + ns => ?NS, id => Id, args => <<"bad_repair_args">> + }), 3 = expect_steps_counter(3), %% shoul not rewrite detail {ok, #{ @@ -462,18 +466,20 @@ remove_without_timer_test(_C) -> -spec put_process_test(_C) -> _. 
put_process_test(_C) -> Id = gen_id(), - Args = #{process => #{ - process_id => Id, - status => <<"running">>, - history => [ - event(1), - event(2), - event(3) - ] - }}, + Args = #{ + process => #{ + process_id => Id, + status => <<"running">>, + history => [ + event(1), + event(2), + event(3) + ] + } + }, {ok, ok} = progressor:put(#{ns => ?NS, id => Id, args => Args}), - {ok,#{ - process_id := Id, + {ok, #{ + process_id := Id, status := <<"running">>, history := [ #{ @@ -520,13 +526,13 @@ put_process_with_timeout_test(_C) -> }, {ok, ok} = progressor:put(#{ns => ?NS, id => Id, args => Args}), {ok, #{ - process_id := Id, + process_id := Id, status := <<"running">>, history := [#{event_id := 1}] }} = progressor:get(#{ns => ?NS, id => Id}), 1 = expect_steps_counter(1), {ok, #{ - process_id := Id, + process_id := Id, status := <<"running">>, history := [#{event_id := 1}, #{event_id := 2}] }} = progressor:get(#{ns => ?NS, id => Id}), @@ -549,7 +555,7 @@ put_process_with_remove_test(_C) -> }, {ok, ok} = progressor:put(#{ns => ?NS, id => Id, args => Args}), {ok, #{ - process_id := Id, + process_id := Id, status := <<"running">>, history := [#{event_id := 1}] }} = progressor:get(#{ns => ?NS, id => Id}), @@ -843,13 +849,12 @@ mock_processor(repair_after_call_error_test = TestCase) -> mock_processor(TestCase, MockProcessor); %% mock_processor(remove_by_timer_test = TestCase) -> - MockProcessor = fun - ({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> - Result = #{ - events => [event(1), event(2)], - action => #{set_timer => erlang:system_time(second) + 2, remove => true} - }, - {ok, Result} + MockProcessor = fun({init, <<"init_args">>, _Process}, _Opts, _Ctx) -> + Result = #{ + events => [event(1), event(2)], + action => #{set_timer => erlang:system_time(second) + 2, remove => true} + }, + {ok, Result} end, mock_processor(TestCase, MockProcessor); %% @@ -875,11 +880,10 @@ mock_processor(remove_without_timer_test = TestCase) -> %% mock_processor(put_process_with_timeout_test = TestCase) -> Self = self(), - MockProcessor = fun - ({timeout, <<>>, _Process}, _Opts, _Ctx) -> - Result = #{events => [event(2)]}, - Self ! 1, - {ok, Result} + MockProcessor = fun({timeout, <<>>, _Process}, _Opts, _Ctx) -> + Result = #{events => [event(2)]}, + Self ! 1, + {ok, Result} end, mock_processor(TestCase, MockProcessor). @@ -917,7 +921,8 @@ event(Id) -> event_id => Id, timestamp => erlang:system_time(second), metadata => #{<<"format_version">> => 1}, - payload => erlang:term_to_binary({bin, crypto:strong_rand_bytes(8)}) %% msg_pack compatibility for kafka + %% msg_pack compatibility for kafka + payload => erlang:term_to_binary({bin, crypto:strong_rand_bytes(8)}) }. gen_id() -> diff --git a/test/prg_ct_hook.erl b/test/prg_ct_hook.erl index 84269b4..59240eb 100644 --- a/test/prg_ct_hook.erl +++ b/test/prg_ct_hook.erl @@ -20,11 +20,14 @@ terminate(_State) -> ok. %% Internal functions start_applications() -> - lists:foreach(fun(App) -> - application:load(App), - lists:foreach(fun({K, V}) -> ok = application:set_env(App, K, V) end, app_env(App)), - {ok, _} = application:ensure_all_started(App) - end, app_list()). + lists:foreach( + fun(App) -> + _ = application:load(App), + lists:foreach(fun({K, V}) -> ok = application:set_env(App, K, V) end, app_env(App)), + {ok, _} = application:ensure_all_started(App) + end, + app_list() + ). 
app_list() -> %% in order of launch @@ -44,9 +47,11 @@ app_env(progressor) -> } }, retry_policy => #{ - initial_timeout => 1, %% seconds + %% seconds + initial_timeout => 1, backoff_coefficient => 1.0, - max_timeout => 180, %% seconds + %% seconds + max_timeout => 180, max_attempts => 3, non_retryable_errors => [ do_not_retry, @@ -56,9 +61,11 @@ app_env(progressor) -> {temporary, unavilable} ] }, - task_scan_timeout => 1, %% seconds + %% seconds + task_scan_timeout => 1, worker_pool_size => 10, - process_step_timeout => 10 %% seconds + %% seconds + process_step_timeout => 10 }}, {namespaces, #{