Adding global retry support, events & more to messenger transport

Co-authored-by: Samuel ROZE <samuel.roze@gmail.com>
This commit is contained in:
Ryan Weaver 2019-03-13 15:48:28 -04:00
parent 59f20ad8fd
commit a989384999
47 changed files with 1601 additions and 227 deletions

View File

@ -1111,6 +1111,20 @@ class Configuration implements ConfigurationInterface
->prototype('variable')
->end()
->end()
->arrayNode('retry_strategy')
->addDefaultsIfNotSet()
->validate()
->ifTrue(function ($v) { return null !== $v['service'] && (isset($v['max_retries']) || isset($v['delay']) || isset($v['multiplier']) || isset($v['max_delay'])); })
->thenInvalid('"service" cannot be used along with the other retry_strategy options.')
->end()
->children()
->scalarNode('service')->defaultNull()->info('Service id to override the retry strategy entirely')->end()
->integerNode('max_retries')->defaultValue(3)->min(0)->end()
->integerNode('delay')->defaultValue(1000)->min(0)->info('Time in ms to delay (or the initial value when multiplier is used)')->end()
->floatNode('multiplier')->defaultValue(2)->min(1)->info('If greater than 1, delay will grow exponentially for each retry: this delay = (delay * (multiple ^ retries))')->end()
->integerNode('max_delay')->defaultValue(0)->min(0)->info('Max time in ms that a retry should ever be delayed (0 = infinite)')->end()
->end()
->end()
->end()
->end()
->end()

View File

@ -1653,6 +1653,7 @@ class FrameworkExtension extends Extension
}
$senderAliases = [];
$transportRetryReferences = [];
foreach ($config['transports'] as $name => $transport) {
if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) {
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enabling it or running "composer require symfony/serializer-pack".');
@ -1665,6 +1666,21 @@ class FrameworkExtension extends Extension
;
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
$senderAliases[$name] = $transportId;
if (null !== $transport['retry_strategy']['service']) {
$transportRetryReferences[$name] = new Reference($transport['retry_strategy']['service']);
} else {
$retryServiceId = sprintf('messenger.retry.multiplier_retry_strategy.%s', $name);
$retryDefinition = new ChildDefinition('messenger.retry.abstract_multiplier_retry_strategy');
$retryDefinition
->replaceArgument(0, $transport['retry_strategy']['max_retries'])
->replaceArgument(1, $transport['retry_strategy']['delay'])
->replaceArgument(2, $transport['retry_strategy']['multiplier'])
->replaceArgument(3, $transport['retry_strategy']['max_delay']);
$container->setDefinition($retryServiceId, $retryDefinition);
$transportRetryReferences[$name] = new Reference($retryServiceId);
}
}
$messageToSendersMapping = [];
@ -1686,6 +1702,9 @@ class FrameworkExtension extends Extension
->replaceArgument(0, $messageToSendersMapping)
->replaceArgument(1, $messagesToSendAndHandle)
;
$container->getDefinition('messenger.retry_strategy_locator')
->replaceArgument(0, $transportRetryReferences);
}
private function registerCacheConfiguration(array $config, ContainerBuilder $container)

View File

@ -82,8 +82,12 @@
<argument type="service" id="logger" on-invalid="null" />
<argument type="collection" /> <!-- Receiver names -->
<argument type="collection" /> <!-- Message bus names -->
<argument type="service" id="messenger.retry_strategy_locator" />
<argument type="service" id="event_dispatcher" />
<tag name="console.command" command="messenger:consume" />
<tag name="console.command" command="messenger:consume-messages" />
<tag name="monolog.logger" channel="messenger" />
</service>
<service id="console.command.messenger_debug" class="Symfony\Component\Messenger\Command\DebugCommand">

View File

@ -64,5 +64,18 @@
<tag name="messenger.transport_factory" />
<argument type="service" id="messenger.transport.serializer" />
</service>
<!-- retry -->
<service id="messenger.retry_strategy_locator">
<tag name="container.service_locator" />
<argument type="collection" />
</service>
<service id="messenger.retry.abstract_multiplier_retry_strategy" class="Symfony\Component\Messenger\Retry\MultiplierRetryStrategy" abstract="true">
<argument /> <!-- max retries -->
<argument /> <!-- delay ms -->
<argument /> <!-- multiplier -->
<argument /> <!-- max delay ms -->
</service>
</services>
</container>

View File

@ -3,7 +3,34 @@ CHANGELOG
4.3.0
-----
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
`ack()` and `reject()`.
* [BC BREAK] Error handling was moved from the receivers into
`Worker`. Implementations of `ReceiverInterface::handle()`
should now allow all exceptions to be thrown, except for transport
exceptions. They should also not retry (e.g. if there's a queue,
remove from the queue) if there is a problem decoding the message.
* [BC BREAK] `RejectMessageExceptionInterface` was removed and replaced
by `Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException`,
which has the same behavior: a message will not be retried
* The default command name for `ConsumeMessagesCommand` was
changed from `messenger:consume-messages` to `messenger:consume`
* `ConsumeMessagesCommand` has two new optional constructor arguments
* `Worker` has 4 new option constructor arguments.
* The `Worker` class now handles calling `pcntl_signal_dispatch()` the
receiver no longer needs to call this.
* The `AmqpSender` will now retry messages using a dead-letter exchange
and delayed queues, instead of retrying via `nack()`
* Senders now receive the `Envelope` with the `SentStamp` on it. Previously,
the `Envelope` was passed to the sender and *then* the `SentStamp`
was added.
* `SerializerInterface` implementations should now throw a
`Symfony\Component\Messenger\Exception\MessageDecodingFailedException`
if `decode()` fails for any reason.
* [BC BREAK] The default `Serializer` will now throw a
`MessageDecodingFailedException` if `decode()` fails, instead
of the underlying exceptions from the Serializer component.
* Added `PhpSerializer` which uses PHP's native `serialize()` and
`unserialize()` to serialize messages to a transport
* [BC BREAK] If no serializer were passed, the default serializer

View File

@ -20,6 +20,7 @@ use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
@ -32,21 +33,25 @@ use Symfony\Component\Messenger\Worker;
*/
class ConsumeMessagesCommand extends Command
{
protected static $defaultName = 'messenger:consume-messages';
protected static $defaultName = 'messenger:consume';
private $busLocator;
private $receiverLocator;
private $logger;
private $receiverNames;
private $busNames;
private $retryStrategyLocator;
private $eventDispatcher;
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [])
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
{
$this->busLocator = $busLocator;
$this->receiverLocator = $receiverLocator;
$this->logger = $logger;
$this->receiverNames = $receiverNames;
$this->busNames = $busNames;
$this->retryStrategyLocator = $retryStrategyLocator;
$this->eventDispatcher = $eventDispatcher;
parent::__construct();
}
@ -132,6 +137,12 @@ EOF
*/
protected function execute(InputInterface $input, OutputInterface $output): void
{
if (false !== strpos($input->getFirstArgument(), ':consume-')) {
$message = 'The use of the "messenger:consume-messages" command is deprecated since version 4.3 and will be removed in 5.0. Use "messenger:consume" instead.';
@trigger_error($message, E_USER_DEPRECATED);
$output->writeln(sprintf('<comment>%s</comment>', $message));
}
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
}
@ -140,8 +151,13 @@ EOF
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
}
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
}
$receiver = $this->receiverLocator->get($receiverName);
$bus = $this->busLocator->get($busName);
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
$stopsWhen = [];
if ($limit = $input->getOption('limit')) {
@ -174,7 +190,7 @@ EOF
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
}
$worker = new Worker($receiver, $bus);
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
$worker->run();
}

