Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions common/protos/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ message Ev_TaskEvent {
string name = 1; // task name, based on the name of the task class
string taskid = 2; // task id, unique
string state = 3; // state machine state for this task
string status = 4; // active/inactive etc.
string status = 4; // posible values: ACTIVE/INACTIVE/PARTIAL/UNDEFINED/UNDEPLOYABLE as defined in status.go.
string hostname = 5;
string className = 6; // name of the task class from which this task was spawned
Traits traits = 7;
Expand All @@ -99,7 +99,7 @@ message Ev_CallEvent {

message Ev_RoleEvent {
string name = 1; // role name
string status = 2; // active/inactive etc., derived from the state of child tasks, calls or other roles
string status = 2; // posible values: ACTIVE/INACTIVE/PARTIAL/UNDEFINED/UNDEPLOYABLE as defined in status.go. Derived from the state of child tasks, calls or other roles
string state = 3; // state machine state for this role
string rolePath = 4; // path to this role within the environment
string environmentId = 5;
Expand All @@ -123,7 +123,7 @@ message Ev_RunEvent {
string error = 4;
string transition = 5;
OpStatus transitionStatus = 6;
map<string, string> vars = 7;
reserved 7; // 7 was used for `vars` field that was removed
common.User lastRequestUser = 8;
}

Expand Down
8 changes: 0 additions & 8 deletions core/environment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
},
fsm.Callbacks{
"before_event": func(_ context.Context, e *fsm.Event) {

env.Mu.Lock()
env.currentTransition = e.Event
env.Mu.Unlock()
Expand Down Expand Up @@ -224,7 +223,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
Error: "",
Transition: e.Event,
TransitionStatus: pb.OpStatus_STARTED,
Vars: nil,
LastRequestUser: env.GetLastRequestUser(),
}, runStartTime)

Expand Down Expand Up @@ -265,7 +263,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
Error: "",
Transition: e.Event,
TransitionStatus: pb.OpStatus_STARTED,
Vars: nil,
LastRequestUser: env.GetLastRequestUser(),
}, runEndTime)

Expand All @@ -287,7 +284,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
Error: "",
Transition: e.Event,
TransitionStatus: pb.OpStatus_STARTED,
Vars: nil,
LastRequestUser: env.GetLastRequestUser(),
}, runEndTime)

Expand Down Expand Up @@ -552,7 +548,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
Error: "",
Transition: e.Event,
TransitionStatus: pb.OpStatus_DONE_OK,
Vars: nil,
LastRequestUser: env.GetLastRequestUser(),
}
if e.Err != nil {
Expand All @@ -574,7 +569,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
Error: "",
Transition: e.Event,
TransitionStatus: pb.OpStatus_DONE_OK,
Vars: nil,
LastRequestUser: env.GetLastRequestUser(),
}
if e.Err != nil {
Expand All @@ -600,7 +594,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
Error: "",
Transition: e.Event,
TransitionStatus: pb.OpStatus_DONE_OK,
Vars: nil,
LastRequestUser: env.GetLastRequestUser(),
}, runEndCompletionTime)

Expand Down Expand Up @@ -648,7 +641,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
}

