[Messenger] fix delay delivery for non-fanout exchanges
also fix dsn parsing of plain amqp:// uri
This commit is contained in:
parent
9865988ac2
commit
0f15306d61
@ -25,14 +25,14 @@ class ConnectionTest extends TestCase
|
|||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* @expectedException \InvalidArgumentException
|
* @expectedException \InvalidArgumentException
|
||||||
* @expectedExceptionMessage The given AMQP DSN "amqp://" is invalid.
|
* @expectedExceptionMessage The given AMQP DSN "amqp://:" is invalid.
|
||||||
*/
|
*/
|
||||||
public function testItCannotBeConstructedWithAWrongDsn()
|
public function testItCannotBeConstructedWithAWrongDsn()
|
||||||
{
|
{
|
||||||
Connection::fromDsn('amqp://');
|
Connection::fromDsn('amqp://:');
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testItGetsParametersFromTheDsn()
|
public function testItCanBeConstructedWithDefaults()
|
||||||
{
|
{
|
||||||
$this->assertEquals(
|
$this->assertEquals(
|
||||||
new Connection([
|
new Connection([
|
||||||
@ -44,7 +44,23 @@ class ConnectionTest extends TestCase
|
|||||||
], [
|
], [
|
||||||
'messages' => [],
|
'messages' => [],
|
||||||
]),
|
]),
|
||||||
Connection::fromDsn('amqp://localhost/%2f/messages')
|
Connection::fromDsn('amqp://')
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testItGetsParametersFromTheDsn()
|
||||||
|
{
|
||||||
|
$this->assertEquals(
|
||||||
|
new Connection([
|
||||||
|
'host' => 'host',
|
||||||
|
'port' => 5672,
|
||||||
|
'vhost' => '/',
|
||||||
|
], [
|
||||||
|
'name' => 'custom',
|
||||||
|
], [
|
||||||
|
'custom' => [],
|
||||||
|
]),
|
||||||
|
Connection::fromDsn('amqp://host/%2f/custom')
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -52,9 +68,9 @@ class ConnectionTest extends TestCase
|
|||||||
{
|
{
|
||||||
$this->assertEquals(
|
$this->assertEquals(
|
||||||
new Connection([
|
new Connection([
|
||||||
'host' => 'redis',
|
'host' => 'localhost',
|
||||||
'port' => 1234,
|
'port' => 1234,
|
||||||
'vhost' => '/',
|
'vhost' => 'vhost',
|
||||||
'login' => 'guest',
|
'login' => 'guest',
|
||||||
'password' => 'password',
|
'password' => 'password',
|
||||||
], [
|
], [
|
||||||
@ -62,7 +78,7 @@ class ConnectionTest extends TestCase
|
|||||||
], [
|
], [
|
||||||
'queueName' => [],
|
'queueName' => [],
|
||||||
]),
|
]),
|
||||||
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]')
|
Connection::fromDsn('amqp://guest:password@localhost:1234/vhost/queue?exchange[name]=exchangeName&queues[queueName]')
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -70,18 +86,16 @@ class ConnectionTest extends TestCase
|
|||||||
{
|
{
|
||||||
$this->assertEquals(
|
$this->assertEquals(
|
||||||
new Connection([
|
new Connection([
|
||||||
'host' => 'redis',
|
'host' => 'localhost',
|
||||||
'port' => 1234,
|
'port' => 5672,
|
||||||
'vhost' => '/',
|
'vhost' => '/',
|
||||||
'login' => 'guest',
|
|
||||||
'password' => 'password',
|
|
||||||
'persistent' => 'true',
|
'persistent' => 'true',
|
||||||
], [
|
], [
|
||||||
'name' => 'exchangeName',
|
'name' => 'exchangeName',
|
||||||
], [
|
], [
|
||||||
'queueName' => [],
|
'queueName' => [],
|
||||||
]),
|
]),
|
||||||
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [
|
Connection::fromDsn('amqp://localhost/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [
|
||||||
'persistent' => 'true',
|
'persistent' => 'true',
|
||||||
'exchange' => ['name' => 'toBeOverwritten'],
|
'exchange' => ['name' => 'toBeOverwritten'],
|
||||||
])
|
])
|
||||||
|
@ -58,8 +58,22 @@ class Connection
|
|||||||
*/
|
*/
|
||||||
private $amqpDelayExchange;
|
private $amqpDelayExchange;
|
||||||
|
|
||||||
|
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
|
||||||
|
{
|
||||||
|
$this->connectionOptions = array_replace_recursive([
|
||||||
|
'delay' => [
|
||||||
|
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
|
||||||
|
'exchange_name' => 'delay',
|
||||||
|
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
|
||||||
|
],
|
||||||
|
], $connectionOptions);
|
||||||
|
$this->exchangeOptions = $exchangeOptions;
|
||||||
|
$this->queuesOptions = $queuesOptions;
|
||||||
|
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Creates a connection based on the DSN and options.
|
||||||
*
|
*
|
||||||
* Available options:
|
* Available options:
|
||||||
*
|
*
|
||||||
@ -81,31 +95,21 @@ class Connection
|
|||||||
* * delay:
|
* * delay:
|
||||||
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%")
|
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%")
|
||||||
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%")
|
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%")
|
||||||
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry")
|
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay")
|
||||||
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
|
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
|
||||||
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
|
|
||||||
* * prefetch_count: set channel prefetch count
|
* * prefetch_count: set channel prefetch count
|
||||||
*/
|
*/
|
||||||
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
|
|
||||||
{
|
|
||||||
$this->connectionOptions = array_replace_recursive([
|
|
||||||
'delay' => [
|
|
||||||
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
|
|
||||||
'exchange_name' => 'delay',
|
|
||||||
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
|
|
||||||
],
|
|
||||||
], $connectionOptions);
|
|
||||||
$this->exchangeOptions = $exchangeOptions;
|
|
||||||
$this->queuesOptions = $queuesOptions;
|
|
||||||
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self
|
public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self
|
||||||
{
|
{
|
||||||
if (false === $parsedUrl = parse_url($dsn)) {
|
if (false === $parsedUrl = parse_url($dsn)) {
|
||||||
|
// this is a valid URI that parse_url cannot handle when you want to pass all parameters as options
|
||||||
|
if ('amqp://' !== $dsn) {
|
||||||
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
|
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$parsedUrl = [];
|
||||||
|
}
|
||||||
|
|
||||||
$pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : [];
|
$pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : [];
|
||||||
$exchangeName = $pathParts[1] ?? 'messages';
|
$exchangeName = $pathParts[1] ?? 'messages';
|
||||||
parse_str($parsedUrl['query'] ?? '', $parsedQuery);
|
parse_str($parsedUrl['query'] ?? '', $parsedQuery);
|
||||||
@ -275,7 +279,7 @@ class Connection
|
|||||||
$queue = $this->amqpFactory->createQueue($this->channel());
|
$queue = $this->amqpFactory->createQueue($this->channel());
|
||||||
$queue->setName(str_replace(
|
$queue->setName(str_replace(
|
||||||
['%delay%', '%routing_key%'],
|
['%delay%', '%routing_key%'],
|
||||||
[$delay, $routingKey ?: ''],
|
[$delay, $routingKey ?? ''],
|
||||||
$this->connectionOptions['delay']['queue_name_pattern']
|
$this->connectionOptions['delay']['queue_name_pattern']
|
||||||
));
|
));
|
||||||
$queue->setArguments([
|
$queue->setArguments([
|
||||||
@ -283,10 +287,9 @@ class Connection
|
|||||||
'x-dead-letter-exchange' => $this->exchange()->getName(),
|
'x-dead-letter-exchange' => $this->exchange()->getName(),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
if (null !== $routingKey) {
|
// after being released from to DLX, make sure the original routing key will be used
|
||||||
// after being released from to DLX, this routing key will be used
|
// we must use an empty string instead of null for the argument to be picked up
|
||||||
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
|
$queue->setArgument('x-dead-letter-routing-key', $routingKey ?? '');
|
||||||
}
|
|
||||||
|
|
||||||
return $queue;
|
return $queue;
|
||||||
}
|
}
|
||||||
@ -295,7 +298,7 @@ class Connection
|
|||||||
{
|
{
|
||||||
return str_replace(
|
return str_replace(
|
||||||
['%delay%', '%routing_key%'],
|
['%delay%', '%routing_key%'],
|
||||||
[$delay, $finalRoutingKey ?: ''],
|
[$delay, $finalRoutingKey ?? ''],
|
||||||
$this->connectionOptions['delay']['routing_key_pattern']
|
$this->connectionOptions['delay']['routing_key_pattern']
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user