feature #36655 Automatically provide Messenger Doctrine schema to "diff" (weaverryan)

This PR was squashed before being merged into the 5.1-dev branch.

Discussion
----------

Automatically provide Messenger Doctrine schema to "diff"

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Tickets       | Alternative to #36629
| License       | MIT
| Doc PR        | TODO - WILL be needed

This follows this conversation: https://github.com/symfony/symfony/pull/36629#issuecomment-621745821 - it automatically adds SQL to Doctrine's migration/diff system when features are added the require a database table:

The new feature works for:

### A) Messenger Doctrine transport
**FULL support**
Works perfectly: configure a doctrine transport and run `make:migration`

**Note**: There is no current way to disable this. So if you have `auto_setup` ON and you
run `make:migration` before trying Messenger, it will generate the table SQL. Adding a
flag to disable it might be very complicated, because we need to know (in DoctrineBundle, at compile time) whether or not this feature is enabled/disabled so that we can decide *not* to add `messenger_messages` to the `schema_filter`.

### B) `PdoAdapter` from Cache
**FULL support**
Works perfectly: configure a doctrine transport and run `make:migration`

### C) `PdoStore` from Lock
**PARTIAL support**
I added `PdoStore::configureSchema()` but did NOT add a listener. While `PdoStore` *does* accept a DBAL `Connection`, I don't think it's possible via the `framework.lock` config to create a `PdoStore` that is passed a `Connection`. In other words: if we added a listener that called `PdoStore::configureSchema` if the user configured a `pdo` lock, that service will *never* have a `Connection` object... so it's kind of worthless.

**NEED**: A proper way to inject a DBAL `Connection` into `PdoStore` via `framework.lock` config.

### D) `PdoSessionHandler`
**NO support**

This class doesn't accept a DBAL `Connection` object. And so, we can't reliably create a listener to add the schema because (if there are multiple connections) we wouldn't know which Connection to use.

We could compare (`===`) the `PDO` instance inside `PdoSessionHandler` to the wrapped `PDO` connection in Doctrine. That would only work if the user has configured their `PdoSessionHandler` to re-use the Doctrine PDO connection.

The `PdoSessionHandler` *already* has a `createTable()` method on it to help with manual migration. But... it's not easy to call from a migration because you would need to fetch the `PdoSessionHandler` service from the container. Adding something

**NEED**: Either:

A) A way for `PdoSessionHandler` to use a DBAL Connection
or
B) We try to hack this feature by comparing the `PDO` instances in the event subscriber
or
C) We add an easier way to access the `createTable()` method from inside a migration.

TODOs

* [X] Determine service injection XML needed for getting all PdoAdapter pools
* [ ] Finish DoctrineBundle PR: https://github.com/doctrine/DoctrineBundle/pull/1163

Commits
-------

2dd9c3c3c8 Automatically provide Messenger Doctrine schema to "diff"
This commit is contained in:
Fabien Potencier 2020-05-05 08:13:58 +02:00
commit b9d41490fe
16 changed files with 566 additions and 27 deletions

View File

@ -0,0 +1,96 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Bridge\Doctrine\SchemaListener;
use Doctrine\Common\EventSubscriber;
use Doctrine\DBAL\Event\SchemaCreateTableEventArgs;
use Doctrine\DBAL\Events;
use Doctrine\ORM\Tools\Event\GenerateSchemaEventArgs;
use Doctrine\ORM\Tools\ToolEvents;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* Automatically adds any required database tables to the Doctrine Schema.
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
final class MessengerTransportDoctrineSchemaSubscriber implements EventSubscriber
{
private const PROCESSING_TABLE_FLAG = self::class.':processing';
private $transports;
/**
* @param iterable|TransportInterface[] $transports
*/
public function __construct(iterable $transports)
{
$this->transports = $transports;
}
public function postGenerateSchema(GenerateSchemaEventArgs $event): void
{
$dbalConnection = $event->getEntityManager()->getConnection();
foreach ($this->transports as $transport) {
if (!$transport instanceof DoctrineTransport) {
continue;
}
$transport->configureSchema($event->getSchema(), $dbalConnection);
}
}
public function onSchemaCreateTable(SchemaCreateTableEventArgs $event): void
{
$table = $event->getTable();
// if this method triggers a nested create table below, allow Doctrine to work like normal
if ($table->hasOption(self::PROCESSING_TABLE_FLAG)) {
return;
}
foreach ($this->transports as $transport) {
if (!$transport instanceof DoctrineTransport) {
continue;
}
$extraSql = $transport->getExtraSetupSqlForTable($table);
if (null === $extraSql) {
continue;
}
// avoid this same listener from creating a loop on this table
$table->addOption(self::PROCESSING_TABLE_FLAG, true);
$createTableSql = $event->getPlatform()->getCreateTableSQL($table);
/*
* Add all the SQL needed to create the table and tell Doctrine
* to "preventDefault" so that only our SQL is used. This is
* the only way to inject some extra SQL.
*/
$event->addSql($createTableSql);
$event->addSql($extraSql);
$event->preventDefault();
return;
}
}
public function getSubscribedEvents(): array
{
return [
ToolEvents::postGenerateSchema,
Events::onSchemaCreateTable,
];
}
}

