Skip to content

Conversation

@kingster
Copy link
Member

@kingster kingster commented Dec 17, 2025

Introduces a worker thread and event queue to handle callback events asynchronously in AudioStreamer, preventing blocking of the event loop and enabling full-duplex communication. Thread safety is ensured with mutexes and condition variables, and proper shutdown is handled in the destructor.

Solves voxcom-us#1

Summary by CodeRabbit

  • Refactor

    • Event handling now runs asynchronously via a background worker to improve responsiveness and thread safety
    • Events are queued and processed off the callback path; construction and shutdown ensure the worker is started and stopped cleanly
    • No public API changes
  • Chores

    • Added an extra debug log for binary write operations to aid troubleshooting
    • Updated an internal library pointer for maintenance

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Dec 17, 2025

📝 Walkthrough

Walkthrough

Event handling moved from synchronous callbacks to an asynchronous model: events are enqueued as EventItem objects and a dedicated worker thread dequeues and dispatches CONNECT_SUCCESS, CONNECTION_DROPPED, CONNECT_ERROR, and MESSAGE events to session handlers; shutdown is signaled and joined safely.

Changes

Cohort / File(s) Summary
Asynchronous Event Processing
audio_streamer_glue.cpp
Adds EventItem struct, m_event_queue, m_queue_mutex, m_queue_cv, m_worker_thread, and m_shutdown. Constructor starts worker; eventCallback() enqueues events (drops when shutting down). Implements workerThread() to dequeue and dispatch CONNECT_SUCCESS, CONNECTION_DROPPED, CONNECT_ERROR, MESSAGE (calling send_initial_metadata, m_notify, media_bug_close, processMessage, logging). Ensures safe shutdown on destructor and on constructor failure (notify + join). Adds debug log in writeBinary. No public API signature changes.
Submodule pointer update
libs/libwsc
Updates submodule commit pointer; no API or code changes.

Sequence Diagram(s)

sequenceDiagram
    participant Ext as External Event Source
    participant CB as eventCallback
    participant Q as Event Queue
    participant WT as workerThread
    participant Sess as Session Manager
    participant Msg as processMessage

    Ext->>CB: Trigger event (CONNECT_SUCCESS / MESSAGE / ...)
    CB->>CB: if not m_shutdown
    CB->>Q: Enqueue EventItem
    CB->>Q: Notify m_queue_cv

    loop worker loop
        WT->>Q: Wait / Dequeue EventItem
        WT->>Sess: Lookup session by id
        alt Session Found
            alt MESSAGE
                WT->>Msg: processMessage(message)
                Msg-->>WT: return
            else CONNECT_SUCCESS / CONNECTION_DROPPED / CONNECT_ERROR
                WT->>Sess: invoke handler (send_initial_metadata / m_notify / media_bug_close)
                Sess-->>WT: return
            end
        else Session Not Found
            WT-->>WT: log missing session
        end
    end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐇 I queued the hops, then raced the night,
A worker hums and sorts each byte,
Events line up, neat and quick,
I nibble logs and tap my stick,
Streams stay calm beneath moonlight.

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 11.11% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main technical change: introducing worker thread-based asynchronous event processing to enable full-duplex bidirectional communication, which aligns with the core modification in audio_streamer_glue.cpp.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



📜 Recent review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 11a29e6 and 194b74b.

📒 Files selected for processing (1)
  • libs/libwsc
🚧 Files skipped from review as they are similar to previous changes (1)
  • libs/libwsc

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
audio_streamer_glue.cpp (1)

449-453: Consider draining behavior during shutdown.

The current logic drains all queued events before exiting on shutdown. If many events are queued or an event takes a long time to process (e.g., waiting on a lock in processMessage at line 272-283), the destructor's join() will block.

This is likely intentional for graceful shutdown, but if fast shutdown is ever needed, you could add an early exit check within the inner loop:

while (!m_event_queue.empty())
{
    if (m_shutdown.load()) break; // Optional: immediate exit
    // ... rest of processing
}
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b500d98 and b06589f.

