bug #32308 [Messenger] DoctrineTransport: ensure auto setup is only done once (bendavies)
This PR was squashed before being merged into the 4.3 branch (closes #32308).
Discussion
----------
[Messenger] DoctrineTransport: ensure auto setup is only done once
| Q | A
| ------------- | ---
| Branch? | 4.3
| Bug fix? | yes
| New feature? | no <!-- please update src/**/CHANGELOG.md files -->
| BC breaks? | no <!-- see https://symfony.com/bc -->
| Deprecations? |no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass? | yes <!-- please add some, will be required by reviewers -->
| Fixed tickets | #... <!-- #-prefixed issue number(s), if any -->
| License | MIT
| Doc PR | symfony/symfony-docs#... <!-- required for new features -->
Setup is done for every invocation of `get`, `find`, and `findAll`.
For `get`, this causes message consumption to be very slow, as slow as 1 message per second on my moderately sized schema, because a schema diff is done in `setup` every `get`.
I'm not sure what the desired behaviour is here, but it seems like we should try performing a query, fail once, `setup`, and retry. This will the same approach taken by the PDO Cache.
However, we still need auto setup in `get`, as `get` starts a transaction, so the auto setup in `executeQuery` won't be called.
Further more, the same problem seems to exist for the AMPQ Transport, but the performance impact should be less there, but i have not tried.
Commits
-------
02414027e1
[Messenger] DoctrineTransport: ensure auto setup is only done once
This commit is contained in:
commit
38b3dc898e
@ -21,29 +21,27 @@ use Symfony\Component\Messenger\Transport\Doctrine\Connection;
|
||||
*/
|
||||
class DoctrineIntegrationTest extends TestCase
|
||||
{
|
||||
/** @var \Doctrine\DBAL\Connection */
|
||||
private $driverConnection;
|
||||
/** @var Connection */
|
||||
private $connection;
|
||||
/** @var string */
|
||||
private $sqliteFile;
|
||||
|
||||
/**
|
||||
* @after
|
||||
*/
|
||||
public function cleanup()
|
||||
protected function setUp(): void
|
||||
{
|
||||
@unlink(sys_get_temp_dir().'/symfony.messenger.sqlite');
|
||||
}
|
||||
|
||||
/**
|
||||
* @before
|
||||
*/
|
||||
public function createConnection()
|
||||
{
|
||||
$dsn = getenv('MESSENGER_DOCTRINE_DSN') ?: 'sqlite:///'.sys_get_temp_dir().'/symfony.messenger.sqlite';
|
||||
$this->sqliteFile = sys_get_temp_dir().'/symfony.messenger.sqlite';
|
||||
$dsn = getenv('MESSENGER_DOCTRINE_DSN') ?: 'sqlite:///'.$this->sqliteFile;
|
||||
$this->driverConnection = DriverManager::getConnection(['url' => $dsn]);
|
||||
$this->connection = new Connection([], $this->driverConnection);
|
||||
// call send to auto-setup the table
|
||||
$this->connection->setup();
|
||||
// ensure the table is clean for tests
|
||||
$this->driverConnection->exec('DELETE FROM messenger_messages');
|
||||
}
|
||||
|
||||
protected function tearDown(): void
|
||||
{
|
||||
$this->driverConnection->close();
|
||||
if (file_exists($this->sqliteFile)) {
|
||||
unlink($this->sqliteFile);
|
||||
}
|
||||
}
|
||||
|
||||
public function testConnectionSendAndGet()
|
||||
@ -75,6 +73,7 @@ class DoctrineIntegrationTest extends TestCase
|
||||
|
||||
public function testItRetrieveTheFirstAvailableMessage()
|
||||
{
|
||||
$this->connection->setup();
|
||||
// insert messages
|
||||
// one currently handled
|
||||
$this->driverConnection->insert('messenger_messages', [
|
||||
@ -108,6 +107,7 @@ class DoctrineIntegrationTest extends TestCase
|
||||
|
||||
public function testItCountMessages()
|
||||
{
|
||||
$this->connection->setup();
|
||||
// insert messages
|
||||
// one currently handled
|
||||
$this->driverConnection->insert('messenger_messages', [
|
||||
@ -148,6 +148,7 @@ class DoctrineIntegrationTest extends TestCase
|
||||
|
||||
public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
|
||||
{
|
||||
$this->connection->setup();
|
||||
$twoHoursAgo = new \DateTime('now');
|
||||
$twoHoursAgo->modify('-2 hours');
|
||||
$this->driverConnection->insert('messenger_messages', [
|
||||
@ -173,10 +174,7 @@ class DoctrineIntegrationTest extends TestCase
|
||||
|
||||
public function testTheTransportIsSetupOnGet()
|
||||
{
|
||||
// If the table does not exist and we call the get (i.e run messenger:consume) the table must be setup
|
||||
// so first delete the tables
|
||||
$this->driverConnection->exec('DROP TABLE messenger_messages');
|
||||
|
||||
$this->assertFalse($this->driverConnection->getSchemaManager()->tablesExist('messenger_messages'));
|
||||
$this->assertNull($this->connection->get());
|
||||
|
||||
$this->connection->send('the body', ['my' => 'header']);
|
||||
|
@ -52,12 +52,14 @@ class Connection
|
||||
private $configuration = [];
|
||||
private $driverConnection;
|
||||
private $schemaSynchronizer;
|
||||
private $autoSetup;
|
||||
|
||||
public function __construct(array $configuration, DBALConnection $driverConnection, SchemaSynchronizer $schemaSynchronizer = null)
|
||||
{
|
||||
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
|
||||
$this->driverConnection = $driverConnection;
|
||||
$this->schemaSynchronizer = $schemaSynchronizer ?? new SingleDatabaseSynchronizer($this->driverConnection);
|
||||
$this->autoSetup = $this->configuration['auto_setup'];
|
||||
}
|
||||
|
||||
public function getConfiguration(): array
|
||||
@ -131,9 +133,7 @@ class Connection
|
||||
|
||||
public function get(): ?array
|
||||
{
|
||||
if ($this->configuration['auto_setup']) {
|
||||
$this->setup();
|
||||
}
|
||||
get:
|
||||
$this->driverConnection->beginTransaction();
|
||||
try {
|
||||
$query = $this->createAvailableMessagesQueryBuilder()
|
||||
@ -170,6 +170,11 @@ class Connection
|
||||
} catch (\Throwable $e) {
|
||||
$this->driverConnection->rollBack();
|
||||
|
||||
if ($this->autoSetup && $e instanceof TableNotFoundException) {
|
||||
$this->setup();
|
||||
goto get;
|
||||
}
|
||||
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
@ -213,6 +218,8 @@ class Connection
|
||||
} else {
|
||||
$this->driverConnection->getConfiguration()->setFilterSchemaAssetsExpression($assetFilter);
|
||||
}
|
||||
|
||||
$this->autoSetup = false;
|
||||
}
|
||||
|
||||
public function getMessageCount(): int
|
||||
@ -226,10 +233,6 @@ class Connection
|
||||
|
||||
public function findAll(int $limit = null): array
|
||||
{
|
||||
if ($this->configuration['auto_setup']) {
|
||||
$this->setup();
|
||||
}
|
||||
|
||||
$queryBuilder = $this->createAvailableMessagesQueryBuilder();
|
||||
if (null !== $limit) {
|
||||
$queryBuilder->setMaxResults($limit);
|
||||
@ -244,10 +247,6 @@ class Connection
|
||||
|
||||
public function find($id): ?array
|
||||
{
|
||||
if ($this->configuration['auto_setup']) {
|
||||
$this->setup();
|
||||
}
|
||||
|
||||
$queryBuilder = $this->createQueryBuilder()
|
||||
->where('m.id = ?');
|
||||
|
||||
@ -288,8 +287,12 @@ class Connection
|
||||
$stmt = $this->driverConnection->prepare($sql);
|
||||
$stmt->execute($parameters);
|
||||
} catch (TableNotFoundException $e) {
|
||||
if ($this->driverConnection->isTransactionActive()) {
|
||||
throw $e;
|
||||
}
|
||||
|
||||
// create table
|
||||
if (!$this->driverConnection->isTransactionActive() && $this->configuration['auto_setup']) {
|
||||
if ($this->autoSetup) {
|
||||
$this->setup();
|
||||
}
|
||||
// statement not prepared ? SQLite throw on exception on prepare if the table does not exist
|
||||
|
Reference in New Issue
Block a user