View File

@ -0,0 +1,50 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Bridge\Doctrine\SchemaListener;
use Doctrine\Common\EventSubscriber;
use Doctrine\ORM\Tools\Event\GenerateSchemaEventArgs;
use Doctrine\ORM\Tools\ToolEvents;
use Symfony\Component\Cache\Adapter\PdoAdapter;
/**
* Automatically adds the cache table needed for the PdoAdapter.
*
* @author Ryan Weaver <ryan@symfonycasts.com>
*/
final class PdoCacheAdapterDoctrineSchemaSubscriber implements EventSubscriber
{
private $pdoAdapters;
/**
* @param iterable|PdoAdapter[] $pdoAdapters
*/
public function __construct(iterable $pdoAdapters)
{
$this->pdoAdapters = $pdoAdapters;
}
public function postGenerateSchema(GenerateSchemaEventArgs $event): void
{
$dbalConnection = $event->getEntityManager()->getConnection();
foreach ($this->pdoAdapters as $pdoAdapter) {
$pdoAdapter->configureSchema($event->getSchema(), $dbalConnection);
}
}
public function getSubscribedEvents(): array
{
return [
ToolEvents::postGenerateSchema,
];
}
}

View File

@ -0,0 +1,99 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Bridge\Doctrine\Tests\SchemaListener;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Event\SchemaCreateTableEventArgs;
use Doctrine\DBAL\Platforms\AbstractPlatform;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Table;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\ORM\Tools\Event\GenerateSchemaEventArgs;
use PHPUnit\Framework\TestCase;
use Symfony\Bridge\Doctrine\SchemaListener\MessengerTransportDoctrineSchemaSubscriber;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
use Symfony\Component\Messenger\Transport\TransportInterface;
class MessengerTransportDoctrineSchemaSubscriberTest extends TestCase
{
public function testPostGenerateSchema()
{
$schema = new Schema();
$dbalConnection = $this->createMock(Connection::class);
$entityManager = $this->createMock(EntityManagerInterface::class);
$entityManager->expects($this->once())
->method('getConnection')
->willReturn($dbalConnection);
$event = new GenerateSchemaEventArgs($entityManager, $schema);
$doctrineTransport = $this->createMock(DoctrineTransport::class);
$doctrineTransport->expects($this->once())
->method('configureSchema')
->with($schema, $dbalConnection);
$otherTransport = $this->createMock(TransportInterface::class);
$otherTransport->expects($this->never())
->method($this->anything());
$subscriber = new MessengerTransportDoctrineSchemaSubscriber([$doctrineTransport, $otherTransport]);
$subscriber->postGenerateSchema($event);
}
public function testOnSchemaCreateTable()
{
$platform = $this->createMock(AbstractPlatform::class);
$table = new Table('queue_table');
$event = new SchemaCreateTableEventArgs($table, [], [], $platform);
$otherTransport = $this->createMock(TransportInterface::class);
$otherTransport->expects($this->never())
->method($this->anything());
$doctrineTransport = $this->createMock(DoctrineTransport::class);
$doctrineTransport->expects($this->once())
->method('getExtraSetupSqlForTable')
->with($table)
->willReturn('ALTER TABLE pizza ADD COLUMN extra_cheese boolean');
// we use the platform to generate the full create table sql
$platform->expects($this->once())
->method('getCreateTableSQL')
->with($table)
->willReturn('CREATE TABLE pizza (id integer NOT NULL)');
$subscriber = new MessengerTransportDoctrineSchemaSubscriber([$otherTransport, $doctrineTransport]);
$subscriber->onSchemaCreateTable($event);
$this->assertTrue($event->isDefaultPrevented());
$this->assertSame([
'CREATE TABLE pizza (id integer NOT NULL)',
'ALTER TABLE pizza ADD COLUMN extra_cheese boolean',
], $event->getSql());
}
public function testOnSchemaCreateTableNoExtraSql()
{
$platform = $this->createMock(AbstractPlatform::class);
$table = new Table('queue_table');
$event = new SchemaCreateTableEventArgs($table, [], [], $platform);
$doctrineTransport = $this->createMock(DoctrineTransport::class);
$doctrineTransport->expects($this->once())
->method('getExtraSetupSqlForTable')
->willReturn(null);
$platform->expects($this->never())
->method('getCreateTableSQL');
$subscriber = new MessengerTransportDoctrineSchemaSubscriber([$doctrineTransport]);
$subscriber->onSchemaCreateTable($event);
$this->assertFalse($event->isDefaultPrevented());
}
}

