Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -753,21 +753,22 @@ take care of the underlying stream resource.
You SHOULD only use its public API and SHOULD NOT interfere with the underlying
stream resource manually.

The `$bufferSize` property controls the maximum buffer size in bytes to read
at once from the stream.
This class takes an optional `int|null $readChunkSize` parameter that controls
the maximum buffer size in bytes to read at once from the stream.
You can use a `null` value here in order to apply its default value.
This value SHOULD NOT be changed unless you know what you're doing.
This can be a positive number which means that up to X bytes will be read
at once from the underlying stream resource. Note that the actual number
of bytes read may be lower if the stream resource has less than X bytes
currently available.
This can be `null` which means "read everything available" from the
This can be `-1` which means "read everything available" from the
underlying stream resource.
This should read until the stream resource is not readable anymore
(i.e. underlying buffer drained), note that this does not neccessarily
mean it reached EOF.

```php
$stream->bufferSize = 8192;
$stream = new ReadableResourceStream(STDIN, $loop, 8192);
```

### WritableResourceStream
Expand Down Expand Up @@ -819,11 +820,14 @@ ready to accept data.
For this, it uses an in-memory buffer string to collect all outstanding writes.
This buffer has a soft-limit applied which defines how much data it is willing
to accept before the caller SHOULD stop sending further data.
It currently defaults to 64 KiB and can be controlled through the public
`$softLimit` property like this:

This class takes an optional `int|null $writeBufferSoftLimit` parameter that controls
this maximum buffer size in bytes.
You can use a `null` value here in order to apply its default value.
This value SHOULD NOT be changed unless you know what you're doing.

```php
$stream->softLimit = 8192;
$stream = new WritableResourceStream(STDOUT, $loop, 8192);
```

