外部访问K8S集群内部的kafka集群服务

编程入门 行业动态 更新时间:2024-10-11 11:14:48

外部访问K8S<a href=https://www.elefans.com/category/jswz/34/1771240.html style=集群内部的kafka集群服务"/>

外部访问K8S集群内部的kafka集群服务

不许转载

kafka 部署
把 kafka 部署到 k8s 后,我们肯定是通过 service 从 k8s 外部访问 kafaka。这里的 service 要么是 NodePort, 要么是 LoadBalancer 类型。我们使用的方式是 LoadBalancer。
我们先看下面这张图,这是 kafka 在集群中的网络拓扑。当我们通过地址 12.345.67:31090 访问到 kafka 后,kafka 返回的访问地址是类似这样的 endpoint jettopro-kafka.jettopro-poc.svc.cluster.local:9092。这是 k8s 集群内部能访问的 headless service endpoint 地址,从集群外部自然使用这个地址是访问不通的。

所以,我们需要解决两个问题:

  1. kafka 不同的 pod 需要不通的对外能访问的地址
  2. 不能使用 kafka 默认的 advertised.listeners
解决方案

问题1,我们为每个 pod 创建类型是 LoadBalancer 的 service 并且监听不同的端口。这样通过 LB IP + port 就能找到特定的 kafka broker。
service 示例如下:

apiVersion: v1
kind: Service
metadata:name: kafka-{index}
spec:externalTrafficPolicy: Localtype: LoadBalancerselector:statefulset.kubernetes.io/pod-name: kafka-{index}ports:- protocol: TCPport: {9092+index}targetPort: 9092

这里如果不是云主机,也可以使用NodePort类型来暴露kafka服务。

问题2,我们在容器启动的时候,执行脚本动态获取 pod 编号,生成容器需要的环境变量 KAFKA_CFG_ADVERTISED_LISTENERS(对应 kafka broker 的配置 advertised.listener)
 

HOSTNAME=`hostname -s`
if [[ $HOSTNAME =~ (.*)-([0-9]+)$ ]]; thenORD=${BASH_REMATCH[2]}PORT=$((ORD + 9092))#12.345.67.8 是 LB 的 ipexport KAFKA_CFG_ADVERTISED_LISTENERS="PLAINTEXT://12.345.67.8:$PORT"
elseecho "Failed to get index from hostname $HOST"exit 1
fi
apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafka
spec:selector:matchLabels:app: kafkaserviceName: kafkareplicas: 3updateStrategy:type: RollingUpdatepodManagementPolicy: OrderedReadytemplate:metadata:labels:app: kafkaspec:affinity:podAntiAffinity:requiredDuringSchedulingIgnoredDuringExecution:- labelSelector:matchExpressions:- key: "app"operator: Invalues:- kafkatopologyKey: "kubernetes.io/hostname"containers:- name: kafkacommand:- bash- -ec- |HOSTNAME=`hostname -s`if [[ $HOSTNAME =~ (.*)-([0-9]+)$ ]]; thenORD=${BASH_REMATCH[2]}PORT=$((ORD + 9092))export KAFKA_CFG_ADVERTISED_LISTENERS="PLAINTEXT://12.345.67.8:$PORT"elseecho "Failed to get index from hostname $HOST"exit 1fiexec /entrypoint.sh /run.shimagePullPolicy: Alwaysimage: "bitnami/kafka:2"env:- name: ALLOW_PLAINTEXT_LISTENERvalue: "yes"- name: KAFKA_CFG_ZOOKEEPER_CONNECTvalue: "zookeeper-0.zookeeper-hs:2181,zookeeper-1.zookeeper-hs:2181,zookeeper-2.zookeeper-hs:2181"- name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTORvalue: "3"- name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISRvalue: "3"- name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTORvalue: "3"ports:- containerPort: 9092volumeMounts:- name: kafka-datamountPath: /bitnamisecurityContext:runAsUser: 1000fsGroup: 1000volumeClaimTemplates:- metadata:name: kafka-dataspec:accessModes: [ "ReadWriteOnce" ]storageClassName: alicloud-disk-efficiencyresources:requests:storage: 20Gi

更多推荐

外部访问K8S集群内部的kafka集群服务

本文发布于:2023-11-15 13:52:56,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1600778.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:集群   K8S   kafka

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!