[Messenger] fix delay delivery for non-fanout exchanges
also fix dsn parsing of plain amqp:// uri
This commit is contained in:
parent
9865988ac2
commit
0f15306d61
@ -25,14 +25,14 @@ class ConnectionTest extends TestCase
|
||||
{
|
||||
/**
|
||||
* @expectedException \InvalidArgumentException
|
||||
* @expectedExceptionMessage The given AMQP DSN "amqp://" is invalid.
|
||||
* @expectedExceptionMessage The given AMQP DSN "amqp://:" is invalid.
|
||||
*/
|
||||
public function testItCannotBeConstructedWithAWrongDsn()
|
||||
{
|
||||
Connection::fromDsn('amqp://');
|
||||
Connection::fromDsn('amqp://:');
|
||||
}
|
||||
|
||||
public function testItGetsParametersFromTheDsn()
|
||||
public function testItCanBeConstructedWithDefaults()
|
||||
{
|
||||
$this->assertEquals(
|
||||
new Connection([
|
||||
@ -44,7 +44,23 @@ class ConnectionTest extends TestCase
|
||||
], [
|
||||
'messages' => [],
|
||||
]),
|
||||
Connection::fromDsn('amqp://localhost/%2f/messages')
|
||||
Connection::fromDsn('amqp://')
|
||||
);
|
||||
}
|
||||
|
||||
public function testItGetsParametersFromTheDsn()
|
||||
{
|
||||
$this->assertEquals(
|
||||
new Connection([
|
||||
'host' => 'host',
|
||||
'port' => 5672,
|
||||
'vhost' => '/',
|
||||
], [
|
||||
'name' => 'custom',
|
||||
], [
|
||||
'custom' => [],
|
||||
]),
|
||||
Connection::fromDsn('amqp://host/%2f/custom')
|
||||
);
|
||||
}
|
||||
|
||||
@ -52,9 +68,9 @@ class ConnectionTest extends TestCase
|
||||
{
|
||||
$this->assertEquals(
|
||||
new Connection([
|
||||
'host' => 'redis',
|
||||
'host' => 'localhost',
|
||||
'port' => 1234,
|
||||
'vhost' => '/',
|
||||
'vhost' => 'vhost',
|
||||
'login' => 'guest',
|
||||
'password' => 'password',
|
||||
], [
|
||||
@ -62,7 +78,7 @@ class ConnectionTest extends TestCase
|
||||
], [
|
||||
'queueName' => [],
|
||||
]),
|
||||
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]')
|
||||
Connection::fromDsn('amqp://guest:password@localhost:1234/vhost/queue?exchange[name]=exchangeName&queues[queueName]')
|
||||
);
|
||||
}
|
||||
|
||||
@ -70,18 +86,16 @@ class ConnectionTest extends TestCase
|
||||
{
|
||||
$this->assertEquals(
|
||||
new Connection([
|
||||
'host' => 'redis',
|
||||
'port' => 1234,
|
||||
'host' => 'localhost',
|
||||
'port' => 5672,
|
||||
'vhost' => '/',
|
||||
'login' => 'guest',
|
||||
'password' => 'password',
|
||||
'persistent' => 'true',
|
||||
], [
|
||||
'name' => 'exchangeName',
|
||||
], [
|
||||
'queueName' => [],
|
||||
]),
|
||||
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [
|
||||
Connection::fromDsn('amqp://localhost/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [
|
||||
'persistent' => 'true',
|
||||
'exchange' => ['name' => 'toBeOverwritten'],
|
||||
])
|
||||
|
@ -58,8 +58,22 @@ class Connection
|
||||
*/
|
||||
private $amqpDelayExchange;
|
||||
|
||||
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
|
||||
{
|
||||
$this->connectionOptions = array_replace_recursive([
|
||||
'delay' => [
|
||||
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
|
||||
'exchange_name' => 'delay',
|
||||
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
|
||||
],
|
||||
], $connectionOptions);
|
||||
$this->exchangeOptions = $exchangeOptions;
|
||||
$this->queuesOptions = $queuesOptions;
|
||||
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* Creates a connection based on the DSN and options.
|
||||
*
|
||||
* Available options:
|
||||
*
|
||||
@ -81,31 +95,21 @@ class Connection
|
||||
* * delay:
|
||||
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%")
|
||||
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%")
|
||||
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry")
|
||||
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay")
|
||||
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
|
||||
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
|
||||
* * prefetch_count: set channel prefetch count
|
||||
*/
|
||||
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
|
||||
{
|
||||
$this->connectionOptions = array_replace_recursive([
|
||||
'delay' => [
|
||||
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
|
||||
'exchange_name' => 'delay',
|
||||
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
|
||||
],
|
||||
], $connectionOptions);
|
||||
$this->exchangeOptions = $exchangeOptions;
|
||||
$this->queuesOptions = $queuesOptions;
|
||||
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
|
||||
}
|
||||
|
||||
public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self
|
||||
{
|
||||
if (false === $parsedUrl = parse_url($dsn)) {
|
||||
// this is a valid URI that parse_url cannot handle when you want to pass all parameters as options
|
||||
if ('amqp://' !== $dsn) {
|
||||
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
|
||||
}
|
||||
|
||||
$parsedUrl = [];
|
||||
}
|
||||
|
||||
$pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : [];
|
||||
$exchangeName = $pathParts[1] ?? 'messages';
|
||||
parse_str($parsedUrl['query'] ?? '', $parsedQuery);
|
||||
@ -275,7 +279,7 @@ class Connection
|
||||
$queue = $this->amqpFactory->createQueue($this->channel());
|
||||
$queue->setName(str_replace(
|
||||
['%delay%', '%routing_key%'],
|
||||
[$delay, $routingKey ?: ''],
|
||||
[$delay, $routingKey ?? ''],
|
||||
$this->connectionOptions['delay']['queue_name_pattern']
|
||||
));
|
||||
$queue->setArguments([
|
||||
@ -283,10 +287,9 @@ class Connection
|
||||
'x-dead-letter-exchange' => $this->exchange()->getName(),
|
||||
]);
|
||||
|
||||
if (null !== $routingKey) {
|
||||
// after being released from to DLX, this routing key will be used
|
||||
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
|
||||
}
|
||||
// after being released from to DLX, make sure the original routing key will be used
|
||||
// we must use an empty string instead of null for the argument to be picked up
|
||||
$queue->setArgument('x-dead-letter-routing-key', $routingKey ?? '');
|
||||
|
||||
return $queue;
|
||||
}
|
||||
@ -295,7 +298,7 @@ class Connection
|
||||
{
|
||||
return str_replace(
|
||||
['%delay%', '%routing_key%'],
|
||||
[$delay, $finalRoutingKey ?: ''],
|
||||
[$delay, $finalRoutingKey ?? ''],
|
||||
$this->connectionOptions['delay']['routing_key_pattern']
|
||||
);
|
||||
}
|
||||
|
Reference in New Issue
Block a user