fixed roundrobin dead transport which should recover
This commit is contained in:
parent
7e2fbe13c8
commit
ccbb171312
@ -64,6 +64,69 @@ class FailoverTransportTest extends TestCase
|
|||||||
$t->send(new RawMessage(''));
|
$t->send(new RawMessage(''));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testSendOneDeadAndRecoveryNotWithinRetryPeriod()
|
||||||
|
{
|
||||||
|
$t1 = $this->createMock(TransportInterface::class);
|
||||||
|
$t1->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
|
||||||
|
$t1->expects($this->once())->method('send');
|
||||||
|
$t2 = $this->createMock(TransportInterface::class);
|
||||||
|
$t2->expects($this->exactly(5))->method('send');
|
||||||
|
$t = new FailoverTransport([$t1, $t2], 40);
|
||||||
|
$t->send(new RawMessage(''));
|
||||||
|
sleep(4);
|
||||||
|
$t->send(new RawMessage(''));
|
||||||
|
sleep(4);
|
||||||
|
$t->send(new RawMessage(''));
|
||||||
|
sleep(4);
|
||||||
|
$t->send(new RawMessage(''));
|
||||||
|
sleep(4);
|
||||||
|
$t->send(new RawMessage(''));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testSendOneDeadAndRecoveryWithinRetryPeriod()
|
||||||
|
{
|
||||||
|
$t1 = $this->createMock(TransportInterface::class);
|
||||||
|
$t1->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
|
||||||
|
$t1->expects($this->at(1))->method('send');
|
||||||
|
$t1->expects($this->exactly(3))->method('send');
|
||||||
|
$t2 = $this->createMock(TransportInterface::class);
|
||||||
|
$t2->expects($this->at(0))->method('send');
|
||||||
|
$t2->expects($this->at(1))->method('send');
|
||||||
|
$t2->expects($this->at(2))->method('send');
|
||||||
|
$t2->expects($this->at(3))->method('send')->will($this->throwException(new TransportException()));
|
||||||
|
$t2->expects($this->exactly(4))->method('send');
|
||||||
|
$t = new FailoverTransport([$t1, $t2], 6);
|
||||||
|
$t->send(new RawMessage('')); // t1>fail - t2>sent
|
||||||
|
sleep(4);
|
||||||
|
$t->send(new RawMessage('')); // t2>sent
|
||||||
|
sleep(4);
|
||||||
|
$t->send(new RawMessage('')); // t2>sent
|
||||||
|
sleep(4);
|
||||||
|
$t->send(new RawMessage('')); // t2>fail - t1>sent
|
||||||
|
sleep(4);
|
||||||
|
$t->send(new RawMessage('')); // t1>sent
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testSendAllDeadWithinRetryPeriod()
|
||||||
|
{
|
||||||
|
$t1 = $this->createMock(TransportInterface::class);
|
||||||
|
$t1->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
|
||||||
|
$t1->expects($this->once())->method('send');
|
||||||
|
$t2 = $this->createMock(TransportInterface::class);
|
||||||
|
$t2->expects($this->at(0))->method('send');
|
||||||
|
$t2->expects($this->at(1))->method('send');
|
||||||
|
$t2->expects($this->at(2))->method('send')->will($this->throwException(new TransportException()));
|
||||||
|
$t2->expects($this->exactly(3))->method('send');
|
||||||
|
$t = new FailoverTransport([$t1, $t2], 40);
|
||||||
|
$t->send(new RawMessage(''));
|
||||||
|
sleep(4);
|
||||||
|
$t->send(new RawMessage(''));
|
||||||
|
sleep(4);
|
||||||
|
$this->expectException(TransportException::class);
|
||||||
|
$this->expectExceptionMessage('All transports failed.');
|
||||||
|
$t->send(new RawMessage(''));
|
||||||
|
}
|
||||||
|
|
||||||
public function testSendOneDeadButRecover()
|
public function testSendOneDeadButRecover()
|
||||||
{
|
{
|
||||||
$t1 = $this->createMock(TransportInterface::class);
|
$t1 = $this->createMock(TransportInterface::class);
|
||||||
|
@ -64,16 +64,35 @@ class RoundRobinTransportTest extends TestCase
|
|||||||
$t->send(new RawMessage(''));
|
$t->send(new RawMessage(''));
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testSendOneDeadButRecover()
|
public function testSendOneDeadAndRecoveryNotWithinRetryPeriod()
|
||||||
{
|
{
|
||||||
$t1 = $this->createMock(TransportInterface::class);
|
$t1 = $this->createMock(TransportInterface::class);
|
||||||
$t1->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
|
$t1->expects($this->exactly(4))->method('send');
|
||||||
$t1->expects($this->at(1))->method('send');
|
|
||||||
$t2 = $this->createMock(TransportInterface::class);
|
$t2 = $this->createMock(TransportInterface::class);
|
||||||
|
$t2->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
|
||||||
$t2->expects($this->once())->method('send');
|
$t2->expects($this->once())->method('send');
|
||||||
$t = new RoundRobinTransport([$t1, $t2], 1);
|
$t = new RoundRobinTransport([$t1, $t2], 60);
|
||||||
$t->send(new RawMessage(''));
|
$t->send(new RawMessage(''));
|
||||||
sleep(2);
|
$t->send(new RawMessage(''));
|
||||||
|
$t->send(new RawMessage(''));
|
||||||
|
$t->send(new RawMessage(''));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testSendOneDeadAndRecoveryWithinRetryPeriod()
|
||||||
|
{
|
||||||
|
$t1 = $this->createMock(TransportInterface::class);
|
||||||
|
$t1->expects($this->exactly(3))->method('send');
|
||||||
|
$t2 = $this->createMock(TransportInterface::class);
|
||||||
|
$t2->expects($this->at(0))->method('send')->will($this->throwException(new TransportException()));
|
||||||
|
$t2->expects($this->at(1))->method('send');
|
||||||
|
$t2->expects($this->exactly(2))->method('send');
|
||||||
|
$t = new RoundRobinTransport([$t1, $t2], 3);
|
||||||
|
$t->send(new RawMessage(''));
|
||||||
|
sleep(5);
|
||||||
|
$t->send(new RawMessage(''));
|
||||||
|
sleep(5);
|
||||||
|
$t->send(new RawMessage(''));
|
||||||
|
sleep(5);
|
||||||
$t->send(new RawMessage(''));
|
$t->send(new RawMessage(''));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -66,11 +66,20 @@ class RoundRobinTransport implements TransportInterface
|
|||||||
if (!$this->isTransportDead($transport)) {
|
if (!$this->isTransportDead($transport)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((microtime(true) - $this->deadTransports[$transport]) > $this->retryPeriod) {
|
if ((microtime(true) - $this->deadTransports[$transport]) > $this->retryPeriod) {
|
||||||
$this->deadTransports->detach($transport);
|
$this->deadTransports->detach($transport);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ($transport) {
|
||||||
|
$this->transports[] = $transport;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($this->deadTransports->count() >= \count($this->transports)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($transport) {
|
if ($transport) {
|
||||||
|
Reference in New Issue
Block a user