📒 Files selected for processing (1)
  • audio_streamer_glue.cpp (7 hunks)
🔇 Additional comments (4)
audio_streamer_glue.cpp (4)

180-196: LGTM - Thread-safe event enqueueing.

The lock guard ensures thread safety, the shutdown check prevents queuing during destruction, and copying the message to a std::string avoids dangling pointer issues when the original buffer is freed.


375-388: LGTM - Proper worker thread shutdown sequence.

The shutdown flag is set under the lock (consistent with the worker's wait predicate), the condition variable is notified, and the thread is joined before the destructor completes. Since client is a member, its destructor runs after this body, and any callbacks will be safely rejected by the m_shutdown check in eventCallback.


465-497: Session lifecycle handling looks correct.

The switch_core_session_locate / switch_core_session_rwunlock pattern correctly handles the case where the session might be destroyed between event queuing and processing. Events for non-existent sessions are silently dropped, which is appropriate behavior.


720-729: Advisory: Detached thread in finish() interacts with the new async model.

The existing finish() function creates a detached thread that calls disconnect(). With the new async model:

  1. disconnect() may trigger setCloseCallback which calls eventCallback(CONNECTION_DROPPED, ...)
  2. Since m_shutdown is still false at this point, the event will be queued
  3. The worker thread will try to process it via switch_core_session_locate()
  4. If the session is already gone, the event is safely dropped

This appears safe due to the session lookup guard, but the AudioStreamer might outlive stream_session_cleanup since the detached thread holds the only reference. Consider whether this detached pattern is still desirable, or if the disconnect should be synchronous now that you have a worker thread that can handle it asynchronously.

@kingster kingster changed the title Add worker thread for async event processing (full duplex bidi) Worker thread based async event processing (full duplex bidi) Dec 17, 2025
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
audio_streamer_glue.cpp (1)

412-437: Add mutex protection for WebSocket client operations.

The client object is accessed from multiple threads without synchronization:

  • The write_frame_thread calls writeBinary() at multiple points (lines 1020, 1056, 1071, 1117, 1134)
  • Main/session threads call writeText() (lines 197, 877, 1176)
  • A detached thread calls disconnect() (line 749)
  • WebSocket callbacks invoke eventCallback() from the library's internal thread

The check-then-act patterns in writeBinary() and writeText() (checking isConnected() then calling client.sendBinary()/client.sendMessage()) are not atomic. Another thread could call disconnect() between the check and the send. Add a dedicated mutex to protect all client operations: isConnected(), sendBinary(), sendMessage(), and disconnect().

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b06589f and 68a778a.

📒 Files selected for processing (1)
  • audio_streamer_glue.cpp (7 hunks)
🔇 Additional comments (7)
audio_streamer_glue.cpp (7)

11-15: LGTM!

The threading-related includes are appropriate for the worker thread implementation.


136-162: Exception safety properly implemented!

The try-catch block correctly addresses the past review comment. If client.connect() throws, the worker thread is cleanly shut down before the exception propagates, preventing std::terminate() from being called. The cleanup sequence (lock, signal, notify, join, rethrow) is correct.


202-218: LGTM!

The event queueing logic is thread-safe and correct. The use of std::move at line 216 optimizes performance by avoiding unnecessary string copies, and the shutdown check prevents events from being queued during teardown.


397-410: LGTM!

The destructor correctly implements graceful shutdown by signaling the worker thread and waiting for it to complete all queued work before terminating.


427-429: LGTM!

The debug log addition is helpful for troubleshooting and respects the suppress_log setting.


452-457: LGTM!

The EventItem struct is a clean, simple container for queueing events.


534-540: LGTM!

The member variables are appropriately typed for the worker thread implementation. The use of std::atomic<bool> for m_shutdown is a safe choice, though technically unnecessary since it's always accessed under m_queue_mutex.

Introduces a worker thread and event queue to handle callback events asynchronously in AudioStreamer, preventing blocking of the event loop and enabling full-duplex communication. Thread safety is ensured with mutexes and condition variables, and proper shutdown is handled in the destructor.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants