[Messenger] fix wrong use of generator returns
And some other minor cleanups
This commit is contained in:
parent
b82b09eefb
commit
e8a09e9d85
@ -32,7 +32,7 @@ class SentToFailureTransportStamp implements StampInterface
|
|||||||
$this->exceptionMessage = $exceptionMessage;
|
$this->exceptionMessage = $exceptionMessage;
|
||||||
$this->originalReceiverName = $originalReceiverName;
|
$this->originalReceiverName = $originalReceiverName;
|
||||||
$this->flattenException = $flattenException;
|
$this->flattenException = $flattenException;
|
||||||
$this->sentAt = new \DateTime();
|
$this->sentAt = new \DateTimeImmutable();
|
||||||
}
|
}
|
||||||
|
|
||||||
public function getExceptionMessage(): string
|
public function getExceptionMessage(): string
|
||||||
@ -50,7 +50,7 @@ class SentToFailureTransportStamp implements StampInterface
|
|||||||
return $this->flattenException;
|
return $this->flattenException;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function getSentAt(): \DateTime
|
public function getSentAt(): \DateTimeInterface
|
||||||
{
|
{
|
||||||
return $this->sentAt;
|
return $this->sentAt;
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,6 @@ class SentToFailureTransportStampTest extends TestCase
|
|||||||
$this->assertSame('exception message', $stamp->getExceptionMessage());
|
$this->assertSame('exception message', $stamp->getExceptionMessage());
|
||||||
$this->assertSame('original_receiver', $stamp->getOriginalReceiverName());
|
$this->assertSame('original_receiver', $stamp->getOriginalReceiverName());
|
||||||
$this->assertSame($flattenException, $stamp->getFlattenException());
|
$this->assertSame($flattenException, $stamp->getFlattenException());
|
||||||
$this->assertInstanceOf(\DateTime::class, $stamp->getSentAt());
|
$this->assertInstanceOf(\DateTimeInterface::class, $stamp->getSentAt());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,7 +36,7 @@ class DoctrineReceiverTest extends TestCase
|
|||||||
$connection->method('get')->willReturn($doctrineEnvelope);
|
$connection->method('get')->willReturn($doctrineEnvelope);
|
||||||
|
|
||||||
$receiver = new DoctrineReceiver($connection, $serializer);
|
$receiver = new DoctrineReceiver($connection, $serializer);
|
||||||
$actualEnvelopes = iterator_to_array($receiver->get());
|
$actualEnvelopes = $receiver->get();
|
||||||
$this->assertCount(1, $actualEnvelopes);
|
$this->assertCount(1, $actualEnvelopes);
|
||||||
/** @var Envelope $actualEnvelope */
|
/** @var Envelope $actualEnvelope */
|
||||||
$actualEnvelope = $actualEnvelopes[0];
|
$actualEnvelope = $actualEnvelopes[0];
|
||||||
@ -67,7 +67,7 @@ class DoctrineReceiverTest extends TestCase
|
|||||||
$connection->expects($this->once())->method('reject');
|
$connection->expects($this->once())->method('reject');
|
||||||
|
|
||||||
$receiver = new DoctrineReceiver($connection, $serializer);
|
$receiver = new DoctrineReceiver($connection, $serializer);
|
||||||
iterator_to_array($receiver->get());
|
$receiver->get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testAll()
|
public function testAll()
|
||||||
|
@ -46,7 +46,7 @@ class DoctrineTransportTest extends TestCase
|
|||||||
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
|
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
|
||||||
$connection->method('get')->willReturn($doctrineEnvelope);
|
$connection->method('get')->willReturn($doctrineEnvelope);
|
||||||
|
|
||||||
$envelopes = iterator_to_array($transport->get());
|
$envelopes = $transport->get();
|
||||||
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
|
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,12 +13,8 @@ namespace Symfony\Component\Messenger\Tests\Transport\Sender;
|
|||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage;
|
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
|
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
|
||||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
|
||||||
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
|
|
||||||
|
|
||||||
class SingleMessageReceiverTest extends TestCase
|
class SingleMessageReceiverTest extends TestCase
|
||||||
{
|
{
|
||||||
@ -28,11 +24,11 @@ class SingleMessageReceiverTest extends TestCase
|
|||||||
$envelope = new Envelope(new \stdClass());
|
$envelope = new Envelope(new \stdClass());
|
||||||
|
|
||||||
$receiver = new SingleMessageReceiver($innerReceiver, $envelope);
|
$receiver = new SingleMessageReceiver($innerReceiver, $envelope);
|
||||||
$received = \iterator_to_array($receiver->get());
|
$received = $receiver->get();
|
||||||
$this->assertCount(1, $received);
|
$this->assertCount(1, $received);
|
||||||
$this->assertSame($received[0], $envelope);
|
$this->assertSame($received[0], $envelope);
|
||||||
|
|
||||||
$this->assertEmpty(\iterator_to_array($receiver->get()));
|
$this->assertEmpty($receiver->get());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testCallsAreForwarded()
|
public function testCallsAreForwarded()
|
||||||
|
@ -33,7 +33,7 @@ class RedisReceiverTest extends TestCase
|
|||||||
$connection->method('get')->willReturn($redisEnvelop);
|
$connection->method('get')->willReturn($redisEnvelop);
|
||||||
|
|
||||||
$receiver = new RedisReceiver($connection, $serializer);
|
$receiver = new RedisReceiver($connection, $serializer);
|
||||||
$actualEnvelopes = iterator_to_array($receiver->get());
|
$actualEnvelopes = $receiver->get();
|
||||||
$this->assertCount(1, $actualEnvelopes);
|
$this->assertCount(1, $actualEnvelopes);
|
||||||
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
|
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
|
||||||
}
|
}
|
||||||
@ -51,7 +51,7 @@ class RedisReceiverTest extends TestCase
|
|||||||
$connection->expects($this->once())->method('reject');
|
$connection->expects($this->once())->method('reject');
|
||||||
|
|
||||||
$receiver = new RedisReceiver($connection, $serializer);
|
$receiver = new RedisReceiver($connection, $serializer);
|
||||||
iterator_to_array($receiver->get());
|
$receiver->get();
|
||||||
}
|
}
|
||||||
|
|
||||||
private function createRedisEnvelope()
|
private function createRedisEnvelope()
|
||||||
|
@ -46,7 +46,7 @@ class RedisTransportTest extends TestCase
|
|||||||
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
|
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
|
||||||
$connection->method('get')->willReturn($redisEnvelope);
|
$connection->method('get')->willReturn($redisEnvelope);
|
||||||
|
|
||||||
$envelopes = iterator_to_array($transport->get());
|
$envelopes = $transport->get();
|
||||||
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
|
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,7 +57,7 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (null === $amqpEnvelope) {
|
if (null === $amqpEnvelope) {
|
||||||
return [];
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -84,12 +84,12 @@ class AmqpTransport implements TransportInterface, SetupableTransportInterface,
|
|||||||
return ($this->receiver ?? $this->getReceiver())->getMessageCount();
|
return ($this->receiver ?? $this->getReceiver())->getMessageCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
private function getReceiver()
|
private function getReceiver(): AmqpReceiver
|
||||||
{
|
{
|
||||||
return $this->receiver = new AmqpReceiver($this->connection, $this->serializer);
|
return $this->receiver = new AmqpReceiver($this->connection, $this->serializer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function getSender()
|
private function getSender(): AmqpSender
|
||||||
{
|
{
|
||||||
return $this->sender = new AmqpSender($this->connection, $this->serializer);
|
return $this->sender = new AmqpSender($this->connection, $this->serializer);
|
||||||
}
|
}
|
||||||
|
@ -54,7 +54,7 @@ class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface,
|
|||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
|
|
||||||
yield $this->createEnvelopeFromData($doctrineEnvelope);
|
return [$this->createEnvelopeFromData($doctrineEnvelope)];
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1,5 +1,14 @@
|
|||||||
<?php
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\Receiver;
|
namespace Symfony\Component\Messenger\Transport\Receiver;
|
||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
@ -1,5 +1,14 @@
|
|||||||
<?php
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\Receiver;
|
namespace Symfony\Component\Messenger\Transport\Receiver;
|
||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
@ -32,7 +41,7 @@ class SingleMessageReceiver implements ReceiverInterface
|
|||||||
|
|
||||||
$this->hasReceived = true;
|
$this->hasReceived = true;
|
||||||
|
|
||||||
yield $this->envelope;
|
return [$this->envelope];
|
||||||
}
|
}
|
||||||
|
|
||||||
public function ack(Envelope $envelope): void
|
public function ack(Envelope $envelope): void
|
||||||
|
@ -57,7 +57,7 @@ class RedisReceiver implements ReceiverInterface
|
|||||||
throw $exception;
|
throw $exception;
|
||||||
}
|
}
|
||||||
|
|
||||||
yield $envelope->with(new RedisReceivedStamp($redisEnvelope['id']));
|
return [$envelope->with(new RedisReceivedStamp($redisEnvelope['id']))];
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -76,12 +76,12 @@ class RedisTransport implements TransportInterface, SetupableTransportInterface
|
|||||||
$this->connection->setup();
|
$this->connection->setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
private function getReceiver()
|
private function getReceiver(): RedisReceiver
|
||||||
{
|
{
|
||||||
return $this->receiver = new RedisReceiver($this->connection, $this->serializer);
|
return $this->receiver = new RedisReceiver($this->connection, $this->serializer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function getSender()
|
private function getSender(): RedisSender
|
||||||
{
|
{
|
||||||
return $this->sender = new RedisSender($this->connection, $this->serializer);
|
return $this->sender = new RedisSender($this->connection, $this->serializer);
|
||||||
}
|
}
|
||||||
|
@ -136,16 +136,11 @@ class Worker implements WorkerInterface
|
|||||||
$envelope = $throwable->getEnvelope();
|
$envelope = $throwable->getEnvelope();
|
||||||
}
|
}
|
||||||
|
|
||||||
$shouldRetry = $this->shouldRetry($throwable, $envelope, $retryStrategy);
|
$shouldRetry = $retryStrategy && $this->shouldRetry($throwable, $envelope, $retryStrategy);
|
||||||
|
|
||||||
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable, $shouldRetry));
|
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable, $shouldRetry));
|
||||||
|
|
||||||
if ($shouldRetry) {
|
if ($shouldRetry) {
|
||||||
if (null === $retryStrategy) {
|
|
||||||
// not logically allowed, but check just in case
|
|
||||||
throw new LogicException('Retrying is not supported without a retry strategy.');
|
|
||||||
}
|
|
||||||
|
|
||||||
$retryCount = $this->getRetryCount($envelope) + 1;
|
$retryCount = $this->getRetryCount($envelope) + 1;
|
||||||
if (null !== $this->logger) {
|
if (null !== $this->logger) {
|
||||||
$this->logger->error('Retrying {class} - retry #{retryCount}.', $context + ['retryCount' => $retryCount, 'error' => $throwable]);
|
$this->logger->error('Retrying {class} - retry #{retryCount}.', $context + ['retryCount' => $retryCount, 'error' => $throwable]);
|
||||||
@ -194,16 +189,12 @@ class Worker implements WorkerInterface
|
|||||||
$this->eventDispatcher->dispatch($event);
|
$this->eventDispatcher->dispatch($event);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function shouldRetry(\Throwable $e, Envelope $envelope, ?RetryStrategyInterface $retryStrategy): bool
|
private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
|
||||||
{
|
{
|
||||||
if ($e instanceof UnrecoverableMessageHandlingException) {
|
if ($e instanceof UnrecoverableMessageHandlingException) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (null === $retryStrategy) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
$sentStamp = $envelope->last(SentStamp::class);
|
$sentStamp = $envelope->last(SentStamp::class);
|
||||||
if (null === $sentStamp) {
|
if (null === $sentStamp) {
|
||||||
if (null !== $this->logger) {
|
if (null !== $this->logger) {
|
||||||
|
@ -59,7 +59,7 @@ class StopWhenRestartSignalIsReceived implements WorkerInterface
|
|||||||
$this->decoratedWorker->stop();
|
$this->decoratedWorker->stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
private function shouldRestart(float $workerStartedAt)
|
private function shouldRestart(float $workerStartedAt): bool
|
||||||
{
|
{
|
||||||
$cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY);
|
$cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY);
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user