Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ func (e *event) UnmarshalJSON(p []byte) error {

var errTimeout = errors.New("timeout")

func (c *Client) PollForEvents(ctx context.Context, lastID string, limit int, evts any) ([]string, error) {
func (c *Client) PollForEvents(ctx context.Context, lastID string, limit int, sort string, evts any) ([]string, error) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think the interface is not really clear. How about adding either a type Order with two const values, or passing a boolean ascending. Sting allows for way too many 'wrong' values to be passed.

for {
ids, err := c.pollForEvents(ctx, lastID, limit, evts)
ids, err := c.pollForEvents(ctx, lastID, limit, sort, evts)

if err == errTimeout {
continue
Expand All @@ -103,7 +103,7 @@ func (c *Client) PollForEvents(ctx context.Context, lastID string, limit int, ev
}
}

func (c *Client) pollForEvents(ctx context.Context, lastID string, limit int, evts any) ([]string, error) {
func (c *Client) pollForEvents(ctx context.Context, lastID string, limit int, sort string, evts any) ([]string, error) {
uc := *c.eventsURL

u := &uc
Expand Down
11 changes: 6 additions & 5 deletions server/integration_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ type eventsOrError struct {

type State struct {
// serverBaseURL string
client *client.Client
pollResult []string
secondPollResult []string
longPollResult chan eventsOrError
lastId string
client *client.Client
pollResult []string
secondPollResult []string
longPollResult chan eventsOrError
longPollResultDesc chan eventsOrError
lastId string
}
44 changes: 40 additions & 4 deletions server/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import (
"go.uber.org/zap"
)

const (
sortDesc = "desc"
sortAsc = "asc"
)

func init() {
logger, _ := zap.NewDevelopment()
if false {
Expand Down Expand Up @@ -91,6 +96,7 @@ func InitializeScenario(ctx *godog.ScenarioContext) {
ctx.Step(`^I should get a confirmation$`, iShouldGetAConfirmation)
ctx.Step(`^I poll for the events$`, iPollForTheEvents)
ctx.Step(`^I should receive the buffered event$`, iShouldReceiveTheBufferedEvent)
ctx.Step("^I should receive the buffered events in desc order$", iShouldReceiveDescTheNewEvent)
ctx.Step(`^one event in the buffer$`, oneEventInTheBuffer)
ctx.Step(`^I should receive the new event$`, iShouldReceiveTheNewEvent)
ctx.Step(`^I start polling for the events$`, iStartPollingForTheEvents)
Expand Down Expand Up @@ -133,7 +139,7 @@ func oneEventInTheBuffer(ctx context.Context) error {
func iPollForTheEvents(ctx context.Context) error {
s := getState(ctx)
evts := []string{}
_, err := s.client.PollForEvents(ctx, "", 100, &evts)
_, err := s.client.PollForEvents(ctx, "", 100, sortAsc, &evts)
if err != nil {
return fmt.Errorf("failed polling for events: %w", err)
}
Expand All @@ -160,14 +166,25 @@ func iStartPollingForTheEvents(ctx context.Context) error {
s.longPollResult = make(chan eventsOrError, 1)
go func() {
evts := []string{}
_, err := s.client.PollForEvents(ctx, "", 100, &evts)
_, err := s.client.PollForEvents(ctx, "", 100, sortAsc, &evts)
if err != nil {
s.longPollResult <- eventsOrError{err: fmt.Errorf("failed polling for events: %w", err)}
return
}
s.longPollResult <- eventsOrError{events: evts}
}()

s.longPollResultDesc = make(chan eventsOrError, 1)
go func() {
evts := []string{}
_, err := s.client.PollForEvents(ctx, "", 100, sortDesc, &evts)
if err != nil {
s.longPollResult <- eventsOrError{err: fmt.Errorf("failed polling for events: %w", err)}
return
}
s.longPollResultDesc <- eventsOrError{events: evts}
}()

return nil
}

Expand Down Expand Up @@ -211,7 +228,7 @@ func twoEventsInTheBuffer(ctx context.Context) error {
func iPollForOneEvent(ctx context.Context) error {
s := getState(ctx)
evts := []string{}
ids, err := s.client.PollForEvents(ctx, "", 1, &evts)
ids, err := s.client.PollForEvents(ctx, "", 1, sortAsc, &evts)
if err != nil {
return fmt.Errorf("failed polling for events: %w", err)
}
Expand All @@ -228,7 +245,7 @@ func iPollForOneEvent(ctx context.Context) error {
func iPollForOtherEventAfterThePreviousEvent(ctx context.Context) error {
s := getState(ctx)
evts := []string{}
_, err := s.client.PollForEvents(ctx, s.lastId, 1, &evts)
_, err := s.client.PollForEvents(ctx, s.lastId, 1, sortAsc, &evts)
if err != nil {
return fmt.Errorf("failed polling for events: %w", err)
}
Expand All @@ -248,3 +265,22 @@ func iShouldGetOneEventForEachPoll(ctx context.Context) error {
}
return nil
}

func iShouldReceiveDescTheNewEvent(ctx context.Context) error {
s := getState(ctx)
select {
case <-ctx.Done():
return fmt.Errorf("could not get long poll event: %w", ctx.Err())
case res := <-s.longPollResultDesc:
if res.err != nil {
return fmt.Errorf("long poll failed: %w", res.err)
}
d := cmp.Diff(res.events, []string{"evt1"})

if d != "" {
return fmt.Errorf("unexpected poll result:\n%s", d)
}
}

return nil
}
36 changes: 30 additions & 6 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

const (
sortDesc = "desc"
sortAsc = "asc"
)

type Server struct {
db bolted.Database
log logr.Logger
Expand Down Expand Up @@ -86,6 +91,18 @@ func New(log logr.Logger, db bolted.Database) (*Server, error) {

q := r.URL.Query()

sort := sortAsc
sortStr := q.Get("sort")

if sortStr != "" && sortStr != sortAsc && sortStr != sortDesc {
http.Error(w, fmt.Errorf("invalid sort value: %s", sortStr).Error(), http.StatusBadRequest)
return
}

if sortStr == sortDesc {
sort = sortStr
}

after := q.Get("after")

limit := 100
Expand Down Expand Up @@ -126,14 +143,21 @@ func New(log logr.Logger, db bolted.Database) (*Server, error) {
it := tx.Iterator(eventsPath)
if after != "" {
it.Seek(after)
if !it.IsDone() {
if it.GetKey() == after {
it.Next()
}
if sort == sortAsc && !it.IsDone() && it.GetKey() == after {
it.Next()
} else if sort == "desc" && !it.IsDone() && it.GetKey() == after {
it.Prev()
}
}
for ; !it.IsDone() && len(events) < limit; it.Next() {
events = append(events, event{it.GetKey(), it.GetValue()})
switch sort {
case sortAsc:
for ; !it.IsDone() && len(events) < limit; it.Next() {
events = append(events, event{it.GetKey(), it.GetValue()})
}
case sortDesc:
for ; !it.IsDone() && len(events) < limit; it.Prev() {
events = append(events, event{it.GetKey(), it.GetValue()})
}
}
return nil
})
Expand Down