From 4a2b9a5eb34ea679ee4c203999a1ad31eb073978 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Tue, 25 Mar 2025 12:03:29 +0100 Subject: [PATCH] [core] ECS waits until DCS operation completes in case a detector fails This changes how we consume reply streams of requests to DCS. In particular, we do not stop consuming events in case we receive a detector error and instead we wait to receive the status of all the other detectors. As it has been clarified with the DCS team, Ecs2DcsGateway never sends "a pre-closure summary of the current operation" with detector == DCS, despite the documentation in the dcs.proto file. The latest version of this proto file in the Ecs2DcsGateway repo has the comment removed. With this commit, we treat an EOF from DCS as the only correct stream termination and do not ever expect a detector == DCS event. We also remove some reduntant operations, but more refactoring will have to come. Closes OCTRL-711. --- core/integration/dcs/plugin.go | 309 +++++++++++---------------------- 1 file changed, 102 insertions(+), 207 deletions(-) diff --git a/core/integration/dcs/plugin.go b/core/integration/dcs/plugin.go index 8bab8b3f..03fa9438 100644 --- a/core/integration/dcs/plugin.go +++ b/core/integration/dcs/plugin.go @@ -725,6 +725,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { var dcsEvent *dcspb.RunEvent for { + // fixme: consider removing this check, since `stream.Recv()` will return a timeout error anyway if the context expires if ctx.Err() != nil { err = fmt.Errorf("DCS PrepareForRun context timed out (%s), any future DCS events are ignored", timeout.String()) @@ -743,8 +744,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { } dcsEvent, err = stream.Recv() if errors.Is(err, io.EOF) { // correct stream termination - logMsg := "DCS PFR event stream was closed from the DCS side (EOF)" - log.Debug(logMsg) + log.Debug("DCS PFR event stream was closed from the DCS side (EOF)") + err = nil break // no more data } @@ -826,10 +827,16 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Warn("nil DCS PFR event received, skipping to next DCS event") continue } + if dcsEvent.GetDetector() == dcspb.Detector_DCS { + log.WithField(infologger.Level, infologger.IL_Support). + Warnf("Received an event for DCS detector (%s), which is unexpected, ignoring", dcsEvent.GetState().String()) + continue + } - if dcsEvent.GetState() == dcspb.DetectorState_SOR_FAILURE { - ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) + detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState() + ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) + if dcsEvent.GetState() == dcspb.DetectorState_SOR_FAILURE { logErr := fmt.Errorf("%s PFR failure reported by DCS", ecsDet) if err != nil { logErr = fmt.Errorf("%v : %v", err, logErr) @@ -851,7 +858,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ Name: call.GetName(), OperationName: call.Func, - OperationStatus: pb.OpStatus_DONE_ERROR, + OperationStatus: pb.OpStatus_ONGOING, OperationStep: "perform DCS call: PrepareForRun", OperationStepStatus: pb.OpStatus_DONE_ERROR, EnvironmentId: envId, @@ -859,12 +866,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Error: logErr.Error(), }) - return + continue } if dcsEvent.GetState() == dcspb.DetectorState_PFR_UNAVAILABLE { - ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) - logErr := fmt.Errorf("%s PFR unavailable reported by DCS", ecsDet) if err != nil { logErr = fmt.Errorf("%v : %v", err, logErr) @@ -886,7 +891,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ Name: call.GetName(), OperationName: call.Func, - OperationStatus: pb.OpStatus_DONE_ERROR, + OperationStatus: pb.OpStatus_ONGOING, OperationStep: "perform DCS call: PrepareForRun", OperationStepStatus: pb.OpStatus_DONE_ERROR, EnvironmentId: envId, @@ -894,12 +899,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Error: logErr.Error(), }) - return + continue } if dcsEvent.GetState() == dcspb.DetectorState_TIMEOUT { - ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) - logErr := fmt.Errorf("%s PFR timeout reported by DCS", ecsDet) if err != nil { logErr = fmt.Errorf("%v : %v", err, logErr) @@ -921,7 +924,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ Name: call.GetName(), OperationName: call.Func, - OperationStatus: pb.OpStatus_DONE_TIMEOUT, + OperationStatus: pb.OpStatus_ONGOING, OperationStep: "perform DCS call: PrepareForRun", OperationStepStatus: pb.OpStatus_DONE_TIMEOUT, EnvironmentId: envId, @@ -929,70 +932,37 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Error: logErr.Error(), }) - return + continue } - detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState() - - if dcsEvent.GetState() == dcspb.DetectorState_RUN_OK { - if dcsEvent.GetDetector() == dcspb.Detector_DCS { - log.WithField("event", dcsEvent). - WithField("level", infologger.IL_Support). - Debug("DCS PFR completed successfully") - - detPayload := map[string]interface{}{} - _ = copier.Copy(&detPayload, payload) - detPayload["dcsEvent"] = dcsEvent - detPayload["state"] = dcspb.DetectorState_name[int32(dcsEvent.GetState())] - detPayloadJson, _ := json.Marshal(detPayload) - - the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ - Name: call.GetName(), - OperationName: call.Func, - OperationStatus: pb.OpStatus_ONGOING, - OperationStep: "perform DCS call: PrepareForRun", - OperationStepStatus: pb.OpStatus_ONGOING, - EnvironmentId: envId, - Payload: string(detPayloadJson[:]), - }) - - break - } else { - ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) - log.WithField("detector", ecsDet). - Debugf("DCS PFR for %s: received status %s", ecsDet, dcsEvent.GetState().String()) - - detPayload := map[string]interface{}{} - _ = copier.Copy(&detPayload, payload) - detPayload["detector"] = ecsDet - detPayload["state"] = dcspb.DetectorState_name[int32(dcsEvent.GetState())] - detPayload["dcsEvent"] = dcsEvent - detPayloadJson, _ := json.Marshal(detPayload) - - the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ - Name: call.GetName(), - OperationName: call.Func, - OperationStatus: pb.OpStatus_ONGOING, - OperationStep: "perform DCS call: PrepareForRun", - OperationStepStatus: pb.OpStatus_ONGOING, - EnvironmentId: envId, - Payload: string(detPayloadJson[:]), - }) - - } - } if dcsEvent.GetState() == dcspb.DetectorState_RUN_OK { log.WithField("event", dcsEvent). WithField("level", infologger.IL_Support). - WithField("detector", dcsToEcsDetector(dcsEvent.GetDetector())). + WithField("detector", ecsDet). Info("ALIECS PFR operation : completed DCS PFR for ") + + detPayload := map[string]interface{}{} + _ = copier.Copy(&detPayload, payload) + detPayload["detector"] = ecsDet + detPayload["state"] = dcspb.DetectorState_name[int32(dcsEvent.GetState())] + detPayload["dcsEvent"] = dcsEvent + detPayloadJson, _ := json.Marshal(detPayload) + + the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ + Name: call.GetName(), + OperationName: call.Func, + OperationStatus: pb.OpStatus_ONGOING, + OperationStep: "perform DCS call: PrepareForRun", + OperationStepStatus: pb.OpStatus_ONGOING, + EnvironmentId: envId, + Payload: string(detPayloadJson[:]), + }) } else { log.WithField("event", dcsEvent). WithField("level", infologger.IL_Devel). - WithField("detector", dcsToEcsDetector(dcsEvent.GetDetector())). + WithField("detector", ecsDet). Info("ALIECS PFR operation : processing DCS PFR for ") - ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) detPayload := map[string]interface{}{} _ = copier.Copy(&detPayload, payload) detPayload["detector"] = ecsDet @@ -1010,7 +980,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Payload: string(detPayloadJson[:]), }) } - } dcsFailedEcsDetectors := make([]string, 0) @@ -1034,9 +1003,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { } else { logErr := fmt.Errorf("PFR failed for %s", strings.Join(dcsFailedEcsDetectors, ", ")) if err != nil { - if errors.Is(err, io.EOF) { - err = fmt.Errorf("DCS PFR stream unexpectedly terminated from DCS side before completion: %w", err) - } logErr = fmt.Errorf("%v : %v", err, logErr) } @@ -1375,6 +1341,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { var dcsEvent *dcspb.RunEvent for { + // fixme: consider removing this check, since `stream.Recv()` will return a timeout error anyway if the context expires if ctx.Err() != nil { err = fmt.Errorf("DCS StartOfRun context timed out (%s), any future DCS events are ignored", timeout.String()) @@ -1393,8 +1360,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { } dcsEvent, err = stream.Recv() if errors.Is(err, io.EOF) { // correct stream termination - logMsg := "DCS SOR event stream was closed from the DCS side (EOF)" - log.Debug(logMsg) + log.Debug("DCS SOR event stream was closed from the DCS side (EOF)") + err = nil break // no more data } @@ -1475,10 +1442,16 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { log.Warn("nil DCS SOR event received, skipping to next DCS event") continue } + if dcsEvent.GetDetector() == dcspb.Detector_DCS { + log.WithField(infologger.Level, infologger.IL_Support). + Warnf("Received an event for DCS detector (%s), which is unexpected, ignoring", dcsEvent.GetState().String()) + continue + } - if dcsEvent.GetState() == dcspb.DetectorState_SOR_FAILURE { - ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) + detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState() + ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) + if dcsEvent.GetState() == dcspb.DetectorState_SOR_FAILURE { logErr := fmt.Errorf("%s SOR failure reported by DCS", ecsDet) if err != nil { logErr = fmt.Errorf("%v : %v", err, logErr) @@ -1500,7 +1473,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ Name: call.GetName(), OperationName: call.Func, - OperationStatus: pb.OpStatus_DONE_ERROR, + OperationStatus: pb.OpStatus_ONGOING, OperationStep: "perform DCS call: StartOfRun", OperationStepStatus: pb.OpStatus_DONE_ERROR, EnvironmentId: envId, @@ -1508,12 +1481,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Error: logErr.Error(), }) - return + continue } if dcsEvent.GetState() == dcspb.DetectorState_SOR_UNAVAILABLE { - ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) - logErr := fmt.Errorf("%s SOR unavailable reported by DCS", ecsDet) if err != nil { logErr = fmt.Errorf("%v : %v", err, logErr) @@ -1535,7 +1506,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ Name: call.GetName(), OperationName: call.Func, - OperationStatus: pb.OpStatus_DONE_ERROR, + OperationStatus: pb.OpStatus_ONGOING, OperationStep: "perform DCS call: StartOfRun", OperationStepStatus: pb.OpStatus_DONE_ERROR, EnvironmentId: envId, @@ -1543,12 +1514,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Error: logErr.Error(), }) - return + continue } if dcsEvent.GetState() == dcspb.DetectorState_TIMEOUT { - ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) - logErr := fmt.Errorf("%s SOR timeout reported by DCS", ecsDet) if err != nil { logErr = fmt.Errorf("%v : %v", err, logErr) @@ -1570,7 +1539,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ Name: call.GetName(), OperationName: call.Func, - OperationStatus: pb.OpStatus_DONE_TIMEOUT, + OperationStatus: pb.OpStatus_ONGOING, OperationStep: "perform DCS call: StartOfRun", OperationStepStatus: pb.OpStatus_DONE_TIMEOUT, EnvironmentId: envId, @@ -1578,64 +1547,31 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Error: logErr.Error(), }) - return + continue } - detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState() - - if dcsEvent.GetState() == dcspb.DetectorState_RUN_OK { - if dcsEvent.GetDetector() == dcspb.Detector_DCS { - log.WithField("event", dcsEvent). - WithField("level", infologger.IL_Support). - Debug("DCS SOR completed successfully") - p.pendingEORs[envId] = runNumber64 - - detPayload := map[string]interface{}{} - _ = copier.Copy(&detPayload, payload) - detPayload["dcsEvent"] = dcsEvent - detPayload["state"] = dcspb.DetectorState_name[int32(dcsEvent.GetState())] - detPayloadJson, _ := json.Marshal(detPayload) - - the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ - Name: call.GetName(), - OperationName: call.Func, - OperationStatus: pb.OpStatus_ONGOING, - OperationStep: "perform DCS call: StartOfRun", - OperationStepStatus: pb.OpStatus_ONGOING, - EnvironmentId: envId, - Payload: string(detPayloadJson[:]), - }) - - break - } else { - ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) - log.WithField("detector", ecsDet). - Debugf("DCS SOR for %s: received status %s", ecsDet, dcsEvent.GetState().String()) - - detPayload := map[string]interface{}{} - _ = copier.Copy(&detPayload, payload) - detPayload["detector"] = ecsDet - detPayload["state"] = dcspb.DetectorState_name[int32(dcsEvent.GetState())] - detPayload["dcsEvent"] = dcsEvent - detPayloadJson, _ := json.Marshal(detPayload) - - the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ - Name: call.GetName(), - OperationName: call.Func, - OperationStatus: pb.OpStatus_ONGOING, - OperationStep: "perform DCS call: StartOfRun", - OperationStepStatus: pb.OpStatus_ONGOING, - EnvironmentId: envId, - Payload: string(detPayloadJson[:]), - }) - - } - } if dcsEvent.GetState() == dcspb.DetectorState_RUN_OK { log.WithField("event", dcsEvent). WithField("level", infologger.IL_Support). WithField("detector", dcsToEcsDetector(dcsEvent.GetDetector())). Info("ALIECS SOR operation : completed DCS SOR for ") + + detPayload := map[string]interface{}{} + _ = copier.Copy(&detPayload, payload) + detPayload["detector"] = ecsDet + detPayload["state"] = dcspb.DetectorState_name[int32(dcsEvent.GetState())] + detPayload["dcsEvent"] = dcsEvent + detPayloadJson, _ := json.Marshal(detPayload) + + the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ + Name: call.GetName(), + OperationName: call.Func, + OperationStatus: pb.OpStatus_ONGOING, + OperationStep: "perform DCS call: StartOfRun", + OperationStepStatus: pb.OpStatus_ONGOING, + EnvironmentId: envId, + Payload: string(detPayloadJson[:]), + }) } else { log.WithField("event", dcsEvent). WithField("level", infologger.IL_Devel). @@ -1659,9 +1595,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { EnvironmentId: envId, Payload: string(detPayloadJson[:]), }) - } - } dcsFailedEcsDetectors := make([]string, 0) @@ -1687,9 +1621,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { } else { logErr := fmt.Errorf("SOR failed for %s, DCS EOR will run anyway for this run", strings.Join(dcsFailedEcsDetectors, ", ")) if err != nil { - if errors.Is(err, io.EOF) { - err = fmt.Errorf("DCS SOR stream unexpectedly terminated from DCS side before completion: %w", err) - } logErr = fmt.Errorf("%v : %v", err, logErr) } @@ -1905,6 +1836,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { var dcsEvent *dcspb.RunEvent for { + // fixme: consider removing this check, since `stream.Recv()` will return a timeout error anyway if the context expires if ctx.Err() != nil { err = fmt.Errorf("DCS EndOfRun context timed out (%s), any future DCS events are ignored", timeout.String()) @@ -1923,8 +1855,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { } dcsEvent, err = stream.Recv() if errors.Is(err, io.EOF) { // correct stream termination - logMsg := "DCS EOR event stream was closed from the DCS side (EOF)" - log.Debug(logMsg) + log.Debug("DCS EOR event stream was closed from the DCS side (EOF)") + err = nil break // no more data } @@ -2005,17 +1937,17 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { log.Warn("nil DCS EOR event received, skipping to next DCS event") continue } + if dcsEvent.GetDetector() == dcspb.Detector_DCS { + log.WithField(infologger.Level, infologger.IL_Support). + Warnf("Received an event for DCS detector (%s), which is unexpected, ignoring", dcsEvent.GetState().String()) + continue + } - if dcsEvent.GetState() == dcspb.DetectorState_EOR_FAILURE { - ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) + detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState() + ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) + if dcsEvent.GetState() == dcspb.DetectorState_EOR_FAILURE { logErr := fmt.Errorf("%s EOR failure reported by DCS", ecsDet) - if err != nil { - if errors.Is(err, io.EOF) { - err = fmt.Errorf("DCS EOR stream unexpectedly terminated from DCS side before completion: %w", err) - } - logErr = fmt.Errorf("%v : %v", err, logErr) - } log.WithError(logErr). WithField("event", dcsEvent). WithField("detector", ecsDet). @@ -2033,7 +1965,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ Name: call.GetName(), OperationName: call.Func, - OperationStatus: pb.OpStatus_DONE_ERROR, + OperationStatus: pb.OpStatus_ONGOING, OperationStep: "perform DCS call: EndOfRun", OperationStepStatus: pb.OpStatus_DONE_ERROR, EnvironmentId: envId, @@ -2041,12 +1973,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Error: logErr.Error(), }) - return + continue } if dcsEvent.GetState() == dcspb.DetectorState_TIMEOUT { - ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) - logErr := fmt.Errorf("%s EOR timeout reported by DCS", ecsDet) if err != nil { logErr = fmt.Errorf("%v : %v", err, logErr) @@ -2068,7 +1998,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ Name: call.GetName(), OperationName: call.Func, - OperationStatus: pb.OpStatus_DONE_TIMEOUT, + OperationStatus: pb.OpStatus_ONGOING, OperationStep: "perform DCS call: EndOfRun", OperationStepStatus: pb.OpStatus_DONE_TIMEOUT, EnvironmentId: envId, @@ -2076,57 +2006,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Error: logErr.Error(), }) - return - } - - detectorStatusMap[dcsEvent.GetDetector()] = dcsEvent.GetState() - - if dcsEvent.GetState() == dcspb.DetectorState_RUN_OK { - if dcsEvent.GetDetector() == dcspb.Detector_DCS { - log.WithField("event", dcsEvent). - WithField("level", infologger.IL_Support). - Debug("DCS EOR completed successfully") - delete(p.pendingEORs, envId) - - detPayload := map[string]interface{}{} - _ = copier.Copy(&detPayload, payload) - detPayload["dcsEvent"] = dcsEvent - payload["state"] = dcspb.DetectorState_name[int32(dcsEvent.GetState())] - detPayloadJson, _ := json.Marshal(detPayload) - - the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ - Name: call.GetName(), - OperationName: call.Func, - OperationStatus: pb.OpStatus_ONGOING, - OperationStep: "perform DCS call: EndOfRun", - OperationStepStatus: pb.OpStatus_ONGOING, - EnvironmentId: envId, - Payload: string(detPayloadJson[:]), - }) - - break - } else { - ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) - log.WithField("detector", dcsEvent.GetDetector().String()). - Debugf("DCS EOR for %s: received status %s", ecsDet, dcsEvent.GetState().String()) - - detPayload := map[string]interface{}{} - _ = copier.Copy(&detPayload, payload) - detPayload["detector"] = ecsDet - detPayload["state"] = dcspb.DetectorState_name[int32(dcsEvent.GetState())] - detPayload["dcsEvent"] = dcsEvent - detPayloadJson, _ := json.Marshal(detPayload) - - the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ - Name: call.GetName(), - OperationName: call.Func, - OperationStatus: pb.OpStatus_ONGOING, - OperationStep: "perform DCS call: EndOfRun", - OperationStepStatus: pb.OpStatus_ONGOING, - EnvironmentId: envId, - Payload: string(detPayloadJson[:]), - }) - } + continue } if dcsEvent.GetState() == dcspb.DetectorState_RUN_OK { @@ -2134,13 +2014,29 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { WithField("level", infologger.IL_Support). WithField("detector", dcsToEcsDetector(dcsEvent.GetDetector())). Info("ALIECS EOR operation : completed DCS EOR for ") + + detPayload := map[string]interface{}{} + _ = copier.Copy(&detPayload, payload) + detPayload["detector"] = ecsDet + detPayload["state"] = dcspb.DetectorState_name[int32(dcsEvent.GetState())] + detPayload["dcsEvent"] = dcsEvent + detPayloadJson, _ := json.Marshal(detPayload) + + the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ + Name: call.GetName(), + OperationName: call.Func, + OperationStatus: pb.OpStatus_ONGOING, + OperationStep: "perform DCS call: EndOfRun", + OperationStepStatus: pb.OpStatus_ONGOING, + EnvironmentId: envId, + Payload: string(detPayloadJson[:]), + }) } else { log.WithField("event", dcsEvent). WithField("level", infologger.IL_Devel). WithField("detector", dcsToEcsDetector(dcsEvent.GetDetector())). Info("ALIECS EOR operation : processing DCS EOR for ") - ecsDet := dcsToEcsDetector(dcsEvent.GetDetector()) detPayload := map[string]interface{}{} _ = copier.Copy(&detPayload, payload) detPayload["detector"] = ecsDet @@ -2157,7 +2053,6 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { EnvironmentId: envId, Payload: string(detPayloadJson[:]), }) - } }