Merge branch '4.3' into 4.4

This commit is contained in:
Tobias Schultze 2019-10-25 15:41:20 +02:00
commit c950130fc2
18 changed files with 271 additions and 146 deletions

View File

@ -1717,6 +1717,7 @@ class FrameworkExtension extends Extension
$defaultMiddleware = [
'before' => [
['id' => 'add_bus_name_stamp_middleware'],
['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
],

View File

@ -48,6 +48,8 @@
<argument type="service" id="validator" />
</service>
<service id="messenger.middleware.reject_redelivered_message_middleware" class="Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware" />
<service id="messenger.middleware.failed_message_processing_middleware" class="Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware" />
<service id="messenger.middleware.traceable" class="Symfony\Component\Messenger\Middleware\TraceableMiddleware" abstract="true">

View File

@ -740,6 +740,7 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
['id' => 'send_message'],
@ -749,6 +750,7 @@ abstract class FrameworkExtensionTest extends TestCase
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],

View File

@ -45,7 +45,7 @@
"symfony/http-client": "^4.4|^5.0",
"symfony/lock": "^4.4|^5.0",
"symfony/mailer": "^4.4|^5.0",
"symfony/messenger": "^4.3|^5.0",
"symfony/messenger": "^4.3.6|^5.0",
"symfony/mime": "^4.4|^5.0",
"symfony/process": "^3.4|^4.0|^5.0",
"symfony/security-csrf": "^3.4|^4.0|^5.0",
@ -77,7 +77,7 @@
"symfony/form": "<4.3",
"symfony/lock": "<4.4",
"symfony/mailer": "<4.4",
"symfony/messenger": "<4.3",
"symfony/messenger": "<4.3.6",
"symfony/mime": "<4.4",
"symfony/property-info": "<3.4",
"symfony/security-bundle": "<4.4",

View File

@ -16,11 +16,6 @@ use Symfony\Contracts\HttpClient\Test\HttpClientTestCase as BaseHttpClientTestCa
abstract class HttpClientTestCase extends BaseHttpClientTestCase
{
public function testMaxDuration()
{
$this->markTestSkipped('Implemented as of version 4.4');
}
public function testAcceptHeader()
{
$client = $this->getHttpClient(__FUNCTION__);
@ -80,4 +75,34 @@ abstract class HttpClientTestCase extends BaseHttpClientTestCase
$response = $client->request('GET', 'http://localhost:8057/404');
$stream = $response->toStream();
}
public function testInfoOnCanceledResponse()
{
$this->markTestSkipped('Implemented as of version 4.4');
}
public function testBufferSink()
{
$this->markTestSkipped('Implemented as of version 4.4');
}
public function testConditionalBuffering()
{
$this->markTestSkipped('Implemented as of version 4.4');
}
public function testReentrantBufferCallback()
{
$this->markTestSkipped('Implemented as of version 4.4');
}
public function testThrowingBufferCallback()
{
$this->markTestSkipped('Implemented as of version 4.4');
}
public function testMaxDuration()
{
$this->markTestSkipped('Implemented as of version 4.4');
}
}

View File

@ -61,8 +61,7 @@ abstract class AbstractFailedMessagesCommand extends Command
/** @var SentToFailureTransportStamp|null $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
/** @var RedeliveryStamp|null $lastRedeliveryStamp */
$lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class);
$lastRedeliveryStampWithException = $this->getLastRedeliveryStampWithException($envelope);
$rows = [
['Class', \get_class($envelope->getMessage())],
@ -72,13 +71,13 @@ abstract class AbstractFailedMessagesCommand extends Command
$rows[] = ['Message Id', $id];
}
$flattenException = null === $lastRedeliveryStamp ? null : $lastRedeliveryStamp->getFlattenException();
$flattenException = null === $lastRedeliveryStampWithException ? null : $lastRedeliveryStampWithException->getFlattenException();
if (null === $sentToFailureTransportStamp) {
$io->warning('Message does not appear to have been sent to this transport after failing');
} else {
$rows = array_merge($rows, [
['Failed at', null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')],
['Error', null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getExceptionMessage()],
['Failed at', null === $lastRedeliveryStampWithException ? '' : $lastRedeliveryStampWithException->getRedeliveredAt()->format('Y-m-d H:i:s')],
['Error', null === $lastRedeliveryStampWithException ? '' : $lastRedeliveryStampWithException->getExceptionMessage()],
['Error Class', null === $flattenException ? '(unknown)' : $flattenException->getClass()],
['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()],
]);
@ -120,4 +119,16 @@ abstract class AbstractFailedMessagesCommand extends Command
{
return $this->receiver;
}
protected function getLastRedeliveryStampWithException(Envelope $envelope): ?RedeliveryStamp
{
/** @var RedeliveryStamp $stamp */
foreach (array_reverse($envelope->all(RedeliveryStamp::class)) as $stamp) {
if (null !== $stamp->getExceptionMessage()) {
return $stamp;
}
}
return null;
}
}

View File

@ -18,7 +18,6 @@ use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
/**
@ -83,14 +82,13 @@ EOF
$rows = [];
foreach ($envelopes as $envelope) {
/** @var RedeliveryStamp|null $lastRedeliveryStamp */
$lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class);
$lastRedeliveryStampWithException = $this->getLastRedeliveryStampWithException($envelope);
$rows[] = [
$this->getMessageId($envelope),
\get_class($envelope->getMessage()),
null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s'),
null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getExceptionMessage(),
null === $lastRedeliveryStampWithException ? '' : $lastRedeliveryStampWithException->getRedeliveredAt()->format('Y-m-d H:i:s'),
null === $lastRedeliveryStampWithException ? '' : $lastRedeliveryStampWithException->getExceptionMessage(),
];
}

