Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,11 @@ Please refer to [clue/reactphp-block](https://github.com/clue/reactphp-block#rea

#### Command streaming

The following API endpoint resolves with a buffered string of the command output
The following API endpoints resolve with a buffered string of the command output
(STDOUT and/or STDERR):

```php
$client->containerLogs($container);
$client->execStart($exec);
```

Expand All @@ -181,10 +182,11 @@ for bigger command outputs, it's usually a better idea to use a streaming
approach.

This works for (any number of) commands of arbitrary sizes.
The following API endpoint complements the default Promise-based API and returns
The following API endpoints complement the default Promise-based API and return
a [`Stream`](https://github.com/reactphp/stream) instance instead:

```php
$stream = $client->containerLogsStream($container);
$stream = $client->execStartStream($exec);
```

Expand Down
34 changes: 34 additions & 0 deletions examples/logs-stream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

// this example shows how the containerLogsStream() call can be used to get the logs of the given container.
// demonstrates the streaming logs API, which can be used to dump the logs as they arrive

use Clue\CaretNotation\Encoder;
use Clue\React\Docker\Client;

require __DIR__ . '/../vendor/autoload.php';

$container = isset($argv[1]) ? $argv[1] : 'asd';
echo 'Dumping logs (last 100 lines) of container "' . $container . '" (pass as argument to this example)' . PHP_EOL;

$loop = React\EventLoop\Factory::create();
$client = new Client($loop);

// use caret notation for any control characters except \t, \r and \n
$caret = new Encoder("\t\r\n");

$stream = $client->containerLogsStream($container, true, true, true, 0, false, 100);
$stream->on('data', function ($data) use ($caret) {
echo $caret->encode($data);
});

$stream->on('error', function ($e = null) {
// will be called if either parameter is invalid
echo 'ERROR requesting stream' . PHP_EOL . $e;
});

$stream->on('close', function ($e = null) {
echo 'CLOSED' . PHP_EOL . $e;
});

$loop->run();
40 changes: 40 additions & 0 deletions examples/logs.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

// this example shows how the containerLogs() call can be used to get the logs of the given container.
// demonstrates the deferred logs API, which can be used to dump the logs in one go

use Clue\CaretNotation\Encoder;
use Clue\React\Docker\Client;

require __DIR__ . '/../vendor/autoload.php';

$container = isset($argv[1]) ? $argv[1] : 'asd';
echo 'Dumping logs (last 100 lines) of container "' . $container . '" (pass as argument to this example)' . PHP_EOL;

$loop = React\EventLoop\Factory::create();
$client = new Client($loop);

$client->containerLogs($container, false, true, true, 0, false, 100)->then(
function ($logs) {
echo 'Received the following logs:' . PHP_EOL;

// escape control characters (dumping logs of vi/nano etc.)
$caret = new Encoder("\t\r\n");
echo $caret->encode($logs);
},
function ($error) use ($container) {
echo <<<EOT
An error occured while trying to access the logs.

Have you tried running the following command?

$ docker run -i --name=$container busybox dmesg

Here's the error log:

$error
EOT;
}
);

$loop->run();
107 changes: 106 additions & 1 deletion src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
namespace Clue\React\Docker;

use Clue\React\Buzz\Browser;
use Clue\React\Buzz\Io\Sender;
use Clue\React\Docker\Io\ResponseParser;
use Clue\React\Docker\Io\StreamingParser;
use React\EventLoop\LoopInterface;
Expand Down Expand Up @@ -257,6 +256,112 @@ public function containerTop($container, $ps_args = null)
)->then(array($this->parser, 'expectJson'));
}

