feature #18386 [Process] Add InputStream to seamlessly feed running processes (nicolas-grekas)

This PR was merged into the 3.1-dev branch.

Discussion
----------

[Process] Add InputStream to seamlessly feed running processes

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #18262
| License       | MIT
| Doc PR        | https://github.com/symfony/symfony-docs/pull/6424

Look at the tests, beautiful, isn't it?

Commits
-------

3d20b6c [Process] Add InputStream to seamlessly feed running processes
This commit is contained in:
Fabien Potencier 2016-04-02 10:44:52 +02:00
commit dc189f0793
5 changed files with 192 additions and 23 deletions

View File

@ -0,0 +1,90 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Process;
use Symfony\Component\Process\Exception\RuntimeException;
/**
* Provides a way to continuously write to the input of a Process until the InputStream is closed.
*
* @author Nicolas Grekas <p@tchwork.com>
*/
class InputStream implements \IteratorAggregate
{
private $onEmpty = null;
private $input = array();
private $open = true;
/**
* Sets a callback that is called when the write buffer becomes empty.
*/
public function onEmpty(callable $onEmpty = null)
{
$this->onEmpty = $onEmpty;
}
/**
* Appends an input to the write buffer.
*
* @param resource|scalar|\Traversable|null The input to append as stream resource, scalar or \Traversable
*/
public function write($input)
{
if (null === $input) {
return;
}
if ($this->isClosed()) {
throw new RuntimeException(sprintf('%s is closed', static::class));
}
$this->input[] = ProcessUtils::validateInput(__METHOD__, $input);
}
/**
* Closes the write buffer.
*/
public function close()
{
$this->open = false;
}
/**
* Tells whether the write buffer is closed or not.
*/
public function isClosed()
{
return !$this->open;
}
public function getIterator()
{
$this->open = true;
while ($this->open || $this->input) {
if (!$this->input) {
yield '';
continue;
}
$current = array_shift($this->input);
if ($current instanceof \Iterator) {
foreach ($current as $cur) {
yield $cur;
}
} else {
yield $current;
}
if (!$this->input && $this->open && null !== $onEmpty = $this->onEmpty) {
$this->write($onEmpty($this));
}
}
}
}

View File

@ -11,6 +11,8 @@
namespace Symfony\Component\Process\Pipes;
use Symfony\Component\Process\Exception\InvalidArgumentException;
/**
* @author Romain Neutron <imprec@gmail.com>
*
@ -23,7 +25,7 @@ abstract class AbstractPipes implements PipesInterface
/** @var string */
private $inputBuffer = '';
/** @var resource|\Iterator|null */
/** @var resource|scalar|\Iterator|null */
private $input;
/** @var bool */
private $blocked = true;
@ -84,6 +86,8 @@ abstract class AbstractPipes implements PipesInterface
/**
* Writes input to stdin.
*
* @throws InvalidArgumentException When an input iterator yields a non supported value
*/
protected function write()
{
@ -97,10 +101,18 @@ abstract class AbstractPipes implements PipesInterface
$input = null;
} elseif (is_resource($input = $input->current())) {
stream_set_blocking($input, 0);
} else {
$this->inputBuffer .= $input;
} elseif (!isset($this->inputBuffer[0])) {
if (!is_string($input)) {
if (!is_scalar($input)) {
throw new InvalidArgumentException(sprintf('%s yielded a value of type "%s", but only scalars and stream resources are supported', get_class($this->input), gettype($input)));
}
$input = (string) $input;
}
$this->inputBuffer = $input;
$this->input->next();
$input = null;
} else {
$input = null;
}
}

View File

