[Messenger] Add option to prevent Redis from deleting messages on rejection
This commit is contained in:
parent
174d91c23b
commit
135c6504f1
@ -1,6 +1,12 @@
|
|||||||
CHANGELOG
|
CHANGELOG
|
||||||
=========
|
=========
|
||||||
|
|
||||||
|
5.2.0
|
||||||
|
-----
|
||||||
|
|
||||||
|
* Added a `delete_after_reject` option to the DSN to allow control over message
|
||||||
|
deletion, similar to `delete_after_ack`.
|
||||||
|
|
||||||
5.1.0
|
5.1.0
|
||||||
-----
|
-----
|
||||||
|
|
||||||
|
@ -346,6 +346,21 @@ class ConnectionTest extends TestCase
|
|||||||
$connection->ack('1');
|
$connection->ack('1');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testDeleteAfterReject()
|
||||||
|
{
|
||||||
|
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
|
||||||
|
|
||||||
|
$redis->expects($this->exactly(1))->method('xack')
|
||||||
|
->with('queue', 'symfony', ['1'])
|
||||||
|
->willReturn(1);
|
||||||
|
$redis->expects($this->exactly(1))->method('xdel')
|
||||||
|
->with('queue', ['1'])
|
||||||
|
->willReturn(1);
|
||||||
|
|
||||||
|
$connection = Connection::fromDsn('redis://localhost/queue?delete_after_reject=true', [], $redis); // 1 = always
|
||||||
|
$connection->reject('1');
|
||||||
|
}
|
||||||
|
|
||||||
public function testLastErrorGetsCleared()
|
public function testLastErrorGetsCleared()
|
||||||
{
|
{
|
||||||
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
|
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
|
||||||
|
@ -33,6 +33,7 @@ class Connection
|
|||||||
'consumer' => 'consumer',
|
'consumer' => 'consumer',
|
||||||
'auto_setup' => true,
|
'auto_setup' => true,
|
||||||
'delete_after_ack' => false,
|
'delete_after_ack' => false,
|
||||||
|
'delete_after_reject' => true,
|
||||||
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
|
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
|
||||||
'dbindex' => 0,
|
'dbindex' => 0,
|
||||||
'tls' => false,
|
'tls' => false,
|
||||||
@ -51,6 +52,7 @@ class Connection
|
|||||||
private $nextClaim = 0;
|
private $nextClaim = 0;
|
||||||
private $claimInterval;
|
private $claimInterval;
|
||||||
private $deleteAfterAck;
|
private $deleteAfterAck;
|
||||||
|
private $deleteAfterReject;
|
||||||
private $couldHavePendingMessages = true;
|
private $couldHavePendingMessages = true;
|
||||||
|
|
||||||
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
|
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
|
||||||
@ -89,6 +91,7 @@ class Connection
|
|||||||
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
|
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
|
||||||
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
|
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
|
||||||
$this->deleteAfterAck = $configuration['delete_after_ack'] ?? self::DEFAULT_OPTIONS['delete_after_ack'];
|
$this->deleteAfterAck = $configuration['delete_after_ack'] ?? self::DEFAULT_OPTIONS['delete_after_ack'];
|
||||||
|
$this->deleteAfterReject = $configuration['delete_after_reject'] ?? self::DEFAULT_OPTIONS['delete_after_reject'];
|
||||||
$this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000;
|
$this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000;
|
||||||
$this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
|
$this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
|
||||||
}
|
}
|
||||||
@ -128,6 +131,12 @@ class Connection
|
|||||||
unset($redisOptions['delete_after_ack']);
|
unset($redisOptions['delete_after_ack']);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$deleteAfterReject = null;
|
||||||
|
if (\array_key_exists('delete_after_reject', $redisOptions)) {
|
||||||
|
$deleteAfterReject = filter_var($redisOptions['delete_after_reject'], \FILTER_VALIDATE_BOOLEAN);
|
||||||
|
unset($redisOptions['delete_after_reject']);
|
||||||
|
}
|
||||||
|
|
||||||
$dbIndex = null;
|
$dbIndex = null;
|
||||||
if (\array_key_exists('dbindex', $redisOptions)) {
|
if (\array_key_exists('dbindex', $redisOptions)) {
|
||||||
$dbIndex = filter_var($redisOptions['dbindex'], \FILTER_VALIDATE_INT);
|
$dbIndex = filter_var($redisOptions['dbindex'], \FILTER_VALIDATE_INT);
|
||||||
@ -159,6 +168,7 @@ class Connection
|
|||||||
'auto_setup' => $autoSetup,
|
'auto_setup' => $autoSetup,
|
||||||
'stream_max_entries' => $maxEntries,
|
'stream_max_entries' => $maxEntries,
|
||||||
'delete_after_ack' => $deleteAfterAck,
|
'delete_after_ack' => $deleteAfterAck,
|
||||||
|
'delete_after_reject' => $deleteAfterReject,
|
||||||
'dbindex' => $dbIndex,
|
'dbindex' => $dbIndex,
|
||||||
'redeliver_timeout' => $redeliverTimeout,
|
'redeliver_timeout' => $redeliverTimeout,
|
||||||
'claim_interval' => $claimInterval,
|
'claim_interval' => $claimInterval,
|
||||||
@ -348,7 +358,9 @@ class Connection
|
|||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
$deleted = $this->connection->xack($this->stream, $this->group, [$id]);
|
$deleted = $this->connection->xack($this->stream, $this->group, [$id]);
|
||||||
$deleted = $this->connection->xdel($this->stream, [$id]) && $deleted;
|
if ($this->deleteAfterReject) {
|
||||||
|
$deleted = $this->connection->xdel($this->stream, [$id]) && $deleted;
|
||||||
|
}
|
||||||
} catch (\RedisException $e) {
|
} catch (\RedisException $e) {
|
||||||
throw new TransportException($e->getMessage(), 0, $e);
|
throw new TransportException($e->getMessage(), 0, $e);
|
||||||
}
|
}
|
||||||
@ -426,7 +438,7 @@ class Connection
|
|||||||
$this->connection->clearLastError();
|
$this->connection->clearLastError();
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($this->deleteAfterAck) {
|
if ($this->deleteAfterAck || $this->deleteAfterReject) {
|
||||||
$groups = $this->connection->xinfo('GROUPS', $this->stream);
|
$groups = $this->connection->xinfo('GROUPS', $this->stream);
|
||||||
if (
|
if (
|
||||||
// support for Redis extension version 5+
|
// support for Redis extension version 5+
|
||||||
@ -434,7 +446,7 @@ class Connection
|
|||||||
// support for Redis extension version 4.x
|
// support for Redis extension version 4.x
|
||||||
|| (\is_string($groups) && substr_count($groups, '"name"'))
|
|| (\is_string($groups) && substr_count($groups, '"name"'))
|
||||||
) {
|
) {
|
||||||
throw new LogicException(sprintf('More than one group exists for stream "%s", delete_after_ack can not be enabled as it risks deleting messages before all groups could consume them.', $this->stream));
|
throw new LogicException(sprintf('More than one group exists for stream "%s", delete_after_ack and delete_after_reject can not be enabled as it risks deleting messages before all groups could consume them.', $this->stream));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user