[Messenger] fix delay delivery for non-fanout exchanges

also fix dsn parsing of plain amqp:// uri
This commit is contained in:
Tobias Schultze 2019-06-14 02:13:58 +02:00
parent 9865988ac2
commit 0f15306d61
2 changed files with 54 additions and 37 deletions

View File

@ -25,14 +25,14 @@ class ConnectionTest extends TestCase
{ {
/** /**
* @expectedException \InvalidArgumentException * @expectedException \InvalidArgumentException
* @expectedExceptionMessage The given AMQP DSN "amqp://" is invalid. * @expectedExceptionMessage The given AMQP DSN "amqp://:" is invalid.
*/ */
public function testItCannotBeConstructedWithAWrongDsn() public function testItCannotBeConstructedWithAWrongDsn()
{ {
Connection::fromDsn('amqp://'); Connection::fromDsn('amqp://:');
} }
public function testItGetsParametersFromTheDsn() public function testItCanBeConstructedWithDefaults()
{ {
$this->assertEquals( $this->assertEquals(
new Connection([ new Connection([
@ -44,7 +44,23 @@ class ConnectionTest extends TestCase
], [ ], [
'messages' => [], 'messages' => [],
]), ]),
Connection::fromDsn('amqp://localhost/%2f/messages') Connection::fromDsn('amqp://')
);
}
public function testItGetsParametersFromTheDsn()
{
$this->assertEquals(
new Connection([
'host' => 'host',
'port' => 5672,
'vhost' => '/',
], [
'name' => 'custom',
], [
'custom' => [],
]),
Connection::fromDsn('amqp://host/%2f/custom')
); );
} }
@ -52,9 +68,9 @@ class ConnectionTest extends TestCase
{ {
$this->assertEquals( $this->assertEquals(
new Connection([ new Connection([
'host' => 'redis', 'host' => 'localhost',
'port' => 1234, 'port' => 1234,
'vhost' => '/', 'vhost' => 'vhost',
'login' => 'guest', 'login' => 'guest',
'password' => 'password', 'password' => 'password',
], [ ], [
@ -62,7 +78,7 @@ class ConnectionTest extends TestCase
], [ ], [
'queueName' => [], 'queueName' => [],
]), ]),
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]') Connection::fromDsn('amqp://guest:password@localhost:1234/vhost/queue?exchange[name]=exchangeName&queues[queueName]')
); );
} }
@ -70,18 +86,16 @@ class ConnectionTest extends TestCase
{ {
$this->assertEquals( $this->assertEquals(
new Connection([ new Connection([
'host' => 'redis', 'host' => 'localhost',
'port' => 1234, 'port' => 5672,
'vhost' => '/', 'vhost' => '/',
'login' => 'guest',
'password' => 'password',
'persistent' => 'true', 'persistent' => 'true',
], [ ], [
'name' => 'exchangeName', 'name' => 'exchangeName',
], [ ], [
'queueName' => [], 'queueName' => [],
]), ]),
Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [ Connection::fromDsn('amqp://localhost/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [
'persistent' => 'true', 'persistent' => 'true',
'exchange' => ['name' => 'toBeOverwritten'], 'exchange' => ['name' => 'toBeOverwritten'],
]) ])

View File

@ -58,8 +58,22 @@ class Connection
*/ */
private $amqpDelayExchange; private $amqpDelayExchange;
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
{
$this->connectionOptions = array_replace_recursive([
'delay' => [
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
'exchange_name' => 'delay',
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
],
], $connectionOptions);
$this->exchangeOptions = $exchangeOptions;
$this->queuesOptions = $queuesOptions;
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
}
/** /**
* Constructor. * Creates a connection based on the DSN and options.
* *
* Available options: * Available options:
* *
@ -81,29 +95,19 @@ class Connection
* * delay: * * delay:
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%") * * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%") * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%")
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry") * * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay")
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true) * * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
* * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000)
* * prefetch_count: set channel prefetch count * * prefetch_count: set channel prefetch count
*/ */
public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null)
{
$this->connectionOptions = array_replace_recursive([
'delay' => [
'routing_key_pattern' => 'delay_%routing_key%_%delay%',
'exchange_name' => 'delay',
'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%',
],
], $connectionOptions);
$this->exchangeOptions = $exchangeOptions;
$this->queuesOptions = $queuesOptions;
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
}
public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self
{ {
if (false === $parsedUrl = parse_url($dsn)) { if (false === $parsedUrl = parse_url($dsn)) {
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn)); // this is a valid URI that parse_url cannot handle when you want to pass all parameters as options
if ('amqp://' !== $dsn) {
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
}
$parsedUrl = [];
} }
$pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : []; $pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : [];
@ -275,18 +279,17 @@ class Connection
$queue = $this->amqpFactory->createQueue($this->channel()); $queue = $this->amqpFactory->createQueue($this->channel());
$queue->setName(str_replace( $queue->setName(str_replace(
['%delay%', '%routing_key%'], ['%delay%', '%routing_key%'],
[$delay, $routingKey ?: ''], [$delay, $routingKey ?? ''],
$this->connectionOptions['delay']['queue_name_pattern'] $this->connectionOptions['delay']['queue_name_pattern']
)); ));
$queue->setArguments([ $queue->setArguments([
'x-message-ttl' => $delay, 'x-message-ttl' => $delay,
'x-dead-letter-exchange' => $this->exchange()->getName(), 'x-dead-letter-exchange' => $this->exchange()->getName(),
]); ]);
if (null !== $routingKey) { // after being released from to DLX, make sure the original routing key will be used
// after being released from to DLX, this routing key will be used // we must use an empty string instead of null for the argument to be picked up
$queue->setArgument('x-dead-letter-routing-key', $routingKey); $queue->setArgument('x-dead-letter-routing-key', $routingKey ?? '');
}
return $queue; return $queue;
} }
@ -295,7 +298,7 @@ class Connection
{ {
return str_replace( return str_replace(
['%delay%', '%routing_key%'], ['%delay%', '%routing_key%'],
[$delay, $finalRoutingKey ?: ''], [$delay, $finalRoutingKey ?? ''],
$this->connectionOptions['delay']['routing_key_pattern'] $this->connectionOptions['delay']['routing_key_pattern']
); );
} }