[Messenger] fix delay delivery for non-fanout exchanges

also fix dsn parsing of plain amqp:// uri
This commit is contained in:
Tobias Schultze 2019-06-14 02:13:58 +02:00
parent 9865988ac2
commit 0f15306d61
2 changed files with 54 additions and 37 deletions

View File

@ -25,14 +25,14 @@ class ConnectionTest extends TestCase
{
/**
* @expectedException \InvalidArgumentException
* @expectedExceptionMessage The given AMQP DSN "amqp://" is invalid.
* @expectedExceptionMessage The given AMQP DSN "amqp://:" is invalid.
*/
public function testItCannotBeConstructedWithAWrongDsn()
{
Connection::fromDsn('amqp://');
Connection::fromDsn('amqp://:');
}
public function testItGetsParametersFromTheDsn()
public function testItCanBeConstructedWithDefaults()
{
$this->assertEquals(
new Connection([
@ -44,7 +44,23 @@ class ConnectionTest extends TestCase
], [
'messages' => [],
]),
Connection::fromDsn('amqp://localhost/%2f/messages')
Connection::fromDsn('amqp://')
);
}
public function testItGetsParametersFromTheDsn()
{
$this->assertEquals(
new Connection([
'host' => 'host',
'port' => 5672,
'vhost' => '/',
], [
'name' => 'custom',
], [
'custom' => [],
]),
Connection::fromDsn('amqp://host/%2f/custom')
);
}
@ -52,9 +68,9 @@ class ConnectionTest extends TestCase
{
$this->assertEquals(
new Connection([
'host' => 'redis',
'host' => 'localhost',
'port' => 1234,
'vhost' => '/',
'vhost' => 'vhost',
'login' => 'guest',
'password' => 'password',
], [
@ -62,7 +78,7 @@ class ConnectionTest extends TestCase
], [
'queueName' => [],
]),
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]')
Connection::fromDsn('amqp://guest:password@localhost:1234/vhost/queue?exchange[name]=exchangeName&queues[queueName]')
);
}
@ -70,18 +86,16 @@ class ConnectionTest extends TestCase
{
$this->assertEquals(
new Connection([
'host' => 'redis',
'port' => 1234,
'host' => 'localhost',
'port' => 5672,
'vhost' => '/',
'login' => 'guest',
'password' => 'password',
'persistent' => 'true',
], [
'name' => 'exchangeName',
], [
'queueName' => [],
]),
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [
Connection::fromDsn('amqp://localhost/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [
'persistent' => 'true',
'exchange' => ['name' => 'toBeOverwritten'],
])

View File

@ -58,8 +58,22 @@ class Connection
*/
private $amqpDelayExchange;
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
{
$this->connectionOptions = array_replace_recursive([
'delay' => [
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
'exchange_name' => 'delay',
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
],
], $connectionOptions);
$this->exchangeOptions = $exchangeOptions;
$this->queuesOptions = $queuesOptions;
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
}
/**
* Constructor.
* Creates a connection based on the DSN and options.
*
* Available options:
*
@ -81,29 +95,19 @@ class Connection
* * delay:
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%")
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry")
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay")
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
* * prefetch_count: set channel prefetch count
*/
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
{
$this->connectionOptions = array_replace_recursive([
'delay' => [
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
'exchange_name' => 'delay',
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
],
], $connectionOptions);
$this->exchangeOptions = $exchangeOptions;
$this->queuesOptions = $queuesOptions;
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
}
public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self
{
if (false === $parsedUrl = parse_url($dsn)) {
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
// this is a valid URI that parse_url cannot handle when you want to pass all parameters as options
if ('amqp://' !== $dsn) {
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
}
$parsedUrl = [];
}
$pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : [];
@ -275,18 +279,17 @@ class Connection
$queue = $this->amqpFactory->createQueue($this->channel());
$queue->setName(str_replace(
['%delay%', '%routing_key%'],
[$delay, $routingKey ?: ''],
[$delay, $routingKey ?? ''],
$this->connectionOptions['delay']['queue_name_pattern']
));
));
$queue->setArguments([
'x-message-ttl' => $delay,
'x-dead-letter-exchange' => $this->exchange()->getName(),
]);
if (null !== $routingKey) {
// after being released from to DLX, this routing key will be used
$queue->setArgument('x-dead-letter-routing-key', $routingKey);
}
// after being released from to DLX, make sure the original routing key will be used
// we must use an empty string instead of null for the argument to be picked up
$queue->setArgument('x-dead-letter-routing-key', $routingKey ?? '');
return $queue;
}
@ -295,7 +298,7 @@ class Connection
{
return str_replace(
['%delay%', '%routing_key%'],
[$delay, $finalRoutingKey ?: ''],
[$delay, $finalRoutingKey ?? ''],
$this->connectionOptions['delay']['routing_key_pattern']
);
}