Skip to content

Conversation

@ttypic
Copy link
Contributor

@ttypic ttypic commented Dec 3, 2025

Implemented Spec points:

Message Publishing Specifications (RTL6)

RTL6c - Messages published on channels in specific states

  • Messages published when channel is not ATTACHED should be published immediately

RTL6c2 - Message queuing behavior

  • Messages can be queued when connection/channel is not ready
  • Relates to processing queued messages when connection becomes ready

RTL6c3 - Publishing without implicit attach

RTL6c4 - Behavior when queueMessages client option is false

RTL6d - Message bundling restrictions

RTL6d1: Maximum message size limits for bundling

  • RTL6d2: All messages in bundle must have same clientId

RTL6d3: Can only bundle messages for same channel

  • RTL6d4: Can only bundle messages with same action (MESSAGE or PRESENCE)

RTL6d7: Cannot bundle idempotent messages with non-idempotent messages


Message Acknowledgment (RTN7)

RTN7a

All PRESENCE, MESSAGE, ANNOTATION, and OBJECT ProtocolMessages sent to Ably expect either an ACK or NACK to confirm successful receipt or failure

RTN7b

Every ProtocolMessage requiring acknowledgment must contain a unique serially incrementing msgSerial integer starting at zero

RTN7c

If connection enters SUSPENDED, CLOSED, or FAILED state and ACK/NACK has not been received, client should fail those messages and remove them from retry queues

RTN7d

If queueMessages is false, messages entering DISCONNECTED state without acknowledgment should be treated as failed immediately

RTN7e

When connection state changes to SUSPENDED/CLOSED/FAILED, pending messages (submitted via RTL6c1 or RTL6c2) awaiting ACK/NACK should be considered failed


Message Resending and Serial Handling (RTN19)

RTN19a

Upon reconnection after disconnection, client library must resend all pending messages awaiting acknowledgment, allowing the realtime system to respond with ACK/NACK

RTN19a2

In the event of a new connectionId (connection not resumed), previous msgSerials are meaningless and must be reset. The msgSerial counter resets to 0 for the new connection


Channel State and Reattachment (RTL3, RTL4, RTL5)

RTL3c

Channel state implications when connection goes into SUSPENDED

RTL3d

When connection enters CONNECTED state, channels in ATTACHING, ATTACHED, or SUSPENDED states should transition to ATTACHING and initiate attach sequence. Connection should process queued messages immediately without waiting for attach operations to finish

RTL4c - Attach sequence

  • RTL4c1: ATTACH message includes channel serial to resume from previous message or attachment

RTL5i

If channel is DETACHING, re-send DETACH and remain in 'detaching' state

Summary by CodeRabbit

  • New Features

    • Async publish on realtime channels that awaits server ACK/NACK, enforces message-size limits, and validates client IDs; transport-level ACK/NACK handling added.
  • Bug Fixes / Reliability

    • Per-message ACK tracking, durable pending/queued message handling, requeueing before transport disposal, and msgSerial sequencing/reset on reconnect to preserve resume semantics.
  • Tests

    • Extensive test suite covering ACK/NACK, sequencing, queuing, resends, reconnect/resume flows, clientId validation, and edge cases.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Dec 3, 2025

Walkthrough

Adds per-message ACK/NACK tracking and queuing (msgSerial, PendingMessage/PendingMessageQueue) in ConnectionManager; a new async RealtimeChannel.publish with client_id validation, encryption, encoding and size validation; transport-level ACK/NACK routing in WebSocketTransport; helper validate_message_size; and extensive publish tests.

Changes

Cohort / File(s) Summary
Pending message infra
ably/realtime/connectionmanager.py
Added PendingMessage and PendingMessageQueue; public msg_serial and pending_message_queue; send path now creates PendingMessage, assigns msgSerial for ack-required messages, enqueues/requeues/fails pending messages, exposes on_ack/on_nack, and integrates requeue/fail logic into transport lifecycle and state transitions.
Channel publish API
ably/realtime/realtime_channel.py
Added async publish() accepting Message/dict/list/(name,data); validates client_id, encrypts when configured, encodes respecting binary protocol, enforces validate_message_size, checks publishable state via _throw_if_unpublishable_state, sends MESSAGE protocol and awaits per-message ACK/NACK.
Transport ACK/NACK routing
ably/transport/websockettransport.py
Extended ProtocolMessageAction enum with ACK/NACK (plus additional actions); on_protocol_message now parses ACK/NACK and calls connection_manager.on_ack / on_nack (converting protocol error to AblyException).
Message size validator
ably/util/helper.py
Added validate_message_size(encoded_messages, use_binary_protocol, max_message_size) computing JSON/msgpack encoded size and raising AblyException (code 40009 / HTTP 400) when exceeding max.
Tests
test/ably/realtime/realtimechannel_publish_test.py
New comprehensive tests for RTN7 publish behaviors: ACK/NACK resolution, msgSerial sequencing and reset, pending/queued handling across connection states, requeue/resend on reconnect, concurrency and edge cases; includes transport/message interception scaffolding.

Sequence Diagram(s)

