[Messenger] Setup the doctrine transport when consuming
This commit is contained in:
parent
de7d7a9ceb
commit
b2f3b53253
@ -15,6 +15,7 @@ use Doctrine\DBAL\DBALException;
|
||||
use Doctrine\DBAL\Driver\Statement;
|
||||
use Doctrine\DBAL\Platforms\AbstractPlatform;
|
||||
use Doctrine\DBAL\Query\QueryBuilder;
|
||||
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
|
||||
@ -25,6 +26,7 @@ class ConnectionTest extends TestCase
|
||||
{
|
||||
$queryBuilder = $this->getQueryBuilderMock();
|
||||
$driverConnection = $this->getDBALConnectionMock();
|
||||
$schemaSynchronizer = $this->getSchemaSynchronizerMock();
|
||||
$stmt = $this->getStatementMock([
|
||||
'id' => 1,
|
||||
'body' => '{"message":"Hi"}',
|
||||
@ -44,7 +46,7 @@ class ConnectionTest extends TestCase
|
||||
->method('prepare')
|
||||
->willReturn($stmt);
|
||||
|
||||
$connection = new Connection([], $driverConnection);
|
||||
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
|
||||
$doctrineEnvelope = $connection->get();
|
||||
$this->assertEquals(1, $doctrineEnvelope['id']);
|
||||
$this->assertEquals('{"message":"Hi"}', $doctrineEnvelope['body']);
|
||||
@ -55,6 +57,7 @@ class ConnectionTest extends TestCase
|
||||
{
|
||||
$queryBuilder = $this->getQueryBuilderMock();
|
||||
$driverConnection = $this->getDBALConnectionMock();
|
||||
$schemaSynchronizer = $this->getSchemaSynchronizerMock();
|
||||
$stmt = $this->getStatementMock(false);
|
||||
|
||||
$queryBuilder
|
||||
@ -68,7 +71,7 @@ class ConnectionTest extends TestCase
|
||||
$driverConnection->expects($this->never())
|
||||
->method('update');
|
||||
|
||||
$connection = new Connection([], $driverConnection);
|
||||
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
|
||||
$doctrineEnvelope = $connection->get();
|
||||
$this->assertNull($doctrineEnvelope);
|
||||
}
|
||||
@ -142,6 +145,12 @@ class ConnectionTest extends TestCase
|
||||
return $stmt;
|
||||
}
|
||||
|
||||
private function getSchemaSynchronizerMock()
|
||||
{
|
||||
return $this->getMockBuilder(SchemaSynchronizer::class)
|
||||
->getMock();
|
||||
}
|
||||
|
||||
/**
|
||||
* @dataProvider buildConfigurationProvider
|
||||
*/
|
||||
|
@ -164,4 +164,17 @@ class DoctrineIntegrationTest extends TestCase
|
||||
$this->assertEquals('{"message": "Hi requeued"}', $next['body']);
|
||||
$this->connection->reject($next['id']);
|
||||
}
|
||||
|
||||
public function testTheTransportIsSetupOnGet()
|
||||
{
|
||||
// If the table does not exist and we call the get (i.e run messenger:consume) the table must be setup
|
||||
// so first delete the tables
|
||||
$this->driverConnection->exec('DROP TABLE messenger_messages');
|
||||
|
||||
$this->assertNull($this->connection->get());
|
||||
|
||||
$this->connection->send('the body', ['my' => 'header']);
|
||||
$envelope = $this->connection->get();
|
||||
$this->assertEquals('the body', $envelope['body']);
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ use Doctrine\DBAL\DBALException;
|
||||
use Doctrine\DBAL\Exception\TableNotFoundException;
|
||||
use Doctrine\DBAL\Query\QueryBuilder;
|
||||
use Doctrine\DBAL\Schema\Schema;
|
||||
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
|
||||
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
|
||||
use Doctrine\DBAL\Types\Type;
|
||||
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
|
||||
@ -50,11 +51,13 @@ class Connection
|
||||
*/
|
||||
private $configuration = [];
|
||||
private $driverConnection;
|
||||
private $schemaSynchronizer;
|
||||
|
||||
public function __construct(array $configuration, DBALConnection $driverConnection)
|
||||
public function __construct(array $configuration, DBALConnection $driverConnection, SchemaSynchronizer $schemaSynchronizer = null)
|
||||
{
|
||||
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
|
||||
$this->driverConnection = $driverConnection;
|
||||
$this->schemaSynchronizer = $schemaSynchronizer ?? new SingleDatabaseSynchronizer($this->driverConnection);
|
||||
}
|
||||
|
||||
public function getConfiguration(): array
|
||||
@ -127,6 +130,9 @@ class Connection
|
||||
|
||||
public function get(): ?array
|
||||
{
|
||||
if ($this->configuration['auto_setup']) {
|
||||
$this->setup();
|
||||
}
|
||||
$this->driverConnection->beginTransaction();
|
||||
try {
|
||||
$query = $this->createAvailableMessagesQueryBuilder()
|
||||
@ -187,8 +193,7 @@ class Connection
|
||||
|
||||
public function setup(): void
|
||||
{
|
||||
$synchronizer = new SingleDatabaseSynchronizer($this->driverConnection);
|
||||
$synchronizer->updateSchema($this->getSchema(), true);
|
||||
$this->schemaSynchronizer->updateSchema($this->getSchema(), true);
|
||||
}
|
||||
|
||||
public function getMessageCount(): int
|
||||
|
Reference in New Issue
Block a user