Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,20 @@ func (opt *FindOptions) Validate() error {
type BulkJobFinder interface {
BulkFind(jobIDs []string, opts *FindOptions) ([]*Job, error)
}

// PromoteOptions specifies how a job is promoted in the queue.
type PromoteOptions struct {
Namespace string
QueueID string
}

// Validate validates PromoteOptions.
func (opt *PromoteOptions) Validate() error {
if opt.Namespace == "" {
return ErrEmptyNamespace
}
if opt.QueueID == "" {
return ErrEmptyQueueID
}
return nil
}
42 changes: 41 additions & 1 deletion redis_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,22 @@ type redisQueue struct {
metricScript *redis.Script
}

// JobPromoter can update a job's score in the queue to make it immediately
// eligible for dequeuing without re-enqueuing the entire job.
type JobPromoter interface {
// PromoteJob updates the job's score to time.Now(). Only affects jobs
// that exist and have scores <= now (won't demote jobs being processed).
PromoteJob(jobID string, opt *PromoteOptions) error
}

// RedisQueue implements Queue with other additional capabilities
type RedisQueue interface {
Queue
BulkEnqueuer
BulkDequeuer
BulkJobFinder
MetricsExporter
JobPromoter
}

// NewRedisQueue creates a new queue stored in redis.
Expand All @@ -64,7 +73,7 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue {
table.insert(zadd_args, at)
table.insert(zadd_args, job_key)
end
return redis.call("zadd", queue_key, unpack(zadd_args))
return redis.call("zadd", queue_key, "gt", unpack(zadd_args))
`)

dequeueScript := redis.NewScript(`
Expand Down Expand Up @@ -342,6 +351,37 @@ func (q *redisQueue) bulkFindSmallBatch(jobIDs []string, opt *FindOptions) ([]*J
return jobs, nil
}

func (q *redisQueue) PromoteJob(jobID string, opt *PromoteOptions) error {
err := opt.Validate()
if err != nil {
return err
}

queueKey := opt.Namespace + ":queue:" + opt.QueueID
jobKey := opt.Namespace + ":job:" + jobID

// ZADD with both XX and GT flags:
// - XX: Only update existing members (don't resurrect completed jobs)
// - GT: Only update if new score > current score (don't demote processing jobs)
//
// Safety guarantees:
// 1. If job was completed and removed: XX prevents re-adding it
// 2. If job is being processed (score = now + invisibleSec): GT prevents demotion
// 3. If job is pending (score <= now): Both flags allow promotion
return q.client.ZAddArgs(
context.Background(),
queueKey,
redis.ZAddArgs{
XX: true, // Only update existing
GT: true, // Only if new score is greater
Members: []redis.Z{{
Score: float64(time.Now().Unix()),
Member: jobKey,
}},
},
).Err()
}

func (q *redisQueue) GetQueueMetrics(opt *QueueMetricsOptions) (*QueueMetrics, error) {
err := opt.Validate()
if err != nil {
Expand Down
105 changes: 105 additions & 0 deletions redis_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,3 +351,108 @@ func TestRedisQueueBulkEnqueue(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int64(jobCount), count)
}

func TestRedisQueuePromoteJob(t *testing.T) {
client := redistest.NewClient()
defer client.Close()
require.NoError(t, redistest.Reset(client))
q := NewRedisQueue(client)

// Enqueue two jobs with old timestamps (in the past)
job1 := NewJob()
job1.EnqueuedAt = time.Now().Add(-time.Hour) // 1 hour ago
job2 := NewJob()
job2.EnqueuedAt = time.Now().Add(-time.Hour) // 1 hour ago

opts := &EnqueueOptions{
Namespace: "{ns1}",
QueueID: "q1",
}

err := q.Enqueue(job1, opts)
require.NoError(t, err)
err = q.Enqueue(job2, opts)
require.NoError(t, err)

// Check initial score of job2 (should be old timestamp)
queueKey := "{ns1}:queue:q1"
jobKey := fmt.Sprintf("{ns1}:job:%s", job2.ID)
initialScore, err := client.ZScore(context.Background(), queueKey, jobKey).Result()
require.NoError(t, err)
require.Equal(t, float64(job2.EnqueuedAt.Unix()), initialScore)

// Promote job2
beforePromote := time.Now().Unix()
err = q.PromoteJob(job2.ID, &PromoteOptions{
Namespace: opts.Namespace,
QueueID: opts.QueueID,
})
require.NoError(t, err)
afterPromote := time.Now().Unix()

// Check that job2's score was updated to now
newScore, err := client.ZScore(context.Background(), queueKey, jobKey).Result()
require.NoError(t, err)
require.GreaterOrEqual(t, int64(newScore), beforePromote)
require.LessOrEqual(t, int64(newScore), afterPromote)

// Promote non-existent job should not error (XX flag prevents adding)
err = q.PromoteJob("non-existent-job", &PromoteOptions{
Namespace: opts.Namespace,
QueueID: opts.QueueID,
})
require.NoError(t, err)

// Verify non-existent job was not added to queue
exists, err := client.ZScore(context.Background(), queueKey, "{ns1}:job:non-existent-job").Result()
require.Error(t, err) // redis.Nil error expected
require.Equal(t, float64(0), exists)
}

func TestRedisQueuePromoteJobDoesNotDemote(t *testing.T) {
client := redistest.NewClient()
defer client.Close()
require.NoError(t, redistest.Reset(client))
q := NewRedisQueue(client)

// Enqueue a job
job := NewJob()
job.EnqueuedAt = time.Now()

opts := &EnqueueOptions{
Namespace: "{ns1}",
QueueID: "q1",
}

err := q.Enqueue(job, opts)
require.NoError(t, err)

// Dequeue the job (this sets score to now + invisibleSec)
dequeueOpts := &DequeueOptions{
Namespace: "{ns1}",
QueueID: "q1",
At: time.Now(),
InvisibleSec: 60, // 60 seconds
}
dequeuedJob, err := q.Dequeue(dequeueOpts)
require.NoError(t, err)
require.Equal(t, job.ID, dequeuedJob.ID)

// Check that job's score is now + invisibleSec
queueKey := "{ns1}:queue:q1"
jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID)
scoreAfterDequeue, err := client.ZScore(context.Background(), queueKey, jobKey).Result()
require.NoError(t, err)

// Try to promote the job (should not demote it because of GT flag)
err = q.PromoteJob(job.ID, &PromoteOptions{
Namespace: opts.Namespace,
QueueID: opts.QueueID,
})
require.NoError(t, err)

// Verify score hasn't changed (GT flag prevented demotion)
scoreAfterPromote, err := client.ZScore(context.Background(), queueKey, jobKey).Result()
require.NoError(t, err)
require.Equal(t, scoreAfterDequeue, scoreAfterPromote)
}
4 changes: 2 additions & 2 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ func (w *Worker) RunOnce(ctx context.Context, queueID string, h ContextHandleFun
}

handle := func(job *Job, o *DequeueOptions) error {
ctx, cancel := context.WithTimeout(ctx, opt.MaxExecutionTime)
defer cancel()
//ctx, cancel := context.WithTimeout(ctx, opt.MaxExecutionTime)
//defer cancel()
return h(ctx, job, o)
}
for _, mw := range opt.HandleMiddleware {
Expand Down