diff --git a/client/client.go b/client/client.go index b4f536b..899eed8 100644 --- a/client/client.go +++ b/client/client.go @@ -87,9 +87,9 @@ func (e *event) UnmarshalJSON(p []byte) error { var errTimeout = errors.New("timeout") -func (c *Client) PollForEvents(ctx context.Context, lastID string, limit int, evts any) ([]string, error) { +func (c *Client) PollForEvents(ctx context.Context, lastID string, limit int, sort string, evts any) ([]string, error) { for { - ids, err := c.pollForEvents(ctx, lastID, limit, evts) + ids, err := c.pollForEvents(ctx, lastID, limit, sort, evts) if err == errTimeout { continue @@ -103,7 +103,7 @@ func (c *Client) PollForEvents(ctx context.Context, lastID string, limit int, ev } } -func (c *Client) pollForEvents(ctx context.Context, lastID string, limit int, evts any) ([]string, error) { +func (c *Client) pollForEvents(ctx context.Context, lastID string, limit int, sort string, evts any) ([]string, error) { uc := *c.eventsURL u := &uc diff --git a/server/integration_state_test.go b/server/integration_state_test.go index dd4fa3c..3e3463b 100644 --- a/server/integration_state_test.go +++ b/server/integration_state_test.go @@ -15,9 +15,10 @@ type eventsOrError struct { type State struct { // serverBaseURL string - client *client.Client - pollResult []string - secondPollResult []string - longPollResult chan eventsOrError - lastId string + client *client.Client + pollResult []string + secondPollResult []string + longPollResult chan eventsOrError + longPollResultDesc chan eventsOrError + lastId string } diff --git a/server/integration_test.go b/server/integration_test.go index 21ea50c..15494c4 100644 --- a/server/integration_test.go +++ b/server/integration_test.go @@ -17,6 +17,11 @@ import ( "go.uber.org/zap" ) +const ( + sortDesc = "desc" + sortAsc = "asc" +) + func init() { logger, _ := zap.NewDevelopment() if false { @@ -91,6 +96,7 @@ func InitializeScenario(ctx *godog.ScenarioContext) { ctx.Step(`^I should get a confirmation$`, iShouldGetAConfirmation) ctx.Step(`^I poll for the events$`, iPollForTheEvents) ctx.Step(`^I should receive the buffered event$`, iShouldReceiveTheBufferedEvent) + ctx.Step("^I should receive the buffered events in desc order$", iShouldReceiveDescTheNewEvent) ctx.Step(`^one event in the buffer$`, oneEventInTheBuffer) ctx.Step(`^I should receive the new event$`, iShouldReceiveTheNewEvent) ctx.Step(`^I start polling for the events$`, iStartPollingForTheEvents) @@ -133,7 +139,7 @@ func oneEventInTheBuffer(ctx context.Context) error { func iPollForTheEvents(ctx context.Context) error { s := getState(ctx) evts := []string{} - _, err := s.client.PollForEvents(ctx, "", 100, &evts) + _, err := s.client.PollForEvents(ctx, "", 100, sortAsc, &evts) if err != nil { return fmt.Errorf("failed polling for events: %w", err) } @@ -160,7 +166,7 @@ func iStartPollingForTheEvents(ctx context.Context) error { s.longPollResult = make(chan eventsOrError, 1) go func() { evts := []string{} - _, err := s.client.PollForEvents(ctx, "", 100, &evts) + _, err := s.client.PollForEvents(ctx, "", 100, sortAsc, &evts) if err != nil { s.longPollResult <- eventsOrError{err: fmt.Errorf("failed polling for events: %w", err)} return @@ -168,6 +174,17 @@ func iStartPollingForTheEvents(ctx context.Context) error { s.longPollResult <- eventsOrError{events: evts} }() + s.longPollResultDesc = make(chan eventsOrError, 1) + go func() { + evts := []string{} + _, err := s.client.PollForEvents(ctx, "", 100, sortDesc, &evts) + if err != nil { + s.longPollResult <- eventsOrError{err: fmt.Errorf("failed polling for events: %w", err)} + return + } + s.longPollResultDesc <- eventsOrError{events: evts} + }() + return nil } @@ -211,7 +228,7 @@ func twoEventsInTheBuffer(ctx context.Context) error { func iPollForOneEvent(ctx context.Context) error { s := getState(ctx) evts := []string{} - ids, err := s.client.PollForEvents(ctx, "", 1, &evts) + ids, err := s.client.PollForEvents(ctx, "", 1, sortAsc, &evts) if err != nil { return fmt.Errorf("failed polling for events: %w", err) } @@ -228,7 +245,7 @@ func iPollForOneEvent(ctx context.Context) error { func iPollForOtherEventAfterThePreviousEvent(ctx context.Context) error { s := getState(ctx) evts := []string{} - _, err := s.client.PollForEvents(ctx, s.lastId, 1, &evts) + _, err := s.client.PollForEvents(ctx, s.lastId, 1, sortAsc, &evts) if err != nil { return fmt.Errorf("failed polling for events: %w", err) } @@ -248,3 +265,22 @@ func iShouldGetOneEventForEachPoll(ctx context.Context) error { } return nil } + +func iShouldReceiveDescTheNewEvent(ctx context.Context) error { + s := getState(ctx) + select { + case <-ctx.Done(): + return fmt.Errorf("could not get long poll event: %w", ctx.Err()) + case res := <-s.longPollResultDesc: + if res.err != nil { + return fmt.Errorf("long poll failed: %w", res.err) + } + d := cmp.Diff(res.events, []string{"evt1"}) + + if d != "" { + return fmt.Errorf("unexpected poll result:\n%s", d) + } + } + + return nil +} diff --git a/server/server.go b/server/server.go index c0c293b..af45ff2 100644 --- a/server/server.go +++ b/server/server.go @@ -16,6 +16,11 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +const ( + sortDesc = "desc" + sortAsc = "asc" +) + type Server struct { db bolted.Database log logr.Logger @@ -86,6 +91,18 @@ func New(log logr.Logger, db bolted.Database) (*Server, error) { q := r.URL.Query() + sort := sortAsc + sortStr := q.Get("sort") + + if sortStr != "" && sortStr != sortAsc && sortStr != sortDesc { + http.Error(w, fmt.Errorf("invalid sort value: %s", sortStr).Error(), http.StatusBadRequest) + return + } + + if sortStr == sortDesc { + sort = sortStr + } + after := q.Get("after") limit := 100 @@ -126,14 +143,21 @@ func New(log logr.Logger, db bolted.Database) (*Server, error) { it := tx.Iterator(eventsPath) if after != "" { it.Seek(after) - if !it.IsDone() { - if it.GetKey() == after { - it.Next() - } + if sort == sortAsc && !it.IsDone() && it.GetKey() == after { + it.Next() + } else if sort == "desc" && !it.IsDone() && it.GetKey() == after { + it.Prev() } } - for ; !it.IsDone() && len(events) < limit; it.Next() { - events = append(events, event{it.GetKey(), it.GetValue()}) + switch sort { + case sortAsc: + for ; !it.IsDone() && len(events) < limit; it.Next() { + events = append(events, event{it.GetKey(), it.GetValue()}) + } + case sortDesc: + for ; !it.IsDone() && len(events) < limit; it.Prev() { + events = append(events, event{it.GetKey(), it.GetValue()}) + } } return nil })