Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func init() {
cmd.PersistentFlags().StringVar(&opts.PromQLTarget.KubeSvc.Name, "metrics-service", opts.PromQLTarget.KubeSvc.Name, "The name of the remote PromQL query service. Must be specified when --use-dns-for-services is disabled.")
cmd.PersistentFlags().BoolVar(&opts.PromQLTarget.UseDNS, "use-dns-for-services", opts.PromQLTarget.UseDNS, "Configures the CVO to use DNS for resolution of services in the cluster.")
cmd.PersistentFlags().StringVar(&opts.PrometheusURLString, "metrics-url", opts.PrometheusURLString, "The URL used to access the remote PromQL query service.")
cmd.PersistentFlags().BoolVar(&opts.InjectClusterIdIntoPromQL, "hypershift", opts.InjectClusterIdIntoPromQL, "This options indicates whether the CVO is running inside a hosted control plane.")
cmd.PersistentFlags().BoolVar(&opts.HyperShift, "hypershift", opts.HyperShift, "This options indicates whether the CVO is running inside a hosted control plane.")
cmd.PersistentFlags().StringVar(&opts.UpdateService, "update-service", opts.UpdateService, "The preferred update service. If set, this option overrides any upstream value configured in ClusterVersion spec.")
cmd.PersistentFlags().StringSliceVar(&opts.AlwaysEnableCapabilities, "always-enable-capabilities", opts.AlwaysEnableCapabilities, "List of the cluster capabilities which will always be implicitly enabled.")
rootCmd.AddCommand(cmd)
Expand Down
114 changes: 114 additions & 0 deletions pkg/cvo/configuration/configuration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package configuration

import (
"context"
"fmt"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

operatorv1alpha1 "github.com/openshift/api/operator/v1alpha1"
operatorclientset "github.com/openshift/client-go/operator/clientset/versioned"
cvoclientv1alpha1 "github.com/openshift/client-go/operator/clientset/versioned/typed/operator/v1alpha1"
operatorexternalversions "github.com/openshift/client-go/operator/informers/externalversions"
operatorlistersv1alpha1 "github.com/openshift/client-go/operator/listers/operator/v1alpha1"
)

const ClusterVersionOperatorConfigurationName = "cluster"

type ClusterVersionOperatorConfiguration struct {
queueKey string
// queue tracks checking for the CVO configuration.
//
// The type any is used to comply with the worker method of the cvo.Operator struct.
queue workqueue.TypedRateLimitingInterface[any]

client cvoclientv1alpha1.ClusterVersionOperatorInterface
lister operatorlistersv1alpha1.ClusterVersionOperatorLister
factory operatorexternalversions.SharedInformerFactory

started bool
}

func (config *ClusterVersionOperatorConfiguration) Queue() workqueue.TypedRateLimitingInterface[any] {
return config.queue
}

// clusterVersionOperatorEventHandler queues an update for the cluster version operator on any change to the given object.
// Callers should use this with an informer.
func (config *ClusterVersionOperatorConfiguration) clusterVersionOperatorEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(_ interface{}) {
config.queue.Add(config.queueKey)
},
UpdateFunc: func(_, _ interface{}) {
config.queue.Add(config.queueKey)
},
DeleteFunc: func(_ interface{}) {
config.queue.Add(config.queueKey)
},
}
}

// NewClusterVersionOperatorConfiguration returns ClusterVersionOperatorConfiguration, which might be used
// to synchronize with the ClusterVersionOperator resource.
func NewClusterVersionOperatorConfiguration(client operatorclientset.Interface, factory operatorexternalversions.SharedInformerFactory) *ClusterVersionOperatorConfiguration {
return &ClusterVersionOperatorConfiguration{
queueKey: fmt.Sprintf("ClusterVersionOperator/%s", ClusterVersionOperatorConfigurationName),
queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{Name: "configuration"}),
client: client.OperatorV1alpha1().ClusterVersionOperators(),
factory: factory,
}
}

// Start initializes and starts the configuration's informers. Must be run before Sync is called.
// Blocks until informers caches are synchronized or the context is cancelled.
func (config *ClusterVersionOperatorConfiguration) Start(ctx context.Context) error {
informer := config.factory.Operator().V1alpha1().ClusterVersionOperators()
if _, err := informer.Informer().AddEventHandler(config.clusterVersionOperatorEventHandler()); err != nil {
return err
}
config.lister = informer.Lister()

config.factory.Start(ctx.Done())
synced := config.factory.WaitForCacheSync(ctx.Done())
for _, ok := range synced {
if !ok {
return fmt.Errorf("caches failed to sync: %w", ctx.Err())
}
}

config.started = true
return nil
}