View File

@ -54,6 +54,18 @@ final class Envelope
return $cloned;
}
/**
* @return Envelope a new Envelope instance without any stamps of the given class
*/
public function withoutAll(string $stampFqcn): self
{
$cloned = clone $this;
unset($cloned->stamps[$stampFqcn]);
return $cloned;
}
public function last(string $stampFqcn): ?StampInterface
{
return isset($this->stamps[$stampFqcn]) ? end($this->stamps[$stampFqcn]) : null;

View File

@ -0,0 +1,43 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Event;
use Symfony\Component\EventDispatcher\Event;
use Symfony\Component\Messenger\Envelope;
/**
* @experimental in 4.3
*/
abstract class AbstractWorkerMessageEvent extends Event
{
private $envelope;
private $receiverName;
public function __construct(Envelope $envelope, string $receiverName)
{
$this->envelope = $envelope;
$this->receiverName = $receiverName;
}
public function getEnvelope(): Envelope
{
return $this->envelope;
}
/**
* Returns a unique identifier for transport receiver this message was received from.
*/
public function getReceiverName(): string
{
return $this->receiverName;
}
}

View File

@ -0,0 +1,45 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Event;
use Symfony\Component\Messenger\Envelope;
/**
* Dispatched when a message was received from a transport and handling failed.
*
* The event name is the class name.
*
* @experimental in 4.3
*/
class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent
{
private $throwable;
private $willRetry;
public function __construct(Envelope $envelope, string $receiverName, \Throwable $error, bool $willRetry)
{
$this->throwable = $error;
$this->willRetry = $willRetry;
parent::__construct($envelope, $receiverName);
}
public function getThrowable(): \Throwable
{
return $this->throwable;
}
public function willRetry(): bool
{
return $this->willRetry;
}
}

View File

@ -0,0 +1,23 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Event;
/**
* Dispatched after a message was received from a transport and successfully handled.
*
* The event name is the class name.
*
* @experimental in 4.3
*/
class WorkerMessageHandledEvent extends AbstractWorkerMessageEvent
{
}

View File

@ -0,0 +1,23 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Event;
/**
* Dispatched when a message was received from a transport but before sent to the bus.
*
* The event name is the class name.
*
* @experimental in 4.3
*/
class WorkerMessageReceivedEvent extends AbstractWorkerMessageEvent
{
}

View File

@ -0,0 +1,21 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Exception;
/**
* Thrown when a message cannot be decoded in a serializer.
*
* @experimental in 4.3
*/
class MessageDecodingFailedException extends \InvalidArgumentException implements ExceptionInterface
{
}

View File

@ -0,0 +1,26 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Exception;
/**
* Thrown while handling a message to indicate that handling will continue to fail.
*
* If something goes wrong while handling a message that's received from a transport
* and the message should not be retried, a handler can throw this exception.
*
* @author Frederic Bouchery <frederic@bouchery.fr>
*
* @experimental in 4.3
*/
class UnrecoverableMessageHandlingException extends RuntimeException
{
}

View File

@ -15,6 +15,7 @@ use Psr\Log\LoggerAwareTrait;
use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
@ -54,9 +55,22 @@ class SendMessageMiddleware implements MiddlewareInterface
// it's a received message, do not send it back
$this->logger->info('Received message "{class}"', $context);
} else {
/** @var RedeliveryStamp|null $redeliveryStamp */
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
foreach ($this->sendersLocator->getSenders($envelope, $handle) as $alias => $sender) {
// on redelivery, only deliver to the given sender
if (null !== $redeliveryStamp && !$redeliveryStamp->shouldRedeliverToSender(\get_class($sender), $alias)) {
continue;
}
$this->logger->info('Sending message "{class}" with "{sender}"', $context + ['sender' => \get_class($sender)]);
$envelope = $sender->send($envelope)->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null));
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
}
// on a redelivery, never call local handlers
if (null !== $redeliveryStamp) {
$handle = false;
}
}

View File

@ -0,0 +1,98 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Retry;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
/**
* A retry strategy with a constant or exponential retry delay.
*
* For example, if $delayMilliseconds=10000 & $multiplier=1 (default),
* each retry will wait exactly 10 seconds.
*
* But if $delayMilliseconds=10000 & $multiplier=2:
* * Retry 1: 10 second delay
* * Retry 2: 20 second delay (10000 * 2 = 20000)
* * Retry 3: 40 second delay (20000 * 2 = 40000)
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
* @final
*/
class MultiplierRetryStrategy implements RetryStrategyInterface
{
private $maxRetries;
private $delayMilliseconds;
private $multiplier;
private $maxDelayMilliseconds;
/**
* @param int $maxRetries The maximum number of time to retry (0 means indefinitely)
* @param int $delayMilliseconds Amount of time to delay (or the initial value when multiplier is used)
* @param float $multiplier Multiplier to apply to the delay each time a retry occurs
* @param int $maxDelayMilliseconds Maximum delay to allow (0 means no maximum)
*/
public function __construct(int $maxRetries = 3, int $delayMilliseconds = 1000, float $multiplier = 1, int $maxDelayMilliseconds = 0)
{
$this->maxRetries = $maxRetries;
if ($delayMilliseconds < 0) {
throw new InvalidArgumentException(sprintf('Delay must be greater than or equal to zero: "%s" passed.', $delayMilliseconds));
}
$this->delayMilliseconds = $delayMilliseconds;
if ($multiplier < 1) {
throw new InvalidArgumentException(sprintf('Multiplier must be greater than zero: "%s" passed.', $multiplier));
}
$this->multiplier = $multiplier;
if ($maxDelayMilliseconds < 0) {
throw new InvalidArgumentException(sprintf('Max delay must be greater than or equal to zero: "%s" passed.', $maxDelayMilliseconds));
}
$this->maxDelayMilliseconds = $maxDelayMilliseconds;
}
public function isRetryable(Envelope $message): bool
{
if (0 === $this->maxRetries) {
return true;
}
$retries = $this->getCurrentRetryCount($message);
return $retries < $this->maxRetries;
}
public function getWaitingTime(Envelope $message): int
{
$retries = $this->getCurrentRetryCount($message);
$delay = $this->delayMilliseconds * pow($this->multiplier, $retries);
if ($delay > $this->maxDelayMilliseconds && 0 !== $this->maxDelayMilliseconds) {
return $this->maxDelayMilliseconds;
}
return $delay;
}
private function getCurrentRetryCount(Envelope $message): int
{
/** @var RedeliveryStamp|null $retryMessageStamp */
$retryMessageStamp = $message->last(RedeliveryStamp::class);
return $retryMessageStamp ? $retryMessageStamp->getRetryCount() : 0;
}
}

View File

@ -0,0 +1,31 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Retry;
use Symfony\Component\Messenger\Envelope;
/**
* @author Fabien Potencier <fabien@symfony.com>
* @author Grégoire Pineau <lyrixx@lyrixx.info>
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
interface RetryStrategyInterface
{
public function isRetryable(Envelope $message): bool;
/**
* @return int The time to delay/wait in milliseconds
*/
public function getWaitingTime(Envelope $message): int;
}

