bug #30858 [Messenger] Setup the doctrine transport when consuming (vincenttouzet)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Setup the doctrine transport when consuming

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | yes
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no <!-- don't forget to update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass?   | yes
| Fixed tickets |
| License       | MIT
| Doc PR        |

Currently there is an error when the table does not exists and we run the `messenger:consume` command.

This is because all queries made in the `get` method of the `Connection` are in a transaction. Therefore the table is not created.

To avoid this error I added a call to the `setup` method before starting the transaction.

I needed to add the SchemaSynchronizer as construct parameter for the tests

Commits
-------

b2f3b53253 [Messenger] Setup the doctrine transport when consuming
This commit is contained in:
Fabien Potencier 2019-04-04 10:38:26 +02:00
commit 4dfb741330
3 changed files with 32 additions and 5 deletions

View File

@ -15,6 +15,7 @@ use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Driver\Statement;
use Doctrine\DBAL\Platforms\AbstractPlatform;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
@ -25,6 +26,7 @@ class ConnectionTest extends TestCase
{
$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnectionMock();
$schemaSynchronizer = $this->getSchemaSynchronizerMock();
$stmt = $this->getStatementMock([
'id' => 1,
'body' => '{"message":"Hi"}',
@ -44,7 +46,7 @@ class ConnectionTest extends TestCase
->method('prepare')
->willReturn($stmt);
$connection = new Connection([], $driverConnection);
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
$doctrineEnvelope = $connection->get();
$this->assertEquals(1, $doctrineEnvelope['id']);
$this->assertEquals('{"message":"Hi"}', $doctrineEnvelope['body']);
@ -55,6 +57,7 @@ class ConnectionTest extends TestCase
{
$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnectionMock();
$schemaSynchronizer = $this->getSchemaSynchronizerMock();
$stmt = $this->getStatementMock(false);
$queryBuilder
@ -68,7 +71,7 @@ class ConnectionTest extends TestCase
$driverConnection->expects($this->never())
->method('update');
$connection = new Connection([], $driverConnection);
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
$doctrineEnvelope = $connection->get();
$this->assertNull($doctrineEnvelope);
}
@ -142,6 +145,12 @@ class ConnectionTest extends TestCase
return $stmt;
}
private function getSchemaSynchronizerMock()
{
return $this->getMockBuilder(SchemaSynchronizer::class)
->getMock();
}
/**
* @dataProvider buildConfigurationProvider
*/

View File

@ -164,4 +164,17 @@ class DoctrineIntegrationTest extends TestCase
$this->assertEquals('{"message": "Hi requeued"}', $next['body']);
$this->connection->reject($next['id']);
}
public function testTheTransportIsSetupOnGet()
{
// If the table does not exist and we call the get (i.e run messenger:consume) the table must be setup
// so first delete the tables
$this->driverConnection->exec('DROP TABLE messenger_messages');
$this->assertNull($this->connection->get());
$this->connection->send('the body', ['my' => 'header']);
$envelope = $this->connection->get();
$this->assertEquals('the body', $envelope['body']);
}
}

View File

@ -16,6 +16,7 @@ use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Exception\TableNotFoundException;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
use Doctrine\DBAL\Types\Type;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
@ -50,11 +51,13 @@ class Connection
*/
private $configuration = [];
private $driverConnection;
private $schemaSynchronizer;
public function __construct(array $configuration, DBALConnection $driverConnection)
public function __construct(array $configuration, DBALConnection $driverConnection, SchemaSynchronizer $schemaSynchronizer = null)
{
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
$this->driverConnection = $driverConnection;
$this->schemaSynchronizer = $schemaSynchronizer ?? new SingleDatabaseSynchronizer($this->driverConnection);
}
public function getConfiguration(): array
@ -127,6 +130,9 @@ class Connection
public function get(): ?array
{
if ($this->configuration['auto_setup']) {
$this->setup();
}
$this->driverConnection->beginTransaction();
try {
$query = $this->createAvailableMessagesQueryBuilder()
@ -187,8 +193,7 @@ class Connection
public function setup(): void
{
$synchronizer = new SingleDatabaseSynchronizer($this->driverConnection);
$synchronizer->updateSchema($this->getSchema(), true);
$this->schemaSynchronizer->updateSchema($this->getSchema(), true);
}
public function getMessageCount(): int