diff --git a/xconn/async_session.py b/xconn/async_session.py index 51e47cc..7265629 100644 --- a/xconn/async_session.py +++ b/xconn/async_session.py @@ -25,17 +25,7 @@ def __init__(self, registration_id: int, session: AsyncSession): self._session = session async def unregister(self) -> None: - if not await self._session._base_session.transport.is_connected(): - raise Exception("cannot unregister procedure: session not established") - - unregister = messages.Unregister(messages.UnregisterFields(self._session._idgen.next(), self.registration_id)) - data = self._session._session.send_message(unregister) - - f: Future = Future() - self._session._unregister_requests[unregister.request_id] = types.UnregisterRequest(f, self.registration_id) - await self._session._base_session.send(data) - - return await f + return await self._session._unregister(self) @dataclass @@ -50,19 +40,7 @@ def __init__(self, subscription_id: int, session: AsyncSession): self._session = session async def unsubscribe(self) -> None: - if not await self._session._base_session.transport.is_connected(): - raise Exception("cannot unsubscribe topic: session not established") - - unsubscribe = messages.Unsubscribe( - messages.UnsubscribeFields(self._session._idgen.next(), self.subscription_id) - ) - data = self._session._session.send_message(unsubscribe) - - f: Future = Future() - self._session._unsubscribe_requests[unsubscribe.request_id] = types.UnsubscribeRequest(f, self.subscription_id) - await self._session._base_session.send(data) - - return await f + return await self._session._unsubscribe(self) class AsyncSession: @@ -237,6 +215,19 @@ async def call( return await f + async def _unregister(self, reg: Registration) -> None: + if not await self._base_session.transport.is_connected(): + raise Exception("cannot unregister procedure: session not established") + + unregister = messages.Unregister(messages.UnregisterFields(self._idgen.next(), reg.registration_id)) + data = self._session.send_message(unregister) + + f: Future = Future() + self._unregister_requests[unregister.request_id] = types.UnregisterRequest(f, reg.registration_id) + await self._base_session.send(data) + + return await f + async def subscribe( self, topic: str, event_handler: Callable[[types.Event], Awaitable[None]], options: dict | None = None ) -> Subscription: @@ -266,6 +257,19 @@ async def publish( await self._base_session.send(data) + async def _unsubscribe(self, sub: Subscription) -> None: + if not await self._base_session.transport.is_connected(): + raise Exception("cannot unsubscribe topic: session not established") + + unsubscribe = messages.Unsubscribe(messages.UnsubscribeFields(self._idgen.next(), sub.subscription_id)) + data = self._session.send_message(unsubscribe) + + f: Future = Future() + self._unsubscribe_requests[unsubscribe.request_id] = types.UnsubscribeRequest(f, sub.subscription_id) + await self._base_session.send(data) + + return await f + async def leave(self) -> None: self._goodbye_request = Future() diff --git a/xconn/session.py b/xconn/session.py index 1cf83e6..b13aed7 100644 --- a/xconn/session.py +++ b/xconn/session.py @@ -24,17 +24,7 @@ def __init__(self, registration_id: int, session: Session): self._session = session def unregister(self) -> None: - if not self._session._base_session.transport.is_connected(): - raise Exception("cannot unregister procedure: session not established") - - unregister = messages.Unregister(messages.UnregisterFields(self._session._idgen.next(), self.registration_id)) - data = self._session._session.send_message(unregister) - - f: Future = Future() - self._session._unregister_requests[unregister.request_id] = types.UnregisterRequest(f, self.registration_id) - self._session._base_session.send(data) - - f.result() + self._session._unregister(self) @dataclass @@ -49,19 +39,7 @@ def __init__(self, subscription_id: int, session: Session): self._session = session def unsubscribe(self) -> None: - if not self._session._base_session.transport.is_connected(): - raise Exception("cannot unsubscribe topic: session not established") - - unsubscribe = messages.Unsubscribe( - messages.UnsubscribeFields(self._session._idgen.next(), self.subscription_id) - ) - data = self._session._session.send_message(unsubscribe) - - f: Future = Future() - self._session._unsubscribe_requests[unsubscribe.request_id] = types.UnsubscribeRequest(f, self.subscription_id) - self._session._base_session.send(data) - - f.result() + self._session._unsubscribe(self) class Session: @@ -252,6 +230,19 @@ def register( return f.result() + def _unregister(self, reg: Registration) -> None: + if not self._base_session.transport.is_connected(): + raise Exception("cannot unregister procedure: session not established") + + unregister = messages.Unregister(messages.UnregisterFields(self._idgen.next(), reg.registration_id)) + data = self._session.send_message(unregister) + + f: Future = Future() + self._unregister_requests[unregister.request_id] = types.UnregisterRequest(f, reg.registration_id) + self._base_session.send(data) + + f.result() + def subscribe(self, topic: str, event_handler: Callable[[types.Event], None], options: dict = None) -> Subscription: subscribe = messages.Subscribe(messages.SubscribeFields(self._idgen.next(), topic, options=options)) data = self._session.send_message(subscribe) @@ -274,6 +265,19 @@ def publish(self, topic: str, args: list[Any] = None, kwargs: dict = None, optio self._base_session.send(data) + def _unsubscribe(self, sub: Subscription) -> None: + if not self._base_session.transport.is_connected(): + raise Exception("cannot unsubscribe topic: session not established") + + unsubscribe = messages.Unsubscribe(messages.UnsubscribeFields(self._idgen.next(), sub.subscription_id)) + data = self._session.send_message(unsubscribe) + + f: Future = Future() + self._unsubscribe_requests[unsubscribe.request_id] = types.UnsubscribeRequest(f, sub.subscription_id) + self._base_session.send(data) + + f.result() + def leave(self): self._goodbye_request = Future()