[Messenger] Allow InMemoryTransport to serialize message
This commit is contained in:
parent
2a453c2c89
commit
46a8007afc
@ -1,6 +1,11 @@
|
|||||||
CHANGELOG
|
CHANGELOG
|
||||||
=========
|
=========
|
||||||
|
|
||||||
|
5.3.0
|
||||||
|
-----
|
||||||
|
|
||||||
|
* `InMemoryTransport` can perform message serialization through dsn `in-memory://?serialize=true`.
|
||||||
|
|
||||||
5.2.0
|
5.2.0
|
||||||
-----
|
-----
|
||||||
|
|
||||||
|
@ -49,6 +49,35 @@ class InMemoryTransportFactoryTest extends TestCase
|
|||||||
$this->assertInstanceOf(InMemoryTransport::class, $this->factory->createTransport('in-memory://', [], $serializer));
|
$this->assertInstanceOf(InMemoryTransport::class, $this->factory->createTransport('in-memory://', [], $serializer));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testCreateTransportWithoutSerializer()
|
||||||
|
{
|
||||||
|
/** @var SerializerInterface $serializer */
|
||||||
|
$serializer = $this->createMock(SerializerInterface::class);
|
||||||
|
$serializer
|
||||||
|
->expects($this->never())
|
||||||
|
->method('encode')
|
||||||
|
;
|
||||||
|
$transport = $this->factory->createTransport('in-memory://?serialize=false', [], $serializer);
|
||||||
|
$message = Envelope::wrap(new DummyMessage('Hello.'));
|
||||||
|
$transport->send($message);
|
||||||
|
|
||||||
|
$this->assertSame([$message], $transport->get());
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testCreateTransportWithSerializer()
|
||||||
|
{
|
||||||
|
/** @var SerializerInterface $serializer */
|
||||||
|
$serializer = $this->createMock(SerializerInterface::class);
|
||||||
|
$message = Envelope::wrap(new DummyMessage('Hello.'));
|
||||||
|
$serializer
|
||||||
|
->expects($this->once())
|
||||||
|
->method('encode')
|
||||||
|
->with($this->equalTo($message))
|
||||||
|
;
|
||||||
|
$transport = $this->factory->createTransport('in-memory://?serialize=true', [], $serializer);
|
||||||
|
$transport->send($message);
|
||||||
|
}
|
||||||
|
|
||||||
public function testResetCreatedTransports()
|
public function testResetCreatedTransports()
|
||||||
{
|
{
|
||||||
$transport = $this->factory->createTransport('in-memory://', [], $this->createMock(SerializerInterface::class));
|
$transport = $this->factory->createTransport('in-memory://', [], $this->createMock(SerializerInterface::class));
|
||||||
@ -63,6 +92,8 @@ class InMemoryTransportFactoryTest extends TestCase
|
|||||||
{
|
{
|
||||||
return [
|
return [
|
||||||
'Supported' => ['in-memory://foo'],
|
'Supported' => ['in-memory://foo'],
|
||||||
|
'Serialize enabled' => ['in-memory://?serialize=true'],
|
||||||
|
'Serialize disabled' => ['in-memory://?serialize=false'],
|
||||||
'Unsupported' => ['amqp://bar', false],
|
'Unsupported' => ['amqp://bar', false],
|
||||||
];
|
];
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,9 @@ namespace Symfony\Component\Messenger\Tests\Transport;
|
|||||||
use PHPUnit\Framework\TestCase;
|
use PHPUnit\Framework\TestCase;
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
use Symfony\Component\Messenger\Tests\Fixtures\AnEnvelopeStamp;
|
use Symfony\Component\Messenger\Tests\Fixtures\AnEnvelopeStamp;
|
||||||
|
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
|
||||||
use Symfony\Component\Messenger\Transport\InMemoryTransport;
|
use Symfony\Component\Messenger\Transport\InMemoryTransport;
|
||||||
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Gary PEGEOT <garypegeot@gmail.com>
|
* @author Gary PEGEOT <garypegeot@gmail.com>
|
||||||
@ -26,9 +28,21 @@ class InMemoryTransportTest extends TestCase
|
|||||||
*/
|
*/
|
||||||
private $transport;
|
private $transport;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @var InMemoryTransport
|
||||||
|
*/
|
||||||
|
private $serializeTransport;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @var SerializerInterface
|
||||||
|
*/
|
||||||
|
private $serializer;
|
||||||
|
|
||||||
protected function setUp(): void
|
protected function setUp(): void
|
||||||
{
|
{
|
||||||
|
$this->serializer = $this->createMock(SerializerInterface::class);
|
||||||
$this->transport = new InMemoryTransport();
|
$this->transport = new InMemoryTransport();
|
||||||
|
$this->serializeTransport = new InMemoryTransport($this->serializer);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testSend()
|
public function testSend()
|
||||||
@ -38,6 +52,24 @@ class InMemoryTransportTest extends TestCase
|
|||||||
$this->assertSame([$envelope], $this->transport->getSent());
|
$this->assertSame([$envelope], $this->transport->getSent());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testSendWithSerialization()
|
||||||
|
{
|
||||||
|
$envelope = new Envelope(new \stdClass());
|
||||||
|
$envelopeDecoded = Envelope::wrap(new DummyMessage('Hello.'));
|
||||||
|
$this->serializer
|
||||||
|
->method('encode')
|
||||||
|
->with($this->equalTo($envelope))
|
||||||
|
->willReturn(['foo' => 'ba'])
|
||||||
|
;
|
||||||
|
$this->serializer
|
||||||
|
->method('decode')
|
||||||
|
->with(['foo' => 'ba'])
|
||||||
|
->willReturn($envelopeDecoded)
|
||||||
|
;
|
||||||
|
$this->serializeTransport->send($envelope);
|
||||||
|
$this->assertSame([$envelopeDecoded], $this->serializeTransport->getSent());
|
||||||
|
}
|
||||||
|
|
||||||
public function testQueue()
|
public function testQueue()
|
||||||
{
|
{
|
||||||
$envelope1 = new Envelope(new \stdClass());
|
$envelope1 = new Envelope(new \stdClass());
|
||||||
@ -51,6 +83,24 @@ class InMemoryTransportTest extends TestCase
|
|||||||
$this->assertSame([], $this->transport->get());
|
$this->assertSame([], $this->transport->get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testQueueWithSerialization()
|
||||||
|
{
|
||||||
|
$envelope = new Envelope(new \stdClass());
|
||||||
|
$envelopeDecoded = Envelope::wrap(new DummyMessage('Hello.'));
|
||||||
|
$this->serializer
|
||||||
|
->method('encode')
|
||||||
|
->with($this->equalTo($envelope))
|
||||||
|
->willReturn(['foo' => 'ba'])
|
||||||
|
;
|
||||||
|
$this->serializer
|
||||||
|
->method('decode')
|
||||||
|
->with(['foo' => 'ba'])
|
||||||
|
->willReturn($envelopeDecoded)
|
||||||
|
;
|
||||||
|
$this->serializeTransport->send($envelope);
|
||||||
|
$this->assertSame([$envelopeDecoded], $this->serializeTransport->get());
|
||||||
|
}
|
||||||
|
|
||||||
public function testAcknowledgeSameMessageWithDifferentStamps()
|
public function testAcknowledgeSameMessageWithDifferentStamps()
|
||||||
{
|
{
|
||||||
$envelope1 = new Envelope(new \stdClass(), [new AnEnvelopeStamp()]);
|
$envelope1 = new Envelope(new \stdClass(), [new AnEnvelopeStamp()]);
|
||||||
@ -71,6 +121,24 @@ class InMemoryTransportTest extends TestCase
|
|||||||
$this->assertSame([$envelope], $this->transport->getAcknowledged());
|
$this->assertSame([$envelope], $this->transport->getAcknowledged());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testAckWithSerialization()
|
||||||
|
{
|
||||||
|
$envelope = new Envelope(new \stdClass());
|
||||||
|
$envelopeDecoded = Envelope::wrap(new DummyMessage('Hello.'));
|
||||||
|
$this->serializer
|
||||||
|
->method('encode')
|
||||||
|
->with($this->equalTo($envelope))
|
||||||
|
->willReturn(['foo' => 'ba'])
|
||||||
|
;
|
||||||
|
$this->serializer
|
||||||
|
->method('decode')
|
||||||
|
->with(['foo' => 'ba'])
|
||||||
|
->willReturn($envelopeDecoded)
|
||||||
|
;
|
||||||
|
$this->serializeTransport->ack($envelope);
|
||||||
|
$this->assertSame([$envelopeDecoded], $this->serializeTransport->getAcknowledged());
|
||||||
|
}
|
||||||
|
|
||||||
public function testReject()
|
public function testReject()
|
||||||
{
|
{
|
||||||
$envelope = new Envelope(new \stdClass());
|
$envelope = new Envelope(new \stdClass());
|
||||||
@ -78,6 +146,24 @@ class InMemoryTransportTest extends TestCase
|
|||||||
$this->assertSame([$envelope], $this->transport->getRejected());
|
$this->assertSame([$envelope], $this->transport->getRejected());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testRejectWithSerialization()
|
||||||
|
{
|
||||||
|
$envelope = new Envelope(new \stdClass());
|
||||||
|
$envelopeDecoded = Envelope::wrap(new DummyMessage('Hello.'));
|
||||||
|
$this->serializer
|
||||||
|
->method('encode')
|
||||||
|
->with($this->equalTo($envelope))
|
||||||
|
->willReturn(['foo' => 'ba'])
|
||||||
|
;
|
||||||
|
$this->serializer
|
||||||
|
->method('decode')
|
||||||
|
->with(['foo' => 'ba'])
|
||||||
|
->willReturn($envelopeDecoded)
|
||||||
|
;
|
||||||
|
$this->serializeTransport->reject($envelope);
|
||||||
|
$this->assertSame([$envelopeDecoded], $this->serializeTransport->getRejected());
|
||||||
|
}
|
||||||
|
|
||||||
public function testReset()
|
public function testReset()
|
||||||
{
|
{
|
||||||
$envelope = new Envelope(new \stdClass());
|
$envelope = new Envelope(new \stdClass());
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
namespace Symfony\Component\Messenger\Transport;
|
namespace Symfony\Component\Messenger\Transport;
|
||||||
|
|
||||||
use Symfony\Component\Messenger\Envelope;
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
||||||
use Symfony\Contracts\Service\ResetInterface;
|
use Symfony\Contracts\Service\ResetInterface;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -41,12 +42,22 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
|||||||
*/
|
*/
|
||||||
private $queue = [];
|
private $queue = [];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @var SerializerInterface|null
|
||||||
|
*/
|
||||||
|
private $serializer;
|
||||||
|
|
||||||
|
public function __construct(SerializerInterface $serializer = null)
|
||||||
|
{
|
||||||
|
$this->serializer = $serializer;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritdoc}
|
* {@inheritdoc}
|
||||||
*/
|
*/
|
||||||
public function get(): iterable
|
public function get(): iterable
|
||||||
{
|
{
|
||||||
return array_values($this->queue);
|
return array_values($this->decode($this->queue));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -54,7 +65,7 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
|||||||
*/
|
*/
|
||||||
public function ack(Envelope $envelope): void
|
public function ack(Envelope $envelope): void
|
||||||
{
|
{
|
||||||
$this->acknowledged[] = $envelope;
|
$this->acknowledged[] = $this->encode($envelope);
|
||||||
$id = spl_object_hash($envelope->getMessage());
|
$id = spl_object_hash($envelope->getMessage());
|
||||||
unset($this->queue[$id]);
|
unset($this->queue[$id]);
|
||||||
}
|
}
|
||||||
@ -64,7 +75,7 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
|||||||
*/
|
*/
|
||||||
public function reject(Envelope $envelope): void
|
public function reject(Envelope $envelope): void
|
||||||
{
|
{
|
||||||
$this->rejected[] = $envelope;
|
$this->rejected[] = $this->encode($envelope);
|
||||||
$id = spl_object_hash($envelope->getMessage());
|
$id = spl_object_hash($envelope->getMessage());
|
||||||
unset($this->queue[$id]);
|
unset($this->queue[$id]);
|
||||||
}
|
}
|
||||||
@ -74,9 +85,10 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
|||||||
*/
|
*/
|
||||||
public function send(Envelope $envelope): Envelope
|
public function send(Envelope $envelope): Envelope
|
||||||
{
|
{
|
||||||
$this->sent[] = $envelope;
|
$encodedEnvelope = $this->encode($envelope);
|
||||||
|
$this->sent[] = $encodedEnvelope;
|
||||||
$id = spl_object_hash($envelope->getMessage());
|
$id = spl_object_hash($envelope->getMessage());
|
||||||
$this->queue[$id] = $envelope;
|
$this->queue[$id] = $encodedEnvelope;
|
||||||
|
|
||||||
return $envelope;
|
return $envelope;
|
||||||
}
|
}
|
||||||
@ -91,7 +103,7 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
|||||||
*/
|
*/
|
||||||
public function getAcknowledged(): array
|
public function getAcknowledged(): array
|
||||||
{
|
{
|
||||||
return $this->acknowledged;
|
return $this->decode($this->acknowledged);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -99,7 +111,7 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
|||||||
*/
|
*/
|
||||||
public function getRejected(): array
|
public function getRejected(): array
|
||||||
{
|
{
|
||||||
return $this->rejected;
|
return $this->decode($this->rejected);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -107,6 +119,35 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
|||||||
*/
|
*/
|
||||||
public function getSent(): array
|
public function getSent(): array
|
||||||
{
|
{
|
||||||
return $this->sent;
|
return $this->decode($this->sent);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Envelope|array
|
||||||
|
*/
|
||||||
|
private function encode(Envelope $envelope)
|
||||||
|
{
|
||||||
|
if (null === $this->serializer) {
|
||||||
|
return $envelope;
|
||||||
|
}
|
||||||
|
|
||||||
|
return $this->serializer->encode($envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param array<mixed> $messagesEncoded
|
||||||
|
*
|
||||||
|
* @return Envelope[]
|
||||||
|
*/
|
||||||
|
private function decode(array $messagesEncoded): array
|
||||||
|
{
|
||||||
|
if (null === $this->serializer) {
|
||||||
|
return $messagesEncoded;
|
||||||
|
}
|
||||||
|
|
||||||
|
return array_map(
|
||||||
|
[$this->serializer, 'decode'],
|
||||||
|
$messagesEncoded
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,7 +26,9 @@ class InMemoryTransportFactory implements TransportFactoryInterface, ResetInterf
|
|||||||
|
|
||||||
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
|
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
|
||||||
{
|
{
|
||||||
return $this->createdTransports[] = new InMemoryTransport();
|
['serialize' => $serialize] = $this->parseDsn($dsn);
|
||||||
|
|
||||||
|
return $this->createdTransports[] = new InMemoryTransport($serialize ? $serializer : null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function supports(string $dsn, array $options): bool
|
public function supports(string $dsn, array $options): bool
|
||||||
@ -40,4 +42,16 @@ class InMemoryTransportFactory implements TransportFactoryInterface, ResetInterf
|
|||||||
$transport->reset();
|
$transport->reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private function parseDsn(string $dsn): array
|
||||||
|
{
|
||||||
|
$query = [];
|
||||||
|
if ($queryAsString = strstr($dsn, '?')) {
|
||||||
|
parse_str(ltrim($queryAsString, '?'), $query);
|
||||||
|
}
|
||||||
|
|
||||||
|
return [
|
||||||
|
'serialize' => filter_var($query['serialize'] ?? false, \FILTER_VALIDATE_BOOLEAN),
|
||||||
|
];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user