feature #30757 [Messenger] Adding MessageCountAwareInterface to get transport message count (weaverryan)

This PR was squashed before being merged into the 4.3-dev branch (closes #30757).

Discussion
----------

[Messenger] Adding MessageCountAwareInterface to get transport message count

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | none
| License       | MIT
| Doc PR        | symfony/symfony-docs#11236

This adds a new optional interface that receivers should implement to give an approximate number of the messages "waiting" to be handled. Why? Because, with this, you could design a system that dynamically adds/removes worker processes if a specific transport is getting slammed and needs help. Creating that system could be something we discuss for core later, but this at least makes it possible - and means it could be implemented by the user or in a bundle... which I might do if we don't get it in core ;).

Commits
-------

fc5b0cf570 [Messenger] Adding MessageCountAwareInterface to get transport message count
This commit is contained in:
Fabien Potencier 2019-04-03 16:45:30 +02:00
commit 574097fd6a
10 changed files with 164 additions and 16 deletions

View File

@ -4,6 +4,8 @@ CHANGELOG
4.3.0
-----
* Added optional `MessageCountAwareInterface` that receivers can implement
to give information about how many messages are waiting to be processed.
* [BC BREAK] The `Envelope::__construct()` signature changed:
you can no longer pass an unlimited number of stamps as the second,
third, fourth, arguments etc: stamps are now an array passed to the

View File

@ -167,6 +167,24 @@ TXT
, $process->getOutput());
}
public function testItCountsMessagesInQueue()
{
$serializer = $this->createSerializer();
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
$connection->setup();
$connection->queue()->purge();
$sender = new AmqpSender($connection, $serializer);
$sender->send($first = new Envelope(new DummyMessage('First')));
$sender->send($second = new Envelope(new DummyMessage('Second')));
$sender->send($second = new Envelope(new DummyMessage('Third')));
sleep(1); // give amqp a moment to have the messages ready
$this->assertSame(3, $connection->countMessagesInQueue());
}
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
{
$timedOutTime = time() + $timeoutInSeconds;

View File

@ -37,6 +37,9 @@ class ConnectionTest extends TestCase
$queryBuilder
->method('getSQL')
->willReturn('');
$queryBuilder
->method('getParameters')
->willReturn([]);
$driverConnection
->method('prepare')
->willReturn($stmt);
@ -54,6 +57,9 @@ class ConnectionTest extends TestCase
$driverConnection = $this->getDBALConnectionMock();
$stmt = $this->getStatementMock(false);
$queryBuilder
->method('getParameters')
->willReturn([]);
$driverConnection->expects($this->once())
->method('createQueryBuilder')
->willReturn($queryBuilder);
@ -119,6 +125,7 @@ class ConnectionTest extends TestCase
$queryBuilder->method('orderBy')->willReturn($queryBuilder);
$queryBuilder->method('setMaxResults')->willReturn($queryBuilder);
$queryBuilder->method('setParameter')->willReturn($queryBuilder);
$queryBuilder->method('setParameters')->willReturn($queryBuilder);
return $queryBuilder;
}

View File

