diff --git a/apps/bender/src/bender.erl b/apps/bender/src/bender.erl index f40bab0..5c76689 100644 --- a/apps/bender/src/bender.erl +++ b/apps/bender/src/bender.erl @@ -56,7 +56,7 @@ init([]) -> shutdown_timeout => get_shutdown_timeout(), event_handler => EventHandlers, handlers => get_handler_spec(), - additional_routes => get_routes(EventHandlers) + additional_routes => get_routes(EventHandlers, genlib_app:env(?MODULE, machinery_backend)) } ), Flags = #{strategy => one_for_all, intensity => 6, period => 30}, @@ -101,8 +101,13 @@ get_handler_spec() -> }} ]. --spec get_routes(woody:ev_handlers()) -> [woody_server_thrift_http_handler:route(_)]. -get_routes(EventHandlers) -> +-spec get_routes(woody:ev_handlers(), machinegun | progressor | hybrid) -> [woody_server_thrift_http_handler:route(_)]. +get_routes(_EventHandlers, progressor) -> + %% Shared routes + Check = enable_health_logging(genlib_app:env(?MODULE, health_check, #{})), + [erl_health_handle:get_route(Check), get_prometheus_route()]; +get_routes(EventHandlers, Mode) when Mode == machinegun orelse Mode == hybrid -> + %% Machinegun specific routes RouteOptsEnv = genlib_app:env(?MODULE, route_opts, #{}), RouteOpts = RouteOptsEnv#{event_handler => EventHandlers}, Generator = genlib_app:env(bender, generator, #{}), @@ -121,8 +126,7 @@ get_routes(EventHandlers) -> } }} ], - Check = enable_health_logging(genlib_app:env(?MODULE, health_check, #{})), - [erl_health_handle:get_route(Check), get_prometheus_route() | machinery_mg_backend:get_routes(Handlers, RouteOpts)]. + get_routes(EventHandlers, progressor) ++ machinery_mg_backend:get_routes(Handlers, RouteOpts). -spec enable_health_logging(erl_health:check()) -> erl_health:check(). enable_health_logging(Check) -> diff --git a/apps/bender/src/bender_generator.erl b/apps/bender/src/bender_generator.erl index 33f3e76..9e4221d 100644 --- a/apps/bender/src/bender_generator.erl +++ b/apps/bender/src/bender_generator.erl @@ -48,7 +48,7 @@ bind(ExternalID, Schema, UserCtx, WoodyCtx) -> ok -> {ok, InternalID, undefined}; {error, exists} -> - get_internal_id(ExternalID, WoodyCtx) + get_internal_id_with_retry(ExternalID, WoodyCtx) end. -spec get_internal_id(external_id(), woody_context()) -> {ok, internal_id(), user_context()} | no_return(). @@ -64,6 +64,20 @@ get_internal_id(ExternalID, WoodyCtx) -> throw({not_found, ExternalID}) end. +-spec get_internal_id_with_retry(external_id(), woody_context()) -> {ok, internal_id(), user_context()} | no_return(). +get_internal_id_with_retry(ExternalID, WoodyCtx) -> + with_retry(get_retry_strategy(), fun() -> + try + {done, get_internal_id(ExternalID, WoodyCtx)} + catch + %% NOTE Underlying machinery backend can experience race + %% condition on machine writes. Thus it MAY occur that + %% 'aux_state' is undefined on machine read. + error:({badmatch, undefined}):_Stacktrace -> + retry + end + end). + %%% Machinery callbacks -spec init(args({internal_id(), user_context()}), machine(), handler_args(), handler_opts()) -> result(state()). @@ -93,6 +107,27 @@ process_notification(_Args, _Machine, _HandlerArgs, _HandlerOpts) -> %%% Internal functions +-spec get_retry_strategy() -> genlib_retry:strategy(). +get_retry_strategy() -> + Opts = genlib_app:env(bender, generator), + DefaultPolicy = genlib_retry:exponential(5, 2, {jitter, 200, 100}), + genlib_retry:new_strategy(maps:get(retry_policy, Opts, DefaultPolicy)). + +-spec with_retry(genlib_retry:strategy(), fun(() -> {done, T} | retry)) -> T | no_return(). +with_retry(Strategy, Fun) -> + case Fun() of + {done, Result} -> + Result; + retry -> + case genlib_retry:next_step(Strategy) of + {wait, Timeout, NextStrategy} -> + _ = timer:sleep(Timeout), + with_retry(NextStrategy, Fun); + finish -> + erlang:error(retries_exhausted) + end + end. + -spec start(external_id(), internal_id(), user_context(), woody_context()) -> ok | {error, exists}. start(ExternalID, InternalID, UserCtx, WoodyCtx) -> machinery:start(?NS, ExternalID, {InternalID, UserCtx}, get_backend(WoodyCtx)). diff --git a/apps/bender/src/bender_utils.erl b/apps/bender/src/bender_utils.erl index 19aae28..d205b6e 100644 --- a/apps/bender/src/bender_utils.erl +++ b/apps/bender/src/bender_utils.erl @@ -26,15 +26,37 @@ unique_id() -> <> = snowflake:new(), genlib_format:format_int_base(ID, 62). --spec get_backend(atom(), woody_context()) -> machinery_mg_backend:backend(). +-spec get_backend(generator | sequence, woody_context()) -> machinery_mg_backend:backend(). get_backend(Service, WoodyCtx) -> + get_backend(genlib_app:env(bender, machinery_backend), Service, WoodyCtx). + +%%% Internal functions + +get_backend(hybrid, Service, WoodyCtx) -> + {machinery_hybrid_backend, #{ + primary_backend => get_backend(progressor, Service, WoodyCtx), + fallback_backend => get_backend(machinegun, Service, WoodyCtx) + }}; +get_backend(progressor, Service, WoodyCtx) -> + Automaton = genlib_app:env(bender, Service, #{}), + {Namespace, Handler} = get_machinery_namespace_handler(Service), + machinery_prg_backend:new(WoodyCtx, #{ + namespace => Namespace, + handler => Handler, + schema => maps:get(schema, Automaton, machinery_mg_schema_generic) + }); +get_backend(machinegun, Service, WoodyCtx) -> Automaton = genlib_app:env(bender, Service, #{}), machinery_mg_backend:new(WoodyCtx, #{ client => get_woody_client(Automaton), schema => maps:get(schema, Automaton, machinery_mg_schema_generic) }). -%%% Internal functions +-spec get_machinery_namespace_handler(generator | sequence) -> {machinery:namespace(), machinery:logic_handler(_)}. +get_machinery_namespace_handler(generator) -> + {'bender_generator', {bender_generator, #{}}}; +get_machinery_namespace_handler(sequence) -> + {'bender_sequence', {bender_sequence, #{}}}. -spec get_woody_client(automaton()) -> machinery_mg_client:woody_client(). get_woody_client(#{url := Url} = Automaton) -> diff --git a/apps/bender/test/bender_tests_SUITE.erl b/apps/bender/test/bender_tests_SUITE.erl index 3fed330..7e844b3 100644 --- a/apps/bender/test/bender_tests_SUITE.erl +++ b/apps/bender/test/bender_tests_SUITE.erl @@ -65,10 +65,75 @@ groups() -> -spec init_per_suite(config()) -> config(). init_per_suite(C) -> + %% _ = dbg:tracer(), + %% _ = dbg:p(all, c), + %% _ = dbg:tpl({'progressor', 'get', '_'}, x), + EpgConnectorApps = genlib_app:start_application_with(epg_connector, [ + {databases, #{ + default_db => #{ + host => "postgres", + port => 5432, + database => "progressor_db", + username => "progressor", + password => "progressor" + } + }}, + {pools, #{ + default_pool => #{ + database => default_db, + size => 30 + } + }} + ]), + ProgressorApps = genlib_app:start_application_with(progressor, [ + {call_wait_timeout, 20}, + {defaults, #{ + storage => #{ + client => prg_pg_backend, + options => #{ + pool => default_pool + } + }, + retry_policy => #{ + initial_timeout => 5, + backoff_coefficient => 1.0, + %% seconds + max_timeout => 180, + max_attempts => 3, + non_retryable_errors => [] + }, + task_scan_timeout => 1, + worker_pool_size => 1000, + process_step_timeout => 30 + }}, + {namespaces, #{ + 'bender_generator' => #{ + processor => #{ + client => machinery_prg_backend, + options => #{ + namespace => 'bender_generator', + handler => {bender_generator, #{}}, + schema => machinery_mg_schema_generic + } + } + }, + 'bender_sequence' => #{ + processor => #{ + client => machinery_prg_backend, + options => #{ + namespace => 'bender_sequence', + handler => {bender_sequence, #{}}, + schema => machinery_mg_schema_generic + } + } + } + }} + ]), ScoperApps = genlib_app:start_application_with(scoper, [ {storage, scoper_storage_logger} ]), BenderApps = genlib_app:start_application_with(bender, [ + {machinery_backend, hybrid}, {generator, #{ path => <<"/v1/stateproc/bender_generator">>, schema => machinery_mg_schema_generic, @@ -95,7 +160,7 @@ init_per_suite(C) -> num_acceptors => 100 }} ]), - Apps = ScoperApps ++ BenderApps, + Apps = EpgConnectorApps ++ ProgressorApps ++ ScoperApps ++ BenderApps, [{suite_apps, Apps} | C]. -spec end_per_suite(config()) -> ok. diff --git a/apps/bender/test/generator_tests_SUITE.erl b/apps/bender/test/generator_tests_SUITE.erl index c1dba61..a23c5a4 100644 --- a/apps/bender/test/generator_tests_SUITE.erl +++ b/apps/bender/test/generator_tests_SUITE.erl @@ -42,10 +42,62 @@ groups() -> -spec init_per_suite(config()) -> config(). init_per_suite(C) -> Apps = - genlib_app:start_application_with(scoper, [ - {storage, scoper_storage_logger} + genlib_app:start_application_with(epg_connector, [ + {databases, #{ + default_db => #{ + host => "postgres", + port => 5432, + database => "progressor_db", + username => "progressor", + password => "progressor" + } + }}, + {pools, #{ + default_pool => #{ + database => default_db, + size => 30 + } + }} ]) ++ + genlib_app:start_application_with(progressor, [ + {call_wait_timeout, 20}, + {defaults, #{ + storage => #{ + client => prg_pg_backend, + options => #{ + pool => default_pool + } + }, + retry_policy => #{ + initial_timeout => 5, + backoff_coefficient => 1.0, + %% seconds + max_timeout => 180, + max_attempts => 3, + non_retryable_errors => [] + }, + task_scan_timeout => 1, + worker_pool_size => 100, + process_step_timeout => 30 + }}, + {namespaces, #{ + 'bender_sequence' => #{ + processor => #{ + client => machinery_prg_backend, + options => #{ + namespace => 'bender_sequence', + handler => {bender_sequence, #{}}, + schema => machinery_mg_schema_generic + } + } + } + }} + ]) ++ + genlib_app:start_application_with(scoper, [ + {storage, scoper_storage_logger} + ]) ++ genlib_app:start_application_with(bender, [ + {machinery_backend, hybrid}, {sequence, #{ path => <<"/v1/stateproc/bender_sequence">>, schema => machinery_mg_schema_generic, diff --git a/compose.yaml b/compose.yaml index 479771c..c25d9d6 100644 --- a/compose.yaml +++ b/compose.yaml @@ -11,6 +11,8 @@ services: - .:$PWD hostname: $SERVICE_NAME depends_on: + postgres: + condition: service_healthy machinegun: condition: service_healthy working_dir: $PWD @@ -27,3 +29,31 @@ services: interval: 10s timeout: 5s retries: 10 + + postgres: + image: postgres:15-bookworm + command: -c 'max_connections=200' + environment: + POSTGRES_DB: "progressor_db" + POSTGRES_USER: "progressor" + POSTGRES_PASSWORD: "progressor" + PGDATA: "/tmp/postgresql/data/pgdata" + volumes: + - progressor-data:/tmp/postgresql/data + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U progressor -d progressor_db"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + restart: unless-stopped + deploy: + resources: + limits: + cpus: '1' + memory: 4G + +volumes: + progressor-data: diff --git a/config/sys.config b/config/sys.config index 8b92442..126e3b1 100644 --- a/config/sys.config +++ b/config/sys.config @@ -1,5 +1,11 @@ [ {bender, [ + %% Available options for 'machinery_backend' + %% machinegun | progressor | hybrid + %% + %% For 'progressor' and 'hybrid' backends ensure config + %% '{progressor, [ ... ]}' is set. + {machinery_backend, hybrid}, {services, #{ bender => #{path => <<"/v1/bender">>}, generator => #{path => <<"/v1/generator">>} @@ -89,6 +95,69 @@ ]} ]}, + {epg_connector, [ + {databases, #{ + default_db => #{ + host => "postgres", + port => 5432, + database => "progressor_db", + username => "progressor", + password => "progressor" + } + }}, + {pools, #{ + default_pool => #{ + database => default_db, + size => 30 + } + }} + ]}, + + {progressor, [ + {call_wait_timeout, 20}, + {defaults, #{ + storage => #{ + client => prg_pg_backend, + options => #{ + pool => default_pool + } + }, + retry_policy => #{ + initial_timeout => 5, + backoff_coefficient => 1.0, + %% seconds + max_timeout => 180, + max_attempts => 3, + non_retryable_errors => [] + }, + task_scan_timeout => 1, + worker_pool_size => 100, + process_step_timeout => 30 + }}, + {namespaces, #{ + 'bender_generator' => #{ + processor => #{ + client => machinery_prg_backend, + options => #{ + namespace => 'bender_generator', + handler => {bender_generator, #{}}, + schema => machinery_mg_schema_generic + } + } + }, + 'bender_sequence' => #{ + processor => #{ + client => machinery_prg_backend, + options => #{ + namespace => 'bender_sequence', + handler => {bender_sequence, #{}}, + schema => machinery_mg_schema_generic + } + } + } + }} + ]}, + {os_mon, [ % for better compatibility with busybox coreutils {disksup_posix_only, true} diff --git a/rebar.config b/rebar.config index 6fdbc11..c5dc5b5 100644 --- a/rebar.config +++ b/rebar.config @@ -31,7 +31,7 @@ {bender_proto, {git, "https://github.com/valitydev/bender-proto.git", {branch, "master"}}}, {erl_health, {git, "https://github.com/valitydev/erlang-health.git", {branch, "master"}}}, {genlib, {git, "https://github.com/valitydev/genlib.git", {tag, "v1.1.0"}}}, - {machinery, {git, "https://github.com/valitydev/machinery-erlang.git", {tag, "v1.1.0"}}}, + {machinery, {git, "https://github.com/valitydev/machinery-erlang.git", {tag, "v1.1.1"}}}, {scoper, {git, "https://github.com/valitydev/scoper.git", {tag, "v1.1.0"}}}, {snowflake, {git, "https://github.com/valitydev/snowflake.git", {branch, "master"}}}, {woody, {git, "https://github.com/valitydev/woody_erlang.git", {tag, "v1.1.0"}}}, @@ -74,12 +74,10 @@ % for introspection on production {recon, "2.5.2"}, {logger_logstash_formatter, - {git, "https://github.com/valitydev/logger_logstash_formatter.git", {ref, "08a66a6"}}}, - {iosetopts, {git, "https://github.com/valitydev/iosetopts.git", {ref, "edb445c"}}} + {git, "https://github.com/valitydev/logger_logstash_formatter.git", {ref, "08a66a6"}}} ]}, {relx, [ {release, {bender, "1.0.0"}, [ - iosetopts, % tools for introspection {recon, load}, % debugger diff --git a/rebar.lock b/rebar.lock index 9ca1d02..ce29942 100644 --- a/rebar.lock +++ b/rebar.lock @@ -47,7 +47,7 @@ {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.1.10">>},3}, {<<"machinery">>, {git,"https://github.com/valitydev/machinery-erlang.git", - {ref,"0ca82988ec310aceab7686c078c2a20fa6209cde"}}, + {ref,"4ead222fe26bf5341585f69cd2311cdbf8f11c67"}}, 0}, {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2}, {<<"mg_proto">>, @@ -70,13 +70,14 @@ {<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.1">>},2}, {<<"progressor">>, {git,"https://github.com/valitydev/progressor.git", - {ref,"a67d4ddbcc3ddc3471e903d6d7291ca8e194906c"}}, + {ref,"d261aaba4a5ea34b0074d7d21de6da1da3eee690"}}, 1}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.8.1">>},0}, {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.8">>},0}, {<<"prometheus_httpd">>,{pkg,<<"prometheus_httpd">>,<<"2.1.11">>},1}, {<<"quantile_estimator">>,{pkg,<<"quantile_estimator">>,<<"0.2.1">>},1}, {<<"ranch">>,{pkg,<<"ranch">>,<<"1.8.0">>},2}, + {<<"recon">>,{pkg,<<"recon">>,<<"2.5.6">>},2}, {<<"scoper">>, {git,"https://github.com/valitydev/scoper.git", {ref,"0e7aa01e9632daa39727edd62d4656ee715b4569"}}, @@ -130,6 +131,7 @@ {<<"prometheus_httpd">>, <<"F616ED9B85B536B195D94104063025A91F904A4CFC20255363F49A197D96C896">>}, {<<"quantile_estimator">>, <<"EF50A361F11B5F26B5F16D0696E46A9E4661756492C981F7B2229EF42FF1CD15">>}, {<<"ranch">>, <<"8C7A100A139FD57F17327B6413E4167AC559FBC04CA7448E9BE9057311597A1D">>}, + {<<"recon">>, <<"9052588E83BFEDFD9B72E1034532AEE2A5369D9D9343B61AEB7FBCE761010741">>}, {<<"ssl_verify_fun">>, <<"354C321CF377240C7B8716899E182CE4890C5938111A1296ADD3EC74CF1715DF">>}, {<<"tls_certificate_check">>, <<"C76C4C5D79EE79A2B11C84F910C825D6F024A78427C854F515748E9BD025E987">>}, {<<"unicode_util_compat">>, <<"BC84380C9AB48177092F43AC89E4DFA2C6D62B40B8BD132B1059ECC7232F9A78">>}]}, @@ -164,6 +166,7 @@ {<<"prometheus_httpd">>, <<"0BBE831452CFDF9588538EB2F570B26F30C348ADAE5E95A7D87F35A5910BCF92">>}, {<<"quantile_estimator">>, <<"282A8A323CA2A845C9E6F787D166348F776C1D4A41EDE63046D72D422E3DA946">>}, {<<"ranch">>, <<"49FBCFD3682FAB1F5D109351B61257676DA1A2FDBE295904176D5E521A2DDFE5">>}, + {<<"recon">>, <<"96C6799792D735CC0F0FD0F86267E9D351E63339CBE03DF9D162010CEFC26BB0">>}, {<<"ssl_verify_fun">>, <<"FE4C190E8F37401D30167C8C405EDA19469F34577987C76DDE613E838BBC67F8">>}, {<<"tls_certificate_check">>, <<"4083B4A298ADD534C96125337CB01161C358BB32DD870D5A893AAE685FD91D70">>}, {<<"unicode_util_compat">>, <<"25EEE6D67DF61960CF6A794239566599B09E17E668D3700247BC498638152521">>}]}