bug #34082 Revert "[Messenger] Fix exception message of failed message is dropped (Tobion)

This PR was merged into the 4.3 branch.

Discussion
----------

Revert "[Messenger] Fix exception message of failed message is dropped

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  | no <!-- please update src/**/CHANGELOG.md files -->
| Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tickets       |
| License       | MIT
| Doc PR        |

This reverts #33600 because it makes the message grow for each retry until AMQP cannot handle it anymore. On each retry, the full exception trace is added to the message. So in our case on the 5th retry, the message is too big for the AMQP library to encode it. AMQP extension then throws the exception

> Library error: table too large for buffer

(ref. https://github.com/alanxz/rabbitmq-c/issues/224 and https://github.com/pdezwart/php-amqp/issues/131) when trying to publish the message.

To solve this, I suggest to revert #33600 (this PR) and merge #32341 instead which does not re-add the exception on each failure.

Btw, the above problem causes other problematic side-effects of Symfony messenger. As the new retry message fails to be published with an exception, the old (currently processed message) also does not get removed (acknowledged) from the delay queue. So rabbitmq redelivers the message and the same thing happens forever. This can block the consumers and have a huge toll on your service. That's just another case for https://github.com/symfony/symfony/issues/32055#issuecomment-529630654. I'll try to fix this in another PR.

Commits
-------

3dbe924f47 Revert "[Messenger] Fix exception message of failed message is dropped on retry"
This commit is contained in:
Tobias Schultze 2019-10-23 14:36:36 +02:00
commit 2e17488332
2 changed files with 8 additions and 61 deletions

View File

@ -13,15 +13,12 @@ namespace Symfony\Component\Messenger\Tests\Command;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\Debug\Exception\FlattenException;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Worker;
class FailedMessagesRetryCommandTest extends TestCase
{
@ -37,52 +34,16 @@ class FailedMessagesRetryCommandTest extends TestCase
// the bus should be called in the worker
$bus->expects($this->exactly(2))->method('dispatch')->willReturn(new Envelope(new \stdClass()));
$command = new FailedMessagesRetryCommand('failure_receiver', $receiver, $bus, $dispatcher);
$command = new FailedMessagesRetryCommand(
'failure_receiver',
$receiver,
$bus,
$dispatcher
);
$tester = new CommandTester($command);
$tester->execute(['id' => [10, 12]]);
$this->assertStringContainsString('[OK]', $tester->getDisplay());
}
public function testExceptionOnRetry()
{
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('find')->with(10)->willReturn(new Envelope(new \stdClass()));
// message will eventually be ack'ed in Worker
$receiver->expects($this->once())->method('ack');
$dispatcher = $this->createMock(EventDispatcherInterface::class);
$bus = $this->createMock(MessageBusInterface::class);
// the bus should be called in the worker
$bus->expects($this->at(0))
->method('dispatch')
->with($this->callback(function (Envelope $envelope) {
$lastReceivedStamp = $envelope->last(ReceivedStamp::class);
return $lastReceivedStamp instanceof ReceivedStamp && \is_string($lastReceivedStamp->getTransportName());
}))
->will($this->throwException(new \Exception('Mock test exception')));
$bus->expects($this->at(1))
->method('dispatch')
->with($this->callback(function (Envelope $envelope) {
$lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class);
return $lastRedeliveryStamp instanceof RedeliveryStamp &&
\is_string($lastRedeliveryStamp->getExceptionMessage()) &&
$lastRedeliveryStamp->getFlattenException() instanceof FlattenException;
}))
->willReturn(new Envelope(new \stdClass()));
$retryStrategy = $this->createMock(RetryStrategyInterface::class);
$retryStrategy->expects($this->once())->method('isRetryable')->with($this->isInstanceOf(Envelope::class))->willReturn(true);
$command = new FailedMessagesRetryCommand('failure_receiver', $receiver, $bus, $dispatcher, $retryStrategy);
$tester = new CommandTester($command);
$tester->execute(['id' => [10]]);
$this->assertStringContainsString('[OK]', $tester->getDisplay());
}
}

View File

@ -12,7 +12,6 @@
namespace Symfony\Component\Messenger;
use Psr\Log\LoggerInterface;
use Symfony\Component\Debug\Exception\FlattenException;
use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
@ -154,7 +153,7 @@ class Worker implements WorkerInterface
// add the delay and retry stamp info + remove ReceivedStamp
$retryEnvelope = $envelope->with(new DelayStamp($delay))
->with(new RedeliveryStamp($retryCount, $transportName, $throwable->getMessage(), $this->flattenedException($throwable)))
->with(new RedeliveryStamp($retryCount, $transportName))
->withoutAll(ReceivedStamp::class);
// re-send the message
@ -217,17 +216,4 @@ class Worker implements WorkerInterface
return $retryStrategy->isRetryable($envelope);
}
private function flattenedException(\Throwable $throwable): ?FlattenException
{
if (!class_exists(FlattenException::class)) {
return null;
}
if ($throwable instanceof HandlerFailedException) {
$throwable = $throwable->getNestedExceptions()[0];
}
return FlattenException::createFromThrowable($throwable);
}
}