sequenceDiagram
    participant App as Application
    participant Channel as RealtimeChannel
    participant ConnMgr as ConnectionManager
    participant Transport as WebSocketTransport
    participant Server as Server

    rect rgba(100,150,200,0.12)
    Note over App,Channel: Publish -> await per-message ACK/NACK
    App->>Channel: publish(message)
    Channel->>Channel: validate client_id, encrypt, encode, size check
    Channel->>ConnMgr: send_protocol_message(MESSAGE)
    ConnMgr->>ConnMgr: create PendingMessage + Future
    ConnMgr->>ConnMgr: assign msgSerial (if ack-required) and enqueue/send
    ConnMgr->>Transport: send MESSAGE (or enqueue if disconnected)
    Transport->>Server: transmit
    Server-->>Transport: ACK {msgSerial, count}
    Transport->>ConnMgr: on_ack(msgSerial, count)
    ConnMgr->>ConnMgr: complete PendingMessage future(s)
    Channel->>App: publish resolves
    end

    rect rgba(220,120,120,0.12)
    Note over Server,ConnMgr: NACK path
    Server-->>Transport: NACK {msgSerial, count, error}
    Transport->>ConnMgr: on_nack(msgSerial, count, error)
    ConnMgr->>ConnMgr: fail PendingMessage future(s) with error
    Channel->>App: publish raises AblyException
    end

    rect rgba(150,200,120,0.12)
    Note over ConnMgr,Transport: Reconnect / Requeue
    alt connectionId changed
      ConnMgr->>ConnMgr: on_connected -> reset msgSerial
    end
    ConnMgr->>ConnMgr: requeue_pending_messages()
    ConnMgr->>Transport: resend pending MESSAGEs preserving futures/msgSerial
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • Review focus:
    • msgSerial assignment, increment, concurrency and reset semantics in ably/realtime/connectionmanager.py
    • Correct completion/failure of PendingMessage futures across ACK/NACK and state transitions
    • Requeueing/resend behavior on transport disposal, reconnect, and differing connectionId (resume vs new connection)
    • ACK/NACK parsing and error conversion in ably/transport/websockettransport.py
    • Message encoding/encryption and size validation (ably/realtime/realtime_channel.py + ably/util/helper.py)
    • Determinism and resource cleanup in test/ably/realtime/realtimechannel_publish_test.py

Poem

🐰 I stitched my serial through the night,

a future glowing, waiting bright;
An ACK will sing, a NACK may sigh,
I queue and hop when links run dry;
Small paws, bold bytes — the messages fly.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 50.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed Title clearly identifies the main change: implementing RealtimeChannel publish functionality over WebSocket, matching the core PR objective.
Linked Issues check ✅ Passed Code changes comprehensively implement RTL6 (message publishing), RTN7 (message acknowledgment), RTN19 (serial handling), and RTL3/4/5 (channel state) as specified in AIT-96 objectives.
Out of Scope Changes check ✅ Passed All changes directly support realtime publish implementation: connection manager ACK/NACK handling, RealtimeChannel.publish method, message size validation, and comprehensive test coverage.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch AIT-96/realtime-publish

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions github-actions bot temporarily deployed to staging/pull/643/features December 3, 2025 21:59 Inactive
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
test/ably/realtime/realtimechannel_publish_test.py (1)

381-434: Consider simplifying test setup.

The test manually manipulates internal state (on_ack, pending_message_queue.push, queued_messages.get) which makes it fragile to implementation changes. However, it does effectively validate the requeue mechanics.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Jira integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 7df692f and 8f52b9b.

📒 Files selected for processing (5)
  • ably/realtime/connectionmanager.py (7 hunks)
  • ably/realtime/realtime_channel.py (2 hunks)
  • ably/transport/websockettransport.py (3 hunks)
  • ably/util/helper.py (2 hunks)
  • test/ably/realtime/realtimechannel_publish_test.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
ably/util/helper.py (2)
ably/util/exceptions.py (1)
  • AblyException (9-84)
ably/types/options.py (2)
  • use_binary_protocol (161-162)
  • use_binary_protocol (165-166)
test/ably/realtime/realtimechannel_publish_test.py (5)
ably/realtime/connectionmanager.py (10)
  • ably (741-742)
  • count (54-56)
  • notify_state (562-608)
  • state (745-746)
  • on_connected (322-353)
  • connection_details (749-750)
  • on_ack (431-439)
  • send_protocol_message (171-236)
  • requeue_pending_messages (244-271)
  • push (50-52)
ably/transport/websockettransport.py (6)
  • ProtocolMessageAction (34-56)
  • close (228-229)
  • send (231-236)
  • on_protocol_message (124-185)
  • dispose (214-226)
  • connect (76-82)
ably/util/exceptions.py (1)
  • AblyException (9-84)
ably/util/helper.py (1)
  • cancel (75-76)
ably/types/connectiondetails.py (1)
  • ConnectionDetails (5-20)
ably/realtime/connectionmanager.py (3)
ably/transport/websockettransport.py (1)
  • ProtocolMessageAction (34-56)
ably/util/exceptions.py (1)
  • AblyException (9-84)
ably/types/connectionstate.py (1)
  • ConnectionState (8-16)
ably/realtime/realtime_channel.py (6)
ably/realtime/connectionmanager.py (3)
  • ably (741-742)
  • state (745-746)
  • send_protocol_message (171-236)
ably/util/exceptions.py (2)
  • AblyException (9-84)
  • IncompatibleClientIdException (103-104)
ably/util/helper.py (1)
  • validate_message_size (78-100)
ably/types/message.py (6)
  • Message (24-231)
  • name (65-66)
  • data (69-70)
  • client_id (73-74)
  • encrypt (100-116)
  • as_dict (132-178)
ably/types/connectionstate.py (1)
  • ConnectionState (8-16)
ably/types/channelstate.py (1)
  • ChannelState (8-15)
ably/transport/websockettransport.py (2)
ably/realtime/connectionmanager.py (3)
  • count (54-56)
  • on_ack (431-439)
  • on_nack (441-453)
ably/util/exceptions.py (2)
  • AblyException (9-84)
  • from_dict (83-84)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: check (3.9)
  • GitHub Check: check (3.11)
  • GitHub Check: check (3.12)
  • GitHub Check: check (3.13)
  • GitHub Check: check (3.8)
  • GitHub Check: check (3.10)
  • GitHub Check: check (3.7)
🔇 Additional comments (19)
ably/transport/websockettransport.py (2)

36-56: LGTM!

The ProtocolMessageAction enum expansion correctly adds the new action types with sequential integer values matching the Ably protocol specification.


168-179: LGTM!

The ACK/NACK handling correctly extracts msgSerial and count with sensible defaults, converts error dicts to AblyException for NACKs, and delegates to the appropriate ConnectionManager methods.

ably/realtime/realtime_channel.py (5)

16-17: LGTM!

New imports correctly added for client ID validation and message size enforcement.


387-440: LGTM!

The argument parsing correctly handles all specified forms: single Message, dict, list of messages/dicts, and the (name, data) pair form per RTL6i specification.


