diff --git a/.travis.yml b/.travis.yml
index ac4f9c9635..c58495b1f4 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -62,6 +62,20 @@ before_install:
       docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 -p 7006:7006 -p 7007:7007 -e "STANDALONE=true" --name redis-cluster grokzen/redis-cluster:5.0.4
       export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
 
+    - |
+      # Start Kafka and install an up-to-date librdkafka
+      docker network create kafka_network
+      docker pull wurstmeister/zookeeper:3.4.6
+      docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:3.4.6
+      docker pull wurstmeister/kafka:2.12-2.3.1
+      docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=false" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:2.12-2.3.1
+      export KAFKA_BROKER=kafka:9092
+      sudo sh -c 'echo "\n127.0.0.1 kafka\n" >> /etc/hosts'
+
+      mkdir /tmp/librdkafka
+      curl https://codeload.github.com/edenhill/librdkafka/tar.gz/v0.11.6 | tar xzf - -C /tmp/librdkafka
+      (cd /tmp/librdkafka/librdkafka-0.11.6 && ./configure && make && sudo make install)
+
     - |
       # General configuration
       set -e
@@ -175,6 +189,7 @@ before_install:
           tfold ext.igbinary tpecl igbinary-3.1.2 igbinary.so $INI
           tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI
           tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI
+          tfold ext.rdkafka tpecl rdkafka-4.0.2 rdkafka.so $INI
           tfold ext.redis tpecl redis-4.3.0 redis.so $INI "no"
       done
     - |
diff --git a/src/Symfony/Component/VarDumper/CHANGELOG.md b/src/Symfony/Component/VarDumper/CHANGELOG.md
index 94b1c17d1d..b1638017ca 100644
--- a/src/Symfony/Component/VarDumper/CHANGELOG.md
+++ b/src/Symfony/Component/VarDumper/CHANGELOG.md
@@ -1,6 +1,11 @@
 CHANGELOG
 =========
 
+5.1.0
+-----
+
+ * added `RdKafka` support
+
 4.4.0
 -----
 
diff --git a/src/Symfony/Component/VarDumper/Caster/RdKafkaCaster.php b/src/Symfony/Component/VarDumper/Caster/RdKafkaCaster.php
new file mode 100644
index 0000000000..bd3894ec11
--- /dev/null
+++ b/src/Symfony/Component/VarDumper/Caster/RdKafkaCaster.php
@@ -0,0 +1,187 @@
+<?php
+
+/*
+ * This file is part of the Symfony package.
+ *
+ * (c) Fabien Potencier <fabien@symfony.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\VarDumper\Caster;
+
+use RdKafka;
+use RdKafka\Conf;
+use RdKafka\Exception as RdKafkaException;
+use RdKafka\KafkaConsumer;
+use RdKafka\Message;
+use RdKafka\Metadata\Broker as BrokerMetadata;
+use RdKafka\Metadata\Collection as CollectionMetadata;
+use RdKafka\Metadata\Partition as PartitionMetadata;
+use RdKafka\Metadata\Topic as TopicMetadata;
+use RdKafka\Topic;
+use RdKafka\TopicConf;
+use RdKafka\TopicPartition;
+use Symfony\Component\VarDumper\Cloner\Stub;
+
+/**
+ * Casts RdKafka related classes to array representation.
+ *
+ * @author Romain Neutron
+ */
+class RdKafkaCaster
+{
+    public static function castKafkaConsumer(KafkaConsumer $c, array $a, Stub $stub, $isNested)
+    {
+        $prefix = Caster::PREFIX_VIRTUAL;
+
+        try {
+            $assignment = $c->getAssignment();
+        } catch (RdKafkaException $e) {
+            $assignment = [];
+        }
+
+        $a += [
+            $prefix.'subscription' => $c->getSubscription(),
+            $prefix.'assignment' => $assignment,
+        ];
+
+        $a += self::extractMetadata($c);
+
+        return $a;
+    }
+
+    public static function castTopic(Topic $c, array $a, Stub $stub, $isNested)
+    {
+        $prefix = Caster::PREFIX_VIRTUAL;
+
+        $a += [
+            $prefix.'name' => $c->getName(),
+        ];
+
+        return $a;
+    }
+
+    public static function castTopicPartition(TopicPartition $c, array $a)
+    {
+        $prefix = Caster::PREFIX_VIRTUAL;
+
+        $a += [
+            $prefix.'offset' => $c->getOffset(),
+            $prefix.'partition' => $c->getPartition(),
+            $prefix.'topic' => $c->getTopic(),
+        ];
+
+        return $a;
+    }
+
+    public static function castMessage(Message $c, array $a, Stub $stub, $isNested)
+    {
+        $prefix = Caster::PREFIX_VIRTUAL;
+
+        $a += [
+            $prefix.'errstr' => $c->errstr(),
+        ];
+
+        return $a;
+    }
+
+    public static function castConf(Conf $c, array $a, Stub $stub, $isNested)
+    {
+        $prefix = Caster::PREFIX_VIRTUAL;
+
+        foreach ($c->dump() as $key => $value) {
+            $a[$prefix.$key] = $value;
+        }
+
+        return $a;
+    }
+
+    public static function castTopicConf(TopicConf $c, array $a, Stub $stub, $isNested)
+    {
+        $prefix = Caster::PREFIX_VIRTUAL;
+
+        foreach ($c->dump() as $key => $value) {
+            $a[$prefix.$key] = $value;
+        }
+
+        return $a;
+    }
+
+    public static function castRdKafka(\RdKafka $c, array $a, Stub $stub, $isNested)
+    {
+        $prefix = Caster::PREFIX_VIRTUAL;
+
+        $a += [
+            $prefix.'out_q_len' => $c->getOutQLen(),
+        ];
+
+        $a += self::extractMetadata($c);
+
+        return $a;
+    }
+
+    public static function castCollectionMetadata(CollectionMetadata $c, array $a, Stub $stub, $isNested)
+    {
+        $a += iterator_to_array($c);
+
+        return $a;
+    }
+
+    public static function castTopicMetadata(TopicMetadata $c, array $a, Stub $stub, $isNested)
+    {
+        $prefix = Caster::PREFIX_VIRTUAL;
+
+        $a += [
+            $prefix.'name' => $c->getTopic(),
+            $prefix.'partitions' => $c->getPartitions(),
+        ];
+
+        return $a;
+    }
+
+    public static function castPartitionMetadata(PartitionMetadata $c, array $a, Stub $stub, $isNested)
+    {
+        $prefix = Caster::PREFIX_VIRTUAL;
+
+        $a += [
+            $prefix.'id' => $c->getId(),
+            $prefix.'err' => $c->getErr(),
+            $prefix.'leader' => $c->getLeader(),
+        ];
+
+        return $a;
+    }
+
+    public static function castBrokerMetadata(BrokerMetadata $c, array $a, Stub $stub, $isNested)
+    {
+        $prefix = Caster::PREFIX_VIRTUAL;
+
+        $a += [
+            $prefix.'id' => $c->getId(),
+            $prefix.'host' => $c->getHost(),
+            $prefix.'port' => $c->getPort(),
+        ];
+
+        return $a;
+    }
+
+    private static function extractMetadata($c)
+    {
+        $prefix = Caster::PREFIX_VIRTUAL;
+
+        try {
+            $m = $c->getMetadata(true, null, 500);
+        } catch (RdKafkaException $e) {
+            return [];
+        }
+
+        return [
+            $prefix.'orig_broker_id' => $m->getOrigBrokerId(),
+            $prefix.'orig_broker_name' => $m->getOrigBrokerName(),
+            $prefix.'brokers' => $m->getBrokers(),
+            $prefix.'topics' => $m->getTopics(),
+        ];
+    }
+}
diff --git a/src/Symfony/Component/VarDumper/Cloner/AbstractCloner.php b/src/Symfony/Component/VarDumper/Cloner/AbstractCloner.php
index 9e548caae3..06fa4884e9 100644
--- a/src/Symfony/Component/VarDumper/Cloner/AbstractCloner.php
+++ b/src/Symfony/Component/VarDumper/Cloner/AbstractCloner.php
@@ -160,6 +160,18 @@ abstract class AbstractCloner implements ClonerInterface
         ':persistent stream' => ['Symfony\Component\VarDumper\Caster\ResourceCaster', 'castStream'],
         ':stream-context' => ['Symfony\Component\VarDumper\Caster\ResourceCaster', 'castStreamContext'],
         ':xml' => ['Symfony\Component\VarDumper\Caster\XmlResourceCaster', 'castXml'],
+
+        'RdKafka' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castRdKafka'],
+        'RdKafka\Conf' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castConf'],
+        'RdKafka\KafkaConsumer' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castKafkaConsumer'],
+        'RdKafka\Metadata\Broker' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castBrokerMetadata'],
+        'RdKafka\Metadata\Collection' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castCollectionMetadata'],
+        'RdKafka\Metadata\Partition' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castPartitionMetadata'],
+        'RdKafka\Metadata\Topic' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopicMetadata'],
+        'RdKafka\Message' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castMessage'],
+        'RdKafka\Topic' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopic'],
+        'RdKafka\TopicPartition' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopicPartition'],
+        'RdKafka\TopicConf' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopicConf'],
     ];
 
     protected $maxItems = 2500;
diff --git a/src/Symfony/Component/VarDumper/Tests/Caster/RdKafkaCasterTest.php b/src/Symfony/Component/VarDumper/Tests/Caster/RdKafkaCasterTest.php
new file mode 100644
index 0000000000..e14e0f927d
--- /dev/null
+++ b/src/Symfony/Component/VarDumper/Tests/Caster/RdKafkaCasterTest.php
@@ -0,0 +1,293 @@
+<?php
+
+/*
+ * This file is part of the Symfony package.
+ *
+ * (c) Fabien Potencier <fabien@symfony.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\VarDumper\Tests\Caster;
+
+use PHPUnit\Framework\TestCase;
+use RdKafka\Conf;
+use RdKafka\KafkaConsumer;
+use RdKafka\Producer;
+use RdKafka\TopicConf;
+use Symfony\Component\VarDumper\Test\VarDumperTestTrait;
+
+/**
+ * @requires extension rdkafka
+ */
+class RdKafkaCasterTest extends TestCase
+{
+    use VarDumperTestTrait;
+
+    private const TOPIC = 'test-topic';
+    private const GROUP_ID = 'test-group-id';
+
+    private $hasBroker = false;
+    private $broker;
+
+    protected function setUp(): void
+    {
+        if (!$this->hasBroker && getenv('KAFKA_BROKER')) {
+            $this->broker = getenv('KAFKA_BROKER');
+            $this->hasBroker = true;
+        }
+    }
+
+    public function testDumpConf()
+    {
+        $conf = new Conf();
+        $conf->setErrorCb(function ($kafka, $err, $reason) {});
+        $conf->setDrMsgCb(function () {});
+        $conf->setRebalanceCb(function () {});
+
+        // BC with earlier version of extension rdkafka
+        foreach (['setLogCb', 'setOffsetCommitCb', 'setStatsCb', 'setConsumeCb'] as $method) {
+            if (method_exists($conf, $method)) {
+                $conf->{$method}(function () {});
+            }
+        }
+
+        $expectedDump = <<<EODUMP
+RdKafka\Conf {
+%Adr_msg_cb: "0x%x"
+%Aerror_cb: "0x%x"
+%Arebalance_cb: "0x%x"
+%A}
+EODUMP;
+
+        $this->assertDumpMatchesFormat($expectedDump, $conf);
+    }
+
+    public function testDumpProducer()
+    {
+        if (!$this->hasBroker) {
+            $this->markTestSkipped('Test requires an active broker');
+        }
+
+        $producer = new Producer(new Conf());
+        $producer->addBrokers($this->broker);
+
+        $expectedDump = <<<EODUMP
+RdKafka\Producer {
+  out_q_len: 0
+  orig_broker_id: 1001
+  orig_broker_name: "$this->broker/1001"
+  brokers: RdKafka\Metadata\Collection {
+    +0: RdKafka\Metadata\Broker {
+      id: 1001
+      host: "%s"
+      port: %d
+    }
+  }
+  topics: RdKafka\Metadata\Collection {
+    +0: RdKafka\Metadata\Topic {
+      name: "%s"
+      partitions: RdKafka\Metadata\Collection {
+        +0: RdKafka\Metadata\Partition {
+          id: 0
+          err: 0
+          leader: 1001
+        }%A
+      }
+    }%A
+  }
+}
+EODUMP;
+
+        $this->assertDumpMatchesFormat($expectedDump, $producer);
+    }
+
+    public function testDumpTopicConf()
+    {
+        $topicConf = new TopicConf();
+        $topicConf->set('auto.offset.reset', 'smallest');
+
+        $expectedDump = <<<EODUMP
+RdKafka\TopicConf {
+%Aauto.offset.reset: "smallest"
+%A}
+EODUMP;
+
+        $this->assertDumpMatchesFormat($expectedDump, $topicConf);
+    }
+
+    public function testDumpKafkaConsumer()
+    {
+        if (!$this->hasBroker) {
+            $this->markTestSkipped('Test requires an active broker');
+        }
+
+        $conf = new Conf();
+        $conf->set('metadata.broker.list', $this->broker);
+        $conf->set('group.id', self::GROUP_ID);
+
+        $consumer = new KafkaConsumer($conf);
+        $consumer->subscribe([self::TOPIC]);
+
+        $expectedDump = <<<EODUMP
+RdKafka\KafkaConsumer {
+  subscription: array:1 [
+    0 => "test-topic"
+  ]
+  assignment: []
+  orig_broker_id: %d
+  orig_broker_name: "$this->broker/%s"
+  brokers: RdKafka\Metadata\Collection {
+    +0: RdKafka\Metadata\Broker {
+      id: 1001
+      host: "%s"
+      port: %d
+    }
+  }
+  topics: RdKafka\Metadata\Collection {
+    +0: RdKafka\Metadata\Topic {
+      name: "%s"
+      partitions: RdKafka\Metadata\Collection {
+        +0: RdKafka\Metadata\Partition {
+          id: 0
+          err: 0
+          leader: 1001
+        }%A
+      }
+    }%A
+  }
+}
+EODUMP;
+
+        $this->assertDumpMatchesFormat($expectedDump, $consumer);
+    }
+
+    public function testDumpProducerTopic()
+    {
+        $producer = new Producer(new Conf());
+        $producer->addBrokers($this->broker);
+
+        $topic = $producer->newTopic('test');
+        $topic->produce(\RD_KAFKA_PARTITION_UA, 0, '{}');
+
+        $expectedDump = <<<EODUMP
+RdKafka\ProducerTopic {
+  name: "test"
+}
+EODUMP;
+
+        $this->assertDumpMatchesFormat($expectedDump, $topic);
+    }
+
+    public function testDumpMessage()
+    {
+        $conf = new Conf();
+        $conf->set('metadata.broker.list', $this->broker);
+        $conf->set('group.id', self::GROUP_ID);
+
+        $consumer = new KafkaConsumer($conf);
+        $consumer->subscribe([self::TOPIC]);
+
+        // Force timeout
+        $message = $consumer->consume(0);
+
+        $expectedDump = <<<EODUMP
+RdKafka\Message {
+  +err: -185
+%A  errstr: "Local: Timed out"
+}
+EODUMP;
+
+        $this->assertDumpMatchesFormat($expectedDump, $message);
+    }
+}
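
For context, here is a minimal usage sketch (not part of the patch) of what these casters change in practice. It assumes ext-rdkafka and symfony/var-dumper are installed via Composer; the broker address `localhost:9092`, group id `demo-group` and topic `demo-topic` are illustrative values, not taken from the PR.

<?php
// Hypothetical demo script, not part of the diff above.
require __DIR__.'/vendor/autoload.php';

use RdKafka\Conf;
use RdKafka\KafkaConsumer;

$conf = new Conf();
$conf->set('metadata.broker.list', 'localhost:9092'); // assumed broker address
$conf->set('group.id', 'demo-group');                 // assumed consumer group

$consumer = new KafkaConsumer($conf);
$consumer->subscribe(['demo-topic']);                 // assumed topic name

// Without the patch, dump() prints an opaque RdKafka\KafkaConsumer object.
// With RdKafkaCaster registered in AbstractCloner, it shows virtual fields:
// subscription, assignment and, when the broker answers within the 500 ms
// metadata timeout, orig_broker_id, orig_broker_name, brokers and topics.
dump($consumer);

Note that the try/catch blocks around getAssignment() and getMetadata() in the caster are what keep such a dump() call safe even when no broker is reachable.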