335 lines
8.2 KiB
PHP
335 lines
8.2 KiB
PHP
<?php
|
|
|
|
/*
 * This file is part of the Stomp package.
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */
|
|
|
|
namespace Stomp\Protocol;
|
|
|
|
use Stomp\Exception\StompException;
|
|
use Stomp\Transport\Frame;
|
|
|
|
/**
 * Stomp base protocol.
 *
 * Builds the client frames (CONNECT, SUBSCRIBE, ACK, NACK, ...) for the
 * configured STOMP protocol version.
 *
 * @package Stomp
 * @author Hiram Chirino <hiram@hiramchirino.com>
 * @author Dejan Bosanac <dejan@nighttale.net>
 * @author Michael Caplan <mcaplan@labnet.net>
 * @author Jens Radtke <swefl.oss@fin-sn.de>
 */
class Protocol
{
    /**
     * Client id used for durable subscriptions
     *
     * @var string
     */
    private $clientId;

    /**
     * Active STOMP protocol version
     *
     * @var string
     */
    private $version;

    /**
     * Server Version
     *
     * @var string|null
     */
    private $server;

    /**
     * Setup stomp protocol with configuration.
     *
     * @param string $clientId client id, an empty value means "no client id"
     * @param string $version protocol version the frames are built for
     * @param string|null $server server version info
     */
    public function __construct($clientId, $version = Version::VERSION_1_0, $server = null)
    {
        $this->clientId = $clientId;
        $this->server = $server;
        $this->version = $version;
    }

    /**
     * Get the connect frame
     *
     * The credentials are only added when at least one of them is non-empty,
     * "client-id" only when a client id is configured and "accept-version"
     * only when versions are given.
     *
     * @param string $login
     * @param string $passcode
     * @param array $versions protocol versions offered to the server
     * @param string|null $host
     * @param int[] $heartbeat two values, joined with a comma into the "heart-beat" header
     * @return \Stomp\Transport\Frame
     */
    final public function getConnectFrame(
        $login = '',
        $passcode = '',
        array $versions = [],
        $host = null,
        $heartbeat = [0, 0]
    ) {
        $frame = $this->createFrame('CONNECT');
        // CONNECT is always sent in legacy mode, whatever version is configured.
        // NOTE(review): presumably because the version is not negotiated yet - confirm against Frame.
        $frame->legacyMode(true);

        if ($login || $passcode) {
            $frame->addHeaders(['login' => $login, 'passcode' => $passcode]);
        }

        if ($this->hasClientId()) {
            $frame['client-id'] = $this->getClientId();
        }

        if (!empty($versions)) {
            $frame['accept-version'] = implode(',', $versions);
        }

        $frame['host'] = $host;

        $frame['heart-beat'] = $heartbeat[0] . ',' . $heartbeat[1];

        return $frame;
    }

    /**
     * Get subscribe frame.
     *
     * The ack mode is validated against the active protocol version:
     * "auto" and "client" are always accepted, "client-individual" requires
     * at least STOMP 1.1.
     *
     * @see https://stomp.github.io/stomp-specification-1.0.html#frame-ACK
     * @see https://stomp.github.io/stomp-specification-1.1.html#ACK
     * @see https://stomp.github.io/stomp-specification-1.2.html#ACK
     *
     * @param string $destination
     * @param string|null $subscriptionId
     * @param string $ack
     * @param string|null $selector
     * @return \Stomp\Transport\Frame
     * @throws StompException when $ack is not a valid ack mode for the active version
     */
    public function getSubscribeFrame($destination, $subscriptionId = null, $ack = 'auto', $selector = null)
    {
        if ($this->hasVersion(Version::VERSION_1_1)) {
            $validAcks = ['auto', 'client', 'client-individual'];
        } else {
            $validAcks = ['auto', 'client'];
        }

        // Strict comparison: with a loose in_array() a non-string value such as
        // true (or 0 on PHP < 8) would compare equal to a valid mode and end up
        // in the "ack" header.
        if (!in_array($ack, $validAcks, true)) {
            throw new StompException(
                sprintf(
                    '"%s" is not a valid ack value for STOMP %s. A valid value is one of %s',
                    $ack,
                    $this->version,
                    implode(',', $validAcks)
                )
            );
        }

        $frame = $this->createFrame('SUBSCRIBE');

        $frame['destination'] = $destination;
        $frame['ack'] = $ack;
        $frame['id'] = $subscriptionId;
        $frame['selector'] = $selector;
        return $frame;
    }

    /**
     * Get unsubscribe frame.
     *
     * @param string $destination
     * @param string|null $subscriptionId
     * @return \Stomp\Transport\Frame
     */
    public function getUnsubscribeFrame($destination, $subscriptionId = null)
    {
        $frame = $this->createFrame('UNSUBSCRIBE');
        $frame['destination'] = $destination;
        $frame['id'] = $subscriptionId;
        return $frame;
    }

