From b1ba234fd8fc13771e57f7b378ff8bc6c636afb7 Mon Sep 17 00:00:00 2001 From: Adrian Moennich Date: Sat, 12 Jan 2019 17:11:29 +0100 Subject: [PATCH] Make initial/max retry duration configurable closes #40 --- stream.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/stream.go b/stream.go index 5970877..636dcde 100644 --- a/stream.go +++ b/stream.go @@ -18,7 +18,12 @@ type Stream struct { c *http.Client req *http.Request lastEventId string - retry time.Duration + // InitialRetryDelay indicates how long to wait before attempting to reconnect. + // This may be overridden by the server. + InitialRetryDelay time.Duration + // MaxRetryDelay specifies the maximum time to wait between consecutive + // reconnect attempts. + MaxRetryDelay time.Duration // Events emits the events received by the stream Events chan Event // Errors emits any errors encountered while reading events from the stream. @@ -63,12 +68,13 @@ func SubscribeWithRequest(lastEventId string, request *http.Request) (*Stream, e // control over the http client settings (timeouts, tls, etc) func SubscribeWith(lastEventId string, client *http.Client, request *http.Request) (*Stream, error) { stream := &Stream{ - c: client, - req: request, - lastEventId: lastEventId, - retry: time.Millisecond * 3000, - Events: make(chan Event), - Errors: make(chan error), + c: client, + req: request, + lastEventId: lastEventId, + InitialRetryDelay: time.Millisecond * 3000, + MaxRetryDelay: 0, + Events: make(chan Event), + Errors: make(chan error), } stream.c.CheckRedirect = checkRedirect @@ -163,7 +169,7 @@ func (stream *Stream) receiveEvents(r io.ReadCloser) { pub := ev.(*publication) if pub.Retry() > 0 { - stream.retry = time.Duration(pub.Retry()) * time.Millisecond + stream.InitialRetryDelay = time.Duration(pub.Retry()) * time.Millisecond } if len(pub.Id()) > 0 { stream.lastEventId = pub.Id() @@ -173,7 +179,7 @@ func (stream *Stream) receiveEvents(r io.ReadCloser) { } func (stream *Stream) retryRestartStream() { - backoff := stream.retry + backoff := stream.InitialRetryDelay for { if stream.Logger != nil { stream.Logger.Printf("Reconnecting in %0.4f secs\n", backoff.Seconds()) @@ -192,5 +198,8 @@ func (stream *Stream) retryRestartStream() { } stream.Errors <- err backoff *= 2 + if stream.MaxRetryDelay > 0 && backoff > stream.MaxRetryDelay { + backoff = stream.MaxRetryDelay + } } }