442-454: LGTM!

The clientId validation correctly implements RTL6g:

  • Rejects wildcard '*' clientId (RTL6g3)
  • Validates message clientId against configured client using can_assume_client_id

456-488: LGTM!

The encoding flow correctly handles encryption, serialization, size validation, and sending via the connection manager with proper acknowledgment awaiting.


490-517: LGTM!

The state validation correctly prevents publishing when connection or channel states would make it impossible, implementing RTL6c4 requirements.

test/ably/realtime/realtimechannel_publish_test.py (6)

1-11: LGTM!

Imports and test class setup are correctly structured for async testing.


19-104: LGTM!

Good coverage of basic ACK/NACK functionality and msgSerial sequencing per RTN7a/RTN7b. The transport interception pattern for simulating server responses is well-implemented.


106-190: LGTM!

RTN7e tests correctly verify that pending messages fail when connection enters SUSPENDED or FAILED states.


192-281: LGTM!

RTN7d tests correctly validate the queueMessages option behavior on DISCONNECTED state - failing messages when false, preserving when true.


283-379: LGTM!

RTN19a2 tests correctly verify msgSerial reset behavior based on connectionId changes, and concurrent publish handling.


587-676: LGTM!

Comprehensive integration test for RTL3d + RTL6c2 verifying that queued messages are sent immediately on reconnection without waiting for channel reattachment.

ably/realtime/connectionmanager.py (6)

27-42: LGTM!

PendingMessage correctly identifies which protocol actions require acknowledgment and maintains the future for async/await support.


44-104: LGTM!

PendingMessageQueue correctly implements serial-based message completion with proper handling of ACK count ranges and error propagation for NACKs.


244-271: LGTM!

requeue_pending_messages correctly implements RTN19a by preserving the PendingMessage objects (including their futures and msgSerials) when requeueing for resend after reconnection.


326-334: LGTM!

The msgSerial reset logic correctly implements RTN19a2 - only resetting when the connectionId changes (indicating a failed resume/new connection).


431-453: LGTM!

on_ack and on_nack correctly delegate to PendingMessageQueue.complete_messages, with on_nack providing a default error when none is supplied by the server.


599-608: LGTM!

State transition handling correctly implements RTN7e (fail on SUSPENDED/CLOSED/FAILED) and RTN7d (fail on DISCONNECTED when queueMessages is false).

@ttypic ttypic force-pushed the AIT-96/realtime-publish branch from 8f52b9b to c34b665 Compare December 3, 2025 22:11
@ttypic ttypic requested a review from owenpearson December 3, 2025 22:12
@github-actions github-actions bot temporarily deployed to staging/pull/643/features December 3, 2025 22:12 Inactive
@ttypic ttypic force-pushed the AIT-96/realtime-publish branch from c34b665 to df15d8a Compare December 3, 2025 22:15
@github-actions github-actions bot temporarily deployed to staging/pull/643/features December 3, 2025 22:16 Inactive
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (2)
ably/util/helper.py (1)

78-100: AblyException arguments are in wrong order.

The AblyException constructor signature is (message, status_code, code, cause=None). Here 40009 is passed as status_code and 400 as code, but 40009 is the Ably error code and 400 is the HTTP status code - these should be swapped.

     if size > max_message_size:
         raise AblyException(
             f"Maximum size of messages that can be published at once exceeded "
             f"(was {size} bytes; limit is {max_message_size} bytes)",
-            40009,
-            400
+            400,
+            40009
         )
ably/realtime/connectionmanager.py (1)

190-201: Duplicate push to pending_message_queue when queueing messages.

When a message is queued (DISCONNECTED/CONNECTING state), it's added to both queued_messages (line 194) and pending_message_queue (lines 196-197). Later, when send_queued_messages retrieves the PendingMessage and calls send_protocol_message_on_connected_state, lines 214-215 push it to pending_message_queue again, causing duplicates.

         if self.state in (
             ConnectionState.DISCONNECTED,
             ConnectionState.CONNECTING,
         ):
             self.queued_messages.put(pending_message)
-            # For queued messages requiring ack, add to pending queue
-            if pending_message.ack_required:
-                self.pending_message_queue.push(pending_message)

             if pending_message.ack_required:
                 await pending_message.future
             return None

The message should only be added to pending_message_queue when it's actually sent (in send_protocol_message_on_connected_state), not when it's queued.

🧹 Nitpick comments (1)
ably/realtime/connectionmanager.py (1)

75-77: Redundant conditional check.

The check if first: on line 76 is redundant since line 71 already returns early if self.messages is empty, guaranteeing that first is truthy at this point.

         first = self.messages[0]
-        if first:
-            start_serial = first.message.get('msgSerial')
+        start_serial = first.message.get('msgSerial')
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Jira integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 8f52b9b and df15d8a.

📒 Files selected for processing (5)
  • ably/realtime/connectionmanager.py (7 hunks)
  • ably/realtime/realtime_channel.py (2 hunks)
  • ably/transport/websockettransport.py (3 hunks)
  • ably/util/helper.py (2 hunks)
  • test/ably/realtime/realtimechannel_publish_test.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
ably/realtime/realtime_channel.py (3)
ably/util/exceptions.py (2)
  • AblyException (9-84)
  • IncompatibleClientIdException (103-104)
ably/util/helper.py (1)
  • validate_message_size (78-100)
ably/types/message.py (6)
  • Message (24-231)
  • name (65-66)
  • data (69-70)
  • client_id (73-74)
  • encrypt (100-116)
  • as_dict (132-178)
ably/util/helper.py (2)
ably/util/exceptions.py (1)
  • AblyException (9-84)
ably/types/options.py (2)
  • use_binary_protocol (161-162)
  • use_binary_protocol (165-166)
ably/transport/websockettransport.py (2)
ably/realtime/connectionmanager.py (3)
  • count (57-59)
  • on_ack (423-431)
  • on_nack (433-445)
ably/util/exceptions.py (2)
  • AblyException (9-84)
  • from_dict (83-84)
ably/realtime/connectionmanager.py (2)
ably/transport/websockettransport.py (1)
  • ProtocolMessageAction (34-56)