View File

@ -0,0 +1,35 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Stamp;
/**
* Apply this stamp to delay delivery of your message on a transport.
*
* @experimental in 4.3
*/
class DelayStamp implements StampInterface
{
private $delay;
/**
* @param int $delay The delay in milliseconds
*/
public function __construct(int $delay)
{
$this->delay = $delay;
}
public function getDelay(): int
{
return $this->delay;
}
}

View File

@ -0,0 +1,60 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Stamp;
/**
* Stamp applied when a messages needs to be redelivered.
*
* @experimental in 4.3
*/
class RedeliveryStamp implements StampInterface
{
private $retryCount;
private $senderClassOrAlias;
/**
* @param string $senderClassOrAlias Alias from SendersLocator or just the class name
*/
public function __construct(int $retryCount, string $senderClassOrAlias)
{
$this->retryCount = $retryCount;
$this->senderClassOrAlias = $senderClassOrAlias;
}
public function getRetryCount(): int
{
return $this->retryCount;
}
/**
* Needed for this class to serialize through Symfony's serializer.
*
* @internal
*/
public function getSenderClassOrAlias(): int
{
return $this->retryCount;
}
public function shouldRedeliverToSender(string $senderClass, ?string $senderAlias): bool
{
if (null !== $senderAlias && $senderAlias === $this->senderClassOrAlias) {
return true;
}
if ($senderClass === $this->senderClassOrAlias) {
return true;
}
return false;
}
}

View File

@ -251,6 +251,7 @@ class MessengerPassTest extends TestCase
$container->register('console.command.messenger_consume_messages', ConsumeMessagesCommand::class)->setArguments([
null,
new Reference('messenger.receiver_locator'),
new Reference('messenger.retry_strategy_locator'),
null,
null,
null,
@ -605,6 +606,14 @@ class DummyReceiver implements ReceiverInterface
public function stop(): void
{
}
public function ack(Envelope $envelope): void
{
}
public function reject(Envelope $envelope): void
{
}
}
class InvalidReceiver

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\ValidationStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
@ -34,11 +35,21 @@ class EnvelopeTest extends TestCase
public function testWithReturnsNewInstance()
{
$envelope = new Envelope($dummy = new DummyMessage('dummy'));
$envelope = new Envelope(new DummyMessage('dummy'));
$this->assertNotSame($envelope, $envelope->with(new ReceivedStamp()));
}
public function testWithoutAll()
{
$envelope = new Envelope(new DummyMessage('dummy'), new ReceivedStamp(), new ReceivedStamp(), new DelayStamp(5000));
$envelope = $envelope->withoutAll(ReceivedStamp::class);
$this->assertEmpty($envelope->all(ReceivedStamp::class));
$this->assertCount(1, $envelope->all(DelayStamp::class));
}
public function testLast()
{
$receivedStamp = new ReceivedStamp();

View File

@ -2,11 +2,14 @@
namespace Symfony\Component\Messenger\Tests\Fixtures;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
class CallbackReceiver implements ReceiverInterface
{
private $callable;
private $acknowledgeCount = 0;
private $rejectCount = 0;
public function __construct(callable $callable)
{
@ -22,4 +25,24 @@ class CallbackReceiver implements ReceiverInterface
public function stop(): void
{
}
public function ack(Envelope $envelope): void
{
++$this->acknowledgeCount;
}
public function reject(Envelope $envelope): void
{
++$this->rejectCount;
}
public function getAcknowledgeCount(): int
{
return $this->acknowledgeCount;
}
public function getRejectCount(): int
{
return $this->rejectCount;
}
}

View File

@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Tests\Middleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
use Symfony\Component\Messenger\Tests\Fixtures\ChildDummyMessage;
@ -32,7 +33,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->will($this->returnArgument(0));
$envelope = $middleware->handle($envelope, $this->getStackMock(false));
@ -42,6 +43,69 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$this->assertStringMatchesFormat('Mock_SenderInterface_%s', $stamp->getSenderClass());
}
public function testItSendsTheMessageToMultipleSenders()
{
$envelope = new Envelope(new DummyMessage('Hey'));
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
$middleware = new SendMessageMiddleware(new SendersLocator([
DummyMessage::class => ['foo' => $sender, 'bar' => $sender2],
]));
$sender->expects($this->once())
->method('send')
->with($this->callback(function (Envelope $envelope) {
/** @var SentStamp|null $lastSentStamp */
$lastSentStamp = $envelope->last(SentStamp::class);
// last SentStamp should be the "foo" alias
return null !== $lastSentStamp && 'foo' === $lastSentStamp->getSenderAlias();
}))
->will($this->returnArgument(0));
$sender2->expects($this->once())
->method('send')
->with($this->callback(function (Envelope $envelope) {
/** @var SentStamp|null $lastSentStamp */
$lastSentStamp = $envelope->last(SentStamp::class);
// last SentStamp should be the "bar" alias
return null !== $lastSentStamp && 'bar' === $lastSentStamp->getSenderAlias();
}))
->will($this->returnArgument(0));
$envelope = $middleware->handle($envelope, $this->getStackMock(false));
/** @var SentStamp[] $sentStamps */
$sentStamps = $envelope->all(SentStamp::class);
$this->assertCount(2, $sentStamps);
}
public function testItSendsToOnlyOneSenderOnRedelivery()
{
$envelope = new Envelope(new DummyMessage('Hey'), new RedeliveryStamp(5, 'bar'));
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
$middleware = new SendMessageMiddleware(new SendersLocator([
DummyMessage::class => ['foo' => $sender, 'bar' => $sender2],
], [
// normally, this class sends and handles (but not on retry)
DummyMessage::class => true,
]));
$sender->expects($this->never())
->method('send')
;
$sender2->expects($this->once())
->method('send')
->will($this->returnArgument(0));
$mockStack = $this->getStackMock(false); // false because next should not be called
$envelope = $middleware->handle($envelope, $mockStack);
$this->assertCount(1, $envelope->all(SentStamp::class));
}
public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage()
{
$envelope = new Envelope(new ChildDummyMessage('Hey'));
@ -49,7 +113,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
$middleware->handle($envelope, $this->getStackMock(false));
}
@ -64,7 +128,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
DummyMessage::class => true,
]));
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
$middleware->handle($envelope, $this->getStackMock());
}
@ -79,7 +143,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
DummyMessage::class => true,
]));
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
$middleware->handle($envelope, $this->getStackMock());
}
@ -94,7 +158,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
DummyMessageInterface::class => true,
]));
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
$middleware->handle($envelope, $this->getStackMock());
}
@ -109,7 +173,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
'*' => true,
]));
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
$middleware->handle($envelope, $this->getStackMock());
}

View File

