[Monolog] Added ElasticsearchLogstashHandler
This commit is contained in:
parent
c65dc15aac
commit
1587e9a4e2
@ -5,6 +5,7 @@ CHANGELOG
|
||||
-----
|
||||
|
||||
* The `RouteProcessor` class has been made final
|
||||
* Added `ElasticsearchLogstashHandler`
|
||||
|
||||
4.3.0
|
||||
-----
|
||||
|
@ -0,0 +1,142 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Bridge\Monolog\Handler;
|
||||
|
||||
use Monolog\Formatter\FormatterInterface;
|
||||
use Monolog\Formatter\LogstashFormatter;
|
||||
use Monolog\Handler\AbstractHandler;
|
||||
use Monolog\Logger;
|
||||
use Symfony\Component\HttpClient\HttpClient;
|
||||
use Symfony\Contracts\HttpClient\Exception\ExceptionInterface;
|
||||
use Symfony\Contracts\HttpClient\HttpClientInterface;
|
||||
|
||||
/**
|
||||
* Push logs directly to Elasticsearch and format them according to Logstash specification.
|
||||
*
|
||||
* This handler dials directly with the HTTP interface of Elasticsearch. This
|
||||
* means it will slow down your application if Elasticsearch takes times to
|
||||
* answer. Even if all HTTP calls are done asynchronously.
|
||||
*
|
||||
* In a development environment, it's fine to keep the default configuration:
|
||||
* for each log, an HTTP request will be made to push the log to Elasticsearch.
|
||||
*
|
||||
* In a production environment, it's highly recommended to wrap this handler
|
||||
* in a handler with buffering capabilities (like the FingersCrossedHandler, or
|
||||
* BufferHandler) in order to call Elasticsearch only once with a bulk push. For
|
||||
* even better performance and fault tolerance, a proper ELK (https://www.elastic.co/what-is/elk-stack)
|
||||
* stack is recommended.
|
||||
*
|
||||
* @author Grégoire Pineau <lyrixx@lyrixx.info>
|
||||
*/
|
||||
class ElasticsearchLogstashHandler extends AbstractHandler
|
||||
{
|
||||
private $endpoint;
|
||||
private $index;
|
||||
private $client;
|
||||
private $responses;
|
||||
|
||||
public function __construct(string $endpoint = 'http://127.0.0.1:9200', string $index = 'monolog', HttpClientInterface $client = null, int $level = Logger::DEBUG, bool $bubble = true)
|
||||
{
|
||||
if (!interface_exists(HttpClientInterface::class)) {
|
||||
throw new \LogicException(sprintf('The %s handler needs an HTTP client. Try running "composer require symfony/http-client".', __CLASS__));
|
||||
}
|
||||
|
||||
parent::__construct($level, $bubble);
|
||||
$this->endpoint = $endpoint;
|
||||
$this->index = $index;
|
||||
$this->client = $client ?: HttpClient::create(['timeout' => 1]);
|
||||
$this->responses = new \SplObjectStorage();
|
||||
}
|
||||
|
||||
public function handle(array $record): bool
|
||||
{
|
||||
if (!$this->isHandling($record)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$this->sendToElasticsearch([$record]);
|
||||
|
||||
return !$this->bubble;
|
||||
}
|
||||
|
||||
public function handleBatch(array $records): void
|
||||
{
|
||||
$records = array_filter($records, [$this, 'isHandling']);
|
||||
|
||||
if ($records) {
|
||||
$this->sendToElasticsearch($records);
|
||||
}
|
||||
}
|
||||
|
||||
protected function getDefaultFormatter(): FormatterInterface
|
||||
{
|
||||
return new LogstashFormatter('application', null, null, 'ctxt_', LogstashFormatter::V1);
|
||||
}
|
||||
|
||||
private function sendToElasticsearch(array $records)
|
||||
{
|
||||
$formatter = $this->getFormatter();
|
||||
|
||||
$body = '';
|
||||
foreach ($records as $record) {
|
||||
foreach ($this->processors as $processor) {
|
||||
$record = $processor($record);
|
||||
}
|
||||
|
||||
$body .= json_encode([
|
||||
'index' => [
|
||||
'_index' => $this->index,
|
||||
'_type' => '_doc',
|
||||
],
|
||||
]);
|
||||
$body .= "\n";
|
||||
$body .= $formatter->format($record);
|
||||
$body .= "\n";
|
||||
}
|
||||
|
||||
$response = $this->client->request('POST', $this->endpoint.'/_bulk', [
|
||||
'body' => $body,
|
||||
'headers' => [
|
||||
'Content-Type' => 'application/json',
|
||||
],
|
||||
]);
|
||||
|
||||
$this->responses->attach($response);
|
||||
|
||||
$this->wait(false);
|
||||
}
|
||||
|
||||
public function __destruct()
|
||||
{
|
||||
$this->wait(true);
|
||||
}
|
||||
|
||||
private function wait(bool $blocking)
|
||||
{
|
||||
foreach ($this->client->stream($this->responses, $blocking ? null : 0.0) as $response => $chunk) {
|
||||
try {
|
||||
if ($chunk->isTimeout() && !$blocking) {
|
||||
continue;
|
||||
}
|
||||
if (!$chunk->isFirst() && !$chunk->isLast()) {
|
||||
continue;
|
||||
}
|
||||
if ($chunk->isLast()) {
|
||||
$this->responses->detach($response);
|
||||
}
|
||||
} catch (ExceptionInterface $e) {
|
||||
$this->responses->detach($response);
|
||||
error_log(sprintf("Could not push logs to Elasticsearch:\n%s", (string) $e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,119 @@
|
||||
<?php
|
||||
|
||||
/*
|
||||
* This file is part of the Symfony package.
|
||||
*
|
||||
* (c) Fabien Potencier <fabien@symfony.com>
|
||||
*
|
||||
* For the full copyright and license information, please view the LICENSE
|
||||
* file that was distributed with this source code.
|
||||
*/
|
||||
|
||||
namespace Symfony\Bridge\Monolog\Tests\Handler;
|
||||
|
||||
use Monolog\Formatter\FormatterInterface;
|
||||
use Monolog\Formatter\LogstashFormatter;
|
||||
use Monolog\Logger;
|
||||
use PHPUnit\Framework\TestCase;
|
||||
use Symfony\Bridge\Monolog\Handler\ElasticsearchLogstashHandler;
|
||||
use Symfony\Component\HttpClient\MockHttpClient;
|
||||
use Symfony\Component\HttpClient\Response\MockResponse;
|
||||
|
||||
class ElasticsearchLogstashHandlerTest extends TestCase
|
||||
{
|
||||
public function testHandle()
|
||||
{
|
||||
$callCount = 0;
|
||||
$responseFactory = function ($method, $url, $options) use (&$callCount) {
|
||||
$body = <<<EOBODY
|
||||
{"index":{"_index":"log","_type":"_doc"}}
|
||||
{"@timestamp":"2020-01-01T00:00:00.000000+01:00","@version":1,"host":"my hostname","message":"My info message","type":"application","channel":"app","level":"INFO"}
|
||||
|
||||
|
||||
EOBODY;
|
||||
|
||||
$this->assertSame('POST', $method);
|
||||
$this->assertSame('http://es:9200/_bulk', $url);
|
||||
$this->assertSame($body, $options['body']);
|
||||
$this->assertSame('Content-Type: application/json', $options['normalized_headers']['content-type'][0]);
|
||||
++$callCount;
|
||||
|
||||
return new MockResponse();
|
||||
};
|
||||
|
||||
$handler = new ElasticsearchLogstashHandlerWithHardCodedHostname('http://es:9200', 'log', new MockHttpClient($responseFactory));
|
||||
|
||||
$record = [
|
||||
'message' => 'My info message',
|
||||
'context' => [],
|
||||
'level' => Logger::INFO,
|
||||
'level_name' => Logger::getLevelName(Logger::INFO),
|
||||
'channel' => 'app',
|
||||
'datetime' => new \DateTime('2020-01-01T00:00:00+01:00'),
|
||||
'extra' => [],
|
||||
];
|
||||
|
||||
$handler->handle($record);
|
||||
|
||||
$this->assertSame(1, $callCount);
|
||||
}
|
||||
|
||||
public function testBandleBatch()
|
||||
{
|
||||
$callCount = 0;
|
||||
$responseFactory = function ($method, $url, $options) use (&$callCount) {
|
||||
$body = <<<EOBODY
|
||||
{"index":{"_index":"log","_type":"_doc"}}
|
||||
{"@timestamp":"2020-01-01T00:00:00.000000+01:00","@version":1,"host":"my hostname","message":"My info message","type":"application","channel":"app","level":"INFO"}
|
||||
|
||||
{"index":{"_index":"log","_type":"_doc"}}
|
||||
{"@timestamp":"2020-01-01T00:00:01.000000+01:00","@version":1,"host":"my hostname","message":"My second message","type":"application","channel":"php","level":"WARNING"}
|
||||
|
||||
|
||||
EOBODY;
|
||||
|
||||
$this->assertSame('POST', $method);
|
||||
$this->assertSame('http://es:9200/_bulk', $url);
|
||||
$this->assertSame($body, $options['body']);
|
||||
$this->assertSame('Content-Type: application/json', $options['normalized_headers']['content-type'][0]);
|
||||
++$callCount;
|
||||
|
||||
return new MockResponse();
|
||||
};
|
||||
|
||||
$handler = new ElasticsearchLogstashHandlerWithHardCodedHostname('http://es:9200', 'log', new MockHttpClient($responseFactory));
|
||||
|
||||
$records = [
|
||||
[
|
||||
'message' => 'My info message',
|
||||
'context' => [],
|
||||
'level' => Logger::INFO,
|
||||
'level_name' => Logger::getLevelName(Logger::INFO),
|
||||
'channel' => 'app',
|
||||
'datetime' => new \DateTime('2020-01-01T00:00:00+01:00'),
|
||||
'extra' => [],
|
||||
],
|
||||
[
|
||||
'message' => 'My second message',
|
||||
'context' => [],
|
||||
'level' => Logger::WARNING,
|
||||
'level_name' => Logger::getLevelName(Logger::WARNING),
|
||||
'channel' => 'php',
|
||||
'datetime' => new \DateTime('2020-01-01T00:00:01+01:00'),
|
||||
'extra' => [],
|
||||
],
|
||||
];
|
||||
|
||||
$handler->handleBatch($records);
|
||||
|
||||
$this->assertSame(1, $callCount);
|
||||
}
|
||||
}
|
||||
|
||||
class ElasticsearchLogstashHandlerWithHardCodedHostname extends ElasticsearchLogstashHandler
|
||||
{
|
||||
protected function getDefaultFormatter(): FormatterInterface
|
||||
{
|
||||
return new LogstashFormatter('application', 'my hostname', null, 'ctxt_', LogstashFormatter::V1);
|
||||
}
|
||||
}
|
@ -23,6 +23,7 @@
|
||||
},
|
||||
"require-dev": {
|
||||
"symfony/console": "^3.4|^4.0|^5.0",
|
||||
"symfony/http-client": "^4.4|^5.0",
|
||||
"symfony/security-core": "^3.4|^4.0|^5.0",
|
||||
"symfony/var-dumper": "^3.4|^4.0|^5.0"
|
||||
},
|
||||
|
Reference in New Issue
Block a user