From 2b19af6581867feef9bda4c38e386e989e375001 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 6 Nov 2024 20:39:09 +0800 Subject: [PATCH 1/5] feat: introduce new gofs api --- clients/gofs/api.go | 139 ++++++++++++++++++++++++++++ clients/gofs/gofs.go | 129 +++++++++++++++++++++++++- examples/basic/main.go | 2 +- fs/runtime/external/runtime_test.go | 114 ++++++++++++++++------- 4 files changed, 343 insertions(+), 41 deletions(-) create mode 100644 clients/gofs/api.go diff --git a/clients/gofs/api.go b/clients/gofs/api.go new file mode 100644 index 0000000..687069b --- /dev/null +++ b/clients/gofs/api.go @@ -0,0 +1,139 @@ +/* + * Copyright 2024 DefFunction Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gofs + +import "context" + +type Event[T any] interface { + Data() *T + Ack() error // TODO: Handle Ack +} + +type BaseModule interface { + Init(ctx *FunctionContext) error +} + +type Function[I any, O any] interface { + BaseModule + Handle(ctx *FunctionContext, event Event[I]) (Event[O], error) +} + +type Source[O any] interface { + BaseModule + Handle(ctx *FunctionContext, emit func(context.Context, Event[O]) error) error +} + +type Sink[I any] interface { + BaseModule + Handle(ctx *FunctionContext, event Event[I]) error +} + +type Custom interface { + BaseModule + Handle(ctx *FunctionContext) error +} + +type eventImpl[T any] struct { + data *T +} + +func NewEvent[T any](data *T) Event[T] { + return &eventImpl[T]{ + data: data, + } +} + +func (e *eventImpl[T]) Data() *T { + return e.data +} + +func (e *eventImpl[T]) Ack() error { + // TODO: Implement this + return nil +} + +type simpleFunction[I any, O any] struct { + handle func(ctx *FunctionContext, event Event[I]) (Event[O], error) +} + +func NewSimpleFunction[I any, O any](handle func(ctx *FunctionContext, event Event[I]) (Event[O], error)) Function[I, O] { + return &simpleFunction[I, O]{ + handle: handle, + } +} + +func (f *simpleFunction[I, O]) Init(_ *FunctionContext) error { + return nil +} + +func (f *simpleFunction[I, O]) Handle(ctx *FunctionContext, event Event[I]) (Event[O], error) { + return f.handle(ctx, event) +} + +type simpleSource[O any] struct { + handle func(ctx *FunctionContext, emit func(context.Context, Event[O]) error) error +} + +func NewSimpleSource[O any](handle func(ctx *FunctionContext, emit func(context.Context, Event[O]) error) error) Source[O] { + return &simpleSource[O]{ + handle: handle, + } +} + +func (s *simpleSource[O]) Init(_ *FunctionContext) error { + return nil +} + +func (s *simpleSource[O]) Handle(ctx *FunctionContext, emit func(context.Context, Event[O]) error) error { + return s.handle(ctx, emit) +} + +type simpleSink[I any] struct { + handle func(ctx *FunctionContext, event Event[I]) error +} + +func NewSimpleSink[I any](handle func(ctx *FunctionContext, event Event[I]) error) Sink[I] { + return &simpleSink[I]{ + handle: handle, + } +} + +func (s *simpleSink[I]) Init(_ *FunctionContext) error { + return nil +} + +func (s *simpleSink[I]) Handle(ctx *FunctionContext, event Event[I]) error { + return s.handle(ctx, event) +} + +type simpleCustom struct { + handle func(ctx *FunctionContext) error +} + +func NewSimpleCustom(handle func(ctx *FunctionContext) error) Custom { + return &simpleCustom{ + handle: handle, + } +} + +func (c *simpleCustom) Init(_ *FunctionContext) error { + return nil +} + +func (c *simpleCustom) Handle(ctx *FunctionContext) error { + return c.handle(ctx) +} diff --git a/clients/gofs/gofs.go b/clients/gofs/gofs.go index f6caee2..9cfc8a5 100644 --- a/clients/gofs/gofs.go +++ b/clients/gofs/gofs.go @@ -67,7 +67,8 @@ func NewFSClient() FSClient { type moduleWrapper struct { *fsClient - processFunc func(context.Context, []byte) []byte // Only for Function + ctx *FunctionContext + processFunc func(context.Context, []byte) []byte // Only for DefFunction executeFunc func(context.Context) error initFunc func(context.Context) error registerErr error @@ -107,7 +108,124 @@ func (c *fsClient) Register(module string, wrapper *moduleWrapper) FSClient { return c } -func Function[I any, O any](process func(context.Context, *I) *O) *moduleWrapper { +func RegisterFunction[I any, O any](function Function[I, O]) *moduleWrapper { + m := &moduleWrapper{} + processFunc := func(ctx context.Context, payload []byte) []byte { + input := new(I) + err := json.Unmarshal(payload, input) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s\n", err, payload) + return nil + } + output, err := function.Handle(m.ctx, NewEvent(input)) + // TODO: Handle err + outputPayload, _ := json.Marshal(output.Data()) + return outputPayload + } + m.initFunc = func(ctx context.Context) error { + outputSchema, err := avroschema.Reflect(new(O)) + if err != nil { + return err + } + err = m.rpc.RegisterSchema(ctx, outputSchema) + if err != nil { + return fmt.Errorf("failed to register schema: %w", err) + } + return function.Init(m.ctx) + } + m.executeFunc = func(ctx context.Context) error { + for { + inputPayload, err := m.rpc.Read(ctx) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "Failed to read: %s\n", err) + time.Sleep(3 * time.Second) + continue + } + outputPayload := processFunc(ctx, inputPayload) + err = m.rpc.Write(ctx, outputPayload) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "Failed to write: %s\n", err) + } + } + } + m.processFunc = processFunc + return m +} + +func RegisterSource[O any](source Source[O]) *moduleWrapper { + m := &moduleWrapper{} + emit := func(ctx context.Context, event Event[O]) error { + outputPayload, _ := json.Marshal(event.Data()) + return m.rpc.Write(m.ctx.warpContext(ctx), outputPayload) + } + m.initFunc = func(ctx context.Context) error { + outputSchema, err := avroschema.Reflect(new(O)) + if err != nil { + return err + } + err = m.rpc.RegisterSchema(ctx, outputSchema) + if err != nil { + return fmt.Errorf("failed to register schema: %w", err) + } + return source.Init(m.ctx) + } + m.executeFunc = func(ctx context.Context) error { + return source.Handle(m.ctx, emit) + } + return m +} + +func RegisterSink[I any](sink Sink[I]) *moduleWrapper { + m := &moduleWrapper{} + processFunc := func(ctx context.Context, payload []byte) { + input := new(I) + err := json.Unmarshal(payload, input) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s\n", err, payload) + } + sink.Handle(m.ctx, NewEvent(input)) + } + m.initFunc = func(ctx context.Context) error { + inputSchema, err := avroschema.Reflect(new(I)) + if err != nil { + return err + } + err = m.rpc.RegisterSchema(ctx, inputSchema) + if err != nil { + return fmt.Errorf("failed to register schema: %w", err) + } + return sink.Init(m.ctx) + } + m.executeFunc = func(ctx context.Context) error { + for { + inputPayload, err := m.rpc.Read(ctx) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "Failed to read: %s\n", err) + time.Sleep(3 * time.Second) + continue + } + processFunc(ctx, inputPayload) + } + } + return m +} + +func RegisterCustom(custom Custom) *moduleWrapper { + m := &moduleWrapper{} + initFunc := func(ctx context.Context) error { + return custom.Init(m.ctx) + } + executeFunc := func(ctx context.Context) error { + return custom.Handle(m.ctx) + } + m.initFunc = initFunc + m.executeFunc = executeFunc + + // TODO: Simplify this + return m +} + +func DefFunction[I any, O any](process func(context.Context, *I) *O) *moduleWrapper { processFunc := func(ctx context.Context, payload []byte) []byte { input := new(I) err := json.Unmarshal(payload, input) @@ -149,7 +267,7 @@ func Function[I any, O any](process func(context.Context, *I) *O) *moduleWrapper return m } -func Source[O any](process func(ctx context.Context, emit func(context.Context, *O) error)) *moduleWrapper { +func DefSource[O any](process func(ctx context.Context, emit func(context.Context, *O) error)) *moduleWrapper { m := &moduleWrapper{} emit := func(ctx context.Context, event *O) error { outputPayload, _ := json.Marshal(event) @@ -173,7 +291,7 @@ func Source[O any](process func(ctx context.Context, emit func(context.Context, return m } -func Sink[I any](process func(context.Context, *I)) *moduleWrapper { +func DefSink[I any](process func(context.Context, *I)) *moduleWrapper { processFunc := func(ctx context.Context, payload []byte) { input := new(I) err := json.Unmarshal(payload, input) @@ -208,7 +326,7 @@ func Sink[I any](process func(context.Context, *I)) *moduleWrapper { return m } -func Custom(init func(ctx context.Context) error, execute func(ctx context.Context) error) *moduleWrapper { +func DefCustom(init func(ctx context.Context) error, execute func(ctx context.Context) error) *moduleWrapper { return &moduleWrapper{ initFunc: init, executeFunc: execute, @@ -281,6 +399,7 @@ func (c *fsClient) Run() error { } ctx := funcCtx.warpContext(context.WithValue(context.Background(), funcCtxKey{}, funcCtx)) m.fsClient = c + m.ctx = funcCtx err := m.initFunc(ctx) if err != nil { return err diff --git a/examples/basic/main.go b/examples/basic/main.go index 8178ea1..87ec345 100644 --- a/examples/basic/main.go +++ b/examples/basic/main.go @@ -25,7 +25,7 @@ import ( func main() { slog.Info("Hello from Go function!") err := gofs.NewFSClient(). - Register(gofs.DefaultModule, gofs.Function(myProcess)). + Register(gofs.DefaultModule, gofs.DefFunction(myProcess)). Run() if err != nil { slog.Error(err.Error()) diff --git a/fs/runtime/external/runtime_test.go b/fs/runtime/external/runtime_test.go index 583fd30..3bfbc9e 100644 --- a/fs/runtime/external/runtime_test.go +++ b/fs/runtime/external/runtime_test.go @@ -51,28 +51,57 @@ type testRecord struct { var log = common.NewDefaultLogger() +type TestFunction struct { +} + +func (f *TestFunction) Init(_ *gofs.FunctionContext) error { + return nil +} + +func (f *TestFunction) Handle(_ *gofs.FunctionContext, event gofs.Event[Person]) (gofs.Event[Person], error) { + p := event.Data() + p.Money += 1 + return gofs.NewEvent(p), nil +} + +type TestCounterFunction struct { +} + +func (f *TestCounterFunction) Init(ctx *gofs.FunctionContext) error { + return nil +} + +func (f *TestCounterFunction) Handle(_ *gofs.FunctionContext, event gofs.Event[Counter]) (gofs.Event[Counter], error) { + c := event.Data() + c.Count += 1 + return gofs.NewEvent(c), nil +} + +type TestSource struct { +} + +func (f *TestSource) Init(_ *gofs.FunctionContext) error { + return nil +} + +func (f *TestSource) Handle(_ *gofs.FunctionContext, emit func(context.Context, gofs.Event[testRecord]) error) error { + for i := 0; i < 10; i++ { + err := emit(context.Background(), gofs.NewEvent(&testRecord{ + ID: i, + Name: "test", + })) + if err != nil { + log.Error(err, "failed to emit record") + } + } + return nil +} + func runMockClient() { err := gofs.NewFSClient(). - Register(gofs.DefaultModule, gofs.Function(func(ctx context.Context, i *Person) *Person { - i.Money += 1 - return i - })). - Register("counter", gofs.Function(func(ctx context.Context, i *Counter) *Counter { - i.Count += 1 - return i - })). - Register("test-source", gofs.Source( - func(ctx context.Context, emit func(ctx context.Context, record *testRecord) error) { - for i := 0; i < 10; i++ { - err := emit(ctx, &testRecord{ - ID: i, - Name: "test", - }) - if err != nil { - log.Error(err, "failed to emit record") - } - } - })). + Register(gofs.DefaultModule, gofs.RegisterFunction(&TestFunction{})). + Register("counter", gofs.RegisterFunction(&TestCounterFunction{})). + Register("test-source", gofs.RegisterSource(&TestSource{})). Run() if err != nil { log.Error(err, "failed to run mock client") @@ -278,6 +307,25 @@ func TestExternalSourceModule(t *testing.T) { assert.NoError(t, err) } +type TestSink struct { + sinkCh chan Counter +} + +func (f *TestSink) Init(_ *gofs.FunctionContext) error { + return nil +} + +func (f *TestSink) Handle(_ *gofs.FunctionContext, event gofs.Event[Counter]) error { + f.sinkCh <- *event.Data() + return nil +} + +func newTestSink() *TestSink { + return &TestSink{ + sinkCh: make(chan Counter), + } +} + func TestExternalSinkModule(t *testing.T) { testSocketPath := fmt.Sprintf("/tmp/%s.sock", t.Name()) assert.NoError(t, os.RemoveAll(testSocketPath)) @@ -324,12 +372,10 @@ func TestExternalSinkModule(t *testing.T) { err = fm.StartFunction(f) assert.NoError(t, err) - sinkCh := make(chan Counter) + sinkMod := newTestSink() go func() { - err := gofs.NewFSClient().Register("test-sink", gofs.Sink(func(ctx context.Context, record *Counter) { - sinkCh <- *record - })).Run() + err := gofs.NewFSClient().Register("test-sink", gofs.RegisterSink(sinkMod)).Run() if err != nil { log.Error(err, "failed to run mock client") } @@ -342,7 +388,7 @@ func TestExternalSinkModule(t *testing.T) { err = fm.ProduceEvent(inputTopic, event) assert.NoError(t, err) - r := <-sinkCh + r := <-sinkMod.sinkCh assert.Equal(t, 1, r.Count) err = fm.DeleteFunction("", f.Name) @@ -395,17 +441,16 @@ func TestExternalStatefulModule(t *testing.T) { readyCh := make(chan struct{}) go func() { - err := gofs.NewFSClient().Register("test-stateful", gofs.Custom(func(ctx context.Context) error { return nil }, - func(ctx context.Context) error { - funcCtx := gofs.GetFunctionContext(ctx) - err = funcCtx.PutState(ctx, "test-key", []byte("test-value")) + err := gofs.NewFSClient().Register("test-stateful", gofs.RegisterCustom(gofs.NewSimpleCustom( + func(ctx *gofs.FunctionContext) error { + err = ctx.PutState(context.Background(), "test-key", []byte("test-value")) if err != nil { log.Error(err, "failed to put state") } close(readyCh) return nil }, - )).Run() + ))).Run() if err != nil { log.Error(err, "failed to run mock client") } @@ -471,17 +516,16 @@ func TestFunctionConfig(t *testing.T) { readyCh := make(chan struct{}) go func() { - err := gofs.NewFSClient().Register(module, gofs.Custom(func(ctx context.Context) error { return nil }, - func(ctx context.Context) error { - funcCtx := gofs.GetFunctionContext(ctx) - err = funcCtx.PutState(ctx, "test-key", []byte("test-value")) + err := gofs.NewFSClient().Register(module, gofs.RegisterCustom(gofs.NewSimpleCustom( + func(ctx *gofs.FunctionContext) error { + err = ctx.PutState(context.Background(), "test-key", []byte("test-value")) if err != nil { log.Error(err, "failed to put state") } close(readyCh) return nil }, - )).Run() + ))).Run() if err != nil { log.Error(err, "failed to run mock client") } From 6bff436da898612867dc603cc45dc3b0eefda718 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 6 Nov 2024 21:28:11 +0800 Subject: [PATCH 2/5] refactor: improve FunctionContext --- clients/gofs/api.go | 51 ++++--- clients/gofs/gofs.go | 211 ++++++++-------------------- clients/gofs/gofs_socket.go | 2 +- clients/gofs/gofs_wasmfs.go | 6 +- examples/basic/main.go | 8 +- fs/runtime/external/runtime_test.go | 20 +-- 6 files changed, 105 insertions(+), 193 deletions(-) diff --git a/clients/gofs/api.go b/clients/gofs/api.go index 687069b..6f1d512 100644 --- a/clients/gofs/api.go +++ b/clients/gofs/api.go @@ -18,33 +18,42 @@ package gofs import "context" +type FunctionContext interface { + context.Context + GetState(ctx context.Context, key string) ([]byte, error) + PutState(ctx context.Context, key string, value []byte) error + Write(ctx context.Context, payload []byte) error + Read(ctx context.Context) ([]byte, error) + GetConfig(ctx context.Context) (map[string]string, error) +} + type Event[T any] interface { Data() *T Ack() error // TODO: Handle Ack } type BaseModule interface { - Init(ctx *FunctionContext) error + Init(ctx FunctionContext) error } type Function[I any, O any] interface { BaseModule - Handle(ctx *FunctionContext, event Event[I]) (Event[O], error) + Handle(ctx FunctionContext, event Event[I]) (Event[O], error) } type Source[O any] interface { BaseModule - Handle(ctx *FunctionContext, emit func(context.Context, Event[O]) error) error + Handle(ctx FunctionContext, emit func(context.Context, Event[O]) error) error } type Sink[I any] interface { BaseModule - Handle(ctx *FunctionContext, event Event[I]) error + Handle(ctx FunctionContext, event Event[I]) error } type Custom interface { BaseModule - Handle(ctx *FunctionContext) error + Handle(ctx FunctionContext) error } type eventImpl[T any] struct { @@ -67,73 +76,73 @@ func (e *eventImpl[T]) Ack() error { } type simpleFunction[I any, O any] struct { - handle func(ctx *FunctionContext, event Event[I]) (Event[O], error) + handle func(ctx FunctionContext, event Event[I]) (Event[O], error) } -func NewSimpleFunction[I any, O any](handle func(ctx *FunctionContext, event Event[I]) (Event[O], error)) Function[I, O] { +func NewSimpleFunction[I any, O any](handle func(ctx FunctionContext, event Event[I]) (Event[O], error)) Function[I, O] { return &simpleFunction[I, O]{ handle: handle, } } -func (f *simpleFunction[I, O]) Init(_ *FunctionContext) error { +func (f *simpleFunction[I, O]) Init(_ FunctionContext) error { return nil } -func (f *simpleFunction[I, O]) Handle(ctx *FunctionContext, event Event[I]) (Event[O], error) { +func (f *simpleFunction[I, O]) Handle(ctx FunctionContext, event Event[I]) (Event[O], error) { return f.handle(ctx, event) } type simpleSource[O any] struct { - handle func(ctx *FunctionContext, emit func(context.Context, Event[O]) error) error + handle func(ctx FunctionContext, emit func(context.Context, Event[O]) error) error } -func NewSimpleSource[O any](handle func(ctx *FunctionContext, emit func(context.Context, Event[O]) error) error) Source[O] { +func NewSimpleSource[O any](handle func(ctx FunctionContext, emit func(context.Context, Event[O]) error) error) Source[O] { return &simpleSource[O]{ handle: handle, } } -func (s *simpleSource[O]) Init(_ *FunctionContext) error { +func (s *simpleSource[O]) Init(_ FunctionContext) error { return nil } -func (s *simpleSource[O]) Handle(ctx *FunctionContext, emit func(context.Context, Event[O]) error) error { +func (s *simpleSource[O]) Handle(ctx FunctionContext, emit func(context.Context, Event[O]) error) error { return s.handle(ctx, emit) } type simpleSink[I any] struct { - handle func(ctx *FunctionContext, event Event[I]) error + handle func(ctx FunctionContext, event Event[I]) error } -func NewSimpleSink[I any](handle func(ctx *FunctionContext, event Event[I]) error) Sink[I] { +func NewSimpleSink[I any](handle func(ctx FunctionContext, event Event[I]) error) Sink[I] { return &simpleSink[I]{ handle: handle, } } -func (s *simpleSink[I]) Init(_ *FunctionContext) error { +func (s *simpleSink[I]) Init(_ FunctionContext) error { return nil } -func (s *simpleSink[I]) Handle(ctx *FunctionContext, event Event[I]) error { +func (s *simpleSink[I]) Handle(ctx FunctionContext, event Event[I]) error { return s.handle(ctx, event) } type simpleCustom struct { - handle func(ctx *FunctionContext) error + handle func(ctx FunctionContext) error } -func NewSimpleCustom(handle func(ctx *FunctionContext) error) Custom { +func NewSimpleCustom(handle func(ctx FunctionContext) error) Custom { return &simpleCustom{ handle: handle, } } -func (c *simpleCustom) Init(_ *FunctionContext) error { +func (c *simpleCustom) Init(_ FunctionContext) error { return nil } -func (c *simpleCustom) Handle(ctx *FunctionContext) error { +func (c *simpleCustom) Handle(ctx FunctionContext) error { return c.handle(ctx) } diff --git a/clients/gofs/gofs.go b/clients/gofs/gofs.go index 9cfc8a5..3775c59 100644 --- a/clients/gofs/gofs.go +++ b/clients/gofs/gofs.go @@ -67,17 +67,17 @@ func NewFSClient() FSClient { type moduleWrapper struct { *fsClient - ctx *FunctionContext - processFunc func(context.Context, []byte) []byte // Only for DefFunction - executeFunc func(context.Context) error - initFunc func(context.Context) error + ctx *functionContextImpl + processFunc func(FunctionContext, []byte) ([]byte, error) // Only for Wasm Function + executeFunc func(FunctionContext) error + initFunc func(FunctionContext) error registerErr error } -func (m *moduleWrapper) AddInitFunc(initFunc func(context.Context) error) *moduleWrapper { +func (m *moduleWrapper) AddInitFunc(initFunc func(FunctionContext) error) *moduleWrapper { parentInit := m.initFunc if parentInit != nil { - m.initFunc = func(ctx context.Context) error { + m.initFunc = func(ctx FunctionContext) error { err := parentInit(ctx) if err != nil { return err @@ -110,19 +110,23 @@ func (c *fsClient) Register(module string, wrapper *moduleWrapper) FSClient { func RegisterFunction[I any, O any](function Function[I, O]) *moduleWrapper { m := &moduleWrapper{} - processFunc := func(ctx context.Context, payload []byte) []byte { + processFunc := func(ctx FunctionContext, payload []byte) ([]byte, error) { input := new(I) err := json.Unmarshal(payload, input) if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s\n", err, payload) - return nil + return nil, fmt.Errorf("failed to parse JSON: %w", err) } - output, err := function.Handle(m.ctx, NewEvent(input)) - // TODO: Handle err - outputPayload, _ := json.Marshal(output.Data()) - return outputPayload + output, err := function.Handle(ctx, NewEvent(input)) + if err != nil { + return nil, err + } + outputPayload, err := json.Marshal(output.Data()) + if err != nil { + return nil, fmt.Errorf("failed to marshal JSON: %w", err) + } + return outputPayload, nil } - m.initFunc = func(ctx context.Context) error { + m.initFunc = func(ctx FunctionContext) error { outputSchema, err := avroschema.Reflect(new(O)) if err != nil { return err @@ -131,20 +135,23 @@ func RegisterFunction[I any, O any](function Function[I, O]) *moduleWrapper { if err != nil { return fmt.Errorf("failed to register schema: %w", err) } - return function.Init(m.ctx) + return function.Init(ctx) } - m.executeFunc = func(ctx context.Context) error { + m.executeFunc = func(ctx FunctionContext) error { for { - inputPayload, err := m.rpc.Read(ctx) + inputPayload, err := ctx.Read(ctx) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "Failed to read: %s\n", err) time.Sleep(3 * time.Second) continue } - outputPayload := processFunc(ctx, inputPayload) - err = m.rpc.Write(ctx, outputPayload) + outputPayload, err := processFunc(ctx, inputPayload) if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "Failed to write: %s\n", err) + return err + } + err = ctx.Write(ctx, outputPayload) + if err != nil { + return err } } } @@ -156,9 +163,9 @@ func RegisterSource[O any](source Source[O]) *moduleWrapper { m := &moduleWrapper{} emit := func(ctx context.Context, event Event[O]) error { outputPayload, _ := json.Marshal(event.Data()) - return m.rpc.Write(m.ctx.warpContext(ctx), outputPayload) + return m.ctx.Write(ctx, outputPayload) } - m.initFunc = func(ctx context.Context) error { + m.initFunc = func(ctx FunctionContext) error { outputSchema, err := avroschema.Reflect(new(O)) if err != nil { return err @@ -167,25 +174,25 @@ func RegisterSource[O any](source Source[O]) *moduleWrapper { if err != nil { return fmt.Errorf("failed to register schema: %w", err) } - return source.Init(m.ctx) + return source.Init(ctx) } - m.executeFunc = func(ctx context.Context) error { - return source.Handle(m.ctx, emit) + m.executeFunc = func(ctx FunctionContext) error { + return source.Handle(ctx, emit) } return m } func RegisterSink[I any](sink Sink[I]) *moduleWrapper { m := &moduleWrapper{} - processFunc := func(ctx context.Context, payload []byte) { + processFunc := func(ctx FunctionContext, payload []byte) error { input := new(I) err := json.Unmarshal(payload, input) if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s\n", err, payload) + return fmt.Errorf("failed to parse JSON: %w", err) } - sink.Handle(m.ctx, NewEvent(input)) + return sink.Handle(ctx, NewEvent(input)) } - m.initFunc = func(ctx context.Context) error { + m.initFunc = func(ctx FunctionContext) error { inputSchema, err := avroschema.Reflect(new(I)) if err != nil { return err @@ -194,17 +201,19 @@ func RegisterSink[I any](sink Sink[I]) *moduleWrapper { if err != nil { return fmt.Errorf("failed to register schema: %w", err) } - return sink.Init(m.ctx) + return sink.Init(ctx) } - m.executeFunc = func(ctx context.Context) error { + m.executeFunc = func(ctx FunctionContext) error { for { - inputPayload, err := m.rpc.Read(ctx) + inputPayload, err := ctx.Read(ctx) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "Failed to read: %s\n", err) time.Sleep(3 * time.Second) continue } - processFunc(ctx, inputPayload) + if err = processFunc(ctx, inputPayload); err != nil { + return err + } } } return m @@ -212,11 +221,11 @@ func RegisterSink[I any](sink Sink[I]) *moduleWrapper { func RegisterCustom(custom Custom) *moduleWrapper { m := &moduleWrapper{} - initFunc := func(ctx context.Context) error { - return custom.Init(m.ctx) + initFunc := func(ctx FunctionContext) error { + return custom.Init(ctx) } - executeFunc := func(ctx context.Context) error { - return custom.Handle(m.ctx) + executeFunc := func(ctx FunctionContext) error { + return custom.Handle(ctx) } m.initFunc = initFunc m.executeFunc = executeFunc @@ -225,137 +234,30 @@ func RegisterCustom(custom Custom) *moduleWrapper { return m } -func DefFunction[I any, O any](process func(context.Context, *I) *O) *moduleWrapper { - processFunc := func(ctx context.Context, payload []byte) []byte { - input := new(I) - err := json.Unmarshal(payload, input) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s\n", err, payload) - } - output := process(ctx, input) - outputPayload, _ := json.Marshal(output) - return outputPayload - } - m := &moduleWrapper{} - m.initFunc = func(ctx context.Context) error { - outputSchema, err := avroschema.Reflect(new(O)) - if err != nil { - return err - } - err = m.rpc.RegisterSchema(ctx, outputSchema) - if err != nil { - return fmt.Errorf("failed to register schema: %w", err) - } - return nil - } - m.executeFunc = func(ctx context.Context) error { - for { - inputPayload, err := m.rpc.Read(ctx) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "Failed to read: %s\n", err) - time.Sleep(3 * time.Second) - continue - } - outputPayload := processFunc(ctx, inputPayload) - err = m.rpc.Write(ctx, outputPayload) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "Failed to write: %s\n", err) - } - } - } - m.processFunc = processFunc - return m -} - -func DefSource[O any](process func(ctx context.Context, emit func(context.Context, *O) error)) *moduleWrapper { - m := &moduleWrapper{} - emit := func(ctx context.Context, event *O) error { - outputPayload, _ := json.Marshal(event) - return m.rpc.Write(ctx, outputPayload) - } - m.initFunc = func(ctx context.Context) error { - outputSchema, err := avroschema.Reflect(new(O)) - if err != nil { - return err - } - err = m.rpc.RegisterSchema(ctx, outputSchema) - if err != nil { - return fmt.Errorf("failed to register schema: %w", err) - } - return nil - } - m.executeFunc = func(ctx context.Context) error { - process(ctx, emit) - return nil - } - return m -} - -func DefSink[I any](process func(context.Context, *I)) *moduleWrapper { - processFunc := func(ctx context.Context, payload []byte) { - input := new(I) - err := json.Unmarshal(payload, input) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s\n", err, payload) - } - process(ctx, input) - } - m := &moduleWrapper{} - m.initFunc = func(ctx context.Context) error { - inputSchema, err := avroschema.Reflect(new(I)) - if err != nil { - return err - } - err = m.rpc.RegisterSchema(ctx, inputSchema) - if err != nil { - return fmt.Errorf("failed to register schema: %w", err) - } - return nil - } - m.executeFunc = func(ctx context.Context) error { - for { - inputPayload, err := m.rpc.Read(ctx) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "Failed to read: %s\n", err) - time.Sleep(3 * time.Second) - continue - } - processFunc(ctx, inputPayload) - } - } - return m -} - -func DefCustom(init func(ctx context.Context) error, execute func(ctx context.Context) error) *moduleWrapper { - return &moduleWrapper{ - initFunc: init, - executeFunc: execute, - } -} - -type FunctionContext struct { +type functionContextImpl struct { + context.Context c *fsClient name string module string } -func (c *FunctionContext) GetState(ctx context.Context, key string) ([]byte, error) { +func (c *functionContextImpl) GetState(ctx context.Context, key string) ([]byte, error) { return c.c.rpc.GetState(c.warpContext(ctx), key) } -func (c *FunctionContext) PutState(ctx context.Context, key string, value []byte) error { +func (c *functionContextImpl) PutState(ctx context.Context, key string, value []byte) error { return c.c.rpc.PutState(c.warpContext(ctx), key, value) } -func (c *FunctionContext) Write(ctx context.Context, payload []byte) error { +func (c *functionContextImpl) Write(ctx context.Context, payload []byte) error { return c.c.rpc.Write(c.warpContext(ctx), payload) } -func (c *FunctionContext) Read(ctx context.Context) ([]byte, error) { +func (c *functionContextImpl) Read(ctx context.Context) ([]byte, error) { return c.c.rpc.Read(c.warpContext(ctx)) } -func (c *FunctionContext) GetConfig(ctx context.Context) (map[string]string, error) { +func (c *functionContextImpl) GetConfig(ctx context.Context) (map[string]string, error) { return c.c.rpc.GetConfig(c.warpContext(ctx)) } @@ -389,7 +291,7 @@ func (c *fsClient) Run() error { if !ok { return fmt.Errorf("module %s not found", module) } - funcCtx := &FunctionContext{c: c, name: funcName, module: module} + funcCtx := &functionContextImpl{c: c, name: funcName, module: module} if c.rpc == nil { rpc, err := newFSRPCClient() if err != nil { @@ -398,9 +300,10 @@ func (c *fsClient) Run() error { c.rpc = rpc } ctx := funcCtx.warpContext(context.WithValue(context.Background(), funcCtxKey{}, funcCtx)) + funcCtx.Context = ctx m.fsClient = c m.ctx = funcCtx - err := m.initFunc(ctx) + err := m.initFunc(funcCtx) if err != nil { return err } @@ -408,7 +311,7 @@ func (c *fsClient) Run() error { if c.rpc.skipExecuting() { return nil } - return m.executeFunc(ctx) + return m.executeFunc(funcCtx) } func (c *fsClient) Error() string { diff --git a/clients/gofs/gofs_socket.go b/clients/gofs/gofs_socket.go index fd9908d..7eb1d6c 100644 --- a/clients/gofs/gofs_socket.go +++ b/clients/gofs/gofs_socket.go @@ -32,7 +32,7 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -func (c *FunctionContext) warpContext(parent context.Context) context.Context { +func (c *functionContextImpl) warpContext(parent context.Context) context.Context { return metadata.NewOutgoingContext(parent, metadata.New(map[string]string{ "name": c.name, })) diff --git a/clients/gofs/gofs_wasmfs.go b/clients/gofs/gofs_wasmfs.go index afd7313..bd9b143 100644 --- a/clients/gofs/gofs_wasmfs.go +++ b/clients/gofs/gofs_wasmfs.go @@ -42,13 +42,13 @@ func process() { if runningModule == nil { panic("no module loaded") } - err := runningModule.executeFunc(context.Background()) + err := runningModule.executeFunc(runningModule.ctx) if err != nil { fmt.Fprintf(os.Stderr, "error: %v\n", err) } } -func (c *FunctionContext) warpContext(parent context.Context) context.Context { +func (c *functionContextImpl) warpContext(parent context.Context) context.Context { return parent } @@ -91,7 +91,7 @@ func (c *fsRPCClient) loadModule(m *moduleWrapper) { if m.processFunc == nil { panic("only function module is supported for the wasm runtime") } - m.executeFunc = func(ctx context.Context) error { + m.executeFunc = func(ctx FunctionContext) error { var stat syscall.Stat_t syscall.Fstat(processFd, &stat) payload := make([]byte, stat.Size) diff --git a/examples/basic/main.go b/examples/basic/main.go index 87ec345..56b407a 100644 --- a/examples/basic/main.go +++ b/examples/basic/main.go @@ -17,7 +17,6 @@ package main import ( - "context" "github.com/functionstream/function-stream/clients/gofs" "log/slog" ) @@ -25,7 +24,7 @@ import ( func main() { slog.Info("Hello from Go function!") err := gofs.NewFSClient(). - Register(gofs.DefaultModule, gofs.DefFunction(myProcess)). + Register(gofs.DefaultModule, gofs.RegisterFunction(gofs.NewSimpleFunction(myProcess))). Run() if err != nil { slog.Error(err.Error()) @@ -38,7 +37,8 @@ type Person struct { Expected int `json:"expected"` } -func myProcess(ctx context.Context, person *Person) *Person { +func myProcess(ctx gofs.FunctionContext, e gofs.Event[Person]) (gofs.Event[Person], error) { + person := e.Data() person.Money += 1 - return person + return gofs.NewEvent(person), nil } diff --git a/fs/runtime/external/runtime_test.go b/fs/runtime/external/runtime_test.go index 3bfbc9e..08a562a 100644 --- a/fs/runtime/external/runtime_test.go +++ b/fs/runtime/external/runtime_test.go @@ -54,11 +54,11 @@ var log = common.NewDefaultLogger() type TestFunction struct { } -func (f *TestFunction) Init(_ *gofs.FunctionContext) error { +func (f *TestFunction) Init(_ gofs.FunctionContext) error { return nil } -func (f *TestFunction) Handle(_ *gofs.FunctionContext, event gofs.Event[Person]) (gofs.Event[Person], error) { +func (f *TestFunction) Handle(_ gofs.FunctionContext, event gofs.Event[Person]) (gofs.Event[Person], error) { p := event.Data() p.Money += 1 return gofs.NewEvent(p), nil @@ -67,11 +67,11 @@ func (f *TestFunction) Handle(_ *gofs.FunctionContext, event gofs.Event[Person]) type TestCounterFunction struct { } -func (f *TestCounterFunction) Init(ctx *gofs.FunctionContext) error { +func (f *TestCounterFunction) Init(ctx gofs.FunctionContext) error { return nil } -func (f *TestCounterFunction) Handle(_ *gofs.FunctionContext, event gofs.Event[Counter]) (gofs.Event[Counter], error) { +func (f *TestCounterFunction) Handle(_ gofs.FunctionContext, event gofs.Event[Counter]) (gofs.Event[Counter], error) { c := event.Data() c.Count += 1 return gofs.NewEvent(c), nil @@ -80,11 +80,11 @@ func (f *TestCounterFunction) Handle(_ *gofs.FunctionContext, event gofs.Event[C type TestSource struct { } -func (f *TestSource) Init(_ *gofs.FunctionContext) error { +func (f *TestSource) Init(_ gofs.FunctionContext) error { return nil } -func (f *TestSource) Handle(_ *gofs.FunctionContext, emit func(context.Context, gofs.Event[testRecord]) error) error { +func (f *TestSource) Handle(_ gofs.FunctionContext, emit func(context.Context, gofs.Event[testRecord]) error) error { for i := 0; i < 10; i++ { err := emit(context.Background(), gofs.NewEvent(&testRecord{ ID: i, @@ -311,11 +311,11 @@ type TestSink struct { sinkCh chan Counter } -func (f *TestSink) Init(_ *gofs.FunctionContext) error { +func (f *TestSink) Init(_ gofs.FunctionContext) error { return nil } -func (f *TestSink) Handle(_ *gofs.FunctionContext, event gofs.Event[Counter]) error { +func (f *TestSink) Handle(_ gofs.FunctionContext, event gofs.Event[Counter]) error { f.sinkCh <- *event.Data() return nil } @@ -442,7 +442,7 @@ func TestExternalStatefulModule(t *testing.T) { go func() { err := gofs.NewFSClient().Register("test-stateful", gofs.RegisterCustom(gofs.NewSimpleCustom( - func(ctx *gofs.FunctionContext) error { + func(ctx gofs.FunctionContext) error { err = ctx.PutState(context.Background(), "test-key", []byte("test-value")) if err != nil { log.Error(err, "failed to put state") @@ -517,7 +517,7 @@ func TestFunctionConfig(t *testing.T) { go func() { err := gofs.NewFSClient().Register(module, gofs.RegisterCustom(gofs.NewSimpleCustom( - func(ctx *gofs.FunctionContext) error { + func(ctx gofs.FunctionContext) error { err = ctx.PutState(context.Background(), "test-key", []byte("test-value")) if err != nil { log.Error(err, "failed to put state") From a22e197c4adb0b6bcca8db22276dcc7b0287ef30 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 6 Nov 2024 21:45:05 +0800 Subject: [PATCH 3/5] chore: rename api --- clients/gofs/gofs.go | 8 ++++---- examples/basic/main.go | 2 +- fs/runtime/external/runtime_test.go | 12 ++++++------ 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/clients/gofs/gofs.go b/clients/gofs/gofs.go index 3775c59..53251e4 100644 --- a/clients/gofs/gofs.go +++ b/clients/gofs/gofs.go @@ -108,7 +108,7 @@ func (c *fsClient) Register(module string, wrapper *moduleWrapper) FSClient { return c } -func RegisterFunction[I any, O any](function Function[I, O]) *moduleWrapper { +func WithFunction[I any, O any](function Function[I, O]) *moduleWrapper { m := &moduleWrapper{} processFunc := func(ctx FunctionContext, payload []byte) ([]byte, error) { input := new(I) @@ -159,7 +159,7 @@ func RegisterFunction[I any, O any](function Function[I, O]) *moduleWrapper { return m } -func RegisterSource[O any](source Source[O]) *moduleWrapper { +func WithSource[O any](source Source[O]) *moduleWrapper { m := &moduleWrapper{} emit := func(ctx context.Context, event Event[O]) error { outputPayload, _ := json.Marshal(event.Data()) @@ -182,7 +182,7 @@ func RegisterSource[O any](source Source[O]) *moduleWrapper { return m } -func RegisterSink[I any](sink Sink[I]) *moduleWrapper { +func WithSink[I any](sink Sink[I]) *moduleWrapper { m := &moduleWrapper{} processFunc := func(ctx FunctionContext, payload []byte) error { input := new(I) @@ -219,7 +219,7 @@ func RegisterSink[I any](sink Sink[I]) *moduleWrapper { return m } -func RegisterCustom(custom Custom) *moduleWrapper { +func WithCustom(custom Custom) *moduleWrapper { m := &moduleWrapper{} initFunc := func(ctx FunctionContext) error { return custom.Init(ctx) diff --git a/examples/basic/main.go b/examples/basic/main.go index 56b407a..3875553 100644 --- a/examples/basic/main.go +++ b/examples/basic/main.go @@ -24,7 +24,7 @@ import ( func main() { slog.Info("Hello from Go function!") err := gofs.NewFSClient(). - Register(gofs.DefaultModule, gofs.RegisterFunction(gofs.NewSimpleFunction(myProcess))). + Register(gofs.DefaultModule, gofs.WithFunction(gofs.NewSimpleFunction(myProcess))). Run() if err != nil { slog.Error(err.Error()) diff --git a/fs/runtime/external/runtime_test.go b/fs/runtime/external/runtime_test.go index 08a562a..629e454 100644 --- a/fs/runtime/external/runtime_test.go +++ b/fs/runtime/external/runtime_test.go @@ -99,9 +99,9 @@ func (f *TestSource) Handle(_ gofs.FunctionContext, emit func(context.Context, g func runMockClient() { err := gofs.NewFSClient(). - Register(gofs.DefaultModule, gofs.RegisterFunction(&TestFunction{})). - Register("counter", gofs.RegisterFunction(&TestCounterFunction{})). - Register("test-source", gofs.RegisterSource(&TestSource{})). + Register(gofs.DefaultModule, gofs.WithFunction(&TestFunction{})). + Register("counter", gofs.WithFunction(&TestCounterFunction{})). + Register("test-source", gofs.WithSource(&TestSource{})). Run() if err != nil { log.Error(err, "failed to run mock client") @@ -375,7 +375,7 @@ func TestExternalSinkModule(t *testing.T) { sinkMod := newTestSink() go func() { - err := gofs.NewFSClient().Register("test-sink", gofs.RegisterSink(sinkMod)).Run() + err := gofs.NewFSClient().Register("test-sink", gofs.WithSink(sinkMod)).Run() if err != nil { log.Error(err, "failed to run mock client") } @@ -441,7 +441,7 @@ func TestExternalStatefulModule(t *testing.T) { readyCh := make(chan struct{}) go func() { - err := gofs.NewFSClient().Register("test-stateful", gofs.RegisterCustom(gofs.NewSimpleCustom( + err := gofs.NewFSClient().Register("test-stateful", gofs.WithCustom(gofs.NewSimpleCustom( func(ctx gofs.FunctionContext) error { err = ctx.PutState(context.Background(), "test-key", []byte("test-value")) if err != nil { @@ -516,7 +516,7 @@ func TestFunctionConfig(t *testing.T) { readyCh := make(chan struct{}) go func() { - err := gofs.NewFSClient().Register(module, gofs.RegisterCustom(gofs.NewSimpleCustom( + err := gofs.NewFSClient().Register(module, gofs.WithCustom(gofs.NewSimpleCustom( func(ctx gofs.FunctionContext) error { err = ctx.PutState(context.Background(), "test-key", []byte("test-value")) if err != nil { From 5c9d1d2a059973b90a92feca6e8c5321482da5c0 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 8 Nov 2024 21:57:58 +0800 Subject: [PATCH 4/5] feat: support event ack --- clients/gofs/api.go | 22 ++- clients/gofs/gofs.go | 63 +++--- clients/gofs/gofs_socket.go | 25 ++- clients/gofs/gofs_wasmfs.go | 9 +- fs/runtime/external/model/fs.pb.go | 252 ++++++++++++++++++------ fs/runtime/external/model/fs.proto | 12 +- fs/runtime/external/model/fs_grpc.pb.go | 36 ++++ fs/runtime/external/runtime.go | 23 +++ fs/runtime/external/runtime_test.go | 9 +- 9 files changed, 341 insertions(+), 110 deletions(-) diff --git a/clients/gofs/api.go b/clients/gofs/api.go index 6f1d512..6ced2b8 100644 --- a/clients/gofs/api.go +++ b/clients/gofs/api.go @@ -22,14 +22,14 @@ type FunctionContext interface { context.Context GetState(ctx context.Context, key string) ([]byte, error) PutState(ctx context.Context, key string, value []byte) error - Write(ctx context.Context, payload []byte) error - Read(ctx context.Context) ([]byte, error) + Write(ctx context.Context, rawEvent Event[[]byte]) error + Read(ctx context.Context) (Event[[]byte], error) GetConfig(ctx context.Context) (map[string]string, error) } type Event[T any] interface { Data() *T - Ack() error // TODO: Handle Ack + Ack(ctx context.Context) error } type BaseModule interface { @@ -57,12 +57,18 @@ type Custom interface { } type eventImpl[T any] struct { - data *T + data *T + ackFunc func(context.Context) error } func NewEvent[T any](data *T) Event[T] { + return NewEventWithAck(data, nil) +} + +func NewEventWithAck[T any](data *T, ack func(ctx context.Context) error) Event[T] { return &eventImpl[T]{ - data: data, + data: data, + ackFunc: ack, } } @@ -70,8 +76,10 @@ func (e *eventImpl[T]) Data() *T { return e.data } -func (e *eventImpl[T]) Ack() error { - // TODO: Implement this +func (e *eventImpl[T]) Ack(ctx context.Context) error { + if e.ackFunc != nil { + return e.ackFunc(ctx) + } return nil } diff --git a/clients/gofs/gofs.go b/clients/gofs/gofs.go index 53251e4..b17279d 100644 --- a/clients/gofs/gofs.go +++ b/clients/gofs/gofs.go @@ -19,6 +19,7 @@ package gofs import ( "context" "encoding/json" + "errors" "fmt" "os" "sync" @@ -110,7 +111,7 @@ func (c *fsClient) Register(module string, wrapper *moduleWrapper) FSClient { func WithFunction[I any, O any](function Function[I, O]) *moduleWrapper { m := &moduleWrapper{} - processFunc := func(ctx FunctionContext, payload []byte) ([]byte, error) { + processFunc := func(ctx FunctionContext, payload []byte) ([]byte, error) { // This is only for the wasm function input := new(I) err := json.Unmarshal(payload, input) if err != nil { @@ -145,11 +146,22 @@ func WithFunction[I any, O any](function Function[I, O]) *moduleWrapper { time.Sleep(3 * time.Second) continue } - outputPayload, err := processFunc(ctx, inputPayload) + input := new(I) + err = json.Unmarshal(*inputPayload.Data(), input) + if err != nil { + return fmt.Errorf("failed to parse JSON: %w", err) + } + output, err := function.Handle(ctx, NewEventWithAck(input, inputPayload.Ack)) if err != nil { return err } - err = ctx.Write(ctx, outputPayload) + outputPayload, err := json.Marshal(output.Data()) + if err != nil { + return fmt.Errorf("failed to marshal JSON: %w", err) + } + err = ctx.Write(ctx, NewEventWithAck(&outputPayload, func(ctx context.Context) error { + return errors.Join(inputPayload.Ack(ctx), output.Ack(ctx)) + })) if err != nil { return err } @@ -163,7 +175,7 @@ func WithSource[O any](source Source[O]) *moduleWrapper { m := &moduleWrapper{} emit := func(ctx context.Context, event Event[O]) error { outputPayload, _ := json.Marshal(event.Data()) - return m.ctx.Write(ctx, outputPayload) + return m.ctx.Write(ctx, NewEventWithAck(&outputPayload, event.Ack)) } m.initFunc = func(ctx FunctionContext) error { outputSchema, err := avroschema.Reflect(new(O)) @@ -184,14 +196,6 @@ func WithSource[O any](source Source[O]) *moduleWrapper { func WithSink[I any](sink Sink[I]) *moduleWrapper { m := &moduleWrapper{} - processFunc := func(ctx FunctionContext, payload []byte) error { - input := new(I) - err := json.Unmarshal(payload, input) - if err != nil { - return fmt.Errorf("failed to parse JSON: %w", err) - } - return sink.Handle(ctx, NewEvent(input)) - } m.initFunc = func(ctx FunctionContext) error { inputSchema, err := avroschema.Reflect(new(I)) if err != nil { @@ -211,7 +215,12 @@ func WithSink[I any](sink Sink[I]) *moduleWrapper { time.Sleep(3 * time.Second) continue } - if err = processFunc(ctx, inputPayload); err != nil { + input := new(I) + err = json.Unmarshal(*inputPayload.Data(), input) + if err != nil { + return fmt.Errorf("failed to parse JSON: %w", err) + } + if err = sink.Handle(ctx, NewEventWithAck(input, inputPayload.Ack)); err != nil { return err } } @@ -220,18 +229,14 @@ func WithSink[I any](sink Sink[I]) *moduleWrapper { } func WithCustom(custom Custom) *moduleWrapper { - m := &moduleWrapper{} - initFunc := func(ctx FunctionContext) error { - return custom.Init(ctx) - } - executeFunc := func(ctx FunctionContext) error { - return custom.Handle(ctx) + return &moduleWrapper{ + initFunc: func(ctx FunctionContext) error { + return custom.Init(ctx) + }, + executeFunc: func(ctx FunctionContext) error { + return custom.Handle(ctx) + }, } - m.initFunc = initFunc - m.executeFunc = executeFunc - - // TODO: Simplify this - return m } type functionContextImpl struct { @@ -249,11 +254,11 @@ func (c *functionContextImpl) PutState(ctx context.Context, key string, value [] return c.c.rpc.PutState(c.warpContext(ctx), key, value) } -func (c *functionContextImpl) Write(ctx context.Context, payload []byte) error { - return c.c.rpc.Write(c.warpContext(ctx), payload) +func (c *functionContextImpl) Write(ctx context.Context, rawEvent Event[[]byte]) error { + return c.c.rpc.Write(c.warpContext(ctx), rawEvent) } -func (c *functionContextImpl) Read(ctx context.Context) ([]byte, error) { +func (c *functionContextImpl) Read(ctx context.Context) (Event[[]byte], error) { return c.c.rpc.Read(c.warpContext(ctx)) } @@ -263,10 +268,6 @@ func (c *functionContextImpl) GetConfig(ctx context.Context) (map[string]string, type funcCtxKey struct{} -func GetFunctionContext(ctx context.Context) *FunctionContext { - return ctx.Value(funcCtxKey{}).(*FunctionContext) -} - func (c *fsClient) Run() error { if c.err != nil { return c.err diff --git a/clients/gofs/gofs_socket.go b/clients/gofs/gofs_socket.go index 7eb1d6c..a474e6a 100644 --- a/clients/gofs/gofs_socket.go +++ b/clients/gofs/gofs_socket.go @@ -38,6 +38,14 @@ func (c *functionContextImpl) warpContext(parent context.Context) context.Contex })) } +func passMetadataContext(from context.Context, to context.Context) context.Context { + md, ok := metadata.FromOutgoingContext(from) + if ok { + return metadata.NewOutgoingContext(to, md) + } + return to +} + type fsRPCClient struct { grpcCli model.FunctionClient } @@ -80,20 +88,27 @@ func (c *fsRPCClient) RegisterSchema(ctx context.Context, schema string) error { return nil } -func (c *fsRPCClient) Write(ctx context.Context, payload []byte) error { - _, err := c.grpcCli.Write(ctx, &model.Event{Payload: payload}) +func (c *fsRPCClient) Write(ctx context.Context, event Event[[]byte]) error { + _, err := c.grpcCli.Write(ctx, &model.Event{Payload: *event.Data()}) if err != nil { return fmt.Errorf("failed to write: %w", err) } - return nil + return event.Ack(ctx) } -func (c *fsRPCClient) Read(ctx context.Context) ([]byte, error) { +func (c *fsRPCClient) Read(ctx context.Context) (Event[[]byte], error) { res, err := c.grpcCli.Read(ctx, &model.ReadRequest{}) if err != nil { return nil, fmt.Errorf("failed to read: %w", err) } - return res.Payload, nil + return NewEventWithAck(&res.Payload, func(ackCtx context.Context) error { + if _, err := c.grpcCli.Ack(passMetadataContext(ctx, ackCtx), &model.AckRequest{ + Id: res.Id, + }); err != nil { + return err + } + return nil + }), nil } func (c *fsRPCClient) PutState(ctx context.Context, key string, value []byte) error { diff --git a/clients/gofs/gofs_wasmfs.go b/clients/gofs/gofs_wasmfs.go index bd9b143..7404b8a 100644 --- a/clients/gofs/gofs_wasmfs.go +++ b/clients/gofs/gofs_wasmfs.go @@ -67,11 +67,11 @@ func (c *fsRPCClient) RegisterSchema(ctx context.Context, schema string) error { return nil } -func (c *fsRPCClient) Write(ctx context.Context, payload []byte) error { +func (c *fsRPCClient) Write(ctx context.Context, event Event[[]byte]) error { panic("rpc write not supported") } -func (c *fsRPCClient) Read(ctx context.Context) ([]byte, error) { +func (c *fsRPCClient) Read(ctx context.Context) (Event[[]byte], error) { panic("rpc read not supported") } @@ -99,7 +99,10 @@ func (c *fsRPCClient) loadModule(m *moduleWrapper) { if err != nil { return fmt.Errorf("failed to read: %w", err) } - outputPayload := m.processFunc(ctx, payload) + outputPayload, err := m.processFunc(ctx, payload) + if err != nil { + return fmt.Errorf("failed to process: %w", err) + } _, err = syscall.Write(processFd, outputPayload) if err != nil { return fmt.Errorf("failed to write: %w", err) diff --git a/fs/runtime/external/model/fs.pb.go b/fs/runtime/external/model/fs.pb.go index 7ad3703..fe093f7 100644 --- a/fs/runtime/external/model/fs.pb.go +++ b/fs/runtime/external/model/fs.pb.go @@ -163,7 +163,8 @@ type Event struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` } func (x *Event) Reset() { @@ -198,6 +199,13 @@ func (*Event) Descriptor() ([]byte, []int) { return file_fs_runtime_external_model_fs_proto_rawDescGZIP(), []int{3} } +func (x *Event) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + func (x *Event) GetPayload() []byte { if x != nil { return x.Payload @@ -772,6 +780,91 @@ func (x *GetConfigResponse) GetConfig() map[string]string { return nil } +type AckRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *AckRequest) Reset() { + *x = AckRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_fs_runtime_external_model_fs_proto_msgTypes[16] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *AckRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AckRequest) ProtoMessage() {} + +func (x *AckRequest) ProtoReflect() protoreflect.Message { + mi := &file_fs_runtime_external_model_fs_proto_msgTypes[16] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AckRequest.ProtoReflect.Descriptor instead. +func (*AckRequest) Descriptor() ([]byte, []int) { + return file_fs_runtime_external_model_fs_proto_rawDescGZIP(), []int{16} +} + +func (x *AckRequest) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + +type AckResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *AckResponse) Reset() { + *x = AckResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_fs_runtime_external_model_fs_proto_msgTypes[17] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *AckResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AckResponse) ProtoMessage() {} + +func (x *AckResponse) ProtoReflect() protoreflect.Message { + mi := &file_fs_runtime_external_model_fs_proto_msgTypes[17] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AckResponse.ProtoReflect.Descriptor instead. +func (*AckResponse) Descriptor() ([]byte, []int) { + return file_fs_runtime_external_model_fs_proto_rawDescGZIP(), []int{17} +} + var File_fs_runtime_external_model_fs_proto protoreflect.FileDescriptor var file_fs_runtime_external_model_fs_proto_rawDesc = []byte{ @@ -783,9 +876,10 @@ var file_fs_runtime_external_model_fs_proto_rawDesc = []byte{ 0x68, 0x65, 0x6d, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x22, 0x18, 0x0a, 0x16, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x0d, 0x0a, 0x0b, - 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x21, 0x0a, 0x05, 0x45, - 0x76, 0x65, 0x6e, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x0f, + 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x31, 0x0a, 0x05, 0x45, + 0x76, 0x65, 0x6e, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, + 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x0f, 0x0a, 0x0d, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x0e, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x22, 0x58, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, @@ -834,47 +928,53 @@ var file_fs_runtime_external_model_fs_proto_rawDesc = []byte{ 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xd3, 0x04, 0x0a, 0x08, 0x46, 0x75, 0x6e, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x59, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, - 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x22, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, - 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x63, 0x68, - 0x65, 0x6d, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x66, 0x73, 0x5f, - 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, - 0x72, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x34, 0x0a, 0x04, 0x52, 0x65, 0x61, 0x64, 0x12, 0x18, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, - 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x12, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, - 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x37, 0x0a, 0x05, 0x57, 0x72, 0x69, 0x74, 0x65, 0x12, 0x12, + 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x1c, 0x0a, 0x0a, 0x41, 0x63, 0x6b, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x02, 0x69, 0x64, 0x22, 0x0d, 0x0a, 0x0b, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x8d, 0x05, 0x0a, 0x08, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x12, 0x59, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x63, 0x68, + 0x65, 0x6d, 0x61, 0x12, 0x22, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, + 0x6c, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, + 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x63, + 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x34, 0x0a, 0x04, + 0x52, 0x65, 0x61, 0x64, 0x12, 0x18, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, + 0x61, 0x6c, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, - 0x6e, 0x74, 0x1a, 0x1a, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, - 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x47, - 0x0a, 0x08, 0x50, 0x75, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1c, 0x2e, 0x66, 0x73, 0x5f, - 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x50, 0x75, 0x74, 0x53, 0x74, 0x61, 0x74, + 0x6e, 0x74, 0x12, 0x37, 0x0a, 0x05, 0x57, 0x72, 0x69, 0x74, 0x65, 0x12, 0x12, 0x2e, 0x66, 0x73, + 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x1a, + 0x1a, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x57, 0x72, + 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x03, 0x41, + 0x63, 0x6b, 0x12, 0x17, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, + 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x66, 0x73, + 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x47, 0x0a, 0x08, 0x50, 0x75, 0x74, 0x53, 0x74, 0x61, 0x74, + 0x65, 0x12, 0x1c, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, + 0x50, 0x75, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x1d, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x50, 0x75, + 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x47, + 0x0a, 0x08, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1c, 0x2e, 0x66, 0x73, 0x5f, + 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, - 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x50, 0x75, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x47, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x53, 0x74, - 0x61, 0x74, 0x65, 0x12, 0x1c, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, - 0x6c, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1d, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, - 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x4d, 0x0a, 0x0a, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x12, 0x1e, - 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x4c, 0x69, 0x73, - 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, - 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x4c, 0x69, 0x73, - 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x50, 0x0a, 0x0b, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1f, - 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x44, 0x65, 0x6c, - 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x20, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x44, 0x65, - 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x4a, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1d, - 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x47, 0x65, 0x74, - 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, - 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x47, 0x65, 0x74, 0x43, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x1b, 0x5a, - 0x19, 0x66, 0x73, 0x2f, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2f, 0x65, 0x78, 0x74, 0x65, - 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4d, 0x0a, 0x0a, 0x4c, 0x69, 0x73, 0x74, 0x53, + 0x74, 0x61, 0x74, 0x65, 0x73, 0x12, 0x1e, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x50, 0x0a, 0x0b, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, + 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1f, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, + 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4a, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1d, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, + 0x61, 0x6c, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x1b, 0x5a, 0x19, 0x66, 0x73, 0x2f, 0x72, 0x75, 0x6e, 0x74, 0x69, + 0x6d, 0x65, 0x2f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x6d, 0x6f, 0x64, 0x65, + 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -889,7 +989,7 @@ func file_fs_runtime_external_model_fs_proto_rawDescGZIP() []byte { return file_fs_runtime_external_model_fs_proto_rawDescData } -var file_fs_runtime_external_model_fs_proto_msgTypes = make([]protoimpl.MessageInfo, 17) +var file_fs_runtime_external_model_fs_proto_msgTypes = make([]protoimpl.MessageInfo, 19) var file_fs_runtime_external_model_fs_proto_goTypes = []any{ (*RegisterSchemaRequest)(nil), // 0: fs_external.RegisterSchemaRequest (*RegisterSchemaResponse)(nil), // 1: fs_external.RegisterSchemaResponse @@ -907,32 +1007,36 @@ var file_fs_runtime_external_model_fs_proto_goTypes = []any{ (*DeleteStateResponse)(nil), // 13: fs_external.DeleteStateResponse (*GetConfigRequest)(nil), // 14: fs_external.GetConfigRequest (*GetConfigResponse)(nil), // 15: fs_external.GetConfigResponse - nil, // 16: fs_external.GetConfigResponse.ConfigEntry + (*AckRequest)(nil), // 16: fs_external.AckRequest + (*AckResponse)(nil), // 17: fs_external.AckResponse + nil, // 18: fs_external.GetConfigResponse.ConfigEntry } var file_fs_runtime_external_model_fs_proto_depIdxs = []int32{ 5, // 0: fs_external.GetStateRequest.context:type_name -> fs_external.StateContext 5, // 1: fs_external.PutStateRequest.context:type_name -> fs_external.StateContext 5, // 2: fs_external.ListStatesRequest.context:type_name -> fs_external.StateContext 5, // 3: fs_external.DeleteStateRequest.context:type_name -> fs_external.StateContext - 16, // 4: fs_external.GetConfigResponse.config:type_name -> fs_external.GetConfigResponse.ConfigEntry + 18, // 4: fs_external.GetConfigResponse.config:type_name -> fs_external.GetConfigResponse.ConfigEntry 0, // 5: fs_external.Function.RegisterSchema:input_type -> fs_external.RegisterSchemaRequest 2, // 6: fs_external.Function.Read:input_type -> fs_external.ReadRequest 3, // 7: fs_external.Function.Write:input_type -> fs_external.Event - 8, // 8: fs_external.Function.PutState:input_type -> fs_external.PutStateRequest - 6, // 9: fs_external.Function.GetState:input_type -> fs_external.GetStateRequest - 10, // 10: fs_external.Function.ListStates:input_type -> fs_external.ListStatesRequest - 12, // 11: fs_external.Function.DeleteState:input_type -> fs_external.DeleteStateRequest - 14, // 12: fs_external.Function.GetConfig:input_type -> fs_external.GetConfigRequest - 1, // 13: fs_external.Function.RegisterSchema:output_type -> fs_external.RegisterSchemaResponse - 3, // 14: fs_external.Function.Read:output_type -> fs_external.Event - 4, // 15: fs_external.Function.Write:output_type -> fs_external.WriteResponse - 9, // 16: fs_external.Function.PutState:output_type -> fs_external.PutStateResponse - 7, // 17: fs_external.Function.GetState:output_type -> fs_external.GetStateResponse - 11, // 18: fs_external.Function.ListStates:output_type -> fs_external.ListStatesResponse - 13, // 19: fs_external.Function.DeleteState:output_type -> fs_external.DeleteStateResponse - 15, // 20: fs_external.Function.GetConfig:output_type -> fs_external.GetConfigResponse - 13, // [13:21] is the sub-list for method output_type - 5, // [5:13] is the sub-list for method input_type + 16, // 8: fs_external.Function.Ack:input_type -> fs_external.AckRequest + 8, // 9: fs_external.Function.PutState:input_type -> fs_external.PutStateRequest + 6, // 10: fs_external.Function.GetState:input_type -> fs_external.GetStateRequest + 10, // 11: fs_external.Function.ListStates:input_type -> fs_external.ListStatesRequest + 12, // 12: fs_external.Function.DeleteState:input_type -> fs_external.DeleteStateRequest + 14, // 13: fs_external.Function.GetConfig:input_type -> fs_external.GetConfigRequest + 1, // 14: fs_external.Function.RegisterSchema:output_type -> fs_external.RegisterSchemaResponse + 3, // 15: fs_external.Function.Read:output_type -> fs_external.Event + 4, // 16: fs_external.Function.Write:output_type -> fs_external.WriteResponse + 17, // 17: fs_external.Function.Ack:output_type -> fs_external.AckResponse + 9, // 18: fs_external.Function.PutState:output_type -> fs_external.PutStateResponse + 7, // 19: fs_external.Function.GetState:output_type -> fs_external.GetStateResponse + 11, // 20: fs_external.Function.ListStates:output_type -> fs_external.ListStatesResponse + 13, // 21: fs_external.Function.DeleteState:output_type -> fs_external.DeleteStateResponse + 15, // 22: fs_external.Function.GetConfig:output_type -> fs_external.GetConfigResponse + 14, // [14:23] is the sub-list for method output_type + 5, // [5:14] is the sub-list for method input_type 5, // [5:5] is the sub-list for extension type_name 5, // [5:5] is the sub-list for extension extendee 0, // [0:5] is the sub-list for field type_name @@ -1136,6 +1240,30 @@ func file_fs_runtime_external_model_fs_proto_init() { return nil } } + file_fs_runtime_external_model_fs_proto_msgTypes[16].Exporter = func(v any, i int) any { + switch v := v.(*AckRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_fs_runtime_external_model_fs_proto_msgTypes[17].Exporter = func(v any, i int) any { + switch v := v.(*AckResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -1143,7 +1271,7 @@ func file_fs_runtime_external_model_fs_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_fs_runtime_external_model_fs_proto_rawDesc, NumEnums: 0, - NumMessages: 17, + NumMessages: 19, NumExtensions: 0, NumServices: 1, }, diff --git a/fs/runtime/external/model/fs.proto b/fs/runtime/external/model/fs.proto index e0ade86..1c97779 100644 --- a/fs/runtime/external/model/fs.proto +++ b/fs/runtime/external/model/fs.proto @@ -30,7 +30,8 @@ message ReadRequest { } message Event { - bytes payload = 1; + int64 id = 1; + bytes payload = 2; } message WriteResponse { @@ -87,10 +88,19 @@ message GetConfigResponse { map config = 1; } +message AckRequest { + int64 id = 1; +} + +message AckResponse { + +} + service Function { rpc RegisterSchema(RegisterSchemaRequest) returns (RegisterSchemaResponse); rpc Read(ReadRequest) returns (Event); rpc Write(Event) returns (WriteResponse); + rpc Ack(AckRequest) returns (AckResponse); rpc PutState(PutStateRequest) returns (PutStateResponse); rpc GetState(GetStateRequest) returns (GetStateResponse); rpc ListStates(ListStatesRequest) returns (ListStatesResponse); diff --git a/fs/runtime/external/model/fs_grpc.pb.go b/fs/runtime/external/model/fs_grpc.pb.go index 38668e9..4064443 100644 --- a/fs/runtime/external/model/fs_grpc.pb.go +++ b/fs/runtime/external/model/fs_grpc.pb.go @@ -21,6 +21,7 @@ type FunctionClient interface { RegisterSchema(ctx context.Context, in *RegisterSchemaRequest, opts ...grpc.CallOption) (*RegisterSchemaResponse, error) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*Event, error) Write(ctx context.Context, in *Event, opts ...grpc.CallOption) (*WriteResponse, error) + Ack(ctx context.Context, in *AckRequest, opts ...grpc.CallOption) (*AckResponse, error) PutState(ctx context.Context, in *PutStateRequest, opts ...grpc.CallOption) (*PutStateResponse, error) GetState(ctx context.Context, in *GetStateRequest, opts ...grpc.CallOption) (*GetStateResponse, error) ListStates(ctx context.Context, in *ListStatesRequest, opts ...grpc.CallOption) (*ListStatesResponse, error) @@ -63,6 +64,15 @@ func (c *functionClient) Write(ctx context.Context, in *Event, opts ...grpc.Call return out, nil } +func (c *functionClient) Ack(ctx context.Context, in *AckRequest, opts ...grpc.CallOption) (*AckResponse, error) { + out := new(AckResponse) + err := c.cc.Invoke(ctx, "/fs_external.Function/Ack", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *functionClient) PutState(ctx context.Context, in *PutStateRequest, opts ...grpc.CallOption) (*PutStateResponse, error) { out := new(PutStateResponse) err := c.cc.Invoke(ctx, "/fs_external.Function/PutState", in, out, opts...) @@ -115,6 +125,7 @@ type FunctionServer interface { RegisterSchema(context.Context, *RegisterSchemaRequest) (*RegisterSchemaResponse, error) Read(context.Context, *ReadRequest) (*Event, error) Write(context.Context, *Event) (*WriteResponse, error) + Ack(context.Context, *AckRequest) (*AckResponse, error) PutState(context.Context, *PutStateRequest) (*PutStateResponse, error) GetState(context.Context, *GetStateRequest) (*GetStateResponse, error) ListStates(context.Context, *ListStatesRequest) (*ListStatesResponse, error) @@ -136,6 +147,9 @@ func (UnimplementedFunctionServer) Read(context.Context, *ReadRequest) (*Event, func (UnimplementedFunctionServer) Write(context.Context, *Event) (*WriteResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Write not implemented") } +func (UnimplementedFunctionServer) Ack(context.Context, *AckRequest) (*AckResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Ack not implemented") +} func (UnimplementedFunctionServer) PutState(context.Context, *PutStateRequest) (*PutStateResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method PutState not implemented") } @@ -218,6 +232,24 @@ func _Function_Write_Handler(srv interface{}, ctx context.Context, dec func(inte return interceptor(ctx, in, info, handler) } +func _Function_Ack_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AckRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(FunctionServer).Ack(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/fs_external.Function/Ack", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FunctionServer).Ack(ctx, req.(*AckRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Function_PutState_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(PutStateRequest) if err := dec(in); err != nil { @@ -327,6 +359,10 @@ var Function_ServiceDesc = grpc.ServiceDesc{ MethodName: "Write", Handler: _Function_Write_Handler, }, + { + MethodName: "Ack", + Handler: _Function_Ack_Handler, + }, { MethodName: "PutState", Handler: _Function_PutState_Handler, diff --git a/fs/runtime/external/runtime.go b/fs/runtime/external/runtime.go index 7e56be2..e628bee 100644 --- a/fs/runtime/external/runtime.go +++ b/fs/runtime/external/runtime.go @@ -96,6 +96,15 @@ func (f *functionServerImpl) Write(ctx context.Context, event *model.Event) (*mo return &model.WriteResponse{}, nil } +func (f *functionServerImpl) Ack(ctx context.Context, request *model.AckRequest) (*model.AckResponse, error) { + r, err := f.getFunctionRuntime(ctx) + if err != nil { + return nil, err + } + r.Ack(request.Id) + return &model.AckResponse{}, nil +} + func (f *functionServerImpl) PutState( ctx context.Context, request *model.PutStateRequest) (*model.PutStateResponse, error) { r, err := f.getFunctionRuntime(ctx) @@ -229,6 +238,9 @@ type runtime struct { inputCh chan contube.Record funcCtx api.FunctionContext log *common.Logger + + recordsMapMu sync.Mutex + recordsMap map[int64]contube.Record } func (r *runtime) Call(e contube.Record) (contube.Record, error) { @@ -238,3 +250,14 @@ func (r *runtime) Call(e contube.Record) (contube.Record, error) { func (r *runtime) Stop() { } + +// Ack acknowledges the processing of a record +// This is an idempotent operation +func (r *runtime) Ack(id int64) { + r.recordsMapMu.Lock() + defer r.recordsMapMu.Unlock() + if record, ok := r.recordsMap[id]; ok { + record.Commit() + delete(r.recordsMap, id) + } +} diff --git a/fs/runtime/external/runtime_test.go b/fs/runtime/external/runtime_test.go index 629e454..db1794a 100644 --- a/fs/runtime/external/runtime_test.go +++ b/fs/runtime/external/runtime_test.go @@ -22,6 +22,7 @@ import ( "fmt" "net" "os" + "sync/atomic" "testing" "github.com/functionstream/function-stream/fs/statestore" @@ -158,10 +159,14 @@ func TestExternalRuntime(t *testing.T) { err = fm.StartFunction(f) assert.NoError(t, err) + var acked atomic.Bool + event, err := contube.NewStructRecord(&Person{ Name: "test", Money: 1, - }, func() {}) + }, func() { + acked.Store(true) + }) assert.NoError(t, err) err = fm.ProduceEvent(inputTopic, event) assert.NoError(t, err) @@ -173,6 +178,8 @@ func TestExternalRuntime(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 2, p.Money) + assert.True(t, acked.Load()) + err = fm.DeleteFunction("", f.Name) assert.NoError(t, err) } From 9486f5f8b1ed24a2441b25d78ee967a8f482fd85 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Sun, 10 Nov 2024 20:55:20 +0800 Subject: [PATCH 5/5] feat: support event ack --- fs/contube/memory.go | 1 - fs/runtime/external/runtime.go | 36 +++++++++----- fs/runtime/external/runtime_test.go | 77 +++++++++++++++++++---------- 3 files changed, 74 insertions(+), 40 deletions(-) diff --git a/fs/contube/memory.go b/fs/contube/memory.go index b0a96fa..57ac7bd 100644 --- a/fs/contube/memory.go +++ b/fs/contube/memory.go @@ -131,7 +131,6 @@ func (f *MemoryQueueFactory) NewSinkTube(ctx context.Context, configMap ConfigMa if !ok { return } - event.Commit() c <- event } } diff --git a/fs/runtime/external/runtime.go b/fs/runtime/external/runtime.go index e628bee..9d7b467 100644 --- a/fs/runtime/external/runtime.go +++ b/fs/runtime/external/runtime.go @@ -70,19 +70,12 @@ func (f *functionServerImpl) RegisterSchema(ctx context.Context, return &model.RegisterSchemaResponse{}, nil } -func (f *functionServerImpl) Read(ctx context.Context, request *model.ReadRequest) (*model.Event, error) { +func (f *functionServerImpl) Read(ctx context.Context, _ *model.ReadRequest) (*model.Event, error) { r, err := f.getFunctionRuntime(ctx) if err != nil { return nil, err } - select { - case e := <-r.inputCh: - return &model.Event{ - Payload: e.GetPayload(), - }, nil - case <-ctx.Done(): - return nil, ctx.Err() - } + return r.ReadRecord(ctx) } func (f *functionServerImpl) Write(ctx context.Context, event *model.Event) (*model.WriteResponse, error) { @@ -182,9 +175,10 @@ func (f *Factory) NewFunctionRuntime(instance api.FunctionInstance, _ *funcModel.RuntimeConfig) (api.FunctionRuntime, error) { def := instance.Definition() r := &runtime{ - inputCh: make(chan contube.Record), - funcCtx: instance.FunctionContext(), - log: instance.Logger(), + inputCh: make(chan contube.Record), + funcCtx: instance.FunctionContext(), + log: instance.Logger(), + recordsMap: make(map[int64]contube.Record), } f.server.runtimeMaps.Store(common.GetNamespacedName(def.Namespace, def.Name).String(), r) f.log.Info("Creating new function runtime", "function", common.GetNamespacedName(def.Namespace, def.Name)) @@ -240,6 +234,7 @@ type runtime struct { log *common.Logger recordsMapMu sync.Mutex + recordIndex int64 recordsMap map[int64]contube.Record } @@ -251,6 +246,23 @@ func (r *runtime) Call(e contube.Record) (contube.Record, error) { func (r *runtime) Stop() { } +func (r *runtime) ReadRecord(ctx context.Context) (*model.Event, error) { + select { + case e := <-r.inputCh: + r.recordsMapMu.Lock() + defer r.recordsMapMu.Unlock() + eventId := r.recordIndex + r.recordIndex++ + r.recordsMap[eventId] = e + return &model.Event{ + Id: eventId, + Payload: e.GetPayload(), + }, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + // Ack acknowledges the processing of a record // This is an idempotent operation func (r *runtime) Ack(id int64) { diff --git a/fs/runtime/external/runtime_test.go b/fs/runtime/external/runtime_test.go index db1794a..06e11c4 100644 --- a/fs/runtime/external/runtime_test.go +++ b/fs/runtime/external/runtime_test.go @@ -22,8 +22,8 @@ import ( "fmt" "net" "os" - "sync/atomic" "testing" + "time" "github.com/functionstream/function-stream/fs/statestore" @@ -98,11 +98,30 @@ func (f *TestSource) Handle(_ gofs.FunctionContext, emit func(context.Context, g return nil } -func runMockClient() { +type TestModules struct { + testFunction *TestFunction + testCounter *TestCounterFunction + testSource *TestSource + testSink *TestSink +} + +func NewTestModules() *TestModules { + return &TestModules{ + testFunction: &TestFunction{}, + testCounter: &TestCounterFunction{}, + testSource: &TestSource{}, + testSink: &TestSink{ + sinkCh: make(chan Counter), + }, + } +} + +func (t *TestModules) Run() { err := gofs.NewFSClient(). - Register(gofs.DefaultModule, gofs.WithFunction(&TestFunction{})). - Register("counter", gofs.WithFunction(&TestCounterFunction{})). - Register("test-source", gofs.WithSource(&TestSource{})). + Register(gofs.DefaultModule, gofs.WithFunction(t.testFunction)). + Register("counter", gofs.WithFunction(t.testCounter)). + Register("test-source", gofs.WithSource(t.testSource)). + Register("test-sink", gofs.WithSink(t.testSink)). Run() if err != nil { log.Error(err, "failed to run mock client") @@ -129,7 +148,7 @@ func TestExternalRuntime(t *testing.T) { t.Fatal(err) } - go runMockClient() + go NewTestModules().Run() inputTopic := "input" outputTopic := "output" @@ -159,13 +178,13 @@ func TestExternalRuntime(t *testing.T) { err = fm.StartFunction(f) assert.NoError(t, err) - var acked atomic.Bool + acked := make(chan struct{}) event, err := contube.NewStructRecord(&Person{ Name: "test", Money: 1, }, func() { - acked.Store(true) + acked <- struct{}{} }) assert.NoError(t, err) err = fm.ProduceEvent(inputTopic, event) @@ -178,7 +197,11 @@ func TestExternalRuntime(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 2, p.Money) - assert.True(t, acked.Load()) + select { + case <-acked: + case <-time.After(5 * time.Second): + t.Fatal("failed to ack event") + } err = fm.DeleteFunction("", f.Name) assert.NoError(t, err) @@ -233,7 +256,7 @@ func TestNonDefaultModule(t *testing.T) { err = fm.StartFunction(f) assert.NoError(t, err) - go runMockClient() + go NewTestModules().Run() event, err := contube.NewStructRecord(&Counter{ Count: 1, @@ -298,7 +321,7 @@ func TestExternalSourceModule(t *testing.T) { err = fm.StartFunction(f) assert.NoError(t, err) - go runMockClient() + go NewTestModules().Run() for i := 0; i < 10; i++ { output, err := fm.ConsumeEvent(outputTopic) @@ -322,15 +345,9 @@ func (f *TestSink) Init(_ gofs.FunctionContext) error { return nil } -func (f *TestSink) Handle(_ gofs.FunctionContext, event gofs.Event[Counter]) error { +func (f *TestSink) Handle(ctx gofs.FunctionContext, event gofs.Event[Counter]) error { f.sinkCh <- *event.Data() - return nil -} - -func newTestSink() *TestSink { - return &TestSink{ - sinkCh: make(chan Counter), - } + return event.Ack(ctx) } func TestExternalSinkModule(t *testing.T) { @@ -379,18 +396,18 @@ func TestExternalSinkModule(t *testing.T) { err = fm.StartFunction(f) assert.NoError(t, err) - sinkMod := newTestSink() + testMods := NewTestModules() + sinkMod := testMods.testSink - go func() { - err := gofs.NewFSClient().Register("test-sink", gofs.WithSink(sinkMod)).Run() - if err != nil { - log.Error(err, "failed to run mock client") - } - }() + go testMods.Run() + + ackCh := make(chan struct{}, 100) event, err := contube.NewStructRecord(&Counter{ Count: 1, - }, func() {}) + }, func() { + ackCh <- struct{}{} + }) assert.NoError(t, err) err = fm.ProduceEvent(inputTopic, event) assert.NoError(t, err) @@ -398,6 +415,12 @@ func TestExternalSinkModule(t *testing.T) { r := <-sinkMod.sinkCh assert.Equal(t, 1, r.Count) + select { + case <-ackCh: + case <-time.After(5 * time.Second): + t.Fatal("failed to ack event") + } + err = fm.DeleteFunction("", f.Name) assert.NoError(t, err) }