diff --git a/core/integration/odc/handlers.go b/core/integration/odc/handlers.go index 85869c1d..6727c1af 100644 --- a/core/integration/odc/handlers.go +++ b/core/integration/odc/handlers.go @@ -97,17 +97,10 @@ func handleGetState(ctx context.Context, odcClient *RpcClient, envId string) (st return odcutils.StateForOdcState(newState), err } -func handleStart(ctx context.Context, odcClient *RpcClient, arguments map[string]string, paddingTimeout time.Duration, envId string, runNumber uint64, call *callable.Call) error { - defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient").WithField("partition", envId)) +func setProperties(ctx context.Context, odcClient *RpcClient, arguments map[string]string, paddingTimeout time.Duration, envId string, runNumber uint64, call *callable.Call) error { var err error = nil - var rep *odcpb.StateReply - - if envId == "" { - return errors.New("cannot proceed with empty environment id") - } - // SetProperties before START setPropertiesRequest := &odcpb.SetPropertiesRequest{ Partitionid: envId, Path: "", @@ -252,6 +245,25 @@ func handleStart(ctx context.Context, odcClient *RpcClient, arguments map[string }). Debug("call to ODC complete: odc.SetProperties") + return nil +} + +func handleStart(ctx context.Context, odcClient *RpcClient, arguments map[string]string, paddingTimeout time.Duration, envId string, runNumber uint64, call *callable.Call) error { + defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient").WithField("partition", envId)) + + var err error = nil + var rep *odcpb.StateReply + + if envId == "" { + return errors.New("cannot proceed with empty environment id") + } + + // SetProperties before START + err = setProperties(ctx, odcClient, arguments, paddingTimeout, envId, runNumber, call) + if err != nil { + return err + } + // The actual START operation starts here req := &odcpb.StartRequest{ Request: &odcpb.StateRequest{ @@ -262,15 +274,15 @@ func handleStart(ctx context.Context, odcClient *RpcClient, arguments map[string }, } // We ask this ODC call to complete within our own DEADLINE, minus 1 second - ctxDeadline, ok = ctx.Deadline() + ctxDeadline, ok := ctx.Deadline() if ok { req.Request.Timeout = uint32((time.Until(ctxDeadline) - paddingTimeout).Seconds()) } - payload = map[string]interface{}{ + payload := map[string]interface{}{ "odcRequest": &req, } - payloadJson, _ = json.Marshal(payload) + payloadJson, _ := json.Marshal(payload) the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ Name: call.GetName(), OperationName: call.Func, @@ -382,6 +394,20 @@ func handleStart(ctx context.Context, odcClient *RpcClient, arguments map[string func handleStop(ctx context.Context, odcClient *RpcClient, arguments map[string]string, paddingTimeout time.Duration, envId string, runNumber uint64, call *callable.Call) error { defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient").WithField("partition", envId)) + var err error = nil + + // SetProperties before STOP + if len(arguments) > 0 { + err = setProperties(ctx, odcClient, arguments, paddingTimeout, envId, runNumber, call) + if err != nil { + log.WithField("partition", envId). + WithField("level", infologger.IL_Support). + WithError(err). + Warn("setProperties call to ODC failed. will continue with odc.Stop") + } + } + + // The actual STOP operation starts here req := &odcpb.StopRequest{ Request: &odcpb.StateRequest{ Partitionid: envId, @@ -396,7 +422,6 @@ func handleStop(ctx context.Context, odcClient *RpcClient, arguments map[string] req.Request.Timeout = uint32((time.Until(ctxDeadline) - paddingTimeout).Seconds()) } - var err error = nil var rep *odcpb.StateReply if envId == "" { diff --git a/core/integration/odc/plugin.go b/core/integration/odc/plugin.go index 55b0a22a..502b5723 100644 --- a/core/integration/odc/plugin.go +++ b/core/integration/odc/plugin.go @@ -1363,6 +1363,12 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { WithField("call", "Start"). Warn("cannot acquire FairMQ devices cleanup count for ODC") } + runStartTimeMs, ok := varStack["run_start_time_ms"] + if !ok { + log.WithField("partition", envId). + WithField("call", "Start"). + Warn("cannot acquire run_start_time_ms") + } var ( runNumberu64 uint64 @@ -1391,6 +1397,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { arguments := make(map[string]string) arguments["run_number"] = rn arguments["runNumber"] = rn + arguments["run_start_time_ms"] = runStartTimeMs arguments["cleanup"] = strconv.Itoa(cleanupCount) ctx, cancel := context.WithTimeout(context.Background(), timeout) @@ -1409,32 +1416,40 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { } stack["Stop"] = func() (out string) { // ODC Stop + callFailedStr := "EPN Stop call failed" + var ( + runNumberu64 uint64 + err error + ) rn, ok := varStack["run_number"] if !ok { log.WithField("partition", envId). WithField("call", "Start"). - Warn("cannot acquire run number for ODC") + Warn("cannot acquire run number for ODC Stop") } - var ( - runNumberu64 uint64 - err error - ) - callFailedStr := "EPN Stop call failed" - runNumberu64, err = strconv.ParseUint(rn, 10, 32) if err != nil { log.WithField("partition", envId). WithError(err). - Error("cannot acquire run number for DCS SOR") + Error("cannot acquire run number for ODC EOR") runNumberu64 = 0 } + runEndTimeMs, ok := varStack["run_end_time_ms"] + if !ok { + log.WithField("partition", envId). + WithField("call", "Start"). + Warn("cannot acquire run_end_time_ms") + } + + arguments := make(map[string]string) + arguments["run_end_time_ms"] = runEndTimeMs timeout := callable.AcquireTimeout(ODC_STOP_TIMEOUT, varStack, "Stop", envId) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - err = handleStop(ctx, p.odcClient, nil, paddingTimeout, envId, runNumberu64, call) + err = handleStop(ctx, p.odcClient, arguments, paddingTimeout, envId, runNumberu64, call) if err != nil { log.WithError(err). WithField("level", infologger.IL_Support). diff --git a/docs/handbook/configuration.md b/docs/handbook/configuration.md index 5168e4a9..cf0024fe 100644 --- a/docs/handbook/configuration.md +++ b/docs/handbook/configuration.md @@ -455,7 +455,10 @@ In addition to the above, which varies depending on the configuration of the env * `pdp_beam_type` * `pdp_override_run_start_time` -FairMQ task implementors should expect that these values are written to the FairMQ properties map right before the `RUN` transition via `SetProperty` calls. +The following values are pushed by AliECS during `STOP_ACTIVITY`: + * `run_end_time_ms` + +FairMQ task implementors should expect that these values are written to the FairMQ properties map right before the `RUN` and `STOP` transitions via `SetProperty` calls. ## Resource wants and limits