diff --git a/clients/gofs/api.go b/clients/gofs/api.go new file mode 100644 index 0000000..6ced2b8 --- /dev/null +++ b/clients/gofs/api.go @@ -0,0 +1,156 @@ +/* + * Copyright 2024 DefFunction Stream Org. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gofs + +import "context" + +type FunctionContext interface { + context.Context + GetState(ctx context.Context, key string) ([]byte, error) + PutState(ctx context.Context, key string, value []byte) error + Write(ctx context.Context, rawEvent Event[[]byte]) error + Read(ctx context.Context) (Event[[]byte], error) + GetConfig(ctx context.Context) (map[string]string, error) +} + +type Event[T any] interface { + Data() *T + Ack(ctx context.Context) error +} + +type BaseModule interface { + Init(ctx FunctionContext) error +} + +type Function[I any, O any] interface { + BaseModule + Handle(ctx FunctionContext, event Event[I]) (Event[O], error) +} + +type Source[O any] interface { + BaseModule + Handle(ctx FunctionContext, emit func(context.Context, Event[O]) error) error +} + +type Sink[I any] interface { + BaseModule + Handle(ctx FunctionContext, event Event[I]) error +} + +type Custom interface { + BaseModule + Handle(ctx FunctionContext) error +} + +type eventImpl[T any] struct { + data *T + ackFunc func(context.Context) error +} + +func NewEvent[T any](data *T) Event[T] { + return NewEventWithAck(data, nil) +} + +func NewEventWithAck[T any](data *T, ack func(ctx context.Context) error) Event[T] { + return &eventImpl[T]{ + data: data, + ackFunc: ack, + } +} + +func (e *eventImpl[T]) Data() *T { + return e.data +} + +func (e *eventImpl[T]) Ack(ctx context.Context) error { + if e.ackFunc != nil { + return e.ackFunc(ctx) + } + return nil +} + +type simpleFunction[I any, O any] struct { + handle func(ctx FunctionContext, event Event[I]) (Event[O], error) +} + +func NewSimpleFunction[I any, O any](handle func(ctx FunctionContext, event Event[I]) (Event[O], error)) Function[I, O] { + return &simpleFunction[I, O]{ + handle: handle, + } +} + +func (f *simpleFunction[I, O]) Init(_ FunctionContext) error { + return nil +} + +func (f *simpleFunction[I, O]) Handle(ctx FunctionContext, event Event[I]) (Event[O], error) { + return f.handle(ctx, event) +} + +type simpleSource[O any] struct { + handle func(ctx FunctionContext, emit func(context.Context, Event[O]) error) error +} + +func NewSimpleSource[O any](handle func(ctx FunctionContext, emit func(context.Context, Event[O]) error) error) Source[O] { + return &simpleSource[O]{ + handle: handle, + } +} + +func (s *simpleSource[O]) Init(_ FunctionContext) error { + return nil +} + +func (s *simpleSource[O]) Handle(ctx FunctionContext, emit func(context.Context, Event[O]) error) error { + return s.handle(ctx, emit) +} + +type simpleSink[I any] struct { + handle func(ctx FunctionContext, event Event[I]) error +} + +func NewSimpleSink[I any](handle func(ctx FunctionContext, event Event[I]) error) Sink[I] { + return &simpleSink[I]{ + handle: handle, + } +} + +func (s *simpleSink[I]) Init(_ FunctionContext) error { + return nil +} + +func (s *simpleSink[I]) Handle(ctx FunctionContext, event Event[I]) error { + return s.handle(ctx, event) +} + +type simpleCustom struct { + handle func(ctx FunctionContext) error +} + +func NewSimpleCustom(handle func(ctx FunctionContext) error) Custom { + return &simpleCustom{ + handle: handle, + } +} + +func (c *simpleCustom) Init(_ FunctionContext) error { + return nil +} + +func (c *simpleCustom) Handle(ctx FunctionContext) error { + return c.handle(ctx) +} diff --git a/clients/gofs/gofs.go b/clients/gofs/gofs.go index f6caee2..b17279d 100644 --- a/clients/gofs/gofs.go +++ b/clients/gofs/gofs.go @@ -19,6 +19,7 @@ package gofs import ( "context" "encoding/json" + "errors" "fmt" "os" "sync" @@ -67,16 +68,17 @@ func NewFSClient() FSClient { type moduleWrapper struct { *fsClient - processFunc func(context.Context, []byte) []byte // Only for Function - executeFunc func(context.Context) error - initFunc func(context.Context) error + ctx *functionContextImpl + processFunc func(FunctionContext, []byte) ([]byte, error) // Only for Wasm Function + executeFunc func(FunctionContext) error + initFunc func(FunctionContext) error registerErr error } -func (m *moduleWrapper) AddInitFunc(initFunc func(context.Context) error) *moduleWrapper { +func (m *moduleWrapper) AddInitFunc(initFunc func(FunctionContext) error) *moduleWrapper { parentInit := m.initFunc if parentInit != nil { - m.initFunc = func(ctx context.Context) error { + m.initFunc = func(ctx FunctionContext) error { err := parentInit(ctx) if err != nil { return err @@ -107,19 +109,25 @@ func (c *fsClient) Register(module string, wrapper *moduleWrapper) FSClient { return c } -func Function[I any, O any](process func(context.Context, *I) *O) *moduleWrapper { - processFunc := func(ctx context.Context, payload []byte) []byte { +func WithFunction[I any, O any](function Function[I, O]) *moduleWrapper { + m := &moduleWrapper{} + processFunc := func(ctx FunctionContext, payload []byte) ([]byte, error) { // This is only for the wasm function input := new(I) err := json.Unmarshal(payload, input) if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s\n", err, payload) + return nil, fmt.Errorf("failed to parse JSON: %w", err) + } + output, err := function.Handle(ctx, NewEvent(input)) + if err != nil { + return nil, err + } + outputPayload, err := json.Marshal(output.Data()) + if err != nil { + return nil, fmt.Errorf("failed to marshal JSON: %w", err) } - output := process(ctx, input) - outputPayload, _ := json.Marshal(output) - return outputPayload + return outputPayload, nil } - m := &moduleWrapper{} - m.initFunc = func(ctx context.Context) error { + m.initFunc = func(ctx FunctionContext) error { outputSchema, err := avroschema.Reflect(new(O)) if err != nil { return err @@ -128,20 +136,34 @@ func Function[I any, O any](process func(context.Context, *I) *O) *moduleWrapper if err != nil { return fmt.Errorf("failed to register schema: %w", err) } - return nil + return function.Init(ctx) } - m.executeFunc = func(ctx context.Context) error { + m.executeFunc = func(ctx FunctionContext) error { for { - inputPayload, err := m.rpc.Read(ctx) + inputPayload, err := ctx.Read(ctx) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "Failed to read: %s\n", err) time.Sleep(3 * time.Second) continue } - outputPayload := processFunc(ctx, inputPayload) - err = m.rpc.Write(ctx, outputPayload) + input := new(I) + err = json.Unmarshal(*inputPayload.Data(), input) if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "Failed to write: %s\n", err) + return fmt.Errorf("failed to parse JSON: %w", err) + } + output, err := function.Handle(ctx, NewEventWithAck(input, inputPayload.Ack)) + if err != nil { + return err + } + outputPayload, err := json.Marshal(output.Data()) + if err != nil { + return fmt.Errorf("failed to marshal JSON: %w", err) + } + err = ctx.Write(ctx, NewEventWithAck(&outputPayload, func(ctx context.Context) error { + return errors.Join(inputPayload.Ack(ctx), output.Ack(ctx)) + })) + if err != nil { + return err } } } @@ -149,13 +171,13 @@ func Function[I any, O any](process func(context.Context, *I) *O) *moduleWrapper return m } -func Source[O any](process func(ctx context.Context, emit func(context.Context, *O) error)) *moduleWrapper { +func WithSource[O any](source Source[O]) *moduleWrapper { m := &moduleWrapper{} - emit := func(ctx context.Context, event *O) error { - outputPayload, _ := json.Marshal(event) - return m.rpc.Write(ctx, outputPayload) + emit := func(ctx context.Context, event Event[O]) error { + outputPayload, _ := json.Marshal(event.Data()) + return m.ctx.Write(ctx, NewEventWithAck(&outputPayload, event.Ack)) } - m.initFunc = func(ctx context.Context) error { + m.initFunc = func(ctx FunctionContext) error { outputSchema, err := avroschema.Reflect(new(O)) if err != nil { return err @@ -164,26 +186,17 @@ func Source[O any](process func(ctx context.Context, emit func(context.Context, if err != nil { return fmt.Errorf("failed to register schema: %w", err) } - return nil + return source.Init(ctx) } - m.executeFunc = func(ctx context.Context) error { - process(ctx, emit) - return nil + m.executeFunc = func(ctx FunctionContext) error { + return source.Handle(ctx, emit) } return m } -func Sink[I any](process func(context.Context, *I)) *moduleWrapper { - processFunc := func(ctx context.Context, payload []byte) { - input := new(I) - err := json.Unmarshal(payload, input) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s\n", err, payload) - } - process(ctx, input) - } +func WithSink[I any](sink Sink[I]) *moduleWrapper { m := &moduleWrapper{} - m.initFunc = func(ctx context.Context) error { + m.initFunc = func(ctx FunctionContext) error { inputSchema, err := avroschema.Reflect(new(I)) if err != nil { return err @@ -192,61 +205,69 @@ func Sink[I any](process func(context.Context, *I)) *moduleWrapper { if err != nil { return fmt.Errorf("failed to register schema: %w", err) } - return nil + return sink.Init(ctx) } - m.executeFunc = func(ctx context.Context) error { + m.executeFunc = func(ctx FunctionContext) error { for { - inputPayload, err := m.rpc.Read(ctx) + inputPayload, err := ctx.Read(ctx) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "Failed to read: %s\n", err) time.Sleep(3 * time.Second) continue } - processFunc(ctx, inputPayload) + input := new(I) + err = json.Unmarshal(*inputPayload.Data(), input) + if err != nil { + return fmt.Errorf("failed to parse JSON: %w", err) + } + if err = sink.Handle(ctx, NewEventWithAck(input, inputPayload.Ack)); err != nil { + return err + } } } return m } -func Custom(init func(ctx context.Context) error, execute func(ctx context.Context) error) *moduleWrapper { +func WithCustom(custom Custom) *moduleWrapper { return &moduleWrapper{ - initFunc: init, - executeFunc: execute, + initFunc: func(ctx FunctionContext) error { + return custom.Init(ctx) + }, + executeFunc: func(ctx FunctionContext) error { + return custom.Handle(ctx) + }, } } -type FunctionContext struct { +type functionContextImpl struct { + context.Context c *fsClient name string module string } -func (c *FunctionContext) GetState(ctx context.Context, key string) ([]byte, error) { +func (c *functionContextImpl) GetState(ctx context.Context, key string) ([]byte, error) { return c.c.rpc.GetState(c.warpContext(ctx), key) } -func (c *FunctionContext) PutState(ctx context.Context, key string, value []byte) error { +func (c *functionContextImpl) PutState(ctx context.Context, key string, value []byte) error { return c.c.rpc.PutState(c.warpContext(ctx), key, value) } -func (c *FunctionContext) Write(ctx context.Context, payload []byte) error { - return c.c.rpc.Write(c.warpContext(ctx), payload) +func (c *functionContextImpl) Write(ctx context.Context, rawEvent Event[[]byte]) error { + return c.c.rpc.Write(c.warpContext(ctx), rawEvent) } -func (c *FunctionContext) Read(ctx context.Context) ([]byte, error) { +func (c *functionContextImpl) Read(ctx context.Context) (Event[[]byte], error) { return c.c.rpc.Read(c.warpContext(ctx)) } -func (c *FunctionContext) GetConfig(ctx context.Context) (map[string]string, error) { +func (c *functionContextImpl) GetConfig(ctx context.Context) (map[string]string, error) { return c.c.rpc.GetConfig(c.warpContext(ctx)) } type funcCtxKey struct{} -func GetFunctionContext(ctx context.Context) *FunctionContext { - return ctx.Value(funcCtxKey{}).(*FunctionContext) -} - func (c *fsClient) Run() error { if c.err != nil { return c.err @@ -271,7 +292,7 @@ func (c *fsClient) Run() error { if !ok { return fmt.Errorf("module %s not found", module) } - funcCtx := &FunctionContext{c: c, name: funcName, module: module} + funcCtx := &functionContextImpl{c: c, name: funcName, module: module} if c.rpc == nil { rpc, err := newFSRPCClient() if err != nil { @@ -280,8 +301,10 @@ func (c *fsClient) Run() error { c.rpc = rpc } ctx := funcCtx.warpContext(context.WithValue(context.Background(), funcCtxKey{}, funcCtx)) + funcCtx.Context = ctx m.fsClient = c - err := m.initFunc(ctx) + m.ctx = funcCtx + err := m.initFunc(funcCtx) if err != nil { return err } @@ -289,7 +312,7 @@ func (c *fsClient) Run() error { if c.rpc.skipExecuting() { return nil } - return m.executeFunc(ctx) + return m.executeFunc(funcCtx) } func (c *fsClient) Error() string { diff --git a/clients/gofs/gofs_socket.go b/clients/gofs/gofs_socket.go index fd9908d..a474e6a 100644 --- a/clients/gofs/gofs_socket.go +++ b/clients/gofs/gofs_socket.go @@ -32,12 +32,20 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -func (c *FunctionContext) warpContext(parent context.Context) context.Context { +func (c *functionContextImpl) warpContext(parent context.Context) context.Context { return metadata.NewOutgoingContext(parent, metadata.New(map[string]string{ "name": c.name, })) } +func passMetadataContext(from context.Context, to context.Context) context.Context { + md, ok := metadata.FromOutgoingContext(from) + if ok { + return metadata.NewOutgoingContext(to, md) + } + return to +} + type fsRPCClient struct { grpcCli model.FunctionClient } @@ -80,20 +88,27 @@ func (c *fsRPCClient) RegisterSchema(ctx context.Context, schema string) error { return nil } -func (c *fsRPCClient) Write(ctx context.Context, payload []byte) error { - _, err := c.grpcCli.Write(ctx, &model.Event{Payload: payload}) +func (c *fsRPCClient) Write(ctx context.Context, event Event[[]byte]) error { + _, err := c.grpcCli.Write(ctx, &model.Event{Payload: *event.Data()}) if err != nil { return fmt.Errorf("failed to write: %w", err) } - return nil + return event.Ack(ctx) } -func (c *fsRPCClient) Read(ctx context.Context) ([]byte, error) { +func (c *fsRPCClient) Read(ctx context.Context) (Event[[]byte], error) { res, err := c.grpcCli.Read(ctx, &model.ReadRequest{}) if err != nil { return nil, fmt.Errorf("failed to read: %w", err) } - return res.Payload, nil + return NewEventWithAck(&res.Payload, func(ackCtx context.Context) error { + if _, err := c.grpcCli.Ack(passMetadataContext(ctx, ackCtx), &model.AckRequest{ + Id: res.Id, + }); err != nil { + return err + } + return nil + }), nil } func (c *fsRPCClient) PutState(ctx context.Context, key string, value []byte) error { diff --git a/clients/gofs/gofs_wasmfs.go b/clients/gofs/gofs_wasmfs.go index afd7313..7404b8a 100644 --- a/clients/gofs/gofs_wasmfs.go +++ b/clients/gofs/gofs_wasmfs.go @@ -42,13 +42,13 @@ func process() { if runningModule == nil { panic("no module loaded") } - err := runningModule.executeFunc(context.Background()) + err := runningModule.executeFunc(runningModule.ctx) if err != nil { fmt.Fprintf(os.Stderr, "error: %v\n", err) } } -func (c *FunctionContext) warpContext(parent context.Context) context.Context { +func (c *functionContextImpl) warpContext(parent context.Context) context.Context { return parent } @@ -67,11 +67,11 @@ func (c *fsRPCClient) RegisterSchema(ctx context.Context, schema string) error { return nil } -func (c *fsRPCClient) Write(ctx context.Context, payload []byte) error { +func (c *fsRPCClient) Write(ctx context.Context, event Event[[]byte]) error { panic("rpc write not supported") } -func (c *fsRPCClient) Read(ctx context.Context) ([]byte, error) { +func (c *fsRPCClient) Read(ctx context.Context) (Event[[]byte], error) { panic("rpc read not supported") } @@ -91,7 +91,7 @@ func (c *fsRPCClient) loadModule(m *moduleWrapper) { if m.processFunc == nil { panic("only function module is supported for the wasm runtime") } - m.executeFunc = func(ctx context.Context) error { + m.executeFunc = func(ctx FunctionContext) error { var stat syscall.Stat_t syscall.Fstat(processFd, &stat) payload := make([]byte, stat.Size) @@ -99,7 +99,10 @@ func (c *fsRPCClient) loadModule(m *moduleWrapper) { if err != nil { return fmt.Errorf("failed to read: %w", err) } - outputPayload := m.processFunc(ctx, payload) + outputPayload, err := m.processFunc(ctx, payload) + if err != nil { + return fmt.Errorf("failed to process: %w", err) + } _, err = syscall.Write(processFd, outputPayload) if err != nil { return fmt.Errorf("failed to write: %w", err) diff --git a/examples/basic/main.go b/examples/basic/main.go index 8178ea1..3875553 100644 --- a/examples/basic/main.go +++ b/examples/basic/main.go @@ -17,7 +17,6 @@ package main import ( - "context" "github.com/functionstream/function-stream/clients/gofs" "log/slog" ) @@ -25,7 +24,7 @@ import ( func main() { slog.Info("Hello from Go function!") err := gofs.NewFSClient(). - Register(gofs.DefaultModule, gofs.Function(myProcess)). + Register(gofs.DefaultModule, gofs.WithFunction(gofs.NewSimpleFunction(myProcess))). Run() if err != nil { slog.Error(err.Error()) @@ -38,7 +37,8 @@ type Person struct { Expected int `json:"expected"` } -func myProcess(ctx context.Context, person *Person) *Person { +func myProcess(ctx gofs.FunctionContext, e gofs.Event[Person]) (gofs.Event[Person], error) { + person := e.Data() person.Money += 1 - return person + return gofs.NewEvent(person), nil } diff --git a/fs/contube/memory.go b/fs/contube/memory.go index b0a96fa..57ac7bd 100644 --- a/fs/contube/memory.go +++ b/fs/contube/memory.go @@ -131,7 +131,6 @@ func (f *MemoryQueueFactory) NewSinkTube(ctx context.Context, configMap ConfigMa if !ok { return } - event.Commit() c <- event } } diff --git a/fs/runtime/external/model/fs.pb.go b/fs/runtime/external/model/fs.pb.go index 7ad3703..fe093f7 100644 --- a/fs/runtime/external/model/fs.pb.go +++ b/fs/runtime/external/model/fs.pb.go @@ -163,7 +163,8 @@ type Event struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` } func (x *Event) Reset() { @@ -198,6 +199,13 @@ func (*Event) Descriptor() ([]byte, []int) { return file_fs_runtime_external_model_fs_proto_rawDescGZIP(), []int{3} } +func (x *Event) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + func (x *Event) GetPayload() []byte { if x != nil { return x.Payload @@ -772,6 +780,91 @@ func (x *GetConfigResponse) GetConfig() map[string]string { return nil } +type AckRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *AckRequest) Reset() { + *x = AckRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_fs_runtime_external_model_fs_proto_msgTypes[16] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *AckRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AckRequest) ProtoMessage() {} + +func (x *AckRequest) ProtoReflect() protoreflect.Message { + mi := &file_fs_runtime_external_model_fs_proto_msgTypes[16] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AckRequest.ProtoReflect.Descriptor instead. +func (*AckRequest) Descriptor() ([]byte, []int) { + return file_fs_runtime_external_model_fs_proto_rawDescGZIP(), []int{16} +} + +func (x *AckRequest) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + +type AckResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *AckResponse) Reset() { + *x = AckResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_fs_runtime_external_model_fs_proto_msgTypes[17] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *AckResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AckResponse) ProtoMessage() {} + +func (x *AckResponse) ProtoReflect() protoreflect.Message { + mi := &file_fs_runtime_external_model_fs_proto_msgTypes[17] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AckResponse.ProtoReflect.Descriptor instead. +func (*AckResponse) Descriptor() ([]byte, []int) { + return file_fs_runtime_external_model_fs_proto_rawDescGZIP(), []int{17} +} + var File_fs_runtime_external_model_fs_proto protoreflect.FileDescriptor var file_fs_runtime_external_model_fs_proto_rawDesc = []byte{ @@ -783,9 +876,10 @@ var file_fs_runtime_external_model_fs_proto_rawDesc = []byte{ 0x68, 0x65, 0x6d, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x22, 0x18, 0x0a, 0x16, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x0d, 0x0a, 0x0b, - 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x21, 0x0a, 0x05, 0x45, - 0x76, 0x65, 0x6e, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x0f, + 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x31, 0x0a, 0x05, 0x45, + 0x76, 0x65, 0x6e, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, + 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x0f, 0x0a, 0x0d, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x0e, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x22, 0x58, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, @@ -834,47 +928,53 @@ var file_fs_runtime_external_model_fs_proto_rawDesc = []byte{ 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xd3, 0x04, 0x0a, 0x08, 0x46, 0x75, 0x6e, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x59, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, - 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x22, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, - 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x63, 0x68, - 0x65, 0x6d, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x66, 0x73, 0x5f, - 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, - 0x72, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x34, 0x0a, 0x04, 0x52, 0x65, 0x61, 0x64, 0x12, 0x18, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, - 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x12, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, - 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x37, 0x0a, 0x05, 0x57, 0x72, 0x69, 0x74, 0x65, 0x12, 0x12, + 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x1c, 0x0a, 0x0a, 0x41, 0x63, 0x6b, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x02, 0x69, 0x64, 0x22, 0x0d, 0x0a, 0x0b, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x8d, 0x05, 0x0a, 0x08, 0x46, 0x75, 0x6e, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x12, 0x59, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x63, 0x68, + 0x65, 0x6d, 0x61, 0x12, 0x22, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, + 0x6c, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, + 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x63, + 0x68, 0x65, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x34, 0x0a, 0x04, + 0x52, 0x65, 0x61, 0x64, 0x12, 0x18, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, + 0x61, 0x6c, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, - 0x6e, 0x74, 0x1a, 0x1a, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, - 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x47, - 0x0a, 0x08, 0x50, 0x75, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1c, 0x2e, 0x66, 0x73, 0x5f, - 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x50, 0x75, 0x74, 0x53, 0x74, 0x61, 0x74, + 0x6e, 0x74, 0x12, 0x37, 0x0a, 0x05, 0x57, 0x72, 0x69, 0x74, 0x65, 0x12, 0x12, 0x2e, 0x66, 0x73, + 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x1a, + 0x1a, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x57, 0x72, + 0x69, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x03, 0x41, + 0x63, 0x6b, 0x12, 0x17, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, + 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x66, 0x73, + 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x47, 0x0a, 0x08, 0x50, 0x75, 0x74, 0x53, 0x74, 0x61, 0x74, + 0x65, 0x12, 0x1c, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, + 0x50, 0x75, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x1d, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x50, 0x75, + 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x47, + 0x0a, 0x08, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1c, 0x2e, 0x66, 0x73, 0x5f, + 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, - 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x50, 0x75, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x47, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x53, 0x74, - 0x61, 0x74, 0x65, 0x12, 0x1c, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, - 0x6c, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1d, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, - 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x4d, 0x0a, 0x0a, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x12, 0x1e, - 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x4c, 0x69, 0x73, - 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, - 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x4c, 0x69, 0x73, - 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x50, 0x0a, 0x0b, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1f, - 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x44, 0x65, 0x6c, - 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x20, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x44, 0x65, - 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x4a, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1d, - 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x47, 0x65, 0x74, - 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, - 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x47, 0x65, 0x74, 0x43, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x1b, 0x5a, - 0x19, 0x66, 0x73, 0x2f, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2f, 0x65, 0x78, 0x74, 0x65, - 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4d, 0x0a, 0x0a, 0x4c, 0x69, 0x73, 0x74, 0x53, + 0x74, 0x61, 0x74, 0x65, 0x73, 0x12, 0x1e, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x50, 0x0a, 0x0b, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, + 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1f, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, + 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4a, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1d, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x66, 0x73, 0x5f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, + 0x61, 0x6c, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x1b, 0x5a, 0x19, 0x66, 0x73, 0x2f, 0x72, 0x75, 0x6e, 0x74, 0x69, + 0x6d, 0x65, 0x2f, 0x65, 0x78, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x6d, 0x6f, 0x64, 0x65, + 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -889,7 +989,7 @@ func file_fs_runtime_external_model_fs_proto_rawDescGZIP() []byte { return file_fs_runtime_external_model_fs_proto_rawDescData } -var file_fs_runtime_external_model_fs_proto_msgTypes = make([]protoimpl.MessageInfo, 17) +var file_fs_runtime_external_model_fs_proto_msgTypes = make([]protoimpl.MessageInfo, 19) var file_fs_runtime_external_model_fs_proto_goTypes = []any{ (*RegisterSchemaRequest)(nil), // 0: fs_external.RegisterSchemaRequest (*RegisterSchemaResponse)(nil), // 1: fs_external.RegisterSchemaResponse @@ -907,32 +1007,36 @@ var file_fs_runtime_external_model_fs_proto_goTypes = []any{ (*DeleteStateResponse)(nil), // 13: fs_external.DeleteStateResponse (*GetConfigRequest)(nil), // 14: fs_external.GetConfigRequest (*GetConfigResponse)(nil), // 15: fs_external.GetConfigResponse - nil, // 16: fs_external.GetConfigResponse.ConfigEntry + (*AckRequest)(nil), // 16: fs_external.AckRequest + (*AckResponse)(nil), // 17: fs_external.AckResponse + nil, // 18: fs_external.GetConfigResponse.ConfigEntry } var file_fs_runtime_external_model_fs_proto_depIdxs = []int32{ 5, // 0: fs_external.GetStateRequest.context:type_name -> fs_external.StateContext 5, // 1: fs_external.PutStateRequest.context:type_name -> fs_external.StateContext 5, // 2: fs_external.ListStatesRequest.context:type_name -> fs_external.StateContext 5, // 3: fs_external.DeleteStateRequest.context:type_name -> fs_external.StateContext - 16, // 4: fs_external.GetConfigResponse.config:type_name -> fs_external.GetConfigResponse.ConfigEntry + 18, // 4: fs_external.GetConfigResponse.config:type_name -> fs_external.GetConfigResponse.ConfigEntry 0, // 5: fs_external.Function.RegisterSchema:input_type -> fs_external.RegisterSchemaRequest 2, // 6: fs_external.Function.Read:input_type -> fs_external.ReadRequest 3, // 7: fs_external.Function.Write:input_type -> fs_external.Event - 8, // 8: fs_external.Function.PutState:input_type -> fs_external.PutStateRequest - 6, // 9: fs_external.Function.GetState:input_type -> fs_external.GetStateRequest - 10, // 10: fs_external.Function.ListStates:input_type -> fs_external.ListStatesRequest - 12, // 11: fs_external.Function.DeleteState:input_type -> fs_external.DeleteStateRequest - 14, // 12: fs_external.Function.GetConfig:input_type -> fs_external.GetConfigRequest - 1, // 13: fs_external.Function.RegisterSchema:output_type -> fs_external.RegisterSchemaResponse - 3, // 14: fs_external.Function.Read:output_type -> fs_external.Event - 4, // 15: fs_external.Function.Write:output_type -> fs_external.WriteResponse - 9, // 16: fs_external.Function.PutState:output_type -> fs_external.PutStateResponse - 7, // 17: fs_external.Function.GetState:output_type -> fs_external.GetStateResponse - 11, // 18: fs_external.Function.ListStates:output_type -> fs_external.ListStatesResponse - 13, // 19: fs_external.Function.DeleteState:output_type -> fs_external.DeleteStateResponse - 15, // 20: fs_external.Function.GetConfig:output_type -> fs_external.GetConfigResponse - 13, // [13:21] is the sub-list for method output_type - 5, // [5:13] is the sub-list for method input_type + 16, // 8: fs_external.Function.Ack:input_type -> fs_external.AckRequest + 8, // 9: fs_external.Function.PutState:input_type -> fs_external.PutStateRequest + 6, // 10: fs_external.Function.GetState:input_type -> fs_external.GetStateRequest + 10, // 11: fs_external.Function.ListStates:input_type -> fs_external.ListStatesRequest + 12, // 12: fs_external.Function.DeleteState:input_type -> fs_external.DeleteStateRequest + 14, // 13: fs_external.Function.GetConfig:input_type -> fs_external.GetConfigRequest + 1, // 14: fs_external.Function.RegisterSchema:output_type -> fs_external.RegisterSchemaResponse + 3, // 15: fs_external.Function.Read:output_type -> fs_external.Event + 4, // 16: fs_external.Function.Write:output_type -> fs_external.WriteResponse + 17, // 17: fs_external.Function.Ack:output_type -> fs_external.AckResponse + 9, // 18: fs_external.Function.PutState:output_type -> fs_external.PutStateResponse + 7, // 19: fs_external.Function.GetState:output_type -> fs_external.GetStateResponse + 11, // 20: fs_external.Function.ListStates:output_type -> fs_external.ListStatesResponse + 13, // 21: fs_external.Function.DeleteState:output_type -> fs_external.DeleteStateResponse + 15, // 22: fs_external.Function.GetConfig:output_type -> fs_external.GetConfigResponse + 14, // [14:23] is the sub-list for method output_type + 5, // [5:14] is the sub-list for method input_type 5, // [5:5] is the sub-list for extension type_name 5, // [5:5] is the sub-list for extension extendee 0, // [0:5] is the sub-list for field type_name @@ -1136,6 +1240,30 @@ func file_fs_runtime_external_model_fs_proto_init() { return nil } } + file_fs_runtime_external_model_fs_proto_msgTypes[16].Exporter = func(v any, i int) any { + switch v := v.(*AckRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_fs_runtime_external_model_fs_proto_msgTypes[17].Exporter = func(v any, i int) any { + switch v := v.(*AckResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -1143,7 +1271,7 @@ func file_fs_runtime_external_model_fs_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_fs_runtime_external_model_fs_proto_rawDesc, NumEnums: 0, - NumMessages: 17, + NumMessages: 19, NumExtensions: 0, NumServices: 1, }, diff --git a/fs/runtime/external/model/fs.proto b/fs/runtime/external/model/fs.proto index e0ade86..1c97779 100644 --- a/fs/runtime/external/model/fs.proto +++ b/fs/runtime/external/model/fs.proto @@ -30,7 +30,8 @@ message ReadRequest { } message Event { - bytes payload = 1; + int64 id = 1; + bytes payload = 2; } message WriteResponse { @@ -87,10 +88,19 @@ message GetConfigResponse { map config = 1; } +message AckRequest { + int64 id = 1; +} + +message AckResponse { + +} + service Function { rpc RegisterSchema(RegisterSchemaRequest) returns (RegisterSchemaResponse); rpc Read(ReadRequest) returns (Event); rpc Write(Event) returns (WriteResponse); + rpc Ack(AckRequest) returns (AckResponse); rpc PutState(PutStateRequest) returns (PutStateResponse); rpc GetState(GetStateRequest) returns (GetStateResponse); rpc ListStates(ListStatesRequest) returns (ListStatesResponse); diff --git a/fs/runtime/external/model/fs_grpc.pb.go b/fs/runtime/external/model/fs_grpc.pb.go index 38668e9..4064443 100644 --- a/fs/runtime/external/model/fs_grpc.pb.go +++ b/fs/runtime/external/model/fs_grpc.pb.go @@ -21,6 +21,7 @@ type FunctionClient interface { RegisterSchema(ctx context.Context, in *RegisterSchemaRequest, opts ...grpc.CallOption) (*RegisterSchemaResponse, error) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*Event, error) Write(ctx context.Context, in *Event, opts ...grpc.CallOption) (*WriteResponse, error) + Ack(ctx context.Context, in *AckRequest, opts ...grpc.CallOption) (*AckResponse, error) PutState(ctx context.Context, in *PutStateRequest, opts ...grpc.CallOption) (*PutStateResponse, error) GetState(ctx context.Context, in *GetStateRequest, opts ...grpc.CallOption) (*GetStateResponse, error) ListStates(ctx context.Context, in *ListStatesRequest, opts ...grpc.CallOption) (*ListStatesResponse, error) @@ -63,6 +64,15 @@ func (c *functionClient) Write(ctx context.Context, in *Event, opts ...grpc.Call return out, nil } +func (c *functionClient) Ack(ctx context.Context, in *AckRequest, opts ...grpc.CallOption) (*AckResponse, error) { + out := new(AckResponse) + err := c.cc.Invoke(ctx, "/fs_external.Function/Ack", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *functionClient) PutState(ctx context.Context, in *PutStateRequest, opts ...grpc.CallOption) (*PutStateResponse, error) { out := new(PutStateResponse) err := c.cc.Invoke(ctx, "/fs_external.Function/PutState", in, out, opts...) @@ -115,6 +125,7 @@ type FunctionServer interface { RegisterSchema(context.Context, *RegisterSchemaRequest) (*RegisterSchemaResponse, error) Read(context.Context, *ReadRequest) (*Event, error) Write(context.Context, *Event) (*WriteResponse, error) + Ack(context.Context, *AckRequest) (*AckResponse, error) PutState(context.Context, *PutStateRequest) (*PutStateResponse, error) GetState(context.Context, *GetStateRequest) (*GetStateResponse, error) ListStates(context.Context, *ListStatesRequest) (*ListStatesResponse, error) @@ -136,6 +147,9 @@ func (UnimplementedFunctionServer) Read(context.Context, *ReadRequest) (*Event, func (UnimplementedFunctionServer) Write(context.Context, *Event) (*WriteResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Write not implemented") } +func (UnimplementedFunctionServer) Ack(context.Context, *AckRequest) (*AckResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Ack not implemented") +} func (UnimplementedFunctionServer) PutState(context.Context, *PutStateRequest) (*PutStateResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method PutState not implemented") } @@ -218,6 +232,24 @@ func _Function_Write_Handler(srv interface{}, ctx context.Context, dec func(inte return interceptor(ctx, in, info, handler) } +func _Function_Ack_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AckRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(FunctionServer).Ack(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/fs_external.Function/Ack", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(FunctionServer).Ack(ctx, req.(*AckRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Function_PutState_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(PutStateRequest) if err := dec(in); err != nil { @@ -327,6 +359,10 @@ var Function_ServiceDesc = grpc.ServiceDesc{ MethodName: "Write", Handler: _Function_Write_Handler, }, + { + MethodName: "Ack", + Handler: _Function_Ack_Handler, + }, { MethodName: "PutState", Handler: _Function_PutState_Handler, diff --git a/fs/runtime/external/runtime.go b/fs/runtime/external/runtime.go index 7e56be2..9d7b467 100644 --- a/fs/runtime/external/runtime.go +++ b/fs/runtime/external/runtime.go @@ -70,19 +70,12 @@ func (f *functionServerImpl) RegisterSchema(ctx context.Context, return &model.RegisterSchemaResponse{}, nil } -func (f *functionServerImpl) Read(ctx context.Context, request *model.ReadRequest) (*model.Event, error) { +func (f *functionServerImpl) Read(ctx context.Context, _ *model.ReadRequest) (*model.Event, error) { r, err := f.getFunctionRuntime(ctx) if err != nil { return nil, err } - select { - case e := <-r.inputCh: - return &model.Event{ - Payload: e.GetPayload(), - }, nil - case <-ctx.Done(): - return nil, ctx.Err() - } + return r.ReadRecord(ctx) } func (f *functionServerImpl) Write(ctx context.Context, event *model.Event) (*model.WriteResponse, error) { @@ -96,6 +89,15 @@ func (f *functionServerImpl) Write(ctx context.Context, event *model.Event) (*mo return &model.WriteResponse{}, nil } +func (f *functionServerImpl) Ack(ctx context.Context, request *model.AckRequest) (*model.AckResponse, error) { + r, err := f.getFunctionRuntime(ctx) + if err != nil { + return nil, err + } + r.Ack(request.Id) + return &model.AckResponse{}, nil +} + func (f *functionServerImpl) PutState( ctx context.Context, request *model.PutStateRequest) (*model.PutStateResponse, error) { r, err := f.getFunctionRuntime(ctx) @@ -173,9 +175,10 @@ func (f *Factory) NewFunctionRuntime(instance api.FunctionInstance, _ *funcModel.RuntimeConfig) (api.FunctionRuntime, error) { def := instance.Definition() r := &runtime{ - inputCh: make(chan contube.Record), - funcCtx: instance.FunctionContext(), - log: instance.Logger(), + inputCh: make(chan contube.Record), + funcCtx: instance.FunctionContext(), + log: instance.Logger(), + recordsMap: make(map[int64]contube.Record), } f.server.runtimeMaps.Store(common.GetNamespacedName(def.Namespace, def.Name).String(), r) f.log.Info("Creating new function runtime", "function", common.GetNamespacedName(def.Namespace, def.Name)) @@ -229,6 +232,10 @@ type runtime struct { inputCh chan contube.Record funcCtx api.FunctionContext log *common.Logger + + recordsMapMu sync.Mutex + recordIndex int64 + recordsMap map[int64]contube.Record } func (r *runtime) Call(e contube.Record) (contube.Record, error) { @@ -238,3 +245,31 @@ func (r *runtime) Call(e contube.Record) (contube.Record, error) { func (r *runtime) Stop() { } + +func (r *runtime) ReadRecord(ctx context.Context) (*model.Event, error) { + select { + case e := <-r.inputCh: + r.recordsMapMu.Lock() + defer r.recordsMapMu.Unlock() + eventId := r.recordIndex + r.recordIndex++ + r.recordsMap[eventId] = e + return &model.Event{ + Id: eventId, + Payload: e.GetPayload(), + }, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// Ack acknowledges the processing of a record +// This is an idempotent operation +func (r *runtime) Ack(id int64) { + r.recordsMapMu.Lock() + defer r.recordsMapMu.Unlock() + if record, ok := r.recordsMap[id]; ok { + record.Commit() + delete(r.recordsMap, id) + } +} diff --git a/fs/runtime/external/runtime_test.go b/fs/runtime/external/runtime_test.go index 583fd30..06e11c4 100644 --- a/fs/runtime/external/runtime_test.go +++ b/fs/runtime/external/runtime_test.go @@ -23,6 +23,7 @@ import ( "net" "os" "testing" + "time" "github.com/functionstream/function-stream/fs/statestore" @@ -51,28 +52,76 @@ type testRecord struct { var log = common.NewDefaultLogger() -func runMockClient() { +type TestFunction struct { +} + +func (f *TestFunction) Init(_ gofs.FunctionContext) error { + return nil +} + +func (f *TestFunction) Handle(_ gofs.FunctionContext, event gofs.Event[Person]) (gofs.Event[Person], error) { + p := event.Data() + p.Money += 1 + return gofs.NewEvent(p), nil +} + +type TestCounterFunction struct { +} + +func (f *TestCounterFunction) Init(ctx gofs.FunctionContext) error { + return nil +} + +func (f *TestCounterFunction) Handle(_ gofs.FunctionContext, event gofs.Event[Counter]) (gofs.Event[Counter], error) { + c := event.Data() + c.Count += 1 + return gofs.NewEvent(c), nil +} + +type TestSource struct { +} + +func (f *TestSource) Init(_ gofs.FunctionContext) error { + return nil +} + +func (f *TestSource) Handle(_ gofs.FunctionContext, emit func(context.Context, gofs.Event[testRecord]) error) error { + for i := 0; i < 10; i++ { + err := emit(context.Background(), gofs.NewEvent(&testRecord{ + ID: i, + Name: "test", + })) + if err != nil { + log.Error(err, "failed to emit record") + } + } + return nil +} + +type TestModules struct { + testFunction *TestFunction + testCounter *TestCounterFunction + testSource *TestSource + testSink *TestSink +} + +func NewTestModules() *TestModules { + return &TestModules{ + testFunction: &TestFunction{}, + testCounter: &TestCounterFunction{}, + testSource: &TestSource{}, + testSink: &TestSink{ + sinkCh: make(chan Counter), + }, + } +} + +func (t *TestModules) Run() { err := gofs.NewFSClient(). - Register(gofs.DefaultModule, gofs.Function(func(ctx context.Context, i *Person) *Person { - i.Money += 1 - return i - })). - Register("counter", gofs.Function(func(ctx context.Context, i *Counter) *Counter { - i.Count += 1 - return i - })). - Register("test-source", gofs.Source( - func(ctx context.Context, emit func(ctx context.Context, record *testRecord) error) { - for i := 0; i < 10; i++ { - err := emit(ctx, &testRecord{ - ID: i, - Name: "test", - }) - if err != nil { - log.Error(err, "failed to emit record") - } - } - })). + Register(gofs.DefaultModule, gofs.WithFunction(t.testFunction)). + Register("counter", gofs.WithFunction(t.testCounter)). + Register("test-source", gofs.WithSource(t.testSource)). + Register("test-sink", gofs.WithSink(t.testSink)). Run() if err != nil { log.Error(err, "failed to run mock client") @@ -99,7 +148,7 @@ func TestExternalRuntime(t *testing.T) { t.Fatal(err) } - go runMockClient() + go NewTestModules().Run() inputTopic := "input" outputTopic := "output" @@ -129,10 +178,14 @@ func TestExternalRuntime(t *testing.T) { err = fm.StartFunction(f) assert.NoError(t, err) + acked := make(chan struct{}) + event, err := contube.NewStructRecord(&Person{ Name: "test", Money: 1, - }, func() {}) + }, func() { + acked <- struct{}{} + }) assert.NoError(t, err) err = fm.ProduceEvent(inputTopic, event) assert.NoError(t, err) @@ -144,6 +197,12 @@ func TestExternalRuntime(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 2, p.Money) + select { + case <-acked: + case <-time.After(5 * time.Second): + t.Fatal("failed to ack event") + } + err = fm.DeleteFunction("", f.Name) assert.NoError(t, err) } @@ -197,7 +256,7 @@ func TestNonDefaultModule(t *testing.T) { err = fm.StartFunction(f) assert.NoError(t, err) - go runMockClient() + go NewTestModules().Run() event, err := contube.NewStructRecord(&Counter{ Count: 1, @@ -262,7 +321,7 @@ func TestExternalSourceModule(t *testing.T) { err = fm.StartFunction(f) assert.NoError(t, err) - go runMockClient() + go NewTestModules().Run() for i := 0; i < 10; i++ { output, err := fm.ConsumeEvent(outputTopic) @@ -278,6 +337,19 @@ func TestExternalSourceModule(t *testing.T) { assert.NoError(t, err) } +type TestSink struct { + sinkCh chan Counter +} + +func (f *TestSink) Init(_ gofs.FunctionContext) error { + return nil +} + +func (f *TestSink) Handle(ctx gofs.FunctionContext, event gofs.Event[Counter]) error { + f.sinkCh <- *event.Data() + return event.Ack(ctx) +} + func TestExternalSinkModule(t *testing.T) { testSocketPath := fmt.Sprintf("/tmp/%s.sock", t.Name()) assert.NoError(t, os.RemoveAll(testSocketPath)) @@ -324,27 +396,31 @@ func TestExternalSinkModule(t *testing.T) { err = fm.StartFunction(f) assert.NoError(t, err) - sinkCh := make(chan Counter) + testMods := NewTestModules() + sinkMod := testMods.testSink - go func() { - err := gofs.NewFSClient().Register("test-sink", gofs.Sink(func(ctx context.Context, record *Counter) { - sinkCh <- *record - })).Run() - if err != nil { - log.Error(err, "failed to run mock client") - } - }() + go testMods.Run() + + ackCh := make(chan struct{}, 100) event, err := contube.NewStructRecord(&Counter{ Count: 1, - }, func() {}) + }, func() { + ackCh <- struct{}{} + }) assert.NoError(t, err) err = fm.ProduceEvent(inputTopic, event) assert.NoError(t, err) - r := <-sinkCh + r := <-sinkMod.sinkCh assert.Equal(t, 1, r.Count) + select { + case <-ackCh: + case <-time.After(5 * time.Second): + t.Fatal("failed to ack event") + } + err = fm.DeleteFunction("", f.Name) assert.NoError(t, err) } @@ -395,17 +471,16 @@ func TestExternalStatefulModule(t *testing.T) { readyCh := make(chan struct{}) go func() { - err := gofs.NewFSClient().Register("test-stateful", gofs.Custom(func(ctx context.Context) error { return nil }, - func(ctx context.Context) error { - funcCtx := gofs.GetFunctionContext(ctx) - err = funcCtx.PutState(ctx, "test-key", []byte("test-value")) + err := gofs.NewFSClient().Register("test-stateful", gofs.WithCustom(gofs.NewSimpleCustom( + func(ctx gofs.FunctionContext) error { + err = ctx.PutState(context.Background(), "test-key", []byte("test-value")) if err != nil { log.Error(err, "failed to put state") } close(readyCh) return nil }, - )).Run() + ))).Run() if err != nil { log.Error(err, "failed to run mock client") } @@ -471,17 +546,16 @@ func TestFunctionConfig(t *testing.T) { readyCh := make(chan struct{}) go func() { - err := gofs.NewFSClient().Register(module, gofs.Custom(func(ctx context.Context) error { return nil }, - func(ctx context.Context) error { - funcCtx := gofs.GetFunctionContext(ctx) - err = funcCtx.PutState(ctx, "test-key", []byte("test-value")) + err := gofs.NewFSClient().Register(module, gofs.WithCustom(gofs.NewSimpleCustom( + func(ctx gofs.FunctionContext) error { + err = ctx.PutState(context.Background(), "test-key", []byte("test-value")) if err != nil { log.Error(err, "failed to put state") } close(readyCh) return nil }, - )).Run() + ))).Run() if err != nil { log.Error(err, "failed to run mock client") }