Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions examples/91-benchmark-count.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

// 1) download a large CSV/TSV dataset, for example:
// @link https://datasets.imdbws.com/
// @link https://github.com/fivethirtyeight/russian-troll-tweets
//
// 2) convert CSV/TSV to NDJSON, for example:
// @link https://github.com/clue/reactphp-csv/blob/v1.0.0/examples/11-csv2ndjson.php
//
// 3) pipe NDJSON into benchmark script:
// $ examples/91-benchmark-count.php < title.ratings.ndjson

use Clue\React\NDJson\Decoder;
use React\EventLoop\Factory;
use React\Stream\ReadableResourceStream;

require __DIR__ . '/../vendor/autoload.php';

if (extension_loaded('xdebug')) {
echo 'NOTICE: The "xdebug" extension is loaded, this has a major impact on performance.' . PHP_EOL;
}

$loop = Factory::create();
$decoder = new Decoder(new ReadableResourceStream(STDIN, $loop), true);

$count = 0;
$decoder->on('data', function () use (&$count) {
++$count;
});

$start = microtime(true);
$report = $loop->addPeriodicTimer(0.05, function () use (&$count, $start) {
printf("\r%d records in %0.3fs...", $count, microtime(true) - $start);
});

$decoder->on('close', function () use (&$count, $report, $loop, $start) {
$now = microtime(true);
$loop->cancelTimer($report);

printf("\r%d records in %0.3fs => %d records/s\n", $count, $now - $start, $count / ($now - $start));
});

$loop->run();
28 changes: 14 additions & 14 deletions src/Decoder.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

use Evenement\EventEmitter;
use React\Stream\ReadableStreamInterface;
use React\Stream\WritableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;

