From 01a9fefe771ac01a100cc1389197963414148bd2 Mon Sep 17 00:00:00 2001 From: Ryan Weaver Date: Mon, 28 Oct 2019 11:58:08 -0400 Subject: [PATCH] Adding ConsumedByWorkerStamp as way to mark a message in a "worker context" --- .../DoctrineClearEntityManagerMiddleware.php | 4 ++-- .../DoctrineCloseConnectionMiddleware.php | 4 ++-- .../DoctrinePingConnectionMiddleware.php | 4 ++-- ...ctrineClearEntityManagerMiddlewareTest.php | 4 ++-- .../DoctrineCloseConnectionMiddlewareTest.php | 4 ++-- .../DoctrinePingConnectionMiddlewareTest.php | 6 +++--- src/Symfony/Bridge/Doctrine/composer.json | 2 +- .../Messenger/Stamp/ConsumedByWorkerStamp.php | 19 +++++++++++++++++++ .../AmqpExt/AmqpExtIntegrationTest.php | 3 ++- .../Component/Messenger/Tests/WorkerTest.php | 7 ++++--- src/Symfony/Component/Messenger/Worker.php | 3 ++- 11 files changed, 41 insertions(+), 19 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Stamp/ConsumedByWorkerStamp.php diff --git a/src/Symfony/Bridge/Doctrine/Messenger/DoctrineClearEntityManagerMiddleware.php b/src/Symfony/Bridge/Doctrine/Messenger/DoctrineClearEntityManagerMiddleware.php index 16aec310c6..0bef9be641 100644 --- a/src/Symfony/Bridge/Doctrine/Messenger/DoctrineClearEntityManagerMiddleware.php +++ b/src/Symfony/Bridge/Doctrine/Messenger/DoctrineClearEntityManagerMiddleware.php @@ -14,7 +14,7 @@ namespace Symfony\Bridge\Doctrine\Messenger; use Doctrine\ORM\EntityManagerInterface; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Middleware\StackInterface; -use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; /** * Clears entity manager after calling all handlers. @@ -28,7 +28,7 @@ class DoctrineClearEntityManagerMiddleware extends AbstractDoctrineMiddleware try { return $stack->next()->handle($envelope, $stack); } finally { - if (null !== $envelope->last(ReceivedStamp::class)) { + if (null !== $envelope->last(ConsumedByWorkerStamp::class)) { $entityManager->clear(); } } diff --git a/src/Symfony/Bridge/Doctrine/Messenger/DoctrineCloseConnectionMiddleware.php b/src/Symfony/Bridge/Doctrine/Messenger/DoctrineCloseConnectionMiddleware.php index b5160d13aa..b0a96e05da 100644 --- a/src/Symfony/Bridge/Doctrine/Messenger/DoctrineCloseConnectionMiddleware.php +++ b/src/Symfony/Bridge/Doctrine/Messenger/DoctrineCloseConnectionMiddleware.php @@ -14,7 +14,7 @@ namespace Symfony\Bridge\Doctrine\Messenger; use Doctrine\ORM\EntityManagerInterface; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Middleware\StackInterface; -use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; /** * Closes connection and therefore saves number of connections. @@ -30,7 +30,7 @@ class DoctrineCloseConnectionMiddleware extends AbstractDoctrineMiddleware return $stack->next()->handle($envelope, $stack); } finally { - if (null !== $envelope->last(ReceivedStamp::class)) { + if (null !== $envelope->last(ConsumedByWorkerStamp::class)) { $connection->close(); } } diff --git a/src/Symfony/Bridge/Doctrine/Messenger/DoctrinePingConnectionMiddleware.php b/src/Symfony/Bridge/Doctrine/Messenger/DoctrinePingConnectionMiddleware.php index a5c2531afe..f6febb2a7e 100644 --- a/src/Symfony/Bridge/Doctrine/Messenger/DoctrinePingConnectionMiddleware.php +++ b/src/Symfony/Bridge/Doctrine/Messenger/DoctrinePingConnectionMiddleware.php @@ -14,7 +14,7 @@ namespace Symfony\Bridge\Doctrine\Messenger; use Doctrine\ORM\EntityManagerInterface; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Middleware\StackInterface; -use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; /** * Checks whether the connection is still open or reconnects otherwise. @@ -25,7 +25,7 @@ class DoctrinePingConnectionMiddleware extends AbstractDoctrineMiddleware { protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope { - if (null !== $envelope->last(ReceivedStamp::class)) { + if (null !== $envelope->last(ConsumedByWorkerStamp::class)) { $this->pingConnection($entityManager); } diff --git a/src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrineClearEntityManagerMiddlewareTest.php b/src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrineClearEntityManagerMiddlewareTest.php index 4b53fef761..580cd3fb8e 100644 --- a/src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrineClearEntityManagerMiddlewareTest.php +++ b/src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrineClearEntityManagerMiddlewareTest.php @@ -16,7 +16,7 @@ use Doctrine\ORM\EntityManagerInterface; use Symfony\Bridge\Doctrine\Messenger\DoctrineClearEntityManagerMiddleware; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException; -use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase; class DoctrineClearEntityManagerMiddlewareTest extends MiddlewareTestCase @@ -36,7 +36,7 @@ class DoctrineClearEntityManagerMiddlewareTest extends MiddlewareTestCase $middleware = new DoctrineClearEntityManagerMiddleware($managerRegistry, 'default'); $envelope = new Envelope(new \stdClass(), [ - new ReceivedStamp('async'), + new ConsumedByWorkerStamp(), ]); $middleware->handle($envelope, $this->getStackMock()); } diff --git a/src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrineCloseConnectionMiddlewareTest.php b/src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrineCloseConnectionMiddlewareTest.php index 0f44049bb6..5a0b7a7452 100644 --- a/src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrineCloseConnectionMiddlewareTest.php +++ b/src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrineCloseConnectionMiddlewareTest.php @@ -17,7 +17,7 @@ use Doctrine\ORM\EntityManagerInterface; use Symfony\Bridge\Doctrine\Messenger\DoctrineCloseConnectionMiddleware; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException; -use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase; class DoctrineCloseConnectionMiddlewareTest extends MiddlewareTestCase @@ -51,7 +51,7 @@ class DoctrineCloseConnectionMiddlewareTest extends MiddlewareTestCase ; $envelope = new Envelope(new \stdClass(), [ - new ReceivedStamp('async'), + new ConsumedByWorkerStamp(), ]); $this->middleware->handle($envelope, $this->getStackMock()); } diff --git a/src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrinePingConnectionMiddlewareTest.php b/src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrinePingConnectionMiddlewareTest.php index 1b09189cc9..446d63013d 100644 --- a/src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrinePingConnectionMiddlewareTest.php +++ b/src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrinePingConnectionMiddlewareTest.php @@ -17,7 +17,7 @@ use Doctrine\ORM\EntityManagerInterface; use Symfony\Bridge\Doctrine\Messenger\DoctrinePingConnectionMiddleware; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException; -use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase; class DoctrinePingConnectionMiddlewareTest extends MiddlewareTestCase @@ -58,7 +58,7 @@ class DoctrinePingConnectionMiddlewareTest extends MiddlewareTestCase ; $envelope = new Envelope(new \stdClass(), [ - new ReceivedStamp('async'), + new ConsumedByWorkerStamp(), ]); $this->middleware->handle($envelope, $this->getStackMock()); } @@ -75,7 +75,7 @@ class DoctrinePingConnectionMiddlewareTest extends MiddlewareTestCase ; $envelope = new Envelope(new \stdClass(), [ - new ReceivedStamp('async'), + new ConsumedByWorkerStamp(), ]); $this->middleware->handle($envelope, $this->getStackMock()); } diff --git a/src/Symfony/Bridge/Doctrine/composer.json b/src/Symfony/Bridge/Doctrine/composer.json index 0578a4be8e..75eaffdc5a 100644 --- a/src/Symfony/Bridge/Doctrine/composer.json +++ b/src/Symfony/Bridge/Doctrine/composer.json @@ -29,7 +29,7 @@ "symfony/dependency-injection": "^3.4|^4.0|^5.0", "symfony/form": "^4.4|^5.0", "symfony/http-kernel": "^3.4|^4.0|^5.0", - "symfony/messenger": "^4.3|^5.0", + "symfony/messenger": "^4.4|^5.0", "symfony/property-access": "^3.4|^4.0|^5.0", "symfony/property-info": "^3.4|^4.0|^5.0", "symfony/proxy-manager-bridge": "^3.4|^4.0|^5.0", diff --git a/src/Symfony/Component/Messenger/Stamp/ConsumedByWorkerStamp.php b/src/Symfony/Component/Messenger/Stamp/ConsumedByWorkerStamp.php new file mode 100644 index 0000000000..3ae37ba6ad --- /dev/null +++ b/src/Symfony/Component/Messenger/Stamp/ConsumedByWorkerStamp.php @@ -0,0 +1,19 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Stamp; + +/** + * A marker that this message was consumed by a worker process. + */ +class ConsumedByWorkerStamp implements NonSendableStampInterface +{ +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php index d62e3014da..9fddc0646c 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php @@ -174,7 +174,8 @@ class AmqpExtIntegrationTest extends TestCase Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage with stamps: [ "Symfony\\Component\\Messenger\\Transport\\AmqpExt\\AmqpReceivedStamp", - "Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp" + "Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp", + "Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp" ] Done. diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 9a09c0a04a..c4d6c782af 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -21,6 +21,7 @@ use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException; use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Retry\RetryStrategyInterface; +use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentStamp; @@ -47,11 +48,11 @@ class WorkerTest extends TestCase $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->expects($this->at(0))->method('dispatch')->with( - $envelope = new Envelope($apiMessage, [new ReceivedStamp('transport')]) + $envelope = new Envelope($apiMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()]) )->willReturn($envelope); $bus->expects($this->at(1))->method('dispatch')->with( - $envelope = new Envelope($ipaMessage, [new ReceivedStamp('transport')]) + $envelope = new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()]) )->willReturn($envelope); $worker = new Worker(['transport' => $receiver], $bus); @@ -69,7 +70,7 @@ class WorkerTest extends TestCase { $envelope = new Envelope(new DummyMessage('API')); $receiver = new DummyReceiver([[$envelope]]); - $envelope = $envelope->with(new ReceivedStamp('transport')); + $envelope = $envelope->with(new ReceivedStamp('transport'), new ConsumedByWorkerStamp()); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope); diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 41a589fae2..1765fbec2a 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -21,6 +21,7 @@ use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface; use Symfony\Component\Messenger\Retry\RetryStrategyInterface; +use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; @@ -132,7 +133,7 @@ class Worker implements WorkerInterface ]; try { - $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName))); + $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp())); } catch (\Throwable $throwable) { $rejectFirst = $throwable instanceof RejectRedeliveredMessageException; if ($rejectFirst) {