[HttpClient] improve StreamWrapper
This commit is contained in:
parent
cbc4efc345
commit
ea52d1cea2
@ -227,6 +227,13 @@ final class CurlResponse implements ResponseInterface
|
||||
*/
|
||||
private function close(): void
|
||||
{
|
||||
if (self::$performing) {
|
||||
$this->multi->handlesActivity[$this->id][] = null;
|
||||
$this->multi->handlesActivity[$this->id][] = new TransportException('Response has been canceled.');
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
unset($this->multi->openHandles[$this->id], $this->multi->handlesActivity[$this->id]);
|
||||
curl_multi_remove_handle($this->multi->handle, $this->handle);
|
||||
curl_setopt_array($this->handle, [
|
||||
@ -264,6 +271,12 @@ final class CurlResponse implements ResponseInterface
|
||||
private static function perform(CurlClientState $multi, array &$responses = null): void
|
||||
{
|
||||
if (self::$performing) {
|
||||
if ($responses) {
|
||||
$response = current($responses);
|
||||
$multi->handlesActivity[(int) $response->handle][] = null;
|
||||
$multi->handlesActivity[(int) $response->handle][] = new TransportException(sprintf('Userland callback cannot use the client nor the response while processing "%s".', curl_getinfo($response->handle, CURLINFO_EFFECTIVE_URL)));
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@ -370,8 +383,15 @@ final class CurlResponse implements ResponseInterface
|
||||
|
||||
curl_setopt($ch, CURLOPT_PRIVATE, 'content');
|
||||
|
||||
if (!$content && $options['buffer'] instanceof \Closure && $options['buffer']($headers)) {
|
||||
$content = fopen('php://temp', 'w+');
|
||||
try {
|
||||
if (!$content && $options['buffer'] instanceof \Closure && $options['buffer']($headers)) {
|
||||
$content = fopen('php://temp', 'w+');
|
||||
}
|
||||
} catch (\Throwable $e) {
|
||||
$multi->handlesActivity[$id][] = null;
|
||||
$multi->handlesActivity[$id][] = $e;
|
||||
|
||||
return 0;
|
||||
}
|
||||
} elseif (null !== $info['redirect_url'] && $logger) {
|
||||
$logger->info(sprintf('Redirecting: "%s %s"', $info['http_code'], $info['redirect_url']));
|
||||
|
@ -105,9 +105,7 @@ class MockResponse implements ResponseInterface
|
||||
$response->requestOptions = $options;
|
||||
$response->id = ++self::$idSequence;
|
||||
|
||||
if (($options['buffer'] ?? null) instanceof \Closure) {
|
||||
$response->content = $options['buffer']($mock->getHeaders(false)) ? fopen('php://temp', 'w+') : null;
|
||||
} else {
|
||||
if (!($options['buffer'] ?? null) instanceof \Closure) {
|
||||
$response->content = true === ($options['buffer'] ?? true) ? fopen('php://temp', 'w+') : null;
|
||||
}
|
||||
$response->initializer = static function (self $response) {
|
||||
@ -184,6 +182,10 @@ class MockResponse implements ResponseInterface
|
||||
$response->headers = $chunk[1]->getHeaders(false);
|
||||
self::readResponse($response, $chunk[0], $chunk[1], $offset);
|
||||
$multi->handlesActivity[$id][] = new FirstChunk();
|
||||
|
||||
if (($response->requestOptions['buffer'] ?? null) instanceof \Closure) {
|
||||
$response->content = $response->requestOptions['buffer']($response->headers) ? fopen('php://temp', 'w+') : null;
|
||||
}
|
||||
} catch (\Throwable $e) {
|
||||
$multi->handlesActivity[$id][] = null;
|
||||
$multi->handlesActivity[$id][] = $e;
|
||||
|
@ -160,10 +160,6 @@ final class NativeResponse implements ResponseInterface
|
||||
stream_set_blocking($h, false);
|
||||
$this->context = $this->resolveRedirect = null;
|
||||
|
||||
if (null !== $this->shouldBuffer && null === $this->content && ($this->shouldBuffer)($this->headers)) {
|
||||
$this->content = fopen('php://temp', 'w+');
|
||||
}
|
||||
|
||||
if (isset($context['ssl']['peer_certificate_chain'])) {
|
||||
$this->info['peer_certificate_chain'] = $context['ssl']['peer_certificate_chain'];
|
||||
}
|
||||
@ -182,6 +178,23 @@ final class NativeResponse implements ResponseInterface
|
||||
$this->inflate = null;
|
||||
}
|
||||
|
||||
try {
|
||||
if (null !== $this->shouldBuffer && null === $this->content && ($this->shouldBuffer)($this->headers)) {
|
||||
$this->content = fopen('php://temp', 'w+');
|
||||
}
|
||||
|
||||
if (!$this->buffer) {
|
||||
throw new TransportException('Response has been canceled.');
|
||||
}
|
||||
} catch (\Throwable $e) {
|
||||
$this->close();
|
||||
$this->multi->handlesActivity[$this->id] = [new FirstChunk()];
|
||||
$this->multi->handlesActivity[$this->id][] = null;
|
||||
$this->multi->handlesActivity[$this->id][] = $e;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
$this->multi->openHandles[$this->id] = [$h, $this->buffer, $this->inflate, $this->content, $this->onProgress, &$this->remaining, &$this->info];
|
||||
$this->multi->handlesActivity[$this->id] = [new FirstChunk()];
|
||||
}
|
||||
|
@ -57,7 +57,7 @@ trait ResponseTrait
|
||||
/** @var resource */
|
||||
private $handle;
|
||||
private $id;
|
||||
private $timeout;
|
||||
private $timeout = 0;
|
||||
private $finalInfo;
|
||||
private $offset = 0;
|
||||
private $jsonData;
|
||||
@ -185,7 +185,7 @@ trait ResponseTrait
|
||||
/**
|
||||
* Casts the response to a PHP stream resource.
|
||||
*
|
||||
* @return resource|null
|
||||
* @return resource
|
||||
*
|
||||
* @throws TransportExceptionInterface When a network error occurs
|
||||
* @throws RedirectionExceptionInterface On a 3xx when $throw is true and the "max_redirects" option has been reached
|
||||
@ -194,8 +194,10 @@ trait ResponseTrait
|
||||
*/
|
||||
public function toStream(bool $throw = true)
|
||||
{
|
||||
// Ensure headers arrived
|
||||
$this->getHeaders($throw);
|
||||
if ($throw) {
|
||||
// Ensure headers arrived
|
||||
$this->getHeaders($throw);
|
||||
}
|
||||
|
||||
return StreamWrapper::createResource($this, null, $this->content, $this->handle && 'stream' === get_resource_type($this->handle) ? $this->handle : null);
|
||||
}
|
||||
|
@ -73,6 +73,11 @@ class StreamWrapper
|
||||
}
|
||||
}
|
||||
|
||||
public function getResponse(): ResponseInterface
|
||||
{
|
||||
return $this->response;
|
||||
}
|
||||
|
||||
public function stream_open(string $path, string $mode, int $options): bool
|
||||
{
|
||||
if ('r' !== $mode) {
|
||||
@ -107,7 +112,9 @@ class StreamWrapper
|
||||
// Empty the internal activity list
|
||||
foreach ($this->client->stream([$this->response], 0) as $chunk) {
|
||||
try {
|
||||
$chunk->isTimeout();
|
||||
if (!$chunk->isTimeout() && $chunk->isFirst()) {
|
||||
$this->response->getStatusCode(); // ignore 3/4/5xx
|
||||
}
|
||||
} catch (ExceptionInterface $e) {
|
||||
trigger_error($e->getMessage(), E_USER_WARNING);
|
||||
|
||||
@ -146,6 +153,10 @@ class StreamWrapper
|
||||
$this->eof = !$chunk->isTimeout();
|
||||
$this->eof = $chunk->isLast();
|
||||
|
||||
if ($chunk->isFirst()) {
|
||||
$this->response->getStatusCode(); // ignore 3/4/5xx
|
||||
}
|
||||
|
||||
if ('' !== $data = $chunk->getContent()) {
|
||||
if (\strlen($data) > $count) {
|
||||
if (null === $this->content) {
|
||||
@ -192,6 +203,10 @@ class StreamWrapper
|
||||
if (SEEK_END === $whence || $size < $offset) {
|
||||
foreach ($this->client->stream([$this->response]) as $chunk) {
|
||||
try {
|
||||
if ($chunk->isFirst()) {
|
||||
$this->response->getStatusCode(); // ignore 3/4/5xx
|
||||
}
|
||||
|
||||
// Chunks are buffered in $this->content already
|
||||
$size += \strlen($chunk->getContent());
|
||||
|
||||
@ -231,6 +246,13 @@ class StreamWrapper
|
||||
|
||||
public function stream_stat(): array
|
||||
{
|
||||
try {
|
||||
$headers = $this->response->getHeaders(false);
|
||||
} catch (ExceptionInterface $e) {
|
||||
trigger_error($e->getMessage(), E_USER_WARNING);
|
||||
$headers = [];
|
||||
}
|
||||
|
||||
return [
|
||||
'dev' => 0,
|
||||
'ino' => 0,
|
||||
@ -239,12 +261,16 @@ class StreamWrapper
|
||||
'uid' => 0,
|
||||
'gid' => 0,
|
||||
'rdev' => 0,
|
||||
'size' => (int) ($this->response->getHeaders(false)['content-length'][0] ?? 0),
|
||||
'size' => (int) ($headers['content-length'][0] ?? 0),
|
||||
'atime' => 0,
|
||||
'mtime' => strtotime($this->response->getHeaders(false)['last-modified'][0] ?? '') ?: 0,
|
||||
'mtime' => strtotime($headers['last-modified'][0] ?? '') ?: 0,
|
||||
'ctime' => 0,
|
||||
'blksize' => 0,
|
||||
'blocks' => 0,
|
||||
];
|
||||
}
|
||||
|
||||
private function __construct()
|
||||
{
|
||||
}
|
||||
}
|
||||
|
@ -11,6 +11,7 @@
|
||||
|
||||
namespace Symfony\Component\HttpClient\Tests;
|
||||
|
||||
use Symfony\Component\HttpClient\Exception\ClientException;
|
||||
use Symfony\Component\HttpClient\Exception\TransportException;
|
||||
use Symfony\Contracts\HttpClient\Test\HttpClientTestCase as BaseHttpClientTestCase;
|
||||
|
||||
@ -52,9 +53,7 @@ abstract class HttpClientTestCase extends BaseHttpClientTestCase
|
||||
public function testToStream()
|
||||
{
|
||||
$client = $this->getHttpClient(__FUNCTION__);
|
||||
|
||||
$response = $client->request('GET', 'http://localhost:8057');
|
||||
|
||||
$stream = $response->toStream();
|
||||
|
||||
$this->assertSame("{\n \"SER", fread($stream, 10));
|
||||
@ -67,6 +66,22 @@ abstract class HttpClientTestCase extends BaseHttpClientTestCase
|
||||
$this->assertTrue(feof($stream));
|
||||
}
|
||||
|
||||
public function testToStream404()
|
||||
{
|
||||
$client = $this->getHttpClient(__FUNCTION__);
|
||||
$response = $client->request('GET', 'http://localhost:8057/404');
|
||||
$stream = $response->toStream(false);
|
||||
|
||||
$this->assertSame("{\n \"SER", fread($stream, 10));
|
||||
$this->assertSame('VER_PROTOCOL', fread($stream, 12));
|
||||
$this->assertSame($response, stream_get_meta_data($stream)['wrapper_data']->getResponse());
|
||||
$this->assertSame(404, $response->getStatusCode());
|
||||
|
||||
$this->expectException(ClientException::class);
|
||||
$response = $client->request('GET', 'http://localhost:8057/404');
|
||||
$stream = $response->toStream();
|
||||
}
|
||||
|
||||
public function testConditionalBuffering()
|
||||
{
|
||||
$client = $this->getHttpClient(__FUNCTION__);
|
||||
@ -83,4 +98,34 @@ abstract class HttpClientTestCase extends BaseHttpClientTestCase
|
||||
$this->expectExceptionMessage('Cannot get the content of the response twice: buffering is disabled.');
|
||||
$response->getContent();
|
||||
}
|
||||
|
||||
public function testReentrantBufferCallback()
|
||||
{
|
||||
$client = $this->getHttpClient(__FUNCTION__);
|
||||
|
||||
$response = $client->request('GET', 'http://localhost:8057', ['buffer' => function () use (&$response) {
|
||||
$response->cancel();
|
||||
}]);
|
||||
|
||||
$this->assertSame(200, $response->getStatusCode());
|
||||
|
||||
$this->expectException(TransportException::class);
|
||||
$this->expectExceptionMessage('Response has been canceled.');
|
||||
$response->getContent();
|
||||
}
|
||||
|
||||
public function testThrowingBufferCallback()
|
||||
{
|
||||
$client = $this->getHttpClient(__FUNCTION__);
|
||||
|
||||
$response = $client->request('GET', 'http://localhost:8057', ['buffer' => function () {
|
||||
throw new \Exception('Boo');
|
||||
}]);
|
||||
|
||||
$this->assertSame(200, $response->getStatusCode());
|
||||
|
||||
$this->expectException(TransportException::class);
|
||||
$this->expectExceptionMessage('Boo');
|
||||
$response->getContent();
|
||||
}
|
||||
}
|
||||
|
@ -103,6 +103,8 @@ class MockHttpClientTest extends HttpClientTestCase
|
||||
case 'testBadRequestBody':
|
||||
case 'testOnProgressCancel':
|
||||
case 'testOnProgressError':
|
||||
case 'testReentrantBufferCallback':
|
||||
case 'testThrowingBufferCallback':
|
||||
$responses[] = new MockResponse($body, ['response_headers' => $headers]);
|
||||
break;
|
||||
|
||||
|
Reference in New Issue
Block a user