Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 84 additions & 18 deletions cmd/adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import (
"github.com/openshift-hyperfleet/hyperfleet-broker/broker"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)

// Build-time variables set via ldflags
Expand Down Expand Up @@ -108,7 +113,9 @@ and HyperFleet API calls.`,
}

// buildLoggerConfig creates a logger configuration from environment variables
// and command-line flags. Flags take precedence over environment variables.
// buildLoggerConfig builds a logger.Config by loading defaults from the environment,
// applying any command-line flag overrides (flags take precedence), and setting the
// provided component name and the global version.
func buildLoggerConfig(component string) logger.Config {
cfg := logger.ConfigFromEnv()

Expand All @@ -129,7 +136,38 @@ func buildLoggerConfig(component string) logger.Config {
return cfg
}

// runServe contains the main application logic for the serve command
// initTracer initializes OpenTelemetry TracerProvider for generating trace_id and span_id.
// These IDs are used for:
// 1. Log correlation (via logger.WithOTelTraceContext)
// initTracer creates and registers an OpenTelemetry TracerProvider configured with the
// provided service name and version. It sets the global TracerProvider and installs the
// W3C Trace Context text-map propagator for HTTP request propagation.
// Returns the created TracerProvider, or an error if the required resource cannot be built.
func initTracer(serviceName, serviceVersion string) (*sdktrace.TracerProvider, error) {
res, err := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(serviceName),
semconv.ServiceVersion(serviceVersion),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create resource: %w", err)
}

tp := sdktrace.NewTracerProvider(
sdktrace.WithResource(res),
sdktrace.WithSampler(sdktrace.AlwaysSample()),
)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.TraceContext{})

return tp, nil
}

// runServe executes the main application lifecycle for the serve subcommand, including logger and configuration setup, OpenTelemetry initialization, HyperFleet API and Kubernetes client creation, executor and broker subscription setup, and graceful shutdown handling.
// It returns an error if initialization fails or if an unrecoverable runtime condition occurs.
func runServe() error {
// Create context that cancels on system signals
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -148,7 +186,8 @@ func runServe() error {
log.Info(ctx, "Loading adapter configuration...")
adapterConfig, err := config_loader.Load(configPath, config_loader.WithAdapterVersion(version))
if err != nil {
log.Errorf(ctx, "Failed to load adapter configuration: %v", err)
errCtx := logger.WithStackTraceField(logger.WithErrorField(ctx, err), logger.CaptureStackTrace(1))
log.Errorf(errCtx, "Failed to load adapter configuration")
return fmt.Errorf("failed to load adapter configuration: %w", err)
}

Expand All @@ -164,11 +203,25 @@ func runServe() error {
adapterConfig.Spec.HyperfleetAPI.Timeout,
adapterConfig.Spec.HyperfleetAPI.RetryAttempts)

// Initialize OpenTelemetry for trace_id/span_id generation and HTTP propagation
tp, err := initTracer(adapterConfig.Metadata.Name, version)
if err != nil {
errCtx := logger.WithStackTraceField(logger.WithErrorField(ctx, err), logger.CaptureStackTrace(1))
log.Errorf(errCtx, "Failed to initialize OpenTelemetry")
return fmt.Errorf("failed to initialize OpenTelemetry: %w", err)
}
defer func() {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
_ = tp.Shutdown(shutdownCtx)
}()

// Create HyperFleet API client from config
log.Info(ctx, "Creating HyperFleet API client...")
apiClient, err := createAPIClient(adapterConfig.Spec.HyperfleetAPI, log)
if err != nil {
log.Errorf(ctx, "Failed to create HyperFleet API client: %v", err)
errCtx := logger.WithStackTraceField(logger.WithErrorField(ctx, err), logger.CaptureStackTrace(1))
log.Errorf(errCtx, "Failed to create HyperFleet API client")
return fmt.Errorf("failed to create HyperFleet API client: %w", err)
}

Expand All @@ -177,7 +230,8 @@ func runServe() error {
log.Info(ctx, "Creating Kubernetes client...")
k8sClient, err := k8s_client.NewClient(ctx, k8s_client.ClientConfig{}, log)
if err != nil {
log.Errorf(ctx, "Failed to create Kubernetes client: %v", err)
errCtx := logger.WithStackTraceField(logger.WithErrorField(ctx, err), logger.CaptureStackTrace(1))
log.Errorf(errCtx, "Failed to create Kubernetes client")
return fmt.Errorf("failed to create Kubernetes client: %w", err)
}

Expand All @@ -190,7 +244,8 @@ func runServe() error {
WithLogger(log).
Build()
if err != nil {
log.Errorf(ctx, "Failed to create executor: %v", err)
errCtx := logger.WithStackTraceField(logger.WithErrorField(ctx, err), logger.CaptureStackTrace(1))
log.Errorf(errCtx, "Failed to create executor")
return fmt.Errorf("failed to create executor: %w", err)
}

Expand Down Expand Up @@ -219,15 +274,19 @@ func runServe() error {
// Get subscription ID from environment
subscriptionID := os.Getenv(EnvBrokerSubscriptionID)
if subscriptionID == "" {
log.Errorf(ctx, "%s environment variable is required", EnvBrokerSubscriptionID)
return fmt.Errorf("%s environment variable is required", EnvBrokerSubscriptionID)
err := fmt.Errorf("%s environment variable is required", EnvBrokerSubscriptionID)
errCtx := logger.WithStackTraceField(logger.WithErrorField(ctx, err), logger.CaptureStackTrace(1))
log.Errorf(errCtx, "Missing required environment variable")
return err
}

// Get topic from environment
topic := os.Getenv(EnvBrokerTopic)
if topic == "" {
log.Errorf(ctx, "%s environment variable is required", EnvBrokerTopic)
return fmt.Errorf("%s environment variable is required", EnvBrokerTopic)
err := fmt.Errorf("%s environment variable is required", EnvBrokerTopic)
errCtx := logger.WithStackTraceField(logger.WithErrorField(ctx, err), logger.CaptureStackTrace(1))
log.Errorf(errCtx, "Missing required environment variable")
return err
}

// Create broker subscriber
Expand All @@ -237,9 +296,10 @@ func runServe() error {
// - BROKER_RABBITMQ_URL: RabbitMQ URL (for rabbitmq)
// - SUBSCRIBER_PARALLELISM: number of parallel workers (default: 1)
log.Info(ctx, "Creating broker subscriber...")
subscriber, err := broker.NewSubscriber(subscriptionID)
subscriber, err := broker.NewSubscriber(log, subscriptionID)
if err != nil {
log.Errorf(ctx, "Failed to create subscriber: %v", err)
errCtx := logger.WithStackTraceField(logger.WithErrorField(ctx, err), logger.CaptureStackTrace(1))
log.Errorf(errCtx, "Failed to create subscriber")
return fmt.Errorf("failed to create subscriber: %w", err)
}
log.Info(ctx, "Broker subscriber created successfully")
Expand All @@ -248,7 +308,8 @@ func runServe() error {
log.Info(ctx, "Subscribing to broker topic...")
err = subscriber.Subscribe(ctx, topic, handler)
if err != nil {
log.Errorf(ctx, "Failed to subscribe to topic: %v", err)
errCtx := logger.WithStackTraceField(logger.WithErrorField(ctx, err), logger.CaptureStackTrace(1))
log.Errorf(errCtx, "Failed to subscribe to topic")
return fmt.Errorf("failed to subscribe to topic: %w", err)
}
log.Info(ctx, "Successfully subscribed to broker topic")
Expand All @@ -259,7 +320,8 @@ func runServe() error {
// Monitor subscription errors channel in a separate goroutine
go func() {
for subErr := range subscriber.Errors() {
log.Errorf(ctx, "Subscription error: %v", subErr)
errCtx := logger.WithStackTraceField(logger.WithErrorField(ctx, subErr), logger.CaptureStackTrace(1))
log.Errorf(errCtx, "Subscription error")
// For critical errors, signal shutdown
select {
case fatalErrCh <- subErr:
Expand All @@ -277,7 +339,8 @@ func runServe() error {
case <-ctx.Done():
log.Info(ctx, "Context cancelled, shutting down...")
case err := <-fatalErrCh:
log.Errorf(ctx, "Fatal subscription error, shutting down: %v", err)
errCtx := logger.WithStackTraceField(logger.WithErrorField(ctx, err), logger.CaptureStackTrace(1))
log.Errorf(errCtx, "Fatal subscription error, shutting down")
cancel() // Cancel context to trigger graceful shutdown
}

Expand All @@ -295,12 +358,15 @@ func runServe() error {
select {
case err := <-closeDone:
if err != nil {
log.Errorf(ctx, "Error closing subscriber: %v", err)
errCtx := logger.WithStackTraceField(logger.WithErrorField(ctx, err), logger.CaptureStackTrace(1))
log.Errorf(errCtx, "Error closing subscriber")
} else {
log.Info(ctx, "Subscriber closed successfully")
}
case <-shutdownCtx.Done():
log.Error(ctx, "Subscriber close timed out after 30 seconds")
err := fmt.Errorf("subscriber close timed out after 30 seconds")
errCtx := logger.WithStackTraceField(logger.WithErrorField(ctx, err), logger.CaptureStackTrace(1))
log.Error(errCtx, "Subscriber close timed out")
}

log.Info(ctx, "Adapter shutdown complete")
Expand Down Expand Up @@ -338,4 +404,4 @@ func createAPIClient(apiConfig config_loader.HyperfleetAPIConfig, log logger.Log
}

return hyperfleet_api.NewClient(log, opts...)
}
}
29 changes: 24 additions & 5 deletions internal/executor/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ func ToConditionDefs(conditions []config_loader.Condition) []criteria.ConditionD

// ExecuteLogAction executes a log action with the given context
// The message is rendered as a Go template with access to all params
// This is a shared utility function used by both PreconditionExecutor and PostActionExecutor
// ExecuteLogAction renders a log message template and emits the result at the configured level.
//
// If logAction is nil or its Message is empty, the function returns immediately. The Message is
// rendered as a Go template using execCtx.Params; if rendering fails the error is logged with an
// attached stack trace and the function returns. The rendered message is logged with a "[config]"
// prefix at the level specified by logAction.Level (defaults to "info").
func ExecuteLogAction(ctx context.Context, logAction *config_loader.LogAction, execCtx *ExecutionContext, log logger.Logger) {
if logAction == nil || logAction.Message == "" {
return
Expand All @@ -44,7 +49,8 @@ func ExecuteLogAction(ctx context.Context, logAction *config_loader.LogAction, e
// Render the message template
message, err := renderTemplate(logAction.Message, execCtx.Params)
if err != nil {
log.Errorf(ctx, "failed to render log message: %v", err)
errCtx := logger.WithStackTraceField(logger.WithErrorField(ctx, err), logger.CaptureStackTrace(1))
log.Errorf(errCtx, "failed to render log message")
return
}

Expand Down Expand Up @@ -72,7 +78,13 @@ func ExecuteLogAction(ctx context.Context, logAction *config_loader.LogAction, e
// ExecuteAPICall executes an API call with the given configuration and returns the response and rendered URL
// This is a shared utility function used by both PreconditionExecutor and PostActionExecutor
// On error, it returns an APIError with full context (method, URL, status, body, attempts, duration)
// Returns: response, renderedURL, error
// ExecuteAPICall executes the HTTP API call described by apiCall using apiClient.
//
// It renders the URL, header values, and body templates with execCtx.Params, applies timeout and retry options when specified,
// and performs the request with the HTTP method from apiCall (supports GET, POST, PUT, PATCH, DELETE).
// On error, it returns any available response along with an APIError that includes method, URL, status, body, attempts, duration, and the underlying error.
// If apiCall is nil or template rendering fails, a descriptive error is returned.
// If the HTTP method is unsupported, an error indicating the unsupported method is returned.
func ExecuteAPICall(ctx context.Context, apiCall *config_loader.APICall, execCtx *ExecutionContext, apiClient hyperfleet_api.Client, log logger.Logger) (*hyperfleet_api.Response, string, error) {
if apiCall == nil {
return nil, "", fmt.Errorf("apiCall is nil")
Expand Down Expand Up @@ -137,7 +149,14 @@ func ExecuteAPICall(ctx context.Context, apiCall *config_loader.APICall, execCtx
resp, err = apiClient.Post(ctx, url, body, opts...)
// Log body on failure for debugging
if err != nil || (resp != nil && !resp.IsSuccess()) {
log.Errorf(ctx, "POST %s failed, request body: %s", url, string(body))
var logErr error
if err != nil {
logErr = err
} else {
logErr = fmt.Errorf("POST %s returned non-success status: %d", url, resp.StatusCode)
}
errCtx := logger.WithStackTraceField(logger.WithErrorField(ctx, logErr), logger.CaptureStackTrace(1))
log.Errorf(errCtx, "POST %s failed, request body: %s", url, string(body))
}
case http.MethodPut:
body := []byte(apiCall.Body)
Expand Down Expand Up @@ -385,4 +404,4 @@ func adapterMetadataToMap(adapter *AdapterMetadata) map[string]interface{} {
"errorMessage": adapter.ErrorMessage,
"executionError": executionErrorToMap(adapter.ExecutionError),
}
}
}
Loading