[HttpClient] yield a last chunk for completed responses also
This commit is contained in:
parent
f54c89c530
commit
e11ef7ed12
@ -80,11 +80,12 @@ final class CurlResponse implements ResponseInterface
|
|||||||
if ($onProgress = $options['on_progress']) {
|
if ($onProgress = $options['on_progress']) {
|
||||||
$url = isset($info['url']) ? ['url' => $info['url']] : [];
|
$url = isset($info['url']) ? ['url' => $info['url']] : [];
|
||||||
curl_setopt($ch, CURLOPT_NOPROGRESS, false);
|
curl_setopt($ch, CURLOPT_NOPROGRESS, false);
|
||||||
curl_setopt($ch, CURLOPT_PROGRESSFUNCTION, static function ($ch, $dlSize, $dlNow) use ($onProgress, &$info, $url) {
|
curl_setopt($ch, CURLOPT_PROGRESSFUNCTION, static function ($ch, $dlSize, $dlNow) use ($onProgress, &$info, $url, $multi) {
|
||||||
try {
|
try {
|
||||||
$onProgress($dlNow, $dlSize, $url + curl_getinfo($ch) + $info);
|
$onProgress($dlNow, $dlSize, $url + curl_getinfo($ch) + $info);
|
||||||
} catch (\Throwable $e) {
|
} catch (\Throwable $e) {
|
||||||
$info['error'] = $e;
|
$multi->handlesActivity[(int) $ch][] = null;
|
||||||
|
$multi->handlesActivity[(int) $ch][] = $e;
|
||||||
|
|
||||||
return 1; // Abort the request
|
return 1; // Abort the request
|
||||||
}
|
}
|
||||||
@ -109,6 +110,7 @@ final class CurlResponse implements ResponseInterface
|
|||||||
}
|
}
|
||||||
self::stream([$response])->current();
|
self::stream([$response])->current();
|
||||||
} catch (\Throwable $e) {
|
} catch (\Throwable $e) {
|
||||||
|
// Persist timeouts thrown during initialization
|
||||||
$response->info['error'] = $e->getMessage();
|
$response->info['error'] = $e->getMessage();
|
||||||
$response->close();
|
$response->close();
|
||||||
throw $e;
|
throw $e;
|
||||||
@ -201,13 +203,17 @@ final class CurlResponse implements ResponseInterface
|
|||||||
*/
|
*/
|
||||||
protected static function schedule(self $response, array &$runningResponses): void
|
protected static function schedule(self $response, array &$runningResponses): void
|
||||||
{
|
{
|
||||||
if ('' === curl_getinfo($ch = $response->handle, CURLINFO_PRIVATE)) {
|
if (isset($runningResponses[$i = (int) $response->multi->handle])) {
|
||||||
// no-op - response already completed
|
|
||||||
} elseif (isset($runningResponses[$i = (int) $response->multi->handle])) {
|
|
||||||
$runningResponses[$i][1][$response->id] = $response;
|
$runningResponses[$i][1][$response->id] = $response;
|
||||||
} else {
|
} else {
|
||||||
$runningResponses[$i] = [$response->multi, [$response->id => $response]];
|
$runningResponses[$i] = [$response->multi, [$response->id => $response]];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ('' === curl_getinfo($ch = $response->handle, CURLINFO_PRIVATE)) {
|
||||||
|
// Response already completed
|
||||||
|
$response->multi->handlesActivity[$response->id][] = null;
|
||||||
|
$response->multi->handlesActivity[$response->id][] = null !== $response->info['error'] ? new TransportException($response->info['error']) : null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -165,10 +165,6 @@ final class NativeResponse implements ResponseInterface
|
|||||||
*/
|
*/
|
||||||
private static function schedule(self $response, array &$runningResponses): void
|
private static function schedule(self $response, array &$runningResponses): void
|
||||||
{
|
{
|
||||||
if (null === $response->buffer) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!isset($runningResponses[$i = $response->multi->id])) {
|
if (!isset($runningResponses[$i = $response->multi->id])) {
|
||||||
$runningResponses[$i] = [$response->multi, []];
|
$runningResponses[$i] = [$response->multi, []];
|
||||||
}
|
}
|
||||||
@ -178,6 +174,12 @@ final class NativeResponse implements ResponseInterface
|
|||||||
} else {
|
} else {
|
||||||
$runningResponses[$i][1][$response->id] = $response;
|
$runningResponses[$i][1][$response->id] = $response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (null === $response->buffer) {
|
||||||
|
// Response already completed
|
||||||
|
$response->multi->handlesActivity[$response->id][] = null;
|
||||||
|
$response->multi->handlesActivity[$response->id][] = null !== $response->info['error'] ? new TransportException($response->info['error']) : null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -100,14 +100,16 @@ trait ResponseTrait
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (null === $this->content) {
|
if (null === $this->content) {
|
||||||
$content = '';
|
$content = null;
|
||||||
$chunk = null;
|
$chunk = null;
|
||||||
|
|
||||||
foreach (self::stream([$this]) as $chunk) {
|
foreach (self::stream([$this]) as $chunk) {
|
||||||
|
if (!$chunk->isLast()) {
|
||||||
$content .= $chunk->getContent();
|
$content .= $chunk->getContent();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (null === $chunk) {
|
if (null === $content) {
|
||||||
throw new TransportException('Cannot get the content of the response twice: the request was issued with option "buffer" set to false.');
|
throw new TransportException('Cannot get the content of the response twice: the request was issued with option "buffer" set to false.');
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -280,12 +282,14 @@ trait ResponseTrait
|
|||||||
$response->offset += \strlen($chunk);
|
$response->offset += \strlen($chunk);
|
||||||
$chunk = new DataChunk($response->offset, $chunk);
|
$chunk = new DataChunk($response->offset, $chunk);
|
||||||
} elseif (null === $chunk) {
|
} elseif (null === $chunk) {
|
||||||
if (null !== $e = $response->info['error'] ?? $multi->handlesActivity[$j][0]) {
|
$e = $multi->handlesActivity[$j][0];
|
||||||
|
unset($responses[$j], $multi->handlesActivity[$j]);
|
||||||
|
$response->close();
|
||||||
|
|
||||||
|
if (null !== $e) {
|
||||||
$response->info['error'] = $e->getMessage();
|
$response->info['error'] = $e->getMessage();
|
||||||
|
|
||||||
if ($e instanceof \Error) {
|
if ($e instanceof \Error) {
|
||||||
unset($responses[$j], $multi->handlesActivity[$j]);
|
|
||||||
$response->close();
|
|
||||||
throw $e;
|
throw $e;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -293,9 +297,6 @@ trait ResponseTrait
|
|||||||
} else {
|
} else {
|
||||||
$chunk = new LastChunk($response->offset);
|
$chunk = new LastChunk($response->offset);
|
||||||
}
|
}
|
||||||
|
|
||||||
unset($responses[$j]);
|
|
||||||
$response->close();
|
|
||||||
} elseif ($chunk instanceof ErrorChunk) {
|
} elseif ($chunk instanceof ErrorChunk) {
|
||||||
unset($responses[$j]);
|
unset($responses[$j]);
|
||||||
$isTimeout = true;
|
$isTimeout = true;
|
||||||
|
@ -194,8 +194,8 @@ abstract class HttpClientTestCase extends TestCase
|
|||||||
$this->assertSame($response, $r);
|
$this->assertSame($response, $r);
|
||||||
$this->assertNotNull($chunk->getError());
|
$this->assertNotNull($chunk->getError());
|
||||||
|
|
||||||
|
$this->expectException(TransportExceptionInterface::class);
|
||||||
foreach ($client->stream($response) as $chunk) {
|
foreach ($client->stream($response) as $chunk) {
|
||||||
$this->fail('Already errored responses shouldn\'t be yielded');
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -340,6 +340,16 @@ abstract class HttpClientTestCase extends TestCase
|
|||||||
|
|
||||||
$this->assertSame($response, $r);
|
$this->assertSame($response, $r);
|
||||||
$this->assertSame(['f', 'l'], $result);
|
$this->assertSame(['f', 'l'], $result);
|
||||||
|
|
||||||
|
$chunk = null;
|
||||||
|
$i = 0;
|
||||||
|
|
||||||
|
foreach ($client->stream($response) as $chunk) {
|
||||||
|
++$i;
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->assertSame(1, $i);
|
||||||
|
$this->assertTrue($chunk->isLast());
|
||||||
}
|
}
|
||||||
|
|
||||||
public function testAddToStream()
|
public function testAddToStream()
|
||||||
|
Reference in New Issue
Block a user