From a7698f64bad2e273c98fc43fa8afa6fc85cb3344 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Thu, 13 Mar 2025 14:46:25 +0100 Subject: [PATCH] [core] OCTRL-989 - removed `vars` from Ev_RunEvent - changing status and state of Task in kafka message only if event contains them - properly filling `traits` in Ev_TaskEvent from task.Traits --- common/protos/events.proto | 6 +++--- core/environment/environment.go | 8 -------- core/environment/manager.go | 13 ++++--------- core/task/task.go | 24 ++++++++++++++++++------ 4 files changed, 25 insertions(+), 26 deletions(-) diff --git a/common/protos/events.proto b/common/protos/events.proto index cc6940a1..66b6fe0c 100644 --- a/common/protos/events.proto +++ b/common/protos/events.proto @@ -78,7 +78,7 @@ message Ev_TaskEvent { string name = 1; // task name, based on the name of the task class string taskid = 2; // task id, unique string state = 3; // state machine state for this task - string status = 4; // active/inactive etc. + string status = 4; // posible values: ACTIVE/INACTIVE/PARTIAL/UNDEFINED/UNDEPLOYABLE as defined in status.go. string hostname = 5; string className = 6; // name of the task class from which this task was spawned Traits traits = 7; @@ -99,7 +99,7 @@ message Ev_CallEvent { message Ev_RoleEvent { string name = 1; // role name - string status = 2; // active/inactive etc., derived from the state of child tasks, calls or other roles + string status = 2; // posible values: ACTIVE/INACTIVE/PARTIAL/UNDEFINED/UNDEPLOYABLE as defined in status.go. Derived from the state of child tasks, calls or other roles string state = 3; // state machine state for this role string rolePath = 4; // path to this role within the environment string environmentId = 5; @@ -123,7 +123,7 @@ message Ev_RunEvent { string error = 4; string transition = 5; OpStatus transitionStatus = 6; - map vars = 7; + reserved 7; // 7 was used for `vars` field that was removed common.User lastRequestUser = 8; } diff --git a/core/environment/environment.go b/core/environment/environment.go index 68dc5649..cbfb16b2 100644 --- a/core/environment/environment.go +++ b/core/environment/environment.go @@ -160,7 +160,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, }, fsm.Callbacks{ "before_event": func(_ context.Context, e *fsm.Event) { - env.Mu.Lock() env.currentTransition = e.Event env.Mu.Unlock() @@ -224,7 +223,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, Error: "", Transition: e.Event, TransitionStatus: pb.OpStatus_STARTED, - Vars: nil, LastRequestUser: env.GetLastRequestUser(), }, runStartTime) @@ -265,7 +263,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, Error: "", Transition: e.Event, TransitionStatus: pb.OpStatus_STARTED, - Vars: nil, LastRequestUser: env.GetLastRequestUser(), }, runEndTime) @@ -287,7 +284,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, Error: "", Transition: e.Event, TransitionStatus: pb.OpStatus_STARTED, - Vars: nil, LastRequestUser: env.GetLastRequestUser(), }, runEndTime) @@ -552,7 +548,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, Error: "", Transition: e.Event, TransitionStatus: pb.OpStatus_DONE_OK, - Vars: nil, LastRequestUser: env.GetLastRequestUser(), } if e.Err != nil { @@ -574,7 +569,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, Error: "", Transition: e.Event, TransitionStatus: pb.OpStatus_DONE_OK, - Vars: nil, LastRequestUser: env.GetLastRequestUser(), } if e.Err != nil { @@ -600,7 +594,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, Error: "", Transition: e.Event, TransitionStatus: pb.OpStatus_DONE_OK, - Vars: nil, LastRequestUser: env.GetLastRequestUser(), }, runEndCompletionTime) @@ -648,7 +641,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, } func (env *Environment) handleHooks(workflow workflow.Role, trigger string, weightPredicate func(callable.HookWeight) bool) (err error) { - // Starting point: get all hooks to be started for the current trigger hooksMapForTrigger := workflow.GetHooksMapForTrigger(trigger) callsMapForAwait := env.callsPendingAwait[trigger] diff --git a/core/environment/manager.go b/core/environment/manager.go index 3a928e17..0f239b73 100644 --- a/core/environment/manager.go +++ b/core/environment/manager.go @@ -61,9 +61,7 @@ type Manager struct { pendingStateChangeCh map[uid.ID]chan *event.TasksStateChangedEvent } -var ( - instance *Manager -) +var instance *Manager func ManagerInstance() *Manager { return instance @@ -145,7 +143,7 @@ func NewEnvManager(tm *task.Manager, incomingEventCh chan event.Event) *Manager // If there is no pending environment teardown, it means that the released task stopped // unexpectedly. In that case, the environment should get torn-down only if the task // is critical. - var releaseCriticalTask = false + releaseCriticalTask := false for _, v := range typedEvent.GetTaskIds() { if tm.GetTask(v) != nil { if tm.GetTask(v).GetTraits().Critical == true { @@ -407,7 +405,7 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string] err = env.TryTransition(NewDeployTransition( envs.taskman, - nil, //roles, + nil, // roles, nil), ) @@ -647,7 +645,6 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error Error: "", Transition: "TEARDOWN", TransitionStatus: evpb.OpStatus_STARTED, - Vars: nil, }, runEndTime) } else { log.WithField("partition", environmentId.String()). @@ -667,7 +664,6 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error Error: "", Transition: "TEARDOWN", TransitionStatus: evpb.OpStatus_STARTED, - Vars: nil, }, runEndCompletionTime) } else { log.WithField("partition", environmentId.String()). @@ -1172,7 +1168,6 @@ func (envs *Manager) handleDeviceEvent(evt event.DeviceEvent) { // FIXME: this function should be deduplicated with CreateEnvironment so detector resource matching works correctly func (envs *Manager) CreateAutoEnvironment(workflowPath string, userVars map[string]string, newId uid.ID, sub Subscription) { - envUserVars := make(map[string]string) workflowUserVars := make(map[string]string) for k, v := range userVars { @@ -1296,7 +1291,7 @@ func (envs *Manager) CreateAutoEnvironment(workflowPath string, userVars map[str err = env.TryTransition(NewDeployTransition( envs.taskman, - nil, //roles, + nil, // roles, nil), ) if err == nil { diff --git a/core/task/task.go b/core/task/task.go index 825dcfc6..d7420a4e 100644 --- a/core/task/task.go +++ b/core/task/task.go @@ -54,7 +54,7 @@ import ( "github.com/AliceO2Group/Control/core/task/taskclass" "github.com/AliceO2Group/Control/core/task/taskclass/port" "github.com/AliceO2Group/Control/core/the" - "github.com/mesos/mesos-go/api/v1/lib" + mesos "github.com/mesos/mesos-go/api/v1/lib" "github.com/sirupsen/logrus" "github.com/spf13/viper" ) @@ -112,7 +112,7 @@ type Task struct { mu sync.RWMutex parent parentRole className string - //configuration Descriptor + // configuration Descriptor name string hostname string agentId string @@ -501,6 +501,15 @@ func (t *Task) GetEnvironmentId() uid.ID { return t.parent.GetEnvironmentId() } +func traitsToPbTraits(traits Traits) *evpb.Traits { + return &evpb.Traits{ + Trigger: traits.Trigger, + Await: traits.Await, + Timeout: traits.Timeout, + Critical: traits.Critical, + } +} + func (t *Task) SendEvent(ev event.Event) { if t == nil { return @@ -516,6 +525,7 @@ func (t *Task) SendEvent(ev event.Event) { Hostname: t.hostname, ClassName: t.className, Path: t.getParentRolePath(), + Traits: traitsToPbTraits(t.GetTraits()), } if t.parent == nil { @@ -527,8 +537,12 @@ func (t *Task) SendEvent(ev event.Event) { taskEvent, ok := ev.(*event.TaskEvent) if ok { - outgoingEvent.State = taskEvent.State - outgoingEvent.Status = taskEvent.Status + if len(taskEvent.State) != 0 { + outgoingEvent.State = taskEvent.State + } + if len(taskEvent.Status) != 0 { + outgoingEvent.Status = taskEvent.Status + } } the.EventWriterWithTopic(topic.Task).WriteEvent(outgoingEvent) @@ -658,7 +672,6 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand // __ptree__:: with the plain payload. keysToDelete := make([]string, 0) for k, v := range propMap { - if strings.HasPrefix(v, "__ptree__:") { keysToDelete = append(keysToDelete, k, v) splitValue := strings.Split(v, ":") @@ -671,7 +684,6 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand delete(propMap, k) } } - } return propMap, err }