[Messenger] Add WorkerStoppedEvent

This commit is contained in:
Robin Chalas 2019-04-27 01:00:27 +02:00
parent 27d10a658d
commit 0e7898b622
4 changed files with 34 additions and 4 deletions

View File

@ -4,6 +4,7 @@ CHANGELOG
4.3.0
-----
* Added `WorkerStoppedEvent` dispatched when a worker is stopped.
* Added optional `MessageCountAwareInterface` that receivers can implement
to give information about how many messages are waiting to be processed.
* [BC BREAK] The `Envelope::__construct()` signature changed:

View File

@ -0,0 +1,23 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Event;
/**
* Dispatched when a worker has been stopped.
*
* @author Robin Chalas <robin.chalas@gmail.com>
*
* @experimental in 4.3
*/
class WorkerStoppedEvent
{
}

View File

@ -16,6 +16,7 @@ use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
@ -187,11 +188,12 @@ class WorkerTest extends TestCase
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
$eventDispatcher->expects($this->exactly(2))
$eventDispatcher->expects($this->exactly(3))
->method('dispatch')
->withConsecutive(
[$this->isInstanceOf(WorkerMessageReceivedEvent::class)],
[$this->isInstanceOf(WorkerMessageHandledEvent::class)]
[$this->isInstanceOf(WorkerMessageHandledEvent::class)],
[$this->isInstanceOf(WorkerStoppedEvent::class)]
);
$worker = new Worker([$receiver], $bus, [], $eventDispatcher);
@ -214,11 +216,12 @@ class WorkerTest extends TestCase
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
$eventDispatcher->expects($this->exactly(2))
$eventDispatcher->expects($this->exactly(3))
->method('dispatch')
->withConsecutive(
[$this->isInstanceOf(WorkerMessageReceivedEvent::class)],
[$this->isInstanceOf(WorkerMessageFailedEvent::class)]
[$this->isInstanceOf(WorkerMessageFailedEvent::class)],
[$this->isInstanceOf(WorkerStoppedEvent::class)]
);
$worker = new Worker([$receiver], $bus, [], $eventDispatcher);

View File

@ -15,6 +15,7 @@ use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
@ -109,6 +110,8 @@ class Worker implements WorkerInterface
usleep($options['sleep']);
}
}
$this->dispatchEvent(new WorkerStoppedEvent());
}
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $receiverName, ?RetryStrategyInterface $retryStrategy)