Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mess with most of the low-level details.
* [Browser](#browser)
* [Methods](#methods)
* [Promises](#promises)
* [Cancellation](#cancellation)
* [Blocking](#blocking)
* [Streaming](#streaming)
* [submit()](#submit)
Expand Down Expand Up @@ -140,7 +141,22 @@ You may also want to look into the [streaming API](#streaming):
* If you're dealing with lots of concurrent requests (100+) or
* If you want to process individual data chunks as they happen (without having to wait for the full response body) or
* If you're expecting a big response body size (1 MiB or more, for example when downloading binary files) or
* If you're unsure about the response body size (better be safe than sorry when accessing arbitrary remote HTTP endpoints and the response body size is unknown in advance).
* If you're unsure about the response body size (better be safe than sorry when accessing arbitrary remote HTTP endpoints and the response body size is unknown in advance).

#### Cancellation

The returned Promise is implemented in such a way that it can be cancelled
when it is still pending.
Cancelling a pending promise will reject its value with an Exception and
clean up any underlying resources.

```php
$promise = $browser->get($url);

$loop->addTimer(2.0, function () use ($promise) {
$promise->cancel();
});
```

#### Blocking

Expand Down
6 changes: 3 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"php": ">=5.4",
"react/event-loop": "^0.4 || ^0.3",
"react/http-client": "^0.5",
"react/promise": "^2 || ^1.1",
"react/promise": "^2.2.1",
"react/promise-stream": "^0.1.1",
"react/socket": "^0.8",
"react/socket-client": "^0.7 || ^0.6",
"react/stream": "^0.6 || ^0.5 || ^0.4.6",
Expand All @@ -28,7 +29,6 @@
"clue/block-react": "^1.0",
"clue/socks-react": "^0.8 || ^0.7",
"phpunit/phpunit": "^4.5",
"react/http": "^0.7.2",
"react/promise-stream": "^0.1.1"
"react/http": "^0.7.2"
}
}
8 changes: 6 additions & 2 deletions src/Io/Sender.php
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,14 @@ public function send(RequestInterface $request, MessageFactory $messageFactory)
$headers[$name] = implode(', ', $values);
}

$deferred = new Deferred();

$requestStream = $this->http->request($request->getMethod(), (string)$uri, $headers, $request->getProtocolVersion());

$deferred = new Deferred(function ($_, $reject) use ($requestStream) {
// close request stream if request is canceled
$reject(new \RuntimeException('Request canceled'));
$requestStream->close();
});

$requestStream->on('error', function($error) use ($deferred) {
$deferred->reject($error);
});
Expand Down
42 changes: 16 additions & 26 deletions src/Io/Transaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@

namespace Clue\React\Buzz\Io;

use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Exception;
use Clue\React\Buzz\Browser;
use React\HttpClient\Client as HttpClient;
use Clue\React\Buzz\Io\Sender;
use Clue\React\Buzz\Message\ResponseException;
use Clue\React\Buzz\Message\MessageFactory;
use Clue\React\Buzz\Message\BufferedResponse;
use React\Stream\BufferedSink;
use React\Stream\ReadableStreamInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use React\Promise;
use React\Promise\Stream;
use React\Stream\ReadableStreamInterface;
use Exception;

/**
* @internal
Expand Down Expand Up @@ -71,9 +69,6 @@ protected function next(RequestInterface $request)
return $promise->then(
function (ResponseInterface $response) use ($request, $that) {
return $that->onResponse($response, $request);
},
function ($error) use ($request, $that) {
return $that->onError($error, $request);
}
);
}
Expand All @@ -94,9 +89,17 @@ public function bufferResponse(ResponseInterface $response)

// buffer stream and resolve with buffered body
$messageFactory = $this->messageFactory;
return BufferedSink::createPromise($stream)->then(function ($body) use ($response, $messageFactory) {
return $response->withBody($messageFactory->body($body));
});
return Stream\buffer($stream)->then(
function ($body) use ($response, $messageFactory) {
return $response->withBody($messageFactory->body($body));
},
function ($e) use ($stream) {
// try to close stream if buffering fails (or is cancelled)
$stream->close();

throw $e;
}
);
}

/**
Expand All @@ -123,19 +126,6 @@ public function onResponse(ResponseInterface $response, RequestInterface $reques
return $response;
}

/**
* @internal
* @param Exception $error
* @param RequestInterface $request
* @throws Exception
*/
public function onError(Exception $error, RequestInterface $request)
{
$this->progress('error', array($error, $request));

throw $error;
}

private function onResponseRedirect(ResponseInterface $response, RequestInterface $request)
{
// resolve location relative to last request URI
Expand Down
12 changes: 12 additions & 0 deletions tests/FunctionalBrowserTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ public function testSimpleRequest()
Block\await($this->browser->get($this->base . 'get'), $this->loop);
}

/**
* @expectedException RuntimeException
* @group online
*/
public function testCancelPromiseWillRejectRequest()
{
$promise = $this->browser->get($this->base . 'get');
$promise->cancel();

Block\await($promise, $this->loop);
}

/** @group online */
public function testRedirectRequestRelative()
{
Expand Down
39 changes: 39 additions & 0 deletions tests/Io/SenderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,45 @@ public function testSenderLegacyConnectorRejection()
Block\await($promise, $this->loop);
}

public function testCancelRequestWillCancelConnector()
{
$promise = new \React\Promise\Promise(function () { }, function () {
throw new \RuntimeException();
});

$connector = $this->getMock('React\Socket\ConnectorInterface');
$connector->expects($this->once())->method('connect')->willReturn($promise);

$sender = new Sender(new HttpClient($this->loop, $connector));

$request = new Request('GET', 'http://www.google.com/');

$promise = $sender->send($request, $this->getMock('Clue\React\Buzz\Message\MessageFactory'));
$promise->cancel();

$this->setExpectedException('RuntimeException');
Block\await($promise, $this->loop);
}

public function testCancelRequestWillCloseConnection()
{
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$connection->expects($this->once())->method('close');

$connector = $this->getMock('React\Socket\ConnectorInterface');
$connector->expects($this->once())->method('connect')->willReturn(Promise\resolve($connection));

$sender = new Sender(new HttpClient($this->loop, $connector));

$request = new Request('GET', 'http://www.google.com/');

$promise = $sender->send($request, $this->getMock('Clue\React\Buzz\Message\MessageFactory'));
$promise->cancel();

$this->setExpectedException('RuntimeException');
Block\await($promise, $this->loop);
}

public function provideRequestProtocolVersion()
{
return array(
Expand Down
26 changes: 26 additions & 0 deletions tests/Io/TransactionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,32 @@ public function testReceivingStreamingBodyWillResolveWithBufferedResponseByDefau
$this->assertEquals('hello world', (string)$response->getBody());
}

/**
* @expectedException RuntimeException
*/
public function testCancelBufferingResponseWillCloseStreamAndReject()
{
$messageFactory = new MessageFactory();
$loop = Factory::create();

$stream = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
$stream->expects($this->any())->method('isReadable')->willReturn(true);
$stream->expects($this->once())->method('close');

$request = $this->getMock('Psr\Http\Message\RequestInterface');
$response = $messageFactory->response(1.0, 200, 'OK', array(), $stream);

// mock sender to resolve promise with the given $response in response to the given $request
$sender = $this->getMockBuilder('Clue\React\Buzz\Io\Sender')->disableOriginalConstructor()->getMock();
$sender->expects($this->once())->method('send')->with($this->equalTo($request))->willReturn(Promise\resolve($response));

$transaction = new Transaction($request, $sender, array(), $messageFactory);
$promise = $transaction->send();
$promise->cancel();

Block\await($promise, $loop);
}

public function testReceivingStreamingBodyWillResolveWithStreamingResponseIfStreamingIsEnabled()
{
$messageFactory = new MessageFactory();
Expand Down