diff --git a/src/Buffer.php b/src/Buffer.php index 1a6c32d..d05a253 100644 --- a/src/Buffer.php +++ b/src/Buffer.php @@ -12,6 +12,7 @@ class Buffer extends EventEmitter implements WritableStreamInterface public $listening = false; public $softLimit = 65536; private $writable = true; + private $closed = false; private $loop; private $data = ''; @@ -64,11 +65,21 @@ public function end($data = null) public function close() { + if ($this->closed) { + return; + } + + if ($this->listening) { + $this->listening = false; + $this->loop->removeWriteStream($this->stream); + } + + $this->closed = true; $this->writable = false; - $this->listening = false; $this->data = ''; $this->emit('close', array($this)); + $this->removeAllListeners(); } public function handleWrite() @@ -108,6 +119,7 @@ public function handleWrite() } $this->emit('error', array(new \RuntimeException('Unable to write to stream: ' . $error->getMessage(), 0, $error), $this)); + $this->close(); return; } diff --git a/tests/BufferTest.php b/tests/BufferTest.php index 2759d13..8df78c1 100644 --- a/tests/BufferTest.php +++ b/tests/BufferTest.php @@ -77,6 +77,20 @@ public function testEmptyWriteDoesNotAddToLoop() $buffer->write(null); } + /** + * @covers React\Stream\Buffer::write + */ + public function testWriteWillAddStreamToLoop() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + $buffer = new Buffer($stream, $loop); + + $loop->expects($this->once())->method('addWriteStream')->with($stream); + + $buffer->write('foo'); + } + /** * @covers React\Stream\Buffer::write * @covers React\Stream\Buffer::handleWrite @@ -292,6 +306,52 @@ public function testClose() $this->assertTrue($buffer->isWritable()); $buffer->close(); $this->assertFalse($buffer->isWritable()); + + $this->assertEquals(array(), $buffer->listeners('close')); + } + + /** + * @covers React\Stream\Buffer::close + */ + public function testClosingAfterWriteRemovesStreamFromLoop() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + $buffer = new Buffer($stream, $loop); + + $loop->expects($this->once())->method('removeWriteStream')->with($stream); + + $buffer->write('foo'); + $buffer->close(); + } + + /** + * @covers React\Stream\Buffer::close + */ + public function testClosingWithoutWritingDoesNotRemoveStreamFromLoop() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + $buffer = new Buffer($stream, $loop); + + $loop->expects($this->never())->method('removeWriteStream'); + + $buffer->close(); + } + + /** + * @covers React\Stream\Buffer::close + */ + public function testDoubleCloseWillEmitOnlyOnce() + { + $stream = fopen('php://temp', 'r+'); + $loop = $this->createLoopMock(); + + $buffer = new Buffer($stream, $loop); + $buffer->on('close', $this->expectCallableOnce()); + + $buffer->close(); + $buffer->close(); } /**