Simply code and rename "configuration" to "options"

This commit is contained in:
Samuel ROZE 2019-04-06 11:43:17 +02:00
parent 3151b54b7a
commit a515635f18
3 changed files with 46 additions and 56 deletions

View File

@ -19,8 +19,12 @@ CHANGELOG
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
explicitly handle messages synchronously.
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
* Removed publishing with a `routing_key` option from queue configuration, for
* [BC BREAK] Removed publishing with a `routing_key` option from queue configuration, for
AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead.
* [BC BREAK] Changed the `queue` option in the AMQP transport DSN to be `queues[name]`. You can
therefore name the queue but also configure `binding_keys`, `flags` and `arguments`.
* [BC BREAK] The methods `get`, `ack`, `nack` and `queue` of the AMQP `Connection`
have a new argument: the queue name.
* Added optional parameter `prefetch_count` in connection configuration,
to setup channel prefetch count.
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
@ -74,8 +78,7 @@ CHANGELOG
only. Pass the `auto_setup` connection option to control this.
* Added a `SetupTransportsCommand` command to setup the transports
* Added a Doctrine transport. For example, use the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
* Deprecated publishing with a routing key from queue configuration, use exchange configuration instead.
* [BC BREAK] The `getConnectionConfiguration` method on Amqp's `Connection` has been removed.
4.2.0
-----

View File

@ -81,10 +81,11 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
public function ack(Envelope $envelope): void
{
try {
/* @var AmqpReceivedStamp $amqpReceivedStamp */
$stamp = $this->findAmqpStamp($envelope);
$this->connection->ack(
$this->findAmqpEnvelope($envelope, $amqpReceivedStamp),
$amqpReceivedStamp->getQueueName()
$stamp->getAmqpEnvelope(),
$stamp->getQueueName()
);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
@ -96,10 +97,11 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
*/
public function reject(Envelope $envelope): void
{
/* @var AmqpReceivedStamp $amqpReceivedStamp */
$stamp = $this->findAmqpStamp($envelope);
$this->rejectAmqpEnvelope(
$this->findAmqpEnvelope($envelope, $amqpReceivedStamp),
$amqpReceivedStamp->getQueueName()
$stamp->getAmqpEnvelope(),
$stamp->getQueueName()
);
}
@ -120,14 +122,13 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
}
}
private function findAmqpEnvelope(Envelope $envelope, AmqpReceivedStamp &$amqpReceivedStamp = null): \AMQPEnvelope
private function findAmqpStamp(Envelope $envelope): AmqpReceivedStamp
{
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if (null === $amqpReceivedStamp) {
throw new LogicException('No AmqpReceivedStamp found on the Envelope.');
throw new LogicException('No "AmqpReceivedStamp" stamp found on the Envelope.');
}
return $amqpReceivedStamp->getAmqpEnvelope();
return $amqpReceivedStamp;
}
}

View File

