[Messenger] InMemoryTransport handle acknowledged and rejected messages

This commit is contained in:
tien.xuan.vo 2019-07-27 20:40:43 +07:00
parent 3522590290
commit 71e7bdff22
3 changed files with 37 additions and 3 deletions

View File

@ -7,6 +7,7 @@ CHANGELOG
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
pass a `RoutableMessageBus` instance instead.
* Added support for auto trimming of Redis streams.
* `InMemoryTransport` handle acknowledged and rejected messages.
4.3.0
-----

View File

@ -34,7 +34,20 @@ class InMemoryTransportTest extends TestCase
{
$envelope = new Envelope(new \stdClass());
$this->transport->send($envelope);
$this->assertSame([$envelope], $this->transport->get());
$this->assertSame([$envelope], $this->transport->getSent());
}
public function testQueue()
{
$envelope1 = new Envelope(new \stdClass());
$this->transport->send($envelope1);
$envelope2 = new Envelope(new \stdClass());
$this->transport->send($envelope2);
$this->assertSame([$envelope1, $envelope2], $this->transport->get());
$this->transport->ack($envelope1);
$this->assertSame([$envelope2], $this->transport->get());
$this->transport->reject($envelope2);
$this->assertSame([], $this->transport->get());
}
public function testAck()
@ -63,5 +76,6 @@ class InMemoryTransportTest extends TestCase
$this->assertEmpty($this->transport->get(), 'Should be empty after reset');
$this->assertEmpty($this->transport->getAcknowledged(), 'Should be empty after reset');
$this->assertEmpty($this->transport->getRejected(), 'Should be empty after reset');
$this->assertEmpty($this->transport->getSent(), 'Should be empty after reset');
}
}

View File

@ -36,12 +36,17 @@ class InMemoryTransport implements TransportInterface, ResetInterface
*/
private $rejected = [];
/**
* @var Envelope[]
*/
private $queue = [];
/**
* {@inheritdoc}
*/
public function get(): iterable
{
return $this->sent;
return array_values($this->queue);
}
/**
@ -50,6 +55,8 @@ class InMemoryTransport implements TransportInterface, ResetInterface
public function ack(Envelope $envelope): void
{
$this->acknowledged[] = $envelope;
$id = spl_object_hash($envelope);
unset($this->queue[$id]);
}
/**
@ -58,6 +65,8 @@ class InMemoryTransport implements TransportInterface, ResetInterface
public function reject(Envelope $envelope): void
{
$this->rejected[] = $envelope;
$id = spl_object_hash($envelope);
unset($this->queue[$id]);
}
/**
@ -66,13 +75,15 @@ class InMemoryTransport implements TransportInterface, ResetInterface
public function send(Envelope $envelope): Envelope
{
$this->sent[] = $envelope;
$id = spl_object_hash($envelope);
$this->queue[$id] = $envelope;
return $envelope;
}
public function reset()
{
$this->sent = $this->rejected = $this->acknowledged = [];
$this->sent = $this->queue = $this->rejected = $this->acknowledged = [];
}
/**
@ -90,4 +101,12 @@ class InMemoryTransport implements TransportInterface, ResetInterface
{
return $this->rejected;
}
/**
* @return Envelope[]
*/
public function getSent(): array
{
return $this->sent;
}
}