Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"react/http-client": "^0.5.8",
"react/promise": "^2.2.1 || ^1.2.1",
"react/promise-stream": "^1.0 || ^0.1.1",
"react/socket": "^1.0 || ^0.8.4",
"react/socket": "^1.1",
"react/stream": "^1.0 || ^0.7",
"ringcentral/psr7": "^1.2"
},
Expand Down
32 changes: 32 additions & 0 deletions examples/03-any.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

// concurrently request a number of URIs.
// return immediately once the first is completed, cancel all others.

use Clue\React\Buzz\Browser;
use Psr\Http\Message\ResponseInterface;

require __DIR__ . '/../vendor/autoload.php';

$loop = React\EventLoop\Factory::create();
$client = new Browser($loop);

$promises = array(
$client->head('http://www.github.com/clue/http-react'),
$client->get('https://httpbin.org/'),
$client->get('https://google.com'),
$client->get('http://www.lueck.tv/psocksd'),
$client->get('http://www.httpbin.org/absolute-redirect/5')
);

React\Promise\any($promises)->then(function (ResponseInterface $response) use ($promises) {
// first response arrived => cancel all other pending requests
foreach ($promises as $promise) {
$promise->cancel();
}

var_dump($response->getHeaders());
echo PHP_EOL . $response->getBody();
});

$loop->run();
4 changes: 2 additions & 2 deletions src/Io/Sender.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ public function send(RequestInterface $request, MessageFactory $messageFactory)
$requestStream = $this->http->request($request->getMethod(), (string)$uri, $headers, $request->getProtocolVersion());

$deferred = new Deferred(function ($_, $reject) use ($requestStream) {
// close request stream if request is canceled
$reject(new \RuntimeException('Request canceled'));
// close request stream if request is cancelled
$reject(new \RuntimeException('Request cancelled'));
$requestStream->close();
});

Expand Down
48 changes: 33 additions & 15 deletions src/Io/Transaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\UriInterface;
use React\Promise;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use React\Promise\Stream;
use React\Stream\ReadableStreamInterface;
use Exception;

/**
* @internal
Expand Down Expand Up @@ -50,10 +48,22 @@ public function __construct(RequestInterface $request, Sender $sender, array $op

public function send()
{
return $this->next($this->request);
$deferred = new Deferred(function () use (&$deferred) {
if (isset($deferred->pending)) {
$deferred->pending->cancel();
unset($deferred->pending);
}
});

$this->next($this->request, $deferred)->then(
array($deferred, 'resolve'),
array($deferred, 'reject')
);

return $deferred->promise();
}

private function next(RequestInterface $request)
private function next(RequestInterface $request, Deferred $deferred)
{
$this->progress('request', array($request));

Expand All @@ -63,12 +73,16 @@ private function next(RequestInterface $request)
$promise = $this->sender->send($request, $this->messageFactory);

if (!$this->streaming) {
$promise = $promise->then(array($that, 'bufferResponse'));
$promise = $promise->then(function ($response) use ($deferred, $that) {
return $that->bufferResponse($response, $deferred);
});
}

$deferred->pending = $promise;

return $promise->then(
function (ResponseInterface $response) use ($request, $that) {
return $that->onResponse($response, $request);
function (ResponseInterface $response) use ($request, $that, $deferred) {
return $that->onResponse($response, $request, $deferred);
}
);
}
Expand All @@ -78,18 +92,18 @@ function (ResponseInterface $response) use ($request, $that) {
* @param ResponseInterface $response
* @return PromiseInterface Promise<ResponseInterface, Exception>
*/
public function bufferResponse(ResponseInterface $response)
public function bufferResponse(ResponseInterface $response, $deferred)
{
$stream = $response->getBody();

// body is not streaming => already buffered
if (!$stream instanceof ReadableStreamInterface) {
return Promise\resolve($response);
return \React\Promise\resolve($response);
}

// buffer stream and resolve with buffered body
$messageFactory = $this->messageFactory;
return Stream\buffer($stream)->then(
$promise = \React\Promise\Stream\buffer($stream)->then(
function ($body) use ($response, $messageFactory) {
return $response->withBody($messageFactory->body($body));
},
Expand All @@ -100,6 +114,10 @@ function ($e) use ($stream) {
throw $e;
}
);

$deferred->pending = $promise;

return $promise;
}

/**
Expand All @@ -109,12 +127,12 @@ function ($e) use ($stream) {
* @throws ResponseException
* @return ResponseInterface|PromiseInterface
*/
public function onResponse(ResponseInterface $response, RequestInterface $request)
public function onResponse(ResponseInterface $response, RequestInterface $request, $deferred)
{
$this->progress('response', array($response, $request));

if ($this->followRedirects && ($response->getStatusCode() >= 300 && $response->getStatusCode() < 400)) {
return $this->onResponseRedirect($response, $request);
return $this->onResponseRedirect($response, $request, $deferred);
}

// only status codes 200-399 are considered to be valid, reject otherwise
Expand All @@ -132,7 +150,7 @@ public function onResponse(ResponseInterface $response, RequestInterface $reques
* @return PromiseInterface
* @throws \RuntimeException
*/
private function onResponseRedirect(ResponseInterface $response, RequestInterface $request)
private function onResponseRedirect(ResponseInterface $response, RequestInterface $request, $deferred)
{
// resolve location relative to last request URI
$location = $this->messageFactory->uriRelative($request->getUri(), $response->getHeaderLine('Location'));
Expand All @@ -144,7 +162,7 @@ private function onResponseRedirect(ResponseInterface $response, RequestInterfac
throw new \RuntimeException('Maximum number of redirects (' . $this->maxRedirects . ') exceeded');
}

return $this->next($request);
return $this->next($request, $deferred);
}

/**
Expand Down
28 changes: 28 additions & 0 deletions tests/BrowserTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,32 @@ public function testWithBaseUriNotAbsoluteFails()
{
$this->browser->withBase('hello');
}

public function testCancelGetRequestShouldCancelUnderlyingSocketConnection()
{
$pending = new Promise(function () { }, $this->expectCallableOnce());

$connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock();
$connector->expects($this->once())->method('connect')->with('example.com:80')->willReturn($pending);

$this->browser = new Browser($this->loop, $connector);

$promise = $this->browser->get('http://example.com/');
$promise->cancel();
}

protected function expectCallableOnce()
{
$mock = $this->createCallableMock();
$mock
->expects($this->once())
->method('__invoke');

return $mock;
}

protected function createCallableMock()
{
return $this->getMockBuilder('stdClass')->setMethods(array('__invoke'))->getMock();
}
}
16 changes: 16 additions & 0 deletions tests/FunctionalBrowserTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,22 @@ public function testRedirectFromPageWithInvalidAuthToPageWithCorrectAuthenticati
Block\await($this->browser->get($base . 'redirect-to?url=' . urlencode($target)), $this->loop);
}

/**
* @expectedException RuntimeException
* @expectedExceptionMessage Request cancelled
* @group online
*/
public function testCancelRedirectedRequestShouldReject()
{
$promise = $this->browser->get($this->base . 'redirect-to?url=delay%2F10');

$this->loop->addTimer(0.1, function () use ($promise) {
$promise->cancel();
});

Block\await($promise, $this->loop);
}

/**
* @group online
* @doesNotPerformAssertions
Expand Down
Loading