View File

@ -0,0 +1,21 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Exception;
/**
* @author Tobias Schultze <http://tobion.de>
*
* @experimental in 4.3
*/
class RejectRedeliveredMessageException extends RuntimeException
{
}

View File

@ -0,0 +1,50 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Middleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
/**
* Middleware that throws a RejectRedeliveredMessageException when a message is detected that has been redelivered by AMQP.
*
* The middleware runs before the HandleMessageMiddleware and prevents redelivered messages from being handled directly.
* The thrown exception is caught by the worker and will trigger the retry logic according to the retry strategy.
*
* AMQP redelivers messages when they do not get acknowledged or rejected. This can happen when the connection times out
* or an exception is thrown before acknowledging or rejecting. When such errors happen again while handling the
* redelivered message, the message would get redelivered again and again. The purpose of this middleware is to prevent
* infinite redelivery loops and to unblock the queue by republishing the redelivered messages as retries with a retry
* limit and potential delay.
*
* @experimental in 4.3
*
* @author Tobias Schultze <http://tobion.de>
*/
class RejectRedeliveredMessageMiddleware implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
// ignore the dispatched messages for retry
if (null !== $envelope->last(ReceivedStamp::class)) {
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
}
}
return $stack->next()->handle($envelope, $stack);
}
}

View File

@ -13,15 +13,10 @@ namespace Symfony\Component\Messenger\Tests\Command;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\Debug\Exception\FlattenException as LegacyFlattenException;
use Symfony\Component\ErrorRenderer\Exception\FlattenException;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
class FailedMessagesRetryCommandTest extends TestCase
@ -38,52 +33,16 @@ class FailedMessagesRetryCommandTest extends TestCase
// the bus should be called in the worker
$bus->expects($this->exactly(2))->method('dispatch')->willReturn(new Envelope(new \stdClass()));
$command = new FailedMessagesRetryCommand('failure_receiver', $receiver, $bus, $dispatcher);
$command = new FailedMessagesRetryCommand(
'failure_receiver',
$receiver,
$bus,
$dispatcher
);
$tester = new CommandTester($command);
$tester->execute(['id' => [10, 12]]);
$this->assertStringContainsString('[OK]', $tester->getDisplay());
}
public function testExceptionOnRetry()
{
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('find')->with(10)->willReturn(new Envelope(new \stdClass()));
// message will eventually be ack'ed in Worker
$receiver->expects($this->once())->method('ack');
$dispatcher = $this->createMock(EventDispatcherInterface::class);
$bus = $this->createMock(MessageBusInterface::class);
// the bus should be called in the worker
$bus->expects($this->at(0))
->method('dispatch')
->with($this->callback(function (Envelope $envelope) {
$lastReceivedStamp = $envelope->last(ReceivedStamp::class);
return $lastReceivedStamp instanceof ReceivedStamp && \is_string($lastReceivedStamp->getTransportName());
}))
->will($this->throwException(new \Exception('Mock test exception')));
$bus->expects($this->at(1))
->method('dispatch')
->with($this->callback(function (Envelope $envelope) {
$lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class);
return $lastRedeliveryStamp instanceof RedeliveryStamp &&
\is_string($lastRedeliveryStamp->getExceptionMessage()) &&
($lastRedeliveryStamp->getFlattenException() instanceof FlattenException || $lastRedeliveryStamp->getFlattenException() instanceof LegacyFlattenException);
}))
->willReturn(new Envelope(new \stdClass()));
$retryStrategy = $this->createMock(RetryStrategyInterface::class);
$retryStrategy->expects($this->once())->method('isRetryable')->with($this->isInstanceOf(Envelope::class))->willReturn(true);
$command = new FailedMessagesRetryCommand('failure_receiver', $receiver, $bus, $dispatcher, $retryStrategy);
$tester = new CommandTester($command);
$tester->execute(['id' => [10]]);
$this->assertStringContainsString('[OK]', $tester->getDisplay());
}
}

