[Messenger] InMemoryTransport handle acknowledged and rejected messages

This commit is contained in:
tien.xuan.vo 2019-07-27 20:40:43 +07:00
parent 3522590290
commit 71e7bdff22
3 changed files with 37 additions and 3 deletions

View File

@ -7,6 +7,7 @@ CHANGELOG
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor, * Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
pass a `RoutableMessageBus` instance instead. pass a `RoutableMessageBus` instance instead.
* Added support for auto trimming of Redis streams. * Added support for auto trimming of Redis streams.
* `InMemoryTransport` handle acknowledged and rejected messages.
4.3.0 4.3.0
----- -----

View File

@ -34,7 +34,20 @@ class InMemoryTransportTest extends TestCase
{ {
$envelope = new Envelope(new \stdClass()); $envelope = new Envelope(new \stdClass());
$this->transport->send($envelope); $this->transport->send($envelope);
$this->assertSame([$envelope], $this->transport->get()); $this->assertSame([$envelope], $this->transport->getSent());
}
public function testQueue()
{
$envelope1 = new Envelope(new \stdClass());
$this->transport->send($envelope1);
$envelope2 = new Envelope(new \stdClass());
$this->transport->send($envelope2);
$this->assertSame([$envelope1, $envelope2], $this->transport->get());
$this->transport->ack($envelope1);
$this->assertSame([$envelope2], $this->transport->get());
$this->transport->reject($envelope2);
$this->assertSame([], $this->transport->get());
} }
public function testAck() public function testAck()
@ -63,5 +76,6 @@ class InMemoryTransportTest extends TestCase
$this->assertEmpty($this->transport->get(), 'Should be empty after reset'); $this->assertEmpty($this->transport->get(), 'Should be empty after reset');
$this->assertEmpty($this->transport->getAcknowledged(), 'Should be empty after reset'); $this->assertEmpty($this->transport->getAcknowledged(), 'Should be empty after reset');
$this->assertEmpty($this->transport->getRejected(), 'Should be empty after reset'); $this->assertEmpty($this->transport->getRejected(), 'Should be empty after reset');
$this->assertEmpty($this->transport->getSent(), 'Should be empty after reset');
} }
} }

View File

@ -36,12 +36,17 @@ class InMemoryTransport implements TransportInterface, ResetInterface
*/ */
private $rejected = []; private $rejected = [];
/**
* @var Envelope[]
*/
private $queue = [];
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function get(): iterable public function get(): iterable
{ {
return $this->sent; return array_values($this->queue);
} }
/** /**
@ -50,6 +55,8 @@ class InMemoryTransport implements TransportInterface, ResetInterface
public function ack(Envelope $envelope): void public function ack(Envelope $envelope): void
{ {
$this->acknowledged[] = $envelope; $this->acknowledged[] = $envelope;
$id = spl_object_hash($envelope);
unset($this->queue[$id]);
} }
/** /**
@ -58,6 +65,8 @@ class InMemoryTransport implements TransportInterface, ResetInterface
public function reject(Envelope $envelope): void public function reject(Envelope $envelope): void
{ {
$this->rejected[] = $envelope; $this->rejected[] = $envelope;
$id = spl_object_hash($envelope);
unset($this->queue[$id]);
} }
/** /**
@ -66,13 +75,15 @@ class InMemoryTransport implements TransportInterface, ResetInterface
public function send(Envelope $envelope): Envelope public function send(Envelope $envelope): Envelope
{ {
$this->sent[] = $envelope; $this->sent[] = $envelope;
$id = spl_object_hash($envelope);
$this->queue[$id] = $envelope;
return $envelope; return $envelope;
} }
public function reset() public function reset()
{ {
$this->sent = $this->rejected = $this->acknowledged = []; $this->sent = $this->queue = $this->rejected = $this->acknowledged = [];
} }
/** /**
@ -90,4 +101,12 @@ class InMemoryTransport implements TransportInterface, ResetInterface
{ {
return $this->rejected; return $this->rejected;
} }
/**
* @return Envelope[]
*/
public function getSent(): array
{
return $this->sent;
}
} }