View File

@ -0,0 +1,42 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Bridge\Doctrine\Tests\SchemaListener;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\ORM\Tools\Event\GenerateSchemaEventArgs;
use PHPUnit\Framework\TestCase;
use Symfony\Bridge\Doctrine\SchemaListener\PdoCacheAdapterDoctrineSchemaSubscriber;
use Symfony\Component\Cache\Adapter\PdoAdapter;
class PdoCacheAdapterDoctrineSchemaSubscriberTest extends TestCase
{
public function testPostGenerateSchema()
{
$schema = new Schema();
$dbalConnection = $this->createMock(Connection::class);
$entityManager = $this->createMock(EntityManagerInterface::class);
$entityManager->expects($this->once())
->method('getConnection')
->willReturn($dbalConnection);
$event = new GenerateSchemaEventArgs($entityManager, $schema);
$pdoAdapter = $this->createMock(PdoAdapter::class);
$pdoAdapter->expects($this->once())
->method('configureSchema')
->with($schema, $dbalConnection);
$subscriber = new PdoCacheAdapterDoctrineSchemaSubscriber([$pdoAdapter]);
$subscriber->postGenerateSchema($event);
}
}

View File

@ -26,11 +26,13 @@
},
"require-dev": {
"symfony/stopwatch": "^4.4|^5.0",
"symfony/cache": "^5.1",
"symfony/config": "^4.4|^5.0",
"symfony/dependency-injection": "^4.4|^5.0",
"symfony/form": "^5.1",
"symfony/http-kernel": "^5.0",
"symfony/messenger": "^4.4|^5.0",
"symfony/doctrine-messenger": "^5.1",
"symfony/property-access": "^4.4|^5.0",
"symfony/property-info": "^5.0",
"symfony/proxy-manager-bridge": "^4.4|^5.0",

View File

@ -115,24 +115,8 @@ class PdoAdapter extends AbstractAdapter implements PruneableInterface
$conn = $this->getConnection();
if ($conn instanceof Connection) {
$types = [
'mysql' => 'binary',
'sqlite' => 'text',
'pgsql' => 'string',
'oci' => 'string',
'sqlsrv' => 'string',
];
if (!isset($types[$this->driver])) {
throw new \DomainException(sprintf('Creating the cache table is currently not implemented for PDO driver "%s".', $this->driver));
}
$schema = new Schema();
$table = $schema->createTable($this->table);
$table->addColumn($this->idCol, $types[$this->driver], ['length' => 255]);
$table->addColumn($this->dataCol, 'blob', ['length' => 16777215]);
$table->addColumn($this->lifetimeCol, 'integer', ['unsigned' => true, 'notnull' => false]);
$table->addColumn($this->timeCol, 'integer', ['unsigned' => true]);
$table->setPrimaryKey([$this->idCol]);
$this->addTableToSchema($schema);
foreach ($schema->toSql($conn->getDatabasePlatform()) as $sql) {
$conn->exec($sql);
@ -169,6 +153,23 @@ class PdoAdapter extends AbstractAdapter implements PruneableInterface
$conn->exec($sql);
}
/**
* Adds the Table to the Schema if the adapter uses this Connection.
*/
public function configureSchema(Schema $schema, Connection $forConnection): void
{
// only update the schema for this connection
if ($forConnection !== $this->getConnection()) {
return;
}
if ($schema->hasTable($this->table)) {
return;
}
$this->addTableToSchema($schema);
}
/**
* {@inheritdoc}
*/
@ -467,4 +468,25 @@ class PdoAdapter extends AbstractAdapter implements PruneableInterface
return $this->serverVersion;
}
private function addTableToSchema(Schema $schema): void
{
$types = [
'mysql' => 'binary',
'sqlite' => 'text',
'pgsql' => 'string',
'oci' => 'string',
'sqlsrv' => 'string',
];
if (!isset($types[$this->driver])) {
throw new \DomainException(sprintf('Creating the cache table is currently not implemented for PDO driver "%s".', $this->driver));
}
$table = $schema->createTable($this->table);
$table->addColumn($this->idCol, $types[$this->driver], ['length' => 255]);
$table->addColumn($this->dataCol, 'blob', ['length' => 16777215]);
$table->addColumn($this->lifetimeCol, 'integer', ['unsigned' => true, 'notnull' => false]);
$table->addColumn($this->timeCol, 'integer', ['unsigned' => true]);
$table->setPrimaryKey([$this->idCol]);
}
}