    /**
     * Get transaction begin frame.
     *
     * @param string|null $transactionId
     * @return \Stomp\Transport\Frame
     */
    public function getBeginFrame($transactionId = null)
    {
        $frame = $this->createFrame('BEGIN');
        $frame['transaction'] = $transactionId;
        return $frame;
    }

    /**
     * Get transaction commit frame.
     *
     * @param string|null $transactionId
     * @return \Stomp\Transport\Frame
     */
    public function getCommitFrame($transactionId = null)
    {
        $frame = $this->createFrame('COMMIT');
        $frame['transaction'] = $transactionId;
        return $frame;
    }

    /**
     * Get transaction abort frame.
     *
     * @param string|null $transactionId
     * @return \Stomp\Transport\Frame
     */
    public function getAbortFrame($transactionId = null)
    {
        $frame = $this->createFrame('ABORT');
        $frame['transaction'] = $transactionId;
        return $frame;
    }

    /**
     * Get message acknowledge frame.
     *
     * STOMP 1.2 and newer reference the message through the "id" header,
     * older versions through "message-id" (plus "subscription" from 1.1 on).
     *
     * @param Frame $frame the received message that should be acknowledged
     * @param string|null $transactionId
     * @return Frame
     */
    public function getAckFrame(Frame $frame, $transactionId = null)
    {
        $ack = $this->createFrame('ACK');
        $ack['transaction'] = $transactionId;
        if ($this->hasVersion(Version::VERSION_1_2)) {
            $ack['id'] = $this->getAcknowledgeId($frame);
        } else {
            $ack['message-id'] = $frame->getMessageId();
            if ($this->hasVersion(Version::VERSION_1_1)) {
                $ack['subscription'] = $frame['subscription'];
            }
        }
        return $ack;
    }

    /**
     * Get message not acknowledge frame.
     *
     * Uses the same message reference as getAckFrame(). For STOMP 1.2 and
     * newer the "message-id" header is sent in addition to "id", as this
     * method has always done.
     *
     * @param \Stomp\Transport\Frame $frame the received message that should be rejected
     * @param string|null $transactionId
     * @param bool|null $requeue Requeue header, must be null as it is not supported here
     * @return \Stomp\Transport\Frame
     * @throws StompException when the active version is STOMP 1.0
     * @throws \LogicException when $requeue is given
     */
    public function getNackFrame(Frame $frame, $transactionId = null, $requeue = null)
    {
        if ($requeue !== null) {
            throw new \LogicException('requeue header not supported');
        }
        if ($this->version === Version::VERSION_1_0) {
            throw new StompException('Stomp Version 1.0 has no support for NACK Frames.');
        }
        $nack = $this->createFrame('NACK');
        $nack['transaction'] = $transactionId;
        if ($this->hasVersion(Version::VERSION_1_2)) {
            // The "id" of a NACK has to match the "ack" header of the message,
            // exactly as for an ACK frame.
            $nack['id'] = $this->getAcknowledgeId($frame);
            $nack['message-id'] = $frame->getMessageId();
        } else {
            $nack['message-id'] = $frame->getMessageId();
            if ($this->hasVersion(Version::VERSION_1_1)) {
                $nack['subscription'] = $frame['subscription'];
            }
        }

        return $nack;
    }

    /**
     * Get the disconnect frame.
     *
     * @return \Stomp\Transport\Frame
     */
    public function getDisconnectFrame()
    {
        $frame = $this->createFrame('DISCONNECT');
        if ($this->hasClientId()) {
            $frame['client-id'] = $this->getClientId();
        }
        return $frame;
    }

    /**
     * Client Id is set
     *
     * @return bool false for every empty (falsy) client id
     */
    public function hasClientId()
    {
        return (bool) $this->clientId;
    }

    /**
     * Get the client id
     *
     * @return string
     */
    public function getClientId()
    {
        return $this->clientId;
    }

    /**
     * Stomp Version
     *
     * @return string
     */
    public function getVersion()
    {
        return $this->version;
    }

    /**
     * Server Version Info
     *
     * @return string|null
     */
    public function getServer()
    {
        return $this->server;
    }

    /**
     * Checks if given version is included (equal or lower) in active protocol version.
     *
     * @param string $version
     * @return bool
     */
    public function hasVersion($version)
    {
        return version_compare($this->version, $version, '>=');
    }

    /**
     * Creates a Frame according to the detected STOMP version.
     *
     * Frames for STOMP 1.0 are switched to legacy mode.
     *
     * @param string $command
     * @return Frame
     */
    protected function createFrame($command)
    {
        $frame = new Frame($command);

        if ($this->version === Version::VERSION_1_0) {
            $frame->legacyMode(true);
        }

        return $frame;
    }

    /**
     * Get the value that references a message in STOMP 1.2 (and newer) ACK / NACK frames.
     *
     * That is the "ack" header of the message, with the message id as
     * fallback when the message carries no such header.
     *
     * @param Frame $frame the received message
     * @return string
     */
    private function getAcknowledgeId(Frame $frame)
    {
        if (isset($frame['ack'])) {
            return $frame['ack'];
        }
        return $frame->getMessageId();
    }
}
|