Adding global retry support, events & more to messenger transport
Co-authored-by: Samuel ROZE <samuel.roze@gmail.com>
This commit is contained in:
parent
59f20ad8fd
commit
a989384999
@ -1111,6 +1111,20 @@ class Configuration implements ConfigurationInterface
|
|||||||
->prototype('variable')
|
->prototype('variable')
|
||||||
->end()
|
->end()
|
||||||
->end()
|
->end()
|
||||||
|
->arrayNode('retry_strategy')
|
||||||
|
->addDefaultsIfNotSet()
|
||||||
|
->validate()
|
||||||
|
->ifTrue(function ($v) { return null !== $v['service'] && (isset($v['max_retries']) || isset($v['delay']) || isset($v['multiplier']) || isset($v['max_delay'])); })
|
||||||
|
->thenInvalid('"service" cannot be used along with the other retry_strategy options.')
|
||||||
|
->end()
|
||||||
|
->children()
|
||||||
|
->scalarNode('service')->defaultNull()->info('Service id to override the retry strategy entirely')->end()
|
||||||
|
->integerNode('max_retries')->defaultValue(3)->min(0)->end()
|
||||||
|
->integerNode('delay')->defaultValue(1000)->min(0)->info('Time in ms to delay (or the initial value when multiplier is used)')->end()
|
||||||
|
->floatNode('multiplier')->defaultValue(2)->min(1)->info('If greater than 1, delay will grow exponentially for each retry: this delay = (delay * (multiple ^ retries))')->end()
|
||||||
|
->integerNode('max_delay')->defaultValue(0)->min(0)->info('Max time in ms that a retry should ever be delayed (0 = infinite)')->end()
|
||||||
|
->end()
|
||||||
|
->end()
|
||||||
->end()
|
->end()
|
||||||
->end()
|
->end()
|
||||||
->end()
|
->end()
|
||||||
|
@ -1653,6 +1653,7 @@ class FrameworkExtension extends Extension
|
|||||||
}
|
}
|
||||||
|
|
||||||
$senderAliases = [];
|
$senderAliases = [];
|
||||||
|
$transportRetryReferences = [];
|
||||||
foreach ($config['transports'] as $name => $transport) {
|
foreach ($config['transports'] as $name => $transport) {
|
||||||
if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) {
|
if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) {
|
||||||
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enabling it or running "composer require symfony/serializer-pack".');
|
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enabling it or running "composer require symfony/serializer-pack".');
|
||||||
@ -1665,6 +1666,21 @@ class FrameworkExtension extends Extension
|
|||||||
;
|
;
|
||||||
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
|
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
|
||||||
$senderAliases[$name] = $transportId;
|
$senderAliases[$name] = $transportId;
|
||||||
|
|
||||||
|
if (null !== $transport['retry_strategy']['service']) {
|
||||||
|
$transportRetryReferences[$name] = new Reference($transport['retry_strategy']['service']);
|
||||||
|
} else {
|
||||||
|
$retryServiceId = sprintf('messenger.retry.multiplier_retry_strategy.%s', $name);
|
||||||
|
$retryDefinition = new ChildDefinition('messenger.retry.abstract_multiplier_retry_strategy');
|
||||||
|
$retryDefinition
|
||||||
|
->replaceArgument(0, $transport['retry_strategy']['max_retries'])
|
||||||
|
->replaceArgument(1, $transport['retry_strategy']['delay'])
|
||||||
|
->replaceArgument(2, $transport['retry_strategy']['multiplier'])
|
||||||
|
->replaceArgument(3, $transport['retry_strategy']['max_delay']);
|
||||||
|
$container->setDefinition($retryServiceId, $retryDefinition);
|
||||||
|
|
||||||
|
$transportRetryReferences[$name] = new Reference($retryServiceId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
$messageToSendersMapping = [];
|
$messageToSendersMapping = [];
|
||||||
@ -1686,6 +1702,9 @@ class FrameworkExtension extends Extension
|
|||||||
->replaceArgument(0, $messageToSendersMapping)
|
->replaceArgument(0, $messageToSendersMapping)
|
||||||
->replaceArgument(1, $messagesToSendAndHandle)
|
->replaceArgument(1, $messagesToSendAndHandle)
|
||||||
;
|
;
|
||||||
|
|
||||||
|
$container->getDefinition('messenger.retry_strategy_locator')
|
||||||
|
->replaceArgument(0, $transportRetryReferences);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function registerCacheConfiguration(array $config, ContainerBuilder $container)
|
private function registerCacheConfiguration(array $config, ContainerBuilder $container)
|
||||||
|
@ -82,8 +82,12 @@
|
|||||||
<argument type="service" id="logger" on-invalid="null" />
|
<argument type="service" id="logger" on-invalid="null" />
|
||||||
<argument type="collection" /> <!-- Receiver names -->
|
<argument type="collection" /> <!-- Receiver names -->
|
||||||
<argument type="collection" /> <!-- Message bus names -->
|
<argument type="collection" /> <!-- Message bus names -->
|
||||||
|
<argument type="service" id="messenger.retry_strategy_locator" />
|
||||||
|
<argument type="service" id="event_dispatcher" />
|
||||||
|
|
||||||
|
<tag name="console.command" command="messenger:consume" />
|
||||||
<tag name="console.command" command="messenger:consume-messages" />
|
<tag name="console.command" command="messenger:consume-messages" />
|
||||||
|
<tag name="monolog.logger" channel="messenger" />
|
||||||
</service>
|
</service>
|
||||||
|
|
||||||
<service id="console.command.messenger_debug" class="Symfony\Component\Messenger\Command\DebugCommand">
|
<service id="console.command.messenger_debug" class="Symfony\Component\Messenger\Command\DebugCommand">
|
||||||
|
@ -64,5 +64,18 @@
|
|||||||
<tag name="messenger.transport_factory" />
|
<tag name="messenger.transport_factory" />
|
||||||
<argument type="service" id="messenger.transport.serializer" />
|
<argument type="service" id="messenger.transport.serializer" />
|
||||||
</service>
|
</service>
|
||||||
|
|
||||||
|
<!-- retry -->
|
||||||
|
<service id="messenger.retry_strategy_locator">
|
||||||
|
<tag name="container.service_locator" />
|
||||||
|
<argument type="collection" />
|
||||||
|
</service>
|
||||||
|
|
||||||
|
<service id="messenger.retry.abstract_multiplier_retry_strategy" class="Symfony\Component\Messenger\Retry\MultiplierRetryStrategy" abstract="true">
|
||||||
|
<argument /> <!-- max retries -->
|
||||||
|
<argument /> <!-- delay ms -->
|
||||||
|
<argument /> <!-- multiplier -->
|
||||||
|
<argument /> <!-- max delay ms -->
|
||||||
|
</service>
|
||||||
</services>
|
</services>
|
||||||
</container>
|
</container>
|
||||||
|
@ -3,7 +3,34 @@ CHANGELOG
|
|||||||
|
|
||||||
4.3.0
|
4.3.0
|
||||||
-----
|
-----
|
||||||
|
|
||||||
|
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
|
||||||
|
`ack()` and `reject()`.
|
||||||
|
* [BC BREAK] Error handling was moved from the receivers into
|
||||||
|
`Worker`. Implementations of `ReceiverInterface::handle()`
|
||||||
|
should now allow all exceptions to be thrown, except for transport
|
||||||
|
exceptions. They should also not retry (e.g. if there's a queue,
|
||||||
|
remove from the queue) if there is a problem decoding the message.
|
||||||
|
* [BC BREAK] `RejectMessageExceptionInterface` was removed and replaced
|
||||||
|
by `Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException`,
|
||||||
|
which has the same behavior: a message will not be retried
|
||||||
|
* The default command name for `ConsumeMessagesCommand` was
|
||||||
|
changed from `messenger:consume-messages` to `messenger:consume`
|
||||||
|
* `ConsumeMessagesCommand` has two new optional constructor arguments
|
||||||
|
* `Worker` has 4 new option constructor arguments.
|
||||||
|
* The `Worker` class now handles calling `pcntl_signal_dispatch()` the
|
||||||
|
receiver no longer needs to call this.
|
||||||
|
* The `AmqpSender` will now retry messages using a dead-letter exchange
|
||||||
|
and delayed queues, instead of retrying via `nack()`
|
||||||
|
* Senders now receive the `Envelope` with the `SentStamp` on it. Previously,
|
||||||
|
the `Envelope` was passed to the sender and *then* the `SentStamp`
|
||||||
|
was added.
|
||||||
|
* `SerializerInterface` implementations should now throw a
|
||||||
|
`Symfony\Component\Messenger\Exception\MessageDecodingFailedException`
|
||||||
|
if `decode()` fails for any reason.
|
||||||
|
* [BC BREAK] The default `Serializer` will now throw a
|
||||||
|
`MessageDecodingFailedException` if `decode()` fails, instead
|
||||||
|
of the underlying exceptions from the Serializer component.
|
||||||
* Added `PhpSerializer` which uses PHP's native `serialize()` and
|
* Added `PhpSerializer` which uses PHP's native `serialize()` and
|
||||||
`unserialize()` to serialize messages to a transport
|
`unserialize()` to serialize messages to a transport
|
||||||
* [BC BREAK] If no serializer were passed, the default serializer
|
* [BC BREAK] If no serializer were passed, the default serializer
|
||||||
|
@ -20,6 +20,7 @@ use Symfony\Component\Console\Input\InputInterface;
|
|||||||
use Symfony\Component\Console\Input\InputOption;
|
use Symfony\Component\Console\Input\InputOption;
|
||||||
use Symfony\Component\Console\Output\OutputInterface;
|
use Symfony\Component\Console\Output\OutputInterface;
|
||||||
use Symfony\Component\Console\Style\SymfonyStyle;
|
use Symfony\Component\Console\Style\SymfonyStyle;
|
||||||
|
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
|
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
|
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
|
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
|
||||||
@ -32,21 +33,25 @@ use Symfony\Component\Messenger\Worker;
|
|||||||
*/
|
*/
|
||||||
class ConsumeMessagesCommand extends Command
|
class ConsumeMessagesCommand extends Command
|
||||||
{
|
{
|
||||||
protected static $defaultName = 'messenger:consume-messages';
|
protected static $defaultName = 'messenger:consume';
|
||||||
|
|
||||||
private $busLocator;
|
private $busLocator;
|
||||||
private $receiverLocator;
|
private $receiverLocator;
|
||||||
private $logger;
|
private $logger;
|
||||||
private $receiverNames;
|
private $receiverNames;
|
||||||
private $busNames;
|
private $busNames;
|
||||||
|
private $retryStrategyLocator;
|
||||||
|
private $eventDispatcher;
|
||||||
|
|
||||||
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [])
|
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
|
||||||
{
|
{
|
||||||
$this->busLocator = $busLocator;
|
$this->busLocator = $busLocator;
|
||||||
$this->receiverLocator = $receiverLocator;
|
$this->receiverLocator = $receiverLocator;
|
||||||
$this->logger = $logger;
|
$this->logger = $logger;
|
||||||
$this->receiverNames = $receiverNames;
|
$this->receiverNames = $receiverNames;
|
||||||
$this->busNames = $busNames;
|
$this->busNames = $busNames;
|
||||||
|
$this->retryStrategyLocator = $retryStrategyLocator;
|
||||||
|
$this->eventDispatcher = $eventDispatcher;
|
||||||
|
|
||||||
parent::__construct();
|
parent::__construct();
|
||||||
}
|
}
|
||||||
@ -132,6 +137,12 @@ EOF
|
|||||||
*/
|
*/
|
||||||
protected function execute(InputInterface $input, OutputInterface $output): void
|
protected function execute(InputInterface $input, OutputInterface $output): void
|
||||||
{
|
{
|
||||||
|
if (false !== strpos($input->getFirstArgument(), ':consume-')) {
|
||||||
|
$message = 'The use of the "messenger:consume-messages" command is deprecated since version 4.3 and will be removed in 5.0. Use "messenger:consume" instead.';
|
||||||
|
@trigger_error($message, E_USER_DEPRECATED);
|
||||||
|
$output->writeln(sprintf('<comment>%s</comment>', $message));
|
||||||
|
}
|
||||||
|
|
||||||
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
|
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
|
||||||
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
|
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
|
||||||
}
|
}
|
||||||
@ -140,8 +151,13 @@ EOF
|
|||||||
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
|
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
|
||||||
|
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
|
||||||
|
}
|
||||||
|
|
||||||
$receiver = $this->receiverLocator->get($receiverName);
|
$receiver = $this->receiverLocator->get($receiverName);
|
||||||
$bus = $this->busLocator->get($busName);
|
$bus = $this->busLocator->get($busName);
|
||||||
|
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
|
||||||
|
|
||||||
$stopsWhen = [];
|
$stopsWhen = [];
|
||||||
if ($limit = $input->getOption('limit')) {
|
if ($limit = $input->getOption('limit')) {
|
||||||
@ -174,7 +190,7 @@ EOF
|
|||||||
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
|
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
|
||||||
}
|
}
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus);
|
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
|
||||||
$worker->run();
|
$worker->run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,6 +54,18 @@ final class Envelope
|
|||||||
return $cloned;
|
return $cloned;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Envelope a new Envelope instance without any stamps of the given class
|
||||||
|
*/
|
||||||
|
public function withoutAll(string $stampFqcn): self
|
||||||
|
{
|
||||||
|
$cloned = clone $this;
|
||||||
|
|
||||||
|
unset($cloned->stamps[$stampFqcn]);
|
||||||
|
|
||||||
|
return $cloned;
|
||||||
|
}
|
||||||
|
|
||||||
public function last(string $stampFqcn): ?StampInterface
|
public function last(string $stampFqcn): ?StampInterface
|
||||||
{
|
{
|
||||||
return isset($this->stamps[$stampFqcn]) ? end($this->stamps[$stampFqcn]) : null;
|
return isset($this->stamps[$stampFqcn]) ? end($this->stamps[$stampFqcn]) : null;
|
||||||
|
@ -0,0 +1,43 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Event;
|
||||||
|
|
||||||
|
use Symfony\Component\EventDispatcher\Event;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @experimental in 4.3
|
||||||
|
*/
|
||||||
|
abstract class AbstractWorkerMessageEvent extends Event
|
||||||
|
{
|
||||||
|
private $envelope;
|
||||||
|
private $receiverName;
|
||||||
|
|
||||||
|
public function __construct(Envelope $envelope, string $receiverName)
|
||||||
|
{
|
||||||
|
$this->envelope = $envelope;
|
||||||
|
$this->receiverName = $receiverName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getEnvelope(): Envelope
|
||||||
|
{
|
||||||
|
return $this->envelope;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a unique identifier for transport receiver this message was received from.
|
||||||
|
*/
|
||||||
|
public function getReceiverName(): string
|
||||||
|
{
|
||||||
|
return $this->receiverName;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,45 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Event;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dispatched when a message was received from a transport and handling failed.
|
||||||
|
*
|
||||||
|
* The event name is the class name.
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*/
|
||||||
|
class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent
|
||||||
|
{
|
||||||
|
private $throwable;
|
||||||
|
private $willRetry;
|
||||||
|
|
||||||
|
public function __construct(Envelope $envelope, string $receiverName, \Throwable $error, bool $willRetry)
|
||||||
|
{
|
||||||
|
$this->throwable = $error;
|
||||||
|
$this->willRetry = $willRetry;
|
||||||
|
|
||||||
|
parent::__construct($envelope, $receiverName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getThrowable(): \Throwable
|
||||||
|
{
|
||||||
|
return $this->throwable;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function willRetry(): bool
|
||||||
|
{
|
||||||
|
return $this->willRetry;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,23 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Event;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dispatched after a message was received from a transport and successfully handled.
|
||||||
|
*
|
||||||
|
* The event name is the class name.
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*/
|
||||||
|
class WorkerMessageHandledEvent extends AbstractWorkerMessageEvent
|
||||||
|
{
|
||||||
|
}
|
@ -0,0 +1,23 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Event;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dispatched when a message was received from a transport but before sent to the bus.
|
||||||
|
*
|
||||||
|
* The event name is the class name.
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*/
|
||||||
|
class WorkerMessageReceivedEvent extends AbstractWorkerMessageEvent
|
||||||
|
{
|
||||||
|
}
|
@ -0,0 +1,21 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thrown when a message cannot be decoded in a serializer.
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*/
|
||||||
|
class MessageDecodingFailedException extends \InvalidArgumentException implements ExceptionInterface
|
||||||
|
{
|
||||||
|
}
|
@ -0,0 +1,26 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thrown while handling a message to indicate that handling will continue to fail.
|
||||||
|
*
|
||||||
|
* If something goes wrong while handling a message that's received from a transport
|
||||||
|
* and the message should not be retried, a handler can throw this exception.
|
||||||
|
*
|
||||||
|
* @author Frederic Bouchery <frederic@bouchery.fr>
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*/
|
||||||
|
class UnrecoverableMessageHandlingException extends RuntimeException
|
||||||
|
{
|
||||||
|
}
|
@ -15,6 +15,7 @@ use Psr\Log\LoggerAwareTrait;
|
|||||||
use Psr\Log\NullLogger;
|
use Psr\Log\NullLogger;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||||
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
|
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
|
||||||
|
|
||||||
@ -54,9 +55,22 @@ class SendMessageMiddleware implements MiddlewareInterface
|
|||||||
// it's a received message, do not send it back
|
// it's a received message, do not send it back
|
||||||
$this->logger->info('Received message "{class}"', $context);
|
$this->logger->info('Received message "{class}"', $context);
|
||||||
} else {
|
} else {
|
||||||
|
/** @var RedeliveryStamp|null $redeliveryStamp */
|
||||||
|
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
|
||||||
|
|
||||||
foreach ($this->sendersLocator->getSenders($envelope, $handle) as $alias => $sender) {
|
foreach ($this->sendersLocator->getSenders($envelope, $handle) as $alias => $sender) {
|
||||||
|
// on redelivery, only deliver to the given sender
|
||||||
|
if (null !== $redeliveryStamp && !$redeliveryStamp->shouldRedeliverToSender(\get_class($sender), $alias)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
$this->logger->info('Sending message "{class}" with "{sender}"', $context + ['sender' => \get_class($sender)]);
|
$this->logger->info('Sending message "{class}" with "{sender}"', $context + ['sender' => \get_class($sender)]);
|
||||||
$envelope = $sender->send($envelope)->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null));
|
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
|
||||||
|
}
|
||||||
|
|
||||||
|
// on a redelivery, never call local handlers
|
||||||
|
if (null !== $redeliveryStamp) {
|
||||||
|
$handle = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,98 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Retry;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||||
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A retry strategy with a constant or exponential retry delay.
|
||||||
|
*
|
||||||
|
* For example, if $delayMilliseconds=10000 & $multiplier=1 (default),
|
||||||
|
* each retry will wait exactly 10 seconds.
|
||||||
|
*
|
||||||
|
* But if $delayMilliseconds=10000 & $multiplier=2:
|
||||||
|
* * Retry 1: 10 second delay
|
||||||
|
* * Retry 2: 20 second delay (10000 * 2 = 20000)
|
||||||
|
* * Retry 3: 40 second delay (20000 * 2 = 40000)
|
||||||
|
*
|
||||||
|
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
* @final
|
||||||
|
*/
|
||||||
|
class MultiplierRetryStrategy implements RetryStrategyInterface
|
||||||
|
{
|
||||||
|
private $maxRetries;
|
||||||
|
private $delayMilliseconds;
|
||||||
|
private $multiplier;
|
||||||
|
private $maxDelayMilliseconds;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param int $maxRetries The maximum number of time to retry (0 means indefinitely)
|
||||||
|
* @param int $delayMilliseconds Amount of time to delay (or the initial value when multiplier is used)
|
||||||
|
* @param float $multiplier Multiplier to apply to the delay each time a retry occurs
|
||||||
|
* @param int $maxDelayMilliseconds Maximum delay to allow (0 means no maximum)
|
||||||
|
*/
|
||||||
|
public function __construct(int $maxRetries = 3, int $delayMilliseconds = 1000, float $multiplier = 1, int $maxDelayMilliseconds = 0)
|
||||||
|
{
|
||||||
|
$this->maxRetries = $maxRetries;
|
||||||
|
|
||||||
|
if ($delayMilliseconds < 0) {
|
||||||
|
throw new InvalidArgumentException(sprintf('Delay must be greater than or equal to zero: "%s" passed.', $delayMilliseconds));
|
||||||
|
}
|
||||||
|
$this->delayMilliseconds = $delayMilliseconds;
|
||||||
|
|
||||||
|
if ($multiplier < 1) {
|
||||||
|
throw new InvalidArgumentException(sprintf('Multiplier must be greater than zero: "%s" passed.', $multiplier));
|
||||||
|
}
|
||||||
|
$this->multiplier = $multiplier;
|
||||||
|
|
||||||
|
if ($maxDelayMilliseconds < 0) {
|
||||||
|
throw new InvalidArgumentException(sprintf('Max delay must be greater than or equal to zero: "%s" passed.', $maxDelayMilliseconds));
|
||||||
|
}
|
||||||
|
$this->maxDelayMilliseconds = $maxDelayMilliseconds;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function isRetryable(Envelope $message): bool
|
||||||
|
{
|
||||||
|
if (0 === $this->maxRetries) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
$retries = $this->getCurrentRetryCount($message);
|
||||||
|
|
||||||
|
return $retries < $this->maxRetries;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getWaitingTime(Envelope $message): int
|
||||||
|
{
|
||||||
|
$retries = $this->getCurrentRetryCount($message);
|
||||||
|
|
||||||
|
$delay = $this->delayMilliseconds * pow($this->multiplier, $retries);
|
||||||
|
|
||||||
|
if ($delay > $this->maxDelayMilliseconds && 0 !== $this->maxDelayMilliseconds) {
|
||||||
|
return $this->maxDelayMilliseconds;
|
||||||
|
}
|
||||||
|
|
||||||
|
return $delay;
|
||||||
|
}
|
||||||
|
|
||||||
|
private function getCurrentRetryCount(Envelope $message): int
|
||||||
|
{
|
||||||
|
/** @var RedeliveryStamp|null $retryMessageStamp */
|
||||||
|
$retryMessageStamp = $message->last(RedeliveryStamp::class);
|
||||||
|
|
||||||
|
return $retryMessageStamp ? $retryMessageStamp->getRetryCount() : 0;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,31 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Retry;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Fabien Potencier <fabien@symfony.com>
|
||||||
|
* @author Grégoire Pineau <lyrixx@lyrixx.info>
|
||||||
|
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*/
|
||||||
|
interface RetryStrategyInterface
|
||||||
|
{
|
||||||
|
public function isRetryable(Envelope $message): bool;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return int The time to delay/wait in milliseconds
|
||||||
|
*/
|
||||||
|
public function getWaitingTime(Envelope $message): int;
|
||||||
|
}
|
35
src/Symfony/Component/Messenger/Stamp/DelayStamp.php
Normal file
35
src/Symfony/Component/Messenger/Stamp/DelayStamp.php
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Stamp;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Apply this stamp to delay delivery of your message on a transport.
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*/
|
||||||
|
class DelayStamp implements StampInterface
|
||||||
|
{
|
||||||
|
private $delay;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param int $delay The delay in milliseconds
|
||||||
|
*/
|
||||||
|
public function __construct(int $delay)
|
||||||
|
{
|
||||||
|
$this->delay = $delay;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getDelay(): int
|
||||||
|
{
|
||||||
|
return $this->delay;
|
||||||
|
}
|
||||||
|
}
|
60
src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php
Normal file
60
src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Stamp;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stamp applied when a messages needs to be redelivered.
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*/
|
||||||
|
class RedeliveryStamp implements StampInterface
|
||||||
|
{
|
||||||
|
private $retryCount;
|
||||||
|
private $senderClassOrAlias;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param string $senderClassOrAlias Alias from SendersLocator or just the class name
|
||||||
|
*/
|
||||||
|
public function __construct(int $retryCount, string $senderClassOrAlias)
|
||||||
|
{
|
||||||
|
$this->retryCount = $retryCount;
|
||||||
|
$this->senderClassOrAlias = $senderClassOrAlias;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getRetryCount(): int
|
||||||
|
{
|
||||||
|
return $this->retryCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Needed for this class to serialize through Symfony's serializer.
|
||||||
|
*
|
||||||
|
* @internal
|
||||||
|
*/
|
||||||
|
public function getSenderClassOrAlias(): int
|
||||||
|
{
|
||||||
|
return $this->retryCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function shouldRedeliverToSender(string $senderClass, ?string $senderAlias): bool
|
||||||
|
{
|
||||||
|
if (null !== $senderAlias && $senderAlias === $this->senderClassOrAlias) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($senderClass === $this->senderClassOrAlias) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
@ -251,6 +251,7 @@ class MessengerPassTest extends TestCase
|
|||||||
$container->register('console.command.messenger_consume_messages', ConsumeMessagesCommand::class)->setArguments([
|
$container->register('console.command.messenger_consume_messages', ConsumeMessagesCommand::class)->setArguments([
|
||||||
null,
|
null,
|
||||||
new Reference('messenger.receiver_locator'),
|
new Reference('messenger.receiver_locator'),
|
||||||
|
new Reference('messenger.retry_strategy_locator'),
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
@ -605,6 +606,14 @@ class DummyReceiver implements ReceiverInterface
|
|||||||
public function stop(): void
|
public function stop(): void
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function ack(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public function reject(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class InvalidReceiver
|
class InvalidReceiver
|
||||||
|
@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests;
|
|||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\ValidationStamp;
|
use Symfony\Component\Messenger\Stamp\ValidationStamp;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
@ -34,11 +35,21 @@ class EnvelopeTest extends TestCase
|
|||||||
|
|
||||||
public function testWithReturnsNewInstance()
|
public function testWithReturnsNewInstance()
|
||||||
{
|
{
|
||||||
$envelope = new Envelope($dummy = new DummyMessage('dummy'));
|
$envelope = new Envelope(new DummyMessage('dummy'));
|
||||||
|
|
||||||
$this->assertNotSame($envelope, $envelope->with(new ReceivedStamp()));
|
$this->assertNotSame($envelope, $envelope->with(new ReceivedStamp()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testWithoutAll()
|
||||||
|
{
|
||||||
|
$envelope = new Envelope(new DummyMessage('dummy'), new ReceivedStamp(), new ReceivedStamp(), new DelayStamp(5000));
|
||||||
|
|
||||||
|
$envelope = $envelope->withoutAll(ReceivedStamp::class);
|
||||||
|
|
||||||
|
$this->assertEmpty($envelope->all(ReceivedStamp::class));
|
||||||
|
$this->assertCount(1, $envelope->all(DelayStamp::class));
|
||||||
|
}
|
||||||
|
|
||||||
public function testLast()
|
public function testLast()
|
||||||
{
|
{
|
||||||
$receivedStamp = new ReceivedStamp();
|
$receivedStamp = new ReceivedStamp();
|
||||||
|
@ -2,11 +2,14 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Tests\Fixtures;
|
namespace Symfony\Component\Messenger\Tests\Fixtures;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||||
|
|
||||||
class CallbackReceiver implements ReceiverInterface
|
class CallbackReceiver implements ReceiverInterface
|
||||||
{
|
{
|
||||||
private $callable;
|
private $callable;
|
||||||
|
private $acknowledgeCount = 0;
|
||||||
|
private $rejectCount = 0;
|
||||||
|
|
||||||
public function __construct(callable $callable)
|
public function __construct(callable $callable)
|
||||||
{
|
{
|
||||||
@ -22,4 +25,24 @@ class CallbackReceiver implements ReceiverInterface
|
|||||||
public function stop(): void
|
public function stop(): void
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function ack(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
++$this->acknowledgeCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function reject(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
++$this->rejectCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getAcknowledgeCount(): int
|
||||||
|
{
|
||||||
|
return $this->acknowledgeCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getRejectCount(): int
|
||||||
|
{
|
||||||
|
return $this->rejectCount;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Tests\Middleware;
|
|||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
|
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||||
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
|
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\ChildDummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\ChildDummyMessage;
|
||||||
@ -32,7 +33,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
|||||||
|
|
||||||
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
|
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
|
||||||
|
|
||||||
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
|
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->will($this->returnArgument(0));
|
||||||
|
|
||||||
$envelope = $middleware->handle($envelope, $this->getStackMock(false));
|
$envelope = $middleware->handle($envelope, $this->getStackMock(false));
|
||||||
|
|
||||||
@ -42,6 +43,69 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
|||||||
$this->assertStringMatchesFormat('Mock_SenderInterface_%s', $stamp->getSenderClass());
|
$this->assertStringMatchesFormat('Mock_SenderInterface_%s', $stamp->getSenderClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testItSendsTheMessageToMultipleSenders()
|
||||||
|
{
|
||||||
|
$envelope = new Envelope(new DummyMessage('Hey'));
|
||||||
|
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||||
|
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||||
|
|
||||||
|
$middleware = new SendMessageMiddleware(new SendersLocator([
|
||||||
|
DummyMessage::class => ['foo' => $sender, 'bar' => $sender2],
|
||||||
|
]));
|
||||||
|
|
||||||
|
$sender->expects($this->once())
|
||||||
|
->method('send')
|
||||||
|
->with($this->callback(function (Envelope $envelope) {
|
||||||
|
/** @var SentStamp|null $lastSentStamp */
|
||||||
|
$lastSentStamp = $envelope->last(SentStamp::class);
|
||||||
|
|
||||||
|
// last SentStamp should be the "foo" alias
|
||||||
|
return null !== $lastSentStamp && 'foo' === $lastSentStamp->getSenderAlias();
|
||||||
|
}))
|
||||||
|
->will($this->returnArgument(0));
|
||||||
|
$sender2->expects($this->once())
|
||||||
|
->method('send')
|
||||||
|
->with($this->callback(function (Envelope $envelope) {
|
||||||
|
/** @var SentStamp|null $lastSentStamp */
|
||||||
|
$lastSentStamp = $envelope->last(SentStamp::class);
|
||||||
|
|
||||||
|
// last SentStamp should be the "bar" alias
|
||||||
|
return null !== $lastSentStamp && 'bar' === $lastSentStamp->getSenderAlias();
|
||||||
|
}))
|
||||||
|
->will($this->returnArgument(0));
|
||||||
|
|
||||||
|
$envelope = $middleware->handle($envelope, $this->getStackMock(false));
|
||||||
|
|
||||||
|
/** @var SentStamp[] $sentStamps */
|
||||||
|
$sentStamps = $envelope->all(SentStamp::class);
|
||||||
|
$this->assertCount(2, $sentStamps);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testItSendsToOnlyOneSenderOnRedelivery()
|
||||||
|
{
|
||||||
|
$envelope = new Envelope(new DummyMessage('Hey'), new RedeliveryStamp(5, 'bar'));
|
||||||
|
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||||
|
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||||
|
|
||||||
|
$middleware = new SendMessageMiddleware(new SendersLocator([
|
||||||
|
DummyMessage::class => ['foo' => $sender, 'bar' => $sender2],
|
||||||
|
], [
|
||||||
|
// normally, this class sends and handles (but not on retry)
|
||||||
|
DummyMessage::class => true,
|
||||||
|
]));
|
||||||
|
|
||||||
|
$sender->expects($this->never())
|
||||||
|
->method('send')
|
||||||
|
;
|
||||||
|
$sender2->expects($this->once())
|
||||||
|
->method('send')
|
||||||
|
->will($this->returnArgument(0));
|
||||||
|
|
||||||
|
$mockStack = $this->getStackMock(false); // false because next should not be called
|
||||||
|
$envelope = $middleware->handle($envelope, $mockStack);
|
||||||
|
$this->assertCount(1, $envelope->all(SentStamp::class));
|
||||||
|
}
|
||||||
|
|
||||||
public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage()
|
public function testItSendsTheMessageToAssignedSenderWithPreWrappedMessage()
|
||||||
{
|
{
|
||||||
$envelope = new Envelope(new ChildDummyMessage('Hey'));
|
$envelope = new Envelope(new ChildDummyMessage('Hey'));
|
||||||
@ -49,7 +113,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
|||||||
|
|
||||||
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
|
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
|
||||||
|
|
||||||
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
|
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
|
||||||
|
|
||||||
$middleware->handle($envelope, $this->getStackMock(false));
|
$middleware->handle($envelope, $this->getStackMock(false));
|
||||||
}
|
}
|
||||||
@ -64,7 +128,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
|||||||
DummyMessage::class => true,
|
DummyMessage::class => true,
|
||||||
]));
|
]));
|
||||||
|
|
||||||
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
|
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
|
||||||
|
|
||||||
$middleware->handle($envelope, $this->getStackMock());
|
$middleware->handle($envelope, $this->getStackMock());
|
||||||
}
|
}
|
||||||
@ -79,7 +143,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
|||||||
DummyMessage::class => true,
|
DummyMessage::class => true,
|
||||||
]));
|
]));
|
||||||
|
|
||||||
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
|
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
|
||||||
|
|
||||||
$middleware->handle($envelope, $this->getStackMock());
|
$middleware->handle($envelope, $this->getStackMock());
|
||||||
}
|
}
|
||||||
@ -94,7 +158,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
|||||||
DummyMessageInterface::class => true,
|
DummyMessageInterface::class => true,
|
||||||
]));
|
]));
|
||||||
|
|
||||||
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
|
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
|
||||||
|
|
||||||
$middleware->handle($envelope, $this->getStackMock());
|
$middleware->handle($envelope, $this->getStackMock());
|
||||||
}
|
}
|
||||||
@ -109,7 +173,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
|||||||
'*' => true,
|
'*' => true,
|
||||||
]));
|
]));
|
||||||
|
|
||||||
$sender->expects($this->once())->method('send')->with($envelope)->willReturn($envelope);
|
$sender->expects($this->once())->method('send')->with($envelope->with(new SentStamp(\get_class($sender))))->willReturn($envelope);
|
||||||
|
|
||||||
$middleware->handle($envelope, $this->getStackMock());
|
$middleware->handle($envelope, $this->getStackMock());
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,80 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\Retry;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
|
||||||
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
|
|
||||||
|
class MultiplierRetryStrategyTest extends TestCase
|
||||||
|
{
|
||||||
|
public function testIsRetryable()
|
||||||
|
{
|
||||||
|
$strategy = new MultiplierRetryStrategy(3);
|
||||||
|
$envelope = new Envelope(new \stdClass(), new RedeliveryStamp(0, 'sender_alias'));
|
||||||
|
|
||||||
|
$this->assertTrue($strategy->isRetryable($envelope));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testIsNotRetryable()
|
||||||
|
{
|
||||||
|
$strategy = new MultiplierRetryStrategy(3);
|
||||||
|
$envelope = new Envelope(new \stdClass(), new RedeliveryStamp(3, 'sender_alias'));
|
||||||
|
|
||||||
|
$this->assertFalse($strategy->isRetryable($envelope));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testIsRetryableWithNoStamp()
|
||||||
|
{
|
||||||
|
$strategy = new MultiplierRetryStrategy(3);
|
||||||
|
$envelope = new Envelope(new \stdClass());
|
||||||
|
|
||||||
|
$this->assertTrue($strategy->isRetryable($envelope));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @dataProvider getWaitTimeTests
|
||||||
|
*/
|
||||||
|
public function testGetWaitTime(int $delay, int $multiplier, int $maxDelay, int $previousRetries, int $expectedDelay)
|
||||||
|
{
|
||||||
|
$strategy = new MultiplierRetryStrategy(10, $delay, $multiplier, $maxDelay);
|
||||||
|
$envelope = new Envelope(new \stdClass(), new RedeliveryStamp($previousRetries, 'sender_alias'));
|
||||||
|
|
||||||
|
$this->assertSame($expectedDelay, $strategy->getWaitingTime($envelope));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getWaitTimeTests()
|
||||||
|
{
|
||||||
|
// delay, multiplier, maxDelay, retries, expectedDelay
|
||||||
|
yield [1000, 1, 5000, 0, 1000];
|
||||||
|
yield [1000, 1, 5000, 1, 1000];
|
||||||
|
yield [1000, 1, 5000, 2, 1000];
|
||||||
|
|
||||||
|
yield [1000, 2, 10000, 0, 1000];
|
||||||
|
yield [1000, 2, 10000, 1, 2000];
|
||||||
|
yield [1000, 2, 10000, 2, 4000];
|
||||||
|
yield [1000, 2, 10000, 3, 8000];
|
||||||
|
yield [1000, 2, 10000, 4, 10000]; // max hit
|
||||||
|
yield [1000, 2, 0, 4, 16000]; // no max
|
||||||
|
|
||||||
|
yield [1000, 3, 10000, 0, 1000];
|
||||||
|
yield [1000, 3, 10000, 1, 3000];
|
||||||
|
yield [1000, 3, 10000, 2, 9000];
|
||||||
|
|
||||||
|
yield [1000, 1, 500, 0, 500]; // max hit immediately
|
||||||
|
|
||||||
|
// never a delay
|
||||||
|
yield [0, 2, 10000, 0, 0];
|
||||||
|
yield [0, 2, 10000, 1, 0];
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,34 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Tests\Stamp;
|
||||||
|
|
||||||
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
|
|
||||||
|
class RedeliveryStampTest extends TestCase
|
||||||
|
{
|
||||||
|
public function testShouldRedeliverToSenderWithAlias()
|
||||||
|
{
|
||||||
|
$stamp = new RedeliveryStamp(5, 'foo_alias');
|
||||||
|
|
||||||
|
$this->assertFalse($stamp->shouldRedeliverToSender('Foo\Bar\Sender', 'bar_alias'));
|
||||||
|
$this->assertTrue($stamp->shouldRedeliverToSender('Foo\Bar\Sender', 'foo_alias'));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testShouldRedeliverToSenderWithoutAlias()
|
||||||
|
{
|
||||||
|
$stampToRedeliverToSender1 = new RedeliveryStamp(5, 'App\Sender1');
|
||||||
|
|
||||||
|
$this->assertTrue($stampToRedeliverToSender1->shouldRedeliverToSender('App\Sender1', null));
|
||||||
|
$this->assertFalse($stampToRedeliverToSender1->shouldRedeliverToSender('App\Sender2', null));
|
||||||
|
}
|
||||||
|
}
|
@ -13,15 +13,20 @@ namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
|
|||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||||
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||||
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
use Symfony\Component\Process\PhpProcess;
|
use Symfony\Component\Process\PhpProcess;
|
||||||
use Symfony\Component\Process\Process;
|
use Symfony\Component\Process\Process;
|
||||||
use Symfony\Component\Serializer as SerializerComponent;
|
use Symfony\Component\Serializer as SerializerComponent;
|
||||||
use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
||||||
|
use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer;
|
||||||
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -40,9 +45,7 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
|
|
||||||
public function testItSendsAndReceivesMessages()
|
public function testItSendsAndReceivesMessages()
|
||||||
{
|
{
|
||||||
$serializer = new Serializer(
|
$serializer = $this->createSerializer();
|
||||||
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
|
||||||
);
|
|
||||||
|
|
||||||
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
|
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
|
||||||
$connection->setup();
|
$connection->setup();
|
||||||
@ -56,7 +59,9 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
|
|
||||||
$receivedMessages = 0;
|
$receivedMessages = 0;
|
||||||
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
|
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
|
||||||
$this->assertEquals(0 === $receivedMessages ? $first : $second, $envelope);
|
$expectedEnvelope = 0 === $receivedMessages ? $first : $second;
|
||||||
|
$this->assertEquals($expectedEnvelope->getMessage(), $envelope->getMessage());
|
||||||
|
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
|
||||||
|
|
||||||
if (2 === ++$receivedMessages) {
|
if (2 === ++$receivedMessages) {
|
||||||
$receiver->stop();
|
$receiver->stop();
|
||||||
@ -64,11 +69,68 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testRetryAndDelay()
|
||||||
|
{
|
||||||
|
$serializer = $this->createSerializer();
|
||||||
|
|
||||||
|
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
|
||||||
|
$connection->setup();
|
||||||
|
$connection->queue()->purge();
|
||||||
|
|
||||||
|
$sender = new AmqpSender($connection, $serializer);
|
||||||
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
|
|
||||||
|
$sender->send($first = new Envelope(new DummyMessage('First')));
|
||||||
|
|
||||||
|
$receivedMessages = 0;
|
||||||
|
$startTime = time();
|
||||||
|
$receiver->receive(function (?Envelope $envelope) use ($receiver, $sender, &$receivedMessages, $startTime) {
|
||||||
|
if (null === $envelope) {
|
||||||
|
// if we have been processing for 4 seconds + have received 2 messages
|
||||||
|
// then it's safe to say no other messages will be received
|
||||||
|
if (time() > $startTime + 4 && 2 === $receivedMessages) {
|
||||||
|
$receiver->stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
++$receivedMessages;
|
||||||
|
|
||||||
|
// retry the first time
|
||||||
|
if (1 === $receivedMessages) {
|
||||||
|
// imitate what Worker does
|
||||||
|
$envelope = $envelope
|
||||||
|
->with(new DelayStamp(2000))
|
||||||
|
->with(new RedeliveryStamp(1, 'not_important'));
|
||||||
|
$sender->send($envelope);
|
||||||
|
$receiver->ack($envelope);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (2 === $receivedMessages) {
|
||||||
|
// should have a 2 second delay
|
||||||
|
$this->assertGreaterThanOrEqual($startTime + 2, time());
|
||||||
|
// but only a 2 second delay
|
||||||
|
$this->assertLessThan($startTime + 4, time());
|
||||||
|
|
||||||
|
/** @var RedeliveryStamp|null $retryStamp */
|
||||||
|
// verify the stamp still exists from the last send
|
||||||
|
$retryStamp = $envelope->last(RedeliveryStamp::class);
|
||||||
|
$this->assertNotNull($retryStamp);
|
||||||
|
$this->assertSame(1, $retryStamp->getRetryCount());
|
||||||
|
|
||||||
|
$receiver->ack($envelope);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
public function testItReceivesSignals()
|
public function testItReceivesSignals()
|
||||||
{
|
{
|
||||||
$serializer = new Serializer(
|
$serializer = $this->createSerializer();
|
||||||
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
|
||||||
);
|
|
||||||
|
|
||||||
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
|
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
|
||||||
$connection->setup();
|
$connection->setup();
|
||||||
@ -91,17 +153,20 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
$signalTime = microtime(true);
|
$signalTime = microtime(true);
|
||||||
$timedOutTime = time() + 10;
|
$timedOutTime = time() + 10;
|
||||||
|
|
||||||
|
// immediately after the process has started "booted", kill it
|
||||||
$process->signal(15);
|
$process->signal(15);
|
||||||
|
|
||||||
while ($process->isRunning() && time() < $timedOutTime) {
|
while ($process->isRunning() && time() < $timedOutTime) {
|
||||||
usleep(100 * 1000); // 100ms
|
usleep(100 * 1000); // 100ms
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// make sure the process exited, after consuming only the 1 message
|
||||||
$this->assertFalse($process->isRunning());
|
$this->assertFalse($process->isRunning());
|
||||||
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
|
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
|
||||||
$this->assertSame($expectedOutput.<<<'TXT'
|
$this->assertSame($expectedOutput.<<<'TXT'
|
||||||
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
|
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
|
||||||
with stamps: [
|
with stamps: [
|
||||||
|
"Symfony\\Component\\Messenger\\Transport\\AmqpExt\\AmqpReceivedStamp",
|
||||||
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp"
|
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp"
|
||||||
]
|
]
|
||||||
Done.
|
Done.
|
||||||
@ -115,9 +180,7 @@ TXT
|
|||||||
*/
|
*/
|
||||||
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
|
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
|
||||||
{
|
{
|
||||||
$serializer = new Serializer(
|
$serializer = $this->createSerializer();
|
||||||
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
|
||||||
);
|
|
||||||
|
|
||||||
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), ['read_timeout' => '1']);
|
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), ['read_timeout' => '1']);
|
||||||
$connection->setup();
|
$connection->setup();
|
||||||
@ -149,4 +212,11 @@ TXT
|
|||||||
|
|
||||||
throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
|
throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function createSerializer(): SerializerInterface
|
||||||
|
{
|
||||||
|
return new Serializer(
|
||||||
|
new SerializerComponent\Serializer([new ObjectNormalizer(), new ArrayDenormalizer()], ['json' => new JsonEncoder()])
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -16,7 +16,6 @@ use Symfony\Component\Messenger\Envelope;
|
|||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
|
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||||
use Symfony\Component\Serializer as SerializerComponent;
|
use Symfony\Component\Serializer as SerializerComponent;
|
||||||
use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
||||||
@ -27,22 +26,15 @@ use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
|||||||
*/
|
*/
|
||||||
class AmqpReceiverTest extends TestCase
|
class AmqpReceiverTest extends TestCase
|
||||||
{
|
{
|
||||||
public function testItSendTheDecodedMessageToTheHandlerAndAcknowledgeIt()
|
public function testItSendTheDecodedMessageToTheHandler()
|
||||||
{
|
{
|
||||||
$serializer = new Serializer(
|
$serializer = new Serializer(
|
||||||
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
||||||
);
|
);
|
||||||
|
|
||||||
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
|
$amqpEnvelope = $this->createAMQPEnvelope();
|
||||||
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
|
|
||||||
$envelope->method('getHeaders')->willReturn([
|
|
||||||
'type' => DummyMessage::class,
|
|
||||||
]);
|
|
||||||
|
|
||||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||||
$connection->method('get')->willReturn($envelope);
|
$connection->method('get')->willReturn($amqpEnvelope);
|
||||||
|
|
||||||
$connection->expects($this->once())->method('ack')->with($envelope);
|
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($connection, $serializer);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
|
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
|
||||||
@ -51,57 +43,6 @@ class AmqpReceiverTest extends TestCase
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @expectedException \Symfony\Component\Messenger\Tests\Transport\AmqpExt\InterruptException
|
|
||||||
*/
|
|
||||||
public function testItNonAcknowledgeTheMessageIfAnExceptionHappened()
|
|
||||||
{
|
|
||||||
$serializer = new Serializer(
|
|
||||||
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
|
||||||
);
|
|
||||||
|
|
||||||
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
|
|
||||||
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
|
|
||||||
$envelope->method('getHeaders')->willReturn([
|
|
||||||
'type' => DummyMessage::class,
|
|
||||||
]);
|
|
||||||
|
|
||||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
|
||||||
$connection->method('get')->willReturn($envelope);
|
|
||||||
|
|
||||||
$connection->expects($this->once())->method('nack')->with($envelope);
|
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($connection, $serializer);
|
|
||||||
$receiver->receive(function () {
|
|
||||||
throw new InterruptException('Well...');
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @expectedException \Symfony\Component\Messenger\Tests\Transport\AmqpExt\WillNeverWorkException
|
|
||||||
*/
|
|
||||||
public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionInterface()
|
|
||||||
{
|
|
||||||
$serializer = new Serializer(
|
|
||||||
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
|
||||||
);
|
|
||||||
|
|
||||||
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
|
|
||||||
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
|
|
||||||
$envelope->method('getHeaders')->willReturn([
|
|
||||||
'type' => DummyMessage::class,
|
|
||||||
]);
|
|
||||||
|
|
||||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
|
||||||
$connection->method('get')->willReturn($envelope);
|
|
||||||
$connection->expects($this->once())->method('reject')->with($envelope);
|
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($connection, $serializer);
|
|
||||||
$receiver->receive(function () {
|
|
||||||
throw new WillNeverWorkException('Well...');
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
|
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
|
||||||
*/
|
*/
|
||||||
@ -111,19 +52,14 @@ class AmqpReceiverTest extends TestCase
|
|||||||
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
||||||
);
|
);
|
||||||
|
|
||||||
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
|
$amqpEnvelope = $this->createAMQPEnvelope();
|
||||||
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
|
|
||||||
$envelope->method('getHeaders')->willReturn([
|
|
||||||
'type' => DummyMessage::class,
|
|
||||||
]);
|
|
||||||
|
|
||||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||||
$connection->method('get')->willReturn($envelope);
|
$connection->method('get')->willReturn($amqpEnvelope);
|
||||||
|
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());
|
||||||
$connection->method('ack')->with($envelope)->willThrowException(new \AMQPException());
|
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($connection, $serializer);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
|
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
|
||||||
|
$receiver->ack($envelope);
|
||||||
$receiver->stop();
|
$receiver->stop();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -137,53 +73,26 @@ class AmqpReceiverTest extends TestCase
|
|||||||
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
||||||
);
|
);
|
||||||
|
|
||||||
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
|
$amqpEnvelope = $this->createAMQPEnvelope();
|
||||||
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
|
|
||||||
$envelope->method('getHeaders')->willReturn([
|
|
||||||
'type' => DummyMessage::class,
|
|
||||||
]);
|
|
||||||
|
|
||||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||||
$connection->method('get')->willReturn($envelope);
|
$connection->method('get')->willReturn($amqpEnvelope);
|
||||||
$connection->method('reject')->with($envelope)->willThrowException(new \AMQPException());
|
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($connection, $serializer);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
$receiver->receive(function () {
|
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
|
||||||
throw new WillNeverWorkException('Well...');
|
$receiver->reject($envelope);
|
||||||
|
$receiver->stop();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private function createAMQPEnvelope()
|
||||||
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
|
|
||||||
*/
|
|
||||||
public function testItThrowsATransportExceptionIfItCannotNonAcknowledgeMessage()
|
|
||||||
{
|
{
|
||||||
$serializer = new Serializer(
|
|
||||||
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
|
||||||
);
|
|
||||||
|
|
||||||
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
|
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
|
||||||
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
|
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
|
||||||
$envelope->method('getHeaders')->willReturn([
|
$envelope->method('getHeaders')->willReturn([
|
||||||
'type' => DummyMessage::class,
|
'type' => DummyMessage::class,
|
||||||
]);
|
]);
|
||||||
|
|
||||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
return $envelope;
|
||||||
$connection->method('get')->willReturn($envelope);
|
|
||||||
|
|
||||||
$connection->method('nack')->with($envelope)->willThrowException(new \AMQPException());
|
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($connection, $serializer);
|
|
||||||
$receiver->receive(function () {
|
|
||||||
throw new InterruptException('Well...');
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class InterruptException extends \Exception
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
class WillNeverWorkException extends \Exception implements RejectMessageExceptionInterface
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
@ -255,6 +255,86 @@ class ConnectionTest extends TestCase
|
|||||||
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], $factory);
|
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], $factory);
|
||||||
$connection->publish('body', $headers);
|
$connection->publish('body', $headers);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testItDelaysTheMessage()
|
||||||
|
{
|
||||||
|
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
|
||||||
|
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock();
|
||||||
|
$delayQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock();
|
||||||
|
|
||||||
|
$factory = $this->getMockBuilder(AmqpFactory::class)->getMock();
|
||||||
|
$factory->method('createConnection')->willReturn($amqpConnection);
|
||||||
|
$factory->method('createChannel')->willReturn($amqpChannel);
|
||||||
|
$factory->method('createQueue')->willReturn($delayQueue);
|
||||||
|
$factory->method('createExchange')->will($this->onConsecutiveCalls(
|
||||||
|
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
|
||||||
|
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
|
||||||
|
));
|
||||||
|
|
||||||
|
$amqpExchange->expects($this->once())->method('setName')->with('messages');
|
||||||
|
$amqpExchange->method('getName')->willReturn('messages');
|
||||||
|
|
||||||
|
$delayExchange->expects($this->once())->method('setName')->with('delay');
|
||||||
|
$delayExchange->expects($this->once())->method('declareExchange');
|
||||||
|
$delayExchange->method('getName')->willReturn('delay');
|
||||||
|
|
||||||
|
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_5000');
|
||||||
|
$delayQueue->expects($this->once())->method('setArguments')->with([
|
||||||
|
'x-message-ttl' => 5000,
|
||||||
|
'x-dead-letter-exchange' => 'messages',
|
||||||
|
]);
|
||||||
|
|
||||||
|
$delayQueue->expects($this->once())->method('declareQueue');
|
||||||
|
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_5000');
|
||||||
|
|
||||||
|
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
|
||||||
|
|
||||||
|
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
|
||||||
|
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
|
||||||
|
{
|
||||||
|
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
|
||||||
|
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock();
|
||||||
|
$delayQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock();
|
||||||
|
|
||||||
|
$factory = $this->getMockBuilder(AmqpFactory::class)->getMock();
|
||||||
|
$factory->method('createConnection')->willReturn($amqpConnection);
|
||||||
|
$factory->method('createChannel')->willReturn($amqpChannel);
|
||||||
|
$factory->method('createQueue')->willReturn($delayQueue);
|
||||||
|
$factory->method('createExchange')->will($this->onConsecutiveCalls(
|
||||||
|
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
|
||||||
|
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
|
||||||
|
));
|
||||||
|
|
||||||
|
$amqpExchange->expects($this->once())->method('setName')->with('messages');
|
||||||
|
$amqpExchange->method('getName')->willReturn('messages');
|
||||||
|
|
||||||
|
$delayExchange->expects($this->once())->method('setName')->with('delay');
|
||||||
|
$delayExchange->expects($this->once())->method('declareExchange');
|
||||||
|
$delayExchange->method('getName')->willReturn('delay');
|
||||||
|
|
||||||
|
$connectionOptions = [
|
||||||
|
'retry' => [
|
||||||
|
'dead_routing_key' => 'my_dead_routing_key',
|
||||||
|
],
|
||||||
|
];
|
||||||
|
|
||||||
|
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);
|
||||||
|
|
||||||
|
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_120000');
|
||||||
|
$delayQueue->expects($this->once())->method('setArguments')->with([
|
||||||
|
'x-message-ttl' => 120000,
|
||||||
|
'x-dead-letter-exchange' => 'messages',
|
||||||
|
]);
|
||||||
|
|
||||||
|
$delayQueue->expects($this->once())->method('declareQueue');
|
||||||
|
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_120000');
|
||||||
|
|
||||||
|
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_120000', AMQP_NOPARAM, ['headers' => []]);
|
||||||
|
$connection->publish('{}', [], 120000);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class TestAmqpFactory extends AmqpFactory
|
class TestAmqpFactory extends AmqpFactory
|
||||||
|
@ -14,20 +14,23 @@ require_once $autoload;
|
|||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
|
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||||
use Symfony\Component\Messenger\Worker;
|
use Symfony\Component\Messenger\Worker;
|
||||||
use Symfony\Component\Serializer as SerializerComponent;
|
use Symfony\Component\Serializer as SerializerComponent;
|
||||||
use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
||||||
|
use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer;
|
||||||
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
||||||
|
|
||||||
$serializer = new Serializer(
|
$serializer = new Serializer(
|
||||||
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
|
new SerializerComponent\Serializer([new ObjectNormalizer(), new ArrayDenormalizer()], ['json' => new JsonEncoder()])
|
||||||
);
|
);
|
||||||
|
|
||||||
$connection = Connection::fromDsn(getenv('DSN'));
|
$connection = Connection::fromDsn(getenv('DSN'));
|
||||||
$receiver = new AmqpReceiver($connection, $serializer);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
|
$retryStrategy = new MultiplierRetryStrategy(3, 0);
|
||||||
|
|
||||||
$worker = new Worker($receiver, new class() implements MessageBusInterface {
|
$worker = new Worker($receiver, new class() implements MessageBusInterface {
|
||||||
public function dispatch($envelope): Envelope
|
public function dispatch($envelope): Envelope
|
||||||
@ -40,7 +43,7 @@ $worker = new Worker($receiver, new class() implements MessageBusInterface {
|
|||||||
|
|
||||||
return $envelope;
|
return $envelope;
|
||||||
}
|
}
|
||||||
});
|
}, 'the_receiver', $retryStrategy);
|
||||||
|
|
||||||
echo "Receiving messages...\n";
|
echo "Receiving messages...\n";
|
||||||
$worker->run();
|
$worker->run();
|
||||||
|
@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Serialization;
|
|||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
||||||
|
|
||||||
@ -26,4 +27,28 @@ class PhpSerializerTest extends TestCase
|
|||||||
|
|
||||||
$this->assertEquals($envelope, $serializer->decode($serializer->encode($envelope)));
|
$this->assertEquals($envelope, $serializer->decode($serializer->encode($envelope)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testDecodingFailsWithBadFormat()
|
||||||
|
{
|
||||||
|
$this->expectException(MessageDecodingFailedException::class);
|
||||||
|
$this->expectExceptionMessageRegExp('/Could not decode/');
|
||||||
|
|
||||||
|
$serializer = new PhpSerializer();
|
||||||
|
|
||||||
|
$serializer->decode([
|
||||||
|
'body' => '{"message": "bar"}',
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testDecodingFailsWithBadClass()
|
||||||
|
{
|
||||||
|
$this->expectException(MessageDecodingFailedException::class);
|
||||||
|
$this->expectExceptionMessageRegExp('/class "ReceivedSt0mp" not found/');
|
||||||
|
|
||||||
|
$serializer = new PhpSerializer();
|
||||||
|
|
||||||
|
$serializer->decode([
|
||||||
|
'body' => 'O:13:"ReceivedSt0mp":0:{}',
|
||||||
|
]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Tests\Transport\Serialization;
|
|||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
||||||
use Symfony\Component\Messenger\Stamp\SerializerStamp;
|
use Symfony\Component\Messenger\Stamp\SerializerStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\ValidationStamp;
|
use Symfony\Component\Messenger\Stamp\ValidationStamp;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
@ -94,4 +95,28 @@ class SerializerTest extends TestCase
|
|||||||
$this->assertEquals($serializerStamp, $decoded->last(SerializerStamp::class));
|
$this->assertEquals($serializerStamp, $decoded->last(SerializerStamp::class));
|
||||||
$this->assertEquals($validationStamp, $decoded->last(ValidationStamp::class));
|
$this->assertEquals($validationStamp, $decoded->last(ValidationStamp::class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testDecodingFailsWithBadFormat()
|
||||||
|
{
|
||||||
|
$this->expectException(MessageDecodingFailedException::class);
|
||||||
|
|
||||||
|
$serializer = new Serializer();
|
||||||
|
|
||||||
|
$serializer->decode([
|
||||||
|
'body' => '{foo',
|
||||||
|
'headers' => ['type' => 'stdClass'],
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testDecodingFailsWithBadClass()
|
||||||
|
{
|
||||||
|
$this->expectException(MessageDecodingFailedException::class);
|
||||||
|
|
||||||
|
$serializer = new Serializer();
|
||||||
|
|
||||||
|
$serializer->decode([
|
||||||
|
'body' => '{}',
|
||||||
|
'headers' => ['type' => 'NonExistentClass'],
|
||||||
|
]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -12,9 +12,17 @@
|
|||||||
namespace Symfony\Component\Messenger\Tests;
|
namespace Symfony\Component\Messenger\Tests;
|
||||||
|
|
||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
|
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||||
|
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
|
||||||
|
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
||||||
|
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
|
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
|
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
|
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Worker;
|
use Symfony\Component\Messenger\Worker;
|
||||||
@ -33,11 +41,13 @@ class WorkerTest extends TestCase
|
|||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
|
|
||||||
$bus->expects($this->at(0))->method('dispatch')->with(($envelope = new Envelope($apiMessage))->with(new ReceivedStamp()))->willReturn($envelope);
|
$bus->expects($this->at(0))->method('dispatch')->with($envelope = new Envelope($apiMessage, new ReceivedStamp()))->willReturn($envelope);
|
||||||
$bus->expects($this->at(1))->method('dispatch')->with(($envelope = new Envelope($ipaMessage))->with(new ReceivedStamp()))->willReturn($envelope);
|
$bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, new ReceivedStamp()))->willReturn($envelope);
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus);
|
$worker = new Worker($receiver, $bus, 'receiver_id');
|
||||||
$worker->run();
|
$worker->run();
|
||||||
|
|
||||||
|
$this->assertSame(2, $receiver->getAcknowledgeCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
|
public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
|
||||||
@ -50,29 +60,79 @@ class WorkerTest extends TestCase
|
|||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);
|
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);
|
||||||
|
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus);
|
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
|
||||||
$worker->run();
|
$worker->run();
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testWorkerIsThrowingExceptionsBackToGenerators()
|
public function testDispatchCausesRetry()
|
||||||
{
|
{
|
||||||
$receiver = new CallbackReceiver(function ($handler) {
|
$receiver = new CallbackReceiver(function ($handler) {
|
||||||
try {
|
$handler(new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias')));
|
||||||
$handler(new Envelope(new DummyMessage('Hello')));
|
});
|
||||||
|
|
||||||
$this->assertTrue(false, 'This should not be called because the exception is sent back to the generator.');
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
} catch (\InvalidArgumentException $e) {
|
$bus->expects($this->at(0))->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
|
||||||
// This should be called because of the exception sent back to the generator.
|
|
||||||
$this->assertTrue(true);
|
// 2nd call will be the retry
|
||||||
}
|
$bus->expects($this->at(1))->method('dispatch')->with($this->callback(function (Envelope $envelope) {
|
||||||
|
/** @var RedeliveryStamp|null $redeliveryStamp */
|
||||||
|
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
|
||||||
|
$this->assertNotNull($redeliveryStamp);
|
||||||
|
// retry count now at 1
|
||||||
|
$this->assertSame(1, $redeliveryStamp->getRetryCount());
|
||||||
|
$this->assertTrue($redeliveryStamp->shouldRedeliverToSender('Some\Sender', 'sender_alias'));
|
||||||
|
|
||||||
|
// received stamp is removed
|
||||||
|
$this->assertNull($envelope->last(ReceivedStamp::class));
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}))->willReturnArgument(0);
|
||||||
|
|
||||||
|
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
||||||
|
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true);
|
||||||
|
|
||||||
|
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
|
||||||
|
$worker->run();
|
||||||
|
|
||||||
|
// old message acknowledged
|
||||||
|
$this->assertSame(1, $receiver->getAcknowledgeCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testDispatchCausesRejectWhenNoRetry()
|
||||||
|
{
|
||||||
|
$receiver = new CallbackReceiver(function ($handler) {
|
||||||
|
$handler(new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias')));
|
||||||
});
|
});
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
$bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
|
$bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus);
|
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
||||||
|
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false);
|
||||||
|
|
||||||
|
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
|
||||||
$worker->run();
|
$worker->run();
|
||||||
|
$this->assertSame(1, $receiver->getRejectCount());
|
||||||
|
$this->assertSame(0, $receiver->getAcknowledgeCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testDispatchCausesRejectOnUnrecoverableMessage()
|
||||||
|
{
|
||||||
|
$receiver = new CallbackReceiver(function ($handler) {
|
||||||
|
$handler(new Envelope(new DummyMessage('Hello')));
|
||||||
|
});
|
||||||
|
|
||||||
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
|
$bus->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException('Will never work'));
|
||||||
|
|
||||||
|
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
||||||
|
$retryStrategy->expects($this->never())->method('isRetryable');
|
||||||
|
|
||||||
|
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
|
||||||
|
$worker->run();
|
||||||
|
$this->assertSame(1, $receiver->getRejectCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testWorkerDoesNotSendNullMessagesToTheBus()
|
public function testWorkerDoesNotSendNullMessagesToTheBus()
|
||||||
@ -83,8 +143,58 @@ class WorkerTest extends TestCase
|
|||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
$bus->expects($this->never())->method('dispatch');
|
$bus->expects($this->never())->method('dispatch');
|
||||||
|
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
||||||
|
|
||||||
$worker = new Worker($receiver, $bus);
|
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
|
||||||
|
$worker->run();
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testWorkerDispatchesEventsOnSuccess()
|
||||||
|
{
|
||||||
|
$envelope = new Envelope(new DummyMessage('Hello'));
|
||||||
|
$receiver = new CallbackReceiver(function ($handler) use ($envelope) {
|
||||||
|
$handler($envelope);
|
||||||
|
});
|
||||||
|
|
||||||
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
|
$bus->method('dispatch')->willReturn($envelope);
|
||||||
|
|
||||||
|
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
||||||
|
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
|
||||||
|
|
||||||
|
$eventDispatcher->expects($this->exactly(2))
|
||||||
|
->method('dispatch')
|
||||||
|
->withConsecutive(
|
||||||
|
[$this->isInstanceOf(WorkerMessageReceivedEvent::class)],
|
||||||
|
[$this->isInstanceOf(WorkerMessageHandledEvent::class)]
|
||||||
|
);
|
||||||
|
|
||||||
|
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher);
|
||||||
|
$worker->run();
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testWorkerDispatchesEventsOnError()
|
||||||
|
{
|
||||||
|
$envelope = new Envelope(new DummyMessage('Hello'));
|
||||||
|
$receiver = new CallbackReceiver(function ($handler) use ($envelope) {
|
||||||
|
$handler($envelope);
|
||||||
|
});
|
||||||
|
|
||||||
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
|
$exception = new \InvalidArgumentException('Oh no!');
|
||||||
|
$bus->method('dispatch')->willThrowException($exception);
|
||||||
|
|
||||||
|
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
|
||||||
|
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
|
||||||
|
|
||||||
|
$eventDispatcher->expects($this->exactly(2))
|
||||||
|
->method('dispatch')
|
||||||
|
->withConsecutive(
|
||||||
|
[$this->isInstanceOf(WorkerMessageReceivedEvent::class)],
|
||||||
|
[$this->isInstanceOf(WorkerMessageFailedEvent::class)]
|
||||||
|
);
|
||||||
|
|
||||||
|
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher);
|
||||||
$worker->run();
|
$worker->run();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,34 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Stamp\StampInterface;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stamp applied when a message is received from Amqp.
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*/
|
||||||
|
class AmqpReceivedStamp implements StampInterface
|
||||||
|
{
|
||||||
|
private $amqpEnvelope;
|
||||||
|
|
||||||
|
public function __construct(\AMQPEnvelope $amqpEnvelope)
|
||||||
|
{
|
||||||
|
$this->amqpEnvelope = $amqpEnvelope;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getAmqpEnvelope(): \AMQPEnvelope
|
||||||
|
{
|
||||||
|
return $this->amqpEnvelope;
|
||||||
|
}
|
||||||
|
}
|
@ -11,8 +11,10 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Exception\LogicException;
|
||||||
|
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
||||||
use Symfony\Component\Messenger\Exception\TransportException;
|
use Symfony\Component\Messenger\Exception\TransportException;
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
|
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
@ -42,53 +44,74 @@ class AmqpReceiver implements ReceiverInterface
|
|||||||
public function receive(callable $handler): void
|
public function receive(callable $handler): void
|
||||||
{
|
{
|
||||||
while (!$this->shouldStop) {
|
while (!$this->shouldStop) {
|
||||||
$AMQPEnvelope = $this->connection->get();
|
try {
|
||||||
if (null === $AMQPEnvelope) {
|
$amqpEnvelope = $this->connection->get();
|
||||||
|
} catch (\AMQPException $exception) {
|
||||||
|
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (null === $amqpEnvelope) {
|
||||||
$handler(null);
|
$handler(null);
|
||||||
|
|
||||||
usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000);
|
usleep($this->connection->getConnectionConfiguration()['loop_sleep'] ?? 200000);
|
||||||
if (\function_exists('pcntl_signal_dispatch')) {
|
|
||||||
pcntl_signal_dispatch();
|
|
||||||
}
|
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$handler($this->serializer->decode([
|
$envelope = $this->serializer->decode([
|
||||||
'body' => $AMQPEnvelope->getBody(),
|
'body' => $amqpEnvelope->getBody(),
|
||||||
'headers' => $AMQPEnvelope->getHeaders(),
|
'headers' => $amqpEnvelope->getHeaders(),
|
||||||
]));
|
]);
|
||||||
|
} catch (MessageDecodingFailedException $exception) {
|
||||||
|
// invalid message of some type
|
||||||
|
$this->rejectAmqpEnvelope($amqpEnvelope);
|
||||||
|
|
||||||
$this->connection->ack($AMQPEnvelope);
|
throw $exception;
|
||||||
} catch (RejectMessageExceptionInterface $e) {
|
|
||||||
try {
|
|
||||||
$this->connection->reject($AMQPEnvelope);
|
|
||||||
} catch (\AMQPException $exception) {
|
|
||||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
|
||||||
}
|
|
||||||
|
|
||||||
throw $e;
|
|
||||||
} catch (\AMQPException $e) {
|
|
||||||
throw new TransportException($e->getMessage(), 0, $e);
|
|
||||||
} catch (\Throwable $e) {
|
|
||||||
try {
|
|
||||||
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
|
|
||||||
} catch (\AMQPException $exception) {
|
|
||||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
|
||||||
}
|
|
||||||
|
|
||||||
throw $e;
|
|
||||||
} finally {
|
|
||||||
if (\function_exists('pcntl_signal_dispatch')) {
|
|
||||||
pcntl_signal_dispatch();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$envelope = $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
|
||||||
|
$handler($envelope);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function ack(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
$this->connection->ack($this->findAmqpEnvelope($envelope));
|
||||||
|
} catch (\AMQPException $exception) {
|
||||||
|
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public function reject(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
$this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope));
|
||||||
|
}
|
||||||
|
|
||||||
public function stop(): void
|
public function stop(): void
|
||||||
{
|
{
|
||||||
$this->shouldStop = true;
|
$this->shouldStop = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope): void
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
$this->connection->nack($amqpEnvelope, AMQP_NOPARAM);
|
||||||
|
} catch (\AMQPException $exception) {
|
||||||
|
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private function findAmqpEnvelope(Envelope $envelope): \AMQPEnvelope
|
||||||
|
{
|
||||||
|
/** @var AmqpReceivedStamp|null $amqpReceivedStamp */
|
||||||
|
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
|
||||||
|
|
||||||
|
if (null === $amqpReceivedStamp) {
|
||||||
|
throw new LogicException('No AmqpReceivedStamp found on the Envelope.');
|
||||||
|
}
|
||||||
|
|
||||||
|
return $amqpReceivedStamp->getAmqpEnvelope();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
|||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Exception\TransportException;
|
use Symfony\Component\Messenger\Exception\TransportException;
|
||||||
|
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
@ -42,8 +43,15 @@ class AmqpSender implements SenderInterface
|
|||||||
{
|
{
|
||||||
$encodedMessage = $this->serializer->encode($envelope);
|
$encodedMessage = $this->serializer->encode($envelope);
|
||||||
|
|
||||||
|
/** @var DelayStamp|null $delayStamp */
|
||||||
|
$delayStamp = $envelope->last(DelayStamp::class);
|
||||||
|
$delay = 0;
|
||||||
|
if (null !== $delayStamp) {
|
||||||
|
$delay = $delayStamp->getDelay();
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? []);
|
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
|
||||||
} catch (\AMQPException $e) {
|
} catch (\AMQPException $e) {
|
||||||
throw new TransportException($e->getMessage(), 0, $e);
|
throw new TransportException($e->getMessage(), 0, $e);
|
||||||
}
|
}
|
||||||
|
@ -50,6 +50,22 @@ class AmqpTransport implements TransportInterface
|
|||||||
($this->receiver ?? $this->getReceiver())->stop();
|
($this->receiver ?? $this->getReceiver())->stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritdoc}
|
||||||
|
*/
|
||||||
|
public function ack(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
($this->receiver ?? $this->getReceiver())->ack($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritdoc}
|
||||||
|
*/
|
||||||
|
public function reject(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
($this->receiver ?? $this->getReceiver())->reject($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritdoc}
|
* {@inheritdoc}
|
||||||
*/
|
*/
|
||||||
|
@ -33,7 +33,7 @@ class Connection
|
|||||||
'x-message-ttl',
|
'x-message-ttl',
|
||||||
];
|
];
|
||||||
|
|
||||||
private $connectionCredentials;
|
private $connectionConfiguration;
|
||||||
private $exchangeConfiguration;
|
private $exchangeConfiguration;
|
||||||
private $queueConfiguration;
|
private $queueConfiguration;
|
||||||
private $amqpFactory;
|
private $amqpFactory;
|
||||||
@ -53,9 +53,47 @@ class Connection
|
|||||||
*/
|
*/
|
||||||
private $amqpQueue;
|
private $amqpQueue;
|
||||||
|
|
||||||
public function __construct(array $connectionCredentials, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null)
|
/**
|
||||||
|
* @var \AMQPExchange|null
|
||||||
|
*/
|
||||||
|
private $amqpDelayExchange;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
*
|
||||||
|
* Available options:
|
||||||
|
*
|
||||||
|
* * host: Hostname of the AMQP service
|
||||||
|
* * port: Port of the AMQP service
|
||||||
|
* * vhost: Virtual Host to use with the AMQP service
|
||||||
|
* * user: Username to use to connect the the AMQP service
|
||||||
|
* * password: Password to use the connect to the AMQP service
|
||||||
|
* * queue:
|
||||||
|
* * name: Name of the queue
|
||||||
|
* * routing_key: The routing key (if any) to use to push the messages to
|
||||||
|
* * flags: Queue flags (Default: AMQP_DURABLE)
|
||||||
|
* * arguments: Extra arguments
|
||||||
|
* * exchange:
|
||||||
|
* * name: Name of the exchange
|
||||||
|
* * type: Type of exchange (Default: fanout)
|
||||||
|
* * flags: Exchange flags (Default: AMQP_DURABLE)
|
||||||
|
* * arguments: Extra arguments
|
||||||
|
* * delay:
|
||||||
|
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%delay%")
|
||||||
|
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%delay%")
|
||||||
|
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry")
|
||||||
|
* * auto-setup: Enable or not the auto-setup of queues and exchanges (Default: true)
|
||||||
|
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
|
||||||
|
*/
|
||||||
|
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null)
|
||||||
{
|
{
|
||||||
$this->connectionCredentials = $connectionCredentials;
|
$this->connectionConfiguration = array_replace_recursive([
|
||||||
|
'delay' => [
|
||||||
|
'routing_key_pattern' => 'delay_%delay%',
|
||||||
|
'exchange_name' => 'delay',
|
||||||
|
'queue_name_pattern' => 'delay_queue_%delay%',
|
||||||
|
],
|
||||||
|
], $connectionConfiguration);
|
||||||
$this->exchangeConfiguration = $exchangeConfiguration;
|
$this->exchangeConfiguration = $exchangeConfiguration;
|
||||||
$this->queueConfiguration = $queueConfiguration;
|
$this->queueConfiguration = $queueConfiguration;
|
||||||
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
|
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
|
||||||
@ -123,20 +161,102 @@ class Connection
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @param int $delay The delay in milliseconds
|
||||||
|
*
|
||||||
* @throws \AMQPException
|
* @throws \AMQPException
|
||||||
*/
|
*/
|
||||||
public function publish(string $body, array $headers = []): void
|
public function publish(string $body, array $headers = [], int $delay = 0): void
|
||||||
{
|
{
|
||||||
|
if (0 !== $delay) {
|
||||||
|
$this->publishWithDelay($body, $headers, $delay);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if ($this->shouldSetup()) {
|
if ($this->shouldSetup()) {
|
||||||
$this->setup();
|
$this->setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
|
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
|
||||||
$attributes = array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]);
|
$attributes = $this->getAttributes($headers);
|
||||||
|
|
||||||
$this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes);
|
$this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @throws \AMQPException
|
||||||
|
*/
|
||||||
|
private function publishWithDelay(string $body, array $headers = [], int $delay)
|
||||||
|
{
|
||||||
|
if ($this->shouldSetup()) {
|
||||||
|
$this->setupDelay($delay);
|
||||||
|
}
|
||||||
|
|
||||||
|
$routingKey = $this->getRoutingKeyForDelay($delay);
|
||||||
|
$flags = $this->queueConfiguration['flags'] ?? AMQP_NOPARAM;
|
||||||
|
$attributes = $this->getAttributes($headers);
|
||||||
|
|
||||||
|
$this->getDelayExchange()->publish($body, $routingKey, $flags, $attributes);
|
||||||
|
}
|
||||||
|
|
||||||
|
private function setupDelay(int $delay)
|
||||||
|
{
|
||||||
|
if (!$this->channel()->isConnected()) {
|
||||||
|
$this->clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
$exchange = $this->getDelayExchange();
|
||||||
|
$exchange->declareExchange();
|
||||||
|
|
||||||
|
$queue = $this->createDelayQueue($delay);
|
||||||
|
$queue->declareQueue();
|
||||||
|
$queue->bind($exchange->getName(), $this->getRoutingKeyForDelay($delay));
|
||||||
|
}
|
||||||
|
|
||||||
|
private function getDelayExchange(): \AMQPExchange
|
||||||
|
{
|
||||||
|
if (null === $this->amqpDelayExchange) {
|
||||||
|
$this->amqpDelayExchange = $this->amqpFactory->createExchange($this->channel());
|
||||||
|
$this->amqpDelayExchange->setName($this->connectionConfiguration['delay']['exchange_name']);
|
||||||
|
$this->amqpDelayExchange->setType(AMQP_EX_TYPE_DIRECT);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $this->amqpDelayExchange;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a delay queue that will delay for a certain amount of time.
|
||||||
|
*
|
||||||
|
* This works by setting message TTL for the delay and pointing
|
||||||
|
* the dead letter exchange to the original exchange. The result
|
||||||
|
* is that after the TTL, the message is sent to the dead-letter-exchange,
|
||||||
|
* which is the original exchange, resulting on it being put back into
|
||||||
|
* the original queue.
|
||||||
|
*/
|
||||||
|
private function createDelayQueue(int $delay)
|
||||||
|
{
|
||||||
|
$delayConfiguration = $this->connectionConfiguration['delay'];
|
||||||
|
|
||||||
|
$queue = $this->amqpFactory->createQueue($this->channel());
|
||||||
|
$queue->setName(str_replace('%delay%', $delay, $delayConfiguration['queue_name_pattern']));
|
||||||
|
$queue->setArguments([
|
||||||
|
'x-message-ttl' => $delay,
|
||||||
|
'x-dead-letter-exchange' => $this->exchange()->getName(),
|
||||||
|
]);
|
||||||
|
|
||||||
|
if (isset($this->queueConfiguration['routing_key'])) {
|
||||||
|
// after being released from to DLX, this routing key will be used
|
||||||
|
$queue->setArgument('x-dead-letter-routing-key', $this->queueConfiguration['routing_key']);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
private function getRoutingKeyForDelay(int $delay): string
|
||||||
|
{
|
||||||
|
return str_replace('%delay%', $delay, $this->connectionConfiguration['delay']['routing_key_pattern']);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Waits and gets a message from the configured queue.
|
* Waits and gets a message from the configured queue.
|
||||||
*
|
*
|
||||||
@ -171,11 +291,6 @@ class Connection
|
|||||||
return $this->queue()->ack($message->getDeliveryTag());
|
return $this->queue()->ack($message->getDeliveryTag());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function reject(\AMQPEnvelope $message): bool
|
|
||||||
{
|
|
||||||
return $this->queue()->reject($message->getDeliveryTag());
|
|
||||||
}
|
|
||||||
|
|
||||||
public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM): bool
|
public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM): bool
|
||||||
{
|
{
|
||||||
return $this->queue()->nack($message->getDeliveryTag(), $flags);
|
return $this->queue()->nack($message->getDeliveryTag(), $flags);
|
||||||
@ -196,8 +311,8 @@ class Connection
|
|||||||
public function channel(): \AMQPChannel
|
public function channel(): \AMQPChannel
|
||||||
{
|
{
|
||||||
if (null === $this->amqpChannel) {
|
if (null === $this->amqpChannel) {
|
||||||
$connection = $this->amqpFactory->createConnection($this->connectionCredentials);
|
$connection = $this->amqpFactory->createConnection($this->connectionConfiguration);
|
||||||
$connectMethod = 'true' === ($this->connectionCredentials['persistent'] ?? 'false') ? 'pconnect' : 'connect';
|
$connectMethod = 'true' === ($this->connectionConfiguration['persistent'] ?? 'false') ? 'pconnect' : 'connect';
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$connection->{$connectMethod}();
|
$connection->{$connectMethod}();
|
||||||
@ -244,9 +359,9 @@ class Connection
|
|||||||
return $this->amqpExchange;
|
return $this->amqpExchange;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function getConnectionCredentials(): array
|
public function getConnectionConfiguration(): array
|
||||||
{
|
{
|
||||||
return $this->connectionCredentials;
|
return $this->connectionConfiguration;
|
||||||
}
|
}
|
||||||
|
|
||||||
private function clear(): void
|
private function clear(): void
|
||||||
@ -258,14 +373,19 @@ class Connection
|
|||||||
|
|
||||||
private function shouldSetup(): bool
|
private function shouldSetup(): bool
|
||||||
{
|
{
|
||||||
if (!\array_key_exists('auto-setup', $this->connectionCredentials)) {
|
if (!\array_key_exists('auto-setup', $this->connectionConfiguration)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (\in_array($this->connectionCredentials['auto-setup'], [false, 'false'], true)) {
|
if (\in_array($this->connectionConfiguration['auto-setup'], [false, 'false'], true)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function getAttributes(array $headers): array
|
||||||
|
{
|
||||||
|
return array_merge_recursive($this->queueConfiguration['attributes'] ?? [], ['headers' => $headers]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,27 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This file is part of the Symfony package.
|
|
||||||
*
|
|
||||||
* (c) Fabien Potencier <fabien@symfony.com>
|
|
||||||
*
|
|
||||||
* For the full copyright and license information, please view the LICENSE
|
|
||||||
* file that was distributed with this source code.
|
|
||||||
*/
|
|
||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\AmqpExt\Exception;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If something goes wrong while consuming and handling a message from the AMQP broker, there are two choices: rejecting
|
|
||||||
* or re-queuing the message.
|
|
||||||
*
|
|
||||||
* If the exception that is thrown by the bus while dispatching the message implements this interface, the message will
|
|
||||||
* be rejected. Otherwise, it will be re-queued.
|
|
||||||
*
|
|
||||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
|
||||||
*
|
|
||||||
* @experimental in 4.2
|
|
||||||
*/
|
|
||||||
interface RejectMessageExceptionInterface extends \Throwable
|
|
||||||
{
|
|
||||||
}
|
|
@ -11,8 +11,12 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\Receiver;
|
namespace Symfony\Component\Messenger\Transport\Receiver;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Exception\TransportException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||||
|
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||||
*
|
*
|
||||||
* @experimental in 4.2
|
* @experimental in 4.2
|
||||||
*/
|
*/
|
||||||
@ -23,6 +27,12 @@ interface ReceiverInterface
|
|||||||
*
|
*
|
||||||
* The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message.
|
* The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message.
|
||||||
* Note that this envelope can be `null` if the timeout to receive something has expired.
|
* Note that this envelope can be `null` if the timeout to receive something has expired.
|
||||||
|
*
|
||||||
|
* If the received message cannot be decoded, the message should not
|
||||||
|
* be retried again (e.g. if there's a queue, it should be removed)
|
||||||
|
* and a MessageDecodingFailedException should be thrown.
|
||||||
|
*
|
||||||
|
* @throws TransportException If there is an issue communicating with the transport
|
||||||
*/
|
*/
|
||||||
public function receive(callable $handler): void;
|
public function receive(callable $handler): void;
|
||||||
|
|
||||||
@ -30,4 +40,18 @@ interface ReceiverInterface
|
|||||||
* Stop receiving some messages.
|
* Stop receiving some messages.
|
||||||
*/
|
*/
|
||||||
public function stop(): void;
|
public function stop(): void;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acknowledge that the passed message was handled.
|
||||||
|
*
|
||||||
|
* @throws TransportException If there is an issue communicating with the transport
|
||||||
|
*/
|
||||||
|
public function ack(Envelope $envelope): void;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when handling the message failed and it should not be retried.
|
||||||
|
*
|
||||||
|
* @throws TransportException If there is an issue communicating with the transport
|
||||||
|
*/
|
||||||
|
public function reject(Envelope $envelope): void;
|
||||||
}
|
}
|
||||||
|
@ -55,4 +55,14 @@ class StopWhenMemoryUsageIsExceededReceiver implements ReceiverInterface
|
|||||||
{
|
{
|
||||||
$this->decoratedReceiver->stop();
|
$this->decoratedReceiver->stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function ack(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
$this->decoratedReceiver->ack($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function reject(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
$this->decoratedReceiver->reject($envelope);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -52,4 +52,14 @@ class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface
|
|||||||
{
|
{
|
||||||
$this->decoratedReceiver->stop();
|
$this->decoratedReceiver->stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function ack(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
$this->decoratedReceiver->ack($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function reject(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
$this->decoratedReceiver->reject($envelope);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -53,4 +53,14 @@ class StopWhenTimeLimitIsReachedReceiver implements ReceiverInterface
|
|||||||
{
|
{
|
||||||
$this->decoratedReceiver->stop();
|
$this->decoratedReceiver->stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function ack(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
$this->decoratedReceiver->ack($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function reject(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
$this->decoratedReceiver->reject($envelope);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,9 @@ interface SenderInterface
|
|||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* Sends the given envelope.
|
* Sends the given envelope.
|
||||||
|
*
|
||||||
|
* The sender can read different stamps for transport configuration,
|
||||||
|
* like delivery delay.
|
||||||
*/
|
*/
|
||||||
public function send(Envelope $envelope): Envelope;
|
public function send(Envelope $envelope): Envelope;
|
||||||
}
|
}
|
||||||
|
@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Transport\Serialization;
|
|||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||||
|
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Ryan Weaver<ryan@symfonycasts.com>
|
* @author Ryan Weaver<ryan@symfonycasts.com>
|
||||||
@ -30,7 +31,7 @@ class PhpSerializer implements SerializerInterface
|
|||||||
throw new InvalidArgumentException('Encoded envelope should have at least a "body".');
|
throw new InvalidArgumentException('Encoded envelope should have at least a "body".');
|
||||||
}
|
}
|
||||||
|
|
||||||
return unserialize($encodedEnvelope['body']);
|
return $this->safelyUnserialize($encodedEnvelope['body']);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -42,4 +43,35 @@ class PhpSerializer implements SerializerInterface
|
|||||||
'body' => serialize($envelope),
|
'body' => serialize($envelope),
|
||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function safelyUnserialize($contents)
|
||||||
|
{
|
||||||
|
$e = null;
|
||||||
|
$signalingException = new MessageDecodingFailedException(sprintf('Could not decode message using PHP serialization: %s.', $contents));
|
||||||
|
$prevUnserializeHandler = ini_set('unserialize_callback_func', self::class.'::handleUnserializeCallback');
|
||||||
|
$prevErrorHandler = set_error_handler(function ($type, $msg, $file, $line, $context = []) use (&$prevErrorHandler, $signalingException) {
|
||||||
|
if (__FILE__ === $file) {
|
||||||
|
throw $signalingException;
|
||||||
|
}
|
||||||
|
|
||||||
|
return $prevErrorHandler ? $prevErrorHandler($type, $msg, $file, $line, $context) : false;
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
$meta = unserialize($contents);
|
||||||
|
} finally {
|
||||||
|
restore_error_handler();
|
||||||
|
ini_set('unserialize_callback_func', $prevUnserializeHandler);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $meta;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @internal
|
||||||
|
*/
|
||||||
|
public static function handleUnserializeCallback($class)
|
||||||
|
{
|
||||||
|
throw new MessageDecodingFailedException(sprintf('Message class "%s" not found during decoding.', $class));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -14,9 +14,11 @@ namespace Symfony\Component\Messenger\Transport\Serialization;
|
|||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||||
use Symfony\Component\Messenger\Exception\LogicException;
|
use Symfony\Component\Messenger\Exception\LogicException;
|
||||||
|
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
||||||
use Symfony\Component\Messenger\Stamp\SerializerStamp;
|
use Symfony\Component\Messenger\Stamp\SerializerStamp;
|
||||||
use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
use Symfony\Component\Serializer\Encoder\JsonEncoder;
|
||||||
use Symfony\Component\Serializer\Encoder\XmlEncoder;
|
use Symfony\Component\Serializer\Encoder\XmlEncoder;
|
||||||
|
use Symfony\Component\Serializer\Exception\UnexpectedValueException;
|
||||||
use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer;
|
use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer;
|
||||||
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
|
||||||
use Symfony\Component\Serializer\Serializer as SymfonySerializer;
|
use Symfony\Component\Serializer\Serializer as SymfonySerializer;
|
||||||
@ -75,7 +77,11 @@ class Serializer implements SerializerInterface
|
|||||||
$context = end($stamps[SerializerStamp::class])->getContext() + $context;
|
$context = end($stamps[SerializerStamp::class])->getContext() + $context;
|
||||||
}
|
}
|
||||||
|
|
||||||
$message = $this->serializer->deserialize($encodedEnvelope['body'], $encodedEnvelope['headers']['type'], $this->format, $context);
|
try {
|
||||||
|
$message = $this->serializer->deserialize($encodedEnvelope['body'], $encodedEnvelope['headers']['type'], $this->format, $context);
|
||||||
|
} catch (UnexpectedValueException $e) {
|
||||||
|
throw new MessageDecodingFailedException(sprintf('Could not decode message: %s.', $e->getMessage()), $e->getCode(), $e);
|
||||||
|
}
|
||||||
|
|
||||||
return new Envelope($message, ...$stamps);
|
return new Envelope($message, ...$stamps);
|
||||||
}
|
}
|
||||||
@ -107,7 +113,11 @@ class Serializer implements SerializerInterface
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
$stamps[] = $this->serializer->deserialize($value, substr($name, \strlen(self::STAMP_HEADER_PREFIX)).'[]', $this->format, $this->context);
|
try {
|
||||||
|
$stamps[] = $this->serializer->deserialize($value, substr($name, \strlen(self::STAMP_HEADER_PREFIX)).'[]', $this->format, $this->context);
|
||||||
|
} catch (UnexpectedValueException $e) {
|
||||||
|
throw new MessageDecodingFailedException(sprintf('Could not decode stamp: %s.', $e->getMessage()), $e->getCode(), $e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if ($stamps) {
|
if ($stamps) {
|
||||||
$stamps = array_merge(...$stamps);
|
$stamps = array_merge(...$stamps);
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
namespace Symfony\Component\Messenger\Transport\Serialization;
|
namespace Symfony\Component\Messenger\Transport\Serialization;
|
||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||||
@ -29,6 +30,8 @@ interface SerializerInterface
|
|||||||
* The most common keys are:
|
* The most common keys are:
|
||||||
* - `body` (string) - the message body
|
* - `body` (string) - the message body
|
||||||
* - `headers` (string<string>) - a key/value pair of headers
|
* - `headers` (string<string>) - a key/value pair of headers
|
||||||
|
*
|
||||||
|
* @throws MessageDecodingFailedException
|
||||||
*/
|
*/
|
||||||
public function decode(array $encodedEnvelope): Envelope;
|
public function decode(array $encodedEnvelope): Envelope;
|
||||||
|
|
||||||
|
@ -11,7 +11,19 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger;
|
namespace Symfony\Component\Messenger;
|
||||||
|
|
||||||
|
use Psr\Log\LoggerInterface;
|
||||||
|
use Symfony\Component\EventDispatcher\Event;
|
||||||
|
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
|
||||||
|
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
||||||
|
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
|
||||||
|
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
||||||
|
use Symfony\Component\Messenger\Exception\LogicException;
|
||||||
|
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
|
||||||
|
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
||||||
|
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
|
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -25,11 +37,24 @@ class Worker
|
|||||||
{
|
{
|
||||||
private $receiver;
|
private $receiver;
|
||||||
private $bus;
|
private $bus;
|
||||||
|
private $receiverName;
|
||||||
|
private $retryStrategy;
|
||||||
|
private $eventDispatcher;
|
||||||
|
private $logger;
|
||||||
|
|
||||||
public function __construct(ReceiverInterface $receiver, MessageBusInterface $bus)
|
public function __construct(ReceiverInterface $receiver, MessageBusInterface $bus, string $receiverName = null, RetryStrategyInterface $retryStrategy = null, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
|
||||||
{
|
{
|
||||||
$this->receiver = $receiver;
|
$this->receiver = $receiver;
|
||||||
$this->bus = $bus;
|
$this->bus = $bus;
|
||||||
|
if (null === $receiverName) {
|
||||||
|
@trigger_error(sprintf('Instantiating the "%s" class without passing a third argument is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED);
|
||||||
|
|
||||||
|
$receiverName = 'unknown';
|
||||||
|
}
|
||||||
|
$this->receiverName = $receiverName;
|
||||||
|
$this->retryStrategy = $retryStrategy;
|
||||||
|
$this->eventDispatcher = $eventDispatcher;
|
||||||
|
$this->logger = $logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -45,10 +70,112 @@ class Worker
|
|||||||
|
|
||||||
$this->receiver->receive(function (?Envelope $envelope) {
|
$this->receiver->receive(function (?Envelope $envelope) {
|
||||||
if (null === $envelope) {
|
if (null === $envelope) {
|
||||||
|
if (\function_exists('pcntl_signal_dispatch')) {
|
||||||
|
pcntl_signal_dispatch();
|
||||||
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->bus->dispatch($envelope->with(new ReceivedStamp()));
|
$this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $this->receiverName));
|
||||||
|
|
||||||
|
$message = $envelope->getMessage();
|
||||||
|
$context = [
|
||||||
|
'message' => $message,
|
||||||
|
'class' => \get_class($message),
|
||||||
|
];
|
||||||
|
|
||||||
|
try {
|
||||||
|
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
|
||||||
|
} catch (\Throwable $throwable) {
|
||||||
|
$shouldRetry = $this->shouldRetry($throwable, $envelope);
|
||||||
|
|
||||||
|
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $this->receiverName, $throwable, $shouldRetry));
|
||||||
|
|
||||||
|
if ($shouldRetry) {
|
||||||
|
if (null === $this->retryStrategy) {
|
||||||
|
// not logically allowed, but check just in case
|
||||||
|
throw new LogicException('Retrying is not supported without a retry strategy.');
|
||||||
|
}
|
||||||
|
|
||||||
|
$retryCount = $this->getRetryCount($envelope) + 1;
|
||||||
|
if (null !== $this->logger) {
|
||||||
|
$this->logger->info('Retrying {class} - retry #{retryCount}.', $context + ['retryCount' => $retryCount, 'error' => $throwable]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// add the delay and retry stamp info + remove ReceivedStamp
|
||||||
|
$retryEnvelope = $envelope->with(new DelayStamp($this->retryStrategy->getWaitingTime($envelope)))
|
||||||
|
->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope)))
|
||||||
|
->withoutAll(ReceivedStamp::class);
|
||||||
|
|
||||||
|
// re-send the message
|
||||||
|
$this->bus->dispatch($retryEnvelope);
|
||||||
|
// acknowledge the previous message has received
|
||||||
|
$this->receiver->ack($envelope);
|
||||||
|
} else {
|
||||||
|
if (null !== $this->logger) {
|
||||||
|
$this->logger->info('Rejecting {class} (removing from transport).', $context + ['error' => $throwable]);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->receiver->reject($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (\function_exists('pcntl_signal_dispatch')) {
|
||||||
|
pcntl_signal_dispatch();
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $this->receiverName));
|
||||||
|
|
||||||
|
if (null !== $this->logger) {
|
||||||
|
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->receiver->ack($envelope);
|
||||||
|
|
||||||
|
if (\function_exists('pcntl_signal_dispatch')) {
|
||||||
|
pcntl_signal_dispatch();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function dispatchEvent(Event $event)
|
||||||
|
{
|
||||||
|
if (null === $this->eventDispatcher) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->eventDispatcher->dispatch($event);
|
||||||
|
}
|
||||||
|
|
||||||
|
private function shouldRetry(\Throwable $e, Envelope $envelope): bool
|
||||||
|
{
|
||||||
|
if ($e instanceof UnrecoverableMessageHandlingException) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (null === $this->retryStrategy) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return $this->retryStrategy->isRetryable($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
private function getRetryCount(Envelope $envelope): int
|
||||||
|
{
|
||||||
|
/** @var RedeliveryStamp|null $retryMessageStamp */
|
||||||
|
$retryMessageStamp = $envelope->last(RedeliveryStamp::class);
|
||||||
|
|
||||||
|
return $retryMessageStamp ? $retryMessageStamp->getRetryCount() : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private function getSenderAlias(Envelope $envelope): ?string
|
||||||
|
{
|
||||||
|
/** @var SentStamp|null $sentStamp */
|
||||||
|
$sentStamp = $envelope->last(SentStamp::class);
|
||||||
|
|
||||||
|
return $sentStamp ? $sentStamp->getSenderAlias() : null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
"require-dev": {
|
"require-dev": {
|
||||||
"symfony/console": "~3.4|~4.0",
|
"symfony/console": "~3.4|~4.0",
|
||||||
"symfony/dependency-injection": "~3.4.19|^4.1.8",
|
"symfony/dependency-injection": "~3.4.19|^4.1.8",
|
||||||
|
"symfony/event-dispatcher": "~4.3",
|
||||||
"symfony/http-kernel": "~3.4|~4.0",
|
"symfony/http-kernel": "~3.4|~4.0",
|
||||||
"symfony/process": "~3.4|~4.0",
|
"symfony/process": "~3.4|~4.0",
|
||||||
"symfony/property-access": "~3.4|~4.0",
|
"symfony/property-access": "~3.4|~4.0",
|
||||||
@ -30,6 +31,9 @@
|
|||||||
"symfony/validator": "~3.4|~4.0",
|
"symfony/validator": "~3.4|~4.0",
|
||||||
"symfony/var-dumper": "~3.4|~4.0"
|
"symfony/var-dumper": "~3.4|~4.0"
|
||||||
},
|
},
|
||||||
|
"conflict": {
|
||||||
|
"symfony/event-dispatcher": "<4.3"
|
||||||
|
},
|
||||||
"suggest": {
|
"suggest": {
|
||||||
"enqueue/messenger-adapter": "For using the php-enqueue library as a transport."
|
"enqueue/messenger-adapter": "For using the php-enqueue library as a transport."
|
||||||
},
|
},
|
||||||
|
Reference in New Issue
Block a user