Releases: Youngestdev/broadcaster
v0.1.6
Release summary
Improved WebSocket client reliability with ping/pong monitoring and automatic disconnects; added extensibility and resiliency improvements to the core broadcaster (custom hooks, per-collection retry/backoff, field filtering, and recipient extraction).
Highlights
- WebSocket ping/pong: Added a ping/pong monitor (60s interval) to detect dead clients and update last-pong timestamps.
- Auto-disconnect on timeout: `WebSocketChannel(disconnect_on_timeout=True, timeout=<seconds>)` will now drop stale connections.
- Authentication hook: `WebSocketChannel.connect(...)` accepts an `authenticate` coroutine that returns `(bool, client_id)` to verify and set client IDs.
- Single-connection-per-client: New connections for the same `client_id` replace previous ones (old socket closed, ping task cancelled).
- Safer sends: Uses safe JSON serialization and robust error handling during send/broadcast.
- `custom_fn` hook: `MongoChangeBroadcaster` accepts a `custom_fn` to transform `ChangeEvent` objects before they are sent to channels.
- Retry/backoff on watchers: Per-collection watchers use exponential backoff (via `tenacity`) to improve resiliency to transient errors.
- Mongo URI validation: Connection URI is validated before creating the Mongo client.
- Field-level filtering: Use `fields_to_watch` in `CollectionConfig` to ignore unrelated update events.
- Recipient extraction: Support for dot-notation recipient paths (e.g., `owner.id` or `fullDocument._id`) for targeted delivery.
- Cleaner lifecycle: Improved start/stop behavior and logging for watchers and channels.
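Field-level filtering and dot-notation recipient extraction from the list above can be illustrated with a small standalone sketch. This is not the library's code: `get_by_path` and `touches_watched_fields` are hypothetical helper names, and the event is modelled as a plain dict using the standard MongoDB change event keys (`operationType`, `updateDescription.updatedFields`, `updateDescription.removedFields`). Treating non-update events as always relevant is an assumption.

```python
from typing import Any, Iterable, Optional


def get_by_path(document: dict, path: str) -> Optional[Any]:
    """Resolve a dot-notation path such as 'fullDocument._id'; None if missing."""
    current: Any = document
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def touches_watched_fields(change: dict, fields_to_watch: Iterable[str]) -> bool:
    """Return True when an update modified or removed at least one watched field."""
    if change.get("operationType") != "update":
        return True  # assumption: non-update events are never filtered out
    description = change.get("updateDescription", {})
    changed = set(description.get("updatedFields", {}))
    changed |= set(description.get("removedFields", []))
    # A nested update arrives as e.g. "profile.name"; match it against "profile" too.
    return any(
        name == field or name.startswith(field + ".")
        for name in changed
        for field in fields_to_watch
    )


change = {
    "operationType": "update",
    "fullDocument": {"_id": "u1", "owner": {"id": "alice"}},
    "updateDescription": {"updatedFields": {"profile.name": "Al"}, "removedFields": []},
}
recipient = get_by_path(change, "fullDocument.owner.id")
```

With this shape, an update that only touches `profile.name` is forwarded when `profile` is watched and skipped when only `email` is watched.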
Notable files changed
- `mongo_broadcaster/channels/websocket.py`: ping/pong monitor, timeout behavior, authentication, replacement of existing connections.
- `mongo_broadcaster/broadcaster.py`: `custom_fn` hook, retry wrapper for watchers, field filtering, recipient extraction, Mongo URI validation.
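The retry wrapper for watchers is described as exponential backoff via `tenacity`. To show the idea without a third-party dependency, the following is a hand-rolled standard-library sketch rather than the library's implementation; `run_with_backoff`, `flaky_watch`, and all default values are hypothetical.

```python
import asyncio
import random


async def run_with_backoff(operation, *, max_attempts=5, base_delay=0.5, max_delay=30.0):
    """Await operation(); on failure wait base_delay * 2**attempt (capped, jittered) and retry."""
    for attempt in range(max_attempts):
        try:
            return await operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * (2 ** attempt))
            await asyncio.sleep(delay + random.uniform(0, delay / 10))


calls = []


async def flaky_watch():
    """Hypothetical watcher that fails twice before succeeding."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("transient failure")
    return "watching"


result = asyncio.run(run_with_backoff(flaky_watch, base_delay=0.01))
```

Running one such wrapper per collection keeps a failure in one watcher from affecting the others, which matches the per-collection behavior the release describes.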
Upgrade / migration notes
- If you relied on the previous behavior of multiple simultaneous connections with the same `client_id`, update clients to avoid unexpected disconnects (or set unique IDs).
- If you run WebSocket endpoints behind a proxy/load-balancer, ensure proxy timeouts are longer than the ping interval or that the ping/pong path is preserved.
- When using `authenticate`, ensure the coroutine returns `(True, client_id)` on success; otherwise the connection will be closed.
- If you depend on raw `ChangeEvent` content, note that `custom_fn` can modify events. Review the hooks in your deployment.
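As an illustration of the `(True, client_id)` contract noted above, here is a minimal sketch of an `authenticate` coroutine. The release notes do not state which arguments the hook receives, so the single `websocket` parameter, the `token` query parameter, and the `VALID_TOKENS` table are all assumptions made for this example.

```python
import asyncio
from typing import Optional, Tuple

# Hypothetical token table; a real deployment would verify a JWT or session.
VALID_TOKENS = {"secret-token-1": "client-42"}


async def authenticate(websocket) -> Tuple[bool, Optional[str]]:
    """Return (True, client_id) on success, (False, None) otherwise.

    Assumption: the hook is called with the incoming websocket, and the token
    travels in a 'token' query parameter (Starlette-style query_params).
    """
    token = websocket.query_params.get("token")
    client_id = VALID_TOKENS.get(token)
    return (client_id is not None, client_id)


class FakeWebSocket:
    """Stand-in used only to exercise the hook without a server."""

    def __init__(self, token):
        self.query_params = {"token": token}


ok = asyncio.run(authenticate(FakeWebSocket("secret-token-1")))
rejected = asyncio.run(authenticate(FakeWebSocket("wrong")))
```

Because a new connection replaces any existing one with the same `client_id`, the ID returned here should be unique per client that needs its own socket.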
Example usage
from mongo_broadcaster.broadcaster import MongoChangeBroadcaster
from mongo_broadcaster.channels.websocket import WebSocketChannel
ws = WebSocketChannel(disconnect_on_timeout=True, timeout=120)
b = MongoChangeBroadcaster(config, custom_fn)
b.add_channel(ws)
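The `custom_fn` passed in the example is not defined in these notes. One possible shape is sketched below as a function that redacts sensitive fields before events reach any channel. The event is treated as a plain dict with a `fullDocument` key, which is an assumption: the library's `ChangeEvent` type may expose attributes instead, and the hook may be expected to be a coroutine.

```python
import copy

SENSITIVE_KEYS = {"password", "ssn"}  # hypothetical field names


def custom_fn(event: dict) -> dict:
    """Return a copy of the event with sensitive document fields removed."""
    redacted = copy.deepcopy(event)  # leave the caller's event untouched
    document = redacted.get("fullDocument") or {}
    for key in SENSITIVE_KEYS:
        document.pop(key, None)
    return redacted


event = {
    "operationType": "insert",
    "fullDocument": {"_id": 1, "name": "Ada", "password": "x"},
}
cleaned = custom_fn(event)
```

Returning a copy rather than mutating in place keeps the original event intact for logging or other consumers.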
v0.1.3
Enhancements and Fixes:
- Improved WebSocket Handling:
  - Introduced an optional timeout for WebSocket disconnections.
  - Added a mechanism to ping WebSocket connections every 60 seconds.
- Field Watch Enhancements:
  - Updated logic to watch for explicitly defined fields instead of sending all changes.
- Operation-Aware Field Extraction:
  - Enhanced the `extract_fields` function to be operation-aware, returning fields as required by the operation.
- JSON Serialization Refinements:
  - Adjusted JSON handling to avoid errors with unserializable objects (e.g., dates).
  - Improved checks to ensure collections are not `None` before processing.
- Version Update:
  - Updated the version to reflect recent changes.
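The 60-second ping and optional disconnect timeout listed above can be pictured with the following asyncio sketch. It is not the library's implementation: `MonitoredConnection`, `monitor`, and the callback-based design are hypothetical, and the demo uses very short intervals so that it finishes quickly.

```python
import asyncio
import time


class MonitoredConnection:
    """Tracks the last pong for one client (hypothetical helper, not library code)."""

    def __init__(self, send_ping, close):
        self.send_ping = send_ping  # coroutine function that sends a ping
        self.close = close          # coroutine function that closes the socket
        self.last_pong = time.monotonic()

    def on_pong(self):
        """Call whenever the client answers a ping."""
        self.last_pong = time.monotonic()


async def monitor(conn, *, interval=60.0, timeout=120.0, disconnect_on_timeout=True):
    """Ping every `interval` seconds; close once no pong has arrived for `timeout` seconds."""
    while True:
        await conn.send_ping()
        await asyncio.sleep(interval)
        silent_for = time.monotonic() - conn.last_pong
        if disconnect_on_timeout and silent_for > timeout:
            await conn.close()
            return "timed_out"


async def demo():
    events = []

    async def send_ping():
        events.append("ping")  # this fake client never answers

    async def close():
        events.append("closed")

    conn = MonitoredConnection(send_ping, close)
    outcome = await monitor(conn, interval=0.01, timeout=0.03)
    return outcome, events


outcome, events = asyncio.run(demo())
```

Using a monotonic clock for `last_pong` avoids false timeouts when the system clock is adjusted.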
Release v0.1.1 - MongoDB Change Broadcaster
🎉 Initial Release Highlights
A robust Python package for transforming MongoDB change streams into real-time events across multiple channels.
✨ Key Features
- Multi-Channel Broadcasting: Send changes to WebSocket, Redis, HTTP, and Database
- Extensible Architecture: Easily add custom channels via `BaseChannel` subclassing
- Configurable Pipelines: Filter and project change events with MongoDB aggregation syntax
- Async-First: Built on `motor` and `asyncio` for high-performance streaming
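To make the "Configurable Pipelines" feature concrete, here is an example of a change stream pipeline in standard MongoDB aggregation syntax. How the pipeline is attached to a collection's configuration is not shown in these notes, so only the pipeline itself is sketched; the `name` and `email` fields are hypothetical.

```python
# Forward only inserts and updates, trimmed to the fields consumers need.
pipeline = [
    {"$match": {"operationType": {"$in": ["insert", "update"]}}},
    {
        "$project": {
            # The event's _id (the resume token) stays included by default
            # in an inclusion projection and should not be removed.
            "operationType": 1,
            "documentKey": 1,
            "fullDocument.name": 1,
            "fullDocument.email": 1,
            "updateDescription": 1,
        }
    },
]
```

Filtering on the server this way means unwanted events never leave MongoDB, which reduces work for every channel downstream.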
🛠 Setup
$ pip install mongo-broadcaster
🚀 Quick Start
from mongo_broadcaster import MongoChangeBroadcaster, BroadcasterConfig
from mongo_broadcaster.channels.websocket import WebSocketChannel
config = BroadcasterConfig(
    mongo_uri="mongodb://localhost:27017",
    collections=[{"collection_name": "users"}]
)
broadcaster = MongoChangeBroadcaster(config)
broadcaster.add_channel(WebSocketChannel())
await broadcaster.start()
📦 Included Channels
| Channel | Use Case | Dependencies |
|---|---|---|
| `WebSocketChannel` | Browser real-time updates | fastapi, websockets |
| `RedisPubSubChannel` | Microservice messaging | redis |
| `HTTPCallbackChannel` | Webhook integrations | aiohttp |
| `DatabaseChannel` | Internal logging | |
⚙️ Breaking Changes
- Requires MongoDB replica set configuration for change streams
- Python 3.8+ only