From 1f992773cc55dda5e4e3a588c862de35ed26374b Mon Sep 17 00:00:00 2001 From: "rajat.sahu" Date: Wed, 15 Oct 2025 16:42:45 +0530 Subject: [PATCH 01/11] Enhanced EnqueueIn function to run job after atleast given seconds --- enqueue.go | 9 ++++++++- enqueue_test.go | 7 +++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/enqueue.go b/enqueue.go index 595d1a2b..30946b3f 100644 --- a/enqueue.go +++ b/enqueue.go @@ -97,8 +97,15 @@ func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[stri conn := e.Pool.Get() defer conn.Close() + now := time.Now() + // If there are any milliseconds, round up to next second + secondsToAdd := secondsFromNow + if now.Nanosecond() > 0 { + secondsToAdd++ // Add one extra second to compensate + } + scheduledJob := &ScheduledJob{ - RunAt: nowEpochSeconds() + secondsFromNow, + RunAt: now.Unix() + secondsToAdd, Job: job, } diff --git a/enqueue_test.go b/enqueue_test.go index 651e4ce2..4ae32752 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -168,7 +168,8 @@ func TestEnqueueIn(t *testing.T) { assert.Equal(t, "cool", job.ArgString("b")) assert.EqualValues(t, 1, job.ArgInt64("a")) assert.NoError(t, job.ArgError()) - assert.EqualValues(t, job.EnqueuedAt+300, job.RunAt) + assert.True(t, job.RunAt >= job.EnqueuedAt+300) + assert.True(t, job.RunAt <= job.EnqueuedAt+301) } // Make sure "wat" is in the known jobs @@ -185,7 +186,7 @@ func TestEnqueueIn(t *testing.T) { score, j := jobOnZset(pool, redisKeyScheduled(ns)) assert.True(t, score > time.Now().Unix()+290) - assert.True(t, score <= time.Now().Unix()+300) + assert.True(t, score <= time.Now().Unix()+301) assert.Equal(t, "wat", j.Name) assert.True(t, len(j.ID) > 10) // Something is in it @@ -273,9 +274,11 @@ func TestEnqueueIn_WithMock(t *testing.T) { enqueuer := NewEnqueuerWithOptions(ns, pool, tt.enqueuerOption) if tt.mockZadd != nil { conn.Command("ZADD", "work:scheduled", now+secondsFromNow, redigomock.NewAnyData()).Expect(*tt.mockZadd) + conn.Command("ZADD", "work:scheduled", now+secondsFromNow+1, redigomock.NewAnyData()).Expect(*tt.mockZadd) } if tt.mockZaddErr != nil { conn.Command("ZADD", "work:scheduled", now+secondsFromNow, redigomock.NewAnyData()).ExpectError(tt.mockZaddErr) + conn.Command("ZADD", "work:scheduled", now+secondsFromNow+1, redigomock.NewAnyData()).ExpectError(tt.mockZaddErr) } if tt.mockWait != nil { conn.Command("WAIT", tt.enqueuerOption.MinWaitReplicas, tt.enqueuerOption.MaxWaitTimeoutMS).Expect(*tt.mockWait) From ab20ab3caecffa320640ca8f2ac9ab9f89112bdd Mon Sep 17 00:00:00 2001 From: email Date: Wed, 22 Oct 2025 16:29:49 +0530 Subject: [PATCH 02/11] refactored time calc to a function --- enqueue.go | 9 ++------- time.go | 12 +++++++++--- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/enqueue.go b/enqueue.go index 30946b3f..077d0dac 100644 --- a/enqueue.go +++ b/enqueue.go @@ -97,15 +97,10 @@ func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[stri conn := e.Pool.Get() defer conn.Close() - now := time.Now() - // If there are any milliseconds, round up to next second - secondsToAdd := secondsFromNow - if now.Nanosecond() > 0 { - secondsToAdd++ // Add one extra second to compensate - } + secondsToAdd := epochAfterSeconds(secondsFromNow) scheduledJob := &ScheduledJob{ - RunAt: now.Unix() + secondsToAdd, + RunAt: secondsToAdd, Job: job, } diff --git a/time.go b/time.go index a4548364..81379f6b 100644 --- a/time.go +++ b/time.go @@ -19,7 +19,13 @@ func resetNowEpochSecondsMock() { nowMock = 0 } -// convert epoch seconds to a time -func epochSecondsToTime(t int64) time.Time { - return time.Time{} +func epochAfterSeconds(seconds int64) int64 { + if nowMock != 0 { + return nowMock + seconds + } + t := time.Now().Add(time.Second * time.Duration(seconds)) + if t.Nanosecond() > 0 { + t = t.Add(time.Second) + } + return t.Unix() } From d29555cc7c3fee8c7fdbb93490d230678842f58b Mon Sep 17 00:00:00 2001 From: email Date: Mon, 27 Oct 2025 17:30:49 +0530 Subject: [PATCH 03/11] Added EnqueuedAt method to support job scheduling at a specific epochSeconds --- enqueue.go | 35 +++++++++++++ enqueue_test.go | 130 ++++++++++++++++++++++++++++++++++++++++++++++++ worker_pool.go | 10 ++-- 3 files changed, 170 insertions(+), 5 deletions(-) diff --git a/enqueue.go b/enqueue.go index 077d0dac..975811bb 100644 --- a/enqueue.go +++ b/enqueue.go @@ -116,6 +116,41 @@ func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[stri return scheduledJob, nil } +func (e *Enqueuer) EnqueueAt(jobName string, epochSeconds int64, args map[string]interface{}) (*ScheduledJob, error) { + job := &Job{ + Name: jobName, + ID: makeIdentifier(), + EnqueuedAt: nowEpochSeconds(), + Args: args, + } + if epochSeconds < job.EnqueuedAt { + return nil, errors.New("epochSeconds must be a greater value than current time") + } + rawJSON, err := job.serialize() + if err != nil { + return nil, err + } + + conn := e.Pool.Get() + defer conn.Close() + + scheduledJob := &ScheduledJob{ + RunAt: epochSeconds, + Job: job, + } + + _, err = e.redisDoHelper(conn, "ZADD", redisKeyScheduled(e.Namespace), scheduledJob.RunAt, rawJSON) + if err != nil { + return nil, err + } + + if err := e.addToKnownJobs(conn, jobName); err != nil { + return scheduledJob, err + } + + return scheduledJob, nil +} + // EnqueueUnique enqueues a job unless a job is already enqueued with the same name and arguments. // The already-enqueued job can be in the normal work queue or in the scheduled job queue. // Once a worker begins processing a job, another job with the same name and arguments can be enqueued again. diff --git a/enqueue_test.go b/enqueue_test.go index 4ae32752..4374fb83 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -294,6 +294,136 @@ func TestEnqueueIn_WithMock(t *testing.T) { } } +func TestEnqueueAt(t *testing.T) { + pool := newTestPool(t) + ns := "work" + cleanKeyspace(ns, pool) + enqueuer := NewEnqueuer(ns, pool) + + now := time.Now().Unix() + runAt := now + 300 + + job, err := enqueuer.EnqueueAt("wat", runAt, Q{"a": 1, "b": "cool"}) + assert.Nil(t, err) + if assert.NotNil(t, job) { + assert.Equal(t, "wat", job.Name) + assert.True(t, len(job.ID) > 10) + assert.True(t, job.EnqueuedAt >= now) + assert.Equal(t, "cool", job.ArgString("b")) + assert.EqualValues(t, 1, job.ArgInt64("a")) + assert.NoError(t, job.ArgError()) + assert.EqualValues(t, runAt, job.RunAt) + } + + assert.EqualValues(t, []string{"wat"}, knownJobs(pool, redisKeyKnownJobs(ns))) + expiresAt := enqueuer.knownJobs["wat"] + assert.True(t, expiresAt > (time.Now().Unix()+290)) + assert.EqualValues(t, 1, zsetSize(pool, redisKeyScheduled(ns))) + + score, j := jobOnZset(pool, redisKeyScheduled(ns)) + assert.EqualValues(t, runAt, score) + assert.Equal(t, "wat", j.Name) + assert.True(t, len(j.ID) > 10) + assert.Equal(t, "cool", j.ArgString("b")) + assert.EqualValues(t, 1, j.ArgInt64("a")) + assert.NoError(t, j.ArgError()) +} + +func TestEnqueueAt_WithMock(t *testing.T) { + ns := "work" + jobName := "test" + jobArgs := map[string]interface{}{"arg": "value"} + now := time.Now().Unix() + runAt := now + 100 + setNowEpochSecondsMock(now) + defer resetNowEpochSecondsMock() + + var cases = []struct { + name string + enqueuerOption EnqueuerOption + mockZadd *int64 + mockZaddErr error + mockWait *int64 + mockWaitErr error + expectedError error + }{ + { + name: "Success without wait", + mockZadd: &one, + }, { + name: "Failure without wait", + mockZaddErr: errors.New("zadd failure"), + expectedError: errors.New("zadd failure"), + }, { + name: "Failure with wait", + enqueuerOption: EnqueuerOption{ + MinWaitReplicas: 2, + MaxWaitTimeoutMS: 1000, + }, + mockZadd: &one, + mockWaitErr: errors.New("wait failure"), + expectedError: errors.New("wait failure"), + }, { + name: "When wait return zero", + enqueuerOption: EnqueuerOption{ + MinWaitReplicas: 2, + MaxWaitTimeoutMS: 1000, + }, + mockZadd: &one, + mockWait: &zero, + expectedError: ErrReplicationFailed, + }, { + name: "When wait return less than MinWaitReplicas", + enqueuerOption: EnqueuerOption{ + MinWaitReplicas: 2, + MaxWaitTimeoutMS: 1000, + }, + mockZadd: &one, + mockWait: &one, + expectedError: ErrReplicationFailed, + }, { + name: "When wait return same as MinWaitReplicas", + enqueuerOption: EnqueuerOption{ + MinWaitReplicas: 2, + MaxWaitTimeoutMS: 1000, + }, + mockZadd: &one, + mockWait: &two, + }, { + name: "When wait return more than MinWaitReplicas", + enqueuerOption: EnqueuerOption{ + MinWaitReplicas: 2, + MaxWaitTimeoutMS: 1000, + }, + mockZadd: &one, + mockWait: &three, + }, + } + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + pool, conn := newMockTestPool(t) + enqueuer := NewEnqueuerWithOptions(ns, pool, tt.enqueuerOption) + if tt.mockZadd != nil { + conn.Command("ZADD", "work:scheduled", runAt, redigomock.NewAnyData()).Expect(*tt.mockZadd) + } + if tt.mockZaddErr != nil { + conn.Command("ZADD", "work:scheduled", runAt, redigomock.NewAnyData()).ExpectError(tt.mockZaddErr) + } + if tt.mockWait != nil { + conn.Command("WAIT", tt.enqueuerOption.MinWaitReplicas, tt.enqueuerOption.MaxWaitTimeoutMS).Expect(*tt.mockWait) + } + if tt.mockWaitErr != nil { + conn.Command("WAIT", tt.enqueuerOption.MinWaitReplicas, tt.enqueuerOption.MaxWaitTimeoutMS).ExpectError(tt.mockWaitErr) + } + conn.Command("SADD", "work:known_jobs", jobName).Expect(1) + + _, err := enqueuer.EnqueueAt(jobName, runAt, jobArgs) + assert.Equal(t, tt.expectedError, err) + }) + } +} + func TestEnqueueUnique(t *testing.T) { pool := newTestPool(t) ns := "work" diff --git a/worker_pool.go b/worker_pool.go index 9bd72fd1..cd90751e 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -346,11 +346,11 @@ func validateMiddlewareType(ctxType reflect.Type, vfn reflect.Value) { // Since it's easy to pass the wrong method as a middleware/handler, and since the user can't rely on static type checking since we use reflection, // lets be super helpful about what they did and what they need to do. // Arguments: -// - vfn is the failed method -// - addingType is for "You are adding {addingType} to a worker pool...". Eg, "middleware" or "a handler" -// - yourType is for "Your {yourType} function can have...". Eg, "middleware" or "handler" or "error handler" -// - args is like "rw web.ResponseWriter, req *web.Request, next web.NextMiddlewareFunc" -// - NOTE: args can be calculated if you pass in each type. BUT, it doesn't have example argument name, so it has less copy/paste value. +// - vfn is the failed method +// - addingType is for "You are adding {addingType} to a worker pool...". Eg, "middleware" or "a handler" +// - yourType is for "Your {yourType} function can have...". Eg, "middleware" or "handler" or "error handler" +// - args is like "rw web.ResponseWriter, req *web.Request, next web.NextMiddlewareFunc" +// - NOTE: args can be calculated if you pass in each type. BUT, it doesn't have example argument name, so it has less copy/paste value. func instructiveMessage(vfn reflect.Value, addingType string, yourType string, args string, ctxType reflect.Type) string { // Get context type without package. ctxString := ctxType.String() From 463554f594c0857516de554320a02e20ddd72baf Mon Sep 17 00:00:00 2001 From: email Date: Mon, 27 Oct 2025 17:56:07 +0530 Subject: [PATCH 04/11] Updated uniqueIn function to support ceil conversion --- enqueue.go | 2 +- enqueue_test.go | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/enqueue.go b/enqueue.go index 975811bb..6a4d3859 100644 --- a/enqueue.go +++ b/enqueue.go @@ -195,7 +195,7 @@ func (e *Enqueuer) EnqueueUniqueInByKey(jobName string, secondsFromNow int64, ar } scheduledJob := &ScheduledJob{ - RunAt: nowEpochSeconds() + secondsFromNow, + RunAt: epochAfterSeconds(secondsFromNow), Job: job, } diff --git a/enqueue_test.go b/enqueue_test.go index 4374fb83..b5764270 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -616,7 +616,8 @@ func TestEnqueueUniqueIn(t *testing.T) { assert.Equal(t, "cool", job.ArgString("b")) assert.EqualValues(t, 1, job.ArgInt64("a")) assert.NoError(t, job.ArgError()) - assert.EqualValues(t, job.EnqueuedAt+300, job.RunAt) + assert.True(t, job.RunAt >= job.EnqueuedAt+300) + assert.True(t, job.RunAt <= job.EnqueuedAt+301) } job, err = enqueuer.EnqueueUniqueIn("wat", 10, Q{"a": 1, "b": "cool"}) @@ -627,7 +628,7 @@ func TestEnqueueUniqueIn(t *testing.T) { score, j := jobOnZset(pool, redisKeyScheduled(ns)) assert.True(t, score > time.Now().Unix()+290) // We don't want to overwrite the time - assert.True(t, score <= time.Now().Unix()+300) + assert.True(t, score <= time.Now().Unix()+301) assert.Equal(t, "wat", j.Name) assert.True(t, len(j.ID) > 10) // Something is in it @@ -960,7 +961,8 @@ func TestEnqueueUniqueInByKey(t *testing.T) { assert.Equal(t, "cool", job.ArgString("b")) assert.EqualValues(t, 1, job.ArgInt64("a")) assert.NoError(t, job.ArgError()) - assert.EqualValues(t, job.EnqueuedAt+300, job.RunAt) + assert.True(t, job.RunAt >= job.EnqueuedAt+300) + assert.True(t, job.RunAt <= job.EnqueuedAt+301) } job, err = enqueuer.EnqueueUniqueInByKey("wat", 10, Q{"a": 1, "b": "cool"}, Q{"key": "123"}) @@ -971,7 +973,7 @@ func TestEnqueueUniqueInByKey(t *testing.T) { score, j := jobOnZset(pool, redisKeyScheduled(ns)) assert.True(t, score > time.Now().Unix()+290) // We don't want to overwrite the time - assert.True(t, score <= time.Now().Unix()+300) + assert.True(t, score <= time.Now().Unix()+301) assert.Equal(t, "wat", j.Name) assert.True(t, len(j.ID) > 10) // Something is in it From 4578bd882478c8f02337ae075e5f243ff7da45f4 Mon Sep 17 00:00:00 2001 From: email Date: Fri, 31 Oct 2025 11:04:29 +0530 Subject: [PATCH 05/11] replaced error with default --- enqueue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/enqueue.go b/enqueue.go index 6a4d3859..b8d67a55 100644 --- a/enqueue.go +++ b/enqueue.go @@ -124,7 +124,7 @@ func (e *Enqueuer) EnqueueAt(jobName string, epochSeconds int64, args map[string Args: args, } if epochSeconds < job.EnqueuedAt { - return nil, errors.New("epochSeconds must be a greater value than current time") + epochSeconds = job.EnqueuedAt + 1 } rawJSON, err := job.serialize() if err != nil { From ea4fb79180cb3739f52009b22abc76653f034679 Mon Sep 17 00:00:00 2001 From: email Date: Mon, 3 Nov 2025 11:24:44 +0530 Subject: [PATCH 06/11] added uniqueAt version of functions --- enqueue.go | 27 ++++ enqueue_test.go | 334 +++++++++++++++++------------------------------- 2 files changed, 144 insertions(+), 217 deletions(-) diff --git a/enqueue.go b/enqueue.go index b8d67a55..099c1a31 100644 --- a/enqueue.go +++ b/enqueue.go @@ -206,6 +206,33 @@ func (e *Enqueuer) EnqueueUniqueInByKey(jobName string, secondsFromNow int64, ar return nil, err } +// EnqueueUniqueAt enqueues a unique job at the specified absolute epoch time in seconds. See EnqueueUnique for semantics of unique jobs. +func (e *Enqueuer) EnqueueUniqueAt(jobName string, epochSeconds int64, args map[string]interface{}) (*ScheduledJob, error) { + return e.EnqueueUniqueAtByKey(jobName, epochSeconds, args, nil) +} + +// EnqueueUniqueAtByKey enqueues a job unique on specified key at the specified absolute epoch time in seconds, updating arguments. See EnqueueUnique for semantics of unique jobs. +func (e *Enqueuer) EnqueueUniqueAtByKey(jobName string, epochSeconds int64, args map[string]interface{}, keyMap map[string]interface{}) (*ScheduledJob, error) { + enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap) + if err != nil { + return nil, err + } + if epochSeconds < job.EnqueuedAt { + epochSeconds = job.EnqueuedAt + 1 + } + + scheduledJob := &ScheduledJob{ + RunAt: epochSeconds, + Job: job, + } + + res, err := enqueue(&scheduledJob.RunAt) + if res == "ok" && err == nil { + return scheduledJob, nil + } + return nil, err +} + func (e *Enqueuer) addToKnownJobs(conn redis.Conn, jobName string) error { needSadd := true now := time.Now().Unix() diff --git a/enqueue_test.go b/enqueue_test.go index b5764270..2730b4ce 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -760,95 +760,47 @@ func TestEnqueueUniqueIn_WithMock(t *testing.T) { } } -func TestEnqueueUniqueByKey(t *testing.T) { - var arg3 string - var arg4 string - +func TestEnqueueUniqueAt(t *testing.T) { pool := newTestPool(t) ns := "work" cleanKeyspace(ns, pool) enqueuer := NewEnqueuer(ns, pool) - var mutex = &sync.Mutex{} - job, err := enqueuer.EnqueueUniqueByKey("wat", Q{"a": 3, "b": "foo"}, Q{"key": "123"}) + + now := time.Now().Unix() + runAt := now + 300 + + job, err := enqueuer.EnqueueUniqueAt("wat", runAt, Q{"a": 1, "b": "cool"}) assert.NoError(t, err) if assert.NotNil(t, job) { assert.Equal(t, "wat", job.Name) - assert.True(t, len(job.ID) > 10) // Something is in it - assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds - assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "foo", job.ArgString("b")) - assert.EqualValues(t, 3, job.ArgInt64("a")) + assert.True(t, len(job.ID) > 10) + assert.True(t, job.EnqueuedAt >= now) + assert.Equal(t, "cool", job.ArgString("b")) + assert.EqualValues(t, 1, job.ArgInt64("a")) assert.NoError(t, job.ArgError()) + assert.EqualValues(t, runAt, job.RunAt) } - job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 3, "b": "bar"}, Q{"key": "123"}) + job, err = enqueuer.EnqueueUniqueAt("wat", runAt-200, Q{"a": 1, "b": "cool"}) assert.NoError(t, err) assert.Nil(t, job) - job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 4, "b": "baz"}, Q{"key": "124"}) - assert.NoError(t, err) - assert.NotNil(t, job) - - job, err = enqueuer.EnqueueUniqueByKey("taw", nil, Q{"key": "125"}) - assert.NoError(t, err) - assert.NotNil(t, job) - - // Process the queues. Ensure the right number of jobs were processed - var wats, taws int64 - wp := NewWorkerPool(TestContext{}, 3, ns, pool) - wp.JobWithOptions("wat", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error { - mutex.Lock() - argA := job.Args["a"].(float64) - argB := job.Args["b"].(string) - if argA == 3 { - arg3 = argB - } - if argA == 4 { - arg4 = argB - } - - wats++ - mutex.Unlock() - return nil - }) - wp.JobWithOptions("taw", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error { - mutex.Lock() - taws++ - mutex.Unlock() - return fmt.Errorf("ohno") - }) - wp.Start() - wp.Drain() - wp.Stop() - - assert.EqualValues(t, 2, wats) - assert.EqualValues(t, 1, taws) - - // Check that arguments got updated to new value - assert.EqualValues(t, "bar", arg3) - assert.EqualValues(t, "baz", arg4) - - // Enqueue again. Ensure we can. - job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 1, "b": "cool"}, Q{"key": "123"}) - assert.NoError(t, err) - assert.NotNil(t, job) - - job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 1, "b": "coolio"}, Q{"key": "124"}) - assert.NoError(t, err) - assert.NotNil(t, job) + score, j := jobOnZset(pool, redisKeyScheduled(ns)) + assert.EqualValues(t, runAt, score) + assert.Equal(t, "wat", j.Name) + assert.True(t, j.Unique) - // Even though taw resulted in an error, we should still be able to re-queue it. - // This could result in multiple taws enqueued at the same time in a production system. - job, err = enqueuer.EnqueueUniqueByKey("taw", nil, Q{"key": "123"}) + job, err = enqueuer.EnqueueUniqueAt("wat", runAt+600, Q{"a": 1, "b": "coolio"}) assert.NoError(t, err) assert.NotNil(t, job) } -func TestEnqueueUniqueByKey_WithMock(t *testing.T) { +func TestEnqueueUniqueAt_WithMock(t *testing.T) { ns := "work" jobName := "test" jobArgs := map[string]interface{}{"arg": "value"} - jobKeyMap := map[string]interface{}{"key": "value"} + + runAt := time.Now().Unix() + 100 ok := "ok" dup := "ok" @@ -859,76 +811,57 @@ func TestEnqueueUniqueByKey_WithMock(t *testing.T) { mockLEvalshaErr error mockWait *int64 mockWaitErr error - - expectedError error + expectedError error }{ + {name: "Success without wait", mockLEvalsha: &ok}, + {name: "Duplicate without wait", mockLEvalsha: &dup}, + {name: "Failure without wait", mockLEvalshaErr: errors.New("lpush failure"), expectedError: errors.New("lpush failure")}, { - name: "Success without wait", - mockLEvalsha: &ok, - }, { - name: "Duplicate without wait", - mockLEvalsha: &dup, - }, { - name: "Failure without wait", - mockLEvalshaErr: errors.New("lpush failure"), - expectedError: errors.New("lpush failure"), - }, { - name: "Failure with wait", - enqueuerOption: EnqueuerOption{ - MinWaitReplicas: 2, - MaxWaitTimeoutMS: 1000, - }, - mockLEvalsha: &ok, - mockWaitErr: errors.New("wait failure"), - expectedError: errors.New("wait failure"), - }, { - name: "When wait return zero", - enqueuerOption: EnqueuerOption{ - MinWaitReplicas: 2, - MaxWaitTimeoutMS: 1000, - }, - mockLEvalsha: &dup, - mockWait: &zero, - expectedError: ErrReplicationFailed, - }, { - name: "When wait return less than MinWaitReplicas", - enqueuerOption: EnqueuerOption{ - MinWaitReplicas: 2, - MaxWaitTimeoutMS: 1000, - }, - mockLEvalsha: &ok, - mockWait: &one, - expectedError: ErrReplicationFailed, - }, { - name: "When wait return same as MinWaitReplicas", - enqueuerOption: EnqueuerOption{ - MinWaitReplicas: 2, - MaxWaitTimeoutMS: 1000, - }, - mockLEvalsha: &dup, - mockWait: &two, - }, { - name: "When wait return more than MinWaitReplicas", - enqueuerOption: EnqueuerOption{ - MinWaitReplicas: 2, - MaxWaitTimeoutMS: 1000, - }, - mockLEvalsha: &ok, - mockWait: &three, + name: "Failure with wait", + enqueuerOption: EnqueuerOption{MinWaitReplicas: 2, MaxWaitTimeoutMS: 1000}, + mockLEvalsha: &ok, + mockWaitErr: errors.New("wait failure"), + expectedError: errors.New("wait failure"), + }, + { + name: "When wait return zero", + enqueuerOption: EnqueuerOption{MinWaitReplicas: 2, MaxWaitTimeoutMS: 1000}, + mockLEvalsha: &dup, + mockWait: &zero, + expectedError: ErrReplicationFailed, + }, + { + name: "When wait return less than MinWaitReplicas", + enqueuerOption: EnqueuerOption{MinWaitReplicas: 2, MaxWaitTimeoutMS: 1000}, + mockLEvalsha: &ok, + mockWait: &one, + expectedError: ErrReplicationFailed, + }, + { + name: "When wait return same as MinWaitReplicas", + enqueuerOption: EnqueuerOption{MinWaitReplicas: 2, MaxWaitTimeoutMS: 1000}, + mockLEvalsha: &dup, + mockWait: &two, + }, + { + name: "When wait return more than MinWaitReplicas", + enqueuerOption: EnqueuerOption{MinWaitReplicas: 2, MaxWaitTimeoutMS: 1000}, + mockLEvalsha: &ok, + mockWait: &three, }, } for _, tt := range cases { + // uniqueKey same as EnqueueUnique test (args based) + uniqueKey := "work:unique:test:{\"arg\":\"value\"}\n" t.Run(tt.name, func(t *testing.T) { pool, conn := newMockTestPool(t) enqueuer := NewEnqueuerWithOptions(ns, pool, tt.enqueuerOption) - uniqueKey := `work:unique:test:{"key":"value"} -` if tt.mockLEvalsha != nil { - conn.Command("EVALSHA", "f38b6aef74017e799294b1ec4b74eb707deb0c17", 2, "work:jobs:test", uniqueKey, redigomock.NewAnyData(), redigomock.NewAnyData()).Expect(*tt.mockLEvalsha) + conn.Command("EVALSHA", "7b32230026d2ba0d5aa0b5451237f6c086e3072c", 2, "work:scheduled", uniqueKey, redigomock.NewAnyData(), "1", runAt).Expect(*tt.mockLEvalsha) } if tt.mockLEvalshaErr != nil { - conn.Command("EVALSHA", "f38b6aef74017e799294b1ec4b74eb707deb0c17", 2, "work:jobs:test", uniqueKey, redigomock.NewAnyData(), redigomock.NewAnyData()).ExpectError(tt.mockLEvalshaErr) + conn.Command("EVALSHA", "7b32230026d2ba0d5aa0b5451237f6c086e3072c", 2, "work:scheduled", uniqueKey, redigomock.NewAnyData(), "1", runAt).ExpectError(tt.mockLEvalshaErr) } if tt.mockWait != nil { conn.Command("WAIT", tt.enqueuerOption.MinWaitReplicas, tt.enqueuerOption.MaxWaitTimeoutMS).Expect(*tt.mockWait) @@ -938,62 +871,49 @@ func TestEnqueueUniqueByKey_WithMock(t *testing.T) { } conn.Command("SADD", "work:known_jobs", jobName).Expect(1) - _, err := enqueuer.EnqueueUniqueByKey(jobName, jobArgs, jobKeyMap) + _, err := enqueuer.EnqueueUniqueAt(jobName, runAt, jobArgs) assert.Equal(t, tt.expectedError, err) }) } } -func TestEnqueueUniqueInByKey(t *testing.T) { +func TestEnqueueUniqueAtByKey(t *testing.T) { pool := newTestPool(t) ns := "work" cleanKeyspace(ns, pool) enqueuer := NewEnqueuer(ns, pool) - // Enqueue two unique jobs -- ensure one job sticks. - job, err := enqueuer.EnqueueUniqueInByKey("wat", 300, Q{"a": 1, "b": "cool"}, Q{"key": "123"}) + now := time.Now().Unix() + runAt := now + 300 + + job, err := enqueuer.EnqueueUniqueAtByKey("wat", runAt, Q{"a": 1, "b": "cool"}, Q{"key": "123"}) assert.NoError(t, err) - if assert.NotNil(t, job) { - assert.Equal(t, "wat", job.Name) - assert.True(t, len(job.ID) > 10) // Something is in it - assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds - assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", job.ArgString("b")) - assert.EqualValues(t, 1, job.ArgInt64("a")) - assert.NoError(t, job.ArgError()) - assert.True(t, job.RunAt >= job.EnqueuedAt+300) - assert.True(t, job.RunAt <= job.EnqueuedAt+301) - } + assert.NotNil(t, job) - job, err = enqueuer.EnqueueUniqueInByKey("wat", 10, Q{"a": 1, "b": "cool"}, Q{"key": "123"}) + job, err = enqueuer.EnqueueUniqueAtByKey("wat", runAt-100, Q{"a": 1, "b": "cool"}, Q{"key": "123"}) assert.NoError(t, err) assert.Nil(t, job) - // Get the job score, j := jobOnZset(pool, redisKeyScheduled(ns)) - - assert.True(t, score > time.Now().Unix()+290) // We don't want to overwrite the time - assert.True(t, score <= time.Now().Unix()+301) - + assert.EqualValues(t, runAt, score) assert.Equal(t, "wat", j.Name) - assert.True(t, len(j.ID) > 10) // Something is in it - assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds - assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds - assert.Equal(t, "cool", j.ArgString("b")) - assert.EqualValues(t, 1, j.ArgInt64("a")) - assert.NoError(t, j.ArgError()) assert.True(t, j.Unique) + + job, err = enqueuer.EnqueueUniqueAtByKey("wat", runAt+600, Q{"a": 2, "b": "updated"}, Q{"key": "123"}) + assert.NoError(t, err) + assert.Nil(t, job) // args update will be applied when processed, but not a new schedule + + job, err = enqueuer.EnqueueUniqueAtByKey("wat", runAt+600, Q{"a": 2, "b": "bar"}, Q{"key": "124"}) + assert.NoError(t, err) + assert.NotNil(t, job) } -func TestEnqueueUniqueInByKey_WithMock(t *testing.T) { +func TestEnqueueUniqueAtByKey_WithMock(t *testing.T) { ns := "work" jobName := "test" jobArgs := map[string]interface{}{"arg": "value"} jobKeyMap := map[string]interface{}{"key": "value"} - secondsFromNow := int64(100) - now := time.Now().Unix() - setNowEpochSecondsMock(now) - defer resetNowEpochSecondsMock() + runAt := time.Now().Unix() + 100 ok := "ok" dup := "ok" @@ -1004,76 +924,56 @@ func TestEnqueueUniqueInByKey_WithMock(t *testing.T) { mockLEvalshaErr error mockWait *int64 mockWaitErr error - - expectedError error + expectedError error }{ + {name: "Success without wait", mockLEvalsha: &ok}, + {name: "Duplicate without wait", mockLEvalsha: &dup}, + {name: "Failure without wait", mockLEvalshaErr: errors.New("lpush failure"), expectedError: errors.New("lpush failure")}, { - name: "Success without wait", - mockLEvalsha: &ok, - }, { - name: "Duplicate without wait", - mockLEvalsha: &dup, - }, { - name: "Failure without wait", - mockLEvalshaErr: errors.New("lpush failure"), - expectedError: errors.New("lpush failure"), - }, { - name: "Failure with wait", - enqueuerOption: EnqueuerOption{ - MinWaitReplicas: 2, - MaxWaitTimeoutMS: 1000, - }, - mockLEvalsha: &ok, - mockWaitErr: errors.New("wait failure"), - expectedError: errors.New("wait failure"), - }, { - name: "When wait return zero", - enqueuerOption: EnqueuerOption{ - MinWaitReplicas: 2, - MaxWaitTimeoutMS: 1000, - }, - mockLEvalsha: &dup, - mockWait: &zero, - expectedError: ErrReplicationFailed, - }, { - name: "When wait return less than MinWaitReplicas", - enqueuerOption: EnqueuerOption{ - MinWaitReplicas: 2, - MaxWaitTimeoutMS: 1000, - }, - mockLEvalsha: &ok, - mockWait: &one, - expectedError: ErrReplicationFailed, - }, { - name: "When wait return same as MinWaitReplicas", - enqueuerOption: EnqueuerOption{ - MinWaitReplicas: 2, - MaxWaitTimeoutMS: 1000, - }, - mockLEvalsha: &dup, - mockWait: &two, - }, { - name: "When wait return more than MinWaitReplicas", - enqueuerOption: EnqueuerOption{ - MinWaitReplicas: 2, - MaxWaitTimeoutMS: 1000, - }, - mockLEvalsha: &ok, - mockWait: &three, + name: "Failure with wait", + enqueuerOption: EnqueuerOption{MinWaitReplicas: 2, MaxWaitTimeoutMS: 1000}, + mockLEvalsha: &ok, + mockWaitErr: errors.New("wait failure"), + expectedError: errors.New("wait failure"), + }, + { + name: "When wait return zero", + enqueuerOption: EnqueuerOption{MinWaitReplicas: 2, MaxWaitTimeoutMS: 1000}, + mockLEvalsha: &dup, + mockWait: &zero, + expectedError: ErrReplicationFailed, + }, + { + name: "When wait return less than MinWaitReplicas", + enqueuerOption: EnqueuerOption{MinWaitReplicas: 2, MaxWaitTimeoutMS: 1000}, + mockLEvalsha: &ok, + mockWait: &one, + expectedError: ErrReplicationFailed, + }, + { + name: "When wait return same as MinWaitReplicas", + enqueuerOption: EnqueuerOption{MinWaitReplicas: 2, MaxWaitTimeoutMS: 1000}, + mockLEvalsha: &dup, + mockWait: &two, + }, + { + name: "When wait return more than MinWaitReplicas", + enqueuerOption: EnqueuerOption{MinWaitReplicas: 2, MaxWaitTimeoutMS: 1000}, + mockLEvalsha: &ok, + mockWait: &three, }, } for _, tt := range cases { + uniqueKey := "work:unique:test:{\"key\":\"value\"}\n" t.Run(tt.name, func(t *testing.T) { pool, conn := newMockTestPool(t) enqueuer := NewEnqueuerWithOptions(ns, pool, tt.enqueuerOption) - uniqueKey := `work:unique:test:{"key":"value"} -` if tt.mockLEvalsha != nil { - conn.Command("EVALSHA", "7b32230026d2ba0d5aa0b5451237f6c086e3072c", 2, "work:scheduled", uniqueKey, redigomock.NewAnyData(), redigomock.NewAnyData(), now+secondsFromNow).Expect(*tt.mockLEvalsha) + conn.Command("EVALSHA", "7b32230026d2ba0d5aa0b5451237f6c086e3072c", 2, "work:scheduled", uniqueKey, redigomock.NewAnyData(), redigomock.NewAnyData(), runAt).Expect(*tt.mockLEvalsha) } if tt.mockLEvalshaErr != nil { - conn.Command("EVALSHA", "7b32230026d2ba0d5aa0b5451237f6c086e3072c", 2, "work:scheduled", uniqueKey, redigomock.NewAnyData(), redigomock.NewAnyData(), now+secondsFromNow).ExpectError(tt.mockLEvalshaErr) + conn.Command("EVALSHA", "7b32230026d2ba0d5aa0b5451237f6c086e3072c", 2, "work:scheduled", uniqueKey, redigomock.NewAnyData(), redigomock.NewAnyData(), runAt).ExpectError(tt.mockLEvalshaErr) } if tt.mockWait != nil { conn.Command("WAIT", tt.enqueuerOption.MinWaitReplicas, tt.enqueuerOption.MaxWaitTimeoutMS).Expect(*tt.mockWait) @@ -1083,7 +983,7 @@ func TestEnqueueUniqueInByKey_WithMock(t *testing.T) { } conn.Command("SADD", "work:known_jobs", jobName).Expect(1) - _, err := enqueuer.EnqueueUniqueInByKey(jobName, secondsFromNow, jobArgs, jobKeyMap) + _, err := enqueuer.EnqueueUniqueAtByKey(jobName, runAt, jobArgs, jobKeyMap) assert.Equal(t, tt.expectedError, err) }) } From 7425107392f59ba21f7a75126e180a2b89049a22 Mon Sep 17 00:00:00 2001 From: email Date: Wed, 5 Nov 2025 08:33:34 +0530 Subject: [PATCH 07/11] removed before time check --- enqueue.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/enqueue.go b/enqueue.go index 099c1a31..03fd1417 100644 --- a/enqueue.go +++ b/enqueue.go @@ -123,9 +123,6 @@ func (e *Enqueuer) EnqueueAt(jobName string, epochSeconds int64, args map[string EnqueuedAt: nowEpochSeconds(), Args: args, } - if epochSeconds < job.EnqueuedAt { - epochSeconds = job.EnqueuedAt + 1 - } rawJSON, err := job.serialize() if err != nil { return nil, err @@ -217,9 +214,6 @@ func (e *Enqueuer) EnqueueUniqueAtByKey(jobName string, epochSeconds int64, args if err != nil { return nil, err } - if epochSeconds < job.EnqueuedAt { - epochSeconds = job.EnqueuedAt + 1 - } scheduledJob := &ScheduledJob{ RunAt: epochSeconds, From 12643fda449ed7fbb887cbc531c529713f0ddb0d Mon Sep 17 00:00:00 2001 From: email Date: Wed, 5 Nov 2025 08:40:03 +0530 Subject: [PATCH 08/11] refactored code --- enqueue.go | 49 ++----------------------------------------------- 1 file changed, 2 insertions(+), 47 deletions(-) diff --git a/enqueue.go b/enqueue.go index 03fd1417..abf4756c 100644 --- a/enqueue.go +++ b/enqueue.go @@ -82,38 +82,7 @@ func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, e // EnqueueIn enqueues a job in the scheduled job queue for execution in secondsFromNow seconds. func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) { - job := &Job{ - Name: jobName, - ID: makeIdentifier(), - EnqueuedAt: nowEpochSeconds(), - Args: args, - } - - rawJSON, err := job.serialize() - if err != nil { - return nil, err - } - - conn := e.Pool.Get() - defer conn.Close() - - secondsToAdd := epochAfterSeconds(secondsFromNow) - - scheduledJob := &ScheduledJob{ - RunAt: secondsToAdd, - Job: job, - } - - _, err = e.redisDoHelper(conn, "ZADD", redisKeyScheduled(e.Namespace), scheduledJob.RunAt, rawJSON) - if err != nil { - return nil, err - } - - if err := e.addToKnownJobs(conn, jobName); err != nil { - return scheduledJob, err - } - - return scheduledJob, nil + return e.EnqueueAt(jobName, epochAfterSeconds(secondsFromNow), args) } func (e *Enqueuer) EnqueueAt(jobName string, epochSeconds int64, args map[string]interface{}) (*ScheduledJob, error) { @@ -186,21 +155,7 @@ func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]interface{ // EnqueueUniqueInByKey enqueues a job in the scheduled job queue that is unique on specified key for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs. // Subsequent calls with same key will update arguments func (e *Enqueuer) EnqueueUniqueInByKey(jobName string, secondsFromNow int64, args map[string]interface{}, keyMap map[string]interface{}) (*ScheduledJob, error) { - enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap) - if err != nil { - return nil, err - } - - scheduledJob := &ScheduledJob{ - RunAt: epochAfterSeconds(secondsFromNow), - Job: job, - } - - res, err := enqueue(&scheduledJob.RunAt) - if res == "ok" && err == nil { - return scheduledJob, nil - } - return nil, err + return e.EnqueueUniqueAtByKey(jobName, epochAfterSeconds(secondsFromNow), args, keyMap) } // EnqueueUniqueAt enqueues a unique job at the specified absolute epoch time in seconds. See EnqueueUnique for semantics of unique jobs. From e43ffec511321dd7f41ef6725e54f36def2202ef Mon Sep 17 00:00:00 2001 From: email Date: Wed, 5 Nov 2025 10:57:59 +0530 Subject: [PATCH 09/11] added back missing tests --- enqueue_test.go | 145 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/enqueue_test.go b/enqueue_test.go index 2730b4ce..427241ff 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -877,6 +877,151 @@ func TestEnqueueUniqueAt_WithMock(t *testing.T) { } } +func TestEnqueueUniqueInByKey(t *testing.T) { + pool := newTestPool(t) + ns := "work" + cleanKeyspace(ns, pool) + enqueuer := NewEnqueuer(ns, pool) + + // Enqueue two unique jobs -- ensure one job sticks. + job, err := enqueuer.EnqueueUniqueInByKey("wat", 300, Q{"a": 1, "b": "cool"}, Q{"key": "123"}) + assert.NoError(t, err) + if assert.NotNil(t, job) { + assert.Equal(t, "wat", job.Name) + assert.True(t, len(job.ID) > 10) // Something is in it + assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds + assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds + assert.Equal(t, "cool", job.ArgString("b")) + assert.EqualValues(t, 1, job.ArgInt64("a")) + assert.NoError(t, job.ArgError()) + assert.True(t, job.RunAt >= job.EnqueuedAt+300) + assert.True(t, job.RunAt <= job.EnqueuedAt+301) + } + + job, err = enqueuer.EnqueueUniqueInByKey("wat", 10, Q{"a": 1, "b": "cool"}, Q{"key": "123"}) + assert.NoError(t, err) + assert.Nil(t, job) + + // Get the job + score, j := jobOnZset(pool, redisKeyScheduled(ns)) + + assert.True(t, score > time.Now().Unix()+290) // We don't want to overwrite the time + assert.True(t, score <= time.Now().Unix()+301) + + assert.Equal(t, "wat", j.Name) + assert.True(t, len(j.ID) > 10) // Something is in it + assert.True(t, j.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds + assert.True(t, j.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds + assert.Equal(t, "cool", j.ArgString("b")) + assert.EqualValues(t, 1, j.ArgInt64("a")) + assert.NoError(t, j.ArgError()) + assert.True(t, j.Unique) +} + +func TestEnqueueUniqueInByKey_WithMock(t *testing.T) { + ns := "work" + jobName := "test" + jobArgs := map[string]interface{}{"arg": "value"} + jobKeyMap := map[string]interface{}{"key": "value"} + secondsFromNow := int64(100) + now := time.Now().Unix() + setNowEpochSecondsMock(now) + defer resetNowEpochSecondsMock() + + ok := "ok" + dup := "ok" + var cases = []struct { + name string + enqueuerOption EnqueuerOption + mockLEvalsha *string + mockLEvalshaErr error + mockWait *int64 + mockWaitErr error + + expectedError error + }{ + { + name: "Success without wait", + mockLEvalsha: &ok, + }, { + name: "Duplicate without wait", + mockLEvalsha: &dup, + }, { + name: "Failure without wait", + mockLEvalshaErr: errors.New("lpush failure"), + expectedError: errors.New("lpush failure"), + }, { + name: "Failure with wait", + enqueuerOption: EnqueuerOption{ + MinWaitReplicas: 2, + MaxWaitTimeoutMS: 1000, + }, + mockLEvalsha: &ok, + mockWaitErr: errors.New("wait failure"), + expectedError: errors.New("wait failure"), + }, { + name: "When wait return zero", + enqueuerOption: EnqueuerOption{ + MinWaitReplicas: 2, + MaxWaitTimeoutMS: 1000, + }, + mockLEvalsha: &dup, + mockWait: &zero, + expectedError: ErrReplicationFailed, + }, { + name: "When wait return less than MinWaitReplicas", + enqueuerOption: EnqueuerOption{ + MinWaitReplicas: 2, + MaxWaitTimeoutMS: 1000, + }, + mockLEvalsha: &ok, + mockWait: &one, + expectedError: ErrReplicationFailed, + }, { + name: "When wait return same as MinWaitReplicas", + enqueuerOption: EnqueuerOption{ + MinWaitReplicas: 2, + MaxWaitTimeoutMS: 1000, + }, + mockLEvalsha: &dup, + mockWait: &two, + }, { + name: "When wait return more than MinWaitReplicas", + enqueuerOption: EnqueuerOption{ + MinWaitReplicas: 2, + MaxWaitTimeoutMS: 1000, + }, + mockLEvalsha: &ok, + mockWait: &three, + }, + } + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + pool, conn := newMockTestPool(t) + enqueuer := NewEnqueuerWithOptions(ns, pool, tt.enqueuerOption) + uniqueKey := `work:unique:test:{"key":"value"} +` + if tt.mockLEvalsha != nil { + conn.Command("EVALSHA", "7b32230026d2ba0d5aa0b5451237f6c086e3072c", 2, "work:scheduled", uniqueKey, redigomock.NewAnyData(), redigomock.NewAnyData(), now+secondsFromNow).Expect(*tt.mockLEvalsha) + } + if tt.mockLEvalshaErr != nil { + conn.Command("EVALSHA", "7b32230026d2ba0d5aa0b5451237f6c086e3072c", 2, "work:scheduled", uniqueKey, redigomock.NewAnyData(), redigomock.NewAnyData(), now+secondsFromNow).ExpectError(tt.mockLEvalshaErr) + } + if tt.mockWait != nil { + conn.Command("WAIT", tt.enqueuerOption.MinWaitReplicas, tt.enqueuerOption.MaxWaitTimeoutMS).Expect(*tt.mockWait) + } + if tt.mockWaitErr != nil { + conn.Command("WAIT", tt.enqueuerOption.MinWaitReplicas, tt.enqueuerOption.MaxWaitTimeoutMS).ExpectError(tt.mockWaitErr) + } + conn.Command("SADD", "work:known_jobs", jobName).Expect(1) + + _, err := enqueuer.EnqueueUniqueInByKey(jobName, secondsFromNow, jobArgs, jobKeyMap) + assert.Equal(t, tt.expectedError, err) + }) + } +} + func TestEnqueueUniqueAtByKey(t *testing.T) { pool := newTestPool(t) ns := "work" From 098ab88584e6dbd3ed59c872d42d203586e4521d Mon Sep 17 00:00:00 2001 From: email Date: Wed, 5 Nov 2025 11:26:04 +0530 Subject: [PATCH 10/11] added back TestEnqueueUniqueByKey with seeding the TestPrioritySampler test --- enqueue_test.go | 84 ++++++++++++++++++++++++++++++++++++++++ priority_sampler_test.go | 2 + 2 files changed, 86 insertions(+) diff --git a/enqueue_test.go b/enqueue_test.go index 427241ff..5ae2f67a 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -760,6 +760,90 @@ func TestEnqueueUniqueIn_WithMock(t *testing.T) { } } +func TestEnqueueUniqueByKey(t *testing.T) { + var arg3 string + var arg4 string + + pool := newTestPool(t) + ns := "work" + cleanKeyspace(ns, pool) + enqueuer := NewEnqueuer(ns, pool) + var mutex = &sync.Mutex{} + job, err := enqueuer.EnqueueUniqueByKey("wat", Q{"a": 3, "b": "foo"}, Q{"key": "123"}) + assert.NoError(t, err) + if assert.NotNil(t, job) { + assert.Equal(t, "wat", job.Name) + assert.True(t, len(job.ID) > 10) // Something is in it + assert.True(t, job.EnqueuedAt > (time.Now().Unix()-10)) // Within 10 seconds + assert.True(t, job.EnqueuedAt < (time.Now().Unix()+10)) // Within 10 seconds + assert.Equal(t, "foo", job.ArgString("b")) + assert.EqualValues(t, 3, job.ArgInt64("a")) + assert.NoError(t, job.ArgError()) + } + + job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 3, "b": "bar"}, Q{"key": "123"}) + assert.NoError(t, err) + assert.Nil(t, job) + + job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 4, "b": "baz"}, Q{"key": "124"}) + assert.NoError(t, err) + assert.NotNil(t, job) + + job, err = enqueuer.EnqueueUniqueByKey("taw", nil, Q{"key": "125"}) + assert.NoError(t, err) + assert.NotNil(t, job) + + // Process the queues. Ensure the right number of jobs were processed + var wats, taws int64 + wp := NewWorkerPool(TestContext{}, 3, ns, pool) + wp.JobWithOptions("wat", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error { + mutex.Lock() + argA := job.Args["a"].(float64) + argB := job.Args["b"].(string) + if argA == 3 { + arg3 = argB + } + if argA == 4 { + arg4 = argB + } + + wats++ + mutex.Unlock() + return nil + }) + wp.JobWithOptions("taw", JobOptions{Priority: 1, MaxFails: 1}, func(job *Job) error { + mutex.Lock() + taws++ + mutex.Unlock() + return fmt.Errorf("ohno") + }) + wp.Start() + wp.Drain() + wp.Stop() + + assert.EqualValues(t, 2, wats) + assert.EqualValues(t, 1, taws) + + // Check that arguments got updated to new value + assert.EqualValues(t, "bar", arg3) + assert.EqualValues(t, "baz", arg4) + + // Enqueue again. Ensure we can. + job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 1, "b": "cool"}, Q{"key": "123"}) + assert.NoError(t, err) + assert.NotNil(t, job) + + job, err = enqueuer.EnqueueUniqueByKey("wat", Q{"a": 1, "b": "coolio"}, Q{"key": "124"}) + assert.NoError(t, err) + assert.NotNil(t, job) + + // Even though taw resulted in an error, we should still be able to re-queue it. + // This could result in multiple taws enqueued at the same time in a production system. + job, err = enqueuer.EnqueueUniqueByKey("taw", nil, Q{"key": "123"}) + assert.NoError(t, err) + assert.NotNil(t, job) +} + func TestEnqueueUniqueAt(t *testing.T) { pool := newTestPool(t) ns := "work" diff --git a/priority_sampler_test.go b/priority_sampler_test.go index 6117601c..2d6ecc36 100644 --- a/priority_sampler_test.go +++ b/priority_sampler_test.go @@ -2,12 +2,14 @@ package work import ( "fmt" + "math/rand" "testing" "github.com/stretchr/testify/assert" ) func TestPrioritySampler(t *testing.T) { + rand.Seed(1) ps := prioritySampler{} ps.add(5, "jobs.5", "jobsinprog.5", "jobspaused.5", "jobslock.5", "jobslockinfo.5", "jobsconcurrency.5") From 23222a40184e0b4275ed33296e8570e164f81a27 Mon Sep 17 00:00:00 2001 From: "rajat.sahu" Date: Wed, 5 Nov 2025 12:08:31 +0530 Subject: [PATCH 11/11] added back TestEnqueueUniqueByKey_WithMock --- enqueue_test.go | 100 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/enqueue_test.go b/enqueue_test.go index 5ae2f67a..02d247ac 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -844,6 +844,106 @@ func TestEnqueueUniqueByKey(t *testing.T) { assert.NotNil(t, job) } +func TestEnqueueUniqueByKey_WithMock(t *testing.T) { + ns := "work" + jobName := "test" + jobArgs := map[string]interface{}{"arg": "value"} + jobKeyMap := map[string]interface{}{"key": "value"} + + ok := "ok" + dup := "ok" + var cases = []struct { + name string + enqueuerOption EnqueuerOption + mockLEvalsha *string + mockLEvalshaErr error + mockWait *int64 + mockWaitErr error + + expectedError error + }{ + { + name: "Success without wait", + mockLEvalsha: &ok, + }, { + name: "Duplicate without wait", + mockLEvalsha: &dup, + }, { + name: "Failure without wait", + mockLEvalshaErr: errors.New("lpush failure"), + expectedError: errors.New("lpush failure"), + }, { + name: "Failure with wait", + enqueuerOption: EnqueuerOption{ + MinWaitReplicas: 2, + MaxWaitTimeoutMS: 1000, + }, + mockLEvalsha: &ok, + mockWaitErr: errors.New("wait failure"), + expectedError: errors.New("wait failure"), + }, { + name: "When wait return zero", + enqueuerOption: EnqueuerOption{ + MinWaitReplicas: 2, + MaxWaitTimeoutMS: 1000, + }, + mockLEvalsha: &dup, + mockWait: &zero, + expectedError: ErrReplicationFailed, + }, { + name: "When wait return less than MinWaitReplicas", + enqueuerOption: EnqueuerOption{ + MinWaitReplicas: 2, + MaxWaitTimeoutMS: 1000, + }, + mockLEvalsha: &ok, + mockWait: &one, + expectedError: ErrReplicationFailed, + }, { + name: "When wait return same as MinWaitReplicas", + enqueuerOption: EnqueuerOption{ + MinWaitReplicas: 2, + MaxWaitTimeoutMS: 1000, + }, + mockLEvalsha: &dup, + mockWait: &two, + }, { + name: "When wait return more than MinWaitReplicas", + enqueuerOption: EnqueuerOption{ + MinWaitReplicas: 2, + MaxWaitTimeoutMS: 1000, + }, + mockLEvalsha: &ok, + mockWait: &three, + }, + } + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + pool, conn := newMockTestPool(t) + enqueuer := NewEnqueuerWithOptions(ns, pool, tt.enqueuerOption) + uniqueKey := `work:unique:test:{"key":"value"} +` + if tt.mockLEvalsha != nil { + conn.Command("EVALSHA", "f38b6aef74017e799294b1ec4b74eb707deb0c17", 2, "work:jobs:test", uniqueKey, redigomock.NewAnyData(), redigomock.NewAnyData()).Expect(*tt.mockLEvalsha) + } + if tt.mockLEvalshaErr != nil { + conn.Command("EVALSHA", "f38b6aef74017e799294b1ec4b74eb707deb0c17", 2, "work:jobs:test", uniqueKey, redigomock.NewAnyData(), redigomock.NewAnyData()).ExpectError(tt.mockLEvalshaErr) + } + if tt.mockWait != nil { + conn.Command("WAIT", tt.enqueuerOption.MinWaitReplicas, tt.enqueuerOption.MaxWaitTimeoutMS).Expect(*tt.mockWait) + } + if tt.mockWaitErr != nil { + conn.Command("WAIT", tt.enqueuerOption.MinWaitReplicas, tt.enqueuerOption.MaxWaitTimeoutMS).ExpectError(tt.mockWaitErr) + } + conn.Command("SADD", "work:known_jobs", jobName).Expect(1) + + _, err := enqueuer.EnqueueUniqueByKey(jobName, jobArgs, jobKeyMap) + assert.Equal(t, tt.expectedError, err) + }) + } +} + func TestEnqueueUniqueAt(t *testing.T) { pool := newTestPool(t) ns := "work"