From 71cae8cf9283f7999c27831d67f1065abc5463f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Tue, 25 Feb 2025 17:44:26 +0100 Subject: [PATCH] [core] changes to distribution of kafka messages to partitions - added environment id as a key to messages containing environment id field - changed Balancer from LeastBytes to Hash which makes kafka-go puts messages with same key to the same partition, fixing ordering problems --- common/event/writer.go | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/common/event/writer.go b/common/event/writer.go index 9e329af4..49bb0d3c 100644 --- a/common/event/writer.go +++ b/common/event/writer.go @@ -62,7 +62,7 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter { Writer: &kafka.Writer{ Addr: kafka.TCP(viper.GetStringSlice("kafkaEndpoints")...), Topic: string(topic), - Balancer: &kafka.LeastBytes{}, + Balancer: &kafka.Hash{}, AllowAutoTopicCreation: true, }, } @@ -80,6 +80,18 @@ func (w *KafkaWriter) WriteEvent(e interface{}) { } } +type HasEnvID interface { + GetEnvironmentId() string +} + +func extractAndConvertEnvID[T HasEnvID](object T) []byte { + envID := []byte(object.GetEnvironmentId()) + if len(envID) > 0 { + return envID + } + return nil +} + func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time) { if w == nil { return @@ -89,6 +101,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time var ( err error wrappedEvent *pb.Event + key []byte = nil ) switch e := e.(type) { @@ -111,36 +124,42 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e}, } case *pb.Ev_TaskEvent: + key = extractAndConvertEnvID(e) wrappedEvent = &pb.Event{ Timestamp: timestamp.UnixMilli(), TimestampNano: timestamp.UnixNano(), Payload: &pb.Event_TaskEvent{TaskEvent: e}, } case *pb.Ev_RoleEvent: + key = extractAndConvertEnvID(e) wrappedEvent = &pb.Event{ Timestamp: timestamp.UnixMilli(), TimestampNano: timestamp.UnixNano(), Payload: &pb.Event_RoleEvent{RoleEvent: e}, } case *pb.Ev_EnvironmentEvent: + key = extractAndConvertEnvID(e) wrappedEvent = &pb.Event{ Timestamp: timestamp.UnixMilli(), TimestampNano: timestamp.UnixNano(), Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e}, } case *pb.Ev_CallEvent: + key = extractAndConvertEnvID(e) wrappedEvent = &pb.Event{ Timestamp: timestamp.UnixMilli(), TimestampNano: timestamp.UnixNano(), Payload: &pb.Event_CallEvent{CallEvent: e}, } case *pb.Ev_IntegratedServiceEvent: + key = extractAndConvertEnvID(e) wrappedEvent = &pb.Event{ Timestamp: timestamp.UnixMilli(), TimestampNano: timestamp.UnixNano(), Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e}, } case *pb.Ev_RunEvent: + key = extractAndConvertEnvID(e) wrappedEvent = &pb.Event{ Timestamp: timestamp.UnixMilli(), TimestampNano: timestamp.UnixNano(), @@ -151,7 +170,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time if wrappedEvent == nil { err = fmt.Errorf("unsupported event type") } else { - err = w.doWriteEvent(wrappedEvent) + err = w.doWriteEvent(key, wrappedEvent) } if err != nil { @@ -162,7 +181,7 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time }() } -func (w *KafkaWriter) doWriteEvent(e *pb.Event) error { +func (w *KafkaWriter) doWriteEvent(key []byte, e *pb.Event) error { if w == nil { return nil } @@ -172,10 +191,15 @@ func (w *KafkaWriter) doWriteEvent(e *pb.Event) error { return fmt.Errorf("failed to marshal event: %w", err) } - err = w.WriteMessages(context.Background(), kafka.Message{ + message := kafka.Message{ Value: data, - }) + } + + if key != nil { + message.Key = key + } + err = w.WriteMessages(context.Background(), message) if err != nil { return fmt.Errorf("failed to write event: %w", err) }