2017-09-30 13:16:30 +01:00
|
|
|
<?php
|
|
|
|
|
|
|
|
/*
 * This file is part of the Symfony package.
 *
 * (c) Fabien Potencier <fabien@symfony.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */
|
|
|
|
|
|
|
|
namespace Symfony\Component\Messenger;
|
|
|
|
|
2019-03-13 19:48:28 +00:00
|
|
|
use Psr\Log\LoggerInterface;
|
|
|
|
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
|
|
|
|
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
|
|
|
|
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
|
2019-04-27 00:00:27 +01:00
|
|
|
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
|
2019-04-06 11:33:50 +01:00
|
|
|
use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
2019-03-13 19:48:28 +00:00
|
|
|
use Symfony\Component\Messenger\Exception\LogicException;
|
|
|
|
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
|
|
|
|
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
|
|
|
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
2018-10-18 09:53:38 +01:00
|
|
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
2019-03-13 19:48:28 +00:00
|
|
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
|
|
|
use Symfony\Component\Messenger\Stamp\SentStamp;
|
2018-10-22 09:20:29 +01:00
|
|
|
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
2019-03-25 16:04:58 +00:00
|
|
|
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
|
2017-09-30 13:16:30 +01:00
|
|
|
|
|
|
|
/**
 * Pulls envelopes from the configured receivers, dispatches them on the
 * message bus and then acknowledges, retries or rejects each one depending
 * on the outcome.
 *
 * @author Samuel Roze <samuel.roze@gmail.com>
 *
 * @experimental in 4.2
 *
 * @final
 */
class Worker implements WorkerInterface
{
    /** @var ReceiverInterface[] Receivers, keyed by their string identifier */
    private $receivers;

    /** @var MessageBusInterface Bus on which received (and retried) envelopes are dispatched */
    private $bus;

    /** @var RetryStrategyInterface[] Retry strategies, keyed by the matching receiver identifier */
    private $retryStrategies;

    /** @var EventDispatcherInterface|null Optional dispatcher for the worker lifecycle events */
    private $eventDispatcher;

    /** @var LoggerInterface|null Optional logger */
    private $logger;

    /** @var bool Set to true by stop(); run() checks it to leave its loop */
    private $shouldStop = false;

    /**
     * @param ReceiverInterface[]      $receivers       Where the key will be used as the string "identifier"
     * @param RetryStrategyInterface[] $retryStrategies Retry strategies for each receiver (array keys must match)
     */
    public function __construct(array $receivers, MessageBusInterface $bus, $retryStrategies = [], ?EventDispatcherInterface $eventDispatcher = null, ?LoggerInterface $logger = null)
    {
        $this->receivers = $receivers;
        $this->bus = $bus;
        $this->retryStrategies = $retryStrategies;
        $this->eventDispatcher = $eventDispatcher;
        $this->logger = $logger;
    }

    /**
     * Receive the messages and dispatch them to the bus.
     *
     * Loops until stop() has been called, then dispatches a WorkerStoppedEvent.
     *
     * Valid options are:
     *  * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
     *
     * @param callable|null $onHandledCallback Invoked with the envelope after each handled message,
     *                                         and with null when a full pass over the receivers
     *                                         found no message
     */
    public function run(array $options = [], ?callable $onHandledCallback = null): void
    {
        $options = array_merge([
            'sleep' => 1000000,
        ], $options);

        if (\function_exists('pcntl_signal')) {
            pcntl_signal(SIGTERM, function () {
                $this->stop();
            });
        }

        $onHandled = function (?Envelope $envelope) use ($onHandledCallback) {
            // pending signals are delivered here, i.e. between two messages
            if (\function_exists('pcntl_signal_dispatch')) {
                pcntl_signal_dispatch();
            }

            if (null !== $onHandledCallback) {
                $onHandledCallback($envelope);
            }
        };

        while (false === $this->shouldStop) {
            $envelopeHandled = false;
            foreach ($this->receivers as $receiverName => $receiver) {
                $envelopes = $receiver->get();

                foreach ($envelopes as $envelope) {
                    $envelopeHandled = true;

                    $this->handleMessage($envelope, $receiver, $receiverName, $this->retryStrategies[$receiverName] ?? null);
                    $onHandled($envelope);

                    // a stop may have been requested by the signal handler or
                    // by the callback: honour it right away instead of first
                    // draining the rest of the envelopes of this receiver
                    if ($this->shouldStop) {
                        break 2;
                    }
                }

                // after handling a single receiver, quit and start the loop again
                // this should prevent multiple lower priority receivers from
                // blocking too long before the higher priority are checked
                if ($envelopeHandled) {
                    break;
                }
            }

            if (false === $envelopeHandled) {
                $onHandled(null);

                // no point in idling when a stop was just requested
                if (false === $this->shouldStop) {
                    usleep($options['sleep']);
                }
            }
        }

        $this->dispatchEvent(new WorkerStoppedEvent());
    }

