bug #39228 [HttpClient] Fix content swallowed by AsyncClient initializer (jderusse)
This PR was merged into the 5.2 branch.
Discussion
----------
[HttpClient] Fix content swallowed by AsyncClient initializer
| Q | A
| ------------- | ---
| Branch? | 5.2
| Bug fix? | yes
| New feature? | no
| Deprecations? | no
| Tickets | -
| License | MIT
| Doc PR | -
I'm not sure if it should be fixed in RetryableHttpClient or AsycClient.
The issue is: when the Strategy needs the body to take a decision BUT decide to NOT retry the request, the content is "lost"
In fact, when the first chunk is yield, the AsyncResponse's initializer is stopped, and nothing consume the remaining chunks. Moreover, because the `passthru` were disabled before yielding the first chunk in RetryableHttpClient, the callback is never called again to yield the remaining content.
Commits
-------
d324271691
Fix content swallowed by AsyncClient initializer
This commit is contained in:
commit
359bcc5686
@ -12,6 +12,7 @@
|
||||
namespace Symfony\Component\HttpClient\Response;
|
||||
|
||||
use Symfony\Component\HttpClient\Chunk\ErrorChunk;
|
||||
use Symfony\Component\HttpClient\Chunk\FirstChunk;
|
||||
use Symfony\Component\HttpClient\Chunk\LastChunk;
|
||||
use Symfony\Component\HttpClient\Exception\TransportException;
|
||||
use Symfony\Contracts\HttpClient\ChunkInterface;
|
||||
@ -34,6 +35,7 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
|
||||
private $response;
|
||||
private $info = ['canceled' => false];
|
||||
private $passthru;
|
||||
private $stream;
|
||||
private $lastYielded = false;
|
||||
|
||||
/**
|
||||
@ -226,6 +228,19 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
|
||||
|
||||
$asyncMap[$r->response] = $r;
|
||||
$wrappedResponses[] = $r->response;
|
||||
|
||||
if ($r->stream) {
|
||||
yield from self::passthruStream($response = $r->response, $r, new FirstChunk(), $asyncMap);
|
||||
|
||||
if (!isset($asyncMap[$response])) {
|
||||
array_pop($wrappedResponses);
|
||||
}
|
||||
|
||||
if ($r->response !== $response && !isset($asyncMap[$r->response])) {
|
||||
$asyncMap[$r->response] = $r;
|
||||
$wrappedResponses[] = $r->response;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!$client) {
|
||||
@ -286,6 +301,7 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
|
||||
|
||||
private static function passthru(HttpClientInterface $client, self $r, ChunkInterface $chunk, \SplObjectStorage $asyncMap = null): \Generator
|
||||
{
|
||||
$r->stream = null;
|
||||
$response = $r->response;
|
||||
$context = new AsyncContext($r->passthru, $client, $r->response, $r->info, $r->content, $r->offset);
|
||||
if (null === $stream = ($r->passthru)($chunk, $context)) {
|
||||
@ -295,32 +311,39 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
|
||||
|
||||
return;
|
||||
}
|
||||
$chunk = null;
|
||||
|
||||
if (!$stream instanceof \Iterator) {
|
||||
throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream)));
|
||||
}
|
||||
$r->stream = $stream;
|
||||
|
||||
yield from self::passthruStream($response, $r, null, $asyncMap);
|
||||
}
|
||||
|
||||
private static function passthruStream(ResponseInterface $response, self $r, ?ChunkInterface $chunk, ?\SplObjectStorage $asyncMap): \Generator
|
||||
{
|
||||
while (true) {
|
||||
try {
|
||||
if (null !== $chunk) {
|
||||
$stream->next();
|
||||
if (null !== $chunk && $r->stream) {
|
||||
$r->stream->next();
|
||||
}
|
||||
|
||||
if (!$stream->valid()) {
|
||||
if (!$r->stream || !$r->stream->valid() || !$r->stream) {
|
||||
$r->stream = null;
|
||||
break;
|
||||
}
|
||||
} catch (\Throwable $e) {
|
||||
unset($asyncMap[$response]);
|
||||
$r->stream = null;
|
||||
$r->info['error'] = $e->getMessage();
|
||||
$r->response->cancel();
|
||||
|
||||
yield $r => $chunk = new ErrorChunk($r->offset, $e);
|
||||
$chunk->didThrow() ?: $chunk->getContent();
|
||||
unset($asyncMap[$response]);
|
||||
break;
|
||||
}
|
||||
|
||||
$chunk = $stream->current();
|
||||
$chunk = $r->stream->current();
|
||||
|
||||
if (!$chunk instanceof ChunkInterface) {
|
||||
throw new \LogicException(sprintf('A chunk passthru must yield instances of "%s", "%s" yielded.', ChunkInterface::class, get_debug_type($chunk)));
|
||||
@ -356,6 +379,12 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
|
||||
}
|
||||
}
|
||||
|
||||
if (null !== $chunk->getError() || $chunk->isLast()) {
|
||||
$stream = $r->stream;
|
||||
$r->stream = null;
|
||||
unset($asyncMap[$response]);
|
||||
}
|
||||
|
||||
if (null === $chunk->getError()) {
|
||||
$r->offset += \strlen($content);
|
||||
|
||||
@ -387,7 +416,6 @@ final class AsyncResponse implements ResponseInterface, StreamableInterface
|
||||
$chunk->didThrow() ?: $chunk->getContent();
|
||||
}
|
||||
|
||||
unset($asyncMap[$response]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -127,7 +127,7 @@ class RetryableHttpClient implements HttpClientInterface
|
||||
|
||||
$context->getResponse()->cancel();
|
||||
|
||||
$delay = $this->getDelayFromHeader($context->getHeaders()) ?? $this->strategy->getDelay($context, $chunk instanceof LastChunk ? $content : null, $exception);
|
||||
$delay = $this->getDelayFromHeader($context->getHeaders()) ?? $this->strategy->getDelay($context, $chunk->isLast() ? $content : null, $exception);
|
||||
++$retryCount;
|
||||
|
||||
$this->logger->info('Try #{count} after {delay}ms'.($exception ? ': '.$exception->getMessage() : ', status code: '.$context->getStatusCode()), [
|
||||
|
@ -282,4 +282,25 @@ class AsyncDecoratorTraitTest extends NativeHttpClientTest
|
||||
$this->assertSame('test', $lastInfo['foo']);
|
||||
$this->assertArrayHasKey('previous_info', $lastInfo);
|
||||
}
|
||||
|
||||
public function testMultipleYieldInInitializer()
|
||||
{
|
||||
$first = null;
|
||||
$client = $this->getHttpClient(__FUNCTION__, function (ChunkInterface $chunk, AsyncContext $context) use (&$first) {
|
||||
if ($chunk->isFirst()) {
|
||||
$first = $chunk;
|
||||
|
||||
return;
|
||||
}
|
||||
$context->passthru();
|
||||
yield $first;
|
||||
yield $context->createChunk('injectedFoo');
|
||||
yield $chunk;
|
||||
});
|
||||
|
||||
$response = $client->request('GET', 'http://localhost:8057/404', ['timeout' => 0.1]);
|
||||
|
||||
$this->assertSame(404, $response->getStatusCode());
|
||||
$this->assertStringContainsString('injectedFoo', $response->getContent(false));
|
||||
}
|
||||
}
|
||||
|
@ -68,6 +68,31 @@ class RetryableHttpClientTest extends TestCase
|
||||
self::assertSame(200, $response->getStatusCode());
|
||||
}
|
||||
|
||||
public function testRetryWithBodyKeepContent()
|
||||
{
|
||||
$client = new RetryableHttpClient(
|
||||
new MockHttpClient([
|
||||
new MockResponse('my bad', ['http_code' => 400]),
|
||||
]),
|
||||
new class([400], 0) extends GenericRetryStrategy {
|
||||
public function shouldRetry(AsyncContext $context, ?string $responseContent, ?TransportExceptionInterface $exception): ?bool
|
||||
{
|
||||
if (null === $responseContent) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return 'my bad' !== $responseContent;
|
||||
}
|
||||
},
|
||||
1
|
||||
);
|
||||
|
||||
$response = $client->request('GET', 'http://example.com/foo-bar');
|
||||
|
||||
self::assertSame(400, $response->getStatusCode());
|
||||
self::assertSame('my bad', $response->getContent(false));
|
||||
}
|
||||
|
||||
public function testRetryWithBodyInvalid()
|
||||
{
|
||||
$client = new RetryableHttpClient(
|
||||
|
Reference in New Issue
Block a user