ably/util/exceptions.py (1)
  • AblyException (9-84)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: check (3.13)
  • GitHub Check: check (3.11)
  • GitHub Check: check (3.10)
  • GitHub Check: check (3.7)
  • GitHub Check: check (3.12)
  • GitHub Check: check (3.9)
  • GitHub Check: check (3.8)
🔇 Additional comments (7)
ably/transport/websockettransport.py (1)

168-179: LGTM!

The ACK and NACK protocol message handling is correctly implemented. The code properly extracts msgSerial and count with sensible defaults, converts error dicts to AblyException using the existing factory method, and routes to the appropriate connection manager callbacks.

ably/realtime/realtime_channel.py (1)

387-488: Solid publish implementation with proper validation.

The publish method correctly implements RTL6 spec requirements:

  • Flexible argument parsing supporting Message objects, dicts, lists, and (name, data) pairs
  • Proper clientId validation for identified clients
  • Message encryption support when cipher is configured
  • Size validation before sending
  • Awaits server acknowledgment per RTL6b

The implementation is well-structured and handles edge cases appropriately.

test/ably/realtime/realtimechannel_publish_test.py (1)

13-677: Comprehensive test coverage for RTN7 spec.

The test suite provides excellent coverage of the realtime publish behavior including:

  • Basic ACK/NACK handling
  • Sequential msgSerial incrementing
  • Pending message failure on state transitions (SUSPENDED, FAILED, DISCONNECTED)
  • queueMessages option behavior
  • msgSerial reset/preservation on connection changes
  • Concurrent publish handling
  • Message requeueing and resend on reconnect
  • Multi-count ACK handling
  • Channel reattachment edge cases

The test patterns using transport interception and state manipulation are appropriate for testing these protocol-level behaviors.

ably/realtime/connectionmanager.py (4)

27-106: Well-designed pending message tracking primitives.

The PendingMessage and PendingMessageQueue classes provide clean abstractions for tracking messages awaiting acknowledgment. The design correctly:

  • Identifies ack-required actions (MESSAGE, PRESENCE, ANNOTATION, OBJECT)
  • Associates a Future with each pending message for async waiting
  • Handles batch ACK/NACK with serial and count parameters
  • Provides methods to complete or fail messages

314-345: Correct RTN19a2 implementation for msgSerial reset.

The on_connected method correctly resets msg_serial to 0 when the connectionId changes (indicating a new connection rather than a resume). This aligns with the RTN19a2 spec requirement.


236-263: Solid requeue implementation for RTN19a.

The requeue_pending_messages method correctly preserves PendingMessage objects (with their Futures and msgSerials) when requeueing for resend after transport disconnection. Using reversed() maintains proper message ordering.


591-600: Correct RTN7d/RTN7e state transition handling.

The notify_state method properly fails queued and pending messages on:

  • SUSPENDED, CLOSED, FAILED states (RTN7e)
  • DISCONNECTED when queueMessages=false (RTN7d)

This ensures messages don't hang indefinitely when the connection cannot recover.

@ttypic ttypic force-pushed the AIT-96/realtime-publish branch from df15d8a to a22b7d5 Compare December 4, 2025 01:33
@github-actions github-actions bot temporarily deployed to staging/pull/643/features December 4, 2025 01:34 Inactive
Copy link
Member

@owenpearson owenpearson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! My initial thoughts:

  • I think the main thing I found that's broken is that websockettransport is hardcoded to use JSON serialisation regardless of the use_binary_protocol setting. I think this was fine when it was just sending control protocol mesasges, but now that it's sending real data it's quite important to fix this. One particular problem with this is that encryption won't work over json transport because encrypted messages can't always be json serialised. It also would be nice to have all tests running on JSON and binary transports so we can make sure there aren't any other subtle bugs here.
  • We're not implementing bundling messages here, but could be nice to implement that while we're working on this SDK. I think that could be in a separate PR anyway.
  • Test coverage is much lower than ably-js. I tried prompting claude to try and copy all the tests from ably-js and it found quite a few tests that we don't have yet on this branch:
  ✅ Message Size Validation Tests (2 tests)
  - test_publish_message_exceeding_size_limit - Verifies 70KB message is rejected (RSL1i)
  - test_publish_message_within_size_limit - Verifies 10KB message succeeds

  ✅ Client ID Validation Tests (4 tests)
  - test_publish_with_matching_client_id - Explicit matching clientId succeeds (RTL6g2)
  - test_publish_with_null_client_id_when_identified - Null clientId gets populated by server (RTL6g1)
  - test_publish_with_mismatched_client_id_fails - Mismatched clientId is rejected (RTL6g3)
  - test_publish_with_wildcard_client_id_fails - Wildcard clientId is rejected (RTL6g3)

  ✅ Data Type Variation Tests (6 tests)
  - test_publish_with_string_data - String data
  - test_publish_with_json_object_data - JSON object data
  - test_publish_with_json_array_data - JSON array data
  - test_publish_with_null_data - Null data (RTL6i3)
  - test_publish_with_null_name - Null name (RTL6i3)
  - test_publish_message_array - Array of messages (RTL6i2)

  ✅ Channel State Validation Tests (2 tests)
  - test_publish_fails_on_suspended_channel - Publishing on SUSPENDED channel fails (RTL6c4)
  - test_publish_fails_on_failed_channel - Publishing on FAILED channel fails (RTL6c4)

  ✅ Idempotent Publishing Test (1 test)
  - test_idempotent_realtime_publishing - Messages with same ID (RSL1k2, RSL1k5)

@ttypic ttypic force-pushed the AIT-96/realtime-publish branch from a22b7d5 to 4e5f0d8 Compare December 4, 2025 11:46
@github-actions github-actions bot temporarily deployed to staging/pull/643/features December 4, 2025 11:47 Inactive
@ttypic ttypic force-pushed the AIT-96/realtime-publish branch from 4e5f0d8 to a818d7f Compare December 4, 2025 11:47
@github-actions github-actions bot temporarily deployed to staging/pull/643/features December 4, 2025 11:48 Inactive
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
test/ably/realtime/realtimechannel_publish_test.py (1)

