feature #29007 [Messenger] Add a Doctrine transport (vincenttouzet)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Add a Doctrine transport

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets |
| License       | MIT
| Doc PR        | symfony/symfony-docs#10616
| DoctrineBundle PR | doctrine/DoctrineBundle#868

As discussed with @sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component.

Actually `AMQP` is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database).

# How it works

The code is splitted betwwen this PR and the one on the DoctrineBundle : doctrine/DoctrineBundle#868

## Configuration

To configure a Doctrine transport the dsn MUST have the format `doctrine://<entity_manager_name>` where `<entity_manager_name>` is the name of the entity manager (usually `default`)
```yml
        # config/packages/messenger.yaml
        framework:
            messenger:
                transports:
                    my_transport: "doctrine://default?queue=important"
```

## Table schema

Dispatched messages are stored into a database table with the following schema:

| Column       | Type     | Options                  | Description                                                       |
|--------------|----------|--------------------------|-------------------------------------------------------------------|
| id           | bigint   | AUTO_INCREMENT, NOT NULL | Primary key                                                       |
| body         | text     | NOT NULL                 | Body of the message                                               |
| headers      | text     | NOT NULL                 | Headers of the message                                            |
| queue      | varchar(32)     | NOT NULL                 | Headers of the message                                            |
| created_at   | datetime | NOT NULL                 | When the message was inserted onto the table. (automatically set) |
| available_at       | datetime   | NOT NULL                 | When the message is available to be handled                      |
| delivered_at | datetime | NULL                     | When the message was delivered to a worker                        |

## Message dispatching

When dispatching a message a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish`

## Message consuming

The message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle.

### Getting the next message

* Start a transaction
* Lock the table to get the first message to handle (The lock is done with the `SELECT ... FOR UPDATE` query)
* Update the message in database to update the delivered_at columns
* Commit the transaction

### Handling the message

The retrieved message is then passed to the handler. If the message is correctly handled the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::ack` which delete the message from the table.

If an error occured the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::nack` method which update the message to set the delivered_at column to `null`.

## Message requeueing

It may happen that a message is stuck in `delivered` state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the `DoctrineReceiver` call the `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method update all the message with a  `delivered_at` not null since more than the "redeliver timeout" (default to 3600 seconds)

# TODO

- [x] Add tests
- [x] Create DOC PR
- [x] PR on doctrine-bundle for transport factory
- [x] Add a `available_at` column
- [x] Add a `queue` column
- [x] Implement the retry functionnality : See #30557
- [x] Rebase after #29476

Commits
-------

88d008c828 [Messenger] Add a Doctrine transport
This commit is contained in:
Samuel ROZE 2019-03-31 17:25:18 +01:00
commit d9e2732a7d
14 changed files with 1147 additions and 1 deletions

View File

@ -59,8 +59,8 @@ CHANGELOG
* [BC BREAK] The Amqp Transport now automatically sets up the exchanges
and queues by default. Previously, this was done when in "debug" mode
only. Pass the `auto_setup` connection option to control this.
* Added a `SetupTransportsCommand` command to setup the transports
* Added a Doctrine transport. For example, the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
4.2.0
-----

View File

