Skip to content

Conversation

@GaijinKa
Copy link
Member

@GaijinKa GaijinKa commented Jan 15, 2026

Description

This PR introduces the Remotizer module, a new gRPC-based distributed execution system that enables Juturna nodes to be deployed and executed remotely. The system allows nodes to run as standalone services while maintaining full compatibility with the existing pipeline architecture through a proxy pattern.
The implementation consists of:

  • A gRPC server (_remote_service.py) that wraps any Juturna node and exposes it via RPC
  • A client proxy node (Warp) that transparently forwards messages to remote nodes
  • Protocol Buffer definitions for serializing all payload types (Audio, Image, Video, Bytes, Object, Batch)
  • Utility functions for bidirectional conversion between Python objects and Protobuf messages

This enables horizontal scaling, language-agnostic node implementation, and deployment flexibility without modifying existing pipeline code.

PR type

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Code refactoring
  • Performance improvement
  • Test update
  • Build/CI configuration change
  • Other (please describe):

Key modifications and changes

Core Infrastructure:

  • Added juturna/remotizer/ module with gRPC server and client implementations
  • Created Protocol Buffer schemas (payloads.proto, messaging_service.proto) defining wire format for all message types
  • Implemented _remote_service.py: async gRPC server with request correlation, timeout handling, and statistics tracking
  • Implemented Warp node: client-side proxy that forwards messages to remote services via gRPC

Serialization Layer:

  • Added utils.py with bidirectional converters for all payload types:
    • AudioPayload ↔ AudioProtoPayload (with numpy array serialization)
    • ImagePayload ↔ ImageProtoPayload
    • VideoPayload ↔ VideoProtoPayload
    • BytesPayload ↔ BytesProtoPayload
    • ObjectPayload ↔ ObjectProtoPayload
    • Batch ↔ BatchProto
  • Envelope system for request/response correlation and metadata transport

Payload Enhancements:

  • Added BasePayload.__deepcopy__() for proper deep copying of dataclass payloads
  • Added codec field to VideoPayload
  • Exported BasePayload in payloads/__init__.py

Testing & Build:

  • Added test_remote_integration.py with end-to-end tests for remote node execution
  • Added compile_protos.sh script for generating Python code from Protocol Buffers
  • Added remote_pipes/ to .gitignore for runtime artifacts
  • Added .tool-versions specifying Python 3.12.12

Bug Fixes:

  • Fixed PassthroughIdentity to properly copy message metadata in responses
  • Overwriting deepcopy in BasePayload to support ObjectPayloads

Affected components

New components:

  • juturna/remotizer/ (complete new module)
  • juturna/remotizer/_warp/ (Warp proxy node)
  • Protocol Buffer definitions and generated code

Build system:

  • Added gRPC dependencies to remotizer/requirements.txt
  • Added protobuf compilation tooling

Additional context

Architecture Overview: The Remotizer uses a request-response pattern over gRPC:

  1. Client (Warp node) converts Python Message → Protobuf → sends via gRPC
  2. Server (_remote_service) receives → deserializes → forwards to wrapped node
  3. Node processes message → returns response
  4. Server correlates response using correlation_id → serializes → returns
  5. Client deserializes → emits as normal Message to pipeline

Key Features:

  • Correlation system: Each request gets a unique correlation_id tracked in a dispatcher thread
  • Timeout handling: Configurable TTL with automatic cleanup of expired requests
  • Type safety: Request/response type hints for validation
  • Statistics: Built-in metrics for requests, failures, and timeouts
  • Graceful shutdown: Proper cleanup of pending requests and threads
  • Multi-client support: A single remote node instance can be shared across multiple pipelines simultaneously

Usage Example:
In pipeline config, replace a node with its remote proxy:

{ 
  "name": "my_processor",
  "type": "proc",
  "mark": "warp",  # Instead of the actual processor mark
  "configuration": {
      "grpc_host": "remote-server.example.com",
      "grpc_port": 50051,
      "timeout": 30,
      "remote_config": {       # Config for the actual remote node
        "param1": "value1"
      }
}

Before Merging

Required actions before accepting this PR:

  • Dependency management: Integrate gRPC dependencies into the main pyproject.toml, potentially under an optional extras tag (e.g., pip install juturna[remote]) to avoid forcing gRPC installation for users who don't need remote execution
  • Code reorganization:
    • Move the Warp node from juturna/remotizer/_warp/ to plugins/nodes/proc/warp/ following the standard plugin structure.
    • Move the remotization core (_remote_service.py, utils.py, Protocol Buffers) in the components module as part of the framework
    • Move the utils into the utils folder OR make serialization/deserialization helpers part of payloads.

Future Work:

  • Seamless local-to-remote transition: Enable transparent conversion from local to remote nodes by adding a single "warp": "<ip>:<port>" parameter to the node configuration. The pipeline builder will automatically detect this parameter during warmup and transform the node into a Warp proxy with the original configuration forwarded to the remote service
  • Runtime configuration updates: Introduce a new CONFIG control payload to allow dynamic reconfiguration of remote nodes during execution, ensuring ordered configuration updates across distributed components
  • Multi-node remote contexts
  • Compression support for large payloads (schema already defined in CompressedProtoPayload)
  • Load balancing across multiple remote instances

@GaijinKa GaijinKa requested a review from b3by January 15, 2026 13:59
@GaijinKa GaijinKa added type:enhancement New feature or request help wanted Extra attention is needed priority:medium labels Jan 15, 2026
- initialize client part (subject to change)
- initialize server part (subject to change)
- experiment with grpc (subject to change)
- define initial proto definition
- warp config.toml has assignation signal typo,
- the `SendMessage` implementation in _remote_service.py has been
  deleted
- refactor MessagingServiceImpl to use a dispatching thread for concurrent requests
- add `--node-mark` argument to _remote_service.py to specify node implementation
- fix typo in Warp node gRPC connection
- fix deserialization logic in utils.py
- correctly extract `data` from ObjectProtoPayload using `from_dict`
- update PassthroughIdentity to propagate metadata correctly
- add integration test

Note: this is an initial implementation; code is experimental and may require heavy refactoring.
- code commented for further improvement
- changing the expected response type in warp
- in remote service, messages with no correlation ids are discarded
- add a ResponseContext to better futures handling
- align BytesPayload
- refactor proto helpers
- add codec property to VideoPayload
- export BasePayload correctly
- move warp node into the nodes folder
- create the proc module in nodes folder
- limit the deepcopy overwrite to the ObjectPayload only
- move remote_context in its own module
- improve elif and fix other minor code issue in remote service
- add a new dependency group in pyconfig.toml
@b3by b3by merged commit f865ee4 into meetecho:main Jan 21, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

help wanted Extra attention is needed priority:medium type:enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants