安全认证"/>
kafka 配置部署及SASL安全认证
一、下载及配置说明
1. 下载安装
kafka下载地址:
# 下载文件
wget .2.1/kafka_2.11-2.2.1.tgz# 文件解压缩
tar -zxvf kafka_2.11-2.2.1.tgz# 创建软连接
ln -s kafka_2.12-2.2.0 kafka
2. 配置说明
kafka服务的配置文件为 kafka/config/server.properties ,内容如下:
############################# Server Basics #############################
broker.id=0######################### Socket Server Settings ########################
listeners=PLAINTEXT://192.168.1.128:9092
numwork.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600########################## Log Basics ####################################
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1#################### Internal Topic Settings ############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1######################## Log Flush Policy ################################
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000################### Group Coordinator Settings ############################
group.initial.rebalance.delay.ms=0
配置说明:
- broker.id:每个broker在集群中的唯一标识。
- listeners:kafka的监听地址与端口,需要提供外网服务的话,要设置为本地的IP地址。
- numwork.threads:kafka用于处理网络请求的线程数
- num.io.threads:kafka用于处理磁盘io的线程数
- socket.send.buffer.bytes:发送数据的缓冲区
- socket.receive.buffer.bytes:接收数据的缓冲区
- socket.request.max.bytes:允许接收的最大数据包的大小(防止数据包过大导致OOM)
- log.dirs:kakfa用于保存数据的目录,所有的消息都会存储在该目录当中。可以通过逗号来指定多个路径,kafka会根据最少被使用的原则选择目录分配新的partition。需要说明的是,kafka在分配partition的时候选择的原则不是按照磁盘空间大小来定的,而是根据分配的partition的个数多少而定
- num.partitions:设置新创建的topic的默认分区数
- number.recovery.threads.per.data.dir:用于恢复每个数据目录时启动的线程数
- log.retention.hours:配置kafka中消息保存的时间,还支持log.retention.minutes和log.retention.ms。如果多个同时设置会选择时间最短的配置,默认为7天。
- log.retention.check.interval.ms:用于检测数据过期的周期
- log.segment.bytes:配置partition中每个segment数据文件的大小。默认为1GB。超出该大小后,会自动创建一个新的segment文件。
- zookeeper.connect:指定连接的zk的地址,zk中存储了broker的元数据信息。可以通过逗号来设置多个值。格式为:hostname:port/path。hostname为zk的主机名或ip,port为zk监听的端口。/path表示kafka的元数据存储到zk上的目录,如果不设置,默认为根目录
- zookeeper.connection.timeout:kafka连接zk的超时时间
- group.initial.rebalance.delay.ms:在实际环境当中,当将多个consumer加入到一个空的consumer group中时,每加入一个consumer就会触发一次对partition消费的重平衡,如果加入100个,就得重平衡100次,这个过程就会变得非常耗时。通过设置该参数,可以延迟重平衡的时间,比如有100个consumer会在10s内全部加入到一个consumer group中,就可以将该值设置为10s,10s之后,只需要做一次重平衡即可。默认为0则代表不开启该特性。
- auto.create.topics.enable:当有producer向一个不存在的topic中写入消息时,是否自动创建该topic
- delete.topics.enable:kafka提供了删除topic的功能,但默认并不会直接将topic数据物理删除。如果要从物理上删除(删除topic后,数据文件也一并删除),则需要将此项设置为true
二、单机部署
1. 服务配置
单机模式需要修改 kafka/config/server.properties配置文件:
listener 配置项默认为注释状态,找到配置行,修改为本机IP及需要开启的端口
# 本机IP可省略
# listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.1.128:9092
log.dir 为kafka数据存储地址,修改配置项:
log.dirs=/usr/local/kafka/data
启动自带的zookeeper服务
# 后台启动
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
启动kafka服务
# 后台启动
./bin/kafka-server-start.sh -daemon config/server.properties
2. kafka命令
# 创建topic,多个服务用逗号分隔
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test# 查看topic列表
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092# 启动一个控制台消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test# 启动一个控制台生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
三、KAFKA 集群部署
1. zookeeper 集群部署
下载地址:.html
配置文件说明:
# 心跳时间间隔
tickTime=2000
# 初始连接心跳书限制
initLimit=10
# 每次请求与应答间的心跳数限制
syncLimit=5
# 存储快照的目录, 不要使用/tmp用于存储, 这里只是一个示例
dataDir=/tmp/zookeeper
# zookeeper服务端口
clientPort=2181
# 客户端连接的最大数目, 修改此选项以增加更多客户端
#maxClientCnxns=60
# 保存在dataDir中的快照数
#autopurge.snapRetainCount=3
# 清除任务间隔(小时), 设置为 “0” 以禁用自动清除功能
#autopurge.purgeInterval=1
准备三台服务器,并分别下载并解压zookeeper-3.4.12,三台服务器的IP如下:
- 192.168.1.101
- 192.168.1.102
- 192.168.1.103
拷贝并重命名配置文件:
cp zoo_sample.cfg zoo.cfg
在三台服务器的 zoo.cfg 配置文件末尾添加如下配置:
# 配置格式 server.id = ip:port:port
# 配置的id唯一,不可重复
# 2888端口为集群内机器通信使用
# 3888端口为leader选举使用
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888
在 dataDir 配置目录下新建myid文件,写入id的值:
# 此处仅为示例,dataDir不要使用/tmp目录
vim /tmp/zookeeper/myid
文件内容如下:
启动zookeeper时注意开放端口,centos 7.0 开放端口命令如下:
# --zone 作用域
# --add-port 添加端口,格式:端口/通讯协议
# --permanent 永久生效,无此参数时服务器重启后配置失效
firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --zone=public --add-port=2888/tcp --permanent
firewall-cmd --zone=public --add-port=3888/tcp --permanent# 配置完成后,重启防火墙
firewall-cmd --reload# 查看防火墙已开放的端口
firewall-cmd --list-ports
使用命令 ./zkServer.sh start 分别启动三台服务器的zookeeper,集群启动完成后,三台服务器状态如下:
- 服务器IP 192.168.1.101:
- 服务器IP 192.168.1.102:
- 服务器IP 192.168.1.103:
2. kafka集群部署
在三台服务器上,下载安装kafka,修改配置文件:
# broker.id,每个kafka配置不同的ID,唯一,不可重复
broker.id=0# listeners 配置
listeners=PLAINTEXT://:9092# zookeeper配置,多个地址用逗号分隔
zookeeper.connect=192.168.1.101:2181,192.168.1.103:2181,192.168.1.103:2181
配置完成后启动kafka
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
3. zookeeper 命令简介
zookeeper 命令用于在 zookeeper 服务上执行操作,首先执行命令,打开新的 session 会话,进入终端。
$ ./zkCli.sh
进入终端后如下显示:
查看命令帮助:
命令说明:
命令 | 格式 | 说明 |
---|---|---|
ls | ls path [watch] | 查看路径下的目录列表 |
ls2 | ls2 path [watch] | 查看路径下的目录列表,比 ls 更详细 |
stat | stat path [watch] | 查看节点状态信息 |
set | set path data [version] | 修改节点存储的数据 |
get | get path [watch] | 获取节点数据和状态信息 |
create | create [-s] [-e] path data acl | 创建节点 |
delete | delete path [version] | 删除节点 |
rmr | rmr path | 递归删除文件夹 |
history | history | 查看执行的历史命令 |
quit | quit | 退出终端 |
四、使用SASL认证
1. 修改启动配置
将kafka/config目录下server.properties复制并重命名
cp server.properties server-sasl.properties
在配置文件末尾添加如下配置:
# 修改listeners
listeners=SASL_PLAINTEXT://:9092
# 权限配置
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAINsuper.users=User:admin
2. 配置认证文件
在 kafka/config目录下创建kafka-server-jaas.conf文件,内容如下:
# 配置文件的分号千万不能忘
KafkaServer {org.apache.kafkamon.security.plain.PlainLoginModule requiredusername="admin"password="admin"user_admin="admin";
};
3. 修改启动脚本
复制 kafka/bin 目录下的 kafka-server-start.sh 并重命名
cp kafka-server-start.sh kafka-server-start-sasl.sh
打开文件,将最后一行:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
修改为(路径改为服务器的kafka路径):
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/home/user/kafka/config/kafka-server-jaas.conf kafka.Kafka "$@"
4. zookeeper 配置
使用 zkCli.sh 命令进入zookeeper控制台,执行如下命令
# 创建用户
create /digest user# 添加权限
addauth digest admin:admin# 设置acl加密
setAcl /digest auth:admin:admin:crwda
5. 启动命令
./bin/kafka-server-start-sasl.sh config/server-sasl.properties
更多推荐
kafka 配置部署及SASL安全认证
发布评论