[Process] Added support for non-blocking process control

Added methods to control long running processes to the Process class:
 - A non blocking start method to startup a process and return
   immediately
 - A blocking waitForTermination method to wait for the processes
   termination
 - A stop method to stop a process started with start
All status-getters like getOutput were changed to return real-time data
This commit is contained in:
drak3 2012-03-21 17:58:02 +01:00
parent 54ce7c75e8
commit af65673363
2 changed files with 271 additions and 58 deletions

View File

@ -24,6 +24,14 @@ class Process
const ERR = 'err';
const OUT = 'out';
const STATUS_READY = 'ready';
const STATUS_STARTED = 'started';
const STATUS_TERMINATED = 'terminated';
const STDIN = 0;
const STDOUT = 1;
const STDERR = 2;
private $commandline;
private $cwd;
private $env;
@ -31,10 +39,13 @@ class Process
private $timeout;
private $options;
private $exitcode;
private $status;
private $processInformation;
private $stdout;
private $stderr;
private $enhanceWindowsCompatibility;
private $pipes;
private $process;
private $status = self::STATUS_READY;
/**
* Exit codes translation table.
@ -141,24 +152,36 @@ class Process
*/
public function run($callback = null)
{
$this->start($callback);
return $this->waitForTermination($callback);
}
/**
* Starts the process and returns after sending the stdin.
* This method blocks until all stdin data is sent to the process then it returns while the pricess runs in the background.
* The termination of the process can be awaited with waitForTermination
*
* The callback receives the type of output (out or err) and
* some bytes from the output in real-time while writing the standard input to the process. It allows to have feedback
* from the independent process during execution.
* If there is no callback passed, the waitForTermination() method can be called with true as a second parameter
* then the callback will get all data occured in (and since) the start call
*
* @param Closure|string|array $callback A PHP callback to run whenever there is some
* output available on STDOUT or STDERR
*
* @throws \RuntimeException When process can't be launch or is stopped
* @throws \RuntimeException When process is already running
*/
public function start($callback = null)
{
if($this->isRunning()) {
throw new \RuntimeException('Process is already running');
}
$this->stdout = '';
$this->stderr = '';
$that = $this;
$out = self::OUT;
$err = self::ERR;
$callback = function ($type, $data) use ($that, $callback, $out, $err)
{
if ($out == $type) {
$that->addOutput($data);
} else {
$that->addErrorOutput($data);
}
if (null !== $callback) {
call_user_func($callback, $type, $data);
}
};
$callback = $this->buildCallback($callback);
$descriptors = array(array('pipe', 'r'), array('pipe', 'w'), array('pipe', 'w'));
$commandline = $this->commandline;
@ -170,28 +193,30 @@ class Process
}
}
$process = proc_open($commandline, $descriptors, $pipes, $this->cwd, $this->env, $this->options);
$this->process = proc_open($commandline, $descriptors, $this->pipes, $this->cwd, $this->env, $this->options);
if (!is_resource($process)) {
if (!is_resource($this->process)) {
throw new \RuntimeException('Unable to launch a new process.');
}
$this->status = self::STATUS_STARTED;
foreach ($pipes as $pipe) {
foreach ($this->pipes as $pipe) {
stream_set_blocking($pipe, false);
}
if (null === $this->stdin) {
fclose($pipes[0]);
$writePipes = null;
if(null === $this->stdin) {
fclose($this->pipes[0]);
return;
} else {
$writePipes = array($pipes[0]);
$writePipes = array($this->pipes[0]);
$stdinLen = strlen($this->stdin);
$stdinOffset = 0;
}
unset($pipes[0]);
unset($this->pipes[0]);
while ($pipes || $writePipes) {
$r = $pipes;
while ($writePipes) {
$r = $this->pipes;
$w = $writePipes;
$e = null;
@ -200,7 +225,7 @@ class Process
if (false === $n) {
break;
} elseif ($n === 0) {
proc_terminate($process);
proc_terminate($this->process);
throw new \RuntimeException('The process timed out.');
}
@ -217,56 +242,99 @@ class Process
}
foreach ($r as $pipe) {
$type = array_search($pipe, $pipes);
$type = array_search($pipe, $this->pipes);
$data = fread($pipe, 8192);
if (strlen($data) > 0) {
call_user_func($callback, $type == 1 ? $out : $err, $data);
call_user_func($callback, $type == 1 ? self::OUT : self::ERR, $data);
}
if (false === $data || feof($pipe)) {
fclose($pipe);
unset($pipes[$type]);
unset($this->pipes[$type]);
}
}
}
$this->status = proc_get_status($process);
$time = 0;
while (1 == $this->status['running'] && $time < 1000000) {
$time += 1000;
usleep(1000);
$this->status = proc_get_status($process);
}
$exitcode = proc_close($process);
if ($this->status['signaled']) {
throw new \RuntimeException(sprintf('The process stopped because of a "%s" signal.', $this->status['stopsig']));
}
return $this->exitcode = $this->status['running'] ? $exitcode : $this->status['exitcode'];
$this->processInformation = proc_get_status($this->process);
}
/**
* Returns the output of the process (STDOUT).
* Waits for the process to terminate
*
* This only returns the output if you have not supplied a callback
* to the run() method.
* The callback receives the type of output (out or err) and
* some bytes from the output in real-time while writing the standard input to the process. It allows to have feedback
* from the independent process during execution.
*
* @param Closure|string|array $callback
* @param boolean $invokeCallbackWithStartData
* @return int the exitcode of the process
* @throws \RuntimeException
*/
public function waitForTermination($callback = null)
{
$this->processInformation = proc_get_status($this->process);
$callback = $this->buildCallback($callback);
while ($this->pipes) {
$r = $this->pipes;
$w = null;
$e = null;
$n = @stream_select($r, $w, $e, $this->timeout);
if (false === $n) {
break;
} elseif ($n === 0) {
proc_terminate($this->process);
throw new \RuntimeException('The process timed out.');
}
foreach ($r as $pipe) {
$type = array_search($pipe, $this->pipes);
$data = fread($pipe, 8192);
if (strlen($data) > 0) {
call_user_func($callback, $type == 1 ? self::OUT : self::ERR, $data);
}
if (false === $data || feof($pipe)) {
fclose($pipe);
unset($this->pipes[$type]);
}
}
}
$this->updateStatus();
if ($this->processInformation['signaled']) {
throw new \RuntimeException(sprintf('The process stopped because of a "%s" signal.', $this->processInformation['stopsig']));
}
$time = 0;
while ($this->isRunning() && $time < 1000000) {
$time += 1000;
usleep(1000);
}
$exitcode = proc_close($this->process);
if ($this->processInformation['signaled']) {
throw new \RuntimeException(sprintf('The process stopped because of a "%s" signal.', $this->processInformation['stopsig']));
}
return $this->exitcode = $this->processInformation['running'] ? $exitcode : $this->processInformation['exitcode'];
}
/**
* Returns the current output of the process (STDOUT).
* @return string The process output
*
* @api
*/
public function getOutput()
{
$this->updateOutput();
return $this->stdout;
}
/**
* Returns the error output of the process (STDERR).
*
* This only returns the error output if you have not supplied a callback
* to the run() method.
* Returns the current error output of the process (STDERR).
*
* @return string The process error output
*
@ -274,6 +342,8 @@ class Process
*/
public function getErrorOutput()
{
$this->updateErrorOutput();
return $this->stderr;
}
@ -286,6 +356,8 @@ class Process
*/
public function getExitCode()
{
$this->updateStatus();
return $this->exitcode;
}
@ -302,6 +374,8 @@ class Process
*/
public function getExitCodeText()
{
$this->updateStatus();
return isset(self::$exitCodes[$this->exitcode]) ? self::$exitCodes[$this->exitcode] : 'Unknown error';
}
@ -314,6 +388,8 @@ class Process
*/
public function isSuccessful()
{
$this->updateStatus();
return 0 == $this->exitcode;
}
@ -328,7 +404,9 @@ class Process
*/
public function hasBeenSignaled()
{
return $this->status['signaled'];
$this->updateStatus();
return $this->processInformation['signaled'];
}
/**
@ -342,7 +420,9 @@ class Process
*/
public function getTermSignal()
{
return $this->status['termsig'];
$this->updateStatus();
return $this->processInformation['termsig'];
}
/**
@ -356,7 +436,9 @@ class Process
*/
public function hasBeenStopped()
{
return $this->status['stopped'];
$this->updateStatus();
return $this->processInformation['stopped'];
}
/**
@ -370,7 +452,49 @@ class Process
*/
public function getStopSignal()
{
return $this->status['stopsig'];
$this->updateStatus();
return $this->processInformation['stopsig'];
}
/**
* Returns if the process is currently running
* @return boolean
*/
public function isRunning() {
if(self::STATUS_STARTED === $this->status) {
$this->updateStatus();
if($this->processInformation['running'] === false) {
$this->status = self::STATUS_TERMINATED;
}
return $this->processInformation['running'];
}
return false;
}
/**
* Stops the process
* @param float $timeout the timeout in seconds
* @return int the exitcode of the process
* @throws \RuntimeException if the process got signaled
*/
public function stop($timeout=10) {
$timeoutMicro = (int) $timeout*10E6;
if($this->isRunning()) {
proc_terminate($this->process);
$time = 0;
while (1 == $this->isRunning() && $time < $timeoutMicro) {
$time += 1000;
usleep(1000);
}
$exitcode = proc_close($this->process);
$this->exitcode = -1 === $this->processInformation['exitcode'] ? $exitcode : $this->processInformation['exitcode'];
}
$this->status = self::STATUS_TERMINATED;
return $this->exitcode;
}
public function addOutput($line)
@ -452,4 +576,58 @@ class Process
{
$this->enhanceWindowsCompatibility = (Boolean) $enhance;
}
/**
* Builds up the callback used by waitForTermination
* The callbacks adds all occured output to the specific buffer and calls the usercallback (if present) with the received output
* @param callable $callback the userdefined callback
* @return callable
*/
protected function buildCallback($callback) {
$that = $this;
$out = self::OUT;
$err = self::ERR;
$callback = function ($type, $data) use ($that, $callback, $out, $err)
{
if ($out == $type) {
$that->addOutput($data);
} else {
$that->addErrorOutput($data);
}
if (null !== $callback) {
call_user_func($callback, $type, $data);
}
};
return $callback;
}
/**
* If the process was started, its status is updated
*/
protected function updateStatus()
{
if(self::STATUS_STARTED === $this->status) {
$this->processInformation = proc_get_status($this->process);
if(!$this->processInformation['running']) {
$this->status = self::STATUS_TERMINATED;
if(-1 !== $this->processInformation['exitcode']) {
$this->exitcode = $this->processInformation['exitcode'];
}
}
}
}
protected function updateErrorOutput() {
if(isset($this->pipes[self::STDERR]) && is_resource($this->pipes[self::STDERR])) {
$this->addErrorOutput(stream_get_contents($this->pipes[self::STDERR]));
}
}
protected function updateOutput() {
if(isset($this->pipes[self::STDOUT]) && is_resource($this->pipes[self::STDOUT])) {
$this->addOutput(stream_get_contents($this->pipes[self::STDOUT]));
}
}
}

View File

@ -79,6 +79,41 @@ class ProcessTest extends \PHPUnit_Framework_TestCase
$this->assertEquals('Misuse of shell builtins', $process->getExitCodeText());
}
public function testStartIsNonBlocking()
{
$process = new Process('php -r "sleep(4);"');
$start = microtime(true);
$process->start();
$end = microtime(true);
$this->assertLessThan(1 , $end-$start);
}
public function testUpdateStatus() {
$process = new Process('php -h');
$process->start();
usleep(0.05E6); //wait for output
$this->assertEquals(0, $process->getExitCode());
$this->assertTrue(strlen($process->getOutput()) > 0 );
}
public function testIsRunning() {
$process = new Process('php -r "sleep(1);"');
$this->assertFalse($process->isRunning());
$process->start();
$this->assertTrue($process->isRunning());
$process->waitForTermination();
$this->assertFalse($process->isRunning());
}
public function testStop() {
$process = new Process('php -r "while(true){}"');
$process->start();
$this->assertTrue($process->isRunning());
$process->stop();
$this->assertFalse($process->isRunning());
$this->assertTrue($process->hasBeenSignaled());
}
public function responsesCodeProvider()
{
return array(