diff --git a/src/ReadableStream.php b/src/ReadableStream.php deleted file mode 100644 index d0a4e58..0000000 --- a/src/ReadableStream.php +++ /dev/null @@ -1,39 +0,0 @@ -closed; - } - - public function pause() - { - } - - public function resume() - { - } - - public function pipe(WritableStreamInterface $dest, array $options = array()) - { - return Util::pipe($this, $dest, $options); - } - - public function close() - { - if ($this->closed) { - return; - } - - $this->closed = true; - $this->emit('close'); - $this->removeAllListeners(); - } -} diff --git a/src/ThroughStream.php b/src/ThroughStream.php index 6f63e2e..9a22b2d 100644 --- a/src/ThroughStream.php +++ b/src/ThroughStream.php @@ -185,5 +185,6 @@ public function close() $this->callback = null; $this->emit('close'); + $this->removeAllListeners(); } } diff --git a/src/WritableStream.php b/src/WritableStream.php deleted file mode 100644 index 7b96b62..0000000 --- a/src/WritableStream.php +++ /dev/null @@ -1,40 +0,0 @@ -write($data); - } - - $this->close(); - } - - public function isWritable() - { - return !$this->closed; - } - - public function close() - { - if ($this->closed) { - return; - } - - $this->closed = true; - $this->emit('close'); - $this->removeAllListeners(); - } -} diff --git a/tests/CompositeStreamTest.php b/tests/CompositeStreamTest.php index 3704aa8..62dfe98 100644 --- a/tests/CompositeStreamTest.php +++ b/tests/CompositeStreamTest.php @@ -3,8 +3,7 @@ namespace React\Tests\Stream; use React\Stream\CompositeStream; -use React\Stream\ReadableStream; -use React\Stream\WritableStream; +use React\Stream\ThroughStream; /** * @covers React\Stream\CompositeStream @@ -105,8 +104,8 @@ public function closeShouldCloseBothStreams() /** @test */ public function itShouldForwardCloseOnlyOnce() { - $readable = new ReadableStream(); - $writable = new WritableStream(); + $readable = new ThroughStream(); + $writable = new ThroughStream(); $composite = new CompositeStream($readable, $writable); $composite->on('close', $this->expectCallableOnce()); @@ -118,8 +117,8 @@ public function itShouldForwardCloseOnlyOnce() /** @test */ public function itShouldReceiveForwardedEvents() { - $readable = new ReadableStream(); - $writable = new WritableStream(); + $readable = new ThroughStream(); + $writable = new ThroughStream(); $composite = new CompositeStream($readable, $writable); $composite->on('data', $this->expectCallableOnce()); @@ -142,7 +141,7 @@ public function itShouldHandlePipingCorrectly() $composite = new CompositeStream($readable, $writable); - $input = new ReadableStream(); + $input = new ThroughStream(); $input->pipe($composite); $input->emit('data', array('foo')); } @@ -157,7 +156,7 @@ public function itShouldForwardPauseUpstreamWhenPipedTo() $composite = new CompositeStream($readable, $writable); - $input = $this->getMockBuilder('React\Stream\ReadableStream')->setMethods(array('pause', 'resume'))->getMock(); + $input = $this->getMockBuilder('React\Stream\ThroughStream')->setMethods(array('pause', 'resume'))->getMock(); $input ->expects($this->once()) ->method('pause'); @@ -176,7 +175,7 @@ public function itShouldForwardResumeUpstreamWhenPipedTo() $composite = new CompositeStream($readable, $writable); - $input = $this->getMockBuilder('React\Stream\ReadableStream')->setMethods(array('pause', 'resume'))->getMock(); + $input = $this->getMockBuilder('React\Stream\ThroughStream')->setMethods(array('pause', 'resume'))->getMock(); $input ->expects($this->once()) ->method('resume'); @@ -189,15 +188,12 @@ public function itShouldForwardResumeUpstreamWhenPipedTo() public function itShouldForwardPauseAndResumeUpstreamWhenPipedTo() { $readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock(); - $writable = $this->getMockBuilder('React\Stream\WritableStream')->setMethods(array('write'))->getMock(); - $writable - ->expects($this->once()) - ->method('write') - ->will($this->returnValue(false)); + $writable = new ThroughStream(); + $writable->pause(); $composite = new CompositeStream($readable, $writable); - $input = $this->getMockBuilder('React\Stream\ReadableStream')->setMethods(array('pause', 'resume'))->getMock(); + $input = $this->getMockBuilder('React\Stream\ThroughStream')->setMethods(array('pause', 'resume'))->getMock(); $input ->expects($this->once()) ->method('pause'); @@ -213,7 +209,7 @@ public function itShouldForwardPauseAndResumeUpstreamWhenPipedTo() /** @test */ public function itShouldForwardPipeCallsToReadableStream() { - $readable = new ReadableStream(); + $readable = new ThroughStream(); $writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); $writable->expects($this->any())->method('isWritable')->willReturn(True); $composite = new CompositeStream($readable, $writable); diff --git a/tests/ReadableStreamTest.php b/tests/ReadableStreamTest.php deleted file mode 100644 index c2b0f65..0000000 --- a/tests/ReadableStreamTest.php +++ /dev/null @@ -1,67 +0,0 @@ -assertTrue($readable->isReadable()); - } - - /** @test */ - public function pauseShouldDoNothing() - { - $readable = new ReadableStream(); - $readable->pause(); - } - - /** @test */ - public function resumeShouldDoNothing() - { - $readable = new ReadableStream(); - $readable->resume(); - } - - /** @test */ - public function pipeShouldReturnDestination() - { - $dest = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock(); - $readable = new ReadableStream(); - - $this->assertSame($dest, $readable->pipe($dest)); - } - - /** @test */ - public function closeShouldClose() - { - $readable = new ReadableStream(); - $readable->close(); - - $this->assertFalse($readable->isReadable()); - } - - /** @test */ - public function closeShouldEmitCloseEvent() - { - $readable = new ReadableStream(); - $readable->on('close', $this->expectCallableOnce()); - $readable->on('end', $this->expectCallableNever()); - - $readable->close(); - } - - /** @test */ - public function doubleCloseShouldWork() - { - $readable = new ReadableStream(); - $readable->close(); - $readable->close(); - - $this->assertFalse($readable->isReadable()); - } -} diff --git a/tests/ThroughStreamTest.php b/tests/ThroughStreamTest.php index 0ffd93d..a98badf 100644 --- a/tests/ThroughStreamTest.php +++ b/tests/ThroughStreamTest.php @@ -2,7 +2,6 @@ namespace React\Tests\Stream; -use React\Stream\ReadableStream; use React\Stream\ThroughStream; /** @@ -122,7 +121,7 @@ public function itShouldReturnTrueForAnyDataWrittenToItWhenResumedAfterPause() /** @test */ public function pipingStuffIntoItShouldWork() { - $readable = new ReadableStream(); + $readable = new ThroughStream(); $through = new ThroughStream(); $through->on('data', $this->expectCallableOnceWith('foo')); diff --git a/tests/UtilTest.php b/tests/UtilTest.php index b87068b..3d113ab 100644 --- a/tests/UtilTest.php +++ b/tests/UtilTest.php @@ -3,10 +3,9 @@ namespace React\Tests\Stream; use React\Stream\WritableResourceStream; -use React\Stream\ReadableStream; use React\Stream\Util; -use React\Stream\WritableStream; use React\Stream\CompositeStream; +use React\Stream\ThroughStream; /** * @covers React\Stream\Util @@ -77,7 +76,7 @@ public function testPipeClosingDestPausesSource() ->expects($this->once()) ->method('pause'); - $writable = new WritableStream(); + $writable = new ThroughStream(); Util::pipe($readable, $writable); @@ -191,8 +190,8 @@ public function testPipeWithWritableResourceStream() public function testPipeSetsUpListeners() { - $source = new ReadableStream(); - $dest = new WritableStream(); + $source = new ThroughStream(); + $dest = new ThroughStream(); $this->assertCount(0, $source->listeners('data')); $this->assertCount(0, $source->listeners('end')); @@ -207,8 +206,8 @@ public function testPipeSetsUpListeners() public function testPipeClosingSourceRemovesListeners() { - $source = new ReadableStream(); - $dest = new WritableStream(); + $source = new ThroughStream(); + $dest = new ThroughStream(); Util::pipe($source, $dest); @@ -221,8 +220,8 @@ public function testPipeClosingSourceRemovesListeners() public function testPipeClosingDestRemovesListeners() { - $source = new ReadableStream(); - $dest = new WritableStream(); + $source = new ThroughStream(); + $dest = new ThroughStream(); Util::pipe($source, $dest); @@ -251,8 +250,8 @@ public function testPipeDuplexIntoSelfEndsOnEnd() /** @test */ public function forwardEventsShouldSetupForwards() { - $source = new ReadableStream(); - $target = new ReadableStream(); + $source = new ThroughStream(); + $target = new ThroughStream(); Util::forwardEvents($source, $target, array('data')); $target->on('data', $this->expectCallableOnce()); diff --git a/tests/WritableStreamTest.php b/tests/WritableStreamTest.php deleted file mode 100644 index b88a74c..0000000 --- a/tests/WritableStreamTest.php +++ /dev/null @@ -1,81 +0,0 @@ -pipe($through); - $readable->emit('data', array('foo')); - } - - /** @test */ - public function endShouldCloseTheStream() - { - $through = new WritableStream(); - $through->on('data', $this->expectCallableNever()); - $through->end(); - - $this->assertFalse($through->isWritable()); - } - - /** @test */ - public function endShouldWriteDataBeforeClosing() - { - $through = $this->getMockBuilder('React\Stream\WritableStream')->setMethods(array('write'))->getMock(); - $through - ->expects($this->once()) - ->method('write') - ->with('foo'); - $through->end('foo'); - - $this->assertFalse($through->isWritable()); - } - - /** @test */ - public function itShouldBeWritableByDefault() - { - $through = new WritableStream(); - $this->assertTrue($through->isWritable()); - } - - /** @test */ - public function closeShouldClose() - { - $through = new WritableStream(); - $through->close(); - - $this->assertFalse($through->isWritable()); - } - - /** @test */ - public function closeShouldEmitCloseEvent() - { - $through = new WritableStream(); - $through->on('close', $this->expectCallableOnce()); - $through->on('end', $this->expectCallableNever()); - - $through->close(); - } - - /** @test */ - public function doubleCloseShouldWork() - { - $through = new WritableStream(); - $through->close(); - $through->close(); - - $this->assertFalse($through->isWritable()); - } -}