kafka 配置部署及SASL安全认证

编程入门 行业动态 更新时间:2024-10-10 19:18:26

kafka 配置部署及SASL<a href=https://www.elefans.com/category/jswz/34/1769801.html style=安全认证"/>

kafka 配置部署及SASL安全认证

一、下载及配置说明

1. 下载安装

kafka下载地址:

# 下载文件
wget .2.1/kafka_2.11-2.2.1.tgz# 文件解压缩
tar -zxvf kafka_2.11-2.2.1.tgz# 创建软连接
ln -s kafka_2.12-2.2.0 kafka

2. 配置说明

kafka服务的配置文件为 kafka/config/server.properties ,内容如下:

############################# Server Basics #############################
broker.id=0######################### Socket Server Settings ########################
listeners=PLAINTEXT://192.168.1.128:9092
numwork.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600########################## Log Basics ####################################
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1#################### Internal Topic Settings  ############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1######################## Log Flush Policy ################################
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000################### Group Coordinator Settings ############################
group.initial.rebalance.delay.ms=0

配置说明:

  • broker.id:每个broker在集群中的唯一标识。
  • listeners:kafka的监听地址与端口,需要提供外网服务的话,要设置为本地的IP地址。
  • numwork.threads:kafka用于处理网络请求的线程数
  • num.io.threads:kafka用于处理磁盘io的线程数
  • socket.send.buffer.bytes:发送数据的缓冲区
  • socket.receive.buffer.bytes:接收数据的缓冲区
  • socket.request.max.bytes:允许接收的最大数据包的大小(防止数据包过大导致OOM)
  • log.dirs:kakfa用于保存数据的目录,所有的消息都会存储在该目录当中。可以通过逗号来指定多个路径,kafka会根据最少被使用的原则选择目录分配新的partition。需要说明的是,kafka在分配partition的时候选择的原则不是按照磁盘空间大小来定的,而是根据分配的partition的个数多少而定
  • num.partitions:设置新创建的topic的默认分区数
  • number.recovery.threads.per.data.dir:用于恢复每个数据目录时启动的线程数
  • log.retention.hours:配置kafka中消息保存的时间,还支持log.retention.minutes和log.retention.ms。如果多个同时设置会选择时间最短的配置,默认为7天。
  • log.retention.check.interval.ms:用于检测数据过期的周期
  • log.segment.bytes:配置partition中每个segment数据文件的大小。默认为1GB。超出该大小后,会自动创建一个新的segment文件。
  • zookeeper.connect:指定连接的zk的地址,zk中存储了broker的元数据信息。可以通过逗号来设置多个值。格式为:hostname:port/path。hostname为zk的主机名或ip,port为zk监听的端口。/path表示kafka的元数据存储到zk上的目录,如果不设置,默认为根目录
  • zookeeper.connection.timeout:kafka连接zk的超时时间
  • group.initial.rebalance.delay.ms:在实际环境当中,当将多个consumer加入到一个空的consumer group中时,每加入一个consumer就会触发一次对partition消费的重平衡,如果加入100个,就得重平衡100次,这个过程就会变得非常耗时。通过设置该参数,可以延迟重平衡的时间,比如有100个consumer会在10s内全部加入到一个consumer group中,就可以将该值设置为10s,10s之后,只需要做一次重平衡即可。默认为0则代表不开启该特性。
  • auto.create.topics.enable:当有producer向一个不存在的topic中写入消息时,是否自动创建该topic
  • delete.topics.enable:kafka提供了删除topic的功能,但默认并不会直接将topic数据物理删除。如果要从物理上删除(删除topic后,数据文件也一并删除),则需要将此项设置为true

二、单机部署

1. 服务配置

单机模式需要修改 kafka/config/server.properties配置文件:

listener 配置项默认为注释状态,找到配置行,修改为本机IP及需要开启的端口

# 本机IP可省略
# listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.1.128:9092

log.dir 为kafka数据存储地址,修改配置项:

log.dirs=/usr/local/kafka/data

启动自带的zookeeper服务

# 后台启动
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

启动kafka服务

# 后台启动
./bin/kafka-server-start.sh -daemon  config/server.properties

2. kafka命令

# 创建topic,多个服务用逗号分隔
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test# 查看topic列表
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092# 启动一个控制台消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test# 启动一个控制台生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

三、KAFKA 集群部署

1. zookeeper 集群部署

下载地址:.html

配置文件说明:

