[Messenger] improve logs
This commit is contained in:
parent
9865988ac2
commit
2ac7027b71
@ -64,7 +64,7 @@ class HandleMessageMiddleware implements MiddlewareInterface
|
|||||||
$handler = $handlerDescriptor->getHandler();
|
$handler = $handlerDescriptor->getHandler();
|
||||||
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
|
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
|
||||||
$envelope = $envelope->with($handledStamp);
|
$envelope = $envelope->with($handledStamp);
|
||||||
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getHandlerName()]);
|
$this->logger->info('Message {class} handled by {handler}', $context + ['handler' => $handledStamp->getHandlerName()]);
|
||||||
} catch (\Throwable $e) {
|
} catch (\Throwable $e) {
|
||||||
$exceptions[] = $e;
|
$exceptions[] = $e;
|
||||||
}
|
}
|
||||||
@ -75,7 +75,7 @@ class HandleMessageMiddleware implements MiddlewareInterface
|
|||||||
throw new NoHandlerForMessageException(sprintf('No handler for message "%s".', $context['class']));
|
throw new NoHandlerForMessageException(sprintf('No handler for message "%s".', $context['class']));
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->logger->info('No handler for message "{class}"', $context);
|
$this->logger->info('No handler for message {class}', $context);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (\count($exceptions)) {
|
if (\count($exceptions)) {
|
||||||
|
@ -54,37 +54,30 @@ class SendMessageMiddleware implements MiddlewareInterface
|
|||||||
|
|
||||||
$sender = null;
|
$sender = null;
|
||||||
|
|
||||||
try {
|
if ($envelope->all(ReceivedStamp::class)) {
|
||||||
if ($envelope->all(ReceivedStamp::class)) {
|
// it's a received message, do not send it back
|
||||||
// it's a received message, do not send it back
|
$this->logger->info('Received message {class}', $context);
|
||||||
$this->logger->info('Received message "{class}"', $context);
|
} else {
|
||||||
} else {
|
/** @var RedeliveryStamp|null $redeliveryStamp */
|
||||||
/** @var RedeliveryStamp|null $redeliveryStamp */
|
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
|
||||||
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
|
|
||||||
|
|
||||||
// dispatch event unless this is a redelivery
|
// dispatch event unless this is a redelivery
|
||||||
$shouldDispatchEvent = null === $redeliveryStamp;
|
$shouldDispatchEvent = null === $redeliveryStamp;
|
||||||
foreach ($this->getSenders($envelope, $redeliveryStamp) as $alias => $sender) {
|
foreach ($this->getSenders($envelope, $redeliveryStamp) as $alias => $sender) {
|
||||||
if (null !== $this->eventDispatcher && $shouldDispatchEvent) {
|
if (null !== $this->eventDispatcher && $shouldDispatchEvent) {
|
||||||
$event = new SendMessageToTransportsEvent($envelope);
|
$event = new SendMessageToTransportsEvent($envelope);
|
||||||
$this->eventDispatcher->dispatch($event);
|
$this->eventDispatcher->dispatch($event);
|
||||||
$envelope = $event->getEnvelope();
|
$envelope = $event->getEnvelope();
|
||||||
$shouldDispatchEvent = false;
|
$shouldDispatchEvent = false;
|
||||||
}
|
|
||||||
|
|
||||||
$this->logger->info('Sending message "{class}" with "{sender}"', $context + ['sender' => \get_class($sender)]);
|
|
||||||
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (null === $sender) {
|
$this->logger->info('Sending message {class} with {sender}', $context + ['sender' => \get_class($sender)]);
|
||||||
return $stack->next()->handle($envelope, $stack);
|
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
|
||||||
}
|
}
|
||||||
} catch (\Throwable $e) {
|
}
|
||||||
$context['exception'] = $e;
|
|
||||||
$this->logger->warning('An exception occurred while handling message "{class}": '.$e->getMessage(), $context);
|
|
||||||
|
|
||||||
throw $e;
|
if (null === $sender) {
|
||||||
|
return $stack->next()->handle($envelope, $stack);
|
||||||
}
|
}
|
||||||
|
|
||||||
// message should only be sent and not be handled by the next middleware
|
// message should only be sent and not be handled by the next middleware
|
||||||
|
@ -70,14 +70,14 @@ class MultiplierRetryStrategy implements RetryStrategyInterface
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
$retries = $this->getCurrentRetryCount($message);
|
$retries = RedeliveryStamp::getRetryCountFromEnvelope($message);
|
||||||
|
|
||||||
return $retries < $this->maxRetries;
|
return $retries < $this->maxRetries;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function getWaitingTime(Envelope $message): int
|
public function getWaitingTime(Envelope $message): int
|
||||||
{
|
{
|
||||||
$retries = $this->getCurrentRetryCount($message);
|
$retries = RedeliveryStamp::getRetryCountFromEnvelope($message);
|
||||||
|
|
||||||
$delay = $this->delayMilliseconds * pow($this->multiplier, $retries);
|
$delay = $this->delayMilliseconds * pow($this->multiplier, $retries);
|
||||||
|
|
||||||
@ -87,12 +87,4 @@ class MultiplierRetryStrategy implements RetryStrategyInterface
|
|||||||
|
|
||||||
return $delay;
|
return $delay;
|
||||||
}
|
}
|
||||||
|
|
||||||
private function getCurrentRetryCount(Envelope $message): int
|
|
||||||
{
|
|
||||||
/** @var RedeliveryStamp|null $retryMessageStamp */
|
|
||||||
$retryMessageStamp = $message->last(RedeliveryStamp::class);
|
|
||||||
|
|
||||||
return $retryMessageStamp ? $retryMessageStamp->getRetryCount() : 0;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
namespace Symfony\Component\Messenger\Stamp;
|
namespace Symfony\Component\Messenger\Stamp;
|
||||||
|
|
||||||
use Symfony\Component\Debug\Exception\FlattenException;
|
use Symfony\Component\Debug\Exception\FlattenException;
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stamp applied when a messages needs to be redelivered.
|
* Stamp applied when a messages needs to be redelivered.
|
||||||
@ -38,6 +39,14 @@ class RedeliveryStamp implements StampInterface
|
|||||||
$this->redeliveredAt = new \DateTimeImmutable();
|
$this->redeliveredAt = new \DateTimeImmutable();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static function getRetryCountFromEnvelope(Envelope $envelope): int
|
||||||
|
{
|
||||||
|
/** @var self|null $retryMessageStamp */
|
||||||
|
$retryMessageStamp = $envelope->last(self::class);
|
||||||
|
|
||||||
|
return $retryMessageStamp ? $retryMessageStamp->getRetryCount() : 0;
|
||||||
|
}
|
||||||
|
|
||||||
public function getRetryCount(): int
|
public function getRetryCount(): int
|
||||||
{
|
{
|
||||||
return $this->retryCount;
|
return $this->retryCount;
|
||||||
|
@ -140,14 +140,16 @@ class Worker implements WorkerInterface
|
|||||||
|
|
||||||
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable, $shouldRetry));
|
$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable, $shouldRetry));
|
||||||
|
|
||||||
|
$retryCount = RedeliveryStamp::getRetryCountFromEnvelope($envelope);
|
||||||
if ($shouldRetry) {
|
if ($shouldRetry) {
|
||||||
$retryCount = $this->getRetryCount($envelope) + 1;
|
++$retryCount;
|
||||||
|
$delay = $retryStrategy->getWaitingTime($envelope);
|
||||||
if (null !== $this->logger) {
|
if (null !== $this->logger) {
|
||||||
$this->logger->error('Retrying {class} - retry #{retryCount}.', $context + ['retryCount' => $retryCount, 'error' => $throwable]);
|
$this->logger->error('Error thrown while handling message {class}. Dispatching for retry #{retryCount} using {delay} ms delay. Error: "{error}"', $context + ['retryCount' => $retryCount, 'delay' => $delay, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
|
||||||
}
|
}
|
||||||
|
|
||||||
// add the delay and retry stamp info + remove ReceivedStamp
|
// add the delay and retry stamp info + remove ReceivedStamp
|
||||||
$retryEnvelope = $envelope->with(new DelayStamp($retryStrategy->getWaitingTime($envelope)))
|
$retryEnvelope = $envelope->with(new DelayStamp($delay))
|
||||||
->with(new RedeliveryStamp($retryCount, $this->getSenderClassOrAlias($envelope)))
|
->with(new RedeliveryStamp($retryCount, $this->getSenderClassOrAlias($envelope)))
|
||||||
->withoutAll(ReceivedStamp::class);
|
->withoutAll(ReceivedStamp::class);
|
||||||
|
|
||||||
@ -157,7 +159,7 @@ class Worker implements WorkerInterface
|
|||||||
$receiver->ack($envelope);
|
$receiver->ack($envelope);
|
||||||
} else {
|
} else {
|
||||||
if (null !== $this->logger) {
|
if (null !== $this->logger) {
|
||||||
$this->logger->critical('Rejecting {class} (removing from transport).', $context + ['error' => $throwable]);
|
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
|
||||||
}
|
}
|
||||||
|
|
||||||
$receiver->reject($envelope);
|
$receiver->reject($envelope);
|
||||||
@ -207,14 +209,6 @@ class Worker implements WorkerInterface
|
|||||||
return $retryStrategy->isRetryable($envelope);
|
return $retryStrategy->isRetryable($envelope);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function getRetryCount(Envelope $envelope): int
|
|
||||||
{
|
|
||||||
/** @var RedeliveryStamp|null $retryMessageStamp */
|
|
||||||
$retryMessageStamp = $envelope->last(RedeliveryStamp::class);
|
|
||||||
|
|
||||||
return $retryMessageStamp ? $retryMessageStamp->getRetryCount() : 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
private function getSenderClassOrAlias(Envelope $envelope): string
|
private function getSenderClassOrAlias(Envelope $envelope): string
|
||||||
{
|
{
|
||||||
/** @var SentStamp|null $sentStamp */
|
/** @var SentStamp|null $sentStamp */
|
||||||
|
Reference in New Issue
Block a user