@ -0,0 +1,80 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Retry;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
class MultiplierRetryStrategyTest extends TestCase
{
public function testIsRetryable()
{
$strategy = new MultiplierRetryStrategy(3);
$envelope = new Envelope(new \stdClass(), new RedeliveryStamp(0, 'sender_alias'));
$this->assertTrue($strategy->isRetryable($envelope));
}
public function testIsNotRetryable()
{
$strategy = new MultiplierRetryStrategy(3);
$envelope = new Envelope(new \stdClass(), new RedeliveryStamp(3, 'sender_alias'));
$this->assertFalse($strategy->isRetryable($envelope));
}
public function testIsRetryableWithNoStamp()
{
$strategy = new MultiplierRetryStrategy(3);
$envelope = new Envelope(new \stdClass());
$this->assertTrue($strategy->isRetryable($envelope));
}
/**
* @dataProvider getWaitTimeTests
*/
public function testGetWaitTime(int $delay, int $multiplier, int $maxDelay, int $previousRetries, int $expectedDelay)
{
$strategy = new MultiplierRetryStrategy(10, $delay, $multiplier, $maxDelay);
$envelope = new Envelope(new \stdClass(), new RedeliveryStamp($previousRetries, 'sender_alias'));
$this->assertSame($expectedDelay, $strategy->getWaitingTime($envelope));
}
public function getWaitTimeTests()
{
// delay, multiplier, maxDelay, retries, expectedDelay
yield [1000, 1, 5000, 0, 1000];
yield [1000, 1, 5000, 1, 1000];
yield [1000, 1, 5000, 2, 1000];
yield [1000, 2, 10000, 0, 1000];
yield [1000, 2, 10000, 1, 2000];
yield [1000, 2, 10000, 2, 4000];
yield [1000, 2, 10000, 3, 8000];
yield [1000, 2, 10000, 4, 10000]; // max hit
yield [1000, 2, 0, 4, 16000]; // no max
yield [1000, 3, 10000, 0, 1000];
yield [1000, 3, 10000, 1, 3000];
yield [1000, 3, 10000, 2, 9000];
yield [1000, 1, 500, 0, 500]; // max hit immediately
// never a delay
yield [0, 2, 10000, 0, 0];
yield [0, 2, 10000, 1, 0];
}
}

View File

@ -0,0 +1,34 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Stamp;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
class RedeliveryStampTest extends TestCase
{
public function testShouldRedeliverToSenderWithAlias()
{
$stamp = new RedeliveryStamp(5, 'foo_alias');
$this->assertFalse($stamp->shouldRedeliverToSender('Foo\Bar\Sender', 'bar_alias'));
$this->assertTrue($stamp->shouldRedeliverToSender('Foo\Bar\Sender', 'foo_alias'));
}
public function testShouldRedeliverToSenderWithoutAlias()
{
$stampToRedeliverToSender1 = new RedeliveryStamp(5, 'App\Sender1');
$this->assertTrue($stampToRedeliverToSender1->shouldRedeliverToSender('App\Sender1', null));
$this->assertFalse($stampToRedeliverToSender1->shouldRedeliverToSender('App\Sender2', null));
}
}

View File

@ -13,15 +13,20 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Process\PhpProcess;
use Symfony\Component\Process\Process;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
/**
@ -40,9 +45,7 @@ class AmqpExtIntegrationTest extends TestCase
public function testItSendsAndReceivesMessages()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
$serializer = $this->createSerializer();
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
@ -56,7 +59,9 @@ class AmqpExtIntegrationTest extends TestCase
$receivedMessages = 0;
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
$this->assertEquals(0 === $receivedMessages ? $first : $second, $envelope);
$expectedEnvelope = 0 === $receivedMessages ? $first : $second;
$this->assertEquals($expectedEnvelope->getMessage(), $envelope->getMessage());
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
if (2 === ++$receivedMessages) {
$receiver->stop();
@ -64,11 +69,68 @@ class AmqpExtIntegrationTest extends TestCase
});
}
public function testRetryAndDelay()
{
$serializer = $this->createSerializer();
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();
$sender = new AmqpSender($connection, $serializer);
$receiver = new AmqpReceiver($connection, $serializer);
$sender->send($first = new Envelope(new DummyMessage('First')));
$receivedMessages = 0;
$startTime = time();
$receiver->receive(function (?Envelope $envelope) use ($receiver, $sender, &$receivedMessages, $startTime) {
if (null === $envelope) {
// if we have been processing for 4 seconds + have received 2 messages
// then it's safe to say no other messages will be received
if (time() > $startTime + 4 && 2 === $receivedMessages) {
$receiver->stop();
}
return;
}
++$receivedMessages;
// retry the first time
if (1 === $receivedMessages) {
// imitate what Worker does
$envelope = $envelope
->with(new DelayStamp(2000))
->with(new RedeliveryStamp(1, 'not_important'));
$sender->send($envelope);
$receiver->ack($envelope);
return;
}
if (2 === $receivedMessages) {
// should have a 2 second delay
$this->assertGreaterThanOrEqual($startTime + 2, time());
// but only a 2 second delay
$this->assertLessThan($startTime + 4, time());
/** @var RedeliveryStamp|null $retryStamp */
// verify the stamp still exists from the last send
$retryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertNotNull($retryStamp);
$this->assertSame(1, $retryStamp->getRetryCount());
$receiver->ack($envelope);
return;
}
});
}
public function testItReceivesSignals()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
$serializer = $this->createSerializer();
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
@ -91,17 +153,20 @@ class AmqpExtIntegrationTest extends TestCase
$signalTime = microtime(true);
$timedOutTime = time() + 10;
// immediately after the process has started "booted", kill it
$process->signal(15);
while ($process->isRunning() && time() < $timedOutTime) {
usleep(100 * 1000); // 100ms
}
// make sure the process exited, after consuming only the 1 message
$this->assertFalse($process->isRunning());
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
$this->assertSame($expectedOutput.<<<'TXT'
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
with stamps: [
"Symfony\\Component\\Messenger\\Transport\\AmqpExt\\AmqpReceivedStamp",
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp"
]
Done.
@ -115,9 +180,7 @@ TXT
*/
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
$serializer = $this->createSerializer();
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), ['read_timeout' => '1']);
$connection->setup();
@ -149,4 +212,11 @@ TXT
throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
}
private function createSerializer(): SerializerInterface
{
return new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer(), new ArrayDenormalizer()], ['json' => new JsonEncoder()])
);
}
}

View File

@ -16,7 +16,6 @@ use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
@ -27,22 +26,15 @@ use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
*/
class AmqpReceiverTest extends TestCase
{
public function testItSendTheDecodedMessageToTheHandlerAndAcknowledgeIt()
public function testItSendTheDecodedMessageToTheHandler()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
$envelope->method('getHeaders')->willReturn([
'type' => DummyMessage::class,
]);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);
$connection->expects($this->once())->method('ack')->with($envelope);
$connection->method('get')->willReturn($amqpEnvelope);
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
@ -51,57 +43,6 @@ class AmqpReceiverTest extends TestCase
});
}
/**
* @expectedException \Symfony\Component\Messenger\Tests\Transport\AmqpExt\InterruptException
*/
public function testItNonAcknowledgeTheMessageIfAnExceptionHappened()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
$envelope->method('getHeaders')->willReturn([
'type' => DummyMessage::class,
]);
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);
$connection->expects($this->once())->method('nack')->with($envelope);
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function () {
throw new InterruptException('Well...');
});
}
/**
* @expectedException \Symfony\Component\Messenger\Tests\Transport\AmqpExt\WillNeverWorkException
*/
public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionInterface()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
$envelope->method('getHeaders')->willReturn([
'type' => DummyMessage::class,
]);
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);
$connection->expects($this->once())->method('reject')->with($envelope);
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function () {
throw new WillNeverWorkException('Well...');
});
}
/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
@ -111,19 +52,14 @@ class AmqpReceiverTest extends TestCase
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
$envelope->method('getHeaders')->willReturn([
'type' => DummyMessage::class,
]);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);
$connection->method('ack')->with($envelope)->willThrowException(new \AMQPException());
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$receiver->ack($envelope);
$receiver->stop();
});
}
@ -137,53 +73,26 @@ class AmqpReceiverTest extends TestCase
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
$envelope->method('getHeaders')->willReturn([
'type' => DummyMessage::class,
]);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);
$connection->method('reject')->with($envelope)->willThrowException(new \AMQPException());
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function () {
throw new WillNeverWorkException('Well...');
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$receiver->reject($envelope);
$receiver->stop();
});
}
/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsATransportExceptionIfItCannotNonAcknowledgeMessage()
private function createAMQPEnvelope()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
$envelope->method('getHeaders')->willReturn([
'type' => DummyMessage::class,
]);
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);
$connection->method('nack')->with($envelope)->willThrowException(new \AMQPException());
$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function () {
throw new InterruptException('Well...');
});
return $envelope;
}
}
class InterruptException extends \Exception
{
}
class WillNeverWorkException extends \Exception implements RejectMessageExceptionInterface
{
}

