通过SSH隧道安全消费Kafka数据

编程入门 行业动态 更新时间:2024-10-16 00:25:09

通过SSH<a href=https://www.elefans.com/category/jswz/34/1762965.html style=隧道安全消费Kafka数据"/>

通过SSH隧道安全消费Kafka数据

一.背景

    由于我们有个业务在阿里云部署了Kafka,但是想直接在本地IDC机房服务器直接通过公网消费Kafka进行业务处理。这个本来也不是什么难事,阿里云把9092默认端口打开运行访问即可,也不不值得再写这篇博客了。 这个事情让人特别关注的一个主题其实就是数据安全问题。

    如果从公网消费Kafka, 由于Kafka我们并没有配置SSL机制,所以如果直接通过公网消费意味着数据明文裸奔,导致数据泄漏。 那么针对这个安全问题,我们是这么做的:

     1.首先,阿里云安全组更改kafka的默认端口号,并且对访问kafka的端口设置为公司的IP白名单,保证只有机房IP才能正常访问,非法IP则无法访问。

      2.通过搭建SSH隧道代理的方式,在IDC服务器做一个SSH隧道代理到阿里云服务器的Kafka端口,这样从公网消费的kafka数据都经过隧道进行传输,避免了数据通过明文传输的风险。

    还不太了解什么是SSH隧道的童鞋,可以参考一下我之前写过的博客: ssh端口转发(隧道技术)

二.实现流程

1.运行SSH隧道代理

ssh -N -o "ServerAliveInterval 60" -L 192.168.1.101:9092:10.0.3.99:9092 -p 22 root@182.43.x.x

解释一下这个语句:

-N   #不登录到远程服务器-o "ServerAliveInterval 60"   #心跳检测周期时间 60s-L 192.168.1.101:9092:10.0.3.99:9092  #绑定本地192.168.1.101:9092端口,映射到10.0.3.99:9092(阿里云服务器内网IP)-p 22 #SSH端口root@182.43.x.x  #服务器的公网IP

使用supervisord加以运行即可,防止进程意外挂掉,保证进程高可用。

2.Python客户端无法通过SSH隧道消费

执行消费代码:

from kafka import KafkaConsumerconsumer = KafkaConsumer('test', bootstrap_servers='192.168.1.101:9092', group_id="python", retries=3, max_block_ms=3000, request_timeout_ms=3000)print("开始")
for msg in consumer:print('从kafka获取到的数据: ')print(msg.value.decode(encoding='utf-8'))

连接kafka消费超时: 

 

3.Kafka客户端连接集群的流程分析

    如果上面的服务是http或者https服务器,你通过curl访问本地192.168.1.101:9092的话绝对是没啥问题的。 并且我通过测试 telnet 192.168.1.101 9092 是能正常连接的,由此说明我搭建的SSH隧道是没问题的。  现在出问题更多的可能是在于我Kafka配置或者Kafka的用法存在问题.  查了一波资料引用下这个网友回答:

根据这个网友的资料,我自己画了一下Kafka客户端连接服务端的原理流程图:

 kafka存在2个比较重要的参数: 

       1.KAFKA_LISTENERS(只负责集群服务的监听,用来返回brokers列表给客户端)

       2.KAFKA_ADVERTISED_LISTENERS(客户端真实连接的broker端点, 进行数据传输)

4.Kafka无法消费原因求证

我当前的Kafka配置信息:

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.3.99:9092
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092

我访问192.168.1.101:9092,此时可以通过隧道访问到阿里云公网【KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092】,此时Kafka集群返回给客户端的brokers是【KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.3.99:9092】, 那我python客户端真实连接的brokers端点则是10.0.3.99:9092, 但是明显我本地的网络环境根本连不到10.0.3.99这个IP,  所以导致我的python脚本超时。OK.如果到这里仅仅只是猜测,那么我们通过抓包来验证下结论:

 事实胜于雄辩! 此时大家可以看到最后我本地python客户端连接的brokers是10.0.3.99:9092, 所以导致消费超时!

5.使用域名的方式修改kafka配置

最后我们采用域名的方式修改了kafka配置,相对灵活:

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://my.kafka:9092
KAFKA_LISTENERS=PLAINTEXT://my.kafka:9092

 阿里云服务器/etc/hosts添加了一条记录:

0.0.0.0  my.kafka

此时,我们的python客户端所在宿主机也要加下hosts, 将my.kafka指向SSH隧道代理的服务器ip:

192.168.1.101 my.kafka

修改后的python代码:

from kafka import KafkaConsumerconsumer = KafkaConsumer('test', bootstrap_servers='my.kafka:9092', group_id="python", retries=3, max_block_ms=3000, request_timeout_ms=3000)print("开始")
for msg in consumer:print('从kafka获取到的数据: ')print(msg.value.decode(encoding='utf-8'))

那么此时,kafka就能正常消费了。 我们再来捋一下python客户端连接kafka集群的过程。 首先,连接my.kafka:9092获取到broker列表, my.kafka对应的ip是192.168.1.101, 连接的端口是9092,没毛病,因为SSH隧道是正常的. 此时kafka返回broker列表为【KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://my.kafka:9092】, 明显我在本地访问broker   my.kafka:9092也能正常连接,到此,python从kafka消费正常。

 

综上所述,我们得到一些经验:

   1. 在搭建kafka集群的时候使用域名的方式来配置,而不是使用IP.  这样我们可以很灵活的做hosts配置来访问kafka.

    2.充分理解Kafka的2个参数KAFKA_ADVERTISED_LISTENERS、KAFKA_LISTENERS含义以及Kafka客户端连接到服务端的流程和原理

    3.别人说的不一定是正确的,大家可以自行判断、求证(包括我写的,大家可以自己去求证)

更多推荐

通过SSH隧道安全消费Kafka数据

本文发布于:2024-03-06 05:48:38,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1714523.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:隧道   数据   SSH   Kafka

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!