* @author Dejan Bosanac * @author Michael Caplan * @author Jens Radtke */ class Client { /** * Perform request synchronously * * @var boolean */ private $sync = true; /** * Client id used for durable subscriptions * * @var string */ private $clientId; /** * Connection session id * * @var string|null */ private $sessionId; /** * Frames that have been read but not processed yet. * * @var Frame[] */ private $unprocessedFrames = []; /** * @var Connection|null */ private $connection; /** * * @var Protocol|null */ private $protocol; /** * Seconds to wait for a receipt. * * @var float */ private $receiptWait = 2; /** * * @var string */ private $login; /** * * @var string */ private $passcode; /** * * @var array */ private $versions = [Version::VERSION_1_0, Version::VERSION_1_1, Version::VERSION_1_2]; /** * * @var string */ private $host; /** * * @var int[] */ private $heartbeat = [0, 0]; /** * @var bool */ private $isConnecting = false; /** * Constructor * * @param string|Connection $broker Broker URL or a connection * @see Connection::__construct() */ public function __construct($broker) { $this->connection = $broker instanceof Connection ? $broker : new Connection($broker); } /** * Configure versions to support. * * @param array $versions defaults to all client supported versions */ public function setVersions(array $versions) { $this->versions = $versions; } /** * Configure the login to use. * * @param string $login * @param string $passcode */ public function setLogin($login, $passcode) { $this->login = $login; $this->passcode = $passcode; } /** * Sets an fixed vhostname, which will be passed on connect as header['host']. * * (null = Default value is the hostname determined by connection.) * * @param string $host */ public function setVhostname($host = null) { $this->host = $host; } /** * Set the desired heartbeat for the connection. * * A heartbeat is a specific message that will be send / received when no other data is send / received * within an interval - to indicate that the connection is still stable. If client and server agree on a beat and * the interval passes without any data activity / beats the connection will be considered as broken and closed. * * If you want to make sure that the server is still available, you should use the ServerAliveObserver * in combination with an requested server heartbeat interval. * * If you define a heartbeat for client side, you must assure that * your application will send data within the interval. * You can add \Stomp\Network\Observer\HeartbeatEmitter to your connection in order to send beats automatically. * * If you don't use HeartbeatEmitter you must either send messages within the interval * or make calls to Connection::sendAlive() * * @param int $send * Number of milliseconds between expected sending of heartbeats. 0 means * no heartbeats sent. * @param int $receive * Number of milliseconds between expected receipt of heartbeats. 0 means * no heartbeats expected. (not yet supported by this client) * @see \Stomp\Network\Observer\ServerAliveObserver * @see \Stomp\Network\Observer\HeartbeatEmitter * @see \Stomp\Network\Connection::sendAlive() */ public function setHeartbeat($send = 0, $receive = 0) { $this->heartbeat = [$send, $receive]; } /** * Connect to server * * @return boolean * @throws StompException * @see setVhostname */ public function connect() { if ($this->isConnected()) { return true; } $this->isConnecting = true; $this->connection->connect(); $this->connection->getParser()->legacyMode(true); $this->protocol = new Protocol($this->clientId); $this->host = $this->host ?: $this->connection->getHost(); $connectFrame = $this->protocol->getConnectFrame( $this->login, $this->passcode, $this->versions, $this->host, $this->heartbeat ); $this->sendFrame($connectFrame, false); if ($frame = $this->getConnectedFrame()) { $version = new Version($frame); if ($version->hasVersion(Version::VERSION_1_1)) { $this->connection->getParser()->legacyMode(false); } $this->sessionId = $frame['session']; $this->protocol = $version->getProtocol($this->clientId); $this->isConnecting = false; return true; } throw new ConnectionException('Connection not acknowledged'); } /** * Returns the next available frame from the connection, respecting the connect timeout. * * @return null|Frame * @throws ConnectionException * @throws Exception\ErrorFrameException */ private function getConnectedFrame() { $deadline = microtime(true) + $this->getConnection()->getConnectTimeout(); do { if ($frame = $this->connection->readFrame()) { return $frame; } } while (microtime(true) <= $deadline); return null; } /** * Send a message to a destination in the messaging system * * @param string $destination Destination queue * @param string|Frame $msg Message * @param array $header * @param boolean $sync Perform request synchronously * @return boolean */ public function send($destination, $msg, array $header = [], $sync = null) { if (!$msg instanceof Frame) { return $this->send($destination, new Frame('SEND', $header, $msg), [], $sync); } $msg->addHeaders($header); $msg['destination'] = $destination; return $this->sendFrame($msg, $sync); } /** * Send a frame. * * @param Frame $frame * @param boolean $sync * @return boolean */ public function sendFrame(Frame $frame, $sync = null) { if (!$this->isConnecting && !$this->isConnected()) { $this->connect(); } // determine if client was configured to write sync or not $writeSync = $sync !== null ? $sync : $this->sync; if ($writeSync) { return $this->sendFrameExpectingReceipt($frame); } else { return $this->connection->writeFrame($frame); } } /** * Write frame to server and expect an matching receipt frame * * @param Frame $stompFrame * @return bool */ protected function sendFrameExpectingReceipt(Frame $stompFrame) { $receipt = md5(microtime()); $stompFrame['receipt'] = $receipt; $this->connection->writeFrame($stompFrame); return $this->waitForReceipt($receipt); } /** * Wait for an receipt * * @param string $receipt * @return boolean * @throws UnexpectedResponseException If response has an invalid receipt. * @throws MissingReceiptException If no receipt is received. */ protected function waitForReceipt($receipt) { $stopAfter = $this->calculateReceiptWaitEnd(); while (true) { if ($frame = $this->connection->readFrame()) { if ($frame->getCommand() == 'RECEIPT') { if ($frame['receipt-id'] == $receipt) { return true; } else { throw new UnexpectedResponseException($frame, sprintf('Expected receipt id %s', $receipt)); } } else { $this->unprocessedFrames[] = $frame; } } if (microtime(true) >= $stopAfter) { break; } } throw new MissingReceiptException($receipt); } /** * Returns the timestamp with micro time to stop wait for a receipt. * * @return float */ protected function calculateReceiptWaitEnd() { return microtime(true) + $this->receiptWait; } /** * Read response frame from server * * @return Frame|false when no frame to read */ public function readFrame() { return array_shift($this->unprocessedFrames) ?: $this->connection->readFrame(); } /** * Graceful disconnect from the server * @param bool $sync * @return void */ public function disconnect($sync = false) { try { if ($this->connection && $this->connection->isConnected()) { if ($this->protocol) { $this->sendFrame($this->protocol->getDisconnectFrame(), $sync); } } } catch (StompException $ex) { // nothing! } if ($this->connection) { $this->connection->disconnect(); } $this->sessionId = null; $this->unprocessedFrames = []; $this->protocol = null; $this->isConnecting = false; } /** * Current stomp session ID * * @return string|null */ public function getSessionId() { return $this->sessionId; } /** * Graceful object destruction * */ public function __destruct() { $this->disconnect(); } /** * Check if client session has ben established * * @return boolean */ public function isConnected() { return !empty($this->sessionId) && $this->connection->isConnected(); } /** * Get the used connection. * * @return Connection */ public function getConnection() { return $this->connection; } /** * Get the currently used protocol. * * @return null|\Stomp\Protocol\Protocol */ public function getProtocol() { if (!$this->isConnecting && !$this->isConnected()) { $this->connect(); } return $this->protocol; } /** * @return string */ public function getClientId() { return $this->clientId; } /** * @param string $clientId * @return Client */ public function setClientId($clientId) { $this->clientId = $clientId; return $this; } /** * Set seconds to wait for a receipt. * * @param float $seconds */ public function setReceiptWait($seconds) { $this->receiptWait = $seconds; } /** * Check if client runs in synchronized mode, which is the default operation mode. * * @return boolean */ public function isSync() { return $this->sync; } /** * Toggle synchronized mode. * * @param boolean $sync */ public function setSync($sync) { $this->sync = $sync; } }