Skip to content
Merged
19 changes: 16 additions & 3 deletions enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,16 @@ func (e *Enqueuer) Enqueue(jobName string, args map[string]interface{}) (*Job, e

// EnqueueIn enqueues a job in the scheduled job queue for execution in secondsFromNow seconds.
func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[string]interface{}) (*ScheduledJob, error) {
return e.EnqueueAt(jobName, epochAfterSeconds(secondsFromNow), args)
}

func (e *Enqueuer) EnqueueAt(jobName string, epochSeconds int64, args map[string]interface{}) (*ScheduledJob, error) {
job := &Job{
Name: jobName,
ID: makeIdentifier(),
EnqueuedAt: nowEpochSeconds(),
Args: args,
}

rawJSON, err := job.serialize()
if err != nil {
return nil, err
Expand All @@ -98,7 +101,7 @@ func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, args map[stri
defer conn.Close()

scheduledJob := &ScheduledJob{
RunAt: nowEpochSeconds() + secondsFromNow,
RunAt: epochSeconds,
Job: job,
}

Expand Down Expand Up @@ -152,13 +155,23 @@ func (e *Enqueuer) EnqueueUniqueByKey(jobName string, args map[string]interface{
// EnqueueUniqueInByKey enqueues a job in the scheduled job queue that is unique on specified key for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs.
// Subsequent calls with same key will update arguments
func (e *Enqueuer) EnqueueUniqueInByKey(jobName string, secondsFromNow int64, args map[string]interface{}, keyMap map[string]interface{}) (*ScheduledJob, error) {
return e.EnqueueUniqueAtByKey(jobName, epochAfterSeconds(secondsFromNow), args, keyMap)
}

// EnqueueUniqueAt enqueues a unique job at the specified absolute epoch time in seconds. See EnqueueUnique for semantics of unique jobs.
func (e *Enqueuer) EnqueueUniqueAt(jobName string, epochSeconds int64, args map[string]interface{}) (*ScheduledJob, error) {
return e.EnqueueUniqueAtByKey(jobName, epochSeconds, args, nil)
}

// EnqueueUniqueAtByKey enqueues a job unique on specified key at the specified absolute epoch time in seconds, updating arguments. See EnqueueUnique for semantics of unique jobs.
func (e *Enqueuer) EnqueueUniqueAtByKey(jobName string, epochSeconds int64, args map[string]interface{}, keyMap map[string]interface{}) (*ScheduledJob, error) {
enqueue, job, err := e.uniqueJobHelper(jobName, args, keyMap)
if err != nil {
return nil, err
}

scheduledJob := &ScheduledJob{
RunAt: nowEpochSeconds() + secondsFromNow,
RunAt: epochSeconds,
Job: job,
}

Expand Down
Loading
Loading