diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 6df7ddcd1f..f5a51a3468 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -7,6 +7,7 @@ CHANGELOG * Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor, pass a `RoutableMessageBus` instance instead. * Added support for auto trimming of Redis streams. + * `InMemoryTransport` handle acknowledged and rejected messages. 4.3.0 ----- diff --git a/src/Symfony/Component/Messenger/Tests/Transport/InMemoryTransportTest.php b/src/Symfony/Component/Messenger/Tests/Transport/InMemoryTransportTest.php index 2d37e624d3..22149f8a39 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/InMemoryTransportTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/InMemoryTransportTest.php @@ -34,7 +34,20 @@ class InMemoryTransportTest extends TestCase { $envelope = new Envelope(new \stdClass()); $this->transport->send($envelope); - $this->assertSame([$envelope], $this->transport->get()); + $this->assertSame([$envelope], $this->transport->getSent()); + } + + public function testQueue() + { + $envelope1 = new Envelope(new \stdClass()); + $this->transport->send($envelope1); + $envelope2 = new Envelope(new \stdClass()); + $this->transport->send($envelope2); + $this->assertSame([$envelope1, $envelope2], $this->transport->get()); + $this->transport->ack($envelope1); + $this->assertSame([$envelope2], $this->transport->get()); + $this->transport->reject($envelope2); + $this->assertSame([], $this->transport->get()); } public function testAck() @@ -63,5 +76,6 @@ class InMemoryTransportTest extends TestCase $this->assertEmpty($this->transport->get(), 'Should be empty after reset'); $this->assertEmpty($this->transport->getAcknowledged(), 'Should be empty after reset'); $this->assertEmpty($this->transport->getRejected(), 'Should be empty after reset'); + $this->assertEmpty($this->transport->getSent(), 'Should be empty after reset'); } } diff --git a/src/Symfony/Component/Messenger/Transport/InMemoryTransport.php b/src/Symfony/Component/Messenger/Transport/InMemoryTransport.php index 1f458b68da..354bb601a1 100644 --- a/src/Symfony/Component/Messenger/Transport/InMemoryTransport.php +++ b/src/Symfony/Component/Messenger/Transport/InMemoryTransport.php @@ -36,12 +36,17 @@ class InMemoryTransport implements TransportInterface, ResetInterface */ private $rejected = []; + /** + * @var Envelope[] + */ + private $queue = []; + /** * {@inheritdoc} */ public function get(): iterable { - return $this->sent; + return array_values($this->queue); } /** @@ -50,6 +55,8 @@ class InMemoryTransport implements TransportInterface, ResetInterface public function ack(Envelope $envelope): void { $this->acknowledged[] = $envelope; + $id = spl_object_hash($envelope); + unset($this->queue[$id]); } /** @@ -58,6 +65,8 @@ class InMemoryTransport implements TransportInterface, ResetInterface public function reject(Envelope $envelope): void { $this->rejected[] = $envelope; + $id = spl_object_hash($envelope); + unset($this->queue[$id]); } /** @@ -66,13 +75,15 @@ class InMemoryTransport implements TransportInterface, ResetInterface public function send(Envelope $envelope): Envelope { $this->sent[] = $envelope; + $id = spl_object_hash($envelope); + $this->queue[$id] = $envelope; return $envelope; } public function reset() { - $this->sent = $this->rejected = $this->acknowledged = []; + $this->sent = $this->queue = $this->rejected = $this->acknowledged = []; } /** @@ -90,4 +101,12 @@ class InMemoryTransport implements TransportInterface, ResetInterface { return $this->rejected; } + + /** + * @return Envelope[] + */ + public function getSent(): array + { + return $this->sent; + } }