View File

@ -58,4 +58,40 @@ EOF
$redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')),
$tester->getDisplay(true));
}
public function testMultipleRedeliveryFails()
{
$sentToFailureStamp = new SentToFailureTransportStamp('async');
$redeliveryStamp1 = new RedeliveryStamp(0, 'failure_receiver', 'Things are bad!');
$redeliveryStamp2 = new RedeliveryStamp(0, 'failure_receiver');
$envelope = new Envelope(new \stdClass(), [
new TransportMessageIdStamp(15),
$sentToFailureStamp,
$redeliveryStamp1,
$redeliveryStamp2,
]);
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope);
$command = new FailedMessagesShowCommand(
'failure_receiver',
$receiver
);
$tester = new CommandTester($command);
$tester->execute(['id' => 15]);
$this->assertStringContainsString(sprintf(<<<EOF
------------- ---------------------
Class stdClass
Message Id 15
Failed at %s
Error Things are bad!
Error Class (unknown)
Transport async
EOF
,
$redeliveryStamp2->getRedeliveredAt()->format('Y-m-d H:i:s')),
$tester->getDisplay(true));
}
}

View File

@ -44,8 +44,11 @@ class ConnectionTest extends TestCase
$queryBuilder
->method('getParameters')
->willReturn([]);
$queryBuilder
->method('getParameterTypes')
->willReturn([]);
$driverConnection
->method('prepare')
->method('executeQuery')
->willReturn($stmt);
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
@ -65,13 +68,17 @@ class ConnectionTest extends TestCase
$queryBuilder
->method('getParameters')
->willReturn([]);
$queryBuilder
->method('getParameterTypes')
->willReturn([]);
$driverConnection->expects($this->once())
->method('createQueryBuilder')
->willReturn($queryBuilder);
$driverConnection->method('prepare')
->willReturn($stmt);
$driverConnection->expects($this->never())
->method('update');
$driverConnection
->method('executeQuery')
->willReturn($stmt);
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
$doctrineEnvelope = $connection->get();
@ -273,7 +280,7 @@ class ConnectionTest extends TestCase
->method('getParameters')
->willReturn([]);
$driverConnection
->method('prepare')
->method('executeQuery')
->willReturn($stmt);
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
@ -316,8 +323,11 @@ class ConnectionTest extends TestCase
$queryBuilder
->method('getParameters')
->willReturn([]);
$queryBuilder
->method('getParameterTypes')
->willReturn([]);
$driverConnection
->method('prepare')
->method('executeQuery')
->willReturn($stmt);
$connection = new Connection([], $driverConnection, $schemaSynchronizer);

View File

@ -80,25 +80,25 @@ class DoctrineIntegrationTest extends TestCase
'body' => '{"message": "Hi handled"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'delivered_at' => Connection::formatDateTime(new \DateTime()),
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'delivered_at' => $this->formatDateTime(new \DateTime()),
]);
// one available later
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi delayed"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 13:00:00')),
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 13:00:00')),
]);
// one available
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi available"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:30:00')),
]);
$encoded = $this->connection->get();
@ -114,33 +114,33 @@ class DoctrineIntegrationTest extends TestCase
'body' => '{"message": "Hi handled"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'delivered_at' => Connection::formatDateTime(new \DateTime()),
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'delivered_at' => $this->formatDateTime(new \DateTime()),
]);
// one available later
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi delayed"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime((new \DateTime())->modify('+1 minute')),
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => $this->formatDateTime((new \DateTime())->modify('+1 minute')),
]);
// one available
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi available"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:30:00')),
]);
// another available
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi available"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:30:00')),
]);
$this->assertSame(2, $this->connection->getMessageCount());
@ -155,16 +155,16 @@ class DoctrineIntegrationTest extends TestCase
'body' => '{"message": "Hi requeued"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'delivered_at' => Connection::formatDateTime($twoHoursAgo),
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'delivered_at' => $this->formatDateTime($twoHoursAgo),
]);
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi available"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:30:00')),
]);
$next = $this->connection->get();
@ -181,4 +181,9 @@ class DoctrineIntegrationTest extends TestCase
$envelope = $this->connection->get();
$this->assertEquals('the body', $envelope['body']);
}
private function formatDateTime(\DateTime $dateTime)
{
return $dateTime->format($this->driverConnection->getDatabasePlatform()->getDateTimeFormatString());
}
}