181-185: AblyException arguments are in wrong order.

The AblyException constructor signature is (message, status_code, code). Here 80000 is passed as status_code and 500 as code, but 80000 is an Ably error code and 500 is the HTTP status code.

         # Force FAILED state
         connection_manager.notify_state(
             ConnectionState.FAILED,
-            AblyException('Test failure', 80000, 500)
+            AblyException('Test failure', 500, 80000)
         )
🧹 Nitpick comments (2)
ably/realtime/realtime_channel.py (1)

457-470: In-place message encryption may cause unexpected side effects.

The m.encrypt(self.cipher) call modifies the Message object in place. If the caller reuses the same Message object after publishing, it will be in an encrypted state, which could lead to double-encryption on retry or unexpected behavior.

Consider working on a copy of the message to avoid mutating the caller's object:

         for m in messages:
             # Encode the message with encryption if needed
+            msg_to_encode = m
             if self.cipher:
-                m.encrypt(self.cipher)
+                # Work on a copy to avoid mutating caller's message
+                msg_to_encode = Message(name=m.name, data=m.data, client_id=m.client_id, id=m.id,
+                                        connection_id=m.connection_id, extras=m.extras)
+                msg_to_encode.encrypt(self.cipher)

             # Convert to dict representation
-            msg_dict = m.as_dict(binary=self.ably.options.use_binary_protocol)
+            msg_dict = msg_to_encode.as_dict(binary=self.ably.options.use_binary_protocol)
             encoded_messages.append(msg_dict)
ably/realtime/connectionmanager.py (1)

61-95: Redundant truthiness check on line 76.

The check if first: on line 76 is always True because the function returns early on line 73 if self.messages is empty. This is harmless but slightly redundant.

         first = self.messages[0]
-        if first:
-            start_serial = first.message.get('msgSerial')
-            if start_serial is None:
-                log.warning('MessageQueue.complete_messages(): first message has no msgSerial')
-                return
+        start_serial = first.message.get('msgSerial')
+        if start_serial is None:
+            log.warning('MessageQueue.complete_messages(): first message has no msgSerial')
+            return
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Jira integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between a22b7d5 and a818d7f.

📒 Files selected for processing (5)
  • ably/realtime/connectionmanager.py (7 hunks)
  • ably/realtime/realtime_channel.py (2 hunks)
  • ably/transport/websockettransport.py (3 hunks)
  • ably/util/helper.py (2 hunks)
  • test/ably/realtime/realtimechannel_publish_test.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
ably/util/helper.py (2)
ably/util/exceptions.py (1)
  • AblyException (9-84)
ably/types/options.py (2)
  • use_binary_protocol (161-162)
  • use_binary_protocol (165-166)
ably/transport/websockettransport.py (2)
ably/realtime/connectionmanager.py (2)
  • count (57-59)
  • on_ack (413-421)
ably/util/exceptions.py (2)
  • AblyException (9-84)
  • from_dict (83-84)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: check (3.12)
  • GitHub Check: check (3.11)
  • GitHub Check: check (3.8)
  • GitHub Check: check (3.9)
  • GitHub Check: check (3.13)
  • GitHub Check: check (3.10)
  • GitHub Check: check (3.7)
🔇 Additional comments (13)
ably/util/helper.py (1)

78-100: LGTM! Size validation implementation is correct.

The function properly handles both binary (msgpack) and JSON encoding paths, uses compact JSON separators for accurate size calculation, and the AblyException arguments are now in the correct order (status_code=400, code=40009).

ably/transport/websockettransport.py (2)

34-56: Protocol message action enum expansion looks correct.

The new enum values (ACK, NACK, CONNECT, DISCONNECT, PRESENCE, SYNC, ACTIVATE, OBJECT, OBJECT_SYNC, ANNOTATION) align with the Ably protocol message action definitions.


168-179: ACK/NACK handling implementation is correct.

The handlers properly extract msgSerial and count with appropriate defaults: msgSerial=0 follows Ably protocol initialization (used by SDKs), and count=1 aligns with the protocol specification which treats missing or zero counts as 1. Error dictionaries are correctly converted to AblyException when present, and messages are routed to the appropriate ConnectionManager methods.

test/ably/realtime/realtimechannel_publish_test.py (2)

14-32: Good test structure with proper setup and cleanup.

The test class properly sets up test variables and each test ensures cleanup with await ably.close(). The test coverage for RTN7 (ACK/NACK), RTN19 (resend/msgSerial reset), RTL6 (publish behavior), and RSL1i (size limits) is comprehensive.


927-975: Idempotent publishing test validates deduplication correctly.

The test verifies that messages with the same explicit ID are deduplicated by the server (RSL1k5), confirming only 2 unique messages are received when 3 are published (2 with same ID, 1 different).

ably/realtime/realtime_channel.py (3)

387-440: Publish method argument parsing is well-structured.

The implementation correctly handles all RTL6i input forms: single Message, dict, list of messages, and (name, data) pairs. The RTL6i3 compliance for allowing null name/data is also properly handled.


442-455: Client ID validation correctly implements RTL6g.

The validation properly rejects wildcard (*) client IDs (RTL6g3) and validates client ID compatibility using can_assume_client_id. This aligns with the spec requirements.


490-517: State validation helper correctly implements RTL6c4.

The _throw_if_unpublishable_state method properly checks both connection state (allowing CONNECTED, CONNECTING, DISCONNECTED) and channel state (rejecting SUSPENDED, FAILED). The AblyException arguments are now in the correct order.

ably/realtime/connectionmanager.py (5)

27-45: PendingMessage class correctly models message acknowledgment tracking.

The class properly determines which actions require acknowledgment (MESSAGE, PRESENCE, ANNOTATION, OBJECT) and creates a Future only when needed.


202-218: Duplicate push prevention correctly addresses the past review concern.

Line 205 properly checks if pending_message not in self.pending_message_queue.messages before pushing, preventing the duplicate push issue noted in previous reviews.


304-316: msgSerial reset on new connectionId correctly implements RTN19a2.

