[Process][2.2] Fix Process component on windows

This commit is contained in:
Romain Neutron 2013-09-02 10:23:35 +02:00 committed by Fabien Potencier
parent 567e3f4642
commit 4a76c76698
4 changed files with 293 additions and 223 deletions

View File

@ -53,14 +53,13 @@ class Process
private $stderr; private $stderr;
private $enhanceWindowsCompatibility; private $enhanceWindowsCompatibility;
private $enhanceSigchildCompatibility; private $enhanceSigchildCompatibility;
private $pipes;
private $process; private $process;
private $status = self::STATUS_READY; private $status = self::STATUS_READY;
private $incrementalOutputOffset; private $incrementalOutputOffset;
private $incrementalErrorOutputOffset; private $incrementalErrorOutputOffset;
private $fileHandles; private $useFileHandles = false;
private $readBytes; private $processPipes;
private static $sigchild; private static $sigchild;
@ -152,6 +151,7 @@ class Process
} }
$this->stdin = $stdin; $this->stdin = $stdin;
$this->setTimeout($timeout); $this->setTimeout($timeout);
$this->useFileHandles = defined('PHP_WINDOWS_VERSION_BUILD');
$this->enhanceWindowsCompatibility = true; $this->enhanceWindowsCompatibility = true;
$this->enhanceSigchildCompatibility = !defined('PHP_WINDOWS_VERSION_BUILD') && $this->isSigchildEnabled(); $this->enhanceSigchildCompatibility = !defined('PHP_WINDOWS_VERSION_BUILD') && $this->isSigchildEnabled();
$this->options = array_replace(array('suppress_errors' => true, 'binary_pipes' => true), $options); $this->options = array_replace(array('suppress_errors' => true, 'binary_pipes' => true), $options);
@ -225,33 +225,14 @@ class Process
$this->starttime = microtime(true); $this->starttime = microtime(true);
$this->callback = $this->buildCallback($callback); $this->callback = $this->buildCallback($callback);
//Fix for PHP bug #51800: reading from STDOUT pipe hangs forever on Windows if the output is too big. $this->processPipes = new ProcessPipes($this->useFileHandles);
//Workaround for this problem is to use temporary files instead of pipes on Windows platform. $descriptors = $this->processPipes->getDescriptors();
//@see https://bugs.php.net/bug.php?id=51800
if (defined('PHP_WINDOWS_VERSION_BUILD')) {
$this->fileHandles = array(
self::STDOUT => tmpfile(),
);
if (false === $this->fileHandles[self::STDOUT]) {
throw new RuntimeException('A temporary file could not be opened to write the process output to, verify that your TEMP environment variable is writable');
}
$this->readBytes = array(
self::STDOUT => 0,
);
$descriptors = array(array('pipe', 'r'), $this->fileHandles[self::STDOUT], array('pipe', 'w'));
} else {
$descriptors = array(
array('pipe', 'r'), // stdin
array('pipe', 'w'), // stdout
array('pipe', 'w'), // stderr
);
if ($this->enhanceSigchildCompatibility && $this->isSigchildEnabled()) { if (!$this->useFileHandles && $this->enhanceSigchildCompatibility && $this->isSigchildEnabled()) {
// last exit code is output on the fourth pipe and caught to work around --enable-sigchild // last exit code is output on the fourth pipe and caught to work around --enable-sigchild
$descriptors = array_merge($descriptors, array(array('pipe', 'w'))); $descriptors = array_merge($descriptors, array(array('pipe', 'w')));
$this->commandline = '('.$this->commandline.') 3>/dev/null; code=$?; echo $code >&3; exit $code'; $this->commandline = '('.$this->commandline.') 3>/dev/null; code=$?; echo $code >&3; exit $code';
}
} }
$commandline = $this->commandline; $commandline = $this->commandline;
@ -263,18 +244,15 @@ class Process
} }
} }
$this->process = proc_open($commandline, $descriptors, $this->pipes, $this->cwd, $this->env, $this->options); $this->process = proc_open($commandline, $descriptors, $this->processPipes->pipes, $this->cwd, $this->env, $this->options);
if (!is_resource($this->process)) { if (!is_resource($this->process)) {
throw new RuntimeException('Unable to launch a new process.'); throw new RuntimeException('Unable to launch a new process.');
} }
$this->status = self::STATUS_STARTED; $this->status = self::STATUS_STARTED;
foreach ($this->pipes as $pipe) { $this->processPipes->unblock();
stream_set_blocking($pipe, false); $this->processPipes->write(false, $this->stdin);
}
$this->writePipes();
$this->updateStatus(false); $this->updateStatus(false);
$this->checkTimeout(); $this->checkTimeout();
} }
@ -326,24 +304,12 @@ class Process
if (null !== $callback) { if (null !== $callback) {
$this->callback = $this->buildCallback($callback); $this->callback = $this->buildCallback($callback);
} }
while ($this->pipes || (defined('PHP_WINDOWS_VERSION_BUILD') && $this->fileHandles)) { while ($this->processInformation['running']) {
$this->checkTimeout(); $this->checkTimeout();
$this->readPipes(true); $this->updateStatus(true);
} }
$this->updateStatus(false); $this->updateStatus(false);
if ($this->processInformation['signaled']) { $this->processPipes->close();
if ($this->isSigchildEnabled()) {
throw new RuntimeException('The process has been signaled.');
}
throw new RuntimeException(sprintf('The process has been signaled with signal "%s".', $this->processInformation['termsig']));
}
$time = 0;
while ($this->isRunning() && $time < 1000000) {
$time += 1000;
usleep(1000);
}
$exitcode = proc_close($this->process); $exitcode = proc_close($this->process);
@ -628,21 +594,10 @@ class Process
if (!defined('PHP_WINDOWS_VERSION_BUILD') && $this->isRunning()) { if (!defined('PHP_WINDOWS_VERSION_BUILD') && $this->isRunning()) {
proc_terminate($this->process, SIGKILL); proc_terminate($this->process, SIGKILL);
} }
$this->processPipes->close();
foreach ($this->pipes as $pipe) {
fclose($pipe);
}
$this->pipes = array();
$exitcode = proc_close($this->process); $exitcode = proc_close($this->process);
$this->exitcode = -1 === $this->processInformation['exitcode'] ? $exitcode : $this->processInformation['exitcode']; $this->exitcode = -1 === $this->processInformation['exitcode'] ? $exitcode : $this->processInformation['exitcode'];
if (defined('PHP_WINDOWS_VERSION_BUILD')) {
foreach ($this->fileHandles as $fileHandle) {
fclose($fileHandle);
}
$this->fileHandles = array();
}
} }
$this->status = self::STATUS_TERMINATED; $this->status = self::STATUS_TERMINATED;
@ -989,41 +944,6 @@ class Process
return self::$sigchild = false !== strpos(ob_get_clean(), '--enable-sigchild'); return self::$sigchild = false !== strpos(ob_get_clean(), '--enable-sigchild');
} }
/**
* Handles the windows file handles fallbacks
*
* @param Boolean $closeEmptyHandles if true, handles that are empty will be assumed closed
*/
private function processFileHandles($closeEmptyHandles = false)
{
$fh = $this->fileHandles;
foreach ($fh as $type => $fileHandle) {
fseek($fileHandle, $this->readBytes[$type]);
$data = fread($fileHandle, 8192);
if (strlen($data) > 0) {
$this->readBytes[$type] += strlen($data);
call_user_func($this->callback, $type == 1 ? self::OUT : self::ERR, $data);
}
if (false === $data || ($closeEmptyHandles && '' === $data && feof($fileHandle))) {
fclose($fileHandle);
unset($this->fileHandles[$type]);
}
}
}
/**
* Returns true if a system call has been interrupted.
*
* @return Boolean
*/
private function hasSystemCallBeenInterrupted()
{
$lastError = error_get_last();
// stream_select returns false when the `select` system call is interrupted by an incoming signal
return isset($lastError['message']) && false !== stripos($lastError['message'], 'interrupted system call');
}
/** /**
* Reads pipes, executes callback. * Reads pipes, executes callback.
* *
@ -1031,113 +951,11 @@ class Process
*/ */
private function readPipes($blocking) private function readPipes($blocking)
{ {
if (defined('PHP_WINDOWS_VERSION_BUILD') && $this->fileHandles) { foreach ($this->processPipes->read($blocking) as $type => $data) {
$this->processFileHandles(!$this->pipes); if (3 == $type) {
} $this->fallbackExitcode = (int) $data;
} else {
if ($this->pipes) { call_user_func($this->callback, $type === self::STDOUT ? self::OUT : self::ERR, $data);
$r = $this->pipes;
$w = null;
$e = null;
// let's have a look if something changed in streams
if (false === $n = @stream_select($r, $w, $e, 0, $blocking ? ceil(self::TIMEOUT_PRECISION * 1E6) : 0)) {
// if a system call has been interrupted, forget about it, let's try again
// otherwise, an error occured, let's reset pipes
if (!$this->hasSystemCallBeenInterrupted()) {
$this->pipes = array();
}
return;
}
// nothing has changed
if (0 === $n) {
return;
}
$this->processReadPipes($r);
}
}
/**
* Writes data to pipes.
*
* @param Boolean $blocking Whether to use blocking calls or not.
*/
private function writePipes()
{
if (null === $this->stdin) {
fclose($this->pipes[0]);
unset($this->pipes[0]);
return;
}
$writePipes = array($this->pipes[0]);
unset($this->pipes[0]);
$stdinLen = strlen($this->stdin);
$stdinOffset = 0;
while ($writePipes) {
if (defined('PHP_WINDOWS_VERSION_BUILD')) {
$this->processFileHandles();
}
$r = $this->pipes;
$w = $writePipes;
$e = null;
if (false === $n = @stream_select($r, $w, $e, 0, $blocking ? ceil(static::TIMEOUT_PRECISION * 1E6) : 0)) {
// if a system call has been interrupted, forget about it, let's try again
if ($this->hasSystemCallBeenInterrupted()) {
continue;
}
break;
}
// nothing has changed, let's wait until the process is ready
if (0 === $n) {
continue;
}
if ($w) {
$written = fwrite($writePipes[0], (binary) substr($this->stdin, $stdinOffset), 8192);
if (false !== $written) {
$stdinOffset += $written;
}
if ($stdinOffset >= $stdinLen) {
fclose($writePipes[0]);
$writePipes = null;
}
}
$this->processReadPipes($r);
}
}
/**
* Processes read pipes, executes callback on it.
*
* @param array $pipes
*/
private function processReadPipes(array $pipes)
{
foreach ($pipes as $pipe) {
$type = array_search($pipe, $this->pipes);
$data = fread($pipe, 8192);
if (strlen($data) > 0) {
// last exit code is output and caught to work around --enable-sigchild
if (3 == $type) {
$this->fallbackExitcode = (int) $data;
} else {
call_user_func($this->callback, $type == 1 ? self::OUT : self::ERR, $data);
}
}
if (false === $data || feof($pipe)) {
fclose($pipe);
unset($this->pipes[$type]);
} }
} }
} }
@ -1154,11 +972,8 @@ class Process
$this->processInformation = null; $this->processInformation = null;
$this->stdout = null; $this->stdout = null;
$this->stderr = null; $this->stderr = null;
$this->pipes = null;
$this->process = null; $this->process = null;
$this->status = self::STATUS_READY; $this->status = self::STATUS_READY;
$this->fileHandles = null;
$this->readBytes = null;
$this->incrementalOutputOffset = 0; $this->incrementalOutputOffset = 0;
$this->incrementalErrorOutputOffset = 0; $this->incrementalErrorOutputOffset = 0;
} }

