feature #35295 [Messenger] Messenger redis local sock dsn (JJarrie)

This PR was squashed before being merged into the 5.1-dev branch (closes #35295).

Discussion
----------

[Messenger] Messenger redis local sock dsn

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #34682
| License       | MIT
| Doc PR        | symfony/symfony-docs#12924

Enable connection by unix socket to Redis.

``` messenger.yaml
framework:
    messenger:
        transports:
            async: "redis:///var/run/redis/redis.sock"
```

Commits
-------

0421e01ae1 [Messenger] Messenger redis local sock dsn
This commit is contained in:
Fabien Potencier 2020-01-16 10:19:59 +01:00
commit af4035d4ec
2 changed files with 44 additions and 19 deletions

View File

@ -54,6 +54,17 @@ class ConnectionTest extends TestCase
);
}
public function testFromDsnOnUnixSocket()
{
$this->assertEquals(
new Connection(['stream' => 'queue'], [
'host' => '/var/run/redis/redis.sock',
'port' => 0,
], [], $redis = $this->createMock(\Redis::class)),
Connection::fromDsn('redis:///var/run/redis/redis.sock', ['stream' => 'queue'], $redis)
);
}
public function testFromDsnWithOptions()
{
$this->assertEquals(

View File

@ -73,22 +73,15 @@ class Connection
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
{
if (false === $parsedUrl = parse_url($dsn)) {
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
$url = $dsn;
if (preg_match('#^redis:///([^:@])+$#', $dsn)) {
$url = str_replace('redis:', 'file:', $dsn);
}
$pathParts = explode('/', $parsedUrl['path'] ?? '');
$stream = $pathParts[1] ?? $redisOptions['stream'] ?? null;
$group = $pathParts[2] ?? $redisOptions['group'] ?? null;
$consumer = $pathParts[3] ?? $redisOptions['consumer'] ?? null;
$connectionCredentials = [
'host' => $parsedUrl['host'] ?? '127.0.0.1',
'port' => $parsedUrl['port'] ?? 6379,
'auth' => $parsedUrl['pass'] ?? $parsedUrl['user'] ?? null,
];
if (false === $parsedUrl = parse_url($url)) {
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
}
if (isset($parsedUrl['query'])) {
parse_str($parsedUrl['query'], $redisOptions);
}
@ -111,14 +104,35 @@ class Connection
unset($redisOptions['dbindex']);
}
return new self([
'stream' => $stream,
'group' => $group,
'consumer' => $consumer,
$configuration = [
'stream' => $redisOptions['stream'] ?? null,
'group' => $redisOptions['group'] ?? null,
'consumer' => $redisOptions['consumer'] ?? null,
'auto_setup' => $autoSetup,
'stream_max_entries' => $maxEntries,
'dbindex' => $dbIndex,
], $connectionCredentials, $redisOptions, $redis);
];
if (isset($parsedUrl['host'])) {
$connectionCredentials = [
'host' => $parsedUrl['host'] ?? '127.0.0.1',
'port' => $parsedUrl['port'] ?? 6379,
'auth' => $parsedUrl['pass'] ?? $parsedUrl['user'] ?? null,
];
$pathParts = explode('/', $parsedUrl['path'] ?? '');
$configuration['stream'] = $pathParts[1] ?? $configuration['stream'];
$configuration['group'] = $pathParts[2] ?? $configuration['group'];
$configuration['consumer'] = $pathParts[3] ?? $configuration['consumer'];
} else {
$connectionCredentials = [
'host' => $parsedUrl['path'],
'port' => 0,
];
}
return new self($configuration, $connectionCredentials, $redisOptions, $redis);
}
public function get(): ?array