[Messenger] changed AmqpExt classes constructor signature

This commit is contained in:
Fabien Potencier 2018-09-10 08:16:50 +02:00
parent 0cd1da66d7
commit 2d55ae5212
11 changed files with 29 additions and 23 deletions

View File

@ -4,6 +4,8 @@ CHANGELOG
4.2.0 4.2.0
----- -----
* [BC BREAK] The signature of Amqp* classes changed to take a `Connection` as a first argument and an optional
`Serializer` as a second argument.
* [BC BREAK] `SenderLocator` has been renamed to `ContainerSenderLocator` * [BC BREAK] `SenderLocator` has been renamed to `ContainerSenderLocator`
Be careful as there is still a `SenderLocator` class, but it does not rely on a `ContainerInterface` to find senders. Be careful as there is still a `SenderLocator` class, but it does not rely on a `ContainerInterface` to find senders.
Instead, it accepts the sender instance itself instead of its identifier in the container. Instead, it accepts the sender instance itself instead of its identifier in the container.

View File

@ -48,8 +48,8 @@ class AmqpExtIntegrationTest extends TestCase
$connection->setup(); $connection->setup();
$connection->queue()->purge(); $connection->queue()->purge();
$sender = new AmqpSender($serializer, $connection); $sender = new AmqpSender($connection, $serializer);
$receiver = new AmqpReceiver($serializer, $connection); $receiver = new AmqpReceiver($connection, $serializer);
$sender->send($first = Envelope::wrap(new DummyMessage('First'))); $sender->send($first = Envelope::wrap(new DummyMessage('First')));
$sender->send($second = Envelope::wrap(new DummyMessage('Second'))); $sender->send($second = Envelope::wrap(new DummyMessage('Second')));
@ -74,7 +74,7 @@ class AmqpExtIntegrationTest extends TestCase
$connection->setup(); $connection->setup();
$connection->queue()->purge(); $connection->queue()->purge();
$sender = new AmqpSender($serializer, $connection); $sender = new AmqpSender($connection, $serializer);
$sender->send(Envelope::wrap(new DummyMessage('Hello'))); $sender->send(Envelope::wrap(new DummyMessage('Hello')));
$amqpReadTimeout = 30; $amqpReadTimeout = 30;
@ -123,7 +123,7 @@ TXT
$connection->setup(); $connection->setup();
$connection->queue()->purge(); $connection->queue()->purge();
$receiver = new AmqpReceiver($serializer, $connection); $receiver = new AmqpReceiver($connection, $serializer);
$receivedMessages = 0; $receivedMessages = 0;
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) { $receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {

View File

@ -44,7 +44,7 @@ class AmqpReceiverTest extends TestCase
$connection->expects($this->once())->method('ack')->with($envelope); $connection->expects($this->once())->method('ack')->with($envelope);
$receiver = new AmqpReceiver($serializer, $connection); $receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) { $receiver->receive(function (?Envelope $envelope) use ($receiver) {
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage()); $this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
$receiver->stop(); $receiver->stop();
@ -71,7 +71,7 @@ class AmqpReceiverTest extends TestCase
$connection->expects($this->once())->method('nack')->with($envelope); $connection->expects($this->once())->method('nack')->with($envelope);
$receiver = new AmqpReceiver($serializer, $connection); $receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function () { $receiver->receive(function () {
throw new InterruptException('Well...'); throw new InterruptException('Well...');
}); });
@ -96,7 +96,7 @@ class AmqpReceiverTest extends TestCase
$connection->method('get')->willReturn($envelope); $connection->method('get')->willReturn($envelope);
$connection->expects($this->once())->method('reject')->with($envelope); $connection->expects($this->once())->method('reject')->with($envelope);
$receiver = new AmqpReceiver($serializer, $connection); $receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function () { $receiver->receive(function () {
throw new WillNeverWorkException('Well...'); throw new WillNeverWorkException('Well...');
}); });

View File

@ -34,7 +34,7 @@ class AmqpSenderTest extends TestCase
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']); $connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']);
$sender = new AmqpSender($serializer, $connection); $sender = new AmqpSender($connection, $serializer);
$sender->send($envelope); $sender->send($envelope);
} }
} }

View File