View File

@ -0,0 +1,254 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Process;
use Symfony\Component\Process\Exception\RuntimeException;
/**
* ProcessPipes manages descriptors and pipes for the use of proc_open.
*/
class ProcessPipes
{
/** @var array */
public $pipes = array();
/** @var array */
private $fileHandles = array();
/** @var array */
private $readBytes = array();
/** @var Boolean */
private $useFiles;
public function __construct($useFiles = false)
{
$this->useFiles = (Boolean) $useFiles;
// Fix for PHP bug #51800: reading from STDOUT pipe hangs forever on Windows if the output is too big.
// Workaround for this problem is to use temporary files instead of pipes on Windows platform.
//
// Please note that this work around prevents hanging but
// another issue occurs : In some race conditions, some data may be
// lost or corrupted.
//
// @see https://bugs.php.net/bug.php?id=51800
if ($this->useFiles) {
$this->fileHandles = array(
Process::STDOUT => tmpfile(),
Process::STDERR => tmpfile(),
);
if (false === $this->fileHandles[Process::STDOUT]) {
throw new RuntimeException('A temporary file could not be opened to write the process output to, verify that your TEMP environment variable is writable');
}
if (false === $this->fileHandles[Process::STDERR]) {
throw new RuntimeException('A temporary file could not be opened to write the process output to, verify that your TEMP environment variable is writable');
}
$this->readBytes = array(
Process::STDOUT => 0,
Process::STDERR => 0,
);
}
}
public function __destruct()
{
$this->close();
}
/**
* Sets non-blocking mode on pipes.
*/
public function unblock()
{
foreach ($this->pipes as $pipe) {
stream_set_blocking($pipe, 0);
}
}
/**
* Closes file handles and pipes.
*/
public function close()
{
foreach ($this->pipes as $offset => $pipe) {
fclose($pipe);
}
foreach ($this->fileHandles as $offset => $handle) {
fclose($handle);
}
$this->fileHandles = $this->pipes = array();
}
/**
* Returns an array of descriptors for the use of proc_open.
*
* @return array
*/
public function getDescriptors()
{
if ($this->useFiles) {
return array(
array('pipe', 'r'),
$this->fileHandles[Process::STDOUT],
$this->fileHandles[Process::STDERR],
);
}
return array(
array('pipe', 'r'), // stdin
array('pipe', 'w'), // stdout
array('pipe', 'w'), // stderr
);
}
/**
* Reads data in file handles and pipes.
*
* @param Boolean $blocking Whether to use blocking calls or not.
*
* @return array An array of read data indexed by their fd.
*/
public function read($blocking)
{
return array_replace($this->readStreams($blocking), $this->readFileHandles());
}
/**
* Writes stdin data.
*
* @param Boolean $blocking Whether to use blocking calls or not.
* @param string $stdin The data to write.
*/
public function write($blocking, $stdin)
{
if (null === $stdin) {
fclose($this->pipes[0]);
unset($this->pipes[0]);
return;
}
$writePipes = array($this->pipes[0]);
unset($this->pipes[0]);
$stdinLen = strlen($stdin);
$stdinOffset = 0;
while ($writePipes) {
$r = null;
$w = $writePipes;
$e = null;
if (false === $n = @stream_select($r, $w, $e, 0, $blocking ? ceil(Process::TIMEOUT_PRECISION * 1E6) : 0)) {
// if a system call has been interrupted, forget about it, let's try again
if ($this->hasSystemCallBeenInterrupted()) {
continue;
}
break;
}
// nothing has changed, let's wait until the process is ready
if (0 === $n) {
continue;
}
if ($w) {
$written = fwrite($writePipes[0], (binary) substr($stdin, $stdinOffset), 8192);
if (false !== $written) {
$stdinOffset += $written;
}
if ($stdinOffset >= $stdinLen) {
fclose($writePipes[0]);
$writePipes = null;
}
}
}
}
/**
* Reads data in file handles.
*
* @return array An array of read data indexed by their fd.
*/
private function readFileHandles()
{
$read = array();
foreach ($this->fileHandles as $type => $fileHandle) {
fseek($fileHandle, $this->readBytes[$type]);
$data = '';
while (!feof($fileHandle)) {
$data .= fread($fileHandle, 8192);
}
if (0 < $length = strlen($data)) {
$this->readBytes[$type] += $length;
$read[$type] = $data;
}
}
return $read;
}
/**
* Reads data in file pipes streams.
*
* @param Boolean $blocking Whether to use blocking calls or not.
*
* @return array An array of read data indexed by their fd.
*/
private function readStreams($blocking)
{
$read = array();
$r = $this->pipes;
$w = null;
$e = null;
// let's have a look if something changed in streams
if (false === $n = @stream_select($r, $w, $e, 0, $blocking ? ceil(Process::TIMEOUT_PRECISION * 1E6) : 0)) {
// if a system call has been interrupted, forget about it, let's try again
// otherwise, an error occured, let's reset pipes
if (!$this->hasSystemCallBeenInterrupted()) {
$this->pipes = array();
}
return $read;
}
// nothing has changed
if (0 === $n) {
return $read;
}
foreach ($r as $pipe) {
$type = array_search($pipe, $this->pipes);
$data = fread($pipe, 8192);
if (strlen($data) > 0) {
$read[$type] = $data;
}
}
return $read;
}
/**
* Returns true if a system call has been interrupted.
*
* @return Boolean
*/
private function hasSystemCallBeenInterrupted()
{
$lastError = error_get_last();
// stream_select returns false when the `select` system call is interrupted by an incoming signal
return isset($lastError['message']) && false !== stripos($lastError['message'], 'interrupted system call');
}
}

