From 9fdfe22ec63ed797d6530133da11d97b7dd06911 Mon Sep 17 00:00:00 2001 From: KengoWada Date: Tue, 29 Apr 2025 14:36:18 +0300 Subject: [PATCH 1/2] feat: Add Backoff interface to support customizable backoff policies (#8) --- backoff.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/backoff.go b/backoff.go index 851e72b..4f54943 100644 --- a/backoff.go +++ b/backoff.go @@ -5,6 +5,22 @@ import ( "time" ) +var DefaultBackoffPolicy = BackoffPolicy{ + BaseDelay: 1 * time.Second, + MaxDelay: 30 * time.Second, + UseJitter: true, + JitterRangeMs: 300, +} + +// Backoff defines the interface for calculating backoff delays between retries. +// Implementations of this interface can provide custom logic for determining +// how long to wait before retrying a failed task based on the number of retries. +type Backoff interface { + // Calculate returns the duration to wait before the next retry attempt. + // The input parameter retries indicates how many times the task has already been retried. + Calculate(retries int) time.Duration +} + // BackoffPolicy defines the configuration for handling retries with backoff logic. // It provides settings for base delay, maximum delay, jitter, and the range for jitter. type BackoffPolicy struct { From 1190330213ff2e049d90461f6c7fd924a2d2c405 Mon Sep 17 00:00:00 2001 From: KengoWada Date: Tue, 29 Apr 2025 14:37:05 +0300 Subject: [PATCH 2/2] refactor: Update worker and manager to use Backoff interface (#8) --- manager.go | 12 +++--------- worker.go | 4 ++-- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/manager.go b/manager.go index fc79282..3dcc4a3 100644 --- a/manager.go +++ b/manager.go @@ -17,7 +17,7 @@ type ManagerOption func(*ManagerConfig) // ManagerConfig holds configuration options for the Manager. type ManagerConfig struct { - BackoffPolicy *BackoffPolicy + BackoffPolicy Backoff } // Manager is responsible for managing the lifecycle of workers, @@ -37,7 +37,7 @@ type Manager struct { // Example usage: // // manager := NewManager(bp, numOfWorkers, WithBackoffPolicy(customBackoffPolicy)) -func WithBackoffPolicy(bp *BackoffPolicy) ManagerOption { +func WithBackoffPolicy(bp Backoff) ManagerOption { return func(cfg *ManagerConfig) { cfg.BackoffPolicy = bp } @@ -79,14 +79,8 @@ func NewManager(broker Broker, wf WorkerFactory, numWorkers int, opts ...Manager opt(managerConfig) } - // Set default backoff if cfg.BackoffPolicy == nil if managerConfig.BackoffPolicy == nil { - managerConfig.BackoffPolicy = &BackoffPolicy{ - BaseDelay: 1 * time.Second, - MaxDelay: 30 * time.Second, - UseJitter: true, - JitterRangeMs: 300, - } + managerConfig.BackoffPolicy = &DefaultBackoffPolicy } manager := &Manager{broker: broker, ctx: ctx, cancel: cancel} diff --git a/worker.go b/worker.go index c2fa28a..bb61f84 100644 --- a/worker.go +++ b/worker.go @@ -65,7 +65,7 @@ type WorkerFactory func(cfg WorkerConfig) Worker type WorkerConfig struct { ID int Broker Broker - Backoff *BackoffPolicy + Backoff Backoff WG *sync.WaitGroup } @@ -86,7 +86,7 @@ type WorkerConfig struct { type DefaultWorker struct { id int broker Broker - backoff *BackoffPolicy + backoff Backoff handlers map[string]TaskHandlerFunc wg *sync.WaitGroup }