feature #32783 [Messenger] InMemoryTransport handle acknowledged and rejected messages (tienvx)

This PR was merged into the 4.4 branch.

Discussion
----------

[Messenger] InMemoryTransport handle acknowledged and rejected messages

| Q             | A
| ------------- | ---
| Branch?       | 4.4 <!-- see below -->
| Bug fix?      | no
| New feature?  | yes <!-- please update src/**/CHANGELOG.md files -->
| BC breaks?    | no     <!-- see https://symfony.com/bc -->
| Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass?   | yes    <!-- please add some, will be required by reviewers -->
| Fixed tickets | #...   <!-- #-prefixed issue number(s), if any -->
| License       | MIT
| Doc PR        | symfony/symfony-docs/pull/12045 <!-- required for new features -->

This PR do 2 things:
* Limit receiving messages from InMemoryTransport to 1 (reverted, another PR will fix the bug: worker does not stop when receiver return more messages than the number specify by the `--limit` option)
* Handle acknowledged and rejected messages in InMemoryTransport. Currently, it does not care about acknowledged and rejected messages. So it always return all messages that have been sent. So if we run console command `messenger:consume`, it will never stop, even though we set the `--limit` option.

For more information, please check the [reproduction](https://github.com/tienvx/symfony-messenger-in-memory-reproduction) project for the expected behavior.

See also my [messenger-memory-transport](https://github.com/tienvx/messenger-memory-transport) project

Commits
-------

71e7bdff22 [Messenger] InMemoryTransport handle acknowledged and rejected messages
This commit is contained in:
Fabien Potencier 2019-08-18 10:25:08 +02:00
commit ff63bb325d
3 changed files with 37 additions and 3 deletions

View File

@ -7,6 +7,7 @@ CHANGELOG
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
pass a `RoutableMessageBus` instance instead.
* Added support for auto trimming of Redis streams.
* `InMemoryTransport` handle acknowledged and rejected messages.
4.3.0
-----

View File

@ -34,7 +34,20 @@ class InMemoryTransportTest extends TestCase
{
$envelope = new Envelope(new \stdClass());
$this->transport->send($envelope);
$this->assertSame([$envelope], $this->transport->get());
$this->assertSame([$envelope], $this->transport->getSent());
}
public function testQueue()
{
$envelope1 = new Envelope(new \stdClass());
$this->transport->send($envelope1);
$envelope2 = new Envelope(new \stdClass());
$this->transport->send($envelope2);
$this->assertSame([$envelope1, $envelope2], $this->transport->get());
$this->transport->ack($envelope1);
$this->assertSame([$envelope2], $this->transport->get());
$this->transport->reject($envelope2);
$this->assertSame([], $this->transport->get());
}
public function testAck()
@ -63,5 +76,6 @@ class InMemoryTransportTest extends TestCase
$this->assertEmpty($this->transport->get(), 'Should be empty after reset');
$this->assertEmpty($this->transport->getAcknowledged(), 'Should be empty after reset');
$this->assertEmpty($this->transport->getRejected(), 'Should be empty after reset');
$this->assertEmpty($this->transport->getSent(), 'Should be empty after reset');
}
}

View File

@ -36,12 +36,17 @@ class InMemoryTransport implements TransportInterface, ResetInterface
*/
private $rejected = [];
/**
* @var Envelope[]
*/
private $queue = [];
/**
* {@inheritdoc}
*/
public function get(): iterable
{
return $this->sent;
return array_values($this->queue);
}
/**
@ -50,6 +55,8 @@ class InMemoryTransport implements TransportInterface, ResetInterface
public function ack(Envelope $envelope): void
{
$this->acknowledged[] = $envelope;
$id = spl_object_hash($envelope);
unset($this->queue[$id]);
}
/**
@ -58,6 +65,8 @@ class InMemoryTransport implements TransportInterface, ResetInterface
public function reject(Envelope $envelope): void
{
$this->rejected[] = $envelope;
$id = spl_object_hash($envelope);
unset($this->queue[$id]);
}
/**
@ -66,13 +75,15 @@ class InMemoryTransport implements TransportInterface, ResetInterface
public function send(Envelope $envelope): Envelope
{
$this->sent[] = $envelope;
$id = spl_object_hash($envelope);
$this->queue[$id] = $envelope;
return $envelope;
}
public function reset()
{
$this->sent = $this->rejected = $this->acknowledged = [];
$this->sent = $this->queue = $this->rejected = $this->acknowledged = [];
}
/**
@ -90,4 +101,12 @@ class InMemoryTransport implements TransportInterface, ResetInterface
{
return $this->rejected;
}
/**
* @return Envelope[]
*/
public function getSent(): array
{
return $this->sent;
}
}