View File

@ -11,7 +11,10 @@
namespace Symfony\Component\Cache\Tests\Adapter;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Driver;
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Schema\Schema;
use Psr\Cache\CacheItemPoolInterface;
use Symfony\Component\Cache\Adapter\PdoAdapter;
use Symfony\Component\Cache\Tests\Traits\PdoPruneableTrait;
@ -43,4 +46,50 @@ class PdoDbalAdapterTest extends AdapterTestCase
{
return new PdoAdapter(DriverManager::getConnection(['driver' => 'pdo_sqlite', 'path' => self::$dbFile]), '', $defaultLifetime);
}
public function testConfigureSchema()
{
$connection = DriverManager::getConnection(['driver' => 'pdo_sqlite', 'path' => self::$dbFile]);
$schema = new Schema();
$adapter = new PdoAdapter($connection);
$adapter->configureSchema($schema, $connection);
$this->assertTrue($schema->hasTable('cache_items'));
}
public function testConfigureSchemaDifferentDbalConnection()
{
$otherConnection = $this->createConnectionMock();
$schema = new Schema();
$adapter = $this->createCachePool();
$adapter->configureSchema($schema, $otherConnection);
$this->assertFalse($schema->hasTable('cache_items'));
}
public function testConfigureSchemaTableExists()
{
$connection = DriverManager::getConnection(['driver' => 'pdo_sqlite', 'path' => self::$dbFile]);
$schema = new Schema();
$schema->createTable('cache_items');
$adapter = new PdoAdapter($connection);
$adapter->configureSchema($schema, $connection);
$table = $schema->getTable('cache_items');
$this->assertEmpty($table->getColumns(), 'The table was not overwritten');
}
private function createConnectionMock()
{
$connection = $this->createMock(Connection::class);
$driver = $this->createMock(Driver::class);
$driver->expects($this->any())
->method('getName')
->willReturn('pdo_mysql');
$connection->expects($this->any())
->method('getDriver')
->willReturn($driver);
return $connection;
}
}

View File

@ -146,7 +146,7 @@ class PdoStore implements PersistingStoreInterface
public function putOffExpiration(Key $key, float $ttl)
{
if ($ttl < 1) {
throw new InvalidTtlException(sprintf('"%s()" expects a TTL greater or equals to 1 second. Got %s.', __METHOD__, $ttl));
throw new InvalidTtlException(sprintf('"%s()" expects a TTL greater or equals to 1 second. Got "%s".', __METHOD__, $ttl));
}
$key->reduceLifetime($ttl);
@ -249,11 +249,7 @@ class PdoStore implements PersistingStoreInterface
if ($conn instanceof Connection) {
$schema = new Schema();
$table = $schema->createTable($this->table);
$table->addColumn($this->idCol, 'string', ['length' => 64]);
$table->addColumn($this->tokenCol, 'string', ['length' => 44]);
$table->addColumn($this->expirationCol, 'integer', ['unsigned' => true]);
$table->setPrimaryKey([$this->idCol]);
$this->addTableToSchema($schema);
foreach ($schema->toSql($conn->getDatabasePlatform()) as $sql) {
$conn->exec($sql);
@ -285,6 +281,22 @@ class PdoStore implements PersistingStoreInterface
$conn->exec($sql);
}
/**
* Adds the Table to the Schema if it doesn't exist.
*/
public function configureSchema(Schema $schema): void
{
if (!$this->getConnection() instanceof Connection) {
throw new \BadMethodCallException(sprintf('"%s::%s()" is only supported when using a doctrine/dbal Connection.', __CLASS__, __METHOD__));
}
if ($schema->hasTable($this->table)) {
return;
}
$this->addTableToSchema($schema);
}
/**
* Cleans up the table by removing all expired locks.
*/
@ -351,4 +363,13 @@ class PdoStore implements PersistingStoreInterface
return time();
}
}
private function addTableToSchema(Schema $schema): void
{
$table = $schema->createTable($this->table);
$table->addColumn($this->idCol, 'string', ['length' => 64]);
$table->addColumn($this->tokenCol, 'string', ['length' => 44]);
$table->addColumn($this->expirationCol, 'integer', ['unsigned' => true]);
$table->setPrimaryKey([$this->idCol]);
}
}

