From c01e3cac68a07b154f818c08174821f54715d72b Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Mon, 17 Feb 2025 19:37:13 +0300 Subject: [PATCH 1/8] Implements hybrid machinery backend --- apps/bender/src/bender.erl | 14 +++-- apps/bender/src/bender_utils.erl | 26 +++++++- apps/bender/test/bender_tests_SUITE.erl | 67 ++++++++++++++++++++- apps/bender/test/generator_tests_SUITE.erl | 56 +++++++++++++++++- compose.yaml | 30 ++++++++++ config/sys.config | 69 ++++++++++++++++++++++ rebar.config | 2 +- rebar.lock | 7 ++- 8 files changed, 258 insertions(+), 13 deletions(-) diff --git a/apps/bender/src/bender.erl b/apps/bender/src/bender.erl index f40bab0..5c76689 100644 --- a/apps/bender/src/bender.erl +++ b/apps/bender/src/bender.erl @@ -56,7 +56,7 @@ init([]) -> shutdown_timeout => get_shutdown_timeout(), event_handler => EventHandlers, handlers => get_handler_spec(), - additional_routes => get_routes(EventHandlers) + additional_routes => get_routes(EventHandlers, genlib_app:env(?MODULE, machinery_backend)) } ), Flags = #{strategy => one_for_all, intensity => 6, period => 30}, @@ -101,8 +101,13 @@ get_handler_spec() -> }} ]. --spec get_routes(woody:ev_handlers()) -> [woody_server_thrift_http_handler:route(_)]. -get_routes(EventHandlers) -> +-spec get_routes(woody:ev_handlers(), machinegun | progressor | hybrid) -> [woody_server_thrift_http_handler:route(_)]. +get_routes(_EventHandlers, progressor) -> + %% Shared routes + Check = enable_health_logging(genlib_app:env(?MODULE, health_check, #{})), + [erl_health_handle:get_route(Check), get_prometheus_route()]; +get_routes(EventHandlers, Mode) when Mode == machinegun orelse Mode == hybrid -> + %% Machinegun specific routes RouteOptsEnv = genlib_app:env(?MODULE, route_opts, #{}), RouteOpts = RouteOptsEnv#{event_handler => EventHandlers}, Generator = genlib_app:env(bender, generator, #{}), @@ -121,8 +126,7 @@ get_routes(EventHandlers) -> } }} ], - Check = enable_health_logging(genlib_app:env(?MODULE, health_check, #{})), - [erl_health_handle:get_route(Check), get_prometheus_route() | machinery_mg_backend:get_routes(Handlers, RouteOpts)]. + get_routes(EventHandlers, progressor) ++ machinery_mg_backend:get_routes(Handlers, RouteOpts). -spec enable_health_logging(erl_health:check()) -> erl_health:check(). enable_health_logging(Check) -> diff --git a/apps/bender/src/bender_utils.erl b/apps/bender/src/bender_utils.erl index 19aae28..d205b6e 100644 --- a/apps/bender/src/bender_utils.erl +++ b/apps/bender/src/bender_utils.erl @@ -26,15 +26,37 @@ unique_id() -> <> = snowflake:new(), genlib_format:format_int_base(ID, 62). --spec get_backend(atom(), woody_context()) -> machinery_mg_backend:backend(). +-spec get_backend(generator | sequence, woody_context()) -> machinery_mg_backend:backend(). get_backend(Service, WoodyCtx) -> + get_backend(genlib_app:env(bender, machinery_backend), Service, WoodyCtx). + +%%% Internal functions + +get_backend(hybrid, Service, WoodyCtx) -> + {machinery_hybrid_backend, #{ + primary_backend => get_backend(progressor, Service, WoodyCtx), + fallback_backend => get_backend(machinegun, Service, WoodyCtx) + }}; +get_backend(progressor, Service, WoodyCtx) -> + Automaton = genlib_app:env(bender, Service, #{}), + {Namespace, Handler} = get_machinery_namespace_handler(Service), + machinery_prg_backend:new(WoodyCtx, #{ + namespace => Namespace, + handler => Handler, + schema => maps:get(schema, Automaton, machinery_mg_schema_generic) + }); +get_backend(machinegun, Service, WoodyCtx) -> Automaton = genlib_app:env(bender, Service, #{}), machinery_mg_backend:new(WoodyCtx, #{ client => get_woody_client(Automaton), schema => maps:get(schema, Automaton, machinery_mg_schema_generic) }). -%%% Internal functions +-spec get_machinery_namespace_handler(generator | sequence) -> {machinery:namespace(), machinery:logic_handler(_)}. +get_machinery_namespace_handler(generator) -> + {'bender_generator', {bender_generator, #{}}}; +get_machinery_namespace_handler(sequence) -> + {'bender_sequence', {bender_sequence, #{}}}. -spec get_woody_client(automaton()) -> machinery_mg_client:woody_client(). get_woody_client(#{url := Url} = Automaton) -> diff --git a/apps/bender/test/bender_tests_SUITE.erl b/apps/bender/test/bender_tests_SUITE.erl index 3fed330..d3bfcbc 100644 --- a/apps/bender/test/bender_tests_SUITE.erl +++ b/apps/bender/test/bender_tests_SUITE.erl @@ -65,10 +65,75 @@ groups() -> -spec init_per_suite(config()) -> config(). init_per_suite(C) -> + %% _ = dbg:tracer(), + %% _ = dbg:p(all, c), + %% _ = dbg:tpl({'progressor', 'get', '_'}, x), + EpgConnectorApps = genlib_app:start_application_with(epg_connector, [ + {databases, #{ + default_db => #{ + host => "postgres", + port => 5432, + database => "progressor_db", + username => "progressor", + password => "progressor" + } + }}, + {pools, #{ + default_pool => #{ + database => default_db, + size => 30 + } + }} + ]), + ProgressorApps = genlib_app:start_application_with(progressor, [ + {call_wait_timeout, 20}, + {defaults, #{ + storage => #{ + client => prg_pg_backend, + options => #{ + pool => default_pool + } + }, + retry_policy => #{ + initial_timeout => 5, + backoff_coefficient => 1.0, + %% seconds + max_timeout => 180, + max_attempts => 3, + non_retryable_errors => [] + }, + task_scan_timeout => 1, + worker_pool_size => 100, + process_step_timeout => 30 + }}, + {namespaces, #{ + 'bender_generator' => #{ + processor => #{ + client => machinery_prg_backend, + options => #{ + namespace => 'bender_generator', + handler => {bender_generator, #{}}, + schema => machinery_mg_schema_generic + } + } + }, + 'bender_sequence' => #{ + processor => #{ + client => machinery_prg_backend, + options => #{ + namespace => 'bender_sequence', + handler => {bender_sequence, #{}}, + schema => machinery_mg_schema_generic + } + } + } + }} + ]), ScoperApps = genlib_app:start_application_with(scoper, [ {storage, scoper_storage_logger} ]), BenderApps = genlib_app:start_application_with(bender, [ + {machinery_backend, hybrid}, {generator, #{ path => <<"/v1/stateproc/bender_generator">>, schema => machinery_mg_schema_generic, @@ -95,7 +160,7 @@ init_per_suite(C) -> num_acceptors => 100 }} ]), - Apps = ScoperApps ++ BenderApps, + Apps = EpgConnectorApps ++ ProgressorApps ++ ScoperApps ++ BenderApps, [{suite_apps, Apps} | C]. -spec end_per_suite(config()) -> ok. diff --git a/apps/bender/test/generator_tests_SUITE.erl b/apps/bender/test/generator_tests_SUITE.erl index c1dba61..b60433b 100644 --- a/apps/bender/test/generator_tests_SUITE.erl +++ b/apps/bender/test/generator_tests_SUITE.erl @@ -42,10 +42,62 @@ groups() -> -spec init_per_suite(config()) -> config(). init_per_suite(C) -> Apps = - genlib_app:start_application_with(scoper, [ - {storage, scoper_storage_logger} + genlib_app:start_application_with(epg_connector, [ + {databases, #{ + default_db => #{ + host => "postgres", + port => 5432, + database => "progressor_db", + username => "progressor", + password => "progressor" + } + }}, + {pools, #{ + default_pool => #{ + database => default_db, + size => 30 + } + }} ]) ++ + genlib_app:start_application_with(progressor, [ + {call_wait_timeout, 20}, + {defaults, #{ + storage => #{ + client => prg_pg_backend, + options => #{ + pool => default_pool + } + }, + retry_policy => #{ + initial_timeout => 5, + backoff_coefficient => 1.0, + %% seconds + max_timeout => 180, + max_attempts => 3, + non_retryable_errors => [] + }, + task_scan_timeout => 1, + worker_pool_size => 100, + process_step_timeout => 30 + }}, + {namespaces, #{ + 'bender_sequence' => #{ + processor => #{ + client => machinery_prg_backend, + options => #{ + namespace => 'bender_sequence', + handler => {bender_sequence, #{}}, + schema => machinery_mg_schema_generic + } + } + } + }} + ]) ++ + genlib_app:start_application_with(scoper, [ + {storage, scoper_storage_logger} + ]) ++ genlib_app:start_application_with(bender, [ + {machinery_backend, machinegun}, {sequence, #{ path => <<"/v1/stateproc/bender_sequence">>, schema => machinery_mg_schema_generic, diff --git a/compose.yaml b/compose.yaml index 479771c..c25d9d6 100644 --- a/compose.yaml +++ b/compose.yaml @@ -11,6 +11,8 @@ services: - .:$PWD hostname: $SERVICE_NAME depends_on: + postgres: + condition: service_healthy machinegun: condition: service_healthy working_dir: $PWD @@ -27,3 +29,31 @@ services: interval: 10s timeout: 5s retries: 10 + + postgres: + image: postgres:15-bookworm + command: -c 'max_connections=200' + environment: + POSTGRES_DB: "progressor_db" + POSTGRES_USER: "progressor" + POSTGRES_PASSWORD: "progressor" + PGDATA: "/tmp/postgresql/data/pgdata" + volumes: + - progressor-data:/tmp/postgresql/data + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U progressor -d progressor_db"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + restart: unless-stopped + deploy: + resources: + limits: + cpus: '1' + memory: 4G + +volumes: + progressor-data: diff --git a/config/sys.config b/config/sys.config index 8b92442..126e3b1 100644 --- a/config/sys.config +++ b/config/sys.config @@ -1,5 +1,11 @@ [ {bender, [ + %% Available options for 'machinery_backend' + %% machinegun | progressor | hybrid + %% + %% For 'progressor' and 'hybrid' backends ensure config + %% '{progressor, [ ... ]}' is set. + {machinery_backend, hybrid}, {services, #{ bender => #{path => <<"/v1/bender">>}, generator => #{path => <<"/v1/generator">>} @@ -89,6 +95,69 @@ ]} ]}, + {epg_connector, [ + {databases, #{ + default_db => #{ + host => "postgres", + port => 5432, + database => "progressor_db", + username => "progressor", + password => "progressor" + } + }}, + {pools, #{ + default_pool => #{ + database => default_db, + size => 30 + } + }} + ]}, + + {progressor, [ + {call_wait_timeout, 20}, + {defaults, #{ + storage => #{ + client => prg_pg_backend, + options => #{ + pool => default_pool + } + }, + retry_policy => #{ + initial_timeout => 5, + backoff_coefficient => 1.0, + %% seconds + max_timeout => 180, + max_attempts => 3, + non_retryable_errors => [] + }, + task_scan_timeout => 1, + worker_pool_size => 100, + process_step_timeout => 30 + }}, + {namespaces, #{ + 'bender_generator' => #{ + processor => #{ + client => machinery_prg_backend, + options => #{ + namespace => 'bender_generator', + handler => {bender_generator, #{}}, + schema => machinery_mg_schema_generic + } + } + }, + 'bender_sequence' => #{ + processor => #{ + client => machinery_prg_backend, + options => #{ + namespace => 'bender_sequence', + handler => {bender_sequence, #{}}, + schema => machinery_mg_schema_generic + } + } + } + }} + ]}, + {os_mon, [ % for better compatibility with busybox coreutils {disksup_posix_only, true} diff --git a/rebar.config b/rebar.config index 6fdbc11..2f84eb5 100644 --- a/rebar.config +++ b/rebar.config @@ -31,7 +31,7 @@ {bender_proto, {git, "https://github.com/valitydev/bender-proto.git", {branch, "master"}}}, {erl_health, {git, "https://github.com/valitydev/erlang-health.git", {branch, "master"}}}, {genlib, {git, "https://github.com/valitydev/genlib.git", {tag, "v1.1.0"}}}, - {machinery, {git, "https://github.com/valitydev/machinery-erlang.git", {tag, "v1.1.0"}}}, + {machinery, {git, "https://github.com/valitydev/machinery-erlang.git", {tag, "v1.1.1"}}}, {scoper, {git, "https://github.com/valitydev/scoper.git", {tag, "v1.1.0"}}}, {snowflake, {git, "https://github.com/valitydev/snowflake.git", {branch, "master"}}}, {woody, {git, "https://github.com/valitydev/woody_erlang.git", {tag, "v1.1.0"}}}, diff --git a/rebar.lock b/rebar.lock index 9ca1d02..ce29942 100644 --- a/rebar.lock +++ b/rebar.lock @@ -47,7 +47,7 @@ {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.1.10">>},3}, {<<"machinery">>, {git,"https://github.com/valitydev/machinery-erlang.git", - {ref,"0ca82988ec310aceab7686c078c2a20fa6209cde"}}, + {ref,"4ead222fe26bf5341585f69cd2311cdbf8f11c67"}}, 0}, {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2}, {<<"mg_proto">>, @@ -70,13 +70,14 @@ {<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.1">>},2}, {<<"progressor">>, {git,"https://github.com/valitydev/progressor.git", - {ref,"a67d4ddbcc3ddc3471e903d6d7291ca8e194906c"}}, + {ref,"d261aaba4a5ea34b0074d7d21de6da1da3eee690"}}, 1}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.8.1">>},0}, {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.8">>},0}, {<<"prometheus_httpd">>,{pkg,<<"prometheus_httpd">>,<<"2.1.11">>},1}, {<<"quantile_estimator">>,{pkg,<<"quantile_estimator">>,<<"0.2.1">>},1}, {<<"ranch">>,{pkg,<<"ranch">>,<<"1.8.0">>},2}, + {<<"recon">>,{pkg,<<"recon">>,<<"2.5.6">>},2}, {<<"scoper">>, {git,"https://github.com/valitydev/scoper.git", {ref,"0e7aa01e9632daa39727edd62d4656ee715b4569"}}, @@ -130,6 +131,7 @@ {<<"prometheus_httpd">>, <<"F616ED9B85B536B195D94104063025A91F904A4CFC20255363F49A197D96C896">>}, {<<"quantile_estimator">>, <<"EF50A361F11B5F26B5F16D0696E46A9E4661756492C981F7B2229EF42FF1CD15">>}, {<<"ranch">>, <<"8C7A100A139FD57F17327B6413E4167AC559FBC04CA7448E9BE9057311597A1D">>}, + {<<"recon">>, <<"9052588E83BFEDFD9B72E1034532AEE2A5369D9D9343B61AEB7FBCE761010741">>}, {<<"ssl_verify_fun">>, <<"354C321CF377240C7B8716899E182CE4890C5938111A1296ADD3EC74CF1715DF">>}, {<<"tls_certificate_check">>, <<"C76C4C5D79EE79A2B11C84F910C825D6F024A78427C854F515748E9BD025E987">>}, {<<"unicode_util_compat">>, <<"BC84380C9AB48177092F43AC89E4DFA2C6D62B40B8BD132B1059ECC7232F9A78">>}]}, @@ -164,6 +166,7 @@ {<<"prometheus_httpd">>, <<"0BBE831452CFDF9588538EB2F570B26F30C348ADAE5E95A7D87F35A5910BCF92">>}, {<<"quantile_estimator">>, <<"282A8A323CA2A845C9E6F787D166348F776C1D4A41EDE63046D72D422E3DA946">>}, {<<"ranch">>, <<"49FBCFD3682FAB1F5D109351B61257676DA1A2FDBE295904176D5E521A2DDFE5">>}, + {<<"recon">>, <<"96C6799792D735CC0F0FD0F86267E9D351E63339CBE03DF9D162010CEFC26BB0">>}, {<<"ssl_verify_fun">>, <<"FE4C190E8F37401D30167C8C405EDA19469F34577987C76DDE613E838BBC67F8">>}, {<<"tls_certificate_check">>, <<"4083B4A298ADD534C96125337CB01161C358BB32DD870D5A893AAE685FD91D70">>}, {<<"unicode_util_compat">>, <<"25EEE6D67DF61960CF6A794239566599B09E17E668D3700247BC498638152521">>}]} From f0c86e35b7df755c15ad9fad2820394707939bff Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Tue, 18 Feb 2025 12:06:17 +0300 Subject: [PATCH 2/8] Adds retry strategy for generator's machine read; increases progressor worker pool for testsuite --- apps/bender/src/bender_generator.erl | 48 +++++++++++++++++++------ apps/bender/test/bender_tests_SUITE.erl | 2 +- rebar.config | 4 +-- 3 files changed, 39 insertions(+), 15 deletions(-) diff --git a/apps/bender/src/bender_generator.erl b/apps/bender/src/bender_generator.erl index 33f3e76..42dd2e7 100644 --- a/apps/bender/src/bender_generator.erl +++ b/apps/bender/src/bender_generator.erl @@ -53,16 +53,19 @@ bind(ExternalID, Schema, UserCtx, WoodyCtx) -> -spec get_internal_id(external_id(), woody_context()) -> {ok, internal_id(), user_context()} | no_return(). get_internal_id(ExternalID, WoodyCtx) -> - case machinery:get(?NS, ExternalID, get_backend(WoodyCtx)) of - {ok, Machine} -> - #{ - internal_id := InternalID, - user_context := UserCtx - } = get_machine_state(Machine), - {ok, InternalID, UserCtx}; - {error, notfound} -> - throw({not_found, ExternalID}) - end. + with_retry(get_retry_strategy(), fun() -> + maybe + {ok, Machine} ?= machinery:get(?NS, ExternalID, get_backend(WoodyCtx)), + #{internal_id := InternalID, user_context := UserCtx} ?= get_machine_state(Machine), + {done, {ok, InternalID, UserCtx}} + else + {error, notfound} -> + throw({not_found, ExternalID}); + %% Retry if machine state is undefined + undefined -> + next + end + end). %%% Machinery callbacks @@ -93,11 +96,34 @@ process_notification(_Args, _Machine, _HandlerArgs, _HandlerOpts) -> %%% Internal functions +-spec get_retry_strategy() -> genlib_retry:strategy(). +get_retry_strategy() -> + Opts = genlib_app:env(bender, generator), + DefaultPolicy = genlib_retry:exponential(5, 2, {jitter, 200, 100}), + genlib_retry:new_strategy(maps:get(retry_policy, Opts, DefaultPolicy)). + +-spec with_retry(genlib_retry:strategy(), fun(() -> {done, T} | next)) -> T | no_return(). +with_retry(Strategy, Fun) -> + maybe + next ?= Fun(), + {wait, Timeout, NextStrategy} ?= genlib_retry:next_step(Strategy), + _ = timer:sleep(Timeout), + with_retry(NextStrategy, Fun) + else + {done, Result} -> + Result; + finish -> + erlang:error(retries_exhausted) + end. + -spec start(external_id(), internal_id(), user_context(), woody_context()) -> ok | {error, exists}. start(ExternalID, InternalID, UserCtx, WoodyCtx) -> machinery:start(?NS, ExternalID, {InternalID, UserCtx}, get_backend(WoodyCtx)). --spec get_machine_state(machine()) -> state(). +%% NOTE Underlying machinery backend can experience race condition on +%% machine writes. Thus it MAY occur that 'aux_state' is undefined on +%% machine read. +-spec get_machine_state(machine()) -> state() | undefined. get_machine_state(#{aux_state := State}) -> State. diff --git a/apps/bender/test/bender_tests_SUITE.erl b/apps/bender/test/bender_tests_SUITE.erl index d3bfcbc..b9dd626 100644 --- a/apps/bender/test/bender_tests_SUITE.erl +++ b/apps/bender/test/bender_tests_SUITE.erl @@ -103,7 +103,7 @@ init_per_suite(C) -> non_retryable_errors => [] }, task_scan_timeout => 1, - worker_pool_size => 100, + worker_pool_size => 600, process_step_timeout => 30 }}, {namespaces, #{ diff --git a/rebar.config b/rebar.config index 2f84eb5..c5dc5b5 100644 --- a/rebar.config +++ b/rebar.config @@ -74,12 +74,10 @@ % for introspection on production {recon, "2.5.2"}, {logger_logstash_formatter, - {git, "https://github.com/valitydev/logger_logstash_formatter.git", {ref, "08a66a6"}}}, - {iosetopts, {git, "https://github.com/valitydev/iosetopts.git", {ref, "edb445c"}}} + {git, "https://github.com/valitydev/logger_logstash_formatter.git", {ref, "08a66a6"}}} ]}, {relx, [ {release, {bender, "1.0.0"}, [ - iosetopts, % tools for introspection {recon, load}, % debugger From c1c89f6ba693298c8aba02fdc02e459be3ff384e Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Tue, 18 Feb 2025 14:26:33 +0300 Subject: [PATCH 3/8] Cranks up pool size to 1000 --- apps/bender/test/bender_tests_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/bender/test/bender_tests_SUITE.erl b/apps/bender/test/bender_tests_SUITE.erl index b9dd626..7e844b3 100644 --- a/apps/bender/test/bender_tests_SUITE.erl +++ b/apps/bender/test/bender_tests_SUITE.erl @@ -103,7 +103,7 @@ init_per_suite(C) -> non_retryable_errors => [] }, task_scan_timeout => 1, - worker_pool_size => 600, + worker_pool_size => 1000, process_step_timeout => 30 }}, {namespaces, #{ From e08b47fb52a2edbed559603bf42bcd9e9393b133 Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Tue, 18 Feb 2025 14:58:11 +0300 Subject: [PATCH 4/8] Refactors retry helper func --- apps/bender/src/bender_generator.erl | 57 ++++++++++++++++------------ 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/apps/bender/src/bender_generator.erl b/apps/bender/src/bender_generator.erl index 42dd2e7..9e4221d 100644 --- a/apps/bender/src/bender_generator.erl +++ b/apps/bender/src/bender_generator.erl @@ -48,22 +48,33 @@ bind(ExternalID, Schema, UserCtx, WoodyCtx) -> ok -> {ok, InternalID, undefined}; {error, exists} -> - get_internal_id(ExternalID, WoodyCtx) + get_internal_id_with_retry(ExternalID, WoodyCtx) end. -spec get_internal_id(external_id(), woody_context()) -> {ok, internal_id(), user_context()} | no_return(). get_internal_id(ExternalID, WoodyCtx) -> + case machinery:get(?NS, ExternalID, get_backend(WoodyCtx)) of + {ok, Machine} -> + #{ + internal_id := InternalID, + user_context := UserCtx + } = get_machine_state(Machine), + {ok, InternalID, UserCtx}; + {error, notfound} -> + throw({not_found, ExternalID}) + end. + +-spec get_internal_id_with_retry(external_id(), woody_context()) -> {ok, internal_id(), user_context()} | no_return(). +get_internal_id_with_retry(ExternalID, WoodyCtx) -> with_retry(get_retry_strategy(), fun() -> - maybe - {ok, Machine} ?= machinery:get(?NS, ExternalID, get_backend(WoodyCtx)), - #{internal_id := InternalID, user_context := UserCtx} ?= get_machine_state(Machine), - {done, {ok, InternalID, UserCtx}} - else - {error, notfound} -> - throw({not_found, ExternalID}); - %% Retry if machine state is undefined - undefined -> - next + try + {done, get_internal_id(ExternalID, WoodyCtx)} + catch + %% NOTE Underlying machinery backend can experience race + %% condition on machine writes. Thus it MAY occur that + %% 'aux_state' is undefined on machine read. + error:({badmatch, undefined}):_Stacktrace -> + retry end end). @@ -102,28 +113,26 @@ get_retry_strategy() -> DefaultPolicy = genlib_retry:exponential(5, 2, {jitter, 200, 100}), genlib_retry:new_strategy(maps:get(retry_policy, Opts, DefaultPolicy)). --spec with_retry(genlib_retry:strategy(), fun(() -> {done, T} | next)) -> T | no_return(). +-spec with_retry(genlib_retry:strategy(), fun(() -> {done, T} | retry)) -> T | no_return(). with_retry(Strategy, Fun) -> - maybe - next ?= Fun(), - {wait, Timeout, NextStrategy} ?= genlib_retry:next_step(Strategy), - _ = timer:sleep(Timeout), - with_retry(NextStrategy, Fun) - else + case Fun() of {done, Result} -> Result; - finish -> - erlang:error(retries_exhausted) + retry -> + case genlib_retry:next_step(Strategy) of + {wait, Timeout, NextStrategy} -> + _ = timer:sleep(Timeout), + with_retry(NextStrategy, Fun); + finish -> + erlang:error(retries_exhausted) + end end. -spec start(external_id(), internal_id(), user_context(), woody_context()) -> ok | {error, exists}. start(ExternalID, InternalID, UserCtx, WoodyCtx) -> machinery:start(?NS, ExternalID, {InternalID, UserCtx}, get_backend(WoodyCtx)). -%% NOTE Underlying machinery backend can experience race condition on -%% machine writes. Thus it MAY occur that 'aux_state' is undefined on -%% machine read. --spec get_machine_state(machine()) -> state() | undefined. +-spec get_machine_state(machine()) -> state(). get_machine_state(#{aux_state := State}) -> State. From 9cf047528472500ca4a8fed8b09575525546158f Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Tue, 18 Feb 2025 14:59:25 +0300 Subject: [PATCH 5/8] Changes backend mode for generator testsuite --- apps/bender/test/generator_tests_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/bender/test/generator_tests_SUITE.erl b/apps/bender/test/generator_tests_SUITE.erl index b60433b..a23c5a4 100644 --- a/apps/bender/test/generator_tests_SUITE.erl +++ b/apps/bender/test/generator_tests_SUITE.erl @@ -97,7 +97,7 @@ init_per_suite(C) -> {storage, scoper_storage_logger} ]) ++ genlib_app:start_application_with(bender, [ - {machinery_backend, machinegun}, + {machinery_backend, hybrid}, {sequence, #{ path => <<"/v1/stateproc/bender_sequence">>, schema => machinery_mg_schema_generic, From 85865c8be53e1e8a91fb26da426673a4e73172b1 Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Thu, 20 Feb 2025 15:53:02 +0300 Subject: [PATCH 6/8] Adds missing explicit app.src dependency on progressor --- apps/bender/src/bender.app.src | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/bender/src/bender.app.src b/apps/bender/src/bender.app.src index 75bc2b2..160361e 100644 --- a/apps/bender/src/bender.app.src +++ b/apps/bender/src/bender.app.src @@ -11,6 +11,7 @@ prometheus_cowboy, woody, % woody's hackney declares prometheus metrics on start scoper, % should be before any scoper event handler usage + progressor, machinery, erl_health, opentelemetry_api, From f69b877d7a48b96afaab410ebc77ef76b0e4e3cc Mon Sep 17 00:00:00 2001 From: ttt161 Date: Mon, 21 Apr 2025 06:42:37 +0300 Subject: [PATCH 7/8] TECH-156: bump progressor --- rebar.config | 2 +- rebar.lock | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rebar.config b/rebar.config index c5dc5b5..b2c2e8e 100644 --- a/rebar.config +++ b/rebar.config @@ -31,7 +31,7 @@ {bender_proto, {git, "https://github.com/valitydev/bender-proto.git", {branch, "master"}}}, {erl_health, {git, "https://github.com/valitydev/erlang-health.git", {branch, "master"}}}, {genlib, {git, "https://github.com/valitydev/genlib.git", {tag, "v1.1.0"}}}, - {machinery, {git, "https://github.com/valitydev/machinery-erlang.git", {tag, "v1.1.1"}}}, + {machinery, {git, "https://github.com/valitydev/machinery-erlang.git", {branch, "TECH-156-bump-progressor"}}}, {scoper, {git, "https://github.com/valitydev/scoper.git", {tag, "v1.1.0"}}}, {snowflake, {git, "https://github.com/valitydev/snowflake.git", {branch, "master"}}}, {woody, {git, "https://github.com/valitydev/woody_erlang.git", {tag, "v1.1.0"}}}, diff --git a/rebar.lock b/rebar.lock index ce29942..53c8a8a 100644 --- a/rebar.lock +++ b/rebar.lock @@ -23,7 +23,7 @@ {<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},2}, {<<"epg_connector">>, {git,"https://github.com/valitydev/epg_connector.git", - {ref,"35a7480b298ac4318352a03824ce06619b75f9da"}}, + {ref,"fba34571474b11e1820126afff4bc552f68e1799"}}, 2}, {<<"epgsql">>, {git,"https://github.com/epgsql/epgsql.git", @@ -47,7 +47,7 @@ {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.1.10">>},3}, {<<"machinery">>, {git,"https://github.com/valitydev/machinery-erlang.git", - {ref,"4ead222fe26bf5341585f69cd2311cdbf8f11c67"}}, + {ref,"65382a5e4fd9f57855c681adaed6ef1cbb26c14a"}}, 0}, {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2}, {<<"mg_proto">>, @@ -70,7 +70,7 @@ {<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.1">>},2}, {<<"progressor">>, {git,"https://github.com/valitydev/progressor.git", - {ref,"d261aaba4a5ea34b0074d7d21de6da1da3eee690"}}, + {ref,"1877ace83a3693a02ebd3e56768703c5595e35ec"}}, 1}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.8.1">>},0}, {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.8">>},0}, From 1b78eaf68cdfb8ed3c8867ef9d8385ff4f6a032f Mon Sep 17 00:00:00 2001 From: ttt161 Date: Mon, 12 May 2025 13:00:21 +0300 Subject: [PATCH 8/8] TECH-156: bump machinery --- rebar.config | 2 +- rebar.lock | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rebar.config b/rebar.config index b2c2e8e..c723985 100644 --- a/rebar.config +++ b/rebar.config @@ -31,7 +31,7 @@ {bender_proto, {git, "https://github.com/valitydev/bender-proto.git", {branch, "master"}}}, {erl_health, {git, "https://github.com/valitydev/erlang-health.git", {branch, "master"}}}, {genlib, {git, "https://github.com/valitydev/genlib.git", {tag, "v1.1.0"}}}, - {machinery, {git, "https://github.com/valitydev/machinery-erlang.git", {branch, "TECH-156-bump-progressor"}}}, + {machinery, {git, "https://github.com/valitydev/machinery-erlang.git", {tag, "v1.1.5"}}}, {scoper, {git, "https://github.com/valitydev/scoper.git", {tag, "v1.1.0"}}}, {snowflake, {git, "https://github.com/valitydev/snowflake.git", {branch, "master"}}}, {woody, {git, "https://github.com/valitydev/woody_erlang.git", {tag, "v1.1.0"}}}, diff --git a/rebar.lock b/rebar.lock index 53c8a8a..91f6d4a 100644 --- a/rebar.lock +++ b/rebar.lock @@ -23,7 +23,7 @@ {<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},2}, {<<"epg_connector">>, {git,"https://github.com/valitydev/epg_connector.git", - {ref,"fba34571474b11e1820126afff4bc552f68e1799"}}, + {ref,"82055002c8cb73ef938e7035865419074e7f959b"}}, 2}, {<<"epgsql">>, {git,"https://github.com/epgsql/epgsql.git", @@ -47,7 +47,7 @@ {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.1.10">>},3}, {<<"machinery">>, {git,"https://github.com/valitydev/machinery-erlang.git", - {ref,"65382a5e4fd9f57855c681adaed6ef1cbb26c14a"}}, + {ref,"72ac2f56cf42a99c66310be8265b3c2bc84862fc"}}, 0}, {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2}, {<<"mg_proto">>, @@ -70,7 +70,7 @@ {<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.1">>},2}, {<<"progressor">>, {git,"https://github.com/valitydev/progressor.git", - {ref,"1877ace83a3693a02ebd3e56768703c5595e35ec"}}, + {ref,"e2fdf9d11a69e239d3f4dc51aa2dd122d44ee1b0"}}, 1}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.8.1">>},0}, {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.8">>},0},