Skip to content

Conversation

@me2seeks
Copy link

@me2seeks me2seeks commented Jan 20, 2026

  • Add header carrier and tests for NATS headers
  • Inject trace context on publish and extract on consume
  • Expose PublishMsg for full nats.Msg support

Greptile Summary

Adds OpenTelemetry trace propagation support for NATS JetStream, enabling distributed tracing across NATS message boundaries.

  • Implemented HeaderCarrier to bridge NATS headers with OTel's TextMapCarrier interface
  • Modified publisher to inject trace context into message headers via new PublishMsg and PublishWithHeaders methods
  • Updated consumer to extract trace context from headers and pass it to message handlers
  • Changed ConsumeHandler interface signature to accept context.Context (breaking change for existing implementations)
  • Updated example to demonstrate the new context-aware handler pattern
  • Follows the same implementation pattern as the existing Kafka queue (kq) for consistency

Confidence Score: 5/5

  • This PR is safe to merge with minimal risk
  • Clean implementation with comprehensive tests, follows established patterns from kq package, proper nil handling in HeaderCarrier, and includes updated example code
  • No files require special attention

Important Files Changed

Filename Overview
natsmq/internal/trace.go Added HeaderCarrier to propagate OTel trace context through NATS headers
natsmq/publisher/publisher.go Added trace injection via PublishMsg and PublishWithHeaders methods
natsmq/consumer/consumer.go Added trace extraction in ackMessage, passing context to consumer handlers
natsmq/consumer/config.go Updated ConsumeHandler interface to accept context for trace propagation

Sequence Diagram

sequenceDiagram
    participant Publisher
    participant OTel as OTel Propagator
    participant HeaderCarrier
    participant NATS
    participant Consumer
    participant Handler as ConsumeHandler

    Publisher->>HeaderCarrier: NewHeaderCarrier(&msg.Header)
    Publisher->>OTel: Inject(ctx, carrier)
    OTel->>HeaderCarrier: Set(key, value)
    HeaderCarrier->>NATS: msg.Header updated
    Publisher->>NATS: PublishMsg(ctx, msg)
    
    NATS->>Consumer: Message delivered
    Consumer->>HeaderCarrier: NewHeaderCarrier(&headers)
    Consumer->>OTel: Extract(ctx, carrier)
    OTel->>HeaderCarrier: Get(key)
    HeaderCarrier-->>OTel: trace context values
    OTel-->>Consumer: ctx with trace
    Consumer->>Consumer: contextx.ValueOnlyFrom(ctx)
    Consumer->>Handler: Consume(ctx, msg)
Loading

Introduce trace header carrier plus publish/consume helpers with context extraction.
Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

6 files reviewed, 1 comment

Edit Code Review Agent Settings | Greptile

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant