diff --git a/src/Stream.php b/src/Stream.php index b8b034c..eb7d4c8 100644 --- a/src/Stream.php +++ b/src/Stream.php @@ -8,7 +8,24 @@ class Stream extends EventEmitter implements DuplexStreamInterface { + /** + * Controls the maximum buffer size in bytes to ready at once from the stream. + * + * This can be a positive number which means that up to X bytes will be read + * at once from the underlying stream resource. Note that the actual number + * of bytes read may be lower if the stream resource has less than X bytes + * currently available. + * + * This can be `null` which means read everything available from the + * underlying stream resource. + * This should read until the stream resource is not readable anymore + * (i.e. underlying buffer drained), note that this does not neccessarily + * mean it reached EOF. + * + * @var int|null + */ public $bufferSize = 4096; + public $stream; protected $readable = true; protected $writable = true; @@ -139,7 +156,7 @@ public function handleData($stream) ); }); - $data = fread($stream, $this->bufferSize); + $data = stream_get_contents($stream, $this->bufferSize === null ? -1 : $this->bufferSize); restore_error_handler(); diff --git a/tests/StreamTest.php b/tests/StreamTest.php index 0c913ac..d671432 100644 --- a/tests/StreamTest.php +++ b/tests/StreamTest.php @@ -51,6 +51,58 @@ public function testDataEvent() $this->assertSame("foobar\n", $capturedData); } + /** + * @covers React\Stream\Stream::__construct + * @covers React\Stream\Stream::handleData + */ + public function testDataEventDoesEmitOneChunkMatchingBufferSize() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + + $capturedData = null; + + $conn = new Stream($stream, $loop); + $conn->on('data', function ($data) use (&$capturedData) { + $capturedData = $data; + }); + + fwrite($stream, str_repeat("a", 100000)); + rewind($stream); + + $conn->handleData($stream); + + $this->assertTrue($conn->isReadable()); + $this->assertEquals($conn->bufferSize, strlen($capturedData)); + } + + /** + * @covers React\Stream\Stream::__construct + * @covers React\Stream\Stream::handleData + */ + public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfinite() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + + $capturedData = null; + + $conn = new Stream($stream, $loop); + $conn->bufferSize = null; + + $conn->on('data', function ($data) use (&$capturedData) { + $capturedData = $data; + }); + + fwrite($stream, str_repeat("a", 100000)); + rewind($stream); + + $conn->handleData($stream); + + $this->assertFalse($conn->isReadable()); + $this->assertEquals(100000, strlen($capturedData)); + } + /** * @covers React\Stream\Stream::handleData */