diff --git a/core/integration/dcs/plugin.go b/core/integration/dcs/plugin.go index ee951dca..122ecb88 100644 --- a/core/integration/dcs/plugin.go +++ b/core/integration/dcs/plugin.go @@ -1,7 +1,7 @@ /* * === This file is part of ALICE O² === * - * Copyright 2021-2024 CERN and copyright holders of ALICE O². + * Copyright 2021-2025 CERN and copyright holders of ALICE O². * Author: Teo Mrnjavac * * This program is free software: you can redistribute it and/or modify @@ -33,6 +33,7 @@ import ( "fmt" "io" "net/url" + "sort" "strconv" "strings" "sync" @@ -438,8 +439,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { } // We acquire a grace period during which we hope that DCS will become compatible with the operation. - // During this period we'll keep checking our internal state for op compatibility as reported by DCS at 1Hz, - // and if we don't get a compatible state within the grace period, we declare the operation failed. + // During this period we'll keep checking our internal state for op compatibility as reported by DCS + // at 1Hz, and if we don't get a compatible state for all included detectors within the grace period, + // we go ahead with a partial PFR. pfrGracePeriod := time.Duration(0) pfrGracePeriodS, ok := varStack["dcs_pfr_grace_period"] if ok { @@ -476,8 +478,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { pfrGraceTimeout := time.Now().Add(pfrGracePeriod) isCompatibleWithOperation := false + var dcsDetectorsCompatibleWithPfr, dcsDetectorsIncompatibleWithPfr DCSDetectors + knownDetectorStates := p.getDetectorsPfrAvailability(dcsDetectors) - isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE) + isCompatibleWithOperation, dcsDetectorsCompatibleWithPfr, dcsDetectorsIncompatibleWithPfr, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE) for { if isCompatibleWithOperation { @@ -494,34 +498,39 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { if time.Now().Before(pfrGraceTimeout) { knownDetectorStates = p.getDetectorsPfrAvailability(dcsDetectors) - isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE) + isCompatibleWithOperation, dcsDetectorsCompatibleWithPfr, dcsDetectorsIncompatibleWithPfr, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE) } else { break } } - if !isCompatibleWithOperation { - log.WithError(err). - WithField("level", infologger.IL_Ops). - WithField("partition", envId). - WithField("call", "PrepareForRun"). - Error("DCS error") + ecsDetectorsIncompatibleWithPfr := dcsDetectorsIncompatibleWithPfr.EcsDetectorsSlice() + sort.Strings(ecsDetectorsIncompatibleWithPfr) - call.VarStack["__call_error_reason"] = err.Error() - call.VarStack["__call_error"] = callFailedStr + if !isCompatibleWithOperation { // some detectors are not ready for PFR + if len(dcsDetectorsCompatibleWithPfr) == 0 { // if actually none are ready, we bail + log.WithError(err). + WithField("level", infologger.IL_Ops). + WithField("partition", envId). + WithField("call", "PrepareForRun"). + Error("DCS error") - the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ - Name: call.GetName(), - OperationName: call.Func, - OperationStatus: pb.OpStatus_DONE_ERROR, - OperationStep: "acquire detectors availability", - OperationStepStatus: pb.OpStatus_DONE_ERROR, - EnvironmentId: envId, - Payload: string(payloadJson[:]), - Error: err.Error(), - }) + call.VarStack["__call_error_reason"] = err.Error() + call.VarStack["__call_error"] = callFailedStr - return + the.EventWriterWithTopic(TOPIC).WriteEvent(&pb.Ev_IntegratedServiceEvent{ + Name: call.GetName(), + OperationName: call.Func, + OperationStatus: pb.OpStatus_DONE_ERROR, + OperationStep: "acquire detectors availability", + OperationStepStatus: pb.OpStatus_DONE_ERROR, + EnvironmentId: envId, + Payload: string(payloadJson[:]), + Error: err.Error(), + }) + + return + } } else if isCompatibleWithOperation && err != nil { log.WithField("level", infologger.IL_Ops). WithField("partition", envId). @@ -542,11 +551,17 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Payload: string(payloadJson[:]), }) - // By now the DCS must be in a compatible state, so we proceed with gathering params for the operation + // By now the DCS must be in a compatible state for at least some detectors, so we proceed + // with gathering params for the operation + if len(ecsDetectorsIncompatibleWithPfr) > 0 { + log.WithField("partition", envId). + WithField("level", infologger.IL_Ops). + Warnf("skipping DCS PFR for detectors: %s - a detector state compatible with PFR was not reached within %s", strings.Join(ecsDetectorsIncompatibleWithPfr, " "), pfrGracePeriod) + } log.WithField("partition", envId). WithField("level", infologger.IL_Ops). - Infof("performing DCS PFR for detectors: %s", strings.Join(dcsDetectors.EcsDetectorsSlice(), " ")) + Infof("performing DCS PFR for detectors: %s", strings.Join(dcsDetectorsCompatibleWithPfr.EcsDetectorsSlice(), " ")) parameters, ok := varStack["dcs_sor_parameters"] if !ok { @@ -560,7 +575,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { bytes := []byte(parameters) err = json.Unmarshal(bytes, &argMap) if err != nil { - err = fmt.Errorf("error processing DCS SOR parameters: %w", err) + err = fmt.Errorf("error processing DCS PFR parameters: %w", err) log.WithError(err). WithField("partition", envId). WithField("level", infologger.IL_Ops). @@ -595,9 +610,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { in := dcspb.PfrRequest{ RunType: rt, PartitionId: envId, - Detectors: make([]*dcspb.DetectorOperationRequest, len(dcsDetectors)), + Detectors: make([]*dcspb.DetectorOperationRequest, len(dcsDetectorsCompatibleWithPfr)), } - for i, dcsDet := range dcsDetectors { + for i, dcsDet := range dcsDetectorsCompatibleWithPfr { ecsDet := dcsToEcsDetector(dcsDet) perDetectorParameters, okParam := varStack[strings.ToLower(ecsDet)+"_dcs_sor_parameters"] if !okParam { @@ -693,7 +708,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { defer cancel() detectorStatusMap := make(map[dcspb.Detector]dcspb.DetectorState) - for _, v := range dcsDetectors { + for _, v := range dcsDetectorsCompatibleWithPfr { detectorStatusMap[v] = dcspb.DetectorState_NULL_STATE } @@ -1044,7 +1059,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { dcsFailedEcsDetectors := make([]string, 0) dcsopOk := true - for _, v := range dcsDetectors { + for _, v := range dcsDetectorsCompatibleWithPfr { if detectorStatusMap[v] != dcspb.DetectorState_RUN_OK { dcsopOk = false dcsFailedEcsDetectors = append(dcsFailedEcsDetectors, dcsToEcsDetector(v)) @@ -1173,7 +1188,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { isCompatibleWithOperation := false knownDetectorStates := p.getDetectorsSorAvailability(dcsDetectors) - isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE) + isCompatibleWithOperation, _, _, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE) for { if isCompatibleWithOperation { @@ -1190,7 +1205,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { if time.Now().Before(sorGraceTimeout) { knownDetectorStates = p.getDetectorsSorAvailability(dcsDetectors) - isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE) + isCompatibleWithOperation, _, _, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE) } else { break } diff --git a/core/integration/dcs/structs.go b/core/integration/dcs/structs.go index 5a98d68a..6455168f 100644 --- a/core/integration/dcs/structs.go +++ b/core/integration/dcs/structs.go @@ -1,7 +1,7 @@ /* * === This file is part of ALICE O² === * - * Copyright 2021-2024 CERN and copyright holders of ALICE O². + * Copyright 2021-2025 CERN and copyright holders of ALICE O². * Author: Teo Mrnjavac * * This program is free software: you can redistribute it and/or modify @@ -102,11 +102,11 @@ func (dsm DCSDetectorOpAvailabilityMap) makeDetectorsByStateMap() map[dcspb.Dete } // Returns true if the provided detectors are either all in conditionState or in NULL_STATE -func (dsm DCSDetectorOpAvailabilityMap) compatibleWithDCSOperation(conditionState dcspb.DetectorState) (bool, error) { +func (dsm DCSDetectorOpAvailabilityMap) compatibleWithDCSOperation(conditionState dcspb.DetectorState) (isCompatible bool, detectorsCompatible DCSDetectors, detectorsIncompatible DCSDetectors, err error) { detectorsByState := dsm.makeDetectorsByStateMap() if len(detectorsByState) == 0 { - return true, fmt.Errorf("no detectors provided") + return true, make(DCSDetectors, 0), make(DCSDetectors, 0), fmt.Errorf("no detectors provided") } detectorsInConditionState, thereAreDetectorsInConditionState := detectorsByState[conditionState] @@ -114,24 +114,33 @@ func (dsm DCSDetectorOpAvailabilityMap) compatibleWithDCSOperation(conditionStat if thereAreDetectorsInConditionState && (len(detectorsInConditionState) == len(dsm)) { // all detectors are in conditionState - return true, nil + return true, detectorsInConditionState, make(DCSDetectors, 0), nil } else if thereAreDetectorsInConditionState && thereAreDetectorsInNullState && (len(detectorsInConditionState)+len(detectorsInNullState) == len(dsm)) { // all detectors are either in conditionState or in NULL_STATE - return true, fmt.Errorf("detectors %s are in NULL_STATE", strings.Join(detectorsByState[dcspb.DetectorState_NULL_STATE].ToStringSlice(), ", ")) + detectorsCompatible = append(detectorsInConditionState, detectorsInNullState...) + return true, detectorsCompatible, make(DCSDetectors, 0), fmt.Errorf("detectors %s are in NULL_STATE", strings.Join(detectorsByState[dcspb.DetectorState_NULL_STATE].ToStringSlice(), ", ")) } else if thereAreDetectorsInNullState && (len(detectorsInNullState) == len(dsm)) { // all detectors are in NULL_STATE - return true, fmt.Errorf("all detectors are in NULL_STATE") + return true, detectorsInNullState, make(DCSDetectors, 0), fmt.Errorf("all detectors are in NULL_STATE") } else { // there are detectors in other states incompatible with conditionState reportByState := make([]string, 0) + detectorsCompatible = make(DCSDetectors, 0) + detectorsIncompatible = make(DCSDetectors, 0) for state, detectors := range detectorsByState { if state == conditionState { + detectorsCompatible = append(detectorsCompatible, detectors...) continue } + if state == dcspb.DetectorState_NULL_STATE { + detectorsCompatible = append(detectorsCompatible, detectors...) + } else { + detectorsIncompatible = append(detectorsIncompatible, detectors...) + } reportByState = append(reportByState, fmt.Sprintf("%s in %s", strings.Join(detectors.ToStringSlice(), ", "), state.String())) } - return false, fmt.Errorf("detectors are in incompatible states: %v", strings.Join(reportByState, "; ")) + return false, detectorsCompatible, detectorsIncompatible, fmt.Errorf("detectors are in incompatible states: %v", strings.Join(reportByState, "; ")) } } diff --git a/core/integration/dcs/structs_test.go b/core/integration/dcs/structs_test.go index 3843d712..066160b2 100644 --- a/core/integration/dcs/structs_test.go +++ b/core/integration/dcs/structs_test.go @@ -175,7 +175,7 @@ func TestDCSDetectorOpAvailabilityMap_compatibleWithDCSOperation(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := tt.dsm.compatibleWithDCSOperation(tt.args.conditionState) + got, _, _, err := tt.dsm.compatibleWithDCSOperation(tt.args.conditionState) if (err != nil) != tt.wantErr { t.Errorf("compatibleWithDCSOperation() error = %v, wantErr %v", err, tt.wantErr) return