From 72c33c1418443f966784b6c7888f195abf51aa93 Mon Sep 17 00:00:00 2001
From: xueli
Date: Thu, 8 Jan 2026 20:01:21 +0800
Subject: [PATCH] HYPERFLEET-463 - Improved structured logging fields for
 enhanced filtering and correlation in the adapter logger

In this PR:
- Added error to the logger fields
- Added stack_trace to the logger fields
- Added resource_type, resource_name, resource_result, and observed_generation
  to the logger fields
- Bumped the broker lib and updated the code accordingly
---
 cmd/adapter/main.go                        |  77 +++-
 go.mod                                     |   9 +-
 go.sum                                     |  24 +-
 internal/config_loader/loader_test.go      |   1 -
 internal/config_loader/validator_test.go   |   1 -
 internal/executor/executor.go              |  62 ++-
 internal/executor/post_action_executor.go  |   8 +-
 internal/executor/precondition_executor.go |   7 +-
 internal/executor/resource_executor.go     |  48 ++-
 internal/executor/utils.go                 |  12 +-
 internal/hyperfleet_api/client.go          |  26 +-
 pkg/logger/context.go                      | 102 ++++-
 pkg/logger/logger.go                       |  50 +--
 pkg/logger/logger_test.go                  |  34 +-
 pkg/logger/stack_trace.go                  | 134 ++++++
 pkg/logger/with_error_field_test.go        | 472 +++++++++++++++++++++
 pkg/otel/propagation.go                    |  57 +++
 pkg/otel/propagation_test.go               | 241 +++++++++++
 pkg/otel/tracer.go                         | 100 +++++
 19 files changed, 1311 insertions(+), 154 deletions(-)
 create mode 100644 pkg/logger/stack_trace.go
 create mode 100644 pkg/logger/with_error_field_test.go
 create mode 100644 pkg/otel/propagation.go
 create mode 100644 pkg/otel/propagation_test.go
 create mode 100644 pkg/otel/tracer.go

diff --git a/cmd/adapter/main.go b/cmd/adapter/main.go
index 1bf8299..72df0ba 100644
--- a/cmd/adapter/main.go
+++ b/cmd/adapter/main.go
@@ -14,6 +14,7 @@ import (
 	"github.com/openshift-hyperfleet/hyperfleet-adapter/internal/hyperfleet_api"
 	"github.com/openshift-hyperfleet/hyperfleet-adapter/internal/k8s_client"
 	"github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/logger"
+	"github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/otel"
 	"github.com/openshift-hyperfleet/hyperfleet-broker/broker"
 	"github.com/spf13/cobra"
 	"github.com/spf13/pflag"
@@ -41,6 +42,12 @@ const (
 	EnvBrokerTopic = "BROKER_TOPIC"
 )

+// Timeout constants
+const (
+	// OTelShutdownTimeout is the timeout for gracefully shutting down the OpenTelemetry TracerProvider
+	OTelShutdownTimeout = 5 * time.Second
+)
+
 func main() {
 	// Root command
 	rootCmd := &cobra.Command{
@@ -148,7 +155,8 @@ func runServe() error {
 	log.Info(ctx, "Loading adapter configuration...")
 	adapterConfig, err := config_loader.Load(configPath, config_loader.WithAdapterVersion(version))
 	if err != nil {
-		log.Errorf(ctx, "Failed to load adapter configuration: %v", err)
+		errCtx := logger.WithErrorField(ctx, err)
+		log.Errorf(errCtx, "Failed to load adapter configuration")
 		return fmt.Errorf("failed to load adapter configuration: %w", err)
 	}

@@ -164,11 +172,31 @@ func runServe() error {
 		adapterConfig.Spec.HyperfleetAPI.Timeout,
 		adapterConfig.Spec.HyperfleetAPI.RetryAttempts)

+	// Get trace sample ratio from environment (default: 10%)
+	sampleRatio := otel.GetTraceSampleRatio(log, ctx)
+
+	// Initialize OpenTelemetry for trace_id/span_id generation and HTTP propagation
+	tp, err := otel.InitTracer(adapterConfig.Metadata.Name, version, sampleRatio)
+	if err != nil {
+		errCtx := logger.WithErrorField(ctx, err)
+		log.Errorf(errCtx, "Failed to initialize OpenTelemetry")
+		return fmt.Errorf("failed to initialize OpenTelemetry: %w", err)
+	}
+	defer func() {
+		shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), OTelShutdownTimeout)
+		defer shutdownCancel()
+		if err := tp.Shutdown(shutdownCtx); err != nil {
+			errCtx := logger.WithErrorField(shutdownCtx, err)
+			log.Warnf(errCtx, "Failed to shutdown TracerProvider")
+		}
+	}()
+
 	// Create HyperFleet API client from config
 	log.Info(ctx, "Creating HyperFleet API client...")
 	apiClient, err := createAPIClient(adapterConfig.Spec.HyperfleetAPI, log)
 	if err != nil {
-		log.Errorf(ctx, "Failed to create HyperFleet API client: %v", err)
+		errCtx := logger.WithErrorField(ctx, err)
+		log.Errorf(errCtx, "Failed to create HyperFleet API client")
 		return fmt.Errorf("failed to create HyperFleet API client: %w", err)
 	}

@@ -177,7 +205,8 @@ func runServe() error {
 	log.Info(ctx, "Creating Kubernetes client...")
 	k8sClient, err := k8s_client.NewClient(ctx, k8s_client.ClientConfig{}, log)
 	if err != nil {
-		log.Errorf(ctx, "Failed to create Kubernetes client: %v", err)
+		errCtx := logger.WithErrorField(ctx, err)
+		log.Errorf(errCtx, "Failed to create Kubernetes client")
 		return fmt.Errorf("failed to create Kubernetes client: %w", err)
 	}

@@ -190,7 +219,8 @@ func runServe() error {
 		WithLogger(log).
 		Build()
 	if err != nil {
-		log.Errorf(ctx, "Failed to create executor: %v", err)
+		errCtx := logger.WithErrorField(ctx, err)
+		log.Errorf(errCtx, "Failed to create executor")
 		return fmt.Errorf("failed to create executor: %w", err)
 	}

@@ -219,15 +249,19 @@ func runServe() error {
 	// Get subscription ID from environment
 	subscriptionID := os.Getenv(EnvBrokerSubscriptionID)
 	if subscriptionID == "" {
-		log.Errorf(ctx, "%s environment variable is required", EnvBrokerSubscriptionID)
-		return fmt.Errorf("%s environment variable is required", EnvBrokerSubscriptionID)
+		err := fmt.Errorf("%s environment variable is required", EnvBrokerSubscriptionID)
+		errCtx := logger.WithErrorField(ctx, err)
+		log.Errorf(errCtx, "Missing required environment variable")
+		return err
 	}

 	// Get topic from environment
 	topic := os.Getenv(EnvBrokerTopic)
 	if topic == "" {
-		log.Errorf(ctx, "%s environment variable is required", EnvBrokerTopic)
-		return fmt.Errorf("%s environment variable is required", EnvBrokerTopic)
+		err := fmt.Errorf("%s environment variable is required", EnvBrokerTopic)
+		errCtx := logger.WithErrorField(ctx, err)
+		log.Errorf(errCtx, "Missing required environment variable")
+		return err
 	}

 	// Create broker subscriber
@@ -237,9 +271,10 @@
 	// - BROKER_RABBITMQ_URL: RabbitMQ URL (for rabbitmq)
 	// - SUBSCRIBER_PARALLELISM: number of parallel workers (default: 1)
 	log.Info(ctx, "Creating broker subscriber...")
-	subscriber, err := broker.NewSubscriber(subscriptionID)
+	subscriber, err := broker.NewSubscriber(log, subscriptionID)
 	if err != nil {
-		log.Errorf(ctx, "Failed to create subscriber: %v", err)
+		errCtx := logger.WithErrorField(ctx, err)
+		log.Errorf(errCtx, "Failed to create subscriber")
 		return fmt.Errorf("failed to create subscriber: %w", err)
 	}
 	log.Info(ctx, "Broker subscriber created successfully")
@@ -248,7 +283,8 @@
 	log.Info(ctx, "Subscribing to broker topic...")
 	err = subscriber.Subscribe(ctx, topic, handler)
 	if err != nil {
-		log.Errorf(ctx, "Failed to subscribe to topic: %v", err)
+		errCtx := logger.WithErrorField(ctx, err)
+		log.Errorf(errCtx, "Failed to subscribe to topic")
 		return fmt.Errorf("failed to subscribe to topic: %w", err)
 	}
 	log.Info(ctx, "Successfully subscribed to broker topic")
@@ -256,10 +292,15 @@
 	// Channel to signal fatal errors from the errors goroutine
 	fatalErrCh := make(chan error, 1)

-	// Monitor subscription errors channel in a separate goroutine
+	// Monitor the subscription errors channel in a separate goroutine.
+	// Note: Error context here reflects the handler's location, not the error's origin
+	// in the broker library. Stack traces (if captured) would show this goroutine's
+	// call stack. For richer error context, the broker library would need to provide
+	// errors with embedded stack traces or structured error details.
 	go func() {
 		for subErr := range subscriber.Errors() {
-			log.Errorf(ctx, "Subscription error: %v", subErr)
+			errCtx := logger.WithErrorField(ctx, subErr)
+			log.Errorf(errCtx, "Subscription error")
 			// For critical errors, signal shutdown
 			select {
 			case fatalErrCh <- subErr:
@@ -277,7 +318,8 @@
 	case <-ctx.Done():
 		log.Info(ctx, "Context cancelled, shutting down...")
 	case err := <-fatalErrCh:
-		log.Errorf(ctx, "Fatal subscription error, shutting down: %v", err)
+		errCtx := logger.WithErrorField(ctx, err)
+		log.Errorf(errCtx, "Fatal subscription error, shutting down")
 		cancel() // Cancel context to trigger graceful shutdown
 	}

@@ -295,12 +337,15 @@
 	select {
 	case err := <-closeDone:
 		if err != nil {
-			log.Errorf(ctx, "Error closing subscriber: %v", err)
+			errCtx := logger.WithErrorField(ctx, err)
+			log.Errorf(errCtx, "Error closing subscriber")
 		} else {
 			log.Info(ctx, "Subscriber closed successfully")
 		}
 	case <-shutdownCtx.Done():
-		log.Error(ctx, "Subscriber close timed out after 30 seconds")
+		err := fmt.Errorf("subscriber close timed out after 30 seconds")
+		errCtx := logger.WithErrorField(ctx, err)
+		log.Error(errCtx, "Subscriber close timed out")
 	}

 	log.Info(ctx, "Adapter shutdown complete")
diff --git a/go.mod b/go.mod
index dd8c07b..168af68 100644
--- a/go.mod
+++ b/go.mod
@@ -7,11 +7,14 @@ require (
 	github.com/docker/go-connections v0.6.0
 	github.com/google/cel-go v0.26.1
 	github.com/mitchellh/copystructure v1.2.0
-	github.com/openshift-hyperfleet/hyperfleet-broker v1.0.0
+	github.com/openshift-hyperfleet/hyperfleet-broker v1.0.1
 	github.com/spf13/cobra v1.10.2
 	github.com/spf13/pflag v1.0.10
 	github.com/stretchr/testify v1.11.1
 	github.com/testcontainers/testcontainers-go v0.40.0
+	go.opentelemetry.io/otel v1.38.0
+	go.opentelemetry.io/otel/sdk v1.38.0
+	go.opentelemetry.io/otel/trace v1.38.0
 	golang.org/x/text v0.32.0
 	gopkg.in/yaml.v3 v3.0.1
 	k8s.io/apimachinery v0.34.1
@@ -116,9 +119,7 @@ require (
 	go.opentelemetry.io/auto/sdk v1.1.0 // indirect
 	go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect
 	go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
-	go.opentelemetry.io/otel v1.36.0 // indirect
-	go.opentelemetry.io/otel/metric v1.36.0 // indirect
-	go.opentelemetry.io/otel/trace v1.36.0 // indirect
+	go.opentelemetry.io/otel/metric v1.38.0 // indirect
 	go.yaml.in/yaml/v2 v2.4.2 // indirect
 	go.yaml.in/yaml/v3 v3.0.4 // indirect
 	golang.org/x/crypto v0.43.0 // indirect
diff --git a/go.sum b/go.sum
index e91d12f..3d50914 100644
--- a/go.sum
+++ b/go.sum
@@ -228,8 +228,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8
 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
 github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=
 github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M=
-github.com/openshift-hyperfleet/hyperfleet-broker v1.0.0 h1:JfIF9ZVFIaXmo0hKAN42SCwItpqEHXtc54+WHRKuYg4=
-github.com/openshift-hyperfleet/hyperfleet-broker v1.0.0/go.mod h1:z7QpS2m6gaqTbgPazl1lYYy+JuyNDMkMtco12rM29nU=
+github.com/openshift-hyperfleet/hyperfleet-broker v1.0.1 h1:Mvx0ojBvttYlwu3VfOHwvH+eEM1xA40GzNOZqv1cGyQ=
+github.com/openshift-hyperfleet/hyperfleet-broker v1.0.1/go.mod h1:z7QpS2m6gaqTbgPazl1lYYy+JuyNDMkMtco12rM29nU=
 github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
 github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -311,20 +311,20 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.6
 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0/go.mod h1:snMWehoOh2wsEwnvvwtDyFCxVeDAODenXHtn5vzrKjo=
 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 h1:F7Jx+6hwnZ41NSFTO5q4LYDtJRXBf2PD0rNBkeB/lus=
 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0/go.mod h1:UHB22Z8QsdRDrnAtX4PntOl36ajSxcdUMt1sF7Y6E7Q=
-go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg=
-go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E=
+go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
+go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 h1:OeNbIYk/2C15ckl7glBlOBp5+WlYsOElzTNmiPW/x60=
 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0/go.mod h1:7Bept48yIeqxP2OZ9/AqIpYS94h2or0aB4FypJTc8ZM=
 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg=
 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU=
-go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE=
-go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs=
-go.opentelemetry.io/otel/sdk v1.36.0 h1:b6SYIuLRs88ztox4EyrvRti80uXIFy+Sqzoh9kFULbs=
-go.opentelemetry.io/otel/sdk v1.36.0/go.mod h1:+lC+mTgD+MUWfjJubi2vvXWcVxyr9rmlshZni72pXeY=
-go.opentelemetry.io/otel/sdk/metric v1.36.0 h1:r0ntwwGosWGaa0CrSt8cuNuTcccMXERFwHX4dThiPis=
-go.opentelemetry.io/otel/sdk/metric v1.36.0/go.mod h1:qTNOhFDfKRwX0yXOqJYegL5WRaW376QbB7P4Pb0qva4=
-go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w=
-go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA=
+go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA=
+go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI=
+go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E=
+go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg=
+go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM=
+go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA=
+go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE=
+go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs=
 go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
 go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
 go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
diff --git a/internal/config_loader/loader_test.go b/internal/config_loader/loader_test.go
index 76cf2ef..8d0c195 100644
--- a/internal/config_loader/loader_test.go
+++ b/internal/config_loader/loader_test.go
@@ -1222,7 +1222,6 @@ func TestValidateResourceDiscovery(t *testing.T) {
 	}
 }

-
 func TestConditionValuesAlias(t *testing.T) {
 	// Test that both "value" and "values" YAML keys are supported
 	tests := []struct {
diff --git a/internal/config_loader/validator_test.go b/internal/config_loader/validator_test.go
index 314f3ed..5a0d408 100644
--- a/internal/config_loader/validator_test.go
+++ b/internal/config_loader/validator_test.go
@@ -270,7 +270,6 @@ func TestValidateK8sManifests(t *testing.T) {
 	})
 }

-
 func TestValidOperators(t *testing.T) {
 	// Verify all expected operators are defined in criteria package
 	expectedOperators := []string{
diff --git a/internal/executor/executor.go b/internal/executor/executor.go
index 77d0914..80c26df 100644
--- a/internal/executor/executor.go
+++ b/internal/executor/executor.go
@@ -5,12 +5,16 @@ import (
 	"encoding/json"
 	"fmt"
 	"reflect"
+	"strings"

 	"github.com/cloudevents/sdk-go/v2/event"
 	"github.com/openshift-hyperfleet/hyperfleet-adapter/internal/config_loader"
 	"github.com/openshift-hyperfleet/hyperfleet-adapter/internal/hyperfleet_api"
 	"github.com/openshift-hyperfleet/hyperfleet-adapter/internal/k8s_client"
 	"github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/logger"
+	pkgotel "github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/otel"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/trace"
 )

 // NewExecutor creates a new Executor with the given configuration
@@ -50,12 +54,16 @@ func validateExecutorConfig(config *ExecutorConfig) error {
 // The caller is responsible for:
 //   - Adding event ID to context for logging correlation using logger.WithEventID()
 func (e *Executor) Execute(ctx context.Context, data interface{}) *ExecutionResult {
+	// Start OTel span and add trace context to logs
+	ctx, span := e.startTracedExecution(ctx)
+	defer span.End()

 	// Parse event data
 	eventData, rawData, err := ParseEventData(data)
 	if err != nil {
 		parseErr := fmt.Errorf("failed to parse event data: %w", err)
-		e.log.Errorf(ctx, "Failed to parse event data: error=%v", parseErr)
+		errCtx := logger.WithErrorField(ctx, parseErr)
+		e.log.Errorf(errCtx, "Failed to parse event data")
 		return &ExecutionResult{
 			Status:       StatusFailed,
 			CurrentPhase: PhaseParamExtraction,

 	// This is intended to set OwnerReference and ResourceID for the event when they exist.
 	// For example, when a NodePool event arrives,
-	// the logger will set the cluster_id:owner_id , resource_id: nodepool_id and resource_type: nodepool
-	// but when a resource is cluster type, it will just record cluster_id:resource_id
+	// the logger will set cluster_id=owner_id, nodepool_id=resource_id, resource_type=nodepool,
+	// but when a resource is cluster type, it will just record cluster_id=resource_id
 	if eventData.OwnedReference != nil {
-		ctx = logger.WithResourceID(
-			logger.WithResourceType(ctx, eventData.Kind), eventData.ID)
+		ctx = logger.WithResourceType(ctx, eventData.Kind)
+		ctx = logger.WithDynamicResourceID(ctx, eventData.Kind, eventData.ID)
 		ctx = logger.WithDynamicResourceID(ctx, eventData.OwnedReference.Kind, eventData.OwnedReference.ID)
 	} else {
 		ctx = logger.WithDynamicResourceID(ctx, eventData.Kind, eventData.ID)
@@ -110,7 +118,8 @@ func (e *Executor) Execute(ctx context.Context, data interface{}) *ExecutionResu
 			precondErr := fmt.Errorf("precondition evaluation failed: error=%w", precondOutcome.Error)
 			result.Errors[result.CurrentPhase] = precondErr
 			execCtx.SetError("PreconditionFailed", precondOutcome.Error.Error())
-			e.log.Errorf(ctx, "Phase %s: FAILED - error=%v", result.CurrentPhase, precondOutcome.Error)
+			errCtx := logger.WithErrorField(ctx, precondOutcome.Error)
+			e.log.Errorf(errCtx, "Phase %s: FAILED", result.CurrentPhase)
 			result.ResourcesSkipped = true
 			result.SkipReason = "PreconditionFailed"
 			execCtx.SetSkipped("PreconditionFailed", precondOutcome.Error.Error())
@@ -138,7 +147,8 @@ func (e *Executor) Execute(ctx context.Context, data interface{}) *ExecutionResu
 			resErr := fmt.Errorf("resource execution failed: %w", err)
 			result.Errors[result.CurrentPhase] = resErr
 			execCtx.SetError("ResourceFailed", err.Error())
-			e.log.Errorf(ctx, "Phase %s: FAILED - error=%v", result.CurrentPhase, err)
+			errCtx := logger.WithErrorField(ctx, err)
+			e.log.Errorf(errCtx, "Phase %s: FAILED", result.CurrentPhase)
 			// Continue to post actions for error reporting
 		} else {
 			e.log.Infof(ctx, "Phase %s: SUCCESS - %d processed", result.CurrentPhase, len(resourceResults))
@@ -161,7 +171,8 @@ func (e *Executor) Execute(ctx context.Context, data interface{}) *ExecutionResu
 			result.Status = StatusFailed
 			postErr := fmt.Errorf("post action execution failed: %w", err)
 			result.Errors[result.CurrentPhase] = postErr
-			e.log.Errorf(ctx, "Phase %s: FAILED - error=%v", result.CurrentPhase, err)
+			errCtx := logger.WithErrorField(ctx, err)
+			e.log.Errorf(errCtx, "Phase %s: FAILED", result.CurrentPhase)
 		} else {
 			e.log.Infof(ctx, "Phase %s: SUCCESS - %d executed", result.CurrentPhase, len(postResults))
 		}
@@ -172,7 +183,14 @@ func (e *Executor) Execute(ctx context.Context, data interface{}) *ExecutionResu
 	if result.Status == StatusSuccess {
 		e.log.Infof(ctx, "Event execution finished: event_execution_status=success resources_skipped=%t reason=%s", result.ResourcesSkipped, result.SkipReason)
 	} else {
-		e.log.Errorf(ctx, "Event execution finished: event_execution_status=failed event_execution_errors=%v", result.Errors)
+		// Combine all errors into a single error for logging
+		var errMsgs []string
+		for phase, err := range result.Errors {
+			errMsgs = append(errMsgs, fmt.Sprintf("%s: %v", phase, err))
+		}
+		combinedErr := fmt.Errorf("execution failed: %s", strings.Join(errMsgs, "; "))
+		errCtx := logger.WithErrorField(ctx, combinedErr)
+		e.log.Errorf(errCtx, "Event execution finished: event_execution_status=failed")
 	}
 	return result
 }
@@ -190,6 +208,23 @@ func (e *Executor) executeParamExtraction(execCtx *ExecutionContext) error {
 	return nil
 }

+// startTracedExecution creates an OTel span and adds trace context to logs.
+// Returns the enriched context and span. Caller must call span.End() when done.
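+//
+// Typical call pattern (an illustrative sketch; this mirrors how Execute
+// uses it at the top of this file):
+//
+//	ctx, span := e.startTracedExecution(ctx)
+//	defer span.End()
+//	e.log.Info(ctx, "processing") // subsequent logs carry trace_id and span_id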
+//
+// This method:
+//   - Creates an OTel span with trace_id and span_id (for distributed tracing)
+//   - Adds trace_id and span_id to logger context (for log correlation)
+//   - The trace context is automatically propagated to outgoing HTTP requests
+func (e *Executor) startTracedExecution(ctx context.Context) (context.Context, trace.Span) {
+	componentName := e.config.AdapterConfig.Metadata.Name
+	ctx, span := otel.Tracer(componentName).Start(ctx, "Execute")
+
+	// Add trace_id and span_id to logger context for log correlation
+	ctx = logger.WithOTelTraceContext(ctx)
+
+	return ctx, span
+}
+
 // CreateHandler creates an event handler function that can be used with the broker subscriber
 // This is a convenience method for integrating with the broker_consumer package
 //
@@ -201,14 +236,19 @@ func (e *Executor) CreateHandler() func(ctx context.Context, evt *event.Event) e
 		// Add event ID to context for logging correlation
 		ctx = logger.WithEventID(ctx, evt.ID())

+		// Extract W3C trace context from CloudEvent extensions (if present)
+		// This enables distributed tracing when upstream services (e.g., Sentinel)
+		// include traceparent/tracestate in the CloudEvent
+		ctx = pkgotel.ExtractTraceContextFromCloudEvent(ctx, evt)
+
 		// Log event metadata
 		e.log.Infof(ctx, "Event received: id=%s type=%s source=%s time=%s",
 			evt.ID(), evt.Type(), evt.Source(), evt.Time())

 		_ = e.Execute(ctx, evt.Data())

-		e.log.Infof(ctx, "Event processed: id=%s type=%s source=%s time=%s",
-			evt.ID(), evt.Type(), evt.Source(), evt.Time())
+		e.log.Infof(ctx, "Event processed: type=%s source=%s time=%s",
+			evt.Type(), evt.Source(), evt.Time())

 		return nil
 	}
diff --git a/internal/executor/post_action_executor.go b/internal/executor/post_action_executor.go
index 85aa1dd..85b9638 100644
--- a/internal/executor/post_action_executor.go
+++ b/internal/executor/post_action_executor.go
@@ -37,7 +37,8 @@ func (pae *PostActionExecutor) ExecuteAll(ctx context.Context, postConfig *confi
 	if len(postConfig.Payloads) > 0 {
 		pae.log.Infof(ctx, "Building %d post payloads", len(postConfig.Payloads))
 		if err := pae.buildPostPayloads(ctx, postConfig.Payloads, execCtx); err != nil {
-			pae.log.Errorf(ctx, "Failed to build post payloads: %v", err)
+			errCtx := logger.WithErrorField(ctx, err)
+			pae.log.Errorf(errCtx, "Failed to build post payloads")
 			execCtx.Adapter.ExecutionError = &ExecutionError{
 				Phase: string(PhasePostActions),
 				Step:  "build_payloads",
@@ -57,7 +58,8 @@
 		results = append(results, result)

 		if err != nil {
-			pae.log.Errorf(ctx, "PostAction[%s] processed: FAILED - error=%v", action.Name, err)
+			errCtx := logger.WithErrorField(ctx, err)
+			pae.log.Errorf(errCtx, "PostAction[%s] processed: FAILED", action.Name)

 			// Set ExecutionError for failed post action
 			execCtx.Adapter.ExecutionError = &ExecutionError{
@@ -215,11 +217,9 @@ func (pae *PostActionExecutor) executePostAction(ctx context.Context, action con
 	// Execute API call if configured
 	if action.APICall != nil {
-		pae.log.Debugf(ctx, "Making API call: %s %s", action.APICall.Method, action.APICall.URL)
 		if err := pae.executeAPICall(ctx, action.APICall, execCtx, &result); err != nil {
 			return result, err
 		}
-		pae.log.Debugf(ctx, "API call completed: HTTP %d", result.HTTPStatus)
 	}

 	return result, nil
diff --git a/internal/executor/precondition_executor.go b/internal/executor/precondition_executor.go
index 470d881..d0cdbd4 100644
--- a/internal/executor/precondition_executor.go
+++ b/internal/executor/precondition_executor.go
@@ -38,7 +38,8 @@ func (pe *PreconditionExecutor) ExecuteAll(ctx context.Context, preconditions []
 		if err != nil {
 			// Execution error (API call failed, parse error, etc.)
-			pe.log.Errorf(ctx, "Precondition[%s] evaluated: FAILED - error=%v", precond.Name, err)
+			errCtx := logger.WithErrorField(ctx, err)
+			pe.log.Errorf(errCtx, "Precondition[%s] evaluated: FAILED", precond.Name)
 			return &PreconditionsOutcome{
 				AllMatched: false,
 				Results:    results,
@@ -83,7 +84,6 @@ func (pe *PreconditionExecutor) executePrecondition(ctx context.Context, precond
 	// Step 2: Make API call if configured
 	if precond.APICall != nil {
-		pe.log.Debugf(ctx, "Making API call: %s %s", precond.APICall.Method, precond.APICall.URL)
 		apiResult, err := pe.executeAPICall(ctx, precond.APICall, execCtx)
 		if err != nil {
 			result.Status = StatusFailed
@@ -150,7 +150,6 @@ func (pe *PreconditionExecutor) executePrecondition(ctx context.Context, precond
 				}
 			}
 		}
-		pe.log.Debugf(ctx, "API call completed, response captured")
 	}

 	// Step 3: Evaluate conditions
@@ -186,7 +185,7 @@ func (pe *PreconditionExecutor) executePrecondition(ctx context.Context, precond
 		if cr.Matched {
 			pe.log.Debugf(ctx, "Condition: %s %s %v = %v (matched)", cr.Field, cr.Operator, cr.ExpectedValue, cr.FieldValue)
 		} else {
-			pe.log.Infof(ctx, "Condition[%s] evaluated: NOT_MET - %s %s %v (actual: %v)", precond.Name, cr.Field, cr.Operator, cr.ExpectedValue, cr.FieldValue)
+			pe.log.Debugf(ctx, "Condition: %s %s %v = %v (not matched)", cr.Field, cr.Operator, cr.ExpectedValue, cr.FieldValue)
 		}
 	}
diff --git a/internal/executor/resource_executor.go b/internal/executor/resource_executor.go
index facdd66..74a304d 100644
--- a/internal/executor/resource_executor.go
+++ b/internal/executor/resource_executor.go
@@ -73,7 +73,12 @@ func (re *ResourceExecutor) executeResource(ctx context.Context, resource config
 	result.Namespace = manifest.GetNamespace()
 	result.ResourceName = manifest.GetName()

-	re.log.Infof(ctx, "Resource[%s] manifest built: kind=%s name=%s namespace=%s", resource.Name, gvk.Kind, manifest.GetName(), manifest.GetNamespace())
+	// Add K8s resource context fields for logging (separate from event resource_type/resource_id)
+	ctx = logger.WithK8sKind(ctx, result.Kind)
+	ctx = logger.WithK8sName(ctx, result.ResourceName)
+	ctx = logger.WithK8sNamespace(ctx, result.Namespace)
+
+	re.log.Debugf(ctx, "Resource[%s] manifest built: namespace=%s", resource.Name, manifest.GetNamespace())

 	// Step 2: Check for existing resource using discovery
 	var existingResource *unstructured.Unstructured
@@ -99,10 +104,17 @@
 	}

 	// Step 3: Determine and perform the appropriate operation
+	// Extract manifest generation once for use in comparison and logging
+	manifestGen := k8s_client.GetGenerationAnnotation(manifest)
+
+	// Add observed_generation to context early so it appears in all subsequent logs
+	if manifestGen > 0 {
+		ctx = logger.WithObservedGeneration(ctx, manifestGen)
+	}
+
 	if existingResource != nil {
 		// Check if generation annotations match - skip update if unchanged
 		existingGen := k8s_client.GetGenerationAnnotation(existingResource)
-		manifestGen := k8s_client.GetGenerationAnnotation(manifest)

 		if existingGen == manifestGen {
 			// Generations match - no action needed
@@ -126,8 +138,8 @@
 	}

 	// Log the operation decision
-	re.log.Infof(ctx, "Resource[%s] is processing: operation=%s kind=%s name=%s reason=%s",
-		resource.Name, strings.ToUpper(string(result.Operation)), gvk.Kind, manifest.GetName(), result.OperationReason)
+	re.log.Infof(ctx, "Resource[%s] is processing: operation=%s reason=%s",
+		resource.Name, strings.ToUpper(string(result.Operation)), result.OperationReason)

 	// Execute the operation
 	switch result.Operation {
@@ -140,7 +152,7 @@
 	case OperationSkip:
 		// No action needed, resource already set above
 	}
-	
+
 	if err != nil {
 		result.Status = StatusFailed
 		result.Error = err
@@ -150,13 +162,16 @@
 			Step:    resource.Name,
 			Message: err.Error(),
 		}
-		re.log.Errorf(ctx, "Resource[%s] processed: FAILED - operation=%s reason=%s kind=%s name=%s error=%v",
-			resource.Name, result.Operation, result.OperationReason, gvk.Kind, manifest.GetName(), err)
+		errCtx := logger.WithK8sResult(ctx, "FAILED")
+		errCtx = logger.WithErrorField(errCtx, err)
+		re.log.Errorf(errCtx, "Resource[%s] processed: operation=%s reason=%s",
+			resource.Name, result.Operation, result.OperationReason)
 		return result, NewExecutorError(PhaseResources, resource.Name, fmt.Sprintf("failed to %s resource", result.Operation), err)
 	}

-	re.log.Infof(ctx, "Resource[%s] processed: SUCCESS - operation=%s reason=%s kind=%s name=%s",
-		resource.Name, result.Operation, result.OperationReason, gvk.Kind, manifest.GetName())
+	successCtx := logger.WithK8sResult(ctx, "SUCCESS")
+	re.log.Infof(successCtx, "Resource[%s] processed: operation=%s reason=%s",
+		resource.Name, result.Operation, result.OperationReason)

 	// Store resource in execution context
 	if result.Resource != nil {
@@ -321,19 +336,19 @@ func (re *ResourceExecutor) recreateResource(ctx context.Context, existing, mani
 	name := existing.GetName()

 	// Delete the existing resource
-	re.log.Infof(ctx, "Deleting resource for recreation: %s/%s", gvk.Kind, name)
+	re.log.Debugf(ctx, "Deleting resource for recreation")
 	if err := re.k8sClient.DeleteResource(ctx, gvk, namespace, name); err != nil {
 		return nil, fmt.Errorf("failed to delete resource for recreation: %w", err)
 	}

 	// Wait for the resource to be fully deleted
-	re.log.Infof(ctx, "Waiting for resource deletion to complete: %s/%s", gvk.Kind, name)
+	re.log.Debugf(ctx, "Waiting for resource deletion to complete")
 	if err := re.waitForDeletion(ctx, gvk, namespace, name); err != nil {
 		return nil, fmt.Errorf("failed waiting for resource deletion: %w", err)
 	}

 	// Create the new resource
-	re.log.Infof(ctx, "Creating new resource after deletion confirmed: %s/%s", gvk.Kind, manifest.GetName())
+	re.log.Debugf(ctx, "Creating new resource after deletion confirmed")
 	return re.k8sClient.CreateResource(ctx, manifest)
 }

@@ -348,22 +363,23 @@ func (re *ResourceExecutor) waitForDeletion(ctx context.Context, gvk schema.Grou
 	for {
 		select {
 		case <-ctx.Done():
-			re.log.Warnf(ctx, "Context cancelled/timed out while waiting for deletion of %s/%s", gvk.Kind, name)
+			re.log.Warnf(ctx, "Context cancelled/timed out while waiting for deletion")
 			return fmt.Errorf("context cancelled while waiting for resource deletion: %w", ctx.Err())
 		case <-ticker.C:
 			_, err := re.k8sClient.GetResource(ctx, gvk, namespace, name)
 			if err != nil {
 				// NotFound means the resource is deleted - this is success
 				if apierrors.IsNotFound(err) {
-					re.log.Infof(ctx, "Resource deletion confirmed: %s/%s", gvk.Kind, name)
+					re.log.Debugf(ctx, "Resource deletion confirmed")
 					return nil
 				}
 				// Any other error is unexpected
-				re.log.Errorf(ctx, "Error checking resource deletion status for %s/%s: %v", gvk.Kind, name, err)
+				errCtx := logger.WithErrorField(ctx, err)
+				re.log.Errorf(errCtx, "Error checking resource deletion status")
 				return fmt.Errorf("error checking deletion status: %w", err)
 			}
 			// Resource still exists, continue polling
-			re.log.Debugf(ctx, "Resource %s/%s still exists, waiting for deletion...", gvk.Kind, name)
+			re.log.Debugf(ctx, "Resource still exists, waiting for deletion...")
 		}
 	}
 }
diff --git a/internal/executor/utils.go b/internal/executor/utils.go
index 87dea7c..ee31d38 100644
--- a/internal/executor/utils.go
+++ b/internal/executor/utils.go
@@ -44,7 +44,8 @@ func ExecuteLogAction(ctx context.Context, logAction *config_loader.LogAction, e
 	// Render the message template
 	message, err := renderTemplate(logAction.Message, execCtx.Params)
 	if err != nil {
-		log.Errorf(ctx, "failed to render log message: %v", err)
+		errCtx := logger.WithErrorField(ctx, err)
+		log.Errorf(errCtx, "failed to render log message")
 		return
 	}

@@ -137,7 +138,14 @@ func ExecuteAPICall(ctx context.Context, apiCall *config_loader.APICall, execCtx
 		resp, err = apiClient.Post(ctx, url, body, opts...)
 		// Log body on failure for debugging
 		if err != nil || (resp != nil && !resp.IsSuccess()) {
-			log.Errorf(ctx, "POST %s failed, request body: %s", url, string(body))
+			var logErr error
+			if err != nil {
+				logErr = err
+			} else {
+				logErr = fmt.Errorf("POST %s returned non-success status: %d", url, resp.StatusCode)
+			}
+			errCtx := logger.WithErrorField(ctx, logErr)
+			log.Error(errCtx, "Request failed")
 		}
 	case http.MethodPut:
 		body := []byte(apiCall.Body)
diff --git a/internal/hyperfleet_api/client.go b/internal/hyperfleet_api/client.go
index 0a68364..705ff54 100644
--- a/internal/hyperfleet_api/client.go
+++ b/internal/hyperfleet_api/client.go
@@ -15,6 +15,9 @@ import (

 	apierrors "github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/errors"
 	"github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/logger"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/propagation"
 )

 // Environment variables for API configuration
@@ -265,6 +268,22 @@ func (c *httpClient) resolveURL(url string) string {

 // doRequest performs a single HTTP request without retry logic
 func (c *httpClient) doRequest(ctx context.Context, req *Request) (*Response, error) {
+	// Resolve URL (prepend base URL if relative)
+	resolvedURL := c.resolveURL(req.URL)
+
+	// Create child span for this HTTP request (new span_id, same trace_id as parent event)
+	// Use method-only span name to avoid high cardinality; URL is in attributes
+	spanName := fmt.Sprintf("HTTP %s", req.Method)
+	ctx, span := otel.Tracer("http-client").Start(ctx, spanName)
+	span.SetAttributes(
+		attribute.String("http.request.method", req.Method),
+		attribute.String("url.full", resolvedURL),
+	)
+	defer span.End()
+
+	// Update logger context with new span_id for this request
+	ctx = logger.WithOTelTraceContext(ctx)
+
 	// Determine timeout
 	timeout := c.config.Timeout
 	if req.Timeout > 0 {
@@ -275,9 +294,6 @@ func (c *httpClient) doRequest(ctx context.Context, req *Request) (*Response, er
 	reqCtx, cancel := context.WithTimeout(ctx, timeout)
 	defer cancel()

-	// Resolve URL (prepend base URL if relative)
-	resolvedURL := c.resolveURL(req.URL)
-
 	// Create HTTP request
 	var body io.Reader
 	if len(req.Body) > 0 {
@@ -304,6 +320,10 @@
 		httpReq.Header.Set("Content-Type", "application/json")
 	}

+	// Inject OpenTelemetry trace context into headers (W3C Trace Context format)
+	// This propagates trace_id and span_id via the 'traceparent' header
+	otel.GetTextMapPropagator().Inject(reqCtx, propagation.HeaderCarrier(httpReq.Header))
+
 	// Execute request
 	c.log.Debugf(ctx, "HyperFleet API request: %s %s", req.Method, req.URL)
 	httpResp, err := c.client.Do(httpReq)
diff --git a/pkg/logger/context.go b/pkg/logger/context.go
index b0298a0..5ab8721 100644
--- a/pkg/logger/context.go
+++ b/pkg/logger/context.go
@@ -3,22 +3,39 @@ package logger
 import (
 	"context"
 	"strings"
+
+	"go.opentelemetry.io/otel/trace"
 )

 // contextKey is a custom type for context keys to avoid collisions
 type contextKey string

 const (
+	// Required fields (per logging spec)
+	ComponentKey contextKey = "component"
+	VersionKey   contextKey = "version"
+	HostnameKey  contextKey = "hostname"
+
+	// Error fields (per logging spec)
+	ErrorKey      contextKey = "error"
+	StackTraceKey contextKey = "stack_trace"
+
 	// Correlation fields (distributed tracing)
 	TraceIDKey contextKey = "trace_id"
 	SpanIDKey  contextKey = "span_id"
 	EventIDKey contextKey = "event_id"

-	// Resource fields
+	// Resource fields (from event data)
 	ClusterIDKey    contextKey = "cluster_id"
 	ResourceTypeKey contextKey = "resource_type"
 	ResourceIDKey   contextKey = "resource_id"

+	// K8s manifest fields
+	K8sKindKey      contextKey = "k8s_kind"
+	K8sNameKey      contextKey = "k8s_name"
+	K8sNamespaceKey contextKey = "k8s_namespace"
+	K8sResultKey    contextKey = "k8s_result"
+
 	// Adapter-specific fields
 	AdapterKey            contextKey = "adapter"
 	ObservedGenerationKey contextKey = "observed_generation"
@@ -86,16 +103,36 @@ func WithClusterID(ctx context.Context, clusterID string) context.Context {
 	return WithLogField(ctx, string(ClusterIDKey), clusterID)
 }

-// WithResourceType returns a context with the resource type set
+// WithResourceType returns a context with the event resource type set (e.g., "cluster", "nodepool")
 func WithResourceType(ctx context.Context, resourceType string) context.Context {
 	return WithLogField(ctx, string(ResourceTypeKey), resourceType)
 }

-// WithResourceID returns a context with the resource ID set
+// WithResourceID returns a context with the event resource ID set
 func WithResourceID(ctx context.Context, resourceID string) context.Context {
 	return WithLogField(ctx, string(ResourceIDKey), resourceID)
 }

+// WithK8sKind returns a context with the K8s resource kind set (e.g., "Deployment", "Job")
+func WithK8sKind(ctx context.Context, kind string) context.Context {
+	return WithLogField(ctx, string(K8sKindKey), kind)
+}
+
+// WithK8sName returns a context with the K8s resource name set
+func WithK8sName(ctx context.Context, name string) context.Context {
+	return WithLogField(ctx, string(K8sNameKey), name)
+}
+
+// WithK8sNamespace returns a context with the K8s resource namespace set
+func WithK8sNamespace(ctx context.Context, namespace string) context.Context {
+	return WithLogField(ctx, string(K8sNamespaceKey), namespace)
+}
+
+// WithK8sResult returns a context with the K8s resource operation result set (SUCCESS/FAILED)
+func WithK8sResult(ctx context.Context, result string) context.Context {
+	return WithLogField(ctx, string(K8sResultKey), result)
+}
+
 // WithAdapter returns a context with the adapter name set
 func WithAdapter(ctx context.Context, adapter string) context.Context {
 	return WithLogField(ctx, string(AdapterKey), adapter)
@@ -111,6 +148,65 @@ func WithSubscription(ctx context.Context, subscription string) context.Context
 	return WithLogField(ctx, string(SubscriptionKey), subscription)
 }

+// WithErrorField returns a context with the error message set.
+// Stack traces are captured only for unexpected/internal errors to avoid
+// performance overhead under high event load. Expected operational errors
+// (network issues, not found, auth failures) skip stack trace capture.
+// If err is nil, returns the context unchanged.
+func WithErrorField(ctx context.Context, err error) context.Context {
+	if err == nil {
+		return ctx
+	}
+	ctx = WithLogField(ctx, string(ErrorKey), err.Error())
+
+	// Only capture stack trace for unexpected/internal errors
+	if shouldCaptureStackTrace(err) {
+		ctx = withStackTraceField(ctx, CaptureStackTrace(1))
+	}
+
+	return ctx
+}
+
+// WithOTelTraceContext extracts OpenTelemetry trace context (trace_id, span_id)
+// from the context and adds them as log fields for distributed tracing correlation.
+// If no active span exists, returns the context unchanged.
+//
+// This function is safe to call multiple times (e.g., once per span creation).
+// Since Go contexts are immutable, each call returns a new context with the
+// current span's IDs. The parent function's context remains unchanged, so logs
+// after a child span completes will correctly use the parent's span_id.
+//
+// Example flow:
+//
+//	func Parent(ctx context.Context) {
+//		ctx, span := tracer.Start(ctx, "Parent")
+//		ctx = logger.WithOTelTraceContext(ctx) // span_id=A
+//		Child(ctx)                             // Child logs use span_id=B
+//		log.Info(ctx, "Back in parent")        // Still uses span_id=A
+//	}
+//
+// This will produce logs with trace_id and span_id fields:
+//
+//	{"message":"...","trace_id":"4bf92f3577b34da6a3ce929d0e0e4736","span_id":"00f067aa0ba902b7",...}
+func WithOTelTraceContext(ctx context.Context) context.Context {
+	spanCtx := trace.SpanContextFromContext(ctx)
+	if !spanCtx.IsValid() {
+		return ctx
+	}
+
+	// Add trace_id if valid
+	if spanCtx.HasTraceID() {
+		ctx = WithLogField(ctx, string(TraceIDKey), spanCtx.TraceID().String())
+	}
+
+	// Add span_id if valid
+	if spanCtx.HasSpanID() {
+		ctx = WithLogField(ctx, string(SpanIDKey), spanCtx.SpanID().String())
+	}
+
+	return ctx
+}
+
 // -----------------------------------------------------------------------------
 // Context Getters
 // -----------------------------------------------------------------------------
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index cadf37b..577cc94 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -6,7 +6,6 @@ import (
 	"io"
 	"log/slog"
 	"os"
-	"runtime"
 	"strings"
 )

@@ -36,8 +35,6 @@ type Logger interface {
 	With(key string, value interface{}) Logger
 	// WithFields returns a new logger with multiple additional fields
 	WithFields(fields map[string]interface{}) Logger
-	// WithError returns a new logger with error field (no-op if err is nil)
-	WithError(err error) Logger
 	// Without returns a new logger with the specified field removed
 	Without(key string) Logger
 }
@@ -146,11 +143,11 @@ func NewLogger(cfg Config) (Logger, error) {
 		hostname = "unknown"
 	}

-	// Create base logger with required fields
+	// Create base logger with required fields (per logging spec)
 	slogLogger := slog.New(handler).With(
-		"component", cfg.Component,
-		"version", cfg.Version,
-		"hostname", hostname,
+		string(ComponentKey), cfg.Component,
+		string(VersionKey), cfg.Version,
+		string(HostnameKey), hostname,
 	)

 	return &logger{
@@ -283,28 +280,6 @@ func (l *logger) WithFields(fields map[string]interface{}) Logger {
 	}
 }

-// WithError returns a new logger with the error field set.
-// If err is nil, returns the same logger instance (no-op) to avoid
-// unnecessary allocations. This allows safe usage like:
-//
-//	log.WithError(maybeNilErr).Info("message")
-//
-// To remove an existing error field, use Without("error").
-func (l *logger) WithError(err error) Logger {
-	if err == nil {
-		return l
-	}
-	newFields := copyFields(l.fields)
-	newFields["error"] = err.Error()
-	return &logger{
-		slog:      l.slog,
-		fields:    newFields,
-		component: l.component,
-		version:   l.version,
-		hostname:  l.hostname,
-	}
-}
-
 // Without returns a new logger with the specified field removed.
 // If the field doesn't exist, returns a new logger with the same fields.
 func (l *logger) Without(key string) Logger {
@@ -318,20 +293,3 @@ func (l *logger) Without(key string) Logger {
 		hostname:  l.hostname,
 	}
 }
-
-// GetStackTrace returns the current stack trace as a slice of strings
-func GetStackTrace(skip int) []string {
-	var pcs [32]uintptr
-	n := runtime.Callers(skip+2, pcs[:])
-	frames := runtime.CallersFrames(pcs[:n])
-
-	var stack []string
-	for {
-		frame, more := frames.Next()
-		stack = append(stack, fmt.Sprintf("%s() %s:%d", frame.Function, frame.File, frame.Line))
-		if !more {
-			break
-		}
-	}
-	return stack
-}
diff --git a/pkg/logger/logger_test.go b/pkg/logger/logger_test.go
index b82e4f3..3764b81 100644
--- a/pkg/logger/logger_test.go
+++ b/pkg/logger/logger_test.go
@@ -145,35 +145,6 @@ func TestLoggerWithFields(t *testing.T) {
 	}
 }

-func TestLoggerWithError(t *testing.T) {
-	log, err := NewLogger(DefaultConfig())
-	if err != nil {
-		t.Fatalf("NewLogger returned error: %v", err)
-	}
-
-	t.Run("with_error", func(t *testing.T) {
-		err := &testError{msg: "test error message"}
-		result := log.WithError(err)
-
-		impl, ok := result.(*logger)
-		if !ok {
-			t.Fatal("WithError() didn't return *logger type")
-		}
-
-		if impl.fields["error"] != "test error message" {
-			t.Errorf("Expected error field, got %v", impl.fields["error"])
-		}
-	})
-
-	t.Run("with_nil_error", func(t *testing.T) {
-		result := log.WithError(nil)
-		// Should return same logger when error is nil
-		if result != log {
-			t.Error("WithError(nil) should return same logger")
-		}
-	})
-}
-
 type testError struct {
 	msg string
 }
@@ -296,7 +267,7 @@ func TestLoggerChaining(t *testing.T) {
 		log.WithFields(map[string]interface{}{"a": 1}).With("b", 2).Info(ctx, "Test mixed chaining")
 	})

-	t.Run("chain_WithError", func(t *testing.T) {
+	t.Run("chain_WithErrorField", func(t *testing.T) {
 		defer func() {
 			if r := recover(); r != nil {
 				t.Errorf("Chaining panicked: %v", r)
@@ -304,7 +275,8 @@ func TestLoggerChaining(t *testing.T) {
 		}()

 		err := &testError{msg: "test error"}
-		log.WithError(err).With("extra", "info").Error(ctx, "Error with context")
+		ctx := WithErrorField(ctx, err)
+		log.With("extra", "info").Error(ctx, "Error with context")
 	})
 }
diff --git a/pkg/logger/stack_trace.go b/pkg/logger/stack_trace.go
new file mode 100644
index 0000000..2e685bb
--- /dev/null
+++ b/pkg/logger/stack_trace.go
@@ -0,0 +1,134 @@
+package logger
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"runtime"
+
+	apperrors "github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/errors"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+)
+
+// -----------------------------------------------------------------------------
+// Stack Trace Capture
+// -----------------------------------------------------------------------------
+
+// skipStackTraceCheckers is a list of functions that check if an error should skip stack trace capture.
+// Each checker returns true if the error is an expected operational error.
+// Add new error types here to extend the blocklist.
+var skipStackTraceCheckers = []func(error) bool{
+	// Context errors (expected in graceful shutdown)
+	func(err error) bool { return errors.Is(err, context.Canceled) },
+	func(err error) bool { return errors.Is(err, context.DeadlineExceeded) },
+	func(err error) bool { return errors.Is(err, io.EOF) },
+
+	// Network/transient errors (expected in distributed systems)
+	apperrors.IsNetworkError,
+
+	// HyperFleet API errors (HTTP 4xx/5xx responses)
+	isExpectedAPIError,
+
+	// K8s resource data errors
+	isK8sResourceDataError,
+
+	// K8s API errors
+	apierrors.IsNotFound,
+	apierrors.IsConflict,
+	apierrors.IsAlreadyExists,
+	apierrors.IsForbidden,
+	apierrors.IsUnauthorized,
+	apierrors.IsInvalid,
+	apierrors.IsBadRequest,
+	apierrors.IsGone,
+	apierrors.IsResourceExpired,
+	apierrors.IsServiceUnavailable,
+	apierrors.IsTimeout,
+	apierrors.IsTooManyRequests,
+}
+
+// isExpectedAPIError checks if the error is an expected HyperFleet API error
+func isExpectedAPIError(err error) bool {
+	apiErr, ok := apperrors.IsAPIError(err)
+	if !ok {
+		return false
+	}
+	return apiErr.IsNotFound() ||
+		apiErr.IsUnauthorized() ||
+		apiErr.IsForbidden() ||
+		apiErr.IsBadRequest() ||
+		apiErr.IsConflict() ||
+		apiErr.IsRateLimited() ||
+		apiErr.IsTimeout() ||
+		apiErr.IsServerError()
+}
+
+// isK8sResourceDataError checks if the error is an expected K8s resource data error
+func isK8sResourceDataError(err error) bool {
+	var k8sKeyNotFound *apperrors.K8sResourceKeyNotFoundError
+	if errors.As(err, &k8sKeyNotFound) {
+		return true
+	}
+	var k8sInvalidPath *apperrors.K8sInvalidPathError
+	if errors.As(err, &k8sInvalidPath) {
+		return true
+	}
+	var k8sDataErr *apperrors.K8sResourceDataError
+	return errors.As(err, &k8sDataErr)
+}
+
+// shouldCaptureStackTrace determines if a stack trace should be captured for the given error.
+// Returns false for expected operational errors (high frequency, known causes) to avoid
+// performance overhead during error storms. Returns true for unexpected errors that
+// indicate bugs or require investigation.
+func shouldCaptureStackTrace(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	// Check all blocklist conditions
+	for _, check := range skipStackTraceCheckers {
+		if check(err) {
+			return false
+		}
+	}
+
+	// Capture stack trace for unexpected/internal errors
+	return true
+}
+
+// withStackTraceField returns a context with the stack trace set.
+// If frames is nil or empty, returns the context unchanged.
+func withStackTraceField(ctx context.Context, frames []string) context.Context {
+	if len(frames) == 0 {
+		return ctx
+	}
+	return WithLogField(ctx, string(StackTraceKey), frames)
+}
+
+// CaptureStackTrace captures the current call stack and returns it as a slice of strings.
+// Each string contains the file path, line number, and function name.
+// The skip parameter specifies how many stack frames to skip:
+//   - skip=0 starts from the caller of CaptureStackTrace
+//   - skip=1 skips one additional level, etc.
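+//
+// Minimal usage sketch (illustrative; the exact frame strings depend on build
+// paths, but each frame is formatted as "file:line function"):
+//
+//	frames := CaptureStackTrace(0)
+	//	for _, f := range frames {
+//		fmt.Println(f) // e.g. "/app/pkg/logger/context.go:160 logger.WithErrorField" (hypothetical)
+//	}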
+func CaptureStackTrace(skip int) []string {
+	const maxFrames = 32
+	pcs := make([]uintptr, maxFrames)
+	// +2 to skip runtime.Callers and CaptureStackTrace itself
+	n := runtime.Callers(skip+2, pcs)
+	if n == 0 {
+		return nil
+	}
+
+	frames := runtime.CallersFrames(pcs[:n])
+	var stack []string
+	for {
+		frame, more := frames.Next()
+		stack = append(stack, fmt.Sprintf("%s:%d %s", frame.File, frame.Line, frame.Function))
+		if !more {
+			break
+		}
+	}
+	return stack
+}
diff --git a/pkg/logger/with_error_field_test.go b/pkg/logger/with_error_field_test.go
new file mode 100644
index 0000000..d9d83fc
--- /dev/null
+++ b/pkg/logger/with_error_field_test.go
@@ -0,0 +1,472 @@
+package logger
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"strings"
+	"testing"
+
+	apperrors "github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/errors"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+// -----------------------------------------------------------------------------
+// WithErrorField Tests
+// -----------------------------------------------------------------------------
+
+func TestWithErrorField(t *testing.T) {
+	t.Run("nil_error_returns_unchanged_context", func(t *testing.T) {
+		ctx := context.Background()
+		result := WithErrorField(ctx, nil)
+
+		fields := GetLogFields(result)
+		if fields != nil && fields["error"] != nil {
+			t.Error("Expected no error field for nil error")
+		}
+	})
+
+	t.Run("sets_error_message_in_context", func(t *testing.T) {
+		ctx := context.Background()
+		err := errors.New("test error message")
+		result := WithErrorField(ctx, err)
+
+		fields := GetLogFields(result)
+		if fields == nil {
+			t.Fatal("Expected log fields, got nil")
+		}
+		if fields["error"] != "test error message" {
+			t.Errorf("Expected 'test error message', got %v", fields["error"])
+		}
+	})
+
+	t.Run("captures_stack_trace_for_unexpected_error", func(t *testing.T) {
+		ctx := context.Background()
+		err := errors.New("unexpected internal error")
+		result := WithErrorField(ctx, err)
+
+		fields := GetLogFields(result)
+		if fields == nil {
+			t.Fatal("Expected log fields, got nil")
+		}
+
+		stackTrace, ok := fields["stack_trace"].([]string)
+		if !ok || len(stackTrace) == 0 {
+			t.Error("Expected stack_trace to be captured for unexpected error")
+		}
+	})
+
+	t.Run("preserves_existing_context_fields", func(t *testing.T) {
+		ctx := context.Background()
+		ctx = WithEventID(ctx, "evt-123")
+		ctx = WithClusterID(ctx, "cls-456")
+
+		err := errors.New("test error")
+		result := WithErrorField(ctx, err)
+
+		fields := GetLogFields(result)
+		if fields["event_id"] != "evt-123" {
+			t.Errorf("Expected event_id=evt-123, got %v", fields["event_id"])
+		}
+		if fields["cluster_id"] != "cls-456" {
+			t.Errorf("Expected cluster_id=cls-456, got %v", fields["cluster_id"])
+		}
+		if fields["error"] != "test error" {
+			t.Errorf("Expected error='test error', got %v", fields["error"])
+		}
+	})
+}
+
+// -----------------------------------------------------------------------------
+// Stack Trace Capture Decision Tests
+// -----------------------------------------------------------------------------
+
+func TestShouldCaptureStackTrace_ContextErrors(t *testing.T) {
+	tests := []struct {
+		name          string
+		err           error
+		expectCapture bool
+	}{
+		{
+			name:          "context.Canceled_skips_stack_trace",
+			err:           context.Canceled,
+			expectCapture: false,
+		},
+		{
+			name:          "context.DeadlineExceeded_skips_stack_trace",
+			err:           context.DeadlineExceeded,
+			expectCapture: false,
+		},
+		{
+			name:          "wrapped_context.Canceled_skips_stack_trace",
+			err:           fmt.Errorf("operation failed: %w", context.Canceled),
+			expectCapture: false,
+		},
+		{
+			name:          "io.EOF_skips_stack_trace",
+			err:           io.EOF,
+			expectCapture: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := shouldCaptureStackTrace(tt.err)
+			if result != tt.expectCapture {
+				t.Errorf("shouldCaptureStackTrace() = %v, want %v", result, tt.expectCapture)
+			}
+		})
+	}
+}
+
+func TestShouldCaptureStackTrace_K8sAPIErrors(t *testing.T) {
+	tests := []struct {
+		name          string
+		err           error
+		expectCapture bool
+	}{
+		{
+			name:          "NotFound_skips_stack_trace",
+			err:           apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "my-pod"),
+			expectCapture: false,
+		},
+		{
+			name:          "Conflict_skips_stack_trace",
+			err:           apierrors.NewConflict(schema.GroupResource{Group: "", Resource: "pods"}, "my-pod", errors.New("conflict")),
+			expectCapture: false,
+		},
+		{
+			name:          "AlreadyExists_skips_stack_trace",
+			err:           apierrors.NewAlreadyExists(schema.GroupResource{Group: "", Resource: "pods"}, "my-pod"),
+			expectCapture: false,
+		},
+		{
+			name:          "Forbidden_skips_stack_trace",
+			err:           apierrors.NewForbidden(schema.GroupResource{Group: "", Resource: "pods"}, "my-pod", errors.New("forbidden")),
+			expectCapture: false,
+		},
+		{
+			name:          "Unauthorized_skips_stack_trace",
+			err:           apierrors.NewUnauthorized("unauthorized"),
+			expectCapture: false,
+		},
+		{
+			name:          "BadRequest_skips_stack_trace",
+			err:           apierrors.NewBadRequest("bad request"),
+			expectCapture: false,
+		},
+		{
+			name:          "ServiceUnavailable_skips_stack_trace",
+			err:           apierrors.NewServiceUnavailable("service unavailable"),
+			expectCapture: false,
+		},
+		{
+			name:          "Timeout_skips_stack_trace",
+			err:           apierrors.NewTimeoutError("timeout", 30),
+			expectCapture: false,
+		},
+		{
+			name:          "TooManyRequests_skips_stack_trace",
+			err:           apierrors.NewTooManyRequestsError("too many requests"),
+			expectCapture: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := shouldCaptureStackTrace(tt.err)
+			if result != tt.expectCapture {
+				t.Errorf("shouldCaptureStackTrace() = %v, want %v", result, tt.expectCapture)
+			}
+		})
+	}
+}
+
+func TestShouldCaptureStackTrace_K8sResourceDataErrors(t *testing.T) {
+	tests := []struct {
+		name          string
+		err           error
+		expectCapture bool
+	}{
+		{
+			name:          "K8sResourceKeyNotFoundError_skips_stack_trace",
+			err:           apperrors.NewK8sResourceKeyNotFoundError("Secret", "default", "my-secret", "password"),
+			expectCapture: false,
+		},
+		{
+			name:          "K8sInvalidPathError_skips_stack_trace",
+			err:           apperrors.NewK8sInvalidPathError("Secret", "invalid/path", "namespace.name.key"),
+			expectCapture: false,
+		},
+		{
+			name:          "K8sResourceDataError_skips_stack_trace",
+			err:           apperrors.NewK8sResourceDataError("ConfigMap", "default", "my-config", "data field missing", nil),
+			expectCapture: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := shouldCaptureStackTrace(tt.err)
+			if result != tt.expectCapture {
+				t.Errorf("shouldCaptureStackTrace() = %v, want %v", result, tt.expectCapture)
+			}
+		})
+	}
+}
+
+func TestShouldCaptureStackTrace_APIErrors(t *testing.T) {
+	tests := []struct {
+		name          string
+		err           error
+		expectCapture bool
+	}{
+		{
+			name:          "APIError_NotFound_skips_stack_trace",
+			err:           apperrors.NewAPIError("GET", "/api/v1/clusters/123", 404, "Not Found", nil, 1, 0, errors.New("not found")),
+			expectCapture: false,
+		},
+		{
+			name:          "APIError_Unauthorized_skips_stack_trace",
+			err:           apperrors.NewAPIError("GET", "/api/v1/clusters", 401, "Unauthorized", nil, 1, 0, errors.New("unauthorized")),
+			expectCapture: false,
+		},
+		{
+			name:          "APIError_Forbidden_skips_stack_trace",
+			err:           apperrors.NewAPIError("POST", "/api/v1/clusters", 403, "Forbidden", nil, 1, 0, errors.New("forbidden")),
+			expectCapture: false,
+		},
+		{
+			name:          "APIError_BadRequest_skips_stack_trace",
+			err:           apperrors.NewAPIError("POST", "/api/v1/clusters", 400, "Bad Request", nil, 1, 0, errors.New("bad request")),
+			expectCapture: false,
+		},
+		{
+			name:          "APIError_Conflict_skips_stack_trace",
+			err:           apperrors.NewAPIError("PUT", "/api/v1/clusters/123", 409, "Conflict", nil, 1, 0, errors.New("conflict")),
+			expectCapture: false,
+		},
+		{
+			name:          "APIError_RateLimited_skips_stack_trace",
+			err:           apperrors.NewAPIError("GET", "/api/v1/clusters", 429, "Too Many Requests", nil, 1, 0, errors.New("rate limited")),
+			expectCapture: false,
+		},
+		{
+			name:          "APIError_Timeout_skips_stack_trace",
+			err:           apperrors.NewAPIError("GET", "/api/v1/clusters", 408, "Request Timeout", nil, 1, 0, errors.New("timeout")),
+			expectCapture: false,
+		},
+		{
+			name:          "APIError_ServerError_skips_stack_trace",
+			err:           apperrors.NewAPIError("GET", "/api/v1/clusters", 503, "Service Unavailable", nil, 3, 0, errors.New("server error")),
+			expectCapture: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := shouldCaptureStackTrace(tt.err)
+			if result != tt.expectCapture {
+				t.Errorf("shouldCaptureStackTrace() = %v, want %v", result, tt.expectCapture)
+			}
+		})
+	}
+}
+
+func TestShouldCaptureStackTrace_UnexpectedErrors(t *testing.T) {
+	tests := []struct {
+		name          string
+		err           error
+		expectCapture bool
+	}{
+		{
+			name:          "generic_error_captures_stack_trace",
+			err:           errors.New("unexpected error"),
+			expectCapture: true,
+		},
+		{
+			name:          "wrapped_generic_error_captures_stack_trace",
+			err:           fmt.Errorf("failed to process: %w", errors.New("internal error")),
+			expectCapture: true,
+		},
+		{
+			name:          "nil_error_does_not_capture",
+			err:           nil,
+			expectCapture: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := shouldCaptureStackTrace(tt.err)
+			if result != tt.expectCapture {
+				t.Errorf("shouldCaptureStackTrace() = %v, want %v", result, tt.expectCapture)
+			}
+		})
+	}
+}
+
+// -----------------------------------------------------------------------------
+// CaptureStackTrace Tests
+// -----------------------------------------------------------------------------
+
+func TestCaptureStackTrace(t *testing.T) {
+	t.Run("captures_call_stack", func(t *testing.T) {
+		stack := CaptureStackTrace(0)
+		if len(stack) == 0 {
+			t.Fatal("Expected non-empty stack trace")
+		}
+
+		// First frame should be from this test file
+		found := false
+		for _, frame := range stack {
+			if strings.Contains(frame, "with_error_field_test.go") {
+				found = true
+				break
+			}
+		}
+		if !found {
+			t.Error("Expected stack trace to contain with_error_field_test.go")
+		}
+	})
+
+	t.Run("skip_parameter_works", func(t *testing.T) {
+		stack0 := CaptureStackTrace(0)
+		stack1 := CaptureStackTrace(1)
+
+		// Verify skip behavior: stack1 should either have fewer frames OR
+		// (when maxFrames cap is reached) be equal to stack0 with the first frame removed.
+		// This handles the case where CaptureStackTrace hits maxFrames limit.
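+		// (Assumes both captures run at the same call depth: when the maxFrames
+		// cap is not in play, skipping one frame strictly shrinks the slice.)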
+ if len(stack1) < len(stack0) { + // Normal case: skip=1 results in fewer frames + return + } + + if len(stack1) == len(stack0) && len(stack0) > 0 { + // maxFrames cap reached: verify stack1 equals stack0[1:] + // (the first frame was skipped, but a new frame was captured at the end) + for i := 0; i < len(stack1)-1; i++ { + if stack1[i] != stack0[i+1] { + t.Errorf("Expected stack1[%d] to equal stack0[%d], got %q vs %q", + i, i+1, stack1[i], stack0[i+1]) + } + } + return + } + + t.Errorf("Expected skip=1 to result in fewer frames or shifted stack, got len(stack0)=%d, len(stack1)=%d", + len(stack0), len(stack1)) + }) + + t.Run("frames_contain_file_line_function", func(t *testing.T) { + stack := CaptureStackTrace(0) + if len(stack) == 0 { + t.Fatal("Expected non-empty stack trace") + } + + // Each frame should have format "file:line function" + for _, frame := range stack { + if !strings.Contains(frame, ":") { + t.Errorf("Frame missing colon separator: %s", frame) + } + if !strings.Contains(frame, " ") { + t.Errorf("Frame missing space separator: %s", frame) + } + } + }) +} + +// ----------------------------------------------------------------------------- +// Integration Tests +// ----------------------------------------------------------------------------- + +func TestWithErrorField_StackTraceIntegration(t *testing.T) { + t.Run("unexpected_error_has_stack_trace", func(t *testing.T) { + ctx := context.Background() + err := errors.New("unexpected internal error") + result := WithErrorField(ctx, err) + + fields := GetLogFields(result) + if fields == nil { + t.Fatal("Expected log fields") + } + + // Should have error field + if fields["error"] != "unexpected internal error" { + t.Errorf("Expected error message, got %v", fields["error"]) + } + + // Should have stack_trace field + stackTrace, ok := fields["stack_trace"].([]string) + if !ok { + t.Fatal("Expected stack_trace to be []string") + } + if len(stackTrace) == 0 { + t.Error("Expected non-empty stack trace") + } + }) + + t.Run("k8s_not_found_error_no_stack_trace", func(t *testing.T) { + ctx := context.Background() + err := apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "pods"}, "my-pod") + result := WithErrorField(ctx, err) + + fields := GetLogFields(result) + if fields == nil { + t.Fatal("Expected log fields") + } + + // Should have error field + if fields["error"] == nil { + t.Error("Expected error field") + } + + // Should NOT have stack_trace field + if fields["stack_trace"] != nil { + t.Error("Expected no stack_trace for K8s NotFound error") + } + }) + + t.Run("context_canceled_no_stack_trace", func(t *testing.T) { + ctx := context.Background() + result := WithErrorField(ctx, context.Canceled) + + fields := GetLogFields(result) + if fields == nil { + t.Fatal("Expected log fields") + } + + // Should have error field + if fields["error"] == nil { + t.Error("Expected error field") + } + + // Should NOT have stack_trace field + if fields["stack_trace"] != nil { + t.Error("Expected no stack_trace for context.Canceled") + } + }) + + t.Run("api_error_server_error_no_stack_trace", func(t *testing.T) { + ctx := context.Background() + err := apperrors.NewAPIError("GET", "/api/v1/clusters", 500, "Internal Server Error", nil, 1, 0, errors.New("server error")) + result := WithErrorField(ctx, err) + + fields := GetLogFields(result) + if fields == nil { + t.Fatal("Expected log fields") + } + + // Should have error field + if fields["error"] == nil { + t.Error("Expected error field") + } + + // Should NOT have stack_trace field 
(server errors are expected) + if fields["stack_trace"] != nil { + t.Error("Expected no stack_trace for API server error") + } + }) +} diff --git a/pkg/otel/propagation.go b/pkg/otel/propagation.go new file mode 100644 index 0000000..a81def8 --- /dev/null +++ b/pkg/otel/propagation.go @@ -0,0 +1,57 @@ +package otel + +import ( + "context" + + "github.com/cloudevents/sdk-go/v2/event" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" +) + +// ExtractTraceContextFromCloudEvent extracts W3C trace context from CloudEvent extensions. +// CloudEvents can carry traceparent and tracestate as extension attributes, +// allowing distributed tracing across event-driven systems. +// +// If trace context is present, the returned context will have the parent span +// information, making any subsequent spans children of the upstream trace. +// If no trace context is found, the original context is returned unchanged, +// and any new spans will be root spans. +// +// Example CloudEvent with trace context: +// +// { +// "specversion": "1.0", +// "type": "cluster.created", +// "source": "/sentinel", +// "id": "abc-123", +// "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01", +// "tracestate": "vendor=value" +// } +func ExtractTraceContextFromCloudEvent(ctx context.Context, evt *event.Event) context.Context { + if evt == nil { + return ctx + } + + extensions := evt.Extensions() + if extensions == nil { + return ctx + } + + carrier := propagation.MapCarrier{} + + // Extract traceparent (required for W3C Trace Context) + if traceparent, ok := extensions["traceparent"].(string); ok && traceparent != "" { + carrier["traceparent"] = traceparent + } else { + // No traceparent means no trace context to extract + return ctx + } + + // Extract tracestate (optional, carries vendor-specific trace data) + if tracestate, ok := extensions["tracestate"].(string); ok && tracestate != "" { + carrier["tracestate"] = tracestate + } + + // Use the global propagator to extract trace context into the context + return otel.GetTextMapPropagator().Extract(ctx, carrier) +} diff --git a/pkg/otel/propagation_test.go b/pkg/otel/propagation_test.go new file mode 100644 index 0000000..09cf94a --- /dev/null +++ b/pkg/otel/propagation_test.go @@ -0,0 +1,241 @@ +package otel + +import ( + "context" + "testing" + + "github.com/cloudevents/sdk-go/v2/event" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" +) + +func init() { + // Ensure the global propagator is set for tests + otel.SetTextMapPropagator(propagation.TraceContext{}) +} + +func TestExtractTraceContextFromCloudEvent(t *testing.T) { + // Valid W3C trace context values for testing + // Format: version-traceid-parentid-flags + const ( + validTraceID = "0af7651916cd43dd8448eb211c80319c" + validSpanID = "b7ad6b7169203331" + validTraceparent = "00-" + validTraceID + "-" + validSpanID + "-01" + validTracestate = "vendor1=value1,vendor2=value2" + ) + + t.Run("nil_event_returns_unchanged_context", func(t *testing.T) { + ctx := context.Background() + result := ExtractTraceContextFromCloudEvent(ctx, nil) + + // Context should be unchanged + if result != ctx { + t.Error("Expected context to be unchanged for nil event") + } + + // No span context should be present + spanCtx := trace.SpanContextFromContext(result) + if spanCtx.IsValid() { + t.Error("Expected no valid span context for nil event") + } + }) + + t.Run("event_without_extensions_returns_unchanged_context", func(t *testing.T) { + ctx := 
context.Background() + evt := event.New() + evt.SetID("test-id") + evt.SetType("test.type") + evt.SetSource("/test") + + result := ExtractTraceContextFromCloudEvent(ctx, &evt) + + // No span context should be present + spanCtx := trace.SpanContextFromContext(result) + if spanCtx.IsValid() { + t.Error("Expected no valid span context for event without extensions") + } + }) + + t.Run("event_with_empty_traceparent_returns_unchanged_context", func(t *testing.T) { + ctx := context.Background() + evt := event.New() + evt.SetID("test-id") + evt.SetType("test.type") + evt.SetSource("/test") + evt.SetExtension("traceparent", "") + + result := ExtractTraceContextFromCloudEvent(ctx, &evt) + + // No span context should be present + spanCtx := trace.SpanContextFromContext(result) + if spanCtx.IsValid() { + t.Error("Expected no valid span context for empty traceparent") + } + }) + + t.Run("event_with_valid_traceparent_extracts_trace_context", func(t *testing.T) { + ctx := context.Background() + evt := event.New() + evt.SetID("test-id") + evt.SetType("test.type") + evt.SetSource("/test") + evt.SetExtension("traceparent", validTraceparent) + + result := ExtractTraceContextFromCloudEvent(ctx, &evt) + + // Span context should be present and valid + spanCtx := trace.SpanContextFromContext(result) + if !spanCtx.IsValid() { + t.Fatal("Expected valid span context") + } + + // Verify trace ID + if spanCtx.TraceID().String() != validTraceID { + t.Errorf("Expected trace ID %s, got %s", validTraceID, spanCtx.TraceID().String()) + } + + // Verify span ID (parent span ID from traceparent) + if spanCtx.SpanID().String() != validSpanID { + t.Errorf("Expected span ID %s, got %s", validSpanID, spanCtx.SpanID().String()) + } + + // Verify sampled flag (01 means sampled) + if !spanCtx.IsSampled() { + t.Error("Expected span context to be sampled") + } + }) + + t.Run("event_with_traceparent_and_tracestate_extracts_both", func(t *testing.T) { + ctx := context.Background() + evt := event.New() + evt.SetID("test-id") + evt.SetType("test.type") + evt.SetSource("/test") + evt.SetExtension("traceparent", validTraceparent) + evt.SetExtension("tracestate", validTracestate) + + result := ExtractTraceContextFromCloudEvent(ctx, &evt) + + // Span context should be present and valid + spanCtx := trace.SpanContextFromContext(result) + if !spanCtx.IsValid() { + t.Fatal("Expected valid span context") + } + + // Verify trace ID + if spanCtx.TraceID().String() != validTraceID { + t.Errorf("Expected trace ID %s, got %s", validTraceID, spanCtx.TraceID().String()) + } + + // Verify tracestate is preserved + traceState := spanCtx.TraceState() + if traceState.Len() == 0 { + t.Error("Expected tracestate to be preserved") + } + + // Verify vendor1 value + if val := traceState.Get("vendor1"); val != "value1" { + t.Errorf("Expected tracestate vendor1=value1, got vendor1=%s", val) + } + }) + + t.Run("event_with_invalid_traceparent_handles_gracefully", func(t *testing.T) { + testCases := []struct { + name string + traceparent string + }{ + {"malformed_format", "not-a-valid-traceparent"}, + {"wrong_version", "ff-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"}, + {"short_trace_id", "00-0af7651916cd43dd-b7ad6b7169203331-01"}, + {"short_span_id", "00-0af7651916cd43dd8448eb211c80319c-b7ad6b71-01"}, + {"missing_parts", "00-0af7651916cd43dd8448eb211c80319c"}, + {"all_zeros_trace_id", "00-00000000000000000000000000000000-b7ad6b7169203331-01"}, + {"all_zeros_span_id", "00-0af7651916cd43dd8448eb211c80319c-0000000000000000-01"}, + } + + for _, tc := range 
testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + evt := event.New() + evt.SetID("test-id") + evt.SetType("test.type") + evt.SetSource("/test") + evt.SetExtension("traceparent", tc.traceparent) + + // Should not panic + result := ExtractTraceContextFromCloudEvent(ctx, &evt) + + // Invalid traceparent should result in invalid span context + spanCtx := trace.SpanContextFromContext(result) + if spanCtx.IsValid() { + t.Errorf("Expected invalid span context for malformed traceparent %q", tc.traceparent) + } + }) + } + }) + + t.Run("event_with_non_string_traceparent_returns_unchanged_context", func(t *testing.T) { + ctx := context.Background() + evt := event.New() + evt.SetID("test-id") + evt.SetType("test.type") + evt.SetSource("/test") + // Set traceparent as non-string type + evt.SetExtension("traceparent", 12345) + + result := ExtractTraceContextFromCloudEvent(ctx, &evt) + + // No span context should be present + spanCtx := trace.SpanContextFromContext(result) + if spanCtx.IsValid() { + t.Error("Expected no valid span context for non-string traceparent") + } + }) + + t.Run("event_with_traceparent_only_no_tracestate", func(t *testing.T) { + ctx := context.Background() + evt := event.New() + evt.SetID("test-id") + evt.SetType("test.type") + evt.SetSource("/test") + evt.SetExtension("traceparent", validTraceparent) + // No tracestate set + + result := ExtractTraceContextFromCloudEvent(ctx, &evt) + + // Span context should still be valid + spanCtx := trace.SpanContextFromContext(result) + if !spanCtx.IsValid() { + t.Fatal("Expected valid span context") + } + + // Tracestate should be empty + if spanCtx.TraceState().Len() != 0 { + t.Error("Expected empty tracestate when not provided") + } + }) + + t.Run("unsampled_trace_context_is_extracted", func(t *testing.T) { + ctx := context.Background() + evt := event.New() + evt.SetID("test-id") + evt.SetType("test.type") + evt.SetSource("/test") + // flags=00 means not sampled + unsampledTraceparent := "00-" + validTraceID + "-" + validSpanID + "-00" + evt.SetExtension("traceparent", unsampledTraceparent) + + result := ExtractTraceContextFromCloudEvent(ctx, &evt) + + spanCtx := trace.SpanContextFromContext(result) + if !spanCtx.IsValid() { + t.Fatal("Expected valid span context") + } + + // Should NOT be sampled + if spanCtx.IsSampled() { + t.Error("Expected span context to NOT be sampled (flags=00)") + } + }) +} diff --git a/pkg/otel/tracer.go b/pkg/otel/tracer.go new file mode 100644 index 0000000..ee4ace2 --- /dev/null +++ b/pkg/otel/tracer.go @@ -0,0 +1,100 @@ +// Package otel provides OpenTelemetry tracing utilities for the hyperfleet-adapter. +package otel + +import ( + "context" + "fmt" + "os" + "strconv" + + "github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/logger" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" +) + +// Tracing configuration constants +const ( + // EnvTraceSampleRatio is the environment variable for trace sampling ratio + EnvTraceSampleRatio = "TRACE_SAMPLE_RATIO" + + // DefaultTraceSampleRatio is the default trace sampling ratio (10% of traces) + // Can be overridden via TRACE_SAMPLE_RATIO env var + DefaultTraceSampleRatio = 0.1 +) + +// GetTraceSampleRatio reads the trace sample ratio from TRACE_SAMPLE_RATIO env var. +// Returns DefaultTraceSampleRatio (0.1 = 10%) if not set or invalid. 
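+// For example, TRACE_SAMPLE_RATIO=0.25 samples roughly 25% of root spans.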
+// Valid range is 0.0 to 1.0 where:
+//   - 0.0 = sample no traces (disables tracing entirely; not recommended)
+//   - 0.01 = sample 1% of traces (high volume systems)
+//   - 0.1 = sample 10% of traces (default, moderate volume)
+//   - 1.0 = sample all traces (development/debugging only)
+func GetTraceSampleRatio(log logger.Logger, ctx context.Context) float64 {
+	ratioStr := os.Getenv(EnvTraceSampleRatio)
+	if ratioStr == "" {
+		log.Infof(ctx, "Using default trace sample ratio: %.2f (set %s to override)", DefaultTraceSampleRatio, EnvTraceSampleRatio)
+		return DefaultTraceSampleRatio
+	}
+
+	ratio, err := strconv.ParseFloat(ratioStr, 64)
+	if err != nil {
+		log.Warnf(ctx, "Invalid %s value %q, using default %.2f: %v", EnvTraceSampleRatio, ratioStr, DefaultTraceSampleRatio, err)
+		return DefaultTraceSampleRatio
+	}
+
+	if ratio < 0.0 || ratio > 1.0 {
+		log.Warnf(ctx, "Invalid %s value %.4f (must be 0.0-1.0), using default %.2f", EnvTraceSampleRatio, ratio, DefaultTraceSampleRatio)
+		return DefaultTraceSampleRatio
+	}
+
+	log.Infof(ctx, "Trace sample ratio configured: %.4f (%.2f%% of traces will be sampled)", ratio, ratio*100)
+	return ratio
+}
+
+// InitTracer initializes the OpenTelemetry TracerProvider for generating trace_id and span_id.
+// These IDs are used for:
+//  1. Log correlation (via logger.WithOTelTraceContext)
+//  2. HTTP request propagation (via W3C Trace Context headers)
+//
+// The sampler uses ParentBased(TraceIDRatioBased(sampleRatio)) which:
+//   - Respects the parent span's sampling decision when present (from the traceparent header)
+//   - Applies probabilistic sampling for root spans based on sampleRatio
+//
+// This allows distributed tracing visibility while controlling observability costs.
+func InitTracer(serviceName, serviceVersion string, sampleRatio float64) (*sdktrace.TracerProvider, error) {
+	// Create resource with service attributes.
+	// Note: We don't merge with resource.Default() to avoid schema URL conflicts
+	// between the SDK's bundled semconv version and our imported version.
+	res, err := resource.New(
+		context.Background(),
+		resource.WithAttributes(
+			semconv.ServiceName(serviceName),
+			semconv.ServiceVersion(serviceVersion),
+		),
+		resource.WithProcessRuntimeDescription(),
+		resource.WithTelemetrySDK(),
+		resource.WithHost(),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create resource: %w", err)
+	}
+
+	// Use ParentBased sampler with TraceIDRatioBased for root spans:
+	// - If a parent span exists: inherit the parent's sampling decision
+	// - If no parent (root span): apply probabilistic sampling based on trace ID
+	// This enables proper sampling propagation across service boundaries.
+	sampler := sdktrace.ParentBased(sdktrace.TraceIDRatioBased(sampleRatio))
+
+	tp := sdktrace.NewTracerProvider(
+		sdktrace.WithResource(res),
+		sdktrace.WithSampler(sampler),
+	)
+	otel.SetTracerProvider(tp)
+	// The TraceContext propagator handles W3C traceparent/tracestate headers,
+	// ensuring sampling decisions propagate via HTTP headers and CloudEvent extensions.
+	otel.SetTextMapPropagator(propagation.TraceContext{})
+
+	return tp, nil
+}
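
To make the intended wiring concrete, the following is a minimal sketch (not part of the diff above) of how a broker event handler might combine ExtractTraceContextFromCloudEvent with the tracer installed by InitTracer. The handler, the tracer name, and the adapterotel import alias are illustrative:

package main

import (
	"context"
	"log"

	"github.com/cloudevents/sdk-go/v2/event"
	"go.opentelemetry.io/otel"

	adapterotel "github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/otel"
)

// handleEvent is a hypothetical broker handler: it links any upstream trace
// carried in the CloudEvent to the span it starts locally.
func handleEvent(ctx context.Context, evt *event.Event) {
	// Pull traceparent/tracestate out of the event extensions (no-op if absent).
	ctx = adapterotel.ExtractTraceContextFromCloudEvent(ctx, evt)

	// Any span started from this context becomes a child of the upstream span,
	// and the ParentBased sampler inherits the upstream sampling decision.
	tracer := otel.Tracer("hyperfleet-adapter")
	ctx, span := tracer.Start(ctx, "process-event")
	defer span.End()

	log.Printf("processing event %s within trace %s", evt.ID(), span.SpanContext().TraceID())
}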
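Similarly, here is a small sketch of what the WithErrorField tests assert from a caller's point of view: an unexpected error enriches the context with both error and stack_trace fields, while an expected Kubernetes NotFound carries only error. It assumes GetLogFields exposes the same field map the tests index into:

package main

import (
	"context"
	"errors"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"

	"github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/logger"
)

func main() {
	ctx := context.Background()

	// Unexpected error: both "error" and "stack_trace" are attached,
	// so log lines can be filtered or correlated on either field.
	errCtx := logger.WithErrorField(ctx, errors.New("unexpected internal error"))
	fields := logger.GetLogFields(errCtx)
	fmt.Println(fields["stack_trace"] != nil) // true: stack captured

	// Expected K8s NotFound: only "error" is attached; no stack-trace noise.
	notFound := apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, "my-pod")
	fields = logger.GetLogFields(logger.WithErrorField(ctx, notFound))
	fmt.Println(fields["stack_trace"] != nil) // false: expected error, skipped
}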