diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/Doctrine/CHANGELOG.md index db21756b0c..aaed24815e 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/CHANGELOG.md @@ -5,3 +5,4 @@ CHANGELOG ----- * Introduced the Doctrine bridge. + * Added support for PostgreSQL `LISTEN`/`NOTIFY`. diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportFactoryTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportFactoryTest.php index a5ed0d4a2a..b423c21e27 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportFactoryTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportFactoryTest.php @@ -18,6 +18,7 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection; use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport; use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory; +use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; class DoctrineTransportFactoryTest extends TestCase @@ -49,7 +50,7 @@ class DoctrineTransportFactoryTest extends TestCase $serializer = $this->createMock(SerializerInterface::class); $this->assertEquals( - new DoctrineTransport(new Connection(Connection::buildConfiguration('doctrine://default'), $driverConnection), $serializer), + new DoctrineTransport(new Connection(PostgreSqlConnection::buildConfiguration('doctrine://default'), $driverConnection), $serializer), $factory->createTransport('doctrine://default', [], $serializer) ); } diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/PostgreSqlConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/PostgreSqlConnectionTest.php new file mode 100644 index 0000000000..501fd785b2 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/PostgreSqlConnectionTest.php @@ -0,0 +1,46 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport; + +use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer; +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection; + +/** + * @author Kévin Dunglas + */ +class PostgreSqlConnectionTest extends TestCase +{ + public function testSerialize() + { + $this->expectException(\BadMethodCallException::class); + $this->expectExceptionMessage('Cannot serialize '.PostgreSqlConnection::class); + + $schemaSynchronizer = $this->createMock(SchemaSynchronizer::class); + $driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class); + + $connection = new PostgreSqlConnection([], $driverConnection, $schemaSynchronizer); + serialize($connection); + } + + public function testUnserialize() + { + $this->expectException(\BadMethodCallException::class); + $this->expectExceptionMessage('Cannot unserialize '.PostgreSqlConnection::class); + + $schemaSynchronizer = $this->createMock(SchemaSynchronizer::class); + $driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class); + + $connection = new PostgreSqlConnection([], $driverConnection, $schemaSynchronizer); + $connection->__wakeup(); + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php index 960367a5c6..0092f235e1 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php @@ -22,15 +22,17 @@ use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer; use Doctrine\DBAL\Types\Type; use Symfony\Component\Messenger\Exception\InvalidArgumentException; use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Contracts\Service\ResetInterface; /** - * @author Vincent Touzet + * @internal since Symfony 5.1 * - * @final + * @author Vincent Touzet + * @author Kévin Dunglas */ -class Connection +class Connection implements ResetInterface { - private const DEFAULT_OPTIONS = [ + protected const DEFAULT_OPTIONS = [ 'table_name' => 'messenger_messages', 'queue_name' => 'default', 'redeliver_timeout' => 3600, @@ -45,22 +47,28 @@ class Connection * * table_name: name of the table * * connection: name of the Doctrine's entity manager * * queue_name: name of the queue - * * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default 3600 - * * auto_setup: Whether the table should be created automatically during send / get. Default : true + * * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default: 3600 + * * auto_setup: Whether the table should be created automatically during send / get. Default: true */ - private $configuration = []; - private $driverConnection; + protected $configuration = []; + protected $driverConnection; + protected $queueEmptiedAt; private $schemaSynchronizer; private $autoSetup; public function __construct(array $configuration, DBALConnection $driverConnection, SchemaSynchronizer $schemaSynchronizer = null) { - $this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration); + $this->configuration = array_replace_recursive(static::DEFAULT_OPTIONS, $configuration); $this->driverConnection = $driverConnection; $this->schemaSynchronizer = $schemaSynchronizer ?? new SingleDatabaseSynchronizer($this->driverConnection); $this->autoSetup = $this->configuration['auto_setup']; } + public function reset() + { + $this->queueEmptiedAt = null; + } + public function getConfiguration(): array { return $this->configuration; @@ -78,20 +86,20 @@ class Connection } $configuration = ['connection' => $components['host']]; - $configuration += $options + $query + self::DEFAULT_OPTIONS; + $configuration += $options + $query + static::DEFAULT_OPTIONS; $configuration['auto_setup'] = filter_var($configuration['auto_setup'], FILTER_VALIDATE_BOOLEAN); // check for extra keys in options - $optionsExtraKeys = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS)); + $optionsExtraKeys = array_diff(array_keys($options), array_keys(static::DEFAULT_OPTIONS)); if (0 < \count($optionsExtraKeys)) { - throw new InvalidArgumentException(sprintf('Unknown option found: [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS)))); + throw new InvalidArgumentException(sprintf('Unknown option found: [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS)))); } // check for extra keys in options - $queryExtraKeys = array_diff(array_keys($query), array_keys(self::DEFAULT_OPTIONS)); + $queryExtraKeys = array_diff(array_keys($query), array_keys(static::DEFAULT_OPTIONS)); if (0 < \count($queryExtraKeys)) { - throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS)))); + throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS)))); } return $configuration; @@ -154,9 +162,13 @@ class Connection if (false === $doctrineEnvelope) { $this->driverConnection->commit(); + $this->queueEmptiedAt = microtime(true) * 1000; return null; } + // Postgres can "group" notifications having the same channel and payload + // We need to be sure to empty the queue before blocking again + $this->queueEmptiedAt = null; $doctrineEnvelope = $this->decodeEnvelopeHeaders($doctrineEnvelope); diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransportFactory.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransportFactory.php index 3cd9089110..ed8f9b16f5 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransportFactory.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransportFactory.php @@ -36,8 +36,10 @@ class DoctrineTransportFactory implements TransportFactoryInterface public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface { - unset($options['transport_name']); - $configuration = Connection::buildConfiguration($dsn, $options); + $useNotify = ($options['use_notify'] ?? true); + unset($options['transport_name'], $options['use_notify']); + // Always allow PostgreSQL-specific keys, to be able to transparently fallback to the native driver when LISTEN/NOTIFY isn't available + $configuration = PostgreSqlConnection::buildConfiguration($dsn, $options); try { $driverConnection = $this->registry->getConnection($configuration['connection']); @@ -45,7 +47,11 @@ class DoctrineTransportFactory implements TransportFactoryInterface throw new TransportException(sprintf('Could not find Doctrine connection from Messenger DSN "%s".', $dsn), 0, $e); } - $connection = new Connection($configuration, $driverConnection); + if ($useNotify && method_exists($driverConnection->getWrappedConnection(), 'pgsqlGetNotify')) { + $connection = new PostgreSqlConnection($configuration, $driverConnection); + } else { + $connection = new Connection($configuration, $driverConnection); + } return new DoctrineTransport($connection, $serializer); } diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php new file mode 100644 index 0000000000..5919e543e7 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php @@ -0,0 +1,120 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport; + +/** + * Uses PostgreSQL LISTEN/NOTIFY to push messages to workers. + * + * @internal + * @final + * + * @author Kévin Dunglas + */ +class PostgreSqlConnection extends Connection +{ + /** + * * use_notify: Set to false to disable the use of LISTEN/NOTIFY. Default: true + * * check_delayed_interval: The interval to check for delayed messages, in milliseconds. Set to 0 to disable checks. Default: 1000 + * * get_notify_timeout: The length of time to wait for a response when calling PDO::pgsqlGetNotify, in milliseconds. Default: 0. + */ + protected const DEFAULT_OPTIONS = parent::DEFAULT_OPTIONS + [ + 'check_delayed_interval' => 1000, + 'get_notify_timeout' => 0, + ]; + + private $listening = false; + + public function __sleep() + { + throw new \BadMethodCallException('Cannot serialize '.__CLASS__); + } + + public function __wakeup() + { + throw new \BadMethodCallException('Cannot unserialize '.__CLASS__); + } + + public function __destruct() + { + $this->unlisten(); + } + + public function reset() + { + parent::reset(); + $this->unlisten(); + } + + public function get(): ?array + { + if (null === $this->queueEmptiedAt) { + return parent::get(); + } + + if (!$this->listening) { + // This is secure because the table name must be a valid identifier: + // https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS + $this->driverConnection->exec(sprintf('LISTEN "%s"', $this->configuration['table_name'])); + $this->listening = true; + } + + $notification = $this->driverConnection->getWrappedConnection()->pgsqlGetNotify(\PDO::FETCH_ASSOC, $this->configuration['get_notify_timeout']); + if ( + // no notifications, or for another table or queue + (false === $notification || $notification['message'] !== $this->configuration['table_name'] || $notification['payload'] !== $this->configuration['queue_name']) && + // delayed messages + (microtime(true) * 1000 - $this->queueEmptiedAt < $this->configuration['check_delayed_interval']) + ) { + return null; + } + + return parent::get(); + } + + public function setup(): void + { + parent::setup(); + + $sql = sprintf(<<<'SQL' +LOCK TABLE %1$s; +-- create trigger function +CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$ + BEGIN + PERFORM pg_notify('%1$s', NEW.queue_name::text); + RETURN NEW; + END; +$$ LANGUAGE plpgsql; + +-- register trigger +DROP TRIGGER IF EXISTS notify_trigger ON %1$s; + +CREATE TRIGGER notify_trigger +AFTER INSERT +ON %1$s +FOR EACH ROW EXECUTE PROCEDURE notify_%1$s(); +SQL + , $this->configuration['table_name']); + $this->driverConnection->exec($sql); + } + + private function unlisten() + { + if (!$this->listening) { + return; + } + + $this->driverConnection->exec(sprintf('UNLISTEN "%s"', $this->configuration['table_name'])); + $this->listening = false; + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/composer.json b/src/Symfony/Component/Messenger/Bridge/Doctrine/composer.json index 9b4c738268..41652c8aae 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/composer.json +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/composer.json @@ -19,7 +19,8 @@ "php": "^7.2.5", "doctrine/dbal": "^2.6", "doctrine/persistence": "^1.3", - "symfony/messenger": "^5.1" + "symfony/messenger": "^5.1", + "symfony/service-contracts": "^1.1|^2" }, "require-dev": { "symfony/serializer": "^4.4|^5.0",