    /**
     * Dispatches one received envelope on the bus and settles it with its receiver.
     *
     * On success the envelope is acknowledged. When the bus throws, the envelope
     * is either re-dispatched with delay/redelivery stamps and acknowledged
     * (retry), or rejected.
     *
     * @throws LogicException When a retry is requested although no retry strategy is available
     */
    private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $receiverName, ?RetryStrategyInterface $retryStrategy): void
    {
        $this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $receiverName));

        $message = $envelope->getMessage();
        $context = [
            'message' => $message,
            'class' => \get_class($message),
        ];

        try {
            $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
        } catch (\Throwable $throwable) {
            if ($throwable instanceof HandlerFailedException) {
                // continue with the envelope carried by the exception
                $envelope = $throwable->getEnvelope();
            }

            $shouldRetry = $this->shouldRetry($throwable, $envelope, $retryStrategy);

            $this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $receiverName, $throwable, $shouldRetry));

            if ($shouldRetry) {
                if (null === $retryStrategy) {
                    // not logically allowed, but check just in case
                    throw new LogicException('Retrying is not supported without a retry strategy.');
                }

                $retryCount = $this->getRetryCount($envelope) + 1;
                if (null !== $this->logger) {
                    $this->logger->error('Retrying {class} - retry #{retryCount}.', $context + ['retryCount' => $retryCount, 'error' => $throwable]);
                }

                // add the delay and retry stamp info + remove ReceivedStamp
                $retryEnvelope = $envelope->with(new DelayStamp($retryStrategy->getWaitingTime($envelope)))
                    ->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope)))
                    ->withoutAll(ReceivedStamp::class);

                // re-send the message
                $this->bus->dispatch($retryEnvelope);
                // acknowledge the original message now that its retry copy has been dispatched
                $receiver->ack($envelope);
            } else {
                if (null !== $this->logger) {
                    $this->logger->critical('Rejecting {class} (removing from transport).', $context + ['error' => $throwable]);
                }

                $receiver->reject($envelope);
            }

            return;
        }

        $this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $receiverName));

        if (null !== $this->logger) {
            $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
        }

        $receiver->ack($envelope);
    }

    /**
     * Asks run() to leave its loop; takes effect once the message currently
     * being handled (if any) has been settled.
     */
    public function stop(): void
    {
        $this->shouldStop = true;
    }

    /**
     * Forwards the event to the event dispatcher, if one was provided.
     *
     * @param object $event
     */
    private function dispatchEvent($event): void
    {
        if (null === $this->eventDispatcher) {
            return;
        }

        $this->eventDispatcher->dispatch($event);
    }

    /**
     * Tells whether a failed envelope should be sent again.
     *
     * Never retries an UnrecoverableMessageHandlingException or when no retry
     * strategy is available; otherwise the retry strategy decides.
     */
    private function shouldRetry(\Throwable $e, Envelope $envelope, ?RetryStrategyInterface $retryStrategy): bool
    {
        if ($e instanceof UnrecoverableMessageHandlingException) {
            return false;
        }

        if (null === $retryStrategy) {
            return false;
        }

        return $retryStrategy->isRetryable($envelope);
    }

    /**
     * Returns the retry count of the last RedeliveryStamp on the envelope, or 0 when it has none.
     */
    private function getRetryCount(Envelope $envelope): int
    {
        /** @var RedeliveryStamp|null $retryMessageStamp */
        $retryMessageStamp = $envelope->last(RedeliveryStamp::class);

        return $retryMessageStamp ? $retryMessageStamp->getRetryCount() : 0;
    }

    /**
     * Returns the sender alias of the last SentStamp on the envelope, or null when it has none.
     */
    private function getSenderAlias(Envelope $envelope): ?string
    {
        /** @var SentStamp|null $sentStamp */
        $sentStamp = $envelope->last(SentStamp::class);

        return $sentStamp ? $sentStamp->getSenderAlias() : null;
    }
}
|