View File

@ -255,6 +255,86 @@ class ConnectionTest extends TestCase
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], $factory);
$connection->publish('body', $headers);
}
public function testItDelaysTheMessage()
{
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock();
$delayQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock();
$factory = $this->getMockBuilder(AmqpFactory::class)->getMock();
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
));
$amqpExchange->expects($this->once())->method('setName')->with('messages');
$amqpExchange->method('getName')->willReturn('messages');
$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$delayExchange->method('getName')->willReturn('delay');
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_5000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 5000,
'x-dead-letter-exchange' => 'messages',
]);
$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_5000');
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
}
public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
{
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock();
$delayQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock();
$factory = $this->getMockBuilder(AmqpFactory::class)->getMock();
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
));
$amqpExchange->expects($this->once())->method('setName')->with('messages');
$amqpExchange->method('getName')->willReturn('messages');
$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$delayExchange->method('getName')->willReturn('delay');
$connectionOptions = [
'retry' => [
'dead_routing_key' => 'my_dead_routing_key',
],
];
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_120000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-dead-letter-exchange' => 'messages',
]);
$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_120000');
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_120000', AMQP_NOPARAM, ['headers' => []]);
$connection->publish('{}', [], 120000);
}
}
class TestAmqpFactory extends AmqpFactory

View File

@ -14,20 +14,23 @@ require_once $autoload;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
new SerializerComponent\Serializer([new ObjectNormalizer(), new ArrayDenormalizer()], ['json' => new JsonEncoder()])
);
$connection = Connection::fromDsn(getenv('DSN'));
$receiver = new AmqpReceiver($connection, $serializer);
$retryStrategy = new MultiplierRetryStrategy(3, 0);
$worker = new Worker($receiver, new class() implements MessageBusInterface {
public function dispatch($envelope): Envelope
@ -40,7 +43,7 @@ $worker = new Worker($receiver, new class() implements MessageBusInterface {
return $envelope;
}
});
}, 'the_receiver', $retryStrategy);
echo "Receiving messages...\n";
$worker->run();

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Serialization;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@ -26,4 +27,28 @@ class PhpSerializerTest extends TestCase
$this->assertEquals($envelope, $serializer->decode($serializer->encode($envelope)));
}
public function testDecodingFailsWithBadFormat()
{
$this->expectException(MessageDecodingFailedException::class);
$this->expectExceptionMessageRegExp('/Could not decode/');
$serializer = new PhpSerializer();
$serializer->decode([
'body' => '{"message": "bar"}',
]);
}
public function testDecodingFailsWithBadClass()
{
$this->expectException(MessageDecodingFailedException::class);
$this->expectExceptionMessageRegExp('/class "ReceivedSt0mp" not found/');
$serializer = new PhpSerializer();
$serializer->decode([
'body' => 'O:13:"ReceivedSt0mp":0:{}',
]);
}
}

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Serialization;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Stamp\SerializerStamp;
use Symfony\Component\Messenger\Stamp\ValidationStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
@ -94,4 +95,28 @@ class SerializerTest extends TestCase
$this->assertEquals($serializerStamp, $decoded->last(SerializerStamp::class));
$this->assertEquals($validationStamp, $decoded->last(ValidationStamp::class));
}
public function testDecodingFailsWithBadFormat()
{
$this->expectException(MessageDecodingFailedException::class);
$serializer = new Serializer();
$serializer->decode([
'body' => '{foo',
'headers' => ['type' => 'stdClass'],
]);
}
public function testDecodingFailsWithBadClass()
{
$this->expectException(MessageDecodingFailedException::class);
$serializer = new Serializer();
$serializer->decode([
'body' => '{}',
'headers' => ['type' => 'NonExistentClass'],
]);
}
}

View File

@ -12,9 +12,17 @@
namespace Symfony\Component\Messenger\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Worker;
@ -33,11 +41,13 @@ class WorkerTest extends TestCase
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with(($envelope = new Envelope($apiMessage))->with(new ReceivedStamp()))->willReturn($envelope);
$bus->expects($this->at(1))->method('dispatch')->with(($envelope = new Envelope($ipaMessage))->with(new ReceivedStamp()))->willReturn($envelope);
$bus->expects($this->at(0))->method('dispatch')->with($envelope = new Envelope($apiMessage, new ReceivedStamp()))->willReturn($envelope);
$bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, new ReceivedStamp()))->willReturn($envelope);
$worker = new Worker($receiver, $bus);
$worker = new Worker($receiver, $bus, 'receiver_id');
$worker->run();
$this->assertSame(2, $receiver->getAcknowledgeCount());
}
public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
@ -50,29 +60,79 @@ class WorkerTest extends TestCase
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$worker = new Worker($receiver, $bus);
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
$worker->run();
}
public function testWorkerIsThrowingExceptionsBackToGenerators()
public function testDispatchCausesRetry()
{
$receiver = new CallbackReceiver(function ($handler) {
try {
$handler(new Envelope(new DummyMessage('Hello')));
$handler(new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias')));
});
$this->assertTrue(false, 'This should not be called because the exception is sent back to the generator.');
} catch (\InvalidArgumentException $e) {
// This should be called because of the exception sent back to the generator.
$this->assertTrue(true);
}
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
// 2nd call will be the retry
$bus->expects($this->at(1))->method('dispatch')->with($this->callback(function (Envelope $envelope) {
/** @var RedeliveryStamp|null $redeliveryStamp */
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertNotNull($redeliveryStamp);
// retry count now at 1
$this->assertSame(1, $redeliveryStamp->getRetryCount());
$this->assertTrue($redeliveryStamp->shouldRedeliverToSender('Some\Sender', 'sender_alias'));
// received stamp is removed
$this->assertNull($envelope->last(ReceivedStamp::class));
return true;
}))->willReturnArgument(0);
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true);
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
$worker->run();
// old message acknowledged
$this->assertSame(1, $receiver->getAcknowledgeCount());
}
public function testDispatchCausesRejectWhenNoRetry()
{
$receiver = new CallbackReceiver(function ($handler) {
$handler(new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias')));
});
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
$worker = new Worker($receiver, $bus);
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false);
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
$worker->run();
$this->assertSame(1, $receiver->getRejectCount());
$this->assertSame(0, $receiver->getAcknowledgeCount());
}
public function testDispatchCausesRejectOnUnrecoverableMessage()
{
$receiver = new CallbackReceiver(function ($handler) {
$handler(new Envelope(new DummyMessage('Hello')));
});
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException('Will never work'));
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$retryStrategy->expects($this->never())->method('isRetryable');
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
$worker->run();
$this->assertSame(1, $receiver->getRejectCount());
}
public function testWorkerDoesNotSendNullMessagesToTheBus()
@ -83,8 +143,58 @@ class WorkerTest extends TestCase
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->never())->method('dispatch');
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$worker = new Worker($receiver, $bus);
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
$worker->run();
}
public function testWorkerDispatchesEventsOnSuccess()
{
$envelope = new Envelope(new DummyMessage('Hello'));
$receiver = new CallbackReceiver(function ($handler) use ($envelope) {
$handler($envelope);
});
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willReturn($envelope);
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
$eventDispatcher->expects($this->exactly(2))
->method('dispatch')
->withConsecutive(
[$this->isInstanceOf(WorkerMessageReceivedEvent::class)],
[$this->isInstanceOf(WorkerMessageHandledEvent::class)]
);
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher);
$worker->run();
}
public function testWorkerDispatchesEventsOnError()
{
$envelope = new Envelope(new DummyMessage('Hello'));
$receiver = new CallbackReceiver(function ($handler) use ($envelope) {
$handler($envelope);
});
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$exception = new \InvalidArgumentException('Oh no!');
$bus->method('dispatch')->willThrowException($exception);
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
$eventDispatcher->expects($this->exactly(2))
->method('dispatch')
->withConsecutive(
[$this->isInstanceOf(WorkerMessageReceivedEvent::class)],
[$this->isInstanceOf(WorkerMessageFailedEvent::class)]
);
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher);
$worker->run();
}
}