@ -132,7 +132,7 @@ class Process
* @param string $commandline The command line to run
* @param string|null $cwd The working directory or null to use the working dir of the current PHP process
* @param array|null $env The environment variables or null to use the same environment as the current PHP process
* @param string|null $input The input
* @param mixed|null $input The input as stream resource, scalar or \Traversable, or null for no input
* @param int|float|null $timeout The timeout in seconds or null to disable
* @param array $options An array of options for proc_open
*
@ -1027,7 +1027,7 @@ class Process
/**
* Gets the Process input.
*
* @return null|string The Process input
* @return resource|string|\Iterator|null The Process input
*/
public function getInput()
{
@ -1039,7 +1039,7 @@ class Process
*
* This content will be passed to the underlying process standard input.
*
* @param mixed $input The content
* @param resource|scalar|\Traversable|null $input The content
*
* @return self The current Process instance
*

View File

@ -167,7 +167,7 @@ class ProcessBuilder
/**
* Sets the input of the process.
*
* @param mixed $input The input as a string
* @param resource|scalar|\Traversable|null $input The input content
*
* @return ProcessBuilder
*

View File

@ -14,6 +14,7 @@ namespace Symfony\Component\Process\Tests;
use Symfony\Component\Process\Exception\LogicException;
use Symfony\Component\Process\Exception\ProcessTimedOutException;
use Symfony\Component\Process\Exception\RuntimeException;
use Symfony\Component\Process\InputStream;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Pipes\PipesInterface;
use Symfony\Component\Process\Process;
@ -1176,33 +1177,99 @@ class ProcessTest extends \PHPUnit_Framework_TestCase
public function testIteratorInput()
{
$nextData = 'ping';
$input = function () use (&$nextData) {
while (false !== $nextData) {
yield $nextData;
yield $nextData = '';
}
$input = function () {
yield 'ping';
yield 'pong';
};
$input = $input();
$process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'));
$process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'), null, null, $input());
$process->run();
$this->assertSame('pingpong', $process->getOutput());
}
public function testSimpleInputStream()
{
$input = new InputStream();
$process = new Process(self::$phpBin.' -r '.escapeshellarg('echo \'ping\'; stream_copy_to_stream(STDIN, STDOUT);'));
$process->setInput($input);
$process->start(function ($type, $data) use ($input, &$nextData) {
$process->start(function ($type, $data) use ($input) {
if ('ping' === $data) {
$h = fopen('php://memory', 'r+');
fwrite($h, 'pong');
rewind($h);
$nextData = $h;
$input->next();
} else {
$nextData = false;
$input->write('pang');
} elseif (!$input->isClosed()) {
$input->write('pong');
$input->close();
}
});
$process->wait();
$this->assertSame('pingpangpong', $process->getOutput());
}
public function testInputStreamWithCallable()
{
$i = 0;
$stream = fopen('php://memory', 'w+');
$stream = function () use ($stream, &$i) {
if ($i < 3) {
rewind($stream);
fwrite($stream, ++$i);
rewind($stream);
return $stream;
}
};
$input = new InputStream();
$input->onEmpty($stream);
$input->write($stream());
$process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'));
$process->setInput($input);
$process->start(function ($type, $data) use ($input) {
$input->close();
});
$process->wait();
$this->assertSame('123', $process->getOutput());
}
public function testInputStreamWithGenerator()
{
$input = new InputStream();
$input->onEmpty(function ($input) {
yield 'pong';
$input->close();
});
$process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'));
$process->setInput($input);
$process->start();
$input->write('ping');
$process->wait();
$this->assertSame('pingpong', $process->getOutput());
}
public function testInputStreamOnEmpty()
{
$i = 0;
$input = new InputStream();
$input->onEmpty(function () use (&$i) {++$i;});
$process = new Process(self::$phpBin.' -r '.escapeshellarg('echo 123; echo fread(STDIN, 1); echo 456;'));
$process->setInput($input);
$process->start(function ($type, $data) use ($input) {
if ('123' === $data) {
$input->close();
}
});
$process->wait();
$this->assertSame(0, $i, 'InputStream->onEmpty callback should be called only when the input *becomes* empty');
$this->assertSame('123456', $process->getOutput());
}
/**
* @param string $commandline
* @param null|string $cwd