Add a RdKafka caster to Var-Dumper

Romain Neutron 2020-01-15 09:29:33 +01:00
parent fc30e610d5
commit 6cd1235539
5 changed files with 512 additions and 0 deletions

.travis.yml

@@ -62,6 +62,20 @@ before_install:
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 -p 7006:7006 -p 7007:7007 -e "STANDALONE=true" --name redis-cluster grokzen/redis-cluster:5.0.4
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
- |
# Start Kafka and install an up-to-date librdkafka
docker network create kafka_network
docker pull wurstmeister/zookeeper:3.4.6
docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:3.4.6
docker pull wurstmeister/kafka:2.12-2.3.1
docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=false" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:2.12-2.3.1
export KAFKA_BROKER=kafka:9092
sudo sh -c 'echo "\n127.0.0.1 kafka\n" >> /etc/hosts'
mkdir /tmp/librdkafka
curl https://codeload.github.com/edenhill/librdkafka/tar.gz/v0.11.6 | tar xzf - -C /tmp/librdkafka
(cd /tmp/librdkafka/librdkafka-0.11.6 && ./configure && make && sudo make install)
- |
# General configuration
set -e
@@ -175,6 +189,7 @@ before_install:
tfold ext.igbinary tpecl igbinary-3.1.2 igbinary.so $INI
tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI
tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI
tfold ext.rdkafka tpecl rdkafka-4.0.2 rdkafka.so $INI
tfold ext.redis tpecl redis-4.3.0 redis.so $INI "no"
done
- |

src/Symfony/Component/VarDumper/CHANGELOG.md

@@ -1,6 +1,11 @@
CHANGELOG
=========

5.1.0
-----

* added `RdKafka` support

4.4.0
-----
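For context (not part of this commit), a minimal usage sketch of the new support, assuming the rdkafka extension is loaded and symfony/var-dumper's dump() helper is autoloaded via Composer:

<?php
use RdKafka\TopicConf;

require __DIR__.'/vendor/autoload.php'; // assumes a Composer setup with symfony/var-dumper

$topicConf = new TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

// With the new caster registered, dump() lists every setting returned by
// TopicConf::dump() as a virtual property (e.g. auto.offset.reset: "smallest").
dump($topicConf);

This mirrors what testDumpTopicConf below asserts against the CLI dumper output.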

src/Symfony/Component/VarDumper/Caster/RdKafkaCaster.php

@@ -0,0 +1,187 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\VarDumper\Caster;
use RdKafka;
use RdKafka\Conf;
use RdKafka\Exception as RdKafkaException;
use RdKafka\KafkaConsumer;
use RdKafka\Message;
use RdKafka\Metadata\Broker as BrokerMetadata;
use RdKafka\Metadata\Collection as CollectionMetadata;
use RdKafka\Metadata\Partition as PartitionMetadata;
use RdKafka\Metadata\Topic as TopicMetadata;
use RdKafka\Topic;
use RdKafka\TopicConf;
use RdKafka\TopicPartition;
use Symfony\Component\VarDumper\Cloner\Stub;
/**
* Casts RdKafka-related classes to array representation.
*
* @author Romain Neutron <imprec@gmail.com>
*/
class RdKafkaCaster
{
public static function castKafkaConsumer(KafkaConsumer $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;
try {
$assignment = $c->getAssignment();
} catch (RdKafkaException $e) {
$assignment = [];
}
$a += [
$prefix.'subscription' => $c->getSubscription(),
$prefix.'assignment' => $assignment,
];
$a += self::extractMetadata($c);
return $a;
}
public static function castTopic(Topic $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;
$a += [
$prefix.'name' => $c->getName(),
];
return $a;
}
public static function castTopicPartition(TopicPartition $c, array $a)
{
$prefix = Caster::PREFIX_VIRTUAL;
$a += [
$prefix.'offset' => $c->getOffset(),
$prefix.'partition' => $c->getPartition(),
$prefix.'topic' => $c->getTopic(),
];
return $a;
}
public static function castMessage(Message $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;
$a += [
$prefix.'errstr' => $c->errstr(),
];
return $a;
}
public static function castConf(Conf $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;
foreach ($c->dump() as $key => $value) {
$a[$prefix.$key] = $value;
}
return $a;
}
public static function castTopicConf(TopicConf $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;
foreach ($c->dump() as $key => $value) {
$a[$prefix.$key] = $value;
}
return $a;
}
public static function castRdKafka(\RdKafka $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;
$a += [
$prefix.'out_q_len' => $c->getOutQLen(),
];
$a += self::extractMetadata($c);
return $a;
}
public static function castCollectionMetadata(CollectionMetadata $c, array $a, Stub $stub, $isNested)
{
$a += iterator_to_array($c);
return $a;
}
public static function castTopicMetadata(TopicMetadata $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;
$a += [
$prefix.'name' => $c->getTopic(),
$prefix.'partitions' => $c->getPartitions(),
];
return $a;
}
public static function castPartitionMetadata(PartitionMetadata $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;
$a += [
$prefix.'id' => $c->getId(),
$prefix.'err' => $c->getErr(),
$prefix.'leader' => $c->getLeader(),
];
return $a;
}
public static function castBrokerMetadata(BrokerMetadata $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;
$a += [
$prefix.'id' => $c->getId(),
$prefix.'host' => $c->getHost(),
$prefix.'port' => $c->getPort(),
];
return $a;
}
private static function extractMetadata($c)
{
$prefix = Caster::PREFIX_VIRTUAL;
try {
$m = $c->getMetadata(true, null, 500);
} catch (RdKafkaException $e) {
return [];
}
return [
$prefix.'orig_broker_id' => $m->getOrigBrokerId(),
$prefix.'orig_broker_name' => $m->getOrigBrokerName(),
$prefix.'brokers' => $m->getBrokers(),
$prefix.'topics' => $m->getTopics(),
];
}
}
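To illustrate the convention these methods follow (a sketch, not part of the commit): each caster receives the object, the array built so far, a Stub, and a nesting flag, and returns that array augmented with virtual-prefixed keys. Calling one by hand, assuming ext-rdkafka is available:

<?php
use RdKafka\TopicConf;
use Symfony\Component\VarDumper\Caster\Caster;
use Symfony\Component\VarDumper\Caster\RdKafkaCaster;
use Symfony\Component\VarDumper\Cloner\Stub;

// Illustrative only: invoke the caster directly, as the cloner would.
$values = RdKafkaCaster::castTopicConf(new TopicConf(), [], new Stub(), false);

// Keys carry Caster::PREFIX_VIRTUAL so the dumper renders them as virtual
// properties; strip the prefix to read them as plain settings.
foreach ($values as $key => $value) {
    echo substr($key, \strlen(Caster::PREFIX_VIRTUAL)).': '.$value."\n";
}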

src/Symfony/Component/VarDumper/Cloner/AbstractCloner.php

@@ -160,6 +160,18 @@ abstract class AbstractCloner implements ClonerInterface
':persistent stream' => ['Symfony\Component\VarDumper\Caster\ResourceCaster', 'castStream'],
':stream-context' => ['Symfony\Component\VarDumper\Caster\ResourceCaster', 'castStreamContext'],
':xml' => ['Symfony\Component\VarDumper\Caster\XmlResourceCaster', 'castXml'],
'RdKafka' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castRdKafka'],
'RdKafka\Conf' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castConf'],
'RdKafka\KafkaConsumer' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castKafkaConsumer'],
'RdKafka\Metadata\Broker' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castBrokerMetadata'],
'RdKafka\Metadata\Collection' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castCollectionMetadata'],
'RdKafka\Metadata\Partition' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castPartitionMetadata'],
'RdKafka\Metadata\Topic' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopicMetadata'],
'RdKafka\Message' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castMessage'],
'RdKafka\Topic' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopic'],
'RdKafka\TopicPartition' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopicPartition'],
'RdKafka\TopicConf' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopicConf'],
];
protected $maxItems = 2500;
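This default caster map can also be extended at runtime through the existing addCasters() API; a hedged sketch (the extra "is_error" key below is an assumption for illustration, not part of this commit):

<?php
use Symfony\Component\VarDumper\Caster\Caster;
use Symfony\Component\VarDumper\Cloner\Stub;
use Symfony\Component\VarDumper\Cloner\VarCloner;
use Symfony\Component\VarDumper\Dumper\CliDumper;

$cloner = new VarCloner();
$cloner->addCasters([
    \RdKafka\Message::class => function (\RdKafka\Message $msg, array $a, Stub $stub, bool $isNested) {
        // Runs after the default castMessage(); adds one extra virtual key.
        $a[Caster::PREFIX_VIRTUAL.'is_error'] = \RD_KAFKA_RESP_ERR_NO_ERROR !== $msg->err;

        return $a;
    },
]);

// $dumper = new CliDumper();
// $dumper->dump($cloner->cloneVar($message)); // $message being an RdKafka\Message instance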

src/Symfony/Component/VarDumper/Tests/Caster/RdKafkaCasterTest.php

@@ -0,0 +1,293 @@
<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\VarDumper\Tests\Caster;
use PHPUnit\Framework\TestCase;
use RdKafka\Conf;
use RdKafka\KafkaConsumer;
use RdKafka\Producer;
use RdKafka\TopicConf;
use Symfony\Component\VarDumper\Test\VarDumperTestTrait;
/**
* @requires extension rdkafka
*/
class RdKafkaCasterTest extends TestCase
{
use VarDumperTestTrait;
private const TOPIC = 'test-topic';
private const GROUP_ID = 'test-group-id';
private $hasBroker = false;
private $broker;
protected function setUp(): void
{
if (!$this->hasBroker && getenv('KAFKA_BROKER')) {
$this->broker = getenv('KAFKA_BROKER');
$this->hasBroker = true;
}
}
public function testDumpConf()
{
$conf = new Conf();
$conf->setErrorCb(function ($kafka, $err, $reason) {});
$conf->setDrMsgCb(function () {});
$conf->setRebalanceCb(function () {});
// BC with earlier versions of the rdkafka extension
foreach (['setLogCb', 'setOffsetCommitCb', 'setStatsCb', 'setConsumeCb'] as $method) {
if (method_exists($conf, $method)) {
$conf->{$method}(function () {});
}
}
$expectedDump = <<<EODUMP
RdKafka\Conf {
builtin.features: "gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins"
client.id: "rdkafka"
message.max.bytes: "1000000"
message.copy.max.bytes: "65535"
receive.message.max.bytes: "100000000"
max.in.flight.requests.per.connection: "1000000"
metadata.request.timeout.ms: "60000"
topic.metadata.refresh.interval.ms: "300000"
metadata.max.age.ms: "-1"
topic.metadata.refresh.fast.interval.ms: "250"
topic.metadata.refresh.fast.cnt: "10"
topic.metadata.refresh.sparse: "true"
debug: ""
socket.timeout.ms: "60000"
socket.blocking.max.ms: "1000"
socket.send.buffer.bytes: "0"
socket.receive.buffer.bytes: "0"
socket.keepalive.enable: "false"
socket.nagle.disable: "false"
socket.max.fails: "%d"
broker.address.ttl: "1000"
broker.address.family: "any"
reconnect.backoff.jitter.ms: "500"
statistics.interval.ms: "0"
enabled_events: "0"
error_cb: "0x%x"
%A
log_level: "6"
log.queue: "%s"
log.thread.name: "true"
log.connection.close: "true"
socket_cb: "0x%x"
open_cb: "0x%x"
internal.termination.signal: "0"
api.version.request: "true"
api.version.request.timeout.ms: "10000"
api.version.fallback.ms: "1200000"
broker.version.fallback: "0.9.0"
security.protocol: "plaintext"
sasl.mechanisms: "GSSAPI"
sasl.kerberos.service.name: "kafka"
sasl.kerberos.principal: "kafkaclient"
sasl.kerberos.kinit.cmd: "kinit -S "%{sasl.kerberos.service.name}/%{broker.name}" -k -t "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal}"
sasl.kerberos.min.time.before.relogin: "60000"
partition.assignment.strategy: "range,roundrobin"
session.timeout.ms: "30000"
heartbeat.interval.ms: "1000"
group.protocol.type: "consumer"
coordinator.query.interval.ms: "600000"
enable.auto.commit: "true"
auto.commit.interval.ms: "5000"
enable.auto.offset.store: "true"
queued.min.messages: "100000"
queued.max.messages.kbytes: "1048576"
fetch.wait.max.ms: "100"
%A
fetch.min.bytes: "1"
fetch.error.backoff.ms: "500"
offset.store.method: "broker"
%A
enable.partition.eof: "true"
check.crcs: "false"
queue.buffering.max.messages: "100000"
queue.buffering.max.kbytes: "1048576"
queue.buffering.max.ms: "0"
%A
compression.codec: "none"
batch.num.messages: "10000"
delivery.report.only.error: "false"
dr_msg_cb: "0x%x"
}
EODUMP;
$this->assertDumpMatchesFormat($expectedDump, $conf);
}
public function testDumpProducer()
{
if (!$this->hasBroker) {
$this->markTestSkipped('Test requires an active broker');
}
$producer = new Producer(new Conf());
$producer->addBrokers($this->broker);
$expectedDump = <<<EODUMP
RdKafka\Producer {
-error_cb: null
-dr_cb: null
out_q_len: %d
orig_broker_id: 1001
orig_broker_name: "$this->broker/1001"
brokers: RdKafka\Metadata\Collection {
+0: RdKafka\Metadata\Broker {
id: 1001
host: "%s"
port: %d
}
}
topics: RdKafka\Metadata\Collection {
+0: RdKafka\Metadata\Topic {
name: "%s"
partitions: RdKafka\Metadata\Collection {
+0: RdKafka\Metadata\Partition {
id: 0
err: 0
leader: 1001
}%A
}
}%A
}
}
EODUMP;
$this->assertDumpMatchesFormat($expectedDump, $producer);
}
public function testDumpTopicConf()
{
$topicConf = new TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$expectedDump = <<<EODUMP
RdKafka\TopicConf {
request.required.acks: "1"
request.timeout.ms: "5000"
message.timeout.ms: "300000"
%A
auto.commit.enable: "true"
auto.commit.interval.ms: "60000"
auto.offset.reset: "smallest"
offset.store.path: "."
offset.store.sync.interval.ms: "-1"
offset.store.method: "broker"
consume.callback.max.messages: "0"
}
EODUMP;
$this->assertDumpMatchesFormat($expectedDump, $topicConf);
}
public function testDumpKafkaConsumer()
{
if (!$this->hasBroker) {
$this->markTestSkipped('Test requires an active broker');
}
$conf = new Conf();
$conf->set('metadata.broker.list', $this->broker);
$conf->set('group.id', self::GROUP_ID);
$consumer = new KafkaConsumer($conf);
$consumer->subscribe([self::TOPIC]);
$expectedDump = <<<EODUMP
RdKafka\KafkaConsumer {
-error_cb: null
-rebalance_cb: null
-dr_msg_cb: null
subscription: array:1 [
0 => "test-topic"
]
assignment: []
orig_broker_id: %d
orig_broker_name: "$this->broker/%s"
brokers: RdKafka\Metadata\Collection {
+0: RdKafka\Metadata\Broker {
id: 1001
host: "%s"
port: %d
}
}
topics: RdKafka\Metadata\Collection {
+0: RdKafka\Metadata\Topic {
name: "%s"
partitions: RdKafka\Metadata\Collection {
+0: RdKafka\Metadata\Partition {
id: 0
err: 0
leader: 1001
}%A
}
}%A
}
}
EODUMP;
$this->assertDumpMatchesFormat($expectedDump, $consumer);
}
public function testDumpProducerTopic()
{
if (!$this->hasBroker) {
$this->markTestSkipped('Test requires an active broker');
}
$producer = new Producer(new Conf());
$producer->addBrokers($this->broker);
$topic = $producer->newTopic('test');
$topic->produce(\RD_KAFKA_PARTITION_UA, 0, '{}');
$expectedDump = <<<EODUMP
RdKafka\ProducerTopic {
name: "test"
}
EODUMP;
$this->assertDumpMatchesFormat($expectedDump, $topic);
}
public function testDumpMessage()
{
if (!$this->hasBroker) {
$this->markTestSkipped('Test requires an active broker');
}
$conf = new Conf();
$conf->set('metadata.broker.list', $this->broker);
$conf->set('group.id', self::GROUP_ID);
$consumer = new KafkaConsumer($conf);
$consumer->subscribe([self::TOPIC]);
// Force timeout
$message = $consumer->consume(0);
$expectedDump = <<<EODUMP
RdKafka\Message {
+err: -185
+topic_name: null
+timestamp: null
+partition: 0
+payload: null
+len: null
+key: null
+offset: 0%A
errstr: "Local: Timed out"
}
EODUMP;
$this->assertDumpMatchesFormat($expectedDump, $message);
}
}