func (env *Environment) handleHooks(workflow workflow.Role, trigger string, weightPredicate func(callable.HookWeight) bool) (err error) {

// Starting point: get all hooks to be started for the current trigger
hooksMapForTrigger := workflow.GetHooksMapForTrigger(trigger)
callsMapForAwait := env.callsPendingAwait[trigger]
Expand Down
13 changes: 4 additions & 9 deletions core/environment/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ type Manager struct {
pendingStateChangeCh map[uid.ID]chan *event.TasksStateChangedEvent
}

var (
instance *Manager
)
var instance *Manager

func ManagerInstance() *Manager {
return instance
Expand Down Expand Up @@ -145,7 +143,7 @@ func NewEnvManager(tm *task.Manager, incomingEventCh chan event.Event) *Manager
// If there is no pending environment teardown, it means that the released task stopped
// unexpectedly. In that case, the environment should get torn-down only if the task
// is critical.
var releaseCriticalTask = false
releaseCriticalTask := false
for _, v := range typedEvent.GetTaskIds() {
if tm.GetTask(v) != nil {
if tm.GetTask(v).GetTraits().Critical == true {
Expand Down Expand Up @@ -407,7 +405,7 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]

err = env.TryTransition(NewDeployTransition(
envs.taskman,
nil, //roles,
nil, // roles,
nil),
)

Expand Down Expand Up @@ -647,7 +645,6 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error
Error: "",
Transition: "TEARDOWN",
TransitionStatus: evpb.OpStatus_STARTED,
Vars: nil,
}, runEndTime)
} else {
log.WithField("partition", environmentId.String()).
Expand All @@ -667,7 +664,6 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error
Error: "",
Transition: "TEARDOWN",
TransitionStatus: evpb.OpStatus_STARTED,
Vars: nil,
}, runEndCompletionTime)
} else {
log.WithField("partition", environmentId.String()).
Expand Down Expand Up @@ -1172,7 +1168,6 @@ func (envs *Manager) handleDeviceEvent(evt event.DeviceEvent) {

// FIXME: this function should be deduplicated with CreateEnvironment so detector resource matching works correctly
func (envs *Manager) CreateAutoEnvironment(workflowPath string, userVars map[string]string, newId uid.ID, sub Subscription) {

envUserVars := make(map[string]string)
workflowUserVars := make(map[string]string)
for k, v := range userVars {
Expand Down Expand Up @@ -1296,7 +1291,7 @@ func (envs *Manager) CreateAutoEnvironment(workflowPath string, userVars map[str

err = env.TryTransition(NewDeployTransition(
envs.taskman,
nil, //roles,
nil, // roles,
nil),
)
if err == nil {
Expand Down
24 changes: 18 additions & 6 deletions core/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import (
"github.com/AliceO2Group/Control/core/task/taskclass"
"github.com/AliceO2Group/Control/core/task/taskclass/port"
"github.com/AliceO2Group/Control/core/the"
"github.com/mesos/mesos-go/api/v1/lib"
mesos "github.com/mesos/mesos-go/api/v1/lib"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
Expand Down Expand Up @@ -112,7 +112,7 @@ type Task struct {
mu sync.RWMutex
parent parentRole
className string
//configuration Descriptor
// configuration Descriptor
name string
hostname string
agentId string
Expand Down Expand Up @@ -501,6 +501,15 @@ func (t *Task) GetEnvironmentId() uid.ID {
return t.parent.GetEnvironmentId()
}

func traitsToPbTraits(traits Traits) *evpb.Traits {
return &evpb.Traits{
Trigger: traits.Trigger,
Await: traits.Await,
Timeout: traits.Timeout,
Critical: traits.Critical,
}
}

func (t *Task) SendEvent(ev event.Event) {
if t == nil {
return
Expand All @@ -516,6 +525,7 @@ func (t *Task) SendEvent(ev event.Event) {
Hostname: t.hostname,
ClassName: t.className,
Path: t.getParentRolePath(),
Traits: traitsToPbTraits(t.GetTraits()),
}

if t.parent == nil {
Expand All @@ -527,8 +537,12 @@ func (t *Task) SendEvent(ev event.Event) {

taskEvent, ok := ev.(*event.TaskEvent)
if ok {
outgoingEvent.State = taskEvent.State
outgoingEvent.Status = taskEvent.Status
if len(taskEvent.State) != 0 {
outgoingEvent.State = taskEvent.State
}
if len(taskEvent.Status) != 0 {
outgoingEvent.Status = taskEvent.Status
}
}
the.EventWriterWithTopic(topic.Task).WriteEvent(outgoingEvent)

Expand Down Expand Up @@ -658,7 +672,6 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand
// __ptree__:<syntax>:<key> with the plain payload.
keysToDelete := make([]string, 0)
for k, v := range propMap {

if strings.HasPrefix(v, "__ptree__:") {
keysToDelete = append(keysToDelete, k, v)
splitValue := strings.Split(v, ":")
Expand All @@ -671,7 +684,6 @@ func (t *Task) BuildPropertyMap(bindMap channel.BindMap) (propMap controlcommand
delete(propMap, k)
}
}

}
return propMap, err
}
Expand Down