[HttpClient] add EventSourceHttpClient to consume Server-Sent Events

This commit is contained in:
soyuka 2020-05-04 19:03:17 +02:00 committed by Fabien Potencier
parent 367aa1df6c
commit 12ccca3cd4
6 changed files with 502 additions and 0 deletions

View File

@ -8,6 +8,7 @@ CHANGELOG
* added support for pausing responses with a new `pause_handler` callable exposed as an info item * added support for pausing responses with a new `pause_handler` callable exposed as an info item
* added `StreamableInterface` to ease turning responses into PHP streams * added `StreamableInterface` to ease turning responses into PHP streams
* added `MockResponse::getRequestMethod()` and `getRequestUrl()` to allow inspecting which request has been sent * added `MockResponse::getRequestMethod()` and `getRequestUrl()` to allow inspecting which request has been sent
* added `EventSourceHttpClient` a Server-Sent events stream implementing the [EventSource specification](https://www.w3.org/TR/eventsource/#eventsource)
5.1.0 5.1.0
----- -----

View File

@ -0,0 +1,79 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\HttpClient\Chunk;
use Symfony\Contracts\HttpClient\ChunkInterface;
/**
* @author Antoine Bluchet <soyuka@gmail.com>
* @author Nicolas Grekas <p@tchwork.com>
*/
final class ServerSentEvent extends DataChunk implements ChunkInterface
{
private $data = '';
private $id = '';
private $type = 'message';
private $retry = 0;
public function __construct(string $content)
{
parent::__construct(-1, $content);
// remove BOM
if (0 === strpos($content, "\xEF\xBB\xBF")) {
$content = substr($content, 3);
}
foreach (preg_split("/(?:\r\n|[\r\n])/", $content) as $line) {
if (0 === $i = strpos($line, ':')) {
continue;
}
$i = false === $i ? \strlen($line) : $i;
$field = substr($line, 0, $i);
$i += 1 + (' ' === ($line[1 + $i] ?? ''));
switch ($field) {
case 'id': $this->id = substr($line, $i); break;
case 'event': $this->type = substr($line, $i); break;
case 'data': $this->data .= ('' === $this->data ? '' : "\n").substr($line, $i); break;
case 'retry':
$retry = substr($line, $i);
if ('' !== $retry && \strlen($retry) === strspn($retry, '0123456789')) {
$this->retry = $retry / 1000.0;
}
break;
}
}
}
public function getId(): string
{
return $this->id;
}
public function getType(): string
{
return $this->type;
}
public function getData(): string
{
return $this->data;
}
public function getRetry(): float
{
return $this->retry;
}
}

View File

@ -0,0 +1,153 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\HttpClient;
use Symfony\Component\HttpClient\Chunk\ServerSentEvent;
use Symfony\Component\HttpClient\Exception\EventSourceException;
use Symfony\Component\HttpClient\Response\AsyncContext;
use Symfony\Component\HttpClient\Response\AsyncResponse;
use Symfony\Contracts\HttpClient\ChunkInterface;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
/**
* @author Antoine Bluchet <soyuka@gmail.com>
* @author Nicolas Grekas <p@tchwork.com>
*/
final class EventSourceHttpClient implements HttpClientInterface
{
use AsyncDecoratorTrait;
use HttpClientTrait;
private $reconnectionTime;
public function __construct(HttpClientInterface $client = null, float $reconnectionTime = 10.0)
{
$this->client = $client ?? HttpClient::create();
$this->reconnectionTime = $reconnectionTime;
}
public function connect(string $url, array $options = []): ResponseInterface
{
return $this->request('GET', $url, self::mergeDefaultOptions($options, [
'buffer' => false,
'headers' => [
'Accept' => 'text/event-stream',
'Cache-Control' => 'no-cache',
],
], true));
}
public function request(string $method, string $url, array $options = []): ResponseInterface
{
$state = new class() {
public $buffer = null;
public $lastEventId = null;
public $reconnectionTime;
public $lastError = null;
};
$state->reconnectionTime = $this->reconnectionTime;
if ($accept = self::normalizeHeaders($options['headers'] ?? [])['accept'] ?? []) {
$state->buffer = \in_array($accept, [['Accept: text/event-stream'], ['accept: text/event-stream']], true) ? '' : null;
}
return new AsyncResponse($this->client, $method, $url, $options, static function (ChunkInterface $chunk, AsyncContext $context) use ($state, $method, $url, $options) {
if (null !== $state->buffer) {
$context->setInfo('reconnection_time', $state->reconnectionTime);
$isTimeout = false;
}
$lastError = $state->lastError;
$state->lastError = null;
try {
$isTimeout = $chunk->isTimeout();
if (null !== $chunk->getInformationalStatus()) {
yield $chunk;
return;
}
} catch (TransportExceptionInterface $e) {
$state->lastError = $lastError ?? microtime(true);
if (null === $state->buffer || ($isTimeout && microtime(true) - $state->lastError < $state->reconnectionTime)) {
yield $chunk;
} else {
$options['headers']['Last-Event-ID'] = $state->lastEventId;
$state->buffer = '';
$state->lastError = microtime(true);
$context->getResponse()->cancel();
$context->replaceRequest($method, $url, $options);
if ($isTimeout) {
yield $chunk;
} else {
$context->pause($state->reconnectionTime);
}
}
return;
}
if ($chunk->isFirst()) {
if (preg_match('/^text\/event-stream(;|$)/i', $context->getHeaders()['content-type'][0] ?? '')) {
$state->buffer = '';
} elseif (null !== $lastError || (null !== $state->buffer && 200 === $context->getStatusCode())) {
throw new EventSourceException(sprintf('Response content-type is "%s" while "text/event-stream" was expected for "%s".', $context->getHeaders()['content-type'][0] ?? '', $context->getInfo('url')));
} else {
$context->passthru();
}
if (null === $lastError) {
yield $chunk;
}
return;
}
$rx = '/((?:\r\n|[\r\n]){2,})/';
$content = $state->buffer.$chunk->getContent();
if ($chunk->isLast()) {
$rx = substr_replace($rx, '|$', -2, 0);
}
$events = preg_split($rx, $content, -1, PREG_SPLIT_DELIM_CAPTURE);
$state->buffer = array_pop($events);
for ($i = 0; isset($events[$i]); $i += 2) {
$event = new ServerSentEvent($events[$i].$events[1 + $i]);
if ('' !== $event->getId()) {
$context->setInfo('last_event_id', $state->lastEventId = $event->getId());
}
if ($event->getRetry()) {
$context->setInfo('reconnection_time', $state->reconnectionTime = $event->getRetry());
}
yield $event;
}
if (preg_match('/^(?::[^\r\n]*+(?:\r\n|[\r\n]))+$/m', $state->buffer)) {
$content = $state->buffer;
$state->buffer = '';
yield $context->createChunk($content);
}
if ($chunk->isLast()) {
yield $chunk;
}
});
}
}

View File

@ -0,0 +1,21 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\HttpClient\Exception;
use Symfony\Contracts\HttpClient\Exception\DecodingExceptionInterface;
/**
* @author Nicolas Grekas <p@tchwork.com>
*/
final class EventSourceException extends \RuntimeException implements DecodingExceptionInterface
{
}

View File

@ -0,0 +1,79 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\HttpClient\Tests\Chunk;
use PHPUnit\Framework\TestCase;
use Symfony\Component\HttpClient\Chunk\ServerSentEvent;
/**
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class ServerSentEventTest extends TestCase
{
public function testParse()
{
$rawData = <<<STR
data: test
data:test
id: 12
event: testEvent
STR;
$sse = new ServerSentEvent($rawData);
$this->assertSame("test\ntest", $sse->getData());
$this->assertSame('12', $sse->getId());
$this->assertSame('testEvent', $sse->getType());
}
public function testParseValid()
{
$rawData = <<<STR
event: testEvent
data
STR;
$sse = new ServerSentEvent($rawData);
$this->assertSame('', $sse->getData());
$this->assertSame('', $sse->getId());
$this->assertSame('testEvent', $sse->getType());
}
public function testParseRetry()
{
$rawData = <<<STR
retry: 12
STR;
$sse = new ServerSentEvent($rawData);
$this->assertSame('', $sse->getData());
$this->assertSame('', $sse->getId());
$this->assertSame('message', $sse->getType());
$this->assertSame(0.012, $sse->getRetry());
}
public function testParseNewLine()
{
$rawData = <<<STR
data: <tag>
data
data: <foo />
data:
data:
data: </tag>
STR;
$sse = new ServerSentEvent($rawData);
$this->assertSame("<tag>\n\n <foo />\n\n\n</tag>", $sse->getData());
}
}

View File

@ -0,0 +1,169 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\HttpClient\Tests;
use PHPUnit\Framework\TestCase;
use Symfony\Component\HttpClient\Chunk\DataChunk;
use Symfony\Component\HttpClient\Chunk\ErrorChunk;
use Symfony\Component\HttpClient\Chunk\FirstChunk;
use Symfony\Component\HttpClient\Chunk\ServerSentEvent;
use Symfony\Component\HttpClient\EventSourceHttpClient;
use Symfony\Component\HttpClient\Exception\EventSourceException;
use Symfony\Component\HttpClient\Response\MockResponse;
use Symfony\Component\HttpClient\Response\ResponseStream;
use Symfony\Contracts\HttpClient\HttpClientInterface;
/**
* @author Antoine Bluchet <soyuka@gmail.com>
*/
class EventSourceHttpClientTest extends TestCase
{
public function testGetServerSentEvents()
{
$data = <<<TXT
event: builderror
id: 46
data: {"foo": "bar"}
event: reload
id: 47
data: {}
event: reload
id: 48
data: {}
data: test
data:test
id: 49
event: testEvent
id: 50
data: <tag>
data
data: <foo />
data
data: </tag>
id: 60
data
TXT;
$chunk = new DataChunk(0, $data);
$response = new MockResponse('', ['canceled' => false, 'http_method' => 'GET', 'url' => 'http://localhost:8080/events', 'response_headers' => ['content-type: text/event-stream']]);
$responseStream = new ResponseStream((function () use ($response, $chunk) {
yield $response => new FirstChunk();
yield $response => $chunk;
yield $response => new ErrorChunk(0, 'timeout');
})());
$hasCorrectHeaders = function ($options) {
$this->assertSame(['Accept: text/event-stream', 'Cache-Control: no-cache'], $options['headers']);
return true;
};
$httpClient = $this->createMock(HttpClientInterface::class);
$httpClient->method('request')->with('GET', 'http://localhost:8080/events', $this->callback($hasCorrectHeaders))->willReturn($response);
$httpClient->method('stream')->willReturn($responseStream);
$es = new EventSourceHttpClient($httpClient);
$res = $es->connect('http://localhost:8080/events');
$expected = [
new FirstChunk(),
new ServerSentEvent("event: builderror\nid: 46\ndata: {\"foo\": \"bar\"}\n\n"),
new ServerSentEvent("event: reload\nid: 47\ndata: {}\n\n"),
new ServerSentEvent("event: reload\nid: 48\ndata: {}\n\n"),
new ServerSentEvent("data: test\ndata:test\nid: 49\nevent: testEvent\n\n\n"),
new ServerSentEvent("id: 50\ndata: <tag>\ndata\ndata: <foo />\ndata\ndata: </tag>\n\n"),
];
$i = 0;
$this->expectExceptionMessage('Response has been canceled');
while ($res) {
if ($i > 0) {
$res->cancel();
}
foreach ($es->stream($res) as $chunk) {
if ($chunk->isTimeout()) {
continue;
}
if ($chunk->isLast()) {
continue;
}
$this->assertEquals($expected[$i++], $chunk);
}
}
}
/**
* @dataProvider contentTypeProvider
*/
public function testContentType($contentType, $expected)
{
$chunk = new DataChunk(0, '');
$response = new MockResponse('', ['canceled' => false, 'http_method' => 'GET', 'url' => 'http://localhost:8080/events', 'response_headers' => ['content-type: '.$contentType]]);
$responseStream = new ResponseStream((function () use ($response, $chunk) {
yield $response => new FirstChunk();
yield $response => $chunk;
yield $response => new ErrorChunk(0, 'timeout');
})());
$hasCorrectHeaders = function ($options) {
$this->assertSame(['Accept: text/event-stream', 'Cache-Control: no-cache'], $options['headers']);
return true;
};
$httpClient = $this->createMock(HttpClientInterface::class);
$httpClient->method('request')->with('GET', 'http://localhost:8080/events', $this->callback($hasCorrectHeaders))->willReturn($response);
$httpClient->method('stream')->willReturn($responseStream);
$es = new EventSourceHttpClient($httpClient);
$res = $es->connect('http://localhost:8080/events');
if ($expected instanceof EventSourceException) {
$this->expectExceptionMessage($expected->getMessage());
}
foreach ($es->stream($res) as $chunk) {
if ($chunk->isTimeout()) {
continue;
}
if ($chunk->isLast()) {
return;
}
}
}
public function contentTypeProvider()
{
return [
['text/event-stream', true],
['text/event-stream;charset=utf-8', true],
['text/event-stream;charset=UTF-8', true],
['Text/EVENT-STREAM;Charset="utf-8"', true],
['text/event-stream; charset="utf-8"', true],
['text/event-stream; charset=iso-8859-15', true],
['text/html', new EventSourceException('Response content-type is "text/html" while "text/event-stream" was expected for "http://localhost:8080/events".')],
['text/html; charset="utf-8"', new EventSourceException('Response content-type is "text/html; charset="utf-8"" while "text/event-stream" was expected for "http://localhost:8080/events".')],
['text/event-streambla', new EventSourceException('Response content-type is "text/event-streambla" while "text/event-stream" was expected for "http://localhost:8080/events".')],
];
}
}