[Messenger] prevent infinite redelivery loops and blocked queues
by republishing the redelivered messages as retries with a retry limit and potential delay
This commit is contained in:
parent
b9f69441c5
commit
d211904c8e
@ -1649,6 +1649,7 @@ class FrameworkExtension extends Extension
|
|||||||
$defaultMiddleware = [
|
$defaultMiddleware = [
|
||||||
'before' => [
|
'before' => [
|
||||||
['id' => 'add_bus_name_stamp_middleware'],
|
['id' => 'add_bus_name_stamp_middleware'],
|
||||||
|
['id' => 'reject_redelivered_message_middleware'],
|
||||||
['id' => 'dispatch_after_current_bus'],
|
['id' => 'dispatch_after_current_bus'],
|
||||||
['id' => 'failed_message_processing_middleware'],
|
['id' => 'failed_message_processing_middleware'],
|
||||||
],
|
],
|
||||||
|
@ -48,6 +48,8 @@
|
|||||||
<argument type="service" id="validator" />
|
<argument type="service" id="validator" />
|
||||||
</service>
|
</service>
|
||||||
|
|
||||||
|
<service id="messenger.middleware.reject_redelivered_message_middleware" class="Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware" />
|
||||||
|
|
||||||
<service id="messenger.middleware.failed_message_processing_middleware" class="Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware" />
|
<service id="messenger.middleware.failed_message_processing_middleware" class="Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware" />
|
||||||
|
|
||||||
<service id="messenger.middleware.traceable" class="Symfony\Component\Messenger\Middleware\TraceableMiddleware" abstract="true">
|
<service id="messenger.middleware.traceable" class="Symfony\Component\Messenger\Middleware\TraceableMiddleware" abstract="true">
|
||||||
|
@ -739,6 +739,7 @@ abstract class FrameworkExtensionTest extends TestCase
|
|||||||
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
|
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
|
||||||
$this->assertEquals([
|
$this->assertEquals([
|
||||||
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
|
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
|
||||||
|
['id' => 'reject_redelivered_message_middleware'],
|
||||||
['id' => 'dispatch_after_current_bus'],
|
['id' => 'dispatch_after_current_bus'],
|
||||||
['id' => 'failed_message_processing_middleware'],
|
['id' => 'failed_message_processing_middleware'],
|
||||||
['id' => 'send_message'],
|
['id' => 'send_message'],
|
||||||
@ -748,6 +749,7 @@ abstract class FrameworkExtensionTest extends TestCase
|
|||||||
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
|
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
|
||||||
$this->assertEquals([
|
$this->assertEquals([
|
||||||
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
|
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
|
||||||
|
['id' => 'reject_redelivered_message_middleware'],
|
||||||
['id' => 'dispatch_after_current_bus'],
|
['id' => 'dispatch_after_current_bus'],
|
||||||
['id' => 'failed_message_processing_middleware'],
|
['id' => 'failed_message_processing_middleware'],
|
||||||
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
|
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
|
||||||
|
@ -42,7 +42,7 @@
|
|||||||
"symfony/expression-language": "~3.4|~4.0",
|
"symfony/expression-language": "~3.4|~4.0",
|
||||||
"symfony/http-client": "^4.3",
|
"symfony/http-client": "^4.3",
|
||||||
"symfony/mailer": "^4.3",
|
"symfony/mailer": "^4.3",
|
||||||
"symfony/messenger": "^4.3",
|
"symfony/messenger": "^4.3.6",
|
||||||
"symfony/mime": "^4.3",
|
"symfony/mime": "^4.3",
|
||||||
"symfony/process": "~3.4|~4.0",
|
"symfony/process": "~3.4|~4.0",
|
||||||
"symfony/security-csrf": "~3.4|~4.0",
|
"symfony/security-csrf": "~3.4|~4.0",
|
||||||
@ -73,7 +73,7 @@
|
|||||||
"symfony/dotenv": "<4.2",
|
"symfony/dotenv": "<4.2",
|
||||||
"symfony/dom-crawler": "<4.3",
|
"symfony/dom-crawler": "<4.3",
|
||||||
"symfony/form": "<4.3",
|
"symfony/form": "<4.3",
|
||||||
"symfony/messenger": "<4.3",
|
"symfony/messenger": "<4.3.6",
|
||||||
"symfony/property-info": "<3.4",
|
"symfony/property-info": "<3.4",
|
||||||
"symfony/serializer": "<4.2",
|
"symfony/serializer": "<4.2",
|
||||||
"symfony/stopwatch": "<3.4",
|
"symfony/stopwatch": "<3.4",
|
||||||
|
@ -0,0 +1,21 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Tobias Schultze <http://tobion.de>
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*/
|
||||||
|
class RejectRedeliveredMessageException extends RuntimeException
|
||||||
|
{
|
||||||
|
}
|
@ -0,0 +1,50 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Middleware;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
|
||||||
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Middleware that throws a RejectRedeliveredMessageException when a message is detected that has been redelivered by AMQP.
|
||||||
|
*
|
||||||
|
* The middleware runs before the HandleMessageMiddleware and prevents redelivered messages from being handled directly.
|
||||||
|
* The thrown exception is caught by the worker and will trigger the retry logic according to the retry strategy.
|
||||||
|
*
|
||||||
|
* AMQP redelivers messages when they do not get acknowledged or rejected. This can happen when the connection times out
|
||||||
|
* or an exception is thrown before acknowledging or rejecting. When such errors happen again while handling the
|
||||||
|
* redelivered message, the message would get redelivered again and again. The purpose of this middleware is to prevent
|
||||||
|
* infinite redelivery loops and to unblock the queue by republishing the redelivered messages as retries with a retry
|
||||||
|
* limit and potential delay.
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*
|
||||||
|
* @author Tobias Schultze <http://tobion.de>
|
||||||
|
*/
|
||||||
|
class RejectRedeliveredMessageMiddleware implements MiddlewareInterface
|
||||||
|
{
|
||||||
|
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||||
|
{
|
||||||
|
// ignore the dispatched messages for retry
|
||||||
|
if (null !== $envelope->last(ReceivedStamp::class)) {
|
||||||
|
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
|
||||||
|
|
||||||
|
if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
|
||||||
|
throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return $stack->next()->handle($envelope, $stack);
|
||||||
|
}
|
||||||
|
}
|
@ -118,8 +118,8 @@ class WorkerTest extends TestCase
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// old message acknowledged
|
// old message rejected
|
||||||
$this->assertSame(1, $receiver->getAcknowledgeCount());
|
$this->assertSame(1, $receiver->getRejectCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testUnrecoverableMessageHandlingExceptionPreventsRetries()
|
public function testUnrecoverableMessageHandlingExceptionPreventsRetries()
|
||||||
|
@ -18,6 +18,7 @@ use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
|
|||||||
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
||||||
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
|
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
|
||||||
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
||||||
|
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
|
||||||
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
|
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
|
||||||
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
||||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||||
@ -135,6 +136,13 @@ class Worker implements WorkerInterface
|
|||||||
try {
|
try {
|
||||||
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName)));
|
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName)));
|
||||||
} catch (\Throwable $throwable) {
|
} catch (\Throwable $throwable) {
|
||||||
|
$rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
|
||||||
|
if ($rejectFirst) {
|
||||||
|
// redelivered messages are rejected first so that continuous failures in an event listener or while
|
||||||
|
// publishing for retry does not cause infinite redelivery loops
|
||||||
|
$receiver->reject($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
if ($throwable instanceof HandlerFailedException) {
|
if ($throwable instanceof HandlerFailedException) {
|
||||||
$envelope = $throwable->getEnvelope();
|
$envelope = $throwable->getEnvelope();
|
||||||
}
|
}
|
||||||
@ -156,15 +164,15 @@ class Worker implements WorkerInterface
|
|||||||
->with(new RedeliveryStamp($retryCount, $transportName))
|
->with(new RedeliveryStamp($retryCount, $transportName))
|
||||||
->withoutAll(ReceivedStamp::class);
|
->withoutAll(ReceivedStamp::class);
|
||||||
|
|
||||||
// re-send the message
|
// re-send the message for retry
|
||||||
$this->bus->dispatch($retryEnvelope);
|
$this->bus->dispatch($retryEnvelope);
|
||||||
// acknowledge the previous message has received
|
|
||||||
$receiver->ack($envelope);
|
|
||||||
} else {
|
} else {
|
||||||
if (null !== $this->logger) {
|
if (null !== $this->logger) {
|
||||||
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
|
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!$rejectFirst) {
|
||||||
$receiver->reject($envelope);
|
$receiver->reject($envelope);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user