feature #30759 [Messenger] Adding the "sync" transport to call handlers synchronously (weaverryan)
This PR was merged into the 4.3-dev branch.
Discussion
----------
[Messenger] Adding the "sync" transport to call handlers synchronously
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | yes
| BC breaks? | no
| Deprecations? | no
| Tests pass? | yes
| Fixed tickets | none
| License | MIT
| Doc PR | symfony/symfony-docs#11236
This adds a `sync://` transport that just calls the handlers immediately. Why? This allows you to route your messages to some "async" transport. But then, when developing locally or running your tests, you can choose to run them synchronously instead:
```yml
# config/packages/messenger.yaml
framework:
messenger:
transports:
async: '%env(MESSENGER_TRANSPORT_DSN)%'
routing:
'App\Message\SmsNotification': async
'App\Message\OtherMessage': async
```
```
# .env
# by default, handle this sync
MESSENGER_TRANSPORT_DSN=sync://
```
```
# .env.local on production (or set this via real env vars)
# on production, use amqp
MESSENGER_TRANSPORT_DSN=amqp://.......
```
Cheers!
Commits
-------
3da5a438aa
Adding the "sync" transport to call handlers synchronously
This commit is contained in:
commit
6ffad7bba4
@ -67,6 +67,10 @@
|
||||
<argument type="service" id="messenger.transport.serializer" />
|
||||
</service>
|
||||
|
||||
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
|
||||
<tag name="messenger.transport_factory" />
|
||||
</service>
|
||||
|
||||
<!-- retry -->
|
||||
<service id="messenger.retry_strategy_locator">
|
||||
<tag name="container.service_locator" />
|
||||
|
@ -4,6 +4,8 @@ CHANGELOG
|
||||
4.3.0
|
||||
-----
|
||||
|
||||
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
|
||||
explicitly handle messages asynchronously.
|
||||
* Added optional parameter `prefetch_count` in connection configuration,
|
||||
to setup channel prefetch count
|
||||
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
|
||||
|
@ -15,6 +15,7 @@ use Psr\Log\LoggerAwareTrait;
|
||||
use Psr\Log\NullLogger;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
||||
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
|
||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||
@ -81,7 +82,13 @@ class SendMessageMiddleware implements MiddlewareInterface
|
||||
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
|
||||
}
|
||||
|
||||
// on a redelivery, never call local handlers
|
||||
// if the message was marked (usually by SyncTransport) that it handlers
|
||||
// MUST be called, mark them to be handled.
|
||||
if (null !== $envelope->last(ForceCallHandlersStamp::class)) {
|
||||
$handle = true;
|
||||
}
|
||||
|
||||
// on a redelivery, only send back to queue: never call local handlers
|
||||
if (null !== $redeliveryStamp) {
|
||||
$handle = false;
|
||||
}
|
||||
|
@ -0,0 +1,27 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Stamp;
|
||||
|
||||
/**
|
||||
* Stamp marks that the handlers *should* be called immediately.
|
||||
*
|
||||
* This is used by the SyncTransport to indicate to the
|
||||
* SendMessageMiddleware that handlers *should* be called
|
||||
* immediately, even though a transport was set.
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*/
|
||||
class ForceCallHandlersStamp implements StampInterface
|
||||
{
|
||||
}
|
@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Tests\Middleware;
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
|
||||
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
|
||||
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
|
||||
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
|
||||
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
|
||||
use Symfony\Component\Messenger\Stamp\SentStamp;
|
||||
@ -86,6 +87,8 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
public function testItSendsToOnlyOneSenderOnRedelivery()
|
||||
{
|
||||
$envelope = new Envelope(new DummyMessage('Hey'), new RedeliveryStamp(5, 'bar'));
|
||||
// even with a ForceCallHandlersStamp, the next middleware won't be called
|
||||
$envelope = $envelope->with(new ForceCallHandlersStamp());
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
|
||||
@ -237,7 +240,7 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
$middleware->handle($envelope, $this->getStackMock());
|
||||
}
|
||||
|
||||
public function testItDoesNotDispatchOnRetry()
|
||||
public function testItDoesNotDispatchOnRedeliver()
|
||||
{
|
||||
$envelope = new Envelope(new DummyMessage('original envelope'));
|
||||
$envelope = $envelope->with(new RedeliveryStamp(3, 'foo_sender'));
|
||||
@ -251,4 +254,18 @@ class SendMessageMiddlewareTest extends MiddlewareTestCase
|
||||
|
||||
$middleware->handle($envelope, $this->getStackMock(false));
|
||||
}
|
||||
|
||||
public function testItHandlesWithForceCallHandlersStamp()
|
||||
{
|
||||
$envelope = new Envelope(new DummyMessage('original envelope'));
|
||||
$envelope = $envelope->with(new ForceCallHandlersStamp());
|
||||
|
||||
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
|
||||
$sender->expects($this->once())->method('send')->willReturn($envelope);
|
||||
|
||||
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
|
||||
|
||||
// next handler *should* be called
|
||||
$middleware->handle($envelope, $this->getStackMock(true));
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,52 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\Sync;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
/**
|
||||
* A "fake" transport that marks messages to be handled immediately.
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*/
|
||||
class SyncTransport implements TransportInterface
|
||||
{
|
||||
public function receive(callable $handler): void
|
||||
{
|
||||
throw new InvalidArgumentException('You cannot receive messages from the SyncTransport.');
|
||||
}
|
||||
|
||||
public function stop(): void
|
||||
{
|
||||
throw new InvalidArgumentException('You cannot call stop() on the SyncTransport.');
|
||||
}
|
||||
|
||||
public function ack(Envelope $envelope): void
|
||||
{
|
||||
throw new InvalidArgumentException('You cannot call ack() on the SyncTransport.');
|
||||
}
|
||||
|
||||
public function reject(Envelope $envelope): void
|
||||
{
|
||||
throw new InvalidArgumentException('You cannot call reject() on the SyncTransport.');
|
||||
}
|
||||
|
||||
public function send(Envelope $envelope): Envelope
|
||||
{
|
||||
return $envelope->with(new ForceCallHandlersStamp());
|
||||
}
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\Sync;
|
||||
|
||||
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
/**
|
||||
* @experimental in 4.3
|
||||
*
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*/
|
||||
class SyncTransportFactory implements TransportFactoryInterface
|
||||
{
|
||||
public function createTransport(string $dsn, array $options): TransportInterface
|
||||
{
|
||||
return new SyncTransport();
|
||||
}
|
||||
|
||||
public function supports(string $dsn, array $options): bool
|
||||
{
|
||||
return 0 === strpos($dsn, 'sync://');
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user