diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php index 6e7724d506..b7ac30bee1 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php @@ -44,8 +44,11 @@ class ConnectionTest extends TestCase $queryBuilder ->method('getParameters') ->willReturn([]); + $queryBuilder + ->method('getParameterTypes') + ->willReturn([]); $driverConnection - ->method('prepare') + ->method('executeQuery') ->willReturn($stmt); $connection = new Connection([], $driverConnection, $schemaSynchronizer); @@ -65,13 +68,17 @@ class ConnectionTest extends TestCase $queryBuilder ->method('getParameters') ->willReturn([]); + $queryBuilder + ->method('getParameterTypes') + ->willReturn([]); $driverConnection->expects($this->once()) ->method('createQueryBuilder') ->willReturn($queryBuilder); - $driverConnection->method('prepare') - ->willReturn($stmt); $driverConnection->expects($this->never()) ->method('update'); + $driverConnection + ->method('executeQuery') + ->willReturn($stmt); $connection = new Connection([], $driverConnection, $schemaSynchronizer); $doctrineEnvelope = $connection->get(); @@ -273,7 +280,7 @@ class ConnectionTest extends TestCase ->method('getParameters') ->willReturn([]); $driverConnection - ->method('prepare') + ->method('executeQuery') ->willReturn($stmt); $connection = new Connection([], $driverConnection, $schemaSynchronizer); @@ -316,8 +323,11 @@ class ConnectionTest extends TestCase $queryBuilder ->method('getParameters') ->willReturn([]); + $queryBuilder + ->method('getParameterTypes') + ->willReturn([]); $driverConnection - ->method('prepare') + ->method('executeQuery') ->willReturn($stmt); $connection = new Connection([], $driverConnection, $schemaSynchronizer); diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php index fa05a43b0b..a01e68db39 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php @@ -80,25 +80,25 @@ class DoctrineIntegrationTest extends TestCase 'body' => '{"message": "Hi handled"}', 'headers' => json_encode(['type' => DummyMessage::class]), 'queue_name' => 'default', - 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), - 'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), - 'delivered_at' => Connection::formatDateTime(new \DateTime()), + 'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'delivered_at' => $this->formatDateTime(new \DateTime()), ]); // one available later $this->driverConnection->insert('messenger_messages', [ 'body' => '{"message": "Hi delayed"}', 'headers' => json_encode(['type' => DummyMessage::class]), 'queue_name' => 'default', - 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), - 'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 13:00:00')), + 'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 13:00:00')), ]); // one available $this->driverConnection->insert('messenger_messages', [ 'body' => '{"message": "Hi available"}', 'headers' => json_encode(['type' => DummyMessage::class]), 'queue_name' => 'default', - 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), - 'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')), + 'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:30:00')), ]); $encoded = $this->connection->get(); @@ -114,33 +114,33 @@ class DoctrineIntegrationTest extends TestCase 'body' => '{"message": "Hi handled"}', 'headers' => json_encode(['type' => DummyMessage::class]), 'queue_name' => 'default', - 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), - 'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), - 'delivered_at' => Connection::formatDateTime(new \DateTime()), + 'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'delivered_at' => $this->formatDateTime(new \DateTime()), ]); // one available later $this->driverConnection->insert('messenger_messages', [ 'body' => '{"message": "Hi delayed"}', 'headers' => json_encode(['type' => DummyMessage::class]), 'queue_name' => 'default', - 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), - 'available_at' => Connection::formatDateTime((new \DateTime())->modify('+1 minute')), + 'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => $this->formatDateTime((new \DateTime())->modify('+1 minute')), ]); // one available $this->driverConnection->insert('messenger_messages', [ 'body' => '{"message": "Hi available"}', 'headers' => json_encode(['type' => DummyMessage::class]), 'queue_name' => 'default', - 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), - 'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')), + 'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:30:00')), ]); // another available $this->driverConnection->insert('messenger_messages', [ 'body' => '{"message": "Hi available"}', 'headers' => json_encode(['type' => DummyMessage::class]), 'queue_name' => 'default', - 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), - 'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')), + 'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:30:00')), ]); $this->assertSame(2, $this->connection->getMessageCount()); @@ -155,16 +155,16 @@ class DoctrineIntegrationTest extends TestCase 'body' => '{"message": "Hi requeued"}', 'headers' => json_encode(['type' => DummyMessage::class]), 'queue_name' => 'default', - 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), - 'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), - 'delivered_at' => Connection::formatDateTime($twoHoursAgo), + 'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'delivered_at' => $this->formatDateTime($twoHoursAgo), ]); $this->driverConnection->insert('messenger_messages', [ 'body' => '{"message": "Hi available"}', 'headers' => json_encode(['type' => DummyMessage::class]), 'queue_name' => 'default', - 'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')), - 'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')), + 'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')), + 'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:30:00')), ]); $next = $this->connection->get(); @@ -181,4 +181,9 @@ class DoctrineIntegrationTest extends TestCase $envelope = $this->connection->get(); $this->assertEquals('the body', $envelope['body']); } + + private function formatDateTime(\DateTime $dateTime) + { + return $dateTime->format($this->driverConnection->getDatabasePlatform()->getDateTimeFormatString()); + } } diff --git a/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php b/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php index 2c8bcd35cc..f665e8b31c 100644 --- a/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php @@ -124,8 +124,14 @@ class Connection $body, json_encode($headers), $this->configuration['queue_name'], - self::formatDateTime($now), - self::formatDateTime($availableAt), + $now, + $availableAt, + ], [ + null, + null, + null, + Type::DATETIME, + Type::DATETIME, ]); return $this->driverConnection->lastInsertId(); @@ -143,7 +149,8 @@ class Connection // use SELECT ... FOR UPDATE to lock table $doctrineEnvelope = $this->executeQuery( $query->getSQL().' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL(), - $query->getParameters() + $query->getParameters(), + $query->getParameterTypes() )->fetch(); if (false === $doctrineEnvelope) { @@ -160,8 +167,10 @@ class Connection ->where('id = ?'); $now = new \DateTime(); $this->executeQuery($queryBuilder->getSQL(), [ - self::formatDateTime($now), + $now, $doctrineEnvelope['id'], + ], [ + Type::DATETIME, ]); $this->driverConnection->commit(); @@ -228,7 +237,7 @@ class Connection ->select('COUNT(m.id) as message_count') ->setMaxResults(1); - return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchColumn(); + return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes())->fetchColumn(); } public function findAll(int $limit = null): array @@ -238,7 +247,7 @@ class Connection $queryBuilder->setMaxResults($limit); } - $data = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchAll(); + $data = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes())->fetchAll(); return array_map(function ($doctrineEnvelope) { return $this->decodeEnvelopeHeaders($doctrineEnvelope); @@ -267,9 +276,12 @@ class Connection ->andWhere('m.available_at <= ?') ->andWhere('m.queue_name = ?') ->setParameters([ - self::formatDateTime($redeliverLimit), - self::formatDateTime($now), + $redeliverLimit, + $now, $this->configuration['queue_name'], + ], [ + Type::DATETIME, + Type::DATETIME, ]); } @@ -280,12 +292,10 @@ class Connection ->from($this->configuration['table_name'], 'm'); } - private function executeQuery(string $sql, array $parameters = []) + private function executeQuery(string $sql, array $parameters = [], array $types = []) { - $stmt = null; try { - $stmt = $this->driverConnection->prepare($sql); - $stmt->execute($parameters); + $stmt = $this->driverConnection->executeQuery($sql, $parameters, $types); } catch (TableNotFoundException $e) { if ($this->driverConnection->isTransactionActive()) { throw $e; @@ -295,11 +305,7 @@ class Connection if ($this->autoSetup) { $this->setup(); } - // statement not prepared ? SQLite throw on exception on prepare if the table does not exist - if (null === $stmt) { - $stmt = $this->driverConnection->prepare($sql); - } - $stmt->execute($parameters); + $stmt = $this->driverConnection->executeQuery($sql, $parameters, $types); } return $stmt; @@ -332,11 +338,6 @@ class Connection return $schema; } - public static function formatDateTime(\DateTimeInterface $dateTime) - { - return $dateTime->format('Y-m-d\TH:i:s'); - } - private function decodeEnvelopeHeaders(array $doctrineEnvelope): array { $doctrineEnvelope['headers'] = json_decode($doctrineEnvelope['headers'], true);