From 3316c34de519899d6d33c49b489e970293b8e96b Mon Sep 17 00:00:00 2001 From: zhudi Date: Fri, 7 Nov 2025 16:22:43 +0800 Subject: [PATCH 1/2] =?UTF-8?q?kq:=20=E6=94=AF=E6=8C=81=20kafka.Writer=20B?= =?UTF-8?q?atchTimeout/BatchSize/BatchBytes=20=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 WithBatchTimeout 选项,支持配置批量发送超时时间 - 新增 WithBatchSize 选项,支持配置批量发送消息数量 - 新增 WithBatchBytes 选项,支持配置批量发送字节大小 - 补充完整的单元测试,覆盖单个配置和组合配置场景 涉及文件: - kq/pusher.go: 添加三个配置选项和对应的 PushOption 函数 - kq/pusher_test.go: 添加对应的单元测试用例 --- kq/pusher.go | 33 +++++++++++++++++++++++++++++ kq/pusher_test.go | 54 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) diff --git a/kq/pusher.go b/kq/pusher.go index b254d4c..6ef9fa9 100644 --- a/kq/pusher.go +++ b/kq/pusher.go @@ -30,6 +30,9 @@ type ( // kafka.Writer options allowAutoTopicCreation bool balancer kafka.Balancer + batchTimeout time.Duration + batchSize int + batchBytes int64 // executors.ChunkExecutor options chunkSize int @@ -59,6 +62,15 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher { if options.balancer != nil { producer.Balancer = options.balancer } + if options.batchTimeout > 0 { + producer.BatchTimeout = options.batchTimeout + } + if options.batchSize > 0 { + producer.BatchSize = options.batchSize + } + if options.batchBytes > 0 { + producer.BatchBytes = options.batchBytes + } pusher := &Pusher{ producer: producer, @@ -178,3 +190,24 @@ func WithSyncPush() PushOption { options.syncPush = true } } + +// WithBatchTimeout customizes the Pusher with the given batch timeout. +func WithBatchTimeout(timeout time.Duration) PushOption { + return func(options *pushOptions) { + options.batchTimeout = timeout + } +} + +// WithBatchSize customizes the Pusher with the given batch size. +func WithBatchSize(size int) PushOption { + return func(options *pushOptions) { + options.batchSize = size + } +} + +// WithBatchBytes customizes the Pusher with the given batch bytes. +func WithBatchBytes(bytes int64) PushOption { + return func(options *pushOptions) { + options.batchBytes = bytes + } +} diff --git a/kq/pusher_test.go b/kq/pusher_test.go index 72b380d..83047b1 100644 --- a/kq/pusher_test.go +++ b/kq/pusher_test.go @@ -63,6 +63,39 @@ func TestNewPusher(t *testing.T) { assert.NotNil(t, pusher) assert.True(t, pusher.producer.(*kafka.Writer).AllowAutoTopicCreation) }) + + t.Run("WithBatchTimeout", func(t *testing.T) { + timeout := time.Second * 5 + pusher := NewPusher(addrs, topic, WithBatchTimeout(timeout)) + assert.NotNil(t, pusher) + assert.Equal(t, timeout, pusher.producer.(*kafka.Writer).BatchTimeout) + }) + + t.Run("WithBatchSize", func(t *testing.T) { + batchSize := 100 + pusher := NewPusher(addrs, topic, WithBatchSize(batchSize)) + assert.NotNil(t, pusher) + assert.Equal(t, batchSize, pusher.producer.(*kafka.Writer).BatchSize) + }) + + t.Run("WithBatchBytes", func(t *testing.T) { + batchBytes := int64(1024 * 1024) // 1MB + pusher := NewPusher(addrs, topic, WithBatchBytes(batchBytes)) + assert.NotNil(t, pusher) + assert.Equal(t, batchBytes, pusher.producer.(*kafka.Writer).BatchBytes) + }) + + t.Run("WithMultipleBatchOptions", func(t *testing.T) { + timeout := time.Second * 3 + batchSize := 50 + batchBytes := int64(512 * 1024) // 512KB + pusher := NewPusher(addrs, topic, WithBatchTimeout(timeout), WithBatchSize(batchSize), WithBatchBytes(batchBytes)) + assert.NotNil(t, pusher) + writer := pusher.producer.(*kafka.Writer) + assert.Equal(t, timeout, writer.BatchTimeout) + assert.Equal(t, batchSize, writer.BatchSize) + assert.Equal(t, batchBytes, writer.BatchBytes) + }) } func TestPusher_Close(t *testing.T) { @@ -139,3 +172,24 @@ func TestPusher_PushWithKey_Error(t *testing.T) { assert.Equal(t, expectedError, err) mockWriter.AssertExpectations(t) } + +func TestWithBatchTimeout(t *testing.T) { + options := &pushOptions{} + timeout := time.Second * 5 + WithBatchTimeout(timeout)(options) + assert.Equal(t, timeout, options.batchTimeout) +} + +func TestWithBatchSize(t *testing.T) { + options := &pushOptions{} + batchSize := 100 + WithBatchSize(batchSize)(options) + assert.Equal(t, batchSize, options.batchSize) +} + +func TestWithBatchBytes(t *testing.T) { + options := &pushOptions{} + batchBytes := int64(1024 * 1024) + WithBatchBytes(batchBytes)(options) + assert.Equal(t, batchBytes, options.batchBytes) +} From c274e9c11b85e9a8a37acded53dac5eae7ab873d Mon Sep 17 00:00:00 2001 From: zhudi Date: Mon, 10 Nov 2025 14:30:34 +0800 Subject: [PATCH 2/2] =?UTF-8?q?kq:=20=E6=94=AF=E6=8C=81=20kafka.Writer=20B?= =?UTF-8?q?atchTimeout/BatchBytes=20=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 WithBatchTimeout 选项,支持配置批量发送超时时间 - 新增 WithBatchBytes 选项,支持配置批量发送字节大小 - 补充完整的单元测试,覆盖单个配置和组合配置场景 涉及文件: - kq/pusher.go: 添加两个配置选项和对应的 PushOption 函数 - kq/pusher_test.go: 添加对应的单元测试用例 --- kq/pusher.go | 11 ----------- kq/pusher_test.go | 18 +----------------- 2 files changed, 1 insertion(+), 28 deletions(-) diff --git a/kq/pusher.go b/kq/pusher.go index 6ef9fa9..02e6f23 100644 --- a/kq/pusher.go +++ b/kq/pusher.go @@ -31,7 +31,6 @@ type ( allowAutoTopicCreation bool balancer kafka.Balancer batchTimeout time.Duration - batchSize int batchBytes int64 // executors.ChunkExecutor options @@ -65,9 +64,6 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher { if options.batchTimeout > 0 { producer.BatchTimeout = options.batchTimeout } - if options.batchSize > 0 { - producer.BatchSize = options.batchSize - } if options.batchBytes > 0 { producer.BatchBytes = options.batchBytes } @@ -198,13 +194,6 @@ func WithBatchTimeout(timeout time.Duration) PushOption { } } -// WithBatchSize customizes the Pusher with the given batch size. -func WithBatchSize(size int) PushOption { - return func(options *pushOptions) { - options.batchSize = size - } -} - // WithBatchBytes customizes the Pusher with the given batch bytes. func WithBatchBytes(bytes int64) PushOption { return func(options *pushOptions) { diff --git a/kq/pusher_test.go b/kq/pusher_test.go index 83047b1..6d2396b 100644 --- a/kq/pusher_test.go +++ b/kq/pusher_test.go @@ -71,13 +71,6 @@ func TestNewPusher(t *testing.T) { assert.Equal(t, timeout, pusher.producer.(*kafka.Writer).BatchTimeout) }) - t.Run("WithBatchSize", func(t *testing.T) { - batchSize := 100 - pusher := NewPusher(addrs, topic, WithBatchSize(batchSize)) - assert.NotNil(t, pusher) - assert.Equal(t, batchSize, pusher.producer.(*kafka.Writer).BatchSize) - }) - t.Run("WithBatchBytes", func(t *testing.T) { batchBytes := int64(1024 * 1024) // 1MB pusher := NewPusher(addrs, topic, WithBatchBytes(batchBytes)) @@ -87,13 +80,11 @@ func TestNewPusher(t *testing.T) { t.Run("WithMultipleBatchOptions", func(t *testing.T) { timeout := time.Second * 3 - batchSize := 50 batchBytes := int64(512 * 1024) // 512KB - pusher := NewPusher(addrs, topic, WithBatchTimeout(timeout), WithBatchSize(batchSize), WithBatchBytes(batchBytes)) + pusher := NewPusher(addrs, topic, WithBatchTimeout(timeout), WithBatchBytes(batchBytes)) assert.NotNil(t, pusher) writer := pusher.producer.(*kafka.Writer) assert.Equal(t, timeout, writer.BatchTimeout) - assert.Equal(t, batchSize, writer.BatchSize) assert.Equal(t, batchBytes, writer.BatchBytes) }) } @@ -180,13 +171,6 @@ func TestWithBatchTimeout(t *testing.T) { assert.Equal(t, timeout, options.batchTimeout) } -func TestWithBatchSize(t *testing.T) { - options := &pushOptions{} - batchSize := 100 - WithBatchSize(batchSize)(options) - assert.Equal(t, batchSize, options.batchSize) -} - func TestWithBatchBytes(t *testing.T) { options := &pushOptions{} batchBytes := int64(1024 * 1024)