View File

@ -57,13 +57,8 @@ class ConnectionTest extends TestCase
public function testFromDsnWithOptions()
{
$this->assertEquals(
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false, 'stream_max_entries' => 20000], [
'host' => 'localhost',
'port' => 6379,
], [
'serializer' => 2,
]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2, 'auto_setup' => false, 'stream_max_entries' => 20000])
Connection::fromDsn('redis://localhost', ['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false, 'serializer' => 2]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2&auto_setup=0')
);
}
@ -99,7 +94,21 @@ class ConnectionTest extends TestCase
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->exactly(1))->method('auth')
->with('password');
->with('password')
->willReturn(true);
Connection::fromDsn('redis://password@localhost/queue', [], $redis);
}
public function testFailedAuth()
{
$this->expectException(\InvalidArgumentException::class);
$this->expectExceptionMessage('Redis connection failed');
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
$redis->expects($this->exactly(1))->method('auth')
->with('password')
->willReturn(false);
Connection::fromDsn('redis://password@localhost/queue', [], $redis);
}

View File

@ -118,8 +118,8 @@ class WorkerTest extends TestCase
}
});
// old message acknowledged
$this->assertSame(1, $receiver->getAcknowledgeCount());
// old message rejected
$this->assertSame(1, $receiver->getRejectCount());
}
public function testUnrecoverableMessageHandlingExceptionPreventsRetries()

View File