@ -0,0 +1,214 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Driver\Statement;
use Doctrine\DBAL\Platforms\AbstractPlatform;
use Doctrine\DBAL\Query\QueryBuilder;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
class ConnectionTest extends TestCase
{
public function testGetAMessageWillChangeItsStatus()
{
$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnectionMock();
$stmt = $this->getStatementMock([
'id' => 1,
'body' => '{"message":"Hi"}',
'headers' => \json_encode(['type' => DummyMessage::class]),
]);
$driverConnection
->method('createQueryBuilder')
->willReturn($queryBuilder);
$queryBuilder
->method('getSQL')
->willReturn('');
$driverConnection
->method('prepare')
->willReturn($stmt);
$connection = new Connection([], $driverConnection);
$doctrineEnvelope = $connection->get();
$this->assertEquals(1, $doctrineEnvelope['id']);
$this->assertEquals('{"message":"Hi"}', $doctrineEnvelope['body']);
$this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelope['headers']);
}
public function testGetWithNoPendingMessageWillReturnNull()
{
$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnectionMock();
$stmt = $this->getStatementMock(false);
$driverConnection->expects($this->once())
->method('createQueryBuilder')
->willReturn($queryBuilder);
$driverConnection->method('prepare')
->willReturn($stmt);
$driverConnection->expects($this->never())
->method('update');
$connection = new Connection([], $driverConnection);
$doctrineEnvelope = $connection->get();
$this->assertNull($doctrineEnvelope);
}
/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
{
$driverConnection = $this->getDBALConnectionMock();
$driverConnection->method('delete')->willThrowException(new DBALException());
$connection = new Connection([], $driverConnection);
$connection->ack('dummy_id');
}
/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
{
$driverConnection = $this->getDBALConnectionMock();
$driverConnection->method('delete')->willThrowException(new DBALException());
$connection = new Connection([], $driverConnection);
$connection->reject('dummy_id');
}
private function getDBALConnectionMock()
{
$driverConnection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class)
->disableOriginalConstructor()
->getMock();
$platform = $this->getMockBuilder(AbstractPlatform::class)
->getMock();
$platform->method('getWriteLockSQL')->willReturn('FOR UPDATE');
$driverConnection->method('getDatabasePlatform')->willReturn($platform);
return $driverConnection;
}
private function getQueryBuilderMock()
{
$queryBuilder = $this->getMockBuilder(QueryBuilder::class)
->disableOriginalConstructor()
->getMock();
$queryBuilder->method('select')->willReturn($queryBuilder);
$queryBuilder->method('update')->willReturn($queryBuilder);
$queryBuilder->method('from')->willReturn($queryBuilder);
$queryBuilder->method('set')->willReturn($queryBuilder);
$queryBuilder->method('where')->willReturn($queryBuilder);
$queryBuilder->method('andWhere')->willReturn($queryBuilder);
$queryBuilder->method('orderBy')->willReturn($queryBuilder);
$queryBuilder->method('setMaxResults')->willReturn($queryBuilder);
$queryBuilder->method('setParameter')->willReturn($queryBuilder);
return $queryBuilder;
}
private function getStatementMock($expectedResult)
{
$stmt = $this->getMockBuilder(Statement::class)
->disableOriginalConstructor()
->getMock();
$stmt->expects($this->once())
->method('fetch')
->willReturn($expectedResult);
return $stmt;
}
/**
* @dataProvider buildConfigurationProvider
*/
public function testBuildConfiguration($dsn, $options, $expectedManager, $expectedTableName, $expectedRedeliverTimeout, $expectedQueue)
{
$config = Connection::buildConfiguration($dsn, $options);
$this->assertEquals($expectedManager, $config['connection']);
$this->assertEquals($expectedTableName, $config['table_name']);
$this->assertEquals($expectedRedeliverTimeout, $config['redeliver_timeout']);
$this->assertEquals($expectedQueue, $config['queue_name']);
}
public function buildConfigurationProvider()
{
return [
[
'dsn' => 'doctrine://default',
'options' => [],
'expectedManager' => 'default',
'expectedTableName' => 'messenger_messages',
'expectedRedeliverTimeout' => 3600,
'expectedQueue' => 'default',
],
// test options from options array
[
'dsn' => 'doctrine://default',
'options' => [
'table_name' => 'name_from_options',
'redeliver_timeout' => 1800,
'queue_name' => 'important',
],
'expectedManager' => 'default',
'expectedTableName' => 'name_from_options',
'expectedRedeliverTimeout' => 1800,
'expectedQueue' => 'important',
],
// tests options from dsn
[
'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal',
'options' => [],
'expectedManager' => 'default',
'expectedTableName' => 'name_from_dsn',
'expectedRedeliverTimeout' => 1200,
'expectedQueue' => 'normal',
],
// test options from options array wins over options from dsn
[
'dsn' => 'doctrine://default?table_name=name_from_dsn&redeliver_timeout=1200&queue_name=normal',
'options' => [
'table_name' => 'name_from_options',
'redeliver_timeout' => 1800,
'queue_name' => 'important',
],
'expectedManager' => 'default',
'expectedTableName' => 'name_from_options',
'expectedRedeliverTimeout' => 1800,
'expectedQueue' => 'important',
],
];
}
/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsAnExceptionIfAnExtraOptionsInDefined()
{
Connection::buildConfiguration('doctrine://default', ['new_option' => 'woops']);
}
/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsAnExceptionIfAnExtraOptionsInDefinedInDSN()
{
Connection::buildConfiguration('doctrine://default?new_option=woops');
}
}

View File

