151 changes: 114 additions & 37 deletions common/event/reader.go
@@ -27,44 +27,42 @@ package event
 import (
     "context"
     "fmt"

     "github.com/AliceO2Group/Control/common/event/topic"
     "github.com/AliceO2Group/Control/common/logger/infologger"
     pb "github.com/AliceO2Group/Control/common/protos"
     "github.com/segmentio/kafka-go"
     "github.com/spf13/viper"
     "google.golang.org/protobuf/proto"
-    "sync"
 )

 // Reader interface provides methods to read events.
 type Reader interface {
     // Next should return the next event or cancel if the context is cancelled.
     Next(ctx context.Context) (*pb.Event, error)
+    // Last should return the last available event currently present on the topic (or nil if none)
+    // or cancel if the context is cancelled.
+    Last(ctx context.Context) (*pb.Event, error)
     Close() error
 }

 // DummyReader is an implementation of Reader that returns no events.
 type DummyReader struct{}

 // Next returns the next event or nil if there are no more events.
 func (*DummyReader) Next(context.Context) (*pb.Event, error) { return nil, nil }

 // Close closes the DummyReader.
-func (*DummyReader) Close() error { return nil }
+func (*DummyReader) Last(context.Context) (*pb.Event, error) { return nil, nil }
+func (*DummyReader) Close() error                            { return nil }

 // KafkaReader reads events from Kafka and provides a blocking, cancellable API to fetch events.
-// Consumption mode is chosen at creation time:
-//   - latestOnly=false: consume everything (from stored offsets or beginning depending on group state)
-//   - latestOnly=true: seek to latest offsets on start and only receive messages produced after start
 type KafkaReader struct {
     *kafka.Reader
-    mu    sync.Mutex
-    topic string
+    topic   string
+    brokers []string
+    groupID string
 }

 // NewReaderWithTopic creates a KafkaReader for the provided topic and starts it.
-// If latestOnly is true the reader attempts to seek to the latest offsets on start so that
-// only new messages (produced after creation) are consumed.
-func NewReaderWithTopic(topic topic.Topic, groupID string, latestOnly bool) *KafkaReader {
+func NewReaderWithTopic(topic topic.Topic, groupID string) *KafkaReader {
     cfg := kafka.ReaderConfig{
         Brokers: viper.GetStringSlice("kafkaEndpoints"),
         Topic:   string(topic),
@@ -74,53 +72,80 @@ func NewReaderWithTopic(topic topic.Topic, groupID string, latestOnly bool) *KafkaReader {
     }

     rk := &KafkaReader{
-        Reader: kafka.NewReader(cfg),
-        topic:  string(topic),
-    }
-
-    if latestOnly {
-        // best-effort: set offset to last so we don't replay older messages
-        if err := rk.SetOffset(kafka.LastOffset); err != nil {
-            log.WithField(infologger.Level, infologger.IL_Devel).
-                Warnf("failed to set offset to last offset: %v", err)
-        }
+        Reader:  kafka.NewReader(cfg),
+        topic:   string(topic),
+        brokers: append([]string{}, cfg.Brokers...),
+        groupID: groupID,
     }

     return rk
 }

-// Next blocks until the next event is available or ctx is cancelled. It returns an error when the reader is closed
-// (io.EOF) or the context is cancelled. The caller is responsible for providing a cancellable ctx.
+// Next blocks until the next event is available or ctx is cancelled.
 func (r *KafkaReader) Next(ctx context.Context) (*pb.Event, error) {
     if r == nil {
         return nil, fmt.Errorf("nil reader")
     }

     msg, err := r.ReadMessage(ctx)
     if err != nil {
         return nil, err
     }
+    return kafkaMessageToEvent(msg)
+}

-    event, err := kafkaMessageToEvent(msg)
+// Last fetches the last available message on the topic (considering all partitions).
+// If multiple partitions have data, the event with the greatest message timestamp is returned.
+func (r *KafkaReader) Last(ctx context.Context) (*pb.Event, error) {
+    if r == nil {
+        return nil, fmt.Errorf("nil reader")
+    }
+    partitions, err := r.readPartitions()
     if err != nil {
         return nil, err
     }

-    return event, nil
+    var latestEvt *pb.Event
+    var latestEvtTimeNs int64
+    for _, p := range partitions {
+        if p.Topic != r.topic {
+            continue
+        }
+        first, last, err := r.readFirstAndLast(p.ID)
+        if err != nil {
+            log.WithField(infologger.Level, infologger.IL_Devel).WithError(err).
+                Warnf("failed to read offsets for %s[%d]", r.topic, p.ID)
+            continue
+        }
+        if last <= first {
+            continue
+        }
+        msg, err := r.readAtOffset(ctx, p.ID, last-1)
+        if err != nil {
+            log.WithError(err).
+                WithField(infologger.Level, infologger.IL_Devel).
+                Warnf("failed to read last message for %s[%d] at offset %d", r.topic, p.ID, last-1)
+            continue
+        }
+        evt, err := kafkaMessageToEvent(msg)
+        if err != nil {
+            log.WithError(err).
+                WithField(infologger.Level, infologger.IL_Devel).
+                Warnf("failed to decode last message for %s[%d]", r.topic, p.ID)
+            continue
+        }
+        currentEvtTimeNs := msg.Time.UnixNano()
+        if latestEvt == nil || currentEvtTimeNs > latestEvtTimeNs {
+            latestEvt = evt
+            latestEvtTimeNs = currentEvtTimeNs
+        }
+    }
+    return latestEvt, nil
 }

 // Close stops the reader.
 func (r *KafkaReader) Close() error {
     if r == nil {
         return nil
     }
-    // Close the underlying kafka reader which will cause ReadMessage to return an error
-    err := r.Reader.Close()
-    if err != nil {
-        log.WithField(infologger.Level, infologger.IL_Devel).
-            Errorf("failed to close kafka reader: %v", err)
-    }
-    return err
+    return r.Reader.Close()
 }

 func kafkaMessageToEvent(m kafka.Message) (*pb.Event, error) {
@@ -130,3 +155,55 @@ func kafkaMessageToEvent(m kafka.Message) (*pb.Event, error) {
     }
     return &evt, nil
 }
+
+func (r *KafkaReader) brokerAddr() (string, error) {
+    if len(r.brokers) == 0 {
+        return "", fmt.Errorf("no kafka brokers configured")
+    }
+    return r.brokers[0], nil
+}
+
+func (r *KafkaReader) readPartitions() ([]kafka.Partition, error) {
+    addr, err := r.brokerAddr()
+    if err != nil {
+        return nil, err
+    }
+    conn, err := kafka.Dial("tcp", addr)
+    if err != nil {
+        return nil, err
+    }
+    defer conn.Close()
+    return conn.ReadPartitions(r.topic)
+}
+
+func (r *KafkaReader) readFirstAndLast(partition int) (int64, int64, error) {
+    addr, err := r.brokerAddr()
+    if err != nil {
+        return 0, 0, err
+    }
+    conn, err := kafka.DialLeader(context.Background(), "tcp", addr, r.topic, partition)
+    if err != nil {
+        return 0, 0, err
+    }
+    defer conn.Close()
+    first, last, err := conn.ReadOffsets()
+    return first, last, err
+}
+
+func (r *KafkaReader) readAtOffset(ctx context.Context, partition int, offset int64) (kafka.Message, error) {
+    if offset < 0 {
+        return kafka.Message{}, fmt.Errorf("invalid offset %d", offset)
+    }
+    kr := kafka.NewReader(kafka.ReaderConfig{
+        Brokers:   append([]string{}, r.brokers...),
+        Topic:     r.topic,
+        Partition: partition,
+        MinBytes:  1,
+        MaxBytes:  10e6,
+    })
+    defer kr.Close()
+    if err := kr.SetOffset(offset); err != nil {
+        return kafka.Message{}, err
+    }
+    return kr.ReadMessage(ctx)
+}
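
Note: the following is a minimal, hypothetical usage sketch of the new Reader API, not part of this diff. It assumes viper's kafkaEndpoints option is configured and the topic exists; the topic name and group ID are illustrative.

package main

import (
    "context"
    "fmt"
    "time"

    cmnevent "github.com/AliceO2Group/Control/common/event"
    "github.com/AliceO2Group/Control/common/event/topic"
)

func main() {
    // Create a reader; broker addresses come from the kafkaEndpoints config key.
    r := cmnevent.NewReaderWithTopic(topic.Topic("dip.lhc.beam_mode"), "example.group")
    defer r.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Snapshot: the most recent event currently retained on the topic, or nil if empty.
    if last, err := r.Last(ctx); err == nil && last != nil {
        fmt.Println("last event:", last.String())
    }

    // Stream: block until the next event arrives or the context expires.
    if evt, err := r.Next(ctx); err == nil {
        fmt.Println("next event:", evt.String())
    }
}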
74 changes: 62 additions & 12 deletions core/integration/lhc/plugin.go
@@ -28,16 +28,16 @@ import (
     "context"
     "encoding/json"
     "errors"
-    "github.com/AliceO2Group/Control/common/event/topic"
-    "github.com/AliceO2Group/Control/common/logger/infologger"
-    pb "github.com/AliceO2Group/Control/common/protos"
     "io"
     "strings"
     "sync"
     "time"

     cmnevent "github.com/AliceO2Group/Control/common/event"
+    "github.com/AliceO2Group/Control/common/event/topic"
     "github.com/AliceO2Group/Control/common/logger"
+    "github.com/AliceO2Group/Control/common/logger/infologger"
+    pb "github.com/AliceO2Group/Control/common/protos"
     "github.com/AliceO2Group/Control/common/utils/uid"
     "github.com/AliceO2Group/Control/core/environment"
     "github.com/AliceO2Group/Control/core/integration"
@@ -51,10 +51,8 @@ var dipClientTopic topic.Topic = "dip.lhc.beam_mode"

 // Plugin implements integration.Plugin and listens for LHC updates.
 type Plugin struct {
-    endpoint string
-    ctx      context.Context
-    //cancel context.CancelFunc
-    //wg sync.WaitGroup
+    endpoint     string
+    ctx          context.Context
     mu           sync.Mutex
     currentState *pb.BeamInfo
     reader       cmnevent.Reader
@@ -66,21 +64,73 @@ func NewPlugin(endpoint string) integration.Plugin {
 }

 func (p *Plugin) Init(_ string) error {
-
     // use a background context for reader loop; Destroy will Close the reader
     p.ctx = context.Background()

-    p.reader = cmnevent.NewReaderWithTopic(dipClientTopic, "o2-aliecs-core.lhc", true)
-
+    p.reader = cmnevent.NewReaderWithTopic(dipClientTopic, "o2-aliecs-core.lhc")
     if p.reader == nil {
         return errors.New("could not create a kafka reader for LHC plugin")
     }
-    go p.readAndInjectLhcUpdates()

-    log.Debug("LHC plugin initialized (client started)")
+    // Always perform a short pre-drain to consume any backlog without injecting.
+    log.WithField(infologger.Level, infologger.IL_Devel).
+        Info("LHC plugin: draining any initial backlog")
+    p.drainBacklog(2 * time.Second)
+
+    // If state is still empty, try reading the latest message once.
+    p.mu.Lock()
+    empty := p.currentState == nil || p.currentState.BeamMode == pb.BeamMode_UNKNOWN
+    p.mu.Unlock()
+    if empty {
+        if last, err := p.reader.Last(p.ctx); err != nil {
+            log.WithField(infologger.Level, infologger.IL_Support).WithError(err).Warn("failed to read last LHC state on init")
+        } else if last != nil {
+            if bmEvt := last.GetBeamModeEvent(); bmEvt != nil && bmEvt.GetBeamInfo() != nil {
+                p.mu.Lock()
+                p.currentState = bmEvt.GetBeamInfo()
+                p.mu.Unlock()
+            }
+        } else {
+            // nothing to retrieve in the topic, we move on
+        }
+    }
+
+    go p.readAndInjectLhcUpdates()
+    log.WithField(infologger.Level, infologger.IL_Devel).Debug("LHC plugin initialized (client started)")
     return nil
 }

+// drainBacklog reads messages for a limited time and only updates the plugin state, without injecting to the env manager.
+func (p *Plugin) drainBacklog(timeout time.Duration) {
+    drainCtx, cancel := context.WithTimeout(p.ctx, timeout)
+    defer cancel()
+    for {
+        msg, err := p.reader.Next(drainCtx)
+        if err != nil {
+            if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
+                break
+            }
+            // transient error: small sleep and continue until timeout
+            time.Sleep(50 * time.Millisecond)
+            continue
+        }
+        if msg == nil {
+            continue
+        }
+        if beamModeEvent := msg.GetBeamModeEvent(); beamModeEvent != nil && beamModeEvent.GetBeamInfo() != nil {
+            beamInfo := beamModeEvent.GetBeamInfo()
+            log.WithField(infologger.Level, infologger.IL_Devel).
+                Debugf("new LHC update received while draining backlog: BeamMode=%s, FillNumber=%d, FillingScheme=%s, StableBeamsStart=%d, StableBeamsEnd=%d, BeamType=%s",
+                    beamInfo.GetBeamMode().String(), beamInfo.GetFillNumber(), beamInfo.GetFillingSchemeName(),
+                    beamInfo.GetStableBeamsStart(), beamInfo.GetStableBeamsEnd(), beamInfo.GetBeamType())
+
+            p.mu.Lock()
+            p.currentState = beamModeEvent.GetBeamInfo()
+            p.mu.Unlock()
+        }
+    }
+}
+
 func (p *Plugin) GetName() string       { return "lhc" }
 func (p *Plugin) GetPrettyName() string { return "LHC (DIP/Kafka client)" }
 func (p *Plugin) GetEndpoint() string {
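
Note: to summarize the init flow introduced above (pre-drain the backlog, then fall back to a one-shot Last read), here is an illustrative sketch against the Reader interface. It is not part of this diff; the function and package names are hypothetical.

package example

import (
    "context"
    "errors"
    "io"
    "time"

    cmnevent "github.com/AliceO2Group/Control/common/event"
    pb "github.com/AliceO2Group/Control/common/protos"
)

// initialState drains whatever is already on the topic for up to `timeout`,
// keeping the newest event seen; if nothing was consumed, it falls back to Last.
func initialState(r cmnevent.Reader, timeout time.Duration) *pb.Event {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    var state *pb.Event
    for {
        evt, err := r.Next(ctx)
        if err != nil {
            if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) || errors.Is(err, io.EOF) {
                break // backlog drained, timeout hit, or reader closed
            }
            time.Sleep(50 * time.Millisecond) // transient error: retry until timeout
            continue
        }
        if evt != nil {
            state = evt // keep the newest event seen while draining
        }
    }

    if state == nil {
        // Nothing consumed: ask for the last message still retained on the topic.
        if last, err := r.Last(context.Background()); err == nil {
            state = last
        }
    }
    return state
}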