diff --git a/src/Gateway/Connection.php b/src/Gateway/Connection.php index 147c395..52c1f5e 100644 --- a/src/Gateway/Connection.php +++ b/src/Gateway/Connection.php @@ -14,7 +14,6 @@ use Ragnarok\Fenrir\Constants\WebsocketEvents; use Ragnarok\Fenrir\DataMapper; use Ragnarok\Fenrir\EventHandler; -use Ragnarok\Fenrir\Gateway\Events\Meta\MetaEvent; use Ragnarok\Fenrir\Gateway\Handlers\HeartbeatAcknowledgedEvent; use Ragnarok\Fenrir\Gateway\Handlers\IdentifyHelloEvent; use Ragnarok\Fenrir\Gateway\Handlers\IdentifyResumeEvent; @@ -32,6 +31,7 @@ use React\EventLoop\LoopInterface; use React\EventLoop\TimerInterface; use React\Promise\PromiseInterface; +use InflateContext; /** * @SuppressWarnings(PHPMD.TooManyPublicMethods) @@ -41,22 +41,26 @@ class Connection implements ConnectionInterface public const DISCORD_VERSION = 10; public const DEFAULT_WEBSOCKET_URL = 'wss://gateway.discord.gg/'; - private const QUERY_DATA = ['v' => self::DISCORD_VERSION]; + private const QUERY_DATA = [ + 'v' => self::DISCORD_VERSION, + 'encoding' => 'json', + 'compress' => 'zlib-stream' + ]; private const HEARTBEAT_ACK_TIMEOUT = 2.5; private ?int $sequence = null; - private ?string $sessionId = null; private ?string $resumeUrl = null; public EventHandler $events; - private TimerInterface $heartbeatTimer; private TimerInterface $unacknowledgedHeartbeatTimer; - private ShardInterface $shard; + private string $buffer = ''; + private InflateContext|false $inflate; + public function __construct( private LoopInterface $loop, private string $token, @@ -69,23 +73,50 @@ public function __construct( private Retrier $retrier = new Retrier(), ) { $this->events = new EventHandler($mapper); + $this->resetInflater(); $this->websocket->on(WebsocketEvents::MESSAGE, function (MessageInterface $message) { - $parsedMessage = json_decode((string) $message, depth: 1024); - if ($parsedMessage === null) { + $this->buffer .= $message->getPayload(); + + if (!str_ends_with($this->buffer, "\x00\x00\xff\xff")) { return; } - $payload = $this->mapper->map($parsedMessage, Payload::class); + $json = inflate_add($this->inflate, $this->buffer); + $this->buffer = ''; - $this->raw->emit((string) $payload->op, [$this, $payload, $this->logger]); + if ($json === false) { + $this->logger->warning('ZLIB decompression error'); + return; + } + + $parsed = json_decode($json); + if ($parsed === null) { + $this->logger->warning('Failed to decode JSON payload'); + return; + } + + $payload = $this->mapper->map($parsed, Payload::class); + + $this->loop->futureTick(function () use ($payload) { + $this->raw->emit((string) $payload->op, [$this, $payload, $this->logger]); + }); }); - $this->websocket->on(WebsocketEvents::CLOSE, $this->handleClose(...)); + $this->websocket->on(WebsocketEvents::CLOSE, function (int $code, string $reason) { + $this->resetInflater(); + $this->handleClose($code, $reason); + }); $this->registerEvents(); } + private function resetInflater(): void + { + $this->inflate = inflate_init(ZLIB_ENCODING_DEFLATE); + $this->buffer = ''; + } + private function registerEvents(): void { $this->raw->register( diff --git a/tests/Gateway/ConnectionTest.php b/tests/Gateway/ConnectionTest.php index c2de949..ef09ac5 100644 --- a/tests/Gateway/ConnectionTest.php +++ b/tests/Gateway/ConnectionTest.php @@ -35,6 +35,12 @@ class ConnectionTest extends MockeryTestCase { + private const EXPECTED_QUERY_PARAMS = [ + 'v' => 10, + 'encoding' => 'json', + 'compress' => 'zlib-stream' + ]; + public function testGetDefaultUrl(): void { $connection = new Connection( @@ -80,7 +86,7 @@ public function testConnect(): void $websocket->expects() ->open() - ->with('::ws url::?v=10') + ->with('::ws url::?' . http_build_query(self::EXPECTED_QUERY_PARAMS)) ->andReturns(PromiseFake::get('::return::')) ->once(); @@ -462,8 +468,16 @@ public function testItEmitsGatewayMessagesAsEvents(): void ->registerOnce() ->withAnyArgs(); + $loop = Mockery::mock(LoopInterface::class); + $loop->shouldReceive('futureTick') + ->once() + ->with(Mockery::on(function ($callback) { + $callback(); + return true; + })); + $connection = new Connection( - Mockery::mock(LoopInterface::class), + $loop, '::token::', new Bitwise(), new DataMapper(new NullLogger()), @@ -486,8 +500,8 @@ public function testItEmitsGatewayMessagesAsEvents(): void /** @var MessageInterface&MockInterface */ $message = Mockery::mock(MessageInterface::class); $message->expects() - ->__toString() - ->andReturns('{"op": 1}') + ->getPayload() + ->andReturns(zlib_encode('{"op":1}', ZLIB_ENCODING_DEFLATE) . "\x00\x00\xff\xff") ->once(); $websocket->emit(WebsocketEvents::MESSAGE, [$message]); @@ -556,7 +570,7 @@ public function testItReconnectsWhenWebsocketConnectionClosedWithCertainCodes(in $websocket->emit(WebsocketEvents::CLOSE, [$code, 'reason']); - $this->assertEquals([Connection::DEFAULT_WEBSOCKET_URL . '?v=' . Connection::DISCORD_VERSION], $websocket->openings); + $this->assertEquals([Connection::DEFAULT_WEBSOCKET_URL . '?' . http_build_query(self::EXPECTED_QUERY_PARAMS)], $websocket->openings); } public static function reconnectCloseCodesProvider(): array @@ -606,7 +620,7 @@ public function testItResumesWhenWebsocketConnectionClosedWithCertainCodes(int $ $websocket->emit(WebsocketEvents::CLOSE, [$code, 'reason']); - $this->assertEquals(['::resume url::?v=' . Connection::DISCORD_VERSION], $websocket->openings); + $this->assertEquals(['::resume url::?' . http_build_query(self::EXPECTED_QUERY_PARAMS)], $websocket->openings); } /** @@ -641,7 +655,7 @@ public function testItReconnectsIfMissingResumeUrl(int $code) $websocket->emit(WebsocketEvents::CLOSE, [$code, 'reason']); - $this->assertEquals([Connection::DEFAULT_WEBSOCKET_URL . '?v=' . Connection::DISCORD_VERSION], $websocket->openings); + $this->assertEquals([Connection::DEFAULT_WEBSOCKET_URL . '?' . http_build_query(self::EXPECTED_QUERY_PARAMS)], $websocket->openings); } /** @@ -676,7 +690,7 @@ public function testItReconnectsIfMissingSessionId(int $code) $websocket->emit(WebsocketEvents::CLOSE, [$code, 'reason']); - $this->assertEquals([Connection::DEFAULT_WEBSOCKET_URL . '?v=' . Connection::DISCORD_VERSION], $websocket->openings); + $this->assertEquals([Connection::DEFAULT_WEBSOCKET_URL . '?' . http_build_query(self::EXPECTED_QUERY_PARAMS)], $websocket->openings); } public static function resumeCloseCodesProvider(): array