Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions apps/bender/src/bender.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ init([]) ->
shutdown_timeout => get_shutdown_timeout(),
event_handler => EventHandlers,
handlers => get_handler_spec(),
additional_routes => get_routes(EventHandlers)
additional_routes => get_routes(EventHandlers, genlib_app:env(?MODULE, machinery_backend))
}
),
Flags = #{strategy => one_for_all, intensity => 6, period => 30},
Expand Down Expand Up @@ -101,8 +101,13 @@ get_handler_spec() ->
}}
].

-spec get_routes(woody:ev_handlers()) -> [woody_server_thrift_http_handler:route(_)].
get_routes(EventHandlers) ->
-spec get_routes(woody:ev_handlers(), machinegun | progressor | hybrid) -> [woody_server_thrift_http_handler:route(_)].
get_routes(_EventHandlers, progressor) ->
%% Shared routes
Check = enable_health_logging(genlib_app:env(?MODULE, health_check, #{})),
[erl_health_handle:get_route(Check), get_prometheus_route()];
get_routes(EventHandlers, Mode) when Mode == machinegun orelse Mode == hybrid ->
%% Machinegun specific routes
RouteOptsEnv = genlib_app:env(?MODULE, route_opts, #{}),
RouteOpts = RouteOptsEnv#{event_handler => EventHandlers},
Generator = genlib_app:env(bender, generator, #{}),
Expand All @@ -121,8 +126,7 @@ get_routes(EventHandlers) ->
}
}}
],
Check = enable_health_logging(genlib_app:env(?MODULE, health_check, #{})),
[erl_health_handle:get_route(Check), get_prometheus_route() | machinery_mg_backend:get_routes(Handlers, RouteOpts)].
get_routes(EventHandlers, progressor) ++ machinery_mg_backend:get_routes(Handlers, RouteOpts).

-spec enable_health_logging(erl_health:check()) -> erl_health:check().
enable_health_logging(Check) ->
Expand Down
37 changes: 36 additions & 1 deletion apps/bender/src/bender_generator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ bind(ExternalID, Schema, UserCtx, WoodyCtx) ->
ok ->
{ok, InternalID, undefined};
{error, exists} ->
get_internal_id(ExternalID, WoodyCtx)
get_internal_id_with_retry(ExternalID, WoodyCtx)
end.

-spec get_internal_id(external_id(), woody_context()) -> {ok, internal_id(), user_context()} | no_return().
Expand All @@ -64,6 +64,20 @@ get_internal_id(ExternalID, WoodyCtx) ->
throw({not_found, ExternalID})
end.

-spec get_internal_id_with_retry(external_id(), woody_context()) -> {ok, internal_id(), user_context()} | no_return().
get_internal_id_with_retry(ExternalID, WoodyCtx) ->
with_retry(get_retry_strategy(), fun() ->
try
{done, get_internal_id(ExternalID, WoodyCtx)}
catch
%% NOTE Underlying machinery backend can experience race
%% condition on machine writes. Thus it MAY occur that
%% 'aux_state' is undefined on machine read.
error:({badmatch, undefined}):_Stacktrace ->
retry
end
end).

%%% Machinery callbacks

-spec init(args({internal_id(), user_context()}), machine(), handler_args(), handler_opts()) -> result(state()).
Expand Down Expand Up @@ -93,6 +107,27 @@ process_notification(_Args, _Machine, _HandlerArgs, _HandlerOpts) ->

%%% Internal functions

-spec get_retry_strategy() -> genlib_retry:strategy().
get_retry_strategy() ->
Opts = genlib_app:env(bender, generator),
DefaultPolicy = genlib_retry:exponential(5, 2, {jitter, 200, 100}),
genlib_retry:new_strategy(maps:get(retry_policy, Opts, DefaultPolicy)).

-spec with_retry(genlib_retry:strategy(), fun(() -> {done, T} | retry)) -> T | no_return().
with_retry(Strategy, Fun) ->
case Fun() of
{done, Result} ->
Result;
retry ->
case genlib_retry:next_step(Strategy) of
{wait, Timeout, NextStrategy} ->
_ = timer:sleep(Timeout),
with_retry(NextStrategy, Fun);
finish ->
erlang:error(retries_exhausted)
end
end.

-spec start(external_id(), internal_id(), user_context(), woody_context()) -> ok | {error, exists}.
start(ExternalID, InternalID, UserCtx, WoodyCtx) ->
machinery:start(?NS, ExternalID, {InternalID, UserCtx}, get_backend(WoodyCtx)).
Expand Down
26 changes: 24 additions & 2 deletions apps/bender/src/bender_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,37 @@ unique_id() ->
<<ID:64>> = snowflake:new(),
genlib_format:format_int_base(ID, 62).

-spec get_backend(atom(), woody_context()) -> machinery_mg_backend:backend().
-spec get_backend(generator | sequence, woody_context()) -> machinery_mg_backend:backend().
get_backend(Service, WoodyCtx) ->
get_backend(genlib_app:env(bender, machinery_backend), Service, WoodyCtx).

%%% Internal functions

get_backend(hybrid, Service, WoodyCtx) ->
{machinery_hybrid_backend, #{
primary_backend => get_backend(progressor, Service, WoodyCtx),
fallback_backend => get_backend(machinegun, Service, WoodyCtx)
}};
get_backend(progressor, Service, WoodyCtx) ->
Automaton = genlib_app:env(bender, Service, #{}),
{Namespace, Handler} = get_machinery_namespace_handler(Service),
machinery_prg_backend:new(WoodyCtx, #{
namespace => Namespace,
handler => Handler,
schema => maps:get(schema, Automaton, machinery_mg_schema_generic)
});
get_backend(machinegun, Service, WoodyCtx) ->
Automaton = genlib_app:env(bender, Service, #{}),
machinery_mg_backend:new(WoodyCtx, #{
client => get_woody_client(Automaton),
schema => maps:get(schema, Automaton, machinery_mg_schema_generic)
}).

%%% Internal functions
-spec get_machinery_namespace_handler(generator | sequence) -> {machinery:namespace(), machinery:logic_handler(_)}.
get_machinery_namespace_handler(generator) ->
{'bender_generator', {bender_generator, #{}}};
get_machinery_namespace_handler(sequence) ->
{'bender_sequence', {bender_sequence, #{}}}.

-spec get_woody_client(automaton()) -> machinery_mg_client:woody_client().
get_woody_client(#{url := Url} = Automaton) ->
Expand Down
67 changes: 66 additions & 1 deletion apps/bender/test/bender_tests_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,75 @@ groups() ->

-spec init_per_suite(config()) -> config().
init_per_suite(C) ->
%% _ = dbg:tracer(),
%% _ = dbg:p(all, c),
%% _ = dbg:tpl({'progressor', 'get', '_'}, x),
EpgConnectorApps = genlib_app:start_application_with(epg_connector, [
{databases, #{
default_db => #{
host => "postgres",
port => 5432,
database => "progressor_db",
username => "progressor",
password => "progressor"
}
}},
{pools, #{
default_pool => #{
database => default_db,
size => 30
}
}}
]),
ProgressorApps = genlib_app:start_application_with(progressor, [
{call_wait_timeout, 20},
{defaults, #{
storage => #{
client => prg_pg_backend,
options => #{
pool => default_pool
}
},
retry_policy => #{
initial_timeout => 5,
backoff_coefficient => 1.0,
%% seconds
max_timeout => 180,
max_attempts => 3,
non_retryable_errors => []
},
task_scan_timeout => 1,
worker_pool_size => 1000,
process_step_timeout => 30
}},
{namespaces, #{
'bender_generator' => #{
processor => #{
client => machinery_prg_backend,
options => #{
namespace => 'bender_generator',
handler => {bender_generator, #{}},
schema => machinery_mg_schema_generic
}
}
},
'bender_sequence' => #{
processor => #{
client => machinery_prg_backend,
options => #{
namespace => 'bender_sequence',
handler => {bender_sequence, #{}},
schema => machinery_mg_schema_generic
}
}
}
}}
]),
ScoperApps = genlib_app:start_application_with(scoper, [
{storage, scoper_storage_logger}
]),
BenderApps = genlib_app:start_application_with(bender, [
{machinery_backend, hybrid},
{generator, #{
path => <<"/v1/stateproc/bender_generator">>,
schema => machinery_mg_schema_generic,
Expand All @@ -95,7 +160,7 @@ init_per_suite(C) ->
num_acceptors => 100
}}
]),
Apps = ScoperApps ++ BenderApps,
Apps = EpgConnectorApps ++ ProgressorApps ++ ScoperApps ++ BenderApps,
[{suite_apps, Apps} | C].