# 心跳时间间隔
tickTime=2000
# 初始连接心跳书限制
initLimit=10
# 每次请求与应答间的心跳数限制
syncLimit=5
# 存储快照的目录, 不要使用/tmp用于存储, 这里只是一个示例
dataDir=/tmp/zookeeper
# zookeeper服务端口
clientPort=2181
# 客户端连接的最大数目, 修改此选项以增加更多客户端
#maxClientCnxns=60
# 保存在dataDir中的快照数
#autopurge.snapRetainCount=3
# 清除任务间隔(小时), 设置为 “0” 以禁用自动清除功能
#autopurge.purgeInterval=1

准备三台服务器,并分别下载并解压zookeeper-3.4.12,三台服务器的IP如下:

  • 192.168.1.101
  • 192.168.1.102
  • 192.168.1.103

拷贝并重命名配置文件:

cp zoo_sample.cfg zoo.cfg

在三台服务器的 zoo.cfg 配置文件末尾添加如下配置:

# 配置格式 server.id = ip:port:port
# 配置的id唯一,不可重复
# 2888端口为集群内机器通信使用
# 3888端口为leader选举使用
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888

在 dataDir 配置目录下新建myid文件,写入id的值:

# 此处仅为示例,dataDir不要使用/tmp目录
vim /tmp/zookeeper/myid

文件内容如下:

启动zookeeper时注意开放端口,centos 7.0 开放端口命令如下:

# --zone             作用域
# --add-port         添加端口,格式:端口/通讯协议
# --permanent        永久生效,无此参数时服务器重启后配置失效
firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --zone=public --add-port=2888/tcp --permanent
firewall-cmd --zone=public --add-port=3888/tcp --permanent# 配置完成后,重启防火墙
firewall-cmd --reload# 查看防火墙已开放的端口
firewall-cmd --list-ports

使用命令 ./zkServer.sh start 分别启动三台服务器的zookeeper,集群启动完成后,三台服务器状态如下:

  • 服务器IP 192.168.1.101:
  • 服务器IP 192.168.1.102:
  • 服务器IP 192.168.1.103:

2. kafka集群部署

在三台服务器上,下载安装kafka,修改配置文件:

# broker.id,每个kafka配置不同的ID,唯一,不可重复
broker.id=0# listeners 配置
listeners=PLAINTEXT://:9092# zookeeper配置,多个地址用逗号分隔
zookeeper.connect=192.168.1.101:2181,192.168.1.103:2181,192.168.1.103:2181

配置完成后启动kafka

./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

3. zookeeper 命令简介

zookeeper 命令用于在 zookeeper 服务上执行操作,首先执行命令,打开新的 session 会话,进入终端。

$ ./zkCli.sh

进入终端后如下显示:

查看命令帮助:

命令说明:

命令格式说明
lsls path [watch]查看路径下的目录列表
ls2ls2 path [watch]查看路径下的目录列表,比 ls 更详细
statstat path [watch]查看节点状态信息
setset path data [version]修改节点存储的数据
getget path [watch]获取节点数据和状态信息
createcreate [-s] [-e] path data acl创建节点
deletedelete path [version]删除节点
rmrrmr path递归删除文件夹
historyhistory查看执行的历史命令
quitquit退出终端

四、使用SASL认证

1. 修改启动配置

将kafka/config目录下server.properties复制并重命名

cp server.properties server-sasl.properties

在配置文件末尾添加如下配置:

# 修改listeners
listeners=SASL_PLAINTEXT://:9092
# 权限配置
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAINsuper.users=User:admin

2. 配置认证文件

在 kafka/config目录下创建kafka-server-jaas.conf文件,内容如下:

# 配置文件的分号千万不能忘
KafkaServer {org.apache.kafkamon.security.plain.PlainLoginModule requiredusername="admin"password="admin"user_admin="admin";
};

3. 修改启动脚本

复制 kafka/bin 目录下的 kafka-server-start.sh 并重命名

cp kafka-server-start.sh kafka-server-start-sasl.sh

打开文件,将最后一行:

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

修改为(路径改为服务器的kafka路径):

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/home/user/kafka/config/kafka-server-jaas.conf kafka.Kafka "$@"

4. zookeeper 配置

使用 zkCli.sh 命令进入zookeeper控制台,执行如下命令

# 创建用户
create /digest user# 添加权限
addauth digest admin:admin# 设置acl加密
setAcl /digest auth:admin:admin:crwda

5. 启动命令

./bin/kafka-server-start-sasl.sh config/server-sasl.properties

更多推荐

kafka 配置部署及SASL安全认证

本文发布于:2024-02-26 03:23:58,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1701205.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:安全认证   kafka   SASL

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!