Adding ConsumedByWorkerStamp as way to mark a message in a "worker context"

This commit is contained in:
Ryan Weaver 2019-10-28 11:58:08 -04:00
parent 38f19a960c
commit 01a9fefe77
11 changed files with 41 additions and 19 deletions

View File

@ -14,7 +14,7 @@ namespace Symfony\Bridge\Doctrine\Messenger;
use Doctrine\ORM\EntityManagerInterface; use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\StackInterface; use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
/** /**
* Clears entity manager after calling all handlers. * Clears entity manager after calling all handlers.
@ -28,7 +28,7 @@ class DoctrineClearEntityManagerMiddleware extends AbstractDoctrineMiddleware
try { try {
return $stack->next()->handle($envelope, $stack); return $stack->next()->handle($envelope, $stack);
} finally { } finally {
if (null !== $envelope->last(ReceivedStamp::class)) { if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
$entityManager->clear(); $entityManager->clear();
} }
} }

View File

@ -14,7 +14,7 @@ namespace Symfony\Bridge\Doctrine\Messenger;
use Doctrine\ORM\EntityManagerInterface; use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\StackInterface; use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
/** /**
* Closes connection and therefore saves number of connections. * Closes connection and therefore saves number of connections.
@ -30,7 +30,7 @@ class DoctrineCloseConnectionMiddleware extends AbstractDoctrineMiddleware
return $stack->next()->handle($envelope, $stack); return $stack->next()->handle($envelope, $stack);
} finally { } finally {
if (null !== $envelope->last(ReceivedStamp::class)) { if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
$connection->close(); $connection->close();
} }
} }

View File

@ -14,7 +14,7 @@ namespace Symfony\Bridge\Doctrine\Messenger;
use Doctrine\ORM\EntityManagerInterface; use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\StackInterface; use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
/** /**
* Checks whether the connection is still open or reconnects otherwise. * Checks whether the connection is still open or reconnects otherwise.
@ -25,7 +25,7 @@ class DoctrinePingConnectionMiddleware extends AbstractDoctrineMiddleware
{ {
protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope
{ {
if (null !== $envelope->last(ReceivedStamp::class)) { if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
$this->pingConnection($entityManager); $this->pingConnection($entityManager);
} }

View File

@ -16,7 +16,7 @@ use Doctrine\ORM\EntityManagerInterface;
use Symfony\Bridge\Doctrine\Messenger\DoctrineClearEntityManagerMiddleware; use Symfony\Bridge\Doctrine\Messenger\DoctrineClearEntityManagerMiddleware;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException; use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase; use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
class DoctrineClearEntityManagerMiddlewareTest extends MiddlewareTestCase class DoctrineClearEntityManagerMiddlewareTest extends MiddlewareTestCase
@ -36,7 +36,7 @@ class DoctrineClearEntityManagerMiddlewareTest extends MiddlewareTestCase
$middleware = new DoctrineClearEntityManagerMiddleware($managerRegistry, 'default'); $middleware = new DoctrineClearEntityManagerMiddleware($managerRegistry, 'default');
$envelope = new Envelope(new \stdClass(), [ $envelope = new Envelope(new \stdClass(), [
new ReceivedStamp('async'), new ConsumedByWorkerStamp(),
]); ]);
$middleware->handle($envelope, $this->getStackMock()); $middleware->handle($envelope, $this->getStackMock());
} }

View File

@ -17,7 +17,7 @@ use Doctrine\ORM\EntityManagerInterface;
use Symfony\Bridge\Doctrine\Messenger\DoctrineCloseConnectionMiddleware; use Symfony\Bridge\Doctrine\Messenger\DoctrineCloseConnectionMiddleware;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException; use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase; use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
class DoctrineCloseConnectionMiddlewareTest extends MiddlewareTestCase class DoctrineCloseConnectionMiddlewareTest extends MiddlewareTestCase
@ -51,7 +51,7 @@ class DoctrineCloseConnectionMiddlewareTest extends MiddlewareTestCase
; ;
$envelope = new Envelope(new \stdClass(), [ $envelope = new Envelope(new \stdClass(), [
new ReceivedStamp('async'), new ConsumedByWorkerStamp(),
]); ]);
$this->middleware->handle($envelope, $this->getStackMock()); $this->middleware->handle($envelope, $this->getStackMock());
} }

View File

@ -17,7 +17,7 @@ use Doctrine\ORM\EntityManagerInterface;
use Symfony\Bridge\Doctrine\Messenger\DoctrinePingConnectionMiddleware; use Symfony\Bridge\Doctrine\Messenger\DoctrinePingConnectionMiddleware;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException; use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase; use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
class DoctrinePingConnectionMiddlewareTest extends MiddlewareTestCase class DoctrinePingConnectionMiddlewareTest extends MiddlewareTestCase
@ -58,7 +58,7 @@ class DoctrinePingConnectionMiddlewareTest extends MiddlewareTestCase
; ;
$envelope = new Envelope(new \stdClass(), [ $envelope = new Envelope(new \stdClass(), [
new ReceivedStamp('async'), new ConsumedByWorkerStamp(),
]); ]);
$this->middleware->handle($envelope, $this->getStackMock()); $this->middleware->handle($envelope, $this->getStackMock());
} }
@ -75,7 +75,7 @@ class DoctrinePingConnectionMiddlewareTest extends MiddlewareTestCase
; ;
$envelope = new Envelope(new \stdClass(), [ $envelope = new Envelope(new \stdClass(), [
new ReceivedStamp('async'), new ConsumedByWorkerStamp(),
]); ]);
$this->middleware->handle($envelope, $this->getStackMock()); $this->middleware->handle($envelope, $this->getStackMock());
} }

View File

@ -29,7 +29,7 @@
"symfony/dependency-injection": "^3.4|^4.0|^5.0", "symfony/dependency-injection": "^3.4|^4.0|^5.0",
"symfony/form": "^4.4|^5.0", "symfony/form": "^4.4|^5.0",
"symfony/http-kernel": "^3.4|^4.0|^5.0", "symfony/http-kernel": "^3.4|^4.0|^5.0",
"symfony/messenger": "^4.3|^5.0", "symfony/messenger": "^4.4|^5.0",
"symfony/property-access": "^3.4|^4.0|^5.0", "symfony/property-access": "^3.4|^4.0|^5.0",
"symfony/property-info": "^3.4|^4.0|^5.0", "symfony/property-info": "^3.4|^4.0|^5.0",
"symfony/proxy-manager-bridge": "^3.4|^4.0|^5.0", "symfony/proxy-manager-bridge": "^3.4|^4.0|^5.0",

View File

@ -0,0 +1,19 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Stamp;
/**
* A marker that this message was consumed by a worker process.
*/
class ConsumedByWorkerStamp implements NonSendableStampInterface
{
}

