Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,21 @@ See also the [examples](examples).

Once a process is started, its I/O streams will be constructed as instances of
`React\Stream\ReadableStreamInterface` and `React\Stream\WritableStreamInterface`.
Before `start()` is called, these properties are `null`.Once a process terminates,
Before `start()` is called, these properties are not set. Once a process terminates,
the streams will become closed but not unset.

* `$stdin`
* `$stdout`
* `$stderr`
* `$stdin` or `$pipes[0]` is a `WritableStreamInterface`
* `$stdout` or `$pipes[1]` is a `ReadableStreamInterface`
* `$stderr` or `$pipes[2]` is a `ReadableStreamInterface`

Each of these implement the underlying
Following common Unix conventions, this library will always start each child
process with the three pipes matching the standard I/O streams as given above.
You can use the named references for common use cases or access these as an
array with all three pipes.

Because each of these implement the underlying
[`ReadableStreamInterface`](https://github.com/reactphp/stream#readablestreaminterface) or
[`WritableStreamInterface`](https://github.com/reactphp/stream#writablestreaminterface) and
[`WritableStreamInterface`](https://github.com/reactphp/stream#writablestreaminterface),
you can use any of their events and methods as usual:

```php
Expand Down Expand Up @@ -255,10 +260,10 @@ $process = new Process('sleep 10');
$process->start($loop);

$loop->addTimer(2.0, function () use ($process) {
$process->stdin->close();
$process->stdout->close();
$process->stderr->close();
$process->terminate(SIGKILL);
foreach ($process->pipes as $pipe) {
$pipe->close();
}
$process->terminate();
});
```

Expand Down
6 changes: 3 additions & 3 deletions examples/04-terminate.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

// forcefully terminate process after 2s
$loop->addTimer(2.0, function () use ($process) {
$process->stdin->close();
$process->stdout->close();
$process->stderr->close();
foreach ($process->pipes as $pipe) {
$pipe->close();
}
$process->terminate();
});

Expand Down
64 changes: 51 additions & 13 deletions src/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
use Evenement\EventEmitter;
use React\EventLoop\LoopInterface;
use React\Stream\ReadableResourceStream;
use React\Stream\ReadableStreamInterface;
use React\Stream\WritableResourceStream;
use React\Stream\WritableStreamInterface;

/**
* Process component.
Expand All @@ -17,15 +19,36 @@
*/
class Process extends EventEmitter
{
/**
* @var ?WritableStreamInterface
*/
public $stdin;

/**
* @var ?ReadableStreamInterface
*/
public $stdout;

/**
* @var ?ReadableStreamInterface
*/
public $stderr;

/**
* Array with all process pipes (once started)
* - 0: STDIN (`WritableStreamInterface`)
* - 1: STDOUT (`ReadableStreamInterface`)
* - 2: STDERR (`ReadableStreamInterface`)
*
* @var ReadableStreamInterface|WritableStreamInterface
*/
public $pipes = array();

private $cmd;
private $cwd;
private $env;
private $enhanceSigchildCompatibility;
private $pipes;
private $sigchildPipe;

private $process;
private $status;
Expand Down Expand Up @@ -90,13 +113,15 @@ public function start(LoopInterface $loop, $interval = 0.1)
array('pipe', 'w'), // stderr
);

$sigchild = null;
// Read exit code through fourth pipe to work around --enable-sigchild
if ($this->enhanceSigchildCompatibility) {
$fdSpec[] = array('pipe', 'w');
$cmd = sprintf('(%s) 3>/dev/null; code=$?; echo $code >&3; exit $code', $cmd);
$sigchild = 3;
$cmd = sprintf('(%s) ' . $sigchild . '>/dev/null; code=$?; echo $code >&' . $sigchild . '; exit $code', $cmd);
}

$this->process = proc_open($cmd, $fdSpec, $this->pipes, $this->cwd, $this->env);
$this->process = proc_open($cmd, $fdSpec, $pipes, $this->cwd, $this->env);

if (!is_resource($this->process)) {
throw new \RuntimeException('Unable to launch a new process.');
Expand Down Expand Up @@ -129,11 +154,24 @@ public function start(LoopInterface $loop, $interval = 0.1)
});
};

$this->stdin = new WritableResourceStream($this->pipes[0], $loop);
$this->stdout = new ReadableResourceStream($this->pipes[1], $loop);
$this->stdout->on('close', $streamCloseHandler);
$this->stderr = new ReadableResourceStream($this->pipes[2], $loop);
$this->stderr->on('close', $streamCloseHandler);
if ($sigchild !== null) {
$this->sigchildPipe = $pipes[$sigchild];
unset($pipes[$sigchild]);
}

foreach ($pipes as $n => $fd) {
if ($n === 0) {
$stream = new WritableResourceStream($fd, $loop);
} else {
$stream = new ReadableResourceStream($fd, $loop);
$stream->on('close', $streamCloseHandler);
}
$this->pipes[$n] = $stream;
}

$this->stdin = $this->pipes[0];
$this->stdout = $this->pipes[1];
$this->stderr = $this->pipes[2];
}

/**
Expand Down Expand Up @@ -337,11 +375,11 @@ public final static function setSigchildEnabled($sigchild)
*/
private function pollExitCodePipe()
{
if ( ! isset($this->pipes[3])) {
if ($this->sigchildPipe === null) {
return;
}

$r = array($this->pipes[3]);
$r = array($this->sigchildPipe);
$w = $e = null;

$n = @stream_select($r, $w, $e, 0);
Expand All @@ -364,12 +402,12 @@ private function pollExitCodePipe()
*/
private function closeExitCodePipe()
{
if ( ! isset($this->pipes[3])) {
if ($this->sigchildPipe === null) {
return;
}

fclose($this->pipes[3]);
unset($this->pipes[3]);
fclose($this->sigchildPipe);
$this->sigchildPipe = null;
}

/**
Expand Down
24 changes: 24 additions & 0 deletions tests/AbstractProcessTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,30 @@ public function testGetCommand()
$this->assertSame('echo foo', $process->getCommand());
}

public function testPipesWillBeUnsetBeforeStarting()
{
$process = new Process('echo foo');

$this->assertNull($process->stdin);
$this->assertNull($process->stdout);
$this->assertNull($process->stderr);
$this->assertEquals(array(), $process->pipes);
}

public function testStartWillAssignPipes()
{
$process = new Process('echo foo');
$process->start($this->createLoop());

$this->assertInstanceOf('React\Stream\WritableStreamInterface', $process->stdin);
$this->assertInstanceOf('React\Stream\ReadableStreamInterface', $process->stdout);
$this->assertInstanceOf('React\Stream\ReadableStreamInterface', $process->stderr);
$this->assertCount(3, $process->pipes);
$this->assertSame($process->stdin, $process->pipes[0]);
$this->assertSame($process->stdout, $process->pipes[1]);
$this->assertSame($process->stderr, $process->pipes[2]);
}

public function testIsRunning()
{
$process = new Process('sleep 1');
Expand Down