@ -33,9 +33,9 @@ class Connection
'x-message-ttl',
];
private $connectionConfiguration;
private $exchangeConfiguration;
private $queuesConfiguration;
private $connectionOptions;
private $exchangeOptions;
private $queuesOptions;
private $amqpFactory;
/**
@ -86,17 +86,17 @@ class Connection
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
* * prefetch_count: set channel prefetch count
*/
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queuesConfiguration, AmqpFactory $amqpFactory = null)
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
{
$this->connectionConfiguration = array_replace_recursive([
$this->connectionOptions = array_replace_recursive([
'delay' => [
'routing_key_pattern' => 'delay_%delay%',
'exchange_name' => 'delay',
'queue_name_pattern' => 'delay_queue_%delay%',
],
], $connectionConfiguration);
$this->exchangeConfiguration = $exchangeConfiguration;
$this->queuesConfiguration = $queuesConfiguration;
], $connectionOptions);
$this->exchangeOptions = $exchangeOptions;
$this->queuesOptions = $queuesOptions;
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
}
@ -183,8 +183,6 @@ class Connection
$this->setup();
}
// TODO - allow flag & attributes to be configured on the message
$this->exchange()->publish(
$body,
$routingKey ?? $this->getDefaultPublishRoutingKey(),
@ -214,8 +212,6 @@ class Connection
$this->setupDelay($delay, $exchangeRoutingKey);
}
// TODO - allow flag & attributes to be configured on the message
$this->getDelayExchange()->publish(
$body,
$this->getRoutingKeyForDelay($delay),
@ -244,7 +240,7 @@ class Connection
{
if (null === $this->amqpDelayExchange) {
$this->amqpDelayExchange = $this->amqpFactory->createExchange($this->channel());
$this->amqpDelayExchange->setName($this->connectionConfiguration['delay']['exchange_name']);
$this->amqpDelayExchange->setName($this->connectionOptions['delay']['exchange_name']);
$this->amqpDelayExchange->setType(AMQP_EX_TYPE_DIRECT);
}
@ -262,10 +258,8 @@ class Connection
*/
private function createDelayQueue(int $delay, ?string $routingKey)
{
$delayConfiguration = $this->connectionConfiguration['delay'];
$queue = $this->amqpFactory->createQueue($this->channel());
$queue->setName(str_replace('%delay%', $delay, $delayConfiguration['queue_name_pattern']));
$queue->setName(str_replace('%delay%', $delay, $this->connectionOptions['delay']['queue_name_pattern']));
$queue->setArguments([
'x-message-ttl' => $delay,
'x-dead-letter-exchange' => $this->exchange()->getName(),
@ -282,7 +276,7 @@ class Connection
private function getRoutingKeyForDelay(int $delay): string
{
return str_replace('%delay%', $delay, $this->connectionConfiguration['delay']['routing_key_pattern']);
return str_replace('%delay%', $delay, $this->connectionOptions['delay']['routing_key_pattern']);
}
/**
@ -332,7 +326,7 @@ class Connection
$this->exchange()->declareExchange();
foreach ($this->queuesConfiguration as $queueName => $queueConfig) {
foreach ($this->queuesOptions as $queueName => $queueConfig) {
$this->queue($queueName)->declareQueue();
foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
$this->queue($queueName)->bind($this->exchange()->getName(), $bindingKey);
@ -345,40 +339,37 @@ class Connection
*/
public function getQueueNames(): array
{
return array_keys($this->queuesConfiguration);
return array_keys($this->queuesOptions);
}
/**
* @internal
*/
public function channel(): \AMQPChannel
{
if (null === $this->amqpChannel) {
$connection = $this->amqpFactory->createConnection($this->connectionConfiguration);
$connectMethod = 'true' === ($this->connectionConfiguration['persistent'] ?? 'false') ? 'pconnect' : 'connect';
$connection = $this->amqpFactory->createConnection($this->connectionOptions);
$connectMethod = 'true' === ($this->connectionOptions['persistent'] ?? 'false') ? 'pconnect' : 'connect';
try {
$connection->{$connectMethod}();
} catch (\AMQPConnectionException $e) {
$credentials = $this->connectionConfiguration;
$credentials = $this->connectionOptions;
$credentials['password'] = '********';
throw new \AMQPException(sprintf('Could not connect to the AMQP server. Please verify the provided DSN. (%s)', json_encode($credentials)), 0, $e);
}
$this->amqpChannel = $this->amqpFactory->createChannel($connection);
if (isset($this->connectionConfiguration['prefetch_count'])) {
$this->amqpChannel->setPrefetchCount($this->connectionConfiguration['prefetch_count']);
if (isset($this->connectionOptions['prefetch_count'])) {
$this->amqpChannel->setPrefetchCount($this->connectionOptions['prefetch_count']);
}
}
return $this->amqpChannel;
}
private function queue(string $queueName): \AMQPQueue
public function queue(string $queueName): \AMQPQueue
{
if (!isset($this->amqpQueues[$queueName])) {
$queueConfig = $this->queuesConfiguration[$queueName];
$queueConfig = $this->queuesOptions[$queueName];
$amqpQueue = $this->amqpFactory->createQueue($this->channel());
$amqpQueue->setName($queueName);
@ -394,27 +385,22 @@ class Connection
return $this->amqpQueues[$queueName];
}
private function exchange(): \AMQPExchange
public function exchange(): \AMQPExchange
{
if (null === $this->amqpExchange) {
$this->amqpExchange = $this->amqpFactory->createExchange($this->channel());
$this->amqpExchange->setName($this->exchangeConfiguration['name']);
$this->amqpExchange->setType($this->exchangeConfiguration['type'] ?? AMQP_EX_TYPE_FANOUT);
$this->amqpExchange->setFlags($this->exchangeConfiguration['flags'] ?? AMQP_DURABLE);
$this->amqpExchange->setName($this->exchangeOptions['name']);
$this->amqpExchange->setType($this->exchangeOptions['type'] ?? AMQP_EX_TYPE_FANOUT);
$this->amqpExchange->setFlags($this->exchangeOptions['flags'] ?? AMQP_DURABLE);
if (isset($this->exchangeConfiguration['arguments'])) {
$this->amqpExchange->setArguments($this->exchangeConfiguration['arguments']);
if (isset($this->exchangeOptions['arguments'])) {
$this->amqpExchange->setArguments($this->exchangeOptions['arguments']);
}
}
return $this->amqpExchange;
}
public function getConnectionConfiguration(): array
{
return $this->connectionConfiguration;
}
private function clear(): void
{
$this->amqpChannel = null;
@ -424,11 +410,11 @@ class Connection
private function shouldSetup(): bool
{
if (!\array_key_exists('auto_setup', $this->connectionConfiguration)) {
if (!\array_key_exists('auto_setup', $this->connectionOptions)) {
return true;
}
if (\in_array($this->connectionConfiguration['auto_setup'], [false, 'false'], true)) {
if (\in_array($this->connectionOptions['auto_setup'], [false, 'false'], true)) {
return false;
}
@ -437,7 +423,7 @@ class Connection
private function getDefaultPublishRoutingKey(): ?string
{
return $this->exchangeConfiguration['default_publish_routing_key'] ?? null;
return $this->exchangeOptions['default_publish_routing_key'] ?? null;
}
public function purgeQueues()