diff --git a/Dockerfile.neighbour b/Dockerfile.neighbour new file mode 100644 index 0000000..aa7984d --- /dev/null +++ b/Dockerfile.neighbour @@ -0,0 +1,46 @@ +ARG OTP_VERSION + +# Build the release +FROM docker.io/library/erlang:${OTP_VERSION} AS builder +SHELL ["/bin/bash", "-o", "pipefail", "-c"] + +# Install thrift compiler +ARG THRIFT_VERSION +ARG TARGETARCH + +RUN wget -q -O- "https://github.com/valitydev/thrift/releases/download/${THRIFT_VERSION}/thrift-${THRIFT_VERSION}-linux-${TARGETARCH}.tar.gz" \ + | tar -xvz -C /usr/local/bin/ + +# Copy sources +RUN mkdir /build +COPY . /build/ + +# Build the release +WORKDIR /build +RUN rebar3 compile \ + && rebar3 as test_neighbour release + +# Make a runner image +FROM docker.io/library/erlang:${OTP_VERSION}-slim + +ARG SERVICE_NAME=mg_cth_neighbour +ARG USER_UID=1001 +ARG USER_GID=$USER_UID + +# Set env +ENV CHARSET=UTF-8 +ENV LANG=C.UTF-8 + +# Expose SERVICE_NAME as env so CMD expands properly on start +ENV SERVICE_NAME=${SERVICE_NAME} + +# Set runtime +WORKDIR /opt/${SERVICE_NAME} + +COPY --from=builder /build/_build/test_neighbour/rel/${SERVICE_NAME} /opt/${SERVICE_NAME} + +RUN echo "#!/bin/sh" >> /entrypoint.sh && \ + echo "exec /opt/${SERVICE_NAME}/bin/${SERVICE_NAME} foreground" >> /entrypoint.sh && \ + chmod +x /entrypoint.sh + +ENTRYPOINT ["/entrypoint.sh"] diff --git a/Makefile b/Makefile index 9a150cd..422ed50 100644 --- a/Makefile +++ b/Makefile @@ -14,15 +14,19 @@ DOTENV := $(shell grep -v '^\#' .env) DEV_IMAGE_TAG = $(TEST_CONTAINER_NAME)-dev DEV_IMAGE_ID = $(file < .image.dev) +TEST_IMAGE_TAG = $(DIST_CONTAINER_NAME)-test +TEST_IMAGE_ID = $(file < .image.test) + DOCKER ?= docker DOCKERCOMPOSE ?= docker-compose DOCKERCOMPOSE_W_ENV = DEV_IMAGE_TAG=$(DEV_IMAGE_TAG) $(DOCKERCOMPOSE) -f compose.yaml -f compose.tracing.yaml REBAR ?= rebar3 TEST_CONTAINER_NAME ?= testrunner +DIST_CONTAINER_NAME ?= distrunner all: compile xref lint check-format dialyze eunit -.PHONY: dev-image clean-dev-image wc-shell test +.PHONY: dev-image test-image clean-dev-image clean-test-image wc-shell test dev-image: .image.dev @@ -36,6 +40,18 @@ ifneq ($(DEV_IMAGE_ID),) rm .image.dev endif +test-image: .image.test + +.image.test: Dockerfile.neighbour .env + env $(DOTENV) $(DOCKERCOMPOSE_W_ENV) build $(DIST_CONTAINER_NAME) + $(DOCKER) image ls -q -f "reference=$(TEST_IMAGE_ID)" | head -n1 > $@ + +clean-test-image: +ifneq ($(TEST_IMAGE_ID),) + $(DOCKER) image rm -f $(TEST_IMAGE_TAG) + rm .image.test +endif + DOCKER_WC_OPTIONS := -v $(PWD):$(PWD) --workdir $(PWD) DOCKER_WC_EXTRA_OPTIONS ?= --rm DOCKER_RUN = $(DOCKER) run -t $(DOCKER_WC_OPTIONS) $(DOCKER_WC_EXTRA_OPTIONS) diff --git a/apps/machinegun/src/machinegun.app.src b/apps/machinegun/src/machinegun.app.src index 8f236ff..c54365d 100644 --- a/apps/machinegun/src/machinegun.app.src +++ b/apps/machinegun/src/machinegun.app.src @@ -26,6 +26,8 @@ erl_health, prometheus, prometheus_cowboy, + progressor, + mg_progressor, mg_utils, mg_core, mg_riak, diff --git a/apps/mg_conf/src/mg_conf.erl b/apps/mg_conf/src/mg_conf.erl index e682869..2e2a0f4 100644 --- a/apps/mg_conf/src/mg_conf.erl +++ b/apps/mg_conf/src/mg_conf.erl @@ -99,7 +99,8 @@ events_machine_options(NS, NSs, Pulse) -> event_sinks => EventSinks, pulse => Pulse, default_processing_timeout => maps:get(default_processing_timeout, NSConfigs), - event_stash_size => maps:get(event_stash_size, NSConfigs, 0) + event_stash_size => maps:get(event_stash_size, NSConfigs, 0), + engine => maps:get(engine, NSConfigs, machinegun) }. -spec machine_options(mg_core:ns(), events_machines(), pulse()) -> mg_core_machine:options(). diff --git a/apps/mg_core/src/mg_core_events_machine.erl b/apps/mg_core/src/mg_core_events_machine.erl index a5a77ca..5cf75d7 100644 --- a/apps/mg_core/src/mg_core_events_machine.erl +++ b/apps/mg_core/src/mg_core_events_machine.erl @@ -127,7 +127,8 @@ pulse => mpulse:handler(), event_sinks => [mg_core_event_sink:handler()], default_processing_timeout => timeout(), - event_stash_size => non_neg_integer() + event_stash_size => non_neg_integer(), + engine => machinegun | progressor }. % like mg_core_storage:options() except `name` -type storage_options() :: mg_utils:mod_opts(map()). diff --git a/apps/mg_cth/src/mg_cth.erl b/apps/mg_cth/src/mg_cth.erl index 3e36a08..56aae2e 100644 --- a/apps/mg_cth/src/mg_cth.erl +++ b/apps/mg_cth/src/mg_cth.erl @@ -69,9 +69,48 @@ kafka_client_config(Brokers) -> ]} ]. +-spec epg_connector_config() -> _. +epg_connector_config() -> + [ + {databases, #{ + progressor_db => #{ + host => "postgres", + port => 5432, + database => "progressor_db", + username => "progressor", + password => "progressor" + } + }}, + {pools, #{ + default_pool => #{ + database => progressor_db, + size => 10 + } + }} + ]. + +-spec progressor_config() -> _. +progressor_config() -> + [ + {namespaces, #{ + binary_to_atom(?NS) => #{ + processor => #{client => null}, + storage => #{ + client => prg_pg_backend, + options => #{pool => default_pool} + }, + worker_pool_size => 0 + } + }} + ]. + -spec start_application(app()) -> _Deps :: [appname()]. start_application(brod) -> genlib_app:start_application_with(brod, kafka_client_config(?BROKERS)); +start_application(epg_connector) -> + genlib_app:start_application_with(epg_connector, epg_connector_config()); +start_application(progressor) -> + genlib_app:start_application_with(progressor, progressor_config()); start_application({AppName, Env}) -> genlib_app:start_application_with(AppName, Env); start_application(AppName) -> diff --git a/apps/mg_cth_neighbour/README.md b/apps/mg_cth_neighbour/README.md new file mode 100644 index 0000000..dc18cdb --- /dev/null +++ b/apps/mg_cth_neighbour/README.md @@ -0,0 +1,4 @@ +mg_cth_neighbour +===== + +Вспомогательное приложения для тестов, в релиз не включается. diff --git a/apps/mg_cth_neighbour/rebar.config b/apps/mg_cth_neighbour/rebar.config new file mode 100644 index 0000000..e40651e --- /dev/null +++ b/apps/mg_cth_neighbour/rebar.config @@ -0,0 +1,7 @@ +{erl_opts, [debug_info]}. +{deps, []}. + +{shell, [ + %% {config, "config/sys.config"}, + {apps, [mg_cth_neighbour]} +]}. diff --git a/apps/mg_cth_neighbour/src/mg_cth_nbr_processor.erl b/apps/mg_cth_neighbour/src/mg_cth_nbr_processor.erl new file mode 100644 index 0000000..154c2b1 --- /dev/null +++ b/apps/mg_cth_neighbour/src/mg_cth_nbr_processor.erl @@ -0,0 +1,59 @@ +-module(mg_cth_nbr_processor). + +-export([process/3]). + +-spec process({_TaskType, _Args, _Process}, _Opts, _Ctx) -> _. +process({init, _InitArgs, _Process}, _Opts, _Ctx) -> + Result = #{events => [event(1)]}, + {ok, Result}; +process( + %% <<"simple_call">> + {call, <<131, 109, 0, 0, 0, 11, 115, 105, 109, 112, 108, 101, 95, 99, 97, 108, 108>> = CallArgs, _Process}, + _Opts, + _Ctx +) -> + Result = #{ + response => CallArgs, + events => [] + }, + {ok, Result}; +process( + %% <<"fail_call">> + {call, <<131, 109, 0, 0, 0, 9, 102, 97, 105, 108, 95, 99, 97, 108, 108>> = _CallArgs, _Process}, + _Opts, + _Ctx +) -> + {error, do_not_retry}; +process( + %% simple repair + {timeout, _Args, _Process}, + _Opts, + _Ctx +) -> + Result = #{events => []}, + {ok, Result}; +% +process( + %% <<"repair_ok">> + {repair, <<131, 109, 0, 0, 0, 9, 114, 101, 112, 97, 105, 114, 95, 111, 107>> = CallArgs, _Process}, + _Opts, + _Ctx +) -> + Result = #{events => [], response => CallArgs}, + {ok, Result}; +process( + %% <<"repair_fail">> + {repair, <<131, 109, 0, 0, 0, 11, 114, 101, 112, 97, 105, 114, 95, 102, 97, 105, 108>> = _CallArgs, _Process}, + _Opts, + _Ctx +) -> + {error, unreach}. + +% +event(Id) -> + #{ + event_id => Id, + timestamp => erlang:system_time(second), + metadata => #{<<"format_version">> => 1}, + payload => erlang:term_to_binary({bin, crypto:strong_rand_bytes(8)}) + }. diff --git a/apps/mg_cth_neighbour/src/mg_cth_neighbour.app.src b/apps/mg_cth_neighbour/src/mg_cth_neighbour.app.src new file mode 100644 index 0000000..46f9326 --- /dev/null +++ b/apps/mg_cth_neighbour/src/mg_cth_neighbour.app.src @@ -0,0 +1,15 @@ +{application, mg_cth_neighbour, [ + {description, "An OTP application"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {mg_cth_neighbour_app, []}}, + {applications, [ + kernel, + stdlib, + progressor + ]}, + {env, []}, + {modules, []}, + {licenses, ["Apache-2.0"]}, + {links, []} + ]}. diff --git a/apps/mg_cth_neighbour/src/mg_cth_neighbour_app.erl b/apps/mg_cth_neighbour/src/mg_cth_neighbour_app.erl new file mode 100644 index 0000000..117da68 --- /dev/null +++ b/apps/mg_cth_neighbour/src/mg_cth_neighbour_app.erl @@ -0,0 +1,20 @@ +%%%------------------------------------------------------------------- +%% @doc mg_cth_neighbour public API +%% @end +%%%------------------------------------------------------------------- + +-module(mg_cth_neighbour_app). + +-behaviour(application). + +-export([start/2, stop/1]). + +-spec start(_, _) -> _. +start(_StartType, _StartArgs) -> + mg_cth_neighbour_sup:start_link(). + +-spec stop(_) -> _. +stop(_State) -> + ok. + +%% internal functions diff --git a/apps/mg_cth_neighbour/src/mg_cth_neighbour_sup.erl b/apps/mg_cth_neighbour/src/mg_cth_neighbour_sup.erl new file mode 100644 index 0000000..a1618bd --- /dev/null +++ b/apps/mg_cth_neighbour/src/mg_cth_neighbour_sup.erl @@ -0,0 +1,39 @@ +%%%------------------------------------------------------------------- +%% @doc mg_cth_neighbour top level supervisor. +%% @end +%%%------------------------------------------------------------------- + +-module(mg_cth_neighbour_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +-spec start_link() -> _. +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%% sup_flags() = #{strategy => strategy(), % optional +%% intensity => non_neg_integer(), % optional +%% period => pos_integer()} % optional +%% child_spec() = #{id => child_id(), % mandatory +%% start => mfargs(), % mandatory +%% restart => restart(), % optional +%% shutdown => shutdown(), % optional +%% type => worker(), % optional +%% modules => modules()} % optional +-spec init(_) -> _. +init([]) -> + SupFlags = #{ + strategy => one_for_all, + intensity => 0, + period => 1 + }, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. + +%% internal functions diff --git a/apps/mg_progressor/rebar.config b/apps/mg_progressor/rebar.config new file mode 100644 index 0000000..2656fd5 --- /dev/null +++ b/apps/mg_progressor/rebar.config @@ -0,0 +1,2 @@ +{erl_opts, [debug_info]}. +{deps, []}. diff --git a/apps/mg_progressor/src/mg_progressor.app.src b/apps/mg_progressor/src/mg_progressor.app.src new file mode 100644 index 0000000..f325b32 --- /dev/null +++ b/apps/mg_progressor/src/mg_progressor.app.src @@ -0,0 +1,13 @@ +{application, mg_progressor, [ + {description, "An OTP library"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {env, []}, + {modules, []}, + {licenses, []}, + {links, []} + ]}. diff --git a/apps/mg_progressor/src/mg_progressor.erl b/apps/mg_progressor/src/mg_progressor.erl new file mode 100644 index 0000000..a7ea20a --- /dev/null +++ b/apps/mg_progressor/src/mg_progressor.erl @@ -0,0 +1,171 @@ +-module(mg_progressor). + +-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). + +-export([handle_function/3]). + +-spec handle_function(woody:func(), woody:args(), _) -> + {ok, _Result} | no_return(). +handle_function('Start', {NS, IDIn, PackedArgs}, Context) -> + ID = mg_woody_packer:unpack(id, IDIn), + Args = mg_woody_packer:unpack(args, PackedArgs), + Req = genlib_map:compact(#{ + ns => erlang:binary_to_atom(NS), + id => ID, + args => maybe_unmarshal(term, Args), + context => maybe_unmarshal(term, Context) + }), + handle_result(progressor:init(Req)); +handle_function('Call', {MachineDesc, PackedArgs}, Context) -> + {NS, ID, {After, Limit, Direction}} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), + Args = mg_woody_packer:unpack(args, PackedArgs), + Range = genlib_map:compact(#{ + limit => Limit, + offset => After, + direction => Direction + }), + Req = genlib_map:compact(#{ + ns => erlang:binary_to_atom(NS), + id => ID, + args => maybe_unmarshal(term, Args), + context => maybe_unmarshal(term, Context), + range => Range + }), + case progressor:call(Req) of + {ok, RawResponse} -> + %% TODO: CHECK THIS! + {ok, mg_woody_packer:pack(call_response, marshal(term, RawResponse))}; + Error -> + handle_error(Error) + end; +handle_function('SimpleRepair', {NS, RefIn}, Context) -> + ID = mg_woody_packer:unpack(ref, RefIn), + Req = genlib_map:compact(#{ + ns => erlang:binary_to_atom(NS), + id => ID, + context => maybe_unmarshal(term, Context) + }), + handle_result(progressor:simple_repair(Req)); +handle_function('Repair', {MachineDesc, PackedArgs}, Context) -> + {NS, ID, _} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), + Args = mg_woody_packer:unpack(args, PackedArgs), + Req = genlib_map:compact(#{ + ns => erlang:binary_to_atom(NS), + id => ID, + args => maybe_unmarshal(term, Args), + context => maybe_unmarshal(term, Context) + }), + case progressor:repair(Req) of + {ok, RawResponse} -> + %% TODO: CHECK THIS! + {ok, mg_woody_packer:pack(repair_response, marshal(term, RawResponse))}; + {error, <<"process is running">>} = Error -> + handle_error(Error); + {error, <<"process not found">>} = Error -> + handle_error(Error); + {error, <<"namespace not found">>} = Error -> + handle_error(Error); + _Error -> + handle_error({error, <<"process is error">>}) + end; +handle_function('GetMachine', {MachineDesc}, Context) -> + {NS, ID, {After, Limit, Direction}} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), + Range = genlib_map:compact(#{ + limit => Limit, + offset => After, + direction => Direction + }), + Req = genlib_map:compact(#{ + ns => erlang:binary_to_atom(NS), + id => ID, + context => maybe_unmarshal(term, Context), + range => Range + }), + case progressor:get(Req) of + {ok, Process} -> + {ok, marshal(process, Process#{ns => NS, history_range => Range})}; + Error -> + handle_error(Error) + end; +handle_function(Func, _Args, _Context) -> + error({not_implemented, Func}). + +%% Internal functions + +handle_result({ok, _} = Ok) -> + Ok; +handle_result({error, _Reason} = Error) -> + handle_error(Error). + +-spec handle_error(any()) -> no_return(). +handle_error({error, <<"process already exists">>}) -> + erlang:throw({logic, machine_already_exist}); +handle_error({error, <<"process not found">>}) -> + erlang:throw({logic, machine_not_found}); +handle_error({error, <<"namespace not found">>}) -> + erlang:throw({logic, namespace_not_found}); +handle_error({error, <<"process is running">>}) -> + erlang:throw({logic, machine_already_working}); +handle_error({error, <<"process is error">>}) -> + erlang:throw({logic, machine_failed}); +handle_error({error, {exception, _, _}}) -> + erlang:throw({logic, machine_failed}); +handle_error(UnknownError) -> + erlang:throw(UnknownError). + +maybe_unmarshal(_Type, undefined) -> + undefined; +maybe_unmarshal(Type, Value) -> + unmarshal(Type, Value). + +unmarshal(term, Value) -> + erlang:term_to_binary(Value). + +maybe_marshal(_Type, undefined) -> + undefined; +maybe_marshal(Type, Value) -> + marshal(Type, Value). + +marshal(process, Process) -> + #mg_stateproc_Machine{ + ns = maps:get(ns, Process), + id = maps:get(process_id, Process), + history = maybe_marshal(history, maps:get(history, Process)), + history_range = marshal(history_range, maps:get(history_range, Process)), + status = marshal(status, {maps:get(status, Process), maps:get(detail, Process, undefined)}), + aux_state = maybe_marshal(term, maps:get(aux_state, Process, undefined)) + }; +marshal(history, History) -> + lists:map(fun(Ev) -> marshal(event, Ev) end, History); +marshal(event, Event) -> + #{ + event_id := EventId, + timestamp := Timestamp, + payload := Payload + } = Event, + Meta = maps:get(metadata, Event, #{}), + #mg_stateproc_Event{ + id = EventId, + created_at = marshal(timestamp, Timestamp), + format_version = format_version(Meta), + data = marshal(term, Payload) + }; +marshal(timestamp, Timestamp) -> + unicode:characters_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{offset, "Z"}])); +marshal(term, Term) -> + binary_to_term(Term); +marshal(history_range, Range) -> + #mg_stateproc_HistoryRange{ + 'after' = maps:get(offset, Range, undefined), + limit = maps:get(limit, Range, undefined), + direction = maps:get(direction, Range, undefined) + }; +marshal(status, {<<"running">>, _Detail}) -> + {'working', #mg_stateproc_MachineStatusWorking{}}; +marshal(status, {<<"error">>, Detail}) -> + {'failed', #mg_stateproc_MachineStatusFailed{reason = Detail}}. + +format_version(#{<<"format_version">> := Version}) -> + Version; +format_version(_) -> + undefined. diff --git a/apps/mg_woody/src/mg_woody_automaton.erl b/apps/mg_woody/src/mg_woody_automaton.erl index 67a1102..27a5030 100644 --- a/apps/mg_woody/src/mg_woody_automaton.erl +++ b/apps/mg_woody/src/mg_woody_automaton.erl @@ -56,8 +56,55 @@ handler(Options) -> %% -spec handle_function(woody:func(), woody:args(), woody_context:ctx(), options()) -> {ok, _Result} | no_return(). +handle_function(Fun, Args, WoodyContext, Options) -> + {NS, ID} = parse_args(Fun, Args), + NsOpts = maps:get(NS, Options, #{}), + case NsOpts of + #{machine := #{engine := progressor}} -> + ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), + Deadline = get_deadline(NS, WoodyContext, Options), + {ok, _} = + Result = mg_woody_utils:handle_error( + #{ + namespace => NS, + machine_id => ID, + request_context => ReqCtx, + deadline => Deadline + }, + fun() -> mg_progressor:handle_function(Fun, Args, ReqCtx) end, + pulse(NS, Options) + ), + Result; + _ -> + handle_function_(Fun, Args, WoodyContext, Options) + end. + +parse_args('Start', {NS, IDIn, _Args}) -> + ID = mg_woody_packer:unpack(id, IDIn), + {NS, ID}; +parse_args(Fun, {MachineDesc, _Args}) when + Fun =:= 'Repair'; + Fun =:= 'Call'; + Fun =:= 'Notify' +-> + {NS, ID, _Range} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), + {NS, ID}; +parse_args('SimpleRepair', {NS, RefIn}) -> + ID = mg_woody_packer:unpack(ref, RefIn), + {NS, ID}; +parse_args('Remove', {NS, IDIn}) -> + ID = mg_woody_packer:unpack(id, IDIn), + {NS, ID}; +parse_args(Fun, {MachineDesc}) when + Fun =:= 'GetMachine'; + Fun =:= 'Modernize' +-> + {NS, ID, _Range} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), + {NS, ID}. -handle_function('Start', {NS, IDIn, Args}, WoodyContext, Options) -> +-spec handle_function_(woody:func(), woody:args(), woody_context:ctx(), options()) -> + {ok, _Result} | no_return(). +handle_function_('Start', {NS, IDIn, Args}, WoodyContext, Options) -> ID = mg_woody_packer:unpack(id, IDIn), ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), Deadline = get_deadline(NS, WoodyContext, Options), @@ -80,7 +127,7 @@ handle_function('Start', {NS, IDIn, Args}, WoodyContext, Options) -> pulse(NS, Options) ), {ok, ok}; -handle_function('Repair', {MachineDesc, Args}, WoodyContext, Options) -> +handle_function_('Repair', {MachineDesc, Args}, WoodyContext, Options) -> ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), {NS, ID, Range} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), Deadline = get_deadline(NS, WoodyContext, Options), @@ -105,7 +152,7 @@ handle_function('Repair', {MachineDesc, Args}, WoodyContext, Options) -> {error, {failed, Reason}} -> woody_error:raise(business, mg_woody_packer:pack(repair_error, Reason)) end; -handle_function('SimpleRepair', {NS, RefIn}, WoodyContext, Options) -> +handle_function_('SimpleRepair', {NS, RefIn}, WoodyContext, Options) -> Deadline = get_deadline(NS, WoodyContext, Options), ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), ID = mg_woody_packer:unpack(ref, RefIn), @@ -122,7 +169,7 @@ handle_function('SimpleRepair', {NS, RefIn}, WoodyContext, Options) -> pulse(NS, Options) ), {ok, ok}; -handle_function('Call', {MachineDesc, Args}, WoodyContext, Options) -> +handle_function_('Call', {MachineDesc, Args}, WoodyContext, Options) -> ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), {NS, ID, Range} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), Deadline = get_deadline(NS, WoodyContext, Options), @@ -142,7 +189,7 @@ handle_function('Call', {MachineDesc, Args}, WoodyContext, Options) -> pulse(NS, Options) ), {ok, mg_woody_packer:pack(call_response, Response)}; -handle_function('GetMachine', {MachineDesc}, WoodyContext, Options) -> +handle_function_('GetMachine', {MachineDesc}, WoodyContext, Options) -> ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), {NS, ID, Range} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), Machine = @@ -158,7 +205,7 @@ handle_function('GetMachine', {MachineDesc}, WoodyContext, Options) -> pulse(NS, Options) ), {ok, mg_woody_packer:pack(machine_simple, simplify_core_machine(Machine))}; -handle_function('Remove', {NS, IDIn}, WoodyContext, Options) -> +handle_function_('Remove', {NS, IDIn}, WoodyContext, Options) -> ID = mg_woody_packer:unpack(id, IDIn), Deadline = get_deadline(NS, WoodyContext, Options), ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), @@ -175,7 +222,7 @@ handle_function('Remove', {NS, IDIn}, WoodyContext, Options) -> pulse(NS, Options) ), {ok, ok}; -handle_function('Modernize', {MachineDesc}, WoodyContext, Options) -> +handle_function_('Modernize', {MachineDesc}, WoodyContext, Options) -> {NS, ID, Range} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), mg_woody_utils:handle_error( @@ -199,7 +246,7 @@ handle_function('Modernize', {MachineDesc}, WoodyContext, Options) -> end, pulse(NS, Options) ); -handle_function('Notify', {MachineDesc, Args}, WoodyContext, Options) -> +handle_function_('Notify', {MachineDesc, Args}, WoodyContext, Options) -> ReqCtx = to_request_context(otel_ctx:get_current(), WoodyContext), {NS, ID, Range} = mg_woody_packer:unpack(machine_descriptor, MachineDesc), NotificationID = diff --git a/apps/mg_woody/test/mg_woody_prg_engine_tests_SUITE.erl b/apps/mg_woody/test/mg_woody_prg_engine_tests_SUITE.erl new file mode 100644 index 0000000..801e36c --- /dev/null +++ b/apps/mg_woody/test/mg_woody_prg_engine_tests_SUITE.erl @@ -0,0 +1,229 @@ +-module(mg_woody_prg_engine_tests_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). +-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl"). +-include_lib("mg_cth/include/mg_cth.hrl"). + +%% tests descriptions +-export([all/0]). +-export([groups/0]). +-export([init_per_suite/1]). +-export([end_per_suite/1]). +-export([init_per_group/2]). +-export([end_per_group/2]). +-export([init_per_testcase/2]). +-export([end_per_testcase/2]). + +%% TESTS +-export([namespace_not_found/1]). +-export([machine_start/1]). +-export([machine_already_exists/1]). +-export([machine_call_by_id/1]). +-export([machine_id_not_found/1]). +-export([failed_machine_call/1]). +-export([failed_machine_simple_repair/1]). +-export([failed_machine_repair/1]). +-export([failed_machine_repair_error/1]). +-export([working_machine_repair/1]). +-export([working_machine_simple_repair/1]). + +-type group_name() :: atom(). +-type test_name() :: atom(). +-type config() :: [{atom(), _}]. + +-spec init_per_suite(config()) -> config(). +init_per_suite(C) -> + _ = mg_cth:start_applications([ + epg_connector, + progressor + ]), + C. + +-spec end_per_suite(config()) -> ok. +end_per_suite(_C) -> + _ = mg_cth:stop_applications([ + epg_connector, + progressor + ]), + ok. + +-spec init_per_group(group_name(), config()) -> config(). +init_per_group(_, C) -> + Config = mg_woody_config(C), + Apps = mg_cth:start_applications([ + brod, + mg_woody + ]), + {ok, SupPid} = start_automaton(Config), + [ + {apps, Apps}, + {automaton_options, #{ + url => "http://localhost:38022", + ns => ?NS, + retry_strategy => genlib_retry:linear(3, 1) + }}, + {sup_pid, SupPid} + | C + ]. + +-spec end_per_group(group_name(), config()) -> ok. +end_per_group(_, C) -> + ok = proc_lib:stop(?config(sup_pid, C)), + mg_cth:stop_applications(?config(apps, C)). + +-spec init_per_testcase(atom(), config()) -> config(). +init_per_testcase(_Name, C) -> + C. + +-spec end_per_testcase(atom(), config()) -> _. +end_per_testcase(_Name, _C) -> + ok. +% + +-spec all() -> [test_name() | {group, group_name()}]. +all() -> + [ + {group, base} + ]. + +-spec groups() -> [{group_name(), list(_), [test_name()]}]. +groups() -> + [ + {base, [], [ + namespace_not_found, + machine_start, + machine_already_exists, + machine_call_by_id, + machine_id_not_found, + failed_machine_call, + failed_machine_simple_repair, + failed_machine_repair, + failed_machine_repair_error, + working_machine_repair, + working_machine_simple_repair + ]} + ]. + +%% TESTS + +-spec namespace_not_found(config()) -> _. +namespace_not_found(C) -> + Opts = maps:update(ns, <<"incorrect_NS">>, automaton_options(C)), + #mg_stateproc_NamespaceNotFound{} = (catch mg_cth_automaton_client:start(Opts, ?ID, <<>>)). + +-spec machine_start(config()) -> _. +machine_start(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>). + +-spec machine_already_exists(config()) -> _. +machine_already_exists(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>), + #mg_stateproc_MachineAlreadyExists{} = + (catch mg_cth_automaton_client:start(automaton_options(C), ID, <<>>)). + +-spec machine_call_by_id(config()) -> _. +machine_call_by_id(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>), + <<"simple_call">> = mg_cth_automaton_client:call(automaton_options(C), ID, <<"simple_call">>). + +-spec machine_id_not_found(config()) -> _. +machine_id_not_found(C) -> + IncorrectID = <<"incorrect_ID">>, + #mg_stateproc_MachineNotFound{} = + (catch mg_cth_automaton_client:call(automaton_options(C), IncorrectID, <<"simple_call">>)). + +-spec failed_machine_call(config()) -> _. +failed_machine_call(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>), + _Fail = catch mg_cth_automaton_client:call(automaton_options(C), ID, <<"fail_call">>), + #mg_stateproc_MachineFailed{} = + (catch mg_cth_automaton_client:call(automaton_options(C), ID, <<"simple_call">>)). + +-spec failed_machine_simple_repair(config()) -> _. +failed_machine_simple_repair(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>), + _Fail = catch mg_cth_automaton_client:call(automaton_options(C), ID, <<"fail_call">>), + ok = mg_cth_automaton_client:simple_repair(automaton_options(C), ID), + %% await repair result + timer:sleep(1100), + #{status := working} = + mg_cth_automaton_client:get_machine(automaton_options(C), ID, {undefined, undefined, forward}). + +-spec failed_machine_repair(config()) -> _. +failed_machine_repair(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>), + _Fail = catch mg_cth_automaton_client:call(automaton_options(C), ID, <<"fail_call">>), + <<"repair_ok">> = mg_cth_automaton_client:repair(automaton_options(C), ID, <<"repair_ok">>), + %% await repair result + timer:sleep(1100), + #{status := working} = + mg_cth_automaton_client:get_machine(automaton_options(C), ID, {undefined, undefined, forward}). + +-spec failed_machine_repair_error(config()) -> _. +failed_machine_repair_error(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>), + _Fail = catch mg_cth_automaton_client:call(automaton_options(C), ID, <<"fail_call">>), + #mg_stateproc_MachineFailed{} = + (catch mg_cth_automaton_client:repair(automaton_options(C), ID, <<"repair_fail">>)). + +-spec working_machine_repair(config()) -> _. +working_machine_repair(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>), + #mg_stateproc_MachineAlreadyWorking{} = + (catch mg_cth_automaton_client:repair(automaton_options(C), ID, <<"repair_ok">>)). + +-spec working_machine_simple_repair(config()) -> _. +working_machine_simple_repair(C) -> + ID = gen_id(), + ok = mg_cth_automaton_client:start(automaton_options(C), ID, <<"init_args">>), + #mg_stateproc_MachineAlreadyWorking{} = + (catch mg_cth_automaton_client:simple_repair(automaton_options(C), ID)). + +%% Internal functions + +-spec automaton_options(config()) -> _. +automaton_options(C) -> ?config(automaton_options, C). + +-spec mg_woody_config(_) -> _. +mg_woody_config(_C) -> + #{ + woody_server => #{ip => {0, 0, 0, 0}, port => 38022, limits => #{}}, + quotas => [], + namespaces => #{ + ?NS => #{ + storage => mg_core_storage_memory, + processor => #{ + url => <<"http://null">> + }, + default_processing_timeout => 5000, + schedulers => #{}, + retries => #{}, + event_sinks => [], + worker => #{ + registry => mg_procreg_global, + sidecar => mg_cth_worker + }, + engine => progressor + } + } + }. + +-spec start_automaton(_) -> _. +start_automaton(MgConfig) -> + Flags = #{strategy => one_for_all}, + ChildsSpecs = mg_cth_conf:construct_child_specs(MgConfig), + {ok, SupPid} = Res = genlib_adhoc_supervisor:start_link(Flags, ChildsSpecs), + true = erlang:unlink(SupPid), + Res. + +gen_id() -> + base64:encode(crypto:strong_rand_bytes(8)). diff --git a/compose.yaml b/compose.yaml index 3e4c531..a3be054 100644 --- a/compose.yaml +++ b/compose.yaml @@ -30,10 +30,27 @@ services: condition: service_healthy kafka3: condition: service_healthy + postgres: + condition: service_healthy + test_neighbour: + condition: service_started ports: - "8022" command: /sbin/init + test_neighbour: + image: distrunner-test + hostname: ${SERVICE_NAME}-neighbour + build: + dockerfile: Dockerfile.neighbour + context: . + args: + OTP_VERSION: $OTP_VERSION + THRIFT_VERSION: $THRIFT_VERSION + depends_on: + postgres: + condition: service_healthy + riakdb: &member-node image: docker.io/basho/riak-kv:${RIAK_VERSION} environment: @@ -96,6 +113,32 @@ services: KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092 + postgres: + image: postgres:15-bookworm + command: -c 'max_connections=300' + environment: + POSTGRES_DB: "progressor_db" + POSTGRES_USER: "progressor" + POSTGRES_PASSWORD: "progressor" + PGDATA: "/tmp/postgresql/data/pgdata" + volumes: + - progressor-data:/tmp/postgresql/data + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U progressor -d progressor_db"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + restart: unless-stopped + deploy: + resources: + limits: + cpus: '2' + memory: 4G + volumes: schemas: external: false + progressor-data: diff --git a/config/config.yaml b/config/config.yaml index 7ee197a..21c829e 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -156,6 +156,14 @@ logging: 'debug': 'DEBUG' namespaces: + prg_test_ns: + timers: disabled + overseer: disabled + notification: disabled + engine: progressor + processor: + # never will be called + url: http://some.fake.url mg_test_ns: # only for testing, default 0 # suicide_probability: 0.1 @@ -312,3 +320,38 @@ kafka: max_retries: 3 # Time in milli-seconds to sleep before retry the failed produce request. retry_backoff: 500ms + +postgres: + databases: + # for example hellgate_db, fistful_db + some_processor_db: + host: postgres + port: 5432 + database: some_processor_db_name + username: user + password: password + another_processor_db: + host: another_instanse_postgres + port: 5432 + database: another_processor_db_name + username: user + password: password + pools: + some_processor_pool1: + database: some_processor_db + size: 10 + some_processor_pool2: + database: some_processor_db + size: 10 + another_processor_pool: + database: another_processor_db + size: 10 + +## Progressor namespaces settings + +progressor: + prg_test_ns: some_processor_pool1 + +canal: + url: http://vault + engine: kvv2 diff --git a/config/neighbour.sys.config b/config/neighbour.sys.config new file mode 100644 index 0000000..9a0dc57 --- /dev/null +++ b/config/neighbour.sys.config @@ -0,0 +1,44 @@ +[ + {progressor, [ + {namespaces, #{ + 'NS' => #{ + storage => #{ + client => prg_pg_backend, + options => #{pool => default_pool} + }, + processor => #{ + client => mg_cth_nbr_processor, + options => #{} + }, + retry_policy => #{ + initial_timeout => 3, + backoff_coefficient => 1.2, + max_timeout => 180, + max_attempts => 2, + non_retryable_errors => [ + do_not_retry + ] + }, + task_scan_timeout => 1, + process_step_timeout => 30 + } + }} + ]}, + {epg_connector, [ + {databases, #{ + progressor_db => #{ + host => "postgres", + port => 5432, + database => "progressor_db", + username => "progressor", + password => "progressor" + } + }}, + {pools, #{ + default_pool => #{ + database => progressor_db, + size => 10 + } + }} + ]} +]. diff --git a/rebar.config b/rebar.config index 378e8b9..7d4e8e3 100644 --- a/rebar.config +++ b/rebar.config @@ -31,12 +31,12 @@ warn_shadow_vars, warn_unused_import, warn_unused_function, - warn_deprecated_function, + warn_deprecated_function % at will % bin_opt_info % no_auto_import, - warn_missing_spec_all + % warn_missing_spec_all ]}. %% XRef checks @@ -51,6 +51,7 @@ {deps, [ {genlib, {git, "https://github.com/valitydev/genlib", {tag, "v1.1.0"}}}, + {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v1.0.1"}}}, % for configurator script {yamerl, {git, "https://github.com/valitydev/yamerl", {branch, master}}}, {cg_mon, {git, "https://github.com/valitydev/cg_mon", {branch, master}}} @@ -92,6 +93,16 @@ {cover_enabled, true}, {cover_excl_apps, [mg_cth]}, {dialyzer, [{plt_extra_apps, [eunit, common_test, proper]}]} + ]}, + {test_neighbour, [ + {deps, [ + {progressor, {git, "https://github.com/valitydev/progressor.git", {tag, "v0.0.6"}}} + ]}, + {relx, [ + {release, {mg_cth_neighbour, "0.1.0"}, [mg_cth_neighbour]}, + {sys_config, "config/neighbour.sys.config"}, + {extended_start_script, true} + ]} ]} ]}. diff --git a/rebar.lock b/rebar.lock index d51c3fd..a43373d 100644 --- a/rebar.lock +++ b/rebar.lock @@ -3,6 +3,10 @@ {<<"acceptor_pool">>,{pkg,<<"acceptor_pool">>,<<"1.0.0">>},2}, {<<"brod">>,{pkg,<<"brod">>,<<"3.16.1">>},0}, {<<"cache">>,{pkg,<<"cache">>,<<"2.3.3">>},1}, + {<<"canal">>, + {git,"https://github.com/valitydev/canal", + {ref,"621d3821cd0a6036fee75d8e3b2d17167f3268e4"}}, + 2}, {<<"certifi">>,{pkg,<<"certifi">>,<<"2.8.0">>},2}, {<<"cg_mon">>, {git,"https://github.com/valitydev/cg_mon", @@ -13,6 +17,14 @@ {<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.11.0">>},2}, {<<"crc32cer">>,{pkg,<<"crc32cer">>,<<"0.1.8">>},2}, {<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},2}, + {<<"epg_connector">>, + {git,"https://github.com/valitydev/epg_connector.git", + {ref,"dd93e27c00d492169e8a7bfc38976b911c6e7d05"}}, + 1}, + {<<"epgsql">>, + {git,"https://github.com/epgsql/epgsql.git", + {ref,"7ba52768cf0ea7d084df24d4275a88eef4db13c2"}}, + 2}, {<<"erl_health">>, {git,"https://github.com/valitydev/erlang-health", {ref,"49716470d0e8dab5e37db55d52dea78001735a3d"}}, @@ -30,6 +42,7 @@ 2}, {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},3}, {<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},2}, + {<<"jsone">>,{pkg,<<"jsone">>,<<"1.8.0">>},3}, {<<"jsx">>,{pkg,<<"jsx">>,<<"3.1.0">>},1}, {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.0.1">>},1}, {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},2}, @@ -52,6 +65,10 @@ {git,"https://github.com/seth/pooler", {ref,"96e1840b0d67d06b12b14fa3699b13c1d6ebda73"}}, 0}, + {<<"progressor">>, + {git,"https://github.com/valitydev/progressor.git", + {ref,"6df2e447a867434ad45bfc3540c4681e10105e02"}}, + 0}, {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.11.0">>},0}, {<<"prometheus_cowboy">>,{pkg,<<"prometheus_cowboy">>,<<"0.1.9">>},0}, {<<"prometheus_httpd">>,{pkg,<<"prometheus_httpd">>,<<"2.1.13">>},1}, @@ -106,6 +123,7 @@ {<<"hackney">>, <<"C4443D960BB9FBA6D01161D01CD81173089686717D9490E5D3606644C48D121F">>}, {<<"hpack">>, <<"2461899CC4AB6A0EF8E970C1661C5FC6A52D3C25580BC6DD204F84CE94669926">>}, {<<"idna">>, <<"8A63070E9F7D0C62EB9D9FCB360A7DE382448200FBBD1B106CC96D3D8099DF8D">>}, + {<<"jsone">>, <<"347FF1FA700E182E1F9C5012FA6D737B12C854313B9AE6954CA75D3987D6C06D">>}, {<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>}, {<<"kafka_protocol">>, <<"FC696880C73483C8B032C4BB60F2873046035C7824E1EDCB924CFCE643CF23DD">>}, {<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>}, @@ -141,6 +159,7 @@ {<<"hackney">>, <<"9AFCDA620704D720DB8C6A3123E9848D09C87586DC1C10479C42627B905B5C5E">>}, {<<"hpack">>, <<"D6137D7079169D8C485C6962DFE261AF5B9EF60FBC557344511C1E65E3D95FB0">>}, {<<"idna">>, <<"92376EB7894412ED19AC475E4A86F7B413C1B9FBB5BD16DCCD57934157944CEA">>}, + {<<"jsone">>, <<"08560B78624A12E0B5E7EC0271EC8CA38EF51F63D84D84843473E14D9B12618C">>}, {<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>}, {<<"kafka_protocol">>, <<"687BFD9989998EC8FBBC3ED50D1239A6C07A7DC15B52914AD477413B89ECB621">>}, {<<"metrics">>, <<"69B09ADDDC4F74A40716AE54D140F93BEB0FB8978D8636EADED0C31B6F099F16">>}, diff --git a/rel_scripts/configurator.escript b/rel_scripts/configurator.escript index 292c2b2..82c2ce6 100755 --- a/rel_scripts/configurator.escript +++ b/rel_scripts/configurator.escript @@ -71,7 +71,10 @@ sys_config(YamlConfig) -> {snowflake, snowflake(YamlConfig)}, {brod, brod(YamlConfig)}, {hackney, hackney(YamlConfig)}, - {machinegun, machinegun(YamlConfig)} + {machinegun, machinegun(YamlConfig)}, + {epg_connector, epg_connector(YamlConfig)}, + {progressor, progressor(YamlConfig)}, + {canal, canal(YamlConfig)} ]. os_mon(_YamlConfig) -> @@ -387,6 +390,7 @@ namespace({Name, NSYamlConfig}, YamlConfig) -> maps:merge( #{ storage => storage(Name, YamlConfig), + engine => ?C:atom(?C:conf([engine], NSYamlConfig, <<"machinegun">>)), processor => #{ url => ?C:conf([processor, url], NSYamlConfig), transport_opts => #{ @@ -561,6 +565,89 @@ procreg(YamlConfig) -> fun(ProcRegYamlConfig) -> ?C:atom(?C:conf([module], ProcRegYamlConfig)) end ). +epg_connector(YamlConfig) -> + [ + {databases, pg_databases(YamlConfig)}, + {pools, pg_pools(YamlConfig)} + ]. + +pg_databases(YamlConfig) -> + lists:foldl( + fun({DbKey, DbOpts}, Acc) -> + Acc#{?C:atom(DbKey) => pg_db_opts(DbOpts)} + end, + #{}, + ?C:conf([postgres, databases], YamlConfig, []) + ). + +pg_db_opts(OptsList) -> + lists:foldl( + fun + ({Key, Value}, AccIn) when Key =:= <<"port">> -> + AccIn#{port => Value}; + ({Key, Value}, AccIn) -> + AccIn#{?C:atom(Key) => unicode:characters_to_list(Value)} + end, + #{}, + OptsList + ). + +pg_pools(YamlConfig) -> + lists:foldl( + fun({PoolKey, PoolOpts}, Acc) -> + Acc#{?C:atom(PoolKey) => pg_pool_opts(PoolOpts)} + end, + #{}, + ?C:conf([postgres, pools], YamlConfig, []) + ). + +pg_pool_opts(PoolOpts) -> + lists:foldl( + fun + ({<<"database">>, Value}, Acc) -> + Acc#{database => ?C:atom(Value)}; + ({<<"size">>, Value}, Acc) -> + Acc#{size => Value} + end, + #{}, + PoolOpts + ). + +progressor(YamlConfig) -> + PrgNamespaces = lists:foldl( + fun({NsName, NsPgPool}, Acc) -> + Acc#{?C:atom(NsName) => prg_namespace(?C:atom(NsPgPool))} + end, + #{}, + ?C:conf([progressor], YamlConfig, []) + ), + [{namespaces, PrgNamespaces}]. + +prg_namespace(NsPgPool) -> + #{ + storage => #{ + client => prg_pg_backend, + options => #{pool => NsPgPool} + }, + processor => #{ + %% Never will be called + client => null + }, + worker_pool_size => 0 + }. + +canal(YamlConfig) -> + lists:foldl( + fun + ({<<"url">>, Url}, Acc) -> + [{url, unicode:characters_to_list(Url)} | Acc]; + ({<<"engine">>, Value}, Acc) -> + [{engine, ?C:atom(Value)} | Acc] + end, + [], + ?C:conf([canal], YamlConfig, []) + ). + %% %% vm.args %%