diff --git a/apps/machinegun/src/mg_configurator.erl b/apps/machinegun/src/mg_configurator.erl
index 94e8a79..f77746a 100644
--- a/apps/machinegun/src/mg_configurator.erl
+++ b/apps/machinegun/src/mg_configurator.erl
@@ -21,10 +21,17 @@
     event_stash_size := non_neg_integer()
 }.
 
+-type event_sink_ns() :: #{
+    default_processing_timeout := timeout(),
+    storage => mg_core_storage:options(),
+    worker => mg_core_worker:options()
+}.
+
 -type namespaces() :: #{mg_core:ns() => events_machines()}.
 
 -type config() :: #{
     woody_server := mg_woody:woody_server(),
+    event_sink_ns := event_sink_ns(),
     namespaces := namespaces(),
     pulse := pulse(),
     quotas => [mg_core_quota_worker:options()],
@@ -39,6 +46,7 @@
 construct_child_specs(
     #{
         woody_server := WoodyServer,
+        event_sink_ns := EventSinkNS,
         namespaces := Namespaces,
         pulse := Pulse
     } = Config
@@ -48,12 +56,14 @@
     ClusterOpts = maps:get(cluster, Config, #{}),
 
     QuotasChildSpec = quotas_child_specs(Quotas, quota),
-    EventMachinesChildSpec = events_machines_child_specs(Namespaces, Pulse),
+    EventSinkChildSpec = event_sink_ns_child_spec(EventSinkNS, event_sink, Pulse),
+    EventMachinesChildSpec = events_machines_child_specs(Namespaces, EventSinkNS, Pulse),
     WoodyServerChildSpec = mg_woody:child_spec(
         woody_server,
         #{
             pulse => Pulse,
-            automaton => api_automaton_options(Namespaces, Pulse),
+            automaton => api_automaton_options(Namespaces, EventSinkNS, Pulse),
+            event_sink => api_event_sink_options(Namespaces, EventSinkNS, Pulse),
             woody_server => WoodyServer,
             additional_routes => [
                 get_startup_route(),
@@ -66,6 +76,7 @@
 
     lists:flatten([
         QuotasChildSpec,
+        EventSinkChildSpec,
         EventMachinesChildSpec,
         ClusterSpec,
         WoodyServerChildSpec
@@ -101,16 +112,22 @@ quotas_child_specs(Quotas, ChildID) ->
         || Options <- Quotas
     ].
 
--spec events_machines_child_specs(namespaces(), pulse()) -> supervisor:child_spec().
-events_machines_child_specs(NSs, Pulse) ->
-    NsOptions = [events_machine_options(NS, NSs, Pulse) || NS <- maps:keys(NSs)],
+-spec events_machines_child_specs(namespaces(), event_sink_ns(), pulse()) -> supervisor:child_spec().
+events_machines_child_specs(NSs, EventSinkNS, Pulse) ->
+    NsOptions = [
+        events_machine_options(NS, NSs, EventSinkNS, Pulse)
+        || NS <- maps:keys(NSs)
+    ],
     mg_namespace_sup:child_spec(NsOptions, namespaces_sup).
 
--spec events_machine_options(mg_core:ns(), namespaces(), pulse()) -> mg_core_events_machine:options().
-events_machine_options(NS, NSs, Pulse) ->
+-spec events_machine_options(mg_core:ns(), namespaces(), event_sink_ns(), pulse()) -> mg_core_events_machine:options().
+events_machine_options(NS, NSs, EventSinkNS, Pulse) ->
     NSConfigs = maps:get(NS, NSs),
     #{processor := ProcessorConfig, storage := Storage} = NSConfigs,
-    EventSinks = [event_sink_options(SinkConfig, Pulse) || SinkConfig <- maps:get(event_sinks, NSConfigs, [])],
+    EventSinks = [
+        event_sink_options(SinkConfig, EventSinkNS, Pulse)
+        || SinkConfig <- maps:get(event_sinks, NSConfigs, [])
+    ],
     EventsStorage = sub_storage_options(<<"events">>, Storage),
     #{
         namespace => NS,
@@ -150,14 +167,14 @@ machine_options(NS, Config, Pulse) ->
         suicide_probability => maps:get(suicide_probability, Config, undefined)
     }.
 
--spec api_automaton_options(namespaces(), pulse()) -> mg_woody_automaton:options().
-api_automaton_options(NSs, Pulse) ->
+-spec api_automaton_options(namespaces(), event_sink_ns(), pulse()) -> mg_woody_automaton:options().
+api_automaton_options(NSs, EventSinkNS, Pulse) ->
     maps:fold(
         fun(NS, ConfigNS, Options) ->
             Options#{
                 NS => maps:merge(
                     #{
-                        machine => events_machine_options(NS, NSs, Pulse)
+                        machine => events_machine_options(NS, NSs, EventSinkNS, Pulse)
                     },
                     modernizer_options(maps:get(modernizer, ConfigNS, undefined), Pulse)
                 )
@@ -167,13 +184,47 @@ api_automaton_options(NSs, Pulse) ->
         NSs
     ).
 
--spec event_sink_options(mg_core_events_sink:handler(), pulse()) -> mg_core_events_sink:handler().
-event_sink_options({mg_core_events_sink_kafka, EventSinkConfig}, Pulse) ->
+-spec event_sink_options(mg_core_events_sink:handler(), event_sink_ns(), pulse()) -> mg_core_events_sink:handler().
+event_sink_options({mg_core_events_sink_machine, EventSinkConfig}, EvSinks, Pulse) ->
+    EventSinkNS = event_sink_namespace_options(EvSinks, Pulse),
+    {mg_core_events_sink_machine, maps:merge(EventSinkNS, EventSinkConfig)};
+event_sink_options({mg_core_events_sink_kafka, EventSinkConfig}, _Config, Pulse) ->
     {mg_core_events_sink_kafka, EventSinkConfig#{
         pulse => Pulse,
         encoder => fun mg_woody_event_sink:serialize/3
     }}.
 
+-spec event_sink_ns_child_spec(event_sink_ns(), atom(), pulse()) -> supervisor:child_spec().
+event_sink_ns_child_spec(EventSinkNS, ChildID, Pulse) ->
+    mg_core_events_sink_machine:child_spec(event_sink_namespace_options(EventSinkNS, Pulse), ChildID).
+
+-spec api_event_sink_options(namespaces(), event_sink_ns(), pulse()) -> mg_woody_event_sink:options().
+api_event_sink_options(NSs, EventSinkNS, Pulse) ->
+    EventSinkMachines = collect_event_sink_machines(NSs),
+    {EventSinkMachines, event_sink_namespace_options(EventSinkNS, Pulse)}.
+
+-spec collect_event_sink_machines(namespaces()) -> [mg_core:id()].
+collect_event_sink_machines(NSs) ->
+    NSConfigs = maps:values(NSs),
+    EventSinks = ordsets:from_list([
+        maps:get(machine_id, SinkConfig)
+        || NSConfig <- NSConfigs, {mg_core_events_sink_machine, SinkConfig} <- maps:get(event_sinks, NSConfig, [])
+    ]),
+    ordsets:to_list(EventSinks).
+
+-spec event_sink_namespace_options(event_sink_ns(), pulse()) -> mg_core_events_sink_machine:ns_options().
+event_sink_namespace_options(#{storage := Storage} = EventSinkNS, Pulse) ->
+    NS = <<"_event_sinks">>,
+    MachinesStorage = sub_storage_options(<<"machines">>, Storage),
+    EventsStorage = sub_storage_options(<<"events">>, Storage),
+    EventSinkNS#{
+        namespace => NS,
+        pulse => Pulse,
+        storage => MachinesStorage,
+        events_storage => EventsStorage,
+        worker => worker_manager_options(EventSinkNS)
+    }.
+
 -spec worker_manager_options(map()) -> mg_core_workers_manager:ns_options().
 worker_manager_options(Config) ->
     maps:merge(
diff --git a/apps/machinegun/test/mg_prometheus_metric_SUITE.erl b/apps/machinegun/test/mg_prometheus_metric_SUITE.erl
index 6cb0587..c198c12 100644
--- a/apps/machinegun/test/mg_prometheus_metric_SUITE.erl
+++ b/apps/machinegun/test/mg_prometheus_metric_SUITE.erl
@@ -839,6 +839,10 @@ mg_config() ->
     [
         {woody_server, #{ip => {0, 0, 0, 0}, port => 8022}},
         {namespaces, #{}},
+        {event_sink_ns, #{
+            storage => mg_core_storage_memory,
+            registry => mg_core_procreg_global
+        }},
         {pulse, {mg_pulse, #{}}}
     ].
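Taken together, the configurator changes above make event_sink_ns a mandatory section of the machinegun config, next to namespaces, and route every {mg_core_events_sink_machine, ...} sink declared inside a namespace through it. Below is a minimal sketch of the corresponding application environment, assembled only from values that appear in the test configs of this change (memory storage, a 5000 ms default timeout, the main_event_sink id from config.yaml); the <<"sample">> namespace name and the elided processor/storage keys are placeholders, not part of this change.

{machinegun, [
    {woody_server, #{ip => {0, 0, 0, 0}, port => 8022}},
    {event_sink_ns, #{
        storage => mg_core_storage_memory,
        default_processing_timeout => 5000
    }},
    {namespaces, #{
        <<"sample">> => #{
            %% processor, storage, retries, event_stash_size, ... as before
            event_sinks => [
                {mg_core_events_sink_machine, #{
                    name => machine,
                    machine_id => <<"main_event_sink">>
                }}
            ]
        }
    }},
    {pulse, {mg_pulse, #{}}}
]}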
diff --git a/apps/machinegun/test/mg_tests_SUITE.erl b/apps/machinegun/test/mg_tests_SUITE.erl
index 35b1d66..a43fa10 100644
--- a/apps/machinegun/test/mg_tests_SUITE.erl
+++ b/apps/machinegun/test/mg_tests_SUITE.erl
@@ -247,6 +247,10 @@ mg_config(#{endpoint := {IP, Port}}, C) ->
             % сейчас же можно иногда включать и смотреть
             % suicide_probability => 0.1,
             event_sinks => [
+                {mg_core_events_sink_machine, #{
+                    name => machine,
+                    machine_id => ?ES_ID
+                }},
                 {mg_core_events_sink_kafka, #{
                     name => kafka,
                     topic => ?ES_ID,
@@ -255,6 +259,10 @@ mg_config(#{endpoint := {IP, Port}}, C) ->
                 ]
             }
         }},
+        {event_sink_ns, #{
+            storage => mg_core_storage_memory,
+            default_processing_timeout => 5000
+        }},
         {pulse, {mg_pulse, #{}}}
     ].
diff --git a/apps/mg_core/src/mg_core_events_sink_machine.erl b/apps/mg_core/src/mg_core_events_sink_machine.erl
new file mode 100644
index 0000000..7232691
--- /dev/null
+++ b/apps/mg_core/src/mg_core_events_sink_machine.erl
@@ -0,0 +1,299 @@
+%%%
+%%% Copyright 2017 RBKmoney
+%%%
+%%% Licensed under the Apache License, Version 2.0 (the "License");
+%%% you may not use this file except in compliance with the License.
+%%% You may obtain a copy of the License at
+%%%
+%%%     http://www.apache.org/licenses/LICENSE-2.0
+%%%
+%%% Unless required by applicable law or agreed to in writing, software
+%%% distributed under the License is distributed on an "AS IS" BASIS,
+%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%%% See the License for the specific language governing permissions and
+%%% limitations under the License.
+%%%
+
+-module(mg_core_events_sink_machine).
+
+%% API
+-export_type([event_body/0]).
+-export_type([options/0]).
+-export_type([storage_options/0]).
+-export_type([ns_options/0]).
+-export([child_spec/2]).
+-export([start_link/1]).
+-export([get_history/3]).
+-export([repair/4]).
+
+%% mg_core_events_sink handler
+-behaviour(mg_core_events_sink).
+-export([add_events/6]).
+
+%% mg_core_machine handler
+-behaviour(mg_core_machine).
+-export([process_machine/7]).
+
+%%
+%% API
+%%
+-type event_body() :: #{
+    source_ns => mg_core:ns(),
+    source_id => mg_core:id(),
+    event => mg_core_events:event()
+}.
+-type event() :: mg_core_events:event(event_body()).
+-type options() :: #{
+    name := atom(),
+    namespace := mg_core:ns(),
+    machine_id := mg_core:id(),
+    storage := storage_options(),
+    worker := mg_core_workers_manager:ns_options(),
+    pulse := mg_core_pulse:handler(),
+    events_storage := mg_core_storage:options(),
+    default_processing_timeout := timeout()
+}.
+-type ns_options() :: #{
+    namespace := mg_core:ns(),
+    storage := storage_options(),
+    worker := mg_core_workers_manager:ns_options(),
+    pulse := mg_core_pulse:handler(),
+    events_storage := storage_options(),
+    default_processing_timeout := timeout()
+}.
+% like mg_core_storage:options() except `name`
+-type storage_options() :: mg_core_utils:mod_opts(map()).
+
+-spec child_spec(ns_options(), atom()) -> supervisor:child_spec().
+child_spec(Options, ChildID) ->
+    #{
+        id => ChildID,
+        start => {?MODULE, start_link, [Options]},
+        restart => permanent,
+        type => supervisor
+    }.
+
+-spec start_link(ns_options()) -> mg_core_utils:gen_start_ret().
+start_link(Options) ->
+    genlib_adhoc_supervisor:start_link(
+        #{strategy => one_for_all},
+        mg_core_utils:lists_compact([
+            mg_core_machine:child_spec(machine_options(Options), automaton),
+            mg_core_storage:child_spec(events_storage_options(Options), events_storage)
+        ])
+    ).
+
+-spec add_events(
+    options(),
+    mg_core:ns(),
+    mg_core:id(),
+    [mg_core_events:event()],
+    ReqCtx,
+    Deadline
+) -> ok when
+    ReqCtx :: mg_core:request_context(),
+    Deadline :: mg_core_deadline:deadline().
+add_events(
+    #{machine_id := EventSinkID} = Options,
+    SourceNS,
+    SourceMachineID,
+    Events,
+    ReqCtx,
+    Deadline
+) ->
+    NSOptions = maps:without([machine_id, name], Options),
+    ok = mg_core_machine:call_with_lazy_start(
+        machine_options(NSOptions),
+        EventSinkID,
+        {add_events, SourceNS, SourceMachineID, Events},
+        ReqCtx,
+        Deadline,
+        undefined
+    ).
+
+-spec get_history(ns_options(), mg_core:id(), mg_core_events:history_range()) -> [event()].
+get_history(Options, EventSinkID, HistoryRange) ->
+    #{events_range := EventsRange} = get_state(Options, EventSinkID),
+    StorageOptions = events_storage_options(Options),
+    Batch = mg_core_dirange:fold(
+        fun(EventID, Batch) ->
+            Key = mg_core_events:add_machine_id(
+                EventSinkID,
+                mg_core_events:event_id_to_key(EventID)
+            ),
+            mg_core_storage:add_batch_request({get, Key}, Batch)
+        end,
+        mg_core_storage:new_batch(),
+        mg_core_events:intersect_range(EventsRange, HistoryRange)
+    ),
+    BatchResults = mg_core_storage:run_batch(StorageOptions, Batch),
+    lists:map(
+        fun({{get, Key}, {_Context, Value}}) ->
+            kv_to_sink_event(EventSinkID, {Key, Value})
+        end,
+        BatchResults
+    ).
+
+-spec repair(ns_options(), mg_core:id(), mg_core:request_context(), mg_core_deadline:deadline()) ->
+    ok.
+repair(Options, EventSinkID, ReqCtx, Deadline) ->
+    mg_core_machine:repair(machine_options(Options), EventSinkID, undefined, ReqCtx, Deadline).
+
+%%
+%% mg_core_processor handler
+%%
+-type state() :: #{
+    events_range => mg_core_events:events_range()
+}.
+
+-spec process_machine(Options, EventSinkID, Impact, PCtx, ReqCtx, Deadline, PackedState) ->
+    Result
+when
+    Options :: ns_options(),
+    EventSinkID :: mg_core:id(),
+    Impact :: mg_core_machine:processor_impact(),
+    PCtx :: mg_core_machine:processing_context(),
+    ReqCtx :: mg_core:request_context(),
+    Deadline :: mg_core_deadline:deadline(),
+    PackedState :: mg_core_machine:machine_state(),
+    Result :: mg_core_machine:processor_result().
+process_machine(Options, EventSinkID, Impact, _PCtx, _ReqCtx, _Deadline, PackedState) ->
+    State =
+        case {Impact, PackedState} of
+            {{init, _}, null} -> new_state();
+            {_, _} -> opaque_to_state(PackedState)
+        end,
+    NewState = process_machine_(Options, EventSinkID, Impact, State),
+    {{reply, ok}, sleep, state_to_opaque(NewState)}.
+
+-spec process_machine_(ns_options(), mg_core:id(), mg_core_machine:processor_impact(), state()) ->
+    state().
+process_machine_(_, _, {init, undefined}, State) ->
+    State;
+process_machine_(_, _, {repair, undefined}, State) ->
+    State;
+process_machine_(
+    Options,
+    EventSinkID,
+    {call, {add_events, SourceNS, SourceMachineID, Events}},
+    State
+) ->
+    {SinkEvents, NewState} = generate_sink_events(SourceNS, SourceMachineID, Events, State),
+    ok = store_sink_events(Options, EventSinkID, SinkEvents),
+    NewState.
+
+%%
+
+-spec store_sink_events(ns_options(), mg_core:id(), [event()]) -> ok.
+store_sink_events(Options, EventSinkID, SinkEvents) ->
+    lists:foreach(
+        fun(SinkEvent) ->
+            store_event(Options, EventSinkID, SinkEvent)
+        end,
+        SinkEvents
+    ).
+
+-spec store_event(ns_options(), mg_core:id(), event()) -> ok.
+store_event(Options, EventSinkID, SinkEvent) ->
+    {Key, Value} = sink_event_to_kv(EventSinkID, SinkEvent),
+    _ = mg_core_storage:put(
+        events_storage_options(Options),
+        Key,
+        undefined,
+        Value,
+        []
+    ),
+    ok.
+
+-spec get_state(ns_options(), mg_core:id()) -> state().
+get_state(Options, EventSinkID) ->
+    try
+        #{state := State} = mg_core_machine:get(machine_options(Options), EventSinkID),
+        opaque_to_state(State)
+    catch
+        throw:{logic, machine_not_found} ->
+            new_state()
+    end.
+
+-spec new_state() -> state().
+new_state() ->
+    #{events_range => undefined}.
+
+-spec machine_options(ns_options()) -> mg_core_machine:options().
+machine_options(
+    Options = #{
+        namespace := Namespace, storage := Storage, worker := Worker, pulse := Pulse
+    }
+) ->
+    #{
+        namespace => mg_core_utils:concatenate_namespaces(Namespace, <<"machines">>),
+        processor => {?MODULE, Options},
+        storage => Storage,
+        worker => Worker,
+        pulse => Pulse
+    }.
+
+-spec events_storage_options(ns_options()) -> mg_core_storage:options().
+events_storage_options(#{namespace := NS, events_storage := StorageOptions, pulse := Handler}) ->
+    {Mod, Options} = mg_core_utils:separate_mod_opts(StorageOptions, #{}),
+    {Mod, Options#{name => {NS, ?MODULE, events}, pulse => Handler}}.
+
+%%
+
+-spec generate_sink_events(mg_core:ns(), mg_core:id(), [mg_core_events:event()], state()) ->
+    {[event()], state()}.
+generate_sink_events(SourceNS, SourceMachineID, Events, State = #{events_range := EventsRange}) ->
+    Bodies = [generate_sink_event_body(SourceNS, SourceMachineID, Event) || Event <- Events],
+    {SinkEvents, NewEventsRange} = mg_core_events:generate_events_with_range(Bodies, EventsRange),
+    {SinkEvents, State#{events_range := NewEventsRange}}.
+
+-spec generate_sink_event_body(mg_core:ns(), mg_core:id(), mg_core_events:event()) -> event_body().
+generate_sink_event_body(SourceNS, SourceMachineID, Event) ->
+    #{
+        source_ns => SourceNS,
+        source_id => SourceMachineID,
+        event => Event
+    }.
+
+%%
+%% packer to opaque
+%%
+-spec state_to_opaque(state()) -> mg_core_storage:opaque().
+state_to_opaque(#{events_range := EventsRange}) ->
+    [1, mg_core_events:events_range_to_opaque(EventsRange)].
+
+-spec opaque_to_state(mg_core_storage:opaque()) -> state().
+opaque_to_state([1, EventsRange]) ->
+    #{
+        events_range => mg_core_events:opaque_to_events_range(EventsRange)
+    }.
+
+-spec sink_event_body_to_opaque(Vsn :: integer(), event_body()) -> mg_core_storage:opaque().
+sink_event_body_to_opaque(_Vsn, #{
+    source_ns := SourceNS,
+    source_id := SourceMachineID,
+    event := Event
+}) ->
+    [1, SourceNS, SourceMachineID, mg_core_events:event_to_opaque(Event)].
+
+-spec opaque_to_sink_event_body(Vsn :: integer(), mg_core_storage:opaque()) -> event_body().
+opaque_to_sink_event_body(_Vsn, [1, SourceNS, SourceMachineID, Event]) ->
+    #{
+        source_ns => SourceNS,
+        source_id => SourceMachineID,
+        event => mg_core_events:opaque_to_event(Event)
+    }.
+
+-spec sink_event_to_kv(mg_core:id(), event()) -> mg_core_storage:kv().
+sink_event_to_kv(EventSinkID, Event) ->
+    mg_core_events:add_machine_id(
+        EventSinkID,
+        mg_core_events:event_to_kv(Event, fun sink_event_body_to_opaque/2)
+    ).
+
+-spec kv_to_sink_event(mg_core:id(), mg_core_storage:kv()) -> event().
+kv_to_sink_event(EventSinkID, Kvs) ->
+    mg_core_events:kv_to_event(
+        mg_core_events:remove_machine_id(EventSinkID, Kvs),
+        fun opaque_to_sink_event_body/2
+    ).
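The new module is self-contained enough to exercise without the woody layer; the test suite that follows does exactly that. Here is a compressed sketch of the same flow in one function, using the in-memory option values from that suite; the pulse handler my_pulse is an assumption (any mg_core_pulse:handler() module works), and in production the supervisors built by the configurators above own the process instead of a direct start_link call.

event_sink_machine_smoke() ->
    NsOptions = #{
        namespace => <<"event_sink_id">>,
        storage => mg_core_storage_memory,
        events_storage => mg_core_storage_memory,
        worker => #{registry => mg_core_procreg_global},
        pulse => my_pulse,
        default_processing_timeout => 1000
    },
    %% one supervised sink namespace; the configurators do this via child_spec/2
    {ok, _Pid} = mg_core_events_sink_machine:start_link(NsOptions),
    %% options() is ns_options() plus the per-sink name/machine_id pair
    Options = NsOptions#{name => machine, machine_id => <<"event_sink_id">>},
    {Events, _} = mg_core_events:generate_events_with_range([{#{}, B} || B <- [1, 2, 3]], undefined),
    ok = mg_core_events_sink_machine:add_events(
        Options, <<"source_ns">>, <<"source_id">>, Events, null, mg_core_deadline:default()
    ),
    mg_core_events_sink_machine:get_history(NsOptions, <<"event_sink_id">>, {undefined, undefined, forward}).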
diff --git a/apps/mg_core/test/mg_core_events_sink_machine_SUITE.erl b/apps/mg_core/test/mg_core_events_sink_machine_SUITE.erl
new file mode 100644
index 0000000..249d586
--- /dev/null
+++ b/apps/mg_core/test/mg_core_events_sink_machine_SUITE.erl
@@ -0,0 +1,169 @@
+%%%
+%%% Copyright 2017 RBKmoney
+%%%
+%%% Licensed under the Apache License, Version 2.0 (the "License");
+%%% you may not use this file except in compliance with the License.
+%%% You may obtain a copy of the License at
+%%%
+%%%     http://www.apache.org/licenses/LICENSE-2.0
+%%%
+%%% Unless required by applicable law or agreed to in writing, software
+%%% distributed under the License is distributed on an "AS IS" BASIS,
+%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%%% See the License for the specific language governing permissions and
+%%% limitations under the License.
+%%%
+
+-module(mg_core_events_sink_machine_SUITE).
+-include_lib("stdlib/include/assert.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+%% tests descriptions
+-export([all/0]).
+-export([groups/0]).
+-export([init_per_suite/1]).
+-export([end_per_suite/1]).
+
+%% tests
+-export([add_events_test/1]).
+-export([get_unexisted_event_test/1]).
+-export([not_idempotent_add_get_events_test/1]).
+
+%% Pulse
+-export([handle_beat/2]).
+
+%%
+%% tests descriptions
+%%
+-type group_name() :: atom().
+-type test_name() :: atom().
+-type config() :: [{atom(), _}].
+
+-spec all() -> [test_name() | {group, group_name()}].
+all() ->
+    [
+        {group, main}
+    ].
+
+-spec groups() -> [{group_name(), list(_), [test_name()]}].
+groups() ->
+    [
+        {main, [sequence], [
+            add_events_test,
+            get_unexisted_event_test,
+            not_idempotent_add_get_events_test
+        ]}
+    ].
+
+%%
+%% starting/stopping
+%%
+-spec init_per_suite(config()) -> config().
+init_per_suite(C) ->
+    % dbg:tracer(), dbg:p(all, c),
+    % dbg:tpl({mg_core_events_sink_machine, '_', '_'}, x),
+    Apps = mg_cth:start_applications([mg_core]),
+    Pid = start_event_sink(event_sink_ns_options()),
+    true = erlang:unlink(Pid),
+    {Events, _} = mg_core_events:generate_events_with_range(
+        [{#{}, Body} || Body <- [1, 2, 3]],
+        undefined
+    ),
+    [{apps, Apps}, {pid, Pid}, {events, Events} | C].
+
+-spec end_per_suite(config()) -> ok.
+end_per_suite(C) ->
+    ok = proc_lib:stop(?config(pid, C)),
+    mg_cth:stop_applications(?config(apps, C)).
+
+%%
+%% tests
+%%
+-define(ES_ID, <<"event_sink_id">>).
+-define(SOURCE_NS, <<"source_ns">>).
+-define(SOURCE_ID, <<"source_id">>).
+
+-spec add_events_test(config()) -> _.
+add_events_test(C) ->
+    ?assertEqual(ok, add_events(C)).
+
+-spec get_unexisted_event_test(config()) -> _.
+get_unexisted_event_test(_C) ->
+    [] = mg_core_events_sink_machine:get_history(
+        event_sink_ns_options(),
+        ?ES_ID,
+        {42, undefined, forward}
+    ).
+
+-spec not_idempotent_add_get_events_test(config()) -> _.
+not_idempotent_add_get_events_test(C) ->
+    ?assertEqual(ok, add_events(C)),
+    ConfigEvents = [
+        #{event => Event, source_ns => ?SOURCE_NS, source_id => ?SOURCE_ID}
+        || Event <- ?config(events, C)
+    ],
+    ExpectedEvents = lists:zip(
+        lists:seq(1, erlang:length(?config(events, C)) * 2),
+        ConfigEvents ++ ConfigEvents
+    ),
+    ?assertEqual(ExpectedEvents, get_history(C)).
+
+%%
+%% utils
+%%
+
+-spec add_events(config()) -> _.
+add_events(C) ->
+    mg_core_events_sink_machine:add_events(
+        event_sink_options(),
+        ?SOURCE_NS,
+        ?SOURCE_ID,
+        ?config(events, C),
+        null,
+        mg_core_deadline:default()
+    ).
+
+-spec get_history(config()) -> _.
+get_history(_C) ->
+    HRange = {undefined, undefined, forward},
+    % _ = ct:pal("~p", [PreparedEvents]),
+    EventsSinkEvents = mg_core_events_sink_machine:get_history(
+        event_sink_ns_options(),
+        ?ES_ID,
+        HRange
+    ),
+    [{ID, Body} || #{id := ID, body := Body} <- EventsSinkEvents].
+
+-spec start_event_sink(mg_core_events_sink_machine:ns_options()) -> pid().
+start_event_sink(Options) ->
+    mg_core_utils:throw_if_error(
+        genlib_adhoc_supervisor:start_link(
+            #{strategy => one_for_all},
+            [mg_core_events_sink_machine:child_spec(Options, event_sink)]
+        )
+    ).
+
+-spec event_sink_ns_options() -> mg_core_events_sink_machine:ns_options().
+event_sink_ns_options() ->
+    #{
+        namespace => ?ES_ID,
+        storage => mg_core_storage_memory,
+        worker => #{
+            registry => mg_core_procreg_global
+        },
+        pulse => ?MODULE,
+        default_processing_timeout => 1000,
+        events_storage => mg_core_storage_memory
+    }.
+
+-spec event_sink_options() -> mg_core_events_sink_machine:options().
+event_sink_options() ->
+    NSOptions = event_sink_ns_options(),
+    NSOptions#{
+        name => machine,
+        machine_id => ?ES_ID
+    }.
+
+-spec handle_beat(_, mg_core_pulse:beat()) -> ok.
+handle_beat(_, Beat) ->
+    ct:pal("~p", [Beat]).
diff --git a/apps/mg_cth/src/mg_cth_configurator.erl b/apps/mg_cth/src/mg_cth_configurator.erl
index 8e8a4ba..397ba73 100644
--- a/apps/mg_cth/src/mg_cth_configurator.erl
+++ b/apps/mg_cth/src/mg_cth_configurator.erl
@@ -21,8 +21,15 @@
     event_stash_size := non_neg_integer()
 }.
 
+-type event_sink_ns() :: #{
+    default_processing_timeout := timeout(),
+    storage => mg_core_storage:options(),
+    worker => mg_core_worker:options()
+}.
+
 -type config() :: #{
     woody_server := mg_woody:woody_server(),
+    event_sink_ns := event_sink_ns(),
     namespaces := #{mg_core:ns() => events_machines()},
     quotas => [mg_core_quota_worker:options()]
 }.
@@ -32,21 +39,30 @@
 -spec construct_child_specs(config() | undefined) -> _.
 construct_child_specs(undefined) ->
     [];
-construct_child_specs(#{woody_server := WoodyServer, namespaces := Namespaces} = Config) ->
+construct_child_specs(
+    #{
+        woody_server := WoodyServer,
+        event_sink_ns := EventSinkNS,
+        namespaces := Namespaces
+    } = Config
+) ->
     Quotas = maps:get(quotas, Config, []),
 
     QuotasChSpec = quotas_child_specs(Quotas, quota),
-    EventMachinesChSpec = events_machines_child_specs(Namespaces),
+    EventSinkChSpec = event_sink_ns_child_spec(EventSinkNS, event_sink),
+    EventMachinesChSpec = events_machines_child_specs(Namespaces, EventSinkNS),
     WoodyServerChSpec = mg_woody:child_spec(
         woody_server,
         #{
             woody_server => WoodyServer,
-            automaton => api_automaton_options(Namespaces),
+            automaton => api_automaton_options(Namespaces, EventSinkNS),
+            event_sink => api_event_sink_options(Namespaces, EventSinkNS),
            pulse => mg_cth_pulse
         }
    ),

    lists:flatten([
+        EventSinkChSpec,
         WoodyServerChSpec,
         QuotasChSpec,
         EventMachinesChSpec
@@ -61,18 +77,24 @@ quotas_child_specs(Quotas, ChildID) ->
         || Options <- Quotas
     ].
 
--spec events_machines_child_specs(_) -> [supervisor:child_spec()].
-events_machines_child_specs(NSs) ->
+-spec events_machines_child_specs(_, _) -> [supervisor:child_spec()].
+events_machines_child_specs(NSs, EventSinkNS) ->
     [
-        mg_core_events_machine:child_spec(events_machine_options(NS, NSs), binary_to_atom(NS, utf8))
+        mg_core_events_machine:child_spec(
+            events_machine_options(NS, NSs, EventSinkNS),
+            binary_to_atom(NS, utf8)
+        )
         || NS <- maps:keys(NSs)
     ].
 
--spec events_machine_options(mg_core:ns(), _) -> mg_core_events_machine:options().
-events_machine_options(NS, NSs) ->
+-spec events_machine_options(mg_core:ns(), _, event_sink_ns()) -> mg_core_events_machine:options().
+events_machine_options(NS, NSs, EventSinkNS) ->
     NSConfigs = maps:get(NS, NSs),
     #{processor := ProcessorConfig, storage := Storage} = NSConfigs,
-    EventSinks = [event_sink_options(SinkConfig) || SinkConfig <- maps:get(event_sinks, NSConfigs, [])],
+    EventSinks = [
+        event_sink_options(SinkConfig, EventSinkNS)
+        || SinkConfig <- maps:get(event_sinks, NSConfigs, [])
+    ],
     EventsStorage = sub_storage_options(<<"events">>, Storage),
     #{
         namespace => NS,
@@ -112,14 +134,14 @@ machine_options(NS, Config) ->
         suicide_probability => maps:get(suicide_probability, Config, undefined)
     }.
 
--spec api_automaton_options(_) -> mg_woody_automaton:options().
-api_automaton_options(NSs) ->
+-spec api_automaton_options(_, event_sink_ns()) -> mg_woody_automaton:options().
+api_automaton_options(NSs, EventSinkNS) ->
     maps:fold(
         fun(NS, ConfigNS, Options) ->
             Options#{
                 NS => maps:merge(
                     #{
-                        machine => events_machine_options(NS, NSs)
+                        machine => events_machine_options(NS, NSs, EventSinkNS)
                     },
                     modernizer_options(maps:get(modernizer, ConfigNS, undefined))
                 )
@@ -129,13 +151,48 @@ api_automaton_options(NSs) ->
         NSs
     ).
 
--spec event_sink_options(mg_core_events_sink:handler()) -> mg_core_events_sink:handler().
-event_sink_options({mg_core_events_sink_kafka, EventSinkConfig}) ->
+-spec event_sink_options(mg_core_events_sink:handler(), _) -> mg_core_events_sink:handler().
+event_sink_options({mg_core_events_sink_machine, EventSinkConfig}, EvSinks) ->
+    EventSinkNS = event_sink_namespace_options(EvSinks),
+    {mg_core_events_sink_machine, maps:merge(EventSinkNS, EventSinkConfig)};
+event_sink_options({mg_core_events_sink_kafka, EventSinkConfig}, _Config) ->
     {mg_core_events_sink_kafka, EventSinkConfig#{
         pulse => pulse(),
         encoder => fun mg_woody_event_sink:serialize/3
     }}.
+-spec event_sink_ns_child_spec(_, atom()) -> supervisor:child_spec().
+event_sink_ns_child_spec(EventSinkNS, ChildID) ->
+    mg_core_events_sink_machine:child_spec(event_sink_namespace_options(EventSinkNS), ChildID).
+
+-spec api_event_sink_options(_, _) -> mg_woody_event_sink:options().
+api_event_sink_options(NSs, EventSinkNS) ->
+    EventSinkMachines = collect_event_sink_machines(NSs),
+    {EventSinkMachines, event_sink_namespace_options(EventSinkNS)}.
+
+-spec collect_event_sink_machines(_) -> [mg_core:id()].
+collect_event_sink_machines(NSs) ->
+    NSConfigs = maps:values(NSs),
+    EventSinks = ordsets:from_list([
+        maps:get(machine_id, SinkConfig)
+        || NSConfig <- NSConfigs,
+           {mg_core_events_sink_machine, SinkConfig} <- maps:get(event_sinks, NSConfig, [])
+    ]),
+    ordsets:to_list(EventSinks).
+
+-spec event_sink_namespace_options(_) -> mg_core_events_sink_machine:ns_options().
+event_sink_namespace_options(#{storage := Storage} = EventSinkNS) ->
+    NS = <<"_event_sinks">>,
+    MachinesStorage = sub_storage_options(<<"machines">>, Storage),
+    EventsStorage = sub_storage_options(<<"events">>, Storage),
+    EventSinkNS#{
+        namespace => NS,
+        pulse => pulse(),
+        storage => MachinesStorage,
+        events_storage => EventsStorage,
+        worker => worker_manager_options(EventSinkNS)
+    }.
+
 -spec worker_manager_options(map()) -> mg_core_workers_manager:ns_options().
 worker_manager_options(Config) ->
     maps:merge(
diff --git a/apps/mg_woody/src/mg_woody.erl b/apps/mg_woody/src/mg_woody.erl
index f3373b4..7d3eb8d 100644
--- a/apps/mg_woody/src/mg_woody.erl
+++ b/apps/mg_woody/src/mg_woody.erl
@@ -40,9 +40,12 @@
 -type automaton() :: mg_woody_automaton:options().
 
+-type event_sink() :: mg_woody_event_sink:options().
+
 -type options() :: #{
     pulse := module(),
     automaton := automaton(),
+    event_sink := event_sink(),
     woody_server := woody_server(),
     additional_routes => [woody_server_thrift_http_handler:route(any())]
 }.
@@ -52,6 +55,7 @@ child_spec(ID, Options) ->
     #{
         woody_server := WoodyConfig,
         automaton := Automaton,
+        event_sink := EventSink,
         pulse := PulseHandler
     } = Options,
     WoodyOptions = maps:merge(
@@ -62,7 +66,8 @@
             port => maps:get(port, WoodyConfig),
             event_handler => {mg_woody_event_handler, PulseHandler},
             handlers => [
-                mg_woody_automaton:handler(Automaton)
+                mg_woody_automaton:handler(Automaton),
+                mg_woody_event_sink:handler(EventSink)
             ]
         },
         genlib_map:compact(#{
diff --git a/apps/mg_woody/src/mg_woody_event_sink.erl b/apps/mg_woody/src/mg_woody_event_sink.erl
index f2b0967..e2303de 100644
--- a/apps/mg_woody/src/mg_woody_event_sink.erl
+++ b/apps/mg_woody/src/mg_woody_event_sink.erl
@@ -17,7 +17,6 @@
 -module(mg_woody_event_sink).
 
 -include_lib("mg_proto/include/mg_proto_event_sink_thrift.hrl").
--include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").
 
 %% API
 -export([handler/1]).
@@ -31,7 +30,7 @@
 %%
 %% API
 %%
--type options() :: {[mg_core:id()], _NSOptions}.
+-type options() :: {[mg_core:id()], mg_core_events_sink_machine:ns_options()}.
 
 -spec handler(options()) -> mg_woody_utils:woody_handler().
 handler(Options) ->
@@ -40,10 +39,33 @@ handler(Options) ->
 %%
 %% woody handler
 %%
--spec handle_function(woody:func(), woody:args(), woody_context:ctx(), options()) -> no_return().
+-spec handle_function(woody:func(), woody:args(), woody_context:ctx(), options()) ->
+    {ok, _Result} | no_return().
 
-handle_function('GetHistory', {_EventSinkID, _Range}, _WoodyContext, {_AvaliableEventSinks, _Options}) ->
-    erlang:throw(#mg_stateproc_EventSinkNotFound{}).
+handle_function('GetHistory', {EventSinkID, Range}, WoodyContext, {AvaliableEventSinks, Options}) ->
+    ReqCtx = mg_woody_utils:woody_context_to_opaque(WoodyContext),
+    DefaultTimeout = maps:get(default_processing_timeout, Options),
+    DefaultDeadline = mg_core_deadline:from_timeout(DefaultTimeout),
+    Deadline = mg_woody_utils:get_deadline(WoodyContext, DefaultDeadline),
+    SinkHistory =
+        mg_woody_utils:handle_error(
+            #{
+                namespace => undefined,
+                machine_id => EventSinkID,
+                request_context => ReqCtx,
+                deadline => Deadline
+            },
+            fun() ->
+                _ = check_event_sink(AvaliableEventSinks, EventSinkID),
+                mg_core_events_sink_machine:get_history(
+                    Options,
+                    EventSinkID,
+                    mg_woody_packer:unpack(history_range, Range)
+                )
+            end,
+            pulse(Options)
+        ),
+    {ok, mg_woody_packer:pack(sink_history, SinkHistory)}.
 
 %%
 %% events_sink events encoder
@@ -74,3 +96,20 @@ serialize(SourceNS, SourceID, Event) ->
         {error, Reason} ->
             erlang:error({?MODULE, Reason})
     end.
+
+%%
+%% Internals
+%%
+
+-spec check_event_sink([mg_core:id()], mg_core:id()) -> ok | no_return().
+check_event_sink(AvaliableEventSinks, EventSinkID) ->
+    case lists:member(EventSinkID, AvaliableEventSinks) of
+        true ->
+            ok;
+        false ->
+            throw({logic, event_sink_not_found})
+    end.
+
+-spec pulse(mg_core_events_sink_machine:ns_options()) -> mg_core_pulse:handler().
+pulse(#{pulse := Pulse}) ->
+    Pulse.
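On the woody side, the handler options are now the pair produced by api_event_sink_options above: the list of sink ids collected from the namespaces' event_sinks, and the event-sink namespace options. GetHistory only serves ids from that list; anything else is rejected via the {logic, event_sink_not_found} throw, which the woody error handling surfaces as EventSinkNotFound (the woody test suite below checks exactly that). A short sketch of calling the endpoint with the test client added below, using the URL and sink id from the woody test suites; it assumes the caller includes the state-processing thrift header for the record definition.

-include_lib("mg_proto/include/mg_proto_state_processing_thrift.hrl").

fetch_sink_history() ->
    %% the range record is the same one the suites below build
    Range = #mg_stateproc_HistoryRange{direction = forward},
    mg_event_sink_client:get_history("http://localhost:8022", <<"test_event_sink">>, Range).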
diff --git a/apps/mg_woody/test/mg_event_sink_client.erl b/apps/mg_woody/test/mg_event_sink_client.erl
new file mode 100644
index 0000000..469f5b6
--- /dev/null
+++ b/apps/mg_woody/test/mg_event_sink_client.erl
@@ -0,0 +1,51 @@
+%%%
+%%% Copyright 2020 Valitydev
+%%%
+%%% Licensed under the Apache License, Version 2.0 (the "License");
+%%% you may not use this file except in compliance with the License.
+%%% You may obtain a copy of the License at
+%%%
+%%%     http://www.apache.org/licenses/LICENSE-2.0
+%%%
+%%% Unless required by applicable law or agreed to in writing, software
+%%% distributed under the License is distributed on an "AS IS" BASIS,
+%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%%% See the License for the specific language governing permissions and
+%%% limitations under the License.
+%%%
+
+-module(mg_event_sink_client).
+
+%% API
+-export_type([options/0]).
+-export([get_history/3]).
+
+%%
+%% API
+%%
+-type options() :: URL :: string().
+
+-spec get_history(options(), mg_core:id(), mg_proto_state_processing_thrift:'HistoryRange'()) ->
+    mg_proto_state_processing_thrift:'SinkHistory'().
+get_history(BaseURL, EventSinkID, Range) ->
+    call_service(BaseURL, 'GetHistory', {EventSinkID, Range}).
+
+%%
+%% local
+%%
+-spec call_service(_BaseURL, atom(), woody:args()) -> _.
+call_service(BaseURL, Function, Args) ->
+    WR = woody_client:call(
+        {{mg_proto_state_processing_thrift, 'EventSink'}, Function, Args},
+        #{
+            url => BaseURL ++ "/v1/event_sink",
+            event_handler => {mg_woody_event_handler, mg_cth_pulse}
+        },
+        woody_context:new()
+    ),
+    case WR of
+        {ok, R} ->
+            R;
+        {exception, Exception} ->
+            erlang:throw(Exception)
+    end.
diff --git a/apps/mg_woody/test/mg_modernizer_tests_SUITE.erl b/apps/mg_woody/test/mg_modernizer_tests_SUITE.erl
index f08743f..58fee70 100644
--- a/apps/mg_woody/test/mg_modernizer_tests_SUITE.erl
+++ b/apps/mg_woody/test/mg_modernizer_tests_SUITE.erl
@@ -190,6 +190,10 @@ mg_woody_config(Name, C) ->
                 }
             end
         )
+    },
+    event_sink_ns => #{
+        storage => mg_core_storage_memory,
+        default_processing_timeout => 5000
     }
 }.
diff --git a/apps/mg_woody/test/mg_stress_SUITE.erl b/apps/mg_woody/test/mg_stress_SUITE.erl
index 29bf3aa..6d77187 100644
--- a/apps/mg_woody/test/mg_stress_SUITE.erl
+++ b/apps/mg_woody/test/mg_stress_SUITE.erl
@@ -26,6 +26,7 @@
 -export([stress_test/1]).
 
 -define(NS, <<"NS">>).
+-define(ES_ID, <<"test_event_sink">>).
 
 -type test_name() :: atom().
 -type config() :: [{atom(), _}].
@@ -82,6 +83,7 @@ init_per_suite(C) ->
             ns => ?NS,
            retry_strategy => genlib_retry:new_strategy({exponential, 5, 2, 1000})
         }},
+        {event_sink_options, "http://localhost:8022"},
         {processor_pid, ProcessorPid}
         | C
     ].
@@ -120,9 +122,15 @@ mg_woody_config(_C) ->
                 timers => #{}
             },
             retries => #{},
-            event_sinks => [],
+            event_sinks => [
+                {mg_core_events_sink_machine, #{name => default, machine_id => ?ES_ID}}
+            ],
             event_stash_size => 10
         }
+    },
+    event_sink_ns => #{
+        storage => mg_core_storage_memory,
+        default_processing_timeout => 5000
     }
 }.
diff --git a/apps/mg_woody/test/mg_woody_tests_SUITE.erl b/apps/mg_woody/test/mg_woody_tests_SUITE.erl
index f317aab..7c088ad 100644
--- a/apps/mg_woody/test/mg_woody_tests_SUITE.erl
+++ b/apps/mg_woody/test/mg_woody_tests_SUITE.erl
@@ -70,6 +70,14 @@
 -export([success_call_with_deadline/1]).
 -export([timeout_call_with_deadline/1]).
 
+%% event_sink group tests
+-export([event_sink_get_empty_history/1]).
+-export([event_sink_get_not_empty_history/1]).
+-export([event_sink_get_last_event/1]).
+-export([event_sink_incorrect_event_id/1]).
+-export([event_sink_incorrect_sink_id/1]).
+-export([event_sink_lots_events_ordering/1]).
+
 %%
 -export([config_with_multiple_event_sinks/1]).
 
@@ -88,6 +96,7 @@ all() ->
         {group, history},
         {group, repair},
         {group, timers},
+        {group, event_sink},
         {group, deadline},
         config_with_multiple_event_sinks
     ].
@@ -96,6 +105,7 @@ all() ->
 groups() ->
     [
         % TODO проверить отмену таймера
+        % TODO: test get_history separately
         {base, [sequence], [
             namespace_not_found,
             machine_id_not_found,
@@ -150,6 +160,16 @@ groups() ->
             machine_start,
             success_call_with_deadline,
             timeout_call_with_deadline
+        ]},
+
+        {event_sink, [sequence], [
+            event_sink_get_empty_history,
+            event_sink_get_not_empty_history,
+            event_sink_get_last_event,
+            % TODO event_not_found
+            % event_sink_incorrect_event_id,
+            event_sink_incorrect_sink_id,
+            event_sink_lots_events_ordering
         ]}
     ].
@@ -215,6 +235,7 @@ init_per_group(C) ->
         ns => ?NS,
         retry_strategy => genlib_retry:linear(3, 1)
     }},
+    {event_sink_options, "http://localhost:8022"},
    {processor_pid, ProcessorPid}
     | C
 ].
@@ -332,6 +353,10 @@ mg_woody_config(C) ->
             % сейчас же можно иногда включать и смотреть
             % suicide_probability => 0.1,
             event_sinks => [
+                {mg_core_events_sink_machine, #{
+                    name => machine,
+                    machine_id => ?ES_ID
+                }},
                 {mg_core_events_sink_kafka, #{
                     name => kafka,
                     topic => ?ES_ID,
@@ -339,6 +364,10 @@ mg_woody_config(C) ->
                 }}
             ]
         }
+    },
+    event_sink_ns => #{
+        storage => mg_core_storage_memory,
+        default_processing_timeout => 5000
     }
 }.
@@ -594,6 +623,95 @@ success_call_with_deadline(C) ->
     <<"sleep">> = mg_cth_automaton_client:call(Options, ?ID, <<"sleep">>, Deadline).
 
 %%
+%% event_sink group test
+%%
+-spec event_sink_get_empty_history(config()) -> _.
+event_sink_get_empty_history(C) ->
+    [] = mg_event_sink_client:get_history(es_opts(C), ?ES_ID, #mg_stateproc_HistoryRange{
+        direction = forward
+    }).
+
+-spec event_sink_get_not_empty_history(config()) -> _.
+event_sink_get_not_empty_history(C) ->
+    ok = start_machine(C, ?ID),
+
+    _ = create_events(3, C, ?ID),
+
+    AllEvents = mg_event_sink_client:get_history(es_opts(C), ?ES_ID, #mg_stateproc_HistoryRange{
+        direction = forward
+    }),
+    GeneratedEvents = [
+        E
+        || E = #mg_stateproc_SinkEvent{
+               source_id = ?ID,
+               source_ns = ?NS,
+               event = #mg_stateproc_Event{}
+           } <- AllEvents
+    ],
+    ?assert(erlang:length(GeneratedEvents) >= 3).
+
+-spec event_sink_get_last_event(config()) -> _.
+event_sink_get_last_event(C) ->
+    [
+        #mg_stateproc_SinkEvent{
+            id = 3,
+            source_id = _ID,
+            source_ns = _NS,
+            event = #mg_stateproc_Event{}
+        }
+    ] =
+        mg_event_sink_client:get_history(es_opts(C), ?ES_ID, #mg_stateproc_HistoryRange{
+            direction = backward,
+            limit = 1
+        }).
+
+-spec event_sink_incorrect_event_id(config()) -> _.
+event_sink_incorrect_event_id(C) ->
+    #mg_stateproc_EventNotFound{} =
+        (catch mg_event_sink_client:get_history(es_opts(C), ?ES_ID, #mg_stateproc_HistoryRange{
+            'after' = 42
+        })).
+
+-spec event_sink_incorrect_sink_id(config()) -> _.
+event_sink_incorrect_sink_id(C) ->
+    HRange = #mg_stateproc_HistoryRange{},
+    #mg_stateproc_EventSinkNotFound{} =
+        (catch mg_event_sink_client:get_history(es_opts(C), <<"incorrect_event_sink_id">>, HRange)).
+
+-spec event_sink_lots_events_ordering(config()) -> _.
+event_sink_lots_events_ordering(C) ->
+    MachineID = genlib:unique(),
+    ok = start_machine(C, MachineID),
+    N = 20,
+    _ = create_events(N, C, MachineID),
+
+    HRange = #mg_stateproc_HistoryRange{direction = forward},
+    Events = mg_event_sink_client:get_history(es_opts(C), ?ES_ID, HRange),
+    % The event sink does not guarantee the absence of duplicate events, but it does
+    % guarantee that the event order of each individual machine is preserved.
+    lists:foldl(
+        fun(Ev, LastEvIDMap) ->
+            #mg_stateproc_SinkEvent{
+                source_id = Machine,
+                source_ns = NS,
+                event = Body
+            } = Ev,
+            Key = {NS, Machine},
+            LastID = maps:get(Key, LastEvIDMap, 0),
+            case Body#mg_stateproc_Event.id of
+                ID when ID =:= LastID + 1 ->
+                    LastEvIDMap#{Key => ID};
+                ID when ID =< LastID ->
+                    % a duplicate of an already seen event
+                    LastEvIDMap;
+                ID ->
+                    % event order violated: there is a gap
                    erlang:error({invalid_order, ID, LastID}, [Ev, LastEvIDMap])
+            end
+        end,
+        #{},
+        Events
+    ).
 
 -spec config_with_multiple_event_sinks(config()) -> _.
 config_with_multiple_event_sinks(_C) ->
@@ -613,11 +731,7 @@ config_with_multiple_event_sinks(_C) ->
             },
             retries => #{},
             event_sinks => [
-                {mg_core_events_sink_kafka, #{
-                    name => kafka,
-                    topic => <<"mg_core_event_sink">>,
-                    client => mg_cth:config(kafka_client_name)
-                }}
+                {mg_core_events_sink_machine, #{name => default, machine_id => <<"SingleES">>}}
             ]
         },
         <<"2">> => #{
@@ -633,10 +747,9 @@ config_with_multiple_event_sinks(_C) ->
             },
             retries => #{},
             event_sinks => [
-                {mg_core_events_sink_kafka, #{
-                    name => kafka_other,
-                    topic => <<"mg_core_event_sink_2">>,
-                    client => mg_cth:config(kafka_client_name)
+                {mg_core_events_sink_machine, #{
+                    name => machine,
+                    machine_id => <<"SingleES">>
                 }},
                 {mg_core_events_sink_kafka, #{
                     name => kafka,
@@ -645,6 +758,10 @@ config_with_multiple_event_sinks(_C) ->
                 }}
             ]
         }
+    },
+    event_sink_ns => #{
+        storage => mg_core_storage_memory,
+        default_processing_timeout => 5000
     }
 },
 Apps = mg_cth:start_applications([
@@ -674,9 +791,25 @@ start_machine(C, ID, Args) ->
             ok
     end.
 
+-spec create_event(mg_core_storage:opaque(), config(), mg_core:id()) -> _.
+create_event(Event, C, ID) ->
+    mg_cth_automaton_client:call(automaton_options(C), ID, Event).
+
+-spec create_events(integer(), config(), mg_core:id()) -> _.
+create_events(N, C, ID) ->
+    lists:foreach(
+        fun(I) ->
+            I = create_event([<<"event">>, I], C, ID)
+        end,
+        lists:seq(1, N)
+    ).
+
 -spec automaton_options(config()) -> _.
 automaton_options(C) -> ?config(automaton_options, C).
 
+-spec es_opts(config()) -> _.
+es_opts(C) -> ?config(event_sink_options, C).
+
 -spec no_timeout_automaton_options(config()) -> _.
 no_timeout_automaton_options(C) ->
     Options0 = automaton_options(C),
diff --git a/config/config.yaml b/config/config.yaml
index 8458dc2..9d9d1e5 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -175,6 +175,9 @@ namespaces:
     # only for testing, default 0
     # suicide_probability: 0.1
     event_sinks:
+      machine:
+        type: machine
+        machine_id: main_event_sink
       kafka:
         type: kafka
         client: default_kafka_client
diff --git a/rel_scripts/configurator.escript b/rel_scripts/configurator.escript
index e79a67e..7e570da 100755
--- a/rel_scripts/configurator.escript
+++ b/rel_scripts/configurator.escript
@@ -216,6 +216,7 @@ machinegun(YamlConfig) ->
         {health_check, health_check(YamlConfig)},
         {quotas, quotas(YamlConfig)},
         {namespaces, namespaces(YamlConfig)},
+        {event_sink_ns, event_sink_ns(YamlConfig)},
         {pulse, pulse(YamlConfig)},
         {cluster, cluster(YamlConfig)}
     ].
@@ -539,9 +540,23 @@ notification_scheduler(Share, Config) ->
 timeout(Name, Config, Default, Unit) ->
     ?C:time_interval(?C:conf([Name], Config, Default), Unit).
 
+event_sink_ns(YamlConfig) ->
+    #{
+        registry => procreg(YamlConfig),
+        storage => storage(<<"_event_sinks">>, YamlConfig),
+        worker => #{registry => procreg(YamlConfig)},
+        duplicate_search_batch => 1000,
+        default_processing_timeout => ?C:milliseconds(<<"30s">>)
+    }.
+
 event_sink({Name, ESYamlConfig}) ->
     event_sink(?C:atom(?C:conf([type], ESYamlConfig)), Name, ESYamlConfig).
 
+event_sink(machine, Name, ESYamlConfig) ->
+    {mg_core_events_sink_machine, #{
+        name => ?C:atom(Name),
+        machine_id => ?C:conf([machine_id], ESYamlConfig)
+    }};
 event_sink(kafka, Name, ESYamlConfig) ->
     {mg_core_events_sink_kafka, #{
         name => ?C:atom(Name),