From 12ccca3cd43fda82f37141adedadd34e71f02b5f Mon Sep 17 00:00:00 2001 From: soyuka Date: Mon, 4 May 2020 19:03:17 +0200 Subject: [PATCH] [HttpClient] add EventSourceHttpClient to consume Server-Sent Events --- src/Symfony/Component/HttpClient/CHANGELOG.md | 1 + .../HttpClient/Chunk/ServerSentEvent.php | 79 ++++++++ .../HttpClient/EventSourceHttpClient.php | 153 ++++++++++++++++ .../Exception/EventSourceException.php | 21 +++ .../Tests/Chunk/ServerSentEventTest.php | 79 ++++++++ .../Tests/EventSourceHttpClientTest.php | 169 ++++++++++++++++++ 6 files changed, 502 insertions(+) create mode 100644 src/Symfony/Component/HttpClient/Chunk/ServerSentEvent.php create mode 100644 src/Symfony/Component/HttpClient/EventSourceHttpClient.php create mode 100644 src/Symfony/Component/HttpClient/Exception/EventSourceException.php create mode 100644 src/Symfony/Component/HttpClient/Tests/Chunk/ServerSentEventTest.php create mode 100644 src/Symfony/Component/HttpClient/Tests/EventSourceHttpClientTest.php diff --git a/src/Symfony/Component/HttpClient/CHANGELOG.md b/src/Symfony/Component/HttpClient/CHANGELOG.md index ae2c9c00ea..b652df1d94 100644 --- a/src/Symfony/Component/HttpClient/CHANGELOG.md +++ b/src/Symfony/Component/HttpClient/CHANGELOG.md @@ -8,6 +8,7 @@ CHANGELOG * added support for pausing responses with a new `pause_handler` callable exposed as an info item * added `StreamableInterface` to ease turning responses into PHP streams * added `MockResponse::getRequestMethod()` and `getRequestUrl()` to allow inspecting which request has been sent + * added `EventSourceHttpClient` a Server-Sent events stream implementing the [EventSource specification](https://www.w3.org/TR/eventsource/#eventsource) 5.1.0 ----- diff --git a/src/Symfony/Component/HttpClient/Chunk/ServerSentEvent.php b/src/Symfony/Component/HttpClient/Chunk/ServerSentEvent.php new file mode 100644 index 0000000000..f7ff4b9631 --- /dev/null +++ b/src/Symfony/Component/HttpClient/Chunk/ServerSentEvent.php @@ -0,0 +1,79 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\HttpClient\Chunk; + +use Symfony\Contracts\HttpClient\ChunkInterface; + +/** + * @author Antoine Bluchet + * @author Nicolas Grekas + */ +final class ServerSentEvent extends DataChunk implements ChunkInterface +{ + private $data = ''; + private $id = ''; + private $type = 'message'; + private $retry = 0; + + public function __construct(string $content) + { + parent::__construct(-1, $content); + + // remove BOM + if (0 === strpos($content, "\xEF\xBB\xBF")) { + $content = substr($content, 3); + } + + foreach (preg_split("/(?:\r\n|[\r\n])/", $content) as $line) { + if (0 === $i = strpos($line, ':')) { + continue; + } + + $i = false === $i ? \strlen($line) : $i; + $field = substr($line, 0, $i); + $i += 1 + (' ' === ($line[1 + $i] ?? '')); + + switch ($field) { + case 'id': $this->id = substr($line, $i); break; + case 'event': $this->type = substr($line, $i); break; + case 'data': $this->data .= ('' === $this->data ? '' : "\n").substr($line, $i); break; + case 'retry': + $retry = substr($line, $i); + + if ('' !== $retry && \strlen($retry) === strspn($retry, '0123456789')) { + $this->retry = $retry / 1000.0; + } + break; + } + } + } + + public function getId(): string + { + return $this->id; + } + + public function getType(): string + { + return $this->type; + } + + public function getData(): string + { + return $this->data; + } + + public function getRetry(): float + { + return $this->retry; + } +} diff --git a/src/Symfony/Component/HttpClient/EventSourceHttpClient.php b/src/Symfony/Component/HttpClient/EventSourceHttpClient.php new file mode 100644 index 0000000000..0c6536f508 --- /dev/null +++ b/src/Symfony/Component/HttpClient/EventSourceHttpClient.php @@ -0,0 +1,153 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\HttpClient; + +use Symfony\Component\HttpClient\Chunk\ServerSentEvent; +use Symfony\Component\HttpClient\Exception\EventSourceException; +use Symfony\Component\HttpClient\Response\AsyncContext; +use Symfony\Component\HttpClient\Response\AsyncResponse; +use Symfony\Contracts\HttpClient\ChunkInterface; +use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface; +use Symfony\Contracts\HttpClient\HttpClientInterface; +use Symfony\Contracts\HttpClient\ResponseInterface; + +/** + * @author Antoine Bluchet + * @author Nicolas Grekas + */ +final class EventSourceHttpClient implements HttpClientInterface +{ + use AsyncDecoratorTrait; + use HttpClientTrait; + + private $reconnectionTime; + + public function __construct(HttpClientInterface $client = null, float $reconnectionTime = 10.0) + { + $this->client = $client ?? HttpClient::create(); + $this->reconnectionTime = $reconnectionTime; + } + + public function connect(string $url, array $options = []): ResponseInterface + { + return $this->request('GET', $url, self::mergeDefaultOptions($options, [ + 'buffer' => false, + 'headers' => [ + 'Accept' => 'text/event-stream', + 'Cache-Control' => 'no-cache', + ], + ], true)); + } + + public function request(string $method, string $url, array $options = []): ResponseInterface + { + $state = new class() { + public $buffer = null; + public $lastEventId = null; + public $reconnectionTime; + public $lastError = null; + }; + $state->reconnectionTime = $this->reconnectionTime; + + if ($accept = self::normalizeHeaders($options['headers'] ?? [])['accept'] ?? []) { + $state->buffer = \in_array($accept, [['Accept: text/event-stream'], ['accept: text/event-stream']], true) ? '' : null; + } + + return new AsyncResponse($this->client, $method, $url, $options, static function (ChunkInterface $chunk, AsyncContext $context) use ($state, $method, $url, $options) { + if (null !== $state->buffer) { + $context->setInfo('reconnection_time', $state->reconnectionTime); + $isTimeout = false; + } + $lastError = $state->lastError; + $state->lastError = null; + + try { + $isTimeout = $chunk->isTimeout(); + + if (null !== $chunk->getInformationalStatus()) { + yield $chunk; + + return; + } + } catch (TransportExceptionInterface $e) { + $state->lastError = $lastError ?? microtime(true); + + if (null === $state->buffer || ($isTimeout && microtime(true) - $state->lastError < $state->reconnectionTime)) { + yield $chunk; + } else { + $options['headers']['Last-Event-ID'] = $state->lastEventId; + $state->buffer = ''; + $state->lastError = microtime(true); + $context->getResponse()->cancel(); + $context->replaceRequest($method, $url, $options); + if ($isTimeout) { + yield $chunk; + } else { + $context->pause($state->reconnectionTime); + } + } + + return; + } + + if ($chunk->isFirst()) { + if (preg_match('/^text\/event-stream(;|$)/i', $context->getHeaders()['content-type'][0] ?? '')) { + $state->buffer = ''; + } elseif (null !== $lastError || (null !== $state->buffer && 200 === $context->getStatusCode())) { + throw new EventSourceException(sprintf('Response content-type is "%s" while "text/event-stream" was expected for "%s".', $context->getHeaders()['content-type'][0] ?? '', $context->getInfo('url'))); + } else { + $context->passthru(); + } + + if (null === $lastError) { + yield $chunk; + } + + return; + } + + $rx = '/((?:\r\n|[\r\n]){2,})/'; + $content = $state->buffer.$chunk->getContent(); + + if ($chunk->isLast()) { + $rx = substr_replace($rx, '|$', -2, 0); + } + $events = preg_split($rx, $content, -1, PREG_SPLIT_DELIM_CAPTURE); + $state->buffer = array_pop($events); + + for ($i = 0; isset($events[$i]); $i += 2) { + $event = new ServerSentEvent($events[$i].$events[1 + $i]); + + if ('' !== $event->getId()) { + $context->setInfo('last_event_id', $state->lastEventId = $event->getId()); + } + + if ($event->getRetry()) { + $context->setInfo('reconnection_time', $state->reconnectionTime = $event->getRetry()); + } + + yield $event; + } + + if (preg_match('/^(?::[^\r\n]*+(?:\r\n|[\r\n]))+$/m', $state->buffer)) { + $content = $state->buffer; + $state->buffer = ''; + + yield $context->createChunk($content); + } + + if ($chunk->isLast()) { + yield $chunk; + } + }); + } +} diff --git a/src/Symfony/Component/HttpClient/Exception/EventSourceException.php b/src/Symfony/Component/HttpClient/Exception/EventSourceException.php new file mode 100644 index 0000000000..30ab7957c5 --- /dev/null +++ b/src/Symfony/Component/HttpClient/Exception/EventSourceException.php @@ -0,0 +1,21 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\HttpClient\Exception; + +use Symfony\Contracts\HttpClient\Exception\DecodingExceptionInterface; + +/** + * @author Nicolas Grekas + */ +final class EventSourceException extends \RuntimeException implements DecodingExceptionInterface +{ +} diff --git a/src/Symfony/Component/HttpClient/Tests/Chunk/ServerSentEventTest.php b/src/Symfony/Component/HttpClient/Tests/Chunk/ServerSentEventTest.php new file mode 100644 index 0000000000..1c0d6834a7 --- /dev/null +++ b/src/Symfony/Component/HttpClient/Tests/Chunk/ServerSentEventTest.php @@ -0,0 +1,79 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\HttpClient\Tests\Chunk; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\HttpClient\Chunk\ServerSentEvent; + +/** + * @author Antoine Bluchet + */ +class ServerSentEventTest extends TestCase +{ + public function testParse() + { + $rawData = <<assertSame("test\ntest", $sse->getData()); + $this->assertSame('12', $sse->getId()); + $this->assertSame('testEvent', $sse->getType()); + } + + public function testParseValid() + { + $rawData = <<assertSame('', $sse->getData()); + $this->assertSame('', $sse->getId()); + $this->assertSame('testEvent', $sse->getType()); + } + + public function testParseRetry() + { + $rawData = <<assertSame('', $sse->getData()); + $this->assertSame('', $sse->getId()); + $this->assertSame('message', $sse->getType()); + $this->assertSame(0.012, $sse->getRetry()); + } + + public function testParseNewLine() + { + $rawData = << +data +data: +data: +data: +data: +STR; + $sse = new ServerSentEvent($rawData); + $this->assertSame("\n\n \n\n\n", $sse->getData()); + } +} diff --git a/src/Symfony/Component/HttpClient/Tests/EventSourceHttpClientTest.php b/src/Symfony/Component/HttpClient/Tests/EventSourceHttpClientTest.php new file mode 100644 index 0000000000..b738c15a18 --- /dev/null +++ b/src/Symfony/Component/HttpClient/Tests/EventSourceHttpClientTest.php @@ -0,0 +1,169 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\HttpClient\Tests; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\HttpClient\Chunk\DataChunk; +use Symfony\Component\HttpClient\Chunk\ErrorChunk; +use Symfony\Component\HttpClient\Chunk\FirstChunk; +use Symfony\Component\HttpClient\Chunk\ServerSentEvent; +use Symfony\Component\HttpClient\EventSourceHttpClient; +use Symfony\Component\HttpClient\Exception\EventSourceException; +use Symfony\Component\HttpClient\Response\MockResponse; +use Symfony\Component\HttpClient\Response\ResponseStream; +use Symfony\Contracts\HttpClient\HttpClientInterface; + +/** + * @author Antoine Bluchet + */ +class EventSourceHttpClientTest extends TestCase +{ + public function testGetServerSentEvents() + { + $data = << +data +data: +data +data: + +id: 60 +data +TXT; + + $chunk = new DataChunk(0, $data); + $response = new MockResponse('', ['canceled' => false, 'http_method' => 'GET', 'url' => 'http://localhost:8080/events', 'response_headers' => ['content-type: text/event-stream']]); + $responseStream = new ResponseStream((function () use ($response, $chunk) { + yield $response => new FirstChunk(); + yield $response => $chunk; + yield $response => new ErrorChunk(0, 'timeout'); + })()); + + $hasCorrectHeaders = function ($options) { + $this->assertSame(['Accept: text/event-stream', 'Cache-Control: no-cache'], $options['headers']); + + return true; + }; + + $httpClient = $this->createMock(HttpClientInterface::class); + $httpClient->method('request')->with('GET', 'http://localhost:8080/events', $this->callback($hasCorrectHeaders))->willReturn($response); + + $httpClient->method('stream')->willReturn($responseStream); + + $es = new EventSourceHttpClient($httpClient); + $res = $es->connect('http://localhost:8080/events'); + + $expected = [ + new FirstChunk(), + new ServerSentEvent("event: builderror\nid: 46\ndata: {\"foo\": \"bar\"}\n\n"), + new ServerSentEvent("event: reload\nid: 47\ndata: {}\n\n"), + new ServerSentEvent("event: reload\nid: 48\ndata: {}\n\n"), + new ServerSentEvent("data: test\ndata:test\nid: 49\nevent: testEvent\n\n\n"), + new ServerSentEvent("id: 50\ndata: \ndata\ndata: \ndata\ndata: \n\n"), + ]; + $i = 0; + + $this->expectExceptionMessage('Response has been canceled'); + while ($res) { + if ($i > 0) { + $res->cancel(); + } + foreach ($es->stream($res) as $chunk) { + if ($chunk->isTimeout()) { + continue; + } + + if ($chunk->isLast()) { + continue; + } + + $this->assertEquals($expected[$i++], $chunk); + } + } + } + + /** + * @dataProvider contentTypeProvider + */ + public function testContentType($contentType, $expected) + { + $chunk = new DataChunk(0, ''); + $response = new MockResponse('', ['canceled' => false, 'http_method' => 'GET', 'url' => 'http://localhost:8080/events', 'response_headers' => ['content-type: '.$contentType]]); + $responseStream = new ResponseStream((function () use ($response, $chunk) { + yield $response => new FirstChunk(); + yield $response => $chunk; + yield $response => new ErrorChunk(0, 'timeout'); + })()); + + $hasCorrectHeaders = function ($options) { + $this->assertSame(['Accept: text/event-stream', 'Cache-Control: no-cache'], $options['headers']); + + return true; + }; + + $httpClient = $this->createMock(HttpClientInterface::class); + $httpClient->method('request')->with('GET', 'http://localhost:8080/events', $this->callback($hasCorrectHeaders))->willReturn($response); + + $httpClient->method('stream')->willReturn($responseStream); + + $es = new EventSourceHttpClient($httpClient); + $res = $es->connect('http://localhost:8080/events'); + + if ($expected instanceof EventSourceException) { + $this->expectExceptionMessage($expected->getMessage()); + } + + foreach ($es->stream($res) as $chunk) { + if ($chunk->isTimeout()) { + continue; + } + + if ($chunk->isLast()) { + return; + } + } + } + + public function contentTypeProvider() + { + return [ + ['text/event-stream', true], + ['text/event-stream;charset=utf-8', true], + ['text/event-stream;charset=UTF-8', true], + ['Text/EVENT-STREAM;Charset="utf-8"', true], + ['text/event-stream; charset="utf-8"', true], + ['text/event-stream; charset=iso-8859-15', true], + ['text/html', new EventSourceException('Response content-type is "text/html" while "text/event-stream" was expected for "http://localhost:8080/events".')], + ['text/html; charset="utf-8"', new EventSourceException('Response content-type is "text/html; charset="utf-8"" while "text/event-stream" was expected for "http://localhost:8080/events".')], + ['text/event-streambla', new EventSourceException('Response content-type is "text/event-streambla" while "text/event-stream" was expected for "http://localhost:8080/events".')], + ]; + } +}