View File

@ -174,7 +174,8 @@ class AmqpExtIntegrationTest extends TestCase
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
with stamps: [ with stamps: [
"Symfony\\Component\\Messenger\\Transport\\AmqpExt\\AmqpReceivedStamp", "Symfony\\Component\\Messenger\\Transport\\AmqpExt\\AmqpReceivedStamp",
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp" "Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp",
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp"
] ]
Done. Done.

View File

@ -21,6 +21,7 @@ use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException; use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface; use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Stamp\SentStamp;
@ -47,11 +48,11 @@ class WorkerTest extends TestCase
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with( $bus->expects($this->at(0))->method('dispatch')->with(
$envelope = new Envelope($apiMessage, [new ReceivedStamp('transport')]) $envelope = new Envelope($apiMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
)->willReturn($envelope); )->willReturn($envelope);
$bus->expects($this->at(1))->method('dispatch')->with( $bus->expects($this->at(1))->method('dispatch')->with(
$envelope = new Envelope($ipaMessage, [new ReceivedStamp('transport')]) $envelope = new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
)->willReturn($envelope); )->willReturn($envelope);
$worker = new Worker(['transport' => $receiver], $bus); $worker = new Worker(['transport' => $receiver], $bus);
@ -69,7 +70,7 @@ class WorkerTest extends TestCase
{ {
$envelope = new Envelope(new DummyMessage('API')); $envelope = new Envelope(new DummyMessage('API'));
$receiver = new DummyReceiver([[$envelope]]); $receiver = new DummyReceiver([[$envelope]]);
$envelope = $envelope->with(new ReceivedStamp('transport')); $envelope = $envelope->with(new ReceivedStamp('transport'), new ConsumedByWorkerStamp());
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope); $bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);

View File

@ -21,6 +21,7 @@ use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface; use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface; use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
@ -132,7 +133,7 @@ class Worker implements WorkerInterface
]; ];
try { try {
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName))); $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp()));
} catch (\Throwable $throwable) { } catch (\Throwable $throwable) {
$rejectFirst = $throwable instanceof RejectRedeliveredMessageException; $rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
if ($rejectFirst) { if ($rejectFirst) {