diff --git a/runner.go b/runner.go index bda5937..52fb55e 100644 --- a/runner.go +++ b/runner.go @@ -24,6 +24,15 @@ const ( ErrDead = errors.ConstError("worker runner is not running") ) +// DelayFunc is the type alias of the function that returns the delay between +// worker restarts. It is called with the number of attempts that have been made +// since the last window period. The lastErr argument is the error that caused +// the worker to exit. Depending on the error, the delay can be adjusted. +// +// The worker id is not passed to the function because we shouldn't to prevent +// the function from being able to make decisions based on the worker id. +type DelayFunc = func(attempts int, lastErr error) time.Duration + // StartFunc is the type alias of the function that creates a worker. type StartFunc = func(context.Context) (Worker, error) @@ -87,7 +96,7 @@ type workerInfo struct { // restartDelay holds the length of time that runWorker // will wait before calling the start function. - restartDelay time.Duration + restartDelay DelayFunc // stopping holds whether the worker is currently // being killed. The runWorker goroutine will @@ -100,6 +109,12 @@ type workerInfo struct { // started holds the time the worker was started. started time.Time + + // attempts holds the number of times the worker has been started. + attempts int + + // restarted holds the time the worker was last restarted. + restarted time.Time } func (i *workerInfo) status() string { @@ -170,11 +185,15 @@ type RunnerParams struct { // returned. MoreImportant func(err0, err1 error) bool - // RestartDelay holds the length of time the runner will + // RestartDelay returns the length of time the runner will // wait after a worker has exited with a non-fatal error // before it is restarted. - // If this is zero, DefaultRestartDelay will be used. - RestartDelay time.Duration + // If this is nil, DefaultRestartDelay will be used. + RestartDelay DelayFunc + + // RestartDelayPeriod is the length of time that the restart delay + // is calculated over. + RestartDelayPeriod time.Duration // Clock is used for timekeeping. If it's nil, clock.WallClock // will be used. @@ -223,8 +242,13 @@ func NewRunner(p RunnerParams) (*Runner, error) { return true } } - if p.RestartDelay == 0 { - p.RestartDelay = DefaultRestartDelay + if p.RestartDelay == nil { + p.RestartDelay = func(attempts int, lastErr error) time.Duration { + return DefaultRestartDelay + } + } + if p.RestartDelayPeriod == 0 { + p.RestartDelayPeriod = 10 * time.Minute } if p.Clock == nil { p.Clock = clock.WallClock @@ -486,6 +510,7 @@ func (runner *Runner) startWorker(req startReq) error { start: req.start, restartDelay: runner.params.RestartDelay, started: runner.params.Clock.Now().UTC(), + attempts: 1, done: make(chan struct{}, 1), } @@ -508,11 +533,16 @@ func (runner *Runner) workerDone(info doneInfo) { params := runner.params workerInfo := runner.workers[info.id] + + // If the worker isn't stopping and there was no error, remove the worker. + // This is a clean exit. if !workerInfo.stopping && info.err == nil { params.Logger.Debugf("removing %q from known workers", info.id) runner.removeWorker(info.id, workerInfo.done) return } + + // The worker has exited with an error. if info.err != nil { errStr := info.err.Error() if errWithStack, ok := info.err.(panicError); ok && errWithStack.Panicked() { @@ -543,6 +573,9 @@ func (runner *Runner) workerDone(info doneInfo) { } params.Logger.Errorf("exited %q: %s", info.id, errStr) } + + // If the worker has no start function, it has been stopped and should be + // removed. if workerInfo.start == nil { params.Logger.Debugf("no restart, removing %q from known workers", info.id) @@ -551,10 +584,26 @@ func (runner *Runner) workerDone(info doneInfo) { runner.removeWorker(info.id, workerInfo.done) return } + + // The worker has exited with a non-fatal error. We'll restart it after a + // delay, increment the attempts, so it's possible to track how many times + // the worker has been restarted over the restart delay period. + now := params.Clock.Now().UTC() + + // If the worker has been restarted after the restart delay period, reset + // the attempts counter. Otherwise, increment the attempts counter. + if now.After(workerInfo.restarted.Add(params.RestartDelayPeriod)) { + workerInfo.attempts = 0 + } else { + workerInfo.attempts++ + } + delay := workerInfo.restartDelay(workerInfo.attempts, info.err) + workerInfo.restarted = now + + // Kick off the worker again taking into account the delay. pprof.Do(runner.tomb.Context(context.Background()), workerInfo.labels, func(ctx context.Context) { - go runner.runWorker(ctx, workerInfo.restartDelay, info.id, workerInfo.start) + go runner.runWorker(ctx, delay, info.id, workerInfo.start) }) - workerInfo.restartDelay = params.RestartDelay } // removeWorker removes the worker with the given id from the @@ -624,7 +673,7 @@ func (runner *Runner) runWorker(ctx context.Context, delay time.Duration, id str // so that it can be stopped when a worker is removed. select { case <-runner.tomb.Dying(): - runner.donec <- doneInfo{id, nil} + runner.donec <- doneInfo{id: id, err: nil} return case <-runner.params.Clock.After(delay): } @@ -651,17 +700,17 @@ func (runner *Runner) runWorker(ctx context.Context, delay time.Duration, id str panic(err) } runner.params.Logger.Infof("%q called runtime.Goexit unexpectedly", id) - runner.donec <- doneInfo{id, errors.Errorf("runtime.Goexit called in running worker - probably inappropriate Assert")} + runner.donec <- doneInfo{id: id, err: errors.Errorf("runtime.Goexit called in running worker - probably inappropriate Assert")} }() worker, err := start(ctx) normal = true if err == nil { - runner.startedc <- startInfo{id, worker} + runner.startedc <- startInfo{id: id, worker: worker} err = worker.Wait() } runner.params.Logger.Infof("stopped %q, err: %v", id, err) - runner.donec <- doneInfo{id, err} + runner.donec <- doneInfo{id: id, err: err} } type reporter interface { diff --git a/runner_test.go b/runner_test.go index 6c6f70c..db5481d 100644 --- a/runner_test.go +++ b/runner_test.go @@ -49,7 +49,7 @@ func (*RunnerSuite) TestOneWorkerStart(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: noneFatal, - RestartDelay: time.Millisecond, + RestartDelay: constDelay(time.Millisecond), }) c.Assert(err, jc.ErrorIsNil) @@ -68,7 +68,7 @@ func (*RunnerSuite) TestOneWorkerFinish(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: noneFatal, - RestartDelay: time.Millisecond, + RestartDelay: constDelay(time.Millisecond), }) c.Assert(err, jc.ErrorIsNil) @@ -89,7 +89,7 @@ func (*RunnerSuite) TestOneWorkerRestart(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: noneFatal, - RestartDelay: time.Millisecond, + RestartDelay: constDelay(time.Millisecond), }) c.Assert(err, jc.ErrorIsNil) @@ -113,7 +113,7 @@ func (*RunnerSuite) TestStopAndWaitWorker(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: allFatal, - RestartDelay: 3 * time.Second, + RestartDelay: constDelay(3 * time.Second), }) c.Assert(err, jc.ErrorIsNil) @@ -152,7 +152,7 @@ func (*RunnerSuite) TestStopAndWaitWorkerReturnsWorkerError(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: allFatal, - RestartDelay: 3 * time.Second, + RestartDelay: constDelay(3 * time.Second), }) c.Assert(err, jc.ErrorIsNil) @@ -191,7 +191,7 @@ func (*RunnerSuite) TestStopAndWaitWorkerWithAbort(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: noneFatal, - RestartDelay: time.Second, + RestartDelay: constDelay(time.Second), Clock: clock, }) c.Assert(err, jc.ErrorIsNil) @@ -232,7 +232,7 @@ func (*RunnerSuite) TestOneWorkerStartFatalError(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: allFatal, - RestartDelay: time.Millisecond, + RestartDelay: constDelay(time.Millisecond), }) c.Assert(err, jc.ErrorIsNil) @@ -248,7 +248,7 @@ func (*RunnerSuite) TestOneWorkerDieFatalError(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: allFatal, - RestartDelay: time.Millisecond, + RestartDelay: constDelay(time.Millisecond), }) c.Assert(err, jc.ErrorIsNil) @@ -267,7 +267,7 @@ func (*RunnerSuite) TestOneWorkerStartStop(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: allFatal, - RestartDelay: time.Millisecond, + RestartDelay: constDelay(time.Millisecond), }) c.Assert(err, jc.ErrorIsNil) @@ -285,7 +285,7 @@ func (*RunnerSuite) TestOneWorkerStopFatalError(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: allFatal, - RestartDelay: time.Millisecond, + RestartDelay: constDelay(time.Millisecond), }) c.Assert(err, jc.ErrorIsNil) @@ -304,7 +304,7 @@ func (*RunnerSuite) TestWorkerStartWhenRunningOrStopping(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: allFatal, - RestartDelay: 3 * time.Second, + RestartDelay: constDelay(3 * time.Second), }) c.Assert(err, jc.ErrorIsNil) @@ -337,16 +337,19 @@ func (*RunnerSuite) TestOneWorkerRestartDelay(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: noneFatal, - RestartDelay: delay, + RestartDelay: constDelay(delay), }) c.Assert(err, jc.ErrorIsNil) starter := newTestWorkerStarter() err = runner.StartWorker(context.Background(), "id", starter.start) c.Assert(err, jc.ErrorIsNil) + starter.assertStarted(c, true) + starter.die <- fmt.Errorf("non-fatal error") starter.assertStarted(c, false) + t0 := time.Now() starter.assertStarted(c, true) restartDuration := time.Since(t0) @@ -356,13 +359,150 @@ func (*RunnerSuite) TestOneWorkerRestartDelay(c *gc.C) { c.Assert(worker.Stop(runner), gc.IsNil) } -func (*RunnerSuite) TestOneWorkerShouldRestart(c *gc.C) { +func (*RunnerSuite) TestOneWorkerRestartDelayAfterFailedAttempts(c *gc.C) { + var received atomic.Int64 + + var fatalErr = errors.New("fatal error") + + errs := make(chan error) + + const delay = 100 * time.Millisecond + runner, err := worker.NewRunner(worker.RunnerParams{ + Name: "test", + IsFatal: noneFatal, + RestartDelay: func(attempts int, restartErr error) time.Duration { + received.Add(int64(attempts)) + + go func(restartErr error) { + select { + case errs <- restartErr: + case <-time.After(shortWait): + c.Fatalf("timed out sending restart error") + } + }(restartErr) + + return delay + }, + }) + c.Assert(err, jc.ErrorIsNil) + + starter := newTestWorkerStarter() + err = runner.StartWorker(context.Background(), "id", starter.start) + c.Assert(err, jc.ErrorIsNil) + + starter.assertStarted(c, true) + + starter.die <- fatalErr + starter.assertStarted(c, false) + + // Wait for the error to be received, from the restart delay function. + select { + case err := <-errs: + c.Check(err, gc.Equals, fatalErr) + case <-time.After(shortWait): + c.Fatalf("timed out waiting for restart error") + } + + t0 := time.Now() + starter.assertStarted(c, true) + restartDuration := time.Since(t0) + if restartDuration < delay { + c.Fatalf("restart delay was not respected; got %v want %v", restartDuration, delay) + } + c.Assert(worker.Stop(runner), gc.IsNil) + + c.Check(received.Load(), gc.Equals, int64(0)) +} + +func (*RunnerSuite) TestOneWorkerRestartDelayAfterFailedAttemptsLargeDelayPeriod(c *gc.C) { + var received atomic.Int64 + + const delay = 100 * time.Millisecond + runner, err := worker.NewRunner(worker.RunnerParams{ + Name: "test", + IsFatal: noneFatal, + RestartDelay: func(attempts int, restartErr error) time.Duration { + received.Add(int64(attempts)) + + return delay + }, + RestartDelayPeriod: delay * 2, + }) + c.Assert(err, jc.ErrorIsNil) + + starter := newTestWorkerStarter() + + err = runner.StartWorker(context.Background(), "id", starter.start) + c.Assert(err, jc.ErrorIsNil) + + starter.assertStarted(c, true) + starter.die <- errors.New("fatal error") + starter.assertStarted(c, false) + + // First attempt should always be zero. + c.Check(received.Load(), gc.Equals, int64(0)) + + starter.assertStarted(c, true) + starter.die <- errors.New("fatal error") + starter.assertStarted(c, false) + + starter.assertStarted(c, true) + + c.Assert(worker.Stop(runner), gc.IsNil) + + // Second attempt with a large delay period should not reset the attempt + // counter. + c.Check(received.Load(), gc.Equals, int64(1)) +} + +func (*RunnerSuite) TestOneWorkerRestartDelayAfterFailedAttemptsSmallDelayPeriod(c *gc.C) { + var received atomic.Int64 + + const delay = 100 * time.Millisecond + runner, err := worker.NewRunner(worker.RunnerParams{ + Name: "test", + IsFatal: noneFatal, + RestartDelay: func(attempts int, restartErr error) time.Duration { + received.Add(int64(attempts)) + + return delay + }, + RestartDelayPeriod: delay / 2, + }) + c.Assert(err, jc.ErrorIsNil) + + starter := newTestWorkerStarter() + + err = runner.StartWorker(context.Background(), "id", starter.start) + c.Assert(err, jc.ErrorIsNil) + + starter.assertStarted(c, true) + starter.die <- errors.New("fatal error") + starter.assertStarted(c, false) + + // First attempt should always be zero. + c.Check(received.Load(), gc.Equals, int64(0)) + + starter.assertStarted(c, true) + starter.die <- errors.New("fatal error") + starter.assertStarted(c, false) + + starter.assertStarted(c, true) + + c.Assert(worker.Stop(runner), gc.IsNil) + + // Second attempt with a small delay period should reset the attempt + // counter. + c.Check(received.Load(), gc.Equals, int64(0)) +} + +func (*RunnerSuite) TestOneWorkerShouldNotRestart(c *gc.C) { const delay = 100 * time.Millisecond runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: noneFatal, ShouldRestart: func(err error) bool { return false }, - RestartDelay: delay, + RestartDelay: constDelay(delay), }) c.Assert(err, jc.ErrorIsNil) @@ -394,7 +534,7 @@ func (*RunnerSuite) TestErrorImportance(c *gc.C) { Name: "test", IsFatal: allFatal, MoreImportant: moreImportant, - RestartDelay: time.Millisecond, + RestartDelay: constDelay(time.Millisecond), }) c.Assert(err, jc.ErrorIsNil) @@ -414,7 +554,7 @@ func (*RunnerSuite) TestStartWorkerWhenDead(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: allFatal, - RestartDelay: time.Millisecond, + RestartDelay: constDelay(time.Millisecond), }) c.Assert(err, jc.ErrorIsNil) @@ -427,7 +567,7 @@ func (*RunnerSuite) TestStopWorkerWhenDead(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: allFatal, - RestartDelay: time.Millisecond, + RestartDelay: constDelay(time.Millisecond), }) c.Assert(err, jc.ErrorIsNil) @@ -440,7 +580,7 @@ func (*RunnerSuite) TestAllWorkersStoppedWhenOneDiesWithFatalError(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: allFatal, - RestartDelay: time.Millisecond, + RestartDelay: constDelay(time.Millisecond), }) c.Assert(err, jc.ErrorIsNil) @@ -472,7 +612,7 @@ func (*RunnerSuite) TestFatalErrorWhileStarting(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: allFatal, - RestartDelay: time.Millisecond, + RestartDelay: constDelay(time.Millisecond), }) c.Assert(err, jc.ErrorIsNil) @@ -515,7 +655,7 @@ func (*RunnerSuite) TestFatalErrorWhileSelfStartWorker(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: allFatal, - RestartDelay: time.Millisecond, + RestartDelay: constDelay(time.Millisecond), }) c.Assert(err, jc.ErrorIsNil) @@ -558,7 +698,7 @@ func (*RunnerSuite) TestWorkerWithNoWorker(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: allFatal, - RestartDelay: time.Millisecond, + RestartDelay: constDelay(time.Millisecond), }) c.Assert(err, jc.ErrorIsNil) @@ -572,7 +712,7 @@ func (*RunnerSuite) TestWorkerWithWorkerImmediatelyAvailable(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: allFatal, - RestartDelay: time.Millisecond, + RestartDelay: constDelay(time.Millisecond), }) c.Assert(err, jc.ErrorIsNil) @@ -595,7 +735,7 @@ func (*RunnerSuite) TestWorkerWithWorkerNotImmediatelyAvailable(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: allFatal, - RestartDelay: time.Millisecond, + RestartDelay: constDelay(time.Millisecond), }) c.Assert(err, jc.ErrorIsNil) @@ -636,7 +776,7 @@ func (*RunnerSuite) TestWorkerWithAbort(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: noneFatal, - RestartDelay: time.Second, + RestartDelay: constDelay(time.Second), Clock: clock, }) c.Assert(err, jc.ErrorIsNil) @@ -680,7 +820,7 @@ func (*RunnerSuite) TestWorkerConcurrent(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: noneFatal, - RestartDelay: time.Microsecond, + RestartDelay: constDelay(time.Microsecond), }) c.Assert(err, jc.ErrorIsNil) @@ -739,7 +879,7 @@ func (*RunnerSuite) TestWorkerWhenRunnerKilledWhileWaiting(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: noneFatal, - RestartDelay: time.Second, + RestartDelay: constDelay(time.Second), Clock: clock, }) c.Assert(err, jc.ErrorIsNil) @@ -778,7 +918,7 @@ func (*RunnerSuite) TestWorkerWhenWorkerRemovedWhileWaiting(c *gc.C) { runner, err := worker.NewRunner(worker.RunnerParams{ Name: "test", IsFatal: noneFatal, - RestartDelay: time.Second, + RestartDelay: constDelay(time.Second), Clock: clock, }) c.Assert(err, jc.ErrorIsNil) @@ -849,7 +989,7 @@ func (*RunnerSuite) TestRunnerReport(c *gc.C) { runner := worker.NewRunnerWithNotify(c, worker.RunnerParams{ Name: "test", IsFatal: noneFatal, - RestartDelay: time.Second, + RestartDelay: constDelay(time.Second), Clock: clock, }, started) defer worker.Stop(runner) @@ -1074,3 +1214,9 @@ func (checker *containsChecker) Check(params []interface{}, names []string) (boo return false, "Obtained value is not a []string" } + +func constDelay(d time.Duration) worker.DelayFunc { + return func(attempts int, lastError error) time.Duration { + return d + } +}