[HttpClient] yield a last chunk for completed responses also

This commit is contained in:
Nicolas Grekas 2019-03-11 10:55:59 +01:00
parent f54c89c530
commit e11ef7ed12
4 changed files with 38 additions and 19 deletions

View File

@ -80,11 +80,12 @@ final class CurlResponse implements ResponseInterface
if ($onProgress = $options['on_progress']) {
$url = isset($info['url']) ? ['url' => $info['url']] : [];
curl_setopt($ch, CURLOPT_NOPROGRESS, false);
curl_setopt($ch, CURLOPT_PROGRESSFUNCTION, static function ($ch, $dlSize, $dlNow) use ($onProgress, &$info, $url) {
curl_setopt($ch, CURLOPT_PROGRESSFUNCTION, static function ($ch, $dlSize, $dlNow) use ($onProgress, &$info, $url, $multi) {
try {
$onProgress($dlNow, $dlSize, $url + curl_getinfo($ch) + $info);
} catch (\Throwable $e) {
$info['error'] = $e;
$multi->handlesActivity[(int) $ch][] = null;
$multi->handlesActivity[(int) $ch][] = $e;
return 1; // Abort the request
}
@ -109,6 +110,7 @@ final class CurlResponse implements ResponseInterface
}
self::stream([$response])->current();
} catch (\Throwable $e) {
// Persist timeouts thrown during initialization
$response->info['error'] = $e->getMessage();
$response->close();
throw $e;
@ -201,13 +203,17 @@ final class CurlResponse implements ResponseInterface
*/
protected static function schedule(self $response, array &$runningResponses): void
{
if ('' === curl_getinfo($ch = $response->handle, CURLINFO_PRIVATE)) {
// no-op - response already completed
} elseif (isset($runningResponses[$i = (int) $response->multi->handle])) {
if (isset($runningResponses[$i = (int) $response->multi->handle])) {
$runningResponses[$i][1][$response->id] = $response;
} else {
$runningResponses[$i] = [$response->multi, [$response->id => $response]];
}
if ('' === curl_getinfo($ch = $response->handle, CURLINFO_PRIVATE)) {
// Response already completed
$response->multi->handlesActivity[$response->id][] = null;
$response->multi->handlesActivity[$response->id][] = null !== $response->info['error'] ? new TransportException($response->info['error']) : null;
}
}
/**

View File

@ -165,10 +165,6 @@ final class NativeResponse implements ResponseInterface
*/
private static function schedule(self $response, array &$runningResponses): void
{
if (null === $response->buffer) {
return;
}
if (!isset($runningResponses[$i = $response->multi->id])) {
$runningResponses[$i] = [$response->multi, []];
}
@ -178,6 +174,12 @@ final class NativeResponse implements ResponseInterface
} else {
$runningResponses[$i][1][$response->id] = $response;
}
if (null === $response->buffer) {
// Response already completed
$response->multi->handlesActivity[$response->id][] = null;
$response->multi->handlesActivity[$response->id][] = null !== $response->info['error'] ? new TransportException($response->info['error']) : null;
}
}
/**

View File

@ -100,14 +100,16 @@ trait ResponseTrait
}
if (null === $this->content) {
$content = '';
$content = null;
$chunk = null;
foreach (self::stream([$this]) as $chunk) {
$content .= $chunk->getContent();
if (!$chunk->isLast()) {
$content .= $chunk->getContent();
}
}
if (null === $chunk) {
if (null === $content) {
throw new TransportException('Cannot get the content of the response twice: the request was issued with option "buffer" set to false.');
}
@ -280,12 +282,14 @@ trait ResponseTrait
$response->offset += \strlen($chunk);
$chunk = new DataChunk($response->offset, $chunk);
} elseif (null === $chunk) {
if (null !== $e = $response->info['error'] ?? $multi->handlesActivity[$j][0]) {
$e = $multi->handlesActivity[$j][0];
unset($responses[$j], $multi->handlesActivity[$j]);
$response->close();
if (null !== $e) {
$response->info['error'] = $e->getMessage();
if ($e instanceof \Error) {
unset($responses[$j], $multi->handlesActivity[$j]);
$response->close();
throw $e;
}
@ -293,9 +297,6 @@ trait ResponseTrait
} else {
$chunk = new LastChunk($response->offset);
}
unset($responses[$j]);
$response->close();
} elseif ($chunk instanceof ErrorChunk) {
unset($responses[$j]);
$isTimeout = true;

View File

@ -194,8 +194,8 @@ abstract class HttpClientTestCase extends TestCase
$this->assertSame($response, $r);
$this->assertNotNull($chunk->getError());
$this->expectException(TransportExceptionInterface::class);
foreach ($client->stream($response) as $chunk) {
$this->fail('Already errored responses shouldn\'t be yielded');
}
}
@ -340,6 +340,16 @@ abstract class HttpClientTestCase extends TestCase
$this->assertSame($response, $r);
$this->assertSame(['f', 'l'], $result);
$chunk = null;
$i = 0;
foreach ($client->stream($response) as $chunk) {
++$i;
}
$this->assertSame(1, $i);
$this->assertTrue($chunk->isLast());
}
public function testAddToStream()