Adding ConsumedByWorkerStamp as way to mark a message in a "worker context"
This commit is contained in:
parent
38f19a960c
commit
01a9fefe77
@ -14,7 +14,7 @@ namespace Symfony\Bridge\Doctrine\Messenger;
|
|||||||
use Doctrine\ORM\EntityManagerInterface;
|
use Doctrine\ORM\EntityManagerInterface;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Middleware\StackInterface;
|
use Symfony\Component\Messenger\Middleware\StackInterface;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clears entity manager after calling all handlers.
|
* Clears entity manager after calling all handlers.
|
||||||
@ -28,7 +28,7 @@ class DoctrineClearEntityManagerMiddleware extends AbstractDoctrineMiddleware
|
|||||||
try {
|
try {
|
||||||
return $stack->next()->handle($envelope, $stack);
|
return $stack->next()->handle($envelope, $stack);
|
||||||
} finally {
|
} finally {
|
||||||
if (null !== $envelope->last(ReceivedStamp::class)) {
|
if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
|
||||||
$entityManager->clear();
|
$entityManager->clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,7 @@ namespace Symfony\Bridge\Doctrine\Messenger;
|
|||||||
use Doctrine\ORM\EntityManagerInterface;
|
use Doctrine\ORM\EntityManagerInterface;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Middleware\StackInterface;
|
use Symfony\Component\Messenger\Middleware\StackInterface;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Closes connection and therefore saves number of connections.
|
* Closes connection and therefore saves number of connections.
|
||||||
@ -30,7 +30,7 @@ class DoctrineCloseConnectionMiddleware extends AbstractDoctrineMiddleware
|
|||||||
|
|
||||||
return $stack->next()->handle($envelope, $stack);
|
return $stack->next()->handle($envelope, $stack);
|
||||||
} finally {
|
} finally {
|
||||||
if (null !== $envelope->last(ReceivedStamp::class)) {
|
if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
|
||||||
$connection->close();
|
$connection->close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,7 @@ namespace Symfony\Bridge\Doctrine\Messenger;
|
|||||||
use Doctrine\ORM\EntityManagerInterface;
|
use Doctrine\ORM\EntityManagerInterface;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Middleware\StackInterface;
|
use Symfony\Component\Messenger\Middleware\StackInterface;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks whether the connection is still open or reconnects otherwise.
|
* Checks whether the connection is still open or reconnects otherwise.
|
||||||
@ -25,7 +25,7 @@ class DoctrinePingConnectionMiddleware extends AbstractDoctrineMiddleware
|
|||||||
{
|
{
|
||||||
protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope
|
protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope
|
||||||
{
|
{
|
||||||
if (null !== $envelope->last(ReceivedStamp::class)) {
|
if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
|
||||||
$this->pingConnection($entityManager);
|
$this->pingConnection($entityManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,7 +16,7 @@ use Doctrine\ORM\EntityManagerInterface;
|
|||||||
use Symfony\Bridge\Doctrine\Messenger\DoctrineClearEntityManagerMiddleware;
|
use Symfony\Bridge\Doctrine\Messenger\DoctrineClearEntityManagerMiddleware;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
|
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
||||||
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
|
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
|
||||||
|
|
||||||
class DoctrineClearEntityManagerMiddlewareTest extends MiddlewareTestCase
|
class DoctrineClearEntityManagerMiddlewareTest extends MiddlewareTestCase
|
||||||
@ -36,7 +36,7 @@ class DoctrineClearEntityManagerMiddlewareTest extends MiddlewareTestCase
|
|||||||
$middleware = new DoctrineClearEntityManagerMiddleware($managerRegistry, 'default');
|
$middleware = new DoctrineClearEntityManagerMiddleware($managerRegistry, 'default');
|
||||||
|
|
||||||
$envelope = new Envelope(new \stdClass(), [
|
$envelope = new Envelope(new \stdClass(), [
|
||||||
new ReceivedStamp('async'),
|
new ConsumedByWorkerStamp(),
|
||||||
]);
|
]);
|
||||||
$middleware->handle($envelope, $this->getStackMock());
|
$middleware->handle($envelope, $this->getStackMock());
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,7 @@ use Doctrine\ORM\EntityManagerInterface;
|
|||||||
use Symfony\Bridge\Doctrine\Messenger\DoctrineCloseConnectionMiddleware;
|
use Symfony\Bridge\Doctrine\Messenger\DoctrineCloseConnectionMiddleware;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
|
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
||||||
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
|
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
|
||||||
|
|
||||||
class DoctrineCloseConnectionMiddlewareTest extends MiddlewareTestCase
|
class DoctrineCloseConnectionMiddlewareTest extends MiddlewareTestCase
|
||||||
@ -51,7 +51,7 @@ class DoctrineCloseConnectionMiddlewareTest extends MiddlewareTestCase
|
|||||||
;
|
;
|
||||||
|
|
||||||
$envelope = new Envelope(new \stdClass(), [
|
$envelope = new Envelope(new \stdClass(), [
|
||||||
new ReceivedStamp('async'),
|
new ConsumedByWorkerStamp(),
|
||||||
]);
|
]);
|
||||||
$this->middleware->handle($envelope, $this->getStackMock());
|
$this->middleware->handle($envelope, $this->getStackMock());
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,7 @@ use Doctrine\ORM\EntityManagerInterface;
|
|||||||
use Symfony\Bridge\Doctrine\Messenger\DoctrinePingConnectionMiddleware;
|
use Symfony\Bridge\Doctrine\Messenger\DoctrinePingConnectionMiddleware;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
|
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
||||||
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
|
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
|
||||||
|
|
||||||
class DoctrinePingConnectionMiddlewareTest extends MiddlewareTestCase
|
class DoctrinePingConnectionMiddlewareTest extends MiddlewareTestCase
|
||||||
@ -58,7 +58,7 @@ class DoctrinePingConnectionMiddlewareTest extends MiddlewareTestCase
|
|||||||
;
|
;
|
||||||
|
|
||||||
$envelope = new Envelope(new \stdClass(), [
|
$envelope = new Envelope(new \stdClass(), [
|
||||||
new ReceivedStamp('async'),
|
new ConsumedByWorkerStamp(),
|
||||||
]);
|
]);
|
||||||
$this->middleware->handle($envelope, $this->getStackMock());
|
$this->middleware->handle($envelope, $this->getStackMock());
|
||||||
}
|
}
|
||||||
@ -75,7 +75,7 @@ class DoctrinePingConnectionMiddlewareTest extends MiddlewareTestCase
|
|||||||
;
|
;
|
||||||
|
|
||||||
$envelope = new Envelope(new \stdClass(), [
|
$envelope = new Envelope(new \stdClass(), [
|
||||||
new ReceivedStamp('async'),
|
new ConsumedByWorkerStamp(),
|
||||||
]);
|
]);
|
||||||
$this->middleware->handle($envelope, $this->getStackMock());
|
$this->middleware->handle($envelope, $this->getStackMock());
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,7 @@
|
|||||||
"symfony/dependency-injection": "^3.4|^4.0|^5.0",
|
"symfony/dependency-injection": "^3.4|^4.0|^5.0",
|
||||||
"symfony/form": "^4.4|^5.0",
|
"symfony/form": "^4.4|^5.0",
|
||||||
"symfony/http-kernel": "^3.4|^4.0|^5.0",
|
"symfony/http-kernel": "^3.4|^4.0|^5.0",
|
||||||
"symfony/messenger": "^4.3|^5.0",
|
"symfony/messenger": "^4.4|^5.0",
|
||||||
"symfony/property-access": "^3.4|^4.0|^5.0",
|
"symfony/property-access": "^3.4|^4.0|^5.0",
|
||||||
"symfony/property-info": "^3.4|^4.0|^5.0",
|
"symfony/property-info": "^3.4|^4.0|^5.0",
|
||||||
"symfony/proxy-manager-bridge": "^3.4|^4.0|^5.0",
|
"symfony/proxy-manager-bridge": "^3.4|^4.0|^5.0",
|
||||||
|
@ -0,0 +1,19 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Stamp;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A marker that this message was consumed by a worker process.
|
||||||
|
*/
|
||||||
|
class ConsumedByWorkerStamp implements NonSendableStampInterface
|
||||||
|
{
|
||||||
|
}
|
@ -174,7 +174,8 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
|
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
|
||||||
with stamps: [
|
with stamps: [
|
||||||
"Symfony\\Component\\Messenger\\Transport\\AmqpExt\\AmqpReceivedStamp",
|
"Symfony\\Component\\Messenger\\Transport\\AmqpExt\\AmqpReceivedStamp",
|
||||||
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp"
|
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp",
|
||||||
|
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp"
|
||||||
]
|
]
|
||||||
Done.
|
Done.
|
||||||
|
|
||||||
|
@ -21,6 +21,7 @@ use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
|||||||
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
|
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
|
||||||
use Symfony\Component\Messenger\MessageBusInterface;
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
||||||
|
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||||
@ -47,11 +48,11 @@ class WorkerTest extends TestCase
|
|||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
|
|
||||||
$bus->expects($this->at(0))->method('dispatch')->with(
|
$bus->expects($this->at(0))->method('dispatch')->with(
|
||||||
$envelope = new Envelope($apiMessage, [new ReceivedStamp('transport')])
|
$envelope = new Envelope($apiMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
|
||||||
)->willReturn($envelope);
|
)->willReturn($envelope);
|
||||||
|
|
||||||
$bus->expects($this->at(1))->method('dispatch')->with(
|
$bus->expects($this->at(1))->method('dispatch')->with(
|
||||||
$envelope = new Envelope($ipaMessage, [new ReceivedStamp('transport')])
|
$envelope = new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
|
||||||
)->willReturn($envelope);
|
)->willReturn($envelope);
|
||||||
|
|
||||||
$worker = new Worker(['transport' => $receiver], $bus);
|
$worker = new Worker(['transport' => $receiver], $bus);
|
||||||
@ -69,7 +70,7 @@ class WorkerTest extends TestCase
|
|||||||
{
|
{
|
||||||
$envelope = new Envelope(new DummyMessage('API'));
|
$envelope = new Envelope(new DummyMessage('API'));
|
||||||
$receiver = new DummyReceiver([[$envelope]]);
|
$receiver = new DummyReceiver([[$envelope]]);
|
||||||
$envelope = $envelope->with(new ReceivedStamp('transport'));
|
$envelope = $envelope->with(new ReceivedStamp('transport'), new ConsumedByWorkerStamp());
|
||||||
|
|
||||||
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
|
||||||
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);
|
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);
|
||||||
|
@ -21,6 +21,7 @@ use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
|||||||
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
|
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
|
||||||
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
|
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
|
||||||
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
||||||
|
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
use Symfony\Component\Messenger\Stamp\DelayStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
@ -132,7 +133,7 @@ class Worker implements WorkerInterface
|
|||||||
];
|
];
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName)));
|
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp()));
|
||||||
} catch (\Throwable $throwable) {
|
} catch (\Throwable $throwable) {
|
||||||
$rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
|
$rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
|
||||||
if ($rejectFirst) {
|
if ($rejectFirst) {
|
||||||
|
Reference in New Issue
Block a user