Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
2df45bb
TD-927: progressor integration
Jul 26, 2024
b512965
TD-927: fix deps
Aug 5, 2024
f4dba44
TD-927: fix deps
Aug 6, 2024
eef51e8
Merge branch 'master' into epic/TD-927-progressor-prototype
ttt161 Aug 13, 2024
14ba322
TD-934: bump progressor
Aug 13, 2024
896dadf
TD-934: bump CI
Aug 13, 2024
6d6abe9
TD-934: fix tests
Aug 13, 2024
4f6e860
TD-934: fix suites
Aug 13, 2024
42ee640
TD-934: bump progressor
Aug 17, 2024
64e76cc
TD-934: fix release
Aug 17, 2024
6e76b64
TD-927: fix tests
Sep 9, 2024
bd95389
TD-930: bump progressor
Sep 16, 2024
e40b1bc
Merge branch 'master' into epic/TD-927-progressor-prototype
ttt161 Sep 16, 2024
44e8dea
TD-927: bump progressor
Sep 18, 2024
c2213b6
TD-927: bump progressor
Sep 18, 2024
fc7e724
TD-927: bump progressor
Sep 18, 2024
7f78a1f
TD-927: bump progressor
Sep 24, 2024
3e0a855
TD-927: bump progressor
Sep 24, 2024
803185b
TD-927: bump progressor
Sep 25, 2024
62e7702
TD-927: bump progressor
Sep 29, 2024
dbfb562
TD-927: bump progressor
Oct 2, 2024
556f23e
TD-927: fix style
Oct 2, 2024
a133b42
XP-927: bump progressor from master
Oct 9, 2024
746c50b
EMP-19: bump progressor
Oct 28, 2024
5d72396
Merge branch 'master' into epic/TD-927-progressor-prototype
ttt161 Oct 28, 2024
9441a4c
EMP-19: bump progressor with dynamic pools
Nov 1, 2024
19aa027
Merge branch 'master' into epic/TD-927-progressor-prototype
ttt161 Nov 1, 2024
ddd1492
Merge branch 'master' into epic/TD-927-progressor-prototype
ttt161 Dec 4, 2024
8061520
TECH-62: bump progressor
Dec 4, 2024
113d41e
TECH-59: bump progressor
Dec 7, 2024
b10d25b
Merge branch 'master' into epic/TD-927-progressor-prototype
ttt161 Jan 8, 2025
9aa335e
TD-927: debug
Jan 10, 2025
2f840bf
TECH-26: bump progressor
Jan 14, 2025
502bf6e
TECH-26: bump CI
Jan 27, 2025
7416387
TECH-26: bump progressor
Jan 27, 2025
8d2b4b3
TECH-26: try short-lived workers
Jan 28, 2025
a307152
TECH-26: update progressor with fix
Jan 28, 2025
6908775
TECH-26: bump progressor
Jan 29, 2025
797e535
TECH-77: add hybrid backend mode
Feb 11, 2025
e7eee77
Merge branch 'master' into epic/TD-927-progressor-prototype
ttt161 Feb 11, 2025
9b86157
TECH-77: fix rebar.config
Feb 11, 2025
d26a753
TECH-77: remove iosetopts
Feb 11, 2025
6fc91d0
TECH-61: bump progressor
Mar 5, 2025
b6582e5
TECH-61: bump progressor
Mar 11, 2025
36c6048
TECH-26: bump progressor
Mar 26, 2025
a8f7007
TECH-156: bump progressor
Apr 20, 2025
42528b3
Merge branch 'master' into epic/TD-927-progressor-prototype
ttt161 Apr 20, 2025
75eebdf
TECH-156: bump progressor with fix
Apr 21, 2025
fbda1bc
TECH-156: bump progressor
Apr 28, 2025
e23c455
TECH-156: bump progressor
Apr 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/hellgate/src/hellgate.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
stdlib,
genlib,
fault_detector_proto,
herd,
progressor,
hg_progressor,
hg_proto,
routing,
cowboy,
Expand Down
2 changes: 2 additions & 0 deletions apps/hellgate/src/hellgate.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ init([]) ->
{
#{strategy => one_for_all, intensity => 6, period => 30},
[
%% for debugging only
%% hg_profiler:get_child_spec(),
party_client:child_spec(party_client, PartyClient),
hg_machine:get_child_spec(MachineHandlers),
get_api_child_spec(MachineHandlers, Opts)
Expand Down
5 changes: 4 additions & 1 deletion apps/hellgate/src/hg_invoice.erl
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,16 @@ process_with_tag(Tag, F) ->

-spec fail(hg_machine:id()) -> ok.
fail(Id) ->
case hg_machine:call(?NS, Id, fail) of
try hg_machine:call(?NS, Id, fail) of
{error, failed} ->
ok;
{error, Error} ->
erlang:error({unexpected_error, Error});
{ok, Result} ->
erlang:error({unexpected_result, Result})
catch
_Class:_Term:_Trace ->
ok
end.

%%
Expand Down
19 changes: 17 additions & 2 deletions apps/hellgate/src/hg_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@
auxst => auxst()
}.

-type backend() ::
machinegun
| progressor
| hybrid.

-callback namespace() -> ns().

-callback init(args(), machine()) -> result().
Expand Down Expand Up @@ -85,6 +90,8 @@
-export([get_history/5]).
-export([get_machine/5]).

-export([call_automaton/3]).

%% Dispatch

-export([get_child_spec/1]).
Expand Down Expand Up @@ -220,6 +227,10 @@ do_call(Ns, Id, Args, After, Limit, Direction) ->
end.

call_automaton(Function, Args) ->
call_automaton(Function, Args, application:get_env(hellgate, backend, machinegun)).

-spec call_automaton(woody:func(), woody:args(), backend()) -> term().
call_automaton(Function, Args, machinegun) ->
case hg_woody_wrapper:call(automaton, Function, Args) of
{ok, _} = Result ->
Result;
Expand All @@ -233,7 +244,11 @@ call_automaton(Function, Args) ->
{error, working};
{exception, #mg_stateproc_RepairFailed{reason = Reason}} ->
{error, {repair, {failed, Reason}}}
end.
end;
call_automaton(Function, Args, progressor) ->
hg_progressor:call_automaton(Function, Args);
call_automaton(Function, Args, hybrid) ->
hg_hybrid:call_automaton(Function, Args).

%%

Expand Down Expand Up @@ -400,7 +415,7 @@ start_link(MachineHandlers) ->

-spec init([module()]) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init(MachineHandlers) ->
_ = ets:new(?TABLE, [protected, named_table, {read_concurrency, true}]),
_ = ets:new(?TABLE, [named_table, {read_concurrency, true}]),
true = ets:insert_new(?TABLE, [{MH:namespace(), MH} || MH <- MachineHandlers]),
{ok, {#{}, []}}.

Expand Down
144 changes: 144 additions & 0 deletions apps/hellgate/src/hg_profiler.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
-module(hg_profiler).

-behaviour(gen_server).

-export([start_link/0]).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).

-export([get_child_spec/0]).
-export([report/0]).
-export([scan_proc/0]).
-export([scan_proc/1]).

-spec get_child_spec() -> _.
get_child_spec() ->
#{
id => ?MODULE,
start => {?MODULE, start_link, []}
}.

-spec report() -> _.
report() ->
TargetItem = memory,
MFAs = [
{prg_worker, init, 1},
{prg_worker_sidecar, init, 1},
{epg_pool_wrk, init, 1},
{epgsql_sock, init, 1}
],
lists:foreach(fun(MFA) -> do_report(MFA, TargetItem) end, MFAs).

-spec scan_proc() -> _.
scan_proc() ->
scan_proc(memory).

-spec scan_proc(_) -> _.
scan_proc(Item) ->
ScanResult = lists:foldl(
fun(P, Acc) ->
try maps:from_list(process_info(P, [dictionary, Item])) of
#{dictionary := Dict} = Info ->
case maps:from_list(Dict) of
#{'$initial_call' := InitialMFA} ->
Value = maps:get(Item, Info),
{Cnt, Sm} = maps:get(InitialMFA, Acc, {0, 0}),
Acc#{InitialMFA => {Cnt + 1, Sm + Value}};
_ ->
io:format(user, "UNKNOWN '$initial_call': ~p~n", [P]),
Acc
end;
_ ->
Acc
catch
_:_ ->
Acc
end
end,
#{},
processes()
),
Sorted = lists:sort(fun({_, {_, SumA}}, {_, {_, SumB}}) -> SumA >= SumB end, maps:to_list(ScanResult)),
lists:foreach(
fun({MFA, {Count, Sum}}) ->
io:format(user, "MFA: ~p Count: ~p Memory: ~p~n", [MFA, Count, Sum])
end,
Sorted
).

%%%===================================================================
%%% Spawning and gen_server implementation
%%%===================================================================
-spec start_link() -> _.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

-spec init(_) -> _.
init(_) ->
erlang:start_timer(60000, self(), report),
{ok, #{}}.

-spec handle_call(_, _, _) -> _.
handle_call(_Request, _From, State) ->
{reply, ok, State}.

-spec handle_cast(_, _) -> _.
handle_cast(_Request, State) ->
{noreply, State}.

-spec handle_info(_, _) -> _.
handle_info({timeout, _TRef, report}, State) ->
TargetItem = memory,
MFAs = [
{prg_worker, init, 1},
{prg_worker_sidecar, init, 1},
{epg_pool_wrk, init, 1},
{epgsql_sock, init, 1}
],
lists:foreach(fun(MFA) -> do_report(MFA, TargetItem) end, MFAs),
erlang:start_timer(60000, self(), report),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.

-spec terminate(_, _) -> _.
terminate(_Reason, _State) ->
ok.

-spec code_change(_, _, _) -> _.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%%

do_report(InitialMFA, Item) ->
{ProcCount, Summary} = lists:foldl(
fun(P, {Cnt, Sm} = Acc) ->
try maps:from_list(process_info(P, [dictionary, Item])) of
#{dictionary := Dict} = Info ->
case maps:from_list(Dict) of
#{'$initial_call' := InitialMFA} ->
Value = maps:get(Item, Info),
{Cnt + 1, Sm + Value};
_ ->
Acc
end;
_ ->
Acc
catch
_:_ ->
Acc
end
end,
{0, 0},
processes()
),
%% io:format(user, "MFA: ~p Item: ~p Count: ~p Summary: ~p~n", [InitialMFA, Item, ProcCount, Summary]),
logger:info("MFA: ~p Item: ~p Count: ~p Summary: ~p", [InitialMFA, Item, ProcCount, Summary]),
ok.
103 changes: 102 additions & 1 deletion apps/hellgate/test/hg_ct_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ start_app(hellgate = AppName) ->
operation_time_limit => 1200000,
pre_aggregation_size => 2
}
}}
}},
{backend, progressor}
]),
#{
hellgate_root_url => get_hellgate_url()
Expand Down Expand Up @@ -271,6 +272,106 @@ start_app(snowflake = AppName) ->
]),
#{}
};
start_app(epg_connector = AppName) ->
{
start_app(AppName, [
{databases, #{
default_db => #{
host => "postgres",
port => 5432,
database => "progressor_db",
username => "progressor",
password => "progressor"
}
}},
{pools, #{
default_pool => #{
database => default_db,
size => 200
},
default_front_pool => #{
database => default_db,
size => 50
},
default_scan_pool => #{
database => default_db,
size => 8
}
}}
]),
#{}
};
start_app(progressor = AppName) ->
{
start_app(AppName, [
{call_wait_timeout, 20},
{defaults, #{
storage => #{
client => prg_pg_backend,
options => #{
pool => default_pool,
front_pool => default_front_pool,
scan_pool => default_scan_pool
}
},
retry_policy => #{
initial_timeout => 5,
backoff_coefficient => 1.0,
%% seconds
max_timeout => 180,
max_attempts => 3,
non_retryable_errors => []
},
task_scan_timeout => 1,
worker_pool_size => 30,
process_step_timeout => 30
}},
{namespaces, #{
invoice => #{
processor => #{
client => hg_progressor,
options => #{
party_client => #{},
ns => <<"invoice">>,
handler => hg_machine
}
},
worker_pool_size => 150
},
invoice_template => #{
processor => #{
client => hg_progressor,
options => #{
party_client => #{},
ns => <<"invoice_template">>,
handler => hg_machine
}
}
},
customer => #{
processor => #{
client => hg_progressor,
options => #{
party_client => #{},
ns => <<"customer">>,
handler => hg_machine
}
}
},
recurrent_paytools => #{
processor => #{
client => hg_progressor,
options => #{
party_client => #{},
ns => <<"recurrent_paytools">>,
handler => hg_machine
}
}
}
}}
]),
#{}
};
start_app(AppName) ->
{start_application(AppName), #{}}.

Expand Down
4 changes: 4 additions & 0 deletions apps/hellgate/test/hg_customer_tests_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ init_per_suite(C) ->
bender_client,
party_client,
hg_proto,
epg_connector,
progressor,
hellgate,
snowflake,
{cowboy, CowboySpec}
Expand All @@ -94,6 +96,8 @@ init_per_suite(C) ->
-spec end_per_suite(config()) -> _.
end_per_suite(C) ->
_ = hg_domain:cleanup(),
_ = application:stop(progressor),
_ = hg_progressor:cleanup(),
[application:stop(App) || App <- cfg(apps, C)].

-spec all() -> [{group, test_case_name()}].
Expand Down
4 changes: 4 additions & 0 deletions apps/hellgate/test/hg_direct_recurrent_tests_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ init_per_suite(C) ->
bender_client,
party_client,
hg_proto,
epg_connector,
progressor,
hellgate,
{cowboy, CowboySpec}
]),
Expand Down Expand Up @@ -129,6 +131,8 @@ init_per_suite(C) ->
-spec end_per_suite(config()) -> config().
end_per_suite(C) ->
_ = hg_domain:cleanup(),
_ = application:stop(progressor),
_ = hg_progressor:cleanup(),
[application:stop(App) || App <- cfg(apps, C)].

-spec init_per_group(group_name(), config()) -> config().
Expand Down
Loading
Loading