gnu-social/vendor/enqueue/stomp/StompConsumer.php

211 lines
5.2 KiB
PHP

<?php
declare(strict_types=1);
namespace Enqueue\Stomp;
use Interop\Queue\Consumer;
use Interop\Queue\Exception\InvalidMessageException;
use Interop\Queue\Message;
use Interop\Queue\Queue;
use Stomp\Client;
use Stomp\Transport\Frame;
class StompConsumer implements Consumer
{
const ACK_AUTO = 'auto';
const ACK_CLIENT = 'client';
const ACK_CLIENT_INDIVIDUAL = 'client-individual';
/**
* @var StompDestination
*/
private $queue;
/**
* @var Client
*/
private $stomp;
/**
* @var bool
*/
private $isSubscribed;
/**
* @var string
*/
private $ackMode;
/**
* @var int
*/
private $prefetchCount;
/**
* @var string
*/
private $subscriptionId;
public function __construct(BufferedStompClient $stomp, StompDestination $queue)
{
$this->stomp = $stomp;
$this->queue = $queue;
$this->isSubscribed = false;
$this->ackMode = self::ACK_CLIENT_INDIVIDUAL;
$this->prefetchCount = 1;
$this->subscriptionId = StompDestination::TYPE_TEMP_QUEUE == $queue->getType() ?
$queue->getQueueName() :
uniqid('', true)
;
}
public function setAckMode(string $mode): void
{
if (false === in_array($mode, [self::ACK_AUTO, self::ACK_CLIENT, self::ACK_CLIENT_INDIVIDUAL], true)) {
throw new \LogicException(sprintf('Ack mode is not valid: "%s"', $mode));
}
$this->ackMode = $mode;
}
public function getAckMode(): string
{
return $this->ackMode;
}
public function getPrefetchCount(): int
{
return $this->prefetchCount;
}
public function setPrefetchCount(int $prefetchCount): void
{
$this->prefetchCount = $prefetchCount;
}
/**
* @return StompDestination
*/
public function getQueue(): Queue
{
return $this->queue;
}
public function receive(int $timeout = 0): ?Message
{
$this->subscribe();
if (0 === $timeout) {
while (true) {
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, 100)) {
return $this->convertMessage($message);
}
}
} else {
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, $timeout)) {
return $this->convertMessage($message);
}
}
return null;
}
public function receiveNoWait(): ?Message
{
$this->subscribe();
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, 0)) {
return $this->convertMessage($message);
}
return null;
}
/**
* @param StompMessage $message
*/
public function acknowledge(Message $message): void
{
InvalidMessageException::assertMessageInstanceOf($message, StompMessage::class);
$this->stomp->sendFrame(
$this->stomp->getProtocol()->getAckFrame($message->getFrame())
);
}
/**
* @param StompMessage $message
*/
public function reject(Message $message, bool $requeue = false): void
{
InvalidMessageException::assertMessageInstanceOf($message, StompMessage::class);
$nackFrame = $this->stomp->getProtocol()->getNackFrame($message->getFrame());
// rabbitmq STOMP protocol extension
$nackFrame->addHeaders([
'requeue' => $requeue ? 'true' : 'false',
]);
$this->stomp->sendFrame($nackFrame);
}
private function subscribe(): void
{
if (StompDestination::TYPE_TEMP_QUEUE == $this->queue->getType()) {
$this->isSubscribed = true;
return;
}
if (false == $this->isSubscribed) {
$this->isSubscribed = true;
$frame = $this->stomp->getProtocol()->getSubscribeFrame(
$this->queue->getQueueName(),
$this->subscriptionId,
$this->ackMode
);
// rabbitmq STOMP protocol extension
$headers = $this->queue->getHeaders();
$headers['prefetch-count'] = $this->prefetchCount;
$headers = StompHeadersEncoder::encode($headers);
foreach ($headers as $key => $value) {
$frame[$key] = $value;
}
$this->stomp->sendFrame($frame);
}
}
private function convertMessage(Frame $frame): StompMessage
{
if ('MESSAGE' !== $frame->getCommand()) {
throw new \LogicException(sprintf('Frame is not MESSAGE frame but: "%s"', $frame->getCommand()));
}
list($headers, $properties) = StompHeadersEncoder::decode($frame->getHeaders());
$redelivered = isset($headers['redelivered']) && 'true' === $headers['redelivered'];
unset(
$headers['redelivered'],
$headers['destination'],
$headers['message-id'],
$headers['ack'],
$headers['receipt'],
$headers['subscription'],
$headers['content-length']
);
$message = new StompMessage((string) $frame->getBody(), $properties, $headers);
$message->setRedelivered($redelivered);
$message->setFrame($frame);
return $message;
}
}