/**
* The Decoder / Parser reads from a plain stream and emits data objects for each JSON element
Expand Down Expand Up @@ -33,11 +33,11 @@ class Decoder extends EventEmitter implements ReadableStreamInterface
public function __construct(ReadableStreamInterface $input, $assoc = false, $depth = 512, $options = 0, $maxlength = 65536)
{
// @codeCoverageIgnoreStart
if ($options !== 0 && PHP_VERSION < 5.4) {
if ($options !== 0 && \PHP_VERSION < 5.4) {
throw new \BadMethodCallException('Options parameter is only supported on PHP 5.4+');
}
if (defined('JSON_THROW_ON_ERROR')) {
$options = $options & ~JSON_THROW_ON_ERROR;
if (\defined('JSON_THROW_ON_ERROR')) {
$options = $options & ~\JSON_THROW_ON_ERROR;
}
// @codeCoverageIgnoreEnd

Expand Down Expand Up @@ -102,30 +102,30 @@ public function handleData($data)
$this->buffer .= $data;

// keep parsing while a newline has been found
while (($newline = strpos($this->buffer, "\n")) !== false && $newline <= $this->maxlength) {
while (($newline = \strpos($this->buffer, "\n")) !== false && $newline <= $this->maxlength) {
// read data up until newline and remove from buffer
$data = (string)substr($this->buffer, 0, $newline);
$this->buffer = (string)substr($this->buffer, $newline + 1);
$data = (string)\substr($this->buffer, 0, $newline);
$this->buffer = (string)\substr($this->buffer, $newline + 1);

// decode data with options given in ctor
if ($this->options === 0) {
$data = json_decode($data, $this->assoc, $this->depth);
$data = \json_decode($data, $this->assoc, $this->depth);
} else {
$data = json_decode($data, $this->assoc, $this->depth, $this->options);
$data = \json_decode($data, $this->assoc, $this->depth, $this->options);
}

// abort stream if decoding failed
if ($data === null && json_last_error() !== JSON_ERROR_NONE) {
if ($data === null && \json_last_error() !== \JSON_ERROR_NONE) {
// @codeCoverageIgnoreStart
if (PHP_VERSION_ID > 50500) {
$errstr = json_last_error_msg();
} elseif (json_last_error() === JSON_ERROR_SYNTAX) {
if (\PHP_VERSION_ID > 50500) {
$errstr = \json_last_error_msg();
} elseif (\json_last_error() === \JSON_ERROR_SYNTAX) {
$errstr = 'Syntax error';
} else {
$errstr = 'Unknown error';
}
// @codeCoverageIgnoreEnd
return $this->handleError(new \RuntimeException('Unable to decode JSON: ' . $errstr, json_last_error()));
return $this->handleError(new \RuntimeException('Unable to decode JSON: ' . $errstr, \json_last_error()));
}

$this->emit('data', array($data));
Expand Down
30 changes: 15 additions & 15 deletions src/Encoder.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ class Encoder extends EventEmitter implements WritableStreamInterface
public function __construct(WritableStreamInterface $output, $options = 0, $depth = 512)
{
// @codeCoverageIgnoreStart
if (defined('JSON_PRETTY_PRINT') && $options & JSON_PRETTY_PRINT) {
if (\defined('JSON_PRETTY_PRINT') && $options & \JSON_PRETTY_PRINT) {
throw new \InvalidArgumentException('Pretty printing not available for NDJSON');
}
if ($depth !== 512 && PHP_VERSION < 5.5) {
if ($depth !== 512 && \PHP_VERSION < 5.5) {
throw new \BadMethodCallException('Depth parameter is only supported on PHP 5.5+');
}
if (defined('JSON_THROW_ON_ERROR')) {
$options = $options & ~JSON_THROW_ON_ERROR;
if (\defined('JSON_THROW_ON_ERROR')) {
$options = $options & ~\JSON_THROW_ON_ERROR;
}
// @codeCoverageIgnoreEnd

Expand Down Expand Up @@ -61,39 +61,39 @@ public function write($data)
// we have to handle PHP warnings for legacy PHP < 5.5
// certain values (such as INF etc.) emit a warning, but still encode successfully
// @codeCoverageIgnoreStart
if (PHP_VERSION_ID < 50500) {
if (\PHP_VERSION_ID < 50500) {
$errstr = null;
set_error_handler(function ($_, $error) use (&$errstr) {
\set_error_handler(function ($_, $error) use (&$errstr) {
$errstr = $error;
});

// encode data with options given in ctor (depth not supported)
$data = json_encode($data, $this->options);
$data = \json_encode($data, $this->options);

// always check error code and match missing error messages
restore_error_handler();
$errno = json_last_error();
if (defined('JSON_ERROR_UTF8') && $errno === JSON_ERROR_UTF8) {
\restore_error_handler();
$errno = \json_last_error();
if (\defined('JSON_ERROR_UTF8') && $errno === \JSON_ERROR_UTF8) {
// const JSON_ERROR_UTF8 added in PHP 5.3.3, but no error message assigned in legacy PHP < 5.5
// this overrides PHP 5.3.14 only: https://3v4l.org/IGP8Z#v5314
$errstr = 'Malformed UTF-8 characters, possibly incorrectly encoded';
} elseif ($errno !== JSON_ERROR_NONE && $errstr === null) {
} elseif ($errno !== \JSON_ERROR_NONE && $errstr === null) {
// error number present, but no error message applicable
$errstr = 'Unknown error';
}

// abort stream if encoding fails
if ($errno !== JSON_ERROR_NONE || $errstr !== null) {
if ($errno !== \JSON_ERROR_NONE || $errstr !== null) {
$this->handleError(new \RuntimeException('Unable to encode JSON: ' . $errstr, $errno));
return false;
}
} else {
// encode data with options given in ctor
$data = json_encode($data, $this->options, $this->depth);
$data = \json_encode($data, $this->options, $this->depth);

// abort stream if encoding fails
if ($data === false && json_last_error() !== JSON_ERROR_NONE) {
$this->handleError(new \RuntimeException('Unable to encode JSON: ' . json_last_error_msg(), json_last_error()));
if ($data === false && \json_last_error() !== \JSON_ERROR_NONE) {
$this->handleError(new \RuntimeException('Unable to encode JSON: ' . \json_last_error_msg(), \json_last_error()));
return false;
}
}
Expand Down