View File

@ -0,0 +1,34 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Stamp\StampInterface;
/**
* Stamp applied when a message is received from Amqp.
*
* @experimental in 4.3
*/
class AmqpReceivedStamp implements StampInterface
{
private $amqpEnvelope;
public function __construct(\AMQPEnvelope $amqpEnvelope)
{
$this->amqpEnvelope = $amqpEnvelope;
}
public function getAmqpEnvelope(): \AMQPEnvelope
{
return $this->amqpEnvelope;
}
}

View File

@ -11,8 +11,10 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@ -42,53 +44,74 @@ class AmqpReceiver implements ReceiverInterface
public function receive(callable $handler): void
{
while (!$this->shouldStop) {
$AMQPEnvelope = $this->connection->get();
if (null === $AMQPEnvelope) {
try {
$amqpEnvelope = $this->connection->get();
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
if (null === $amqpEnvelope) {
$handler(null);
usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000);
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
usleep($this->connection->getConnectionConfiguration()['loop_sleep'] ?? 200000);
continue;
}
try {
$handler($this->serializer->decode([
'body' => $AMQPEnvelope->getBody(),
'headers' => $AMQPEnvelope->getHeaders(),
]));
$envelope = $this->serializer->decode([
'body' => $amqpEnvelope->getBody(),
'headers' => $amqpEnvelope->getHeaders(),
]);
} catch (MessageDecodingFailedException $exception) {
// invalid message of some type
$this->rejectAmqpEnvelope($amqpEnvelope);
$this->connection->ack($AMQPEnvelope);
} catch (RejectMessageExceptionInterface $e) {
try {
$this->connection->reject($AMQPEnvelope);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
throw $e;
} catch (\AMQPException $e) {
throw new TransportException($e->getMessage(), 0, $e);
} catch (\Throwable $e) {
try {
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
throw $e;
} finally {
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
throw $exception;
}
$envelope = $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
$handler($envelope);
}
}
public function ack(Envelope $envelope): void
{
try {
$this->connection->ack($this->findAmqpEnvelope($envelope));
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
public function reject(Envelope $envelope): void
{
$this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope));
}
public function stop(): void
{
$this->shouldStop = true;
}
private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope): void
{
try {
$this->connection->nack($amqpEnvelope, AMQP_NOPARAM);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
private function findAmqpEnvelope(Envelope $envelope): \AMQPEnvelope
{
/** @var AmqpReceivedStamp|null $amqpReceivedStamp */
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if (null === $amqpReceivedStamp) {
throw new LogicException('No AmqpReceivedStamp found on the Envelope.');
}
return $amqpReceivedStamp->getAmqpEnvelope();
}
}

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@ -42,8 +43,15 @@ class AmqpSender implements SenderInterface
{
$encodedMessage = $this->serializer->encode($envelope);
/** @var DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
$delay = 0;
if (null !== $delayStamp) {
$delay = $delayStamp->getDelay();
}
try {
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? []);
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
} catch (\AMQPException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

View File

@ -50,6 +50,22 @@ class AmqpTransport implements TransportInterface
($this->receiver ?? $this->getReceiver())->stop();
}
/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
($this->receiver ?? $this->getReceiver())->ack($envelope);
}
/**
* {@inheritdoc}
*/
public function reject(Envelope $envelope): void
{
($this->receiver ?? $this->getReceiver())->reject($envelope);
}
/**
* {@inheritdoc}
*/

View File

@ -33,7 +33,7 @@ class Connection
'x-message-ttl',
];
private $connectionCredentials;
private $connectionConfiguration;
private $exchangeConfiguration;
private $queueConfiguration;
private $amqpFactory;
@ -53,9 +53,47 @@ class Connection
*/
private $amqpQueue;
public function __construct(array $connectionCredentials, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null)
/**
* @var \AMQPExchange|null
*/
private $amqpDelayExchange;
/**
* Constructor.
*
* Available options:
*
* * host: Hostname of the AMQP service
* * port: Port of the AMQP service
* * vhost: Virtual Host to use with the AMQP service
* * user: Username to use to connect the the AMQP service
* * password: Password to use the connect to the AMQP service
* * queue:
* * name: Name of the queue
* * routing_key: The routing key (if any) to use to push the messages to
* * flags: Queue flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * exchange:
* * name: Name of the exchange
* * type: Type of exchange (Default: fanout)
* * flags: Exchange flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * delay:
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%delay%")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%delay%")
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry")
* * auto-setup: Enable or not the auto-setup of queues and exchanges (Default: true)
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
*/
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null)
{
$this->connectionCredentials = $connectionCredentials;
$this->connectionConfiguration = array_replace_recursive([
'delay' => [
'routing_key_pattern' => 'delay_%delay%',
'exchange_name' => 'delay',
'queue_name_pattern' => 'delay_queue_%delay%',
],
], $connectionConfiguration);
$this->exchangeConfiguration = $exchangeConfiguration;
$this->queueConfiguration = $queueConfiguration;
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
@ -123,20 +161,102 @@ class Connection
}
/**
* @param int $delay The delay in milliseconds
*
* @throws \AMQPException
*/
public function publish(string $body, array $headers = []): void
public function publish(string $body, array $headers = [], int $delay = 0): void
{
if (0 !== $delay) {
$this->publishWithDelay($body, $headers, $delay);
return;
}
if ($this->shouldSetup()) {
$this->setup();
}
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
$attributes = array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]);
$attributes = $this->getAttributes($headers);
$this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes);
}
/**
* @throws \AMQPException
*/
private function publishWithDelay(string $body, array $headers = [], int $delay)
{
if ($this->shouldSetup()) {
$this->setupDelay($delay);
}
$routingKey = $this->getRoutingKeyForDelay($delay);
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
$attributes = $this->getAttributes($headers);
$this->getDelayExchange()->publish($body, $routingKey, $flags, $attributes);
}
private function setupDelay(int $delay)
{
if (!$this->channel()->isConnected()) {
$this->clear();
}
$exchange = $this->getDelayExchange();
$exchange->declareExchange();
$queue = $this->createDelayQueue($delay);
$queue->declareQueue();
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay));
}
private function getDelayExchange(): \AMQPExchange
{
if (null === $this->amqpDelayExchange) {
$this->amqpDelayExchange = $this->amqpFactory->createExchange($this->channel());
$this->amqpDelayExchange->setName($this->connectionConfiguration['delay']['exchange_name']);
$this->amqpDelayExchange->setType(AMQP_EX_TYPE_DIRECT);
}
return $this->amqpDelayExchange;
}
/**
* Creates a delay queue that will delay for a certain amount of time.
*
* This works by setting message TTL for the delay and pointing
* the dead letter exchange to the original exchange. The result
* is that after the TTL, the message is sent to the dead-letter-exchange,
* which is the original exchange, resulting on it being put back into
* the original queue.
*/
private function createDelayQueue(int $delay)
{
$delayConfiguration = $this->connectionConfiguration['delay'];
$queue = $this->amqpFactory->createQueue($this->channel());
$queue->setName(str_replace('%delay%', $delay, $delayConfiguration['queue_name_pattern']));
$queue->setArguments([
'x-message-ttl' => $delay,
'x-dead-letter-exchange' => $this->exchange()->getName(),
]);
if (isset($this->queueConfiguration['routing_key'])) {
// after being released from to DLX, this routing key will be used
$queue->setArgument('x-dead-letter-routing-key', $this->queueConfiguration['routing_key']);
}
return $queue;
}
private function getRoutingKeyForDelay(int $delay): string
{
return str_replace('%delay%', $delay, $this->connectionConfiguration['delay']['routing_key_pattern']);
}
/**
* Waits and gets a message from the configured queue.
*
@ -171,11 +291,6 @@ class Connection
return $this->queue()->ack($message->getDeliveryTag());
}
public function reject(\AMQPEnvelope $message): bool
{
return $this->queue()->reject($message->getDeliveryTag());
}
public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM): bool
{
return $this->queue()->nack($message->getDeliveryTag(), $flags);
@ -196,8 +311,8 @@ class Connection
public function channel(): \AMQPChannel
{
if (null === $this->amqpChannel) {
$connection = $this->amqpFactory->createConnection($this->connectionCredentials);
$connectMethod = 'true' === ($this->connectionCredentials['persistent'] ?? 'false') ? 'pconnect' : 'connect';
$connection = $this->amqpFactory->createConnection($this->connectionConfiguration);
$connectMethod = 'true' === ($this->connectionConfiguration['persistent'] ?? 'false') ? 'pconnect' : 'connect';
try {
$connection->{$connectMethod}();
@ -244,9 +359,9 @@ class Connection
return $this->amqpExchange;
}
public function getConnectionCredentials(): array
public function getConnectionConfiguration(): array
{
return $this->connectionCredentials;
return $this->connectionConfiguration;
}
private function clear(): void
@ -258,14 +373,19 @@ class Connection
private function shouldSetup(): bool
{
if (!\array_key_exists('auto-setup', $this->connectionCredentials)) {
if (!\array_key_exists('auto-setup', $this->connectionConfiguration)) {
return true;
}
if (\in_array($this->connectionCredentials['auto-setup'], [false, 'false'], true)) {
if (\in_array($this->connectionConfiguration['auto-setup'], [false, 'false'], true)) {
return false;
}
return true;
}
private function getAttributes(array $headers): array
{
return array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]);
}
}

View File

@ -1,27 +0,0 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\AmqpExt\Exception;
/**
* If something goes wrong while consuming and handling a message from the AMQP broker, there are two choices: rejecting
* or re-queuing the message.
*
* If the exception that is thrown by the bus while dispatching the message implements this interface, the message will
* be rejected. Otherwise, it will be re-queued.
*
* @author Samuel Roze <samuel.roze@gmail.com>
*
* @experimental in 4.2
*/
interface RejectMessageExceptionInterface extends \Throwable
{
}

View File

@ -11,8 +11,12 @@
namespace Symfony\Component\Messenger\Transport\Receiver;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.2
*/
@ -23,6 +27,12 @@ interface ReceiverInterface
*
* The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message.
* Note that this envelope can be `null` if the timeout to receive something has expired.
*
* If the received message cannot be decoded, the message should not
* be retried again (e.g. if there's a queue, it should be removed)
* and a MessageDecodingFailedException should be thrown.
*
* @throws TransportException If there is an issue communicating with the transport
*/
public function receive(callable $handler): void;
@ -30,4 +40,18 @@ interface ReceiverInterface
* Stop receiving some messages.
*/
public function stop(): void;
/**
* Acknowledge that the passed message was handled.
*
* @throws TransportException If there is an issue communicating with the transport
*/
public function ack(Envelope $envelope): void;
/**
* Called when handling the message failed and it should not be retried.
*
* @throws TransportException If there is an issue communicating with the transport
*/
public function reject(Envelope $envelope): void;
}

View File

@ -55,4 +55,14 @@ class StopWhenMemoryUsageIsExceededReceiver implements ReceiverInterface
{
$this->decoratedReceiver->stop();
}
public function ack(Envelope $envelope): void
{
$this->decoratedReceiver->ack($envelope);
}
public function reject(Envelope $envelope): void
{
$this->decoratedReceiver->reject($envelope);
}
}

View File

@ -52,4 +52,14 @@ class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface
{
$this->decoratedReceiver->stop();
}
public function ack(Envelope $envelope): void
{
$this->decoratedReceiver->ack($envelope);
}
public function reject(Envelope $envelope): void
{
$this->decoratedReceiver->reject($envelope);
}
}

View File

@ -53,4 +53,14 @@ class StopWhenTimeLimitIsReachedReceiver implements ReceiverInterface
{
$this->decoratedReceiver->stop();
}
public function ack(Envelope $envelope): void
{
$this->decoratedReceiver->ack($envelope);
}
public function reject(Envelope $envelope): void
{
$this->decoratedReceiver->reject($envelope);
}
}

View File

