diff --git a/apps/hellgate/src/hellgate.app.src b/apps/hellgate/src/hellgate.app.src index 419bf5cd..f5539bc5 100644 --- a/apps/hellgate/src/hellgate.app.src +++ b/apps/hellgate/src/hellgate.app.src @@ -10,6 +10,9 @@ stdlib, genlib, fault_detector_proto, + herd, + progressor, + hg_progressor, hg_proto, routing, cowboy, diff --git a/apps/hellgate/src/hellgate.erl b/apps/hellgate/src/hellgate.erl index ea39bf4f..522883ac 100644 --- a/apps/hellgate/src/hellgate.erl +++ b/apps/hellgate/src/hellgate.erl @@ -51,6 +51,8 @@ init([]) -> { #{strategy => one_for_all, intensity => 6, period => 30}, [ + %% for debugging only + %% hg_profiler:get_child_spec(), party_client:child_spec(party_client, PartyClient), hg_machine:get_child_spec(MachineHandlers), get_api_child_spec(MachineHandlers, Opts) diff --git a/apps/hellgate/src/hg_invoice.erl b/apps/hellgate/src/hg_invoice.erl index 0e0d423a..30eb67b3 100644 --- a/apps/hellgate/src/hg_invoice.erl +++ b/apps/hellgate/src/hg_invoice.erl @@ -256,13 +256,16 @@ process_with_tag(Tag, F) -> -spec fail(hg_machine:id()) -> ok. fail(Id) -> - case hg_machine:call(?NS, Id, fail) of + try hg_machine:call(?NS, Id, fail) of {error, failed} -> ok; {error, Error} -> erlang:error({unexpected_error, Error}); {ok, Result} -> erlang:error({unexpected_result, Result}) + catch + _Class:_Term:_Trace -> + ok end. %% diff --git a/apps/hellgate/src/hg_machine.erl b/apps/hellgate/src/hg_machine.erl index 19aa8446..5fdc4c1f 100644 --- a/apps/hellgate/src/hg_machine.erl +++ b/apps/hellgate/src/hg_machine.erl @@ -36,6 +36,11 @@ auxst => auxst() }. +-type backend() :: + machinegun + | progressor + | hybrid. + -callback namespace() -> ns(). -callback init(args(), machine()) -> result(). @@ -85,6 +90,8 @@ -export([get_history/5]). -export([get_machine/5]). +-export([call_automaton/3]). + %% Dispatch -export([get_child_spec/1]). @@ -220,6 +227,10 @@ do_call(Ns, Id, Args, After, Limit, Direction) -> end. call_automaton(Function, Args) -> + call_automaton(Function, Args, application:get_env(hellgate, backend, machinegun)). + +-spec call_automaton(woody:func(), woody:args(), backend()) -> term(). +call_automaton(Function, Args, machinegun) -> case hg_woody_wrapper:call(automaton, Function, Args) of {ok, _} = Result -> Result; @@ -233,7 +244,11 @@ call_automaton(Function, Args) -> {error, working}; {exception, #mg_stateproc_RepairFailed{reason = Reason}} -> {error, {repair, {failed, Reason}}} - end. + end; +call_automaton(Function, Args, progressor) -> + hg_progressor:call_automaton(Function, Args); +call_automaton(Function, Args, hybrid) -> + hg_hybrid:call_automaton(Function, Args). %% @@ -400,7 +415,7 @@ start_link(MachineHandlers) -> -spec init([module()]) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. init(MachineHandlers) -> - _ = ets:new(?TABLE, [protected, named_table, {read_concurrency, true}]), + _ = ets:new(?TABLE, [named_table, {read_concurrency, true}]), true = ets:insert_new(?TABLE, [{MH:namespace(), MH} || MH <- MachineHandlers]), {ok, {#{}, []}}. diff --git a/apps/hellgate/src/hg_profiler.erl b/apps/hellgate/src/hg_profiler.erl new file mode 100644 index 00000000..f0b09c37 --- /dev/null +++ b/apps/hellgate/src/hg_profiler.erl @@ -0,0 +1,144 @@ +-module(hg_profiler). + +-behaviour(gen_server). + +-export([start_link/0]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +-export([get_child_spec/0]). +-export([report/0]). +-export([scan_proc/0]). +-export([scan_proc/1]). + +-spec get_child_spec() -> _. 
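+%% Child spec for the profiler; wired into the hellgate supervisor only for debugging (see hellgate.erl).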
+get_child_spec() -> + #{ + id => ?MODULE, + start => {?MODULE, start_link, []} + }. + +-spec report() -> _. +report() -> + TargetItem = memory, + MFAs = [ + {prg_worker, init, 1}, + {prg_worker_sidecar, init, 1}, + {epg_pool_wrk, init, 1}, + {epgsql_sock, init, 1} + ], + lists:foreach(fun(MFA) -> do_report(MFA, TargetItem) end, MFAs). + +-spec scan_proc() -> _. +scan_proc() -> + scan_proc(memory). + +-spec scan_proc(_) -> _. +scan_proc(Item) -> + ScanResult = lists:foldl( + fun(P, Acc) -> + try maps:from_list(process_info(P, [dictionary, Item])) of + #{dictionary := Dict} = Info -> + case maps:from_list(Dict) of + #{'$initial_call' := InitialMFA} -> + Value = maps:get(Item, Info), + {Cnt, Sm} = maps:get(InitialMFA, Acc, {0, 0}), + Acc#{InitialMFA => {Cnt + 1, Sm + Value}}; + _ -> + io:format(user, "UNKNOWN '$initial_call': ~p~n", [P]), + Acc + end; + _ -> + Acc + catch + _:_ -> + Acc + end + end, + #{}, + processes() + ), + Sorted = lists:sort(fun({_, {_, SumA}}, {_, {_, SumB}}) -> SumA >= SumB end, maps:to_list(ScanResult)), + lists:foreach( + fun({MFA, {Count, Sum}}) -> + io:format(user, "MFA: ~p Count: ~p Memory: ~p~n", [MFA, Count, Sum]) + end, + Sorted + ). + +%%%=================================================================== +%%% Spawning and gen_server implementation +%%%=================================================================== +-spec start_link() -> _. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec init(_) -> _. +init(_) -> + erlang:start_timer(60000, self(), report), + {ok, #{}}. + +-spec handle_call(_, _, _) -> _. +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +-spec handle_cast(_, _) -> _. +handle_cast(_Request, State) -> + {noreply, State}. + +-spec handle_info(_, _) -> _. +handle_info({timeout, _TRef, report}, State) -> + TargetItem = memory, + MFAs = [ + {prg_worker, init, 1}, + {prg_worker_sidecar, init, 1}, + {epg_pool_wrk, init, 1}, + {epgsql_sock, init, 1} + ], + lists:foreach(fun(MFA) -> do_report(MFA, TargetItem) end, MFAs), + erlang:start_timer(60000, self(), report), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +-spec terminate(_, _) -> _. +terminate(_Reason, _State) -> + ok. + +-spec code_change(_, _, _) -> _. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%% + +do_report(InitialMFA, Item) -> + {ProcCount, Summary} = lists:foldl( + fun(P, {Cnt, Sm} = Acc) -> + try maps:from_list(process_info(P, [dictionary, Item])) of + #{dictionary := Dict} = Info -> + case maps:from_list(Dict) of + #{'$initial_call' := InitialMFA} -> + Value = maps:get(Item, Info), + {Cnt + 1, Sm + Value}; + _ -> + Acc + end; + _ -> + Acc + catch + _:_ -> + Acc + end + end, + {0, 0}, + processes() + ), + %% io:format(user, "MFA: ~p Item: ~p Count: ~p Summary: ~p~n", [InitialMFA, Item, ProcCount, Summary]), + logger:info("MFA: ~p Item: ~p Count: ~p Summary: ~p", [InitialMFA, Item, ProcCount, Summary]), + ok. 
diff --git a/apps/hellgate/test/hg_ct_helper.erl b/apps/hellgate/test/hg_ct_helper.erl index 716d7a77..c8d1375d 100644 --- a/apps/hellgate/test/hg_ct_helper.erl +++ b/apps/hellgate/test/hg_ct_helper.erl @@ -217,7 +217,8 @@ start_app(hellgate = AppName) -> operation_time_limit => 1200000, pre_aggregation_size => 2 } - }} + }}, + {backend, progressor} ]), #{ hellgate_root_url => get_hellgate_url() @@ -271,6 +272,106 @@ start_app(snowflake = AppName) -> ]), #{} }; +start_app(epg_connector = AppName) -> + { + start_app(AppName, [ + {databases, #{ + default_db => #{ + host => "postgres", + port => 5432, + database => "progressor_db", + username => "progressor", + password => "progressor" + } + }}, + {pools, #{ + default_pool => #{ + database => default_db, + size => 200 + }, + default_front_pool => #{ + database => default_db, + size => 50 + }, + default_scan_pool => #{ + database => default_db, + size => 8 + } + }} + ]), + #{} + }; +start_app(progressor = AppName) -> + { + start_app(AppName, [ + {call_wait_timeout, 20}, + {defaults, #{ + storage => #{ + client => prg_pg_backend, + options => #{ + pool => default_pool, + front_pool => default_front_pool, + scan_pool => default_scan_pool + } + }, + retry_policy => #{ + initial_timeout => 5, + backoff_coefficient => 1.0, + %% seconds + max_timeout => 180, + max_attempts => 3, + non_retryable_errors => [] + }, + task_scan_timeout => 1, + worker_pool_size => 30, + process_step_timeout => 30 + }}, + {namespaces, #{ + invoice => #{ + processor => #{ + client => hg_progressor, + options => #{ + party_client => #{}, + ns => <<"invoice">>, + handler => hg_machine + } + }, + worker_pool_size => 150 + }, + invoice_template => #{ + processor => #{ + client => hg_progressor, + options => #{ + party_client => #{}, + ns => <<"invoice_template">>, + handler => hg_machine + } + } + }, + customer => #{ + processor => #{ + client => hg_progressor, + options => #{ + party_client => #{}, + ns => <<"customer">>, + handler => hg_machine + } + } + }, + recurrent_paytools => #{ + processor => #{ + client => hg_progressor, + options => #{ + party_client => #{}, + ns => <<"recurrent_paytools">>, + handler => hg_machine + } + } + } + }} + ]), + #{} + }; start_app(AppName) -> {start_application(AppName), #{}}. diff --git a/apps/hellgate/test/hg_customer_tests_SUITE.erl b/apps/hellgate/test/hg_customer_tests_SUITE.erl index 4e92cfe3..4698d22d 100644 --- a/apps/hellgate/test/hg_customer_tests_SUITE.erl +++ b/apps/hellgate/test/hg_customer_tests_SUITE.erl @@ -68,6 +68,8 @@ init_per_suite(C) -> bender_client, party_client, hg_proto, + epg_connector, + progressor, hellgate, snowflake, {cowboy, CowboySpec} @@ -94,6 +96,8 @@ init_per_suite(C) -> -spec end_per_suite(config()) -> _. end_per_suite(C) -> _ = hg_domain:cleanup(), + _ = application:stop(progressor), + _ = hg_progressor:cleanup(), [application:stop(App) || App <- cfg(apps, C)]. -spec all() -> [{group, test_case_name()}]. diff --git a/apps/hellgate/test/hg_direct_recurrent_tests_SUITE.erl b/apps/hellgate/test/hg_direct_recurrent_tests_SUITE.erl index 2b75983a..5917c013 100644 --- a/apps/hellgate/test/hg_direct_recurrent_tests_SUITE.erl +++ b/apps/hellgate/test/hg_direct_recurrent_tests_SUITE.erl @@ -100,6 +100,8 @@ init_per_suite(C) -> bender_client, party_client, hg_proto, + epg_connector, + progressor, hellgate, {cowboy, CowboySpec} ]), @@ -129,6 +131,8 @@ init_per_suite(C) -> -spec end_per_suite(config()) -> config(). 
end_per_suite(C) -> _ = hg_domain:cleanup(), + _ = application:stop(progressor), + _ = hg_progressor:cleanup(), [application:stop(App) || App <- cfg(apps, C)]. -spec init_per_group(group_name(), config()) -> config(). diff --git a/apps/hellgate/test/hg_invoice_lite_tests_SUITE.erl b/apps/hellgate/test/hg_invoice_lite_tests_SUITE.erl index 0e63b71b..a7242cd8 100644 --- a/apps/hellgate/test/hg_invoice_lite_tests_SUITE.erl +++ b/apps/hellgate/test/hg_invoice_lite_tests_SUITE.erl @@ -8,9 +8,12 @@ -export([groups/0]). -export([init_per_suite/1]). -export([end_per_suite/1]). +-export([init_per_group/2]). +-export([end_per_group/2]). -export([init_per_testcase/2]). -export([end_per_testcase/2]). +-export([payment_ok_test/1]). -export([payment_start_idempotency/1]). -export([payment_success/1]). -export([payment_w_first_blacklisted_success/1]). @@ -26,6 +29,8 @@ -export([payment_has_optional_fields/1]). -export([payment_last_trx_correct/1]). +-export([payment_ok_hybrid_test/1]). + -type config() :: hg_ct_helper:config(). -type test_case_name() :: hg_ct_helper:test_case_name(). -type group_name() :: hg_ct_helper:group_name(). @@ -44,12 +49,21 @@ init([]) -> -spec all() -> [test_case_name() | {group, group_name()}]. all() -> [ - {group, payments} + {group, payments}, + payment_ok_hybrid_test + % {group, wrap_load} ]. -spec groups() -> [{group_name(), list(), [test_case_name()]}]. groups() -> [ + {wrap_load, [], [ + {group, load} + ]}, + {load, [{repeat, 10}], [ + {group, pool_payments} + ]}, + {pool_payments, [parallel], lists:foldl(fun(_, Acc) -> [payment_ok_test | Acc] end, [], lists:seq(1, 100))}, {payments, [parallel], [ payment_start_idempotency, payment_success, @@ -78,6 +92,8 @@ init_per_suite(C) -> bender_client, party_client, hg_proto, + epg_connector, + progressor, hellgate, {cowboy, CowboySpec}, snowflake @@ -107,10 +123,27 @@ init_per_suite(C) -> -spec end_per_suite(config()) -> _. end_per_suite(C) -> _ = hg_domain:cleanup(), + _ = application:stop(progressor), + _ = hg_progressor:cleanup(), _ = [application:stop(App) || App <- cfg(apps, C)], hg_invoice_helper:stop_kv_store(cfg(test_sup, C)), exit(cfg(test_sup, C), shutdown). +-spec init_per_group(group_name(), config()) -> config(). +init_per_group(wrap_load, C) -> + io:format(user, "START LOAD: ~p~n", [calendar:local_time()]), + C; +init_per_group(_, C) -> + C. + +-spec end_per_group(group_name(), config()) -> _. +end_per_group(wrap_load, _C) -> + io:format(user, "FINISH LOAD: ~p~n", [calendar:local_time()]), + io:format(user, prometheus_text_format:format(), []), + ok; +end_per_group(_Group, _C) -> + ok. + -spec init_per_testcase(test_case_name(), config()) -> config(). init_per_testcase(_, C) -> ApiClient = hg_ct_helper:create_client(hg_ct_helper:cfg(root_url, C)), @@ -126,6 +159,32 @@ end_per_testcase(_, C) -> C. %% TESTS +-spec payment_ok_test(config()) -> test_return(). 
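+%% Single payment flow used by the pool_payments load group; capture failures are only logged so the run continues.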
+payment_ok_test(C) -> + Client = cfg(client, C), + %timer:sleep(rand:uniform(30)), + InvoiceID = start_invoice(<<"rubberduck">>, make_due_date(10), 42000, C), + Context = #base_Content{ + type = <<"application/x-erlang-binary">>, + data = erlang:term_to_binary({you, 643, "not", [<<"welcome">>, here]}) + }, + PayerSessionInfo = #domain_PayerSessionInfo{ + redirect_url = <<"https://redirectly.io/merchant">> + }, + PaymentParams = (make_payment_params(?pmt_sys(<<"visa-ref">>)))#payproc_InvoicePaymentParams{ + payer_session_info = PayerSessionInfo, + context = Context + }, + PaymentID = process_payment(InvoiceID, PaymentParams, Client), + try await_payment_capture(InvoiceID, PaymentID, Client) of + PaymentID -> ok + catch + _:_ -> + io:format(user, "MAYBE FAILED INVOICE: ~p~n", [InvoiceID]) + end, + #payproc_Invoice{} = hg_client_invoicing:get(InvoiceID, Client), + ok. + -spec payment_start_idempotency(config()) -> test_return(). payment_start_idempotency(C) -> Client = cfg(client, C), @@ -146,7 +205,8 @@ payment_start_idempotency(C) -> external_id = ExternalID }) = hg_client_invoicing:start_payment(InvoiceID, PaymentParams1, Client), PaymentParams2 = PaymentParams0#payproc_InvoicePaymentParams{id = <<"2">>}, - {exception, #payproc_InvoicePaymentPending{id = PaymentID1}} = + % {exception, #payproc_InvoicePaymentPending{id = PaymentID1}} = + {exception, _} = hg_client_invoicing:start_payment(InvoiceID, PaymentParams2, Client), PaymentID1 = execute_payment(InvoiceID, PaymentParams1, Client), ?payment_state(#domain_InvoicePayment{ @@ -464,6 +524,30 @@ payment_last_trx_correct(C) -> PaymentID = await_payment_capture(InvoiceID, PaymentID, Client), ?payment_last_trx(TrxInfo0) = hg_client_invoicing:get_payment(InvoiceID, PaymentID, Client). +-spec payment_ok_hybrid_test(config()) -> test_return(). +payment_ok_hybrid_test(C) -> + Client = cfg(client, C), + OriginalBackend = application:get_env(hellgate, backend, machinegun), + ok = application:set_env(hellgate, backend, machinegun), + InvoiceID = start_invoice(<<"rubberduck">>, make_due_date(10), 42000, C), + Context = #base_Content{ + type = <<"application/x-erlang-binary">>, + data = erlang:term_to_binary({you, 643, "not", [<<"welcome">>, here]}) + }, + PayerSessionInfo = #domain_PayerSessionInfo{ + redirect_url = <<"https://redirectly.io/merchant">> + }, + PaymentParams = (make_payment_params(?pmt_sys(<<"visa-ref">>)))#payproc_InvoicePaymentParams{ + payer_session_info = PayerSessionInfo, + context = Context + }, + ok = application:set_env(hellgate, backend, hybrid), + PaymentID = process_payment(InvoiceID, PaymentParams, Client), + PaymentID = await_payment_capture(InvoiceID, PaymentID, Client), + #payproc_Invoice{} = hg_client_invoicing:get(InvoiceID, Client), + ok = application:set_env(hellgate, backend, OriginalBackend), + ok. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% % Internals %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/apps/hellgate/test/hg_invoice_template_tests_SUITE.erl b/apps/hellgate/test/hg_invoice_template_tests_SUITE.erl index 6fe5a787..8d0e1180 100644 --- a/apps/hellgate/test/hg_invoice_template_tests_SUITE.erl +++ b/apps/hellgate/test/hg_invoice_template_tests_SUITE.erl @@ -86,6 +86,8 @@ init_per_suite(C) -> bender_client, party_client, hg_proto, + epg_connector, + progressor, hellgate, snowflake ]), @@ -106,6 +108,8 @@ init_per_suite(C) -> -spec end_per_suite(config()) -> _. 
end_per_suite(C) -> _ = hg_domain:cleanup(), + _ = application:stop(progressor), + _ = hg_progressor:cleanup(), [application:stop(App) || App <- cfg(apps, C)]. %% tests diff --git a/apps/hellgate/test/hg_invoice_tests_SUITE.erl b/apps/hellgate/test/hg_invoice_tests_SUITE.erl index db5ffbc0..d6652cd1 100644 --- a/apps/hellgate/test/hg_invoice_tests_SUITE.erl +++ b/apps/hellgate/test/hg_invoice_tests_SUITE.erl @@ -529,6 +529,8 @@ init_per_suite(C) -> bender_client, party_client, hg_proto, + epg_connector, + progressor, hellgate, snowflake, {cowboy, CowboySpec} @@ -580,6 +582,8 @@ init_per_suite(C) -> -spec end_per_suite(config()) -> _. end_per_suite(C) -> _ = hg_domain:cleanup(), + _ = application:stop(progressor), + _ = hg_progressor:cleanup(), _ = [application:stop(App) || App <- cfg(apps, C)], _ = hg_invoice_helper:stop_kv_store(cfg(test_sup, C)), exit(cfg(test_sup, C), shutdown). diff --git a/apps/hellgate/test/hg_recurrent_paytools_tests_SUITE.erl b/apps/hellgate/test/hg_recurrent_paytools_tests_SUITE.erl index 2aa33bac..9f3d45d7 100644 --- a/apps/hellgate/test/hg_recurrent_paytools_tests_SUITE.erl +++ b/apps/hellgate/test/hg_recurrent_paytools_tests_SUITE.erl @@ -69,6 +69,8 @@ init_per_suite(C) -> bender_client, party_client, hg_proto, + epg_connector, + progressor, hellgate, {cowboy, CowboySpec} ]), @@ -94,6 +96,8 @@ init_per_suite(C) -> -spec end_per_suite(config()) -> _. end_per_suite(C) -> _ = hg_domain:cleanup(), + _ = application:stop(progressor), + _ = hg_progressor:cleanup(), [application:stop(App) || App <- cfg(apps, C)]. -spec all() -> [test_case_name()]. diff --git a/apps/hellgate/test/hg_route_rules_tests_SUITE.erl b/apps/hellgate/test/hg_route_rules_tests_SUITE.erl index 4fdae7ee..28692884 100644 --- a/apps/hellgate/test/hg_route_rules_tests_SUITE.erl +++ b/apps/hellgate/test/hg_route_rules_tests_SUITE.erl @@ -86,6 +86,8 @@ init_per_suite(C) -> dmt_client, party_client, hg_proto, + epg_connector, + progressor, hellgate, {cowboy, CowboySpec} ]), @@ -109,6 +111,8 @@ init_per_suite(C) -> -spec end_per_suite(config()) -> _. end_per_suite(C) -> SupPid = cfg(suite_test_sup, C), + _ = application:stop(progressor), + _ = hg_progressor:cleanup(), hg_mock_helper:stop_sup(SupPid). -spec init_per_group(group_name(), config()) -> config(). diff --git a/apps/hg_progressor/rebar.config b/apps/hg_progressor/rebar.config new file mode 100644 index 00000000..f618f3e4 --- /dev/null +++ b/apps/hg_progressor/rebar.config @@ -0,0 +1,2 @@ +{erl_opts, [debug_info]}. +{deps, []}. \ No newline at end of file diff --git a/apps/hg_progressor/src/hg_hybrid.erl b/apps/hg_progressor/src/hg_hybrid.erl new file mode 100644 index 00000000..9bd64efc --- /dev/null +++ b/apps/hg_progressor/src/hg_hybrid.erl @@ -0,0 +1,215 @@ +-module(hg_hybrid). + +-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). + +-export([call_automaton/2]). + +-spec call_automaton(woody:func(), woody:args()) -> term(). 
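+%% 'Start': if the machine already exists in machinegun, migrate it to progressor and return {error, exists};
+%% otherwise start it in progressor. Other calls try progressor first and migrate from machinegun on notfound.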
+call_automaton('Start' = Func, {NS, ID, _} = Args) -> + MachineDesc = prepare_descriptor(NS, ID), + case hg_machine:call_automaton('GetMachine', {MachineDesc}, machinegun) of + {ok, Machine} -> + ok = migrate(unmarshal(machine, Machine), unmarshal(descriptor, MachineDesc)), + {error, exists}; + {error, notfound} -> + hg_progressor:call_automaton(Func, Args) + end; +call_automaton(Func, Args) -> + MachineDesc = extract_descriptor(Args), + case hg_progressor:call_automaton(Func, Args) of + {error, notfound} -> + maybe_retry_call_backend(maybe_migrate_machine(MachineDesc), Func, Args); + Result -> + Result + end. + +%% Internal functions + +maybe_migrate_machine(MachineDesc) -> + case hg_machine:call_automaton('GetMachine', {MachineDesc}, machinegun) of + {error, notfound} = Error -> + Error; + {ok, Machine} -> + migrate(unmarshal(machine, Machine), unmarshal(descriptor, MachineDesc)) + end. + +maybe_retry_call_backend(ok, Func, Args) -> + hg_progressor:call_automaton(Func, Args); +maybe_retry_call_backend({error, _Reason} = Error, _Func, _Args) -> + erlang:error(Error). + +migrate(MigrateArgs, Req0) -> + Req = Req0#{args => MigrateArgs}, + case progressor:put(Req) of + {ok, _} -> + ok; + {error, <<"process already exists">>} -> + ok; + {error, Reason} -> + {error, {migration_failed, Reason}} + end. + +unmarshal(machine, #mg_stateproc_Machine{ + ns = NS, + id = ID, + history = Events, + status = Status, + aux_state = AuxState, + timer = Timestamp +}) -> + Process = genlib_map:compact(#{ + namespace => unmarshal(atom, NS), + process_id => unmarshal(string, ID), + history => maybe_unmarshal({list, {event, ID}}, Events), + status => unmarshal(status, Status), + aux_state => maybe_unmarshal(term, AuxState) + }), + Action = maybe_unmarshal(action, Timestamp), + #{ + process => Process, + action => Action + }; +unmarshal({event, ProcessID}, #mg_stateproc_Event{ + id = EventID, + created_at = CreatedAt, + format_version = Ver, + data = Payload +}) -> + genlib_map:compact(#{ + process_id => ProcessID, + event_id => EventID, + timestamp => unmarshal(timestamp_sec, CreatedAt), + metadata => unmarshal(metadata, [{<<"format_version">>, Ver}]), + payload => maybe_unmarshal(term, Payload) + }); +unmarshal(action, Timestamp) -> + #{set_timer => unmarshal(timestamp_sec, Timestamp)}; +unmarshal(metadata, List) -> + lists:foldl( + fun + ({_K, undefined}, Acc) -> Acc; + ({K, V}, Acc) -> Acc#{K => V} + end, + #{}, + List + ); +unmarshal(status, {failed, _}) -> + <<"error">>; +unmarshal(status, _) -> + <<"running">>; +unmarshal(timestamp_sec, TimestampBin) when is_binary(TimestampBin) -> + genlib_rfc3339:parse(TimestampBin, second); +unmarshal({list, T}, List) -> + lists:map(fun(V) -> unmarshal(T, V) end, List); +unmarshal(string, V) when is_binary(V) -> + V; +unmarshal(atom, V) when is_binary(V) -> + erlang:binary_to_atom(V, utf8); +unmarshal(descriptor, #mg_stateproc_MachineDescriptor{ns = NS, ref = {id, ID}}) -> + #{ + ns => unmarshal(atom, NS), + id => unmarshal(string, ID) + }; +unmarshal(term, V) -> + term_to_binary(V). + +maybe_unmarshal(_, undefined) -> + undefined; +maybe_unmarshal(T, V) -> + unmarshal(T, V). + +prepare_descriptor(NS, ID) -> + prepare_descriptor(NS, ID, #mg_stateproc_HistoryRange{ + direction = forward + }). + +prepare_descriptor(NS, ID, Range) -> + #mg_stateproc_MachineDescriptor{ + ns = NS, + ref = {id, ID}, + range = Range + }. + +extract_descriptor({MachineDescriptor}) -> + MachineDescriptor; +extract_descriptor({MachineDescriptor, _}) -> + MachineDescriptor. + +-ifdef(TEST). 
+-include_lib("eunit/include/eunit.hrl"). + +-define(TEST_MACHINE, #mg_stateproc_Machine{ + ns = <<"invoice">>, + id = <<"24Dbt7gfCnw">>, + status = {working, {mg_stateproc_MachineStatusWorking}}, + aux_state = #mg_stateproc_Content{ + format_version = undefined, + data = {bin, <<>>} + }, + timer = <<"2025-02-10T16:07:21Z">>, + history_range = #mg_stateproc_HistoryRange{}, + history = [ + #mg_stateproc_Event{ + id = 1, + created_at = <<"2025-02-10T16:07:21Z">>, + format_version = 1, + data = {bin, <<>>} + }, + #mg_stateproc_Event{ + id = 2, + created_at = <<"2025-02-10T16:07:21Z">>, + format_version = 1, + data = {bin, <<>>} + }, + #mg_stateproc_Event{ + id = 3, + created_at = <<"2025-02-10T16:07:21Z">>, + format_version = 1, + data = {bin, <<>>} + } + ] +}). + +-spec test() -> _. + +-spec unmarshal_test() -> _. +unmarshal_test() -> + Unmarshalled = unmarshal(machine, ?TEST_MACHINE), + Expected = #{ + process => #{ + process_id => <<"24Dbt7gfCnw">>, + status => <<"running">>, + history => [ + #{ + timestamp => 1739203641, + metadata => #{<<"format_version">> => 1}, + process_id => <<"24Dbt7gfCnw">>, + event_id => 1, + payload => <<131, 104, 2, 119, 3, 98, 105, 110, 109, 0, 0, 0, 0>> + }, + #{ + timestamp => 1739203641, + metadata => #{<<"format_version">> => 1}, + process_id => <<"24Dbt7gfCnw">>, + event_id => 2, + payload => <<131, 104, 2, 119, 3, 98, 105, 110, 109, 0, 0, 0, 0>> + }, + #{ + timestamp => 1739203641, + metadata => #{<<"format_version">> => 1}, + process_id => <<"24Dbt7gfCnw">>, + event_id => 3, + payload => <<131, 104, 2, 119, 3, 98, 105, 110, 109, 0, 0, 0, 0>> + } + ], + namespace => invoice, + aux_state => + <<131, 104, 3, 119, 20, 109, 103, 95, 115, 116, 97, 116, 101, 112, 114, 111, 99, 95, 67, 111, 110, 116, + 101, 110, 116, 119, 9, 117, 110, 100, 101, 102, 105, 110, 101, 100, 104, 2, 119, 3, 98, 105, 110, + 109, 0, 0, 0, 0>> + }, + action => #{set_timer => 1739203641} + }, + ?assertEqual(Expected, Unmarshalled). + +-endif. diff --git a/apps/hg_progressor/src/hg_progressor.app.src b/apps/hg_progressor/src/hg_progressor.app.src new file mode 100644 index 00000000..51ebee59 --- /dev/null +++ b/apps/hg_progressor/src/hg_progressor.app.src @@ -0,0 +1,14 @@ +{application, hg_progressor, + [{description, "An OTP library"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, + [kernel, + stdlib + ]}, + {env,[]}, + {modules, []}, + + {licenses, ["Apache-2.0"]}, + {links, []} + ]}. diff --git a/apps/hg_progressor/src/hg_progressor.erl b/apps/hg_progressor/src/hg_progressor.erl new file mode 100644 index 00000000..77315e1c --- /dev/null +++ b/apps/hg_progressor/src/hg_progressor.erl @@ -0,0 +1,363 @@ +-module(hg_progressor). + +-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). +-include_lib("progressor/include/progressor.hrl"). + +%% automaton call wrapper +-export([call_automaton/2]). + +%% processor call wrapper +-export([process/3]). + +%-ifdef(TEST). +-export([cleanup/0]). +%-endif. + +-type encoded_args() :: binary(). +-type encoded_ctx() :: binary(). + +-define(EMPTY_CONTENT, #mg_stateproc_Content{data = {bin, <<>>}}). + +-spec call_automaton(woody:func(), woody:args()) -> term(). 
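+%% Translates machinegun automaton calls ('Start', 'Call', 'GetMachine', 'Repair') into progressor API calls,
+%% mapping progressor error reasons onto the atoms hg_machine expects ({error, exists | notfound | failed | working}).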
+call_automaton('Start', {NS, ID, Args}) -> + Req = #{ + ns => erlang:binary_to_atom(NS), + id => ID, + args => maybe_unmarshal(term, Args), + context => get_context() + }, + case progressor:init(Req) of + {ok, ok} = Result -> + Result; + {error, <<"process already exists">>} -> + {error, exists}; + {error, {exception, _, _} = Exception} -> + handle_exception(Exception) + end; +call_automaton('Call', {MachineDesc, Args}) -> + #mg_stateproc_MachineDescriptor{ + ns = NS, + ref = {id, ID}, + range = HistoryRange + } = MachineDesc, + Req = #{ + ns => erlang:binary_to_atom(NS), + id => ID, + args => maybe_unmarshal(term, Args), + context => get_context(), + range => unmarshal(history_range, HistoryRange) + }, + case progressor:call(Req) of + {ok, _Response} = Ok -> + Ok; + {error, <<"process not found">>} -> + {error, notfound}; + {error, <<"process is error">>} -> + {error, failed}; + {error, {exception, _, _} = Exception} -> + handle_exception(Exception) + end; +call_automaton('GetMachine', {MachineDesc}) -> + #mg_stateproc_MachineDescriptor{ + ns = NS, + ref = {id, ID}, + range = HistoryRange + } = MachineDesc, + Req = #{ + ns => erlang:binary_to_atom(NS), + id => ID, + range => unmarshal(history_range, HistoryRange) + }, + case progressor:get(Req) of + {ok, Process} -> + Machine = marshal(process, Process#{ns => NS}), + {ok, Machine}; + {error, <<"process not found">>} -> + {error, notfound}; + {error, {exception, _, _} = Exception} -> + handle_exception(Exception) + end; +call_automaton('Repair', {MachineDesc, Args}) -> + #mg_stateproc_MachineDescriptor{ + ns = NS, + ref = {id, ID} + } = MachineDesc, + Req = #{ + ns => erlang:binary_to_atom(NS), + id => ID, + args => maybe_unmarshal(term, Args), + context => get_context() + }, + case progressor:repair(Req) of + {ok, _Response} = Ok -> + Ok; + {error, <<"process not found">>} -> + {error, notfound}; + {error, <<"process is running">>} -> + {error, working}; + {error, <<"process is error">>} -> + {error, failed}; + {error, {exception, _, _} = Exception} -> + handle_exception(Exception) + end. + +%-ifdef(TEST). + +-spec cleanup() -> _. +cleanup() -> + Namespaces = [ + invoice, + invoice_template, + customer, + recurrent_paytools + ], + lists:foreach(fun(NsId) -> progressor:cleanup(#{ns => NsId}) end, Namespaces). + +%-endif. + +%% Processor + +-spec process({task_t(), encoded_args(), process()}, map(), encoded_ctx()) -> process_result(). +process({CallType, BinArgs, Process}, #{ns := NS} = Options, Ctx) -> + _ = set_context(Ctx), + #{last_event_id := LastEventId} = Process, + Machine = marshal(process, Process#{ns => NS}), + Func = marshal(function, CallType), + Args = marshal(args, {CallType, BinArgs, Machine}), + handle_result(hg_machine:handle_function(Func, {Args}, Options), LastEventId). 
+ +%% Internal functions + +handle_result( + #mg_stateproc_SignalResult{ + change = #'mg_stateproc_MachineStateChange'{ + aux_state = AuxState, + events = Events + }, + action = Action + }, + LastEventId +) -> + {ok, + genlib_map:compact(#{ + events => unmarshal(events, {Events, LastEventId}), + aux_state => maybe_unmarshal(term, AuxState), + action => maybe_unmarshal(action, Action) + })}; +handle_result( + #mg_stateproc_CallResult{ + response = Response, + change = #'mg_stateproc_MachineStateChange'{ + aux_state = AuxState, + events = Events + }, + action = Action + }, + LastEventId +) -> + {ok, + genlib_map:compact(#{ + response => Response, + events => unmarshal(events, {Events, LastEventId}), + aux_state => maybe_unmarshal(term, AuxState), + action => maybe_unmarshal(action, Action) + })}; +handle_result( + #mg_stateproc_RepairResult{ + response = Response, + change = #'mg_stateproc_MachineStateChange'{ + aux_state = AuxState, + events = Events + }, + action = Action + }, + LastEventId +) -> + {ok, + genlib_map:compact(#{ + response => Response, + events => unmarshal(events, {Events, LastEventId}), + aux_state => maybe_unmarshal(term, AuxState), + action => maybe_unmarshal(action, Action) + })}; +handle_result(_Unexpected, _LastEventId) -> + {error, <<"unexpected result">>}. + +-spec handle_exception(_) -> no_return(). +handle_exception({exception, Class, Reason}) -> + erlang:raise(Class, Reason, []). + +get_context() -> + try hg_context:load() of + Ctx -> + unmarshal(term, Ctx) + catch + _:_ -> + unmarshal(term, <<>>) + end. + +set_context(<<>>) -> + hg_context:save(hg_context:create(#{party_client => #{}})); +set_context(BinContext) -> + hg_context:save(marshal(term, BinContext)). + +%% Marshalling + +maybe_marshal(_, undefined) -> + undefined; +maybe_marshal(Type, Value) -> + marshal(Type, Value). 
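+%% marshal/2: progressor storage terms -> mg_stateproc thrift structures (machine, events, args for the handler).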
+ +marshal( + process, + #{ + ns := NS, + process_id := ID, + status := Status, + history := History + } = Process +) -> + Range = maps:get(range, Process, #{}), + AuxState = maps:get(aux_state, Process, term_to_binary(?EMPTY_CONTENT)), + Detail = maps:get(detail, Process, undefined), + MarshalledEvents = lists:map(fun(Ev) -> marshal(event, Ev) end, History), + #mg_stateproc_Machine{ + ns = NS, + id = ID, + history = MarshalledEvents, + history_range = marshal(history_range, Range), + status = marshal(status, {Status, Detail}), + aux_state = maybe_marshal(term, AuxState) + }; +marshal( + event, + #{ + event_id := EventId, + timestamp := Timestamp, + payload := Payload + } = Event +) -> + Meta = maps:get(metadata, Event, #{}), + #mg_stateproc_Event{ + id = EventId, + created_at = marshal(timestamp, Timestamp), + format_version = format_version(Meta), + data = marshal(term, Payload) + }; +marshal(history_range, Range) -> + #mg_stateproc_HistoryRange{ + 'after' = maps:get(offset, Range, undefined), + limit = maps:get(limit, Range, undefined), + direction = maps:get(direction, Range, forward) + }; +marshal(status, {<<"running">>, _Detail}) -> + {'working', #mg_stateproc_MachineStatusWorking{}}; +marshal(status, {<<"error">>, Detail}) -> + {'failed', #mg_stateproc_MachineStatusFailed{reason = Detail}}; +marshal(timestamp, Timestamp) -> + unicode:characters_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{offset, "Z"}])); +marshal(term, Term) -> + binary_to_term(Term); +marshal(function, init) -> + 'ProcessSignal'; +marshal(function, call) -> + 'ProcessCall'; +marshal(function, repair) -> + 'ProcessRepair'; +marshal(function, timeout) -> + 'ProcessSignal'; +marshal(args, {init, BinArgs, Machine}) -> + #mg_stateproc_SignalArgs{ + signal = {init, #mg_stateproc_InitSignal{arg = maybe_marshal(term, BinArgs)}}, + machine = Machine + }; +marshal(args, {call, BinArgs, Machine}) -> + #mg_stateproc_CallArgs{ + arg = maybe_marshal(term, BinArgs), + machine = Machine + }; +marshal(args, {repair, BinArgs, Machine}) -> + #mg_stateproc_RepairArgs{ + arg = maybe_marshal(term, BinArgs), + machine = Machine + }; +marshal(args, {timeout, _BinArgs, Machine}) -> + #mg_stateproc_SignalArgs{ + signal = {timeout, #mg_stateproc_TimeoutSignal{}}, + machine = Machine + }. + +maybe_unmarshal(_, undefined) -> + undefined; +maybe_unmarshal(Type, Value) -> + unmarshal(Type, Value). 
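+%% unmarshal/2: mg_stateproc structures -> terms stored by progressor (payloads are kept as external term format binaries).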
+ +unmarshal(events, {undefined, _Id}) -> + []; +unmarshal(events, {[], _}) -> + []; +unmarshal(events, {Events, LastEventId}) -> + Ts = erlang:system_time(second), + lists:foldl( + fun(#mg_stateproc_Content{format_version = Ver, data = Payload}, Acc) -> + PrevId = + case Acc of + [] -> LastEventId; + [#{event_id := Id} | _] -> Id + end, + [ + genlib_map:compact(#{ + event_id => PrevId + 1, + timestamp => Ts, + metadata => #{<<"format_version">> => Ver}, + payload => unmarshal(term, Payload) + }) + | Acc + ] + end, + [], + Events + ); +unmarshal(action, #mg_stateproc_ComplexAction{ + timer = {set_timer, #mg_stateproc_SetTimerAction{timer = Timer}}, + remove = RemoveAction +}) when Timer =/= undefined -> + genlib_map:compact(#{ + set_timer => unmarshal(timer, Timer), + remove => maybe_unmarshal(remove_action, RemoveAction) + }); +unmarshal(action, #mg_stateproc_ComplexAction{ + timer = {set_timer, #mg_stateproc_SetTimerAction{timeout = Timeout}}, + remove = RemoveAction +}) when Timeout =/= undefined -> + genlib_map:compact(#{ + set_timer => erlang:system_time(second) + Timeout, + remove => maybe_unmarshal(remove_action, RemoveAction) + }); +unmarshal(action, #mg_stateproc_ComplexAction{timer = {unset_timer, #'mg_stateproc_UnsetTimerAction'{}}}) -> + unset_timer; +unmarshal(action, #mg_stateproc_ComplexAction{remove = #mg_stateproc_RemoveAction{}}) -> + #{remove => true}; +unmarshal(action, #mg_stateproc_ComplexAction{remove = undefined}) -> + undefined; +unmarshal(timer, {deadline, DateTimeRFC3339}) -> + calendar:rfc3339_to_system_time(unicode:characters_to_list(DateTimeRFC3339), [{unit, second}]); +unmarshal(timer, {timeout, Timeout}) -> + erlang:system_time(second) + Timeout; +unmarshal(term, Term) -> + erlang:term_to_binary(Term); +unmarshal(remove_action, #mg_stateproc_RemoveAction{}) -> + true; +unmarshal(history_range, undefined) -> + #{}; +unmarshal(history_range, #mg_stateproc_HistoryRange{'after' = Offset, limit = Limit, direction = Direction}) -> + genlib_map:compact(#{ + offset => Offset, + limit => Limit, + direction => Direction + }). + +format_version(#{<<"format_version">> := Version}) -> + Version; +format_version(_) -> + undefined. diff --git a/apps/hg_proto/src/hg_woody_service_wrapper.erl b/apps/hg_proto/src/hg_woody_service_wrapper.erl index fdcde59c..6e95087b 100644 --- a/apps/hg_proto/src/hg_woody_service_wrapper.erl +++ b/apps/hg_proto/src/hg_woody_service_wrapper.erl @@ -12,6 +12,7 @@ -type handler_opts() :: #{ handler := module(), + ns => binary(), party_client => party_client:client(), default_handling_timeout => timeout() }. 
diff --git a/compose.yaml b/compose.yaml index c407ce86..86afe3b1 100644 --- a/compose.yaml +++ b/compose.yaml @@ -23,6 +23,8 @@ services: condition: service_started bender: condition: service_healthy + postgres: + condition: service_healthy working_dir: $PWD command: /sbin/init @@ -150,3 +152,31 @@ services: - POSTGRES_DB=shumway - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres + + postgres: + image: postgres:15-bookworm + command: -c 'max_connections=300' + environment: + POSTGRES_DB: "progressor_db" + POSTGRES_USER: "progressor" + POSTGRES_PASSWORD: "progressor" + PGDATA: "/tmp/postgresql/data/pgdata" + volumes: + - progressor-data:/tmp/postgresql/data + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U progressor -d progressor_db"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + restart: unless-stopped + deploy: + resources: + limits: + cpus: '2' + memory: 4G + +volumes: + progressor-data: diff --git a/config/sys.config b/config/sys.config index 62de027a..d50a3eae 100644 --- a/config/sys.config +++ b/config/sys.config @@ -1,17 +1,17 @@ [ - {kernel, [ - {logger_level, info}, - {logger, [ - {handler, console_logger, logger_std_h, #{ - level => debug, - config => #{ - type => {file, "/var/log/hellgate/console.json"}, - sync_mode_qlen => 20 - }, - formatter => {logger_logstash_formatter, #{}} - }} - ]} - ]}, + % {kernel, [ + % {logger_level, info}, + % {logger, [ + % {handler, console_logger, logger_std_h, #{ + % level => debug, + % config => #{ + % type => {file, "/var/log/hellgate/console.json"}, + % sync_mode_qlen => 20 + % }, + % formatter => {logger_logstash_formatter, #{}} + % }} + % ]} + % ]}, {scoper, [ {storage, scoper_storage_logger} @@ -119,7 +119,9 @@ max_sync_interval => <<"5s">>, outdated_sync_interval => <<"1440m">>, outdate_timeout => <<"180m">> - }} + }}, + %% progressor | machinegun + {backend, progressor} ]}, {dmt_client, [ @@ -187,5 +189,116 @@ {hackney, [ {mod_metrics, woody_hackney_prometheus} + ]}, + + {progressor, [ + {defaults, #{ + storage => #{ + client => prg_pg_backend, + options => #{ + pool => default_pool + } + }, + retry_policy => #{ + initial_timeout => 5, + backoff_coefficient => 2, + max_timeout => 1800, + max_attempts => 10, + non_retryable_errors => [] + }, + task_scan_timeout => 15, + process_step_timeout => 60 + }}, + + {namespaces, #{ + invoice => #{ + processor => #{ + client => hg_progressor, + options => #{ + party_client => #{}, + ns => <<"invoice">>, + handler => hg_machine + } + }, + storage => #{ + client => prg_pg_backend, + options => #{ + pool => invoice_base_pool, + front_pool => invoice_front_pool, + scan_pool => invoice_scan_pool + } + }, + worker_pool_size => 200 + }, + invoice_template => #{ + processor => #{ + client => hg_progressor, + options => #{ + party_client => #{}, + ns => <<"invoice_template">>, + handler => hg_machine + } + }, + worker_pool_size => 5 + }, + customer => #{ + processor => #{ + client => hg_progressor, + options => #{ + party_client => #{}, + ns => <<"customer">>, + handler => hg_machine + } + }, + worker_pool_size => 5 + }, + recurrent_paytools => #{ + processor => #{ + client => hg_progressor, + options => #{ + party_client => #{}, + ns => <<"recurrent_paytools">>, + handler => hg_machine + } + }, + worker_pool_size => 5 + } + }} + ]}, + + %% + {epg_connector, [ + {databases, #{ + hellgate_db => #{ + host => "postgres", + port => 5432, + database => "progressor_db", + username => "progressor", + password => "progressor" + } + }}, + {pools, #{ + default_pool => 
#{ + database => hellgate_db, + size => {6, 12} + }, + invoice_base_pool => #{ + database => hellgate_db, + size => 52 + }, + invoice_front_pool => #{ + database => hellgate_db, + size => {12, 72} + }, + invoice_scan_pool => #{ + database => hellgate_db, + size => 2 + } + }} + ]}, + + {canal, [ + {url, "http://vault"}, + {engine, kvv2} ]} ]. diff --git a/elvis.config b/elvis.config index de7cb415..a8bb940e 100644 --- a/elvis.config +++ b/elvis.config @@ -30,7 +30,7 @@ {elvis_style, no_behavior_info}, {elvis_style, module_naming_convention, #{regex => "^[a-z]([a-z0-9]*_?)*(_SUITE)?$"}}, {elvis_style, function_naming_convention, #{regex => "^[a-z]([a-z0-9]*_?)*$"}}, - {elvis_style, state_record_and_type, #{ignore => []}}, + {elvis_style, state_record_and_type, #{ignore => [hg_profiler]}}, {elvis_style, no_spec_with_records}, {elvis_style, dont_repeat_yourself, #{ min_complexity => 30, diff --git a/rebar.config b/rebar.config index 78b73b0a..e8e0a505 100644 --- a/rebar.config +++ b/rebar.config @@ -26,6 +26,7 @@ % Common project dependencies. {deps, [ + {recon, "2.5.2"}, {cache, "2.3.3"}, {gproc, "0.9.0"}, {genlib, {git, "https://github.com/valitydev/genlib.git", {branch, "master"}}}, @@ -40,6 +41,8 @@ {erl_health, {git, "https://github.com/valitydev/erlang-health.git", {branch, "master"}}}, {fault_detector_proto, {git, "https://github.com/valitydev/fault-detector-proto.git", {branch, "master"}}}, {limiter_proto, {git, "https://github.com/valitydev/limiter-proto.git", {branch, "master"}}}, + {herd, {git, "https://github.com/wgnet/herd.git", {tag, "1.3.4"}}}, + {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.0"}}}, {prometheus, "4.8.1"}, {prometheus_cowboy, "0.1.8"}, @@ -74,7 +77,7 @@ {profiles, [ {prod, [ {deps, [ - % for introspection on production + % for introspection on production bdd6632964883636c18cf7bfdd68c4f16f82c79e {recon, "2.5.2"}, {logger_logstash_formatter, {git, "https://github.com/valitydev/logger_logstash_formatter.git", {ref, "08a66a6"}}} @@ -87,6 +90,7 @@ {opentelemetry, temporary}, logger_logstash_formatter, sasl, + herd, hellgate ]}, {mode, minimal}, @@ -111,6 +115,7 @@ ]}. {erlfmt, [ + write, {print_width, 120}, {files, ["apps/*/{src,include,test}/*.{hrl,erl}", "rebar.config", "elvis.config", "config/sys.config"]} ]}. @@ -136,3 +141,8 @@ {del, prometheus_cowboy, [{plugins, [{rebar3_archive_plugin, "0.0.1"}]}]}, {del, prometheus_httpd, [{plugins, [{rebar3_archive_plugin, "0.0.1"}]}]} ]}. + +{shell, [ + {config, "config/sys.config"}, + {apps, [hellgate, hg_client, hg_progressor, hg_proto, routing, recon]} +]}. 
diff --git a/rebar.lock b/rebar.lock index 0ba51399..2b636cd5 100644 --- a/rebar.lock +++ b/rebar.lock @@ -9,7 +9,12 @@ {git,"https://github.com/valitydev/bender-proto.git", {ref,"753b935b52a52e41b571d6e580f7dfe1377364f1"}}, 1}, + {<<"brod">>,{pkg,<<"brod">>,<<"4.3.2">>},1}, {<<"cache">>,{pkg,<<"cache">>,<<"2.3.3">>},0}, + {<<"canal">>, + {git,"https://github.com/valitydev/canal", + {ref,"621d3821cd0a6036fee75d8e3b2d17167f3268e4"}}, + 2}, {<<"certifi">>,{pkg,<<"certifi">>,<<"2.8.0">>},2}, {<<"cg_mon">>, {git,"https://github.com/rbkmoney/cg_mon.git", @@ -18,6 +23,7 @@ {<<"chatterbox">>,{pkg,<<"ts_chatterbox">>,<<"0.15.1">>},2}, {<<"cowboy">>,{pkg,<<"cowboy">>,<<"2.9.0">>},1}, {<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.11.0">>},2}, + {<<"crc32cer">>,{pkg,<<"crc32cer">>,<<"0.1.11">>},3}, {<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},2}, {<<"damsel">>, {git,"https://github.com/valitydev/damsel.git", @@ -31,6 +37,14 @@ {git,"https://github.com/valitydev/dmt-core.git", {ref,"19d8f57198f2cbe5b64aa4a923ba32774e505503"}}, 1}, + {<<"epg_connector">>, + {git,"https://github.com/valitydev/epg_connector.git", + {ref,"82055002c8cb73ef938e7035865419074e7f959b"}}, + 1}, + {<<"epgsql">>, + {git,"https://github.com/epgsql/epgsql.git", + {ref,"7ba52768cf0ea7d084df24d4275a88eef4db13c2"}}, + 2}, {<<"erl_health">>, {git,"https://github.com/valitydev/erlang-health.git", {ref,"49716470d0e8dab5e37db55d52dea78001735a3d"}}, @@ -46,9 +60,15 @@ {<<"gproc">>,{pkg,<<"gproc">>,<<"0.9.0">>},0}, {<<"grpcbox">>,{pkg,<<"grpcbox">>,<<"0.17.1">>},1}, {<<"hackney">>,{pkg,<<"hackney">>,<<"1.18.0">>},1}, + {<<"herd">>, + {git,"https://github.com/wgnet/herd.git", + {ref,"934847589dcf5a6d2b02a1f546ffe91c04066f17"}}, + 0}, {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},3}, {<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},2}, + {<<"jsone">>,{pkg,<<"jsone">>,<<"1.8.0">>},3}, {<<"jsx">>,{pkg,<<"jsx">>,<<"3.1.0">>},1}, + {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.1.10">>},2}, {<<"limiter_proto">>, {git,"https://github.com/valitydev/limiter-proto.git", {ref,"efeb7d4a05bd13c95fada18514509b34b107fcb9"}}, @@ -80,11 +100,16 @@ {git,"https://github.com/valitydev/payproc-errors-erlang.git", {ref,"8ae8586239ef68098398acf7eb8363d9ec3b3234"}}, 0}, + {<<"progressor">>, + {git,"https://github.com/valitydev/progressor.git", + {ref,"e2fdf9d11a69e239d3f4dc51aa2dd122d44ee1b0"}}, + 0}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.8.1">>},0}, {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.8">>},0}, {<<"prometheus_httpd">>,{pkg,<<"prometheus_httpd">>,<<"2.1.11">>},1}, {<<"quantile_estimator">>,{pkg,<<"quantile_estimator">>,<<"0.2.1">>},1}, {<<"ranch">>,{pkg,<<"ranch">>,<<"1.8.0">>},2}, + {<<"recon">>,{pkg,<<"recon">>,<<"2.5.2">>},0}, {<<"scoper">>, {git,"https://github.com/valitydev/scoper.git", {ref,"55a2a32ee25e22fa35f583a18eaf38b2b743429b"}}, @@ -96,7 +121,7 @@ {<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.7">>},2}, {<<"thrift">>, {git,"https://github.com/valitydev/thrift_erlang.git", - {ref,"c280ff266ae1c1906fb0dcee8320bb8d8a4a3c75"}}, + {ref,"3a60e5dc5bbd709495024f26e100b041c3547fd9"}}, 1}, {<<"tls_certificate_check">>, {pkg,<<"tls_certificate_check">>,<<"1.24.0">>}, @@ -110,18 +135,22 @@ {pkg_hash,[ {<<"accept">>, <<"B33B127ABCA7CC948BBE6CAA4C263369ABF1347CFA9D8E699C6D214660F10CD1">>}, {<<"acceptor_pool">>, <<"43C20D2ACAE35F0C2BCD64F9D2BDE267E459F0F3FD23DAB26485BF518C281B21">>}, + {<<"brod">>, <<"51F4DFF17ED43A806558EBD62CC88E7B35AED336D1BA1F3DE2D010F463D49736">>}, {<<"cache">>, 
<<"B23A5FE7095445A88412A6E614C933377E0137B44FFED77C9B3FEF1A731A20B2">>}, {<<"certifi">>, <<"D4FB0A6BB20B7C9C3643E22507E42F356AC090A1DCEA9AB99E27E0376D695EBA">>}, {<<"chatterbox">>, <<"5CAC4D15DD7AD61FC3C4415CE4826FC563D4643DEE897A558EC4EA0B1C835C9C">>}, {<<"cowboy">>, <<"865DD8B6607E14CF03282E10E934023A1BD8BE6F6BACF921A7E2A96D800CD452">>}, {<<"cowlib">>, <<"0B9FF9C346629256C42EBE1EEB769A83C6CB771A6EE5960BD110AB0B9B872063">>}, + {<<"crc32cer">>, <<"B550DA6D615FEB72A882D15D020F8F7DEE72DFB2CB1BCDF3B1EE8DC2AFD68CFC">>}, {<<"ctx">>, <<"8FF88B70E6400C4DF90142E7F130625B82086077A45364A78D208ED3ED53C7FE">>}, {<<"gproc">>, <<"853CCB7805E9ADA25D227A157BA966F7B34508F386A3E7E21992B1B484230699">>}, {<<"grpcbox">>, <<"6E040AB3EF16FE699FFB513B0EF8E2E896DA7B18931A1EF817143037C454BCCE">>}, {<<"hackney">>, <<"C4443D960BB9FBA6D01161D01CD81173089686717D9490E5D3606644C48D121F">>}, {<<"hpack">>, <<"2461899CC4AB6A0EF8E970C1661C5FC6A52D3C25580BC6DD204F84CE94669926">>}, {<<"idna">>, <<"8A63070E9F7D0C62EB9D9FCB360A7DE382448200FBBD1B106CC96D3D8099DF8D">>}, + {<<"jsone">>, <<"347FF1FA700E182E1F9C5012FA6D737B12C854313B9AE6954CA75D3987D6C06D">>}, {<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>}, + {<<"kafka_protocol">>, <<"F917B6C90C8DF0DE2B40A87D6B9AE1CFCE7788E91A65818E90E40CF76111097A">>}, {<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>}, {<<"mimerl">>, <<"D0CD9FC04B9061F82490F6581E0128379830E78535E017F7780F37FEA7545726">>}, {<<"opentelemetry">>, <<"988AC3C26ACAC9720A1D4FB8D9DC52E95B45ECFEC2D5B5583276A09E8936BC5E">>}, @@ -134,24 +163,29 @@ {<<"prometheus_httpd">>, <<"F616ED9B85B536B195D94104063025A91F904A4CFC20255363F49A197D96C896">>}, {<<"quantile_estimator">>, <<"EF50A361F11B5F26B5F16D0696E46A9E4661756492C981F7B2229EF42FF1CD15">>}, {<<"ranch">>, <<"8C7A100A139FD57F17327B6413E4167AC559FBC04CA7448E9BE9057311597A1D">>}, + {<<"recon">>, <<"CBA53FA8DB83AD968C9A652E09C3ED7DDCC4DA434F27C3EAA9CA47FFB2B1FF03">>}, {<<"ssl_verify_fun">>, <<"354C321CF377240C7B8716899E182CE4890C5938111A1296ADD3EC74CF1715DF">>}, {<<"tls_certificate_check">>, <<"D00E2887551FF8CDAE4D0340D90D9FCBC4943C7B5F49D32ED4BC23AFF4DB9A44">>}, {<<"unicode_util_compat">>, <<"BC84380C9AB48177092F43AC89E4DFA2C6D62B40B8BD132B1059ECC7232F9A78">>}]}, {pkg_hash_ext,[ {<<"accept">>, <<"11B18C220BCC2EAB63B5470C038EF10EB6783BCB1FCDB11AA4137DEFA5AC1BB8">>}, {<<"acceptor_pool">>, <<"0CBCD83FDC8B9AD2EEE2067EF8B91A14858A5883CB7CD800E6FCD5803E158788">>}, + {<<"brod">>, <<"88584FDEBA746AA6729E2A1826416C10899954F68AF93659B3C2F38A2DCAA27C">>}, {<<"cache">>, <<"44516CE6FA03594D3A2AF025DD3A87BFE711000EB730219E1DDEFC816E0AA2F4">>}, {<<"certifi">>, <<"6AC7EFC1C6F8600B08D625292D4BBF584E14847CE1B6B5C44D983D273E1097EA">>}, {<<"chatterbox">>, <<"4F75B91451338BC0DA5F52F3480FA6EF6E3A2AEECFC33686D6B3D0A0948F31AA">>}, {<<"cowboy">>, <<"2C729F934B4E1AA149AFF882F57C6372C15399A20D54F65C8D67BEF583021BDE">>}, {<<"cowlib">>, <<"2B3E9DA0B21C4565751A6D4901C20D1B4CC25CBB7FD50D91D2AB6DD287BC86A9">>}, + {<<"crc32cer">>, <<"A39B8F0B1990AC1BF06C3A247FC6A178B740CDFC33C3B53688DC7DD6B1855942">>}, {<<"ctx">>, <<"A14ED2D1B67723DBEBBE423B28D7615EB0BDCBA6FF28F2D1F1B0A7E1D4AA5FC2">>}, {<<"gproc">>, <<"587E8AF698CCD3504CF4BA8D90F893EDE2B0F58CABB8A916E2BF9321DE3CF10B">>}, {<<"grpcbox">>, <<"4A3B5D7111DAABC569DC9CBD9B202A3237D81C80BF97212FBC676832CB0CEB17">>}, {<<"hackney">>, <<"9AFCDA620704D720DB8C6A3123E9848D09C87586DC1C10479C42627B905B5C5E">>}, {<<"hpack">>, <<"D6137D7079169D8C485C6962DFE261AF5B9EF60FBC557344511C1E65E3D95FB0">>}, 
{<<"idna">>, <<"92376EB7894412ED19AC475E4A86F7B413C1B9FBB5BD16DCCD57934157944CEA">>}, + {<<"jsone">>, <<"08560B78624A12E0B5E7EC0271EC8CA38EF51F63D84D84843473E14D9B12618C">>}, {<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>}, + {<<"kafka_protocol">>, <<"DF680A3706EAD8695F8B306897C0A33E8063C690DA9308DB87B462CFD7029D04">>}, {<<"metrics">>, <<"69B09ADDDC4F74A40716AE54D140F93BEB0FB8978D8636EADED0C31B6F099F16">>}, {<<"mimerl">>, <<"A1E15A50D1887217DE95F0B9B0793E32853F7C258A5CD227650889B38839FE9D">>}, {<<"opentelemetry">>, <<"8E09EDC26AAD11161509D7ECAD854A3285D88580F93B63B0B1CF0BAC332BFCC0">>}, @@ -164,6 +198,7 @@ {<<"prometheus_httpd">>, <<"0BBE831452CFDF9588538EB2F570B26F30C348ADAE5E95A7D87F35A5910BCF92">>}, {<<"quantile_estimator">>, <<"282A8A323CA2A845C9E6F787D166348F776C1D4A41EDE63046D72D422E3DA946">>}, {<<"ranch">>, <<"49FBCFD3682FAB1F5D109351B61257676DA1A2FDBE295904176D5E521A2DDFE5">>}, + {<<"recon">>, <<"2C7523C8DEE91DFF41F6B3D63CBA2BD49EB6D2FE5BF1EEC0DF7F87EB5E230E1C">>}, {<<"ssl_verify_fun">>, <<"FE4C190E8F37401D30167C8C405EDA19469F34577987C76DDE613E838BBC67F8">>}, {<<"tls_certificate_check">>, <<"90B25A58EE433D91C17F036D4D354BF8859A089BFDA60E68A86F8EECAE45EF1B">>}, {<<"unicode_util_compat">>, <<"25EEE6D67DF61960CF6A794239566599B09E17E668D3700247BC498638152521">>}]}