merged branch drak3/process-control (PR #3681)

Commits
-------

f9f51a5 fixed cs
af65673 [Process] Added support for non-blocking process control Added methods to control long running processes to the Process class:  - A non blocking start method to startup a process and return    immediately  - A blocking waitForTermination method to wait for the processes    termination  - A stop method to stop a process started with start All status-getters like getOutput were changed to return real-time data

Discussion
----------

[Process] Added support for non-blocking process control

Bug fix: no
Feature addition: yes
Backwards compatibility break: no
Symfony2 tests pass: yes [![Build Status](https://secure.travis-ci.org/drak3/symfony.png?branch=process-control)](http://travis-ci.org/drak3/symfony)
Fixes the following tickets: #1049
Todo: -

Added methods to control long running processes (as described in issue #1049) to the Process class:
 - A non blocking start method to startup a process and return
   immediately
 - A blocking waitForTermination method to wait for the processes
   termination
 - A stop method to stop a process started with start
All status-getters like getOutput were changed to return real-time data

---------------------------------------------------------------------------

by Seldaek at 2012-03-23T10:52:30Z

Overall this seems like a good improvement. I didn't check the code in detail though.

---------------------------------------------------------------------------

by drak3 at 2012-03-23T11:21:45Z

@stof @Seldaek thanks, fixed
This commit is contained in:
Fabien Potencier 2012-03-23 12:49:19 +01:00
commit 4eec697c01
2 changed files with 273 additions and 57 deletions

View File

@ -24,6 +24,14 @@ class Process
const ERR = 'err';
const OUT = 'out';
const STATUS_READY = 'ready';
const STATUS_STARTED = 'started';
const STATUS_TERMINATED = 'terminated';
const STDIN = 0;
const STDOUT = 1;
const STDERR = 2;
private $commandline;
private $cwd;
private $env;
@ -31,10 +39,13 @@ class Process
private $timeout;
private $options;
private $exitcode;
private $status;
private $processInformation;
private $stdout;
private $stderr;
private $enhanceWindowsCompatibility;
private $pipes;
private $process;
private $status = self::STATUS_READY;
/**
* Exit codes translation table.
@ -141,24 +152,37 @@ class Process
*/
public function run($callback = null)
{
$this->start($callback);
return $this->waitForTermination($callback);
}
/**
* Starts the process and returns after sending the stdin.
*
* This method blocks until all stdin data is sent to the process then it returns while the process runs in the background.
* The termination of the process can be awaited with waitForTermination
*
* The callback receives the type of output (out or err) and
* some bytes from the output in real-time while writing the standard input to the process. It allows to have feedback
* from the independent process during execution.
* If there is no callback passed, the waitForTermination() method can be called with true as a second parameter
* then the callback will get all data occured in (and since) the start call
*
* @param Closure|string|array $callback A PHP callback to run whenever there is some
* output available on STDOUT or STDERR
*
* @throws \RuntimeException When process can't be launch or is stopped
* @throws \RuntimeException When process is already running
*/
public function start($callback = null)
{
if ($this->isRunning()) {
throw new \RuntimeException('Process is already running');
}
$this->stdout = '';
$this->stderr = '';
$that = $this;
$out = self::OUT;
$err = self::ERR;
$callback = function ($type, $data) use ($that, $callback, $out, $err)
{
if ($out == $type) {
$that->addOutput($data);
} else {
$that->addErrorOutput($data);
}
if (null !== $callback) {
call_user_func($callback, $type, $data);
}
};
$callback = $this->buildCallback($callback);
$descriptors = array(array('pipe', 'r'), array('pipe', 'w'), array('pipe', 'w'));
$commandline = $this->commandline;
@ -170,28 +194,30 @@ class Process
}
}
$process = proc_open($commandline, $descriptors, $pipes, $this->cwd, $this->env, $this->options);
$this->process = proc_open($commandline, $descriptors, $this->pipes, $this->cwd, $this->env, $this->options);
if (!is_resource($process)) {
if (!is_resource($this->process)) {
throw new \RuntimeException('Unable to launch a new process.');
}
$this->status = self::STATUS_STARTED;
foreach ($pipes as $pipe) {
foreach ($this->pipes as $pipe) {
stream_set_blocking($pipe, false);
}
if (null === $this->stdin) {
fclose($pipes[0]);
$writePipes = null;
fclose($this->pipes[0]);
return;
} else {
$writePipes = array($pipes[0]);
$writePipes = array($this->pipes[0]);
$stdinLen = strlen($this->stdin);
$stdinOffset = 0;
}
unset($pipes[0]);
unset($this->pipes[0]);
while ($pipes || $writePipes) {
$r = $pipes;
while ($writePipes) {
$r = $this->pipes;
$w = $writePipes;
$e = null;
@ -200,7 +226,7 @@ class Process
if (false === $n) {
break;
} elseif ($n === 0) {
proc_terminate($process);
proc_terminate($this->process);
throw new \RuntimeException('The process timed out.');
}
@ -217,56 +243,99 @@ class Process
}
foreach ($r as $pipe) {
$type = array_search($pipe, $pipes);
$type = array_search($pipe, $this->pipes);
$data = fread($pipe, 8192);
if (strlen($data) > 0) {
call_user_func($callback, $type == 1 ? $out : $err, $data);
call_user_func($callback, $type == 1 ? self::OUT : self::ERR, $data);
}
if (false === $data || feof($pipe)) {
fclose($pipe);
unset($pipes[$type]);
unset($this->pipes[$type]);
}
}
}
$this->status = proc_get_status($process);
$time = 0;
while (1 == $this->status['running'] && $time < 1000000) {
$time += 1000;
usleep(1000);
$this->status = proc_get_status($process);
}
$exitcode = proc_close($process);
if ($this->status['signaled']) {
throw new \RuntimeException(sprintf('The process stopped because of a "%s" signal.', $this->status['stopsig']));
}
return $this->exitcode = $this->status['running'] ? $exitcode : $this->status['exitcode'];
$this->processInformation = proc_get_status($this->process);
}
/**
* Returns the output of the process (STDOUT).
* Waits for the process to terminate
*
* This only returns the output if you have not supplied a callback
* to the run() method.
* The callback receives the type of output (out or err) and
* some bytes from the output in real-time while writing the standard input to the process. It allows to have feedback
* from the independent process during execution.
*
* @param Closure|string|array $callback
* @return int the exitcode of the process
* @throws \RuntimeException
*/
public function waitForTermination($callback = null)
{
$this->processInformation = proc_get_status($this->process);
$callback = $this->buildCallback($callback);
while ($this->pipes) {
$r = $this->pipes;
$w = null;
$e = null;
$n = @stream_select($r, $w, $e, $this->timeout);
if (false === $n) {
break;
}
if (0 === $n) {
proc_terminate($this->process);
throw new \RuntimeException('The process timed out.');
}
foreach ($r as $pipe) {
$type = array_search($pipe, $this->pipes);
$data = fread($pipe, 8192);
if (strlen($data) > 0) {
call_user_func($callback, $type == 1 ? self::OUT : self::ERR, $data);
}
if (false === $data || feof($pipe)) {
fclose($pipe);
unset($this->pipes[$type]);
}
}
}
$this->updateStatus();
if ($this->processInformation['signaled']) {
throw new \RuntimeException(sprintf('The process stopped because of a "%s" signal.', $this->processInformation['stopsig']));
}
$time = 0;
while ($this->isRunning() && $time < 1000000) {
$time += 1000;
usleep(1000);
}
$exitcode = proc_close($this->process);
if ($this->processInformation['signaled']) {
throw new \RuntimeException(sprintf('The process stopped because of a "%s" signal.', $this->processInformation['stopsig']));
}
return $this->exitcode = $this->processInformation['running'] ? $exitcode : $this->processInformation['exitcode'];
}
/**
* Returns the current output of the process (STDOUT).
* @return string The process output
*
* @api
*/
public function getOutput()
{
$this->updateOutput();
return $this->stdout;
}
/**
* Returns the error output of the process (STDERR).
*
* This only returns the error output if you have not supplied a callback
* to the run() method.
* Returns the current error output of the process (STDERR).
*
* @return string The process error output
*
@ -274,6 +343,8 @@ class Process
*/
public function getErrorOutput()
{
$this->updateErrorOutput();
return $this->stderr;
}
@ -286,6 +357,8 @@ class Process
*/
public function getExitCode()
{
$this->updateStatus();
return $this->exitcode;
}
@ -302,6 +375,8 @@ class Process
*/
public function getExitCodeText()
{
$this->updateStatus();
return isset(self::$exitCodes[$this->exitcode]) ? self::$exitCodes[$this->exitcode] : 'Unknown error';
}
@ -314,6 +389,8 @@ class Process
*/
public function isSuccessful()
{
$this->updateStatus();
return 0 == $this->exitcode;
}
@ -328,7 +405,9 @@ class Process
*/
public function hasBeenSignaled()
{
return $this->status['signaled'];
$this->updateStatus();
return $this->processInformation['signaled'];
}
/**
@ -342,7 +421,9 @@ class Process
*/
public function getTermSignal()
{
return $this->status['termsig'];
$this->updateStatus();
return $this->processInformation['termsig'];
}
/**
@ -356,7 +437,9 @@ class Process
*/
public function hasBeenStopped()
{
return $this->status['stopped'];
$this->updateStatus();
return $this->processInformation['stopped'];
}
/**
@ -370,7 +453,51 @@ class Process
*/
public function getStopSignal()
{
return $this->status['stopsig'];
$this->updateStatus();
return $this->processInformation['stopsig'];
}
/**
* Returns if the process is currently running
* @return boolean
*/
public function isRunning()
{
if (self::STATUS_STARTED === $this->status) {
$this->updateStatus();
if($this->processInformation['running'] === false) {
$this->status = self::STATUS_TERMINATED;
}
return $this->processInformation['running'];
}
return false;
}
/**
* Stops the process
* @param float $timeout the timeout in seconds
* @return int the exitcode of the process
* @throws \RuntimeException if the process got signaled
*/
public function stop($timeout=10)
{
$timeoutMicro = (int) $timeout*10E6;
if ($this->isRunning()) {
proc_terminate($this->process);
$time = 0;
while (1 == $this->isRunning() && $time < $timeoutMicro) {
$time += 1000;
usleep(1000);
}
$exitcode = proc_close($this->process);
$this->exitcode = -1 === $this->processInformation['exitcode'] ? $exitcode : $this->processInformation['exitcode'];
}
$this->status = self::STATUS_TERMINATED;
return $this->exitcode;
}
public function addOutput($line)
@ -452,4 +579,58 @@ class Process
{
$this->enhanceWindowsCompatibility = (Boolean) $enhance;
}
/**
* Builds up the callback used by waitForTermination
* The callbacks adds all occured output to the specific buffer and calls the usercallback (if present) with the received output
* @param callable $callback the userdefined callback
* @return callable
*/
protected function buildCallback($callback) {
$that = $this;
$out = self::OUT;
$err = self::ERR;
$callback = function ($type, $data) use ($that, $callback, $out, $err)
{
if ($out == $type) {
$that->addOutput($data);
} else {
$that->addErrorOutput($data);
}
if (null !== $callback) {
call_user_func($callback, $type, $data);
}
};
return $callback;
}
/**
* If the process was started, its status is updated
*/
protected function updateStatus()
{
if (self::STATUS_STARTED === $this->status) {
$this->processInformation = proc_get_status($this->process);
if (!$this->processInformation['running']) {
$this->status = self::STATUS_TERMINATED;
if (-1 !== $this->processInformation['exitcode']) {
$this->exitcode = $this->processInformation['exitcode'];
}
}
}
}
protected function updateErrorOutput() {
if (isset($this->pipes[self::STDERR]) && is_resource($this->pipes[self::STDERR])) {
$this->addErrorOutput(stream_get_contents($this->pipes[self::STDERR]));
}
}
protected function updateOutput() {
if (isset($this->pipes[self::STDOUT]) && is_resource($this->pipes[self::STDOUT])) {
$this->addOutput(stream_get_contents($this->pipes[self::STDOUT]));
}
}
}

View File

@ -79,6 +79,41 @@ class ProcessTest extends \PHPUnit_Framework_TestCase
$this->assertEquals('Misuse of shell builtins', $process->getExitCodeText());
}
public function testStartIsNonBlocking()
{
$process = new Process('php -r "sleep(4);"');
$start = microtime(true);
$process->start();
$end = microtime(true);
$this->assertLessThan(1 , $end-$start);
}
public function testUpdateStatus() {
$process = new Process('php -h');
$process->start();
usleep(0.05E6); //wait for output
$this->assertEquals(0, $process->getExitCode());
$this->assertTrue(strlen($process->getOutput()) > 0 );
}
public function testIsRunning() {
$process = new Process('php -r "sleep(1);"');
$this->assertFalse($process->isRunning());
$process->start();
$this->assertTrue($process->isRunning());
$process->waitForTermination();
$this->assertFalse($process->isRunning());
}
public function testStop() {
$process = new Process('php -r "while(true){}"');
$process->start();
$this->assertTrue($process->isRunning());
$process->stop();
$this->assertFalse($process->isRunning());
$this->assertTrue($process->hasBeenSignaled());
}
public function responsesCodeProvider()
{
return array(