feature #28419 [Messenger] Change AmqpExt classes constructor signature (fabpot)
This PR was merged into the 4.2-dev branch.
Discussion
----------
[Messenger] Change AmqpExt classes constructor signature
| Q | A
| ------------- | ---
| Branch? | master
| Bug fix? | no
| New feature? | yes
| BC breaks? | yes
| Deprecations? | no <!-- don't forget to update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass? | yes <!-- please add some, will be required by reviewers -->
| Fixed tickets | n/a
| License | MIT
| Doc PR | n/a
This pull requests does 2 things:
* It makes `Connection` a first argument of AmqpExt classes. I think it makes sense as this is the most important argument for those classes.
* As the `Serializer` is now a second argument, I propose to make it optional and use the default serializer that we've added recently if `null` (`Serializer::create()`)
It makes the component even more user friendly when not using Symfony full stack (and provide good defaults).
Commits
-------
2d55ae5212
[Messenger] changed AmqpExt classes constructor signature
This commit is contained in:
commit
766a82bddf
@ -4,11 +4,13 @@ CHANGELOG
|
|||||||
4.2.0
|
4.2.0
|
||||||
-----
|
-----
|
||||||
|
|
||||||
|
* [BC BREAK] The signature of Amqp* classes changed to take a `Connection` as a first argument and an optional
|
||||||
|
`Serializer` as a second argument.
|
||||||
* [BC BREAK] `SenderLocator` has been renamed to `ContainerSenderLocator`
|
* [BC BREAK] `SenderLocator` has been renamed to `ContainerSenderLocator`
|
||||||
Be careful as there is still a `SenderLocator` class, but it does not rely on a `ContainerInterface` to find senders.
|
Be careful as there is still a `SenderLocator` class, but it does not rely on a `ContainerInterface` to find senders.
|
||||||
Instead, it accepts the sender instance itself instead of its identifier in the container.
|
Instead, it accepts the sender instance itself instead of its identifier in the container.
|
||||||
* [BC BREAK] `MessageSubscriberInterface::getHandledMessages()` return value has changed. The value of an array item
|
* [BC BREAK] `MessageSubscriberInterface::getHandledMessages()` return value has changed. The value of an array item
|
||||||
needs to be an associative array or the method name.
|
needs to be an associative array or the method name.
|
||||||
* `ValidationMiddleware::handle()` and `SendMessageMiddleware::handle()` now require an `Envelope` object
|
* `ValidationMiddleware::handle()` and `SendMessageMiddleware::handle()` now require an `Envelope` object
|
||||||
* `EnvelopeItemInterface` doesn't extend `Serializable` anymore
|
* `EnvelopeItemInterface` doesn't extend `Serializable` anymore
|
||||||
* [BC BREAK] The `ConsumeMessagesCommand` class now takes an instance of `Psr\Container\ContainerInterface`
|
* [BC BREAK] The `ConsumeMessagesCommand` class now takes an instance of `Psr\Container\ContainerInterface`
|
||||||
|
@ -48,8 +48,8 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
$connection->setup();
|
$connection->setup();
|
||||||
$connection->queue()->purge();
|
$connection->queue()->purge();
|
||||||
|
|
||||||
$sender = new AmqpSender($serializer, $connection);
|
$sender = new AmqpSender($connection, $serializer);
|
||||||
$receiver = new AmqpReceiver($serializer, $connection);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
|
|
||||||
$sender->send($first = Envelope::wrap(new DummyMessage('First')));
|
$sender->send($first = Envelope::wrap(new DummyMessage('First')));
|
||||||
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));
|
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));
|
||||||
@ -74,7 +74,7 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
$connection->setup();
|
$connection->setup();
|
||||||
$connection->queue()->purge();
|
$connection->queue()->purge();
|
||||||
|
|
||||||
$sender = new AmqpSender($serializer, $connection);
|
$sender = new AmqpSender($connection, $serializer);
|
||||||
$sender->send(Envelope::wrap(new DummyMessage('Hello')));
|
$sender->send(Envelope::wrap(new DummyMessage('Hello')));
|
||||||
|
|
||||||
$amqpReadTimeout = 30;
|
$amqpReadTimeout = 30;
|
||||||
@ -123,7 +123,7 @@ TXT
|
|||||||
$connection->setup();
|
$connection->setup();
|
||||||
$connection->queue()->purge();
|
$connection->queue()->purge();
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($serializer, $connection);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
|
|
||||||
$receivedMessages = 0;
|
$receivedMessages = 0;
|
||||||
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
|
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
|
||||||
|
@ -44,7 +44,7 @@ class AmqpReceiverTest extends TestCase
|
|||||||
|
|
||||||
$connection->expects($this->once())->method('ack')->with($envelope);
|
$connection->expects($this->once())->method('ack')->with($envelope);
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($serializer, $connection);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
|
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
|
||||||
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
|
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
|
||||||
$receiver->stop();
|
$receiver->stop();
|
||||||
@ -71,7 +71,7 @@ class AmqpReceiverTest extends TestCase
|
|||||||
|
|
||||||
$connection->expects($this->once())->method('nack')->with($envelope);
|
$connection->expects($this->once())->method('nack')->with($envelope);
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($serializer, $connection);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
$receiver->receive(function () {
|
$receiver->receive(function () {
|
||||||
throw new InterruptException('Well...');
|
throw new InterruptException('Well...');
|
||||||
});
|
});
|
||||||
@ -96,7 +96,7 @@ class AmqpReceiverTest extends TestCase
|
|||||||
$connection->method('get')->willReturn($envelope);
|
$connection->method('get')->willReturn($envelope);
|
||||||
$connection->expects($this->once())->method('reject')->with($envelope);
|
$connection->expects($this->once())->method('reject')->with($envelope);
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($serializer, $connection);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
$receiver->receive(function () {
|
$receiver->receive(function () {
|
||||||
throw new WillNeverWorkException('Well...');
|
throw new WillNeverWorkException('Well...');
|
||||||
});
|
});
|
||||||
|
@ -34,7 +34,7 @@ class AmqpSenderTest extends TestCase
|
|||||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||||
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']);
|
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']);
|
||||||
|
|
||||||
$sender = new AmqpSender($serializer, $connection);
|
$sender = new AmqpSender($connection, $serializer);
|
||||||
$sender->send($envelope);
|
$sender->send($envelope);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -38,7 +38,7 @@ class AmqpTransportFactoryTest extends TestCase
|
|||||||
true
|
true
|
||||||
);
|
);
|
||||||
|
|
||||||
$expectedTransport = new AmqpTransport($serializer, Connection::fromDsn('amqp://localhost', array('foo' => 'bar'), true));
|
$expectedTransport = new AmqpTransport(Connection::fromDsn('amqp://localhost', array('foo' => 'bar'), true), $serializer);
|
||||||
|
|
||||||
$this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', array('foo' => 'bar')));
|
$this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', array('foo' => 'bar')));
|
||||||
}
|
}
|
||||||
|
@ -59,6 +59,6 @@ class AmqpTransportTest extends TestCase
|
|||||||
$serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock();
|
$serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock();
|
||||||
$connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
$connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||||
|
|
||||||
return new AmqpTransport($serializer, $connection);
|
return new AmqpTransport($connection, $serializer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,7 +26,7 @@ $serializer = new Serializer(
|
|||||||
);
|
);
|
||||||
|
|
||||||
$connection = Connection::fromDsn(getenv('DSN'));
|
$connection = Connection::fromDsn(getenv('DSN'));
|
||||||
$receiver = new AmqpReceiver($serializer, $connection);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
|
|
||||||
$worker = new Worker($receiver, new class() implements MessageBusInterface {
|
$worker = new Worker($receiver, new class() implements MessageBusInterface {
|
||||||
public function dispatch($envelope)
|
public function dispatch($envelope)
|
||||||
|
@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
|||||||
|
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
|
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
|
||||||
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
||||||
|
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -26,10 +27,10 @@ class AmqpReceiver implements ReceiverInterface
|
|||||||
private $connection;
|
private $connection;
|
||||||
private $shouldStop;
|
private $shouldStop;
|
||||||
|
|
||||||
public function __construct(SerializerInterface $serializer, Connection $connection)
|
public function __construct(Connection $connection, SerializerInterface $serializer = null)
|
||||||
{
|
{
|
||||||
$this->serializer = $serializer;
|
|
||||||
$this->connection = $connection;
|
$this->connection = $connection;
|
||||||
|
$this->serializer = $serializer ?? Serializer::create();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
|||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Transport\SenderInterface;
|
use Symfony\Component\Messenger\Transport\SenderInterface;
|
||||||
|
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -25,10 +26,10 @@ class AmqpSender implements SenderInterface
|
|||||||
private $serializer;
|
private $serializer;
|
||||||
private $connection;
|
private $connection;
|
||||||
|
|
||||||
public function __construct(SerializerInterface $serializer, Connection $connection)
|
public function __construct(Connection $connection, SerializerInterface $serializer = null)
|
||||||
{
|
{
|
||||||
$this->serializer = $serializer;
|
|
||||||
$this->connection = $connection;
|
$this->connection = $connection;
|
||||||
|
$this->serializer = $serializer ?? Serializer::create();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||||
|
|
||||||
@ -25,10 +26,10 @@ class AmqpTransport implements TransportInterface
|
|||||||
private $receiver;
|
private $receiver;
|
||||||
private $sender;
|
private $sender;
|
||||||
|
|
||||||
public function __construct(SerializerInterface $serializer, Connection $connection)
|
public function __construct(Connection $connection, SerializerInterface $serializer = null)
|
||||||
{
|
{
|
||||||
$this->serializer = $serializer;
|
|
||||||
$this->connection = $connection;
|
$this->connection = $connection;
|
||||||
|
$this->serializer = $serializer ?? Serializer::create();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -57,11 +58,11 @@ class AmqpTransport implements TransportInterface
|
|||||||
|
|
||||||
private function getReceiver()
|
private function getReceiver()
|
||||||
{
|
{
|
||||||
return $this->receiver = new AmqpReceiver($this->serializer, $this->connection);
|
return $this->receiver = new AmqpReceiver($this->connection, $this->serializer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function getSender()
|
private function getSender()
|
||||||
{
|
{
|
||||||
return $this->sender = new AmqpSender($this->serializer, $this->connection);
|
return $this->sender = new AmqpSender($this->connection, $this->serializer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
||||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||||
@ -23,15 +24,15 @@ class AmqpTransportFactory implements TransportFactoryInterface
|
|||||||
private $serializer;
|
private $serializer;
|
||||||
private $debug;
|
private $debug;
|
||||||
|
|
||||||
public function __construct(SerializerInterface $serializer, bool $debug)
|
public function __construct(SerializerInterface $serializer = null, bool $debug = false)
|
||||||
{
|
{
|
||||||
$this->serializer = $serializer;
|
$this->serializer = $serializer ?? Serializer::create();
|
||||||
$this->debug = $debug;
|
$this->debug = $debug;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function createTransport(string $dsn, array $options): TransportInterface
|
public function createTransport(string $dsn, array $options): TransportInterface
|
||||||
{
|
{
|
||||||
return new AmqpTransport($this->serializer, Connection::fromDsn($dsn, $options, $this->debug));
|
return new AmqpTransport(Connection::fromDsn($dsn, $options, $this->debug), $this->serializer);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function supports(string $dsn, array $options): bool
|
public function supports(string $dsn, array $options): bool
|
||||||
|
Reference in New Issue
Block a user