diff --git a/src/CompositeStream.php b/src/CompositeStream.php index b28cdd7..47f057c 100644 --- a/src/CompositeStream.php +++ b/src/CompositeStream.php @@ -8,7 +8,6 @@ class CompositeStream extends EventEmitter implements DuplexStreamInterface { protected $readable; protected $writable; - protected $pipeSource; protected $closed = false; public function __construct(ReadableStreamInterface $readable, WritableStreamInterface $writable) @@ -21,13 +20,6 @@ public function __construct(ReadableStreamInterface $readable, WritableStreamInt $this->readable->on('close', array($this, 'close')); $this->writable->on('close', array($this, 'close')); - - $this->on('pipe', array($this, 'handlePipeEvent')); - } - - public function handlePipeEvent($source) - { - $this->pipeSource = $source; } public function isReadable() @@ -37,10 +29,6 @@ public function isReadable() public function pause() { - if ($this->pipeSource) { - $this->pipeSource->pause(); - } - $this->readable->pause(); } @@ -50,10 +38,6 @@ public function resume() return; } - if ($this->pipeSource) { - $this->pipeSource->resume(); - } - $this->readable->resume(); } @@ -85,8 +69,6 @@ public function close() } $this->closed = true; - $this->pipeSource = null; - $this->readable->close(); $this->writable->close(); diff --git a/tests/CompositeStreamTest.php b/tests/CompositeStreamTest.php index 62dfe98..d1c9257 100644 --- a/tests/CompositeStreamTest.php +++ b/tests/CompositeStreamTest.php @@ -146,66 +146,6 @@ public function itShouldHandlePipingCorrectly() $input->emit('data', array('foo')); } - /** @test */ - public function itShouldForwardPauseUpstreamWhenPipedTo() - { - $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); - $readable->expects($this->any())->method('isReadable')->willReturn(true); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); - $writable->expects($this->any())->method('isWritable')->willReturn(true); - - $composite = new CompositeStream($readable, $writable); - - $input = $this->getMockBuilder('React\Stream\ThroughStream')->setMethods(array('pause', 'resume'))->getMock(); - $input - ->expects($this->once()) - ->method('pause'); - - $input->pipe($composite); - $composite->pause(); - } - - /** @test */ - public function itShouldForwardResumeUpstreamWhenPipedTo() - { - $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); - $readable->expects($this->any())->method('isReadable')->willReturn(true); - $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); - $writable->expects($this->any())->method('isWritable')->willReturn(true); - - $composite = new CompositeStream($readable, $writable); - - $input = $this->getMockBuilder('React\Stream\ThroughStream')->setMethods(array('pause', 'resume'))->getMock(); - $input - ->expects($this->once()) - ->method('resume'); - - $input->pipe($composite); - $composite->resume(); - } - - /** @test */ - public function itShouldForwardPauseAndResumeUpstreamWhenPipedTo() - { - $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); - $writable = new ThroughStream(); - $writable->pause(); - - $composite = new CompositeStream($readable, $writable); - - $input = $this->getMockBuilder('React\Stream\ThroughStream')->setMethods(array('pause', 'resume'))->getMock(); - $input - ->expects($this->once()) - ->method('pause'); - $input - ->expects($this->once()) - ->method('resume'); - - $input->pipe($composite); - $input->emit('data', array('foo')); - $writable->emit('drain'); - } - /** @test */ public function itShouldForwardPipeCallsToReadableStream() {