See also [`write()`](#write) for more details.
Expand Down Expand Up @@ -873,21 +877,23 @@ take care of the underlying stream resource.
You SHOULD only use its public API and SHOULD NOT interfere with the underlying
stream resource manually.

The `$bufferSize` property controls the maximum buffer size in bytes to read
at once from the stream.
This class takes an optional `int|null $readChunkSize` parameter that controls
the maximum buffer size in bytes to read at once from the stream.
You can use a `null` value here in order to apply its default value.
This value SHOULD NOT be changed unless you know what you're doing.
This can be a positive number which means that up to X bytes will be read
at once from the underlying stream resource. Note that the actual number
of bytes read may be lower if the stream resource has less than X bytes
currently available.
This can be `null` which means "read everything available" from the
This can be `-1` which means "read everything available" from the
underlying stream resource.
This should read until the stream resource is not readable anymore
(i.e. underlying buffer drained), note that this does not neccessarily
mean it reached EOF.

```php
$stream->bufferSize = 8192;
$conn = stream_socket_client('tcp://google.com:80');
$stream = new DuplexResourceStream($conn, $loop, 8192);
```

Any `write()` calls to this class will not be performaned instantly, but will
Expand All @@ -896,15 +902,22 @@ ready to accept data.
For this, it uses an in-memory buffer string to collect all outstanding writes.
This buffer has a soft-limit applied which defines how much data it is willing
to accept before the caller SHOULD stop sending further data.
It currently defaults to 64 KiB and can be controlled through the public
`$softLimit` property like this:

This class takes another optional `WritableStreamInterface|null $buffer` parameter
that controls this write behavior of this stream.
You can use a `null` value here in order to apply its default value.
This value SHOULD NOT be changed unless you know what you're doing.

If you want to change the write buffer soft limit, you can pass an instance of
[`WritableResourceStream`](#writableresourcestream) like this:

```php
$buffer = $stream->getBuffer();
$buffer->softLimit = 8192;
$conn = stream_socket_client('tcp://google.com:80');
$buffer = new WritableResourceStream($conn, $loop, 8192);
$stream = new DuplexResourceStream($conn, $loop, null, $buffer);
```

See also [`write()`](#write) for more details.
See also [`WritableResourceStream`](#writableresourcestream) for more details.

### ThroughStream

Expand Down
19 changes: 6 additions & 13 deletions src/DuplexResourceStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ class DuplexResourceStream extends EventEmitter implements DuplexStreamInterface
* of bytes read may be lower if the stream resource has less than X bytes
* currently available.
*
* This can be `null` which means read everything available from the
* This can be `-1` which means read everything available from the
* underlying stream resource.
* This should read until the stream resource is not readable anymore
* (i.e. underlying buffer drained), note that this does not neccessarily
* mean it reached EOF.
*
* @var int|null
* @var int
*/
public $bufferSize = 65536;
private $bufferSize;

private $stream;
protected $readable = true;
Expand All @@ -33,7 +33,7 @@ class DuplexResourceStream extends EventEmitter implements DuplexStreamInterface
protected $loop;
protected $buffer;

public function __construct($stream, LoopInterface $loop, WritableStreamInterface $buffer = null)
public function __construct($stream, LoopInterface $loop, $readChunkSize = null, WritableStreamInterface $buffer = null)
{
if (!is_resource($stream) || get_resource_type($stream) !== "stream") {
throw new InvalidArgumentException('First parameter must be a valid stream resource');
Expand Down Expand Up @@ -69,6 +69,7 @@ public function __construct($stream, LoopInterface $loop, WritableStreamInterfac

$this->stream = $stream;
$this->loop = $loop;
$this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize;
$this->buffer = $buffer;

$that = $this;
Expand Down Expand Up @@ -169,7 +170,7 @@ public function handleData($stream)
);
});

$data = stream_get_contents($stream, $this->bufferSize === null ? -1 : $this->bufferSize);
$data = stream_get_contents($stream, $this->bufferSize);

restore_error_handler();

Expand All @@ -195,14 +196,6 @@ public function handleClose()
}
}

/**
* @return WritableStreamInterface
*/
public function getBuffer()
{
return $this->buffer;
}

/**
* Returns whether this is a pipe resource in a legacy environment
*
Expand Down
11 changes: 6 additions & 5 deletions src/ReadableResourceStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ class ReadableResourceStream extends EventEmitter implements ReadableStreamInter
* of bytes read may be lower if the stream resource has less than X bytes
* currently available.
*
* This can be `null` which means read everything available from the
* This can be `-1` which means read everything available from the
* underlying stream resource.
* This should read until the stream resource is not readable anymore
* (i.e. underlying buffer drained), note that this does not neccessarily
* mean it reached EOF.
*
* @var int|null
* @var int
*/
public $bufferSize = 65536;
private $bufferSize;

/**
* @var resource
Expand All @@ -36,7 +36,7 @@ class ReadableResourceStream extends EventEmitter implements ReadableStreamInter
private $closed = false;
private $loop;

public function __construct($stream, LoopInterface $loop)
public function __construct($stream, LoopInterface $loop, $readChunkSize = null)
{
if (!is_resource($stream) || get_resource_type($stream) !== "stream") {
throw new InvalidArgumentException('First parameter must be a valid stream resource');
Expand Down Expand Up @@ -68,6 +68,7 @@ public function __construct($stream, LoopInterface $loop)

$this->stream = $stream;
$this->loop = $loop;
$this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize;

$this->resume();
}
Expand Down Expand Up @@ -123,7 +124,7 @@ public function handleData()
);
});

$data = stream_get_contents($this->stream, $this->bufferSize === null ? -1 : $this->bufferSize);
$data = stream_get_contents($this->stream, $this->bufferSize);

restore_error_handler();

Expand Down
7 changes: 4 additions & 3 deletions src/WritableResourceStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
class WritableResourceStream extends EventEmitter implements WritableStreamInterface
{
private $stream;
public $softLimit = 65536;
private $loop;
private $softLimit;

private $listening = false;
private $writable = true;
private $closed = false;
private $loop;
private $data = '';

public function __construct($stream, LoopInterface $loop)
public function __construct($stream, LoopInterface $loop, $writeBufferSoftLimit = null)
{
if (!is_resource($stream) || get_resource_type($stream) !== "stream") {
throw new \InvalidArgumentException('First parameter must be a valid stream resource');
Expand All @@ -35,6 +35,7 @@ public function __construct($stream, LoopInterface $loop)

$this->stream = $stream;
$this->loop = $loop;
$this->softLimit = ($writeBufferSoftLimit === null) ? 65536 : (int)$writeBufferSoftLimit;
}

public function isWritable()
Expand Down
9 changes: 3 additions & 6 deletions tests/DuplexResourceStreamIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,11 @@ public function testBufferReadsLargeChunks($condition, $loopFactory)

list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);

$streamA = new DuplexResourceStream($sockA, $loop);
$streamB = new DuplexResourceStream($sockB, $loop);

$bufferSize = 4096;
$streamA->bufferSize = $bufferSize;
$streamB->bufferSize = $bufferSize;
$streamA = new DuplexResourceStream($sockA, $loop, $bufferSize);
$streamB = new DuplexResourceStream($sockB, $loop, $bufferSize);

$testString = str_repeat("*", $streamA->bufferSize + 1);
$testString = str_repeat("*", $bufferSize + 1);

$buffer = "";
$streamB->on('data', function ($data) use (&$buffer) {
Expand Down
18 changes: 8 additions & 10 deletions tests/DuplexResourceStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use React\Stream\DuplexResourceStream;
use Clue\StreamFilter as Filter;
use React\Stream\WritableResourceStream;

class DuplexResourceStreamTest extends TestCase
{
Expand Down Expand Up @@ -70,9 +71,7 @@ public function testConstructorAcceptsBuffer()

$buffer = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();

$conn = new DuplexResourceStream($stream, $loop, $buffer);

$this->assertSame($buffer, $conn->getBuffer());
$conn = new DuplexResourceStream($stream, $loop, null, $buffer);
}

public function testCloseShouldEmitCloseEvent()
Expand All @@ -97,7 +96,7 @@ public function testEndShouldEndBuffer()
$buffer = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
$buffer->expects($this->once())->method('end')->with('foo');

$conn = new DuplexResourceStream($stream, $loop, $buffer);
$conn = new DuplexResourceStream($stream, $loop, null, $buffer);
$conn->end('foo');
}

Expand Down Expand Up @@ -149,7 +148,7 @@ public function testDataEventDoesEmitOneChunkMatchingBufferSize()

$capturedData = null;

$conn = new DuplexResourceStream($stream, $loop);
$conn = new DuplexResourceStream($stream, $loop, 4321);
$conn->on('data', function ($data) use (&$capturedData) {
$capturedData = $data;
});
Expand All @@ -160,7 +159,7 @@ public function testDataEventDoesEmitOneChunkMatchingBufferSize()
$conn->handleData($stream);

$this->assertTrue($conn->isReadable());
$this->assertEquals($conn->bufferSize, strlen($capturedData));
$this->assertEquals(4321, strlen($capturedData));
}

/**
Expand All @@ -174,8 +173,7 @@ public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfi

$capturedData = null;

$conn = new DuplexResourceStream($stream, $loop);
$conn->bufferSize = null;
$conn = new DuplexResourceStream($stream, $loop, -1);

$conn->on('data', function ($data) use (&$capturedData) {
$capturedData = $data;
Expand Down Expand Up @@ -285,12 +283,12 @@ public function testBufferEventsShouldBubbleUp()
$stream = fopen('php://temp', 'r+');
$loop = $this->createLoopMock();

$conn = new DuplexResourceStream($stream, $loop);
$buffer = new WritableResourceStream($stream, $loop);
$conn = new DuplexResourceStream($stream, $loop, null, $buffer);

$conn->on('drain', $this->expectCallableOnce());
$conn->on('error', $this->expectCallableOnce());

$buffer = $conn->getBuffer();
$buffer->emit('drain');
$buffer->emit('error', array(new \RuntimeException('Whoops')));
}
Expand Down
7 changes: 3 additions & 4 deletions tests/ReadableResourceStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public function testDataEventDoesEmitOneChunkMatchingBufferSize()

$capturedData = null;

$conn = new ReadableResourceStream($stream, $loop);
$conn = new ReadableResourceStream($stream, $loop, 4321);
$conn->on('data', function ($data) use (&$capturedData) {
$capturedData = $data;
});
Expand All @@ -131,7 +131,7 @@ public function testDataEventDoesEmitOneChunkMatchingBufferSize()
$conn->handleData($stream);

$this->assertTrue($conn->isReadable());
$this->assertEquals($conn->bufferSize, strlen($capturedData));
$this->assertEquals(4321, strlen($capturedData));
}

/**
Expand All @@ -145,8 +145,7 @@ public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfi

$capturedData = null;

$conn = new ReadableResourceStream($stream, $loop);
$conn->bufferSize = null;
$conn = new ReadableResourceStream($stream, $loop, -1);

$conn->on('data', function ($data) use (&$capturedData) {
$capturedData = $data;
Expand Down
Loading