2017-09-30 13:16:30 +01:00
< ? php
/*
 * This file is part of the Symfony package.
 *
 * (c) Fabien Potencier <fabien@symfony.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */
namespace Symfony\Component\Messenger ;
2019-03-13 19:48:28 +00:00
use Psr\Log\LoggerInterface ;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent ;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent ;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent ;
2019-04-27 00:00:27 +01:00
use Symfony\Component\Messenger\Event\WorkerStoppedEvent ;
2019-04-06 11:33:50 +01:00
use Symfony\Component\Messenger\Exception\HandlerFailedException ;
2019-06-24 03:47:45 +01:00
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface ;
2019-03-13 19:48:28 +00:00
use Symfony\Component\Messenger\Retry\RetryStrategyInterface ;
use Symfony\Component\Messenger\Stamp\DelayStamp ;
2018-10-18 09:53:38 +01:00
use Symfony\Component\Messenger\Stamp\ReceivedStamp ;
2019-03-13 19:48:28 +00:00
use Symfony\Component\Messenger\Stamp\RedeliveryStamp ;
2018-10-22 09:20:29 +01:00
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface ;
2019-03-25 16:04:58 +00:00
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface ;
2017-09-30 13:16:30 +01:00
/**
 * @author Samuel Roze <samuel.roze@gmail.com>
 *
 * @experimental in 4.3
 *
 * @final
 */
2019-03-26 13:12:30 +00:00
class Worker implements WorkerInterface
{
    /** @var ReceiverInterface[] Receivers, keyed by transport name */
    private $receivers;

    /** @var MessageBusInterface Bus that received (and retried) envelopes are dispatched to */
    private $bus;

    /** @var RetryStrategyInterface[] Retry strategies, keyed by transport name */
    private $retryStrategies;

    /** @var EventDispatcherInterface|null Optional dispatcher for worker lifecycle events */
    private $eventDispatcher;

    /** @var LoggerInterface|null Optional logger */
    private $logger;

    /** @var bool Set to true by stop(); polled by the run() loop */
    private $shouldStop = false;

    /**
     * @param ReceiverInterface[]      $receivers       Where the key is the transport name
     * @param RetryStrategyInterface[] $retryStrategies Retry strategies for each receiver (array keys must match)
     */
    public function __construct(array $receivers, MessageBusInterface $bus, array $retryStrategies = [], ?EventDispatcherInterface $eventDispatcher = null, ?LoggerInterface $logger = null)
    {
        $this->receivers = $receivers;
        $this->bus = $bus;
        $this->retryStrategies = $retryStrategies;
        $this->eventDispatcher = $eventDispatcher;
        $this->logger = $logger;
    }

    /**
     * Receive the messages and dispatch them to the bus.
     *
     * Loops until stop() is called. Receivers are polled in array order; as soon
     * as one receiver yields at least one envelope, the remaining receivers are
     * skipped for that iteration and polling restarts from the first receiver.
     *
     * Valid options are:
     *  * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
     *
     * @param array         $options           See the list of valid options above
     * @param callable|null $onHandledCallback Invoked with the envelope after each handled message,
     *                                         and with null after an iteration that found no message
     */
    public function run(array $options = [], ?callable $onHandledCallback = null): void
    {
        $options = array_merge([
            'sleep' => 1000000,
        ], $options);

        if (\function_exists('pcntl_signal')) {
            pcntl_signal(SIGTERM, function () {
                $this->stop();
            });
        }

        $onHandled = function (?Envelope $envelope) use ($onHandledCallback) {
            // pending signals are only delivered here, i.e. between two messages
            if (\function_exists('pcntl_signal_dispatch')) {
                pcntl_signal_dispatch();
            }

            if (null !== $onHandledCallback) {
                $onHandledCallback($envelope);
            }
        };

        while (false === $this->shouldStop) {
            $envelopeHandled = false;

            foreach ($this->receivers as $transportName => $receiver) {
                $envelopes = $receiver->get();

                foreach ($envelopes as $envelope) {
                    $envelopeHandled = true;

                    $this->handleMessage($envelope, $receiver, $transportName, $this->retryStrategies[$transportName] ?? null);
                    $onHandled($envelope);

                    if ($this->shouldStop) {
                        break 2;
                    }
                }

                // after handling a single receiver, quit and start the loop again
                // this should prevent multiple lower priority receivers from
                // blocking too long before the higher priority are checked
                if ($envelopeHandled) {
                    break;
                }
            }

            if (false === $envelopeHandled) {
                $onHandled(null);

                // a stop may have been requested by the callback or by a signal
                // dispatched above: do not delay the shutdown by a useless sleep
                if ($this->shouldStop) {
                    break;
                }

                usleep($options['sleep']);
            }
        }

        $this->dispatchEvent(new WorkerStoppedEvent());
    }

