Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ $streamingBrowser->get($url)->then(function (ResponseInterface $response) {
});
```

See also the [stream bandwidth example](examples/91-stream-bandwidth.php) and
See also the [stream download example](examples/91-benchmark-download.php) and
the [stream forwarding example](examples/21-stream-forwarding.php).

You can invoke the following methods on the message body:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,37 @@
<?php

// a) simple download benchmark against public HTTP endpoint:
// $ php examples/91-benchmark-download.php http://httpbin.org/get

// b) local 10 GB download benchmark against localhost address to avoid network overhead
//
// b1) first run example HTTP server, e.g. from react/http:
// $ cd workspace/reactphp-http
// $ php examples/99-benchmark-download.php 8080
//
// b2) run HTTP client receiving a 10 GB download:
// $ php examples/92-benchmark-download.php http://localhost:8080/ 10000

use Clue\React\Buzz\Browser;
use Psr\Http\Message\ResponseInterface;
use React\Stream\ReadableStreamInterface;
use RingCentral\Psr7;

$url = isset($argv[1]) ? $argv[1] : 'http://google.com/';

require __DIR__ . '/../vendor/autoload.php';

if (extension_loaded('xdebug')) {
echo 'NOTICE: The "xdebug" extension is loaded, this has a major impact on performance.' . PHP_EOL;
}

$loop = React\EventLoop\Factory::create();
$client = new Browser($loop);

echo 'Requesting ' . $url . '…' . PHP_EOL;

$client->withOptions(array('streaming' => true))->get($url)->then(function (ResponseInterface $response) use ($loop) {
echo 'Headers received' . PHP_EOL;
echo Psr7\str($response);
echo RingCentral\Psr7\str($response);

$stream = $response->getBody();
if (!$stream instanceof ReadableStreamInterface) {
Expand All @@ -41,7 +56,7 @@

$time = microtime(true) - $time;

echo "\r" . 'Downloaded ' . $bytes . ' bytes in ' . round($time, 3) . 's => ' . round($bytes / $time / 1024 / 1024, 1) . ' MiB/s' . PHP_EOL;
echo "\r" . 'Downloaded ' . $bytes . ' bytes in ' . round($time, 3) . 's => ' . round($bytes / $time / 1000000, 1) . ' MB/s' . PHP_EOL;
});
}, 'printf');

Expand Down
125 changes: 125 additions & 0 deletions examples/92-benchmark-upload.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
<?php

// a) simple 1 MB upload benchmark against public HTTP endpoint
// $ php examples/92-benchmark-upload.php http://httpbin.org/post 1
//
// b) local 10 GB upload benchmark against localhost address to avoid network overhead
//
// b1) first run example HTTP server, e.g. from react/http
// $ cd workspace/reactphp-http
// $ php examples/13-stream-request.php 8080
//
// b2) run HTTP client sending a 10 GB upload
// $ php examples/92-benchmark-upload.php http://localhost:8080/ 10000

use Clue\React\Buzz\Browser;
use Evenement\EventEmitter;
use Psr\Http\Message\ResponseInterface;
use React\Stream\ReadableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;

require __DIR__ . '/../vendor/autoload.php';

if (extension_loaded('xdebug')) {
echo 'NOTICE: The "xdebug" extension is loaded, this has a major impact on performance.' . PHP_EOL;
}

/** A readable stream that can emit a lot of data */
class ChunkRepeater extends EventEmitter implements ReadableStreamInterface
{
private $chunk;
private $count;
private $position = 0;
private $paused = true;
private $closed = false;

public function __construct($chunk, $count)
{
$this->chunk = $chunk;
$this->count = $count;
}

public function pause()
{
$this->paused = true;
}

public function resume()
{
if (!$this->paused || $this->closed) {
return;
}

// keep emitting until stream is paused
$this->paused = false;
while ($this->position < $this->count && !$this->paused) {
++$this->position;
$this->emit('data', array($this->chunk));
}

// end once the last chunk has been written
if ($this->position >= $this->count) {
$this->emit('end');
$this->close();
}
}

public function pipe(WritableStreamInterface $dest, array $options = array())
{
return Util::pipe($this, $dest, $options);
}

public function isReadable()
{
return !$this->closed;
}

public function close()
{
if ($this->closed) {
return;
}

$this->closed = true;
$this->count = 0;
$this->paused = true;
$this->emit('close');
}

public function getPosition()
{
return $this->position * strlen($this->chunk);
}
}

$loop = React\EventLoop\Factory::create();
$client = new Browser($loop);

$url = isset($argv[1]) ? $argv[1] : 'http://httpbin.org/post';
$n = isset($argv[2]) ? $argv[2] : 10;
$source = new ChunkRepeater(str_repeat('x', 1000000), $n);
$loop->futureTick(function () use ($source) {
$source->resume();
});

echo 'POSTing ' . $n . ' MB to ' . $url . PHP_EOL;

$start = microtime(true);
$report = $loop->addPeriodicTimer(0.05, function () use ($source, $start) {
printf("\r%d bytes in %0.3fs...", $source->getPosition(), microtime(true) - $start);
});

$client->post($url, array('Content-Length' => $n * 1000000), $source)->then(function (ResponseInterface $response) use ($source, $report, $loop, $start) {
$now = microtime(true);
$loop->cancelTimer($report);

printf("\r%d bytes in %0.3fs => %.1f MB/s\n", $source->getPosition(), $now - $start, $source->getPosition() / ($now - $start) / 1000000);

echo rtrim(preg_replace('/x{5,}/','x…', (string) $response->getBody()), PHP_EOL) . PHP_EOL;
}, function ($e) use ($loop, $report) {
$loop->cancelTimer($report);
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

$loop->run();