[Messenger] Adding MessageCountAwareInterface to get transport message count
This commit is contained in:
parent
9bcea2e9f4
commit
fc5b0cf570
@ -4,6 +4,8 @@ CHANGELOG
|
||||
4.3.0
|
||||
-----
|
||||
|
||||
* Added optional `MessageCountAwareInterface` that receivers can implement
|
||||
to give information about how many messages are waiting to be processed.
|
||||
* [BC BREAK] The `Envelope::__construct()` signature changed:
|
||||
you can no longer pass an unlimited number of stamps as the second,
|
||||
third, fourth, arguments etc: stamps are now an array passed to the
|
||||
|
@ -167,6 +167,24 @@ TXT
|
||||
, $process->getOutput());
|
||||
}
|
||||
|
||||
public function testItCountsMessagesInQueue()
|
||||
{
|
||||
$serializer = $this->createSerializer();
|
||||
|
||||
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
|
||||
$connection->setup();
|
||||
$connection->queue()->purge();
|
||||
|
||||
$sender = new AmqpSender($connection, $serializer);
|
||||
|
||||
$sender->send($first = new Envelope(new DummyMessage('First')));
|
||||
$sender->send($second = new Envelope(new DummyMessage('Second')));
|
||||
$sender->send($second = new Envelope(new DummyMessage('Third')));
|
||||
|
||||
sleep(1); // give amqp a moment to have the messages ready
|
||||
$this->assertSame(3, $connection->countMessagesInQueue());
|
||||
}
|
||||
|
||||
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
|
||||
{
|
||||
$timedOutTime = time() + $timeoutInSeconds;
|
||||
|
@ -37,6 +37,9 @@ class ConnectionTest extends TestCase
|
||||
$queryBuilder
|
||||
->method('getSQL')
|
||||
->willReturn('');
|
||||
$queryBuilder
|
||||
->method('getParameters')
|
||||
->willReturn([]);
|
||||
$driverConnection
|
||||
->method('prepare')
|
||||
->willReturn($stmt);
|
||||
@ -54,6 +57,9 @@ class ConnectionTest extends TestCase
|
||||
$driverConnection = $this->getDBALConnectionMock();
|
||||
$stmt = $this->getStatementMock(false);
|
||||
|
||||
$queryBuilder
|
||||
->method('getParameters')
|
||||
->willReturn([]);
|
||||
$driverConnection->expects($this->once())
|
||||
->method('createQueryBuilder')
|
||||
->willReturn($queryBuilder);
|
||||
@ -119,6 +125,7 @@ class ConnectionTest extends TestCase
|
||||
$queryBuilder->method('orderBy')->willReturn($queryBuilder);
|
||||
$queryBuilder->method('setMaxResults')->willReturn($queryBuilder);
|
||||
$queryBuilder->method('setParameter')->willReturn($queryBuilder);
|
||||
$queryBuilder->method('setParameters')->willReturn($queryBuilder);
|
||||
|
||||
return $queryBuilder;
|
||||
}
|
||||
|
@ -100,6 +100,46 @@ class DoctrineIntegrationTest extends TestCase
|
||||
$this->assertEquals('{"message": "Hi available"}', $encoded['body']);
|
||||
}
|
||||
|
||||
public function testItCountMessages()
|
||||
{
|
||||
// insert messages
|
||||
// one currently handled
|
||||
$this->driverConnection->insert('messenger_messages', [
|
||||
'body' => '{"message": "Hi handled"}',
|
||||
'headers' => json_encode(['type' => DummyMessage::class]),
|
||||
'queue_name' => 'default',
|
||||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||
'delivered_at' => Connection::formatDateTime(\DateTime::createFromFormat('U.u', microtime(true))),
|
||||
]);
|
||||
// one available later
|
||||
$this->driverConnection->insert('messenger_messages', [
|
||||
'body' => '{"message": "Hi delayed"}',
|
||||
'headers' => json_encode(['type' => DummyMessage::class]),
|
||||
'queue_name' => 'default',
|
||||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||
'available_at' => Connection::formatDateTime((new \DateTime())->modify('+1 minute')),
|
||||
]);
|
||||
// one available
|
||||
$this->driverConnection->insert('messenger_messages', [
|
||||
'body' => '{"message": "Hi available"}',
|
||||
'headers' => json_encode(['type' => DummyMessage::class]),
|
||||
'queue_name' => 'default',
|
||||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
|
||||
]);
|
||||
// another available
|
||||
$this->driverConnection->insert('messenger_messages', [
|
||||
'body' => '{"message": "Hi available"}',
|
||||
'headers' => json_encode(['type' => DummyMessage::class]),
|
||||
'queue_name' => 'default',
|
||||
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
|
||||
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
|
||||
]);
|
||||
|
||||
$this->assertSame(2, $this->connection->getMessageCount());
|
||||
}
|
||||
|
||||
public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
|
||||
{
|
||||
$twoHoursAgo = new \DateTime('now');
|
||||
|
@ -15,6 +15,7 @@ use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\LogicException;
|
||||
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
@ -26,7 +27,7 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
*
|
||||
* @experimental in 4.2
|
||||
*/
|
||||
class AmqpReceiver implements ReceiverInterface
|
||||
class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
{
|
||||
private $serializer;
|
||||
private $connection;
|
||||
@ -87,6 +88,14 @@ class AmqpReceiver implements ReceiverInterface
|
||||
$this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope));
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getMessageCount(): int
|
||||
{
|
||||
return $this->connection->countMessagesInQueue();
|
||||
}
|
||||
|
||||
private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope): void
|
||||
{
|
||||
try {
|
||||
|
@ -12,6 +12,7 @@
|
||||
namespace Symfony\Component\Messenger\Transport\AmqpExt;
|
||||
|
||||
use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
|
||||
@ -22,7 +23,7 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
*
|
||||
* @experimental in 4.2
|
||||
*/
|
||||
class AmqpTransport implements TransportInterface, SetupableTransportInterface
|
||||
class AmqpTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
|
||||
{
|
||||
private $serializer;
|
||||
private $connection;
|
||||
@ -75,6 +76,14 @@ class AmqpTransport implements TransportInterface, SetupableTransportInterface
|
||||
$this->connection->setup();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getMessageCount(): int
|
||||
{
|
||||
return ($this->receiver ?? $this->getReceiver())->getMessageCount();
|
||||
}
|
||||
|
||||
private function getReceiver()
|
||||
{
|
||||
return $this->receiver = new AmqpReceiver($this->connection, $this->serializer);
|
||||
|
@ -184,6 +184,14 @@ class Connection
|
||||
$this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an approximate count of the messages in a queue.
|
||||
*/
|
||||
public function countMessagesInQueue(): int
|
||||
{
|
||||
return $this->queue()->declareQueue();
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws \AMQPException
|
||||
*/
|
||||
|
@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Transport\Doctrine;
|
||||
use Doctrine\DBAL\Connection as DBALConnection;
|
||||
use Doctrine\DBAL\DBALException;
|
||||
use Doctrine\DBAL\Exception\TableNotFoundException;
|
||||
use Doctrine\DBAL\Query\QueryBuilder;
|
||||
use Doctrine\DBAL\Schema\Schema;
|
||||
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
|
||||
use Doctrine\DBAL\Types\Type;
|
||||
@ -128,25 +129,14 @@ class Connection
|
||||
{
|
||||
$this->driverConnection->beginTransaction();
|
||||
try {
|
||||
$query = $this->driverConnection->createQueryBuilder()
|
||||
->select('m.*')
|
||||
->from($this->configuration['table_name'], 'm')
|
||||
->where('m.delivered_at is null OR m.delivered_at < :redeliver_limit')
|
||||
->andWhere('m.available_at <= :now')
|
||||
->andWhere('m.queue_name = :queue_name')
|
||||
$query = $this->createAvailableMessagesQueryBuilder()
|
||||
->orderBy('available_at', 'ASC')
|
||||
->setMaxResults(1);
|
||||
|
||||
$now = \DateTime::createFromFormat('U.u', microtime(true));
|
||||
$redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout']));
|
||||
// use SELECT ... FOR UPDATE to lock table
|
||||
$doctrineEnvelope = $this->executeQuery(
|
||||
$query->getSQL().' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL(),
|
||||
[
|
||||
':now' => self::formatDateTime($now),
|
||||
':queue_name' => $this->configuration['queue_name'],
|
||||
':redeliver_limit' => self::formatDateTime($redeliverLimit),
|
||||
]
|
||||
$query->getParameters()
|
||||
)->fetch();
|
||||
|
||||
if (false === $doctrineEnvelope) {
|
||||
@ -161,6 +151,7 @@ class Connection
|
||||
->update($this->configuration['table_name'])
|
||||
->set('delivered_at', ':delivered_at')
|
||||
->where('id = :id');
|
||||
$now = \DateTime::createFromFormat('U.u', microtime(true));
|
||||
$this->executeQuery($queryBuilder->getSQL(), [
|
||||
':id' => $doctrineEnvelope['id'],
|
||||
':delivered_at' => self::formatDateTime($now),
|
||||
@ -200,6 +191,33 @@ class Connection
|
||||
$synchronizer->updateSchema($this->getSchema(), true);
|
||||
}
|
||||
|
||||
public function getMessageCount(): int
|
||||
{
|
||||
$queryBuilder = $this->createAvailableMessagesQueryBuilder()
|
||||
->select('COUNT(m.id) as message_count')
|
||||
->setMaxResults(1);
|
||||
|
||||
return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchColumn();
|
||||
}
|
||||
|
||||
private function createAvailableMessagesQueryBuilder(): QueryBuilder
|
||||
{
|
||||
$now = \DateTime::createFromFormat('U.u', microtime(true));
|
||||
$redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout']));
|
||||
|
||||
return $this->driverConnection->createQueryBuilder()
|
||||
->select('m.*')
|
||||
->from($this->configuration['table_name'], 'm')
|
||||
->where('m.delivered_at is null OR m.delivered_at < :redeliver_limit')
|
||||
->andWhere('m.available_at <= :now')
|
||||
->andWhere('m.queue_name = :queue_name')
|
||||
->setParameters([
|
||||
':now' => self::formatDateTime($now),
|
||||
':queue_name' => $this->configuration['queue_name'],
|
||||
':redeliver_limit' => self::formatDateTime($redeliverLimit),
|
||||
]);
|
||||
}
|
||||
|
||||
private function executeQuery(string $sql, array $parameters = [])
|
||||
{
|
||||
$stmt = null;
|
||||
|
@ -16,6 +16,7 @@ use Symfony\Component\Messenger\Envelope;
|
||||
use Symfony\Component\Messenger\Exception\LogicException;
|
||||
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
|
||||
use Symfony\Component\Messenger\Exception\TransportException;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
|
||||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
|
||||
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
@ -25,7 +26,7 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
class DoctrineReceiver implements ReceiverInterface
|
||||
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface
|
||||
{
|
||||
private $connection;
|
||||
private $serializer;
|
||||
@ -81,6 +82,14 @@ class DoctrineReceiver implements ReceiverInterface
|
||||
$this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId());
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getMessageCount(): int
|
||||
{
|
||||
return $this->connection->getMessageCount();
|
||||
}
|
||||
|
||||
private function findDoctrineReceivedStamp(Envelope $envelope): DoctrineReceivedStamp
|
||||
{
|
||||
/** @var DoctrineReceivedStamp|null $doctrineReceivedStamp */
|
||||
|
@ -0,0 +1,28 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Component\Messenger\Transport\Receiver;
|
||||
|
||||
/**
|
||||
* @author Samuel Roze <samuel.roze@gmail.com>
|
||||
* @author Ryan Weaver <ryan@symfonycasts.com>
|
||||
*
|
||||
* @experimental in 4.3
|
||||
*/
|
||||
interface MessageCountAwareInterface
|
||||
{
|
||||
/**
|
||||
* Returns the number of messages waiting to be handled.
|
||||
*
|
||||
* In some systems, this may be an approximate number.
|
||||
*/
|
||||
public function getMessageCount(): int;
|
||||
}
|
Reference in New Issue
Block a user