bug #18066 [Process] Fix pipes handling (nicolas-grekas)

This PR was merged into the 2.7 branch.

Discussion
----------

[Process] Fix pipes handling

| Q             | A
| ------------- | ---
| Branch        | 2.7
| Bug fix?      | yes
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #18015
| License       | MIT
| Doc PR        | -

Commits
-------

3ab6c39 [Process] Fix pipes handling
This commit is contained in:
Fabien Potencier 2016-03-10 11:09:14 +01:00
commit 1d9815f5c7
4 changed files with 109 additions and 168 deletions

View File

@ -29,6 +29,17 @@ abstract class AbstractPipes implements PipesInterface
/** @var bool */
private $blocked = true;
public function __construct($input)
{
if (is_resource($input)) {
$this->input = $input;
} elseif (is_string($input)) {
$this->inputBuffer = $input;
} else {
$this->inputBuffer = (string) $input;
}
}
/**
* {@inheritdoc}
*/
@ -71,4 +82,64 @@ abstract class AbstractPipes implements PipesInterface
$this->blocked = false;
}
/**
* Writes input to stdin.
*/
protected function write()
{
if (!isset($this->pipes[0])) {
return;
}
$e = array();
$r = null !== $this->input ? array($this->input) : $e;
$w = array($this->pipes[0]);
// let's have a look if something changed in streams
if (false === $n = @stream_select($r, $w, $e, 0, 0)) {
return;
}
foreach ($w as $stdin) {
if (isset($this->inputBuffer[0])) {
$written = fwrite($stdin, $this->inputBuffer);
$this->inputBuffer = substr($this->inputBuffer, $written);
if (isset($this->inputBuffer[0])) {
return array($this->pipes[0]);
}
}
foreach ($r as $input) {
for (;;) {
$data = fread($input, self::CHUNK_SIZE);
if (!isset($data[0])) {
break;
}
$written = fwrite($stdin, $data);
$data = substr($data, $written);
if (isset($data[0])) {
$this->inputBuffer = $data;
return array($this->pipes[0]);
}
}
if (!isset($data[0]) && feof($input)) {
// no more data to read on input resource
// use an empty buffer in the next reads
$this->input = null;
}
}
}
// no input to read on resource, buffer is empty
if (null === $this->input && !isset($this->inputBuffer[0])) {
fclose($this->pipes[0]);
unset($this->pipes[0]);
}
if (!$w) {
return array($this->pipes[0]);
}
}
}

View File

