diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index 8cb3fdd242..3fd5ff9028 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1649,6 +1649,7 @@ class FrameworkExtension extends Extension $defaultMiddleware = [ 'before' => [ ['id' => 'add_bus_name_stamp_middleware'], + ['id' => 'reject_redelivered_message_middleware'], ['id' => 'dispatch_after_current_bus'], ['id' => 'failed_message_processing_middleware'], ], diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml index 5ef47e751e..f50b613dc7 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -48,6 +48,8 @@ + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index c1824ecafe..5f2e007ddb 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -739,6 +739,7 @@ abstract class FrameworkExtensionTest extends TestCase $this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0)); $this->assertEquals([ ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']], + ['id' => 'reject_redelivered_message_middleware'], ['id' => 'dispatch_after_current_bus'], ['id' => 'failed_message_processing_middleware'], ['id' => 'send_message'], @@ -748,6 +749,7 @@ abstract class FrameworkExtensionTest extends TestCase $this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0)); $this->assertEquals([ ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']], + ['id' => 'reject_redelivered_message_middleware'], ['id' => 'dispatch_after_current_bus'], ['id' => 'failed_message_processing_middleware'], ['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]], diff --git a/src/Symfony/Bundle/FrameworkBundle/composer.json b/src/Symfony/Bundle/FrameworkBundle/composer.json index 8854a78d14..57c344f68f 100644 --- a/src/Symfony/Bundle/FrameworkBundle/composer.json +++ b/src/Symfony/Bundle/FrameworkBundle/composer.json @@ -42,7 +42,7 @@ "symfony/expression-language": "~3.4|~4.0", "symfony/http-client": "^4.3", "symfony/mailer": "^4.3", - "symfony/messenger": "^4.3", + "symfony/messenger": "^4.3.6", "symfony/mime": "^4.3", "symfony/process": "~3.4|~4.0", "symfony/security-csrf": "~3.4|~4.0", @@ -73,7 +73,7 @@ "symfony/dotenv": "<4.2", "symfony/dom-crawler": "<4.3", "symfony/form": "<4.3", - "symfony/messenger": "<4.3", + "symfony/messenger": "<4.3.6", "symfony/property-info": "<3.4", "symfony/serializer": "<4.2", "symfony/stopwatch": "<3.4", diff --git a/src/Symfony/Component/Messenger/Exception/RejectRedeliveredMessageException.php b/src/Symfony/Component/Messenger/Exception/RejectRedeliveredMessageException.php new file mode 100644 index 0000000000..79b94fa265 --- /dev/null +++ b/src/Symfony/Component/Messenger/Exception/RejectRedeliveredMessageException.php @@ -0,0 +1,21 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Exception; + +/** + * @author Tobias Schultze + * + * @experimental in 4.3 + */ +class RejectRedeliveredMessageException extends RuntimeException +{ +} diff --git a/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php new file mode 100644 index 0000000000..2c6e6b6ff7 --- /dev/null +++ b/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php @@ -0,0 +1,50 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Middleware; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; +use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp; + +/** + * Middleware that throws a RejectRedeliveredMessageException when a message is detected that has been redelivered by AMQP. + * + * The middleware runs before the HandleMessageMiddleware and prevents redelivered messages from being handled directly. + * The thrown exception is caught by the worker and will trigger the retry logic according to the retry strategy. + * + * AMQP redelivers messages when they do not get acknowledged or rejected. This can happen when the connection times out + * or an exception is thrown before acknowledging or rejecting. When such errors happen again while handling the + * redelivered message, the message would get redelivered again and again. The purpose of this middleware is to prevent + * infinite redelivery loops and to unblock the queue by republishing the redelivered messages as retries with a retry + * limit and potential delay. + * + * @experimental in 4.3 + * + * @author Tobias Schultze + */ +class RejectRedeliveredMessageMiddleware implements MiddlewareInterface +{ + public function handle(Envelope $envelope, StackInterface $stack): Envelope + { + // ignore the dispatched messages for retry + if (null !== $envelope->last(ReceivedStamp::class)) { + $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class); + + if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) { + throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.'); + } + } + + return $stack->next()->handle($envelope, $stack); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index ad7477253e..9a09c0a04a 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -118,8 +118,8 @@ class WorkerTest extends TestCase } }); - // old message acknowledged - $this->assertSame(1, $receiver->getAcknowledgeCount()); + // old message rejected + $this->assertSame(1, $receiver->getRejectCount()); } public function testUnrecoverableMessageHandlingExceptionPreventsRetries() diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 398124e994..6205baefd4 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -18,6 +18,7 @@ use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; use Symfony\Component\Messenger\Event\WorkerStoppedEvent; use Symfony\Component\Messenger\Exception\HandlerFailedException; +use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface; use Symfony\Component\Messenger\Retry\RetryStrategyInterface; use Symfony\Component\Messenger\Stamp\DelayStamp; @@ -135,6 +136,13 @@ class Worker implements WorkerInterface try { $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName))); } catch (\Throwable $throwable) { + $rejectFirst = $throwable instanceof RejectRedeliveredMessageException; + if ($rejectFirst) { + // redelivered messages are rejected first so that continuous failures in an event listener or while + // publishing for retry does not cause infinite redelivery loops + $receiver->reject($envelope); + } + if ($throwable instanceof HandlerFailedException) { $envelope = $throwable->getEnvelope(); } @@ -156,15 +164,15 @@ class Worker implements WorkerInterface ->with(new RedeliveryStamp($retryCount, $transportName)) ->withoutAll(ReceivedStamp::class); - // re-send the message + // re-send the message for retry $this->bus->dispatch($retryEnvelope); - // acknowledge the previous message has received - $receiver->ack($envelope); } else { if (null !== $this->logger) { $this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]); } + } + if (!$rejectFirst) { $receiver->reject($envelope); }