为Apache Kafka 0.9创建主题使用Java(Creating a topic for Apache Kafka 0.9 Using Java)

编程入门 行业动态 更新时间:2024-10-27 18:28:07
为Apache Kafka 0.9创建主题使用Java(Creating a topic for Apache Kafka 0.9 Using Java)

我正在编程一个客户来与kafka 0.9一起工作。 我想知道如何创建一个主题。 这个答案: 如何通过Java在Kafka中创建主题与我所要求的类似。 除此之外,该解决方案仅适用于与Kafka 0.9的API截然不同的Kafka 0.8.2。

I am programming a client to work with kafka 0.9. I want to know how to create a topic. This answer: How to create a Topic in Kafka through Java is similar to what I am asking. Except, that solution only works for Kafka 0.8.2 which is hugely different from Kafka 0.9's API.

最满意答案

我尝试着使用Kafka 0.9.0.1的Soon Chee Loong的答案,但不得不做出一个改变。 ZKStringSerializer现在是私有的。 要创建ZkUtils,我使用了以下API(它在内部创建一个ZkClient):

ZkUtils.apply( "zookeeper1:port1,zookeeper2:port2", sessionTimeoutMs, connectionTimeoutMs, false)

After looking through the scala api and various links online.

This is the solution I found:

Maven Dependencies:

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.9.0.0</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.7</version> </dependency>

Code:

import java.util.Properties; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import kafka.admin.AdminUtils; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; public class KafkaJavaExample { public static void main(String[] args) { String zookeeperConnect = "zkserver1:2181,zkserver2:2181"; int sessionTimeoutMs = 10 * 1000; int connectionTimeoutMs = 8 * 1000; ZkClient zkClient = new ZkClient( zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$); // Security for Kafka was added in Kafka 0.9.0.0 boolean isSecureKafkaCluster = false; // ZkUtils for Kafka was used in Kafka 0.9.0.0 for the AdminUtils API ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster); String topic = "my-topic"; int partitions = 2; int replication = 3; // Add topic configuration here Properties topicConfig = new Properties(); AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig); zkClient.close(); } }

If you are wondering why the code below doesn't look like Java:

ZKStringSerializer$.MODULE$

It is because ZkStringSerializer is a Scala Object. You can read more information about that here:

How create Kafka ZKStringSerializer in Java?

Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then createTopic() will only seem to work (In other words: it will return without error). The topic will exist in only Zookeeper and only works when listing topics. i.e. list command below works fine

bin/kafka-topics.sh --list --zookeeper localhost:2181

but Kafka itself does not create the topic. To illustrate, the describe command below will throw an error.

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Therefore, make sure you initialize it with ZKStringSerializer$.MODULE$.

References: How Can we create a topic in Kafka from the IDE using API‌​from-the-ide-using-api

Soon Chee Loong, University of Toronto

更多推荐

本文发布于:2023-08-01 23:06:00,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1365180.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:主题   Kafka   Apache   Creating   Java

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!