@ -0,0 +1,127 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
use Doctrine\DBAL\DriverManager;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
/**
* @requires pdo_mysql
*/
class DoctrineIntegrationTest extends TestCase
{
private $driverConnection;
private $connection;
protected function setUp()
{
parent::setUp();
if (!getenv('MESSENGER_DOCTRINE_DSN')) {
$this->markTestSkipped('The "MESSENGER_DOCTRINE_DSN" environment variable is required.');
}
$dsn = getenv('MESSENGER_DOCTRINE_DSN');
$this->driverConnection = DriverManager::getConnection(['url' => $dsn]);
$this->connection = new Connection([], $this->driverConnection);
// call send to auto-setup the table
$this->connection->setup();
// ensure the table is clean for tests
$this->driverConnection->exec('DELETE FROM messenger_messages');
}
public function testConnectionSendAndGet()
{
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}
public function testSendWithDelay()
{
$this->connection->send('{"message": "Hi i am delayed"}', ['type' => DummyMessage::class], 600000);
$available_at = $this->driverConnection->createQueryBuilder()
->select('m.available_at')
->from('messenger_messages', 'm')
->where('m.body = :body')
->setParameter(':body', '{"message": "Hi i am delayed"}')
->execute()
->fetchColumn();
$available_at = new \DateTime($available_at);
$now = \DateTime::createFromFormat('U.u', microtime(true));
$now->modify('+60 seconds');
$this->assertGreaterThan($now, $available_at);
}
public function testItRetrieveTheFirstAvailableMessage()
{
// insert messages
// one currently handled
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi handled"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'delivered_at' => Connection::formatDateTime(\DateTime::createFromFormat('U.u', microtime(true))),
]);
// one available later
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi delayed"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 13:00:00')),
]);
// one available
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi available"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
]);
$encoded = $this->connection->get();
$this->assertEquals('{"message": "Hi available"}', $encoded['body']);
}
public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
{
$twoHoursAgo = new \DateTime('now');
$twoHoursAgo->modify('-2 hours');
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi requeued"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'delivered_at' => Connection::formatDateTime($twoHoursAgo),
]);
$this->driverConnection->insert('messenger_messages', [
'body' => '{"message": "Hi available"}',
'headers' => json_encode(['type' => DummyMessage::class]),
'queue_name' => 'default',
'created_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:00:00')),
'available_at' => Connection::formatDateTime(new \DateTime('2019-03-15 12:30:00')),
]);
$next = $this->connection->get();
$this->assertEquals('{"message": "Hi requeued"}', $next['body']);
$this->connection->reject($next['id']);
}
}

View File

@ -0,0 +1,77 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
class DoctrineReceiverTest extends TestCase
{
public function testItReturnsTheDecodedMessageToTheHandler()
{
$serializer = $this->createSerializer();
$doctrineEnvelop = $this->createDoctrineEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($doctrineEnvelop);
$receiver = new DoctrineReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->get());
$this->assertCount(1, $actualEnvelopes);
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
}
/**
* @expectedException \Symfony\Component\Messenger\Exception\MessageDecodingFailedException
*/
public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
{
$serializer = $this->createMock(PhpSerializer::class);
$serializer->method('decode')->willThrowException(new MessageDecodingFailedException());
$doctrineEnvelop = $this->createDoctrineEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($doctrineEnvelop);
$connection->expects($this->once())->method('reject');
$receiver = new DoctrineReceiver($connection, $serializer);
iterator_to_array($receiver->get());
}
private function createDoctrineEnvelope()
{
return [
'id' => 1,
'body' => '{"message": "Hi"}',
'headers' => [
'type' => DummyMessage::class,
],
];
}
private function createSerializer(): Serializer
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);
return $serializer;
}
}

View File

@ -0,0 +1,57 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineSender;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
class DoctrineSenderTest extends TestCase
{
public function testSend()
{
$envelope = new Envelope(new DummyMessage('Oy'));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->getMock();
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers']);
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
$sender = new DoctrineSender($connection, $serializer);
$sender->send($envelope);
}
public function testSendWithDelay()
{
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new DelayStamp(500));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->getMock();
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 500);
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
$sender = new DoctrineSender($connection, $serializer);
$sender->send($envelope);
}
}

View File

