diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index e6e904a..69ad2a4 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -5,7 +5,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go: [ '1.25', '1.24', '1.23' ] + go: [ '1.25', '1.24' ] name: go ${{ matrix.go }} steps: - uses: actions/checkout@v3 diff --git a/.gitignore b/.gitignore index 346ccf9..feb25c9 100644 --- a/.gitignore +++ b/.gitignore @@ -14,5 +14,6 @@ /go.work* *.bench +*.profile .idea/ diff --git a/README.md b/README.md index 8c17636..7c05e0e 100644 --- a/README.md +++ b/README.md @@ -9,30 +9,32 @@ Simple low-overhead circuit breaker library. ## Usage ```go +// some arbitrary function +foo := func(ctx context.Context, bar int) (Foo, error) { + if bar == 42 { + return Foo{Bar: bar}, nil + } + return Foo{}, fmt.Errorf("bar is not 42") +} + h, err := hoglet.NewCircuit( - func(ctx context.Context, bar int) (Foo, error) { - if bar == 42 { - return Foo{Bar: bar}, nil - } - return Foo{}, fmt.Errorf("bar is not 42") - }, hoglet.NewSlidingWindowBreaker(5*time.Second, 0.1), hoglet.WithFailureCondition(hoglet.IgnoreContextCanceled), ) /* if err != nil ... */ -f, _ := h.Call(context.Background(), 42) +f, _ := hoglet.Wrap(h, foo)(context.Background(), 42) fmt.Println(f.Bar) // 42 -_, err = h.Call(context.Background(), 0) +_, err = hoglet.Wrap(h, foo)(context.Background(), 0) fmt.Println(err) // bar is not 42 -_, err = h.Call(context.Background(), 42) +_, err = hoglet.Wrap(h, foo)(context.Background(), 42) fmt.Println(err) // hoglet: breaker is open time.Sleep(5 * time.Second) -f, _ = h.Call(context.Background(), 42) +f, _ = hoglet.Wrap(h, foo)(context.Background(), 42) fmt.Println(f.Bar) // 42 ``` @@ -51,4 +53,4 @@ non-racy behavior around the failed function. ## Design Hoglet prefers throughput to correctness (e.g. by avoiding locks), which means it cannot guarantee an exact number of -calls will go through. \ No newline at end of file +calls will go through. diff --git a/example_test.go b/example_test.go index 453ca49..0687cc9 100644 --- a/example_test.go +++ b/example_test.go @@ -22,7 +22,6 @@ func foo(ctx context.Context, bar int) (Foo, error) { func ExampleEWMABreaker() { h, err := hoglet.NewCircuit( - foo, hoglet.NewEWMABreaker(10, 0.1), hoglet.WithHalfOpenDelay(time.Second), ) @@ -30,21 +29,21 @@ func ExampleEWMABreaker() { log.Fatal(err) } - f, err := h.Call(context.Background(), 1) + f, err := hoglet.Wrap(h, foo)(context.Background(), 1) if err != nil { log.Fatal(err) } fmt.Println(f.Bar) - _, err = h.Call(context.Background(), 100) + _, err = hoglet.Wrap(h, foo)(context.Background(), 100) fmt.Println(err) - _, err = h.Call(context.Background(), 2) + _, err = hoglet.Wrap(h, foo)(context.Background(), 2) fmt.Println(err) time.Sleep(time.Second) // wait for half-open delay - f, err = h.Call(context.Background(), 3) + f, err = hoglet.Wrap(h, foo)(context.Background(), 3) if err != nil { log.Fatal(err) } @@ -59,28 +58,27 @@ func ExampleEWMABreaker() { func ExampleSlidingWindowBreaker() { h, err := hoglet.NewCircuit( - foo, hoglet.NewSlidingWindowBreaker(time.Second, 0.1), ) if err != nil { log.Fatal(err) } - f, err := h.Call(context.Background(), 1) + f, err := hoglet.Wrap(h, foo)(context.Background(), 1) if err != nil { log.Fatal(err) } fmt.Println(f.Bar) - _, err = h.Call(context.Background(), 100) + _, err = hoglet.Wrap(h, foo)(context.Background(), 100) fmt.Println(err) - _, err = h.Call(context.Background(), 2) + _, err = hoglet.Wrap(h, foo)(context.Background(), 2) fmt.Println(err) time.Sleep(time.Second) // wait for sliding window - f, err = h.Call(context.Background(), 3) + f, err = hoglet.Wrap(h, foo)(context.Background(), 3) if err != nil { log.Fatal(err) } @@ -94,14 +92,15 @@ func ExampleSlidingWindowBreaker() { } func ExampleConcurrencyLimiter() { + foo := func(ctx context.Context, _ any) (any, error) { + select { + case <-ctx.Done(): + case <-time.After(time.Second): + } + return nil, nil + } + h, err := hoglet.NewCircuit( - func(ctx context.Context, _ any) (any, error) { - select { - case <-ctx.Done(): - case <-time.After(time.Second): - } - return nil, nil - }, hoglet.NewSlidingWindowBreaker(10, 0.1), hoglet.WithBreakerMiddleware(hoglet.ConcurrencyLimiter(1, false)), ) @@ -116,7 +115,7 @@ func ExampleConcurrencyLimiter() { go func() { // use up the concurrency limit - _, _ = h.Call(ctx, 42) + _, _ = hoglet.Wrap(h, foo)(ctx, 42) }() // ensure call above actually started @@ -124,7 +123,7 @@ func ExampleConcurrencyLimiter() { go func() { defer close(errCh) - _, err := h.Call(ctx, 42) + _, err := hoglet.Wrap(h, foo)(ctx, 42) if err != nil { errCh <- err } diff --git a/go.mod b/go.mod index ca81d9d..0ef5e6e 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,10 @@ module github.com/exaring/hoglet -go 1.23 +go 1.24.0 require ( github.com/stretchr/testify v1.8.2 - golang.org/x/sync v0.5.0 + golang.org/x/sync v0.18.0 ) require ( diff --git a/go.sum b/go.sum index e4f1d94..571f69f 100644 --- a/go.sum +++ b/go.sum @@ -22,8 +22,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= -golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/hoglet.go b/hoglet.go index fb17a0a..4aa0d8f 100644 --- a/hoglet.go +++ b/hoglet.go @@ -13,8 +13,7 @@ import ( // stops calling the wrapped function until it closes again, returning [ErrCircuitOpen] in the meantime. // // A zero Circuit will panic, analogous to calling a nil function variable. Initialize with [NewCircuit]. -type Circuit[IN, OUT any] struct { - f WrappedFunc[IN, OUT] +type Circuit struct { options // State @@ -64,8 +63,8 @@ func (f BreakerMiddlewareFunc) Wrap(of ObserverFactory) (ObserverFactory, error) return f(of) } -// WrappedFunc is the type of the function wrapped by a Breaker. -type WrappedFunc[IN, OUT any] func(context.Context, IN) (OUT, error) +// WrappableFunc is the type of the function wrapped by a [Circuit]. +type WrappableFunc[IN, OUT any] func(context.Context, IN) (OUT, error) // dedupObservableCall wraps an [Observer] ensuring it can only be observed a single time. func dedupObservableCall(obs Observer) Observer { @@ -83,12 +82,10 @@ func (d *dedupedObserver) Observe(failure bool) { }) } -// NewCircuit instantiates a new [Circuit] that wraps the provided function. See [Circuit.Call] for calling semantics. -// A Circuit with a nil breaker is a noop wrapper around the provided function and will never open. -func NewCircuit[IN, OUT any](f WrappedFunc[IN, OUT], breaker Breaker, opts ...Option) (*Circuit[IN, OUT], error) { - c := &Circuit[IN, OUT]{ - f: f, - } +// NewCircuit instantiates a new [Circuit]. See [Wrap] for further usage. +// A [Circuit] with a nil breaker is a noop and will never open for any of its wrapped functions. +func NewCircuit(breaker Breaker, opts ...Option) (*Circuit, error) { + c := &Circuit{} o := options{ isFailure: defaultFailureCondition, @@ -115,10 +112,10 @@ func NewCircuit[IN, OUT any](f WrappedFunc[IN, OUT], breaker Breaker, opts ...Op return c, nil } -// State reports the current [State] of the circuit. +// State reports the current [State] of the [Circuit]. // It should only be used for informational purposes. To minimize race conditions, the circuit should be called directly // instead of checking its state first. -func (c *Circuit[IN, OUT]) State() State { +func (c *Circuit) State() State { oa := c.openedAt.Load() if oa == 0 { @@ -137,7 +134,7 @@ func (c *Circuit[IN, OUT]) State() State { // stateForCall returns the state of the circuit meant for the next call. // It wraps [State] to keep the mutable part outside of the external API. -func (c *Circuit[IN, OUT]) stateForCall() State { +func (c *Circuit) stateForCall() State { state := c.State() if state == StateHalfOpen { @@ -152,18 +149,18 @@ func (c *Circuit[IN, OUT]) stateForCall() State { // open marks the circuit as open, if it not already. // It is safe for concurrent calls and only the first one will actually set opening time. -func (c *Circuit[IN, OUT]) open() { +func (c *Circuit) open() { // CompareAndSwap is needed to avoid clobbering another goroutine's openedAt value. c.openedAt.CompareAndSwap(0, time.Now().UnixMicro()) } // reopen forcefully (re)marks the circuit as open, resetting the half-open time. -func (c *Circuit[IN, OUT]) reopen() { +func (c *Circuit) reopen() { c.openedAt.Store(time.Now().UnixMicro()) } // close closes the circuit. -func (c *Circuit[IN, OUT]) close() { +func (c *Circuit) close() { c.openedAt.Store(0) } @@ -173,22 +170,22 @@ func (c *Circuit[IN, OUT]) close() { // If the breaker is closed, it returns a non-nil [Observer] that will be used to observe the result of the call. // // It implements [ObserverFactory], so that the [Circuit] can act as the base for [BreakerMiddleware]. -func (c *Circuit[IN, OUT]) ObserverForCall(_ context.Context, state State) (Observer, error) { +func (c *Circuit) ObserverForCall(_ context.Context, state State) (Observer, error) { if state == StateOpen { return nil, ErrCircuitOpen } - return stateObserver[IN, OUT]{ + return stateObserver{ circuit: c, state: state, }, nil } -type stateObserver[IN, OUT any] struct { - circuit *Circuit[IN, OUT] +type stateObserver struct { + circuit *Circuit state State } -func (s stateObserver[IN, OUT]) Observe(failure bool) { +func (s stateObserver) Observe(failure bool) { switch s.circuit.breaker.observe(s.state == StateHalfOpen, failure) { case stateChangeNone: return // noop @@ -199,51 +196,51 @@ func (s stateObserver[IN, OUT]) Observe(failure bool) { } } -// Call calls the wrapped function if the circuit is closed and returns its result. If the circuit is open, it returns -// [ErrCircuitOpen]. +// Wrap wraps the provided function with the given [Circuit]. +// +// The returned function calls the wrapped function if the circuit is closed and returns its result. +// If the circuit is open, it returns [ErrCircuitOpen]. // // The wrapped function is called synchronously, but possible context errors are recorded as soon as they occur. This // ensures the circuit opens quickly, even if the wrapped function blocks. // // By default, all errors are considered failures (including [context.Canceled]), but this can be customized via -// [WithFailureCondition] and [IgnoreContextCanceled]. +// [WithFailureCondition] and [IgnoreContextCanceled] on the provided [Circuit]. // // Panics are observed as failures, but are not recovered (i.e.: they are "repanicked" instead). -func (c *Circuit[IN, OUT]) Call(ctx context.Context, in IN) (out OUT, err error) { - if c.f == nil { - return out, nil - } - - obs, err := c.observerFactory.ObserverForCall(ctx, c.stateForCall()) - if err != nil { - // Note: any errors here are not "observed" and do not count towards the breaker's failure rate. - // This includes: - // - ErrCircuitOpen - // - ErrConcurrencyLimit (for blocking limited circuits) - // - context timeouts while blocked on concurrency limit - // And any other errors that may be returned by optional breaker wrappers. - return out, err - } +func Wrap[IN, OUT any](c *Circuit, f WrappableFunc[IN, OUT]) WrappableFunc[IN, OUT] { + return func(ctx context.Context, in IN) (out OUT, err error) { + obs, err := c.observerFactory.ObserverForCall(ctx, c.stateForCall()) + if err != nil { + // Note: any errors here are not "observed" and do not count towards the breaker's failure rate. + // This includes: + // - ErrCircuitOpen + // - ErrConcurrencyLimit (for blocking limited circuits) + // - context timeouts while blocked on concurrency limit + // And any other errors that may be returned by optional breaker wrappers. + return out, err + } - // ensure we dedup the final - potentially wrapped - observer. - obs = dedupObservableCall(obs) + // ensure we dedup the final - potentially wrapped - observer. + obs = dedupObservableCall(obs) - obsCtx, cancel := context.WithCancelCause(ctx) - defer cancel(errWrappedFunctionDone) + obsCtx, cancel := context.WithCancelCause(ctx) + defer cancel(errWrappedFunctionDone) - // TODO: we could skip this if we could ensure the original context has neither cancellation nor deadline - go c.observeCtx(obs, obsCtx) + // TODO: we could skip this if we could ensure the original context has neither cancellation nor deadline + go c.observeCtx(obs, obsCtx) - defer func() { - // ensure we also open the breaker on panics - if err := recover(); err != nil { - obs.Observe(true) - panic(err) // let the caller deal with panics - } - obs.Observe(c.options.isFailure(err)) - }() + defer func() { + // ensure we also open the breaker on panics + if err := recover(); err != nil { + obs.Observe(true) + panic(err) // let the caller deal with panics + } + obs.Observe(c.options.isFailure(err)) + }() - return c.f(ctx, in) + return f(ctx, in) + } } // errWrappedFunctionDone is used to distinguish between internal and external (to the lib) context cancellations. @@ -251,7 +248,7 @@ var errWrappedFunctionDone = errors.New("wrapped function done") // observeCtx observes the given context for cancellation and records it as a failure. // It assumes [Observer] is idempotent and deduplicates calls itself. -func (c *Circuit[IN, OUT]) observeCtx(obs Observer, ctx context.Context) { +func (c *Circuit) observeCtx(obs Observer, ctx context.Context) { // We want to observe a context error as soon as possible to open the breaker, but at the same time we want to // keep the call to the wrapped function synchronous to avoid all pitfalls that come with asynchronicity. <-ctx.Done() diff --git a/hoglet_test.go b/hoglet_test.go index 576f152..e445a72 100644 --- a/hoglet_test.go +++ b/hoglet_test.go @@ -33,79 +33,82 @@ func noop(ctx context.Context, in noopIn) (struct{}, error) { } func BenchmarkHoglet_Do_EWMA(b *testing.B) { + noop := func(context.Context, struct{}) (out struct{}, err error) { return } h, err := NewCircuit( - func(context.Context, struct{}) (out struct{}, err error) { return }, NewEWMABreaker(10, 0.9), WithHalfOpenDelay(time.Second), // WithBreakerMiddleware(ConcurrencyLimiter(1, true)), ) require.NoError(b, err) - ctx := context.Background() + ctx := context.Background() // b.Context() introduces some overhead b.ReportAllocs() b.ResetTimer() + f := Wrap(h, noop) b.RunParallel(func(pb *testing.PB) { for pb.Next() { - _, _ = h.Call(ctx, struct{}{}) + _, _ = f(ctx, struct{}{}) } }) } func BenchmarkHoglet_Do_SlidingWindow(b *testing.B) { + noop := func(context.Context, struct{}) (out struct{}, err error) { return } + h, err := NewCircuit( - func(context.Context, struct{}) (out struct{}, err error) { return }, NewSlidingWindowBreaker(10*time.Second, 0.9), // WithBreakerMiddleware(ConcurrencyLimiter(1, true)), ) require.NoError(b, err) - ctx := context.Background() + ctx := context.Background() // b.Context() introduces some overhead b.ReportAllocs() b.ResetTimer() + f := Wrap(h, noop) b.RunParallel(func(pb *testing.PB) { for pb.Next() { - _, _ = h.Call(ctx, struct{}{}) + _, _ = f(ctx, struct{}{}) } }) } func TestBreaker_nil_breaker_does_not_open(t *testing.T) { - b, err := NewCircuit(noop, nil) + b, err := NewCircuit(nil) require.NoError(t, err) - _, err = b.Call(context.Background(), noopInFailure) + _, err = Wrap(b, noop)(t.Context(), noopInFailure) assert.Equal(t, sentinel, err) - _, err = b.Call(context.Background(), noopInFailure) + _, err = Wrap(b, noop)(t.Context(), noopInFailure) assert.Equal(t, sentinel, err) } func TestBreaker_ctx_parameter_not_cancelled(t *testing.T) { - b, err := NewCircuit(func(ctx context.Context, _ any) (context.Context, error) { - return ctx, nil - }, nil) + noop := func(ctx context.Context, _ any) (context.Context, error) { return ctx, nil } + b, err := NewCircuit(nil) require.NoError(t, err) - ctx, err := b.Call(context.Background(), noopInSuccess) + ctx, err := Wrap(b, noop)(t.Context(), noopInSuccess) require.NoError(t, err) assert.NoError(t, ctx.Err()) } func TestCircuit_ignored_context_cancellation_still_returned(t *testing.T) { + noop := func(ctx context.Context, _ any) (string, error) { + return "expected", ctx.Err() + } + b, err := NewCircuit( - func(ctx context.Context, _ any) (string, error) { - return "expected", ctx.Err() - }, nil, WithFailureCondition(IgnoreContextCanceled)) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(t.Context()) cancel() - out, err := b.Call(ctx, nil) + out, err := Wrap(b, noop)(ctx, nil) assert.ErrorIs(t, err, context.Canceled) assert.Equal(t, "expected", out) } @@ -182,12 +185,11 @@ func TestHoglet_Do(t *testing.T) { } for _, tt := range tests { - tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() mt := &mockBreaker{} - h, err := NewCircuit(noop, mt, WithHalfOpenDelay(time.Minute)) + h, err := NewCircuit(mt, WithHalfOpenDelay(time.Minute)) require.NoError(t, err) for i, call := range tt.calls { if call.halfOpen { @@ -197,7 +199,7 @@ func TestHoglet_Do(t *testing.T) { var err error maybeAssertPanic(t, func() { - _, err = h.Call(context.Background(), call.arg) + _, err = Wrap(h, noop)(t.Context(), call.arg) }, call.wantPanic) assert.Equal(t, call.wantErr, err, "unexpected error on call %d: %v", i, err) } diff --git a/options_test.go b/options_test.go index 52471e3..832cf7c 100644 --- a/options_test.go +++ b/options_test.go @@ -1,4 +1,4 @@ -package hoglet +package hoglet_test import ( "context" @@ -6,30 +6,33 @@ import ( "testing" "time" + "github.com/exaring/hoglet" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestWithHalfOpenDelay(t *testing.T) { - for name, b := range map[string]Breaker{ - "ewma": NewEWMABreaker(10, 0.1), - "slidingWindow": NewSlidingWindowBreaker(time.Second, 0.1), + noop := func(_ context.Context, in error) (any, error) { return nil, in } + + for name, b := range map[string]hoglet.Breaker{ + "ewma": hoglet.NewEWMABreaker(10, 0.1), + "slidingWindow": hoglet.NewSlidingWindowBreaker(time.Second, 0.1), } { t.Run(name, func(t *testing.T) { halfOpenDelay := 500 * time.Millisecond sentinelErr := errors.New("foo") - cb, err := NewCircuit(func(_ context.Context, in error) (any, error) { return nil, in }, b, WithHalfOpenDelay(halfOpenDelay)) + cb, err := hoglet.NewCircuit(b, hoglet.WithHalfOpenDelay(halfOpenDelay)) require.NoError(t, err) - _, err = cb.Call(context.Background(), sentinelErr) + _, err = hoglet.Wrap(cb, noop)(context.Background(), sentinelErr) require.ErrorIs(t, err, sentinelErr) - _, err = cb.Call(context.Background(), nil) + _, err = hoglet.Wrap(cb, noop)(context.Background(), nil) assert.Error(t, err, "expected circuit breaker to be open, but it's not") time.Sleep(halfOpenDelay) - _, err = cb.Call(context.Background(), nil) + _, err = hoglet.Wrap(cb, noop)(context.Background(), nil) assert.NoError(t, err, "expected circuit breaker to be closed again, but it's not") }) }