[Process] Accept Traversable input
This commit is contained in:
parent
06eb52cb75
commit
b9782b7eef
@ -22,16 +22,15 @@ abstract class AbstractPipes implements PipesInterface
|
||||
public $pipes = array();
|
||||
|
||||
/** @var string */
|
||||
protected $inputBuffer = '';
|
||||
/** @var resource|null */
|
||||
protected $input;
|
||||
|
||||
private $inputBuffer = '';
|
||||
/** @var resource|\Iterator|null */
|
||||
private $input;
|
||||
/** @var bool */
|
||||
private $blocked = true;
|
||||
|
||||
public function __construct($input)
|
||||
{
|
||||
if (is_resource($input)) {
|
||||
if (is_resource($input) || $input instanceof \Iterator) {
|
||||
$this->input = $input;
|
||||
} elseif (is_string($input)) {
|
||||
$this->inputBuffer = $input;
|
||||
@ -76,7 +75,7 @@ abstract class AbstractPipes implements PipesInterface
|
||||
foreach ($this->pipes as $pipe) {
|
||||
stream_set_blocking($pipe, 0);
|
||||
}
|
||||
if (null !== $this->input) {
|
||||
if (is_resource($this->input)) {
|
||||
stream_set_blocking($this->input, 0);
|
||||
}
|
||||
|
||||
@ -91,9 +90,21 @@ abstract class AbstractPipes implements PipesInterface
|
||||
if (!isset($this->pipes[0])) {
|
||||
return;
|
||||
}
|
||||
$input = $this->input;
|
||||
|
||||
$e = array();
|
||||
$r = null !== $this->input ? array($this->input) : $e;
|
||||
if ($input instanceof \Iterator) {
|
||||
if (!$input->valid()) {
|
||||
$input = null;
|
||||
} elseif (is_resource($input = $input->current())) {
|
||||
stream_set_blocking($input, 0);
|
||||
} else {
|
||||
$this->inputBuffer .= $input;
|
||||
$this->input->next();
|
||||
$input = null;
|
||||
}
|
||||
}
|
||||
|
||||
$r = $e = array();
|
||||
$w = array($this->pipes[0]);
|
||||
|
||||
// let's have a look if something changed in streams
|
||||
@ -109,8 +120,7 @@ abstract class AbstractPipes implements PipesInterface
|
||||
return array($this->pipes[0]);
|
||||
}
|
||||
}
|
||||
|
||||
foreach ($r as $input) {
|
||||
if ($input) {
|
||||
for (;;) {
|
||||
$data = fread($input, self::CHUNK_SIZE);
|
||||
if (!isset($data[0])) {
|
||||
@ -124,16 +134,19 @@ abstract class AbstractPipes implements PipesInterface
|
||||
return array($this->pipes[0]);
|
||||
}
|
||||
}
|
||||
if (!isset($data[0]) && feof($input)) {
|
||||
// no more data to read on input resource
|
||||
// use an empty buffer in the next reads
|
||||
$this->input = null;
|
||||
if (feof($input)) {
|
||||
if ($this->input instanceof \Iterator) {
|
||||
$this->input->next();
|
||||
} else {
|
||||
$this->input = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// no input to read on resource, buffer is empty
|
||||
if (null === $this->input && !isset($this->inputBuffer[0])) {
|
||||
if (!isset($this->inputBuffer[0]) && !($this->input instanceof \Iterator ? $this->input->valid() : $this->input)) {
|
||||
$this->input = null;
|
||||
fclose($this->pipes[0]);
|
||||
unset($this->pipes[0]);
|
||||
}
|
||||
|
@ -1200,6 +1200,9 @@ class Process
|
||||
*/
|
||||
private function getDescriptors()
|
||||
{
|
||||
if ($this->input instanceof \Iterator) {
|
||||
$this->input->rewind();
|
||||
}
|
||||
if ('\\' === DIRECTORY_SEPARATOR) {
|
||||
$this->processPipes = WindowsPipes::create($this, $this->input);
|
||||
} else {
|
||||
|
@ -93,8 +93,14 @@ class ProcessUtils
|
||||
if (is_scalar($input)) {
|
||||
return (string) $input;
|
||||
}
|
||||
if ($input instanceof \Iterator) {
|
||||
return $input;
|
||||
}
|
||||
if ($input instanceof \Traversable) {
|
||||
return new \IteratorIterator($input);
|
||||
}
|
||||
|
||||
throw new InvalidArgumentException(sprintf('%s only accepts strings or stream resources.', $caller));
|
||||
throw new InvalidArgumentException(sprintf('%s only accepts strings, Traversable objects or stream resources.', $caller));
|
||||
}
|
||||
|
||||
return $input;
|
||||
|
@ -215,7 +215,7 @@ class ProcessBuilderTest extends \PHPUnit_Framework_TestCase
|
||||
|
||||
/**
|
||||
* @expectedException \Symfony\Component\Process\Exception\InvalidArgumentException
|
||||
* @expectedExceptionMessage Symfony\Component\Process\ProcessBuilder::setInput only accepts strings or stream resources.
|
||||
* @expectedExceptionMessage Symfony\Component\Process\ProcessBuilder::setInput only accepts strings, Traversable objects or stream resources.
|
||||
*/
|
||||
public function testInvalidInput()
|
||||
{
|
||||
|
@ -231,7 +231,7 @@ class ProcessTest extends \PHPUnit_Framework_TestCase
|
||||
/**
|
||||
* @dataProvider provideInvalidInputValues
|
||||
* @expectedException \Symfony\Component\Process\Exception\InvalidArgumentException
|
||||
* @expectedExceptionMessage Symfony\Component\Process\Process::setInput only accepts strings or stream resources.
|
||||
* @expectedExceptionMessage Symfony\Component\Process\Process::setInput only accepts strings, Traversable objects or stream resources.
|
||||
*/
|
||||
public function testInvalidInput($value)
|
||||
{
|
||||
@ -1156,6 +1156,35 @@ class ProcessTest extends \PHPUnit_Framework_TestCase
|
||||
);
|
||||
}
|
||||
|
||||
public function testIteratorInput()
|
||||
{
|
||||
$nextData = 'ping';
|
||||
$input = function () use (&$nextData) {
|
||||
while (false !== $nextData) {
|
||||
yield $nextData;
|
||||
yield $nextData = '';
|
||||
}
|
||||
};
|
||||
$input = $input();
|
||||
|
||||
$process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'));
|
||||
$process->setInput($input);
|
||||
$process->start(function ($type, $data) use ($input, &$nextData) {
|
||||
if ('ping' === $data) {
|
||||
$h = fopen('php://memory', 'r+');
|
||||
fwrite($h, 'pong');
|
||||
rewind($h);
|
||||
$nextData = $h;
|
||||
$input->next();
|
||||
} else {
|
||||
$nextData = false;
|
||||
}
|
||||
});
|
||||
|
||||
$process->wait();
|
||||
$this->assertSame('pingpong', $process->getOutput());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $commandline
|
||||
* @param null|string $cwd
|
||||
|
Reference in New Issue
Block a user