From 92df80d557588949ba24296e1a5b87933dc8dfd6 Mon Sep 17 00:00:00 2001
From: "e.s.prilutskiy"
Date: Tue, 9 Dec 2025 13:30:31 +0300
Subject: [PATCH 1/5] fix: messages out of order

---
 connection.go | 49 +++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 39 insertions(+), 10 deletions(-)

diff --git a/connection.go b/connection.go
index 5e4865b..263e76c 100644
--- a/connection.go
+++ b/connection.go
@@ -45,19 +45,24 @@ type Connection struct {
 	// notificationWg tracks in-flight notification handlers. This ensures SendRequest waits
 	// for all notifications received before the response to complete processing.
 	notificationWg sync.WaitGroup
+
+	// notificationQueue serializes notification processing to maintain order
+	notificationQueue chan *anyMessage
 }
 
 func NewConnection(handler MethodHandler, peerInput io.Writer, peerOutput io.Reader) *Connection {
 	ctx, cancel := context.WithCancelCause(context.Background())
 	c := &Connection{
-		w:       peerInput,
-		r:       peerOutput,
-		handler: handler,
-		pending: make(map[string]*pendingResponse),
-		ctx:     ctx,
-		cancel:  cancel,
+		w:                 peerInput,
+		r:                 peerOutput,
+		handler:           handler,
+		pending:           make(map[string]*pendingResponse),
+		ctx:               ctx,
+		cancel:            cancel,
+		notificationQueue: make(chan *anyMessage, 100),
 	}
 	go c.receive()
+	go c.processNotifications()
 	return c
 }
 
@@ -99,10 +104,25 @@ func (c *Connection) receive() {
 			c.handleResponse(&msg)
 		case msg.Method != "":
 			c.notificationWg.Add(1)
-			go func(m *anyMessage) {
-				defer c.notificationWg.Done()
-				c.handleInbound(m)
-			}(&msg)
+			// Queue the notification for sequential processing.
+			// Blocking send ensures order is maintained and notifications aren't dropped.
+			// The processNotifications goroutine should drain the queue quickly.
+			// We check ctx.Done() to avoid blocking forever if the connection is closed,
+			// but prioritize queuing the notification.
+			select {
+			case c.notificationQueue <- &msg:
+				// Successfully queued - this is the common case
+			case <-c.ctx.Done():
+				// Connection is closing. Try one more time with a non-blocking send
+				// in case the queue can still accept the message.
+				select {
+				case c.notificationQueue <- &msg:
+					// Successfully queued despite context cancellation
+				default:
+					// Queue closed or full, must drop notification
+					c.notificationWg.Done()
+				}
+			}
 		default:
 			c.loggerOrDefault().Error("received message with neither id nor method", "raw", string(line))
 		}
@@ -110,6 +130,15 @@ func (c *Connection) receive() {
 
 	c.cancel(errors.New("peer connection closed"))
 	c.loggerOrDefault().Info("peer connection closed")
+	close(c.notificationQueue)
+}
+
+// processNotifications processes notifications sequentially to maintain order
+func (c *Connection) processNotifications() {
+	for msg := range c.notificationQueue {
+		c.handleInbound(msg)
+		c.notificationWg.Done()
+	}
 }
 
 func (c *Connection) handleResponse(msg *anyMessage) {

From 405e9e8b75c46f4c64d230e2661ac70428e5bfb6 Mon Sep 17 00:00:00 2001
From: "e.s.prilutskiy"
Date: Tue, 9 Dec 2025 14:02:27 +0300
Subject: [PATCH 2/5] fix: blocking connection on queue

---
 connection.go | 28 ++++++++++++----------------
 1 file changed, 12 insertions(+), 16 deletions(-)

diff --git a/connection.go b/connection.go
index 263e76c..c94eb0c 100644
--- a/connection.go
+++ b/connection.go
@@ -59,7 +59,7 @@ func NewConnection(handler MethodHandler, peerInput io.Writer, peerOutput io.Rea
 		pending:           make(map[string]*pendingResponse),
 		ctx:               ctx,
 		cancel:            cancel,
-		notificationQueue: make(chan *anyMessage, 100),
+		notificationQueue: make(chan *anyMessage, 1024),
 	}
 	go c.receive()
 	go c.processNotifications()
@@ -105,23 +105,19 @@ func (c *Connection) receive() {
 		case msg.Method != "":
 			c.notificationWg.Add(1)
 			// Queue the notification for sequential processing.
-			// Blocking send ensures order is maintained and notifications aren't dropped.
-			// The processNotifications goroutine should drain the queue quickly.
-			// We check ctx.Done() to avoid blocking forever if the connection is closed,
-			// but prioritize queuing the notification.
+			// If the queue is full, fall back to concurrent processing (old behavior)
+			// to avoid blocking the receive loop and prevent requests from hanging.
 			select {
 			case c.notificationQueue <- &msg:
-				// Successfully queued - this is the common case
-			case <-c.ctx.Done():
-				// Connection is closing. Try one more time with a non-blocking send
-				// in case the queue can still accept the message.
-				select {
-				case c.notificationQueue <- &msg:
-					// Successfully queued despite context cancellation
-				default:
-					// Queue closed or full, must drop notification
-					c.notificationWg.Done()
-				}
+				// Successfully queued for sequential processing
+			default:
+				// Queue is full - process concurrently to avoid blocking the receive loop.
+				// This maintains backward compatibility and prevents the receive loop from
+				// stalling, which could cause requests to hang waiting for responses.
+				go func(m *anyMessage) {
+					defer c.notificationWg.Done()
+					c.handleInbound(m)
+				}(&msg)
 			}
 		default:
 			c.loggerOrDefault().Error("received message with neither id nor method", "raw", string(line))

From fe1570178f829effaa545367e7c0160751a35bda Mon Sep 17 00:00:00 2001
From: krulsaidme0w
Date: Fri, 12 Dec 2025 20:21:04 +0300
Subject: [PATCH 3/5] fix: request dispatch and inbound handler context on disconnect

---
 connection.go | 42 +++++++++++++++++++++++++++++++++++++-----
 1 file changed, 37 insertions(+), 5 deletions(-)

diff --git a/connection.go b/connection.go
index c94eb0c..52a12d9 100644
--- a/connection.go
+++ b/connection.go
@@ -37,9 +37,19 @@ type Connection struct {
 	nextID  atomic.Uint64
 	pending map[string]*pendingResponse
 
+	// ctx/cancel govern connection lifetime and are used for Done() and for canceling
+	// callers waiting on responses when the peer disconnects.
 	ctx    context.Context
 	cancel context.CancelCauseFunc
+	// inboundCtx/inboundCancel are used when invoking the inbound MethodHandler.
+	// This ctx is intentionally kept alive long enough to process notifications
+	// that were successfully received and queued just before a peer disconnect.
+	// Otherwise, handlers that respect context cancellation may drop end-of-connection
+	// messages that we already read off the wire.
+	inboundCtx    context.Context
+	inboundCancel context.CancelCauseFunc
+
 	logger *slog.Logger
 
 	// notificationWg tracks in-flight notification handlers. This ensures SendRequest waits
@@ -52,6 +62,7 @@ type Connection struct {
 
 func NewConnection(handler MethodHandler, peerInput io.Writer, peerOutput io.Reader) *Connection {
 	ctx, cancel := context.WithCancelCause(context.Background())
+	inboundCtx, inboundCancel := context.WithCancelCause(context.Background())
 	c := &Connection{
 		w:                 peerInput,
 		r:                 peerOutput,
@@ -59,6 +70,8 @@ func NewConnection(handler MethodHandler, peerInput io.Writer, peerOutput io.Rea
 		pending:           make(map[string]*pendingResponse),
 		ctx:               ctx,
 		cancel:            cancel,
+		inboundCtx:        inboundCtx,
+		inboundCancel:     inboundCancel,
 		notificationQueue: make(chan *anyMessage, 1024),
 	}
 	go c.receive()
@@ -103,12 +116,22 @@ func (c *Connection) receive() {
 		case msg.ID != nil && msg.Method == "":
 			c.handleResponse(&msg)
 		case msg.Method != "":
+			// Requests (method+id) must not be serialized behind notifications, otherwise
+			// a long-running request (e.g. session/prompt) can deadlock cancellation
+			// notifications (session/cancel) that are required to stop it.
+			if msg.ID != nil {
+				m := msg
+				go c.handleInbound(&m)
+				continue
+			}
+
 			c.notificationWg.Add(1)
 			// Queue the notification for sequential processing.
 			// If the queue is full, fall back to concurrent processing (old behavior)
 			// to avoid blocking the receive loop and prevent requests from hanging.
+			m := msg
 			select {
-			case c.notificationQueue <- &msg:
+			case c.notificationQueue <- &m:
 				// Successfully queued for sequential processing
 			default:
 				// Queue is full - process concurrently to avoid blocking the receive loop.
@@ -117,16 +140,25 @@ func (c *Connection) receive() {
 				go func(m *anyMessage) {
 					defer c.notificationWg.Done()
 					c.handleInbound(m)
-				}(&msg)
+				}(&m)
 			}
 		default:
 			c.loggerOrDefault().Error("received message with neither id nor method", "raw", string(line))
 		}
 	}
 
-	c.cancel(errors.New("peer connection closed"))
-	c.loggerOrDefault().Info("peer connection closed")
+	cause := errors.New("peer connection closed")
+
+	// First, signal disconnect to callers waiting on responses.
+	c.cancel(cause)
+
+	// Then close and drain notifications so already-received messages are delivered
+	// to handlers with a still-valid inbound context.
 	close(c.notificationQueue)
+	c.notificationWg.Wait()
+	c.inboundCancel(cause)
+
+	c.loggerOrDefault().Info("peer connection closed")
 }
 
 // processNotifications processes notifications sequentially to maintain order
@@ -166,7 +198,7 @@ func (c *Connection) handleInbound(req *anyMessage) {
 		return
 	}
 
-	result, err := c.handler(c.ctx, req.Method, req.Params)
+	result, err := c.handler(c.inboundCtx, req.Method, req.Params)
 	if req.ID == nil {
 		// Notification: no response is sent; log handler errors to surface decode failures.
 		if err != nil {

From 750beca6a6f6576c88e1429a301c0ec230648974 Mon Sep 17 00:00:00 2001
From: krulsaidme0w
Date: Fri, 12 Dec 2025 21:23:25 +0300
Subject: [PATCH 4/5] fix: graceful connection drop

---
 acp_test.go   | 54 +++++++++++++++++++++++++++++++++++++++++++++++++++
 connection.go | 24 ++++++++++++++++++++----
 2 files changed, 74 insertions(+), 4 deletions(-)

diff --git a/acp_test.go b/acp_test.go
index 915bb87..b3d600e 100644
--- a/acp_test.go
+++ b/acp_test.go
@@ -2,6 +2,7 @@ package acp
 
 import (
 	"context"
+	"encoding/json"
 	"io"
 	"slices"
 	"sync"
@@ -467,6 +468,59 @@ func TestConnectionHandlesNotifications(t *testing.T) {
 	}
 }
 
+func TestConnection_DoesNotCancelInboundContextBeforeDrainingNotificationsOnDisconnect(t *testing.T) {
+	const n = 25
+
+	incomingR, incomingW := io.Pipe()
+
+	var (
+		wg            sync.WaitGroup
+		canceledCount atomic.Int64
+	)
+	wg.Add(n)
+
+	c := NewConnection(func(ctx context.Context, method string, _ json.RawMessage) (any, *RequestError) {
+		defer wg.Done()
+		// Slow down processing so some notifications are handled after the receive
+		// loop observes EOF and signals disconnect.
+		time.Sleep(10 * time.Millisecond)
+		if ctx.Err() != nil {
+			canceledCount.Add(1)
+		}
+		return nil, nil
+	}, io.Discard, incomingR)
+
+	// Write notifications quickly and then close the stream to simulate a peer disconnect.
+	for i := 0; i < n; i++ {
+		if _, err := io.WriteString(incomingW, `{"jsonrpc":"2.0","method":"test/notify","params":{}}`+"\n"); err != nil {
+			t.Fatalf("write notification: %v", err)
+		}
+	}
+	_ = incomingW.Close()
+
+	select {
+	case <-c.Done():
+		// Expected: peer disconnect observed promptly.
+	case <-time.After(2 * time.Second):
+		t.Fatalf("timeout waiting for connection Done()")
+	}
+
+	done := make(chan struct{})
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+	select {
+	case <-done:
+	case <-time.After(3 * time.Second):
+		t.Fatalf("timeout waiting for notification handlers")
+	}
+
+	if got := canceledCount.Load(); got != 0 {
+		t.Fatalf("inbound handler context was canceled for %d/%d notifications", got, n)
+	}
+}
+
 // Test initialize method behavior
 func TestConnectionHandlesInitialize(t *testing.T) {
 	c2aR, c2aW := io.Pipe()
diff --git a/connection.go b/connection.go
index 52a12d9..6e9de1e 100644
--- a/connection.go
+++ b/connection.go
@@ -10,6 +10,7 @@ import (
 	"log/slog"
 	"sync"
 	"sync/atomic"
+	"time"
 )
 
 type anyMessage struct {
@@ -152,11 +153,26 @@ func (c *Connection) receive() {
 	// First, signal disconnect to callers waiting on responses.
 	c.cancel(cause)
 
-	// Then close and drain notifications so already-received messages are delivered
-	// to handlers with a still-valid inbound context.
+	// Then close the notification queue so already-received messages can drain.
+	// IMPORTANT: Do not block this receive goroutine waiting for the drain to complete;
+	// notification handlers may legitimately block until their context is canceled.
 	close(c.notificationQueue)
-	c.notificationWg.Wait()
-	c.inboundCancel(cause)
+
+	// Cancel inboundCtx after notifications finish, but ensure we don't leak forever if a
+	// handler blocks waiting for cancellation.
+	const drainTimeout = 5 * time.Second
+	go func() {
+		done := make(chan struct{})
+		go func() {
+			c.notificationWg.Wait()
+			close(done)
+		}()
+		select {
+		case <-done:
+		case <-time.After(drainTimeout):
+		}
+		c.inboundCancel(cause)
+	}()
 
 	c.loggerOrDefault().Info("peer connection closed")
 }

From a2465585be07514cb87a26c05dd61b86f286e40f Mon Sep 17 00:00:00 2001
From: krulsaidme0w
Date: Fri, 12 Dec 2025 21:33:30 +0300
Subject: [PATCH 5/5] fix: graceful connection

---
 acp_test.go   | 48 ++++++++++++++++++++++++++++++++++++++++++++++++
 connection.go | 11 ++++++++++-
 2 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/acp_test.go b/acp_test.go
index b3d600e..f31756c 100644
--- a/acp_test.go
+++ b/acp_test.go
@@ -521,6 +521,54 @@ func TestConnection_DoesNotCancelInboundContextBeforeDrainingNotificationsOnDisc
 	}
 }
 
+func TestConnection_CancelsRequestHandlersOnDisconnectEvenWithNotificationBacklog(t *testing.T) {
+	const numNotifications = 200
+
+	incomingR, incomingW := io.Pipe()
+
+	reqDone := make(chan struct{})
+
+	c := NewConnection(func(ctx context.Context, method string, _ json.RawMessage) (any, *RequestError) {
+		switch method {
+		case "test/notify":
+			// Slow down to create a backlog of queued notifications.
+			time.Sleep(5 * time.Millisecond)
+			return nil, nil
+		case "test/request":
+			// Requests should be canceled promptly on disconnect (uses c.ctx).
+			<-ctx.Done()
+			close(reqDone)
+			return nil, NewInternalError(map[string]any{"error": "canceled"})
+		default:
+			return nil, nil
+		}
+	}, io.Discard, incomingR)
+
+	for i := 0; i < numNotifications; i++ {
+		if _, err := io.WriteString(incomingW, `{"jsonrpc":"2.0","method":"test/notify","params":{}}`+"\n"); err != nil {
+			t.Fatalf("write notification: %v", err)
+		}
+	}
+	if _, err := io.WriteString(incomingW, `{"jsonrpc":"2.0","id":1,"method":"test/request","params":{}}`+"\n"); err != nil {
+		t.Fatalf("write request: %v", err)
+	}
+	_ = incomingW.Close()
+
+	// Disconnect should be observed quickly.
+	select {
+	case <-c.Done():
+	case <-time.After(2 * time.Second):
+		t.Fatalf("timeout waiting for connection Done()")
+	}
+
+	// Even with a big notification backlog, the request handler should be canceled promptly.
+	select {
+	case <-reqDone:
+	case <-time.After(1 * time.Second):
+		t.Fatalf("timeout waiting for request handler cancellation")
+	}
+}
+
 // Test initialize method behavior
 func TestConnectionHandlesInitialize(t *testing.T) {
 	c2aR, c2aW := io.Pipe()
diff --git a/connection.go b/connection.go
index 6e9de1e..cae8b59 100644
--- a/connection.go
+++ b/connection.go
@@ -202,6 +202,15 @@ func (c *Connection) handleResponse(msg *anyMessage) {
 
 func (c *Connection) handleInbound(req *anyMessage) {
 	res := anyMessage{JSONRPC: "2.0"}
+
+	// Notifications are allowed a slightly longer-lived context during disconnect so we can
+	// process already-received end-of-connection messages. Requests, however, should be
+	// canceled promptly when the peer disconnects to avoid doing unnecessary work after
+	// the caller is gone.
+	ctx := c.ctx
+	if req.ID == nil {
+		ctx = c.inboundCtx
+	}
 	// copy ID if present
 	if req.ID != nil {
 		res.ID = req.ID
@@ -214,7 +223,7 @@ func (c *Connection) handleInbound(req *anyMessage) {
 		return
 	}
 
-	result, err := c.handler(c.inboundCtx, req.Method, req.Params)
+	result, err := c.handler(ctx, req.Method, req.Params)
 	if req.ID == nil {
 		// Notification: no response is sent; log handler errors to surface decode failures.
 		if err != nil {
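
Editor's note (not part of the patch series): PATCH 1 is what introduces ordered notification delivery, but the series adds no test pinning that guarantee down. Below is a minimal sketch of such a test in the style of acp_test.go. It assumes the package acp, NewConnection, Done(), and the MethodHandler/RequestError signatures shown in the diffs above; the test name and the test/notify-<i> method names are purely illustrative.

package acp

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"sync"
	"testing"
	"time"
)

func TestConnection_DeliversNotificationsInOrder(t *testing.T) {
	const n = 50 // well below the queue capacity, so the concurrent fallback never kicks in

	incomingR, incomingW := io.Pipe()

	var (
		mu   sync.Mutex
		seen []string
	)
	done := make(chan struct{})

	c := NewConnection(func(_ context.Context, method string, _ json.RawMessage) (any, *RequestError) {
		mu.Lock()
		defer mu.Unlock()
		seen = append(seen, method)
		if len(seen) == n {
			close(done)
		}
		return nil, nil
	}, io.Discard, incomingR)

	// Send n notifications whose (illustrative) method names encode their position,
	// then close the stream to simulate a peer disconnect.
	for i := 0; i < n; i++ {
		line := fmt.Sprintf(`{"jsonrpc":"2.0","method":"test/notify-%d","params":{}}`+"\n", i)
		if _, err := io.WriteString(incomingW, line); err != nil {
			t.Fatalf("write notification: %v", err)
		}
	}
	_ = incomingW.Close()

	select {
	case <-done:
	case <-time.After(2 * time.Second):
		t.Fatalf("timeout waiting for notifications")
	}
	<-c.Done()

	// With queued, single-goroutine processing the handler must observe send order.
	mu.Lock()
	defer mu.Unlock()
	for i, m := range seen {
		if want := fmt.Sprintf("test/notify-%d", i); m != want {
			t.Fatalf("notification %d: got %q, want %q", i, m, want)
		}
	}
}

Before PATCH 1 each notification ran in its own goroutine, so arrival order was unspecified and a test like this would be flaky; with the bounded queue and the single processNotifications goroutine, queued notifications are handled strictly in receive order as long as the queue (capacity 1024 after PATCH 2) does not overflow.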