diff --git a/src/ExtEventLoop.php b/src/ExtEventLoop.php index b1cb3e74..8eee66ad 100644 --- a/src/ExtEventLoop.php +++ b/src/ExtEventLoop.php @@ -22,6 +22,7 @@ class ExtEventLoop implements LoopInterface private $streamCallback; private $streamEvents = []; private $streamFlags = []; + private $streamRefs = []; private $readListeners = []; private $writeListeners = []; private $running; @@ -110,7 +111,8 @@ private function removeStream($stream) $this->streamFlags[$key], $this->streamEvents[$key], $this->readListeners[$key], - $this->writeListeners[$key] + $this->writeListeners[$key], + $this->streamRefs[$key] ); } } @@ -224,6 +226,12 @@ private function subscribeStreamEvent($stream, $flag) $this->streamEvents[$key] = $event; $this->streamFlags[$key] = $flag; + + // ext-event does not increase refcount on stream resources for PHP 7+ + // manually keep track of stream resource to prevent premature garbage collection + if (PHP_VERSION_ID >= 70000) { + $this->streamRefs[$key] = $stream; + } } $event->add(); diff --git a/tests/AbstractLoopTest.php b/tests/AbstractLoopTest.php index a256c82a..bee7f495 100644 --- a/tests/AbstractLoopTest.php +++ b/tests/AbstractLoopTest.php @@ -61,6 +61,39 @@ public function testAddReadStreamIgnoresSecondCallable() $this->tickLoop($this->loop); } + public function testAddReadStreamReceivesDataFromStreamReference() + { + $this->received = ''; + $this->subAddReadStreamReceivesDataFromStreamReference(); + $this->assertEquals('', $this->received); + + $this->assertRunFasterThan($this->tickTimeout * 2); + $this->assertEquals('[hello]X', $this->received); + } + + /** + * Telper for above test. This happens in another helper method to verify + * the loop keep track of assigned stream resources (refcount). + */ + private function subAddReadStreamReceivesDataFromStreamReference() + { + list ($input, $output) = $this->createSocketPair(); + + fwrite($input, 'hello'); + fclose($input); + + $this->loop->addReadStream($output, function ($output) { + $chunk = fread($output, 1024); + if ($chunk === '') { + $this->received .= 'X'; + $this->loop->removeReadStream($output); + fclose($output); + } else { + $this->received .= '[' . $chunk . ']'; + } + }); + } + public function testAddWriteStream() { list ($input) = $this->createSocketPair();