491 lines
12 KiB
PHP
491 lines
12 KiB
PHP
<?php
|
|
|
|
/*
|
|
* This file is part of the Stomp package.
|
|
*
|
|
* For the full copyright and license information, please view the LICENSE
|
|
* file that was distributed with this source code.
|
|
*/
|
|
|
|
namespace Stomp;
|
|
|
|
use Stomp\Exception\ConnectionException;
|
|
use Stomp\Exception\MissingReceiptException;
|
|
use Stomp\Exception\StompException;
|
|
use Stomp\Exception\UnexpectedResponseException;
|
|
use Stomp\Network\Connection;
|
|
use Stomp\Protocol\Protocol;
|
|
use Stomp\Protocol\Version;
|
|
use Stomp\Transport\Frame;
|
|
|
|
/**
|
|
* Stomp Client
|
|
*
|
|
* This is the minimal implementation of a Stomp Client, it allows to send and receive Frames using the Stomp Protocol.
|
|
*
|
|
* @package Stomp
|
|
* @author Hiram Chirino <hiram@hiramchirino.com>
|
|
* @author Dejan Bosanac <dejan@nighttale.net>
|
|
* @author Michael Caplan <mcaplan@labnet.net>
|
|
* @author Jens Radtke <swefl.oss@fin-sn.de>
|
|
*/
|
|
class Client
|
|
{
|
|
/**
|
|
* Perform request synchronously
|
|
*
|
|
* @var boolean
|
|
*/
|
|
private $sync = true;
|
|
|
|
|
|
/**
|
|
* Client id used for durable subscriptions
|
|
*
|
|
* @var string
|
|
*/
|
|
private $clientId;
|
|
|
|
/**
|
|
* Connection session id
|
|
*
|
|
* @var string|null
|
|
*/
|
|
private $sessionId;
|
|
|
|
/**
|
|
* Frames that have been read but not processed yet.
|
|
*
|
|
* @var Frame[]
|
|
*/
|
|
private $unprocessedFrames = [];
|
|
|
|
/**
|
|
* @var Connection|null
|
|
*/
|
|
private $connection;
|
|
|
|
/**
|
|
*
|
|
* @var Protocol|null
|
|
*/
|
|
private $protocol;
|
|
|
|
/**
|
|
* Seconds to wait for a receipt.
|
|
*
|
|
* @var float
|
|
*/
|
|
private $receiptWait = 2;
|
|
|
|
/**
|
|
*
|
|
* @var string
|
|
*/
|
|
private $login;
|
|
|
|
/**
|
|
*
|
|
* @var string
|
|
*/
|
|
private $passcode;
|
|
|
|
|
|
/**
|
|
*
|
|
* @var array
|
|
*/
|
|
private $versions = [Version::VERSION_1_0, Version::VERSION_1_1, Version::VERSION_1_2];
|
|
|
|
/**
|
|
*
|
|
* @var string
|
|
*/
|
|
private $host;
|
|
|
|
/**
|
|
*
|
|
* @var int[]
|
|
*/
|
|
private $heartbeat = [0, 0];
|
|
|
|
/**
|
|
* @var bool
|
|
*/
|
|
private $isConnecting = false;
|
|
|
|
/**
|
|
* Constructor
|
|
*
|
|
* @param string|Connection $broker Broker URL or a connection
|
|
* @see Connection::__construct()
|
|
*/
|
|
public function __construct($broker)
|
|
{
|
|
$this->connection = $broker instanceof Connection ? $broker : new Connection($broker);
|
|
}
|
|
|
|
/**
|
|
* Configure versions to support.
|
|
*
|
|
* @param array $versions defaults to all client supported versions
|
|
*/
|
|
public function setVersions(array $versions)
|
|
{
|
|
$this->versions = $versions;
|
|
}
|
|
|
|
/**
|
|
* Configure the login to use.
|
|
*
|
|
* @param string $login
|
|
* @param string $passcode
|
|
*/
|
|
public function setLogin($login, $passcode)
|
|
{
|
|
$this->login = $login;
|
|
$this->passcode = $passcode;
|
|
}
|
|
|
|
/**
|
|
* Sets an fixed vhostname, which will be passed on connect as header['host'].
|
|
*
|
|
* (null = Default value is the hostname determined by connection.)
|
|
*
|
|
* @param string $host
|
|
*/
|
|
public function setVhostname($host = null)
|
|
{
|
|
$this->host = $host;
|
|
}
|
|
|
|
/**
|
|
* Set the desired heartbeat for the connection.
|
|
*
|
|
* A heartbeat is a specific message that will be send / received when no other data is send / received
|
|
* within an interval - to indicate that the connection is still stable. If client and server agree on a beat and
|
|
* the interval passes without any data activity / beats the connection will be considered as broken and closed.
|
|
*
|
|
* If you want to make sure that the server is still available, you should use the ServerAliveObserver
|
|
* in combination with an requested server heartbeat interval.
|
|
*
|
|
* If you define a heartbeat for client side, you must assure that
|
|
* your application will send data within the interval.
|
|
* You can add \Stomp\Network\Observer\HeartbeatEmitter to your connection in order to send beats automatically.
|
|
*
|
|
* If you don't use HeartbeatEmitter you must either send messages within the interval
|
|
* or make calls to Connection::sendAlive()
|
|
*
|
|
* @param int $send
|
|
* Number of milliseconds between expected sending of heartbeats. 0 means
|
|
* no heartbeats sent.
|
|
* @param int $receive
|
|
* Number of milliseconds between expected receipt of heartbeats. 0 means
|
|
* no heartbeats expected. (not yet supported by this client)
|
|
* @see \Stomp\Network\Observer\ServerAliveObserver
|
|
* @see \Stomp\Network\Observer\HeartbeatEmitter
|
|
* @see \Stomp\Network\Connection::sendAlive()
|
|
*/
|
|
public function setHeartbeat($send = 0, $receive = 0)
|
|
{
|
|
$this->heartbeat = [$send, $receive];
|
|
}
|
|
|
|
/**
|
|
* Connect to server
|
|
*
|
|
* @return boolean
|
|
* @throws StompException
|
|
* @see setVhostname
|
|
*/
|
|
public function connect()
|
|
{
|
|
if ($this->isConnected()) {
|
|
return true;
|
|
}
|
|
$this->isConnecting = true;
|
|
$this->connection->connect();
|
|
$this->connection->getParser()->legacyMode(true);
|
|
$this->protocol = new Protocol($this->clientId);
|
|
|
|
$this->host = $this->host ?: $this->connection->getHost();
|
|
|
|
$connectFrame = $this->protocol->getConnectFrame(
|
|
$this->login,
|
|
$this->passcode,
|
|
$this->versions,
|
|
$this->host,
|
|
$this->heartbeat
|
|
);
|
|
$this->sendFrame($connectFrame, false);
|
|
|
|
if ($frame = $this->getConnectedFrame()) {
|
|
$version = new Version($frame);
|
|
|
|
if ($version->hasVersion(Version::VERSION_1_1)) {
|
|
$this->connection->getParser()->legacyMode(false);
|
|
}
|
|
|
|
$this->sessionId = $frame['session'];
|
|
$this->protocol = $version->getProtocol($this->clientId);
|
|
$this->isConnecting = false;
|
|
return true;
|
|
}
|
|
throw new ConnectionException('Connection not acknowledged');
|
|
}
|
|
|
|
/**
|
|
* Returns the next available frame from the connection, respecting the connect timeout.
|
|
*
|
|
* @return null|Frame
|
|
* @throws ConnectionException
|
|
* @throws Exception\ErrorFrameException
|
|
*/
|
|
private function getConnectedFrame()
|
|
{
|
|
$deadline = microtime(true) + $this->getConnection()->getConnectTimeout();
|
|
do {
|
|
if ($frame = $this->connection->readFrame()) {
|
|
return $frame;
|
|
}
|
|
} while (microtime(true) <= $deadline);
|
|
|
|
return null;
|
|
}
|
|
|
|
/**
|
|
* Send a message to a destination in the messaging system
|
|
*
|
|
* @param string $destination Destination queue
|
|
* @param string|Frame $msg Message
|
|
* @param array $header
|
|
* @param boolean $sync Perform request synchronously
|
|
* @return boolean
|
|
*/
|
|
public function send($destination, $msg, array $header = [], $sync = null)
|
|
{
|
|
if (!$msg instanceof Frame) {
|
|
return $this->send($destination, new Frame('SEND', $header, $msg), [], $sync);
|
|
}
|
|
|
|
$msg->addHeaders($header);
|
|
$msg['destination'] = $destination;
|
|
return $this->sendFrame($msg, $sync);
|
|
}
|
|
|
|
/**
|
|
* Send a frame.
|
|
*
|
|
* @param Frame $frame
|
|
* @param boolean $sync
|
|
* @return boolean
|
|
*/
|
|
public function sendFrame(Frame $frame, $sync = null)
|
|
{
|
|
if (!$this->isConnecting && !$this->isConnected()) {
|
|
$this->connect();
|
|
}
|
|
// determine if client was configured to write sync or not
|
|
$writeSync = $sync !== null ? $sync : $this->sync;
|
|
if ($writeSync) {
|
|
return $this->sendFrameExpectingReceipt($frame);
|
|
} else {
|
|
return $this->connection->writeFrame($frame);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* Write frame to server and expect an matching receipt frame
|
|
*
|
|
* @param Frame $stompFrame
|
|
* @return bool
|
|
*/
|
|
protected function sendFrameExpectingReceipt(Frame $stompFrame)
|
|
{
|
|
$receipt = md5(microtime());
|
|
$stompFrame['receipt'] = $receipt;
|
|
$this->connection->writeFrame($stompFrame);
|
|
return $this->waitForReceipt($receipt);
|
|
}
|
|
|
|
|
|
/**
|
|
* Wait for an receipt
|
|
*
|
|
* @param string $receipt
|
|
* @return boolean
|
|
* @throws UnexpectedResponseException If response has an invalid receipt.
|
|
* @throws MissingReceiptException If no receipt is received.
|
|
*/
|
|
protected function waitForReceipt($receipt)
|
|
{
|
|
$stopAfter = $this->calculateReceiptWaitEnd();
|
|
while (true) {
|
|
if ($frame = $this->connection->readFrame()) {
|
|
if ($frame->getCommand() == 'RECEIPT') {
|
|
if ($frame['receipt-id'] == $receipt) {
|
|
return true;
|
|
} else {
|
|
throw new UnexpectedResponseException($frame, sprintf('Expected receipt id %s', $receipt));
|
|
}
|
|
} else {
|
|
$this->unprocessedFrames[] = $frame;
|
|
}
|
|
}
|
|
if (microtime(true) >= $stopAfter) {
|
|
break;
|
|
}
|
|
}
|
|
throw new MissingReceiptException($receipt);
|
|
}
|
|
|
|
/**
|
|
* Returns the timestamp with micro time to stop wait for a receipt.
|
|
*
|
|
* @return float
|
|
*/
|
|
protected function calculateReceiptWaitEnd()
|
|
{
|
|
return microtime(true) + $this->receiptWait;
|
|
}
|
|
|
|
|
|
/**
|
|
* Read response frame from server
|
|
*
|
|
* @return Frame|false when no frame to read
|
|
*/
|
|
public function readFrame()
|
|
{
|
|
return array_shift($this->unprocessedFrames) ?: $this->connection->readFrame();
|
|
}
|
|
|
|
/**
|
|
* Graceful disconnect from the server
|
|
* @param bool $sync
|
|
* @return void
|
|
*/
|
|
public function disconnect($sync = false)
|
|
{
|
|
try {
|
|
if ($this->connection && $this->connection->isConnected()) {
|
|
if ($this->protocol) {
|
|
$this->sendFrame($this->protocol->getDisconnectFrame(), $sync);
|
|
}
|
|
}
|
|
} catch (StompException $ex) {
|
|
// nothing!
|
|
}
|
|
if ($this->connection) {
|
|
$this->connection->disconnect();
|
|
}
|
|
|
|
$this->sessionId = null;
|
|
$this->unprocessedFrames = [];
|
|
$this->protocol = null;
|
|
$this->isConnecting = false;
|
|
}
|
|
|
|
/**
|
|
* Current stomp session ID
|
|
*
|
|
* @return string|null
|
|
*/
|
|
public function getSessionId()
|
|
{
|
|
return $this->sessionId;
|
|
}
|
|
|
|
/**
|
|
* Graceful object destruction
|
|
*
|
|
*/
|
|
public function __destruct()
|
|
{
|
|
$this->disconnect();
|
|
}
|
|
|
|
/**
|
|
* Check if client session has ben established
|
|
*
|
|
* @return boolean
|
|
*/
|
|
public function isConnected()
|
|
{
|
|
return !empty($this->sessionId) && $this->connection->isConnected();
|
|
}
|
|
|
|
/**
|
|
* Get the used connection.
|
|
*
|
|
* @return Connection
|
|
*/
|
|
public function getConnection()
|
|
{
|
|
return $this->connection;
|
|
}
|
|
|
|
/**
|
|
* Get the currently used protocol.
|
|
*
|
|
* @return null|\Stomp\Protocol\Protocol
|
|
*/
|
|
public function getProtocol()
|
|
{
|
|
if (!$this->isConnecting && !$this->isConnected()) {
|
|
$this->connect();
|
|
}
|
|
return $this->protocol;
|
|
}
|
|
|
|
/**
|
|
* @return string
|
|
*/
|
|
public function getClientId()
|
|
{
|
|
return $this->clientId;
|
|
}
|
|
|
|
/**
|
|
* @param string $clientId
|
|
* @return Client
|
|
*/
|
|
public function setClientId($clientId)
|
|
{
|
|
$this->clientId = $clientId;
|
|
return $this;
|
|
}
|
|
|
|
|
|
/**
|
|
* Set seconds to wait for a receipt.
|
|
*
|
|
* @param float $seconds
|
|
*/
|
|
public function setReceiptWait($seconds)
|
|
{
|
|
$this->receiptWait = $seconds;
|
|
}
|
|
|
|
/**
|
|
* Check if client runs in synchronized mode, which is the default operation mode.
|
|
*
|
|
* @return boolean
|
|
*/
|
|
public function isSync()
|
|
{
|
|
return $this->sync;
|
|
}
|
|
|
|
/**
|
|
* Toggle synchronized mode.
|
|
*
|
|
* @param boolean $sync
|
|
*/
|
|
public function setSync($sync)
|
|
{
|
|
$this->sync = $sync;
|
|
}
|
|
}
|