@ -38,7 +38,7 @@ class AmqpTransportFactoryTest extends TestCase
true true
); );
$expectedTransport = new AmqpTransport($serializer, Connection::fromDsn('amqp://localhost', array('foo' => 'bar'), true)); $expectedTransport = new AmqpTransport(Connection::fromDsn('amqp://localhost', array('foo' => 'bar'), true), $serializer);
$this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', array('foo' => 'bar'))); $this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', array('foo' => 'bar')));
} }

View File

@ -59,6 +59,6 @@ class AmqpTransportTest extends TestCase
$serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock(); $serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock();
$connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); $connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
return new AmqpTransport($serializer, $connection); return new AmqpTransport($connection, $serializer);
} }
} }

View File

@ -26,7 +26,7 @@ $serializer = new Serializer(
); );
$connection = Connection::fromDsn(getenv('DSN')); $connection = Connection::fromDsn(getenv('DSN'));
$receiver = new AmqpReceiver($serializer, $connection); $receiver = new AmqpReceiver($connection, $serializer);
$worker = new Worker($receiver, new class() implements MessageBusInterface { $worker = new Worker($receiver, new class() implements MessageBusInterface {
public function dispatch($envelope) public function dispatch($envelope)

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface; use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface; use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/** /**
@ -26,10 +27,10 @@ class AmqpReceiver implements ReceiverInterface
private $connection; private $connection;
private $shouldStop; private $shouldStop;
public function __construct(SerializerInterface $serializer, Connection $connection) public function __construct(Connection $connection, SerializerInterface $serializer = null)
{ {
$this->serializer = $serializer;
$this->connection = $connection; $this->connection = $connection;
$this->serializer = $serializer ?? Serializer::create();
} }
/** /**

View File

@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\SenderInterface; use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/** /**
@ -25,10 +26,10 @@ class AmqpSender implements SenderInterface
private $serializer; private $serializer;
private $connection; private $connection;
public function __construct(SerializerInterface $serializer, Connection $connection) public function __construct(Connection $connection, SerializerInterface $serializer = null)
{ {
$this->serializer = $serializer;
$this->connection = $connection; $this->connection = $connection;
$this->serializer = $serializer ?? Serializer::create();
} }
/** /**

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt; namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface; use Symfony\Component\Messenger\Transport\TransportInterface;
@ -25,10 +26,10 @@ class AmqpTransport implements TransportInterface
private $receiver; private $receiver;
private $sender; private $sender;
public function __construct(SerializerInterface $serializer, Connection $connection) public function __construct(Connection $connection, SerializerInterface $serializer = null)
{ {
$this->serializer = $serializer;
$this->connection = $connection; $this->connection = $connection;
$this->serializer = $serializer ?? Serializer::create();
} }
/** /**
@ -57,11 +58,11 @@ class AmqpTransport implements TransportInterface
private function getReceiver() private function getReceiver()
{ {
return $this->receiver = new AmqpReceiver($this->serializer, $this->connection); return $this->receiver = new AmqpReceiver($this->connection, $this->serializer);
} }
private function getSender() private function getSender()
{ {
return $this->sender = new AmqpSender($this->serializer, $this->connection); return $this->sender = new AmqpSender($this->connection, $this->serializer);
} }
} }

View File

@ -11,6 +11,7 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt; namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface; use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface; use Symfony\Component\Messenger\Transport\TransportInterface;
@ -23,15 +24,15 @@ class AmqpTransportFactory implements TransportFactoryInterface
private $serializer; private $serializer;
private $debug; private $debug;
public function __construct(SerializerInterface $serializer, bool $debug) public function __construct(SerializerInterface $serializer = null, bool $debug = false)
{ {
$this->serializer = $serializer; $this->serializer = $serializer ?? Serializer::create();
$this->debug = $debug; $this->debug = $debug;
} }
public function createTransport(string $dsn, array $options): TransportInterface public function createTransport(string $dsn, array $options): TransportInterface
{ {
return new AmqpTransport($this->serializer, Connection::fromDsn($dsn, $options, $this->debug)); return new AmqpTransport(Connection::fromDsn($dsn, $options, $this->debug), $this->serializer);
} }
public function supports(string $dsn, array $options): bool public function supports(string $dsn, array $options): bool