diff --git a/src/CompositeStream.php b/src/CompositeStream.php index b2f88b0..1446900 100644 --- a/src/CompositeStream.php +++ b/src/CompositeStream.php @@ -9,14 +9,15 @@ class CompositeStream extends EventEmitter implements DuplexStreamInterface protected $readable; protected $writable; protected $pipeSource; + protected $closed = false; public function __construct(ReadableStreamInterface $readable, WritableStreamInterface $writable) { $this->readable = $readable; $this->writable = $writable; - Util::forwardEvents($this->readable, $this, array('data', 'end', 'error', 'close')); - Util::forwardEvents($this->writable, $this, array('drain', 'error', 'close', 'pipe')); + Util::forwardEvents($this->readable, $this, array('data', 'end', 'error')); + Util::forwardEvents($this->writable, $this, array('drain', 'error', 'pipe')); $this->readable->on('close', array($this, 'close')); $this->writable->on('close', array($this, 'close')); @@ -76,9 +77,16 @@ public function end($data = null) public function close() { + if ($this->closed) { + return; + } + + $this->closed = true; $this->pipeSource = null; $this->readable->close(); $this->writable->close(); + + $this->emit('close'); } } diff --git a/tests/CompositeStreamTest.php b/tests/CompositeStreamTest.php index 975a788..2c1eacd 100644 --- a/tests/CompositeStreamTest.php +++ b/tests/CompositeStreamTest.php @@ -80,6 +80,19 @@ public function closeShouldCloseBothStreams() $composite->close(); } + /** @test */ + public function itShouldForwardCloseOnlyOnce() + { + $readable = new ReadableStream(); + $writable = new WritableStream(); + + $composite = new CompositeStream($readable, $writable); + $composite->on('close', $this->expectCallableOnce()); + + $readable->close(); + $writable->close(); + } + /** @test */ public function itShouldReceiveForwardedEvents() { diff --git a/tests/ThroughStreamTest.php b/tests/ThroughStreamTest.php index 64fe109..acc964c 100644 --- a/tests/ThroughStreamTest.php +++ b/tests/ThroughStreamTest.php @@ -95,9 +95,12 @@ public function resumeShouldDelegateToPipeSource() } /** @test */ - public function closeShouldClose() + public function closeShouldCloseOnce() { $through = new ThroughStream(); + + $through->on('close', $this->expectCallableOnce()); + $through->close(); $this->assertFalse($through->isReadable()); @@ -105,9 +108,12 @@ public function closeShouldClose() } /** @test */ - public function doubleCloseShouldWork() + public function doubleCloseShouldCloseOnce() { $through = new ThroughStream(); + + $through->on('close', $this->expectCallableOnce()); + $through->close(); $through->close();