Properly daemonized 2-way Twitter bridge code

This commit is contained in:
Zach Copley 2009-05-07 00:25:15 -07:00
parent b291cb8a1b
commit 48226e0c48

View File

@ -28,25 +28,37 @@ define('INSTALLDIR', realpath(dirname(__FILE__) . '/..'));
define('LACONICA', true); define('LACONICA', true);
// Tune number of processes and how often to poll Twitter // Tune number of processes and how often to poll Twitter
define('MAXCHILDREN', 5); // XXX: Should these things be in config.php?
define('POLL_INTERVAL', 60 * 10); // in seconds define('MAXCHILDREN', 2);
define('POLL_INTERVAL', 60); // in seconds
// Uncomment this to get useful console output // Uncomment this to get useful console output
define('SCRIPT_DEBUG', true); define('SCRIPT_DEBUG', true);
require_once(INSTALLDIR . '/lib/common.php'); require_once(INSTALLDIR . '/lib/common.php');
require_once(INSTALLDIR . '/lib/daemon.php');
$children = array(); class TwitterStatusFetcher extends Daemon
{
do { private $children = array();
$flinks = refreshFlinks(); function name()
{
return 'twitterstatusfetcher';
}
function run()
{
do {
$flinks = $this->refreshFlinks();
foreach ($flinks as $f){ foreach ($flinks as $f){
// We have to disconnect from the DB before forking so // We have to disconnect from the DB before forking so
// each process will open its own connection and // each sub-process will open its own connection and
// avoid stomping on each other // avoid stomping on the others
$conn = &$f->getDatabaseConnection(); $conn = &$f->getDatabaseConnection();
$conn->disconnect(); $conn->disconnect();
@ -60,33 +72,37 @@ do {
if ($pid) { if ($pid) {
// Parent // Parent
common_debug("Parent: forked new status fetcher process " . $pid);
if (defined('SCRIPT_DEBUG')) { if (defined('SCRIPT_DEBUG')) {
print "Parent: forked " . $pid . "\n"; print "Parent: forked fetcher process " . $pid . "\n";
} }
$children[] = $pid; $this->children[] = $pid;
} else { } else {
// Child // Child
$this->getTimeline($f);
getTimeline($f, $child_db_name);
exit(); exit();
} }
// Remove child from ps list as it finishes // Remove child from ps list as it finishes
while(($c = pcntl_wait($status, WNOHANG OR WUNTRACED)) > 0) { while(($c = pcntl_wait($status, WNOHANG OR WUNTRACED)) > 0) {
common_debug("Child $c finished.");
if (defined('SCRIPT_DEBUG')) { if (defined('SCRIPT_DEBUG')) {
print "Child $c finished.\n"; print "Child $c finished.\n";
} }
remove_ps($children, $c); $this->remove_ps($this->children, $c);
} }
// Wait if we have too many kids // Wait! We have too many damn kids.
if (sizeof($children) > MAXCHILDREN) { if (sizeof($this->children) > MAXCHILDREN) {
common_debug('Too many children. Waiting...');
if (defined('SCRIPT_DEBUG')) { if (defined('SCRIPT_DEBUG')) {
print "Too many children. Waiting...\n"; print "Too many children. Waiting...\n";
@ -94,11 +110,13 @@ do {
if (($c = pcntl_wait($status, WUNTRACED)) > 0){ if (($c = pcntl_wait($status, WUNTRACED)) > 0){
common_debug("Finished waiting for $c");
if (defined('SCRIPT_DEBUG')) { if (defined('SCRIPT_DEBUG')) {
print "Finished waiting for $c\n"; print "Finished waiting for $c\n";
} }
remove_ps($children, $c); $this->remove_ps($this->children, $c);
} }
} }
} }
@ -106,11 +124,13 @@ do {
// Remove all children from the process list before restarting // Remove all children from the process list before restarting
while(($c = pcntl_wait($status, WUNTRACED)) > 0) { while(($c = pcntl_wait($status, WUNTRACED)) > 0) {
common_debug("Child $c finished.");
if (defined('SCRIPT_DEBUG')) { if (defined('SCRIPT_DEBUG')) {
print "Child $c finished.\n"; print "Child $c finished.\n";
} }
remove_ps($children, $c); $this->remove_ps($this->children, $c);
} }
// Rest for a bit before we fetch more statuses // Rest for a bit before we fetch more statuses
@ -123,10 +143,10 @@ do {
sleep(POLL_INTERVAL); sleep(POLL_INTERVAL);
} while (true); } while (true);
}
function refreshFlinks() {
function refreshFlinks() {
$flink = new Foreign_link(); $flink = new Foreign_link();
$flink->service = 1; // Twitter $flink->service = 1; // Twitter
@ -151,9 +171,9 @@ function refreshFlinks() {
unset($flink); unset($flink);
return $flinks; return $flinks;
} }
function remove_ps(&$plist, $ps){ function remove_ps(&$plist, $ps){
for ($i = 0; $i < sizeof($plist); $i++) { for ($i = 0; $i < sizeof($plist); $i++) {
if ($plist[$i] == $ps) { if ($plist[$i] == $ps) {
unset($plist[$i]); unset($plist[$i]);
@ -161,10 +181,10 @@ function remove_ps(&$plist, $ps){
break; break;
} }
} }
} }
function getTimeline($flink) function getTimeline($flink)
{ {
if (empty($flink)) { if (empty($flink)) {
common_log(LOG_WARNING, "Can't retrieve Foreign_link for foreign ID $fid"); common_log(LOG_WARNING, "Can't retrieve Foreign_link for foreign ID $fid");
@ -215,18 +235,18 @@ function getTimeline($flink)
continue; continue;
} }
saveStatus($status, $flink); $this->saveStatus($status, $flink);
} }
// Okay, record the time we synced with Twitter for posterity // Okay, record the time we synced with Twitter for posterity
$flink->last_noticesync = common_sql_now(); $flink->last_noticesync = common_sql_now();
$flink->update(); $flink->update();
} }
function saveStatus($status, $flink) function saveStatus($status, $flink)
{ {
$id = ensureProfile($status->user); $id = $this->ensureProfile($status->user);
$profile = Profile::staticGet($id); $profile = Profile::staticGet($id);
if (!$profile) { if (!$profile) {
@ -298,10 +318,10 @@ function saveStatus($status, $flink)
$inbox->insert(); $inbox->insert();
} }
} }
function ensureProfile($user) function ensureProfile($user)
{ {
// check to see if there's already a profile for this user // check to see if there's already a profile for this user
$profileurl = 'http://twitter.com/' . $user->screen_name; $profileurl = 'http://twitter.com/' . $user->screen_name;
$profile = Profile::staticGet('profileurl', $profileurl); $profile = Profile::staticGet('profileurl', $profileurl);
@ -310,7 +330,7 @@ function ensureProfile($user)
common_debug("Profile for $profile->nickname found."); common_debug("Profile for $profile->nickname found.");
// Check to see if the user's Avatar has changed // Check to see if the user's Avatar has changed
checkAvatar($user, $profile); $this->checkAvatar($user, $profile);
return $profile->id; return $profile->id;
} else { } else {
@ -370,14 +390,14 @@ function ensureProfile($user)
$profile->query("COMMIT"); $profile->query("COMMIT");
saveAvatars($user, $id); $this->saveAvatars($user, $id);
return $id; return $id;
} }
} }
function checkAvatar($user, $profile) function checkAvatar($user, $profile)
{ {
global $config; global $config;
$path_parts = pathinfo($user->profile_image_url); $path_parts = pathinfo($user->profile_image_url);
@ -399,7 +419,7 @@ function checkAvatar($user, $profile)
$img_root = substr($path_parts['basename'], 0, -11); $img_root = substr($path_parts['basename'], 0, -11);
$ext = $path_parts['extension']; $ext = $path_parts['extension'];
$mediatype = getMediatype($ext); $mediatype = $this->getMediatype($ext);
foreach (array('mini', 'normal', 'bigger') as $size) { foreach (array('mini', 'normal', 'bigger') as $size) {
$url = $path_parts['dirname'] . '/' . $url = $path_parts['dirname'] . '/' .
@ -407,15 +427,15 @@ function checkAvatar($user, $profile)
$filename = 'Twitter_' . $user->id . '_' . $filename = 'Twitter_' . $user->id . '_' .
$img_root . "_$size.$ext"; $img_root . "_$size.$ext";
if (fetchAvatar($url, $filename)) { if ($this->fetchAvatar($url, $filename)) {
updateAvatar($profile->id, $size, $mediatype, $filename); $this->updateAvatar($profile->id, $size, $mediatype, $filename);
}
} }
} }
} }
}
function getMediatype($ext) function getMediatype($ext)
{ {
$mediatype = null; $mediatype = null;
switch (strtolower($ext)) { switch (strtolower($ext)) {
@ -430,17 +450,17 @@ function getMediatype($ext)
} }
return $mediatype; return $mediatype;
} }
function saveAvatars($user, $id) function saveAvatars($user, $id)
{ {
global $config; global $config;
$path_parts = pathinfo($user->profile_image_url); $path_parts = pathinfo($user->profile_image_url);
$ext = $path_parts['extension']; $ext = $path_parts['extension'];
$end = strlen('_normal' . $ext); $end = strlen('_normal' . $ext);
$img_root = substr($path_parts['basename'], 0, -($end+1)); $img_root = substr($path_parts['basename'], 0, -($end+1));
$mediatype = getMediatype($ext); $mediatype = $this->getMediatype($ext);
foreach (array('mini', 'normal', 'bigger') as $size) { foreach (array('mini', 'normal', 'bigger') as $size) {
$url = $path_parts['dirname'] . '/' . $url = $path_parts['dirname'] . '/' .
@ -448,8 +468,8 @@ function saveAvatars($user, $id)
$filename = 'Twitter_' . $user->id . '_' . $filename = 'Twitter_' . $user->id . '_' .
$img_root . "_$size.$ext"; $img_root . "_$size.$ext";
if (fetchAvatar($url, $filename)) { if ($this->fetchAvatar($url, $filename)) {
newAvatar($id, $size, $mediatype, $filename); $this->newAvatar($id, $size, $mediatype, $filename);
} else { } else {
common_log(LOG_WARNING, "Problem fetching Avatar: $url", __FILE__); common_log(LOG_WARNING, "Problem fetching Avatar: $url", __FILE__);
if (defined('SCRIPT_DEBUG')) { if (defined('SCRIPT_DEBUG')) {
@ -457,9 +477,9 @@ function saveAvatars($user, $id)
} }
} }
} }
} }
function updateAvatar($profile_id, $size, $mediatype, $filename) { function updateAvatar($profile_id, $size, $mediatype, $filename) {
common_debug("Updating avatar: $size"); common_debug("Updating avatar: $size");
if (defined('SCRIPT_DEBUG')) { if (defined('SCRIPT_DEBUG')) {
@ -485,11 +505,11 @@ function updateAvatar($profile_id, $size, $mediatype, $filename) {
$avatar->delete(); $avatar->delete();
} }
newAvatar($profile->id, $size, $mediatype, $filename); $this->newAvatar($profile->id, $size, $mediatype, $filename);
} }
function newAvatar($profile_id, $size, $mediatype, $filename) function newAvatar($profile_id, $size, $mediatype, $filename)
{ {
global $config; global $config;
$avatar = new Avatar(); $avatar = new Avatar();
@ -542,10 +562,10 @@ function newAvatar($profile_id, $size, $mediatype, $filename)
} }
return $id; return $id;
} }
function fetchAvatar($url, $filename) function fetchAvatar($url, $filename)
{ {
$avatar_dir = INSTALLDIR . '/avatar/'; $avatar_dir = INSTALLDIR . '/avatar/';
$avatarfile = $avatar_dir . $filename; $avatarfile = $avatar_dir . $filename;
@ -576,4 +596,15 @@ function fetchAvatar($url, $filename)
fclose($out); fclose($out);
return $result; return $result;
}
} }
ini_set("max_execution_time", "0");
ini_set("max_input_time", "0");
set_time_limit(0);
mb_internal_encoding('UTF-8');
declare(ticks = 1);
$fetcher = new TwitterStatusFetcher();
$fetcher->runOnce();