bug #36449 [Messenger] Make sure redis transports are initialized correctly (Seldaek)

This PR was squashed before being merged into the 4.4 branch (closes #36449).

Discussion
----------

[Messenger] Make sure redis transports are initialized correctly

| Q             | A
| ------------- | ---
| Branch?       | 4.4
| Bug fix?      | yes
| New feature?  | no
| Deprecations? | no
| Tickets       |
| License       | MIT

Took me a while to figure out why my messages were dispatched/handled twice.. Turns out I had two messenger transports using the same dsn and the parsing was broken so they ended up both polling from a `""` stream, which caused them to both handle the messages (I think).

Below my config, note the trailing slash in the dsn was the root cause of the issue, but I don't think it's that unreasonable.

```
.env:

MESSENGER_TRANSPORT_DSN=redis://localhost:6379/?dbindex=3

messenger.yml:

            async_urgent:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    group: async_urgent
                    stream: messages_urgent
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    group: async
                    stream: messages

```

Commits
-------

68b3b898d8 [Messenger] Make sure redis transports are initialized correctly
This commit is contained in:
Robin Chalas 2020-04-14 13:53:10 +02:00
commit efb0dee407
2 changed files with 15 additions and 1 deletions

View File

@ -62,6 +62,14 @@ class ConnectionTest extends TestCase
);
}
public function testFromDsnWithOptionsAndTrailingSlash()
{
$this->assertEquals(
Connection::fromDsn('redis://localhost/', ['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false, 'serializer' => 2]),
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2&auto_setup=0')
);
}
public function testFromDsnWithQueryOptions()
{
$this->assertEquals(

View File

@ -63,6 +63,12 @@ class Connection
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
}
foreach (['stream', 'group', 'consumer'] as $key) {
if (isset($configuration[$key]) && '' === $configuration[$key]) {
throw new InvalidArgumentException(sprintf('"%s" should be configured, got an empty string.', $key));
}
}
$this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
@ -77,7 +83,7 @@ class Connection
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
}
$pathParts = explode('/', $parsedUrl['path'] ?? '');
$pathParts = explode('/', rtrim($parsedUrl['path'] ?? '', '/'));
$stream = $pathParts[1] ?? $redisOptions['stream'] ?? null;
$group = $pathParts[2] ?? $redisOptions['group'] ?? null;