merged branch romainneutron/process-callbacks-2.2 (PR #8741)

This PR was merged into the 2.2 branch.

Discussion
----------

[Process][2.2] Fix #8739

| Q             | A
| ------------- | ---
| Bug fix?      | yes
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #8739
| License       | MIT

This adds a fix to #8739. Whenever a call is done to to any non-blocking methods (`Process::isRunning`, `Process::isStopped`, `Process::isTerminated`, `Process::getStatus`, `Process::getPid`...), buffers are read, and callback executed.

Such code will now work :

```
$process->start(function ($type, $data) {
    echo $data;
});

while ($process->isRunning()) {
    // some stuff
    // callback is executed
}
```

Commits
-------

fa769a2 [Process] Add more precision to Process::stop timeout
57d4159 [Process] Avoid zombie process in case of unit tests failure
3ef517b [Process] Fix #8739
7716fb2 [Process] Add failing test for #8739
bff6f3c [Process] Fix CS
This commit is contained in:
Fabien Potencier 2013-08-13 22:10:11 +02:00
commit f6e664d9b9
2 changed files with 159 additions and 158 deletions

View File

@ -38,6 +38,7 @@ class Process
// Timeout Precision in seconds.
const TIMEOUT_PRECISION = 0.2;
private $callback;
private $commandline;
private $cwd;
private $env;
@ -164,6 +165,7 @@ class Process
public function __clone()
{
$this->callback = null;
$this->exitcode = null;
$this->fallbackExitcode = null;
$this->processInformation = null;
@ -199,7 +201,7 @@ class Process
{
$this->start($callback);
return $this->wait($callback);
return $this->wait();
}
/**
@ -234,7 +236,7 @@ class Process
$this->stderr = '';
$this->incrementalOutputOffset = 0;
$this->incrementalErrorOutputOffset = 0;
$callback = $this->buildCallback($callback);
$this->callback = $this->buildCallback($callback);
//Fix for PHP bug #51800: reading from STDOUT pipe hangs forever on Windows if the output is too big.
//Workaround for this problem is to use temporary files instead of pipes on Windows platform.
@ -285,67 +287,9 @@ class Process
stream_set_blocking($pipe, false);
}
if (null === $this->stdin) {
fclose($this->pipes[0]);
unset($this->pipes[0]);
return;
}
$writePipes = array($this->pipes[0]);
unset($this->pipes[0]);
$stdinLen = strlen($this->stdin);
$stdinOffset = 0;
while ($writePipes) {
if (defined('PHP_WINDOWS_VERSION_BUILD')) {
$this->processFileHandles($callback);
}
$r = $this->pipes;
$w = $writePipes;
$e = null;
if (false === $n = @stream_select($r, $w, $e, 0, ceil(static::TIMEOUT_PRECISION * 1E6))) {
// if a system call has been interrupted, forget about it, let's try again
if ($this->hasSystemCallBeenInterrupted()) {
continue;
}
break;
}
// nothing has changed, let's wait until the process is ready
if (0 === $n) {
continue;
}
if ($w) {
$written = fwrite($writePipes[0], (binary) substr($this->stdin, $stdinOffset), 8192);
if (false !== $written) {
$stdinOffset += $written;
}
if ($stdinOffset >= $stdinLen) {
fclose($writePipes[0]);
$writePipes = null;
}
}
foreach ($r as $pipe) {
$type = array_search($pipe, $this->pipes);
$data = fread($pipe, 8192);
if (strlen($data) > 0) {
call_user_func($callback, $type == 1 ? self::OUT : self::ERR, $data);
}
if (false === $data || feof($pipe)) {
fclose($pipe);
unset($this->pipes[$type]);
}
}
$this->checkTimeout();
}
$this->updateStatus();
$this->writePipes(false);
$this->updateStatus(false);
$this->checkTimeout();
}
/**
@ -391,55 +335,15 @@ class Process
*/
public function wait($callback = null)
{
$this->updateStatus();
$callback = $this->buildCallback($callback);
while ($this->pipes || (defined('PHP_WINDOWS_VERSION_BUILD') && $this->fileHandles)) {
if (defined('PHP_WINDOWS_VERSION_BUILD') && $this->fileHandles) {
$this->processFileHandles($callback, !$this->pipes);
}
$this->checkTimeout();
if ($this->pipes) {
$r = $this->pipes;
$w = null;
$e = null;
// let's have a look if something changed in streams
if (false === $n = @stream_select($r, $w, $e, 0, ceil(static::TIMEOUT_PRECISION * 1E6))) {
// if a system call has been interrupted, forget about it, let's try again
// otherwise, an error occured, let's reset pipes
if (!$this->hasSystemCallBeenInterrupted()) {
$this->pipes = array();
}
continue;
}
// nothing has changed
if (0 === $n) {
continue;
}
foreach ($r as $pipe) {
$type = array_search($pipe, $this->pipes);
$data = fread($pipe, 8192);
if (strlen($data) > 0) {
// last exit code is output and caught to work around --enable-sigchild
if (3 == $type) {
$this->fallbackExitcode = (int) $data;
} else {
call_user_func($callback, $type == 1 ? self::OUT : self::ERR, $data);
}
}
if (false === $data || feof($pipe)) {
fclose($pipe);
unset($this->pipes[$type]);
}
}
}
$this->updateStatus(false);
if (null !== $callback) {
$this->callback = $this->buildCallback($callback);
}
$this->updateStatus();
while ($this->processInformation['running']) {
$this->updateStatus(true);
$this->checkTimeout();
}
$this->updateStatus(false);
if ($this->processInformation['signaled']) {
if ($this->isSigchildEnabled()) {
throw new RuntimeException('The process has been signaled.');
@ -482,7 +386,7 @@ class Process
*/
public function getOutput()
{
$this->updateOutput();
$this->readPipes(false);
return $this->stdout;
}
@ -514,7 +418,7 @@ class Process
*/
public function getErrorOutput()
{
$this->updateErrorOutput();
$this->readPipes(false);
return $this->stderr;
}
@ -553,7 +457,7 @@ class Process
throw new RuntimeException('This PHP has been compiled with --enable-sigchild. You must use setEnhanceSigchildCompatibility() to use this method');
}
$this->updateStatus();
$this->updateStatus(false);
return $this->exitcode;
}
@ -605,7 +509,7 @@ class Process
throw new RuntimeException('This PHP has been compiled with --enable-sigchild. Term signal can not be retrieved');
}
$this->updateStatus();
$this->updateStatus(false);
return $this->processInformation['signaled'];
}
@ -627,7 +531,7 @@ class Process
throw new RuntimeException('This PHP has been compiled with --enable-sigchild. Term signal can not be retrieved');
}
$this->updateStatus();
$this->updateStatus(false);
return $this->processInformation['termsig'];
}
@ -643,7 +547,7 @@ class Process
*/
public function hasBeenStopped()
{
$this->updateStatus();
$this->updateStatus(false);
return $this->processInformation['stopped'];
}
@ -659,7 +563,7 @@ class Process
*/
public function getStopSignal()
{
$this->updateStatus();
$this->updateStatus(false);
return $this->processInformation['stopsig'];
}
@ -675,7 +579,7 @@ class Process
return false;
}
$this->updateStatus();
$this->updateStatus(false);
return $this->processInformation['running'];
}
@ -697,7 +601,7 @@ class Process
*/
public function isTerminated()
{
$this->updateStatus();
$this->updateStatus(false);
return $this->status == self::STATUS_TERMINATED;
}
@ -711,7 +615,7 @@ class Process
*/
public function getStatus()
{
$this->updateStatus();
$this->updateStatus(false);
return $this->status;
}
@ -727,12 +631,10 @@ class Process
*/
public function stop($timeout = 10)
{
$timeoutMicro = (int) $timeout*1E6;
$timeoutMicro = microtime(true) + $timeout;
if ($this->isRunning()) {
proc_terminate($this->process);
$time = 0;
while (1 == $this->isRunning() && $time < $timeoutMicro) {
$time += 1000;
while ($this->isRunning() && microtime(true) < $timeoutMicro) {
usleep(1000);
}
@ -1062,14 +964,18 @@ class Process
}
/**
* Updates the status of the process.
* Updates the status of the process, reads pipes.
*
* @param Boolean $blocking Whether to use a clocking read call.
*/
protected function updateStatus()
protected function updateStatus($blocking)
{
if (self::STATUS_STARTED !== $this->status) {
return;
}
$this->readPipes($blocking);
$this->processInformation = proc_get_status($this->process);
if (!$this->processInformation['running']) {
$this->status = self::STATUS_TERMINATED;
@ -1079,29 +985,6 @@ class Process
}
}
/**
* Updates the current error output of the process (STDERR).
*/
protected function updateErrorOutput()
{
if (isset($this->pipes[self::STDERR]) && is_resource($this->pipes[self::STDERR])) {
$this->addErrorOutput(stream_get_contents($this->pipes[self::STDERR]));
}
}
/**
* Updates the current output of the process (STDOUT).
*/
protected function updateOutput()
{
if (defined('PHP_WINDOWS_VERSION_BUILD') && isset($this->fileHandles[self::STDOUT]) && is_resource($this->fileHandles[self::STDOUT])) {
fseek($this->fileHandles[self::STDOUT], $this->readBytes[self::STDOUT]);
$this->addOutput(stream_get_contents($this->fileHandles[self::STDOUT]));
} elseif (isset($this->pipes[self::STDOUT]) && is_resource($this->pipes[self::STDOUT])) {
$this->addOutput(stream_get_contents($this->pipes[self::STDOUT]));
}
}
/**
* Return whether PHP has been compiled with the '--enable-sigchild' option or not
*
@ -1122,10 +1005,9 @@ class Process
/**
* Handles the windows file handles fallbacks
*
* @param callable $callback A valid PHP callback
* @param Boolean $closeEmptyHandles if true, handles that are empty will be assumed closed
*/
private function processFileHandles($callback, $closeEmptyHandles = false)
private function processFileHandles($closeEmptyHandles = false)
{
$fh = $this->fileHandles;
foreach ($fh as $type => $fileHandle) {
@ -1133,7 +1015,7 @@ class Process
$data = fread($fileHandle, 8192);
if (strlen($data) > 0) {
$this->readBytes[$type] += strlen($data);
call_user_func($callback, $type == 1 ? self::OUT : self::ERR, $data);
call_user_func($this->callback, $type == 1 ? self::OUT : self::ERR, $data);
}
if (false === $data || ($closeEmptyHandles && '' === $data && feof($fileHandle))) {
fclose($fileHandle);
@ -1154,4 +1036,106 @@ class Process
// stream_select returns false when the `select` system call is interrupted by an incoming signal
return isset($lastError['message']) && false !== stripos($lastError['message'], 'interrupted system call');
}
/**
* Reads pipes, executes callback.
*
* @param Boolean $blocking Whether to use blocking calls or not.
*/
private function readPipes($blocking)
{
if (defined('PHP_WINDOWS_VERSION_BUILD') && $this->fileHandles) {
$this->processFileHandles(!$this->pipes);
}
if ($this->pipes) {
$r = $this->pipes;
$w = null;
$e = null;
// let's have a look if something changed in streams
if (false === $n = @stream_select($r, $w, $e, 0, $blocking ? ceil(self::TIMEOUT_PRECISION * 1E6) : 0)) {
// if a system call has been interrupted, forget about it, let's try again
// otherwise, an error occured, let's reset pipes
if (!$this->hasSystemCallBeenInterrupted()) {
$this->pipes = array();
}
return;
}
// nothing has changed
if (0 === $n) {
return;
}
foreach ($r as $pipe) {
$type = array_search($pipe, $this->pipes);
$data = fread($pipe, 8192);
if (strlen($data) > 0) {
// last exit code is output and caught to work around --enable-sigchild
if (3 == $type) {
$this->fallbackExitcode = (int) $data;
} else {
call_user_func($this->callback, $type == 1 ? self::OUT : self::ERR, $data);
}
}
if (false === $data || feof($pipe)) {
fclose($pipe);
unset($this->pipes[$type]);
}
}
}
}
/**
* Writes data to pipes.
*
* @param Boolean $blocking Whether to use blocking calls or not.
*/
private function writePipes($blocking)
{
if (null === $this->stdin) {
fclose($this->pipes[0]);
unset($this->pipes[0]);
return;
}
$writePipes = array($this->pipes[0]);
unset($this->pipes[0]);
$stdinLen = strlen($this->stdin);
$stdinOffset = 0;
while ($writePipes) {
$r = array();
$w = $writePipes;
$e = null;
if (false === $n = @stream_select($r, $w, $e, 0, $blocking ? ceil(static::TIMEOUT_PRECISION * 1E6) : 0)) {
// if a system call has been interrupted, forget about it, let's try again
if ($this->hasSystemCallBeenInterrupted()) {
continue;
}
break;
}
// nothing has changed, let's wait until the process is ready
if (0 === $n) {
continue;
}
if ($w) {
$written = fwrite($writePipes[0], (binary) substr($this->stdin, $stdinOffset), 8192);
if (false !== $written) {
$stdinOffset += $written;
}
if ($stdinOffset >= $stdinLen) {
fclose($writePipes[0]);
$writePipes = null;
}
}
}
}
}

View File

@ -70,6 +70,23 @@ abstract class AbstractProcessTest extends \PHPUnit_Framework_TestCase
$this->assertLessThan(1.3, $duration);
}
public function testCallbacksAreExecutedWithStart()
{
$data = '';
$process = $this->getProcess('echo "foo";sleep 1;echo "foo"');
$process->start(function ($type, $buffer) use (&$data) {
$data .= $buffer;
});
$start = microtime(true);
while ($process->isRunning()) {
usleep(10000);
}
$this->assertEquals("foo\nfoo\n", $data);
}
/**
* tests results from sub processes
*
@ -246,7 +263,7 @@ abstract class AbstractProcessTest extends \PHPUnit_Framework_TestCase
public function testStop()
{
$process = $this->getProcess('php -r "while (true) {}"');
$process = $this->getProcess('php -r "sleep(4);"');
$process->start();
$this->assertTrue($process->isRunning());
$process->stop();
@ -262,7 +279,7 @@ abstract class AbstractProcessTest extends \PHPUnit_Framework_TestCase
public function testIsNotSuccessful()
{
$process = $this->getProcess('php -r "while (true) {}"');
$process = $this->getProcess('php -r "sleep(4);"');
$process->start();
$this->assertTrue($process->isRunning());
$process->stop();
@ -297,7 +314,7 @@ abstract class AbstractProcessTest extends \PHPUnit_Framework_TestCase
$this->markTestSkipped('Windows does not support POSIX signals');
}
$process = $this->getProcess('php -r "while (true) {}"');
$process = $this->getProcess('php -r "sleep(4);"');
$process->start();
$process->stop();
$this->assertTrue($process->hasBeenSignaled());
@ -312,7 +329,7 @@ abstract class AbstractProcessTest extends \PHPUnit_Framework_TestCase
// SIGTERM is only defined if pcntl extension is present
$termSignal = defined('SIGTERM') ? SIGTERM : 15;
$process = $this->getProcess('php -r "while (true) {}"');
$process = $this->getProcess('php -r "sleep(4);"');
$process->start();
$process->stop();
@ -343,7 +360,7 @@ abstract class AbstractProcessTest extends \PHPUnit_Framework_TestCase
// Sleep doesn't work as it will allow the process to handle signals and close
// file handles from the other end.
$process = $this->getProcess('php -r "while (true) {}"');
$process = $this->getProcess('php -r "sleep 4"');
$process->start();
// PHP will deadlock when it tries to cleanup $process