From 620b61e26b7d1fadff3beef297e69357e4b1f7b0 Mon Sep 17 00:00:00 2001 From: Omer Akram Date: Tue, 28 Oct 2025 17:05:27 +0500 Subject: [PATCH 1/7] add initial support for payload codec --- tests/codec_test.py | 39 +++++++++++++++++++++++++++++++++++++++ xconn/codec.py | 16 ++++++++++++++++ xconn/session.py | 20 +++++++++++++++++++- 3 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 tests/codec_test.py create mode 100644 xconn/codec.py diff --git a/tests/codec_test.py b/tests/codec_test.py new file mode 100644 index 0000000..0468d92 --- /dev/null +++ b/tests/codec_test.py @@ -0,0 +1,39 @@ +import base64 +from typing import Type + +from xconn.client import connect_anonymous +from xconn import codec + + +class String(str): + pass + + +class Base64Codec(codec.Codec[String]): + def name(self) -> str: + return "base64" + + def encode(self, obj: String) -> str: + return base64.b64encode(obj.encode("utf-8")).decode("utf-8") + + def decode(self, data: str, out_type: Type[String]) -> String: + return out_type(base64.b64decode(data.encode("utf-8")).decode()) + + +def test_base64_codec(): + encoder = Base64Codec() + encoded = encoder.encode(String("hello")) + assert isinstance(encoded, str) + + decoded = encoder.decode(encoded, String) + assert isinstance(decoded, String) + assert decoded == "hello" + + +def test_something(): + # session = connect_anonymous("ws://localhost:8080/ws", "realm1") + # session.set_payload_codec(Base64Codec()) + # result = session.call_object("io.xconn.object", String("hello"), String) + # print(result) + # session.leave() + pass diff --git a/xconn/codec.py b/xconn/codec.py new file mode 100644 index 0000000..689b66d --- /dev/null +++ b/xconn/codec.py @@ -0,0 +1,16 @@ +from typing import Any, Generic, Type, TypeVar + +T = TypeVar("T") + + +class Codec(Generic[T]): + def name(self) -> str: + raise NotImplementedError + + def encode(self, obj: Any) -> bytes | str: + """Serialize a Python object to bytes.""" + raise NotImplementedError + + def decode(self, data: bytes | str, out_type: Type[T]) -> T: + """Deserialize bytes into an instance of out_type.""" + raise NotImplementedError diff --git a/xconn/session.py b/xconn/session.py index b13aed7..a1212de 100644 --- a/xconn/session.py +++ b/xconn/session.py @@ -2,15 +2,19 @@ from concurrent.futures import Future from threading import Thread -from typing import Callable, Any +from typing import Callable, Any, TypeVar, Type from dataclasses import dataclass from wampproto import messages, session, uris from xconn import types, exception, uris as xconn_uris +from xconn.codec import Codec from xconn.exception import ApplicationError from xconn.helpers import exception_from_error, SessionScopeIDGenerator +TReq = TypeVar("TReq") +TRes = TypeVar("TRes") + @dataclass class RegisterRequest: @@ -68,6 +72,8 @@ def __init__(self, base_session: types.BaseSession): self._disconnect_callback: list[Callable[[], None] | None] = [] + self._payload_codec: Codec = None + thread = Thread(target=self._wait, daemon=False) thread.start() @@ -170,6 +176,18 @@ def _process_incoming_message(self, msg: messages.Message): else: raise ValueError("received unknown message") + def set_payload_codec(self, codec: Codec) -> None: + self._payload_codec = codec + + def call_object(self, procedure: str, request: TReq, return_type: Type[TRes] = None) -> TReq | None: + if self._payload_codec is None: + raise ValueError("no payload codec set") + + encoded = self._payload_codec.encode(request) + result = self.call(procedure, [encoded]) + + return self._payload_codec.decode(result.args[0], return_type) + def call( self, procedure: str, From 2099da9da76d2338ac06634e868717ca174c9d2f Mon Sep 17 00:00:00 2001 From: Mahad Munir Date: Tue, 28 Oct 2025 18:38:55 +0500 Subject: [PATCH 2/7] add test for protobuf --- pyproject.toml | 3 ++- tests/codec_test.py | 49 +++++++++++++++++++++++++++++++++++++++++++ tests/profile.proto | 15 +++++++++++++ tests/profile_pb2.py | 30 ++++++++++++++++++++++++++ tests/profile_pb2.pyi | 38 +++++++++++++++++++++++++++++++++ 5 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 tests/profile.proto create mode 100644 tests/profile_pb2.py create mode 100644 tests/profile_pb2.pyi diff --git a/pyproject.toml b/pyproject.toml index ca58e34..4415a8d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ test = [ "pytest-timeout", ] publish = ["twine", "build"] -dev = ["mkdocs-material[imaging]", "wampproto@git+https://github.com/xconnio/wampproto-python"] +dev = ["mkdocs-material[imaging]", "wampproto@git+https://github.com/xconnio/wampproto-python", "protobuf"] capnproto = [ "wampproto-messages-capnproto@git+https://github.com/xconnio/wampproto-messages-capnproto@main#subdirectory=python" ] @@ -81,6 +81,7 @@ exclude = [ "node_modules", "site-packages", "venv", + "tests/profile_pb2.py" ] line-length = 120 diff --git a/tests/codec_test.py b/tests/codec_test.py index 0468d92..f0120c3 100644 --- a/tests/codec_test.py +++ b/tests/codec_test.py @@ -1,8 +1,12 @@ import base64 from typing import Type +from google.protobuf.message import Message + from xconn.client import connect_anonymous from xconn import codec +from xconn.types import Invocation, Result +from tests.profile_pb2 import ProfileCreate, ProfileGet class String(str): @@ -37,3 +41,48 @@ def test_something(): # print(result) # session.leave() pass + + +class ProtobufCodec(codec.Codec[Message]): + def name(self) -> str: + return "protobuf" + + def encode(self, obj: Message) -> bytes: + return obj.SerializeToString() + + def decode(self, data: bytes, out_type: Type[Message]) -> Message: + msg = out_type() + msg.ParseFromString(data) + return msg + + +def test_protobuf_codec(): + session = connect_anonymous("ws://localhost:8080/ws", "realm1") + session.set_payload_codec(ProtobufCodec()) + + def inv_handler(inv: Invocation) -> Result: + profile = ProfileCreate() + profile.ParseFromString(inv.args[0]) + + profile_get = ProfileGet( + id="123", + username=profile.username, + email=profile.email, + age=profile.age, + created_at="2025-10-28T17:00:00Z", + ) + + return Result(args=[profile_get.SerializeToString()]) + + session.register("io.xconn.profile.create", inv_handler) + create_msg = ProfileCreate(username="john", email="john@xconn.io", age=25) + + result = session.call_object("io.xconn.profile.create", create_msg, ProfileGet) + assert isinstance(result, ProfileGet) + assert result.username == "john" + assert result.email == "john@xconn.io" + assert result.age == 25 + assert result.id == "123" + assert result.created_at == "2025-10-28T17:00:00Z" + + session.leave() diff --git a/tests/profile.proto b/tests/profile.proto new file mode 100644 index 0000000..49d1b18 --- /dev/null +++ b/tests/profile.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +message ProfileCreate { + string username = 1; + string email = 2; + int32 age = 3; +} + +message ProfileGet { + string id = 1; + string username = 2; + string email = 3; + int32 age = 4; + string created_at = 5; +} diff --git a/tests/profile_pb2.py b/tests/profile_pb2.py new file mode 100644 index 0000000..14889cf --- /dev/null +++ b/tests/profile_pb2.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: tests/profile.proto +"""Generated protocol buffer code.""" + +from google.protobuf.internal import builder as _builder +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n\x13tests/profile.proto"=\n\rProfileCreate\x12\x10\n\x08username\x18\x01 \x01(\t\x12\r\n\x05\x65' + b'mail\x18\x02 \x01(\t\x12\x0b\n\x03\x61ge\x18\x03 \x01(\x05"Z\n\nProfileGet\x12\n\n\x02id\x18\x01 ' + b'\x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\r\n\x05\x65mail\x18\x03 \x01(\t\x12\x0b\n\x03\x61' + b'ge\x18\x04 \x01(\x05\x12\x12\n\ncreated_at\x18\x05 \x01(\tb\x06proto3' +) + +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "tests.profile_pb2", globals()) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _PROFILECREATE._serialized_start = 23 + _PROFILECREATE._serialized_end = 84 + _PROFILEGET._serialized_start = 86 + _PROFILEGET._serialized_end = 176 +# @@protoc_insertion_point(module_scope) diff --git a/tests/profile_pb2.pyi b/tests/profile_pb2.pyi new file mode 100644 index 0000000..a1f8304 --- /dev/null +++ b/tests/profile_pb2.pyi @@ -0,0 +1,38 @@ +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Optional as _Optional + +DESCRIPTOR: _descriptor.FileDescriptor + +class ProfileCreate(_message.Message): + __slots__ = ["age", "email", "username"] + AGE_FIELD_NUMBER: _ClassVar[int] + EMAIL_FIELD_NUMBER: _ClassVar[int] + USERNAME_FIELD_NUMBER: _ClassVar[int] + age: int + email: str + username: str + def __init__( + self, username: _Optional[str] = ..., email: _Optional[str] = ..., age: _Optional[int] = ... + ) -> None: ... + +class ProfileGet(_message.Message): + __slots__ = ["age", "created_at", "email", "id", "username"] + AGE_FIELD_NUMBER: _ClassVar[int] + CREATED_AT_FIELD_NUMBER: _ClassVar[int] + EMAIL_FIELD_NUMBER: _ClassVar[int] + ID_FIELD_NUMBER: _ClassVar[int] + USERNAME_FIELD_NUMBER: _ClassVar[int] + age: int + created_at: str + email: str + id: str + username: str + def __init__( + self, + id: _Optional[str] = ..., + username: _Optional[str] = ..., + email: _Optional[str] = ..., + age: _Optional[int] = ..., + created_at: _Optional[str] = ..., + ) -> None: ... From 4f1317925c7b3449dc3e31a134d93154e39b709f Mon Sep 17 00:00:00 2001 From: Mahad Munir Date: Wed, 29 Oct 2025 19:50:26 +0500 Subject: [PATCH 3/7] add publish_object & subscribe_object --- tests/codec_test.py | 16 +++++++++++++++- xconn/session.py | 21 +++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/tests/codec_test.py b/tests/codec_test.py index f0120c3..c2a6b09 100644 --- a/tests/codec_test.py +++ b/tests/codec_test.py @@ -5,7 +5,7 @@ from xconn.client import connect_anonymous from xconn import codec -from xconn.types import Invocation, Result +from xconn.types import Invocation, Result, Event from tests.profile_pb2 import ProfileCreate, ProfileGet @@ -86,3 +86,17 @@ def inv_handler(inv: Invocation) -> Result: assert result.created_at == "2025-10-28T17:00:00Z" session.leave() + + +def test_pubsub_object(): + session = connect_anonymous("ws://localhost:8080/ws", "realm1") + session.set_payload_codec(Base64Codec()) + + def event_handler(event: Event): + assert event.args[0] == "hello" + + session.subscribe_object("io.xconn.object", event_handler, String) + + session.publish_object("io.xconn.object", String("hello")) + + session.leave() diff --git a/xconn/session.py b/xconn/session.py index a1212de..05b0f10 100644 --- a/xconn/session.py +++ b/xconn/session.py @@ -188,6 +188,27 @@ def call_object(self, procedure: str, request: TReq, return_type: Type[TRes] = N return self._payload_codec.decode(result.args[0], return_type) + def subscribe_object(self, topic: str, event_handler: Callable[[types.Event], None], return_type: Type[TRes]): + if self._payload_codec is None: + raise ValueError("no payload codec set") + + def _event_handler(event: types.Event): + if len(event.args) != 1: + raise ValueError("only one argument expected in event") + + data = event.args[0] + d = self._payload_codec.decode(data, return_type) + event_handler(types.Event(args=[d], kwargs={}, details={})) + + return self.subscribe(topic, _event_handler) + + def publish_object(self, topic: str, request: TReq): + if self._payload_codec is None: + raise ValueError("no payload codec set") + + encoded = self._payload_codec.encode(request) + return self.publish(topic, [encoded]) + def call( self, procedure: str, From c2efc8fb917b8a492c59188973960891c0bab3b6 Mon Sep 17 00:00:00 2001 From: Mahad Munir Date: Thu, 30 Oct 2025 17:22:01 +0500 Subject: [PATCH 4/7] add register_object --- tests/codec_test.py | 79 +++++++++++++++++++++++++++++++++++++++++++++ xconn/session.py | 43 ++++++++++++++++++++++++ 2 files changed, 122 insertions(+) diff --git a/tests/codec_test.py b/tests/codec_test.py index c2a6b09..1e4b8a0 100644 --- a/tests/codec_test.py +++ b/tests/codec_test.py @@ -100,3 +100,82 @@ def event_handler(event: Event): session.publish_object("io.xconn.object", String("hello")) session.leave() + + +def test_register_object_one_param_with_return_type(): + session = connect_anonymous("ws://localhost:8080/ws", "realm1") + session.set_payload_codec(ProtobufCodec()) + + def create_profile_handler(prof: ProfileCreate) -> ProfileGet: + return ProfileGet( + id="356", + username=prof.username, + email=prof.email, + age=prof.age, + created_at="2025-10-30T17:00:00Z", + ) + + session.register_object("io.xconn.profile.create", create_profile_handler) + + profile_create = ProfileCreate(username="john", email="john@xconn.io", age=25) + result = session.call("io.xconn.profile.create", [profile_create.SerializeToString()]) + + profile = ProfileGet() + profile.ParseFromString(result.args[0]) + + assert profile.id == "356" + assert profile.username == "john" + assert profile.email == "john@xconn.io" + assert profile.age == 25 + assert profile.created_at == "2025-10-30T17:00:00Z" + + session.leave() + + +def test_register_object_no_param(): + session = connect_anonymous("ws://localhost:8080/ws", "realm1") + session.set_payload_codec(ProtobufCodec()) + + options = {"flag": False} + + def invocation_handler() -> None: + options["flag"] = True + + session.register_object("io.xconn.param.none", invocation_handler) + + result = session.call("io.xconn.param.none") + + assert options["flag"] is True + assert result.args is None + assert result.kwargs is None + + session.leave() + + +def test_register_object_no_param_with_return(): + session = connect_anonymous("ws://localhost:8080/ws", "realm1") + session.set_payload_codec(ProtobufCodec()) + + def get_profile_handler() -> ProfileGet: + return ProfileGet( + id="636", + username="admin", + email="admin@xconn.io", + age=30, + created_at="2025-10-30T17:00:00Z", + ) + + session.register_object("io.xconn.profile.get", get_profile_handler) + + result = session.call("io.xconn.profile.get") + + profile = ProfileGet() + profile.ParseFromString(result.args[0]) + + assert profile.id == "636" + assert profile.username == "admin" + assert profile.email == "admin@xconn.io" + assert profile.age == 30 + assert profile.created_at == "2025-10-30T17:00:00Z" + + session.leave() diff --git a/xconn/session.py b/xconn/session.py index 05b0f10..6e52bf5 100644 --- a/xconn/session.py +++ b/xconn/session.py @@ -1,5 +1,6 @@ from __future__ import annotations +import inspect from concurrent.futures import Future from threading import Thread from typing import Callable, Any, TypeVar, Type @@ -209,6 +210,48 @@ def publish_object(self, topic: str, request: TReq): encoded = self._payload_codec.encode(request) return self.publish(topic, [encoded]) + def register_object( + self, + procedure: str, + invocation_handler: Callable[[TReq], TRes | None] | Callable[[], TRes | None], + ): + if self._payload_codec is None: + raise ValueError("no payload codec set") + + sig = inspect.signature(invocation_handler) + + params = list(sig.parameters.values()) + if len(params) > 1: + raise ValueError("invocation handler must accept 0 or 1 argument") + + if len(params) == 1: + # get parameter's type hint + param_type = params[0].annotation + if param_type is inspect._empty: + raise TypeError("invocation handler parameter must have a type annotation") + else: + param_type = None + + def _invocation_handler(invocation: types.Invocation): + request_obj = None + if param_type is not None: + if len(invocation.args) != 1: + raise ValueError("only one argument expected in invocation") + + request_obj = self._payload_codec.decode(invocation.args[0], param_type) + + result = invocation_handler(request_obj) if param_type is not None else invocation_handler() + + # no return type in invocation handler + if sig.return_annotation is inspect._empty or result is None: + return None + + encoded = self._payload_codec.encode(result) + + return types.Result(args=[encoded]) + + return self.register(procedure, _invocation_handler) + def call( self, procedure: str, From 11762fafe219c30c30a9696ee0ecafc8d5b1394e Mon Sep 17 00:00:00 2001 From: Mahad Munir Date: Thu, 30 Oct 2025 19:05:30 +0500 Subject: [PATCH 5/7] use capnproto codec in tests --- pyproject.toml | 2 +- tests/codec_test.py | 110 +++++++++++++++++++++++++++++++++++++++++++- tests/user.capnp | 14 ++++++ 3 files changed, 124 insertions(+), 2 deletions(-) create mode 100644 tests/user.capnp diff --git a/pyproject.toml b/pyproject.toml index 4415a8d..3fe57c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ test = [ "pytest-timeout", ] publish = ["twine", "build"] -dev = ["mkdocs-material[imaging]", "wampproto@git+https://github.com/xconnio/wampproto-python", "protobuf"] +dev = ["mkdocs-material[imaging]", "wampproto@git+https://github.com/xconnio/wampproto-python", "protobuf", "pycapnp"] capnproto = [ "wampproto-messages-capnproto@git+https://github.com/xconnio/wampproto-messages-capnproto@main#subdirectory=python" ] diff --git a/tests/codec_test.py b/tests/codec_test.py index 1e4b8a0..4d0d79b 100644 --- a/tests/codec_test.py +++ b/tests/codec_test.py @@ -1,6 +1,8 @@ import base64 -from typing import Type +from pathlib import Path +from typing import Type, TypeVar, Any +import capnp from google.protobuf.message import Message from xconn.client import connect_anonymous @@ -179,3 +181,109 @@ def get_profile_handler() -> ProfileGet: assert profile.created_at == "2025-10-30T17:00:00Z" session.leave() + + +T = TypeVar("T") +SCHEMA_PATH = Path(__file__).parent / "user.capnp" +user_capnp = capnp.load(str(SCHEMA_PATH)) + +UserCreate = user_capnp.UserCreate +UserGet = user_capnp.UserGet + + +class CapnpProtoCodec(codec.Codec[T]): + def name(self) -> str: + return "capnproto" + + def encode(self, obj: Any) -> bytes: + return obj.to_bytes_packed() + + def decode(self, data: bytes, out_type: Type[T]) -> T: + return out_type.from_bytes_packed(data) + + +def test_register_object_capnproto(): + session = connect_anonymous("ws://localhost:8080/ws", "realm1") + session.set_payload_codec(CapnpProtoCodec()) + + def create_handler(user_create: UserCreate) -> UserGet: + user_get = UserGet.new_message() + user_get.id = 999 + user_get.name = user_create.name + user_get.email = user_create.email + user_get.age = user_create.age + user_get.isAdmin = False + + return user_get + + session.register_object("io.xconn.user.create", create_handler) + + new_user = UserCreate.new_message() + new_user.name = "john" + new_user.email = "john@xconn.io" + new_user.age = 35 + + result = session.call("io.xconn.user.create", [new_user.to_bytes_packed()]) + user = UserGet.from_bytes_packed(result.args[0]) + + assert user.id == 999 + assert user.name == "john" + assert user.email == "john@xconn.io" + assert user.age == 35 + assert not user.isAdmin + + session.leave() + + +def test_call_object_capnproto(): + session = connect_anonymous("ws://localhost:8080/ws", "realm1") + session.set_payload_codec(CapnpProtoCodec()) + + def invocation_handler(inv: Invocation) -> Result: + user_create = UserCreate.from_bytes_packed(inv.args[0]) + + user_get = UserGet.new_message() + user_get.id = 78 + user_get.name = user_create.name + user_get.email = user_create.email + user_get.age = user_create.age + user_get.isAdmin = True + + return Result(args=[user_get.to_bytes_packed()]) + + session.register("io.xconn.user.create", invocation_handler) + new_user = UserCreate.new_message() + new_user.name = "alice" + new_user.email = "alice@xconn.io" + new_user.age = 23 + + result: UserGet = session.call_object("io.xconn.user.create", new_user, UserGet) + assert result.id == 78 + assert result.name == "alice" + assert result.email == "alice@xconn.io" + assert result.age == 23 + assert result.isAdmin + + session.leave() + + +def test_pubsub_capnproto(): + session = connect_anonymous("ws://localhost:8080/ws", "realm1") + session.set_payload_codec(CapnpProtoCodec()) + + def event_handler(event: Event): + user: UserGet = event.args[0] + assert user.name == "alice" + assert user.email == "alice@xconn.io" + assert user.age == 21 + + session.subscribe_object("io.xconn.object", event_handler, UserCreate) + + new_user = UserCreate.new_message() + new_user.name = "alice" + new_user.email = "alice@xconn.io" + new_user.age = 21 + + session.publish_object("io.xconn.object", new_user) + + session.leave() diff --git a/tests/user.capnp b/tests/user.capnp new file mode 100644 index 0000000..6b63716 --- /dev/null +++ b/tests/user.capnp @@ -0,0 +1,14 @@ +@0x9d0356c942e754d3; +struct UserCreate { + name @0 :Text; + email @1 :Text; + age @2 :UInt16; +} + +struct UserGet { + id @0 :UInt32; + name @1 :Text; + email @2 :Text; + age @3 :UInt16; + isAdmin @4 :Bool; +} From a6bb296aa73f8476b7518094feb2f9685cf37aee Mon Sep 17 00:00:00 2001 From: Mahad Munir Date: Thu, 30 Oct 2025 20:07:56 +0500 Subject: [PATCH 6/7] chore: cleanup tests --- pyproject.toml | 2 +- tests/codec_test.py | 112 ++++++++++------------------ tests/schemas/__init__.py | 0 tests/{ => schemas}/profile.proto | 0 tests/{ => schemas}/profile_pb2.py | 4 +- tests/{ => schemas}/profile_pb2.pyi | 0 tests/{ => schemas}/user.capnp | 0 xconn/session.py | 14 +++- 8 files changed, 53 insertions(+), 79 deletions(-) create mode 100644 tests/schemas/__init__.py rename tests/{ => schemas}/profile.proto (100%) rename tests/{ => schemas}/profile_pb2.py (86%) rename tests/{ => schemas}/profile_pb2.pyi (100%) rename tests/{ => schemas}/user.capnp (100%) diff --git a/pyproject.toml b/pyproject.toml index 3fe57c8..4887b47 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -81,7 +81,7 @@ exclude = [ "node_modules", "site-packages", "venv", - "tests/profile_pb2.py" + "tests/schemas/*" ] line-length = 120 diff --git a/tests/codec_test.py b/tests/codec_test.py index 4d0d79b..7d29dfb 100644 --- a/tests/codec_test.py +++ b/tests/codec_test.py @@ -1,3 +1,4 @@ +import os import base64 from pathlib import Path from typing import Type, TypeVar, Any @@ -7,8 +8,8 @@ from xconn.client import connect_anonymous from xconn import codec -from xconn.types import Invocation, Result, Event -from tests.profile_pb2 import ProfileCreate, ProfileGet +from xconn.types import Event +from tests.schemas.profile_pb2 import ProfileCreate, ProfileGet class String(str): @@ -36,15 +37,6 @@ def test_base64_codec(): assert decoded == "hello" -def test_something(): - # session = connect_anonymous("ws://localhost:8080/ws", "realm1") - # session.set_payload_codec(Base64Codec()) - # result = session.call_object("io.xconn.object", String("hello"), String) - # print(result) - # session.leave() - pass - - class ProtobufCodec(codec.Codec[Message]): def name(self) -> str: return "protobuf" @@ -55,18 +47,16 @@ def encode(self, obj: Message) -> bytes: def decode(self, data: bytes, out_type: Type[Message]) -> Message: msg = out_type() msg.ParseFromString(data) + return msg -def test_protobuf_codec(): +def test_rpc_object_protobuf(): session = connect_anonymous("ws://localhost:8080/ws", "realm1") session.set_payload_codec(ProtobufCodec()) - def inv_handler(inv: Invocation) -> Result: - profile = ProfileCreate() - profile.ParseFromString(inv.args[0]) - - profile_get = ProfileGet( + def inv_handler(profile: ProfileCreate) -> ProfileGet: + return ProfileGet( id="123", username=profile.username, email=profile.email, @@ -74,9 +64,7 @@ def inv_handler(inv: Invocation) -> Result: created_at="2025-10-28T17:00:00Z", ) - return Result(args=[profile_get.SerializeToString()]) - - session.register("io.xconn.profile.create", inv_handler) + session.register_object("io.xconn.profile.create", inv_handler) create_msg = ProfileCreate(username="john", email="john@xconn.io", age=25) result = session.call_object("io.xconn.profile.create", create_msg, ProfileGet) @@ -104,7 +92,25 @@ def event_handler(event: Event): session.leave() -def test_register_object_one_param_with_return_type(): +def test_pubsub_protobuf(): + session = connect_anonymous("ws://localhost:8080/ws", "realm1") + session.set_payload_codec(ProtobufCodec()) + + def event_handler(event: Event): + user: ProfileCreate = event.args[0] + assert user.username == "john" + assert user.email == "john@xconn.io" + assert user.age == 25 + + session.subscribe_object("io.xconn.object", event_handler, ProfileCreate) + + create_msg = ProfileCreate(username="john", email="john@xconn.io", age=25) + session.publish_object("io.xconn.object", create_msg) + + session.leave() + + +def test_rpc_object_one_param_with_return_type(): session = connect_anonymous("ws://localhost:8080/ws", "realm1") session.set_payload_codec(ProtobufCodec()) @@ -120,10 +126,7 @@ def create_profile_handler(prof: ProfileCreate) -> ProfileGet: session.register_object("io.xconn.profile.create", create_profile_handler) profile_create = ProfileCreate(username="john", email="john@xconn.io", age=25) - result = session.call("io.xconn.profile.create", [profile_create.SerializeToString()]) - - profile = ProfileGet() - profile.ParseFromString(result.args[0]) + profile = session.call_object("io.xconn.profile.create", profile_create, ProfileGet) assert profile.id == "356" assert profile.username == "john" @@ -134,7 +137,7 @@ def create_profile_handler(prof: ProfileCreate) -> ProfileGet: session.leave() -def test_register_object_no_param(): +def test_rpc_object_no_param(): session = connect_anonymous("ws://localhost:8080/ws", "realm1") session.set_payload_codec(ProtobufCodec()) @@ -145,16 +148,15 @@ def invocation_handler() -> None: session.register_object("io.xconn.param.none", invocation_handler) - result = session.call("io.xconn.param.none") + result = session.call_object("io.xconn.param.none") assert options["flag"] is True - assert result.args is None - assert result.kwargs is None + assert result is None session.leave() -def test_register_object_no_param_with_return(): +def test_rpc_object_no_param_with_return(): session = connect_anonymous("ws://localhost:8080/ws", "realm1") session.set_payload_codec(ProtobufCodec()) @@ -169,10 +171,7 @@ def get_profile_handler() -> ProfileGet: session.register_object("io.xconn.profile.get", get_profile_handler) - result = session.call("io.xconn.profile.get") - - profile = ProfileGet() - profile.ParseFromString(result.args[0]) + profile = session.call_object("io.xconn.profile.get", return_type=ProfileGet) assert profile.id == "636" assert profile.username == "admin" @@ -184,8 +183,10 @@ def get_profile_handler() -> ProfileGet: T = TypeVar("T") -SCHEMA_PATH = Path(__file__).parent / "user.capnp" -user_capnp = capnp.load(str(SCHEMA_PATH)) + +root_dir = Path(__file__).resolve().parent +module_file = os.path.join(root_dir, "schemas", "user.capnp") +user_capnp = capnp.load(str(module_file)) UserCreate = user_capnp.UserCreate UserGet = user_capnp.UserGet @@ -202,7 +203,7 @@ def decode(self, data: bytes, out_type: Type[T]) -> T: return out_type.from_bytes_packed(data) -def test_register_object_capnproto(): +def test_rpc_object_capnproto(): session = connect_anonymous("ws://localhost:8080/ws", "realm1") session.set_payload_codec(CapnpProtoCodec()) @@ -223,8 +224,7 @@ def create_handler(user_create: UserCreate) -> UserGet: new_user.email = "john@xconn.io" new_user.age = 35 - result = session.call("io.xconn.user.create", [new_user.to_bytes_packed()]) - user = UserGet.from_bytes_packed(result.args[0]) + user = session.call_object("io.xconn.user.create", new_user, UserGet) assert user.id == 999 assert user.name == "john" @@ -235,44 +235,12 @@ def create_handler(user_create: UserCreate) -> UserGet: session.leave() -def test_call_object_capnproto(): - session = connect_anonymous("ws://localhost:8080/ws", "realm1") - session.set_payload_codec(CapnpProtoCodec()) - - def invocation_handler(inv: Invocation) -> Result: - user_create = UserCreate.from_bytes_packed(inv.args[0]) - - user_get = UserGet.new_message() - user_get.id = 78 - user_get.name = user_create.name - user_get.email = user_create.email - user_get.age = user_create.age - user_get.isAdmin = True - - return Result(args=[user_get.to_bytes_packed()]) - - session.register("io.xconn.user.create", invocation_handler) - new_user = UserCreate.new_message() - new_user.name = "alice" - new_user.email = "alice@xconn.io" - new_user.age = 23 - - result: UserGet = session.call_object("io.xconn.user.create", new_user, UserGet) - assert result.id == 78 - assert result.name == "alice" - assert result.email == "alice@xconn.io" - assert result.age == 23 - assert result.isAdmin - - session.leave() - - def test_pubsub_capnproto(): session = connect_anonymous("ws://localhost:8080/ws", "realm1") session.set_payload_codec(CapnpProtoCodec()) def event_handler(event: Event): - user: UserGet = event.args[0] + user: UserCreate = event.args[0] assert user.name == "alice" assert user.email == "alice@xconn.io" assert user.age == 21 diff --git a/tests/schemas/__init__.py b/tests/schemas/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/profile.proto b/tests/schemas/profile.proto similarity index 100% rename from tests/profile.proto rename to tests/schemas/profile.proto diff --git a/tests/profile_pb2.py b/tests/schemas/profile_pb2.py similarity index 86% rename from tests/profile_pb2.py rename to tests/schemas/profile_pb2.py index 14889cf..176d7e5 100644 --- a/tests/profile_pb2.py +++ b/tests/schemas/profile_pb2.py @@ -15,8 +15,8 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( b'\n\x13tests/profile.proto"=\n\rProfileCreate\x12\x10\n\x08username\x18\x01 \x01(\t\x12\r\n\x05\x65' b'mail\x18\x02 \x01(\t\x12\x0b\n\x03\x61ge\x18\x03 \x01(\x05"Z\n\nProfileGet\x12\n\n\x02id\x18\x01 ' - b'\x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\r\n\x05\x65mail\x18\x03 \x01(\t\x12\x0b\n\x03\x61' - b'ge\x18\x04 \x01(\x05\x12\x12\n\ncreated_at\x18\x05 \x01(\tb\x06proto3' + b"\x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\r\n\x05\x65mail\x18\x03 \x01(\t\x12\x0b\n\x03\x61" + b"ge\x18\x04 \x01(\x05\x12\x12\n\ncreated_at\x18\x05 \x01(\tb\x06proto3" ) _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) diff --git a/tests/profile_pb2.pyi b/tests/schemas/profile_pb2.pyi similarity index 100% rename from tests/profile_pb2.pyi rename to tests/schemas/profile_pb2.pyi diff --git a/tests/user.capnp b/tests/schemas/user.capnp similarity index 100% rename from tests/user.capnp rename to tests/schemas/user.capnp diff --git a/xconn/session.py b/xconn/session.py index 6e52bf5..18b8436 100644 --- a/xconn/session.py +++ b/xconn/session.py @@ -180,14 +180,20 @@ def _process_incoming_message(self, msg: messages.Message): def set_payload_codec(self, codec: Codec) -> None: self._payload_codec = codec - def call_object(self, procedure: str, request: TReq, return_type: Type[TRes] = None) -> TReq | None: + def call_object(self, procedure: str, request: TReq = None, return_type: Type[TRes] = None) -> TRes | None: if self._payload_codec is None: raise ValueError("no payload codec set") - encoded = self._payload_codec.encode(request) - result = self.call(procedure, [encoded]) + if request is not None: + encoded = self._payload_codec.encode(request) + result = self.call(procedure, [encoded]) + else: + result = self.call(procedure) + + if return_type is not None: + return self._payload_codec.decode(result.args[0], return_type) - return self._payload_codec.decode(result.args[0], return_type) + return None def subscribe_object(self, topic: str, event_handler: Callable[[types.Event], None], return_type: Type[TRes]): if self._payload_codec is None: From 9aa137776290cea29f20a78fad7a37d494432a81 Mon Sep 17 00:00:00 2001 From: Omer Akram Date: Fri, 31 Oct 2025 20:32:30 +0500 Subject: [PATCH 7/7] fixes --- tests/codec_test.py | 76 ++++++++++++++------------------------------- xconn/codec.py | 10 +++--- xconn/session.py | 18 +++++++++-- xconn/types.py | 20 ++++++++---- 4 files changed, 58 insertions(+), 66 deletions(-) diff --git a/tests/codec_test.py b/tests/codec_test.py index 7d29dfb..4804bf8 100644 --- a/tests/codec_test.py +++ b/tests/codec_test.py @@ -1,54 +1,34 @@ import os -import base64 from pathlib import Path -from typing import Type, TypeVar, Any +from typing import Type, TypeVar import capnp from google.protobuf.message import Message from xconn.client import connect_anonymous -from xconn import codec -from xconn.types import Event +from xconn.codec import Codec +from xconn.types import Event, OutgoingDataMessage, IncomingDataMessage from tests.schemas.profile_pb2 import ProfileCreate, ProfileGet -class String(str): - pass +T = TypeVar("T", bound=Message) -class Base64Codec(codec.Codec[String]): - def name(self) -> str: - return "base64" - - def encode(self, obj: String) -> str: - return base64.b64encode(obj.encode("utf-8")).decode("utf-8") - - def decode(self, data: str, out_type: Type[String]) -> String: - return out_type(base64.b64decode(data.encode("utf-8")).decode()) - - -def test_base64_codec(): - encoder = Base64Codec() - encoded = encoder.encode(String("hello")) - assert isinstance(encoded, str) - - decoded = encoder.decode(encoded, String) - assert isinstance(decoded, String) - assert decoded == "hello" - - -class ProtobufCodec(codec.Codec[Message]): +class ProtobufCodec(Codec[T]): def name(self) -> str: return "protobuf" - def encode(self, obj: Message) -> bytes: - return obj.SerializeToString() + def encode(self, obj: T) -> OutgoingDataMessage: + payload = obj.SerializeToString() + return OutgoingDataMessage(args=[payload], kwargs={}, details={}) - def decode(self, data: bytes, out_type: Type[Message]) -> Message: - msg = out_type() - msg.ParseFromString(data) + def decode(self, msg: IncomingDataMessage, out_type: Type[T]) -> T: + if len(msg.args) == 0 or not isinstance(msg.args[0], bytes): + raise ValueError("ProtobufCodec: cannot decode, expected first arg to be bytes") - return msg + obj = out_type() + obj.ParseFromString(msg.args[0]) + return obj def test_rpc_object_protobuf(): @@ -78,20 +58,6 @@ def inv_handler(profile: ProfileCreate) -> ProfileGet: session.leave() -def test_pubsub_object(): - session = connect_anonymous("ws://localhost:8080/ws", "realm1") - session.set_payload_codec(Base64Codec()) - - def event_handler(event: Event): - assert event.args[0] == "hello" - - session.subscribe_object("io.xconn.object", event_handler, String) - - session.publish_object("io.xconn.object", String("hello")) - - session.leave() - - def test_pubsub_protobuf(): session = connect_anonymous("ws://localhost:8080/ws", "realm1") session.set_payload_codec(ProtobufCodec()) @@ -192,15 +158,19 @@ def get_profile_handler() -> ProfileGet: UserGet = user_capnp.UserGet -class CapnpProtoCodec(codec.Codec[T]): +class CapnpProtoCodec(Codec[T]): def name(self) -> str: return "capnproto" - def encode(self, obj: Any) -> bytes: - return obj.to_bytes_packed() + def encode(self, obj: T) -> OutgoingDataMessage: + payload = obj.to_bytes_packed() + return OutgoingDataMessage(args=[payload], kwargs={}, details={}) + + def decode(self, msg: IncomingDataMessage, out_type: Type[T]) -> T: + if len(msg.args) == 0 or not isinstance(msg.args[0], bytes): + raise ValueError("CapnpProtoCodec: cannot decode, expected first arg to be bytes") - def decode(self, data: bytes, out_type: Type[T]) -> T: - return out_type.from_bytes_packed(data) + return out_type.from_bytes_packed(msg.args[0]) def test_rpc_object_capnproto(): diff --git a/xconn/codec.py b/xconn/codec.py index 689b66d..3e5f314 100644 --- a/xconn/codec.py +++ b/xconn/codec.py @@ -1,4 +1,6 @@ -from typing import Any, Generic, Type, TypeVar +from typing import Generic, Type, TypeVar + +from xconn.types import IncomingDataMessage, OutgoingDataMessage T = TypeVar("T") @@ -7,10 +9,10 @@ class Codec(Generic[T]): def name(self) -> str: raise NotImplementedError - def encode(self, obj: Any) -> bytes | str: + def encode(self, obj: T) -> OutgoingDataMessage: """Serialize a Python object to bytes.""" raise NotImplementedError - def decode(self, data: bytes | str, out_type: Type[T]) -> T: - """Deserialize bytes into an instance of out_type.""" + def decode(self, msg: IncomingDataMessage, out_type: Type[T]) -> T: + """Deserialize the incoming message into an instance of out_type.""" raise NotImplementedError diff --git a/xconn/session.py b/xconn/session.py index 18b8436..1611028 100644 --- a/xconn/session.py +++ b/xconn/session.py @@ -3,7 +3,7 @@ import inspect from concurrent.futures import Future from threading import Thread -from typing import Callable, Any, TypeVar, Type +from typing import Callable, Any, TypeVar, Type, overload from dataclasses import dataclass from wampproto import messages, session, uris @@ -180,13 +180,25 @@ def _process_incoming_message(self, msg: messages.Message): def set_payload_codec(self, codec: Codec) -> None: self._payload_codec = codec - def call_object(self, procedure: str, request: TReq = None, return_type: Type[TRes] = None) -> TRes | None: + @overload + def call_object(self, procedure: str, request: TReq, return_type: Type[TRes]) -> TRes: + ... + + @overload + def call_object(self, procedure: str, request: None = None, return_type: None = None) -> None: + ... + + @overload + def call_object(self, procedure: str, request: None, return_type: Type[TRes]) -> TRes: + ... + + def call_object(self, procedure: str, request: TReq = None, return_type: Type[TRes] | None = None) -> TRes | None: if self._payload_codec is None: raise ValueError("no payload codec set") if request is not None: encoded = self._payload_codec.encode(request) - result = self.call(procedure, [encoded]) + result = self.call(procedure, args=encoded.args, kwargs=encoded.kwargs, options=encoded.details) else: result = self.call(procedure) diff --git a/xconn/types.py b/xconn/types.py index 07b4d08..1f358a9 100644 --- a/xconn/types.py +++ b/xconn/types.py @@ -26,26 +26,34 @@ class UnsubscribeRequest: @dataclass -class Result: +class OutgoingDataMessage: args: list | None = None kwargs: dict | None = None details: dict | None = None @dataclass -class Invocation: - args: list | None - kwargs: dict | None - details: dict | None +class Result(OutgoingDataMessage): + pass @dataclass -class Event: +class IncomingDataMessage: args: list | None kwargs: dict | None details: dict | None +@dataclass +class Invocation(IncomingDataMessage): + pass + + +@dataclass() +class Event(IncomingDataMessage): + pass + + @dataclass class TransportConfig: # max wait time for connection to be established