Skip to content

Conversation

@davegullo
Copy link

@davegullo davegullo commented Dec 18, 2025

📺 Hang Source (Consuming) - New Video Source

  • Receive and display MoQ streams directly in OBS as a video source
  • Supported Video Codecs for Hang Source:
    • H.264/AVC (h264, avc1, avc3)
    • HEVC/H.265 (hevc, h265, hev1, hvc1)
    • VP9 (vp9, vp09)
    • AV1 (av1, av01)
    • VP8 (vp8)
  • Keyframe-aware decoder initialization - waits for keyframe before decoding to avoid common decoder errors
  • Debounced reconnection (500ms) - prevents excessive reconnects while user is typing URL/broadcast settings
  • Thread-safe session management with generation tracking to handle concurrent callbacks safely
  • Graceful error recovery - flushes decoder and waits for next keyframe after consecutive decode errors
  • YUV420P → RGBA color space conversion for OBS display

Testing Concerns

  • When developing, make sure your increment supports changing the URL and Broadcast parameters, and does not cause race conditions, and properly shows the video preview screen (below)
  • This was stress tested for ~4 hours on Linux and MacOS and kept up at real time, with 0 memory leaks on exit
flyover-ranch

Implement hang_source, a new OBS source that consumes MoQ streams from
a relay using the libmoq library.

Features:
- Configurable URL and BROADCAST fields in OBS source properties
- H.264 decoder-safe: waits for keyframes with SPS/PPS before decoding
- Realtime-first: drops stale frames to minimize latency
- Clean session handling: fully destroys and recreates context on
  URL/broadcast changes to prevent stale frame processing

Files added:
- src/hang-source.c/cpp/h: Core source implementation

Files modified:
- CMakeLists.txt: Build integration for hang_source
- src/logger.h: Enhanced logging support
- src/moq-output.cpp/h: Output module updates
- src/obs-moq.cpp: Plugin registration
- Rename hang_source_disconnect -> hang_source_disconnect_locked and
  hang_source_destroy_decoder -> hang_source_destroy_decoder_locked to
  indicate they require the mutex
- Add mutex locking in hang_source_destroy()
- Add stale callback checks in on_session_status(), on_catalog(), and
  on_video_frame() to ignore callbacks from disconnected sessions
- Handle generation changes in on_catalog() to clean up tracks if
  reconnection happened during setup
- Add origin validity check in hang_source_start_consume()
- Refactor hang_source_init_decoder() to prepare new decoder state
  outside the mutex, then swap atomically inside the mutex
- Add comprehensive mutex protection throughout hang_source_decode_frame()
  with early exits on invalid decoder state
- Ensure ctx->frame.data[0] is set to NULL when frame buffer is freed
- Remove noisy per-frame LOG_DEBUG calls for video frame receive/output
API updates:
- moq_consume_video_track -> moq_consume_video_ordered
- moq_consume_video_track_close -> moq_consume_video_close
- moq_publish_media_init -> moq_publish_media_ordered

Thread safety fixes in hang-source.cpp:
- Add mutex protection for ctx->url and ctx->broadcast in hang_source_update
- Pass generation number through on_session_status to hang_source_start_consume
- Capture origin handle and broadcast copy while holding mutex in start_consume
- Verify generation hasn't changed after moq_origin_consume completes
- Create origin/session into local variables in hang_source_reconnect
- Re-verify generation before committing new handles to context
- Clean up stale resources if generation changed during reconnect setup

These fixes prevent race conditions when the user rapidly changes broadcast
settings, which could cause callbacks from old sessions to interfere with
new connections.
- Update CMakeLists.txt and CMakePresets.json build configuration
- Refactor moq-output.cpp to use master's lifecycle pattern:
  - Create origin in constructor, close in destructor
  - Connect session before publishing broadcast
  - Simplified Stop() cleanup
- Fix moq_session_connect call: use 0 instead of NULL for origin_consume
- Update moq-service.cpp
- Add hang-source registration to obs-moq.cpp
Add a volatile bool shutting_down flag to prevent use-after-free when
MoQ callbacks fire during destruction. The issue was that closing MoQ
handles triggers async callbacks, but by the time they fire the context
may already be freed.

Changes:
- Add shutting_down flag to hang_source struct
- Set flag at start of hang_source_destroy before disconnecting
- Add 100ms sleep after disconnect to let callbacks drain
- Add early-exit checks for shutting_down in all callbacks:
  - on_session_status
  - on_catalog
  - on_video_frame
  - hang_source_video_tick
  - hang_source_decode_frame

Callbacks now check the flag while holding the mutex and exit
immediately if shutting down, avoiding access to freed memory.
@coderabbitai
Copy link

coderabbitai bot commented Dec 18, 2025

Walkthrough

CMakeLists.txt adds FFmpeg detection via pkg_check_modules and wires FFmpeg includes/libs to the build. Logging macros in src/logger.h were replaced with a MOQ_-prefixed macro family (MOQ_LOG) and per-level macros (LOG_DEBUG/INFO/WARNING/ERROR) now call MOQ_LOG with numeric levels. A new MoQ OBS source is introduced via src/moq-source.cpp and src/moq-source.h (adds register_moq_source()), and src/obs-moq.cpp now calls register_moq_source(). moq-output.cpp passes 0 instead of NULL to moq_session_connect.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 36.54% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description check ✅ Passed The description comprehensively explains the new Hang Source feature, supported codecs, key features, testing approach, and includes a preview screenshot, all directly related to the changeset.
Title check ✅ Passed The title accurately summarizes the main objective of the changeset: adding a new MoQ Source for receiving broadcasts.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 12

🧹 Nitpick comments (9)
CMakeLists.txt (1)

54-59: Consider removing unused libswresample dependency.

The FFmpeg integration looks correct. However, libswresample is included in the pkg-config check but doesn't appear to be used in hang-source.cpp (only libavcodec, libavutil, and libswscale are imported). Consider removing it to reduce unnecessary dependencies.

🔎 Suggested fix
-pkg_check_modules(FFMPEG REQUIRED libavcodec libavutil libswscale libswresample)
+pkg_check_modules(FFMPEG REQUIRED libavcodec libavutil libswscale)
src/hang-source.cpp (4)

33-37: Consider using std::atomic instead of volatile for thread-safety.

While the code correctly uses mutex protection around most accesses, volatile does not provide memory ordering guarantees in C/C++. For clearer intent and proper semantics, consider using std::atomic<bool> and std::atomic<uint32_t>.

