[Messenger] prevent infinite redelivery loops and blocked queues

by republishing the redelivered messages as retries with a retry limit and potential delay
This commit is contained in:
Tobias Schultze 2019-10-24 17:19:11 +02:00
parent b9f69441c5
commit d211904c8e
8 changed files with 91 additions and 7 deletions

View File

@ -1649,6 +1649,7 @@ class FrameworkExtension extends Extension
$defaultMiddleware = [
'before' => [
['id' => 'add_bus_name_stamp_middleware'],
['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
],

View File

@ -48,6 +48,8 @@
<argument type="service" id="validator" />
</service>
<service id="messenger.middleware.reject_redelivered_message_middleware" class="Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware" />
<service id="messenger.middleware.failed_message_processing_middleware" class="Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware" />
<service id="messenger.middleware.traceable" class="Symfony\Component\Messenger\Middleware\TraceableMiddleware" abstract="true">

View File

@ -739,6 +739,7 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
['id' => 'send_message'],
@ -748,6 +749,7 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],

View File

@ -42,7 +42,7 @@
"symfony/expression-language": "~3.4|~4.0",
"symfony/http-client": "^4.3",
"symfony/mailer": "^4.3",
"symfony/messenger": "^4.3",
"symfony/messenger": "^4.3.6",
"symfony/mime": "^4.3",
"symfony/process": "~3.4|~4.0",
"symfony/security-csrf": "~3.4|~4.0",
@ -73,7 +73,7 @@
"symfony/dotenv": "<4.2",
"symfony/dom-crawler": "<4.3",
"symfony/form": "<4.3",
"symfony/messenger": "<4.3",
"symfony/messenger": "<4.3.6",
"symfony/property-info": "<3.4",
"symfony/serializer": "<4.2",
"symfony/stopwatch": "<3.4",

View File

@ -0,0 +1,21 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Exception;
/**
* @author Tobias Schultze <http://tobion.de>
*
* @experimental in 4.3
*/
class RejectRedeliveredMessageException extends RuntimeException
{
}

View File

@ -0,0 +1,50 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Middleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
/**
* Middleware that throws a RejectRedeliveredMessageException when a message is detected that has been redelivered by AMQP.
*
* The middleware runs before the HandleMessageMiddleware and prevents redelivered messages from being handled directly.
* The thrown exception is caught by the worker and will trigger the retry logic according to the retry strategy.
*
* AMQP redelivers messages when they do not get acknowledged or rejected. This can happen when the connection times out
* or an exception is thrown before acknowledging or rejecting. When such errors happen again while handling the
* redelivered message, the message would get redelivered again and again. The purpose of this middleware is to prevent
* infinite redelivery loops and to unblock the queue by republishing the redelivered messages as retries with a retry
* limit and potential delay.
*
* @experimental in 4.3
*
* @author Tobias Schultze <http://tobion.de>
*/
class RejectRedeliveredMessageMiddleware implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
// ignore the dispatched messages for retry
if (null !== $envelope->last(ReceivedStamp::class)) {
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
}
}
return $stack->next()->handle($envelope, $stack);
}
}

View File

@ -118,8 +118,8 @@ class WorkerTest extends TestCase
}
});
// old message acknowledged
$this->assertSame(1, $receiver->getAcknowledgeCount());
// old message rejected
$this->assertSame(1, $receiver->getRejectCount());
}
public function testUnrecoverableMessageHandlingExceptionPreventsRetries()

View File

@ -18,6 +18,7 @@ use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
@ -135,6 +136,13 @@ class Worker implements WorkerInterface
try {
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName)));
} catch (\Throwable $throwable) {
$rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
if ($rejectFirst) {
// redelivered messages are rejected first so that continuous failures in an event listener or while
// publishing for retry does not cause infinite redelivery loops
$receiver->reject($envelope);
}
if ($throwable instanceof HandlerFailedException) {
$envelope = $throwable->getEnvelope();
}
@ -156,15 +164,15 @@ class Worker implements WorkerInterface
->with(new RedeliveryStamp($retryCount, $transportName))
->withoutAll(ReceivedStamp::class);
// re-send the message
// re-send the message for retry
$this->bus->dispatch($retryEnvelope);
// acknowledge the previous message has received
$receiver->ack($envelope);
} else {
if (null !== $this->logger) {
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
}
}
if (!$rejectFirst) {
$receiver->reject($envelope);
}