Skip to content
51 changes: 41 additions & 10 deletions src/Gateway/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use Ragnarok\Fenrir\Constants\WebsocketEvents;
use Ragnarok\Fenrir\DataMapper;
use Ragnarok\Fenrir\EventHandler;
use Ragnarok\Fenrir\Gateway\Events\Meta\MetaEvent;
use Ragnarok\Fenrir\Gateway\Handlers\HeartbeatAcknowledgedEvent;
use Ragnarok\Fenrir\Gateway\Handlers\IdentifyHelloEvent;
use Ragnarok\Fenrir\Gateway\Handlers\IdentifyResumeEvent;
Expand All @@ -32,6 +31,7 @@
use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;
use React\Promise\PromiseInterface;
use InflateContext;

/**
* @SuppressWarnings(PHPMD.TooManyPublicMethods)
Expand All @@ -41,22 +41,26 @@ class Connection implements ConnectionInterface
public const DISCORD_VERSION = 10;
public const DEFAULT_WEBSOCKET_URL = 'wss://gateway.discord.gg/';

private const QUERY_DATA = ['v' => self::DISCORD_VERSION];
private const QUERY_DATA = [
'v' => self::DISCORD_VERSION,
'encoding' => 'json',
'compress' => 'zlib-stream'
];

private const HEARTBEAT_ACK_TIMEOUT = 2.5;

private ?int $sequence = null;

private ?string $sessionId = null;
private ?string $resumeUrl = null;

public EventHandler $events;

private TimerInterface $heartbeatTimer;
private TimerInterface $unacknowledgedHeartbeatTimer;

private ShardInterface $shard;

private string $buffer = '';
private InflateContext|false $inflate;

public function __construct(
private LoopInterface $loop,
private string $token,
Expand All @@ -69,23 +73,50 @@ public function __construct(
private Retrier $retrier = new Retrier(),
) {
$this->events = new EventHandler($mapper);
$this->resetInflater();

$this->websocket->on(WebsocketEvents::MESSAGE, function (MessageInterface $message) {
$parsedMessage = json_decode((string) $message, depth: 1024);
if ($parsedMessage === null) {
$this->buffer .= $message->getPayload();

if (!str_ends_with($this->buffer, "\x00\x00\xff\xff")) {
return;
}

$payload = $this->mapper->map($parsedMessage, Payload::class);
$json = inflate_add($this->inflate, $this->buffer);
$this->buffer = '';

$this->raw->emit((string) $payload->op, [$this, $payload, $this->logger]);
if ($json === false) {
$this->logger->warning('ZLIB decompression error');
return;
}

$parsed = json_decode($json);
if ($parsed === null) {
$this->logger->warning('Failed to decode JSON payload');
return;
}

$payload = $this->mapper->map($parsed, Payload::class);

$this->loop->futureTick(function () use ($payload) {
$this->raw->emit((string) $payload->op, [$this, $payload, $this->logger]);
});
});

$this->websocket->on(WebsocketEvents::CLOSE, $this->handleClose(...));
$this->websocket->on(WebsocketEvents::CLOSE, function (int $code, string $reason) {
$this->resetInflater();
$this->handleClose($code, $reason);
});

$this->registerEvents();
}

private function resetInflater(): void
{
$this->inflate = inflate_init(ZLIB_ENCODING_DEFLATE);
$this->buffer = '';
}

private function registerEvents(): void
{
$this->raw->register(
Expand Down
30 changes: 22 additions & 8 deletions tests/Gateway/ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@

class ConnectionTest extends MockeryTestCase
{
private const EXPECTED_QUERY_PARAMS = [
'v' => 10,
'encoding' => 'json',
'compress' => 'zlib-stream'
];

public function testGetDefaultUrl(): void
{
$connection = new Connection(
Expand Down Expand Up @@ -80,7 +86,7 @@ public function testConnect(): void

$websocket->expects()
->open()
->with('::ws url::?v=10')
->with('::ws url::?' . http_build_query(self::EXPECTED_QUERY_PARAMS))
->andReturns(PromiseFake::get('::return::'))
->once();

Expand Down Expand Up @@ -462,8 +468,16 @@ public function testItEmitsGatewayMessagesAsEvents(): void
->registerOnce()
->withAnyArgs();

$loop = Mockery::mock(LoopInterface::class);
$loop->shouldReceive('futureTick')
->once()
->with(Mockery::on(function ($callback) {
$callback();
return true;
}));

$connection = new Connection(
Mockery::mock(LoopInterface::class),
$loop,
'::token::',
new Bitwise(),
new DataMapper(new NullLogger()),
Expand All @@ -486,8 +500,8 @@ public function testItEmitsGatewayMessagesAsEvents(): void
/** @var MessageInterface&MockInterface */
$message = Mockery::mock(MessageInterface::class);
$message->expects()
->__toString()
->andReturns('{"op": 1}')
->getPayload()
->andReturns(zlib_encode('{"op":1}', ZLIB_ENCODING_DEFLATE) . "\x00\x00\xff\xff")
->once();

$websocket->emit(WebsocketEvents::MESSAGE, [$message]);
Expand Down Expand Up @@ -556,7 +570,7 @@ public function testItReconnectsWhenWebsocketConnectionClosedWithCertainCodes(in

$websocket->emit(WebsocketEvents::CLOSE, [$code, 'reason']);

$this->assertEquals([Connection::DEFAULT_WEBSOCKET_URL . '?v=' . Connection::DISCORD_VERSION], $websocket->openings);
$this->assertEquals([Connection::DEFAULT_WEBSOCKET_URL . '?' . http_build_query(self::EXPECTED_QUERY_PARAMS)], $websocket->openings);
}

public static function reconnectCloseCodesProvider(): array
Expand Down Expand Up @@ -606,7 +620,7 @@ public function testItResumesWhenWebsocketConnectionClosedWithCertainCodes(int $

$websocket->emit(WebsocketEvents::CLOSE, [$code, 'reason']);

$this->assertEquals(['::resume url::?v=' . Connection::DISCORD_VERSION], $websocket->openings);
$this->assertEquals(['::resume url::?' . http_build_query(self::EXPECTED_QUERY_PARAMS)], $websocket->openings);
}

/**
Expand Down Expand Up @@ -641,7 +655,7 @@ public function testItReconnectsIfMissingResumeUrl(int $code)

$websocket->emit(WebsocketEvents::CLOSE, [$code, 'reason']);

$this->assertEquals([Connection::DEFAULT_WEBSOCKET_URL . '?v=' . Connection::DISCORD_VERSION], $websocket->openings);
$this->assertEquals([Connection::DEFAULT_WEBSOCKET_URL . '?' . http_build_query(self::EXPECTED_QUERY_PARAMS)], $websocket->openings);
}

/**
Expand Down Expand Up @@ -676,7 +690,7 @@ public function testItReconnectsIfMissingSessionId(int $code)

$websocket->emit(WebsocketEvents::CLOSE, [$code, 'reason']);

$this->assertEquals([Connection::DEFAULT_WEBSOCKET_URL . '?v=' . Connection::DISCORD_VERSION], $websocket->openings);
$this->assertEquals([Connection::DEFAULT_WEBSOCKET_URL . '?' . http_build_query(self::EXPECTED_QUERY_PARAMS)], $websocket->openings);
}

public static function resumeCloseCodesProvider(): array
Expand Down