Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 48 additions & 33 deletions core/integration/dcs/plugin.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2021-2024 CERN and copyright holders of ALICE O².
* Copyright 2021-2025 CERN and copyright holders of ALICE O².
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
*
* This program is free software: you can redistribute it and/or modify
Expand Down Expand Up @@ -33,6 +33,7 @@ import (
"fmt"
"io"
"net/url"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -438,8 +439,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
}

// We acquire a grace period during which we hope that DCS will become compatible with the operation.
// During this period we'll keep checking our internal state for op compatibility as reported by DCS at 1Hz,
// and if we don't get a compatible state within the grace period, we declare the operation failed.
// During this period we'll keep checking our internal state for op compatibility as reported by DCS
// at 1Hz, and if we don't get a compatible state for all included detectors within the grace period,
// we go ahead with a partial PFR.
pfrGracePeriod := time.Duration(0)
pfrGracePeriodS, ok := varStack["dcs_pfr_grace_period"]
if ok {
Expand Down Expand Up @@ -476,8 +478,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
pfrGraceTimeout := time.Now().Add(pfrGracePeriod)
isCompatibleWithOperation := false

var dcsDetectorsCompatibleWithPfr, dcsDetectorsIncompatibleWithPfr DCSDetectors

knownDetectorStates := p.getDetectorsPfrAvailability(dcsDetectors)
isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE)
isCompatibleWithOperation, dcsDetectorsCompatibleWithPfr, dcsDetectorsIncompatibleWithPfr, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE)

for {
if isCompatibleWithOperation {
Expand All @@ -494,34 +498,39 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

if time.Now().Before(pfrGraceTimeout) {
knownDetectorStates = p.getDetectorsPfrAvailability(dcsDetectors)
isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE)
isCompatibleWithOperation, dcsDetectorsCompatibleWithPfr, dcsDetectorsIncompatibleWithPfr, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE)
} else {
break
}
}

if !isCompatibleWithOperation {
log.WithError(err).
WithField("level", infologger.IL_Ops).
WithField("partition", envId).
WithField("call", "PrepareForRun").
Error("DCS error")
ecsDetectorsIncompatibleWithPfr := dcsDetectorsIncompatibleWithPfr.EcsDetectorsSlice()
sort.Strings(ecsDetectorsIncompatibleWithPfr)

call.VarStack["__call_error_reason"] = err.Error()
call.VarStack["__call_error"] = callFailedStr
if !isCompatibleWithOperation { // some detectors are not ready for PFR
if len(dcsDetectorsCompatibleWithPfr) == 0 { // if actually none are ready, we bail
log.WithError(err).
WithField("level", infologger.IL_Ops).
WithField("partition", envId).
WithField("call", "PrepareForRun").
Error("DCS error")

the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_DONE_ERROR,
OperationStep: "acquire detectors availability",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: err.Error(),
})
call.VarStack["__call_error_reason"] = err.Error()
call.VarStack["__call_error"] = callFailedStr

return
the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{
Name: call.GetName(),
OperationName: call.Func,
OperationStatus: pb.OpStatus_DONE_ERROR,
OperationStep: "acquire detectors availability",
OperationStepStatus: pb.OpStatus_DONE_ERROR,
EnvironmentId: envId,
Payload: string(payloadJson[:]),
Error: err.Error(),
})

return
}
} else if isCompatibleWithOperation && err != nil {
log.WithField("level", infologger.IL_Ops).
WithField("partition", envId).
Expand All @@ -542,11 +551,17 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
Payload: string(payloadJson[:]),
})

// By now the DCS must be in a compatible state, so we proceed with gathering params for the operation
// By now the DCS must be in a compatible state for at least some detectors, so we proceed
// with gathering params for the operation

if len(ecsDetectorsIncompatibleWithPfr) > 0 {
log.WithField("partition", envId).
WithField("level", infologger.IL_Ops).
Warnf("skipping DCS PFR for detectors: %s - a detector state compatible with PFR was not reached within %s", strings.Join(ecsDetectorsIncompatibleWithPfr, " "), pfrGracePeriod)
}
log.WithField("partition", envId).
WithField("level", infologger.IL_Ops).
Infof("performing DCS PFR for detectors: %s", strings.Join(dcsDetectors.EcsDetectorsSlice(), " "))
Infof("performing DCS PFR for detectors: %s", strings.Join(dcsDetectorsCompatibleWithPfr.EcsDetectorsSlice(), " "))

