Fix content swallowed by AsyncClient initializer
This commit is contained in:
parent
fd05da651b
commit
d324271691
@ -12,6 +12,7 @@
|
||||
namespace Symfony\Component\HttpClient\Response;
|
||||
|
||||
use Symfony\Component\HttpClient\Chunk\ErrorChunk;
|
||||
use Symfony\Component\HttpClient\Chunk\FirstChunk;
|
||||
use Symfony\Component\HttpClient\Chunk\LastChunk;
|
||||
use Symfony\Component\HttpClient\Exception\TransportException;
|
||||
use Symfony\Contracts\HttpClient\ChunkInterface;
|
||||
@ -34,6 +35,7 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
|
||||
private $response;
|
||||
private $info = ['canceled' => false];
|
||||
private $passthru;
|
||||
private $stream;
|
||||
private $lastYielded = false;
|
||||
|
||||
/**
|
||||
@ -226,6 +228,19 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
|
||||
|
||||
$asyncMap[$r->response] = $r;
|
||||
$wrappedResponses[] = $r->response;
|
||||
|
||||
if ($r->stream) {
|
||||
yield from self::passthruStream($response = $r->response, $r, new FirstChunk(), $asyncMap);
|
||||
|
||||
if (!isset($asyncMap[$response])) {
|
||||
array_pop($wrappedResponses);
|
||||
}
|
||||
|
||||
if ($r->response !== $response && !isset($asyncMap[$r->response])) {
|
||||
$asyncMap[$r->response] = $r;
|
||||
$wrappedResponses[] = $r->response;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!$client) {
|
||||
@ -286,6 +301,7 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
|
||||
|
||||
private static function passthru(HttpClientInterface $client, self $r, ChunkInterface $chunk, \SplObjectStorage $asyncMap = null): \Generator
|
||||
{
|
||||
$r->stream = null;
|
||||
$response = $r->response;
|
||||
$context = new AsyncContext($r->passthru, $client, $r->response, $r->info, $r->content, $r->offset);
|
||||
if (null === $stream = ($r->passthru)($chunk, $context)) {
|
||||
@ -295,32 +311,39 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
|
||||
|
||||
return;
|
||||
}
|
||||
$chunk = null;
|
||||
|
||||
if (!$stream instanceof \Iterator) {
|
||||
throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream)));
|
||||
}
|
||||
$r->stream = $stream;
|
||||
|
||||
yield from self::passthruStream($response, $r, null, $asyncMap);
|
||||
}
|
||||
|
||||
private static function passthruStream(ResponseInterface $response, self $r, ?ChunkInterface $chunk, ?\SplObjectStorage $asyncMap): \Generator
|
||||
{
|
||||
while (true) {
|
||||
try {
|
||||
if (null !== $chunk) {
|
||||
$stream->next();
|
||||
if (null !== $chunk && $r->stream) {
|
||||
$r->stream->next();
|
||||
}
|
||||
|
||||
if (!$stream->valid()) {
|
||||
if (!$r->stream || !$r->stream->valid() || !$r->stream) {
|
||||
$r->stream = null;
|
||||
break;
|
||||
}
|
||||
} catch (\Throwable $e) {
|
||||
unset($asyncMap[$response]);
|
||||
$r->stream = null;
|
||||
$r->info['error'] = $e->getMessage();
|
||||
$r->response->cancel();
|
||||
|
||||
yield $r => $chunk = new ErrorChunk($r->offset, $e);
|
||||
$chunk->didThrow() ?: $chunk->getContent();
|
||||
unset($asyncMap[$response]);
|
||||
break;
|
||||
}
|
||||
|
||||
$chunk = $stream->current();
|
||||
$chunk = $r->stream->current();
|
||||
|
||||
if (!$chunk instanceof ChunkInterface) {
|
||||
throw new \LogicException(sprintf('A chunk passthru must yield instances of "%s", "%s" yielded.', ChunkInterface::class, get_debug_type($chunk)));
|
||||
@ -356,6 +379,12 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
|
||||
}
|
||||
}
|
||||
|
||||
if (null !== $chunk->getError() || $chunk->isLast()) {
|
||||
$stream = $r->stream;
|
||||
$r->stream = null;
|
||||
unset($asyncMap[$response]);
|
||||
}
|
||||
|
||||
if (null === $chunk->getError()) {
|
||||
$r->offset += \strlen($content);
|
||||
|
||||
@ -387,7 +416,6 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
|
||||
$chunk->didThrow() ?: $chunk->getContent();
|
||||
}
|
||||
|
||||
unset($asyncMap[$response]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -127,7 +127,7 @@ class RetryableHttpClient implements HttpClientInterface
|
||||
|
||||
$context->getResponse()->cancel();
|
||||
|
||||
$delay = $this->getDelayFromHeader($context->getHeaders()) ?? $this->strategy->getDelay($context, $chunk instanceof LastChunk ? $content : null, $exception);
|
||||
$delay = $this->getDelayFromHeader($context->getHeaders()) ?? $this->strategy->getDelay($context, $chunk->isLast() ? $content : null, $exception);
|
||||
++$retryCount;
|
||||
|
||||
$this->logger->info('Try #{count} after {delay}ms'.($exception ? ': '.$exception->getMessage() : ', status code: '.$context->getStatusCode()), [
|
||||
|
@ -282,4 +282,25 @@ class AsyncDecoratorTraitTest extends NativeHttpClientTest
|
||||
$this->assertSame('test', $lastInfo['foo']);
|
||||
$this->assertArrayHasKey('previous_info', $lastInfo);
|
||||
}
|
||||
|
||||
public function testMultipleYieldInInitializer()
|
||||
{
|
||||
$first = null;
|
||||
$client = $this->getHttpClient(__FUNCTION__, function (ChunkInterface $chunk, AsyncContext $context) use (&$first) {
|
||||
if ($chunk->isFirst()) {
|
||||
$first = $chunk;
|
||||
|
||||
return;
|
||||
}
|
||||
$context->passthru();
|
||||
yield $first;
|
||||
yield $context->createChunk('injectedFoo');
|
||||
yield $chunk;
|
||||
});
|
||||
|
||||
$response = $client->request('GET', 'http://localhost:8057/404', ['timeout' => 0.1]);
|
||||
|
||||
$this->assertSame(404, $response->getStatusCode());
|
||||
$this->assertStringContainsString('injectedFoo', $response->getContent(false));
|
||||
}
|
||||
}
|
||||
|
@ -68,6 +68,31 @@ class RetryableHttpClientTest extends TestCase
|
||||
self::assertSame(200, $response->getStatusCode());
|
||||
}
|
||||
|
||||
public function testRetryWithBodyKeepContent()
|
||||
{
|
||||
$client = new RetryableHttpClient(
|
||||
new MockHttpClient([
|
||||
new MockResponse('my bad', ['http_code' => 400]),
|
||||
]),
|
||||
new class([400], 0) extends GenericRetryStrategy {
|
||||
public function shouldRetry(AsyncContext $context, ?string $responseContent, ?TransportExceptionInterface $exception): ?bool
|
||||
{
|
||||
if (null === $responseContent) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return 'my bad' !== $responseContent;
|
||||
}
|
||||
},
|
||||
1
|
||||
);
|
||||
|
||||
$response = $client->request('GET', 'http://example.com/foo-bar');
|
||||
|
||||
self::assertSame(400, $response->getStatusCode());
|
||||
self::assertSame('my bad', $response->getContent(false));
|
||||
}
|
||||
|
||||
public function testRetryWithBodyInvalid()
|
||||
{
|
||||
$client = new RetryableHttpClient(
|
||||
|
Reference in New Issue
Block a user