From ea433e7c76779694e1db00569b9c70283affd88a Mon Sep 17 00:00:00 2001 From: Meng Zhuo Date: Wed, 24 Jun 2015 18:05:01 +0800 Subject: [PATCH 1/9] fix header and event handling --- channel.go | 7 +++--- client.go | 6 ++--- event.go | 73 ++++++++++++------------------------------------------ socket.go | 12 ++++----- 4 files changed, 29 insertions(+), 69 deletions(-) diff --git a/channel.go b/channel.go index 4975f1b..959120d 100644 --- a/channel.go +++ b/channel.go @@ -104,14 +104,14 @@ func (ch *channel) sendEvent(e *Event) error { } if ch.Id != "" { - e.Header["response_to"] = ch.Id + e.Header.ResponseTo = ch.Id } else { - ch.Id = e.Header["message_id"].(string) + ch.Id = e.Header.Id go ch.sendHeartbeats() } - log.Printf("Channel %s sending event %s", ch.Id, e.Header["message_id"].(string)) + log.Printf("Channel %s sending event %s", ch.Id, e.Header.Id) identity := ch.identity @@ -160,6 +160,7 @@ func (ch *channel) sendHeartbeats() { } func (ch *channel) listen() { + streamCounter := 0 for { diff --git a/client.go b/client.go index c228a7a..0dd2abe 100644 --- a/client.go +++ b/client.go @@ -1,7 +1,7 @@ package zerorpc import ( - "errors" + "fmt" "log" ) @@ -96,7 +96,7 @@ func (c *Client) Invoke(name string, args ...interface{}) (*Event, error) { select { case response := <-ch.channelOutput: if response.Name == "ERR" { - return response, errors.New(response.Args[0].(string)) + return response, fmt.Errorf("ERR:%s", response.Args) } else { return response, nil } @@ -177,7 +177,7 @@ func (c *Client) InvokeStream(name string, args ...interface{}) ([]*Event, error select { case response := <-ch.channelOutput: if response.Name == "ERR" { - return []*Event{response}, errors.New(response.Args[0].(string)) + return []*Event{response}, fmt.Errorf("%s", response.Args) } else if response.Name == "OK" { return []*Event{response}, nil } else if response.Name == "STREAM" { diff --git a/event.go b/event.go index 8edae65..932d624 100644 --- a/event.go +++ b/event.go @@ -1,7 +1,6 @@ package zerorpc import ( - "errors" uuid "github.com/nu7hatch/gouuid" "github.com/ugorji/go/codec" ) @@ -10,8 +9,15 @@ import ( const ProtocolVersion = 3 // Event representation + +type EventHeader struct { + Id string `codec:"message_id,omitempty"` + ResponseTo string `codec:"response_to,omitempty"` + Version int `codec:"v"` +} + type Event struct { - Header map[string]interface{} + Header *EventHeader Name string Args []interface{} } @@ -24,9 +30,7 @@ func newEvent(name string, args ...interface{}) (*Event, error) { return nil, err } - header := make(map[string]interface{}) - header["message_id"] = id.String() - header["v"] = ProtocolVersion + header := &EventHeader{Id: id.String(), Version: ProtocolVersion} e := Event{ Header: header, @@ -37,6 +41,10 @@ func newEvent(name string, args ...interface{}) (*Event, error) { return &e, nil } +var ( + mh codec.MsgpackHandle +) + // Packs an event into MsgPack bytes func (e *Event) packBytes() ([]byte, error) { data := make([]interface{}, 2) @@ -49,7 +57,7 @@ func (e *Event) packBytes() ([]byte, error) { var buf []byte - enc := codec.NewEncoderBytes(&buf, &codec.MsgpackHandle{}) + enc := codec.NewEncoderBytes(&buf, &mh) if err := enc.Encode(data); err != nil { return nil, err } @@ -59,63 +67,14 @@ func (e *Event) packBytes() ([]byte, error) { // Unpacks an event fom MsgPack bytes func unPackBytes(b []byte) (*Event, error) { - var mh codec.MsgpackHandle - var v interface{} - + var e Event dec := codec.NewDecoderBytes(b, &mh) - err := dec.Decode(&v) + err := dec.Decode(&e) if err != nil { return nil, err } - // get the event headers - h, ok := v.([]interface{})[0].(map[interface{}]interface{}) - if !ok { - return nil, errors.New("zerorpc/event interface conversion error") - } - - header := make(map[string]interface{}) - - for k, v := range h { - switch t := v.(type) { - case []byte: - header[k.(string)] = string(t) - - default: - header[k.(string)] = t - } - } - - // get the event name - n, ok := v.([]interface{})[1].([]byte) - if !ok { - return nil, errors.New("zerorpc/event interface conversion error") - } - - // get the event args - args := make([]interface{}, 0) - - for i := 2; i < len(v.([]interface{})); i++ { - t := v.([]interface{})[i] - - switch t.(type) { - case []interface{}: - for _, a := range t.([]interface{}) { - args = append(args, convertValue(a)) - } - - default: - args = append(args, convertValue(t)) - } - } - - e := Event{ - Header: header, - Name: string(n), - Args: args, - } - return &e, nil } diff --git a/socket.go b/socket.go index c73561a..a59faa7 100644 --- a/socket.go +++ b/socket.go @@ -102,7 +102,7 @@ func (s *socket) sendEvent(e *Event, identity string) error { return err } - log.Printf("ZeroRPC socket sent event %s", e.Header["message_id"].(string)) + log.Printf("ZeroRPC socket sent event %s", e.Header.Id) i, err := s.zmqSocket.SendMessage(identity, "", b) if err != nil { @@ -135,11 +135,11 @@ func (s *socket) listen() { s.socketErrors <- err } - log.Printf("ZeroRPC socket recieved event %s", ev.Header["message_id"].(string)) + log.Printf("ZeroRPC socket recieved event %s", ev.Header.Id) var ch *channel - if _, ok := ev.Header["response_to"]; !ok { - ch = s.newChannel(ev.Header["message_id"].(string)) + if ev.Header.ResponseTo == "" { + ch = s.newChannel(ev.Header.Id) go ch.sendHeartbeats() if len(barr) > 1 { @@ -147,14 +147,14 @@ func (s *socket) listen() { } } else { for _, c := range s.Channels { - if c.Id == ev.Header["response_to"].(string) { + if c.Id == ev.Header.ResponseTo { ch = c } } } if ch != nil && ch.state == open { - log.Printf("ZeroRPC socket routing event %s to channel %s", ev.Header["message_id"].(string), ch.Id) + log.Printf("ZeroRPC socket routing event %s to channel %s", ev.Header.Id, ch.Id) ch.socketInput <- ev } From 4b57838d933f8176eb9407d3eacc5bc843605907 Mon Sep 17 00:00:00 2001 From: Meng Zhuo Date: Wed, 24 Jun 2015 18:06:52 +0800 Subject: [PATCH 2/9] fix Err --- client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client.go b/client.go index 0dd2abe..c298efa 100644 --- a/client.go +++ b/client.go @@ -96,7 +96,7 @@ func (c *Client) Invoke(name string, args ...interface{}) (*Event, error) { select { case response := <-ch.channelOutput: if response.Name == "ERR" { - return response, fmt.Errorf("ERR:%s", response.Args) + return response, fmt.Errorf("%s", response.Args) } else { return response, nil } From 3855e361591c84a56d22251d503a39c319272938 Mon Sep 17 00:00:00 2001 From: Meng Zhuo Date: Thu, 25 Jun 2015 18:07:03 +0800 Subject: [PATCH 3/9] fix typo --- client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client.go b/client.go index c298efa..bc1af42 100644 --- a/client.go +++ b/client.go @@ -152,7 +152,7 @@ It also supports first class exceptions, in case of an exception, the error returned from Invoke() or InvokeStream() is the exception name and the args of the returned event are the exception description and traceback. -The client sends heartbeat events every 5 seconds, if twp heartbeat events are missed, +The client sends heartbeat events every 5 seconds, if two heartbeat events are missed, the remote is considered as lost and an ErrLostRemote is returned. */ func (c *Client) InvokeStream(name string, args ...interface{}) ([]*Event, error) { From 6db1fe0df6ce9ae38418a7a0f1468dbfc3ccb2ce Mon Sep 17 00:00:00 2001 From: Meng Zhuo Date: Fri, 26 Jun 2015 09:59:10 +0800 Subject: [PATCH 4/9] remove state with open boolean --- channel.go | 23 ++++++++--------------- socket.go | 2 +- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/channel.go b/channel.go index 959120d..e966447 100644 --- a/channel.go +++ b/channel.go @@ -17,13 +17,6 @@ const ( bufferSize = 100 ) -type state int - -const ( - open = iota - closed -) - var ( errClosedChannel = errors.New("zerorpc/channel closed channel") ErrLostRemote = errors.New("zerorpc/channel lost remote") @@ -32,7 +25,7 @@ var ( // Channel representation type channel struct { Id string - state state + open bool socket *socket socketInput chan *Event channelOutput chan *Event @@ -53,7 +46,7 @@ func (s *socket) newChannel(id string) *channel { c := channel{ Id: id, - state: open, + open: true, socket: s, socketInput: make(chan *Event, bufferSize), channelOutput: make(chan *Event), @@ -77,11 +70,11 @@ func (ch *channel) close() { ch.mu.Lock() defer ch.mu.Unlock() - if ch.state == closed { + if !ch.open { return } - ch.state = closed + ch.open = false ch.socket.removeChannel(ch) @@ -99,7 +92,7 @@ func (ch *channel) sendEvent(e *Event) error { ch.mu.Lock() defer ch.mu.Unlock() - if ch.state == closed { + if !ch.open { return errClosedChannel } @@ -138,7 +131,7 @@ func (ch *channel) sendHeartbeats() { for { time.Sleep(HeartbeatFrequency) - if ch.state == closed { + if !ch.open { return } @@ -164,7 +157,7 @@ func (ch *channel) listen() { streamCounter := 0 for { - if ch.state == closed { + if !ch.open { return } @@ -243,7 +236,7 @@ func (ch *channel) listen() { func (ch *channel) handleHeartbeats() { for { - if ch.state == closed { + if !ch.open { return } diff --git a/socket.go b/socket.go index a59faa7..42f1f00 100644 --- a/socket.go +++ b/socket.go @@ -153,7 +153,7 @@ func (s *socket) listen() { } } - if ch != nil && ch.state == open { + if ch != nil && ch.open { log.Printf("ZeroRPC socket routing event %s to channel %s", ev.Header.Id, ch.Id) ch.socketInput <- ev From 848de13663864a9eba11825fc1904ec0473b1aa5 Mon Sep 17 00:00:00 2001 From: "zhuo.meng" Date: Mon, 29 Jun 2015 21:26:57 +0800 Subject: [PATCH 5/9] fix RawToString --- event.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/event.go b/event.go index 932d624..e77f224 100644 --- a/event.go +++ b/event.go @@ -8,6 +8,15 @@ import ( // ZeroRPC protocol version const ProtocolVersion = 3 +var ( + mh codec.MsgpackHandle +) + +func init() { + mh.RawToString = true + +} + // Event representation type EventHeader struct { @@ -19,7 +28,7 @@ type EventHeader struct { type Event struct { Header *EventHeader Name string - Args []interface{} + Args codec.MsgpackSpecRpcMultiArgs } // Returns a pointer to a new event, @@ -41,10 +50,6 @@ func newEvent(name string, args ...interface{}) (*Event, error) { return &e, nil } -var ( - mh codec.MsgpackHandle -) - // Packs an event into MsgPack bytes func (e *Event) packBytes() ([]byte, error) { data := make([]interface{}, 2) From 23d53b11e2d49b24e674428e7c36f0d6b657220c Mon Sep 17 00:00:00 2001 From: "zhuo.meng" Date: Mon, 29 Jun 2015 23:37:33 +0800 Subject: [PATCH 6/9] fix race on close --- socket.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/socket.go b/socket.go index 42f1f00..ff8ab43 100644 --- a/socket.go +++ b/socket.go @@ -75,6 +75,9 @@ func (s *socket) close() error { s.removeChannel(c) } + s.mu.Lock() + defer s.mu.Unlock() + log.Printf("ZeroRPC socket closed") return s.zmqSocket.Close() } From 87cb75793ce57b94418b45532a35bb4971e4100a Mon Sep 17 00:00:00 2001 From: Meng Zhuo Date: Fri, 24 Jul 2015 17:27:30 +0800 Subject: [PATCH 7/9] OK count as heartbeat --- channel.go | 1 + 1 file changed, 1 insertion(+) diff --git a/channel.go b/channel.go index e966447..d331111 100644 --- a/channel.go +++ b/channel.go @@ -169,6 +169,7 @@ func (ch *channel) listen() { switch ev.Name { case "OK": + ch.lastHeartbeat = time.Now() ch.channelOutput <- ev case "ERR": From 9006610653ed2a08cab7287ed8021848229611a3 Mon Sep 17 00:00:00 2001 From: Meng Zhuo Date: Mon, 7 Sep 2015 14:49:12 +0800 Subject: [PATCH 8/9] add force encode message_id --- event.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/event.go b/event.go index e77f224..4ef7f3f 100644 --- a/event.go +++ b/event.go @@ -20,7 +20,7 @@ func init() { // Event representation type EventHeader struct { - Id string `codec:"message_id,omitempty"` + Id string `codec:"message_id"` ResponseTo string `codec:"response_to,omitempty"` Version int `codec:"v"` } @@ -74,7 +74,6 @@ func (e *Event) packBytes() ([]byte, error) { func unPackBytes(b []byte) (*Event, error) { var e Event dec := codec.NewDecoderBytes(b, &mh) - err := dec.Decode(&e) if err != nil { return nil, err From 574636abe7e3f3bf6aa39e1d7f1499e9add387b6 Mon Sep 17 00:00:00 2001 From: Meng Zhuo Date: Mon, 7 Sep 2015 15:52:34 +0800 Subject: [PATCH 9/9] change func call into map for effieice --- .gitignore | 2 ++ server.go | 32 +++++++++++++------------------- server_test.go | 18 ++++++++++++++++++ socket.go | 3 ++- 4 files changed, 35 insertions(+), 20 deletions(-) create mode 100644 server_test.go diff --git a/.gitignore b/.gitignore index 1062418..8e98a03 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ .idea/ *.iml +src +pkg diff --git a/server.go b/server.go index cce060c..c616ef5 100644 --- a/server.go +++ b/server.go @@ -7,15 +7,11 @@ import ( // ZeroRPC server representation, // it holds a pointer to the ZeroMQ socket +type HandlerFunc func(args []interface{}) (interface{}, error) + type Server struct { socket *socket - handlers []*taskHandler -} - -// Task handler representation -type taskHandler struct { - TaskName string - HandlerFunc *func(args []interface{}) (interface{}, error) + handlers map[string]HandlerFunc } var ( @@ -66,7 +62,7 @@ func NewServer(endpoint string) (*Server, error) { server := Server{ socket: s, - handlers: make([]*taskHandler, 0), + handlers: make(map[string]HandlerFunc, 0), } server.socket.server = &server @@ -83,14 +79,13 @@ func (s *Server) Close() error { // tasks are invoked in new goroutines // // it returns ErrDuplicateHandler if an handler was already registered for the task -func (s *Server) RegisterTask(name string, handlerFunc *func(args []interface{}) (interface{}, error)) error { - for _, h := range s.handlers { - if h.TaskName == name { - return ErrDuplicateHandler - } +func (s *Server) RegisterTask(name string, handlerFunc func(args []interface{}) (interface{}, error)) error { + + if _, ok := s.handlers[name]; ok { + return ErrDuplicateHandler } - s.handlers = append(s.handlers, &taskHandler{TaskName: name, HandlerFunc: handlerFunc}) + s.handlers[name] = handlerFunc log.Printf("ZeroRPC server registered handler for task %s", name) @@ -100,12 +95,11 @@ func (s *Server) RegisterTask(name string, handlerFunc *func(args []interface{}) // Invoke the handler for a task event, // it returns ErrNoTaskHandler if no handler is registered for the task func (s *Server) handleTask(ev *Event) (interface{}, error) { - for _, h := range s.handlers { - if h.TaskName == ev.Name { - log.Printf("ZeroRPC server handling task %s with args %s", ev.Name, ev.Args) - return (*h.HandlerFunc)(ev.Args) - } + if h, ok := s.handlers[ev.Name]; ok { + + log.Printf("ZeroRPC server handling task %s with args %s", ev.Name, ev.Args) + return h(ev.Args) } return nil, ErrNoTaskHandler diff --git a/server_test.go b/server_test.go new file mode 100644 index 0000000..8fdf10c --- /dev/null +++ b/server_test.go @@ -0,0 +1,18 @@ +package zerorpc + +import "testing" + +func TestServerBind(t *testing.T) { + s, err := NewServer("tcp://0.0.0.0:4242") + if err != nil { + panic(err) + } + + defer s.Close() + + h := func(v []interface{}) (interface{}, error) { + return "Hello, " + v[0].(string), nil + } + + s.RegisterTask("hello", h) +} diff --git a/socket.go b/socket.go index ff8ab43..d28a313 100644 --- a/socket.go +++ b/socket.go @@ -4,9 +4,10 @@ package zerorpc import ( - zmq "github.com/pebbe/zmq4" "log" "sync" + + zmq "github.com/pebbe/zmq4" ) // ZeroRPC socket representation