func (config *ClusterVersionOperatorConfiguration) Sync(ctx context.Context, key string) error {
if !config.started {
panic("ClusterVersionOperatorConfiguration instance was not properly started before its synchronization.")
}
startTime := time.Now()
klog.V(2).Infof("Started syncing CVO configuration %q", key)
defer func() {
klog.V(2).Infof("Finished syncing CVO configuration (%v)", time.Since(startTime))
}()

desiredConfig, err := config.lister.Get(ClusterVersionOperatorConfigurationName)
if apierrors.IsNotFound(err) {
// TODO: Set default values
return nil
}
if err != nil {
return err
}
return config.sync(ctx, desiredConfig)
}

func (config *ClusterVersionOperatorConfiguration) sync(_ context.Context, _ *operatorv1alpha1.ClusterVersionOperator) error {
klog.Infof("ClusterVersionOperator configuration has been synced")
return nil
}
50 changes: 47 additions & 3 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
clientset "github.com/openshift/client-go/config/clientset/versioned"
configinformersv1 "github.com/openshift/client-go/config/informers/externalversions/config/v1"
configlistersv1 "github.com/openshift/client-go/config/listers/config/v1"
operatorclientset "github.com/openshift/client-go/operator/clientset/versioned"
operatorexternalversions "github.com/openshift/client-go/operator/informers/externalversions"
"github.com/openshift/library-go/pkg/manifest"
"github.com/openshift/library-go/pkg/verify"
"github.com/openshift/library-go/pkg/verify/store"
Expand All @@ -43,6 +45,7 @@ import (
"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/standard"
"github.com/openshift/cluster-version-operator/pkg/customsignaturestore"
"github.com/openshift/cluster-version-operator/pkg/cvo/configuration"
cvointernal "github.com/openshift/cluster-version-operator/pkg/cvo/internal"
"github.com/openshift/cluster-version-operator/pkg/cvo/internal/dynamicclient"
"github.com/openshift/cluster-version-operator/pkg/featuregates"
Expand Down Expand Up @@ -93,9 +96,10 @@ type Operator struct {
// releaseCreated, if set, is the timestamp of the current update.
releaseCreated time.Time

client clientset.Interface
kubeClient kubernetes.Interface
eventRecorder record.EventRecorder
client clientset.Interface
kubeClient kubernetes.Interface
operatorClient operatorclientset.Interface
eventRecorder record.EventRecorder

// minimumUpdateCheckInterval is the minimum duration to check for updates from
// the update service.
Expand Down Expand Up @@ -142,6 +146,9 @@ type Operator struct {
// conditionRegistry is used to evaluate whether a particular condition is risky or not.
conditionRegistry clusterconditions.ConditionRegistry

// hypershift signals whether the CVO is running inside a hosted control plane.
hypershift bool

// injectClusterIdIntoPromQL indicates whether the CVO should inject the cluster id
// into PromQL queries while evaluating risks from conditional updates. This is needed
// in HyperShift to differentiate between metrics from multiple hosted clusters in
Expand Down Expand Up @@ -176,6 +183,9 @@ type Operator struct {
// alwaysEnableCapabilities is a list of the cluster capabilities which should
// always be implicitly enabled.
alwaysEnableCapabilities []configv1.ClusterVersionCapability

// configuration, if enabled, reconciles the ClusterVersionOperator configuration.
configuration *configuration.ClusterVersionOperatorConfiguration
}

// New returns a new cluster version operator.
Expand All @@ -190,10 +200,13 @@ func New(
cmConfigInformer informerscorev1.ConfigMapInformer,
cmConfigManagedInformer informerscorev1.ConfigMapInformer,
proxyInformer configinformersv1.ProxyInformer,
operatorInformerFactory operatorexternalversions.SharedInformerFactory,
client clientset.Interface,
kubeClient kubernetes.Interface,
operatorClient operatorclientset.Interface,
exclude string,
clusterProfile string,
hypershift bool,
promqlTarget clusterconditions.PromQLTarget,
injectClusterIdIntoPromQL bool,
updateService string,
Expand All @@ -219,11 +232,13 @@ func New(

client: client,
kubeClient: kubeClient,
operatorClient: operatorClient,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}),
queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "clusterversion"}),
availableUpdatesQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "availableupdates"}),
upgradeableQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "upgradeable"}),

hypershift: hypershift,
exclude: exclude,
clusterProfile: clusterProfile,
conditionRegistry: standard.NewConditionRegistry(promqlTarget),
Expand Down Expand Up @@ -262,6 +277,8 @@ func New(
// make sure this is initialized after all the listers are initialized
optr.upgradeableChecks = optr.defaultUpgradeableChecks()

optr.configuration = configuration.NewClusterVersionOperatorConfiguration(operatorClient, operatorInformerFactory)

return optr, nil
}

Expand Down Expand Up @@ -408,6 +425,7 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
defer optr.queue.ShutDown()
defer optr.availableUpdatesQueue.ShutDown()
defer optr.upgradeableQueue.ShutDown()
defer optr.configuration.Queue().ShutDown()
stopCh := runContext.Done()

klog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval)
Expand Down Expand Up @@ -446,6 +464,23 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
resultChannel <- asyncResult{name: "available updates"}
}()

