Skip to content

Conversation

@me2seeks
Copy link

@me2seeks me2seeks commented Jan 20, 2026

  • What: Replace hardcoded stream name with queue.StreamName when fetching JetStream stream.
  • Why: Ensure consumers use the stream specified by configuration instead of a fixed placeholder.
  • Impact: Fixes incorrect stream selection in JetStream mode; no behavior change for default NATS mode.

Greptile Summary

  • Fixes hardcoded stream name "ccc" in JetStream consumer by using the configured queue.StreamName value instead
  • Ensures JetStream consumers connect to the stream specified in configuration rather than a placeholder value
  • Makes the existing stream name validation meaningful since the configured value is now actually used

Important Files Changed

Filename Overview
natsq/consumer.go Fixed JetStream stream fetching to use configured stream name instead of hardcoded "ccc" placeholder

Confidence score: 5/5

  • This PR is safe to merge with minimal risk as it fixes a clear bug with a straightforward solution
  • Score reflects a simple one-line fix that replaces hardcoded placeholder value with proper configuration usage
  • No files require special attention beyond the single line change in the consumer implementation

Sequence Diagram

sequenceDiagram
    participant User
    participant ConsumerManager
    participant NATS
    participant JetStream
    participant Consumer
    
    User->>ConsumerManager: "MustNewConsumerManager(cfg, queues, mode)"
    ConsumerManager->>NATS: "Connect(serverUri, options)"
    NATS-->>ConsumerManager: "connection"
    ConsumerManager->>ConsumerManager: "registerQueue(queue)"
    
    User->>ConsumerManager: "Start()"
    ConsumerManager->>ConsumerManager: "subscribe(queue)"
    
    alt NatJetMode
        ConsumerManager->>JetStream: "New(conn)"
        JetStream-->>ConsumerManager: "jetstream instance"
        ConsumerManager->>JetStream: "Stream(ctx, queue.StreamName)"
        JetStream-->>ConsumerManager: "stream"
        ConsumerManager->>JetStream: "CreateOrUpdateConsumer(ctx, config)"
        JetStream-->>ConsumerManager: "consumer"
        ConsumerManager->>JetStream: "Consume(handleFunc, options)"
        JetStream->>Consumer: "HandleMessage(msg)"
        Consumer-->>JetStream: "result"
        JetStream->>JetStream: "Ack()"
    else NatDefaultMode
        ConsumerManager->>NATS: "QueueSubscribe(subject, queueName, handleFunc)"
        NATS->>Consumer: "HandleMessage(msg)"
        Consumer-->>NATS: "result"
        NATS->>NATS: "Ack()"
    end
    
    User->>ConsumerManager: "Stop()"
    ConsumerManager->>NATS: "Close()"
Loading

@greptile-apps
Copy link

greptile-apps bot commented Jan 20, 2026

Greptile found no issues!

From now on, if a review finishes and we haven't found any issues, we will not post anything, but you can confirm that we reviewed your changes in the status check section.

This feature can be toggled off in your Code Review Settings by deselecting "Create a status check for each PR".

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant