diff --git a/src/Symfony/Bridge/Monolog/CHANGELOG.md b/src/Symfony/Bridge/Monolog/CHANGELOG.md index 20f0dc788b..e2439304fa 100644 --- a/src/Symfony/Bridge/Monolog/CHANGELOG.md +++ b/src/Symfony/Bridge/Monolog/CHANGELOG.md @@ -5,6 +5,7 @@ CHANGELOG ----- * The `RouteProcessor` class has been made final +* Added `ElasticsearchLogstashHandler` 4.3.0 ----- diff --git a/src/Symfony/Bridge/Monolog/Handler/ElasticsearchLogstashHandler.php b/src/Symfony/Bridge/Monolog/Handler/ElasticsearchLogstashHandler.php new file mode 100644 index 0000000000..efce0d7e56 --- /dev/null +++ b/src/Symfony/Bridge/Monolog/Handler/ElasticsearchLogstashHandler.php @@ -0,0 +1,142 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Bridge\Monolog\Handler; + +use Monolog\Formatter\FormatterInterface; +use Monolog\Formatter\LogstashFormatter; +use Monolog\Handler\AbstractHandler; +use Monolog\Logger; +use Symfony\Component\HttpClient\HttpClient; +use Symfony\Contracts\HttpClient\Exception\ExceptionInterface; +use Symfony\Contracts\HttpClient\HttpClientInterface; + +/** + * Push logs directly to Elasticsearch and format them according to Logstash specification. + * + * This handler dials directly with the HTTP interface of Elasticsearch. This + * means it will slow down your application if Elasticsearch takes times to + * answer. Even if all HTTP calls are done asynchronously. + * + * In a development environment, it's fine to keep the default configuration: + * for each log, an HTTP request will be made to push the log to Elasticsearch. + * + * In a production environment, it's highly recommended to wrap this handler + * in a handler with buffering capabilities (like the FingersCrossedHandler, or + * BufferHandler) in order to call Elasticsearch only once with a bulk push. For + * even better performance and fault tolerance, a proper ELK (https://www.elastic.co/what-is/elk-stack) + * stack is recommended. + * + * @author Grégoire Pineau + */ +class ElasticsearchLogstashHandler extends AbstractHandler +{ + private $endpoint; + private $index; + private $client; + private $responses; + + public function __construct(string $endpoint = 'http://127.0.0.1:9200', string $index = 'monolog', HttpClientInterface $client = null, int $level = Logger::DEBUG, bool $bubble = true) + { + if (!interface_exists(HttpClientInterface::class)) { + throw new \LogicException(sprintf('The %s handler needs an HTTP client. Try running "composer require symfony/http-client".', __CLASS__)); + } + + parent::__construct($level, $bubble); + $this->endpoint = $endpoint; + $this->index = $index; + $this->client = $client ?: HttpClient::create(['timeout' => 1]); + $this->responses = new \SplObjectStorage(); + } + + public function handle(array $record): bool + { + if (!$this->isHandling($record)) { + return false; + } + + $this->sendToElasticsearch([$record]); + + return !$this->bubble; + } + + public function handleBatch(array $records): void + { + $records = array_filter($records, [$this, 'isHandling']); + + if ($records) { + $this->sendToElasticsearch($records); + } + } + + protected function getDefaultFormatter(): FormatterInterface + { + return new LogstashFormatter('application', null, null, 'ctxt_', LogstashFormatter::V1); + } + + private function sendToElasticsearch(array $records) + { + $formatter = $this->getFormatter(); + + $body = ''; + foreach ($records as $record) { + foreach ($this->processors as $processor) { + $record = $processor($record); + } + + $body .= json_encode([ + 'index' => [ + '_index' => $this->index, + '_type' => '_doc', + ], + ]); + $body .= "\n"; + $body .= $formatter->format($record); + $body .= "\n"; + } + + $response = $this->client->request('POST', $this->endpoint.'/_bulk', [ + 'body' => $body, + 'headers' => [ + 'Content-Type' => 'application/json', + ], + ]); + + $this->responses->attach($response); + + $this->wait(false); + } + + public function __destruct() + { + $this->wait(true); + } + + private function wait(bool $blocking) + { + foreach ($this->client->stream($this->responses, $blocking ? null : 0.0) as $response => $chunk) { + try { + if ($chunk->isTimeout() && !$blocking) { + continue; + } + if (!$chunk->isFirst() && !$chunk->isLast()) { + continue; + } + if ($chunk->isLast()) { + $this->responses->detach($response); + } + } catch (ExceptionInterface $e) { + $this->responses->detach($response); + error_log(sprintf("Could not push logs to Elasticsearch:\n%s", (string) $e)); + } + } + } +} diff --git a/src/Symfony/Bridge/Monolog/Tests/Handler/ElasticsearchLogstashHandlerTest.php b/src/Symfony/Bridge/Monolog/Tests/Handler/ElasticsearchLogstashHandlerTest.php new file mode 100644 index 0000000000..a8875f27f2 --- /dev/null +++ b/src/Symfony/Bridge/Monolog/Tests/Handler/ElasticsearchLogstashHandlerTest.php @@ -0,0 +1,119 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Bridge\Monolog\Tests\Handler; + +use Monolog\Formatter\FormatterInterface; +use Monolog\Formatter\LogstashFormatter; +use Monolog\Logger; +use PHPUnit\Framework\TestCase; +use Symfony\Bridge\Monolog\Handler\ElasticsearchLogstashHandler; +use Symfony\Component\HttpClient\MockHttpClient; +use Symfony\Component\HttpClient\Response\MockResponse; + +class ElasticsearchLogstashHandlerTest extends TestCase +{ + public function testHandle() + { + $callCount = 0; + $responseFactory = function ($method, $url, $options) use (&$callCount) { + $body = <<assertSame('POST', $method); + $this->assertSame('http://es:9200/_bulk', $url); + $this->assertSame($body, $options['body']); + $this->assertSame('Content-Type: application/json', $options['normalized_headers']['content-type'][0]); + ++$callCount; + + return new MockResponse(); + }; + + $handler = new ElasticsearchLogstashHandlerWithHardCodedHostname('http://es:9200', 'log', new MockHttpClient($responseFactory)); + + $record = [ + 'message' => 'My info message', + 'context' => [], + 'level' => Logger::INFO, + 'level_name' => Logger::getLevelName(Logger::INFO), + 'channel' => 'app', + 'datetime' => new \DateTime('2020-01-01T00:00:00+01:00'), + 'extra' => [], + ]; + + $handler->handle($record); + + $this->assertSame(1, $callCount); + } + + public function testBandleBatch() + { + $callCount = 0; + $responseFactory = function ($method, $url, $options) use (&$callCount) { + $body = <<assertSame('POST', $method); + $this->assertSame('http://es:9200/_bulk', $url); + $this->assertSame($body, $options['body']); + $this->assertSame('Content-Type: application/json', $options['normalized_headers']['content-type'][0]); + ++$callCount; + + return new MockResponse(); + }; + + $handler = new ElasticsearchLogstashHandlerWithHardCodedHostname('http://es:9200', 'log', new MockHttpClient($responseFactory)); + + $records = [ + [ + 'message' => 'My info message', + 'context' => [], + 'level' => Logger::INFO, + 'level_name' => Logger::getLevelName(Logger::INFO), + 'channel' => 'app', + 'datetime' => new \DateTime('2020-01-01T00:00:00+01:00'), + 'extra' => [], + ], + [ + 'message' => 'My second message', + 'context' => [], + 'level' => Logger::WARNING, + 'level_name' => Logger::getLevelName(Logger::WARNING), + 'channel' => 'php', + 'datetime' => new \DateTime('2020-01-01T00:00:01+01:00'), + 'extra' => [], + ], + ]; + + $handler->handleBatch($records); + + $this->assertSame(1, $callCount); + } +} + +class ElasticsearchLogstashHandlerWithHardCodedHostname extends ElasticsearchLogstashHandler +{ + protected function getDefaultFormatter(): FormatterInterface + { + return new LogstashFormatter('application', 'my hostname', null, 'ctxt_', LogstashFormatter::V1); + } +} diff --git a/src/Symfony/Bridge/Monolog/composer.json b/src/Symfony/Bridge/Monolog/composer.json index 846d427b75..f5b607edd8 100644 --- a/src/Symfony/Bridge/Monolog/composer.json +++ b/src/Symfony/Bridge/Monolog/composer.json @@ -23,6 +23,7 @@ }, "require-dev": { "symfony/console": "^3.4|^4.0|^5.0", + "symfony/http-client": "^4.4|^5.0", "symfony/security-core": "^3.4|^4.0|^5.0", "symfony/var-dumper": "^3.4|^4.0|^5.0" },