kafka安装及使用

编程入门 行业动态 更新时间:2024-10-27 17:22:12

<a href=https://www.elefans.com/category/jswz/34/1769633.html style=kafka安装及使用"/>

kafka安装及使用

kafka

是一个开源的消息系统 由scala语言完成的

是为处理 实时数据提供一个统一的,高吞吐量的,低延迟的平台

是一个分布式的消息队列。由生产者和消费者组成的

对消息的保存,根据Topic(主题)进行归类

无论是单机kafka还是集群kafka,都依赖于zookeeper集群

让zookeeper集群保存一些元数据

保证系统的可用性

安装前提:

有正常的三台虚拟机且都配置zookeeper

安装:

解压缩到对应的文件夹之后配置环境

需配置:

首先其运行的环境变量要配置好

 

 配置中箭头所指向的以及黑点标记的,ip用自己虚拟机的

 修改好了以上配置之后就差不多完善了正常的一台主虚拟机的配置了

后面就开始配置另外两台虚拟机

分发

scp /etc/profile root@cx02:/etc/profile

scp /etc/profile root@cx03:/etc/profile

 scp -r /usr/local/kafka root@cx02:/usr/local/
 scp -r /usr/local/kafka root@cx03:/usr/local/
分别将我的环境变量所在文件和kafka所在文件夹分发给另外两台虚拟机

然后还需要修改:

环境变量都要刷新

看kafka中的myid  都有他们对应的123

  以及zookeeper.properties的最底部所在虚拟机都调成0.0.0.0防止之后报3888端口号错误

server.properties        中除了修改broker.id外还有其listeners和advertised.listeners所对应的ip地址都需要改成虚拟机自己的。

其他的就差不多没有了

kafka运行

在三台虚拟机上启动zookeeper                //启动zookeeper

zkServer.sh start

在三台主机上分别输入               //启动kafka(可能需要在bin目录在才能找到开启命令)

 kafka-server-start.sh /usr/local/kafka/config/server.properties

一些常用命令

创建主题

kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --create --partitions 3 --replication-factor 3 --topic test

查看主题

kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --list

删除主题

kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --delete --topic test

查看主题

kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --desc --topic test

创建一个生产者,开启一个新的终端(终端会被占用)

flink的数据源是kafka的消费者,生产者是web项目

kafka-console-producer.sh --broker-list 192.168.60.131:9092 --topic test

创建一个消费者

kafka-console-consumer.sh --zookeeper 192.168.60.131:2181 --topic test

 释:cx0*为主机名,即hostname名

flink整合kafka

首先仅适应于flink的流式数据

即只能在StreamExecutionEnvironment环境下kafka才能引用

以下分享一个较简单的样例

总体步骤

启动zookeeper集群,启动kafka,

创建一个接受数据的主题:userS

创建一个接受结果消息的主题:userR

使用一个终端producer,来发送消息,主题为userS

使用一个终端consumer,来接收flink的消息,主题为userR

package cx.kafkaimport cx.model.{Student, User}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}import java.util.Propertiesobject KafkaAsSourceDemo {def main(args: Array[String]): Unit = {//val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//声明主题val topic = "userS"//声明序列化的类val valueDeserializer = new SimpleStringSchema()//声明配置文件val props = new Properties()//加载配置文件路径props.load(this.getClass.getClassLoader.getResourceAsStream("kafka/consumer.properties"))//声明数据源val ds = env.addSource[String](new FlinkKafkaConsumer[String](topic,new SimpleStringSchema(),props)).filter(_.trim.nonEmpty).map(perMsg=>{val arr = perMsg.split(",")   //通过 ,  拆分数据结果为数组val name:String = arr(0).trim         //将数组的第一个数据赋值给name,trim去除前后空格val age:Long = arr(1).toLong //将数组的第二个数据赋值给age,toLong转换数据类型val phone:Long = arr(2).toLongval isOnline:String = arr(3).trimval salary:Long = arr(4).toLongUser(name,age,phone,isOnline,salary)}).filter(_.age>25).map(_.toString)//添加val producerConfig = new Properties()producerConfig.load(this.getClass.getClassLoader.getResourceAsStream("kafka/producer.properties"))ds.addSink(new FlinkKafkaProducer[String]("userR",new SimpleStringSchema(),producerConfig))env.execute(this.getClass.getSimpleName)}
}

 其中需要事先准备两个kafka主题,以及idea中准备一个样例类User

package cx.modelcase class User(name: String,age: Long,phone:Long,isOnline:String,salary:Long)
kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --create --partitions 3 --replication-factor 3 --topic userR         

kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --create --partitions 3 --replication-factor 3 --topic userS

运行

 

kafka-console-producer.sh --broker-list 192.168.60.131:9092 --topic userS

kafka-console-consumer.sh --zookeeper 192.168.60.131:2181 --topic userR

启动idea类

运行结果

结合代码分析,成功过滤出年龄小于35的人User类

更多推荐

kafka安装及使用

本文发布于:2023-06-23 16:02:50,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/851433.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:kafka

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!