@ -100,6 +100,46 @@ class DoctrineIntegrationTest extends TestCase
$this->assertEquals('{"message": "Hi available"}', $encoded['body']);
}
public function testItCountMessages()
{
// insert messages
// one currently handled
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi handled"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'delivered_at' => Connection::formatDateTime(\DateTime::createFromFormat('U.u', microtime(true))),
]);
// one available later
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi delayed"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime((new \DateTime())->modify('+1 minute')),
]);
// one available
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi available"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
]);
// another available
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi available"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
]);
$this->assertSame(2, $this->connection->getMessageCount());
}
public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
{
$twoHoursAgo = new \DateTime('now');

View File

@ -15,6 +15,7 @@ use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@ -26,7 +27,7 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
*
* @experimental in 4.2
*/
class AmqpReceiver implements ReceiverInterface
class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
{
private $serializer;
private $connection;
@ -87,6 +88,14 @@ class AmqpReceiver implements ReceiverInterface
$this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope));
}
/**
* {@inheritdoc}
*/
public function getMessageCount(): int
{
return $this->connection->countMessagesInQueue();
}
private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope): void
{
try {

View File

@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
@ -22,7 +23,7 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
*
* @experimental in 4.2
*/
class AmqpTransport implements TransportInterface, SetupableTransportInterface
class AmqpTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
{
private $serializer;
private $connection;
@ -75,6 +76,14 @@ class AmqpTransport implements TransportInterface, SetupableTransportInterface
$this->connection->setup();
}
/**
* {@inheritdoc}
*/
public function getMessageCount(): int
{
return ($this->receiver ?? $this->getReceiver())->getMessageCount();
}
private function getReceiver()
{
return $this->receiver = new AmqpReceiver($this->connection, $this->serializer);

View File

@ -184,6 +184,14 @@ class Connection
$this->exchange()->publish($body, $this->queueConfiguration['routing_key'] ?? null, $flags, $attributes);
}
/**
* Returns an approximate count of the messages in a queue.
*/
public function countMessagesInQueue(): int
{
return $this->queue()->declareQueue();
}
/**
* @throws \AMQPException
*/

View File

@ -14,6 +14,7 @@ namespace Symfony\Component\Messenger\Transport\Doctrine;
use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Exception\TableNotFoundException;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
use Doctrine\DBAL\Types\Type;
@ -128,25 +129,14 @@ class Connection
{
$this->driverConnection->beginTransaction();
try {
$query = $this->driverConnection->createQueryBuilder()
->select('m.*')
->from($this->configuration['table_name'], 'm')
->where('m.delivered_at is null OR m.delivered_at < :redeliver_limit')
->andWhere('m.available_at <= :now')
->andWhere('m.queue_name = :queue_name')
$query = $this->createAvailableMessagesQueryBuilder()
->orderBy('available_at', 'ASC')
->setMaxResults(1);
$now = \DateTime::createFromFormat('U.u', microtime(true));
$redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout']));
// use SELECT ... FOR UPDATE to lock table
$doctrineEnvelope = $this->executeQuery(
$query->getSQL().' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL(),
[
':now' => self::formatDateTime($now),
':queue_name' => $this->configuration['queue_name'],
':redeliver_limit' => self::formatDateTime($redeliverLimit),
]
$query->getParameters()
)->fetch();
if (false === $doctrineEnvelope) {
@ -161,6 +151,7 @@ class Connection
->update($this->configuration['table_name'])
->set('delivered_at', ':delivered_at')
->where('id = :id');
$now = \DateTime::createFromFormat('U.u', microtime(true));
$this->executeQuery($queryBuilder->getSQL(), [
':id' => $doctrineEnvelope['id'],
':delivered_at' => self::formatDateTime($now),
@ -200,6 +191,33 @@ class Connection
$synchronizer->updateSchema($this->getSchema(), true);
}
public function getMessageCount(): int
{
$queryBuilder = $this->createAvailableMessagesQueryBuilder()
->select('COUNT(m.id) as message_count')
->setMaxResults(1);
return $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters())->fetchColumn();
}
private function createAvailableMessagesQueryBuilder(): QueryBuilder
{
$now = \DateTime::createFromFormat('U.u', microtime(true));
$redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout']));
return $this->driverConnection->createQueryBuilder()
->select('m.*')
->from($this->configuration['table_name'], 'm')
->where('m.delivered_at is null OR m.delivered_at < :redeliver_limit')
->andWhere('m.available_at <= :now')
->andWhere('m.queue_name = :queue_name')
->setParameters([
':now' => self::formatDateTime($now),
':queue_name' => $this->configuration['queue_name'],
':redeliver_limit' => self::formatDateTime($redeliverLimit),
]);
}
private function executeQuery(string $sql, array $parameters = [])
{
$stmt = null;

View File

@ -16,6 +16,7 @@ use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@ -25,7 +26,7 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
*
* @experimental in 4.3
*/
class DoctrineReceiver implements ReceiverInterface
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface
{
private $connection;
private $serializer;
@ -81,6 +82,14 @@ class DoctrineReceiver implements ReceiverInterface
$this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId());
}
/**
* {@inheritdoc}
*/
public function getMessageCount(): int
{
return $this->connection->getMessageCount();
}
private function findDoctrineReceivedStamp(Envelope $envelope): DoctrineReceivedStamp
{
/** @var DoctrineReceivedStamp|null $doctrineReceivedStamp */

View File

@ -0,0 +1,28 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Receiver;
/**
* @author Samuel Roze <samuel.roze@gmail.com>
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
interface MessageCountAwareInterface
{
/**
* Returns the number of messages waiting to be handled.
*
* In some systems, this may be an approximate number.
*/
public function getMessageCount(): int;
}