parameters, ok := varStack["dcs_sor_parameters"]
if !ok {
Expand All @@ -560,7 +575,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
bytes := []byte(parameters)
err = json.Unmarshal(bytes, &argMap)
if err != nil {
err = fmt.Errorf("error processing DCS SOR parameters: %w", err)
err = fmt.Errorf("error processing DCS PFR parameters: %w", err)
log.WithError(err).
WithField("partition", envId).
WithField("level", infologger.IL_Ops).
Expand Down Expand Up @@ -595,9 +610,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
in := dcspb.PfrRequest{
RunType: rt,
PartitionId: envId,
Detectors: make([]*dcspb.DetectorOperationRequest, len(dcsDetectors)),
Detectors: make([]*dcspb.DetectorOperationRequest, len(dcsDetectorsCompatibleWithPfr)),
}
for i, dcsDet := range dcsDetectors {
for i, dcsDet := range dcsDetectorsCompatibleWithPfr {
ecsDet := dcsToEcsDetector(dcsDet)
perDetectorParameters, okParam := varStack[strings.ToLower(ecsDet)+"_dcs_sor_parameters"]
if !okParam {
Expand Down Expand Up @@ -693,7 +708,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
defer cancel()

detectorStatusMap := make(map[dcspb.Detector]dcspb.DetectorState)
for _, v := range dcsDetectors {
for _, v := range dcsDetectorsCompatibleWithPfr {
detectorStatusMap[v] = dcspb.DetectorState_NULL_STATE
}

Expand Down Expand Up @@ -1044,7 +1059,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

dcsFailedEcsDetectors := make([]string, 0)
dcsopOk := true
for _, v := range dcsDetectors {
for _, v := range dcsDetectorsCompatibleWithPfr {
if detectorStatusMap[v] != dcspb.DetectorState_RUN_OK {
dcsopOk = false
dcsFailedEcsDetectors = append(dcsFailedEcsDetectors, dcsToEcsDetector(v))
Expand Down Expand Up @@ -1173,7 +1188,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
isCompatibleWithOperation := false

knownDetectorStates := p.getDetectorsSorAvailability(dcsDetectors)
isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE)
isCompatibleWithOperation, _, _, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE)

for {
if isCompatibleWithOperation {
Expand All @@ -1190,7 +1205,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

if time.Now().Before(sorGraceTimeout) {
knownDetectorStates = p.getDetectorsSorAvailability(dcsDetectors)
isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE)
isCompatibleWithOperation, _, _, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE)
} else {
break
}
Expand Down
23 changes: 16 additions & 7 deletions core/integration/dcs/structs.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2021-2024 CERN and copyright holders of ALICE O².
* Copyright 2021-2025 CERN and copyright holders of ALICE O².
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
*
* This program is free software: you can redistribute it and/or modify
Expand Down Expand Up @@ -102,36 +102,45 @@ func (dsm DCSDetectorOpAvailabilityMap) makeDetectorsByStateMap() map[dcspb.Dete
}

// Returns true if the provided detectors are either all in conditionState or in NULL_STATE
func (dsm DCSDetectorOpAvailabilityMap) compatibleWithDCSOperation(conditionState dcspb.DetectorState) (bool, error) {
func (dsm DCSDetectorOpAvailabilityMap) compatibleWithDCSOperation(conditionState dcspb.DetectorState) (isCompatible bool, detectorsCompatible DCSDetectors, detectorsIncompatible DCSDetectors, err error) {
detectorsByState := dsm.makeDetectorsByStateMap()

if len(detectorsByState) == 0 {
return true, fmt.Errorf("no detectors provided")
return true, make(DCSDetectors, 0), make(DCSDetectors, 0), fmt.Errorf("no detectors provided")
}

detectorsInConditionState, thereAreDetectorsInConditionState := detectorsByState[conditionState]
detectorsInNullState, thereAreDetectorsInNullState := detectorsByState[dcspb.DetectorState_NULL_STATE]

if thereAreDetectorsInConditionState && (len(detectorsInConditionState) == len(dsm)) {
// all detectors are in conditionState
return true, nil
return true, detectorsInConditionState, make(DCSDetectors, 0), nil
} else if thereAreDetectorsInConditionState && thereAreDetectorsInNullState && (len(detectorsInConditionState)+len(detectorsInNullState) == len(dsm)) {
// all detectors are either in conditionState or in NULL_STATE
return true, fmt.Errorf("detectors %s are in NULL_STATE", strings.Join(detectorsByState[dcspb.DetectorState_NULL_STATE].ToStringSlice(), ", "))
detectorsCompatible = append(detectorsInConditionState, detectorsInNullState...)
return true, detectorsCompatible, make(DCSDetectors, 0), fmt.Errorf("detectors %s are in NULL_STATE", strings.Join(detectorsByState[dcspb.DetectorState_NULL_STATE].ToStringSlice(), ", "))
} else if thereAreDetectorsInNullState && (len(detectorsInNullState) == len(dsm)) {
// all detectors are in NULL_STATE
return true, fmt.Errorf("all detectors are in NULL_STATE")
return true, detectorsInNullState, make(DCSDetectors, 0), fmt.Errorf("all detectors are in NULL_STATE")
} else {
// there are detectors in other states incompatible with conditionState
reportByState := make([]string, 0)
detectorsCompatible = make(DCSDetectors, 0)
detectorsIncompatible = make(DCSDetectors, 0)
for state, detectors := range detectorsByState {
if state == conditionState {
detectorsCompatible = append(detectorsCompatible, detectors...)
continue
}
if state == dcspb.DetectorState_NULL_STATE {
detectorsCompatible = append(detectorsCompatible, detectors...)
} else {
detectorsIncompatible = append(detectorsIncompatible, detectors...)
}
reportByState = append(reportByState,
fmt.Sprintf("%s in %s", strings.Join(detectors.ToStringSlice(), ", "), state.String()))
}
return false, fmt.Errorf("detectors are in incompatible states: %v", strings.Join(reportByState, "; "))
return false, detectorsCompatible, detectorsIncompatible, fmt.Errorf("detectors are in incompatible states: %v", strings.Join(reportByState, "; "))
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/integration/dcs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestDCSDetectorOpAvailabilityMap_compatibleWithDCSOperation(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.dsm.compatibleWithDCSOperation(tt.args.conditionState)
got, _, _, err := tt.dsm.compatibleWithDCSOperation(tt.args.conditionState)
if (err != nil) != tt.wantErr {
t.Errorf("compatibleWithDCSOperation() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
Loading