@ -0,0 +1,75 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
use PHPUnit\Framework\TestCase;
use Symfony\Bridge\Doctrine\RegistryInterface;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransport;
use Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransportFactory;
class DoctrineTransportFactoryTest extends TestCase
{
public function testSupports()
{
$factory = new DoctrineTransportFactory(
$this->getMockBuilder(RegistryInterface::class)->getMock(),
null,
false
);
$this->assertTrue($factory->supports('doctrine://default', []));
$this->assertFalse($factory->supports('amqp://localhost', []));
}
public function testCreateTransport()
{
$connection = $this->getMockBuilder(\Doctrine\DBAL\Connection::class)
->disableOriginalConstructor()
->getMock();
$registry = $this->getMockBuilder(RegistryInterface::class)->getMock();
$registry->expects($this->once())
->method('getConnection')
->willReturn($connection);
$factory = new DoctrineTransportFactory(
$registry,
null
);
$this->assertEquals(
new DoctrineTransport(new Connection(Connection::buildConfiguration('doctrine://default'), $connection), null),
$factory->createTransport('doctrine://default', [])
);
}
/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
* @expectedExceptionMessage Could not find Doctrine connection from DSN "doctrine://default".
*/
public function testCreateTransportMustThrowAnExceptionIfManagerIsNotFound()
{
$registry = $this->getMockBuilder(RegistryInterface::class)->getMock();
$registry->expects($this->once())
->method('getConnection')
->will($this->returnCallback(function () {
throw new \InvalidArgumentException();
}));
$factory = new DoctrineTransportFactory(
$registry,
null
);
$factory->createTransport('doctrine://default', []);
}
}

View File

@ -67,6 +67,9 @@ class AmqpReceiver implements ReceiverInterface
yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
}
/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
try {
@ -76,6 +79,9 @@ class AmqpReceiver implements ReceiverInterface
}
}
/**
* {@inheritdoc}
*/
public function reject(Envelope $envelope): void
{
$this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope));

View File

