From d1d1b2487d7a0d67274175c342a745c850caf467 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Fri, 5 Sep 2025 16:49:03 -0700 Subject: [PATCH] Continue to implement callback processor --- .gitignore | 4 +- api.yaml | 13 +- docker/dev-all.yaml | 2 - server/TODO.md | 28 ++ server/databases/db_unified_interfaces.go | 42 +-- server/databases/db_unified_models.go | 1 + server/engine/callback_processor.go | 325 ++++++++++++++++++++++ server/engine/interfaces.go | 3 +- server/log/tag/tags.go | 45 +-- 9 files changed, 396 insertions(+), 67 deletions(-) create mode 100644 server/TODO.md create mode 100644 server/engine/callback_processor.go diff --git a/.gitignore b/.gitignore index aaadf73..ed20272 100644 --- a/.gitignore +++ b/.gitignore @@ -28,5 +28,5 @@ go.work.sum .env # Editor/IDE -# .idea/ -# .vscode/ +.idea/ +.vscode/ diff --git a/api.yaml b/api.yaml index fd88b59..5e1c7d5 100644 --- a/api.yaml +++ b/api.yaml @@ -223,7 +223,6 @@ components: type: string description: Unique identifier for the timer within the namespace (used for deduplication) example: "user-notification-123" - maxLength: 255 executeAt: type: string format: date-time @@ -234,7 +233,6 @@ components: format: uri description: HTTP URL to call when the timer executes, returning 200 with CallbackResponse means success, 4xx means invalid timer and no retry, otherwise will be retried. example: "https://api.example.com/webhooks/timer" - maxLength: 2048 payload: type: object description: Custom JSON payload to include in the callback @@ -258,7 +256,9 @@ components: nextExecuteAt: type: string format: date-time - description: ISO 8601 timestamp when the timer should execute next time. Only valid when ok is false. + description: | + ISO 8601 timestamp when the timer should execute next time. Only valid when ok is false. + The retry policy will be reset to the initial attempt(since executeAt is changed) example: "2024-12-20T15:30:00Z" UpdateTimerRequest: @@ -278,14 +278,13 @@ components: executeAt: type: string format: date-time - description: New execution time for the timer + description: New execution time for the timer. If provided, the retry policy will be reset to the initial attempt. example: "2024-12-21T15:30:00Z" callbackUrl: type: string format: uri description: New callback URL, returning 200 with CallbackResponse means success, 4xx means invalid timer and no retry, otherwise will be retried. example: "https://api.example.com/webhooks/timer-updated" - maxLength: 2048 payload: type: object description: New payload data @@ -317,7 +316,7 @@ components: executeAt: type: string format: date-time - description: When the timer is scheduled to execute + description: When the timer is scheduled to execute as the first attempt. example: "2024-12-20T15:30:00Z" callbackUrl: type: string @@ -349,7 +348,7 @@ components: executedAt: type: string format: date-time - description: When the timer was executed (if applicable) + description: When the timer was executed and the callback failed (if applicable) example: "2024-12-20T15:30:05Z" RetryPolicy: diff --git a/docker/dev-all.yaml b/docker/dev-all.yaml index 504f846..f197999 100644 --- a/docker/dev-all.yaml +++ b/docker/dev-all.yaml @@ -1,5 +1,3 @@ -version: '3.8' - services: # Cassandra Database cassandra: diff --git a/server/TODO.md b/server/TODO.md new file mode 100644 index 0000000..df373bf --- /dev/null +++ b/server/TODO.md @@ -0,0 +1,28 @@ +In late august, I accidently lost all the process that I have made in this project :( .... 
+
+I had worked really hard on it for a few weeks, but I forgot to push it to GitHub.
+
+Now I have to rely on my poor memory of what I did during that time. I am listing it here as TODOs by going over every file:
+
+1. API schema
+    1.1 RetryPolicy needs to change to use integers instead of strings
+    1.2 RetryPolicy in DbTimer needs to use the actual type
+    1.3 Use the go generator instead of go-gin-server
+
+
+2. DB
+    2.1 remove RangeDeleteWithBatchInsertTxn
+    2.2 implement shardId in the DB layer
+    2.3 fix the behavior of RangeGetTimersRequest to be exclusive + inclusive
+    2.4 Implement an UpdateTimerByUUID API for updating executeAt to nextExecuteAt (reschedule and backoff retry cases)
+    2.5 Add a flag to indicate that the timer callback has failed
+
+
+
+3. Engine
+    3.1 implement callback processor
+    3.2 add local backoff retry
+    3.3 implement timer queue
+
+4. Membership
+    4.1 reuse what has been done in xcherry
diff --git a/server/databases/db_unified_interfaces.go b/server/databases/db_unified_interfaces.go
index 1e78d6e..78131fe 100644
--- a/server/databases/db_unified_interfaces.go
+++ b/server/databases/db_unified_interfaces.go
@@ -24,11 +24,10 @@ type (
 		metadata ShardMetadata,
 	) (err *DbError)
 
-	CreateTimer(
+	GetTimer(
 		ctx context.Context,
-		shardId int, shardVersion int64, namespace string,
-		timer *DbTimer,
-	) (err *DbError)
+		shardId int, namespace string, timerId string,
+	) (timer *DbTimer, err *DbError)
 
 	CreateTimerNoLock(
 		ctx context.Context,
@@ -36,6 +35,17 @@ type (
 		timer *DbTimer,
 	) (err *DbError)
 
+	UpdateTimerNoLock(
+		ctx context.Context,
+		shardId int, namespace string,
+		request *UpdateDbTimerRequest,
+	) (err *DbError)
+
+	DeleteTimerNoLock(
+		ctx context.Context,
+		shardId int, namespace string, timerId string,
+	) *DbError
+
 	RangeGetTimers(
 		ctx context.Context,
 		shardId int,
@@ -43,6 +53,7 @@ type (
 	) (*RangeGetTimersResponse, *DbError)
 
 	// RangeDeleteWithBatchInsertTxn is a transaction that deletes timers in a range and inserts new timers
+	// TODO: remove this
 	RangeDeleteWithBatchInsertTxn(
 		ctx context.Context,
 		shardId int, shardVersion int64,
@@ -50,7 +61,7 @@ type (
 		TimersToInsert []*DbTimer,
 	) (*RangeDeleteTimersResponse, *DbError)
 
-	// RangeDeleteWithLimit is a non-transactional operation that deletes timers in a range 
+	// RangeDeleteWithLimit is a non-transactional operation that deletes timers in a range
 	RangeDeleteWithLimit(
 		ctx context.Context,
 		shardId int,
@@ -58,33 +69,24 @@ type (
 		limit int, // Note that some distributed databases like Cassandra/MongoDB/DynamoDB don't support multiple range queries with LIMIT, so it may be ignored
 	) (*RangeDeleteTimersResponse, *DbError)
 
+	// TODO: only used in special case (time skew)
 	UpdateTimer(
 		ctx context.Context,
 		shardId int, shardVersion int64, namespace string,
 		request *UpdateDbTimerRequest,
 	) (err *DbError)
-
-	GetTimer(
-		ctx context.Context,
-		shardId int, namespace string, timerId string,
-	) (timer *DbTimer, err *DbError)
-
+	// TODO: only used in special case (time skew)
 	DeleteTimer(
 		ctx context.Context,
 		shardId int, shardVersion int64, namespace string, timerId string,
 	) *DbError
 
-	UpdateTimerNoLock(
+	// TODO: only used in special case (time skew)
+	CreateTimer(
 		ctx context.Context,
-		shardId int, namespace string,
-		request *UpdateDbTimerRequest,
+		shardId int, shardVersion int64, namespace string,
+		timer *DbTimer,
 	) (err *DbError)
-
-
-	DeleteTimerNoLock(
-		ctx context.Context,
-		shardId int, namespace string, timerId string,
-	) *DbError
 }
 )
 
diff --git a/server/databases/db_unified_models.go
b/server/databases/db_unified_models.go index aaeccfb..612e91a 100644 --- a/server/databases/db_unified_models.go +++ b/server/databases/db_unified_models.go @@ -45,6 +45,7 @@ type ( // DbTimer is the timer model stored in DB DbTimer struct { + ShardId int // TODO: implment this in DB layer // Unique identifier for the timer Id string diff --git a/server/engine/callback_processor.go b/server/engine/callback_processor.go new file mode 100644 index 0000000..8872bfa --- /dev/null +++ b/server/engine/callback_processor.go @@ -0,0 +1,325 @@ +package engine + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "sync" + "time" + + "github.com/iworkflowio/durable-timer/config" + "github.com/iworkflowio/durable-timer/databases" + "github.com/iworkflowio/durable-timer/engine/backoff" + "github.com/iworkflowio/durable-timer/log" + "github.com/iworkflowio/durable-timer/log/tag" +) + +type callbackProcessorImpl struct { + config *config.Config + logger log.Logger + processingChannel <-chan *databases.DbTimer + processingCompletedChannel chan<- *databases.DbTimer + timerStore databases.TimerStore + + // Worker pool management + ctx context.Context + cancel context.CancelFunc + workerWg sync.WaitGroup + httpClient *http.Client +} + +func newCallbackProcessorImpl( + config *config.Config, logger log.Logger, + processingChannel <-chan *databases.DbTimer, + processingCompletedChannel chan<- *databases.DbTimer, +) (CallbackProcessor, error) { + ctx, cancel := context.WithCancel(context.Background()) + + // Create HTTP client with reasonable defaults + // TODO: make this configurable + httpClient := &http.Client{ + Timeout: time.Duration(config.Engine.MaxCallbackTimeoutSeconds) * time.Second, + Transport: &http.Transport{ + MaxIdleConns: config.Engine.CallbackProcessorConfig.Concurrency, + MaxIdleConnsPerHost: config.Engine.CallbackProcessorConfig.Concurrency, + IdleConnTimeout: 90 * time.Second, + }, + } + + return &callbackProcessorImpl{ + config: config, + logger: logger, + processingChannel: processingChannel, + processingCompletedChannel: processingCompletedChannel, + ctx: ctx, + cancel: cancel, + httpClient: httpClient, + }, nil +} + +// Start implements CallbackProcessor. +func (c *callbackProcessorImpl) Start() error { + concurrency := c.config.Engine.CallbackProcessorConfig.Concurrency + c.logger.Info("Starting callback processor, concurrency ", tag.Value(concurrency)) + + // Start worker goroutines + for i := 0; i < concurrency; i++ { + c.workerWg.Add(1) + go c.worker(i) + } + + return nil +} + +// Close implements CallbackProcessor. 
+func (c *callbackProcessorImpl) Close() error {
+	c.logger.Info("Shutting down callback processor")
+
+	// Cancel context to signal workers to stop
+	c.cancel()
+
+	// Wait for all workers to finish with timeout
+	done := make(chan struct{})
+	go func() {
+		c.workerWg.Wait()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		c.logger.Info("Callback processor shutdown completed")
+	case <-time.After(c.config.Engine.EngineShutdownTimeout):
+		c.logger.Warn("Callback processor shutdown timed out")
+	}
+
+	return nil
+}
+
+// worker processes timers from the processing channel
+func (c *callbackProcessorImpl) worker(workerID int) {
+	defer c.workerWg.Done()
+
+	c.logger.Debug("Callback worker started", tag.Value(workerID))
+
+	for {
+		select {
+		case <-c.ctx.Done():
+			c.logger.Debug("Callback worker stopping")
+			return
+		case timer, ok := <-c.processingChannel:
+			if !ok {
+				c.logger.Debug("Processing channel closed, worker stopping")
+				return
+			}
+
+			c.processTimer(c.logger, timer)
+		}
+	}
+}
+
+// processTimer handles a single timer callback
+func (c *callbackProcessorImpl) processTimer(logger log.Logger, timer *databases.DbTimer) {
+	logger = logger.WithTags(
+		tag.TimerId(timer.Id),
+		tag.Namespace(timer.Namespace),
+		tag.Value(timer.CallbackUrl),
+	)
+
+	logger.Info("Processing timer callback")
+
+	// Update timer with execution time and attempt count
+	timer.ExecutedAt = time.Now()
+	timer.Attempts++
+
+	// Execute the callback with retry logic.
+	// There are four cases:
+	// 1. (happy case) success=true and nextExecuteAt is zero: only send the timer to the processing completed channel
+	// 2. (reschedule case) success=true and nextExecuteAt is not zero (must be after now): update executeAt to nextExecuteAt and reset attempts
+	// 3. (failure, will retry) success=false and nextExecuteAt is not zero: update executeAt to nextExecuteAt and increase attempts
+	// 4. (failure, out of retries) success=false and nextExecuteAt is zero: set executeAt to "infinity" and increase attempts
+	success, nextExecuteAt := c.executeCallback(logger, timer)
+
+	if success {
+		if nextExecuteAt.IsZero() {
+			// case 1, noop here
+		} else {
+			// case 2
+			// TODO: use an UpdateExecuteAt API to update the nextExecuteAt
+			err := c.timerStore.UpdateTimerNoLock(c.ctx, timer.ShardId, timer.Namespace, &databases.UpdateDbTimerRequest{
+				TimerId:   timer.Id,
+				ExecuteAt: nextExecuteAt,
+				// TODO Attempts: 0,
+				// lastExecutedAt: time.Now(),
+			})
+			if err != nil {
+				panic("TODO: use local backoff retry to never reach here")
+			}
+		}
+	} else {
+		if nextExecuteAt.IsZero() {
+			// case 4: out of retries, park the timer far in the future
+			// TODO: configurable
+			infiniteTime := time.Now().Add(time.Hour * 24 * 365)
+			nextExecuteAt = infiniteTime
+		}
+		// case 3 (or a parked case 4): persist the new executeAt and attempt count
+		err := c.timerStore.UpdateTimerNoLock(c.ctx, timer.ShardId, timer.Namespace, &databases.UpdateDbTimerRequest{
+			TimerId:   timer.Id,
+			ExecuteAt: nextExecuteAt,
+			// TODO Attempts: timer.Attempts +1,
+			// lastExecutedAt: time.Now(),
+		})
+		if err != nil {
+			panic("TODO: use local backoff retry to never reach here")
+		}
+	}
+
+	c.processingCompletedChannel <- timer
+	// TODO: handle the case where the channel is closed,
+	// maybe by making sure the channels are always closed last.
+}
+
+// executeCallback executes the HTTP callback with retry logic
+func (c *callbackProcessorImpl) executeCallback(logger log.Logger, timer *databases.DbTimer) (success bool, nextExecuteAt time.Time) {
+	// Create retry policy from timer configuration
+	retryPolicy := c.createRetryPolicy(timer)
+
+	// Create retrier
+	retrier := backoff.NewRetrier(retryPolicy, nil)
+
+	var lastErr error
+	for {
+		// Execute the HTTP callback
+		success, nextExecuteAt, err := c.makeHTTPCallback(timer)
+		if err == nil {
+			if success {
+				return true, time.Time{}
+			}
+			// Callback returned success=false, check if we should reschedule
+			if !nextExecuteAt.IsZero() {
+				return false, nextExecuteAt
+			}
+			// Callback returned ok=false without a reschedule time; treat it as retryable
+			lastErr = fmt.Errorf("callback returned ok=false")
+		} else {
+			lastErr = err
+			logger.Warn("HTTP callback failed",
+				tag.Error(err))
+		}
+
+		// Give up immediately on non-retryable errors (e.g. 4xx responses)
+		if err != nil && !c.isRetryableError(err) {
+			logger.Warn("Non-retryable error encountered",
+				tag.Error(err))
+			return false, time.Time{}
+		}
+
+		// Check if we should retry
+		nextBackoff := retrier.NextBackOff()
+		if nextBackoff < 0 {
+			logger.Warn("Max retries exceeded or retry policy expired",
+				tag.Error(lastErr))
+			return false, time.Time{}
+		}
+
+		// Sleep before retry
+		time.Sleep(nextBackoff)
+	}
+}
+
+// makeHTTPCallback makes the actual HTTP request to the callback URL
+func (c *callbackProcessorImpl) makeHTTPCallback(timer *databases.DbTimer) (success bool, nextExecuteAt time.Time, err error) {
+	// Prepare the request payload
+	payload := map[string]interface{}{
+		"timerId":   timer.Id,
+		"namespace": timer.Namespace,
+		"executeAt": timer.ExecuteAt,
+		"payload":   timer.Payload,
+		"attempts":  timer.Attempts,
+	}
+
+	jsonData, err := json.Marshal(payload)
+	if err != nil {
+		return false, time.Time{}, fmt.Errorf("failed to marshal payload: %w", err)
+	}
+
+	// Create HTTP request with timeout
+	ctx, cancel := context.WithTimeout(c.ctx, time.Duration(timer.CallbackTimeoutSeconds)*time.Second)
+	defer cancel()
+
+	req, err := http.NewRequestWithContext(ctx, "POST", timer.CallbackUrl,
+		strings.NewReader(string(jsonData)))
+	if err != nil {
+		return false, time.Time{}, fmt.Errorf("failed to create request: %w", err)
+	}
+
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("User-Agent", "durable-timer/1.0")
+
+	// Make the HTTP request
+	resp, err := c.httpClient.Do(req)
+	if err != nil {
+		return false, time.Time{}, fmt.Errorf("HTTP request failed: %w", err)
+	}
+	defer resp.Body.Close()
+
+	// Parse response based on status code
+	switch resp.StatusCode {
+	case http.StatusOK:
+		// Parse callback response
+		var callbackResp struct {
+			Ok            bool      `json:"ok"`
+			NextExecuteAt time.Time `json:"nextExecuteAt,omitempty"`
+		}
+		if err := json.NewDecoder(resp.Body).Decode(&callbackResp); err != nil {
+			return false, time.Time{}, fmt.Errorf("failed to parse callback response: %w", err)
+		}
+
+		if callbackResp.Ok {
+			return true, time.Time{}, nil
+		}
+
+		// Timer wants to be rescheduled
+		return false, callbackResp.NextExecuteAt, nil
+
+	case http.StatusBadRequest, http.StatusUnauthorized, http.StatusForbidden, http.StatusNotFound:
+		// 4xx errors are not retryable
+		return false, time.Time{}, fmt.Errorf("client error %d: %s", resp.StatusCode, resp.Status)
+
+	default:
+		// 5xx and other errors are retryable
+		return false, time.Time{}, fmt.Errorf("server error %d: %s", resp.StatusCode, resp.Status)
+	}
+}
+
+// createRetryPolicy creates a retry policy from the timer's retry configuration
+func (c
*callbackProcessorImpl) createRetryPolicy(timer *databases.DbTimer) backoff.RetryPolicy { + // Default retry policy if none specified + defaultPolicy := backoff.NewExponentialRetryPolicy(30 * time.Second) + defaultPolicy.SetMaximumAttempts(3) + + if timer.RetryPolicy == nil { + return defaultPolicy + } + + // Parse retry policy from timer + // Note: This is a simplified implementation. In a real system, you'd want + // to properly deserialize the RetryPolicy from the timer's RetryPolicy field + // and convert it to the backoff.RetryPolicy interface + + // For now, return the default policy + return defaultPolicy +} + +// isRetryableError determines if an error should trigger a retry +func (c *callbackProcessorImpl) isRetryableError(err error) bool { + if err == nil { + return false + } + + // Network errors, timeouts, and 5xx errors are retryable + // 4xx errors are generally not retryable + return true +} diff --git a/server/engine/interfaces.go b/server/engine/interfaces.go index 78aa611..ba73b1b 100644 --- a/server/engine/interfaces.go +++ b/server/engine/interfaces.go @@ -91,8 +91,7 @@ func NewCallbackProcessor( processingChannel <-chan *databases.DbTimer, // the receive-only channel to receive the fired timer from the timer queue to be processed processingCompletedChannel chan<- *databases.DbTimer, // the send-only channel to send the completed timer to the timer queue ) (CallbackProcessor, error) { - // TODO: implement - return nil, nil + return newCallbackProcessorImpl(config, logger, processingChannel, processingCompletedChannel) } // TimerBatchReader should be one instance per shard diff --git a/server/log/tag/tags.go b/server/log/tag/tags.go index d265aa6..87b78d5 100644 --- a/server/log/tag/tags.go +++ b/server/log/tag/tags.go @@ -91,36 +91,6 @@ func WorkflowType(wfType string) Tag { return newStringTag("wf-type", wfType) } -// WorkflowState returns tag for WorkflowState -func WorkflowState(s int) Tag { - return newInt("wf-state", s) -} - -// WorkflowRunID returns tag for WorkflowRunID -func WorkflowRunID(runID string) Tag { - return newStringTag("wf-run-id", runID) -} - -// WorkflowResetBaseRunID returns tag for WorkflowResetBaseRunID -func WorkflowResetBaseRunID(runID string) Tag { - return newStringTag("wf-reset-base-run-id", runID) -} - -// WorkflowResetNewRunID returns tag for WorkflowResetNewRunID -func WorkflowResetNewRunID(runID string) Tag { - return newStringTag("wf-reset-new-run-id", runID) -} - -// WorkflowBinaryChecksum returns tag for WorkflowBinaryChecksum -func WorkflowBinaryChecksum(cs string) Tag { - return newStringTag("wf-binary-checksum", cs) -} - -// WorkflowActivityID returns tag for WorkflowActivityID -func WorkflowActivityID(id string) Tag { - return newStringTag("wf-activity-id", id) -} - // OperationName returns tag for OperationName func OperationName(operationName string) Tag { return newStringTag("operation-name", operationName) @@ -143,13 +113,11 @@ func Env(env string) Tag { return newStringTag("env", env) } - // NodeName return tag for node name func NodeName(nodeName string) Tag { return newStringTag("node-name", nodeName) } - // Key returns tag for Key func Key(k string) Tag { return newStringTag("key", k) @@ -160,12 +128,21 @@ func Name(k string) Tag { return newStringTag("name", k) } +// Namespace returns tag for Namespace +func Namespace(k string) Tag { + return newStringTag("namespace", k) +} + +// TimerId returns tag for TimerId +func TimerId(k string) Tag { + return newStringTag("timer-id", k) +} + // Value returns tag for Value func 
Value(v interface{}) Tag { return newObjectTag("value", v) } - // Current returns tag for Current func Current(v interface{}) Tag { return newObjectTag("current", v) @@ -218,4 +195,4 @@ func SysStackTrace(stackTrace string) Tag { // ShardId returns tag for ShardId func ShardId(shardId int) Tag { return newInt("shard-id", shardId) -} \ No newline at end of file +}
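
createRetryPolicy above currently ignores timer.RetryPolicy and always returns the default exponential policy. A minimal sketch of the deserialization it defers is shown below, assuming the stored policy is a JSON blob with integer fields (per TODO 1.1); the storedRetryPolicy field names and the buildRetryPolicy helper are hypothetical, while NewExponentialRetryPolicy and SetMaximumAttempts are the calls the processor already uses.

```go
package engine

import (
	"encoding/json"
	"time"

	"github.com/iworkflowio/durable-timer/engine/backoff"
)

// storedRetryPolicy is a hypothetical wire shape for the timer's retry policy,
// using integers as planned in TODO 1.1; the real field names may differ.
type storedRetryPolicy struct {
	InitialIntervalSeconds int `json:"initialIntervalSeconds"`
	MaximumAttempts        int `json:"maximumAttempts"`
}

// buildRetryPolicy converts a serialized retry policy into a backoff.RetryPolicy,
// falling back to the same defaults createRetryPolicy uses today (30s, 3 attempts).
func buildRetryPolicy(raw []byte) backoff.RetryPolicy {
	initialInterval := 30 * time.Second
	maxAttempts := 3

	var stored storedRetryPolicy
	if len(raw) > 0 && json.Unmarshal(raw, &stored) == nil {
		if stored.InitialIntervalSeconds > 0 {
			initialInterval = time.Duration(stored.InitialIntervalSeconds) * time.Second
		}
		if stored.MaximumAttempts > 0 {
			maxAttempts = stored.MaximumAttempts
		}
	}

	policy := backoff.NewExponentialRetryPolicy(initialInterval)
	policy.SetMaximumAttempts(maxAttempts)
	return policy
}
```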
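For reference, the callback contract used by makeHTTPCallback and described by the CallbackResponse schema can be exercised with a small receiver. The sketch below is illustrative only: the handler path, port, and type names are made up for the example, while the JSON field names come from the payload map in makeHTTPCallback and from api.yaml (ok, nextExecuteAt).

```go
package main

import (
	"encoding/json"
	"net/http"
	"time"
)

// callbackRequest mirrors the JSON body that makeHTTPCallback posts to the callback URL.
type callbackRequest struct {
	TimerId   string          `json:"timerId"`
	Namespace string          `json:"namespace"`
	ExecuteAt time.Time       `json:"executeAt"`
	Payload   json.RawMessage `json:"payload"`
	Attempts  int             `json:"attempts"`
}

// callbackResponse mirrors the CallbackResponse schema: ok=true means the timer is done;
// ok=false with a future nextExecuteAt asks the service to fire the timer again later.
type callbackResponse struct {
	Ok            bool       `json:"ok"`
	NextExecuteAt *time.Time `json:"nextExecuteAt,omitempty"`
}

func timerCallback(w http.ResponseWriter, r *http.Request) {
	var req callbackRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		// A 4xx response tells the processor the timer is invalid and should not be retried.
		http.Error(w, "bad payload", http.StatusBadRequest)
		return
	}

	// Do the actual work here, then acknowledge the timer.
	resp := callbackResponse{Ok: true}

	// To reschedule instead of completing:
	//   next := time.Now().Add(10 * time.Minute)
	//   resp = callbackResponse{Ok: false, NextExecuteAt: &next}

	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(resp)
}

func main() {
	http.HandleFunc("/webhooks/timer", timerCallback)
	_ = http.ListenAndServe(":8080", nil)
}
```

Any response other than 200 (with the body above) or a 4xx status is treated as a transient failure and retried according to the timer's retry policy, matching the callbackUrl description in api.yaml.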