if optr.shouldReconcileCVOConfiguration() {
resultChannelCount++
go func() {
defer utilruntime.HandleCrash()
if err := optr.configuration.Start(runContext); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to initialize the CVO configuration sync: %v", err))
} else {
wait.UntilWithContext(runContext, func(runContext context.Context) {
optr.worker(runContext, optr.configuration.Queue(), optr.configuration.Sync)
}, time.Second)
}
resultChannel <- asyncResult{name: "cvo configuration"}
}()
} else {
klog.Infof("The ClusterVersionOperatorConfiguration feature gate is disabled or HyperShift is detected; the configuration sync routine will not run.")
}

resultChannelCount++
go func() {
defer utilruntime.HandleCrash()
Expand Down Expand Up @@ -515,6 +550,7 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
optr.queue.ShutDown()
optr.availableUpdatesQueue.ShutDown()
optr.upgradeableQueue.ShutDown()
optr.configuration.Queue().ShutDown()
}
}

Expand Down Expand Up @@ -1011,3 +1047,11 @@ func (optr *Operator) HTTPClient() (*http.Client, error) {
Transport: transport,
}, nil
}

// shouldReconcileCVOConfiguration returns whether the CVO should reconcile its configuration using the API server.
//
// enabledFeatureGates must be initialized before the function is called.
func (optr *Operator) shouldReconcileCVOConfiguration() bool {
// The relevant CRD and CR are not applied in HyperShift, which configures the CVO via a configuration file
return optr.enabledFeatureGates.CVOConfiguration() && !optr.hypershift
}
5 changes: 5 additions & 0 deletions pkg/cvo/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ type fakeRiFlags struct {
unknownVersion bool
reconciliationIssuesCondition bool
statusReleaseArchitecture bool
cvoConfiguration bool
}

func (f fakeRiFlags) UnknownVersion() bool {
Expand All @@ -216,6 +217,10 @@ func (f fakeRiFlags) StatusReleaseArchitecture() bool {
return f.statusReleaseArchitecture
}

func (f fakeRiFlags) CVOConfiguration() bool {
return f.cvoConfiguration
}

func TestUpdateClusterVersionStatus_UnknownVersionAndReconciliationIssues(t *testing.T) {
ignoreLastTransitionTime := cmpopts.IgnoreFields(configv1.ClusterOperatorStatusCondition{}, "LastTransitionTime")

Expand Down
19 changes: 19 additions & 0 deletions pkg/featuregates/featuregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type CvoGateChecker interface {
// StatusReleaseArchitecture controls whether CVO populates
// Release.Architecture in status properties like status.desired and status.history[].
StatusReleaseArchitecture() bool

// CVOConfiguration controls whether the CVO reconciles the ClusterVersionOperator resource that corresponds
// to its configuration.
CVOConfiguration() bool
}

type panicOnUsageBeforeInitializationFunc func()
Expand Down Expand Up @@ -56,6 +60,11 @@ func (p panicOnUsageBeforeInitializationFunc) UnknownVersion() bool {
return false
}

func (p panicOnUsageBeforeInitializationFunc) CVOConfiguration() bool {
p()
return false
}

// CvoGates contains flags that control CVO functionality gated by product feature gates. The
// names do not correspond to product feature gates, the booleans here are "smaller" (product-level
// gate will enable multiple CVO behaviors).
Expand All @@ -68,6 +77,7 @@ type CvoGates struct {
unknownVersion bool
reconciliationIssuesCondition bool
statusReleaseArchitecture bool
cvoConfiguration bool
}

func (c CvoGates) ReconciliationIssuesCondition() bool {
Expand All @@ -82,13 +92,18 @@ func (c CvoGates) UnknownVersion() bool {
return c.unknownVersion
}

func (c CvoGates) CVOConfiguration() bool {
return c.cvoConfiguration
}

// DefaultCvoGates apply when actual features for given version are unknown
func DefaultCvoGates(version string) CvoGates {
return CvoGates{
desiredVersion: version,
unknownVersion: true,
reconciliationIssuesCondition: false,
statusReleaseArchitecture: false,
cvoConfiguration: false,
}
}

Expand All @@ -110,6 +125,8 @@ func CvoGatesFromFeatureGate(gate *configv1.FeatureGate, version string) CvoGate
enabledGates.reconciliationIssuesCondition = true
case features.FeatureGateImageStreamImportMode:
enabledGates.statusReleaseArchitecture = true
case features.FeatureGateCVOConfiguration:
enabledGates.cvoConfiguration = true
}
}
for _, disabled := range g.Disabled {
Expand All @@ -118,6 +135,8 @@ func CvoGatesFromFeatureGate(gate *configv1.FeatureGate, version string) CvoGate
enabledGates.reconciliationIssuesCondition = false
case features.FeatureGateImageStreamImportMode:
enabledGates.statusReleaseArchitecture = false
case features.FeatureGateCVOConfiguration:
enabledGates.cvoConfiguration = false
}
}
}
Expand Down
Loading