/**
* Get stdout and stderr logs from the container id
*
* This resolves with a string containing the log output, i.e. STDOUT
* and STDERR as requested.
*
* Keep in mind that this means the whole string has to be kept in memory.
* For bigger container logs it's usually a better idea to use a streaming
* approach, see containerLogsStream() for more details.
* In particular, the same also applies for the $follow flag. It can be used
* to follow the container log messages as long as the container is running.
*
* Note that this endpoint works only for containers with the "json-file" or
* "journald" logging drivers.
*
* Note that this endpoint internally has to check the `containerInspect()`
* endpoint first in order to figure out the TTY settings to properly decode
* the raw log output.
*
* @param string $container container ID
* @param boolean $follow 1/True/true or 0/False/false, return stream. Default false
* @param boolean $stdout 1/True/true or 0/False/false, show stdout log. Default false
* @param boolean $stderr 1/True/true or 0/False/false, show stderr log. Default false
* @param int $since UNIX timestamp (integer) to filter logs. Specifying a timestamp will only output log-entries since that timestamp. Default: 0 (unfiltered) (requires API v1.19+ / Docker v1.7+)
* @param boolean $timestamps 1/True/true or 0/False/false, print timestamps for every log line. Default false
* @param int|null $tail Output specified number of lines at the end of logs: all or <number>. Default all
* @return PromiseInterface Promise<string> log output string
* @link https://docs.docker.com/engine/api/v1.40/#operation/ContainerLogs
* @uses self::containerLogsStream()
* @see self::containerLogsStream()
*/
public function containerLogs($container, $follow = false, $stdout = false, $stderr = false, $since = 0, $timestamps = false, $tail = null)
{
return $this->streamingParser->bufferedStream(
$this->containerLogsStream($container, $follow, $stdout, $stderr, $since, $timestamps, $tail)
);
}

/**
* Get stdout and stderr logs from the container id
*
* This is a streaming API endpoint that returns a readable stream instance
* containing the the log output, i.e. STDOUT and STDERR as requested.
*
* This works for container logs of arbitrary sizes as only small chunks have to
* be kept in memory.
*
* This is particularly useful for the $follow flag. It can be used
* to follow the container log messages as long as the container is running.
*
* Note that by default the output of both STDOUT and STDERR will be emitted
* as normal "data" events. You can optionally pass a custom event name which
* will be used to emit STDERR data so that it can be handled separately.
* Note that the normal streaming primitives likely do not know about this
* event, so special care may have to be taken.
* Also note that this option has no effect if the container has been
* created with a TTY.
*
* Note that this endpoint works only for containers with the "json-file" or
* "journald" logging drivers.
*
* Note that this endpoint internally has to check the `containerInspect()`
* endpoint first in order to figure out the TTY settings to properly decode
* the raw log output.
*
* @param string $container container ID
* @param boolean $follow 1/True/true or 0/False/false, return stream. Default false
* @param boolean $stdout 1/True/true or 0/False/false, show stdout log. Default false
* @param boolean $stderr 1/True/true or 0/False/false, show stderr log. Default false
* @param int $since UNIX timestamp (integer) to filter logs. Specifying a timestamp will only output log-entries since that timestamp. Default: 0 (unfiltered) (requires API v1.19+ / Docker v1.7+)
* @param boolean $timestamps 1/True/true or 0/False/false, print timestamps for every log line. Default false
* @param int|null $tail Output specified number of lines at the end of logs: all or <number>. Default all
* @param string $stderrEvent custom event to emit for STDERR data (otherwise emits as "data")
* @return ReadableStreamInterface log output stream
* @link https://docs.docker.com/engine/api/v1.40/#operation/ContainerLogs
* @see self::containerLogs()
*/
public function containerLogsStream($container, $follow = false, $stdout = false, $stderr = false, $since = 0, $timestamps = false, $tail = null, $stderrEvent = null)
{
$parser = $this->streamingParser;
$browser = $this->browser;
$url = $this->uri->expand(
'/containers/{container}/logs{?follow,stdout,stderr,since,timestamps,tail}',
array(
'container' => $container,
'follow' => $this->boolArg($follow),
'stdout' => $this->boolArg($stdout),
'stderr' => $this->boolArg($stderr),
'since' => ($since === 0) ? null : $since,
'timestamps' => $this->boolArg($timestamps),
'tail' => $tail
)
);

// first inspect container to check TTY setting, then request logs with appropriate log parser
return \React\Promise\Stream\unwrapReadable($this->containerInspect($container)->then(function ($info) use ($url, $browser, $parser, $stderrEvent) {
$stream = $parser->parsePlainStream($browser->withOptions(array('streaming' => true))->get($url));

if (!$info['Config']['Tty']) {
$stream = $parser->demultiplexStream($stream, $stderrEvent);
}

return $stream;
}));
}

