diff --git a/common/event/reader.go b/common/event/reader.go
index 0b348caf..f163e692 100644
--- a/common/event/reader.go
+++ b/common/event/reader.go
@@ -27,44 +27,42 @@ package event
 
 import (
 	"context"
 	"fmt"
+	"github.com/AliceO2Group/Control/common/event/topic"
 	"github.com/AliceO2Group/Control/common/logger/infologger"
 	pb "github.com/AliceO2Group/Control/common/protos"
 	"github.com/segmentio/kafka-go"
 	"github.com/spf13/viper"
 	"google.golang.org/protobuf/proto"
-	"sync"
 )
 
 // Reader interface provides methods to read events.
 type Reader interface {
+	// Next should return the next event, or an error once the context is cancelled.
	Next(ctx context.Context) (*pb.Event, error)
+	// Last should return the most recent event currently present on the topic (or nil if there is none),
+	// or an error once the context is cancelled.
+	Last(ctx context.Context) (*pb.Event, error)
 	Close() error
 }
 
 // DummyReader is an implementation of Reader that returns no events.
 type DummyReader struct{}
 
-// Next returns the next event or nil if there are no more events.
 func (*DummyReader) Next(context.Context) (*pb.Event, error) { return nil, nil }
-
-// Close closes the DummyReader.
-func (*DummyReader) Close() error { return nil }
+func (*DummyReader) Last(context.Context) (*pb.Event, error) { return nil, nil }
+func (*DummyReader) Close() error { return nil }
 
 // KafkaReader reads events from Kafka and provides a blocking, cancellable API to fetch events.
-// Consumption mode is chosen at creation time:
-//   - latestOnly=false: consume everything (from stored offsets or beginning depending on group state)
-//   - latestOnly=true: seek to latest offsets on start and only receive messages produced after start
 type KafkaReader struct {
 	*kafka.Reader
-	mu    sync.Mutex
-	topic string
+	topic   string
+	brokers []string
+	groupID string
 }
 
 // NewReaderWithTopic creates a KafkaReader for the provided topic and starts it.
-// If latestOnly is true the reader attempts to seek to the latest offsets on start so that
-// only new messages (produced after creation) are consumed.
-func NewReaderWithTopic(topic topic.Topic, groupID string, latestOnly bool) *KafkaReader {
+func NewReaderWithTopic(topic topic.Topic, groupID string) *KafkaReader {
 	cfg := kafka.ReaderConfig{
 		Brokers: viper.GetStringSlice("kafkaEndpoints"),
 		Topic:   string(topic),
@@ -74,39 +72,73 @@ func NewReaderWithTopic(topic topic.Topic, groupID string, latestOnly bool) *Kaf
 	}
 
 	rk := &KafkaReader{
-		Reader: kafka.NewReader(cfg),
-		topic:  string(topic),
-	}
-
-	if latestOnly {
-		// best-effort: set offset to last so we don't replay older messages
-		if err := rk.SetOffset(kafka.LastOffset); err != nil {
-			log.WithField(infologger.Level, infologger.IL_Devel).
-				Warnf("failed to set offset to last offset: %v", err)
-		}
+		Reader:  kafka.NewReader(cfg),
+		topic:   string(topic),
+		brokers: append([]string{}, cfg.Brokers...),
+		groupID: groupID,
 	}
-
 	return rk
 }
 
-// Next blocks until the next event is available or ctx is cancelled. It returns an error when the reader is closed
-// (io.EOF) or the context is cancelled. The caller is responsible for providing a cancellable ctx.
+// Next blocks until the next event is available or ctx is cancelled.
 func (r *KafkaReader) Next(ctx context.Context) (*pb.Event, error) {
 	if r == nil {
 		return nil, fmt.Errorf("nil reader")
 	}
-
 	msg, err := r.ReadMessage(ctx)
 	if err != nil {
 		return nil, err
 	}
+	return kafkaMessageToEvent(msg)
+}
 
-	event, err := kafkaMessageToEvent(msg)
+// Last fetches the last available message on the topic (considering all partitions).
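+// It dials the first configured broker to list the topic's partitions and reads the last message of each non-empty partition.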
+// If multiple partitions have data, the event with the greatest message timestamp is returned.
+func (r *KafkaReader) Last(ctx context.Context) (*pb.Event, error) {
+	if r == nil {
+		return nil, fmt.Errorf("nil reader")
+	}
+	partitions, err := r.readPartitions()
 	if err != nil {
 		return nil, err
 	}
-
-	return event, nil
+	var latestEvt *pb.Event
+	var latestEvtTimeNs int64
+	for _, p := range partitions {
+		if p.Topic != r.topic {
+			continue
+		}
+		first, last, err := r.readFirstAndLast(p.ID)
+		if err != nil {
+			log.WithField(infologger.Level, infologger.IL_Devel).WithError(err).
+				Warnf("failed to read offsets for %s[%d]", r.topic, p.ID)
+			continue
+		}
+		if last <= first {
+			continue
+		}
+		msg, err := r.readAtOffset(ctx, p.ID, last-1)
+		if err != nil {
+			log.WithError(err).
+				WithField(infologger.Level, infologger.IL_Devel).
+				Warnf("failed to read last message for %s[%d] at offset %d", r.topic, p.ID, last-1)
+			continue
+		}
+		evt, err := kafkaMessageToEvent(msg)
+		if err != nil {
+			log.WithError(err).
+				WithField(infologger.Level, infologger.IL_Devel).
+				Warnf("failed to decode last message for %s[%d]", r.topic, p.ID)
+			continue
+		}
+		currentEvtTimeNs := msg.Time.UnixNano()
+		if latestEvt == nil || currentEvtTimeNs > latestEvtTimeNs {
+			latestEvt = evt
+			latestEvtTimeNs = currentEvtTimeNs
+		}
+	}
+	return latestEvt, nil
 }
 
 // Close stops the reader.
@@ -114,13 +146,7 @@ func (r *KafkaReader) Close() error {
 	if r == nil {
 		return nil
 	}
-	// Close the underlying kafka reader which will cause ReadMessage to return an error
-	err := r.Reader.Close()
-	if err != nil {
-		log.WithField(infologger.Level, infologger.IL_Devel).
-			Errorf("failed to close kafka reader: %v", err)
-	}
-	return err
+	return r.Reader.Close()
 }
 
 func kafkaMessageToEvent(m kafka.Message) (*pb.Event, error) {
@@ -130,3 +156,60 @@ func kafkaMessageToEvent(m kafka.Message) (*pb.Event, error) {
 	}
 	return &evt, nil
 }
+
+// brokerAddr returns the address of the first configured Kafka broker.
+func (r *KafkaReader) brokerAddr() (string, error) {
+	if len(r.brokers) == 0 {
+		return "", fmt.Errorf("no kafka brokers configured")
+	}
+	return r.brokers[0], nil
+}
+
+// readPartitions lists the partitions of the reader's topic.
+func (r *KafkaReader) readPartitions() ([]kafka.Partition, error) {
+	addr, err := r.brokerAddr()
+	if err != nil {
+		return nil, err
+	}
+	conn, err := kafka.Dial("tcp", addr)
+	if err != nil {
+		return nil, err
+	}
+	defer conn.Close()
+	return conn.ReadPartitions(r.topic)
+}
+
+// readFirstAndLast returns the first and one-past-last offsets of the given partition.
+func (r *KafkaReader) readFirstAndLast(partition int) (int64, int64, error) {
+	addr, err := r.brokerAddr()
+	if err != nil {
+		return 0, 0, err
+	}
+	conn, err := kafka.DialLeader(context.Background(), "tcp", addr, r.topic, partition)
+	if err != nil {
+		return 0, 0, err
+	}
+	defer conn.Close()
+	first, last, err := conn.ReadOffsets()
+	return first, last, err
+}
+
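+// readAtOffset reads the single message stored at the given offset of one partition. It uses a
+// short-lived reader without a consumer group, so the group offsets of the main reader are left untouched.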
"github.com/AliceO2Group/Control/common/logger/infologger" - pb "github.com/AliceO2Group/Control/common/protos" "io" "strings" "sync" "time" cmnevent "github.com/AliceO2Group/Control/common/event" + "github.com/AliceO2Group/Control/common/event/topic" "github.com/AliceO2Group/Control/common/logger" + "github.com/AliceO2Group/Control/common/logger/infologger" + pb "github.com/AliceO2Group/Control/common/protos" "github.com/AliceO2Group/Control/common/utils/uid" "github.com/AliceO2Group/Control/core/environment" "github.com/AliceO2Group/Control/core/integration" @@ -51,10 +51,8 @@ var dipClientTopic topic.Topic = "dip.lhc.beam_mode" // Plugin implements integration.Plugin and listens for LHC updates. type Plugin struct { - endpoint string - ctx context.Context - //cancel context.CancelFunc - //wg sync.WaitGroup + endpoint string + ctx context.Context mu sync.Mutex currentState *pb.BeamInfo reader cmnevent.Reader @@ -66,21 +64,73 @@ func NewPlugin(endpoint string) integration.Plugin { } func (p *Plugin) Init(_ string) error { - // use a background context for reader loop; Destroy will Close the reader p.ctx = context.Background() - p.reader = cmnevent.NewReaderWithTopic(dipClientTopic, "o2-aliecs-core.lhc", true) - + p.reader = cmnevent.NewReaderWithTopic(dipClientTopic, "o2-aliecs-core.lhc") if p.reader == nil { return errors.New("could not create a kafka reader for LHC plugin") } - go p.readAndInjectLhcUpdates() - log.Debug("LHC plugin initialized (client started)") + // Always perform a short pre-drain to consume any backlog without injecting. + log.WithField(infologger.Level, infologger.IL_Devel). + Info("LHC plugin: draining any initial backlog") + p.drainBacklog(2 * time.Second) + + // If state is still empty, try reading the latest message once. + p.mu.Lock() + empty := p.currentState == nil || p.currentState.BeamMode == pb.BeamMode_UNKNOWN + p.mu.Unlock() + if empty { + if last, err := p.reader.Last(p.ctx); err != nil { + log.WithField(infologger.Level, infologger.IL_Support).WithError(err).Warn("failed to read last LHC state on init") + } else if last != nil { + if bmEvt := last.GetBeamModeEvent(); bmEvt != nil && bmEvt.GetBeamInfo() != nil { + p.mu.Lock() + p.currentState = bmEvt.GetBeamInfo() + p.mu.Unlock() + } + } else { + // nothing to retrieve in the topic, we move on + } + } + + go p.readAndInjectLhcUpdates() + log.WithField(infologger.Level, infologger.IL_Devel).Debug("LHC plugin initialized (client started)") return nil } +// drainBacklog reads messages for a limited time and only updates the plugin state, without injecting to env manager. +func (p *Plugin) drainBacklog(timeout time.Duration) { + drainCtx, cancel := context.WithTimeout(p.ctx, timeout) + defer cancel() + for { + msg, err := p.reader.Next(drainCtx) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + break + } + // transient error: small sleep and continue until timeout + time.Sleep(50 * time.Millisecond) + continue + } + if msg == nil { + continue + } + if beamModeEvent := msg.GetBeamModeEvent(); beamModeEvent != nil && beamModeEvent.GetBeamInfo() != nil { + beamInfo := beamModeEvent.GetBeamInfo() + log.WithField(infologger.Level, infologger.IL_Devel). 
+func (p *Plugin) drainBacklog(timeout time.Duration) {
+	drainCtx, cancel := context.WithTimeout(p.ctx, timeout)
+	defer cancel()
+	for {
+		msg, err := p.reader.Next(drainCtx)
+		if err != nil {
+			if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
+				break
+			}
+			// transient error: sleep briefly and retry until the timeout expires
+			time.Sleep(50 * time.Millisecond)
+			continue
+		}
+		if msg == nil {
+			continue
+		}
+		if beamModeEvent := msg.GetBeamModeEvent(); beamModeEvent != nil && beamModeEvent.GetBeamInfo() != nil {
+			beamInfo := beamModeEvent.GetBeamInfo()
+			log.WithField(infologger.Level, infologger.IL_Devel).
+				Debugf("new LHC update received while draining backlog: BeamMode=%s, FillNumber=%d, FillingScheme=%s, StableBeamsStart=%d, StableBeamsEnd=%d, BeamType=%s",
+					beamInfo.GetBeamMode().String(), beamInfo.GetFillNumber(), beamInfo.GetFillingSchemeName(),
+					beamInfo.GetStableBeamsStart(), beamInfo.GetStableBeamsEnd(), beamInfo.GetBeamType())
+
+			p.mu.Lock()
+			p.currentState = beamModeEvent.GetBeamInfo()
+			p.mu.Unlock()
+		}
+	}
+}
+
 func (p *Plugin) GetName() string { return "lhc" }
 
 func (p *Plugin) GetPrettyName() string { return "LHC (DIP/Kafka client)" }
 
 func (p *Plugin) GetEndpoint() string {