Add optional parameter prefetching in connection configuration, to setup channel prefetch count

Co-Authored-By: f2r <frederic.bouchery+github@gmail.com>
This commit is contained in:
fbouchery 2019-03-24 17:01:17 +01:00 committed by Samuel ROZE
parent 0034a0f420
commit 47777eedd6
3 changed files with 26 additions and 0 deletions

View File

@ -4,6 +4,8 @@ CHANGELOG
4.3.0
-----
* Added optional parameter `prefetch_count` in connection configuration,
to setup channel prefetch count
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
and `BusNameStamp` were added, which allow you to add a bus identifier
to the `Envelope` then find the correct bus when receiving from

View File

@ -256,6 +256,25 @@ class ConnectionTest extends TestCase
$connection->publish('body', $headers);
}
public function testSetChannelPrefetchWhenSetup()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->createMock(\AMQPConnection::class),
$amqpChannel = $this->createMock(\AMQPChannel::class),
$amqpQueue = $this->createMock(\AMQPQueue::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
);
// makes sure the channel looks connected, so it's not re-created
$amqpChannel->expects($this->exactly(2))->method('isConnected')->willReturn(true);
$amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?prefetch_count=2', [], $factory);
$connection->setup();
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['prefetch_count' => 2], $factory);
$connection->setup();
}
public function testItDelaysTheMessage()
{
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();

View File

@ -84,6 +84,7 @@ class Connection
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry")
* * auto-setup: Enable or not the auto-setup of queues and exchanges (Default: true)
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
* * prefetch_count: set channel prefetch count
*/
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null)
{
@ -323,6 +324,10 @@ class Connection
throw new \AMQPException(sprintf('Could not connect to the AMQP server. Please verify the provided DSN. (%s)', json_encode($credentials)), 0, $e);
}
$this->amqpChannel = $this->amqpFactory->createChannel($connection);
if (isset($this->connectionConfiguration['prefetch_count'])) {
$this->amqpChannel->setPrefetchCount($this->connectionConfiguration['prefetch_count']);
}
}
return $this->amqpChannel;