Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 61 additions & 16 deletions cmd/adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/openshift-hyperfleet/hyperfleet-adapter/internal/hyperfleet_api"
"github.com/openshift-hyperfleet/hyperfleet-adapter/internal/k8s_client"
"github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/logger"
"github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/otel"
"github.com/openshift-hyperfleet/hyperfleet-broker/broker"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -41,6 +42,12 @@ const (
EnvBrokerTopic = "BROKER_TOPIC"
)

// Timeout constants
const (
// OTelShutdownTimeout is the timeout for gracefully shutting down the OpenTelemetry TracerProvider
OTelShutdownTimeout = 5 * time.Second
)

func main() {
// Root command
rootCmd := &cobra.Command{
Expand Down Expand Up @@ -148,7 +155,8 @@ func runServe() error {
log.Info(ctx, "Loading adapter configuration...")
adapterConfig, err := config_loader.Load(configPath, config_loader.WithAdapterVersion(version))
if err != nil {
log.Errorf(ctx, "Failed to load adapter configuration: %v", err)
errCtx := logger.WithErrorField(ctx, err)
log.Errorf(errCtx, "Failed to load adapter configuration")
return fmt.Errorf("failed to load adapter configuration: %w", err)
}

Expand All @@ -164,11 +172,31 @@ func runServe() error {
adapterConfig.Spec.HyperfleetAPI.Timeout,
adapterConfig.Spec.HyperfleetAPI.RetryAttempts)

// Get trace sample ratio from environment (default: 10%)
sampleRatio := otel.GetTraceSampleRatio(log, ctx)

// Initialize OpenTelemetry for trace_id/span_id generation and HTTP propagation
tp, err := otel.InitTracer(adapterConfig.Metadata.Name, version, sampleRatio)
if err != nil {
errCtx := logger.WithErrorField(ctx, err)
log.Errorf(errCtx, "Failed to initialize OpenTelemetry")
return fmt.Errorf("failed to initialize OpenTelemetry: %w", err)
}
defer func() {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), OTelShutdownTimeout)
defer shutdownCancel()
if err := tp.Shutdown(shutdownCtx); err != nil {
errCtx := logger.WithErrorField(shutdownCtx, err)
log.Warnf(errCtx, "Failed to shutdown TracerProvider")
}
}()

// Create HyperFleet API client from config
log.Info(ctx, "Creating HyperFleet API client...")
apiClient, err := createAPIClient(adapterConfig.Spec.HyperfleetAPI, log)
if err != nil {
log.Errorf(ctx, "Failed to create HyperFleet API client: %v", err)
errCtx := logger.WithErrorField(ctx, err)
log.Errorf(errCtx, "Failed to create HyperFleet API client")
return fmt.Errorf("failed to create HyperFleet API client: %w", err)
}

Expand All @@ -177,7 +205,8 @@ func runServe() error {
log.Info(ctx, "Creating Kubernetes client...")
k8sClient, err := k8s_client.NewClient(ctx, k8s_client.ClientConfig{}, log)
if err != nil {
log.Errorf(ctx, "Failed to create Kubernetes client: %v", err)
errCtx := logger.WithErrorField(ctx, err)
log.Errorf(errCtx, "Failed to create Kubernetes client")
return fmt.Errorf("failed to create Kubernetes client: %w", err)
}

Expand All @@ -190,7 +219,8 @@ func runServe() error {
WithLogger(log).
Build()
if err != nil {
log.Errorf(ctx, "Failed to create executor: %v", err)
errCtx := logger.WithErrorField(ctx, err)
log.Errorf(errCtx, "Failed to create executor")
return fmt.Errorf("failed to create executor: %w", err)
}

Expand Down Expand Up @@ -219,15 +249,19 @@ func runServe() error {
// Get subscription ID from environment
subscriptionID := os.Getenv(EnvBrokerSubscriptionID)
if subscriptionID == "" {
log.Errorf(ctx, "%s environment variable is required", EnvBrokerSubscriptionID)
return fmt.Errorf("%s environment variable is required", EnvBrokerSubscriptionID)
err := fmt.Errorf("%s environment variable is required", EnvBrokerSubscriptionID)
errCtx := logger.WithErrorField(ctx, err)
log.Errorf(errCtx, "Missing required environment variable")
return err
}

// Get topic from environment
topic := os.Getenv(EnvBrokerTopic)
if topic == "" {
log.Errorf(ctx, "%s environment variable is required", EnvBrokerTopic)
return fmt.Errorf("%s environment variable is required", EnvBrokerTopic)
err := fmt.Errorf("%s environment variable is required", EnvBrokerTopic)
errCtx := logger.WithErrorField(ctx, err)
log.Errorf(errCtx, "Missing required environment variable")
return err
}

// Create broker subscriber
Expand All @@ -237,9 +271,10 @@ func runServe() error {
// - BROKER_RABBITMQ_URL: RabbitMQ URL (for rabbitmq)
// - SUBSCRIBER_PARALLELISM: number of parallel workers (default: 1)
log.Info(ctx, "Creating broker subscriber...")
subscriber, err := broker.NewSubscriber(subscriptionID)
subscriber, err := broker.NewSubscriber(log, subscriptionID)
if err != nil {
log.Errorf(ctx, "Failed to create subscriber: %v", err)
errCtx := logger.WithErrorField(ctx, err)
log.Errorf(errCtx, "Failed to create subscriber")
return fmt.Errorf("failed to create subscriber: %w", err)
}
log.Info(ctx, "Broker subscriber created successfully")
Expand All @@ -248,18 +283,24 @@ func runServe() error {
log.Info(ctx, "Subscribing to broker topic...")
err = subscriber.Subscribe(ctx, topic, handler)
if err != nil {
log.Errorf(ctx, "Failed to subscribe to topic: %v", err)
errCtx := logger.WithErrorField(ctx, err)
log.Errorf(errCtx, "Failed to subscribe to topic")
return fmt.Errorf("failed to subscribe to topic: %w", err)
}
log.Info(ctx, "Successfully subscribed to broker topic")

// Channel to signal fatal errors from the errors goroutine
fatalErrCh := make(chan error, 1)

// Monitor subscription errors channel in a separate goroutine
// Monitor subscription errors channel in a separate goroutine.
// Note: Error context here reflects the handler's location, not the error's origin
// in the broker library. Stack traces (if captured) would show this goroutine's
// call stack. For richer error context, the broker library would need to provide
// errors with embedded stack traces or structured error details.
go func() {
for subErr := range subscriber.Errors() {
log.Errorf(ctx, "Subscription error: %v", subErr)
errCtx := logger.WithErrorField(ctx, subErr)
log.Errorf(errCtx, "Subscription error")
// For critical errors, signal shutdown
select {
case fatalErrCh <- subErr:
Expand All @@ -277,7 +318,8 @@ func runServe() error {
case <-ctx.Done():
log.Info(ctx, "Context cancelled, shutting down...")
case err := <-fatalErrCh:
log.Errorf(ctx, "Fatal subscription error, shutting down: %v", err)
errCtx := logger.WithErrorField(ctx, err)
log.Errorf(errCtx, "Fatal subscription error, shutting down")
cancel() // Cancel context to trigger graceful shutdown
}

Expand All @@ -295,12 +337,15 @@ func runServe() error {
select {
case err := <-closeDone:
if err != nil {
log.Errorf(ctx, "Error closing subscriber: %v", err)
errCtx := logger.WithErrorField(ctx, err)
log.Errorf(errCtx, "Error closing subscriber")
} else {
log.Info(ctx, "Subscriber closed successfully")
}
case <-shutdownCtx.Done():
log.Error(ctx, "Subscriber close timed out after 30 seconds")
err := fmt.Errorf("subscriber close timed out after 30 seconds")
errCtx := logger.WithErrorField(ctx, err)
log.Error(errCtx, "Subscriber close timed out")
}

log.Info(ctx, "Adapter shutdown complete")
Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ require (
github.com/docker/go-connections v0.6.0
github.com/google/cel-go v0.26.1
github.com/mitchellh/copystructure v1.2.0
github.com/openshift-hyperfleet/hyperfleet-broker v1.0.0
github.com/openshift-hyperfleet/hyperfleet-broker v1.0.1
github.com/spf13/cobra v1.10.2
github.com/spf13/pflag v1.0.10
github.com/stretchr/testify v1.11.1
github.com/testcontainers/testcontainers-go v0.40.0
go.opentelemetry.io/otel v1.38.0
go.opentelemetry.io/otel/sdk v1.38.0
go.opentelemetry.io/otel/trace v1.38.0
golang.org/x/text v0.32.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/apimachinery v0.34.1
Expand Down Expand Up @@ -116,9 +119,7 @@ require (
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
go.opentelemetry.io/otel v1.36.0 // indirect
go.opentelemetry.io/otel/metric v1.36.0 // indirect
go.opentelemetry.io/otel/trace v1.36.0 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.43.0 // indirect
Expand Down
24 changes: 12 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=
github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M=
github.com/openshift-hyperfleet/hyperfleet-broker v1.0.0 h1:JfIF9ZVFIaXmo0hKAN42SCwItpqEHXtc54+WHRKuYg4=
github.com/openshift-hyperfleet/hyperfleet-broker v1.0.0/go.mod h1:z7QpS2m6gaqTbgPazl1lYYy+JuyNDMkMtco12rM29nU=
github.com/openshift-hyperfleet/hyperfleet-broker v1.0.1 h1:Mvx0ojBvttYlwu3VfOHwvH+eEM1xA40GzNOZqv1cGyQ=
github.com/openshift-hyperfleet/hyperfleet-broker v1.0.1/go.mod h1:z7QpS2m6gaqTbgPazl1lYYy+JuyNDMkMtco12rM29nU=
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -311,20 +311,20 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.6
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0/go.mod h1:snMWehoOh2wsEwnvvwtDyFCxVeDAODenXHtn5vzrKjo=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 h1:F7Jx+6hwnZ41NSFTO5q4LYDtJRXBf2PD0rNBkeB/lus=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0/go.mod h1:UHB22Z8QsdRDrnAtX4PntOl36ajSxcdUMt1sF7Y6E7Q=
go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg=
go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E=
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 h1:OeNbIYk/2C15ckl7glBlOBp5+WlYsOElzTNmiPW/x60=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0/go.mod h1:7Bept48yIeqxP2OZ9/AqIpYS94h2or0aB4FypJTc8ZM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU=
go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE=
go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs=
go.opentelemetry.io/otel/sdk v1.36.0 h1:b6SYIuLRs88ztox4EyrvRti80uXIFy+Sqzoh9kFULbs=
go.opentelemetry.io/otel/sdk v1.36.0/go.mod h1:+lC+mTgD+MUWfjJubi2vvXWcVxyr9rmlshZni72pXeY=
go.opentelemetry.io/otel/sdk/metric v1.36.0 h1:r0ntwwGosWGaa0CrSt8cuNuTcccMXERFwHX4dThiPis=
go.opentelemetry.io/otel/sdk/metric v1.36.0/go.mod h1:qTNOhFDfKRwX0yXOqJYegL5WRaW376QbB7P4Pb0qva4=
go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w=
go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA=
go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA=
go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI=
go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E=
go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg=
go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM=
go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA=
go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE=
go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs=
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
Expand Down
1 change: 0 additions & 1 deletion internal/config_loader/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,6 @@ func TestValidateResourceDiscovery(t *testing.T) {
}
}


func TestConditionValuesAlias(t *testing.T) {
// Test that both "value" and "values" YAML keys are supported
tests := []struct {
Expand Down
1 change: 0 additions & 1 deletion internal/config_loader/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ func TestValidateK8sManifests(t *testing.T) {
})
}


func TestValidOperators(t *testing.T) {
// Verify all expected operators are defined in criteria package
expectedOperators := []string{
Expand Down
Loading