From 5a32d2ce8b76458e1e9f1219a70bd0c2587f0f0a Mon Sep 17 00:00:00 2001 From: ttt161 Date: Wed, 5 Mar 2025 02:23:41 +0300 Subject: [PATCH 01/10] TECH-61: progressor backend implementation (#25) Co-authored-by: ttt161 --- Dockerfile.neighbour | 46 ++++ Makefile | 18 +- apps/machinegun/src/machinegun.app.src | 2 + apps/mg_conf/src/mg_conf.erl | 3 +- apps/mg_core/src/mg_core_events_machine.erl | 3 +- apps/mg_cth/src/mg_cth.erl | 39 +++ apps/mg_cth_neighbour/README.md | 4 + apps/mg_cth_neighbour/rebar.config | 9 + .../src/mg_cth_nbr_processor.erl | 59 +++++ .../src/mg_cth_neighbour.app.src | 15 ++ .../src/mg_cth_neighbour_app.erl | 20 ++ .../src/mg_cth_neighbour_sup.erl | 39 +++ apps/mg_progressor/rebar.config | 2 + apps/mg_progressor/src/mg_progressor.app.src | 13 + apps/mg_progressor/src/mg_progressor.erl | 169 +++++++++++++ apps/mg_woody/src/mg_woody_automaton.erl | 52 +++- .../test/mg_woody_prg_engine_tests_SUITE.erl | 229 ++++++++++++++++++ compose.yaml | 40 +++ config/config.yaml | 43 ++++ config/neighbour.sys.config | 44 ++++ rebar.config | 13 +- rebar.lock | 23 +- rel_scripts/configurator.escript | 89 ++++++- 23 files changed, 956 insertions(+), 18 deletions(-) create mode 100644 Dockerfile.neighbour create mode 100644 apps/mg_cth_neighbour/README.md create mode 100644 apps/mg_cth_neighbour/rebar.config create mode 100644 apps/mg_cth_neighbour/src/mg_cth_nbr_processor.erl create mode 100644 apps/mg_cth_neighbour/src/mg_cth_neighbour.app.src create mode 100644 apps/mg_cth_neighbour/src/mg_cth_neighbour_app.erl create mode 100644 apps/mg_cth_neighbour/src/mg_cth_neighbour_sup.erl create mode 100644 apps/mg_progressor/rebar.config create mode 100644 apps/mg_progressor/src/mg_progressor.app.src create mode 100644 apps/mg_progressor/src/mg_progressor.erl create mode 100644 apps/mg_woody/test/mg_woody_prg_engine_tests_SUITE.erl create mode 100644 config/neighbour.sys.config diff --git a/Dockerfile.neighbour b/Dockerfile.neighbour new file mode 100644 index 0000000..aa7984d --- /dev/null +++ b/Dockerfile.neighbour @@ -0,0 +1,46 @@ +ARG OTP_VERSION + +# Build the release +FROM docker.io/library/erlang:${OTP_VERSION} AS builder +SHELL ["/bin/bash", "-o", "pipefail", "-c"] + +# Install thrift compiler +ARG THRIFT_VERSION +ARG TARGETARCH + +RUN wget -q -O- "https://github.com/valitydev/thrift/releases/download/${THRIFT_VERSION}/thrift-${THRIFT_VERSION}-linux-${TARGETARCH}.tar.gz" \ + | tar -xvz -C /usr/local/bin/ + +# Copy sources +RUN mkdir /build +COPY . 
/build/ + +# Build the release +WORKDIR /build +RUN rebar3 compile \ + && rebar3 as test_neighbour release + +# Make a runner image +FROM docker.io/library/erlang:${OTP_VERSION}-slim + +ARG SERVICE_NAME=mg_cth_neighbour +ARG USER_UID=1001 +ARG USER_GID=$USER_UID + +# Set env +ENV CHARSET=UTF-8 +ENV LANG=C.UTF-8 + +# Expose SERVICE_NAME as env so CMD expands properly on start +ENV SERVICE_NAME=${SERVICE_NAME} + +# Set runtime +WORKDIR /opt/${SERVICE_NAME} + +COPY --from=builder /build/_build/test_neighbour/rel/${SERVICE_NAME} /opt/${SERVICE_NAME} + +RUN echo "#!/bin/sh" >> /entrypoint.sh && \ + echo "exec /opt/${SERVICE_NAME}/bin/${SERVICE_NAME} foreground" >> /entrypoint.sh && \ + chmod +x /entrypoint.sh + +ENTRYPOINT ["/entrypoint.sh"] diff --git a/Makefile b/Makefile index 9a150cd..422ed50 100644 --- a/Makefile +++ b/Makefile @@ -14,15 +14,19 @@ DOTENV := $(shell grep -v '^\#' .env) DEV_IMAGE_TAG = $(TEST_CONTAINER_NAME)-dev DEV_IMAGE_ID = $(file < .image.dev) +TEST_IMAGE_TAG = $(DIST_CONTAINER_NAME)-test +TEST_IMAGE_ID = $(file < .image.test) + DOCKER ?= docker DOCKERCOMPOSE ?= docker-compose DOCKERCOMPOSE_W_ENV = DEV_IMAGE_TAG=$(DEV_IMAGE_TAG) $(DOCKERCOMPOSE) -f compose.yaml -f compose.tracing.yaml REBAR ?= rebar3 TEST_CONTAINER_NAME ?= testrunner +DIST_CONTAINER_NAME ?= distrunner all: compile xref lint check-format dialyze eunit -.PHONY: dev-image clean-dev-image wc-shell test +.PHONY: dev-image test-image clean-dev-image clean-test-image wc-shell test dev-image: .image.dev @@ -36,6 +40,18 @@ ifneq ($(DEV_IMAGE_ID),) rm .image.dev endif +test-image: .image.test + +.image.test: Dockerfile.neighbour .env + env $(DOTENV) $(DOCKERCOMPOSE_W_ENV) build $(DIST_CONTAINER_NAME) + $(DOCKER) image ls -q -f "reference=$(TEST_IMAGE_ID)" | head -n1 > $@ + +clean-test-image: +ifneq ($(TEST_IMAGE_ID),) + $(DOCKER) image rm -f $(TEST_IMAGE_TAG) + rm .image.test +endif + DOCKER_WC_OPTIONS := -v $(PWD):$(PWD) --workdir $(PWD) DOCKER_WC_EXTRA_OPTIONS ?= --rm DOCKER_RUN = $(DOCKER) run -t $(DOCKER_WC_OPTIONS) $(DOCKER_WC_EXTRA_OPTIONS) diff --git a/apps/machinegun/src/machinegun.app.src b/apps/machinegun/src/machinegun.app.src index 8f236ff..c54365d 100644 --- a/apps/machinegun/src/machinegun.app.src +++ b/apps/machinegun/src/machinegun.app.src @@ -26,6 +26,8 @@ erl_health, prometheus, prometheus_cowboy, + progressor, + mg_progressor, mg_utils, mg_core, mg_riak, diff --git a/apps/mg_conf/src/mg_conf.erl b/apps/mg_conf/src/mg_conf.erl index e682869..2e2a0f4 100644 --- a/apps/mg_conf/src/mg_conf.erl +++ b/apps/mg_conf/src/mg_conf.erl @@ -99,7 +99,8 @@ events_machine_options(NS, NSs, Pulse) -> event_sinks => EventSinks, pulse => Pulse, default_processing_timeout => maps:get(default_processing_timeout, NSConfigs), - event_stash_size => maps:get(event_stash_size, NSConfigs, 0) + event_stash_size => maps:get(event_stash_size, NSConfigs, 0), + engine => maps:get(engine, NSConfigs, machinegun) }. -spec machine_options(mg_core:ns(), events_machines(), pulse()) -> mg_core_machine:options(). diff --git a/apps/mg_core/src/mg_core_events_machine.erl b/apps/mg_core/src/mg_core_events_machine.erl index a5a77ca..5cf75d7 100644 --- a/apps/mg_core/src/mg_core_events_machine.erl +++ b/apps/mg_core/src/mg_core_events_machine.erl @@ -127,7 +127,8 @@ pulse => mpulse:handler(), event_sinks => [mg_core_event_sink:handler()], default_processing_timeout => timeout(), - event_stash_size => non_neg_integer() + event_stash_size => non_neg_integer(), + engine => machinegun | progressor }. 
% like mg_core_storage:options() except `name` -type storage_options() :: mg_utils:mod_opts(map()). diff --git a/apps/mg_cth/src/mg_cth.erl b/apps/mg_cth/src/mg_cth.erl index 3e36a08..56aae2e 100644 --- a/apps/mg_cth/src/mg_cth.erl +++ b/apps/mg_cth/src/mg_cth.erl @@ -69,9 +69,48 @@ kafka_client_config(Brokers) -> ]} ]. +-spec epg_connector_config() -> _. +epg_connector_config() -> + [ + {databases, #{ + progressor_db => #{ + host => "postgres", + port => 5432, + database => "progressor_db", + username => "progressor", + password => "progressor" + } + }}, + {pools, #{ + default_pool => #{ + database => progressor_db, + size => 10 + } + }} + ]. + +-spec progressor_config() -> _. +progressor_config() -> + [ + {namespaces, #{ + binary_to_atom(?NS) => #{ + processor => #{client => null}, + storage => #{ + client => prg_pg_backend, + options => #{pool => default_pool} + }, + worker_pool_size => 0 + } + }} + ]. + -spec start_application(app()) -> _Deps :: [appname()]. start_application(brod) -> genlib_app:start_application_with(brod, kafka_client_config(?BROKERS)); +start_application(epg_connector) -> + genlib_app:start_application_with(epg_connector, epg_connector_config()); +start_application(progressor) -> + genlib_app:start_application_with(progressor, progressor_config()); start_application({AppName, Env}) -> genlib_app:start_application_with(AppName, Env); start_application(AppName) -> diff --git a/apps/mg_cth_neighbour/README.md b/apps/mg_cth_neighbour/README.md new file mode 100644 index 0000000..dc18cdb --- /dev/null +++ b/apps/mg_cth_neighbour/README.md @@ -0,0 +1,4 @@ +mg_cth_neighbour +===== + +Вспомогательное приложения для тестов, в релиз не включается. diff --git a/apps/mg_cth_neighbour/rebar.config b/apps/mg_cth_neighbour/rebar.config new file mode 100644 index 0000000..cf255cc --- /dev/null +++ b/apps/mg_cth_neighbour/rebar.config @@ -0,0 +1,9 @@ +{erl_opts, [debug_info]}. +{deps, [ + {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v0.0.3"}}} +]}. + +{shell, [ + %% {config, "config/sys.config"}, + {apps, [mg_cth_neighbour]} +]}. diff --git a/apps/mg_cth_neighbour/src/mg_cth_nbr_processor.erl b/apps/mg_cth_neighbour/src/mg_cth_nbr_processor.erl new file mode 100644 index 0000000..154c2b1 --- /dev/null +++ b/apps/mg_cth_neighbour/src/mg_cth_nbr_processor.erl @@ -0,0 +1,59 @@ +-module(mg_cth_nbr_processor). + +-export([process/3]). + +-spec process({_TaskType, _Args, _Process}, _Opts, _Ctx) -> _. +process({init, _InitArgs, _Process}, _Opts, _Ctx) -> + Result = #{events => [event(1)]}, + {ok, Result}; +process( + %% <<"simple_call">> + {call, <<131, 109, 0, 0, 0, 11, 115, 105, 109, 112, 108, 101, 95, 99, 97, 108, 108>> = CallArgs, _Process}, + _Opts, + _Ctx +) -> + Result = #{ + response => CallArgs, + events => [] + }, + {ok, Result}; +process( + %% <<"fail_call">> + {call, <<131, 109, 0, 0, 0, 9, 102, 97, 105, 108, 95, 99, 97, 108, 108>> = _CallArgs, _Process}, + _Opts, + _Ctx +) -> + {error, do_not_retry}; +process( + %% simple repair + {timeout, _Args, _Process}, + _Opts, + _Ctx +) -> + Result = #{events => []}, + {ok, Result}; +% +process( + %% <<"repair_ok">> + {repair, <<131, 109, 0, 0, 0, 9, 114, 101, 112, 97, 105, 114, 95, 111, 107>> = CallArgs, _Process}, + _Opts, + _Ctx +) -> + Result = #{events => [], response => CallArgs}, + {ok, Result}; +process( + %% <<"repair_fail">> + {repair, <<131, 109, 0, 0, 0, 11, 114, 101, 112, 97, 105, 114, 95, 102, 97, 105, 108>> = _CallArgs, _Process}, + _Opts, + _Ctx +) -> + {error, unreach}. 
+ +% +event(Id) -> + #{ + event_id => Id, + timestamp => erlang:system_time(second), + metadata => #{<<"format_version">> => 1}, + payload => erlang:term_to_binary({bin, crypto:strong_rand_bytes(8)}) + }. diff --git a/apps/mg_cth_neighbour/src/mg_cth_neighbour.app.src b/apps/mg_cth_neighbour/src/mg_cth_neighbour.app.src new file mode 100644 index 0000000..46f9326 --- /dev/null +++ b/apps/mg_cth_neighbour/src/mg_cth_neighbour.app.src @@ -0,0 +1,15 @@ +{application, mg_cth_neighbour, [ + {description, "An OTP application"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {mg_cth_neighbour_app, []}}, + {applications, [ + kernel, + stdlib, + progressor + ]}, + {env, []}, + {modules, []}, + {licenses, ["Apache-2.0"]}, + {links, []} + ]}. diff --git a/apps/mg_cth_neighbour/src/mg_cth_neighbour_app.erl b/apps/mg_cth_neighbour/src/mg_cth_neighbour_app.erl new file mode 100644 index 0000000..117da68 --- /dev/null +++ b/apps/mg_cth_neighbour/src/mg_cth_neighbour_app.erl @@ -0,0 +1,20 @@ +%%%------------------------------------------------------------------- +%% @doc mg_cth_neighbour public API +%% @end +%%%------------------------------------------------------------------- + +-module(mg_cth_neighbour_app). + +-behaviour(application). + +-export([start/2, stop/1]). + +-spec start(_, _) -> _. +start(_StartType, _StartArgs) -> + mg_cth_neighbour_sup:start_link(). + +-spec stop(_) -> _. +stop(_State) -> + ok. + +%% internal functions diff --git a/apps/mg_cth_neighbour/src/mg_cth_neighbour_sup.erl b/apps/mg_cth_neighbour/src/mg_cth_neighbour_sup.erl new file mode 100644 index 0000000..a1618bd --- /dev/null +++ b/apps/mg_cth_neighbour/src/mg_cth_neighbour_sup.erl @@ -0,0 +1,39 @@ +%%%------------------------------------------------------------------- +%% @doc mg_cth_neighbour top level supervisor. +%% @end +%%%------------------------------------------------------------------- + +-module(mg_cth_neighbour_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +-spec start_link() -> _. +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%% sup_flags() = #{strategy => strategy(), % optional +%% intensity => non_neg_integer(), % optional +%% period => pos_integer()} % optional +%% child_spec() = #{id => child_id(), % mandatory +%% start => mfargs(), % mandatory +%% restart => restart(), % optional +%% shutdown => shutdown(), % optional +%% type => worker(), % optional +%% modules => modules()} % optional +-spec init(_) -> _. +init([]) -> + SupFlags = #{ + strategy => one_for_all, + intensity => 0, + period => 1 + }, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. + +%% internal functions diff --git a/apps/mg_progressor/rebar.config b/apps/mg_progressor/rebar.config new file mode 100644 index 0000000..2656fd5 --- /dev/null +++ b/apps/mg_progressor/rebar.config @@ -0,0 +1,2 @@ +{erl_opts, [debug_info]}. +{deps, []}. diff --git a/apps/mg_progressor/src/mg_progressor.app.src b/apps/mg_progressor/src/mg_progressor.app.src new file mode 100644 index 0000000..f325b32 --- /dev/null +++ b/apps/mg_progressor/src/mg_progressor.app.src @@ -0,0 +1,13 @@ +{application, mg_progressor, [ + {description, "An OTP library"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {env, []}, + {modules, []}, + {licenses, []}, + {links, []} + ]}. 
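
A note on the hard-coded binaries matched in mg_cth_nbr_processor above: the init, call and repair arguments reach the processor in Erlang external term format (the marshalling code in mg_progressor, added below, wraps them in erlang:term_to_binary/1), so each clause matches the encoded form of the corresponding test string. The literals can be reproduced in an Erlang shell; this is purely an illustration of where the byte sequences come from:

    %% 131 is the external-term-format version byte, 109 (BINARY_EXT) tags a
    %% binary, and the next four bytes are its big-endian length.
    <<131,109,0,0,0,11,115,105,109,112,108,101,95,99,97,108,108>> =
        erlang:term_to_binary(<<"simple_call">>),
    <<131,109,0,0,0,9,102,97,105,108,95,99,97,108,108>> =
        erlang:term_to_binary(<<"fail_call">>).
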
diff --git a/apps/mg_progressor/src/mg_progressor.erl b/apps/mg_progressor/src/mg_progressor.erl new file mode 100644 index 0000000..37bc8cb --- /dev/null +++ b/apps/mg_progressor/src/mg_progressor.erl @@ -0,0 +1,169 @@ +-module(mg_progressor). + +-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). + +-export([handle_function/3]). + +-spec handle_function(woody:func(), woody:args(), _) -> + {ok, _Result} | no_return(). +handle_function('Start', {NS, IDIn, PackedArgs}, Context) -> + ID = mg_woody_packer:unpack(id, IDIn), + Args = mg_woody_packer:unpack(args, PackedArgs), + Req = genlib_map:compact(#{ + ns => erlang:binary_to_atom(NS), + id => ID, + args => maybe_unmarshal(term, Args), + context => maybe_unmarshal(term, Context) + }), + handle_result(progressor:init(Req)); +handle_function('Call', {MachineDesc, PackedArgs}, Context) -> + {NS, ID, {After, Limit, Direction}} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), + Args = mg_woody_packer:unpack(args, PackedArgs), + Range = genlib_map:compact(#{ + limit => Limit, + offset => After, + direction => Direction + }), + Req = genlib_map:compact(#{ + ns => erlang:binary_to_atom(NS), + id => ID, + args => maybe_unmarshal(term, Args), + context => maybe_unmarshal(term, Context), + range => Range + }), + case progressor:call(Req) of + {ok, RawResponse} -> + %% TODO: CHECK THIS! + {ok, mg_woody_packer:pack(call_response, marshal(term, RawResponse))}; + Error -> + handle_error(Error) + end; +handle_function('SimpleRepair', {NS, RefIn}, Context) -> + ID = mg_woody_packer:unpack(ref, RefIn), + Req = genlib_map:compact(#{ + ns => erlang:binary_to_atom(NS), + id => ID, + context => maybe_unmarshal(term, Context) + }), + handle_result(progressor:simple_repair(Req)); +handle_function('Repair', {MachineDesc, PackedArgs}, Context) -> + {NS, ID, _} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), + Args = mg_woody_packer:unpack(args, PackedArgs), + Req = genlib_map:compact(#{ + ns => erlang:binary_to_atom(NS), + id => ID, + args => maybe_unmarshal(term, Args), + context => maybe_unmarshal(term, Context) + }), + case progressor:repair(Req) of + {ok, RawResponse} -> + %% TODO: CHECK THIS! + {ok, mg_woody_packer:pack(repair_response, marshal(term, RawResponse))}; + {error, <<"process is running">>} = Error -> + handle_error(Error); + {error, <<"process not found">>} = Error -> + handle_error(Error); + {error, <<"namespace not found">>} = Error -> + handle_error(Error); + _Error -> + handle_error({error, <<"process is error">>}) + end; +handle_function('GetMachine', {MachineDesc}, Context) -> + {NS, ID, {After, Limit, Direction}} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), + Range = genlib_map:compact(#{ + limit => Limit, + offset => After, + direction => Direction + }), + Req = genlib_map:compact(#{ + ns => erlang:binary_to_atom(NS), + id => ID, + context => maybe_unmarshal(term, Context), + range => Range + }), + case progressor:get(Req) of + {ok, Process} -> + {ok, marshal(process, Process#{ns => NS, history_range => Range})}; + Error -> + handle_error(Error) + end; +handle_function(Func, _Args, _Context) -> + error({not_implemented, Func}). + +%% Internal functions + +handle_result({ok, _} = Ok) -> + Ok; +handle_result({error, _Reason} = Error) -> + handle_error(Error). + +-spec handle_error(any()) -> no_return(). 
+handle_error({error, <<"process already exists">>}) -> + erlang:throw({logic, machine_already_exist}); +handle_error({error, <<"process not found">>}) -> + erlang:throw({logic, machine_not_found}); +handle_error({error, <<"namespace not found">>}) -> + erlang:throw({logic, namespace_not_found}); +handle_error({error, <<"process is running">>}) -> + erlang:throw({logic, machine_already_working}); +handle_error({error, <<"process is error">>}) -> + erlang:throw({logic, machine_failed}); +handle_error(UnknownError) -> + erlang:throw(UnknownError). + +maybe_unmarshal(_Type, undefined) -> + undefined; +maybe_unmarshal(Type, Value) -> + unmarshal(Type, Value). + +unmarshal(term, Value) -> + erlang:term_to_binary(Value). + +maybe_marshal(_Type, undefined) -> + undefined; +maybe_marshal(Type, Value) -> + marshal(Type, Value). + +marshal(process, Process) -> + #mg_stateproc_Machine{ + ns = maps:get(ns, Process), + id = maps:get(process_id, Process), + history = maybe_marshal(history, maps:get(history, Process)), + history_range = marshal(history_range, maps:get(history_range, Process)), + status = marshal(status, {maps:get(status, Process), maps:get(detail, Process, undefined)}), + aux_state = maybe_marshal(term, maps:get(aux_state, Process, undefined)) + }; +marshal(history, History) -> + lists:map(fun(Ev) -> marshal(event, Ev) end, History); +marshal(event, Event) -> + #{ + event_id := EventId, + timestamp := Timestamp, + payload := Payload + } = Event, + Meta = maps:get(metadata, Event, #{}), + #mg_stateproc_Event{ + id = EventId, + created_at = marshal(timestamp, Timestamp), + format_version = format_version(Meta), + data = marshal(term, Payload) + }; +marshal(timestamp, Timestamp) -> + unicode:characters_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{offset, "Z"}])); +marshal(term, Term) -> + binary_to_term(Term); +marshal(history_range, Range) -> + #mg_stateproc_HistoryRange{ + 'after' = maps:get(offset, Range, undefined), + limit = maps:get(limit, Range, undefined), + direction = maps:get(direction, Range, undefined) + }; +marshal(status, {<<"running">>, _Detail}) -> + {'working', #mg_stateproc_MachineStatusWorking{}}; +marshal(status, {<<"error">>, Detail}) -> + {'failed', #mg_stateproc_MachineStatusFailed{reason = Detail}}. + +format_version(#{<<"format_version">> := Version}) -> + Version; +format_version(_) -> + undefined. diff --git a/apps/mg_woody/src/mg_woody_automaton.erl b/apps/mg_woody/src/mg_woody_automaton.erl index 67a1102..b911c49 100644 --- a/apps/mg_woody/src/mg_woody_automaton.erl +++ b/apps/mg_woody/src/mg_woody_automaton.erl @@ -56,8 +56,44 @@ handler(Options) -> %% -spec handle_function(woody:func(), woody:args(), woody_context:ctx(), options()) -> {ok, _Result} | no_return(). +handle_function(Fun, Args, WoodyContext, Options) -> + case maps:to_list(Options) of + [{_NS, #{machine := #{engine := progressor}}} | _] -> + {NS, ID} = parse_args(Args), + ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), + Deadline = get_deadline(NS, WoodyContext, Options), + {ok, _} = + Result = mg_woody_utils:handle_error( + #{ + namespace => NS, + machine_id => ID, + request_context => ReqCtx, + deadline => Deadline + }, + fun() -> mg_progressor:handle_function(Fun, Args, ReqCtx) end, + pulse(NS, Options) + ), + Result; + _ -> + handle_function_(Fun, Args, WoodyContext, Options) + end. 
-handle_function('Start', {NS, IDIn, Args}, WoodyContext, Options) -> +parse_args({NS, IDIn, _Args}) -> + ID = mg_woody_packer:unpack(id, IDIn), + {NS, ID}; +parse_args({MachineDesc, _Args}) when is_tuple(MachineDesc) -> + {NS, ID, _Range} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), + {NS, ID}; +parse_args({NS, RefIn}) when is_binary(NS) -> + ID = mg_woody_packer:unpack(ref, RefIn), + {NS, ID}; +parse_args({MachineDesc}) -> + {NS, ID, _Range} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), + {NS, ID}. + +-spec handle_function_(woody:func(), woody:args(), woody_context:ctx(), options()) -> + {ok, _Result} | no_return(). +handle_function_('Start', {NS, IDIn, Args}, WoodyContext, Options) -> ID = mg_woody_packer:unpack(id, IDIn), ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), Deadline = get_deadline(NS, WoodyContext, Options), @@ -80,7 +116,7 @@ handle_function('Start', {NS, IDIn, Args}, WoodyContext, Options) -> pulse(NS, Options) ), {ok, ok}; -handle_function('Repair', {MachineDesc, Args}, WoodyContext, Options) -> +handle_function_('Repair', {MachineDesc, Args}, WoodyContext, Options) -> ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), {NS, ID, Range} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), Deadline = get_deadline(NS, WoodyContext, Options), @@ -105,7 +141,7 @@ handle_function('Repair', {MachineDesc, Args}, WoodyContext, Options) -> {error, {failed, Reason}} -> woody_error:raise(business, mg_woody_packer:pack(repair_error, Reason)) end; -handle_function('SimpleRepair', {NS, RefIn}, WoodyContext, Options) -> +handle_function_('SimpleRepair', {NS, RefIn}, WoodyContext, Options) -> Deadline = get_deadline(NS, WoodyContext, Options), ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), ID = mg_woody_packer:unpack(ref, RefIn), @@ -122,7 +158,7 @@ handle_function('SimpleRepair', {NS, RefIn}, WoodyContext, Options) -> pulse(NS, Options) ), {ok, ok}; -handle_function('Call', {MachineDesc, Args}, WoodyContext, Options) -> +handle_function_('Call', {MachineDesc, Args}, WoodyContext, Options) -> ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), {NS, ID, Range} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), Deadline = get_deadline(NS, WoodyContext, Options), @@ -142,7 +178,7 @@ handle_function('Call', {MachineDesc, Args}, WoodyContext, Options) -> pulse(NS, Options) ), {ok, mg_woody_packer:pack(call_response, Response)}; -handle_function('GetMachine', {MachineDesc}, WoodyContext, Options) -> +handle_function_('GetMachine', {MachineDesc}, WoodyContext, Options) -> ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), {NS, ID, Range} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), Machine = @@ -158,7 +194,7 @@ handle_function('GetMachine', {MachineDesc}, WoodyContext, Options) -> pulse(NS, Options) ), {ok, mg_woody_packer:pack(machine_simple, simplify_core_machine(Machine))}; -handle_function('Remove', {NS, IDIn}, WoodyContext, Options) -> +handle_function_('Remove', {NS, IDIn}, WoodyContext, Options) -> ID = mg_woody_packer:unpack(id, IDIn), Deadline = get_deadline(NS, WoodyContext, Options), ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), @@ -175,7 +211,7 @@ handle_function('Remove', {NS, IDIn}, WoodyContext, Options) -> pulse(NS, Options) ), {ok, ok}; -handle_function('Modernize', {MachineDesc}, WoodyContext, Options) -> +handle_function_('Modernize', {MachineDesc}, WoodyContext, Options) -> {NS, ID, Range} = 
mg_woody_packer:unpack(machine_descriptor, MachineDesc), ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), mg_woody_utils:handle_error( @@ -199,7 +235,7 @@ handle_function('Modernize', {MachineDesc}, WoodyContext, Options) -> end, pulse(NS, Options) ); -handle_function('Notify', {MachineDesc, Args}, WoodyContext, Options) -> +handle_function_('Notify', {MachineDesc, Args}, WoodyContext, Options) -> ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), {NS, ID, Range} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), NotificationID = diff --git a/apps/mg_woody/test/mg_woody_prg_engine_tests_SUITE.erl b/apps/mg_woody/test/mg_woody_prg_engine_tests_SUITE.erl new file mode 100644 index 0000000..801e36c --- /dev/null +++ b/apps/mg_woody/test/mg_woody_prg_engine_tests_SUITE.erl @@ -0,0 +1,229 @@ +-module(mg_woody_prg_engine_tests_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). +-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). +-include_lib("mg_cth/include/mg_cth.hrl"). + +%% tests descriptions +-export([all/0]). +-export([groups/0]). +-export([init_per_suite/1]). +-export([end_per_suite/1]). +-export([init_per_group/2]). +-export([end_per_group/2]). +-export([init_per_testcase/2]). +-export([end_per_testcase/2]). + +%% TESTS +-export([namespace_not_found/1]). +-export([machine_start/1]). +-export([machine_already_exists/1]). +-export([machine_call_by_id/1]). +-export([machine_id_not_found/1]). +-export([failed_machine_call/1]). +-export([failed_machine_simple_repair/1]). +-export([failed_machine_repair/1]). +-export([failed_machine_repair_error/1]). +-export([working_machine_repair/1]). +-export([working_machine_simple_repair/1]). + +-type group_name() :: atom(). +-type test_name() :: atom(). +-type config() :: [{atom(), _}]. + +-spec init_per_suite(config()) -> config(). +init_per_suite(C) -> + _ = mg_cth:start_applications([ + epg_connector, + progressor + ]), + C. + +-spec end_per_suite(config()) -> ok. +end_per_suite(_C) -> + _ = mg_cth:stop_applications([ + epg_connector, + progressor + ]), + ok. + +-spec init_per_group(group_name(), config()) -> config(). +init_per_group(_, C) -> + Config = mg_woody_config(C), + Apps = mg_cth:start_applications([ + brod, + mg_woody + ]), + {ok, SupPid} = start_automaton(Config), + [ + {apps, Apps}, + {automaton_options, #{ + url => "http://localhost:38022", + ns => ?NS, + retry_strategy => genlib_retry:linear(3, 1) + }}, + {sup_pid, SupPid} + | C + ]. + +-spec end_per_group(group_name(), config()) -> ok. +end_per_group(_, C) -> + ok = proc_lib:stop(?config(sup_pid, C)), + mg_cth:stop_applications(?config(apps, C)). + +-spec init_per_testcase(atom(), config()) -> config(). +init_per_testcase(_Name, C) -> + C. + +-spec end_per_testcase(atom(), config()) -> _. +end_per_testcase(_Name, _C) -> + ok. +% + +-spec all() -> [test_name() | {group, group_name()}]. +all() -> + [ + {group, base} + ]. + +-spec groups() -> [{group_name(), list(_), [test_name()]}]. +groups() -> + [ + {base, [], [ + namespace_not_found, + machine_start, + machine_already_exists, + machine_call_by_id, + machine_id_not_found, + failed_machine_call, + failed_machine_simple_repair, + failed_machine_repair, + failed_machine_repair_error, + working_machine_repair, + working_machine_simple_repair + ]} + ]. + +%% TESTS + +-spec namespace_not_found(config()) -> _. 
+namespace_not_found(C) -> + Opts = maps:update(ns, <<"incorrect_NS">>, automaton_options(C)), + #mg_stateproc_NamespaceNotFound{} = (catch mg_cth_automaton_client:start(Opts, ?ID, <<>>)). + +-spec machine_start(config()) -> _. +machine_start(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>). + +-spec machine_already_exists(config()) -> _. +machine_already_exists(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>), + #mg_stateproc_MachineAlreadyExists{} = + (catch mg_cth_automaton_client:start(automaton_options(C), ID, <<>>)). + +-spec machine_call_by_id(config()) -> _. +machine_call_by_id(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>), + <<"simple_call">> = mg_cth_automaton_client:call(automaton_options(C), ID, <<"simple_call">>). + +-spec machine_id_not_found(config()) -> _. +machine_id_not_found(C) -> + IncorrectID = <<"incorrect_ID">>, + #mg_stateproc_MachineNotFound{} = + (catch mg_cth_automaton_client:call(automaton_options(C), IncorrectID, <<"simple_call">>)). + +-spec failed_machine_call(config()) -> _. +failed_machine_call(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>), + _Fail = catch mg_cth_automaton_client:call(automaton_options(C), ID, <<"fail_call">>), + #mg_stateproc_MachineFailed{} = + (catch mg_cth_automaton_client:call(automaton_options(C), ID, <<"simple_call">>)). + +-spec failed_machine_simple_repair(config()) -> _. +failed_machine_simple_repair(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>), + _Fail = catch mg_cth_automaton_client:call(automaton_options(C), ID, <<"fail_call">>), + ok = mg_cth_automaton_client:simple_repair(automaton_options(C), ID), + %% await repair result + timer:sleep(1100), + #{status := working} = + mg_cth_automaton_client:get_machine(automaton_options(C), ID, {undefined, undefined, forward}). + +-spec failed_machine_repair(config()) -> _. +failed_machine_repair(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>), + _Fail = catch mg_cth_automaton_client:call(automaton_options(C), ID, <<"fail_call">>), + <<"repair_ok">> = mg_cth_automaton_client:repair(automaton_options(C), ID, <<"repair_ok">>), + %% await repair result + timer:sleep(1100), + #{status := working} = + mg_cth_automaton_client:get_machine(automaton_options(C), ID, {undefined, undefined, forward}). + +-spec failed_machine_repair_error(config()) -> _. +failed_machine_repair_error(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>), + _Fail = catch mg_cth_automaton_client:call(automaton_options(C), ID, <<"fail_call">>), + #mg_stateproc_MachineFailed{} = + (catch mg_cth_automaton_client:repair(automaton_options(C), ID, <<"repair_fail">>)). + +-spec working_machine_repair(config()) -> _. +working_machine_repair(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>), + #mg_stateproc_MachineAlreadyWorking{} = + (catch mg_cth_automaton_client:repair(automaton_options(C), ID, <<"repair_ok">>)). + +-spec working_machine_simple_repair(config()) -> _. 
+working_machine_simple_repair(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>), + #mg_stateproc_MachineAlreadyWorking{} = + (catch mg_cth_automaton_client:simple_repair(automaton_options(C), ID)). + +%% Internal functions + +-spec automaton_options(config()) -> _. +automaton_options(C) -> ?config(automaton_options, C). + +-spec mg_woody_config(_) -> _. +mg_woody_config(_C) -> + #{ + woody_server => #{ip => {0, 0, 0, 0}, port => 38022, limits => #{}}, + quotas => [], + namespaces => #{ + ?NS => #{ + storage => mg_core_storage_memory, + processor => #{ + url => <<"http://null">> + }, + default_processing_timeout => 5000, + schedulers => #{}, + retries => #{}, + event_sinks => [], + worker => #{ + registry => mg_procreg_global, + sidecar => mg_cth_worker + }, + engine => progressor + } + } + }. + +-spec start_automaton(_) -> _. +start_automaton(MgConfig) -> + Flags = #{strategy => one_for_all}, + ChildsSpecs = mg_cth_conf:construct_child_specs(MgConfig), + {ok, SupPid} = Res = genlib_adhoc_supervisor:start_link(Flags, ChildsSpecs), + true = erlang:unlink(SupPid), + Res. + +gen_id() -> + base64:encode(crypto:strong_rand_bytes(8)). diff --git a/compose.yaml b/compose.yaml index 3e4c531..d27f2a0 100644 --- a/compose.yaml +++ b/compose.yaml @@ -30,10 +30,24 @@ services: condition: service_healthy kafka3: condition: service_healthy + postgres: + condition: service_healthy + test_neighbour: + condition: service_started ports: - "8022" command: /sbin/init + test_neighbour: + image: distrunner-test + hostname: ${SERVICE_NAME}-neighbour + build: + dockerfile: Dockerfile.neighbour + context: . + args: + OTP_VERSION: $OTP_VERSION + THRIFT_VERSION: $THRIFT_VERSION + riakdb: &member-node image: docker.io/basho/riak-kv:${RIAK_VERSION} environment: @@ -96,6 +110,32 @@ services: KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092 + postgres: + image: postgres:15-bookworm + command: -c 'max_connections=300' + environment: + POSTGRES_DB: "progressor_db" + POSTGRES_USER: "progressor" + POSTGRES_PASSWORD: "progressor" + PGDATA: "/tmp/postgresql/data/pgdata" + volumes: + - progressor-data:/tmp/postgresql/data + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U progressor -d progressor_db"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + restart: unless-stopped + deploy: + resources: + limits: + cpus: '2' + memory: 4G + volumes: schemas: external: false + progressor-data: diff --git a/config/config.yaml b/config/config.yaml index 7ee197a..21c829e 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -156,6 +156,14 @@ logging: 'debug': 'DEBUG' namespaces: + prg_test_ns: + timers: disabled + overseer: disabled + notification: disabled + engine: progressor + processor: + # never will be called + url: http://some.fake.url mg_test_ns: # only for testing, default 0 # suicide_probability: 0.1 @@ -312,3 +320,38 @@ kafka: max_retries: 3 # Time in milli-seconds to sleep before retry the failed produce request. 
retry_backoff: 500ms + +postgres: + databases: + # for example hellgate_db, fistful_db + some_processor_db: + host: postgres + port: 5432 + database: some_processor_db_name + username: user + password: password + another_processor_db: + host: another_instanse_postgres + port: 5432 + database: another_processor_db_name + username: user + password: password + pools: + some_processor_pool1: + database: some_processor_db + size: 10 + some_processor_pool2: + database: some_processor_db + size: 10 + another_processor_pool: + database: another_processor_db + size: 10 + +## Progressor namespaces settings + +progressor: + prg_test_ns: some_processor_pool1 + +canal: + url: http://vault + engine: kvv2 diff --git a/config/neighbour.sys.config b/config/neighbour.sys.config new file mode 100644 index 0000000..9a0dc57 --- /dev/null +++ b/config/neighbour.sys.config @@ -0,0 +1,44 @@ +[ + {progressor, [ + {namespaces, #{ + 'NS' => #{ + storage => #{ + client => prg_pg_backend, + options => #{pool => default_pool} + }, + processor => #{ + client => mg_cth_nbr_processor, + options => #{} + }, + retry_policy => #{ + initial_timeout => 3, + backoff_coefficient => 1.2, + max_timeout => 180, + max_attempts => 2, + non_retryable_errors => [ + do_not_retry + ] + }, + task_scan_timeout => 1, + process_step_timeout => 30 + } + }} + ]}, + {epg_connector, [ + {databases, #{ + progressor_db => #{ + host => "postgres", + port => 5432, + database => "progressor_db", + username => "progressor", + password => "progressor" + } + }}, + {pools, #{ + default_pool => #{ + database => progressor_db, + size => 10 + } + }} + ]} +]. diff --git a/rebar.config b/rebar.config index 378e8b9..5553fd4 100644 --- a/rebar.config +++ b/rebar.config @@ -31,12 +31,12 @@ warn_shadow_vars, warn_unused_import, warn_unused_function, - warn_deprecated_function, + warn_deprecated_function % at will % bin_opt_info % no_auto_import, - warn_missing_spec_all + % warn_missing_spec_all ]}. %% XRef checks @@ -51,6 +51,7 @@ {deps, [ {genlib, {git, "https://github.com/valitydev/genlib", {tag, "v1.1.0"}}}, + {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v0.0.3"}}}, % for configurator script {yamerl, {git, "https://github.com/valitydev/yamerl", {branch, master}}}, {cg_mon, {git, "https://github.com/valitydev/cg_mon", {branch, master}}} @@ -92,6 +93,14 @@ {cover_enabled, true}, {cover_excl_apps, [mg_cth]}, {dialyzer, [{plt_extra_apps, [eunit, common_test, proper]}]} + ]}, + {test_neighbour, [ + {deps, []}, + {relx, [ + {release, {mg_cth_neighbour, "0.1.0"}, [mg_cth_neighbour]}, + {sys_config, "config/neighbour.sys.config"}, + {extended_start_script, true} + ]} ]} ]}. 
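
For orientation: the postgres and progressor sections added to config.yaml above are turned into epg_connector and progressor application environment by rel_scripts/configurator.escript (changed further down in this patch). Under that mapping, the example YAML would produce sys.config terms roughly like the following sketch; only the first database and pool are shown, the canal block is omitted, and the names are the placeholders from the YAML itself:

    {epg_connector, [
        {databases, #{
            some_processor_db => #{
                host => "postgres",
                port => 5432,
                database => "some_processor_db_name",
                username => "user",
                password => "password"
            }
        }},
        {pools, #{
            some_processor_pool1 => #{database => some_processor_db, size => 10}
        }}
    ]},
    {progressor, [
        {namespaces, #{
            prg_test_ns => #{
                storage => #{
                    client => prg_pg_backend,
                    options => #{pool => some_processor_pool1}
                },
                %% this node never runs the processor itself
                processor => #{client => null},
                worker_pool_size => 0
            }
        }}
    ]}
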
diff --git a/rebar.lock b/rebar.lock index d51c3fd..567cad8 100644 --- a/rebar.lock +++ b/rebar.lock @@ -3,6 +3,10 @@ {<<"acceptor_pool">>,{pkg,<<"acceptor_pool">>,<<"1.0.0">>},2}, {<<"brod">>,{pkg,<<"brod">>,<<"3.16.1">>},0}, {<<"cache">>,{pkg,<<"cache">>,<<"2.3.3">>},1}, + {<<"canal">>, + {git,"https://github.com/valitydev/canal", + {ref,"621d3821cd0a6036fee75d8e3b2d17167f3268e4"}}, + 2}, {<<"certifi">>,{pkg,<<"certifi">>,<<"2.8.0">>},2}, {<<"cg_mon">>, {git,"https://github.com/valitydev/cg_mon", @@ -13,6 +17,14 @@ {<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.11.0">>},2}, {<<"crc32cer">>,{pkg,<<"crc32cer">>,<<"0.1.8">>},2}, {<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},2}, + {<<"epg_connector">>, + {git,"https://github.com/valitydev/epg_connector.git", + {ref,"35a7480b298ac4318352a03824ce06619b75f9da"}}, + 1}, + {<<"epgsql">>, + {git,"https://github.com/epgsql/epgsql.git", + {ref,"7ba52768cf0ea7d084df24d4275a88eef4db13c2"}}, + 2}, {<<"erl_health">>, {git,"https://github.com/valitydev/erlang-health", {ref,"49716470d0e8dab5e37db55d52dea78001735a3d"}}, @@ -24,12 +36,9 @@ {<<"gproc">>,{pkg,<<"gproc">>,<<"0.9.0">>},0}, {<<"grpcbox">>,{pkg,<<"grpcbox">>,<<"0.17.1">>},1}, {<<"hackney">>,{pkg,<<"hackney">>,<<"1.18.0">>},1}, - {<<"hamcrest">>, - {git,"https://github.com/basho/hamcrest-erlang.git", - {ref,"ad3dbab419762fc2d5821abb88b989da006b85c6"}}, - 2}, {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},3}, {<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},2}, + {<<"jsone">>,{pkg,<<"jsone">>,<<"1.8.0">>},3}, {<<"jsx">>,{pkg,<<"jsx">>,<<"3.1.0">>},1}, {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.0.1">>},1}, {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2}, @@ -52,6 +61,10 @@ {git,"https://github.com/seth/pooler", {ref,"96e1840b0d67d06b12b14fa3699b13c1d6ebda73"}}, 0}, + {<<"progressor">>, + {git,"https://github.com/valitydev/progressor.git", + {ref,"2873cc8bf8d95cf95629fe97d381635300ed073b"}}, + 0}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0}, {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.9">>},0}, {<<"prometheus_httpd">>,{pkg,<<"prometheus_httpd">>,<<"2.1.13">>},1}, @@ -106,6 +119,7 @@ {<<"hackney">>, <<"C4443D960BB9FBA6D01161D01CD81173089686717D9490E5D3606644C48D121F">>}, {<<"hpack">>, <<"2461899CC4AB6A0EF8E970C1661C5FC6A52D3C25580BC6DD204F84CE94669926">>}, {<<"idna">>, <<"8A63070E9F7D0C62EB9D9FCB360A7DE382448200FBBD1B106CC96D3D8099DF8D">>}, + {<<"jsone">>, <<"347FF1FA700E182E1F9C5012FA6D737B12C854313B9AE6954CA75D3987D6C06D">>}, {<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>}, {<<"kafka_protocol">>, <<"FC696880C73483C8B032C4BB60F2873046035C7824E1EDCB924CFCE643CF23DD">>}, {<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>}, @@ -141,6 +155,7 @@ {<<"hackney">>, <<"9AFCDA620704D720DB8C6A3123E9848D09C87586DC1C10479C42627B905B5C5E">>}, {<<"hpack">>, <<"D6137D7079169D8C485C6962DFE261AF5B9EF60FBC557344511C1E65E3D95FB0">>}, {<<"idna">>, <<"92376EB7894412ED19AC475E4A86F7B413C1B9FBB5BD16DCCD57934157944CEA">>}, + {<<"jsone">>, <<"08560B78624A12E0B5E7EC0271EC8CA38EF51F63D84D84843473E14D9B12618C">>}, {<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>}, {<<"kafka_protocol">>, <<"687BFD9989998EC8FBBC3ED50D1239A6C07A7DC15B52914AD477413B89ECB621">>}, {<<"metrics">>, <<"69B09ADDDC4F74A40716AE54D140F93BEB0FB8978D8636EADED0C31B6F099F16">>}, diff --git a/rel_scripts/configurator.escript b/rel_scripts/configurator.escript index 292c2b2..82c2ce6 100755 --- a/rel_scripts/configurator.escript +++ 
b/rel_scripts/configurator.escript @@ -71,7 +71,10 @@ sys_config(YamlConfig) -> {snowflake, snowflake(YamlConfig)}, {brod, brod(YamlConfig)}, {hackney, hackney(YamlConfig)}, - {machinegun, machinegun(YamlConfig)} + {machinegun, machinegun(YamlConfig)}, + {epg_connector, epg_connector(YamlConfig)}, + {progressor, progressor(YamlConfig)}, + {canal, canal(YamlConfig)} ]. os_mon(_YamlConfig) -> @@ -387,6 +390,7 @@ namespace({Name, NSYamlConfig}, YamlConfig) -> maps:merge( #{ storage => storage(Name, YamlConfig), + engine => ?C:atom(?C:conf([engine], NSYamlConfig, <<"machinegun">>)), processor => #{ url => ?C:conf([processor, url], NSYamlConfig), transport_opts => #{ @@ -561,6 +565,89 @@ procreg(YamlConfig) -> fun(ProcRegYamlConfig) -> ?C:atom(?C:conf([module], ProcRegYamlConfig)) end ). +epg_connector(YamlConfig) -> + [ + {databases, pg_databases(YamlConfig)}, + {pools, pg_pools(YamlConfig)} + ]. + +pg_databases(YamlConfig) -> + lists:foldl( + fun({DbKey, DbOpts}, Acc) -> + Acc#{?C:atom(DbKey) => pg_db_opts(DbOpts)} + end, + #{}, + ?C:conf([postgres, databases], YamlConfig, []) + ). + +pg_db_opts(OptsList) -> + lists:foldl( + fun + ({Key, Value}, AccIn) when Key =:= <<"port">> -> + AccIn#{port => Value}; + ({Key, Value}, AccIn) -> + AccIn#{?C:atom(Key) => unicode:characters_to_list(Value)} + end, + #{}, + OptsList + ). + +pg_pools(YamlConfig) -> + lists:foldl( + fun({PoolKey, PoolOpts}, Acc) -> + Acc#{?C:atom(PoolKey) => pg_pool_opts(PoolOpts)} + end, + #{}, + ?C:conf([postgres, pools], YamlConfig, []) + ). + +pg_pool_opts(PoolOpts) -> + lists:foldl( + fun + ({<<"database">>, Value}, Acc) -> + Acc#{database => ?C:atom(Value)}; + ({<<"size">>, Value}, Acc) -> + Acc#{size => Value} + end, + #{}, + PoolOpts + ). + +progressor(YamlConfig) -> + PrgNamespaces = lists:foldl( + fun({NsName, NsPgPool}, Acc) -> + Acc#{?C:atom(NsName) => prg_namespace(?C:atom(NsPgPool))} + end, + #{}, + ?C:conf([progressor], YamlConfig, []) + ), + [{namespaces, PrgNamespaces}]. + +prg_namespace(NsPgPool) -> + #{ + storage => #{ + client => prg_pg_backend, + options => #{pool => NsPgPool} + }, + processor => #{ + %% Never will be called + client => null + }, + worker_pool_size => 0 + }. + +canal(YamlConfig) -> + lists:foldl( + fun + ({<<"url">>, Url}, Acc) -> + [{url, unicode:characters_to_list(Url)} | Acc]; + ({<<"engine">>, Value}, Acc) -> + [{engine, ?C:atom(Value)} | Acc] + end, + [], + ?C:conf([canal], YamlConfig, []) + ). + %% %% vm.args %% From 09e984a25097148a9d2eecfad3a9e34e6221d4c4 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Wed, 5 Mar 2025 09:16:37 +0300 Subject: [PATCH 02/10] TECH-61: fix compose --- compose.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/compose.yaml b/compose.yaml index d27f2a0..a3be054 100644 --- a/compose.yaml +++ b/compose.yaml @@ -47,6 +47,9 @@ services: args: OTP_VERSION: $OTP_VERSION THRIFT_VERSION: $THRIFT_VERSION + depends_on: + postgres: + condition: service_healthy riakdb: &member-node image: docker.io/basho/riak-kv:${RIAK_VERSION} From b2c9360469c0ee447de6a21153bbb2f7a9f22f83 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Tue, 11 Mar 2025 08:13:40 +0300 Subject: [PATCH 03/10] TECH-61: bump progressor --- apps/mg_cth_neighbour/rebar.config | 4 +--- rebar.config | 6 ++++-- rebar.lock | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/mg_cth_neighbour/rebar.config b/apps/mg_cth_neighbour/rebar.config index cf255cc..e40651e 100644 --- a/apps/mg_cth_neighbour/rebar.config +++ b/apps/mg_cth_neighbour/rebar.config @@ -1,7 +1,5 @@ {erl_opts, [debug_info]}. 
-{deps, [ - {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v0.0.3"}}} -]}. +{deps, []}. {shell, [ %% {config, "config/sys.config"}, diff --git a/rebar.config b/rebar.config index 5553fd4..ee703d2 100644 --- a/rebar.config +++ b/rebar.config @@ -51,7 +51,7 @@ {deps, [ {genlib, {git, "https://github.com/valitydev/genlib", {tag, "v1.1.0"}}}, - {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v0.0.3"}}}, + {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v0.0.5"}}}, % for configurator script {yamerl, {git, "https://github.com/valitydev/yamerl", {branch, master}}}, {cg_mon, {git, "https://github.com/valitydev/cg_mon", {branch, master}}} @@ -95,7 +95,9 @@ {dialyzer, [{plt_extra_apps, [eunit, common_test, proper]}]} ]}, {test_neighbour, [ - {deps, []}, + {deps, [ + {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v0.0.5"}}} + ]}, {relx, [ {release, {mg_cth_neighbour, "0.1.0"}, [mg_cth_neighbour]}, {sys_config, "config/neighbour.sys.config"}, diff --git a/rebar.lock b/rebar.lock index 567cad8..06a3dbf 100644 --- a/rebar.lock +++ b/rebar.lock @@ -63,7 +63,7 @@ 0}, {<<"progressor">>, {git,"https://github.com/valitydev/progressor.git", - {ref,"2873cc8bf8d95cf95629fe97d381635300ed073b"}}, + {ref,"cb2ff009d510d4d97cdc39d391dd2aa7f330f369"}}, 0}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0}, {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.9">>},0}, From 33193ab9197d08f41384196cc3938312d26b62e2 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Tue, 11 Mar 2025 16:07:34 +0300 Subject: [PATCH 04/10] TECH-61: bump progressor --- rebar.config | 4 ++-- rebar.lock | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/rebar.config b/rebar.config index ee703d2..269c6cc 100644 --- a/rebar.config +++ b/rebar.config @@ -51,7 +51,7 @@ {deps, [ {genlib, {git, "https://github.com/valitydev/genlib", {tag, "v1.1.0"}}}, - {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v0.0.5"}}}, + {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v0.0.6"}}}, % for configurator script {yamerl, {git, "https://github.com/valitydev/yamerl", {branch, master}}}, {cg_mon, {git, "https://github.com/valitydev/cg_mon", {branch, master}}} @@ -96,7 +96,7 @@ ]}, {test_neighbour, [ {deps, [ - {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v0.0.5"}}} + {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v0.0.6"}}} ]}, {relx, [ {release, {mg_cth_neighbour, "0.1.0"}, [mg_cth_neighbour]}, diff --git a/rebar.lock b/rebar.lock index 06a3dbf..dca4ef0 100644 --- a/rebar.lock +++ b/rebar.lock @@ -36,6 +36,10 @@ {<<"gproc">>,{pkg,<<"gproc">>,<<"0.9.0">>},0}, {<<"grpcbox">>,{pkg,<<"grpcbox">>,<<"0.17.1">>},1}, {<<"hackney">>,{pkg,<<"hackney">>,<<"1.18.0">>},1}, + {<<"hamcrest">>, + {git,"https://github.com/basho/hamcrest-erlang.git", + {ref,"ad3dbab419762fc2d5821abb88b989da006b85c6"}}, + 2}, {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},3}, {<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},2}, {<<"jsone">>,{pkg,<<"jsone">>,<<"1.8.0">>},3}, @@ -63,7 +67,7 @@ 0}, {<<"progressor">>, {git,"https://github.com/valitydev/progressor.git", - {ref,"cb2ff009d510d4d97cdc39d391dd2aa7f330f369"}}, + {ref,"ce7bcbddc7e9b97a3bb24f45fb8ef455896a9cbb"}}, 0}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0}, {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.9">>},0}, From 8e78f7efe0992e7bf5f8d77844a389d9a826ea3d Mon Sep 17 00:00:00 2001 From: 
ttt161 Date: Tue, 11 Mar 2025 19:05:27 +0300 Subject: [PATCH 05/10] TECH-61: add debug --- apps/mg_woody/src/mg_woody_automaton.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/mg_woody/src/mg_woody_automaton.erl b/apps/mg_woody/src/mg_woody_automaton.erl index b911c49..84597c7 100644 --- a/apps/mg_woody/src/mg_woody_automaton.erl +++ b/apps/mg_woody/src/mg_woody_automaton.erl @@ -73,8 +73,10 @@ handle_function(Fun, Args, WoodyContext, Options) -> fun() -> mg_progressor:handle_function(Fun, Args, ReqCtx) end, pulse(NS, Options) ), + logger:info("route automaton request ~p to progressor with result: ~p", [Fun, Result]), Result; _ -> + logger:info("route automaton request ~p to mg_core with options: ~p", [Fun, Options]), handle_function_(Fun, Args, WoodyContext, Options) end. From e943654bcd292613b1734aa751b5d3d8f3d2cf30 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Tue, 11 Mar 2025 19:37:56 +0300 Subject: [PATCH 06/10] TECH-61: bugfix --- apps/mg_woody/src/mg_woody_automaton.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/apps/mg_woody/src/mg_woody_automaton.erl b/apps/mg_woody/src/mg_woody_automaton.erl index 84597c7..ced9ef4 100644 --- a/apps/mg_woody/src/mg_woody_automaton.erl +++ b/apps/mg_woody/src/mg_woody_automaton.erl @@ -57,9 +57,10 @@ handler(Options) -> -spec handle_function(woody:func(), woody:args(), woody_context:ctx(), options()) -> {ok, _Result} | no_return(). handle_function(Fun, Args, WoodyContext, Options) -> - case maps:to_list(Options) of - [{_NS, #{machine := #{engine := progressor}}} | _] -> - {NS, ID} = parse_args(Args), + {NS, ID} = parse_args(Args), + NsOpts = maps:get(NS, Options, #{}), + case NsOpts of + #{machine := #{engine := progressor}} -> ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), Deadline = get_deadline(NS, WoodyContext, Options), {ok, _} = From c9fd6f517093236fdeecd98d272bc4f833d3f933 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Tue, 11 Mar 2025 20:21:20 +0300 Subject: [PATCH 07/10] TECH-61: cleanup --- apps/mg_woody/src/mg_woody_automaton.erl | 2 -- 1 file changed, 2 deletions(-) diff --git a/apps/mg_woody/src/mg_woody_automaton.erl b/apps/mg_woody/src/mg_woody_automaton.erl index ced9ef4..82a8fa9 100644 --- a/apps/mg_woody/src/mg_woody_automaton.erl +++ b/apps/mg_woody/src/mg_woody_automaton.erl @@ -74,10 +74,8 @@ handle_function(Fun, Args, WoodyContext, Options) -> fun() -> mg_progressor:handle_function(Fun, Args, ReqCtx) end, pulse(NS, Options) ), - logger:info("route automaton request ~p to progressor with result: ~p", [Fun, Result]), Result; _ -> - logger:info("route automaton request ~p to mg_core with options: ~p", [Fun, Options]), handle_function_(Fun, Args, WoodyContext, Options) end. From 83dec24f59610946e6669d943a64d921adade142 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Wed, 12 Mar 2025 07:08:23 +0300 Subject: [PATCH 08/10] TECH-61: bugfix --- apps/mg_woody/src/mg_woody_automaton.erl | 20 +++++++++++++++----- rebar.lock | 4 ---- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/apps/mg_woody/src/mg_woody_automaton.erl b/apps/mg_woody/src/mg_woody_automaton.erl index 82a8fa9..27a5030 100644 --- a/apps/mg_woody/src/mg_woody_automaton.erl +++ b/apps/mg_woody/src/mg_woody_automaton.erl @@ -57,7 +57,7 @@ handler(Options) -> -spec handle_function(woody:func(), woody:args(), woody_context:ctx(), options()) -> {ok, _Result} | no_return(). 
handle_function(Fun, Args, WoodyContext, Options) -> - {NS, ID} = parse_args(Args), + {NS, ID} = parse_args(Fun, Args), NsOpts = maps:get(NS, Options, #{}), case NsOpts of #{machine := #{engine := progressor}} -> @@ -79,16 +79,26 @@ handle_function(Fun, Args, WoodyContext, Options) -> handle_function_(Fun, Args, WoodyContext, Options) end. -parse_args({NS, IDIn, _Args}) -> +parse_args('Start', {NS, IDIn, _Args}) -> ID = mg_woody_packer:unpack(id, IDIn), {NS, ID}; -parse_args({MachineDesc, _Args}) when is_tuple(MachineDesc) -> +parse_args(Fun, {MachineDesc, _Args}) when + Fun =:= 'Repair'; + Fun =:= 'Call'; + Fun =:= 'Notify' +-> {NS, ID, _Range} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), {NS, ID}; -parse_args({NS, RefIn}) when is_binary(NS) -> +parse_args('SimpleRepair', {NS, RefIn}) -> ID = mg_woody_packer:unpack(ref, RefIn), {NS, ID}; -parse_args({MachineDesc}) -> +parse_args('Remove', {NS, IDIn}) -> + ID = mg_woody_packer:unpack(id, IDIn), + {NS, ID}; +parse_args(Fun, {MachineDesc}) when + Fun =:= 'GetMachine'; + Fun =:= 'Modernize' +-> {NS, ID, _Range} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), {NS, ID}. diff --git a/rebar.lock b/rebar.lock index dca4ef0..9ea968e 100644 --- a/rebar.lock +++ b/rebar.lock @@ -36,10 +36,6 @@ {<<"gproc">>,{pkg,<<"gproc">>,<<"0.9.0">>},0}, {<<"grpcbox">>,{pkg,<<"grpcbox">>,<<"0.17.1">>},1}, {<<"hackney">>,{pkg,<<"hackney">>,<<"1.18.0">>},1}, - {<<"hamcrest">>, - {git,"https://github.com/basho/hamcrest-erlang.git", - {ref,"ad3dbab419762fc2d5821abb88b989da006b85c6"}}, - 2}, {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},3}, {<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},2}, {<<"jsone">>,{pkg,<<"jsone">>,<<"1.8.0">>},3}, From 3b8da935a694277c37c3a03c7a57625ae889ffbe Mon Sep 17 00:00:00 2001 From: ttt161 Date: Wed, 12 Mar 2025 07:20:02 +0300 Subject: [PATCH 09/10] TECH-61: fix error handling --- apps/mg_progressor/src/mg_progressor.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/mg_progressor/src/mg_progressor.erl b/apps/mg_progressor/src/mg_progressor.erl index 37bc8cb..a7ea20a 100644 --- a/apps/mg_progressor/src/mg_progressor.erl +++ b/apps/mg_progressor/src/mg_progressor.erl @@ -108,6 +108,8 @@ handle_error({error, <<"process is running">>}) -> erlang:throw({logic, machine_already_working}); handle_error({error, <<"process is error">>}) -> erlang:throw({logic, machine_failed}); +handle_error({error, {exception, _, _}}) -> + erlang:throw({logic, machine_failed}); handle_error(UnknownError) -> erlang:throw(UnknownError). 
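
A brief note on the dispatch that patches 05-08 converge on: mg_woody_automaton now resolves the namespace from the request arguments first (parse_args/2 is keyed on the woody function name) and only then inspects that namespace's options, so a deployment can mix engines per namespace within one handler. A minimal sketch of the decision, with an illustrative options map (the namespace name is a placeholder):

    %% Illustration only; mirrors the check in mg_woody_automaton:handle_function/4.
    engine_for(NS, Options) ->
        case maps:get(NS, Options, #{}) of
            #{machine := #{engine := progressor}} -> progressor;
            _ -> machinegun
        end.

    %% e.g. engine_for(<<"prg_test_ns">>,
    %%                 #{<<"prg_test_ns">> => #{machine => #{engine => progressor}}})
    %% returns progressor; any other namespace falls back to machinegun.
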
From ac58403672465ff580418587445541edef7bea76 Mon Sep 17 00:00:00 2001 From: ttt161 Date: Thu, 22 May 2025 09:30:22 +0300 Subject: [PATCH 10/10] TECH-22: bump progressor --- rebar.config | 2 +- rebar.lock | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/rebar.config b/rebar.config index 269c6cc..7d4e8e3 100644 --- a/rebar.config +++ b/rebar.config @@ -51,7 +51,7 @@ {deps, [ {genlib, {git, "https://github.com/valitydev/genlib", {tag, "v1.1.0"}}}, - {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v0.0.6"}}}, + {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.1"}}}, % for configurator script {yamerl, {git, "https://github.com/valitydev/yamerl", {branch, master}}}, {cg_mon, {git, "https://github.com/valitydev/cg_mon", {branch, master}}} diff --git a/rebar.lock b/rebar.lock index 9ea968e..a43373d 100644 --- a/rebar.lock +++ b/rebar.lock @@ -19,7 +19,7 @@ {<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},2}, {<<"epg_connector">>, {git,"https://github.com/valitydev/epg_connector.git", - {ref,"35a7480b298ac4318352a03824ce06619b75f9da"}}, + {ref,"dd93e27c00d492169e8a7bfc38976b911c6e7d05"}}, 1}, {<<"epgsql">>, {git,"https://github.com/epgsql/epgsql.git", @@ -36,6 +36,10 @@ {<<"gproc">>,{pkg,<<"gproc">>,<<"0.9.0">>},0}, {<<"grpcbox">>,{pkg,<<"grpcbox">>,<<"0.17.1">>},1}, {<<"hackney">>,{pkg,<<"hackney">>,<<"1.18.0">>},1}, + {<<"hamcrest">>, + {git,"https://github.com/basho/hamcrest-erlang.git", + {ref,"ad3dbab419762fc2d5821abb88b989da006b85c6"}}, + 2}, {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},3}, {<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},2}, {<<"jsone">>,{pkg,<<"jsone">>,<<"1.8.0">>},3}, @@ -63,7 +67,7 @@ 0}, {<<"progressor">>, {git,"https://github.com/valitydev/progressor.git", - {ref,"ce7bcbddc7e9b97a3bb24f45fb8ef455896a9cbb"}}, + {ref,"6df2e447a867434ad45bfc3540c4681e10105e02"}}, 0}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0}, {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.9">>},0},
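
Taken together, the request path for a namespace configured with engine: progressor is: a woody 'Call' arrives at mg_woody_automaton:handle_function/4, is routed to mg_progressor:handle_function/3, which builds a plain map and hands it to the progressor API. A sketch of that map for the test namespace (field names as in mg_progressor above; the id and limit values are illustrative only, and the optional context field is omitted):

    Req = #{
        ns    => prg_test_ns,
        id    => <<"some_machine_id">>,
        args  => erlang:term_to_binary(<<"simple_call">>),
        range => #{limit => 10, direction => forward}
    },
    {ok, _Response} = progressor:call(Req).
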