[Messenger] Setup the doctrine transport when consuming

This commit is contained in:
Vincent Touzet 2019-04-03 23:16:34 +02:00
parent de7d7a9ceb
commit b2f3b53253
3 changed files with 32 additions and 5 deletions

View File

@ -15,6 +15,7 @@ use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Driver\Statement;
use Doctrine\DBAL\Platforms\AbstractPlatform;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
@ -25,6 +26,7 @@ class ConnectionTest extends TestCase
{
$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnectionMock();
$schemaSynchronizer = $this->getSchemaSynchronizerMock();
$stmt = $this->getStatementMock([
'id' => 1,
'body' => '{"message":"Hi"}',
@ -44,7 +46,7 @@ class ConnectionTest extends TestCase
->method('prepare')
->willReturn($stmt);
$connection = new Connection([], $driverConnection);
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
$doctrineEnvelope = $connection->get();
$this->assertEquals(1, $doctrineEnvelope['id']);
$this->assertEquals('{"message":"Hi"}', $doctrineEnvelope['body']);
@ -55,6 +57,7 @@ class ConnectionTest extends TestCase
{
$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnectionMock();
$schemaSynchronizer = $this->getSchemaSynchronizerMock();
$stmt = $this->getStatementMock(false);
$queryBuilder
@ -68,7 +71,7 @@ class ConnectionTest extends TestCase
$driverConnection->expects($this->never())
->method('update');
$connection = new Connection([], $driverConnection);
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
$doctrineEnvelope = $connection->get();
$this->assertNull($doctrineEnvelope);
}
@ -142,6 +145,12 @@ class ConnectionTest extends TestCase
return $stmt;
}
private function getSchemaSynchronizerMock()
{
return $this->getMockBuilder(SchemaSynchronizer::class)
->getMock();
}
/**
* @dataProvider buildConfigurationProvider
*/

View File

@ -164,4 +164,17 @@ class DoctrineIntegrationTest extends TestCase
$this->assertEquals('{"message": "Hi requeued"}', $next['body']);
$this->connection->reject($next['id']);
}
public function testTheTransportIsSetupOnGet()
{
// If the table does not exist and we call the get (i.e run messenger:consume) the table must be setup
// so first delete the tables
$this->driverConnection->exec('DROP TABLE messenger_messages');
$this->assertNull($this->connection->get());
$this->connection->send('the body', ['my' => 'header']);
$envelope = $this->connection->get();
$this->assertEquals('the body', $envelope['body']);
}
}

View File

@ -16,6 +16,7 @@ use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Exception\TableNotFoundException;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
use Doctrine\DBAL\Types\Type;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
@ -50,11 +51,13 @@ class Connection
*/
private $configuration = [];
private $driverConnection;
private $schemaSynchronizer;
public function __construct(array $configuration, DBALConnection $driverConnection)
public function __construct(array $configuration, DBALConnection $driverConnection, SchemaSynchronizer $schemaSynchronizer = null)
{
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
$this->driverConnection = $driverConnection;
$this->schemaSynchronizer = $schemaSynchronizer ?? new SingleDatabaseSynchronizer($this->driverConnection);
}
public function getConfiguration(): array
@ -127,6 +130,9 @@ class Connection
public function get(): ?array
{
if ($this->configuration['auto_setup']) {
$this->setup();
}
$this->driverConnection->beginTransaction();
try {
$query = $this->createAvailableMessagesQueryBuilder()
@ -187,8 +193,7 @@ class Connection
public function setup(): void
{
$synchronizer = new SingleDatabaseSynchronizer($this->driverConnection);
$synchronizer->updateSchema($this->getSchema(), true);
$this->schemaSynchronizer->updateSchema($this->getSchema(), true);
}
public function getMessageCount(): int