diff --git a/core/environment/transition_startactivity.go b/core/environment/transition_startactivity.go index 86f13cba..21c6ec9e 100644 --- a/core/environment/transition_startactivity.go +++ b/core/environment/transition_startactivity.go @@ -81,7 +81,6 @@ func (t StartActivityTransition) do(env *Environment) (err error) { // Get a handle to the consolidated var stack of the root role of the env's workflow if wf := env.Workflow(); wf != nil { if cvs, cvsErr := wf.ConsolidatedVarStack(); cvsErr == nil { - // If bookkeeping is enabled and has fetched the LHC fill info, we can acquire it here for _, key := range []string{ "fill_info_fill_number", "fill_info_filling_scheme", diff --git a/core/integration/lhc/plugin.go b/core/integration/lhc/plugin.go index 1d5853d9..92a0b121 100644 --- a/core/integration/lhc/plugin.go +++ b/core/integration/lhc/plugin.go @@ -28,20 +28,22 @@ import ( "context" "encoding/json" "errors" - "github.com/AliceO2Group/Control/common/event/topic" - "github.com/AliceO2Group/Control/common/logger/infologger" - pb "github.com/AliceO2Group/Control/common/protos" "io" + "strconv" "strings" "sync" "time" cmnevent "github.com/AliceO2Group/Control/common/event" + "github.com/AliceO2Group/Control/common/event/topic" "github.com/AliceO2Group/Control/common/logger" + "github.com/AliceO2Group/Control/common/logger/infologger" + pb "github.com/AliceO2Group/Control/common/protos" "github.com/AliceO2Group/Control/common/utils/uid" "github.com/AliceO2Group/Control/core/environment" "github.com/AliceO2Group/Control/core/integration" lhcevent "github.com/AliceO2Group/Control/core/integration/lhc/event" + "github.com/AliceO2Group/Control/core/workflow/callable" "github.com/sirupsen/logrus" "github.com/spf13/viper" ) @@ -125,8 +127,18 @@ func (p *Plugin) GetEnvironmentsShortData(envIds []uid.ID) map[uid.ID]string { func (p *Plugin) ObjectStack(_ map[string]string, _ map[string]string) (stack map[string]interface{}) { return make(map[string]interface{}) } -func (p *Plugin) CallStack(_ interface{}) (stack map[string]interface{}) { - return make(map[string]interface{}) +func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { + call, ok := data.(*callable.Call) + if !ok { + return + } + + stack = make(map[string]interface{}) + stack["UpdateFillInfo"] = func() (out string) { + p.updateFillInfo(call) + return + } + return } func (p *Plugin) Destroy() error { @@ -198,3 +210,83 @@ func (p *Plugin) readAndInjectLhcUpdates() { } } } + +// UpdateFillInfo: propagate latest LHC fill info into the environment's global runtime vars +func (p *Plugin) updateFillInfo(call *callable.Call) (out string) { + varStack := call.VarStack + envId, ok := varStack["environment_id"] + if !ok { + err := errors.New("cannot acquire environment ID") + log.Error(err) + + call.VarStack["__call_error_reason"] = err.Error() + call.VarStack["__call_error"] = "LHC plugin Call Stack failed" + return + } + + log := log.WithFields(logrus.Fields{ + "partition": envId, + "call": "UpdateFillInfo", + }) + + parentRole, ok := call.GetParentRole().(callable.ParentRole) + if !ok || parentRole == nil { + log.WithField(infologger.Level, infologger.IL_Support). + Error("cannot access parent role to propagate LHC fill info") + return + } + + if p.currentState == nil { + log.WithField(infologger.Level, infologger.IL_Support). + Warn("attempted to update environment with fill info, but fill info is not available in plugin") + return + } + + // note: the following was causing very weird behaviours, which could be attributed to memory corruption. + // I did not manage to understand why can't we safely clone such a proto message. + // state := proto.Clone(p.currentState).(*pb.BeamInfo) + + p.mu.Lock() + defer p.mu.Unlock() + state := p.currentState + + parentRole.SetGlobalRuntimeVar("fill_info_beam_mode", state.BeamMode.String()) + + // If NO_BEAM, clear all other fill info and return + if state.BeamMode == pb.BeamMode_NO_BEAM { + parentRole.DeleteGlobalRuntimeVar("fill_info_fill_number") + parentRole.DeleteGlobalRuntimeVar("fill_info_filling_scheme") + parentRole.DeleteGlobalRuntimeVar("fill_info_beam_type") + parentRole.DeleteGlobalRuntimeVar("fill_info_stable_beam_start_ms") + parentRole.DeleteGlobalRuntimeVar("fill_info_stable_beam_end_ms") + + log.WithField(infologger.Level, infologger.IL_Devel). + Debug("NO_BEAM — cleared fill info vars and set beam mode only") + return + } + + // Otherwise, propagate latest known info + parentRole.SetGlobalRuntimeVar("fill_info_fill_number", strconv.FormatInt(int64(state.FillNumber), 10)) + parentRole.SetGlobalRuntimeVar("fill_info_filling_scheme", state.FillingSchemeName) + parentRole.SetGlobalRuntimeVar("fill_info_beam_type", state.BeamType) + if state.StableBeamsStart > 0 { + parentRole.SetGlobalRuntimeVar("fill_info_stable_beam_start_ms", strconv.FormatInt(state.StableBeamsStart, 10)) + } else { + parentRole.DeleteGlobalRuntimeVar("fill_info_stable_beam_start_ms") + } + if state.StableBeamsEnd > 0 { + parentRole.SetGlobalRuntimeVar("fill_info_stable_beam_end_ms", strconv.FormatInt(state.StableBeamsEnd, 10)) + } else { + parentRole.DeleteGlobalRuntimeVar("fill_info_stable_beam_end_ms") + } + + log.WithField("fillNumber", state.FillNumber). + WithField("fillingScheme", state.FillingSchemeName). + WithField("beamType", state.BeamType). + WithField("beamMode", state.BeamMode). + WithField("stableStartMs", state.StableBeamsStart). + WithField("stableEndMs", state.StableBeamsEnd). + WithField(infologger.Level, infologger.IL_Devel). + Debug("updated environment fill info from latest snapshot") + return +} diff --git a/docs/handbook/operation_order.md b/docs/handbook/operation_order.md index cd81e77e..7f1ef830 100644 --- a/docs/handbook/operation_order.md +++ b/docs/handbook/operation_order.md @@ -32,11 +32,11 @@ This is the order of actions happening at a healthy start of run. - `before_START_ACTIVITY` hooks with negative weights are executed: - `trg.PrepareForRun()` at `-200` + - `lhc.UpdateFillInfo()` at `-50` - `"run_number"` is set. - `"run_start_time_ms"` is set using the current time. It is considered as the SOR and SOSOR timestamps. - `before_START_ACTIVITY` hooks with positive weights (incl. 0) are executed: - `trg.RunLoad()`, `bookkeeping.StartOfRun()` at `10` - - `bookkeeping.RetrieveFillInfo()` at `11` - `kafka.PublishStartActivityUpdate()` at `50` - `dcs.StartOfRun()`, `odc.Start()` (does not need to return now), `ccdb.RunStart()` at `100` @@ -72,6 +72,7 @@ This is the order of actions happening at a healthy end of run. ### before_STOP_ACTIVITY - `before_STOP_ACTIVITY` hooks with negative weights are executed + - `lhc.UpdateFillInfo()` at `-50` - `trg.RunStop()` at `-10` - `"run_end_time_ms"` is set using the current time. It is considered as the EOR and SOEOR timestamps. - `before_STOP_ACTIVITY` hooks with positive weights (incl. 0) are executed: