diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php index 04f2bdbe16..4355e632f3 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php @@ -1111,6 +1111,20 @@ class Configuration implements ConfigurationInterface ->prototype('variable') ->end() ->end() + ->arrayNode('retry_strategy') + ->addDefaultsIfNotSet() + ->validate() + ->ifTrue(function ($v) { return null !== $v['service'] && (isset($v['max_retries']) || isset($v['delay']) || isset($v['multiplier']) || isset($v['max_delay'])); }) + ->thenInvalid('"service" cannot be used along with the other retry_strategy options.') + ->end() + ->children() + ->scalarNode('service')->defaultNull()->info('Service id to override the retry strategy entirely')->end() + ->integerNode('max_retries')->defaultValue(3)->min(0)->end() + ->integerNode('delay')->defaultValue(1000)->min(0)->info('Time in ms to delay (or the initial value when multiplier is used)')->end() + ->floatNode('multiplier')->defaultValue(2)->min(1)->info('If greater than 1, delay will grow exponentially for each retry: this delay = (delay * (multiple ^ retries))')->end() + ->integerNode('max_delay')->defaultValue(0)->min(0)->info('Max time in ms that a retry should ever be delayed (0 = infinite)')->end() + ->end() + ->end() ->end() ->end() ->end() diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index d2d0181f8c..0e12127272 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1653,6 +1653,7 @@ class FrameworkExtension extends Extension } $senderAliases = []; + $transportRetryReferences = []; foreach ($config['transports'] as $name => $transport) { if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) { throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enabling it or running "composer require symfony/serializer-pack".'); @@ -1665,6 +1666,21 @@ class FrameworkExtension extends Extension ; $container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition); $senderAliases[$name] = $transportId; + + if (null !== $transport['retry_strategy']['service']) { + $transportRetryReferences[$name] = new Reference($transport['retry_strategy']['service']); + } else { + $retryServiceId = sprintf('messenger.retry.multiplier_retry_strategy.%s', $name); + $retryDefinition = new ChildDefinition('messenger.retry.abstract_multiplier_retry_strategy'); + $retryDefinition + ->replaceArgument(0, $transport['retry_strategy']['max_retries']) + ->replaceArgument(1, $transport['retry_strategy']['delay']) + ->replaceArgument(2, $transport['retry_strategy']['multiplier']) + ->replaceArgument(3, $transport['retry_strategy']['max_delay']); + $container->setDefinition($retryServiceId, $retryDefinition); + + $transportRetryReferences[$name] = new Reference($retryServiceId); + } } $messageToSendersMapping = []; @@ -1686,6 +1702,9 @@ class FrameworkExtension extends Extension ->replaceArgument(0, $messageToSendersMapping) ->replaceArgument(1, $messagesToSendAndHandle) ; + + $container->getDefinition('messenger.retry_strategy_locator') + ->replaceArgument(0, $transportRetryReferences); } private function registerCacheConfiguration(array $config, ContainerBuilder $container) diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml index 9ce2c77373..eeb36961f7 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml @@ -82,8 +82,12 @@ + + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml index d50d13bde2..3d471c9338 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -64,5 +64,18 @@ + + + + + + + + + + + + + diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 227cddb5ee..40f8c1dc04 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -3,7 +3,34 @@ CHANGELOG 4.3.0 ----- - + + * [BC BREAK] 2 new methods were added to `ReceiverInterface`: + `ack()` and `reject()`. + * [BC BREAK] Error handling was moved from the receivers into + `Worker`. Implementations of `ReceiverInterface::handle()` + should now allow all exceptions to be thrown, except for transport + exceptions. They should also not retry (e.g. if there's a queue, + remove from the queue) if there is a problem decoding the message. + * [BC BREAK] `RejectMessageExceptionInterface` was removed and replaced + by `Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException`, + which has the same behavior: a message will not be retried + * The default command name for `ConsumeMessagesCommand` was + changed from `messenger:consume-messages` to `messenger:consume` + * `ConsumeMessagesCommand` has two new optional constructor arguments + * `Worker` has 4 new option constructor arguments. + * The `Worker` class now handles calling `pcntl_signal_dispatch()` the + receiver no longer needs to call this. + * The `AmqpSender` will now retry messages using a dead-letter exchange + and delayed queues, instead of retrying via `nack()` + * Senders now receive the `Envelope` with the `SentStamp` on it. Previously, + the `Envelope` was passed to the sender and *then* the `SentStamp` + was added. + * `SerializerInterface` implementations should now throw a + `Symfony\Component\Messenger\Exception\MessageDecodingFailedException` + if `decode()` fails for any reason. + * [BC BREAK] The default `Serializer` will now throw a + `MessageDecodingFailedException` if `decode()` fails, instead + of the underlying exceptions from the Serializer component. * Added `PhpSerializer` which uses PHP's native `serialize()` and `unserialize()` to serialize messages to a transport * [BC BREAK] If no serializer were passed, the default serializer diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index 5f854f257f..a973a2337d 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -20,6 +20,7 @@ use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; +use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver; use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver; use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver; @@ -32,21 +33,25 @@ use Symfony\Component\Messenger\Worker; */ class ConsumeMessagesCommand extends Command { - protected static $defaultName = 'messenger:consume-messages'; + protected static $defaultName = 'messenger:consume'; private $busLocator; private $receiverLocator; private $logger; private $receiverNames; private $busNames; + private $retryStrategyLocator; + private $eventDispatcher; - public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = []) + public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null) { $this->busLocator = $busLocator; $this->receiverLocator = $receiverLocator; $this->logger = $logger; $this->receiverNames = $receiverNames; $this->busNames = $busNames; + $this->retryStrategyLocator = $retryStrategyLocator; + $this->eventDispatcher = $eventDispatcher; parent::__construct(); } @@ -132,6 +137,12 @@ EOF */ protected function execute(InputInterface $input, OutputInterface $output): void { + if (false !== strpos($input->getFirstArgument(), ':consume-')) { + $message = 'The use of the "messenger:consume-messages" command is deprecated since version 4.3 and will be removed in 5.0. Use "messenger:consume" instead.'; + @trigger_error($message, E_USER_DEPRECATED); + $output->writeln(sprintf('%s', $message)); + } + if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) { throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName)); } @@ -140,8 +151,13 @@ EOF throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName)); } + if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) { + throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName)); + } + $receiver = $this->receiverLocator->get($receiverName); $bus = $this->busLocator->get($busName); + $retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null; $stopsWhen = []; if ($limit = $input->getOption('limit')) { @@ -174,7 +190,7 @@ EOF $io->comment('Re-run the command with a -vv option to see logs about consumed messages.'); } - $worker = new Worker($receiver, $bus); + $worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger); $worker->run(); } diff --git a/src/Symfony/Component/Messenger/Envelope.php b/src/Symfony/Component/Messenger/Envelope.php index 02d0dca123..2e70480886 100644 --- a/src/Symfony/Component/Messenger/Envelope.php +++ b/src/Symfony/Component/Messenger/Envelope.php @@ -54,6 +54,18 @@ final class Envelope return $cloned; } + /** + * @return Envelope a new Envelope instance without any stamps of the given class + */ + public function withoutAll(string $stampFqcn): self + { + $cloned = clone $this; + + unset($cloned->stamps[$stampFqcn]); + + return $cloned; + } + public function last(string $stampFqcn): ?StampInterface { return isset($this->stamps[$stampFqcn]) ? end($this->stamps[$stampFqcn]) : null; diff --git a/src/Symfony/Component/Messenger/Event/AbstractWorkerMessageEvent.php b/src/Symfony/Component/Messenger/Event/AbstractWorkerMessageEvent.php new file mode 100644 index 0000000000..0f2b644139 --- /dev/null +++ b/src/Symfony/Component/Messenger/Event/AbstractWorkerMessageEvent.php @@ -0,0 +1,43 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Event; + +use Symfony\Component\EventDispatcher\Event; +use Symfony\Component\Messenger\Envelope; + +/** + * @experimental in 4.3 + */ +abstract class AbstractWorkerMessageEvent extends Event +{ + private $envelope; + private $receiverName; + + public function __construct(Envelope $envelope, string $receiverName) + { + $this->envelope = $envelope; + $this->receiverName = $receiverName; + } + + public function getEnvelope(): Envelope + { + return $this->envelope; + } + + /** + * Returns a unique identifier for transport receiver this message was received from. + */ + public function getReceiverName(): string + { + return $this->receiverName; + } +} diff --git a/src/Symfony/Component/Messenger/Event/WorkerMessageFailedEvent.php b/src/Symfony/Component/Messenger/Event/WorkerMessageFailedEvent.php new file mode 100644 index 0000000000..b3118633a3 --- /dev/null +++ b/src/Symfony/Component/Messenger/Event/WorkerMessageFailedEvent.php @@ -0,0 +1,45 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Event; + +use Symfony\Component\Messenger\Envelope; + +/** + * Dispatched when a message was received from a transport and handling failed. + * + * The event name is the class name. + * + * @experimental in 4.3 + */ +class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent +{ + private $throwable; + private $willRetry; + + public function __construct(Envelope $envelope, string $receiverName, \Throwable $error, bool $willRetry) + { + $this->throwable = $error; + $this->willRetry = $willRetry; + + parent::__construct($envelope, $receiverName); + } + + public function getThrowable(): \Throwable + { + return $this->throwable; + } + + public function willRetry(): bool + { + return $this->willRetry; + } +} diff --git a/src/Symfony/Component/Messenger/Event/WorkerMessageHandledEvent.php b/src/Symfony/Component/Messenger/Event/WorkerMessageHandledEvent.php new file mode 100644 index 0000000000..c911a01288 --- /dev/null +++ b/src/Symfony/Component/Messenger/Event/WorkerMessageHandledEvent.php @@ -0,0 +1,23 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Event; + +/** + * Dispatched after a message was received from a transport and successfully handled. + * + * The event name is the class name. + * + * @experimental in 4.3 + */ +class WorkerMessageHandledEvent extends AbstractWorkerMessageEvent +{ +} diff --git a/src/Symfony/Component/Messenger/Event/WorkerMessageReceivedEvent.php b/src/Symfony/Component/Messenger/Event/WorkerMessageReceivedEvent.php new file mode 100644 index 0000000000..3df75b8723 --- /dev/null +++ b/src/Symfony/Component/Messenger/Event/WorkerMessageReceivedEvent.php @@ -0,0 +1,23 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Event; + +/** + * Dispatched when a message was received from a transport but before sent to the bus. + * + * The event name is the class name. + * + * @experimental in 4.3 + */ +class WorkerMessageReceivedEvent extends AbstractWorkerMessageEvent +{ +} diff --git a/src/Symfony/Component/Messenger/Exception/MessageDecodingFailedException.php b/src/Symfony/Component/Messenger/Exception/MessageDecodingFailedException.php new file mode 100644 index 0000000000..9e429ecc9b --- /dev/null +++ b/src/Symfony/Component/Messenger/Exception/MessageDecodingFailedException.php @@ -0,0 +1,21 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Exception; + +/** + * Thrown when a message cannot be decoded in a serializer. + * + * @experimental in 4.3 + */ +class MessageDecodingFailedException extends \InvalidArgumentException implements ExceptionInterface +{ +} diff --git a/src/Symfony/Component/Messenger/Exception/UnrecoverableMessageHandlingException.php b/src/Symfony/Component/Messenger/Exception/UnrecoverableMessageHandlingException.php new file mode 100644 index 0000000000..df08e79d8b --- /dev/null +++ b/src/Symfony/Component/Messenger/Exception/UnrecoverableMessageHandlingException.php @@ -0,0 +1,26 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Exception; + +/** + * Thrown while handling a message to indicate that handling will continue to fail. + * + * If something goes wrong while handling a message that's received from a transport + * and the message should not be retried, a handler can throw this exception. + * + * @author Frederic Bouchery + * + * @experimental in 4.3 + */ +class UnrecoverableMessageHandlingException extends RuntimeException +{ +} diff --git a/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php index 6d6696a472..bc6f71761c 100644 --- a/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php +++ b/src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php @@ -15,6 +15,7 @@ use Psr\Log\LoggerAwareTrait; use Psr\Log\NullLogger; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface; @@ -54,9 +55,22 @@ class SendMessageMiddleware implements MiddlewareInterface // it's a received message, do not send it back $this->logger->info('Received message "{class}"', $context); } else { + /** @var RedeliveryStamp|null $redeliveryStamp */ + $redeliveryStamp = $envelope->last(RedeliveryStamp::class); + foreach ($this->sendersLocator->getSenders($envelope, $handle) as $alias => $sender) { + // on redelivery, only deliver to the given sender + if (null !== $redeliveryStamp && !$redeliveryStamp->shouldRedeliverToSender(\get_class($sender), $alias)) { + continue; + } + $this->logger->info('Sending message "{class}" with "{sender}"', $context + ['sender' => \get_class($sender)]); - $envelope = $sender->send($envelope)->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)); + $envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null))); + } + + // on a redelivery, never call local handlers + if (null !== $redeliveryStamp) { + $handle = false; } } diff --git a/src/Symfony/Component/Messenger/Retry/MultiplierRetryStrategy.php b/src/Symfony/Component/Messenger/Retry/MultiplierRetryStrategy.php new file mode 100644 index 0000000000..394be2509d --- /dev/null +++ b/src/Symfony/Component/Messenger/Retry/MultiplierRetryStrategy.php @@ -0,0 +1,98 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Retry; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\InvalidArgumentException; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; + +/** + * A retry strategy with a constant or exponential retry delay. + * + * For example, if $delayMilliseconds=10000 & $multiplier=1 (default), + * each retry will wait exactly 10 seconds. + * + * But if $delayMilliseconds=10000 & $multiplier=2: + * * Retry 1: 10 second delay + * * Retry 2: 20 second delay (10000 * 2 = 20000) + * * Retry 3: 40 second delay (20000 * 2 = 40000) + * + * @author Ryan Weaver + * + * @experimental in 4.3 + * @final + */ +class MultiplierRetryStrategy implements RetryStrategyInterface +{ + private $maxRetries; + private $delayMilliseconds; + private $multiplier; + private $maxDelayMilliseconds; + + /** + * @param int $maxRetries The maximum number of time to retry (0 means indefinitely) + * @param int $delayMilliseconds Amount of time to delay (or the initial value when multiplier is used) + * @param float $multiplier Multiplier to apply to the delay each time a retry occurs + * @param int $maxDelayMilliseconds Maximum delay to allow (0 means no maximum) + */ + public function __construct(int $maxRetries = 3, int $delayMilliseconds = 1000, float $multiplier = 1, int $maxDelayMilliseconds = 0) + { + $this->maxRetries = $maxRetries; + + if ($delayMilliseconds < 0) { + throw new InvalidArgumentException(sprintf('Delay must be greater than or equal to zero: "%s" passed.', $delayMilliseconds)); + } + $this->delayMilliseconds = $delayMilliseconds; + + if ($multiplier < 1) { + throw new InvalidArgumentException(sprintf('Multiplier must be greater than zero: "%s" passed.', $multiplier)); + } + $this->multiplier = $multiplier; + + if ($maxDelayMilliseconds < 0) { + throw new InvalidArgumentException(sprintf('Max delay must be greater than or equal to zero: "%s" passed.', $maxDelayMilliseconds)); + } + $this->maxDelayMilliseconds = $maxDelayMilliseconds; + } + + public function isRetryable(Envelope $message): bool + { + if (0 === $this->maxRetries) { + return true; + } + + $retries = $this->getCurrentRetryCount($message); + + return $retries < $this->maxRetries; + } + + public function getWaitingTime(Envelope $message): int + { + $retries = $this->getCurrentRetryCount($message); + + $delay = $this->delayMilliseconds * pow($this->multiplier, $retries); + + if ($delay > $this->maxDelayMilliseconds && 0 !== $this->maxDelayMilliseconds) { + return $this->maxDelayMilliseconds; + } + + return $delay; + } + + private function getCurrentRetryCount(Envelope $message): int + { + /** @var RedeliveryStamp|null $retryMessageStamp */ + $retryMessageStamp = $message->last(RedeliveryStamp::class); + + return $retryMessageStamp ? $retryMessageStamp->getRetryCount() : 0; + } +} diff --git a/src/Symfony/Component/Messenger/Retry/RetryStrategyInterface.php b/src/Symfony/Component/Messenger/Retry/RetryStrategyInterface.php new file mode 100644 index 0000000000..474651de02 --- /dev/null +++ b/src/Symfony/Component/Messenger/Retry/RetryStrategyInterface.php @@ -0,0 +1,31 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Retry; + +use Symfony\Component\Messenger\Envelope; + +/** + * @author Fabien Potencier + * @author Grégoire Pineau + * @author Ryan Weaver + * + * @experimental in 4.3 + */ +interface RetryStrategyInterface +{ + public function isRetryable(Envelope $message): bool; + + /** + * @return int The time to delay/wait in milliseconds + */ + public function getWaitingTime(Envelope $message): int; +} diff --git a/src/Symfony/Component/Messenger/Stamp/DelayStamp.php b/src/Symfony/Component/Messenger/Stamp/DelayStamp.php new file mode 100644 index 0000000000..0fc5597044 --- /dev/null +++ b/src/Symfony/Component/Messenger/Stamp/DelayStamp.php @@ -0,0 +1,35 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Stamp; + +/** + * Apply this stamp to delay delivery of your message on a transport. + * + * @experimental in 4.3 + */ +class DelayStamp implements StampInterface +{ + private $delay; + + /** + * @param int $delay The delay in milliseconds + */ + public function __construct(int $delay) + { + $this->delay = $delay; + } + + public function getDelay(): int + { + return $this->delay; + } +} diff --git a/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php b/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php new file mode 100644 index 0000000000..d5f1259972 --- /dev/null +++ b/src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php @@ -0,0 +1,60 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Stamp; + +/** + * Stamp applied when a messages needs to be redelivered. + * + * @experimental in 4.3 + */ +class RedeliveryStamp implements StampInterface +{ + private $retryCount; + private $senderClassOrAlias; + + /** + * @param string $senderClassOrAlias Alias from SendersLocator or just the class name + */ + public function __construct(int $retryCount, string $senderClassOrAlias) + { + $this->retryCount = $retryCount; + $this->senderClassOrAlias = $senderClassOrAlias; + } + + public function getRetryCount(): int + { + return $this->retryCount; + } + + /** + * Needed for this class to serialize through Symfony's serializer. + * + * @internal + */ + public function getSenderClassOrAlias(): int + { + return $this->retryCount; + } + + public function shouldRedeliverToSender(string $senderClass, ?string $senderAlias): bool + { + if (null !== $senderAlias && $senderAlias === $this->senderClassOrAlias) { + return true; + } + + if ($senderClass === $this->senderClassOrAlias) { + return true; + } + + return false; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php index 6c97608c9f..24820df0c6 100644 --- a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php +++ b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php @@ -251,6 +251,7 @@ class MessengerPassTest extends TestCase $container->register('console.command.messenger_consume_messages', ConsumeMessagesCommand::class)->setArguments([ null, new Reference('messenger.receiver_locator'), + new Reference('messenger.retry_strategy_locator'), null, null, null, @@ -605,6 +606,14 @@ class DummyReceiver implements ReceiverInterface public function stop(): void { } + + public function ack(Envelope $envelope): void + { + } + + public function reject(Envelope $envelope): void + { + } } class InvalidReceiver diff --git a/src/Symfony/Component/Messenger/Tests/EnvelopeTest.php b/src/Symfony/Component/Messenger/Tests/EnvelopeTest.php index 67609d8cd9..04b99d9141 100644 --- a/src/Symfony/Component/Messenger/Tests/EnvelopeTest.php +++ b/src/Symfony/Component/Messenger/Tests/EnvelopeTest.php @@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\ValidationStamp; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; @@ -34,11 +35,21 @@ class EnvelopeTest extends TestCase public function testWithReturnsNewInstance() { - $envelope = new Envelope($dummy = new DummyMessage('dummy')); + $envelope = new Envelope(new DummyMessage('dummy')); $this->assertNotSame($envelope, $envelope->with(new ReceivedStamp())); } + public function testWithoutAll() + { + $envelope = new Envelope(new DummyMessage('dummy'), new ReceivedStamp(), new ReceivedStamp(), new DelayStamp(5000)); + + $envelope = $envelope->withoutAll(ReceivedStamp::class); + + $this->assertEmpty($envelope->all(ReceivedStamp::class)); + $this->assertCount(1, $envelope->all(DelayStamp::class)); + } + public function testLast() { $receivedStamp = new ReceivedStamp(); diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php b/src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php index 1470de5914..b1d26934d2 100644 --- a/src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php +++ b/src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php @@ -2,11 +2,14 @@ namespace Symfony\Component\Messenger\Tests\Fixtures; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; class CallbackReceiver implements ReceiverInterface { private $callable; + private $acknowledgeCount = 0; + private $rejectCount = 0; public function __construct(callable $callable) { @@ -22,4 +25,24 @@ class CallbackReceiver implements ReceiverInterface public function stop(): void { } + + public function ack(Envelope $envelope): void + { + ++$this->acknowledgeCount; + } + + public function reject(Envelope $envelope): void + { + ++$this->rejectCount; + } + + public function getAcknowledgeCount(): int + { + return $this->acknowledgeCount; + } + + public function getRejectCount(): int + { + return $this->rejectCount; + } } diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php index e74b18245e..be43799bf8 100644 --- a/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php +++ b/src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php @@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Tests\Middleware; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase; use Symfony\Component\Messenger\Tests\Fixtures\ChildDummyMessage; @@ -32,7 +33,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase $middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]])); - $sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope); + $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->will($this->returnArgument(0)); $envelope = $middleware->handle($envelope, $this->getStackMock(false)); @@ -42,6 +43,69 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase $this->assertStringMatchesFormat('Mock_SenderInterface_%s', $stamp->getSenderClass()); } + public function testItSendsTheMessageToMultipleSenders() + { + $envelope = new Envelope(new DummyMessage('Hey')); + $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); + $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock(); + + $middleware = new SendMessageMiddleware(new SendersLocator([ + DummyMessage::class => ['foo' => $sender, 'bar' => $sender2], + ])); + + $sender->expects($this->once()) + ->method('send') + ->with($this->callback(function (Envelope $envelope) { + /** @var SentStamp|null $lastSentStamp */ + $lastSentStamp = $envelope->last(SentStamp::class); + + // last SentStamp should be the "foo" alias + return null !== $lastSentStamp && 'foo' === $lastSentStamp->getSenderAlias(); + })) + ->will($this->returnArgument(0)); + $sender2->expects($this->once()) + ->method('send') + ->with($this->callback(function (Envelope $envelope) { + /** @var SentStamp|null $lastSentStamp */ + $lastSentStamp = $envelope->last(SentStamp::class); + + // last SentStamp should be the "bar" alias + return null !== $lastSentStamp && 'bar' === $lastSentStamp->getSenderAlias(); + })) + ->will($this->returnArgument(0)); + + $envelope = $middleware->handle($envelope, $this->getStackMock(false)); + + /** @var SentStamp[] $sentStamps */ + $sentStamps = $envelope->all(SentStamp::class); + $this->assertCount(2, $sentStamps); + } + + public function testItSendsToOnlyOneSenderOnRedelivery() + { + $envelope = new Envelope(new DummyMessage('Hey'), new RedeliveryStamp(5, 'bar')); + $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); + $sender2 = $this->getMockBuilder(SenderInterface::class)->getMock(); + + $middleware = new SendMessageMiddleware(new SendersLocator([ + DummyMessage::class => ['foo' => $sender, 'bar' => $sender2], + ], [ + // normally, this class sends and handles (but not on retry) + DummyMessage::class => true, + ])); + + $sender->expects($this->never()) + ->method('send') + ; + $sender2->expects($this->once()) + ->method('send') + ->will($this->returnArgument(0)); + + $mockStack = $this->getStackMock(false); // false because next should not be called + $envelope = $middleware->handle($envelope, $mockStack); + $this->assertCount(1, $envelope->all(SentStamp::class)); + } + public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage() { $envelope = new Envelope(new ChildDummyMessage('Hey')); @@ -49,7 +113,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase $middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]])); - $sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope); + $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope); $middleware->handle($envelope, $this->getStackMock(false)); } @@ -64,7 +128,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase DummyMessage::class => true, ])); - $sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope); + $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope); $middleware->handle($envelope, $this->getStackMock()); } @@ -79,7 +143,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase DummyMessage::class => true, ])); - $sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope); + $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope); $middleware->handle($envelope, $this->getStackMock()); } @@ -94,7 +158,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase DummyMessageInterface::class => true, ])); - $sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope); + $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope); $middleware->handle($envelope, $this->getStackMock()); } @@ -109,7 +173,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase '*' => true, ])); - $sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope); + $sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope); $middleware->handle($envelope, $this->getStackMock()); } diff --git a/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php b/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php new file mode 100644 index 0000000000..6ff4d9af27 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Retry/MultiplierRetryStrategyTest.php @@ -0,0 +1,80 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Retry; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; + +class MultiplierRetryStrategyTest extends TestCase +{ + public function testIsRetryable() + { + $strategy = new MultiplierRetryStrategy(3); + $envelope = new Envelope(new \stdClass(), new RedeliveryStamp(0, 'sender_alias')); + + $this->assertTrue($strategy->isRetryable($envelope)); + } + + public function testIsNotRetryable() + { + $strategy = new MultiplierRetryStrategy(3); + $envelope = new Envelope(new \stdClass(), new RedeliveryStamp(3, 'sender_alias')); + + $this->assertFalse($strategy->isRetryable($envelope)); + } + + public function testIsRetryableWithNoStamp() + { + $strategy = new MultiplierRetryStrategy(3); + $envelope = new Envelope(new \stdClass()); + + $this->assertTrue($strategy->isRetryable($envelope)); + } + + /** + * @dataProvider getWaitTimeTests + */ + public function testGetWaitTime(int $delay, int $multiplier, int $maxDelay, int $previousRetries, int $expectedDelay) + { + $strategy = new MultiplierRetryStrategy(10, $delay, $multiplier, $maxDelay); + $envelope = new Envelope(new \stdClass(), new RedeliveryStamp($previousRetries, 'sender_alias')); + + $this->assertSame($expectedDelay, $strategy->getWaitingTime($envelope)); + } + + public function getWaitTimeTests() + { + // delay, multiplier, maxDelay, retries, expectedDelay + yield [1000, 1, 5000, 0, 1000]; + yield [1000, 1, 5000, 1, 1000]; + yield [1000, 1, 5000, 2, 1000]; + + yield [1000, 2, 10000, 0, 1000]; + yield [1000, 2, 10000, 1, 2000]; + yield [1000, 2, 10000, 2, 4000]; + yield [1000, 2, 10000, 3, 8000]; + yield [1000, 2, 10000, 4, 10000]; // max hit + yield [1000, 2, 0, 4, 16000]; // no max + + yield [1000, 3, 10000, 0, 1000]; + yield [1000, 3, 10000, 1, 3000]; + yield [1000, 3, 10000, 2, 9000]; + + yield [1000, 1, 500, 0, 500]; // max hit immediately + + // never a delay + yield [0, 2, 10000, 0, 0]; + yield [0, 2, 10000, 1, 0]; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php b/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php new file mode 100644 index 0000000000..b6eb477a71 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Stamp/RedeliveryStampTest.php @@ -0,0 +1,34 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Stamp; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; + +class RedeliveryStampTest extends TestCase +{ + public function testShouldRedeliverToSenderWithAlias() + { + $stamp = new RedeliveryStamp(5, 'foo_alias'); + + $this->assertFalse($stamp->shouldRedeliverToSender('Foo\Bar\Sender', 'bar_alias')); + $this->assertTrue($stamp->shouldRedeliverToSender('Foo\Bar\Sender', 'foo_alias')); + } + + public function testShouldRedeliverToSenderWithoutAlias() + { + $stampToRedeliverToSender1 = new RedeliveryStamp(5, 'App\Sender1'); + + $this->assertTrue($stampToRedeliverToSender1->shouldRedeliverToSender('App\Sender1', null)); + $this->assertFalse($stampToRedeliverToSender1->shouldRedeliverToSender('App\Sender2', null)); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php index e77253bb7e..2949ae837f 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php @@ -13,15 +13,20 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\DelayStamp; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender; use Symfony\Component\Messenger\Transport\AmqpExt\Connection; use Symfony\Component\Messenger\Transport\Serialization\Serializer; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Process\PhpProcess; use Symfony\Component\Process\Process; use Symfony\Component\Serializer as SerializerComponent; use Symfony\Component\Serializer\Encoder\JsonEncoder; +use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer; use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; /** @@ -40,9 +45,7 @@ class AmqpExtIntegrationTest extends TestCase public function testItSendsAndReceivesMessages() { - $serializer = new Serializer( - new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) - ); + $serializer = $this->createSerializer(); $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN')); $connection->setup(); @@ -56,7 +59,9 @@ class AmqpExtIntegrationTest extends TestCase $receivedMessages = 0; $receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) { - $this->assertEquals(0 === $receivedMessages ? $first : $second, $envelope); + $expectedEnvelope = 0 === $receivedMessages ? $first : $second; + $this->assertEquals($expectedEnvelope->getMessage(), $envelope->getMessage()); + $this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class)); if (2 === ++$receivedMessages) { $receiver->stop(); @@ -64,11 +69,68 @@ class AmqpExtIntegrationTest extends TestCase }); } + public function testRetryAndDelay() + { + $serializer = $this->createSerializer(); + + $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN')); + $connection->setup(); + $connection->queue()->purge(); + + $sender = new AmqpSender($connection, $serializer); + $receiver = new AmqpReceiver($connection, $serializer); + + $sender->send($first = new Envelope(new DummyMessage('First'))); + + $receivedMessages = 0; + $startTime = time(); + $receiver->receive(function (?Envelope $envelope) use ($receiver, $sender, &$receivedMessages, $startTime) { + if (null === $envelope) { + // if we have been processing for 4 seconds + have received 2 messages + // then it's safe to say no other messages will be received + if (time() > $startTime + 4 && 2 === $receivedMessages) { + $receiver->stop(); + } + + return; + } + + ++$receivedMessages; + + // retry the first time + if (1 === $receivedMessages) { + // imitate what Worker does + $envelope = $envelope + ->with(new DelayStamp(2000)) + ->with(new RedeliveryStamp(1, 'not_important')); + $sender->send($envelope); + $receiver->ack($envelope); + + return; + } + + if (2 === $receivedMessages) { + // should have a 2 second delay + $this->assertGreaterThanOrEqual($startTime + 2, time()); + // but only a 2 second delay + $this->assertLessThan($startTime + 4, time()); + + /** @var RedeliveryStamp|null $retryStamp */ + // verify the stamp still exists from the last send + $retryStamp = $envelope->last(RedeliveryStamp::class); + $this->assertNotNull($retryStamp); + $this->assertSame(1, $retryStamp->getRetryCount()); + + $receiver->ack($envelope); + + return; + } + }); + } + public function testItReceivesSignals() { - $serializer = new Serializer( - new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) - ); + $serializer = $this->createSerializer(); $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN')); $connection->setup(); @@ -91,17 +153,20 @@ class AmqpExtIntegrationTest extends TestCase $signalTime = microtime(true); $timedOutTime = time() + 10; + // immediately after the process has started "booted", kill it $process->signal(15); while ($process->isRunning() && time() < $timedOutTime) { usleep(100 * 1000); // 100ms } + // make sure the process exited, after consuming only the 1 message $this->assertFalse($process->isRunning()); $this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime); $this->assertSame($expectedOutput.<<<'TXT' Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage with stamps: [ + "Symfony\\Component\\Messenger\\Transport\\AmqpExt\\AmqpReceivedStamp", "Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp" ] Done. @@ -115,9 +180,7 @@ TXT */ public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler() { - $serializer = new Serializer( - new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) - ); + $serializer = $this->createSerializer(); $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), ['read_timeout' => '1']); $connection->setup(); @@ -149,4 +212,11 @@ TXT throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.'); } + + private function createSerializer(): SerializerInterface + { + return new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer(), new ArrayDenormalizer()], ['json' => new JsonEncoder()]) + ); + } } diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php index 8e224e0653..d0c8abfa35 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php @@ -16,7 +16,6 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver; use Symfony\Component\Messenger\Transport\AmqpExt\Connection; -use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface; use Symfony\Component\Messenger\Transport\Serialization\Serializer; use Symfony\Component\Serializer as SerializerComponent; use Symfony\Component\Serializer\Encoder\JsonEncoder; @@ -27,22 +26,15 @@ use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; */ class AmqpReceiverTest extends TestCase { - public function testItSendTheDecodedMessageToTheHandlerAndAcknowledgeIt() + public function testItSendTheDecodedMessageToTheHandler() { $serializer = new Serializer( new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) ); - $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); - $envelope->method('getBody')->willReturn('{"message": "Hi"}'); - $envelope->method('getHeaders')->willReturn([ - 'type' => DummyMessage::class, - ]); - + $amqpEnvelope = $this->createAMQPEnvelope(); $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); - $connection->method('get')->willReturn($envelope); - - $connection->expects($this->once())->method('ack')->with($envelope); + $connection->method('get')->willReturn($amqpEnvelope); $receiver = new AmqpReceiver($connection, $serializer); $receiver->receive(function (?Envelope $envelope) use ($receiver) { @@ -51,57 +43,6 @@ class AmqpReceiverTest extends TestCase }); } - /** - * @expectedException \Symfony\Component\Messenger\Tests\Transport\AmqpExt\InterruptException - */ - public function testItNonAcknowledgeTheMessageIfAnExceptionHappened() - { - $serializer = new Serializer( - new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) - ); - - $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); - $envelope->method('getBody')->willReturn('{"message": "Hi"}'); - $envelope->method('getHeaders')->willReturn([ - 'type' => DummyMessage::class, - ]); - - $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); - $connection->method('get')->willReturn($envelope); - - $connection->expects($this->once())->method('nack')->with($envelope); - - $receiver = new AmqpReceiver($connection, $serializer); - $receiver->receive(function () { - throw new InterruptException('Well...'); - }); - } - - /** - * @expectedException \Symfony\Component\Messenger\Tests\Transport\AmqpExt\WillNeverWorkException - */ - public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionInterface() - { - $serializer = new Serializer( - new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) - ); - - $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); - $envelope->method('getBody')->willReturn('{"message": "Hi"}'); - $envelope->method('getHeaders')->willReturn([ - 'type' => DummyMessage::class, - ]); - - $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); - $connection->method('get')->willReturn($envelope); - $connection->expects($this->once())->method('reject')->with($envelope); - - $receiver = new AmqpReceiver($connection, $serializer); - $receiver->receive(function () { - throw new WillNeverWorkException('Well...'); - }); - } - /** * @expectedException \Symfony\Component\Messenger\Exception\TransportException */ @@ -111,19 +52,14 @@ class AmqpReceiverTest extends TestCase new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) ); - $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); - $envelope->method('getBody')->willReturn('{"message": "Hi"}'); - $envelope->method('getHeaders')->willReturn([ - 'type' => DummyMessage::class, - ]); - + $amqpEnvelope = $this->createAMQPEnvelope(); $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); - $connection->method('get')->willReturn($envelope); - - $connection->method('ack')->with($envelope)->willThrowException(new \AMQPException()); + $connection->method('get')->willReturn($amqpEnvelope); + $connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException()); $receiver = new AmqpReceiver($connection, $serializer); $receiver->receive(function (?Envelope $envelope) use ($receiver) { + $receiver->ack($envelope); $receiver->stop(); }); } @@ -137,53 +73,26 @@ class AmqpReceiverTest extends TestCase new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) ); - $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); - $envelope->method('getBody')->willReturn('{"message": "Hi"}'); - $envelope->method('getHeaders')->willReturn([ - 'type' => DummyMessage::class, - ]); - + $amqpEnvelope = $this->createAMQPEnvelope(); $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); - $connection->method('get')->willReturn($envelope); - $connection->method('reject')->with($envelope)->willThrowException(new \AMQPException()); + $connection->method('get')->willReturn($amqpEnvelope); + $connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException()); $receiver = new AmqpReceiver($connection, $serializer); - $receiver->receive(function () { - throw new WillNeverWorkException('Well...'); + $receiver->receive(function (?Envelope $envelope) use ($receiver) { + $receiver->reject($envelope); + $receiver->stop(); }); } - /** - * @expectedException \Symfony\Component\Messenger\Exception\TransportException - */ - public function testItThrowsATransportExceptionIfItCannotNonAcknowledgeMessage() + private function createAMQPEnvelope() { - $serializer = new Serializer( - new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) - ); - $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); $envelope->method('getBody')->willReturn('{"message": "Hi"}'); $envelope->method('getHeaders')->willReturn([ 'type' => DummyMessage::class, ]); - $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); - $connection->method('get')->willReturn($envelope); - - $connection->method('nack')->with($envelope)->willThrowException(new \AMQPException()); - - $receiver = new AmqpReceiver($connection, $serializer); - $receiver->receive(function () { - throw new InterruptException('Well...'); - }); + return $envelope; } } - -class InterruptException extends \Exception -{ -} - -class WillNeverWorkException extends \Exception implements RejectMessageExceptionInterface -{ -} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php index 3cea581a8c..73ae25de3c 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php @@ -255,6 +255,86 @@ class ConnectionTest extends TestCase $connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], $factory); $connection->publish('body', $headers); } + + public function testItDelaysTheMessage() + { + $amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(); + $amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(); + $delayQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(); + + $factory = $this->getMockBuilder(AmqpFactory::class)->getMock(); + $factory->method('createConnection')->willReturn($amqpConnection); + $factory->method('createChannel')->willReturn($amqpChannel); + $factory->method('createQueue')->willReturn($delayQueue); + $factory->method('createExchange')->will($this->onConsecutiveCalls( + $delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(), + $amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock() + )); + + $amqpExchange->expects($this->once())->method('setName')->with('messages'); + $amqpExchange->method('getName')->willReturn('messages'); + + $delayExchange->expects($this->once())->method('setName')->with('delay'); + $delayExchange->expects($this->once())->method('declareExchange'); + $delayExchange->method('getName')->willReturn('delay'); + + $delayQueue->expects($this->once())->method('setName')->with('delay_queue_5000'); + $delayQueue->expects($this->once())->method('setArguments')->with([ + 'x-message-ttl' => 5000, + 'x-dead-letter-exchange' => 'messages', + ]); + + $delayQueue->expects($this->once())->method('declareQueue'); + $delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_5000'); + + $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]); + + $connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory); + $connection->publish('{}', ['x-some-headers' => 'foo'], 5000); + } + + public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs() + { + $amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(); + $amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(); + $delayQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(); + + $factory = $this->getMockBuilder(AmqpFactory::class)->getMock(); + $factory->method('createConnection')->willReturn($amqpConnection); + $factory->method('createChannel')->willReturn($amqpChannel); + $factory->method('createQueue')->willReturn($delayQueue); + $factory->method('createExchange')->will($this->onConsecutiveCalls( + $delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(), + $amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock() + )); + + $amqpExchange->expects($this->once())->method('setName')->with('messages'); + $amqpExchange->method('getName')->willReturn('messages'); + + $delayExchange->expects($this->once())->method('setName')->with('delay'); + $delayExchange->expects($this->once())->method('declareExchange'); + $delayExchange->method('getName')->willReturn('delay'); + + $connectionOptions = [ + 'retry' => [ + 'dead_routing_key' => 'my_dead_routing_key', + ], + ]; + + $connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory); + + $delayQueue->expects($this->once())->method('setName')->with('delay_queue_120000'); + $delayQueue->expects($this->once())->method('setArguments')->with([ + 'x-message-ttl' => 120000, + 'x-dead-letter-exchange' => 'messages', + ]); + + $delayQueue->expects($this->once())->method('declareQueue'); + $delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_120000'); + + $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_120000', AMQP_NOPARAM, ['headers' => []]); + $connection->publish('{}', [], 120000); + } } class TestAmqpFactory extends AmqpFactory diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php index a7d4d8dcd7..ba72361035 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php @@ -14,20 +14,23 @@ require_once $autoload; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver; use Symfony\Component\Messenger\Transport\AmqpExt\Connection; use Symfony\Component\Messenger\Transport\Serialization\Serializer; use Symfony\Component\Messenger\Worker; use Symfony\Component\Serializer as SerializerComponent; use Symfony\Component\Serializer\Encoder\JsonEncoder; +use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer; use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; $serializer = new Serializer( - new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + new SerializerComponent\Serializer([new ObjectNormalizer(), new ArrayDenormalizer()], ['json' => new JsonEncoder()]) ); $connection = Connection::fromDsn(getenv('DSN')); $receiver = new AmqpReceiver($connection, $serializer); +$retryStrategy = new MultiplierRetryStrategy(3, 0); $worker = new Worker($receiver, new class() implements MessageBusInterface { public function dispatch($envelope): Envelope @@ -40,7 +43,7 @@ $worker = new Worker($receiver, new class() implements MessageBusInterface { return $envelope; } -}); +}, 'the_receiver', $retryStrategy); echo "Receiving messages...\n"; $worker->run(); diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Serialization/PhpSerializerTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Serialization/PhpSerializerTest.php index d3f1da25e4..f9cd817f05 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/Serialization/PhpSerializerTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/Serialization/PhpSerializerTest.php @@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Serialization; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; @@ -26,4 +27,28 @@ class PhpSerializerTest extends TestCase $this->assertEquals($envelope, $serializer->decode($serializer->encode($envelope))); } + + public function testDecodingFailsWithBadFormat() + { + $this->expectException(MessageDecodingFailedException::class); + $this->expectExceptionMessageRegExp('/Could not decode/'); + + $serializer = new PhpSerializer(); + + $serializer->decode([ + 'body' => '{"message": "bar"}', + ]); + } + + public function testDecodingFailsWithBadClass() + { + $this->expectException(MessageDecodingFailedException::class); + $this->expectExceptionMessageRegExp('/class "ReceivedSt0mp" not found/'); + + $serializer = new PhpSerializer(); + + $serializer->decode([ + 'body' => 'O:13:"ReceivedSt0mp":0:{}', + ]); + } } diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Serialization/SerializerTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Serialization/SerializerTest.php index d68103f7ca..c46b3df15a 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/Serialization/SerializerTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/Serialization/SerializerTest.php @@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Serialization; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Stamp\SerializerStamp; use Symfony\Component\Messenger\Stamp\ValidationStamp; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; @@ -94,4 +95,28 @@ class SerializerTest extends TestCase $this->assertEquals($serializerStamp, $decoded->last(SerializerStamp::class)); $this->assertEquals($validationStamp, $decoded->last(ValidationStamp::class)); } + + public function testDecodingFailsWithBadFormat() + { + $this->expectException(MessageDecodingFailedException::class); + + $serializer = new Serializer(); + + $serializer->decode([ + 'body' => '{foo', + 'headers' => ['type' => 'stdClass'], + ]); + } + + public function testDecodingFailsWithBadClass() + { + $this->expectException(MessageDecodingFailedException::class); + + $serializer = new Serializer(); + + $serializer->decode([ + 'body' => '{}', + 'headers' => ['type' => 'NonExistentClass'], + ]); + } } diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 0f6e208277..cd833b223d 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -12,9 +12,17 @@ namespace Symfony\Component\Messenger\Tests; use PHPUnit\Framework\TestCase; +use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; +use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; +use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; +use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException; use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Retry\RetryStrategyInterface; use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; +use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Worker; @@ -33,11 +41,13 @@ class WorkerTest extends TestCase $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); - $bus->expects($this->at(0))->method('dispatch')->with(($envelope = new Envelope($apiMessage))->with(new ReceivedStamp()))->willReturn($envelope); - $bus->expects($this->at(1))->method('dispatch')->with(($envelope = new Envelope($ipaMessage))->with(new ReceivedStamp()))->willReturn($envelope); + $bus->expects($this->at(0))->method('dispatch')->with($envelope = new Envelope($apiMessage, new ReceivedStamp()))->willReturn($envelope); + $bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, new ReceivedStamp()))->willReturn($envelope); - $worker = new Worker($receiver, $bus); + $worker = new Worker($receiver, $bus, 'receiver_id'); $worker->run(); + + $this->assertSame(2, $receiver->getAcknowledgeCount()); } public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage() @@ -50,29 +60,79 @@ class WorkerTest extends TestCase $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope); + $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); - $worker = new Worker($receiver, $bus); + $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy); $worker->run(); } - public function testWorkerIsThrowingExceptionsBackToGenerators() + public function testDispatchCausesRetry() { $receiver = new CallbackReceiver(function ($handler) { - try { - $handler(new Envelope(new DummyMessage('Hello'))); + $handler(new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))); + }); - $this->assertTrue(false, 'This should not be called because the exception is sent back to the generator.'); - } catch (\InvalidArgumentException $e) { - // This should be called because of the exception sent back to the generator. - $this->assertTrue(true); - } + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + $bus->expects($this->at(0))->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not')); + + // 2nd call will be the retry + $bus->expects($this->at(1))->method('dispatch')->with($this->callback(function (Envelope $envelope) { + /** @var RedeliveryStamp|null $redeliveryStamp */ + $redeliveryStamp = $envelope->last(RedeliveryStamp::class); + $this->assertNotNull($redeliveryStamp); + // retry count now at 1 + $this->assertSame(1, $redeliveryStamp->getRetryCount()); + $this->assertTrue($redeliveryStamp->shouldRedeliverToSender('Some\Sender', 'sender_alias')); + + // received stamp is removed + $this->assertNull($envelope->last(ReceivedStamp::class)); + + return true; + }))->willReturnArgument(0); + + $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); + $retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true); + + $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy); + $worker->run(); + + // old message acknowledged + $this->assertSame(1, $receiver->getAcknowledgeCount()); + } + + public function testDispatchCausesRejectWhenNoRetry() + { + $receiver = new CallbackReceiver(function ($handler) { + $handler(new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))); }); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not')); - $worker = new Worker($receiver, $bus); + $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); + $retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false); + + $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy); $worker->run(); + $this->assertSame(1, $receiver->getRejectCount()); + $this->assertSame(0, $receiver->getAcknowledgeCount()); + } + + public function testDispatchCausesRejectOnUnrecoverableMessage() + { + $receiver = new CallbackReceiver(function ($handler) { + $handler(new Envelope(new DummyMessage('Hello'))); + }); + + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + $bus->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException('Will never work')); + + $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); + $retryStrategy->expects($this->never())->method('isRetryable'); + + $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy); + $worker->run(); + $this->assertSame(1, $receiver->getRejectCount()); } public function testWorkerDoesNotSendNullMessagesToTheBus() @@ -83,8 +143,58 @@ class WorkerTest extends TestCase $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->expects($this->never())->method('dispatch'); + $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); - $worker = new Worker($receiver, $bus); + $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy); + $worker->run(); + } + + public function testWorkerDispatchesEventsOnSuccess() + { + $envelope = new Envelope(new DummyMessage('Hello')); + $receiver = new CallbackReceiver(function ($handler) use ($envelope) { + $handler($envelope); + }); + + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + $bus->method('dispatch')->willReturn($envelope); + + $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); + $eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock(); + + $eventDispatcher->expects($this->exactly(2)) + ->method('dispatch') + ->withConsecutive( + [$this->isInstanceOf(WorkerMessageReceivedEvent::class)], + [$this->isInstanceOf(WorkerMessageHandledEvent::class)] + ); + + $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher); + $worker->run(); + } + + public function testWorkerDispatchesEventsOnError() + { + $envelope = new Envelope(new DummyMessage('Hello')); + $receiver = new CallbackReceiver(function ($handler) use ($envelope) { + $handler($envelope); + }); + + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + $exception = new \InvalidArgumentException('Oh no!'); + $bus->method('dispatch')->willThrowException($exception); + + $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); + $eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock(); + + $eventDispatcher->expects($this->exactly(2)) + ->method('dispatch') + ->withConsecutive( + [$this->isInstanceOf(WorkerMessageReceivedEvent::class)], + [$this->isInstanceOf(WorkerMessageFailedEvent::class)] + ); + + $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher); $worker->run(); } } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceivedStamp.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceivedStamp.php new file mode 100644 index 0000000000..9ed3dc30a3 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceivedStamp.php @@ -0,0 +1,34 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\AmqpExt; + +use Symfony\Component\Messenger\Stamp\StampInterface; + +/** + * Stamp applied when a message is received from Amqp. + * + * @experimental in 4.3 + */ +class AmqpReceivedStamp implements StampInterface +{ + private $amqpEnvelope; + + public function __construct(\AMQPEnvelope $amqpEnvelope) + { + $this->amqpEnvelope = $amqpEnvelope; + } + + public function getAmqpEnvelope(): \AMQPEnvelope + { + return $this->amqpEnvelope; + } +} diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php index cb7a4db013..93afbaff5d 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php @@ -11,8 +11,10 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\LogicException; +use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Exception\TransportException; -use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -42,53 +44,74 @@ class AmqpReceiver implements ReceiverInterface public function receive(callable $handler): void { while (!$this->shouldStop) { - $AMQPEnvelope = $this->connection->get(); - if (null === $AMQPEnvelope) { + try { + $amqpEnvelope = $this->connection->get(); + } catch (\AMQPException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + + if (null === $amqpEnvelope) { $handler(null); - usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000); - if (\function_exists('pcntl_signal_dispatch')) { - pcntl_signal_dispatch(); - } + usleep($this->connection->getConnectionConfiguration()['loop_sleep'] ?? 200000); continue; } try { - $handler($this->serializer->decode([ - 'body' => $AMQPEnvelope->getBody(), - 'headers' => $AMQPEnvelope->getHeaders(), - ])); + $envelope = $this->serializer->decode([ + 'body' => $amqpEnvelope->getBody(), + 'headers' => $amqpEnvelope->getHeaders(), + ]); + } catch (MessageDecodingFailedException $exception) { + // invalid message of some type + $this->rejectAmqpEnvelope($amqpEnvelope); - $this->connection->ack($AMQPEnvelope); - } catch (RejectMessageExceptionInterface $e) { - try { - $this->connection->reject($AMQPEnvelope); - } catch (\AMQPException $exception) { - throw new TransportException($exception->getMessage(), 0, $exception); - } - - throw $e; - } catch (\AMQPException $e) { - throw new TransportException($e->getMessage(), 0, $e); - } catch (\Throwable $e) { - try { - $this->connection->nack($AMQPEnvelope, AMQP_REQUEUE); - } catch (\AMQPException $exception) { - throw new TransportException($exception->getMessage(), 0, $exception); - } - - throw $e; - } finally { - if (\function_exists('pcntl_signal_dispatch')) { - pcntl_signal_dispatch(); - } + throw $exception; } + + $envelope = $envelope->with(new AmqpReceivedStamp($amqpEnvelope)); + $handler($envelope); } } + public function ack(Envelope $envelope): void + { + try { + $this->connection->ack($this->findAmqpEnvelope($envelope)); + } catch (\AMQPException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + } + + public function reject(Envelope $envelope): void + { + $this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope)); + } + public function stop(): void { $this->shouldStop = true; } + + private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope): void + { + try { + $this->connection->nack($amqpEnvelope, AMQP_NOPARAM); + } catch (\AMQPException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + } + + private function findAmqpEnvelope(Envelope $envelope): \AMQPEnvelope + { + /** @var AmqpReceivedStamp|null $amqpReceivedStamp */ + $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class); + + if (null === $amqpReceivedStamp) { + throw new LogicException('No AmqpReceivedStamp found on the Envelope.'); + } + + return $amqpReceivedStamp->getAmqpEnvelope(); + } } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php index daf8199a5d..021a6bfaf8 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php @@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -42,8 +43,15 @@ class AmqpSender implements SenderInterface { $encodedMessage = $this->serializer->encode($envelope); + /** @var DelayStamp|null $delayStamp */ + $delayStamp = $envelope->last(DelayStamp::class); + $delay = 0; + if (null !== $delayStamp) { + $delay = $delayStamp->getDelay(); + } + try { - $this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? []); + $this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay); } catch (\AMQPException $e) { throw new TransportException($e->getMessage(), 0, $e); } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php index 36967c8459..a98c905966 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php @@ -50,6 +50,22 @@ class AmqpTransport implements TransportInterface ($this->receiver ?? $this->getReceiver())->stop(); } + /** + * {@inheritdoc} + */ + public function ack(Envelope $envelope): void + { + ($this->receiver ?? $this->getReceiver())->ack($envelope); + } + + /** + * {@inheritdoc} + */ + public function reject(Envelope $envelope): void + { + ($this->receiver ?? $this->getReceiver())->reject($envelope); + } + /** * {@inheritdoc} */ diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index f1875a8b49..5d7a3e18e5 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -33,7 +33,7 @@ class Connection 'x-message-ttl', ]; - private $connectionCredentials; + private $connectionConfiguration; private $exchangeConfiguration; private $queueConfiguration; private $amqpFactory; @@ -53,9 +53,47 @@ class Connection */ private $amqpQueue; - public function __construct(array $connectionCredentials, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null) + /** + * @var \AMQPExchange|null + */ + private $amqpDelayExchange; + + /** + * Constructor. + * + * Available options: + * + * * host: Hostname of the AMQP service + * * port: Port of the AMQP service + * * vhost: Virtual Host to use with the AMQP service + * * user: Username to use to connect the the AMQP service + * * password: Password to use the connect to the AMQP service + * * queue: + * * name: Name of the queue + * * routing_key: The routing key (if any) to use to push the messages to + * * flags: Queue flags (Default: AMQP_DURABLE) + * * arguments: Extra arguments + * * exchange: + * * name: Name of the exchange + * * type: Type of exchange (Default: fanout) + * * flags: Exchange flags (Default: AMQP_DURABLE) + * * arguments: Extra arguments + * * delay: + * * routing_key_pattern: The pattern of the routing key (Default: "delay_%delay%") + * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%delay%") + * * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry") + * * auto-setup: Enable or not the auto-setup of queues and exchanges (Default: true) + * * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000) + */ + public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null) { - $this->connectionCredentials = $connectionCredentials; + $this->connectionConfiguration = array_replace_recursive([ + 'delay' => [ + 'routing_key_pattern' => 'delay_%delay%', + 'exchange_name' => 'delay', + 'queue_name_pattern' => 'delay_queue_%delay%', + ], + ], $connectionConfiguration); $this->exchangeConfiguration = $exchangeConfiguration; $this->queueConfiguration = $queueConfiguration; $this->amqpFactory = $amqpFactory ?: new AmqpFactory(); @@ -123,20 +161,102 @@ class Connection } /** + * @param int $delay The delay in milliseconds + * * @throws \AMQPException */ - public function publish(string $body, array $headers = []): void + public function publish(string $body, array $headers = [], int $delay = 0): void { + if (0 !== $delay) { + $this->publishWithDelay($body, $headers, $delay); + + return; + } + if ($this->shouldSetup()) { $this->setup(); } $flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM; - $attributes = array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]); + $attributes = $this->getAttributes($headers); $this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes); } + /** + * @throws \AMQPException + */ + private function publishWithDelay(string $body, array $headers = [], int $delay) + { + if ($this->shouldSetup()) { + $this->setupDelay($delay); + } + + $routingKey = $this->getRoutingKeyForDelay($delay); + $flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM; + $attributes = $this->getAttributes($headers); + + $this->getDelayExchange()->publish($body, $routingKey, $flags, $attributes); + } + + private function setupDelay(int $delay) + { + if (!$this->channel()->isConnected()) { + $this->clear(); + } + + $exchange = $this->getDelayExchange(); + $exchange->declareExchange(); + + $queue = $this->createDelayQueue($delay); + $queue->declareQueue(); + $queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay)); + } + + private function getDelayExchange(): \AMQPExchange + { + if (null === $this->amqpDelayExchange) { + $this->amqpDelayExchange = $this->amqpFactory->createExchange($this->channel()); + $this->amqpDelayExchange->setName($this->connectionConfiguration['delay']['exchange_name']); + $this->amqpDelayExchange->setType(AMQP_EX_TYPE_DIRECT); + } + + return $this->amqpDelayExchange; + } + + /** + * Creates a delay queue that will delay for a certain amount of time. + * + * This works by setting message TTL for the delay and pointing + * the dead letter exchange to the original exchange. The result + * is that after the TTL, the message is sent to the dead-letter-exchange, + * which is the original exchange, resulting on it being put back into + * the original queue. + */ + private function createDelayQueue(int $delay) + { + $delayConfiguration = $this->connectionConfiguration['delay']; + + $queue = $this->amqpFactory->createQueue($this->channel()); + $queue->setName(str_replace('%delay%', $delay, $delayConfiguration['queue_name_pattern'])); + $queue->setArguments([ + 'x-message-ttl' => $delay, + 'x-dead-letter-exchange' => $this->exchange()->getName(), + ]); + + if (isset($this->queueConfiguration['routing_key'])) { + // after being released from to DLX, this routing key will be used + $queue->setArgument('x-dead-letter-routing-key', $this->queueConfiguration['routing_key']); + } + + return $queue; + } + + private function getRoutingKeyForDelay(int $delay): string + { + return str_replace('%delay%', $delay, $this->connectionConfiguration['delay']['routing_key_pattern']); + } + /** * Waits and gets a message from the configured queue. * @@ -171,11 +291,6 @@ class Connection return $this->queue()->ack($message->getDeliveryTag()); } - public function reject(\AMQPEnvelope $message): bool - { - return $this->queue()->reject($message->getDeliveryTag()); - } - public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM): bool { return $this->queue()->nack($message->getDeliveryTag(), $flags); @@ -196,8 +311,8 @@ class Connection public function channel(): \AMQPChannel { if (null === $this->amqpChannel) { - $connection = $this->amqpFactory->createConnection($this->connectionCredentials); - $connectMethod = 'true' === ($this->connectionCredentials['persistent'] ?? 'false') ? 'pconnect' : 'connect'; + $connection = $this->amqpFactory->createConnection($this->connectionConfiguration); + $connectMethod = 'true' === ($this->connectionConfiguration['persistent'] ?? 'false') ? 'pconnect' : 'connect'; try { $connection->{$connectMethod}(); @@ -244,9 +359,9 @@ class Connection return $this->amqpExchange; } - public function getConnectionCredentials(): array + public function getConnectionConfiguration(): array { - return $this->connectionCredentials; + return $this->connectionConfiguration; } private function clear(): void @@ -258,14 +373,19 @@ class Connection private function shouldSetup(): bool { - if (!\array_key_exists('auto-setup', $this->connectionCredentials)) { + if (!\array_key_exists('auto-setup', $this->connectionConfiguration)) { return true; } - if (\in_array($this->connectionCredentials['auto-setup'], [false, 'false'], true)) { + if (\in_array($this->connectionConfiguration['auto-setup'], [false, 'false'], true)) { return false; } return true; } + + private function getAttributes(array $headers): array + { + return array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]); + } } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/RejectMessageExceptionInterface.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/RejectMessageExceptionInterface.php deleted file mode 100644 index 9b820a7d8f..0000000000 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Exception/RejectMessageExceptionInterface.php +++ /dev/null @@ -1,27 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Transport\AmqpExt\Exception; - -/** - * If something goes wrong while consuming and handling a message from the AMQP broker, there are two choices: rejecting - * or re-queuing the message. - * - * If the exception that is thrown by the bus while dispatching the message implements this interface, the message will - * be rejected. Otherwise, it will be re-queued. - * - * @author Samuel Roze - * - * @experimental in 4.2 - */ -interface RejectMessageExceptionInterface extends \Throwable -{ -} diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php index 6843bb7c34..29da741a11 100644 --- a/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php +++ b/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php @@ -11,8 +11,12 @@ namespace Symfony\Component\Messenger\Transport\Receiver; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\TransportException; + /** * @author Samuel Roze + * @author Ryan Weaver * * @experimental in 4.2 */ @@ -23,6 +27,12 @@ interface ReceiverInterface * * The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message. * Note that this envelope can be `null` if the timeout to receive something has expired. + * + * If the received message cannot be decoded, the message should not + * be retried again (e.g. if there's a queue, it should be removed) + * and a MessageDecodingFailedException should be thrown. + * + * @throws TransportException If there is an issue communicating with the transport */ public function receive(callable $handler): void; @@ -30,4 +40,18 @@ interface ReceiverInterface * Stop receiving some messages. */ public function stop(): void; + + /** + * Acknowledge that the passed message was handled. + * + * @throws TransportException If there is an issue communicating with the transport + */ + public function ack(Envelope $envelope): void; + + /** + * Called when handling the message failed and it should not be retried. + * + * @throws TransportException If there is an issue communicating with the transport + */ + public function reject(Envelope $envelope): void; } diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiver.php b/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiver.php index 41e0df22bc..09af4673b8 100644 --- a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiver.php @@ -55,4 +55,14 @@ class StopWhenMemoryUsageIsExceededReceiver implements ReceiverInterface { $this->decoratedReceiver->stop(); } + + public function ack(Envelope $envelope): void + { + $this->decoratedReceiver->ack($envelope); + } + + public function reject(Envelope $envelope): void + { + $this->decoratedReceiver->reject($envelope); + } } diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMessageCountIsExceededReceiver.php b/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMessageCountIsExceededReceiver.php index 8d36a15f8e..8be38d157e 100644 --- a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMessageCountIsExceededReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMessageCountIsExceededReceiver.php @@ -52,4 +52,14 @@ class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface { $this->decoratedReceiver->stop(); } + + public function ack(Envelope $envelope): void + { + $this->decoratedReceiver->ack($envelope); + } + + public function reject(Envelope $envelope): void + { + $this->decoratedReceiver->reject($envelope); + } } diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenTimeLimitIsReachedReceiver.php b/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenTimeLimitIsReachedReceiver.php index b9bc24a16c..ade088b7da 100644 --- a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenTimeLimitIsReachedReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenTimeLimitIsReachedReceiver.php @@ -53,4 +53,14 @@ class StopWhenTimeLimitIsReachedReceiver implements ReceiverInterface { $this->decoratedReceiver->stop(); } + + public function ack(Envelope $envelope): void + { + $this->decoratedReceiver->ack($envelope); + } + + public function reject(Envelope $envelope): void + { + $this->decoratedReceiver->reject($envelope); + } } diff --git a/src/Symfony/Component/Messenger/Transport/Sender/SenderInterface.php b/src/Symfony/Component/Messenger/Transport/Sender/SenderInterface.php index f6b5edb425..f526a413e5 100644 --- a/src/Symfony/Component/Messenger/Transport/Sender/SenderInterface.php +++ b/src/Symfony/Component/Messenger/Transport/Sender/SenderInterface.php @@ -22,6 +22,9 @@ interface SenderInterface { /** * Sends the given envelope. + * + * The sender can read different stamps for transport configuration, + * like delivery delay. */ public function send(Envelope $envelope): Envelope; } diff --git a/src/Symfony/Component/Messenger/Transport/Serialization/PhpSerializer.php b/src/Symfony/Component/Messenger/Transport/Serialization/PhpSerializer.php index ec6176c85d..da232d947a 100644 --- a/src/Symfony/Component/Messenger/Transport/Serialization/PhpSerializer.php +++ b/src/Symfony/Component/Messenger/Transport/Serialization/PhpSerializer.php @@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Transport\Serialization; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\InvalidArgumentException; +use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; /** * @author Ryan Weaver @@ -30,7 +31,7 @@ class PhpSerializer implements SerializerInterface throw new InvalidArgumentException('Encoded envelope should have at least a "body".'); } - return unserialize($encodedEnvelope['body']); + return $this->safelyUnserialize($encodedEnvelope['body']); } /** @@ -42,4 +43,35 @@ class PhpSerializer implements SerializerInterface 'body' => serialize($envelope), ]; } + + private function safelyUnserialize($contents) + { + $e = null; + $signalingException = new MessageDecodingFailedException(sprintf('Could not decode message using PHP serialization: %s.', $contents)); + $prevUnserializeHandler = ini_set('unserialize_callback_func', self::class.'::handleUnserializeCallback'); + $prevErrorHandler = set_error_handler(function ($type, $msg, $file, $line, $context = []) use (&$prevErrorHandler, $signalingException) { + if (__FILE__ === $file) { + throw $signalingException; + } + + return $prevErrorHandler ? $prevErrorHandler($type, $msg, $file, $line, $context) : false; + }); + + try { + $meta = unserialize($contents); + } finally { + restore_error_handler(); + ini_set('unserialize_callback_func', $prevUnserializeHandler); + } + + return $meta; + } + + /** + * @internal + */ + public static function handleUnserializeCallback($class) + { + throw new MessageDecodingFailedException(sprintf('Message class "%s" not found during decoding.', $class)); + } } diff --git a/src/Symfony/Component/Messenger/Transport/Serialization/Serializer.php b/src/Symfony/Component/Messenger/Transport/Serialization/Serializer.php index 3f0979bcef..8b64d18d9d 100644 --- a/src/Symfony/Component/Messenger/Transport/Serialization/Serializer.php +++ b/src/Symfony/Component/Messenger/Transport/Serialization/Serializer.php @@ -14,9 +14,11 @@ namespace Symfony\Component\Messenger\Transport\Serialization; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\InvalidArgumentException; use Symfony\Component\Messenger\Exception\LogicException; +use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Stamp\SerializerStamp; use Symfony\Component\Serializer\Encoder\JsonEncoder; use Symfony\Component\Serializer\Encoder\XmlEncoder; +use Symfony\Component\Serializer\Exception\UnexpectedValueException; use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer; use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; use Symfony\Component\Serializer\Serializer as SymfonySerializer; @@ -75,7 +77,11 @@ class Serializer implements SerializerInterface $context = end($stamps[SerializerStamp::class])->getContext() + $context; } - $message = $this->serializer->deserialize($encodedEnvelope['body'], $encodedEnvelope['headers']['type'], $this->format, $context); + try { + $message = $this->serializer->deserialize($encodedEnvelope['body'], $encodedEnvelope['headers']['type'], $this->format, $context); + } catch (UnexpectedValueException $e) { + throw new MessageDecodingFailedException(sprintf('Could not decode message: %s.', $e->getMessage()), $e->getCode(), $e); + } return new Envelope($message, ...$stamps); } @@ -107,7 +113,11 @@ class Serializer implements SerializerInterface continue; } - $stamps[] = $this->serializer->deserialize($value, substr($name, \strlen(self::STAMP_HEADER_PREFIX)).'[]', $this->format, $this->context); + try { + $stamps[] = $this->serializer->deserialize($value, substr($name, \strlen(self::STAMP_HEADER_PREFIX)).'[]', $this->format, $this->context); + } catch (UnexpectedValueException $e) { + throw new MessageDecodingFailedException(sprintf('Could not decode stamp: %s.', $e->getMessage()), $e->getCode(), $e); + } } if ($stamps) { $stamps = array_merge(...$stamps); diff --git a/src/Symfony/Component/Messenger/Transport/Serialization/SerializerInterface.php b/src/Symfony/Component/Messenger/Transport/Serialization/SerializerInterface.php index df25a11911..f534659f50 100644 --- a/src/Symfony/Component/Messenger/Transport/Serialization/SerializerInterface.php +++ b/src/Symfony/Component/Messenger/Transport/Serialization/SerializerInterface.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Transport\Serialization; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; /** * @author Samuel Roze @@ -29,6 +30,8 @@ interface SerializerInterface * The most common keys are: * - `body` (string) - the message body * - `headers` (string) - a key/value pair of headers + * + * @throws MessageDecodingFailedException */ public function decode(array $encodedEnvelope): Envelope; diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index ae903217ac..19c5a4b0fa 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -11,7 +11,19 @@ namespace Symfony\Component\Messenger; +use Psr\Log\LoggerInterface; +use Symfony\Component\EventDispatcher\Event; +use Symfony\Component\EventDispatcher\EventDispatcherInterface; +use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; +use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; +use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; +use Symfony\Component\Messenger\Exception\LogicException; +use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException; +use Symfony\Component\Messenger\Retry\RetryStrategyInterface; +use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; +use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; /** @@ -25,11 +37,24 @@ class Worker { private $receiver; private $bus; + private $receiverName; + private $retryStrategy; + private $eventDispatcher; + private $logger; - public function __construct(ReceiverInterface $receiver, MessageBusInterface $bus) + public function __construct(ReceiverInterface $receiver, MessageBusInterface $bus, string $receiverName = null, RetryStrategyInterface $retryStrategy = null, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null) { $this->receiver = $receiver; $this->bus = $bus; + if (null === $receiverName) { + @trigger_error(sprintf('Instantiating the "%s" class without passing a third argument is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED); + + $receiverName = 'unknown'; + } + $this->receiverName = $receiverName; + $this->retryStrategy = $retryStrategy; + $this->eventDispatcher = $eventDispatcher; + $this->logger = $logger; } /** @@ -45,10 +70,112 @@ class Worker $this->receiver->receive(function (?Envelope $envelope) { if (null === $envelope) { + if (\function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } + return; } - $this->bus->dispatch($envelope->with(new ReceivedStamp())); + $this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $this->receiverName)); + + $message = $envelope->getMessage(); + $context = [ + 'message' => $message, + 'class' => \get_class($message), + ]; + + try { + $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp())); + } catch (\Throwable $throwable) { + $shouldRetry = $this->shouldRetry($throwable, $envelope); + + $this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $this->receiverName, $throwable, $shouldRetry)); + + if ($shouldRetry) { + if (null === $this->retryStrategy) { + // not logically allowed, but check just in case + throw new LogicException('Retrying is not supported without a retry strategy.'); + } + + $retryCount = $this->getRetryCount($envelope) + 1; + if (null !== $this->logger) { + $this->logger->info('Retrying {class} - retry #{retryCount}.', $context + ['retryCount' => $retryCount, 'error' => $throwable]); + } + + // add the delay and retry stamp info + remove ReceivedStamp + $retryEnvelope = $envelope->with(new DelayStamp($this->retryStrategy->getWaitingTime($envelope))) + ->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope))) + ->withoutAll(ReceivedStamp::class); + + // re-send the message + $this->bus->dispatch($retryEnvelope); + // acknowledge the previous message has received + $this->receiver->ack($envelope); + } else { + if (null !== $this->logger) { + $this->logger->info('Rejecting {class} (removing from transport).', $context + ['error' => $throwable]); + } + + $this->receiver->reject($envelope); + } + + if (\function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } + + return; + } + + $this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $this->receiverName)); + + if (null !== $this->logger) { + $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context); + } + + $this->receiver->ack($envelope); + + if (\function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } }); } + + private function dispatchEvent(Event $event) + { + if (null === $this->eventDispatcher) { + return; + } + + $this->eventDispatcher->dispatch($event); + } + + private function shouldRetry(\Throwable $e, Envelope $envelope): bool + { + if ($e instanceof UnrecoverableMessageHandlingException) { + return false; + } + + if (null === $this->retryStrategy) { + return false; + } + + return $this->retryStrategy->isRetryable($envelope); + } + + private function getRetryCount(Envelope $envelope): int + { + /** @var RedeliveryStamp|null $retryMessageStamp */ + $retryMessageStamp = $envelope->last(RedeliveryStamp::class); + + return $retryMessageStamp ? $retryMessageStamp->getRetryCount() : 0; + } + + private function getSenderAlias(Envelope $envelope): ?string + { + /** @var SentStamp|null $sentStamp */ + $sentStamp = $envelope->last(SentStamp::class); + + return $sentStamp ? $sentStamp->getSenderAlias() : null; + } } diff --git a/src/Symfony/Component/Messenger/composer.json b/src/Symfony/Component/Messenger/composer.json index d0b6fe31f9..d40963e2f6 100644 --- a/src/Symfony/Component/Messenger/composer.json +++ b/src/Symfony/Component/Messenger/composer.json @@ -22,6 +22,7 @@ "require-dev": { "symfony/console": "~3.4|~4.0", "symfony/dependency-injection": "~3.4.19|^4.1.8", + "symfony/event-dispatcher": "~4.3", "symfony/http-kernel": "~3.4|~4.0", "symfony/process": "~3.4|~4.0", "symfony/property-access": "~3.4|~4.0", @@ -30,6 +31,9 @@ "symfony/validator": "~3.4|~4.0", "symfony/var-dumper": "~3.4|~4.0" }, + "conflict": { + "symfony/event-dispatcher": "<4.3" + }, "suggest": { "enqueue/messenger-adapter": "For using the php-enqueue library as a transport." },