[Messenger] InMemoryTransport handle acknowledged and rejected messages
This commit is contained in:
parent
3522590290
commit
71e7bdff22
@ -7,6 +7,7 @@ CHANGELOG
|
|||||||
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
|
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
|
||||||
pass a `RoutableMessageBus` instance instead.
|
pass a `RoutableMessageBus` instance instead.
|
||||||
* Added support for auto trimming of Redis streams.
|
* Added support for auto trimming of Redis streams.
|
||||||
|
* `InMemoryTransport` handle acknowledged and rejected messages.
|
||||||
|
|
||||||
4.3.0
|
4.3.0
|
||||||
-----
|
-----
|
||||||
|
@ -34,7 +34,20 @@ class InMemoryTransportTest extends TestCase
|
|||||||
{
|
{
|
||||||
$envelope = new Envelope(new \stdClass());
|
$envelope = new Envelope(new \stdClass());
|
||||||
$this->transport->send($envelope);
|
$this->transport->send($envelope);
|
||||||
$this->assertSame([$envelope], $this->transport->get());
|
$this->assertSame([$envelope], $this->transport->getSent());
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testQueue()
|
||||||
|
{
|
||||||
|
$envelope1 = new Envelope(new \stdClass());
|
||||||
|
$this->transport->send($envelope1);
|
||||||
|
$envelope2 = new Envelope(new \stdClass());
|
||||||
|
$this->transport->send($envelope2);
|
||||||
|
$this->assertSame([$envelope1, $envelope2], $this->transport->get());
|
||||||
|
$this->transport->ack($envelope1);
|
||||||
|
$this->assertSame([$envelope2], $this->transport->get());
|
||||||
|
$this->transport->reject($envelope2);
|
||||||
|
$this->assertSame([], $this->transport->get());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testAck()
|
public function testAck()
|
||||||
@ -63,5 +76,6 @@ class InMemoryTransportTest extends TestCase
|
|||||||
$this->assertEmpty($this->transport->get(), 'Should be empty after reset');
|
$this->assertEmpty($this->transport->get(), 'Should be empty after reset');
|
||||||
$this->assertEmpty($this->transport->getAcknowledged(), 'Should be empty after reset');
|
$this->assertEmpty($this->transport->getAcknowledged(), 'Should be empty after reset');
|
||||||
$this->assertEmpty($this->transport->getRejected(), 'Should be empty after reset');
|
$this->assertEmpty($this->transport->getRejected(), 'Should be empty after reset');
|
||||||
|
$this->assertEmpty($this->transport->getSent(), 'Should be empty after reset');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,12 +36,17 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
|||||||
*/
|
*/
|
||||||
private $rejected = [];
|
private $rejected = [];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @var Envelope[]
|
||||||
|
*/
|
||||||
|
private $queue = [];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritdoc}
|
* {@inheritdoc}
|
||||||
*/
|
*/
|
||||||
public function get(): iterable
|
public function get(): iterable
|
||||||
{
|
{
|
||||||
return $this->sent;
|
return array_values($this->queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -50,6 +55,8 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
|||||||
public function ack(Envelope $envelope): void
|
public function ack(Envelope $envelope): void
|
||||||
{
|
{
|
||||||
$this->acknowledged[] = $envelope;
|
$this->acknowledged[] = $envelope;
|
||||||
|
$id = spl_object_hash($envelope);
|
||||||
|
unset($this->queue[$id]);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -58,6 +65,8 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
|||||||
public function reject(Envelope $envelope): void
|
public function reject(Envelope $envelope): void
|
||||||
{
|
{
|
||||||
$this->rejected[] = $envelope;
|
$this->rejected[] = $envelope;
|
||||||
|
$id = spl_object_hash($envelope);
|
||||||
|
unset($this->queue[$id]);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -66,13 +75,15 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
|||||||
public function send(Envelope $envelope): Envelope
|
public function send(Envelope $envelope): Envelope
|
||||||
{
|
{
|
||||||
$this->sent[] = $envelope;
|
$this->sent[] = $envelope;
|
||||||
|
$id = spl_object_hash($envelope);
|
||||||
|
$this->queue[$id] = $envelope;
|
||||||
|
|
||||||
return $envelope;
|
return $envelope;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function reset()
|
public function reset()
|
||||||
{
|
{
|
||||||
$this->sent = $this->rejected = $this->acknowledged = [];
|
$this->sent = $this->queue = $this->rejected = $this->acknowledged = [];
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -90,4 +101,12 @@ class InMemoryTransport implements TransportInterface, ResetInterface
|
|||||||
{
|
{
|
||||||
return $this->rejected;
|
return $this->rejected;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Envelope[]
|
||||||
|
*/
|
||||||
|
public function getSent(): array
|
||||||
|
{
|
||||||
|
return $this->sent;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user