@ -22,6 +22,9 @@ interface SenderInterface
{
/**
* Sends the given envelope.
*
* The sender can read different stamps for transport configuration,
* like delivery delay.
*/
public function send(Envelope $envelope): Envelope;
}

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Transport\Serialization;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
/**
* @author Ryan Weaver<ryan@symfonycasts.com>
@ -30,7 +31,7 @@ class PhpSerializer implements SerializerInterface
throw new InvalidArgumentException('Encoded envelope should have at least a "body".');
}
return unserialize($encodedEnvelope['body']);
return $this->safelyUnserialize($encodedEnvelope['body']);
}
/**
@ -42,4 +43,35 @@ class PhpSerializer implements SerializerInterface
'body' => serialize($envelope),
];
}
private function safelyUnserialize($contents)
{
$e = null;
$signalingException = new MessageDecodingFailedException(sprintf('Could not decode message using PHP serialization: %s.', $contents));
$prevUnserializeHandler = ini_set('unserialize_callback_func', self::class.'::handleUnserializeCallback');
$prevErrorHandler = set_error_handler(function ($type, $msg, $file, $line, $context = []) use (&$prevErrorHandler, $signalingException) {
if (__FILE__ === $file) {
throw $signalingException;
}
return $prevErrorHandler ? $prevErrorHandler($type, $msg, $file, $line, $context) : false;
});
try {
$meta = unserialize($contents);
} finally {
restore_error_handler();
ini_set('unserialize_callback_func', $prevUnserializeHandler);
}
return $meta;
}
/**
* @internal
*/
public static function handleUnserializeCallback($class)
{
throw new MessageDecodingFailedException(sprintf('Message class "%s" not found during decoding.', $class));
}
}

View File

@ -14,9 +14,11 @@ namespace Symfony\Component\Messenger\Transport\Serialization;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Stamp\SerializerStamp;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Encoder\XmlEncoder;
use Symfony\Component\Serializer\Exception\UnexpectedValueException;
use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
use Symfony\Component\Serializer\Serializer as SymfonySerializer;
@ -75,7 +77,11 @@ class Serializer implements SerializerInterface
$context = end($stamps[SerializerStamp::class])->getContext() + $context;
}
$message = $this->serializer->deserialize($encodedEnvelope['body'], $encodedEnvelope['headers']['type'], $this->format, $context);
try {
$message = $this->serializer->deserialize($encodedEnvelope['body'], $encodedEnvelope['headers']['type'], $this->format, $context);
} catch (UnexpectedValueException $e) {
throw new MessageDecodingFailedException(sprintf('Could not decode message: %s.', $e->getMessage()), $e->getCode(), $e);
}
return new Envelope($message, ...$stamps);
}
@ -107,7 +113,11 @@ class Serializer implements SerializerInterface
continue;
}
$stamps[] = $this->serializer->deserialize($value, substr($name, \strlen(self::STAMP_HEADER_PREFIX)).'[]', $this->format, $this->context);
try {
$stamps[] = $this->serializer->deserialize($value, substr($name, \strlen(self::STAMP_HEADER_PREFIX)).'[]', $this->format, $this->context);
} catch (UnexpectedValueException $e) {
throw new MessageDecodingFailedException(sprintf('Could not decode stamp: %s.', $e->getMessage()), $e->getCode(), $e);
}
}
if ($stamps) {
$stamps = array_merge(...$stamps);

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\Serialization;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
@ -29,6 +30,8 @@ interface SerializerInterface
* The most common keys are:
* - `body` (string) - the message body
* - `headers` (string<string>) - a key/value pair of headers
*
* @throws MessageDecodingFailedException
*/
public function decode(array $encodedEnvelope): Envelope;

View File

@ -11,7 +11,19 @@
namespace Symfony\Component\Messenger;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\Event;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
/**
@ -25,11 +37,24 @@ class Worker
{
private $receiver;
private $bus;
private $receiverName;
private $retryStrategy;
private $eventDispatcher;
private $logger;
public function __construct(ReceiverInterface $receiver, MessageBusInterface $bus)
public function __construct(ReceiverInterface $receiver, MessageBusInterface $bus, string $receiverName = null, RetryStrategyInterface $retryStrategy = null, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
{
$this->receiver = $receiver;
$this->bus = $bus;
if (null === $receiverName) {
@trigger_error(sprintf('Instantiating the "%s" class without passing a third argument is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED);
$receiverName = 'unknown';
}
$this->receiverName = $receiverName;
$this->retryStrategy = $retryStrategy;
$this->eventDispatcher = $eventDispatcher;
$this->logger = $logger;
}
/**
@ -45,10 +70,112 @@ class Worker
$this->receiver->receive(function (?Envelope $envelope) {
if (null === $envelope) {
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
return;
}
$this->bus->dispatch($envelope->with(new ReceivedStamp()));
$this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $this->receiverName));
$message = $envelope->getMessage();
$context = [
'message' => $message,
'class' => \get_class($message),
];
try {
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
} catch (\Throwable $throwable) {
$shouldRetry = $this->shouldRetry($throwable, $envelope);
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $this->receiverName, $throwable, $shouldRetry));
if ($shouldRetry) {
if (null === $this->retryStrategy) {
// not logically allowed, but check just in case
throw new LogicException('Retrying is not supported without a retry strategy.');
}
$retryCount = $this->getRetryCount($envelope) + 1;
if (null !== $this->logger) {
$this->logger->info('Retrying {class} - retry #{retryCount}.', $context + ['retryCount' => $retryCount, 'error' => $throwable]);
}
// add the delay and retry stamp info + remove ReceivedStamp
$retryEnvelope = $envelope->with(new DelayStamp($this->retryStrategy->getWaitingTime($envelope)))
->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope)))
->withoutAll(ReceivedStamp::class);
// re-send the message
$this->bus->dispatch($retryEnvelope);
// acknowledge the previous message has received
$this->receiver->ack($envelope);
} else {
if (null !== $this->logger) {
$this->logger->info('Rejecting {class} (removing from transport).', $context + ['error' => $throwable]);
}
$this->receiver->reject($envelope);
}
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
return;
}
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $this->receiverName));
if (null !== $this->logger) {
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
}
$this->receiver->ack($envelope);
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
});
}
private function dispatchEvent(Event $event)
{
if (null === $this->eventDispatcher) {
return;
}
$this->eventDispatcher->dispatch($event);
}
private function shouldRetry(\Throwable $e, Envelope $envelope): bool
{
if ($e instanceof UnrecoverableMessageHandlingException) {
return false;
}
if (null === $this->retryStrategy) {
return false;
}
return $this->retryStrategy->isRetryable($envelope);
}
private function getRetryCount(Envelope $envelope): int
{
/** @var RedeliveryStamp|null $retryMessageStamp */
$retryMessageStamp = $envelope->last(RedeliveryStamp::class);
return $retryMessageStamp ? $retryMessageStamp->getRetryCount() : 0;
}
private function getSenderAlias(Envelope $envelope): ?string
{
/** @var SentStamp|null $sentStamp */
$sentStamp = $envelope->last(SentStamp::class);
return $sentStamp ? $sentStamp->getSenderAlias() : null;
}
}

View File

@ -22,6 +22,7 @@
"require-dev": {
"symfony/console": "~3.4|~4.0",
"symfony/dependency-injection": "~3.4.19|^4.1.8",
"symfony/event-dispatcher": "~4.3",
"symfony/http-kernel": "~3.4|~4.0",
"symfony/process": "~3.4|~4.0",
"symfony/property-access": "~3.4|~4.0",
@ -30,6 +31,9 @@
"symfony/validator": "~3.4|~4.0",
"symfony/var-dumper": "~3.4|~4.0"
},
"conflict": {
"symfony/event-dispatcher": "<4.3"
},
"suggest": {
"enqueue/messenger-adapter": "For using the php-enqueue library as a transport."
},