@ -0,0 +1,259 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Doctrine;
use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Exception\TableNotFoundException;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
use Doctrine\DBAL\Types\Type;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\TransportException;
/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*
* @final
*
* @experimental in 4.3
*/
class Connection
{
const DEFAULT_OPTIONS = [
'table_name' => 'messenger_messages',
'queue_name' => 'default',
'redeliver_timeout' => 3600,
'loop_sleep' => 200000,
'auto_setup' => true,
];
/**
* Configuration of the connection.
*
* Available options:
*
* * table_name: name of the table
* * connection: name of the Doctrine's entity manager
* * queue_name: name of the queue
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default 3600
* * loop_sleep: Number of micro seconds to wait for a next message to handle
* * auto_setup: Whether the table should be created automatically during send / get. Default : true
*/
private $configuration = [];
private $driverConnection;
public function __construct(array $configuration, DBALConnection $driverConnection)
{
$this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
$this->driverConnection = $driverConnection;
}
public function getConfiguration(): array
{
return $this->configuration;
}
public static function buildConfiguration($dsn, array $options = [])
{
if (false === $parsedUrl = parse_url($dsn)) {
throw new InvalidArgumentException(sprintf('The given Doctrine DSN "%s" is invalid.', $dsn));
}
$components = parse_url($dsn);
$query = [];
if (isset($components['query'])) {
parse_str($components['query'], $query);
}
$configuration = [
'connection' => $components['host'],
'table_name' => $options['table_name'] ?? ($query['table_name'] ?? self::DEFAULT_OPTIONS['table_name']),
'queue_name' => $options['queue_name'] ?? ($query['queue_name'] ?? self::DEFAULT_OPTIONS['queue_name']),
'redeliver_timeout' => $options['redeliver_timeout'] ?? ($query['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']),
'loop_sleep' => $options['loop_sleep'] ?? ($query['loop_sleep'] ?? self::DEFAULT_OPTIONS['loop_sleep']),
'auto_setup' => $options['auto_setup'] ?? ($query['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']),
];
// check for extra keys in options
$optionsExtraKeys = array_diff(array_keys($options), array_keys($configuration));
if (0 < \count($optionsExtraKeys)) {
throw new TransportException(sprintf('Unknown option found : [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', self::DEFAULT_OPTIONS)));
}
// check for extra keys in options
$queryExtraKeys = array_diff(array_keys($query), array_keys($configuration));
if (0 < \count($queryExtraKeys)) {
throw new TransportException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', self::DEFAULT_OPTIONS)));
}
return $configuration;
}
/**
* @param int $delay The delay in milliseconds
*
* @throws \Doctrine\DBAL\DBALException
*/
public function send(string $body, array $headers, int $delay = 0): void
{
$now = (\DateTime::createFromFormat('U.u', microtime(true)));
$availableAt = (clone $now)->modify(sprintf('+%d seconds', $delay / 1000));
$queryBuilder = $this->driverConnection->createQueryBuilder()
->insert($this->configuration['table_name'])
->values([
'body' => ':body',
'headers' => ':headers',
'queue_name' => ':queue_name',
'created_at' => ':created_at',
'available_at' => ':available_at',
]);
$this->executeQuery($queryBuilder->getSQL(), [
':body' => $body,
':headers' => \json_encode($headers),
':queue_name' => $this->configuration['queue_name'],
':created_at' => self::formatDateTime($now),
':available_at' => self::formatDateTime($availableAt),
]);
}
public function get(): ?array
{
$this->driverConnection->beginTransaction();
try {
$query = $this->driverConnection->createQueryBuilder()
->select('m.*')
->from($this->configuration['table_name'], 'm')
->where('m.delivered_at is null OR m.delivered_at < :redeliver_limit')
->andWhere('m.available_at <= :now')
->andWhere('m.queue_name = :queue_name')
->orderBy('available_at', 'ASC')
->setMaxResults(1);
$now = \DateTime::createFromFormat('U.u', microtime(true));
$redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout']));
// use SELECT ... FOR UPDATE to lock table
$doctrineEnvelope = $this->executeQuery(
$query->getSQL().' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL(),
[
':now' => self::formatDateTime($now),
':queue_name' => $this->configuration['queue_name'],
':redeliver_limit' => self::formatDateTime($redeliverLimit),
]
)->fetch();
if (false === $doctrineEnvelope) {
$this->driverConnection->commit();
return null;
}
$doctrineEnvelope['headers'] = \json_decode($doctrineEnvelope['headers'], true);
$queryBuilder = $this->driverConnection->createQueryBuilder()
->update($this->configuration['table_name'])
->set('delivered_at', ':delivered_at')
->where('id = :id');
$this->executeQuery($queryBuilder->getSQL(), [
':id' => $doctrineEnvelope['id'],
':delivered_at' => self::formatDateTime($now),
]);
$this->driverConnection->commit();
return $doctrineEnvelope;
} catch (\Throwable $e) {
$this->driverConnection->rollBack();
throw $e;
}
}
public function ack(string $id): bool
{
try {
return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
public function reject(string $id): bool
{
try {
return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
public function setup(): void
{
$synchronizer = new SingleDatabaseSynchronizer($this->driverConnection);
$synchronizer->updateSchema($this->getSchema(), true);
}
private function executeQuery(string $sql, array $parameters = [])
{
$stmt = null;
try {
$stmt = $this->driverConnection->prepare($sql);
$stmt->execute($parameters);
} catch (TableNotFoundException $e) {
// create table
if (!$this->driverConnection->isTransactionActive() && $this->configuration['auto_setup']) {
$this->setup();
}
// statement not prepared ? SQLite throw on exception on prepare if the table does not exist
if (null === $stmt) {
$stmt = $this->driverConnection->prepare($sql);
}
$stmt->execute($parameters);
}
return $stmt;
}
private function getSchema(): Schema
{
$schema = new Schema();
$table = $schema->createTable($this->configuration['table_name']);
$table->addColumn('id', Type::BIGINT)
->setAutoincrement(true)
->setNotnull(true);
$table->addColumn('body', Type::TEXT)
->setNotnull(true);
$table->addColumn('headers', Type::STRING)
->setNotnull(true);
$table->addColumn('queue_name', Type::STRING)
->setNotnull(true);
$table->addColumn('created_at', Type::DATETIME)
->setNotnull(true);
$table->addColumn('available_at', Type::DATETIME)
->setNotnull(true);
$table->addColumn('delivered_at', Type::DATETIME)
->setNotnull(false);
$table->setPrimaryKey(['id']);
$table->addIndex(['queue_name']);
$table->addIndex(['available_at']);
$table->addIndex(['delivered_at']);
return $schema;
}
public static function formatDateTime(\DateTimeInterface $dateTime)
{
return $dateTime->format('Y-m-d\TH:i:s.uZ');
}
}

View File

@ -0,0 +1,34 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Doctrine;
use Symfony\Component\Messenger\Stamp\StampInterface;
/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*
* @experimental in 4.3
*/
class DoctrineReceivedStamp implements StampInterface
{
private $id;
public function __construct(string $id)
{
$this->id = $id;
}
public function getId(): string
{
return $this->id;
}
}

View File

@ -0,0 +1,95 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Doctrine;
use Doctrine\DBAL\DBALException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*
* @experimental in 4.3
*/
class DoctrineReceiver implements ReceiverInterface
{
private $connection;
private $serializer;
public function __construct(Connection $connection, SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
/**
* {@inheritdoc}
*/
public function get(): iterable
{
try {
$doctrineEnvelope = $this->connection->get();
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
if (null === $doctrineEnvelope) {
return [];
}
try {
$envelope = $this->serializer->decode([
'body' => $doctrineEnvelope['body'],
'headers' => $doctrineEnvelope['headers'],
]);
} catch (MessageDecodingFailedException $exception) {
$this->connection->reject($doctrineEnvelope['id']);
throw $exception;
}
yield $envelope->with(new DoctrineReceivedStamp($doctrineEnvelope['id']));
}
/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
$this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId());
}
/**
* {@inheritdoc}
*/
public function reject(Envelope $envelope): void
{
$this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId());
}
private function findDoctrineReceivedStamp(Envelope $envelope): DoctrineReceivedStamp
{
/** @var DoctrineReceivedStamp|null $doctrineReceivedStamp */
$doctrineReceivedStamp = $envelope->last(DoctrineReceivedStamp::class);
if (null === $doctrineReceivedStamp) {
throw new LogicException('No DoctrineReceivedStamp found on the Envelope.');
}
return $doctrineReceivedStamp;
}
}

View File

@ -0,0 +1,57 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Doctrine;
use Doctrine\DBAL\DBALException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*
* @experimental in 4.3
*/
class DoctrineSender implements SenderInterface
{
private $connection;
private $serializer;
public function __construct(Connection $connection, SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);
/** @var DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
$delay = null !== $delayStamp ? $delayStamp->getDelay() : 0;
try {
$this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
return $envelope;
}
}

View File

@ -0,0 +1,87 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Doctrine;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*
* @experimental in 4.3
*/
class DoctrineTransport implements TransportInterface, SetupableTransportInterface
{
private $connection;
private $serializer;
private $receiver;
private $sender;
public function __construct(Connection $connection, SerializerInterface $serializer = null)
{
$this->connection = $connection;
$this->serializer = $serializer ?? new PhpSerializer();
}
/**
* {@inheritdoc}
*/
public function get(): iterable
{
($this->receiver ?? $this->getReceiver())->get();
}
/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
($this->receiver ?? $this->getReceiver())->ack($envelope);
}
/**
* {@inheritdoc}
*/
public function reject(Envelope $envelope): void
{
($this->receiver ?? $this->getReceiver())->reject($envelope);
}
/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
return ($this->sender ?? $this->getSender())->send($envelope);
}
/**
* {@inheritdoc}
*/
public function setup(): void
{
$this->connection->setup();
}
private function getReceiver(): DoctrineReceiver
{
return $this->receiver = new DoctrineReceiver($this->connection, $this->serializer);
}
private function getSender(): DoctrineSender
{
return $this->sender = new DoctrineSender($this->connection, $this->serializer);
}
}

View File

@ -0,0 +1,56 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Transport\Doctrine;
use Symfony\Bridge\Doctrine\RegistryInterface;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*
* @experimental in 4.3
*/
class DoctrineTransportFactory implements TransportFactoryInterface
{
private $registry;
private $serializer;
public function __construct(RegistryInterface $registry, SerializerInterface $serializer = null)
{
$this->registry = $registry;
$this->serializer = $serializer ?? new PhpSerializer();
}
public function createTransport(string $dsn, array $options): TransportInterface
{
$configuration = Connection::buildConfiguration($dsn, $options);
try {
$driverConnection = $this->registry->getConnection($configuration['connection']);
} catch (\InvalidArgumentException $e) {
throw new TransportException(sprintf('Could not find Doctrine connection from DSN "%s".', $dsn), 0, $e);
}
$connection = new Connection($configuration, $driverConnection);
return new DoctrineTransport($connection, $this->serializer);
}
public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'doctrine://');
}
}

View File

@ -20,8 +20,10 @@
"psr/log": "~1.0"
},
"require-dev": {
"doctrine/dbal": "~2.4",
"symfony/console": "~3.4|~4.0",
"symfony/dependency-injection": "~3.4.19|^4.1.8",
"symfony/doctrine-bridge": "~3.4|~4.0",
"symfony/event-dispatcher": "~4.3",
"symfony/http-kernel": "~3.4|~4.0",
"symfony/process": "~3.4|~4.0",