The logic properly detects when connectionId changes (indicating a new connection rather than resume) and resets msg_serial to 0.


413-435: ACK/NACK handlers correctly route to queue completion.

The on_ack and on_nack methods properly delegate to pending_message_queue.complete_messages with appropriate error handling for NACK cases.


581-590: RTN7d and RTN7e state-based message failure is correctly implemented.

The logic properly fails pending messages on SUSPENDED/CLOSED/FAILED (RTN7e) and on DISCONNECTED when queue_messages=False (RTN7d).

@ttypic ttypic requested a review from owenpearson December 4, 2025 12:00
@ttypic ttypic force-pushed the AIT-96/realtime-publish branch from a818d7f to 8f1e406 Compare December 4, 2025 14:55
@github-actions github-actions bot temporarily deployed to staging/pull/643/features December 4, 2025 14:56 Inactive
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (3)
ably/realtime/connectionmanager.py (2)

245-249: Comment is misleading and requeued messages lack priority.

The comment says "Add back to front of queue" but append() adds to the end. Per RTN19a, messages awaiting ACK should be resent on reconnect, implying they should have priority over messages queued during disconnect.

To give requeued messages priority over new messages:

         # Add back to front of queue (FIFO but priority over new messages)
         # Store the entire PendingMessage object to preserve Future
-        for pending_msg in reversed(pending_messages):
+        for pending_msg in pending_messages:
             # PendingMessage object retains its Future, msgSerial
-            self.queued_messages.append(pending_msg)
+            self.queued_messages.appendleft(pending_msg)

This adds messages to the front in original order, ensuring they're sent before any newly queued messages.


220-224: Use popleft() for FIFO order consistency.

If the intent is FIFO (first-in-first-out), use popleft() to match the comment in requeue_pending_messages() about maintaining order.

     def send_queued_messages(self) -> None:
         log.info(f'ConnectionManager.send_queued_messages(): sending {len(self.queued_messages)} message(s)')
         while len(self.queued_messages) > 0:
-            pending_message = self.queued_messages.pop()
+            pending_message = self.queued_messages.popleft()
             asyncio.create_task(self._send_protocol_message_on_connected_state(pending_message))
test/ably/realtime/realtimechannel_publish_test.py (1)

182-185: AblyException arguments are in wrong order.

The AblyException constructor signature is (message, status_code, code). Here 80000 (Ably error code) is passed as status_code and 500 (HTTP status) as code. These should be swapped.

         # Force FAILED state
         connection_manager.notify_state(
             ConnectionState.FAILED,
-            AblyException('Test failure', 80000, 500)
+            AblyException('Test failure', 500, 80000)
         )
🧹 Nitpick comments (2)
ably/realtime/connectionmanager.py (2)

75-76: Redundant truthiness check.

After confirming self.messages is non-empty at line 71, first = self.messages[0] will always be a PendingMessage object (truthy). The if first: check is redundant.

         first = self.messages[0]
-        if first:
-            start_serial = first.message.get('msgSerial')
+        start_serial = first.message.get('msgSerial')

Then dedent the remaining logic accordingly.


263-266: Use log.error instead of log.exception when not handling an exception.

log.exception() is intended for use within exception handlers as it includes stack trace information. Here you're just logging a failure message, not handling an exception.

