[Messenger] DoctrineTransport: ensure auto setup is only done once
This commit is contained in:
parent
abf11d8ba8
commit
02414027e1
@ -21,29 +21,27 @@ use Symfony\Component\Messenger\Transport\Doctrine\Connection;
|
||||
*/
|
||||
class DoctrineIntegrationTest extends TestCase
|
||||
{
|
||||
/** @var \Doctrine\DBAL\Connection */
|
||||
private $driverConnection;
|
||||
/** @var Connection */
|
||||
private $connection;
|
||||
/** @var string */
|
||||
private $sqliteFile;
|
||||
|
||||
/**
|
||||
* @after
|
||||
*/
|
||||
public function cleanup()
|
||||
protected function setUp(): void
|
||||
{
|
||||
@unlink(sys_get_temp_dir().'/symfony.messenger.sqlite');
|
||||
}
|
||||
|
||||
/**
|
||||
* @before
|
||||
*/
|
||||
public function createConnection()
|
||||
{
|
||||
$dsn = getenv('MESSENGER_DOCTRINE_DSN') ?: 'sqlite:///'.sys_get_temp_dir().'/symfony.messenger.sqlite';
|
||||
$this->sqliteFile = sys_get_temp_dir().'/symfony.messenger.sqlite';
|
||||
$dsn = getenv('MESSENGER_DOCTRINE_DSN') ?: 'sqlite:///'.$this->sqliteFile;
|
||||
$this->driverConnection = DriverManager::getConnection(['url' => $dsn]);
|
||||
$this->connection = new Connection([], $this->driverConnection);
|
||||
// call send to auto-setup the table
|
||||
$this->connection->setup();
|
||||
// ensure the table is clean for tests
|
||||
$this->driverConnection->exec('DELETE FROM messenger_messages');
|
||||
}
|
||||
|
||||
protected function tearDown(): void
|
||||
{
|
||||
$this->driverConnection->close();
|
||||
if (file_exists($this->sqliteFile)) {
|
||||
unlink($this->sqliteFile);
|
||||
}
|
||||
}
|
||||
|
||||
public function testConnectionSendAndGet()
|
||||
@ -75,6 +73,7 @@ class DoctrineIntegrationTest extends TestCase
|
||||
|
||||
public function testItRetrieveTheFirstAvailableMessage()
|
||||
{
|
||||
$this->connection->setup();
|
||||
// insert messages
|
||||
// one currently handled
|
||||
$this->driverConnection->insert('messenger_messages', [
|
||||
@ -108,6 +107,7 @@ class DoctrineIntegrationTest extends TestCase
|
||||
|
||||
public function testItCountMessages()
|
||||
{
|
||||
$this->connection->setup();
|
||||
// insert messages
|
||||
// one currently handled
|
||||
$this->driverConnection->insert('messenger_messages', [
|
||||
@ -148,6 +148,7 @@ class DoctrineIntegrationTest extends TestCase
|
||||
|
||||
public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
|
||||
{
|
||||
$this->connection->setup();
|
||||
$twoHoursAgo = new \DateTime('now');
|
||||
$twoHoursAgo->modify('-2 hours');
|
||||
$this->driverConnection->insert('messenger_messages', [
|
||||
@ -173,10 +174,7 @@ class DoctrineIntegrationTest extends TestCase
|
||||
|
||||
public function testTheTransportIsSetupOnGet()
|
||||
{
|
||||
// If the table does not exist and we call the get (i.e run messenger:consume) the table must be setup
|
||||
// so first delete the tables
|
||||
$this->driverConnection->exec('DROP TABLE messenger_messages');
|
||||
|
||||
$this->assertFalse($this->driverConnection->getSchemaManager()->tablesExist('messenger_messages'));
|
||||
$this->assertNull($this->connection->get());
|
||||
|
||||
$this->connection->send('the body', ['my' => 'header']);
|
||||
|
@ -52,12 +52,14 @@ class Connection
|
||||
private $configuration = [];
|
||||
private $driverConnection;
|
||||
private $schemaSynchronizer;
|
||||
private $autoSetup;
|
||||
|
||||
public function __construct(array $configuration, DBALConnection $driverConnection, SchemaSynchronizer $schemaSynchronizer = null)
|
||||
{
|
||||
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
|
||||
$this->driverConnection = $driverConnection;
|
||||
$this->schemaSynchronizer = $schemaSynchronizer ?? new SingleDatabaseSynchronizer($this->driverConnection);
|
||||
$this->autoSetup = $this->configuration['auto_setup'];
|
||||
}
|
||||
|
||||
public function getConfiguration(): array
|
||||
@ -131,9 +133,7 @@ class Connection
|
||||
|
||||
public function get(): ?array
|
||||
{
|
||||
if ($this->configuration['auto_setup']) {
|
||||
$this->setup();
|
||||
}
|
||||
get:
|
||||
$this->driverConnection->beginTransaction();
|
||||
try {
|
||||
$query = $this->createAvailableMessagesQueryBuilder()
|
||||
@ -170,6 +170,11 @@ class Connection
|
||||
} catch (\Throwable $e) {
|
||||
$this->driverConnection->rollBack();
|
||||
|
||||
if ($this->autoSetup && $e instanceof TableNotFoundException) {
|
||||
$this->setup();
|
||||
goto get;
|
||||
}
|
||||
|
||||
throw $e;
|
||||
}
|
||||
}
|
||||
@ -213,6 +218,8 @@ class Connection
|
||||
} else {
|
||||
$this->driverConnection->getConfiguration()->setFilterSchemaAssetsExpression($assetFilter);
|
||||
}
|
||||
|
||||
$this->autoSetup = false;
|
||||
}
|
||||
|
||||
public function getMessageCount(): int
|
||||
@ -226,10 +233,6 @@ class Connection
|
||||
|
||||
public function findAll(int $limit = null): array
|
||||
{
|
||||
if ($this->configuration['auto_setup']) {
|
||||
$this->setup();
|
||||
}
|
||||
|
||||
$queryBuilder = $this->createAvailableMessagesQueryBuilder();
|
||||
if (null !== $limit) {
|
||||
$queryBuilder->setMaxResults($limit);
|
||||
@ -244,10 +247,6 @@ class Connection
|
||||
|
||||
public function find($id): ?array
|
||||
{
|
||||
if ($this->configuration['auto_setup']) {
|
||||
$this->setup();
|
||||
}
|
||||
|
||||
$queryBuilder = $this->createQueryBuilder()
|
||||
->where('m.id = ?');
|
||||
|
||||
@ -288,8 +287,12 @@ class Connection
|
||||
$stmt = $this->driverConnection->prepare($sql);
|
||||
$stmt->execute($parameters);
|
||||
} catch (TableNotFoundException $e) {
|
||||
if ($this->driverConnection->isTransactionActive()) {
|
||||
throw $e;
|
||||
}
|
||||
|
||||
// create table
|
||||
if (!$this->driverConnection->isTransactionActive() && $this->configuration['auto_setup']) {
|
||||
if ($this->autoSetup) {
|
||||
$this->setup();
|
||||
}
|
||||
// statement not prepared ? SQLite throw on exception on prepare if the table does not exist
|
||||
|
Reference in New Issue
Block a user