    /**
     * Dispatches one received envelope to the bus, then acknowledges, retries or rejects it.
     *
     * - The bus call succeeds: a WorkerMessageHandledEvent is dispatched and the envelope is acked.
     * - The bus call throws and the message should be retried: the envelope is re-dispatched to the
     *   bus with a DelayStamp and a RedeliveryStamp (ReceivedStamp removed), then the original is acked.
     * - The bus call throws and the message should not be retried: the envelope is rejected.
     *
     * In both failure cases a WorkerMessageFailedEvent is dispatched first. The throwable itself is
     * never re-thrown from here.
     *
     * @param RetryStrategyInterface|null $retryStrategy Strategy for this transport; null disables retries
     */
    private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName, ?RetryStrategyInterface $retryStrategy): void
    {
        $event = new WorkerMessageReceivedEvent($envelope, $transportName);
        $this->dispatchEvent($event);

        // when the event reports that the message must not be handled, the
        // envelope is left untouched: it is neither acked nor rejected here
        if (!$event->shouldHandle()) {
            return;
        }

        $message = $envelope->getMessage();
        $context = [
            'message' => $message,
            'class' => \get_class($message),
        ];

        try {
            $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName)));
        } catch (\Throwable $throwable) {
            // prefer the envelope carried by the exception over the one that was received
            if ($throwable instanceof HandlerFailedException) {
                $envelope = $throwable->getEnvelope();
            }

            $shouldRetry = $retryStrategy && $this->shouldRetry($throwable, $envelope, $retryStrategy);

            $this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable, $shouldRetry));

            $retryCount = RedeliveryStamp::getRetryCountFromEnvelope($envelope);

            if ($shouldRetry) {
                ++$retryCount;
                $delay = $retryStrategy->getWaitingTime($envelope);

                if (null !== $this->logger) {
                    $this->logger->error('Error thrown while handling message {class}. Dispatching for retry #{retryCount} using {delay} ms delay. Error: "{error}"', $context + ['retryCount' => $retryCount, 'delay' => $delay, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
                }

                // add the delay and retry stamp info + remove ReceivedStamp
                $retryEnvelope = $envelope->with(new DelayStamp($delay))
                    ->with(new RedeliveryStamp($retryCount, $transportName))
                    ->withoutAll(ReceivedStamp::class);

                // re-send the message
                $this->bus->dispatch($retryEnvelope);

                // acknowledge the previous message was received
                $receiver->ack($envelope);
            } else {
                if (null !== $this->logger) {
                    $this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
                }

                $receiver->reject($envelope);
            }

            return;
        }

        $this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName));

        if (null !== $this->logger) {
            $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
        }

        $receiver->ack($envelope);
    }

    /**
     * Requests the run() loop to terminate.
     *
     * Only raises a flag: the loop exits the next time it checks it, i.e. after
     * the message currently being handled (if any) has been processed.
     */
    public function stop(): void
    {
        $this->shouldStop = true;
    }

    /**
     * Dispatches the event through the event dispatcher; does nothing when none was injected.
     *
     * @param object $event
     */
    private function dispatchEvent($event): void
    {
        if (null === $this->eventDispatcher) {
            return;
        }

        $this->eventDispatcher->dispatch($event);
    }

    /**
     * Tells whether a failed message should be sent for retry.
     *
     * Returns false without consulting the strategy when the throwable is an
     * UnrecoverableExceptionInterface, or when it is a HandlerFailedException
     * whose nested exceptions all are; otherwise the retry strategy decides.
     */
    private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInterface $retryStrategy): bool
    {
        // if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
        if ($e instanceof HandlerFailedException) {
            $shouldNotRetry = true;
            foreach ($e->getNestedExceptions() as $nestedException) {
                if (!$nestedException instanceof UnrecoverableExceptionInterface) {
                    $shouldNotRetry = false;
                    break;
                }
            }

            if ($shouldNotRetry) {
                return false;
            }
        }

        if ($e instanceof UnrecoverableExceptionInterface) {
            return false;
        }

        return $retryStrategy->isRetryable($envelope);
    }
}