73 changes: 61 additions & 12 deletions runner.go
@@ -24,6 +24,15 @@ const (
	ErrDead = errors.ConstError("worker runner is not running")
)

// DelayFunc is the type alias of the function that returns the delay between
// worker restarts. It is called with the number of attempts that have been
// made since the start of the current window period. The lastErr argument is
// the error that caused the worker to exit; depending on the error, the delay
// can be adjusted.
//
// The worker id is deliberately not passed to the function, to prevent it
// from making decisions based on which worker exited.
type DelayFunc = func(attempts int, lastErr error) time.Duration
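
As a sketch only (not part of this diff): a DelayFunc could use lastErr to back off harder for particular failures. errRateLimited is a hypothetical sentinel error, and the durations are illustrative.

var errRateLimited = errors.New("rate limited")

// backOffOnRateLimit returns a longer, attempt-scaled delay when the
// worker last exited because of rate limiting, and a short fixed
// delay otherwise.
func backOffOnRateLimit(attempts int, lastErr error) time.Duration {
	if errors.Is(lastErr, errRateLimited) {
		return time.Duration(attempts+1) * 10 * time.Second
	}
	return 3 * time.Second
}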

// StartFunc is the type alias of the function that creates a worker.
type StartFunc = func(context.Context) (Worker, error)

@@ -87,7 +96,7 @@ type workerInfo struct {

	// restartDelay holds the function that computes how long
	// runWorker will wait before calling the start function.
	restartDelay time.Duration
	restartDelay DelayFunc

	// stopping holds whether the worker is currently
	// being killed. The runWorker goroutine will
@@ -100,6 +109,12 @@ type workerInfo struct {

	// started holds the time the worker was started.
	started time.Time

	// attempts holds the number of times the worker has been started
	// within the current restart delay period.
	attempts int

	// restarted holds the time the worker was last restarted.
	restarted time.Time
}

func (i *workerInfo) status() string {
@@ -170,11 +185,15 @@ type RunnerParams struct {
	// returned.
	MoreImportant func(err0, err1 error) bool

	// RestartDelay holds the length of time the runner will
	// RestartDelay returns the length of time the runner will
	// wait after a worker has exited with a non-fatal error
	// before it is restarted.
	// If this is zero, DefaultRestartDelay will be used.
	RestartDelay time.Duration
	// If this is nil, DefaultRestartDelay will be used.
	RestartDelay DelayFunc

	// RestartDelayPeriod is the window over which restart attempts
	// are counted when computing the restart delay. If a worker next
	// exits more than this period after its last restart, the attempt
	// counter is reset. If this is zero, ten minutes will be used.
	RestartDelayPeriod time.Duration

	// Clock is used for timekeeping. If it's nil, clock.WallClock
	// will be used.
@@ -223,8 +242,13 @@ func NewRunner(p RunnerParams) (*Runner, error) {
			return true
		}
	}
	if p.RestartDelay == 0 {
		p.RestartDelay = DefaultRestartDelay
	if p.RestartDelay == nil {
		p.RestartDelay = func(attempts int, lastErr error) time.Duration {
			return DefaultRestartDelay
		}
	}
	if p.RestartDelayPeriod == 0 {
		p.RestartDelayPeriod = 10 * time.Minute
	}
	if p.Clock == nil {
		p.Clock = clock.WallClock
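
For context, a usage sketch of the two new parameters. This is hedged: all other RunnerParams fields are omitted, and the linear backoff and one-minute window are illustrative choices, not defaults from this diff.

runner, err := NewRunner(RunnerParams{
	// Wait one second longer after each failure within the window,
	// capping the delay at 30 seconds.
	RestartDelay: func(attempts int, lastErr error) time.Duration {
		delay := time.Duration(attempts+1) * time.Second
		if delay > 30*time.Second {
			delay = 30 * time.Second
		}
		return delay
	},
	// Treat a worker that stays up for a minute as healthy again:
	// its attempt counter is reset on the next exit.
	RestartDelayPeriod: time.Minute,
})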
@@ -486,6 +510,7 @@ func (runner *Runner) startWorker(req startReq) error {
		start:        req.start,
		restartDelay: runner.params.RestartDelay,
		started:      runner.params.Clock.Now().UTC(),
		attempts:     1,
		done:         make(chan struct{}, 1),
	}

@@ -508,11 +533,16 @@ func (runner *Runner) workerDone(info doneInfo) {
	params := runner.params

	workerInfo := runner.workers[info.id]

	// If the worker isn't stopping and there was no error, remove the worker.
	// This is a clean exit.
	if !workerInfo.stopping && info.err == nil {
		params.Logger.Debugf("removing %q from known workers", info.id)
		runner.removeWorker(info.id, workerInfo.done)
		return
	}

	// The worker has exited with an error.
	if info.err != nil {
		errStr := info.err.Error()
		if errWithStack, ok := info.err.(panicError); ok && errWithStack.Panicked() {
@@ -543,6 +573,9 @@ func (runner *Runner) workerDone(info doneInfo) {
		}
		params.Logger.Errorf("exited %q: %s", info.id, errStr)
	}

	// If the worker has no start function, it has been stopped and should be
	// removed.
	if workerInfo.start == nil {
		params.Logger.Debugf("no restart, removing %q from known workers", info.id)

@@ -551,10 +584,26 @@ func (runner *Runner) workerDone(info doneInfo) {
		runner.removeWorker(info.id, workerInfo.done)
		return
	}

	// The worker exited with a non-fatal error. Restart it after a delay,
	// tracking how many times it has been restarted within the restart
	// delay period so the DelayFunc can adjust the delay accordingly.
	now := params.Clock.Now().UTC()

	// If the worker last restarted more than the restart delay period
	// ago, reset the attempts counter; otherwise increment it.
	if now.After(workerInfo.restarted.Add(params.RestartDelayPeriod)) {
		workerInfo.attempts = 0
	} else {
		workerInfo.attempts++
	}
	delay := workerInfo.restartDelay(workerInfo.attempts, info.err)
	workerInfo.restarted = now
Member

Is this what we are doing in the dependency engine?

if info.startedTime.IsZero() || timeSinceStarted < engine.config.BackoffResetTime {

That says we do have a window within which we consider exits to be active restarts rather than a new error.

If we are going with your DelayFunc, maybe we should align that with the DependencyEngine since it is in this same package:

if delay > time.Duration(0) {

We could factor out that code into our preferred DelayFunc and then be passing the function to the DependencyEngine as well as the Runner.
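
One possible shape for that factored-out helper, sketched on the assumption that both the Runner and the dependency engine would accept a DelayFunc; the names and constants are illustrative, not taken from either implementation.

// exponentialDelay doubles the delay for each attempt in the current
// window, capping it at max so a crash-looping worker keeps being
// retried at a bounded rate. lastErr is ignored in this sketch.
func exponentialDelay(base, max time.Duration) DelayFunc {
	return func(attempts int, lastErr error) time.Duration {
		delay := base
		for i := 0; i < attempts; i++ {
			delay *= 2
			if delay >= max {
				return max
			}
		}
		return delay
	}
}

The Runner would take it via RunnerParams.RestartDelay; the engine could consume the same function wherever it computes its backoff today.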


	// Kick off the worker again taking into account the delay.
	pprof.Do(runner.tomb.Context(context.Background()), workerInfo.labels, func(ctx context.Context) {
		go runner.runWorker(ctx, workerInfo.restartDelay, info.id, workerInfo.start)
		go runner.runWorker(ctx, delay, info.id, workerInfo.start)
	})
	workerInfo.restartDelay = params.RestartDelay
}
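
To make the windowing concrete, a hypothetical trace under the ten-minute default period, using the exponentialDelay sketch above with a one-second base and a 30-second cap: a worker that crashes immediately and repeatedly is restarted after 1s, 2s, 4s, and so on up to 30s, because each exit lands inside the window and increments attempts. Once a restarted worker stays up for more than ten minutes, its next exit falls outside the window, attempts resets to zero, and the delay drops back to one second.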

// removeWorker removes the worker with the given id from the
@@ -624,7 +673,7 @@ func (runner *Runner) runWorker(ctx context.Context, delay time.Duration, id str
	// so that it can be stopped when a worker is removed.
	select {
	case <-runner.tomb.Dying():
		runner.donec <- doneInfo{id, nil}
		runner.donec <- doneInfo{id: id, err: nil}
		return
	case <-runner.params.Clock.After(delay):
	}
@@ -651,17 +700,17 @@ func (runner *Runner) runWorker(ctx context.Context, delay time.Duration, id str
			panic(err)
		}
		runner.params.Logger.Infof("%q called runtime.Goexit unexpectedly", id)
		runner.donec <- doneInfo{id, errors.Errorf("runtime.Goexit called in running worker - probably inappropriate Assert")}
		runner.donec <- doneInfo{id: id, err: errors.Errorf("runtime.Goexit called in running worker - probably inappropriate Assert")}
	}()
	worker, err := start(ctx)
	normal = true

	if err == nil {
		runner.startedc <- startInfo{id, worker}
		runner.startedc <- startInfo{id: id, worker: worker}
		err = worker.Wait()
	}
	runner.params.Logger.Infof("stopped %q, err: %v", id, err)
	runner.donec <- doneInfo{id, err}
	runner.donec <- doneInfo{id: id, err: err}
}

type reporter interface {