View File

@ -74,17 +74,16 @@ abstract class AbstractProcessTest extends \PHPUnit_Framework_TestCase
{ {
$data = ''; $data = '';
$process = $this->getProcess('echo "foo";sleep 1;echo "foo"'); $process = $this->getProcess('echo foo && php -r "sleep(1);" && echo foo');
$process->start(function ($type, $buffer) use (&$data) { $process->start(function ($type, $buffer) use (&$data) {
$data .= $buffer; $data .= $buffer;
}); });
$start = microtime(true);
while ($process->isRunning()) { while ($process->isRunning()) {
usleep(10000); usleep(10000);
} }
$this->assertEquals("foo\nfoo\n", $data); $this->assertEquals(2, preg_match_all('/foo/', $data, $matches));
} }
/** /**
@ -124,6 +123,12 @@ abstract class AbstractProcessTest extends \PHPUnit_Framework_TestCase
public function chainedCommandsOutputProvider() public function chainedCommandsOutputProvider()
{ {
if (defined('PHP_WINDOWS_VERSION_BUILD')) {
return array(
array("2 \r\n2\r\n", '&&', '2')
);
}
return array( return array(
array("1\n1\n", ';', '1'), array("1\n1\n", ';', '1'),
array("2\n2\n", '&&', '2'), array("2\n2\n", '&&', '2'),
@ -136,10 +141,6 @@ abstract class AbstractProcessTest extends \PHPUnit_Framework_TestCase
*/ */
public function testChainedCommandsOutput($expected, $operator, $input) public function testChainedCommandsOutput($expected, $operator, $input)
{ {
if (defined('PHP_WINDOWS_VERSION_BUILD')) {
$this->markTestSkipped('Does it work on windows ?');
}
$process = $this->getProcess(sprintf('echo %s %s echo %s', $input, $operator, $input)); $process = $this->getProcess(sprintf('echo %s %s echo %s', $input, $operator, $input));
$process->run(); $process->run();
$this->assertEquals($expected, $process->getOutput()); $this->assertEquals($expected, $process->getOutput());
@ -178,7 +179,7 @@ abstract class AbstractProcessTest extends \PHPUnit_Framework_TestCase
public function testGetOutput() public function testGetOutput()
{ {
$p = new Process(sprintf('php -r %s', escapeshellarg('$n=0;while ($n<3) {echo \' foo \';$n++;}'))); $p = new Process(sprintf('php -r %s', escapeshellarg('$n=0;while ($n<3) {echo \' foo \';$n++; usleep(500); }')));
$p->run(); $p->run();
$this->assertEquals(3, preg_match_all('/foo/', $p->getOutput(), $matches)); $this->assertEquals(3, preg_match_all('/foo/', $p->getOutput(), $matches));
@ -300,7 +301,7 @@ abstract class AbstractProcessTest extends \PHPUnit_Framework_TestCase
public function testIsSuccessfulOnlyAfterTerminated() public function testIsSuccessfulOnlyAfterTerminated()
{ {
$process = $this->getProcess('sleep 1'); $process = $this->getProcess('php -r "sleep(1);"');
$process->start(); $process->start();
while ($process->isRunning()) { while ($process->isRunning()) {
$this->assertFalse($process->isSuccessful()); $this->assertFalse($process->isSuccessful());
@ -402,7 +403,7 @@ abstract class AbstractProcessTest extends \PHPUnit_Framework_TestCase
public function testRunProcessWithTimeout() public function testRunProcessWithTimeout()
{ {
$timeout = 0.5; $timeout = 0.5;
$process = $this->getProcess('sleep 3'); $process = $this->getProcess('php -r "sleep(3);"');
$process->setTimeout($timeout); $process->setTimeout($timeout);
$start = microtime(true); $start = microtime(true);
try { try {
@ -420,7 +421,7 @@ abstract class AbstractProcessTest extends \PHPUnit_Framework_TestCase
{ {
$timeout = 0.5; $timeout = 0.5;
$precision = 100000; $precision = 100000;
$process = $this->getProcess('sleep 3'); $process = $this->getProcess('php -r "sleep(3);"');
$process->setTimeout($timeout); $process->setTimeout($timeout);
$start = microtime(true); $start = microtime(true);

View File

@ -8,9 +8,9 @@ define('ERR_WRITE_FAILED', 4);
$read = array(STDIN); $read = array(STDIN);
$write = array(STDOUT, STDERR); $write = array(STDOUT, STDERR);
stream_set_blocking(STDIN, false); stream_set_blocking(STDIN, 0);
stream_set_blocking(STDOUT, false); stream_set_blocking(STDOUT, 0);
stream_set_blocking(STDERR, false); stream_set_blocking(STDERR, 0);
$out = $err = ''; $out = $err = '';
while ($read || $write) { while ($read || $write) {
@ -26,7 +26,7 @@ while ($read || $write) {
} }
if (in_array(STDOUT, $w) && strlen($out) > 0) { if (in_array(STDOUT, $w) && strlen($out) > 0) {
$written = fwrite(STDOUT, (binary) $out, 1024); $written = fwrite(STDOUT, (binary) $out, 32768);
if (false === $written) { if (false === $written) {
die(ERR_WRITE_FAILED); die(ERR_WRITE_FAILED);
} }
@ -37,7 +37,7 @@ while ($read || $write) {
} }
if (in_array(STDERR, $w) && strlen($err) > 0) { if (in_array(STDERR, $w) && strlen($err) > 0) {
$written = fwrite(STDERR, (binary) $err, 1024); $written = fwrite(STDERR, (binary) $err, 32768);
if (false === $written) { if (false === $written) {
die(ERR_WRITE_FAILED); die(ERR_WRITE_FAILED);
} }
@ -48,7 +48,7 @@ while ($read || $write) {
} }
if ($r) { if ($r) {
$str = fread(STDIN, 1024); $str = fread(STDIN, 32768);
if (false !== $str) { if (false !== $str) {
$out .= $str; $out .= $str;
$err .= $str; $err .= $str;