View File

@ -11,7 +11,9 @@
namespace Symfony\Component\Lock\Tests\Store;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Schema\Schema;
use Symfony\Component\Lock\PersistingStoreInterface;
use Symfony\Component\Lock\Store\PdoStore;
@ -59,4 +61,12 @@ class PdoDbalStoreTest extends AbstractStoreTest
{
$this->markTestSkipped('Pdo expects a TTL greater than 1 sec. Simulating a slow network is too hard');
}
public function testConfigureSchema()
{
$store = new PdoStore($this->createMock(Connection::class), ['db_table' => 'lock_table']);
$schema = new Schema();
$store->configureSchema($schema);
$this->assertTrue($schema->hasTable('lock_table'));
}
}

View File

@ -16,6 +16,7 @@ use Doctrine\DBAL\Driver\Statement;
use Doctrine\DBAL\Platforms\AbstractPlatform;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\SchemaConfig;
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
use PHPUnit\Framework\TestCase;
@ -343,4 +344,37 @@ class ConnectionTest extends TestCase
$this->assertEquals('{"message":"Hi again"}', $doctrineEnvelopes[1]['body']);
$this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelopes[1]['headers']);
}
public function testConfigureSchema()
{
$driverConnection = $this->getDBALConnectionMock();
$schema = new Schema();
$connection = new Connection(['table_name' => 'queue_table'], $driverConnection);
$connection->configureSchema($schema, $driverConnection);
$this->assertTrue($schema->hasTable('queue_table'));
}
public function testConfigureSchemaDifferentDbalConnection()
{
$driverConnection = $this->getDBALConnectionMock();
$driverConnection2 = $this->getDBALConnectionMock();
$schema = new Schema();
$connection = new Connection([], $driverConnection);
$connection->configureSchema($schema, $driverConnection2);
$this->assertFalse($schema->hasTable('messenger_messages'));
}
public function testConfigureSchemaTableExists()
{
$driverConnection = $this->getDBALConnectionMock();
$schema = new Schema();
$schema->createTable('messenger_messages');
$connection = new Connection([], $driverConnection);
$connection->configureSchema($schema, $driverConnection);
$table = $schema->getTable('messenger_messages');
$this->assertEmpty($table->getColumns(), 'The table was not overwritten');
}
}

View File

@ -11,6 +11,8 @@
namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport;
use Doctrine\DBAL\Connection as DbalConnection;
use Doctrine\DBAL\Schema\Schema;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\Doctrine\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
@ -50,6 +52,23 @@ class DoctrineTransportTest extends TestCase
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
}
public function testConfigureSchema()
{
$transport = $this->getTransport(
null,
$connection = $this->createMock(Connection::class)
);
$schema = new Schema();
$dbalConnection = $this->createMock(DbalConnection::class);
$connection->expects($this->once())
->method('configureSchema')
->with($schema, $dbalConnection);
$transport->configureSchema($schema, $dbalConnection);
}
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null): DoctrineTransport
{
$serializer = $serializer ?: $this->createMock(SerializerInterface::class);

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport;
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
use Doctrine\DBAL\Schema\Table;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;
@ -43,4 +44,24 @@ class PostgreSqlConnectionTest extends TestCase
$connection = new PostgreSqlConnection([], $driverConnection, $schemaSynchronizer);
$connection->__wakeup();
}
public function testGetExtraSetupSql()
{
$driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class);
$connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $driverConnection);
$table = new Table('queue_table');
$table->addOption('_symfony_messenger_table_name', 'queue_table');
$this->assertStringContainsString('CREATE TRIGGER', $connection->getExtraSetupSqlForTable($table));
}
public function testGetExtraSetupSqlWrongTable()
{
$driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class);
$connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $driverConnection);
$table = new Table('queue_table');
// don't set the _symfony_messenger_table_name option
$this->assertNull($connection->getExtraSetupSqlForTable($table));
}
}