-spec end_per_suite(config()) -> ok.
Expand Down
56 changes: 54 additions & 2 deletions apps/bender/test/generator_tests_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,62 @@ groups() ->
-spec init_per_suite(config()) -> config().
init_per_suite(C) ->
Apps =
genlib_app:start_application_with(scoper, [
{storage, scoper_storage_logger}
genlib_app:start_application_with(epg_connector, [
{databases, #{
default_db => #{
host => "postgres",
port => 5432,
database => "progressor_db",
username => "progressor",
password => "progressor"
}
}},
{pools, #{
default_pool => #{
database => default_db,
size => 30
}
}}
]) ++
genlib_app:start_application_with(progressor, [
{call_wait_timeout, 20},
{defaults, #{
storage => #{
client => prg_pg_backend,
options => #{
pool => default_pool
}
},
retry_policy => #{
initial_timeout => 5,
backoff_coefficient => 1.0,
%% seconds
max_timeout => 180,
max_attempts => 3,
non_retryable_errors => []
},
task_scan_timeout => 1,
worker_pool_size => 100,
process_step_timeout => 30
}},
{namespaces, #{
'bender_sequence' => #{
processor => #{
client => machinery_prg_backend,
options => #{
namespace => 'bender_sequence',
handler => {bender_sequence, #{}},
schema => machinery_mg_schema_generic
}
}
}
}}
]) ++
genlib_app:start_application_with(scoper, [
{storage, scoper_storage_logger}
]) ++
genlib_app:start_application_with(bender, [
{machinery_backend, hybrid},
{sequence, #{
path => <<"/v1/stateproc/bender_sequence">>,
schema => machinery_mg_schema_generic,
Expand Down
30 changes: 30 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ services:
- .:$PWD
hostname: $SERVICE_NAME
depends_on:
postgres:
condition: service_healthy
machinegun:
condition: service_healthy
working_dir: $PWD
Expand All @@ -27,3 +29,31 @@ services:
interval: 10s
timeout: 5s
retries: 10

postgres:
image: postgres:15-bookworm
command: -c 'max_connections=200'
environment:
POSTGRES_DB: "progressor_db"
POSTGRES_USER: "progressor"
POSTGRES_PASSWORD: "progressor"
PGDATA: "/tmp/postgresql/data/pgdata"
volumes:
- progressor-data:/tmp/postgresql/data
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U progressor -d progressor_db"]
interval: 10s
timeout: 5s
retries: 5
start_period: 10s
restart: unless-stopped
deploy:
resources:
limits:
cpus: '1'
memory: 4G

volumes:
progressor-data:
Loading
Loading