kafka安装及使用"/>
kafka安装及使用
kafka
是一个开源的消息系统 由scala语言完成的
是为处理 实时数据提供一个统一的,高吞吐量的,低延迟的平台
是一个分布式的消息队列。由生产者和消费者组成的
对消息的保存,根据Topic(主题)进行归类
无论是单机kafka还是集群kafka,都依赖于zookeeper集群
让zookeeper集群保存一些元数据
保证系统的可用性
安装前提:
有正常的三台虚拟机且都配置zookeeper
安装:
解压缩到对应的文件夹之后配置环境
需配置:
首先其运行的环境变量要配置好
配置中箭头所指向的以及黑点标记的,ip用自己虚拟机的
修改好了以上配置之后就差不多完善了正常的一台主虚拟机的配置了
后面就开始配置另外两台虚拟机
分发
scp /etc/profile root@cx02:/etc/profile
scp /etc/profile root@cx03:/etc/profile
scp -r /usr/local/kafka root@cx02:/usr/local/
scp -r /usr/local/kafka root@cx03:/usr/local/
分别将我的环境变量所在文件和kafka所在文件夹分发给另外两台虚拟机然后还需要修改:
环境变量都要刷新
看kafka中的myid 都有他们对应的123
以及zookeeper.properties的最底部所在虚拟机都调成0.0.0.0防止之后报3888端口号错误
server.properties 中除了修改broker.id外还有其listeners和advertised.listeners所对应的ip地址都需要改成虚拟机自己的。
其他的就差不多没有了
kafka运行
在三台虚拟机上启动zookeeper //启动zookeeper
zkServer.sh start
在三台主机上分别输入 //启动kafka(可能需要在bin目录在才能找到开启命令)
kafka-server-start.sh /usr/local/kafka/config/server.properties
一些常用命令
创建主题
kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --create --partitions 3 --replication-factor 3 --topic test
查看主题
kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --list
删除主题
kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --delete --topic test
查看主题
kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --desc --topic test
创建一个生产者,开启一个新的终端(终端会被占用)
flink的数据源是kafka的消费者,生产者是web项目
kafka-console-producer.sh --broker-list 192.168.60.131:9092 --topic test
创建一个消费者
kafka-console-consumer.sh --zookeeper 192.168.60.131:2181 --topic test
释:cx0*为主机名,即hostname名
flink整合kafka
首先仅适应于flink的流式数据
即只能在StreamExecutionEnvironment环境下kafka才能引用
以下分享一个较简单的样例
总体步骤
启动zookeeper集群,启动kafka,
创建一个接受数据的主题:userS
创建一个接受结果消息的主题:userR
使用一个终端producer,来发送消息,主题为userS
使用一个终端consumer,来接收flink的消息,主题为userR
package cx.kafkaimport cx.model.{Student, User}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}import java.util.Propertiesobject KafkaAsSourceDemo {def main(args: Array[String]): Unit = {//val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//声明主题val topic = "userS"//声明序列化的类val valueDeserializer = new SimpleStringSchema()//声明配置文件val props = new Properties()//加载配置文件路径props.load(this.getClass.getClassLoader.getResourceAsStream("kafka/consumer.properties"))//声明数据源val ds = env.addSource[String](new FlinkKafkaConsumer[String](topic,new SimpleStringSchema(),props)).filter(_.trim.nonEmpty).map(perMsg=>{val arr = perMsg.split(",") //通过 , 拆分数据结果为数组val name:String = arr(0).trim //将数组的第一个数据赋值给name,trim去除前后空格val age:Long = arr(1).toLong //将数组的第二个数据赋值给age,toLong转换数据类型val phone:Long = arr(2).toLongval isOnline:String = arr(3).trimval salary:Long = arr(4).toLongUser(name,age,phone,isOnline,salary)}).filter(_.age>25).map(_.toString)//添加val producerConfig = new Properties()producerConfig.load(this.getClass.getClassLoader.getResourceAsStream("kafka/producer.properties"))ds.addSink(new FlinkKafkaProducer[String]("userR",new SimpleStringSchema(),producerConfig))env.execute(this.getClass.getSimpleName)}
}
其中需要事先准备两个kafka主题,以及idea中准备一个样例类User
package cx.modelcase class User(name: String,age: Long,phone:Long,isOnline:String,salary:Long)
kafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --create --partitions 3 --replication-factor 3 --topic userRkafka-topics.sh --zookeeper cx01:2181,cx02:2181,cx03:2181 --create --partitions 3 --replication-factor 3 --topic userS
运行
kafka-console-producer.sh --broker-list 192.168.60.131:9092 --topic userS
kafka-console-consumer.sh --zookeeper 192.168.60.131:2181 --topic userR
启动idea类
运行结果
结合代码分析,成功过滤出年龄小于35的人User类
更多推荐
kafka安装及使用
发布评论