Adding ConsumedByWorkerStamp as way to mark a message in a "worker context"

This commit is contained in:
Ryan Weaver 2019-10-28 11:58:08 -04:00
parent 38f19a960c
commit 01a9fefe77
11 changed files with 41 additions and 19 deletions

View File

@ -14,7 +14,7 @@ namespace Symfony\Bridge\Doctrine\Messenger;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
/**
* Clears entity manager after calling all handlers.
@ -28,7 +28,7 @@ class DoctrineClearEntityManagerMiddleware extends AbstractDoctrineMiddleware
try {
return $stack->next()->handle($envelope, $stack);
} finally {
if (null !== $envelope->last(ReceivedStamp::class)) {
if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
$entityManager->clear();
}
}

View File

@ -14,7 +14,7 @@ namespace Symfony\Bridge\Doctrine\Messenger;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
/**
* Closes connection and therefore saves number of connections.
@ -30,7 +30,7 @@ class DoctrineCloseConnectionMiddleware extends AbstractDoctrineMiddleware
return $stack->next()->handle($envelope, $stack);
} finally {
if (null !== $envelope->last(ReceivedStamp::class)) {
if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
$connection->close();
}
}

View File

@ -14,7 +14,7 @@ namespace Symfony\Bridge\Doctrine\Messenger;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
/**
* Checks whether the connection is still open or reconnects otherwise.
@ -25,7 +25,7 @@ class DoctrinePingConnectionMiddleware extends AbstractDoctrineMiddleware
{
protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope
{
if (null !== $envelope->last(ReceivedStamp::class)) {
if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
$this->pingConnection($entityManager);
}

View File

@ -16,7 +16,7 @@ use Doctrine\ORM\EntityManagerInterface;
use Symfony\Bridge\Doctrine\Messenger\DoctrineClearEntityManagerMiddleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
class DoctrineClearEntityManagerMiddlewareTest extends MiddlewareTestCase
@ -36,7 +36,7 @@ class DoctrineClearEntityManagerMiddlewareTest extends MiddlewareTestCase
$middleware = new DoctrineClearEntityManagerMiddleware($managerRegistry, 'default');
$envelope = new Envelope(new \stdClass(), [
new ReceivedStamp('async'),
new ConsumedByWorkerStamp(),
]);
$middleware->handle($envelope, $this->getStackMock());
}

View File

@ -17,7 +17,7 @@ use Doctrine\ORM\EntityManagerInterface;
use Symfony\Bridge\Doctrine\Messenger\DoctrineCloseConnectionMiddleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
class DoctrineCloseConnectionMiddlewareTest extends MiddlewareTestCase
@ -51,7 +51,7 @@ class DoctrineCloseConnectionMiddlewareTest extends MiddlewareTestCase
;
$envelope = new Envelope(new \stdClass(), [
new ReceivedStamp('async'),
new ConsumedByWorkerStamp(),
]);
$this->middleware->handle($envelope, $this->getStackMock());
}

View File

@ -17,7 +17,7 @@ use Doctrine\ORM\EntityManagerInterface;
use Symfony\Bridge\Doctrine\Messenger\DoctrinePingConnectionMiddleware;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
class DoctrinePingConnectionMiddlewareTest extends MiddlewareTestCase
@ -58,7 +58,7 @@ class DoctrinePingConnectionMiddlewareTest extends MiddlewareTestCase
;
$envelope = new Envelope(new \stdClass(), [
new ReceivedStamp('async'),
new ConsumedByWorkerStamp(),
]);
$this->middleware->handle($envelope, $this->getStackMock());
}
@ -75,7 +75,7 @@ class DoctrinePingConnectionMiddlewareTest extends MiddlewareTestCase
;
$envelope = new Envelope(new \stdClass(), [
new ReceivedStamp('async'),
new ConsumedByWorkerStamp(),
]);
$this->middleware->handle($envelope, $this->getStackMock());
}

View File

@ -29,7 +29,7 @@
"symfony/dependency-injection": "^3.4|^4.0|^5.0",
"symfony/form": "^4.4|^5.0",
"symfony/http-kernel": "^3.4|^4.0|^5.0",
"symfony/messenger": "^4.3|^5.0",
"symfony/messenger": "^4.4|^5.0",
"symfony/property-access": "^3.4|^4.0|^5.0",
"symfony/property-info": "^3.4|^4.0|^5.0",
"symfony/proxy-manager-bridge": "^3.4|^4.0|^5.0",

View File

@ -0,0 +1,19 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Stamp;
/**
* A marker that this message was consumed by a worker process.
*/
class ConsumedByWorkerStamp implements NonSendableStampInterface
{
}

View File

@ -174,7 +174,8 @@ class AmqpExtIntegrationTest extends TestCase
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
with stamps: [
"Symfony\\Component\\Messenger\\Transport\\AmqpExt\\AmqpReceivedStamp",
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp"
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp",
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp"
]
Done.

View File

@ -21,6 +21,7 @@ use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
@ -47,11 +48,11 @@ class WorkerTest extends TestCase
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with(
$envelope = new Envelope($apiMessage, [new ReceivedStamp('transport')])
$envelope = new Envelope($apiMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
)->willReturn($envelope);
$bus->expects($this->at(1))->method('dispatch')->with(
$envelope = new Envelope($ipaMessage, [new ReceivedStamp('transport')])
$envelope = new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
)->willReturn($envelope);
$worker = new Worker(['transport' => $receiver], $bus);
@ -69,7 +70,7 @@ class WorkerTest extends TestCase
{
$envelope = new Envelope(new DummyMessage('API'));
$receiver = new DummyReceiver([[$envelope]]);
$envelope = $envelope->with(new ReceivedStamp('transport'));
$envelope = $envelope->with(new ReceivedStamp('transport'), new ConsumedByWorkerStamp());
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);

View File

@ -21,6 +21,7 @@ use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
@ -132,7 +133,7 @@ class Worker implements WorkerInterface
];
try {
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName)));
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp()));
} catch (\Throwable $throwable) {
$rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
if ($rejectFirst) {