🔎 Suggested change
+#include <atomic>
+
 struct hang_source {
     // ...
-    volatile bool shutting_down;
+    std::atomic<bool> shutting_down;
     // ...
-    volatile uint32_t generation;
+    std::atomic<uint32_t> generation;

138-140: The 100ms sleep is a pragmatic workaround but not a guarantee against use-after-free.

This relies on timing rather than synchronization primitives. If a callback is mid-execution when shutting_down is set and takes longer than 100ms, there's still a race. Consider documenting this limitation or using reference counting for more robust lifetime management in future iterations.


682-694: Hardcoded AV_PIX_FMT_YUV420P assumption may fail for non-YUV420P H.264 streams.

While YUV420P is the most common pixel format for H.264, the codec can produce other formats (YUV422P, YUV444P, etc.). Consider validating frame->format after decoding and recreating the SwsContext if it differs, or logging a warning when the assumption doesn't hold.


360-366: Hardcoded video track index 0.

The implementation assumes a single video track. This is reasonable for the initial implementation, but consider logging or handling the case where multiple video tracks exist in the catalog.

src/hang-source.c (4)

17-18: Consider making frame dimensions dynamic.

The hardcoded 1920x1080 dimensions may not match the actual video stream resolution. While the decoder initialization (lines 349-356) updates ctx->frame.width and ctx->frame.height from the video config, the initial allocation and any fallback logic assumes these fixed dimensions.

Consider initializing these as defaults but allowing full dynamic sizing based on the actual stream configuration.


28-28: Remove volatile qualifier - mutex provides sufficient synchronization.

The generation field is marked volatile, but all accesses are protected by ctx->mutex. The mutex provides the necessary memory ordering guarantees, making volatile redundant and potentially misleading. In fact, volatile does not provide atomicity or memory barriers sufficient for multithreading.

🔎 Apply this diff to remove the volatile qualifier:
-	volatile uint32_t generation;  // Increments on reconnect
+	uint32_t generation;  // Increments on reconnect

91-94: Frame dimensions initialized to hardcoded values.

The OBS frame structure is initialized with fixed 1920x1080 dimensions, which will be overwritten later if the video config specifies different dimensions (lines 349-356). This works, but creates a brief window where the dimensions are inconsistent.

Consider deferring frame structure initialization until after the video config is received, or document that these are placeholder values.


181-190: Consider adding generation check for consistency.

Unlike on_catalog (lines 198-200) and the pattern shown in relevant_code_snippets, this callback doesn't validate the current generation. While the session status callback is relatively benign (just logging), adding a generation check would be consistent with the defensive pattern used elsewhere and prevent confusing log messages from stale callbacks.

🔎 Suggested addition:
 static void on_session_status(void *user_data, int32_t code)
 {
 	struct hang_source *ctx = (struct hang_source *)user_data;
+	
+	pthread_mutex_lock(&ctx->mutex);
+	bool shutting_down = ctx->shutting_down;  // if added
+	pthread_mutex_unlock(&ctx->mutex);
+	
+	if (shutting_down)
+		return;
 	
 	if (code == 0) {
 		LOG_INFO("MoQ session connected successfully");
 	} else {
 		LOG_ERROR("MoQ session failed with code: %d", code);
 	}
 }
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3de95a9 and b644d05.

📒 Files selected for processing (7)
  • CMakeLists.txt (1 hunks)
  • src/hang-source.c (1 hunks)
  • src/hang-source.cpp (1 hunks)
  • src/hang-source.h (1 hunks)
  • src/logger.h (1 hunks)
  • src/moq-output.cpp (1 hunks)
  • src/obs-moq.cpp (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
src/obs-moq.cpp (2)
src/hang-source.cpp (2)
  • register_hang_source (895-913)
  • register_hang_source (895-895)
src/hang-source.c (1)
  • register_hang_source (501-509)
src/hang-source.cpp (1)
src/hang-source.c (11)
  • hang_source_update (117-141)
  • hang_source_reconnect (253-297)
  • hang_source_init_decoder (330-397)
  • on_video_frame (240-250)
  • hang_source_decode_frame (417-498)
  • on_session_status (181-190)
  • on_catalog (192-238)
  • hang_source_create (68-99)
  • hang_source_destroy (101-115)
  • hang_source_get_defaults (143-147)
  • hang_source_properties (149-159)
src/hang-source.c (1)
src/hang-source.cpp (34)
  • hang_source_update (64-64)
  • hang_source_update (153-187)
  • hang_source_update (153-153)
  • hang_source_destroy (65-65)
  • hang_source_destroy (128-151)
  • hang_source_destroy (128-128)
  • hang_source_properties (67-67)
  • hang_source_properties (195-205)
  • hang_source_properties (195-195)
  • hang_source_get_defaults (68-68)
  • hang_source_get_defaults (189-193)
  • hang_source_get_defaults (189-189)
  • on_session_status (71-71)
  • on_session_status (281-322)
  • on_session_status (281-281)
  • on_catalog (72-72)
  • on_catalog (324-398)
  • on_catalog (324-324)
  • on_video_frame (73-73)
  • on_video_frame (400-428)
  • on_video_frame (400-400)
  • hang_source_reconnect (76-76)
  • hang_source_reconnect (431-505)
  • hang_source_reconnect (431-431)
  • hang_source_init_decoder (79-79)
  • hang_source_init_decoder (628-728)
  • hang_source_init_decoder (628-628)
  • hang_source_decode_frame (81-81)
  • hang_source_decode_frame (750-892)
  • hang_source_decode_frame (750-750)
  • hang_source_create (83-126)
  • hang_source_create (83-83)
  • register_hang_source (895-913)
  • register_hang_source (895-895)
src/hang-source.h (2)
src/hang-source.cpp (2)
  • register_hang_source (895-913)
  • register_hang_source (895-895)
src/hang-source.c (1)
  • register_hang_source (501-509)
🔇 Additional comments (8)
CMakeLists.txt (1)

64-64: LGTM!

New source files properly added to the build.

src/moq-output.cpp (1)

87-87: LGTM!

The parameter change from NULL to 0 is correct. This output is publishing (not consuming), so the consume origin is correctly set to 0, while the publish origin is passed as the third parameter.

src/hang-source.h (1)

1-3: LGTM!

Clean header with appropriate include guard and function declaration.

src/obs-moq.cpp (1)

23-23: LGTM!

Clean integration of the new Hang Source following the existing registration pattern.

Also applies to: 44-44

src/hang-source.cpp (2)

894-913: LGTM!

Clean source registration following OBS patterns.


83-126: LGTM!

Proper initialization of the source context with clear field setup.

src/hang-source.c (1)

500-519: LGTM! Source registration is correctly configured.

The registration properly sets up the Hang Source with appropriate flags (OBS_SOURCE_ASYNC_VIDEO for asynchronous frame delivery and OBS_SOURCE_DO_NOT_DUPLICATE to prevent source duplication), and wires all necessary lifecycle callbacks.

src/logger.h (1)

3-8: LGTM! Logging macros correctly implement OBS log levels.

The refactored macros properly:

  • Use the MOQ_ prefix to avoid naming conflicts with OBS log level constants
  • Map to the correct OBS log levels (DEBUG=400, INFO=300, WARNING=200, ERROR=100)
  • Add a consistent [obs-moq] prefix to all log messages for easy filtering

The implementation is clean and follows OBS logging conventions.

Comment on lines +811 to +818
packet->data = (uint8_t *)frame_data.payload;
packet->size = frame_data.payload_size;
packet->pts = frame_data.timestamp_us / 1000; // Convert to milliseconds
packet->dts = packet->pts;

// Send packet to decoder
int ret = avcodec_send_packet(ctx->codec_ctx, packet);
av_packet_free(&packet);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

The code is safe, but relies on FFmpeg's fallback data copying behavior.

When a packet's buf field is not set, avcodec_send_packet will copy the packet data if it is not reference-counted rather than creating a reference. Since the packet is manually constructed with external data (frame_data.payload) and no reference-counted buffer is attached, FFmpeg will internally copy the data before the decoder processes it. This copy occurs inside avcodec_send_packet, ensuring data remains valid even after moq_consume_frame_close frees the frame at line 891.

For better practice, ensure the packet data is reference-counted by using av_packet_from_data() or similar mechanisms instead of relying on this fallback copying behavior.

🤖 Prompt for AI Agents
In src/hang-source.cpp around lines 811 to 818, the packet is built by assigning
external frame_data.payload to packet->data without attaching a
reference-counted buffer, relying on FFmpeg's internal copy behavior; change
this to create a reference-counted packet (e.g., use av_packet_from_data or
allocate an AVBufferRef via av_buffer_create and set packet->buf) so the packet
owns its data and remains valid after moq_consume_frame_close frees the frame,
and ensure proper error handling and freeing of the packet/buffer when done.

Thread safety improvements:
- Replace volatile with std::atomic for shutting_down and generation fields
- Use proper atomic operations (.load()/.store()) for generation counter
- Copy url/broadcast while holding mutex before logging in video_tick
  to prevent use-after-free race condition

Memory safety fixes:
- Use av_mallocz instead of av_malloc for codec extradata to ensure
  padding bytes are zero-initialized
- Detect mid-stream resolution changes and reinitialize scaler context
  and frame buffer to prevent out-of-bounds memory access

Documentation:
- Add detailed comment explaining the 100ms callback drain delay
  limitation and suggest reference counting as a more robust solution
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/hang-source.cpp (2)

130-165: Consider reference counting for more robust shutdown guarantee.

The 100ms sleep-based approach is pragmatic and documented, but as noted in the comments, it's a timing-based workaround rather than a synchronization guarantee. For improved robustness, consider implementing atomic reference counting:

  • Increment refcount when entering a callback
  • Decrement when exiting
  • Wait for refcount to reach zero before freeing ctx

This could be implemented using std::shared_ptr or a manual atomic refcount with a condition variable.

Example approach using atomic refcount

Add to hang_source structure:

std::atomic<int> refcount{0};
pthread_cond_t refcount_zero;

Wrapper for callbacks:

struct callback_guard {
    hang_source *ctx;
    callback_guard(hang_source *c) : ctx(c) { ctx->refcount++; }
    ~callback_guard() { 
        if (--ctx->refcount == 0) 
            pthread_cond_signal(&ctx->refcount_zero); 
    }
};

In destroy:

pthread_mutex_lock(&ctx->mutex);
ctx->shutting_down = true;
hang_source_disconnect_locked(ctx);
while (ctx->refcount > 0) {
    pthread_cond_wait(&ctx->refcount_zero, &ctx->mutex);
}
pthread_mutex_unlock(&ctx->mutex);

823-860: Consider using reference-counted packet for clarity.

The current implementation assigns external buffer (frame_data.payload) directly to packet->data without setting packet->buf. While FFmpeg will internally copy the data when no buffer reference is attached (making it safe since the copy occurs before moq_consume_frame_close at line 891), this relies on implicit fallback behavior.

For better code clarity and explicit ownership semantics, consider using av_packet_from_data() or creating an AVBufferRef with av_buffer_create() that wraps the external data with a custom free function.

Example using av_buffer_create with callback
// Callback to close MoQ frame when buffer is freed
static void moq_frame_buffer_free(void *opaque, uint8_t *data) {
    int32_t frame_id = (int32_t)(intptr_t)opaque;
    moq_consume_frame_close(frame_id);
}

// In hang_source_decode_frame, replace packet creation:
AVPacket *packet = av_packet_alloc();
if (!packet) {
    pthread_mutex_unlock(&ctx->mutex);
    moq_consume_frame_close(frame_id);
    return;
}

AVBufferRef *buf = av_buffer_create(
    (uint8_t *)frame_data.payload,
    frame_data.payload_size,
    moq_frame_buffer_free,
    (void *)(intptr_t)frame_id,
    0  // flags
);

if (!buf) {
    av_packet_free(&packet);
    pthread_mutex_unlock(&ctx->mutex);
    moq_consume_frame_close(frame_id);
    return;
}

packet->buf = buf;
packet->data = buf->data;
packet->size = buf->size;
packet->pts = frame_data.timestamp_us / 1000;
packet->dts = packet->pts;

// Send packet (it now owns the buffer)
int ret = avcodec_send_packet(ctx->codec_ctx, packet);
av_packet_free(&packet);  // This will trigger moq_frame_buffer_free when refcount reaches 0

// Remove the moq_consume_frame_close call at lines 858, 891 etc. as it's handled by the buffer free callback

Note: This requires careful integration to ensure frame_id lifetime and error paths are handled correctly.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b644d05 and 6a4911e.

📒 Files selected for processing (1)
  • src/hang-source.cpp (1 hunks)
🔇 Additional comments (16)
src/hang-source.cpp (16)

19-20: LGTM: Hardcoded dimensions serve as defaults.

The hardcoded FRAME_WIDTH and FRAME_HEIGHT values are used as initial defaults. The code properly handles dynamic resolution changes later (lines 898-961 reinitialize the scaler when decoded frame dimensions differ).


22-60: LGTM: Well-designed structure with proper concurrency primitives.

The structure uses atomics appropriately (shutting_down, generation) for lock-free checks in callbacks, while protecting other shared state with mutex. Generation tracking is an effective pattern for invalidating stale asynchronous callbacks during reconnection.


85-128: LGTM: Thorough initialization of all state.

The creation function properly initializes all fields, uses bzalloc for zero-initialization, and correctly sets up the mutex and frame structure. The call to hang_source_update at the end applies initial settings appropriately.


167-201: LGTM: Proper debounce mechanism with thread-safe state updates.

The update function correctly defers reconnection to video_tick, recording only the timestamp and pending flag. String comparisons and memory management are handled properly, including NULL-safe logging.


222-295: LGTM: Race condition from past review is now resolved.

The debounce mechanism is well-implemented. The previous race condition (reading ctx->url and ctx->broadcast after mutex unlock) has been fixed by copying these values under mutex protection (lines 284-286) before logging.


301-342: LGTM: Robust session status handling with proper cleanup.

The callback correctly checks shutting_down and session validity before proceeding. Error handling properly cleans up session/origin resources and blanks the video display.


344-418: LGTM: Catalog callback with proper generation tracking.

The callback correctly detects stale callbacks using generation matching and properly cleans up resources when generation changes occur during setup. Mutex usage and error handling are appropriate.


420-448: LGTM: Video frame callback with insightful comment on race avoidance.

The callback correctly validates state and includes a helpful comment explaining why generation-based validation is used instead of checking video_track (to handle the race where frames may arrive before the track handle is stored in on_catalog).


451-525: LGTM: Robust reconnection with generation tracking and guard flag.

The reconnection logic properly prevents concurrent reconnects, increments generation to invalidate stale callbacks, and creates MoQ resources outside the mutex to avoid blocking. The generation check before installing new handles ensures stale resources are cleaned up if another reconnect occurred during setup.


527-604: LGTM: Start consume with comprehensive error handling and generation checks.

The function correctly validates generation at each step and cleans up resources appropriately on failures. Error paths properly blank the video to indicate disconnection.


606-646: LGTM: Clean resource cleanup hierarchy.

The disconnect function properly releases resources in the correct order (child resources before parent) and resets all state. The blank video function correctly clears the OBS video output by passing NULL.


648-748: LGTM: Padding initialization issue from past review is now resolved.

The decoder initialization correctly uses av_mallocz (line 679) which zero-initializes the entire allocation including the AV_INPUT_BUFFER_PADDING_SIZE padding bytes as required by FFmpeg. The swap pattern (create outside mutex, then lock/destroy-old/install-new/unlock) properly avoids blocking under mutex.


895-961: LGTM: Dimension validation issue from past review is now resolved.

The code now properly validates decoded frame dimensions (lines 900-912) before calling sws_scale, preventing out-of-bounds reads/writes on mid-stream resolution changes. The range check (0 < dimension ≤ 16384) and reinitialization logic are appropriate.


963-977: LGTM: Color conversion and frame output.

The YUV420P to RGBA conversion uses validated dimensions (from lines 900-961), making the sws_scale call safe. Frame timestamp and output to OBS are handled correctly, and resources are properly cleaned up.


750-768: LGTM: Clean decoder resource cleanup.

The decoder destruction function properly frees all FFmpeg resources and nulls the pointers, maintaining consistent state.


979-998: LGTM: Standard OBS source registration.

The registration function correctly configures the source with appropriate type (OBS_SOURCE_TYPE_INPUT) and flags (OBS_SOURCE_ASYNC_VIDEO | OBS_SOURCE_DO_NOT_DUPLICATE). All necessary callbacks including video_tick for debounced reconnection are properly registered.

Replace static H.264/YUV420P/1080p assumptions with dynamic detection
from moq_video_config and decoded frame properties.

Codec detection:
- Add codec_string_to_id() to map catalog codec strings to FFmpeg IDs
- Support H.264 (h264/avc/avc1), HEVC (hevc/h265/hev1/hvc1),
  VP9 (vp9/vp09), AV1 (av1/av01), and VP8
- Read codec from config->codec/codec_len instead of hardcoding H.264

Pixel format handling:
- Add current_pix_fmt field to track actual decoded pixel format
- Query frame->format after decode instead of assuming YUV420P
- Recreate swscale context when pixel format changes
- Supports any format: YUV420P, YUV420P10LE, YUV444P, NV12, etc.

Resolution handling:
- Remove FRAME_WIDTH/FRAME_HEIGHT (1920x1080) constants
- Initialize frame dimensions to 0, set dynamically from stream
- Try to get dimensions from codec context after avcodec_open2()
  if not available in moq_video_config
- Existing mid-stream resolution change handling preserved

Deferred initialization:
- Defer sws_ctx and frame_buffer allocation to first decoded frame
- Allows proper format detection before committing to scaler config
- Reduces wasted allocations when config dimensions are unavailable

Logging improvements:
- Include av_get_pix_fmt_name() for readable format names
- Log codec string, dimensions, and pixel format on initialization
- Log format/dimension changes when scaler is reinitialized
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/hang-source.cpp (2)

184-198: The 100ms sleep is a pragmatic workaround; consider implementing reference counting for robustness.

The comment clearly documents that the timing-based delay doesn't guarantee synchronization if a callback takes longer than 100ms. While this is acceptable for fast callbacks, a more robust solution would use atomic reference counting (increment on callback entry, decrement on exit, wait for zero before freeing).

Possible refcounting approach

Replace the timing-based sleep with an atomic counter and condition variable:

// In struct hang_source:
std::atomic<int32_t> callback_refcount;
pthread_cond_t refcount_cond;

// In callbacks (entry):
ctx->callback_refcount.fetch_add(1);

// In callbacks (exit):
if (ctx->callback_refcount.fetch_sub(1) == 1) {
    pthread_cond_signal(&ctx->refcount_cond);
}

// In destroy:
pthread_mutex_lock(&ctx->mutex);
ctx->shutting_down = true;
while (ctx->callback_refcount.load() > 0) {
    pthread_cond_wait(&ctx->refcount_cond, &ctx->mutex);
}
pthread_mutex_unlock(&ctx->mutex);

This would provide deterministic synchronization instead of relying on timing.


883-897: Consider using reference-counted packets for better practice.

While the current implementation is functionally safe (avcodec_send_packet copies data internally when packet->buf is NULL), it relies on FFmpeg's fallback behavior rather than explicitly managing the packet data lifetime.

Recommended approach using av_packet_from_data

To make the packet explicitly own its data and avoid relying on internal copying:

-	packet->data = (uint8_t *)frame_data.payload;
-	packet->size = frame_data.payload_size;
+	// Copy data into a reference-counted buffer owned by the packet
+	uint8_t *data_copy = (uint8_t *)av_malloc(frame_data.payload_size);
+	if (!data_copy) {
+		av_packet_free(&packet);
+		pthread_mutex_unlock(&ctx->mutex);
+		moq_consume_frame_close(frame_id);
+		return;
+	}
+	memcpy(data_copy, frame_data.payload, frame_data.payload_size);
+	av_packet_from_data(packet, data_copy, frame_data.payload_size);

This makes the ownership explicit and allows closing the frame immediately after creating the packet, rather than relying on FFmpeg's internal copy timing.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6a4911e and 9756763.

📒 Files selected for processing (1)
  • src/hang-source.cpp (1 hunks)
🔇 Additional comments (11)
src/hang-source.cpp (11)

21-60: LGTM - Codec mapping is comprehensive and safe.

The function correctly handles multiple codec name variants with proper length-based string comparisons, preventing buffer overruns. The case-insensitive matching is appropriate for codec identifiers.


62-102: Well-designed state structure with proper thread-safety primitives.

The struct effectively separates pending from active settings for debounced updates, uses atomic generation tracking to invalidate stale callbacks, and includes comprehensive decoder state management.


127-172: LGTM - Proper initialization sequence.

The create function correctly zero-initializes the context, sets invalid values for handles, and initializes the mutex before calling update. The explicit field initialization is thorough.


211-245: LGTM - Clean debounce pattern implementation.

The update function correctly defers reconnection by only marking settings as pending and recording the timestamp. The actual reconnection is handled by video_tick after the debounce period.


265-339: LGTM - Debounce logic correctly implemented with proper logging safety.

The video_tick function properly implements the debounce pattern by checking elapsed time, comparing pending vs active settings, and applying changes after the delay. The fix from previous review (copying strings for logging while holding mutex) is correctly applied.


495-569: LGTM - Robust reconnection with generation tracking.

The reconnect logic properly uses generation tracking to detect and handle concurrent reconnection attempts. The reconnect_in_progress flag prevents overlapping reconnects, and stale resources are cleaned up correctly when generation changes.


345-492: LGTM - Callbacks properly handle shutdown and generation tracking.

The callback implementations correctly check shutting_down early to prevent use-after-free, use generation tracking to detect stale callbacks, and clean up resources appropriately on errors. The pattern is consistent across all three callbacks.


571-648: LGTM - Start consume logic with proper error handling.

The function correctly validates generation at each step, copies strings under mutex for thread safety, and thoroughly cleans up all resources on any failure path.


692-826: LGTM - Decoder initialization with proper resource management.

The decoder initialization correctly uses av_mallocz for extradata allocation (addressing the previous review comment about zero-padding), properly swaps decoder state under mutex, and the destroy function thoroughly cleans up all resources.


828-1061: Frame decoding implementation is robust with proper error recovery.

The decode function correctly handles:

  • Keyframe-aware initialization with decoder flushing
  • Consecutive error tracking with automatic recovery (flush and wait for keyframe after 5 errors)
  • Dynamic dimension and pixel format changes with proper validation (lines 974-991)
  • Resource cleanup in all paths

The past review comment about dimension validation before sws_scale has been addressed.


1063-1082: LGTM - Source registration is properly configured.

The registration function correctly sets up the OBS source structure with appropriate flags (ASYNC_VIDEO, DO_NOT_DUPLICATE) and provides all necessary callbacks for a video input source.

@davegullo davegullo changed the title hang_source Add Hang Source for receiving MoQ video streams Dec 18, 2025
Copy link
Collaborator

@kixelated kixelated left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a LOT of pthread_mutex that kind of concerns me, but otherwise seems like a good starting point. Just double check on the tick thing because checking if the URL changed every few milliseconds just seems super wrong.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/hang-source.cpp (2)

28-31: Consider adding avc3 codec identifier support.

Line 30 checks for avc (3 characters) which would match avc3, but a previous comment suggests explicitly adding avc3 support. The avc3 variant is a valid ISO-BMFF brand for AVC/H.264 where parameter sets are stored in-band rather than in the sample entry. Adding an explicit check would make the intent clearer and avoid potential issues if the order of checks changes.

🔎 Suggested addition
 	// H.264/AVC
 	if ((len >= 4 && strncasecmp(codec, "h264", 4) == 0) ||
 	    (len >= 4 && strncasecmp(codec, "avc1", 4) == 0) ||
+	    (len >= 4 && strncasecmp(codec, "avc3", 4) == 0) ||
 	    (len >= 3 && strncasecmp(codec, "avc", 3) == 0)) {
 		return AV_CODEC_ID_H264;
 	}

184-198: Timing-based callback synchronization is a known limitation.

The code acknowledges that the 100ms sleep is a workaround rather than a robust synchronization mechanism. While the PR testing indicates this works in practice (4 hours stress test with no issues), the comment correctly identifies that reference counting would be more reliable.

The suggested approach in the comment (reference counting with condition variable) would eliminate the theoretical race condition where a slow callback could access freed memory. However, given the testing results and the fact that callbacks are documented as fast (<1ms), this is acceptable for the current implementation.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9756763 and b5fccc7.

📒 Files selected for processing (1)
  • src/hang-source.cpp (1 hunks)
🔇 Additional comments (8)
src/hang-source.cpp (8)

62-172: LGTM: Well-structured initialization with proper thread safety.

The struct definition includes comprehensive state tracking (generation counter, shutdown flag, decoder state, etc.) and the creation function correctly initializes all fields. The use of std::atomic for concurrent access flags and proper mutex initialization demonstrates good thread-safety awareness.


211-339: LGTM: Debounced settings update with proper synchronization.

The implementation correctly separates pending settings from active settings and uses a 500ms debounce delay to avoid excessive reconnections during user input. The race condition flagged in previous reviews (lines 328-337) has been properly addressed by copying the URL and broadcast strings while holding the mutex before logging.


345-492: LGTM: Callbacks are properly guarded with generation tracking.

The MoQ callbacks correctly check for shutdown state and use generation tracking to detect and ignore stale callbacks from previous connections. Resource cleanup is handled properly in all error paths.


495-648: LGTM: Reconnection logic with proper generation tracking and resource cleanup.

The reconnection implementation correctly prevents concurrent reconnects, increments the generation counter to invalidate old callbacks, and handles all error paths with appropriate cleanup. The 50ms delay (line 521) allows the MoQ library to clean up properly between connections.


735-741: LGTM: Extradata padding properly zero-initialized.

The previous issue regarding uninitialized padding bytes has been correctly addressed by using av_mallocz instead of av_malloc, which zero-initializes the entire allocation including the AV_INPUT_BUFFER_PADDING_SIZE bytes. This satisfies FFmpeg's requirements.


890-897: Packet data lifetime is safe due to FFmpeg's internal copying.

The previous review comment noted that the packet is constructed with external frame_data.payload without a reference-counted buffer. This is safe because avcodec_send_packet will internally copy the data when packet->buf is NULL. The frame is closed at line 1060 after decoding completes, ensuring the data remains valid throughout the decode operation.

While using av_packet_from_data would be more explicit about ownership semantics, the current implementation is correct and follows a common FFmpeg pattern.


958-1045: LGTM: Dynamic scaler reinitialization with comprehensive validation.

The code properly handles dimension and pixel format changes by:

  1. Detecting changes (lines 958-961)
  2. Validating dimensions are positive and reasonable (lines 975-982)
  3. Validating pixel format (lines 985-991)
  4. Recreating the scaler context with new parameters (lines 1000-1013)
  5. Reallocating the frame buffer (lines 1015-1026)

This addresses the previous review concern about missing dimension validation before sws_scale.


1063-1082: LGTM: Source registration is complete and correct.

The registration properly configures the OBS source with:

  • Correct source type (INPUT) and flags (ASYNC_VIDEO, DO_NOT_DUPLICATE)
  • All necessary callbacks (create, destroy, update, get_defaults, get_properties, video_tick)
  • Clear, descriptive name "Hang Source (MoQ)"

The video_tick callback is appropriately used for debounced reconnection logic.

…ention

Major refactoring to simplify the hang_source plugin architecture:

Removed debounce mechanism:
- Removed pending_url, pending_broadcast, settings_changed_time, reconnect_pending fields
- Removed DEBOUNCE_DELAY_MS constant and video_tick polling function
- Settings now apply immediately when user clicks OK (not on every keystroke)

Simplified settings flow:
- hang_source_update() now detects if settings actually changed
- Auto-reconnects when settings change to valid values
- Auto-disconnects and blanks video when settings become invalid
- No more pending vs active settings distinction

Reduced mutex contention:
- Added fast-path shutting_down.load() checks before acquiring mutex in callbacks
- on_session_status, on_catalog, on_video_frame, hang_source_decode_frame all
  check the atomic flag first for early exit without lock acquisition

Result: 62 fewer lines of code, cleaner architecture, same functionality.
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (5)
src/hang-source.cpp (5)

28-31: Consider explicitly checking for "avc3" instead of relying on the catch-all "avc" check.

The PR objectives list "avc3" as a supported codec, and ISO BMFF defines "avc3" as a valid H.264 variant. While the current catch-all check for "avc" (line 30) matches "avc3", it also matches any string starting with "avc", which could include invalid codec identifiers. For maintainability and clarity, explicitly checking for "avc3" similar to line 29 would make the supported codecs more obvious.

🔎 Suggested refinement
 	// H.264/AVC
 	if ((len >= 4 && strncasecmp(codec, "h264", 4) == 0) ||
 	    (len >= 4 && strncasecmp(codec, "avc1", 4) == 0) ||
+	    (len >= 4 && strncasecmp(codec, "avc3", 4) == 0) ||
 	    (len >= 3 && strncasecmp(codec, "avc", 3) == 0)) {
 		return AV_CODEC_ID_H264;
 	}

Alternatively, if you want to be more restrictive and only accept well-known codec strings, you could remove the catch-all "avc" check entirely:

 	// H.264/AVC
 	if ((len >= 4 && strncasecmp(codec, "h264", 4) == 0) ||
 	    (len >= 4 && strncasecmp(codec, "avc1", 4) == 0) ||
-	    (len >= 3 && strncasecmp(codec, "avc", 3) == 0)) {
+	    (len >= 4 && strncasecmp(codec, "avc3", 4) == 0)) {
 		return AV_CODEC_ID_H264;
 	}

170-184: The 100ms sleep provides practical protection but isn't a synchronization guarantee.

Your detailed comment accurately describes the limitation. While this timing-based approach should work for fast callbacks, a reference-counting mechanism would be more robust. If you'd like to eliminate the theoretical race condition, I can help implement an atomic reference counter with proper waiting logic.

Would you like me to generate a reference-counting implementation, or is the current pragmatic approach sufficient for your use case?


454-455: The 50ms sleep assumes the MoQ library completes cleanup within that timeframe.

Similar to the 100ms sleep in the destructor, this timing-based delay may not provide guaranteed synchronization. If the MoQ library provides a synchronous close operation or a completion callback, using that would be more robust than an arbitrary sleep duration.

Could you verify whether the MoQ library offers any synchronization primitives for ensuring cleanup completion, or if the 50ms delay is sufficient based on testing?


822-837: Packet data relies on FFmpeg's implicit copy behavior.

The packet's data field points to frame_data.payload without establishing a reference-counted buffer (packet->buf remains NULL). While avcodec_send_packet will internally copy non-reference-counted packet data as a fallback, explicitly managing the packet's ownership via av_packet_from_data() or av_buffer_create() would be more robust and make the data lifetime guarantees clear.

That said, the current implementation is functionally safe because FFmpeg copies the data before returning from avcodec_send_packet, and moq_consume_frame_close at line 1000 (called after decoding completes) properly frees the frame data.

🔎 More explicit packet ownership

To make packet ownership explicit:

 	// Create AVPacket from frame data
 	AVPacket *packet = av_packet_alloc();
 	if (!packet) {
 		pthread_mutex_unlock(&ctx->mutex);
 		moq_consume_frame_close(frame_id);
 		return;
 	}
 
-	packet->data = (uint8_t *)frame_data.payload;
-	packet->size = frame_data.payload_size;
+	// Create a reference-counted packet that doesn't own the data
+	// (FFmpeg will copy it internally in avcodec_send_packet)
+	if (av_packet_from_data(packet, (uint8_t *)frame_data.payload, frame_data.payload_size) < 0) {
+		av_packet_free(&packet);
+		pthread_mutex_unlock(&ctx->mutex);
+		moq_consume_frame_close(frame_id);
+		return;
+	}
 	packet->pts = frame_data.timestamp_us / 1000; // Convert to milliseconds
 	packet->dts = packet->pts;

Note: av_packet_from_data will allocate a new buffer and copy the data, which adds overhead. Since avcodec_send_packet copies the data anyway when packet->buf is NULL, the current approach is actually more efficient. The main benefit of the suggested change is code clarity about ownership semantics.


987-992: Consider using frame->height directly in sws_scale for clarity.

At line 992, ctx->frame.height is used as the source height parameter for sws_scale. While this is functionally correct (since ctx->frame.height was just set to frame->height at line 978), using frame->height directly would be clearer and more resilient to future changes. This makes the data flow more explicit: scaling from frame (FFmpeg's decoded frame) to dst_data (OBS output buffer).

 	// Convert YUV420P to RGBA
 	uint8_t *dst_data[4] = {ctx->frame_buffer, NULL, NULL, NULL};
 	int dst_linesize[4] = {static_cast<int>(ctx->frame.width * 4), 0, 0, 0};
 
 	sws_scale(ctx->sws_ctx, (const uint8_t *const *)frame->data, frame->linesize,
-	          0, ctx->frame.height, dst_data, dst_linesize);
+	          0, frame->height, dst_data, dst_linesize);
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b5fccc7 and 84fa116.

📒 Files selected for processing (1)
  • src/hang-source.cpp (1 hunks)
🔇 Additional comments (2)
src/hang-source.cpp (2)

626-736: LGTM! Decoder initialization is well-structured.

The implementation correctly handles:

  • Zero-initialized extradata padding (av_mallocz at line 670)
  • Dynamic dimension resolution from either config or decoder context
  • Thread-safe state swapping under mutex
  • Proper cleanup of old decoder state before installing new
  • Deferred allocation of scaler and frame buffer until first decoded frame with known pixel format

The defensive approach of validating and logging codec strings (lines 632-636) is good practice.


839-858: Excellent graceful error recovery implementation.

The consecutive error tracking with decoder flush and keyframe wait provides robust recovery from transient decode failures. Key strengths:

  • Avoids log spam by only logging the first error in a sequence (lines 852-854, 883-886)
  • Correctly ignores AVERROR(EAGAIN) which is expected during normal operation
  • 5-error threshold before recovery action is reasonable
  • Resetting to wait for keyframe after flush ensures clean decoder state

This matches the PR's stated objective of "graceful error recovery."

Also applies to: 871-892

@davegullo
Copy link
Author

@kixelated 👆🏼 this latest rev should hopefully address the janky attempt to handle "onKeyUp" and removes all the downstream mutex hacks that were in place to solve for that.

Part of the challenge that lead to this was caused by developing against some flaky RTSP sources at the same time, and with a clean relay and rtsp gstreamer ingest, this now works just as intended, with no need to add a [ Connect ] button. It just works on change, and if no stream exists, it's gracefully handling that.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 84fa116 and f9d916f.

📒 Files selected for processing (1)
  • src/moq-output.cpp

// Start establishing a session with the MoQ server
// NOTE: You could publish the same broadcasts to multiple sessions if you want (redundant ingest).
session = moq_session_connect(server_url.data(), server_url.size(), origin, NULL, session_connect_callback, this);
session = moq_session_connect(server_url.data(), server_url.size(), origin, 0, session_connect_callback, this);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

fd -e h -e hpp | xargs rg -l "moq_session_connect" | head -5

Repository: moq-dev/obs

Length of output: 37


🏁 Script executed:

rg -B 2 -A 8 "moq_session_connect\s*\(" --type=c --type=h --type=cpp

Repository: moq-dev/obs

Length of output: 1240


🏁 Script executed:

fd -e h -e hpp | xargs grep -l "moq_session_connect"

Repository: moq-dev/obs

Length of output: 39


🏁 Script executed:

rg "moq_session_connect" --type=h --type=hpp -B 5 -A 5

Repository: moq-dev/obs

Length of output: 82


🏁 Script executed:

fd "\.h$" | head -20

Repository: moq-dev/obs

Length of output: 120


🏁 Script executed:

rg "moq_session_connect" -B 5 -A 10

Repository: moq-dev/obs

Length of output: 1723


🏁 Script executed:

rg "include.*moq" src/moq-output.cpp src/hang-source.cpp

Repository: moq-dev/obs

Length of output: 170


🏁 Script executed:

rg "#include" src/moq-output.cpp | head -20

Repository: moq-dev/obs

Length of output: 144


🏁 Script executed:

find . -name "moq.h" -type f

Repository: moq-dev/obs

Length of output: 37


🏁 Script executed:

rg "int32_t.*moq_session_connect|moq_session_connect.*int32_t" -A 8 -B 2

Repository: moq-dev/obs

Length of output: 554


🏁 Script executed:

rg "moq_session_connect" -B 10 -A 10 src/hang-source.cpp

Repository: moq-dev/obs

Length of output: 652


🏁 Script executed:

git log --oneline -20 | grep -i moq

Repository: moq-dev/obs

Length of output: 109


Use nullptr instead of 0 for the null pointer.

The fourth parameter to moq_session_connect is a pointer type (origin_consume). While 0 works as a null pointer in C++, modern C++ style prefers nullptr for better type safety and clarity. Change 0 to nullptr.

🤖 Prompt for AI Agents
In src/moq-output.cpp around line 87, the call to moq_session_connect passes a
raw literal 0 for the fourth parameter which is a pointer type; replace the 0
with nullptr to convey a null pointer in modern C++ and ensure type safety and
clarity when calling moq_session_connect.

@davegullo
Copy link
Author

NOTE:
I have WIP for supporting audio.

Copy link
Collaborator

@kixelated kixelated left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO rename it to moq-source for the SEO.

Hang Source ➡️  MoQ Source
hang_source ➡️  moq_source

(filenames and variables too)
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/moq-source.cpp (1)

830-833: Packet data relies on FFmpeg's internal copy behavior.

As noted in previous review, assigning external frame_data.payload directly to packet->data without a reference-counted buffer relies on FFmpeg's fallback copying. This works but isn't the documented pattern. Minor concern for long-term maintainability.

🧹 Nitpick comments (1)
src/moq-source.cpp (1)

832-833: Misleading comment on pts conversion.

The comment says "Convert to milliseconds" but FFmpeg's pts/dts are in time_base units, not milliseconds. While this likely works for the current streaming use case (no B-frames), the comment is inaccurate. Consider clarifying or removing it.

🔎 Suggested fix
-	packet->pts = frame_data.timestamp_us / 1000; // Convert to milliseconds
-	packet->dts = packet->pts;
+	packet->pts = frame_data.timestamp_us / 1000;
+	packet->dts = packet->pts;
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f9d916f and fdd5257.

📒 Files selected for processing (4)
  • CMakeLists.txt
  • src/moq-source.cpp
  • src/moq-source.h
  • src/obs-moq.cpp
🚧 Files skipped from review as they are similar to previous changes (2)
  • CMakeLists.txt
  • src/obs-moq.cpp
🧰 Additional context used
🧬 Code graph analysis (1)
src/moq-source.h (1)
src/moq-source.cpp (2)
  • register_moq_source (1004-1020)
  • register_moq_source (1004-1004)
🔇 Additional comments (11)
src/moq-source.h (1)

1-3: LGTM!

Clean and minimal header with appropriate include guard and function declaration.

src/moq-source.cpp (10)

1-19: LGTM!

Includes are properly organized with FFmpeg C headers correctly wrapped in extern "C".


21-60: LGTM!

Codec string mapping correctly handles all documented codec variants. The check order ensures specific matches (e.g., avc1) are evaluated before generic prefixes (e.g., avc).


62-96: LGTM!

Well-designed struct with appropriate use of atomics for the shutdown flag and generation counter, combined with mutex for complex state protection. The generation tracking pattern effectively handles stale callback invalidation.


160-193: Acknowledged timing-based workaround for callback draining.

The 100ms sleep with thorough documentation is acceptable for now. The comment at lines 173-183 clearly describes the limitation and the reference counting solution. Consider tracking this as future technical debt.


195-237: LGTM!

Settings change detection is thorough, handling all null/non-null combinations correctly. The reconnect logic appropriately triggers only when settings are both changed and valid.


261-426: LGTM!

Callbacks implement consistent patterns: double-checked atomic shutdown flag, generation tracking for stale callback detection, and proper resource cleanup on all exit paths.


429-582: LGTM!

Robust reconnection logic with proper generation tracking, concurrent reconnect prevention via reconnect_in_progress flag, and correct resource cleanup when generation changes during setup. The pattern of capturing state under mutex, performing blocking operations outside, then re-validating is well implemented.


669-675: Previous concern addressed.

Using av_mallocz correctly zero-initializes the AV_INPUT_BUFFER_PADDING_SIZE padding bytes as required by FFmpeg.


897-985: Previous concern addressed with dynamic scaler reinitialization.

The logic now properly detects dimension and pixel format changes, validates bounds, and reinitializes the scaler and frame buffer as needed. This prevents out-of-bounds access from mid-stream resolution changes.


1003-1020: LGTM!

Standard OBS source registration with appropriate flags for async video. The lambda for get_name is a clean pattern for simple string returns.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/moq-source.cpp (2)

169-183: Consider implementing reference counting for callback synchronization.

The 100ms sleep is a pragmatic workaround, but as the comment notes, it doesn't provide guarantees. While callbacks are typically fast (<1ms), a more robust solution would use reference counting or a completion mechanism to ensure all callbacks have finished before freeing the context.

Potential implementation approach

One approach would be to:

  • Add std::atomic<int> callback_refcount to the struct
  • Increment at the start of each callback, decrement on exit
  • In moq_source_destroy, spin or use a condition variable to wait until callback_refcount == 0
  • Then proceed with freeing resources

This would eliminate the timing assumption.


821-836: Consider using reference-counted packets for clearer ownership semantics.

The current code at line 829 assigns external frame_data.payload to packet->data without a reference-counted buffer. While this works because FFmpeg's avcodec_send_packet internally copies non-reference-counted data, it relies on implicit behavior. Using av_packet_from_data() or av_buffer_create() would make ownership explicit and align with FFmpeg best practices.

This pattern is safe given the current usage, but explicit reference counting would improve code clarity and resilience to future FFmpeg API changes.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fdd5257 and f4f128a.

📒 Files selected for processing (1)
  • src/moq-source.cpp
🔇 Additional comments (14)
src/moq-source.cpp (14)

21-59: LGTM! Codec mapping handles all specified formats correctly.

The codec string mapping logic is comprehensive and correctly handles all variants mentioned in the PR description (H.264/AVC including avc1/avc3, HEVC/H.265, VP9, AV1, VP8). The case-insensitive prefix matching with strncasecmp is appropriate for this use case.


61-95: Well-structured state management with appropriate thread-safety primitives.

The struct design uses atomic variables for lock-free checks (shutting_down, generation) combined with mutex protection for complex state transitions. The generation-based callback invalidation is a robust pattern for handling async callbacks during reconnection.


116-157: LGTM! Proper initialization with defensive defaults.

All state is correctly initialized with safe defaults (handles to -1, NULL pointers, zero counters). The call to moq_source_update at the end ensures settings are applied and connection is established if valid.


194-236: LGTM! Settings update logic handles all edge cases correctly.

The change detection logic comprehensively handles NULL, empty, and changed strings. The pattern of holding the mutex only while accessing shared state and performing the reconnect outside the lock is correct and avoids potential deadlocks.


260-306: LGTM! Robust session status handling with proper cleanup.

The double-check pattern for shutting_down (check before lock, re-check after) efficiently avoids unnecessary lock contention while preventing races. Error handling correctly cleans up both session and origin on connection failure.


308-389: LGTM! Catalog callback properly handles generation invalidation.

The callback correctly checks generation to detect stale operations and cleans up resources appropriately. The sequence of getting video config → initializing decoder → subscribing to track is correct, with proper error handling at each step.


391-425: LGTM! Video frame callback correctly handles callback timing race.

The comment at lines 407-408 clearly explains why checking consume instead of video_track is necessary—frames can arrive before the track handle is stored in on_catalog. This is a subtle but important correctness detail.


428-502: LGTM! Reconnect logic properly handles concurrent reconnection attempts.

The reconnect_in_progress flag provides debouncing, and the generation check at lines 487-495 handles the race where settings change multiple times rapidly. The pattern of copying URL under lock, then releasing it for blocking operations is correct.


504-581: LGTM! Consume setup handles generation changes at multiple checkpoints.

The function validates generation at multiple points (lines 509, 526, 543, 560) to ensure operations are still relevant. This prevents resource leaks when reconnections happen during async setup. All error paths properly clean up session/origin/consume handles.


584-615: LGTM! Disconnect cleanup follows proper ordering.

Resources are released in reverse order of acquisition (video_track → catalog → consume → session → origin), which is the correct pattern. Decoder state and error counters are properly reset.


625-735: LGTM! Decoder initialization correctly uses av_mallocz for extradata.

The code properly zero-initializes the padding bytes by using av_mallocz at line 669, addressing the previous review comment. The pattern of creating the new decoder outside the mutex and then atomically swapping it in is correct. Deferring sws_ctx and frame_buffer allocation until the first decoded frame is appropriate since the actual pixel format may differ from expectations.


860-891: LGTM! Receive frame error handling includes graceful recovery.

The error handling correctly tracks consecutive decode failures and triggers decoder flush + keyframe wait after 5 errors. Logging is appropriately throttled to avoid spam (only logs the first error in a sequence).


896-991: LGTM! Scaler reinitialization includes comprehensive validation.

The code correctly detects dimension and pixel format changes (lines 898-900), validates dimensions and format (lines 913-930), and reinitializes the scaler with the actual decoded frame properties. This addresses the previous review concern about dimension validation before sws_scale. The dynamic reinitialization handles mid-stream resolution changes gracefully.


1002-1019: LGTM! Source registration properly configured for async video.

The OBS_SOURCE_ASYNC_VIDEO flag is correct for a network-based video source, and OBS_SOURCE_DO_NOT_DUPLICATE appropriately prevents unintended multiple instances. All required callbacks are properly wired.

@davegullo
Copy link
Author

@kixelated sez:

IMO rename it to moq-source for the SEO.

✅ DONE

@davegullo
Copy link
Author

DG sez:

NOTE: I have WIP for supporting audio.

Testing on Linux failed, with tons of audio skipping. Tweaked encoder settings, which reduced frequency of skipping, but did not eliminate it.

In an attempt to validate things, tried testing audio tracks from an OBS instance's moq_output, but they were also skipping on the web player, so that may be the root cause of audio timestamping issues, not the moq_source per se.

Going to test by sending broadcast from MacOS OBS to see if the audio timestamp/synchronization issues persist.

In other words, it's difficult to add audio capabilities to this moq_source, when moq_output seems to always send non-integral audio streams using the default audio encoder settings, and a myriad of other variants.

@davegullo davegullo changed the title Add Hang Source for receiving MoQ video streams Add MoQ Source for receiving MoQ broadcasts Dec 24, 2025
@davegullo davegullo changed the title Add MoQ Source for receiving MoQ broadcasts Add MoQ Source for receiving broadcasts Dec 24, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants