[Messenger] InMemoryTransport handle acknowledged and rejected messages
This commit is contained in:
parent
3522590290
commit
71e7bdff22
@ -7,6 +7,7 @@ CHANGELOG
|
||||
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
|
||||
pass a `RoutableMessageBus` instance instead.
|
||||
* Added support for auto trimming of Redis streams.
|
||||
* `InMemoryTransport` handle acknowledged and rejected messages.
|
||||
|
||||
4.3.0
|
||||
-----
|
||||
|
@ -34,7 +34,20 @@ class InMemoryTransportTest extends TestCase
|
||||
{
|
||||
$envelope = new Envelope(new \stdClass());
|
||||
$this->transport->send($envelope);
|
||||
$this->assertSame([$envelope], $this->transport->get());
|
||||
$this->assertSame([$envelope], $this->transport->getSent());
|
||||
}
|
||||
|
||||
public function testQueue()
|
||||
{
|
||||
$envelope1 = new Envelope(new \stdClass());
|
||||
$this->transport->send($envelope1);
|
||||
$envelope2 = new Envelope(new \stdClass());
|
||||
$this->transport->send($envelope2);
|
||||
$this->assertSame([$envelope1, $envelope2], $this->transport->get());
|
||||
$this->transport->ack($envelope1);
|
||||
$this->assertSame([$envelope2], $this->transport->get());
|
||||
$this->transport->reject($envelope2);
|
||||
$this->assertSame([], $this->transport->get());
|
||||
}
|
||||
|
||||
public function testAck()
|
||||
@ -63,5 +76,6 @@ class InMemoryTransportTest extends TestCase
|
||||
$this->assertEmpty($this->transport->get(), 'Should be empty after reset');
|
||||
$this->assertEmpty($this->transport->getAcknowledged(), 'Should be empty after reset');
|
||||
$this->assertEmpty($this->transport->getRejected(), 'Should be empty after reset');
|
||||
$this->assertEmpty($this->transport->getSent(), 'Should be empty after reset');
|
||||
}
|
||||
}
|
||||
|
@ -36,12 +36,17 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
||||
*/
|
||||
private $rejected = [];
|
||||
|
||||
/**
|
||||
* @var Envelope[]
|
||||
*/
|
||||
private $queue = [];
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function get(): iterable
|
||||
{
|
||||
return $this->sent;
|
||||
return array_values($this->queue);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -50,6 +55,8 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
||||
public function ack(Envelope $envelope): void
|
||||
{
|
||||
$this->acknowledged[] = $envelope;
|
||||
$id = spl_object_hash($envelope);
|
||||
unset($this->queue[$id]);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -58,6 +65,8 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
||||
public function reject(Envelope $envelope): void
|
||||
{
|
||||
$this->rejected[] = $envelope;
|
||||
$id = spl_object_hash($envelope);
|
||||
unset($this->queue[$id]);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -66,13 +75,15 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
||||
public function send(Envelope $envelope): Envelope
|
||||
{
|
||||
$this->sent[] = $envelope;
|
||||
$id = spl_object_hash($envelope);
|
||||
$this->queue[$id] = $envelope;
|
||||
|
||||
return $envelope;
|
||||
}
|
||||
|
||||
public function reset()
|
||||
{
|
||||
$this->sent = $this->rejected = $this->acknowledged = [];
|
||||
$this->sent = $this->queue = $this->rejected = $this->acknowledged = [];
|
||||
}
|
||||
|
||||
/**
|
||||
@ -90,4 +101,12 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
||||
{
|
||||
return $this->rejected;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Envelope[]
|
||||
*/
|
||||
public function getSent(): array
|
||||
{
|
||||
return $this->sent;
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user