@ -13,7 +13,7 @@ namespace Symfony\Component\Messenger\Transport\Doctrine;
use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Driver\Statement;
use Doctrine\DBAL\Driver\ResultStatement;
use Doctrine\DBAL\Exception\TableNotFoundException;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Schema;
@ -123,8 +123,14 @@ class Connection
$body,
json_encode($headers),
$this->configuration['queue_name'],
self::formatDateTime($now),
self::formatDateTime($availableAt),
$now,
$availableAt,
], [
null,
null,
null,
Type::DATETIME,
Type::DATETIME,
]);
return $this->driverConnection->lastInsertId();
@ -142,7 +148,8 @@ class Connection
// use SELECT ... FOR UPDATE to lock table
$doctrineEnvelope = $this->executeQuery(
$query->getSQL().' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL(),
$query->getParameters()
$query->getParameters(),
$query->getParameterTypes()
)->fetch();
if (false === $doctrineEnvelope) {
@ -159,8 +166,10 @@ class Connection
->where('id = ?');
$now = new \DateTime();
$this->executeQuery($queryBuilder->getSQL(), [
self::formatDateTime($now),
$now,
$doctrineEnvelope['id'],
], [
Type::DATETIME,
]);
$this->driverConnection->commit();
@ -227,7 +236,7 @@ class Connection
->select('COUNT(m.id) as message_count')
->setMaxResults(1);
return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchColumn();
return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes())->fetchColumn();
}
public function findAll(int $limit = null): array
@ -237,7 +246,7 @@ class Connection
$queryBuilder->setMaxResults($limit);
}
$data = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchAll();
$data = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes())->fetchAll();
return array_map(function ($doctrineEnvelope) {
return $this->decodeEnvelopeHeaders($doctrineEnvelope);
@ -266,9 +275,12 @@ class Connection
->andWhere('m.available_at <= ?')
->andWhere('m.queue_name = ?')
->setParameters([
self::formatDateTime($redeliverLimit),
self::formatDateTime($now),
$redeliverLimit,
$now,
$this->configuration['queue_name'],
], [
Type::DATETIME,
Type::DATETIME,
]);
}
@ -279,12 +291,10 @@ class Connection
->from($this->configuration['table_name'], 'm');
}
private function executeQuery(string $sql, array $parameters = []): Statement
private function executeQuery(string $sql, array $parameters = [], array $types = []): ResultStatement
{
$stmt = null;
try {
$stmt = $this->driverConnection->prepare($sql);
$stmt->execute($parameters);
$stmt = $this->driverConnection->executeQuery($sql, $parameters, $types);
} catch (TableNotFoundException $e) {
if ($this->driverConnection->isTransactionActive()) {
throw $e;
@ -294,11 +304,7 @@ class Connection
if ($this->autoSetup) {
$this->setup();
}
// statement not prepared ? SQLite throw on exception on prepare if the table does not exist
if (null === $stmt) {
$stmt = $this->driverConnection->prepare($sql);
}
$stmt->execute($parameters);
$stmt = $this->driverConnection->executeQuery($sql, $parameters, $types);
}
return $stmt;
@ -331,11 +337,6 @@ class Connection
return $schema;
}
public static function formatDateTime(\DateTimeInterface $dateTime): string
{
return $dateTime->format('Y-m-d\TH:i:s');
}
private function decodeEnvelopeHeaders(array $doctrineEnvelope): array
{
$doctrineEnvelope['headers'] = json_decode($doctrineEnvelope['headers'], true);

View File

@ -20,6 +20,7 @@ use Symfony\Component\Messenger\Exception\TransportException;
*
* @author Alexander Schranz <alexander@sulu.io>
* @author Antoine Bluchet <soyuka@gmail.com>
* @author Robin Chalas <robin.chalas@gmail.com>
*
* @internal
* @final
@ -53,8 +54,8 @@ class Connection
$this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379);
$this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP);
if (isset($connectionCredentials['auth'])) {
$this->connection->auth($connectionCredentials['auth']);
if (isset($connectionCredentials['auth']) && !$this->connection->auth($connectionCredentials['auth'])) {
throw new InvalidArgumentException(sprintf('Redis connection failed: %s', $redis->getLastError()));
}
if (($dbIndex = $configuration['dbindex'] ?? self::DEFAULT_OPTIONS['dbindex']) && !$this->connection->select($dbIndex)) {
@ -76,9 +77,9 @@ class Connection
$pathParts = explode('/', $parsedUrl['path'] ?? '');
$stream = $pathParts[1] ?? null;
$group = $pathParts[2] ?? null;
$consumer = $pathParts[3] ?? null;
$stream = $pathParts[1] ?? $redisOptions['stream'] ?? null;
$group = $pathParts[2] ?? $redisOptions['group'] ?? null;
$consumer = $pathParts[3] ?? $redisOptions['consumer'] ?? null;
$connectionCredentials = [
'host' => $parsedUrl['host'] ?? '127.0.0.1',

View File

@ -12,13 +12,13 @@
namespace Symfony\Component\Messenger;
use Psr\Log\LoggerInterface;
use Symfony\Component\ErrorRenderer\Exception\FlattenException;
use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
@ -134,6 +134,13 @@ class Worker implements WorkerInterface
try {
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName)));
} catch (\Throwable $throwable) {
$rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
if ($rejectFirst) {
// redelivered messages are rejected first so that continuous failures in an event listener or while
// publishing for retry does not cause infinite redelivery loops
$receiver->reject($envelope);
}
if ($throwable instanceof HandlerFailedException) {
$envelope = $throwable->getEnvelope();
}
@ -152,18 +159,18 @@ class Worker implements WorkerInterface
// add the delay and retry stamp info + remove ReceivedStamp
$retryEnvelope = $envelope->with(new DelayStamp($delay))
->with(new RedeliveryStamp($retryCount, $transportName, $throwable->getMessage(), $this->flattenedException($throwable)))
->with(new RedeliveryStamp($retryCount, $transportName))
->withoutAll(ReceivedStamp::class);
// re-send the message
// re-send the message for retry
$this->bus->dispatch($retryEnvelope);
// acknowledge the previous message has received
$receiver->ack($envelope);
} else {
if (null !== $this->logger) {
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
}
}
if (!$rejectFirst) {
$receiver->reject($envelope);
}
@ -215,17 +222,4 @@ class Worker implements WorkerInterface
return $retryStrategy->isRetryable($envelope);
}
private function flattenedException(\Throwable $throwable): ?FlattenException
{
if (!class_exists(FlattenException::class)) {
return null;
}
if ($throwable instanceof HandlerFailedException) {
$throwable = $throwable->getNestedExceptions()[0];
}
return FlattenException::createFromThrowable($throwable);
}
}