[Messenger] changed AmqpExt classes constructor signature
This commit is contained in:
parent
0cd1da66d7
commit
2d55ae5212
@ -4,6 +4,8 @@ CHANGELOG
|
|||||||
4.2.0
|
4.2.0
|
||||||
-----
|
-----
|
||||||
|
|
||||||
|
* [BC BREAK] The signature of Amqp* classes changed to take a `Connection` as a first argument and an optional
|
||||||
|
`Serializer` as a second argument.
|
||||||
* [BC BREAK] `SenderLocator` has been renamed to `ContainerSenderLocator`
|
* [BC BREAK] `SenderLocator` has been renamed to `ContainerSenderLocator`
|
||||||
Be careful as there is still a `SenderLocator` class, but it does not rely on a `ContainerInterface` to find senders.
|
Be careful as there is still a `SenderLocator` class, but it does not rely on a `ContainerInterface` to find senders.
|
||||||
Instead, it accepts the sender instance itself instead of its identifier in the container.
|
Instead, it accepts the sender instance itself instead of its identifier in the container.
|
||||||
|
@ -48,8 +48,8 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
$connection->setup();
|
$connection->setup();
|
||||||
$connection->queue()->purge();
|
$connection->queue()->purge();
|
||||||
|
|
||||||
$sender = new AmqpSender($serializer, $connection);
|
$sender = new AmqpSender($connection, $serializer);
|
||||||
$receiver = new AmqpReceiver($serializer, $connection);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
|
|
||||||
$sender->send($first = Envelope::wrap(new DummyMessage('First')));
|
$sender->send($first = Envelope::wrap(new DummyMessage('First')));
|
||||||
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));
|
$sender->send($second = Envelope::wrap(new DummyMessage('Second')));
|
||||||
@ -74,7 +74,7 @@ class AmqpExtIntegrationTest extends TestCase
|
|||||||
$connection->setup();
|
$connection->setup();
|
||||||
$connection->queue()->purge();
|
$connection->queue()->purge();
|
||||||
|
|
||||||
$sender = new AmqpSender($serializer, $connection);
|
$sender = new AmqpSender($connection, $serializer);
|
||||||
$sender->send(Envelope::wrap(new DummyMessage('Hello')));
|
$sender->send(Envelope::wrap(new DummyMessage('Hello')));
|
||||||
|
|
||||||
$amqpReadTimeout = 30;
|
$amqpReadTimeout = 30;
|
||||||
@ -123,7 +123,7 @@ TXT
|
|||||||
$connection->setup();
|
$connection->setup();
|
||||||
$connection->queue()->purge();
|
$connection->queue()->purge();
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($serializer, $connection);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
|
|
||||||
$receivedMessages = 0;
|
$receivedMessages = 0;
|
||||||
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
|
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
|
||||||
|
@ -44,7 +44,7 @@ class AmqpReceiverTest extends TestCase
|
|||||||
|
|
||||||
$connection->expects($this->once())->method('ack')->with($envelope);
|
$connection->expects($this->once())->method('ack')->with($envelope);
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($serializer, $connection);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
|
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
|
||||||
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
|
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
|
||||||
$receiver->stop();
|
$receiver->stop();
|
||||||
@ -71,7 +71,7 @@ class AmqpReceiverTest extends TestCase
|
|||||||
|
|
||||||
$connection->expects($this->once())->method('nack')->with($envelope);
|
$connection->expects($this->once())->method('nack')->with($envelope);
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($serializer, $connection);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
$receiver->receive(function () {
|
$receiver->receive(function () {
|
||||||
throw new InterruptException('Well...');
|
throw new InterruptException('Well...');
|
||||||
});
|
});
|
||||||
@ -96,7 +96,7 @@ class AmqpReceiverTest extends TestCase
|
|||||||
$connection->method('get')->willReturn($envelope);
|
$connection->method('get')->willReturn($envelope);
|
||||||
$connection->expects($this->once())->method('reject')->with($envelope);
|
$connection->expects($this->once())->method('reject')->with($envelope);
|
||||||
|
|
||||||
$receiver = new AmqpReceiver($serializer, $connection);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
$receiver->receive(function () {
|
$receiver->receive(function () {
|
||||||
throw new WillNeverWorkException('Well...');
|
throw new WillNeverWorkException('Well...');
|
||||||
});
|
});
|
||||||
|
@ -34,7 +34,7 @@ class AmqpSenderTest extends TestCase
|
|||||||
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||||
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']);
|
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']);
|
||||||
|
|
||||||
$sender = new AmqpSender($serializer, $connection);
|
$sender = new AmqpSender($connection, $serializer);
|
||||||
$sender->send($envelope);
|
$sender->send($envelope);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -38,7 +38,7 @@ class AmqpTransportFactoryTest extends TestCase
|
|||||||
true
|
true
|
||||||
);
|
);
|
||||||
|
|
||||||
$expectedTransport = new AmqpTransport($serializer, Connection::fromDsn('amqp://localhost', array('foo' => 'bar'), true));
|
$expectedTransport = new AmqpTransport(Connection::fromDsn('amqp://localhost', array('foo' => 'bar'), true), $serializer);
|
||||||
|
|
||||||
$this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', array('foo' => 'bar')));
|
$this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', array('foo' => 'bar')));
|
||||||
}
|
}
|
||||||
|
@ -59,6 +59,6 @@ class AmqpTransportTest extends TestCase
|
|||||||
$serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock();
|
$serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock();
|
||||||
$connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
$connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
|
||||||
|
|
||||||
return new AmqpTransport($serializer, $connection);
|
return new AmqpTransport($connection, $serializer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,7 +26,7 @@ $serializer = new Serializer(
|
|||||||
);
|
);
|
||||||
|
|
||||||
$connection = Connection::fromDsn(getenv('DSN'));
|
$connection = Connection::fromDsn(getenv('DSN'));
|
||||||
$receiver = new AmqpReceiver($serializer, $connection);
|
$receiver = new AmqpReceiver($connection, $serializer);
|
||||||
|
|
||||||
$worker = new Worker($receiver, new class() implements MessageBusInterface {
|
$worker = new Worker($receiver, new class() implements MessageBusInterface {
|
||||||
public function dispatch($envelope)
|
public function dispatch($envelope)
|
||||||
|
@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
|||||||
|
|
||||||
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
|
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
|
||||||
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
use Symfony\Component\Messenger\Transport\ReceiverInterface;
|
||||||
|
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -26,10 +27,10 @@ class AmqpReceiver implements ReceiverInterface
|
|||||||
private $connection;
|
private $connection;
|
||||||
private $shouldStop;
|
private $shouldStop;
|
||||||
|
|
||||||
public function __construct(SerializerInterface $serializer, Connection $connection)
|
public function __construct(Connection $connection, SerializerInterface $serializer = null)
|
||||||
{
|
{
|
||||||
$this->serializer = $serializer;
|
|
||||||
$this->connection = $connection;
|
$this->connection = $connection;
|
||||||
|
$this->serializer = $serializer ?? Serializer::create();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -13,6 +13,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
|||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Transport\SenderInterface;
|
use Symfony\Component\Messenger\Transport\SenderInterface;
|
||||||
|
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -25,10 +26,10 @@ class AmqpSender implements SenderInterface
|
|||||||
private $serializer;
|
private $serializer;
|
||||||
private $connection;
|
private $connection;
|
||||||
|
|
||||||
public function __construct(SerializerInterface $serializer, Connection $connection)
|
public function __construct(Connection $connection, SerializerInterface $serializer = null)
|
||||||
{
|
{
|
||||||
$this->serializer = $serializer;
|
|
||||||
$this->connection = $connection;
|
$this->connection = $connection;
|
||||||
|
$this->serializer = $serializer ?? Serializer::create();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||||
|
|
||||||
@ -25,10 +26,10 @@ class AmqpTransport implements TransportInterface
|
|||||||
private $receiver;
|
private $receiver;
|
||||||
private $sender;
|
private $sender;
|
||||||
|
|
||||||
public function __construct(SerializerInterface $serializer, Connection $connection)
|
public function __construct(Connection $connection, SerializerInterface $serializer = null)
|
||||||
{
|
{
|
||||||
$this->serializer = $serializer;
|
|
||||||
$this->connection = $connection;
|
$this->connection = $connection;
|
||||||
|
$this->serializer = $serializer ?? Serializer::create();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -57,11 +58,11 @@ class AmqpTransport implements TransportInterface
|
|||||||
|
|
||||||
private function getReceiver()
|
private function getReceiver()
|
||||||
{
|
{
|
||||||
return $this->receiver = new AmqpReceiver($this->serializer, $this->connection);
|
return $this->receiver = new AmqpReceiver($this->connection, $this->serializer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function getSender()
|
private function getSender()
|
||||||
{
|
{
|
||||||
return $this->sender = new AmqpSender($this->serializer, $this->connection);
|
return $this->sender = new AmqpSender($this->connection, $this->serializer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
|
|
||||||
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
|
||||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
||||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||||
@ -23,15 +24,15 @@ class AmqpTransportFactory implements TransportFactoryInterface
|
|||||||
private $serializer;
|
private $serializer;
|
||||||
private $debug;
|
private $debug;
|
||||||
|
|
||||||
public function __construct(SerializerInterface $serializer, bool $debug)
|
public function __construct(SerializerInterface $serializer = null, bool $debug = false)
|
||||||
{
|
{
|
||||||
$this->serializer = $serializer;
|
$this->serializer = $serializer ?? Serializer::create();
|
||||||
$this->debug = $debug;
|
$this->debug = $debug;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function createTransport(string $dsn, array $options): TransportInterface
|
public function createTransport(string $dsn, array $options): TransportInterface
|
||||||
{
|
{
|
||||||
return new AmqpTransport($this->serializer, Connection::fromDsn($dsn, $options, $this->debug));
|
return new AmqpTransport(Connection::fromDsn($dsn, $options, $this->debug), $this->serializer);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function supports(string $dsn, array $options): bool
|
public function supports(string $dsn, array $options): bool
|
||||||
|
Reference in New Issue
Block a user