From 0982ad8a0ecddcfaf1ef209570f6c5cf44751923 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Sat, 5 Mar 2016 15:19:39 +0100 Subject: [PATCH] Emit error and close when reading from stream fails --- composer.json | 3 ++- src/Stream.php | 19 +++++++++++++++ tests/StreamTest.php | 57 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 78 insertions(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 09e50e8..0b9f875 100644 --- a/composer.json +++ b/composer.json @@ -9,7 +9,8 @@ }, "require-dev": { "react/event-loop": "^0.4|^0.3", - "react/promise": "^2.0|^1.0" + "react/promise": "^2.0|^1.0", + "clue/stream-filter": "~1.2" }, "suggest": { "react/event-loop": "^0.4", diff --git a/src/Stream.php b/src/Stream.php index 873ad36..b8b034c 100644 --- a/src/Stream.php +++ b/src/Stream.php @@ -128,8 +128,27 @@ public function pipe(WritableStreamInterface $dest, array $options = array()) public function handleData($stream) { + $error = null; + set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) { + $error = new \ErrorException( + $errstr, + 0, + $errno, + $errfile, + $errline + ); + }); + $data = fread($stream, $this->bufferSize); + restore_error_handler(); + + if ($error !== null) { + $this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error), $this)); + $this->close(); + return; + } + if ($data !== '') { $this->emit('data', array($data, $this)); } diff --git a/tests/StreamTest.php b/tests/StreamTest.php index 33f36b6..0c913ac 100644 --- a/tests/StreamTest.php +++ b/tests/StreamTest.php @@ -3,6 +3,7 @@ namespace React\Tests\Stream; use React\Stream\Stream; +use Clue\StreamFilter as Filter; class StreamTest extends TestCase { @@ -127,6 +128,62 @@ public function testClosingStreamInDataEventShouldNotTriggerError() $conn->handleData($stream); } + /** + * @covers React\Stream\Stream::handleData + */ + public function testDataFiltered() + { + $stream = fopen('php://temp', 'r+'); + + // add a filter which removes every 'a' when reading + Filter\append($stream, function ($chunk) { + return str_replace('a', '', $chunk); + }, STREAM_FILTER_READ); + + $loop = $this->createLoopMock(); + + $capturedData = null; + + $conn = new Stream($stream, $loop); + $conn->on('data', function ($data) use (&$capturedData) { + $capturedData = $data; + }); + + fwrite($stream, "foobar\n"); + rewind($stream); + + $conn->handleData($stream); + $this->assertSame("foobr\n", $capturedData); + } + + /** + * @covers React\Stream\Stream::handleData + */ + public function testDataErrorShouldEmitErrorAndClose() + { + $stream = fopen('php://temp', 'r+'); + + // add a filter which returns an error when encountering an 'a' when reading + Filter\append($stream, function ($chunk) { + if (strpos($chunk, 'a') !== false) { + throw new \Exception('Invalid'); + } + return $chunk; + }, STREAM_FILTER_READ); + + $loop = $this->createLoopMock(); + + $conn = new Stream($stream, $loop); + $conn->on('data', $this->expectCallableNever()); + $conn->on('error', $this->expectCallableOnce()); + $conn->on('close', $this->expectCallableOnce()); + + fwrite($stream, "foobar\n"); + rewind($stream); + + $conn->handleData($stream); + } + private function createWriteableLoopMock() { $loop = $this->createLoopMock();