Simply code and rename "configuration" to "options"
This commit is contained in:
parent
3151b54b7a
commit
a515635f18
@ -19,8 +19,12 @@ CHANGELOG
|
||||
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
|
||||
explicitly handle messages synchronously.
|
||||
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
|
||||
* Removed publishing with a `routing_key` option from queue configuration, for
|
||||
* [BC BREAK] Removed publishing with a `routing_key` option from queue configuration, for
|
||||
AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead.
|
||||
* [BC BREAK] Changed the `queue` option in the AMQP transport DSN to be `queues[name]`. You can
|
||||
therefore name the queue but also configure `binding_keys`, `flags` and `arguments`.
|
||||
* [BC BREAK] The methods `get`, `ack`, `nack` and `queue` of the AMQP `Connection`
|
||||
have a new argument: the queue name.
|
||||
* Added optional parameter `prefetch_count` in connection configuration,
|
||||
to setup channel prefetch count.
|
||||
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
|
||||
@ -74,8 +78,7 @@ CHANGELOG
|
||||
only. Pass the `auto_setup` connection option to control this.
|
||||
* Added a `SetupTransportsCommand` command to setup the transports
|
||||
* Added a Doctrine transport. For example, use the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
|
||||
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
|
||||
* Deprecated publishing with a routing key from queue configuration, use exchange configuration instead.
|
||||
* [BC BREAK] The `getConnectionConfiguration` method on Amqp's `Connection` has been removed.
|
||||
|
||||
4.2.0
|
||||
-----
|
||||
|
@ -81,10 +81,11 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
public function ack(Envelope $envelope): void
|
||||
{
|
||||
try {
|
||||
/* @var AmqpReceivedStamp $amqpReceivedStamp */
|
||||
$stamp = $this->findAmqpStamp($envelope);
|
||||
|
||||
$this->connection->ack(
|
||||
$this->findAmqpEnvelope($envelope, $amqpReceivedStamp),
|
||||
$amqpReceivedStamp->getQueueName()
|
||||
$stamp->getAmqpEnvelope(),
|
||||
$stamp->getQueueName()
|
||||
);
|
||||
} catch (\AMQPException $exception) {
|
||||
throw new TransportException($exception->getMessage(), 0, $exception);
|
||||
@ -96,10 +97,11 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
*/
|
||||
public function reject(Envelope $envelope): void
|
||||
{
|
||||
/* @var AmqpReceivedStamp $amqpReceivedStamp */
|
||||
$stamp = $this->findAmqpStamp($envelope);
|
||||
|
||||
$this->rejectAmqpEnvelope(
|
||||
$this->findAmqpEnvelope($envelope, $amqpReceivedStamp),
|
||||
$amqpReceivedStamp->getQueueName()
|
||||
$stamp->getAmqpEnvelope(),
|
||||
$stamp->getQueueName()
|
||||
);
|
||||
}
|
||||
|
||||
@ -120,14 +122,13 @@ class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
}
|
||||
}
|
||||
|
||||
private function findAmqpEnvelope(Envelope $envelope, AmqpReceivedStamp &$amqpReceivedStamp = null): \AMQPEnvelope
|
||||
private function findAmqpStamp(Envelope $envelope): AmqpReceivedStamp
|
||||
{
|
||||
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
|
||||
|
||||
if (null === $amqpReceivedStamp) {
|
||||
throw new LogicException('No AmqpReceivedStamp found on the Envelope.');
|
||||
throw new LogicException('No "AmqpReceivedStamp" stamp found on the Envelope.');
|
||||
}
|
||||
|
||||
return $amqpReceivedStamp->getAmqpEnvelope();
|
||||
return $amqpReceivedStamp;
|
||||
}
|
||||
}
|
||||
|
@ -33,9 +33,9 @@ class Connection
|
||||
'x-message-ttl',
|
||||
];
|
||||
|
||||
private $connectionConfiguration;
|
||||
private $exchangeConfiguration;
|
||||
private $queuesConfiguration;
|
||||
private $connectionOptions;
|
||||
private $exchangeOptions;
|
||||
private $queuesOptions;
|
||||
private $amqpFactory;
|
||||
|
||||
/**
|
||||
@ -86,17 +86,17 @@ class Connection
|
||||
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
|
||||
* * prefetch_count: set channel prefetch count
|
||||
*/
|
||||
public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queuesConfiguration, AmqpFactory $amqpFactory = null)
|
||||
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
|
||||
{
|
||||
$this->connectionConfiguration = array_replace_recursive([
|
||||
$this->connectionOptions = array_replace_recursive([
|
||||
'delay' => [
|
||||
'routing_key_pattern' => 'delay_%delay%',
|
||||
'exchange_name' => 'delay',
|
||||
'queue_name_pattern' => 'delay_queue_%delay%',
|
||||
],
|
||||
], $connectionConfiguration);
|
||||
$this->exchangeConfiguration = $exchangeConfiguration;
|
||||
$this->queuesConfiguration = $queuesConfiguration;
|
||||
], $connectionOptions);
|
||||
$this->exchangeOptions = $exchangeOptions;
|
||||
$this->queuesOptions = $queuesOptions;
|
||||
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
|
||||
}
|
||||
|
||||
@ -183,8 +183,6 @@ class Connection
|
||||
$this->setup();
|
||||
}
|
||||
|
||||
// TODO - allow flag & attributes to be configured on the message
|
||||
|
||||
$this->exchange()->publish(
|
||||
$body,
|
||||
$routingKey ?? $this->getDefaultPublishRoutingKey(),
|
||||
@ -214,8 +212,6 @@ class Connection
|
||||
$this->setupDelay($delay, $exchangeRoutingKey);
|
||||
}
|
||||
|
||||
// TODO - allow flag & attributes to be configured on the message
|
||||
|
||||
$this->getDelayExchange()->publish(
|
||||
$body,
|
||||
$this->getRoutingKeyForDelay($delay),
|
||||
@ -244,7 +240,7 @@ class Connection
|
||||
{
|
||||
if (null === $this->amqpDelayExchange) {
|
||||
$this->amqpDelayExchange = $this->amqpFactory->createExchange($this->channel());
|
||||
$this->amqpDelayExchange->setName($this->connectionConfiguration['delay']['exchange_name']);
|
||||
$this->amqpDelayExchange->setName($this->connectionOptions['delay']['exchange_name']);
|
||||
$this->amqpDelayExchange->setType(AMQP_EX_TYPE_DIRECT);
|
||||
}
|
||||
|
||||
@ -262,10 +258,8 @@ class Connection
|
||||
*/
|
||||
private function createDelayQueue(int $delay, ?string $routingKey)
|
||||
{
|
||||
$delayConfiguration = $this->connectionConfiguration['delay'];
|
||||
|
||||
$queue = $this->amqpFactory->createQueue($this->channel());
|
||||
$queue->setName(str_replace('%delay%', $delay, $delayConfiguration['queue_name_pattern']));
|
||||
$queue->setName(str_replace('%delay%', $delay, $this->connectionOptions['delay']['queue_name_pattern']));
|
||||
$queue->setArguments([
|
||||
'x-message-ttl' => $delay,
|
||||
'x-dead-letter-exchange' => $this->exchange()->getName(),
|
||||
@ -282,7 +276,7 @@ class Connection
|
||||
|
||||
private function getRoutingKeyForDelay(int $delay): string
|
||||
{
|
||||
return str_replace('%delay%', $delay, $this->connectionConfiguration['delay']['routing_key_pattern']);
|
||||
return str_replace('%delay%', $delay, $this->connectionOptions['delay']['routing_key_pattern']);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -332,7 +326,7 @@ class Connection
|
||||
|
||||
$this->exchange()->declareExchange();
|
||||
|
||||
foreach ($this->queuesConfiguration as $queueName => $queueConfig) {
|
||||
foreach ($this->queuesOptions as $queueName => $queueConfig) {
|
||||
$this->queue($queueName)->declareQueue();
|
||||
foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
|
||||
$this->queue($queueName)->bind($this->exchange()->getName(), $bindingKey);
|
||||
@ -345,40 +339,37 @@ class Connection
|
||||
*/
|
||||
public function getQueueNames(): array
|
||||
{
|
||||
return array_keys($this->queuesConfiguration);
|
||||
return array_keys($this->queuesOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
public function channel(): \AMQPChannel
|
||||
{
|
||||
if (null === $this->amqpChannel) {
|
||||
$connection = $this->amqpFactory->createConnection($this->connectionConfiguration);
|
||||
$connectMethod = 'true' === ($this->connectionConfiguration['persistent'] ?? 'false') ? 'pconnect' : 'connect';
|
||||
$connection = $this->amqpFactory->createConnection($this->connectionOptions);
|
||||
$connectMethod = 'true' === ($this->connectionOptions['persistent'] ?? 'false') ? 'pconnect' : 'connect';
|
||||
|
||||
try {
|
||||
$connection->{$connectMethod}();
|
||||
} catch (\AMQPConnectionException $e) {
|
||||
$credentials = $this->connectionConfiguration;
|
||||
$credentials = $this->connectionOptions;
|
||||
$credentials['password'] = '********';
|
||||
|
||||
throw new \AMQPException(sprintf('Could not connect to the AMQP server. Please verify the provided DSN. (%s)', json_encode($credentials)), 0, $e);
|
||||
}
|
||||
$this->amqpChannel = $this->amqpFactory->createChannel($connection);
|
||||
|
||||
if (isset($this->connectionConfiguration['prefetch_count'])) {
|
||||
$this->amqpChannel->setPrefetchCount($this->connectionConfiguration['prefetch_count']);
|
||||
if (isset($this->connectionOptions['prefetch_count'])) {
|
||||
$this->amqpChannel->setPrefetchCount($this->connectionOptions['prefetch_count']);
|
||||
}
|
||||
}
|
||||
|
||||
return $this->amqpChannel;
|
||||
}
|
||||
|
||||
private function queue(string $queueName): \AMQPQueue
|
||||
public function queue(string $queueName): \AMQPQueue
|
||||
{
|
||||
if (!isset($this->amqpQueues[$queueName])) {
|
||||
$queueConfig = $this->queuesConfiguration[$queueName];
|
||||
$queueConfig = $this->queuesOptions[$queueName];
|
||||
|
||||
$amqpQueue = $this->amqpFactory->createQueue($this->channel());
|
||||
$amqpQueue->setName($queueName);
|
||||
@ -394,27 +385,22 @@ class Connection
|
||||
return $this->amqpQueues[$queueName];
|
||||
}
|
||||
|
||||
private function exchange(): \AMQPExchange
|
||||
public function exchange(): \AMQPExchange
|
||||
{
|
||||
if (null === $this->amqpExchange) {
|
||||
$this->amqpExchange = $this->amqpFactory->createExchange($this->channel());
|
||||
$this->amqpExchange->setName($this->exchangeConfiguration['name']);
|
||||
$this->amqpExchange->setType($this->exchangeConfiguration['type'] ?? AMQP_EX_TYPE_FANOUT);
|
||||
$this->amqpExchange->setFlags($this->exchangeConfiguration['flags'] ?? AMQP_DURABLE);
|
||||
$this->amqpExchange->setName($this->exchangeOptions['name']);
|
||||
$this->amqpExchange->setType($this->exchangeOptions['type'] ?? AMQP_EX_TYPE_FANOUT);
|
||||
$this->amqpExchange->setFlags($this->exchangeOptions['flags'] ?? AMQP_DURABLE);
|
||||
|
||||
if (isset($this->exchangeConfiguration['arguments'])) {
|
||||
$this->amqpExchange->setArguments($this->exchangeConfiguration['arguments']);
|
||||
if (isset($this->exchangeOptions['arguments'])) {
|
||||
$this->amqpExchange->setArguments($this->exchangeOptions['arguments']);
|
||||
}
|
||||
}
|
||||
|
||||
return $this->amqpExchange;
|
||||
}
|
||||
|
||||
public function getConnectionConfiguration(): array
|
||||
{
|
||||
return $this->connectionConfiguration;
|
||||
}
|
||||
|
||||
private function clear(): void
|
||||
{
|
||||
$this->amqpChannel = null;
|
||||
@ -424,11 +410,11 @@ class Connection
|
||||
|
||||
private function shouldSetup(): bool
|
||||
{
|
||||
if (!\array_key_exists('auto_setup', $this->connectionConfiguration)) {
|
||||
if (!\array_key_exists('auto_setup', $this->connectionOptions)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (\in_array($this->connectionConfiguration['auto_setup'], [false, 'false'], true)) {
|
||||
if (\in_array($this->connectionOptions['auto_setup'], [false, 'false'], true)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -437,7 +423,7 @@ class Connection
|
||||
|
||||
private function getDefaultPublishRoutingKey(): ?string
|
||||
{
|
||||
return $this->exchangeConfiguration['default_publish_routing_key'] ?? null;
|
||||
return $this->exchangeOptions['default_publish_routing_key'] ?? null;
|
||||
}
|
||||
|
||||
public function purgeQueues()
|
||||
|
Reference in New Issue
Block a user