diff --git a/src/Symfony/Component/Process/Pipes/AbstractPipes.php b/src/Symfony/Component/Process/Pipes/AbstractPipes.php index 1ca85739f5..f2fd35eb43 100644 --- a/src/Symfony/Component/Process/Pipes/AbstractPipes.php +++ b/src/Symfony/Component/Process/Pipes/AbstractPipes.php @@ -29,6 +29,17 @@ abstract class AbstractPipes implements PipesInterface /** @var bool */ private $blocked = true; + public function __construct($input) + { + if (is_resource($input)) { + $this->input = $input; + } elseif (is_string($input)) { + $this->inputBuffer = $input; + } else { + $this->inputBuffer = (string) $input; + } + } + /** * {@inheritdoc} */ @@ -71,4 +82,64 @@ abstract class AbstractPipes implements PipesInterface $this->blocked = false; } + + /** + * Writes input to stdin. + */ + protected function write() + { + if (!isset($this->pipes[0])) { + return; + } + + $e = array(); + $r = null !== $this->input ? array($this->input) : $e; + $w = array($this->pipes[0]); + + // let's have a look if something changed in streams + if (false === $n = @stream_select($r, $w, $e, 0, 0)) { + return; + } + + foreach ($w as $stdin) { + if (isset($this->inputBuffer[0])) { + $written = fwrite($stdin, $this->inputBuffer); + $this->inputBuffer = substr($this->inputBuffer, $written); + if (isset($this->inputBuffer[0])) { + return array($this->pipes[0]); + } + } + + foreach ($r as $input) { + for (;;) { + $data = fread($input, self::CHUNK_SIZE); + if (!isset($data[0])) { + break; + } + $written = fwrite($stdin, $data); + $data = substr($data, $written); + if (isset($data[0])) { + $this->inputBuffer = $data; + + return array($this->pipes[0]); + } + } + if (!isset($data[0]) && feof($input)) { + // no more data to read on input resource + // use an empty buffer in the next reads + $this->input = null; + } + } + } + + // no input to read on resource, buffer is empty + if (null === $this->input && !isset($this->inputBuffer[0])) { + fclose($this->pipes[0]); + unset($this->pipes[0]); + } + + if (!$w) { + return array($this->pipes[0]); + } + } } diff --git a/src/Symfony/Component/Process/Pipes/UnixPipes.php b/src/Symfony/Component/Process/Pipes/UnixPipes.php index 925f4f0b05..73146356d4 100644 --- a/src/Symfony/Component/Process/Pipes/UnixPipes.php +++ b/src/Symfony/Component/Process/Pipes/UnixPipes.php @@ -35,13 +35,7 @@ class UnixPipes extends AbstractPipes $this->ptyMode = (bool) $ptyMode; $this->disableOutput = (bool) $disableOutput; - if (is_resource($input)) { - $this->input = $input; - } else { - $this->input = fopen('php://temp', 'w+'); - fwrite($this->input, $input); - fseek($this->input, 0); - } + parent::__construct($input); } public function __destruct() @@ -100,36 +94,15 @@ class UnixPipes extends AbstractPipes */ public function readAndWrite($blocking, $close = false) { - // only stdin is left open, job has been done ! - // we can now close it - if (1 === count($this->pipes) && array(0) === array_keys($this->pipes)) { - fclose($this->pipes[0]); - unset($this->pipes[0]); - } - - if (empty($this->pipes)) { - return array(); - } - $this->unblock(); + $w = $this->write(); - $read = array(); - - if (null !== $this->input) { - // if input is a resource, let's add it to stream_select argument to - // fill a buffer - $r = array_merge($this->pipes, array('input' => $this->input)); - } else { - $r = $this->pipes; - } - // discard read on stdin + $read = $e = array(); + $r = $this->pipes; unset($r[0]); - $w = isset($this->pipes[0]) ? array($this->pipes[0]) : null; - $e = null; - // let's have a look if something changed in streams - if (false === $n = @stream_select($r, $w, $e, 0, $blocking ? Process::TIMEOUT_PRECISION * 1E6 : 0)) { + if ($r && false === $n = @stream_select($r, $w, $e, 0, $blocking ? Process::TIMEOUT_PRECISION * 1E6 : 0)) { // if a system call has been interrupted, forget about it, let's try again // otherwise, an error occurred, let's reset pipes if (!$this->hasSystemCallBeenInterrupted()) { @@ -139,46 +112,26 @@ class UnixPipes extends AbstractPipes return $read; } - // nothing has changed - if (0 === $n) { - return $read; - } - foreach ($r as $pipe) { // prior PHP 5.4 the array passed to stream_select is modified and // lose key association, we have to find back the key - $type = (false !== $found = array_search($pipe, $this->pipes)) ? $found : 'input'; - $data = ''; - if ($type !== 'input') { - while ('' !== $dataread = (string) fread($pipe, self::CHUNK_SIZE)) { - $data .= $dataread; - } - // Remove extra null chars returned by fread - if ('' !== $data) { - $read[$type] = rtrim($data, "\x00"); - } - } elseif (isset($w[0])) { - stream_copy_to_stream($this->input, $w[0], 4096); + $read[$type = array_search($pipe, $this->pipes, true)] = ''; + + do { + $data = fread($pipe, self::CHUNK_SIZE); + $read[$type] .= $data; + } while (isset($data[0])); + + if (!isset($read[$type][0])) { + unset($read[$type]); } - if (false === $data || (true === $close && feof($pipe) && '' === $data)) { - if ($type === 'input') { - // no more data to read on input resource - // use an empty buffer in the next reads - $this->input = null; - } else { - fclose($this->pipes[$type]); - unset($this->pipes[$type]); - } + if ($close && feof($pipe)) { + fclose($pipe); + unset($this->pipes[$type]); } } - // no input to read on resource and stdin still open - if (null === $this->input && isset($this->pipes[0])) { - fclose($this->pipes[0]); - unset($this->pipes[0]); - } - return $read; } diff --git a/src/Symfony/Component/Process/Pipes/WindowsPipes.php b/src/Symfony/Component/Process/Pipes/WindowsPipes.php index 1472f8c6c4..bedac8feed 100644 --- a/src/Symfony/Component/Process/Pipes/WindowsPipes.php +++ b/src/Symfony/Component/Process/Pipes/WindowsPipes.php @@ -52,17 +52,13 @@ class WindowsPipes extends AbstractPipes Process::STDERR => tempnam(sys_get_temp_dir(), 'err_sf_proc'), ); foreach ($this->files as $offset => $file) { - if (false === $file || false === $this->fileHandles[$offset] = fopen($file, 'rb')) { + if (false === $file || false === $this->fileHandles[$offset] = @fopen($file, 'rb')) { throw new RuntimeException('A temporary file could not be opened to write the process output to, verify that your TEMP environment variable is writable'); } } } - if (is_resource($input)) { - $this->input = $input; - } else { - $this->inputBuffer = $input; - } + parent::__construct($input); } public function __destruct() @@ -109,28 +105,22 @@ class WindowsPipes extends AbstractPipes */ public function readAndWrite($blocking, $close = false) { - $this->write($blocking, $close); + $this->unblock(); + $this->write(); $read = array(); - $fh = $this->fileHandles; - foreach ($fh as $type => $fileHandle) { - if (0 !== fseek($fileHandle, $this->readBytes[$type])) { - continue; - } - $data = ''; - $dataread = null; - while (!feof($fileHandle)) { - if (false !== $dataread = fread($fileHandle, self::CHUNK_SIZE)) { - $data .= $dataread; - } - } - if (0 < $length = strlen($data)) { - $this->readBytes[$type] += $length; + if ($this->fileHandles && $blocking) { + usleep(Process::TIMEOUT_PRECISION * 1E6); + } + foreach ($this->fileHandles as $type => $fileHandle) { + $data = stream_get_contents($fileHandle, -1, $this->readBytes[$type]); + + if (isset($data[0])) { + $this->readBytes[$type] += strlen($data); $read[$type] = $data; } - - if (false === $dataread || (true === $close && feof($fileHandle) && '' === $data)) { - fclose($this->fileHandles[$type]); + if ($close) { + fclose($fileHandle); unset($this->fileHandles[$type]); } } @@ -143,7 +133,7 @@ class WindowsPipes extends AbstractPipes */ public function areOpen() { - return (bool) $this->pipes && (bool) $this->fileHandles; + return $this->pipes && $this->fileHandles; } /** @@ -183,71 +173,4 @@ class WindowsPipes extends AbstractPipes } $this->files = array(); } - - /** - * Writes input to stdin. - * - * @param bool $blocking - * @param bool $close - */ - private function write($blocking, $close) - { - if (empty($this->pipes)) { - return; - } - - $this->unblock(); - - $r = null !== $this->input ? array('input' => $this->input) : null; - $w = isset($this->pipes[0]) ? array($this->pipes[0]) : null; - $e = null; - - // let's have a look if something changed in streams - if (false === $n = @stream_select($r, $w, $e, 0, $blocking ? Process::TIMEOUT_PRECISION * 1E6 : 0)) { - // if a system call has been interrupted, forget about it, let's try again - // otherwise, an error occurred, let's reset pipes - if (!$this->hasSystemCallBeenInterrupted()) { - $this->pipes = array(); - } - - return; - } - - // nothing has changed - if (0 === $n) { - return; - } - - if (null !== $w && 0 < count($r)) { - $data = ''; - while ($dataread = fread($r['input'], self::CHUNK_SIZE)) { - $data .= $dataread; - } - - $this->inputBuffer .= $data; - - if (false === $data || (true === $close && feof($r['input']) && '' === $data)) { - // no more data to read on input resource - // use an empty buffer in the next reads - $this->input = null; - } - } - - if (null !== $w && 0 < count($w)) { - while (strlen($this->inputBuffer)) { - $written = fwrite($w[0], $this->inputBuffer, 2 << 18); - if ($written > 0) { - $this->inputBuffer = (string) substr($this->inputBuffer, $written); - } else { - break; - } - } - } - - // no input to read on resource, buffer is empty and stdin still open - if ('' === $this->inputBuffer && null === $this->input && isset($this->pipes[0])) { - fclose($this->pipes[0]); - unset($this->pipes[0]); - } - } } diff --git a/src/Symfony/Component/Process/Tests/ProcessTest.php b/src/Symfony/Component/Process/Tests/ProcessTest.php index 652246267c..501662f8ac 100644 --- a/src/Symfony/Component/Process/Tests/ProcessTest.php +++ b/src/Symfony/Component/Process/Tests/ProcessTest.php @@ -192,9 +192,6 @@ class ProcessTest extends \PHPUnit_Framework_TestCase */ public function testSetStreamAsInput($code, $size) { - if ('\\' === DIRECTORY_SEPARATOR) { - $this->markTestIncomplete('This test fails with a timeout on Windows, can someone investigate please?'); - } $expected = str_repeat(str_repeat('*', 1024), $size).'!'; $expectedLength = (1024 * $size) + 1; @@ -791,10 +788,7 @@ class ProcessTest extends \PHPUnit_Framework_TestCase public function testIdleTimeoutNotExceededWhenOutputIsSent() { - if ('\\' === DIRECTORY_SEPARATOR) { - $this->markTestIncomplete('This test fails with a timeout on Windows, can someone investigate please?'); - } - $process = $this->getProcess(sprintf('%s -r %s', self::$phpBin, escapeshellarg('while (true) {echo "foo\n"; usleep(10000);}'))); + $process = $this->getProcess(sprintf('%s -r %s', self::$phpBin, escapeshellarg('while (true) {echo \'foo \'; usleep(1000);}'))); $process->setTimeout(1); $process->start(); @@ -802,15 +796,15 @@ class ProcessTest extends \PHPUnit_Framework_TestCase usleep(1000); } - $process->setIdleTimeout(0.1); + $process->setIdleTimeout(0.5); try { $process->wait(); $this->fail('A timeout exception was expected.'); - } catch (ProcessTimedOutException $ex) { - $this->assertTrue($ex->isGeneralTimeout(), 'A general timeout is expected.'); - $this->assertFalse($ex->isIdleTimeout(), 'No idle timeout is expected.'); - $this->assertEquals(1, $ex->getExceededTimeout()); + } catch (ProcessTimedOutException $e) { + $this->assertTrue($e->isGeneralTimeout(), 'A general timeout is expected.'); + $this->assertFalse($e->isIdleTimeout(), 'No idle timeout is expected.'); + $this->assertEquals(1, $e->getExceededTimeout()); } }