View File

@ -19,6 +19,7 @@ use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
use Doctrine\DBAL\Schema\Table;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Types\Types;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
@ -33,6 +34,8 @@ use Symfony\Contracts\Service\ResetInterface;
*/
class Connection implements ResetInterface
{
protected const TABLE_OPTION_NAME = '_symfony_messenger_table_name';
protected const DEFAULT_OPTIONS = [
'table_name' => 'messenger_messages',
'queue_name' => 'default',
@ -290,6 +293,31 @@ class Connection implements ResetInterface
return false === $data ? null : $this->decodeEnvelopeHeaders($data);
}
/**
* @internal
*/
public function configureSchema(Schema $schema, DBALConnection $forConnection): void
{
// only update the schema for this connection
if ($forConnection !== $this->driverConnection) {
return;
}
if ($schema->hasTable($this->configuration['table_name'])) {
return;
}
$this->addTableToSchema($schema);
}
/**
* @internal
*/
public function getExtraSetupSqlForTable(Table $createdTable): ?string
{
return null;
}
private function createAvailableMessagesQueryBuilder(): QueryBuilder
{
$now = new \DateTime();
@ -341,7 +369,16 @@ class Connection implements ResetInterface
private function getSchema(): Schema
{
$schema = new Schema([], [], $this->driverConnection->getSchemaManager()->createSchemaConfig());
$this->addTableToSchema($schema);
return $schema;
}
private function addTableToSchema(Schema $schema): void
{
$table = $schema->createTable($this->configuration['table_name']);
// add an internal option to mark that we created this & the non-namespaced table name
$table->addOption(self::TABLE_OPTION_NAME, $this->configuration['table_name']);
$table->addColumn('id', self::$useDeprecatedConstants ? Type::BIGINT : Types::BIGINT)
->setAutoincrement(true)
->setNotnull(true);
@ -361,8 +398,6 @@ class Connection implements ResetInterface
$table->addIndex(['queue_name']);
$table->addIndex(['available_at']);
$table->addIndex(['delivered_at']);
return $schema;
}
private function decodeEnvelopeHeaders(array $doctrineEnvelope): array

View File

@ -11,6 +11,9 @@
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
use Doctrine\DBAL\Connection as DbalConnection;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Table;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
@ -98,6 +101,22 @@ class DoctrineTransport implements TransportInterface, SetupableTransportInterfa
$this->connection->setup();
}
/**
* Adds the Table to the Schema if this transport uses this connection.
*/
public function configureSchema(Schema $schema, DbalConnection $forConnection): void
{
$this->connection->configureSchema($schema, $forConnection);
}
/**
* Adds extra SQL if the given table was created by the Connection.
*/
public function getExtraSetupSqlForTable(Table $createdTable): ?string
{
return $this->connection->getExtraSetupSqlForTable($createdTable);
}
private function getReceiver(): DoctrineReceiver
{
return $this->receiver = new DoctrineReceiver($this->connection, $this->serializer);

View File

@ -11,6 +11,8 @@
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
use Doctrine\DBAL\Schema\Table;
/**
* Uses PostgreSQL LISTEN/NOTIFY to push messages to workers.
*
@ -83,7 +85,25 @@ final class PostgreSqlConnection extends Connection
{
parent::setup();
$sql = sprintf(<<<'SQL'
$this->driverConnection->exec($this->getTriggerSql());
}
public function getExtraSetupSqlForTable(Table $createdTable): ?string
{
if (!$createdTable->hasOption(self::TABLE_OPTION_NAME)) {
return null;
}
if ($createdTable->getOption(self::TABLE_OPTION_NAME) !== $this->configuration['table_name']) {
return null;
}
return $this->getTriggerSql();
}
private function getTriggerSql(): string
{
return sprintf(<<<'SQL'
LOCK TABLE %1$s;
-- create trigger function
CREATE OR REPLACE FUNCTION notify_%1$s() RETURNS TRIGGER AS $$
@ -102,7 +122,6 @@ ON %1$s
FOR EACH ROW EXECUTE PROCEDURE notify_%1$s();
SQL
, $this->configuration['table_name']);
$this->driverConnection->exec($sql);
}
private function unlisten()

View File

@ -23,6 +23,7 @@
"symfony/service-contracts": "^1.1|^2"
},
"require-dev": {
"doctrine/orm": "^2.6.3",
"symfony/serializer": "^4.4|^5.0",
"symfony/property-access": "^4.4|^5.0"
},