Merge branch 'master' into 0.9.x
Conflicts: lib/stompqueuemanager.php
This commit is contained in:
commit
54ebb0a2b9
@ -41,6 +41,10 @@ class StompQueueManager extends QueueManager
|
|||||||
|
|
||||||
protected $sites = array();
|
protected $sites = array();
|
||||||
|
|
||||||
|
protected $useTransactions = true;
|
||||||
|
protected $transaction = null;
|
||||||
|
protected $transactionCount = 0;
|
||||||
|
|
||||||
function __construct()
|
function __construct()
|
||||||
{
|
{
|
||||||
parent::__construct();
|
parent::__construct();
|
||||||
@ -201,6 +205,7 @@ class StompQueueManager extends QueueManager
|
|||||||
} else {
|
} else {
|
||||||
$this->doSubscribe();
|
$this->doSubscribe();
|
||||||
}
|
}
|
||||||
|
$this->begin();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -213,6 +218,9 @@ class StompQueueManager extends QueueManager
|
|||||||
*/
|
*/
|
||||||
public function finish()
|
public function finish()
|
||||||
{
|
{
|
||||||
|
// If there are any outstanding delivered messages we haven't processed,
|
||||||
|
// free them for another thread to take.
|
||||||
|
$this->rollback();
|
||||||
if ($this->sites) {
|
if ($this->sites) {
|
||||||
foreach ($this->sites as $server) {
|
foreach ($this->sites as $server) {
|
||||||
StatusNet::init($server);
|
StatusNet::init($server);
|
||||||
@ -293,7 +301,9 @@ class StompQueueManager extends QueueManager
|
|||||||
$notice = Notice::staticGet('id', $id);
|
$notice = Notice::staticGet('id', $id);
|
||||||
if (empty($notice)) {
|
if (empty($notice)) {
|
||||||
$this->_log(LOG_WARNING, "Skipping missing $info");
|
$this->_log(LOG_WARNING, "Skipping missing $info");
|
||||||
$this->con->ack($frame);
|
$this->ack($frame);
|
||||||
|
$this->commit();
|
||||||
|
$this->begin();
|
||||||
$this->stats('badnotice', $queue);
|
$this->stats('badnotice', $queue);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -308,7 +318,9 @@ class StompQueueManager extends QueueManager
|
|||||||
$handler = $this->getHandler($queue);
|
$handler = $this->getHandler($queue);
|
||||||
if (!$handler) {
|
if (!$handler) {
|
||||||
$this->_log(LOG_ERROR, "Missing handler class; skipping $info");
|
$this->_log(LOG_ERROR, "Missing handler class; skipping $info");
|
||||||
$this->con->ack($frame);
|
$this->ack($frame);
|
||||||
|
$this->commit();
|
||||||
|
$this->begin();
|
||||||
$this->stats('badhandler', $queue);
|
$this->stats('badhandler', $queue);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -320,14 +332,18 @@ class StompQueueManager extends QueueManager
|
|||||||
// FIXME we probably shouldn't have to do
|
// FIXME we probably shouldn't have to do
|
||||||
// this kind of queue management ourselves;
|
// this kind of queue management ourselves;
|
||||||
// if we don't ack, it should resend...
|
// if we don't ack, it should resend...
|
||||||
$this->con->ack($frame);
|
$this->ack($frame);
|
||||||
$this->enqueue($item, $queue);
|
$this->enqueue($item, $queue);
|
||||||
|
$this->commit();
|
||||||
|
$this->begin();
|
||||||
$this->stats('requeued', $queue);
|
$this->stats('requeued', $queue);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->_log(LOG_INFO, "Successfully handled $info");
|
$this->_log(LOG_INFO, "Successfully handled $info");
|
||||||
$this->con->ack($frame);
|
$this->ack($frame);
|
||||||
|
$this->commit();
|
||||||
|
$this->begin();
|
||||||
$this->stats('handled', $queue);
|
$this->stats('handled', $queue);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -369,5 +385,49 @@ class StompQueueManager extends QueueManager
|
|||||||
{
|
{
|
||||||
common_log($level, 'StompQueueManager: '.$msg);
|
common_log($level, 'StompQueueManager: '.$msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected function begin()
|
||||||
|
{
|
||||||
|
if ($this->useTransactions) {
|
||||||
|
if ($this->transaction) {
|
||||||
|
throw new Exception("Tried to start transaction in the middle of a transaction");
|
||||||
|
}
|
||||||
|
$this->transactionCount++;
|
||||||
|
$this->transaction = $this->master->id . '-' . $this->transactionCount . '-' . time();
|
||||||
|
$this->con->begin($this->transaction);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected function ack($frame)
|
||||||
|
{
|
||||||
|
if ($this->useTransactions) {
|
||||||
|
if (!$this->transaction) {
|
||||||
|
throw new Exception("Tried to ack but not in a transaction");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
$this->con->ack($frame, $this->transaction);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected function commit()
|
||||||
|
{
|
||||||
|
if ($this->useTransactions) {
|
||||||
|
if (!$this->transaction) {
|
||||||
|
throw new Exception("Tried to commit but not in a transaction");
|
||||||
|
}
|
||||||
|
$this->con->commit($this->transaction);
|
||||||
|
$this->transaction = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected function rollback()
|
||||||
|
{
|
||||||
|
if ($this->useTransactions) {
|
||||||
|
if (!$this->transaction) {
|
||||||
|
throw new Exception("Tried to rollback but not in a transaction");
|
||||||
|
}
|
||||||
|
$this->con->commit($this->transaction);
|
||||||
|
$this->transaction = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,7 +86,7 @@ class User_flag_profile extends Memcached_DataObject
|
|||||||
|
|
||||||
function keys()
|
function keys()
|
||||||
{
|
{
|
||||||
return array('profile_id' => 'N', 'user_id' => 'N');
|
return array('profile_id' => 'K', 'user_id' => 'K');
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -130,6 +130,15 @@ class User_flag_profile extends Memcached_DataObject
|
|||||||
return !empty($ufp);
|
return !empty($ufp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new flag
|
||||||
|
*
|
||||||
|
* @param integer $user_id ID of user who's flagging
|
||||||
|
* @param integer $profile_id ID of profile being flagged
|
||||||
|
*
|
||||||
|
* @return boolean success flag
|
||||||
|
*/
|
||||||
|
|
||||||
static function create($user_id, $profile_id)
|
static function create($user_id, $profile_id)
|
||||||
{
|
{
|
||||||
$ufp = new User_flag_profile();
|
$ufp = new User_flag_profile();
|
||||||
|
Loading…
Reference in New Issue
Block a user