/**
* Inspect changes on container id's filesystem
*
Expand Down
8 changes: 1 addition & 7 deletions src/Io/ReadableDemultiplexStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,12 @@ public function __construct(ReadableStreamInterface $multiplexed, $stderrEvent =

$out = $this;
$buffer =& $this->buffer;
$closed =& $this->closed;

// pass all input data chunks through the parser
$multiplexed->on('data', array($out, 'push'));

// forward end event to output (unless parsing is still in progress)
$multiplexed->on('end', function () use (&$buffer, $out, &$closed) {
// ignore duplicate end events
if ($closed) {
return;
}

$multiplexed->on('end', function () use (&$buffer, $out) {
// buffer must be empty on end, otherwise this is an error situation
if ($buffer === '') {
$out->emit('end');
Expand Down
8 changes: 1 addition & 7 deletions src/Io/ReadableJsonStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,7 @@ public function __construct(ReadableStreamInterface $input)

// forward end event to output
$out = $this;
$closed =& $this->closed;
$input->on('end', function () use ($out, $parser, &$closed) {
// ignore duplicate end events
if ($closed) {
return;
}

$input->on('end', function () use ($out, $parser) {
if ($parser->isEmpty()) {
$out->emit('end');
} else {
Expand Down
112 changes: 109 additions & 3 deletions tests/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
namespace Clue\Tests\React\Docker;

use Clue\React\Docker\Client;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use React\Promise;
use React\Promise\Deferred;
use React\Stream\ThroughStream;
use RingCentral\Psr7\Response;

class ClientTest extends TestCase
Expand Down Expand Up @@ -169,6 +168,113 @@ public function testContainerChanges()
$this->expectPromiseResolveWith($json, $this->client->containerChanges(123));
}

public function testContainerLogsReturnsPendingPromiseWhenInspectingContainerIsPending()
{
$this->browser->expects($this->once())->method('get')->with('/containers/123/json')->willReturn(new \React\Promise\Promise(function () { }));

$this->streamingParser->expects($this->once())->method('bufferedStream')->with($this->isInstanceOf('React\Stream\ReadableStreamInterface'))->willReturn(new \React\Promise\Promise(function () { }));

$promise = $this->client->containerLogs('123');

$promise->then($this->expectCallableNever(), $this->expectCallableNever());
}

public function testContainerLogsRejectsWhenInspectingContainerRejects()
{
$this->browser->expects($this->once())->method('get')->with('/containers/123/json')->willReturn(\React\Promise\reject(new \RuntimeException()));

$this->streamingParser->expects($this->once())->method('bufferedStream')->with($this->isInstanceOf('React\Stream\ReadableStreamInterface'))->willReturn(\React\Promise\reject(new \RuntimeException()));

$promise = $this->client->containerLogs('123');

$promise->then($this->expectCallableNever(), $this->expectCallableOnce());
}

public function testContainerLogsReturnsPendingPromiseWhenInspectingContainerResolvesWithTtyAndContainerLogsArePending()
{
$this->browser->expects($this->once())->method('withOptions')->willReturnSelf();
$this->browser->expects($this->exactly(2))->method('get')->withConsecutive(
array('/containers/123/json'),
array('/containers/123/logs')
)->willReturnOnConsecutiveCalls(
\React\Promise\resolve(new Response(200, array(), '{"Config":{"Tty":true}}')),
new \React\Promise\Promise(function () { })
);

$this->parser->expects($this->once())->method('expectJson')->willReturn(array('Config' => array('Tty' => true)));
$this->streamingParser->expects($this->once())->method('bufferedStream')->with($this->isInstanceOf('React\Stream\ReadableStreamInterface'))->willReturn(new \React\Promise\Promise(function () { }));
$this->streamingParser->expects($this->once())->method('parsePlainStream')->willReturn($this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock());
$this->streamingParser->expects($this->never())->method('demultiplexStream');

$promise = $this->client->containerLogs('123');

$promise->then($this->expectCallableNever(), $this->expectCallableNever());
}

public function testContainerLogsReturnsPendingPromiseWhenInspectingContainerResolvesWithoutTtyAndContainerLogsArePending()
{
$this->browser->expects($this->once())->method('withOptions')->willReturnSelf();
$this->browser->expects($this->exactly(2))->method('get')->withConsecutive(
array('/containers/123/json'),
array('/containers/123/logs')
)->willReturnOnConsecutiveCalls(
\React\Promise\resolve(new Response(200, array(), '{"Config":{"Tty":false}}')),
new \React\Promise\Promise(function () { })
);

$this->parser->expects($this->once())->method('expectJson')->willReturn(array('Config' => array('Tty' => false)));
$this->streamingParser->expects($this->once())->method('bufferedStream')->with($this->isInstanceOf('React\Stream\ReadableStreamInterface'))->willReturn(new \React\Promise\Promise(function () { }));
$this->streamingParser->expects($this->once())->method('parsePlainStream')->willReturn($this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock());
$this->streamingParser->expects($this->once())->method('demultiplexStream')->willReturn($this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock());

$promise = $this->client->containerLogs('123');

$promise->then($this->expectCallableNever(), $this->expectCallableNever());
}

public function testContainerLogsResolvesWhenInspectingContainerResolvesWithTtyAndContainerLogsResolves()
{
$this->browser->expects($this->once())->method('withOptions')->willReturnSelf();
$this->browser->expects($this->exactly(2))->method('get')->withConsecutive(
array('/containers/123/json'),
array('/containers/123/logs')
)->willReturnOnConsecutiveCalls(
\React\Promise\resolve(new Response(200, array(), '{"Config":{"Tty":true}}')),
\React\Promise\resolve(new Response(200, array(), ''))
);

$this->parser->expects($this->once())->method('expectJson')->willReturn(array('Config' => array('Tty' => true)));
$this->streamingParser->expects($this->once())->method('bufferedStream')->with($this->isInstanceOf('React\Stream\ReadableStreamInterface'))->willReturn(\React\Promise\resolve('output'));
$this->streamingParser->expects($this->once())->method('parsePlainStream')->willReturn($this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock());
$this->streamingParser->expects($this->never())->method('demultiplexStream');

$promise = $this->client->containerLogs('123');

$promise->then($this->expectCallableOnceWith('output'), $this->expectCallableNever());
}

public function testContainerLogsStreamReturnStreamWhenInspectingContainerResolvesWithTtyAndContainerLogsResolves()
{
$this->browser->expects($this->once())->method('withOptions')->willReturnSelf();
$this->browser->expects($this->exactly(2))->method('get')->withConsecutive(
array('/containers/123/json'),
array('/containers/123/logs')
)->willReturnOnConsecutiveCalls(
\React\Promise\resolve(new Response(200, array(), '{"Config":{"Tty":true}}')),
\React\Promise\resolve(new Response(200, array(), ''))
);

$response = new ThroughStream();
$this->parser->expects($this->once())->method('expectJson')->willReturn(array('Config' => array('Tty' => true)));
$this->streamingParser->expects($this->once())->method('parsePlainStream')->willReturn($response);
$this->streamingParser->expects($this->never())->method('demultiplexStream');

$stream = $this->client->containerLogsStream('123');

$stream->on('data', $this->expectCallableOnceWith('output'));
$response->write('output');
}

public function testContainerExport()
{
$data = 'tar stream';
Expand Down Expand Up @@ -506,7 +612,7 @@ public function testExecStart()
$this->expectRequest('POST', '/exec/123/start', $this->createResponse($data));
$this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream));
$this->streamingParser->expects($this->once())->method('demultiplexStream')->with($stream)->willReturn($stream);
$this->streamingParser->expects($this->once())->method('bufferedStream')->with($this->equalTo($stream))->willReturn(Promise\resolve($data));
$this->streamingParser->expects($this->once())->method('bufferedStream')->with($this->equalTo($stream))->willReturn(\React\Promise\resolve($data));

$this->expectPromiseResolveWith($data, $this->client->execStart(123, $config));
}
Expand Down
Loading