@ -35,13 +35,7 @@ class UnixPipes extends AbstractPipes
$this->ptyMode = (bool) $ptyMode;
$this->disableOutput = (bool) $disableOutput;
if (is_resource($input)) {
$this->input = $input;
} else {
$this->input = fopen('php://temp', 'w+');
fwrite($this->input, $input);
fseek($this->input, 0);
}
parent::__construct($input);
}
public function __destruct()
@ -100,36 +94,15 @@ class UnixPipes extends AbstractPipes
*/
public function readAndWrite($blocking, $close = false)
{
// only stdin is left open, job has been done !
// we can now close it
if (1 === count($this->pipes) && array(0) === array_keys($this->pipes)) {
fclose($this->pipes[0]);
unset($this->pipes[0]);
}
if (empty($this->pipes)) {
return array();
}
$this->unblock();
$w = $this->write();
$read = array();
if (null !== $this->input) {
// if input is a resource, let's add it to stream_select argument to
// fill a buffer
$r = array_merge($this->pipes, array('input' => $this->input));
} else {
$r = $this->pipes;
}
// discard read on stdin
$read = $e = array();
$r = $this->pipes;
unset($r[0]);
$w = isset($this->pipes[0]) ? array($this->pipes[0]) : null;
$e = null;
// let's have a look if something changed in streams
if (false === $n = @stream_select($r, $w, $e, 0, $blocking ? Process::TIMEOUT_PRECISION * 1E6 : 0)) {
if ($r && false === $n = @stream_select($r, $w, $e, 0, $blocking ? Process::TIMEOUT_PRECISION * 1E6 : 0)) {
// if a system call has been interrupted, forget about it, let's try again
// otherwise, an error occurred, let's reset pipes
if (!$this->hasSystemCallBeenInterrupted()) {
@ -139,46 +112,26 @@ class UnixPipes extends AbstractPipes
return $read;
}
// nothing has changed
if (0 === $n) {
return $read;
}
foreach ($r as $pipe) {
// prior PHP 5.4 the array passed to stream_select is modified and
// lose key association, we have to find back the key
$type = (false !== $found = array_search($pipe, $this->pipes)) ? $found : 'input';
$data = '';
if ($type !== 'input') {
while ('' !== $dataread = (string) fread($pipe, self::CHUNK_SIZE)) {
$data .= $dataread;
}
// Remove extra null chars returned by fread
if ('' !== $data) {
$read[$type] = rtrim($data, "\x00");
}
} elseif (isset($w[0])) {
stream_copy_to_stream($this->input, $w[0], 4096);
$read[$type = array_search($pipe, $this->pipes, true)] = '';
do {
$data = fread($pipe, self::CHUNK_SIZE);
$read[$type] .= $data;
} while (isset($data[0]));
if (!isset($read[$type][0])) {
unset($read[$type]);
}
if (false === $data || (true === $close && feof($pipe) && '' === $data)) {
if ($type === 'input') {
// no more data to read on input resource
// use an empty buffer in the next reads
$this->input = null;
} else {
fclose($this->pipes[$type]);
unset($this->pipes[$type]);
}
if ($close && feof($pipe)) {
fclose($pipe);
unset($this->pipes[$type]);
}
}
// no input to read on resource and stdin still open
if (null === $this->input && isset($this->pipes[0])) {
fclose($this->pipes[0]);
unset($this->pipes[0]);
}
return $read;
}

View File

@ -52,17 +52,13 @@ class WindowsPipes extends AbstractPipes
Process::STDERR => tempnam(sys_get_temp_dir(), 'err_sf_proc'),
);
foreach ($this->files as $offset => $file) {
if (false === $file || false === $this->fileHandles[$offset] = fopen($file, 'rb')) {
if (false === $file || false === $this->fileHandles[$offset] = @fopen($file, 'rb')) {
throw new RuntimeException('A temporary file could not be opened to write the process output to, verify that your TEMP environment variable is writable');
}
}
}
if (is_resource($input)) {
$this->input = $input;
} else {
$this->inputBuffer = $input;
}
parent::__construct($input);
}
public function __destruct()
@ -109,28 +105,22 @@ class WindowsPipes extends AbstractPipes
*/
public function readAndWrite($blocking, $close = false)
{
$this->write($blocking, $close);
$this->unblock();
$this->write();
$read = array();
$fh = $this->fileHandles;
foreach ($fh as $type => $fileHandle) {
if (0 !== fseek($fileHandle, $this->readBytes[$type])) {
continue;
}
$data = '';
$dataread = null;
while (!feof($fileHandle)) {
if (false !== $dataread = fread($fileHandle, self::CHUNK_SIZE)) {
$data .= $dataread;
}
}
if (0 < $length = strlen($data)) {
$this->readBytes[$type] += $length;
if ($this->fileHandles && $blocking) {
usleep(Process::TIMEOUT_PRECISION * 1E6);
}
foreach ($this->fileHandles as $type => $fileHandle) {
$data = stream_get_contents($fileHandle, -1, $this->readBytes[$type]);
if (isset($data[0])) {
$this->readBytes[$type] += strlen($data);
$read[$type] = $data;
}
if (false === $dataread || (true === $close && feof($fileHandle) && '' === $data)) {
fclose($this->fileHandles[$type]);
if ($close) {
fclose($fileHandle);
unset($this->fileHandles[$type]);
}
}
@ -143,7 +133,7 @@ class WindowsPipes extends AbstractPipes
*/
public function areOpen()
{
return (bool) $this->pipes && (bool) $this->fileHandles;
return $this->pipes && $this->fileHandles;
}
/**
@ -183,71 +173,4 @@ class WindowsPipes extends AbstractPipes
}
$this->files = array();
}
/**
* Writes input to stdin.
*
* @param bool $blocking
* @param bool $close
*/
private function write($blocking, $close)
{
if (empty($this->pipes)) {
return;
}
$this->unblock();
$r = null !== $this->input ? array('input' => $this->input) : null;
$w = isset($this->pipes[0]) ? array($this->pipes[0]) : null;
$e = null;
// let's have a look if something changed in streams
if (false === $n = @stream_select($r, $w, $e, 0, $blocking ? Process::TIMEOUT_PRECISION * 1E6 : 0)) {
// if a system call has been interrupted, forget about it, let's try again
// otherwise, an error occurred, let's reset pipes
if (!$this->hasSystemCallBeenInterrupted()) {
$this->pipes = array();
}
return;
}
// nothing has changed
if (0 === $n) {
return;
}
if (null !== $w && 0 < count($r)) {
$data = '';
while ($dataread = fread($r['input'], self::CHUNK_SIZE)) {
$data .= $dataread;
}
$this->inputBuffer .= $data;
if (false === $data || (true === $close && feof($r['input']) && '' === $data)) {
// no more data to read on input resource
// use an empty buffer in the next reads
$this->input = null;
}
}
if (null !== $w && 0 < count($w)) {
while (strlen($this->inputBuffer)) {
$written = fwrite($w[0], $this->inputBuffer, 2 << 18);
if ($written > 0) {
$this->inputBuffer = (string) substr($this->inputBuffer, $written);
} else {
break;
}
}
}
// no input to read on resource, buffer is empty and stdin still open
if ('' === $this->inputBuffer && null === $this->input && isset($this->pipes[0])) {
fclose($this->pipes[0]);
unset($this->pipes[0]);
}
}
}

View File

@ -192,9 +192,6 @@ class ProcessTest extends \PHPUnit_Framework_TestCase
*/
public function testSetStreamAsInput($code, $size)
{
if ('\\' === DIRECTORY_SEPARATOR) {
$this->markTestIncomplete('This test fails with a timeout on Windows, can someone investigate please?');
}
$expected = str_repeat(str_repeat('*', 1024), $size).'!';
$expectedLength = (1024 * $size) + 1;
@ -791,10 +788,7 @@ class ProcessTest extends \PHPUnit_Framework_TestCase
public function testIdleTimeoutNotExceededWhenOutputIsSent()
{
if ('\\' === DIRECTORY_SEPARATOR) {
$this->markTestIncomplete('This test fails with a timeout on Windows, can someone investigate please?');
}
$process = $this->getProcess(sprintf('%s -r %s', self::$phpBin, escapeshellarg('while (true) {echo "foo\n"; usleep(10000);}')));
$process = $this->getProcess(sprintf('%s -r %s', self::$phpBin, escapeshellarg('while (true) {echo \'foo \'; usleep(1000);}')));
$process->setTimeout(1);
$process->start();
@ -802,15 +796,15 @@ class ProcessTest extends \PHPUnit_Framework_TestCase
usleep(1000);
}
$process->setIdleTimeout(0.1);
$process->setIdleTimeout(0.5);
try {
$process->wait();
$this->fail('A timeout exception was expected.');
} catch (ProcessTimedOutException $ex) {
$this->assertTrue($ex->isGeneralTimeout(), 'A general timeout is expected.');
$this->assertFalse($ex->isIdleTimeout(), 'No idle timeout is expected.');
$this->assertEquals(1, $ex->getExceededTimeout());
} catch (ProcessTimedOutException $e) {
$this->assertTrue($e->isGeneralTimeout(), 'A general timeout is expected.');
$this->assertFalse($e->isIdleTimeout(), 'No idle timeout is expected.');
$this->assertEquals(1, $e->getExceededTimeout());
}
}