Adding the "sync" transport to call handlers synchronously
This commit is contained in:
parent
162d5a8272
commit
3da5a438aa
@ -67,6 +67,10 @@
|
|||||||
<argument type="service" id="messenger.transport.serializer" />
|
<argument type="service" id="messenger.transport.serializer" />
|
||||||
</service>
|
</service>
|
||||||
|
|
||||||
|
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
|
||||||
|
<tag name="messenger.transport_factory" />
|
||||||
|
</service>
|
||||||
|
|
||||||
<!-- retry -->
|
<!-- retry -->
|
||||||
<service id="messenger.retry_strategy_locator">
|
<service id="messenger.retry_strategy_locator">
|
||||||
<tag name="container.service_locator" />
|
<tag name="container.service_locator" />
|
||||||
|
@ -4,6 +4,8 @@ CHANGELOG
|
|||||||
4.3.0
|
4.3.0
|
||||||
-----
|
-----
|
||||||
|
|
||||||
|
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
|
||||||
|
explicitly handle messages asynchronously.
|
||||||
* Added optional parameter `prefetch_count` in connection configuration,
|
* Added optional parameter `prefetch_count` in connection configuration,
|
||||||
to setup channel prefetch count
|
to setup channel prefetch count
|
||||||
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
|
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
|
||||||
|
@ -15,6 +15,7 @@ use Psr\Log\LoggerAwareTrait;
|
|||||||
use Psr\Log\NullLogger;
|
use Psr\Log\NullLogger;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
||||||
|
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||||
@ -81,7 +82,13 @@ class SendMessageMiddleware implements MiddlewareInterface
|
|||||||
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
|
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
|
||||||
}
|
}
|
||||||
|
|
||||||
// on a redelivery, never call local handlers
|
// if the message was marked (usually by SyncTransport) that it handlers
|
||||||
|
// MUST be called, mark them to be handled.
|
||||||
|
if (null !== $envelope->last(ForceCallHandlersStamp::class)) {
|
||||||
|
$handle = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// on a redelivery, only send back to queue: never call local handlers
|
||||||
if (null !== $redeliveryStamp) {
|
if (null !== $redeliveryStamp) {
|
||||||
$handle = false;
|
$handle = false;
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,27 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Stamp;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stamp marks that the handlers *should* be called immediately.
|
||||||
|
*
|
||||||
|
* This is used by the SyncTransport to indicate to the
|
||||||
|
* SendMessageMiddleware that handlers *should* be called
|
||||||
|
* immediately, even though a transport was set.
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*
|
||||||
|
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||||
|
*/
|
||||||
|
class ForceCallHandlersStamp implements StampInterface
|
||||||
|
{
|
||||||
|
}
|
@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Tests\Middleware;
|
|||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
||||||
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
|
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
|
||||||
|
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||||
@ -86,6 +87,8 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
|||||||
public function testItSendsToOnlyOneSenderOnRedelivery()
|
public function testItSendsToOnlyOneSenderOnRedelivery()
|
||||||
{
|
{
|
||||||
$envelope = new Envelope(new DummyMessage('Hey'), new RedeliveryStamp(5, 'bar'));
|
$envelope = new Envelope(new DummyMessage('Hey'), new RedeliveryStamp(5, 'bar'));
|
||||||
|
// even with a ForceCallHandlersStamp, the next middleware won't be called
|
||||||
|
$envelope = $envelope->with(new ForceCallHandlersStamp());
|
||||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||||
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||||
|
|
||||||
@ -237,7 +240,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
|||||||
$middleware->handle($envelope, $this->getStackMock());
|
$middleware->handle($envelope, $this->getStackMock());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testItDoesNotDispatchOnRetry()
|
public function testItDoesNotDispatchOnRedeliver()
|
||||||
{
|
{
|
||||||
$envelope = new Envelope(new DummyMessage('original envelope'));
|
$envelope = new Envelope(new DummyMessage('original envelope'));
|
||||||
$envelope = $envelope->with(new RedeliveryStamp(3, 'foo_sender'));
|
$envelope = $envelope->with(new RedeliveryStamp(3, 'foo_sender'));
|
||||||
@ -251,4 +254,18 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
|||||||
|
|
||||||
$middleware->handle($envelope, $this->getStackMock(false));
|
$middleware->handle($envelope, $this->getStackMock(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testItHandlesWithForceCallHandlersStamp()
|
||||||
|
{
|
||||||
|
$envelope = new Envelope(new DummyMessage('original envelope'));
|
||||||
|
$envelope = $envelope->with(new ForceCallHandlersStamp());
|
||||||
|
|
||||||
|
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||||
|
$sender->expects($this->once())->method('send')->willReturn($envelope);
|
||||||
|
|
||||||
|
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
|
||||||
|
|
||||||
|
// next handler *should* be called
|
||||||
|
$middleware->handle($envelope, $this->getStackMock(true));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,52 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Transport\Sync;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||||
|
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
|
||||||
|
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A "fake" transport that marks messages to be handled immediately.
|
||||||
|
*
|
||||||
|
* @experimental in 4.3
|
||||||
|
*
|
||||||
|
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||||
|
*/
|
||||||
|
class SyncTransport implements TransportInterface
|
||||||
|
{
|
||||||
|
public function receive(callable $handler): void
|
||||||
|
{
|
||||||
|
throw new InvalidArgumentException('You cannot receive messages from the SyncTransport.');
|
||||||
|
}
|
||||||
|
|
||||||
|
public function stop(): void
|
||||||
|
{
|
||||||
|
throw new InvalidArgumentException('You cannot call stop() on the SyncTransport.');
|
||||||
|
}
|
||||||
|
|
||||||
|
public function ack(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
throw new InvalidArgumentException('You cannot call ack() on the SyncTransport.');
|
||||||
|
}
|
||||||
|
|
||||||
|
public function reject(Envelope $envelope): void
|
||||||
|
{
|
||||||
|
throw new InvalidArgumentException('You cannot call reject() on the SyncTransport.');
|
||||||
|
}
|
||||||
|
|
||||||
|
public function send(Envelope $envelope): Envelope
|
||||||
|
{
|
||||||
|
return $envelope->with(new ForceCallHandlersStamp());
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,33 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This file is part of the Symfony package.
|
||||||
|
*
|
||||||
|
* (c) Fabien Potencier <fabien@symfony.com>
|
||||||
|
*
|
||||||
|
* For the full copyright and license information, please view the LICENSE
|
||||||
|
* file that was distributed with this source code.
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace Symfony\Component\Messenger\Transport\Sync;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
||||||
|
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @experimental in 4.3
|
||||||
|
*
|
||||||
|
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||||
|
*/
|
||||||
|
class SyncTransportFactory implements TransportFactoryInterface
|
||||||
|
{
|
||||||
|
public function createTransport(string $dsn, array $options): TransportInterface
|
||||||
|
{
|
||||||
|
return new SyncTransport();
|
||||||
|
}
|
||||||
|
|
||||||
|
public function supports(string $dsn, array $options): bool
|
||||||
|
{
|
||||||
|
return 0 === strpos($dsn, 'sync://');
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user