-            log.exception(
+            log.error(
                 f"ConnectionManager.fail_queued_messages(): Failed to send protocol message: "
                 f"{pending_msg.message}"
             )
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Jira integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between a818d7f and 8f1e406.

📒 Files selected for processing (5)
  • ably/realtime/connectionmanager.py (8 hunks)
  • ably/realtime/realtime_channel.py (2 hunks)
  • ably/transport/websockettransport.py (3 hunks)
  • ably/util/helper.py (2 hunks)
  • test/ably/realtime/realtimechannel_publish_test.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
test/ably/realtime/realtimechannel_publish_test.py (8)
ably/realtime/connectionmanager.py (10)
  • ably (723-724)
  • count (57-59)
  • notify_state (544-590)
  • state (727-728)
  • on_connected (304-335)
  • connection_details (731-732)
  • on_ack (413-421)
  • send_protocol_message (174-200)
  • requeue_pending_messages (226-253)
  • push (53-55)
ably/types/connectionstate.py (1)
  • ConnectionState (8-16)
ably/types/channelstate.py (1)
  • ChannelState (8-15)
ably/transport/websockettransport.py (3)
  • ProtocolMessageAction (34-56)
  • send (231-236)
  • connect (76-82)
ably/types/message.py (2)
  • Message (24-231)
  • connection_key (89-90)
ably/util/exceptions.py (2)
  • AblyException (9-84)
  • IncompatibleClientIdException (103-104)
test/ably/utils.py (4)
  • BaseAsyncTestCase (44-59)
  • WaitableEvent (239-253)
  • assert_waiter (187-199)
  • wait (249-250)
ably/realtime/realtime_channel.py (5)
  • attach (160-198)
  • publish (388-488)
  • state (690-692)
  • state (695-696)
  • name (684-686)
ably/util/helper.py (2)
ably/util/exceptions.py (1)
  • AblyException (9-84)
ably/types/options.py (2)
  • use_binary_protocol (161-162)
  • use_binary_protocol (165-166)
ably/transport/websockettransport.py (1)
ably/util/exceptions.py (2)
  • AblyException (9-84)
  • from_dict (83-84)
ably/realtime/connectionmanager.py (2)
ably/util/exceptions.py (1)
  • AblyException (9-84)
ably/types/connectionstate.py (1)
  • ConnectionState (8-16)
🪛 GitHub Actions: Linting check
ably/realtime/connectionmanager.py

[error] 126-126: UP006 Use collections.deque instead of Deque for type annotation.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: check (3.11)
  • GitHub Check: check (3.7)
  • GitHub Check: check (3.13)
  • GitHub Check: check (3.12)
  • GitHub Check: check (3.9)
  • GitHub Check: check (3.10)
  • GitHub Check: check (3.8)
🔇 Additional comments (14)
ably/realtime/connectionmanager.py (5)

27-45: LGTM!

The PendingMessage class correctly identifies actions requiring acknowledgment (MESSAGE, PRESENCE, ANNOTATION, OBJECT) per RTN7a and creates a Future for awaiting the ACK/NACK.


308-316: LGTM!

The msgSerial reset logic correctly implements RTN19a2: reset to 0 only when connectionId changes (failed resume/new connection), preserving serial on successful resume.


413-435: LGTM!

The ACK/NACK handlers correctly delegate to PendingMessageQueue.complete_messages() with appropriate error handling for NACK.


575-590: LGTM!

State transition handling correctly implements RTN7e (fail on SUSPENDED/CLOSED/FAILED) and RTN7d (fail on DISCONNECTED when queueMessages=false).


437-443: LGTM!

Correctly implements RTN19a by requeuing pending messages before transport disposal.

ably/util/helper.py (1)

78-100: LGTM!

The validate_message_size function correctly computes encoded payload size based on the protocol (msgpack vs JSON) and raises an appropriately structured AblyException when the limit is exceeded.

test/ably/realtime/realtimechannel_publish_test.py (2)

14-16: Well-structured test suite with comprehensive spec coverage.

The test class covers key spec points (RTN7a/b/d/e, RTN19a/a2, RSL1i/k, RTL6c4/g/i) with clear docstrings mapping to requirements.


928-975: LGTM!

The idempotent publishing test correctly verifies that messages with explicit IDs are sent and server-side deduplication occurs for duplicate IDs (RSL1k2, RSL1k5).

ably/realtime/realtime_channel.py (4)

387-441: LGTM!

The publish method correctly handles multiple argument forms (RTL6i), with clear validation and parsing logic.


442-455: LGTM!

Client ID validation correctly implements RTL6g: rejecting wildcard * and mismatched client IDs with appropriate error codes.


468-470: LGTM!

Message size validation correctly uses the helper function with appropriate default (64KB per RSL1i).


490-517: LGTM!

State validation correctly implements RTL6c4, preventing publish on invalid connection/channel states with properly structured exceptions.

ably/transport/websockettransport.py (2)

34-56: LGTM!

The ProtocolMessageAction enum is correctly extended with the necessary protocol actions for message acknowledgment (ACK, NACK) and other protocol messages.


168-179: LGTM!

ACK/NACK handling correctly extracts protocol fields with sensible defaults and delegates to ConnectionManager for processing. Error conversion properly uses AblyException.from_dict().

Implemented Spec points:

## Message Publishing Specifications (RTL6)

### RTL6c - Messages published on channels in specific states
- Messages published when channel is not **ATTACHED** should be published immediately

### RTL6c2 - Message queuing behavior
- Messages can be queued when connection/channel is not ready
- Relates to processing queued messages when connection becomes ready

### RTL6c3 - Publishing without implicit attach

### RTL6c4 - Behavior when queueMessages client option is false

### RTL6d - Message bundling restrictions

#### RTL6d1: Maximum message size limits for bundling
- **RTL6d2**: All messages in bundle must have same clientId

#### RTL6d3: Can only bundle messages for same channel
- **RTL6d4**: Can only bundle messages with same action (MESSAGE or PRESENCE)

#### RTL6d7: Cannot bundle idempotent messages with non-idempotent messages

---

## Message Acknowledgment (RTN7)

### RTN7a
All **PRESENCE**, **MESSAGE**, **ANNOTATION**, and **OBJECT** ProtocolMessages sent to Ably expect either an **ACK** or **NACK** to confirm successful receipt or failure

### RTN7b
Every ProtocolMessage requiring acknowledgment must contain a unique serially incrementing `msgSerial` integer starting at zero

### RTN7c
If connection enters **SUSPENDED**, **CLOSED**, or **FAILED** state and ACK/NACK has not been received, client should fail those messages and remove them from retry queues

### RTN7d
If `queueMessages` is false, messages entering **DISCONNECTED** state without acknowledgment should be treated as failed immediately

### RTN7e
When connection state changes to **SUSPENDED**/**CLOSED**/**FAILED**, pending messages (submitted via RTL6c1 or RTL6c2) awaiting ACK/NACK should be considered failed

---

## Message Resending and Serial Handling (RTN19)

### RTN19a
Upon reconnection after disconnection, client library must resend all pending messages awaiting acknowledgment, allowing the realtime system to respond with ACK/NACK

### RTN19a2
In the event of a new `connectionId` (connection not resumed), previous `msgSerials` are meaningless and must be reset. The `msgSerial` counter resets to 0 for the new connection

---

## Channel State and Reattachment (RTL3, RTL4, RTL5)

### RTL3c
Channel state implications when connection goes into **SUSPENDED**

### RTL3d
When connection enters **CONNECTED** state, channels in **ATTACHING**, **ATTACHED**, or **SUSPENDED** states should transition to **ATTACHING** and initiate attach sequence. Connection should process queued messages immediately without waiting for attach operations to finish

### RTL4c - Attach sequence
- **RTL4c1**: ATTACH message includes channel serial to resume from previous message or attachment

### RTL5i
If channel is **DETACHING**, re-send **DETACH** and remain in 'detaching' state
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (4)
test/ably/realtime/realtimechannel_publish_test.py (1)

182-185: AblyException arguments are in wrong order.

The AblyException constructor signature is (message, status_code, code). Here 80000 is passed as status_code and 500 as code, but these should be swapped—500 is the HTTP status and 80000 is the Ably error code.

         # Force FAILED state
         connection_manager.notify_state(
             ConnectionState.FAILED,
-            AblyException('Test failure', 80000, 500)
+            AblyException('Test failure', 500, 80000)
         )
ably/realtime/connectionmanager.py (3)

220-224: Use popleft() for proper FIFO ordering.

Combined with changing appendleft() to append() above, use popleft() here for consistent FIFO semantics (first-in, first-out):

     def send_queued_messages(self) -> None:
         log.info(f'ConnectionManager.send_queued_messages(): sending {len(self.queued_messages)} message(s)')
         while len(self.queued_messages) > 0:
-            pending_message = self.queued_messages.pop()
+            pending_message = self.queued_messages.popleft()
             asyncio.create_task(self._send_protocol_message_on_connected_state(pending_message))

This ensures messages are sent in the order they were queued.


260-262: Use popleft() in fail_queued_messages for consistency.

For consistent FIFO semantics across the codebase:

         error = err or AblyException("Connection failed", 80000, 500)
         while len(self.queued_messages) > 0:
-            pending_msg = self.queued_messages.pop()
+            pending_msg = self.queued_messages.popleft()
             log.exception(

194-198: LIFO ordering issue: new messages queued with appendleft() but sent with pop() creates reverse order.

New messages are added to the front with appendleft() (line 195), but send_queued_messages() uses pop() (line 223) which removes from the end. This means newly queued messages will be sent after older messages in the queue—effectively LIFO order for new messages.

For FIFO ordering, use append() here instead of appendleft():

         if self.state in (ConnectionState.DISCONNECTED, ConnectionState.CONNECTING):
-            self.queued_messages.appendleft(pending_message)
+            self.queued_messages.append(pending_message)
             if pending_message.ack_required:
                 await pending_message.future
             return None

This also aligns with requeue_pending_messages() at line 249 which correctly uses append().

🧹 Nitpick comments (2)
ably/realtime/connectionmanager.py (2)

245-249: Update misleading comment: append() adds to end, not front.

The comment says "Add back to front of queue" but append() adds to the end. Update the comment to match the actual behavior:

-        # Add back to front of queue (FIFO but priority over new messages)
+        # Add to end of queue (these will be sent first via popleft in send_queued_messages)
         # Store the entire PendingMessage object to preserve Future
         for pending_msg in reversed(pending_messages):
             # PendingMessage object retains its Future, msgSerial
             self.queued_messages.append(pending_msg)

Note: With popleft() semantics, using reversed() here ensures requeued messages maintain their original order and are sent before any new messages added via append().


75-77: Redundant truthiness check.

After the empty check at lines 71-73, first at line 75 will always be truthy (since self.messages[0] returns the first element of a non-empty list). The if first: check is redundant.

         first = self.messages[0]
-        if first:
-            start_serial = first.message.get('msgSerial')
+        start_serial = first.message.get('msgSerial')
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Jira integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 8f1e406 and b9ab475.

📒 Files selected for processing (5)
  • ably/realtime/connectionmanager.py (8 hunks)
  • ably/realtime/realtime_channel.py (2 hunks)
  • ably/transport/websockettransport.py (3 hunks)
  • ably/util/helper.py (2 hunks)
  • test/ably/realtime/realtimechannel_publish_test.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
ably/realtime/connectionmanager.py (3)
ably/transport/websockettransport.py (1)
  • ProtocolMessageAction (34-56)
ably/util/exceptions.py (1)
  • AblyException (9-84)
ably/types/connectionstate.py (1)
  • ConnectionState (8-16)
ably/util/helper.py (2)
ably/util/exceptions.py (1)
  • AblyException (9-84)
ably/types/options.py (2)
  • use_binary_protocol (161-162)
  • use_binary_protocol (165-166)
ably/transport/websockettransport.py (2)
ably/realtime/connectionmanager.py (3)
  • count (57-59)
  • on_ack (413-421)
  • on_nack (423-435)
ably/util/exceptions.py (2)
  • AblyException (9-84)
  • from_dict (83-84)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: check (3.7)
  • GitHub Check: check (3.12)
  • GitHub Check: check (3.11)
  • GitHub Check: check (3.10)
  • GitHub Check: check (3.13)
  • GitHub Check: check (3.8)
  • GitHub Check: check (3.9)
🔇 Additional comments (11)
ably/util/helper.py (1)

78-100: LGTM!

The validate_message_size function correctly validates message size limits using the appropriate encoding (msgpack for binary protocol, compact JSON otherwise). The AblyException arguments are in the correct order (status_code=400, code=40009).

ably/realtime/connectionmanager.py (4)

27-45: LGTM!

The PendingMessage class correctly encapsulates message tracking with appropriate ACK requirement detection for MESSAGE, PRESENCE, ANNOTATION, and OBJECT actions.


47-106: LGTM!

The PendingMessageQueue class provides clean abstractions for managing pending messages with proper serial-based completion logic.


304-316: LGTM!

The RTN19a2 implementation correctly resets msgSerial to 0 when the connectionId changes, indicating a new connection rather than a resume.


413-435: LGTM!

The on_ack and on_nack handlers correctly delegate to PendingMessageQueue.complete_messages, with appropriate error creation for NACK without explicit error.

ably/transport/websockettransport.py (2)

34-56: LGTM!

The ProtocolMessageAction enum expansion correctly adds all required protocol actions per the Ably specification.


168-179: LGTM!

The ACK/NACK handling correctly extracts msgSerial and count with sensible defaults, and properly routes to the connection manager's handlers.

ably/realtime/realtime_channel.py (2)

387-488: LGTM!

The publish method implementation correctly handles:

  • Multiple input forms (Message, dict, list, name+data)
  • RTL6g client ID validation for identified clients
  • Message encryption when cipher is configured
  • Size validation via validate_message_size
  • State validation before sending
  • Protocol message construction and awaiting acknowledgment

490-517: LGTM!

The _throw_if_unpublishable_state method correctly validates connection and channel states per RTL6c4, with properly ordered AblyException arguments.

test/ably/realtime/realtimechannel_publish_test.py (2)

14-16: LGTM!

The test class provides comprehensive coverage for RTN7 message acknowledgment spec points including ACK/NACK handling, msgSerial sequencing, pending message failures, and queueing behavior.


927-975: LGTM!

The idempotent publishing test correctly verifies RSL1k2/RSL1k5 behavior by checking that the server deduplicates messages with the same ID.

Copy link
Member

@owenpearson owenpearson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm! :shipit:

@ttypic ttypic merged commit d117b79 into main Dec 4, 2025
10 checks passed
@ttypic ttypic deleted the AIT-96/realtime-publish branch December 4, 2025 15:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

3 participants