Skip to content

Commit aed932b

Browse files
committed
Multi Server Executor
1 parent d99bc4e commit aed932b

File tree

2 files changed

+281
-0
lines changed

2 files changed

+281
-0
lines changed

src/Query/MultiServerExecutor.php

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
<?php
2+
3+
namespace React\Dns\Query;
4+
5+
use React\EventLoop\LoopInterface;
6+
use React\EventLoop\TimerInterface;
7+
use React\Promise\CancellablePromiseInterface;
8+
use React\Promise\Promise;
9+
use RuntimeException;
10+
11+
final class MultiServerExecutor implements ExecutorInterface
12+
{
13+
private $loop;
14+
15+
/**
16+
* @var ExecutorInterface[]
17+
*/
18+
public $executors = array();
19+
20+
private $executorsCount = 0;
21+
22+
public function __construct($nameservers, LoopInterface $loop)
23+
{
24+
$this->loop = $loop;
25+
foreach ($nameservers as $nameserver) {
26+
$this->executors[] = new UdpTransportExecutor($nameserver, $loop);
27+
$this->executorsCount++;
28+
}
29+
}
30+
31+
public function query(Query $query)
32+
{
33+
$that = $this;
34+
$executorsIndexes = range(0, $this->executorsCount - 1);
35+
shuffle($executorsIndexes);
36+
$loop = $this->loop;
37+
$promises = array();
38+
$timer = null;
39+
return new Promise(function ($resolve, $reject) use (&$promises, &$timer, &$executorsIndexes, $that, $loop, $query) {
40+
$resolveWrap = function ($index) use (&$promises, &$timer, $resolve, $loop) {
41+
return function ($result) use ($index, &$promises, &$timer, $resolve, $loop) {
42+
unset($promises[$index]);
43+
44+
$resolve($result);
45+
46+
if ($timer instanceof TimerInterface) {
47+
$loop->cancelTimer($timer);
48+
$timer = null;
49+
}
50+
51+
foreach ($promises as $promise) {
52+
if ($promise instanceof CancellablePromiseInterface) {
53+
$promise->cancel();
54+
}
55+
}
56+
};
57+
};
58+
$rejectWrap = function ($index) use (&$promises, &$timer, &$executorsIndexes, $reject, $loop) {
59+
return function ($error) use ($index, &$promises, &$timer, &$executorsIndexes, $reject, $loop) {
60+
unset($promises[$index]);
61+
62+
if (\count($promises) > 0 || \count($executorsIndexes) > 0) {
63+
return;
64+
}
65+
66+
$reject($error);
67+
};
68+
};
69+
$timer = $loop->addPeriodicTimer(0.05, function () use (&$promises, &$timer, &$executorsIndexes, $that, $loop, $query, $resolveWrap, $rejectWrap) {
70+
$index = array_pop($executorsIndexes);
71+
$promise = $that->executors[$index]->query($query);
72+
$promise->then($resolveWrap($index), $rejectWrap($index));
73+
$promises[$index] = $promise;
74+
75+
76+
if (count($executorsIndexes) <= 0) {
77+
$loop->cancelTimer($timer);
78+
$timer = null;
79+
}
80+
});
81+
82+
$index = array_pop($executorsIndexes);
83+
$promise = $that->executors[$index]->query($query);
84+
$promise->then($resolveWrap($index), $rejectWrap($index));
85+
$promises[$index] = $promise;
86+
}, function ($resolve, $reject) use (&$promises, &$timer, $loop) {
87+
if ($timer instanceof TimerInterface) {
88+
$loop->cancelTimer($timer);
89+
}
90+
91+
foreach ($promises as $promise) {
92+
if ($promise instanceof CancellablePromiseInterface) {
93+
$promise->cancel();
94+
}
95+
}
96+
97+
$reject(new RuntimeException('Lookup query has been canceled'));
98+
});
99+
}
100+
}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
<?php
2+
3+
use React\Dns\Model\Message;
4+
use React\Dns\Protocol\BinaryDumper;
5+
use React\Dns\Protocol\Parser;
6+
use React\Dns\Query\MultiServerExecutor;
7+
use React\Dns\Query\Query;
8+
use React\EventLoop\Factory;
9+
use React\EventLoop\LoopInterface;
10+
use React\Tests\Dns\TestCase;
11+
12+
class MultiServerExecutorTest extends TestCase
13+
{
14+
public $serverConnectCount = 0;
15+
public $serverWriteCount = 0;
16+
17+
public function testQueryWillResolve()
18+
{
19+
$loop = Factory::create();
20+
21+
$server = $this->createAnsweringServer($loop);
22+
$address = stream_socket_get_name($server, false);
23+
$executor = new MultiServerExecutor(array($address), $loop);
24+
25+
$query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN);
26+
27+
$promise = $executor->query($query);
28+
$response = \Clue\React\Block\await($promise, $loop, 0.5);
29+
30+
$this->assertInstanceOf('React\Dns\Model\Message', $response);
31+
$this->assertSame(1, $this->serverConnectCount);
32+
$this->assertSame(1, $this->serverWriteCount);
33+
}
34+
35+
public function testQueryWillBeSendToAllServers()
36+
{
37+
$loop = Factory::create();
38+
39+
$answeringServer = $this->createWaitingAnsweringServer($loop, 0.1);
40+
$waitingServer = $this->createWaitingAnsweringServer($loop, 1);
41+
$answeringAddress = stream_socket_get_name($answeringServer, false);
42+
$waitingAddress = stream_socket_get_name($waitingServer, false);
43+
$executor = new MultiServerExecutor(array($answeringAddress, $waitingAddress), $loop);
44+
45+
$query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN);
46+
47+
$promise = $executor->query($query);
48+
$response = \Clue\React\Block\await($promise, $loop, 0.5);
49+
50+
$this->assertInstanceOf('React\Dns\Model\Message', $response);
51+
$this->assertSame(2, $this->serverConnectCount);
52+
$this->assertSame(1, $this->serverWriteCount);
53+
}
54+
55+
public function testQueryWillNotFailWhenOneResponseIsTruncated()
56+
{
57+
$loop = Factory::create();
58+
59+
$servers = array();
60+
$addresses = array();
61+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.1, true);
62+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.2);
63+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.2);
64+
foreach ($servers as $server) {
65+
$addresses[] = stream_socket_get_name($server, false);
66+
}
67+
$executor = new MultiServerExecutor($addresses, $loop);
68+
69+
$query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN);
70+
71+
$promise = $executor->query($query);
72+
$response = \Clue\React\Block\await($promise, $loop, 0.5);
73+
74+
$this->assertInstanceOf('React\Dns\Model\Message', $response);
75+
$this->assertSame(3, $this->serverConnectCount);
76+
$this->assertSame(2, $this->serverWriteCount);
77+
}
78+
79+
/**
80+
* @expectedException RuntimeException
81+
* @expectedExceptionMessage DNS query for google.com failed: The server returned a truncated result for a UDP query, but retrying via TCP is currently not supported
82+
*/
83+
public function testQueryWillFailWhenAllResponseAraTruncated()
84+
{
85+
$loop = Factory::create();
86+
87+
$servers = array();
88+
$addresses = array();
89+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.1, true);
90+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.2, true);
91+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.3, true);
92+
foreach ($servers as $server) {
93+
$addresses[] = stream_socket_get_name($server, false);
94+
}
95+
$executor = new MultiServerExecutor($addresses, $loop);
96+
97+
$query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN);
98+
99+
$promise = $executor->query($query);
100+
$response = \Clue\React\Block\await($promise, $loop, 0.5);
101+
102+
$this->assertInstanceOf('React\Dns\Model\Message', $response);
103+
$this->assertSame(2, $this->serverConnectCount);
104+
$this->assertSame(2, $this->serverWriteCount);
105+
}
106+
107+
/**
108+
* @expectedException RuntimeException
109+
* @expectedExceptionMessage Lookup query has been canceled
110+
*/
111+
public function testCancelPromiseWillCancelAllPendingQueries()
112+
{
113+
$loop = Factory::create();
114+
115+
$servers = array();
116+
$addresses = array();
117+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.1, true);
118+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.2, true);
119+
$servers[] = $this->createWaitingAnsweringServer($loop, 0.3, true);
120+
foreach ($servers as $server) {
121+
$addresses[] = stream_socket_get_name($server, false);
122+
}
123+
$executor = new MultiServerExecutor($addresses, $loop);
124+
125+
$query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN);
126+
127+
$promise = $executor->query($query);
128+
$loop->futureTick(function () use ($promise) {
129+
$promise->cancel();
130+
});
131+
$response = \Clue\React\Block\await($promise, $loop, 0.5);
132+
133+
$this->assertInstanceOf('React\Dns\Model\Message', $response);
134+
$this->assertSame(2, $this->serverConnectCount);
135+
$this->assertSame(2, $this->serverWriteCount);
136+
}
137+
138+
private function createAnsweringServer(LoopInterface $loop)
139+
{
140+
$that = $this;
141+
$server = stream_socket_server('udp://127.0.0.1:0', $errno, $errstr, STREAM_SERVER_BIND);
142+
$loop->addReadStream($server, function ($server) use ($that) {
143+
$that->serverConnectCount++;
144+
$parser = new Parser();
145+
$dumper = new BinaryDumper();
146+
147+
$data = stream_socket_recvfrom($server, 512, 0, $peer);
148+
149+
$message = $parser->parseMessage($data);
150+
151+
stream_socket_sendto($server, $dumper->toBinary($message), 0, $peer);
152+
$that->serverWriteCount++;
153+
});
154+
155+
return $server;
156+
}
157+
158+
private function createWaitingAnsweringServer(LoopInterface $loop, $timerout, $truncated = false)
159+
{
160+
$that = $this;
161+
$server = stream_socket_server('udp://127.0.0.1:0', $errno, $errstr, STREAM_SERVER_BIND);
162+
$loop->addReadStream($server, function ($server) use ($loop, $timerout, $that, $truncated) {
163+
$that->serverConnectCount++;
164+
$parser = new Parser();
165+
166+
$data = stream_socket_recvfrom($server, 512, 0, $peer);
167+
168+
$message = $parser->parseMessage($data);
169+
$message->tc = $truncated;
170+
171+
$loop->addTimer($timerout, function () use ($server, $message, $peer, $that) {
172+
$dumper = new BinaryDumper();
173+
174+
stream_socket_sendto($server, $dumper->toBinary($message), 0, $peer);
175+
$that->serverWriteCount++;
176+
});
177+
});
178+
179+
return $server;
180+
}
181+
}

0 commit comments

Comments
 (0)