bug #32456 [Messenger] use database platform to convert correctly the DateTime (roukmoute)
This PR was merged into the 4.3 branch.
Discussion
----------
[Messenger] use database platform to convert correctly the DateTime
| Q | A
| ------------- | ---
| Branch? | 4.3
| Bug fix? | yes
| New feature? | no
| BC breaks? | no
| Deprecations? | yes <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass? | yes
| Fixed tickets | https://github.com/symfony/symfony/issues/32427
| License | MIT
In Doctrine Messenger the method `\Symfony\Component\Messenger\Transport\Doctrine\Connection::formatDateTime()` is used to format dateTime into this: `Y-m-d\TH:i:s`.
But this is not supported in all databases platform.
Here we use the database platform to convert correctly the dateTime.
Commits
-------
cfa11561d1
Format DateTime depending on database platform
This commit is contained in:
commit
8bf8c50363
@ -44,8 +44,11 @@ class ConnectionTest extends TestCase
|
|||||||
$queryBuilder
|
$queryBuilder
|
||||||
->method('getParameters')
|
->method('getParameters')
|
||||||
->willReturn([]);
|
->willReturn([]);
|
||||||
|
$queryBuilder
|
||||||
|
->method('getParameterTypes')
|
||||||
|
->willReturn([]);
|
||||||
$driverConnection
|
$driverConnection
|
||||||
->method('prepare')
|
->method('executeQuery')
|
||||||
->willReturn($stmt);
|
->willReturn($stmt);
|
||||||
|
|
||||||
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
|
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
|
||||||
@ -65,13 +68,17 @@ class ConnectionTest extends TestCase
|
|||||||
$queryBuilder
|
$queryBuilder
|
||||||
->method('getParameters')
|
->method('getParameters')
|
||||||
->willReturn([]);
|
->willReturn([]);
|
||||||
|
$queryBuilder
|
||||||
|
->method('getParameterTypes')
|
||||||
|
->willReturn([]);
|
||||||
$driverConnection->expects($this->once())
|
$driverConnection->expects($this->once())
|
||||||
->method('createQueryBuilder')
|
->method('createQueryBuilder')
|
||||||
->willReturn($queryBuilder);
|
->willReturn($queryBuilder);
|
||||||
$driverConnection->method('prepare')
|
|
||||||
->willReturn($stmt);
|
|
||||||
$driverConnection->expects($this->never())
|
$driverConnection->expects($this->never())
|
||||||
->method('update');
|
->method('update');
|
||||||
|
$driverConnection
|
||||||
|
->method('executeQuery')
|
||||||
|
->willReturn($stmt);
|
||||||
|
|
||||||
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
|
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
|
||||||
$doctrineEnvelope = $connection->get();
|
$doctrineEnvelope = $connection->get();
|
||||||
@ -273,7 +280,7 @@ class ConnectionTest extends TestCase
|
|||||||
->method('getParameters')
|
->method('getParameters')
|
||||||
->willReturn([]);
|
->willReturn([]);
|
||||||
$driverConnection
|
$driverConnection
|
||||||
->method('prepare')
|
->method('executeQuery')
|
||||||
->willReturn($stmt);
|
->willReturn($stmt);
|
||||||
|
|
||||||
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
|
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
|
||||||
@ -316,8 +323,11 @@ class ConnectionTest extends TestCase
|
|||||||
$queryBuilder
|
$queryBuilder
|
||||||
->method('getParameters')
|
->method('getParameters')
|
||||||
->willReturn([]);
|
->willReturn([]);
|
||||||
|
$queryBuilder
|
||||||
|
->method('getParameterTypes')
|
||||||
|
->willReturn([]);
|
||||||
$driverConnection
|
$driverConnection
|
||||||
->method('prepare')
|
->method('executeQuery')
|
||||||
->willReturn($stmt);
|
->willReturn($stmt);
|
||||||
|
|
||||||
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
|
$connection = new Connection([], $driverConnection, $schemaSynchronizer);
|
||||||
|
@ -80,25 +80,25 @@ class DoctrineIntegrationTest extends TestCase
|
|||||||
'body' => '{"message": "Hi handled"}',
|
'body' => '{"message": "Hi handled"}',
|
||||||
'headers' => json_encode(['type' => DummyMessage::class]),
|
'headers' => json_encode(['type' => DummyMessage::class]),
|
||||||
'queue_name' => 'default',
|
'queue_name' => 'default',
|
||||||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||||
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||||
'delivered_at' => Connection::formatDateTime(new \DateTime()),
|
'delivered_at' => $this->formatDateTime(new \DateTime()),
|
||||||
]);
|
]);
|
||||||
// one available later
|
// one available later
|
||||||
$this->driverConnection->insert('messenger_messages', [
|
$this->driverConnection->insert('messenger_messages', [
|
||||||
'body' => '{"message": "Hi delayed"}',
|
'body' => '{"message": "Hi delayed"}',
|
||||||
'headers' => json_encode(['type' => DummyMessage::class]),
|
'headers' => json_encode(['type' => DummyMessage::class]),
|
||||||
'queue_name' => 'default',
|
'queue_name' => 'default',
|
||||||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||||
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 13:00:00')),
|
'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 13:00:00')),
|
||||||
]);
|
]);
|
||||||
// one available
|
// one available
|
||||||
$this->driverConnection->insert('messenger_messages', [
|
$this->driverConnection->insert('messenger_messages', [
|
||||||
'body' => '{"message": "Hi available"}',
|
'body' => '{"message": "Hi available"}',
|
||||||
'headers' => json_encode(['type' => DummyMessage::class]),
|
'headers' => json_encode(['type' => DummyMessage::class]),
|
||||||
'queue_name' => 'default',
|
'queue_name' => 'default',
|
||||||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||||
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
|
'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:30:00')),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
$encoded = $this->connection->get();
|
$encoded = $this->connection->get();
|
||||||
@ -114,33 +114,33 @@ class DoctrineIntegrationTest extends TestCase
|
|||||||
'body' => '{"message": "Hi handled"}',
|
'body' => '{"message": "Hi handled"}',
|
||||||
'headers' => json_encode(['type' => DummyMessage::class]),
|
'headers' => json_encode(['type' => DummyMessage::class]),
|
||||||
'queue_name' => 'default',
|
'queue_name' => 'default',
|
||||||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||||
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||||
'delivered_at' => Connection::formatDateTime(new \DateTime()),
|
'delivered_at' => $this->formatDateTime(new \DateTime()),
|
||||||
]);
|
]);
|
||||||
// one available later
|
// one available later
|
||||||
$this->driverConnection->insert('messenger_messages', [
|
$this->driverConnection->insert('messenger_messages', [
|
||||||
'body' => '{"message": "Hi delayed"}',
|
'body' => '{"message": "Hi delayed"}',
|
||||||
'headers' => json_encode(['type' => DummyMessage::class]),
|
'headers' => json_encode(['type' => DummyMessage::class]),
|
||||||
'queue_name' => 'default',
|
'queue_name' => 'default',
|
||||||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||||
'available_at' => Connection::formatDateTime((new \DateTime())->modify('+1 minute')),
|
'available_at' => $this->formatDateTime((new \DateTime())->modify('+1 minute')),
|
||||||
]);
|
]);
|
||||||
// one available
|
// one available
|
||||||
$this->driverConnection->insert('messenger_messages', [
|
$this->driverConnection->insert('messenger_messages', [
|
||||||
'body' => '{"message": "Hi available"}',
|
'body' => '{"message": "Hi available"}',
|
||||||
'headers' => json_encode(['type' => DummyMessage::class]),
|
'headers' => json_encode(['type' => DummyMessage::class]),
|
||||||
'queue_name' => 'default',
|
'queue_name' => 'default',
|
||||||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||||
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
|
'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:30:00')),
|
||||||
]);
|
]);
|
||||||
// another available
|
// another available
|
||||||
$this->driverConnection->insert('messenger_messages', [
|
$this->driverConnection->insert('messenger_messages', [
|
||||||
'body' => '{"message": "Hi available"}',
|
'body' => '{"message": "Hi available"}',
|
||||||
'headers' => json_encode(['type' => DummyMessage::class]),
|
'headers' => json_encode(['type' => DummyMessage::class]),
|
||||||
'queue_name' => 'default',
|
'queue_name' => 'default',
|
||||||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||||
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
|
'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:30:00')),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
$this->assertSame(2, $this->connection->getMessageCount());
|
$this->assertSame(2, $this->connection->getMessageCount());
|
||||||
@ -155,16 +155,16 @@ class DoctrineIntegrationTest extends TestCase
|
|||||||
'body' => '{"message": "Hi requeued"}',
|
'body' => '{"message": "Hi requeued"}',
|
||||||
'headers' => json_encode(['type' => DummyMessage::class]),
|
'headers' => json_encode(['type' => DummyMessage::class]),
|
||||||
'queue_name' => 'default',
|
'queue_name' => 'default',
|
||||||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||||
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||||
'delivered_at' => Connection::formatDateTime($twoHoursAgo),
|
'delivered_at' => $this->formatDateTime($twoHoursAgo),
|
||||||
]);
|
]);
|
||||||
$this->driverConnection->insert('messenger_messages', [
|
$this->driverConnection->insert('messenger_messages', [
|
||||||
'body' => '{"message": "Hi available"}',
|
'body' => '{"message": "Hi available"}',
|
||||||
'headers' => json_encode(['type' => DummyMessage::class]),
|
'headers' => json_encode(['type' => DummyMessage::class]),
|
||||||
'queue_name' => 'default',
|
'queue_name' => 'default',
|
||||||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
'created_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||||
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
|
'available_at' => $this->formatDateTime(new \DateTime('2019-03-15 12:30:00')),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
$next = $this->connection->get();
|
$next = $this->connection->get();
|
||||||
@ -181,4 +181,9 @@ class DoctrineIntegrationTest extends TestCase
|
|||||||
$envelope = $this->connection->get();
|
$envelope = $this->connection->get();
|
||||||
$this->assertEquals('the body', $envelope['body']);
|
$this->assertEquals('the body', $envelope['body']);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function formatDateTime(\DateTime $dateTime)
|
||||||
|
{
|
||||||
|
return $dateTime->format($this->driverConnection->getDatabasePlatform()->getDateTimeFormatString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -124,8 +124,14 @@ class Connection
|
|||||||
$body,
|
$body,
|
||||||
json_encode($headers),
|
json_encode($headers),
|
||||||
$this->configuration['queue_name'],
|
$this->configuration['queue_name'],
|
||||||
self::formatDateTime($now),
|
$now,
|
||||||
self::formatDateTime($availableAt),
|
$availableAt,
|
||||||
|
], [
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
Type::DATETIME,
|
||||||
|
Type::DATETIME,
|
||||||
]);
|
]);
|
||||||
|
|
||||||
return $this->driverConnection->lastInsertId();
|
return $this->driverConnection->lastInsertId();
|
||||||
@ -143,7 +149,8 @@ class Connection
|
|||||||
// use SELECT ... FOR UPDATE to lock table
|
// use SELECT ... FOR UPDATE to lock table
|
||||||
$doctrineEnvelope = $this->executeQuery(
|
$doctrineEnvelope = $this->executeQuery(
|
||||||
$query->getSQL().' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL(),
|
$query->getSQL().' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL(),
|
||||||
$query->getParameters()
|
$query->getParameters(),
|
||||||
|
$query->getParameterTypes()
|
||||||
)->fetch();
|
)->fetch();
|
||||||
|
|
||||||
if (false === $doctrineEnvelope) {
|
if (false === $doctrineEnvelope) {
|
||||||
@ -160,8 +167,10 @@ class Connection
|
|||||||
->where('id = ?');
|
->where('id = ?');
|
||||||
$now = new \DateTime();
|
$now = new \DateTime();
|
||||||
$this->executeQuery($queryBuilder->getSQL(), [
|
$this->executeQuery($queryBuilder->getSQL(), [
|
||||||
self::formatDateTime($now),
|
$now,
|
||||||
$doctrineEnvelope['id'],
|
$doctrineEnvelope['id'],
|
||||||
|
], [
|
||||||
|
Type::DATETIME,
|
||||||
]);
|
]);
|
||||||
|
|
||||||
$this->driverConnection->commit();
|
$this->driverConnection->commit();
|
||||||
@ -228,7 +237,7 @@ class Connection
|
|||||||
->select('COUNT(m.id) as message_count')
|
->select('COUNT(m.id) as message_count')
|
||||||
->setMaxResults(1);
|
->setMaxResults(1);
|
||||||
|
|
||||||
return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchColumn();
|
return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes())->fetchColumn();
|
||||||
}
|
}
|
||||||
|
|
||||||
public function findAll(int $limit = null): array
|
public function findAll(int $limit = null): array
|
||||||
@ -238,7 +247,7 @@ class Connection
|
|||||||
$queryBuilder->setMaxResults($limit);
|
$queryBuilder->setMaxResults($limit);
|
||||||
}
|
}
|
||||||
|
|
||||||
$data = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchAll();
|
$data = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes())->fetchAll();
|
||||||
|
|
||||||
return array_map(function ($doctrineEnvelope) {
|
return array_map(function ($doctrineEnvelope) {
|
||||||
return $this->decodeEnvelopeHeaders($doctrineEnvelope);
|
return $this->decodeEnvelopeHeaders($doctrineEnvelope);
|
||||||
@ -267,9 +276,12 @@ class Connection
|
|||||||
->andWhere('m.available_at <= ?')
|
->andWhere('m.available_at <= ?')
|
||||||
->andWhere('m.queue_name = ?')
|
->andWhere('m.queue_name = ?')
|
||||||
->setParameters([
|
->setParameters([
|
||||||
self::formatDateTime($redeliverLimit),
|
$redeliverLimit,
|
||||||
self::formatDateTime($now),
|
$now,
|
||||||
$this->configuration['queue_name'],
|
$this->configuration['queue_name'],
|
||||||
|
], [
|
||||||
|
Type::DATETIME,
|
||||||
|
Type::DATETIME,
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -280,12 +292,10 @@ class Connection
|
|||||||
->from($this->configuration['table_name'], 'm');
|
->from($this->configuration['table_name'], 'm');
|
||||||
}
|
}
|
||||||
|
|
||||||
private function executeQuery(string $sql, array $parameters = [])
|
private function executeQuery(string $sql, array $parameters = [], array $types = [])
|
||||||
{
|
{
|
||||||
$stmt = null;
|
|
||||||
try {
|
try {
|
||||||
$stmt = $this->driverConnection->prepare($sql);
|
$stmt = $this->driverConnection->executeQuery($sql, $parameters, $types);
|
||||||
$stmt->execute($parameters);
|
|
||||||
} catch (TableNotFoundException $e) {
|
} catch (TableNotFoundException $e) {
|
||||||
if ($this->driverConnection->isTransactionActive()) {
|
if ($this->driverConnection->isTransactionActive()) {
|
||||||
throw $e;
|
throw $e;
|
||||||
@ -295,11 +305,7 @@ class Connection
|
|||||||
if ($this->autoSetup) {
|
if ($this->autoSetup) {
|
||||||
$this->setup();
|
$this->setup();
|
||||||
}
|
}
|
||||||
// statement not prepared ? SQLite throw on exception on prepare if the table does not exist
|
$stmt = $this->driverConnection->executeQuery($sql, $parameters, $types);
|
||||||
if (null === $stmt) {
|
|
||||||
$stmt = $this->driverConnection->prepare($sql);
|
|
||||||
}
|
|
||||||
$stmt->execute($parameters);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return $stmt;
|
return $stmt;
|
||||||
@ -332,11 +338,6 @@ class Connection
|
|||||||
return $schema;
|
return $schema;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static function formatDateTime(\DateTimeInterface $dateTime)
|
|
||||||
{
|
|
||||||
return $dateTime->format('Y-m-d\TH:i:s');
|
|
||||||
}
|
|
||||||
|
|
||||||
private function decodeEnvelopeHeaders(array $doctrineEnvelope): array
|
private function decodeEnvelopeHeaders(array $doctrineEnvelope): array
|
||||||
{
|
{
|
||||||
$doctrineEnvelope['headers'] = json_decode($doctrineEnvelope['headers'], true);
|
$doctrineEnvelope['headers'] = json_decode($doctrineEnvelope['headers'], true);
|
||||||
|
Reference in New Issue
Block a user