Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 166 additions & 47 deletions core/integration/dcs/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ import (
"github.com/spf13/viper"
"golang.org/x/exp/maps"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -782,22 +784,59 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

break
}
if err != nil { // stream termination in case of general error
logMsg := "bad DCS PFR event received, any future DCS events are ignored"
log.WithError(err).
WithField("partition", envId).
Warn(logMsg)
if err != nil { // stream termination in case of unknown or gRPC error
got := status.Code(err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: PrepareForRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: logMsg,
})
if got == codes.DeadlineExceeded {
log.WithError(err).
WithField("partition", envId).
WithField("timeout", timeout.String()).
Debug("DCS PFR timed out")
err = fmt.Errorf("DCS PFR timed out after %s: %w", timeout.String(), err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: PrepareForRun",
OperationStepStatus: pb.OpStatus_DONE_TIMEOUT,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: err.Error(),
})
} else if got == codes.Unknown { // unknown error, likely not a gRPC code
logMsg := "bad DCS PFR event received, any future DCS events are ignored"
log.WithError(err).
WithField("partition", envId).
Warn(logMsg)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: PrepareForRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: logMsg,
})
} else { // some other gRPC error code
log.WithError(err).
WithField("partition", envId).
Error("DCS PFR call error")
err = fmt.Errorf("DCS PFR call error: %w", err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: PrepareForRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: err.Error(),
})
}

break
}
Expand Down Expand Up @@ -1452,23 +1491,63 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

break
}
if err != nil { // stream termination in case of general error
logMsg := "bad DCS SOR event received, any future DCS events are ignored"
log.WithError(err).
WithField("partition", envId).
WithField("run", runNumber64).
Warn(logMsg)
if err != nil { // stream termination in case of unknown or gRPC error
got := status.Code(err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: StartOfRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: logMsg,
})
if got == codes.DeadlineExceeded {
log.WithError(err).
WithField("partition", envId).
WithField("run", runNumber64).
WithField("timeout", timeout.String()).
Debug("DCS SOR timed out")
err = fmt.Errorf("DCS SOR timed out after %s: %w", timeout.String(), err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: StartOfRun",
OperationStepStatus: pb.OpStatus_DONE_TIMEOUT,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: err.Error(),
})

} else if got == codes.Unknown { // unknown error, likely not a gRPC code
logMsg := "bad DCS SOR event received, any future DCS events are ignored"
log.WithError(err).
WithField("partition", envId).
WithField("run", runNumber64).
Warn(logMsg)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: StartOfRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: logMsg,
})
} else { // some other gRPC error code
log.WithError(err).
WithField("partition", envId).
WithField("run", runNumber64).
Debug("DCS SOR call error")
err = fmt.Errorf("DCS SOR call error: %w", err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: StartOfRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: err.Error(),
})
}

break
}
Expand Down Expand Up @@ -2001,23 +2080,63 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

break
}
if err != nil { // stream termination in case of general error
logMsg := "bad DCS EOR event received, any future DCS events are ignored"
log.WithError(err).
WithField("partition", envId).
WithField("run", runNumber64).
Warn(logMsg)
if err != nil { // stream termination in case of unknown or gRPC error
got := status.Code(err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: EndOfRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: logMsg,
})
if got == codes.DeadlineExceeded {
log.WithError(err).
WithField("partition", envId).
WithField("run", runNumber64).
WithField("timeout", timeout.String()).
Debug("DCS EOR timed out")
err = fmt.Errorf("DCS EOR timed out after %s: %w", timeout.String(), err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: EndOfRun",
OperationStepStatus: pb.OpStatus_DONE_TIMEOUT,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: err.Error(),
})

} else if got == codes.Unknown { // unknown error, likely not a gRPC code
logMsg := "bad DCS EOR event received, any future DCS events are ignored"
log.WithError(err).
WithField("partition", envId).
WithField("run", runNumber64).
Warn(logMsg)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: EndOfRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: logMsg,
})
} else { // some other gRPC error code
log.WithError(err).
WithField("partition", envId).
WithField("run", runNumber64).
Debug("DCS EOR call error")
err = fmt.Errorf("DCS EOR call error: %w", err)

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_ONGOING,
OperationStep: "perform DCS call: EndOfRun",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: err.Error(),
})
}

break
}
Expand Down