Flink: Kafka consumer does not receive messages from Kafka

Updated: 2024-10-22 11:04:14

Problem description

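I am running Kafka and Flink as Docker containers on a Mac.

I have implemented a Flink job that should consume messages from a Kafka topic. I run a Python producer that sends messages to the topic.

The job starts without any problems, but zero messages arrive. I believe the messages are sent to the correct topic, because I have a Python consumer that is able to consume them.

Flink job (Java):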

package com.p81.datapipeline.swg;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class SWGEventJob {

    private static final Logger LOG = LoggerFactory.getLogger(SWGEventJob.class);

    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        final String inputTopic = parameterTool.get("kafka_input_topic", "kafka_fake_swg_event_topic_in");
        final String outputTopic = parameterTool.get("kafka_output_topic", "kafka_fake_swg_event_topic_out");
        final String consumerGroup = parameterTool.get("kafka_consumer_group", "p81_swg_event_consumer_group");
        final String bootstrapServers = parameterTool.get("kafka_bootstrap_servers", "broker:29092");

        LOG.info("inputTopic : " + inputTopic);
        LOG.info("outputTopic : " + outputTopic);
        LOG.info("consumerGroup : " + consumerGroup);
        LOG.info("bootstrapServers : " + bootstrapServers);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: read SWG events from the input topic, starting at the earliest offset.
        FlinkKafkaConsumer<SWGEvent> swgEventConsumer = createSWGEventConsumer(inputTopic, bootstrapServers, consumerGroup);
        swgEventConsumer.setStartFromEarliest();
        DataStream<SWGEvent> dataStream = env.addSource(swgEventConsumer)
                .name(String.format("SWG Event Kafka Consumer [%s]", inputTopic));

        // Sink: anonymize each event and write it to the output topic.
        FlinkKafkaProducer<SWGEvent> swgEventProducer = createSWGEventProducer(outputTopic, bootstrapServers);
        dataStream.map(new SWGEventAnonymizer())
                .addSink(swgEventProducer)
                .name(String.format("SWG Event Kafka Producer [%s]", outputTopic));

        env.execute("P81 Dummy SWG Event Flink Job");
    }

    private static FlinkKafkaConsumer<SWGEvent> createSWGEventConsumer(String topic, String kafkaAddress, String kafkaGroup) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddress);
        properties.setProperty("group.id", kafkaGroup);
        return new FlinkKafkaConsumer<>(topic, new SWGEventDeserializationSchema(), properties);
    }

    private static FlinkKafkaProducer<SWGEvent> createSWGEventProducer(String topic, String kafkaAddress) {
        return new FlinkKafkaProducer<>(kafkaAddress, topic, new SWGEventSerializationSchema());
    }
}

Flink job log:

2021-11-25 10:03:25,282 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: true)
2021-11-25 10:03:25,284 INFO com.p81.datapipeline.swg.SWGEventJob [] - inputTopic : kafka_fake_swg_event_topic_in
2021-11-25 10:03:25,284 INFO com.p81.datapipeline.swg.SWGEventJob [] - outputTopic : kafka_fake_swg_event_topic_out
2021-11-25 10:03:25,284 INFO com.p81.datapipeline.swg.SWGEventJob [] - consumerGroup : p81_swg_event_consumer_group
2021-11-25 10:03:25,284 INFO com.p81.datapipeline.swg.SWGEventJob [] - bootstrapServers : broker:29092
2021-11-25 10:03:26,155 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Property [transaction.timeout.ms] not specified. Setting it to 3600000 ms
2021-11-25 10:03:26,202 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 62c766b4ace055cf91f97f1e46f621d1 is submitted.
2021-11-25 10:03:26,202 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=62c766b4ace055cf91f97f1e46f621d1.
2021-11-25 10:03:26,301 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 62c766b4ace055cf91f97f1e46f621d1 (P81 Dummy SWG Event Flink Job).
2021-11-25 10:03:26,302 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 62c766b4ace055cf91f97f1e46f621d1 (P81 Dummy SWG Event Flink Job).
2021-11-25 10:03:26,306 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_15 .
2021-11-25 10:03:26,307 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1).
2021-11-25 10:03:26,309 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy NoRestartBackoffTimeStrategy for P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1).
2021-11-25 10:03:26,309 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1).
2021-11-25 10:03:26,309 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
2021-11-25 10:03:26,310 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 0 ms
2021-11-25 10:03:26,310 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@252e8634
2021-11-25 10:03:26,310 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Checkpoint storage is set to 'jobmanager'
2021-11-25 10:03:26,310 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.
2021-11-25 10:03:26,310 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3931aba0 for P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1).
2021-11-25 10:03:26,311 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting execution of job P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1) under job master id 00000000000000000000000000000000.
2021-11-25 10:03:26,318 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2021-11-25 10:03:26,318 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1) switched from state CREATED to RUNNING.
2021-11-25 10:03:26,319 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SWG Event Kafka Consumer [kafka_fake_swg_event_topic_in] -> Map -> Sink: SWG Event Kafka Producer [kafka_fake_swg_event_topic_out] (1/1) (87c54365842acb250dc6984b1ca9b466) switched from CREATED to SCHEDULED.
2021-11-25 10:03:26,320 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Connecting to ResourceManager akka.tcp://flink@jobmanager:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
2021-11-25 10:03:26,321 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Resolved ResourceManager address, beginning registration
2021-11-25 10:03:26,322 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager 00000000000000000000000000000000@akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_15 for job 62c766b4ace055cf91f97f1e46f621d1.
2021-11-25 10:03:26,324 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registered job manager 00000000000000000000000000000000@akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_15 for job 62c766b4ace055cf91f97f1e46f621d1.
2021-11-25 10:03:26,327 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
2021-11-25 10:03:26,328 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 62c766b4ace055cf91f97f1e46f621d1: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2021-11-25 10:03:26,394 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SWG Event Kafka Consumer [kafka_fake_swg_event_topic_in] -> Map -> Sink: SWG Event Kafka Producer [kafka_fake_swg_event_topic_out] (1/1) (87c54365842acb250dc6984b1ca9b466) switched from SCHEDULED to DEPLOYING.
2021-11-25 10:03:26,395 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: SWG Event Kafka Consumer [kafka_fake_swg_event_topic_in] -> Map -> Sink: SWG Event Kafka Producer [kafka_fake_swg_event_topic_out] (1/1) (attempt #0) with attempt id 87c54365842acb250dc6984b1ca9b466 to 172.18.0.4:35157-adeb80 @ kafka_taskmanager_1.kafka_default (dataPort=41077) with allocation id 968834ad9a512d16050107a088449490
2021-11-25 10:03:26,546 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SWG Event Kafka Consumer [kafka_fake_swg_event_topic_in] -> Map -> Sink: SWG Event Kafka Producer [kafka_fake_swg_event_topic_out] (1/1) (87c54365842acb250dc6984b1ca9b466) switched from DEPLOYING to INITIALIZING.
2021-11-25 10:03:27,597 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SWG Event Kafka Consumer [kafka_fake_swg_event_topic_in] -> Map -> Sink: SWG Event Kafka Producer [kafka_fake_swg_event_topic_out] (1/1) (87c54365842acb250dc6984b1ca9b466) switched from INITIALIZING to RUNNING.

Producer job (Python), running on the host, not in Docker:

import json
import os
import time
from dataclasses import dataclass, asdict
from random import randint
from kafka import KafkaProducer
import logging

logging.basicConfig(level=logging.INFO)

_METHODS = ['GET'] * 17 + ['POST', 'PUT', 'DELETE']
_ACTIONS = ['ALLOW', 'WARNING', 'BLOCK']
_URLS = ['x']


@dataclass
class SWGEvent:
    url: str
    action: str
    agentId: int
    agentIP: str
    HTTPMethod: str
    timestamp: int


def _get_fake_swg_event() -> SWGEvent:
    url = _URLS[randint(0, len(_URLS) - 1)]
    action = _ACTIONS[randint(0, len(_ACTIONS) - 1)]
    agent_id = randint(1, 1000)
    agent_ip = f'{randint(1, 255)}.{randint(1, 255)}.{randint(1, 255)}.{randint(1, 255)}'
    http_method = _METHODS[randint(0, len(_METHODS) - 1)]
    timestamp = int(time.time())
    return SWGEvent(url, action, agent_id, agent_ip, http_method, timestamp)


def produce(producer: KafkaProducer, topic_name: str) -> None:
    x = 0
    while x < 500:
        event: SWGEvent = _get_fake_swg_event()
        result = producer.send(topic_name, asdict(event))
        x += 1
        time.sleep(1)
    producer.flush()
    logging.info(f'send result: {str(result)}')


if __name__ == '__main__':
    kafka_server = os.getenv('KAFKA_SERVER')
    topic_name = os.getenv('TOPIC_NAME')
    logging.info(f'Producer.Working with server {kafka_server} and topic {topic_name}')
    producer = KafkaProducer(bootstrap_servers=kafka_server,
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    produce(producer, topic_name)

Output of the Python code:

INFO:root:Producer.Working with server localhost:9092 and topic kafka_fake_swg_event_topic_in

docker-compose.yml

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: schema-registry:8091

  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8091:8091"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: 0.0.0.0:8091

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.0.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'schema-registry:8091'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS,HEAD'

  jobmanager:
    image: flink:1.13.2-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.13.2-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

docker ps

CONTAINER ID   IMAGE                                   COMMAND                  CREATED        STATUS          PORTS                                            NAMES
2f465a0a4129   confluentinc/cp-kafka-rest:7.0.0        "/etc/confluent/dock…"   23 hours ago   Up 23 hours     0.0.0.0:8082->8082/tcp                           rest-proxy
eb25992c47d0   confluentinc/cp-schema-registry:7.0.0   "/etc/confluent/dock…"   23 hours ago   Up 23 hours     8081/tcp, 0.0.0.0:8091->8091/tcp                 schema-registry
1081319da296   confluentinc/cp-kafka:7.0.0             "/etc/confluent/dock…"   23 hours ago   Up 17 hours     0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp   broker
de9056ee250c   flink:1.13.2-scala_2.12                 "/docker-entrypoint.…"   23 hours ago   Up 28 minutes   6123/tcp, 8081/tcp                               kafka_taskmanager_1
b38beefc35e3   confluentinc/cp-zookeeper:7.0.0         "/etc/confluent/dock…"   23 hours ago   Up 23 hours     2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp       zookeeper
e6db23fa8842   flink:1.13.2-scala_2.12                 "/docker-entrypoint.…"   23 hours ago   Up 18 hours     6123/tcp, 0.0.0.0:8081->8081/tcp                 kafka_jobmanager_1

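Question: what needs to be fixed so that the messages reach the Flink job?

Update #1: It looks like the job is working fine. Events are consumed by the Kafka consumer and produced by the Kafka producer. (I learned this by looking at the Flink task manager log.) So the actual question is: why does the Flink UI show zero activity?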

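Recommended answer

The Flink metrics you are looking at only measure traffic that happens within the Flink cluster itself (using Flink's serializers and network stack). They ignore the communication at the edges of the job graph, which uses the connectors' serializers and networking.

In other words, sources never report records coming in, and sinks never report records going out.

Furthermore, in your job all of the operators can be chained together, so Flink's network is not used at all.

And yes, this is confusing.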
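To make the reasoning above concrete, here is a toy model in plain Java with no Flink dependency. The class and method names (`ChainingMetricsModel`, `TaskMetrics`, `run`) are made up for illustration and are not part of Flink. The model assumes, as the answer describes, that records are only counted when they cross an edge between two tasks inside the cluster. With all operators chained into one task it reports zeros everywhere; with one task per operator, the source still reports nothing received and the sink nothing sent.

```java
import java.util.ArrayList;
import java.util.List;

public class ChainingMetricsModel {

    /** Per-task counters, loosely mimicking "records received" / "records sent". */
    static final class TaskMetrics {
        final String name;
        long recordsReceived;
        long recordsSent;

        TaskMetrics(String name) {
            this.name = name;
        }
    }

    /**
     * Models a linear pipeline that processes `records` records.
     * If `chained` is true, all operators share one task; otherwise each
     * operator gets its own task and records cross a task-to-task edge.
     */
    static List<TaskMetrics> run(List<String> operators, boolean chained, long records) {
        List<TaskMetrics> tasks = new ArrayList<>();
        if (chained) {
            tasks.add(new TaskMetrics(String.join(" -> ", operators)));
        } else {
            for (String op : operators) {
                tasks.add(new TaskMetrics(op));
            }
        }
        // Assumption of this model: only edges between two tasks are counted.
        // Reads from and writes to external systems (e.g. Kafka) are not.
        for (int i = 0; i + 1 < tasks.size(); i++) {
            tasks.get(i).recordsSent += records;
            tasks.get(i + 1).recordsReceived += records;
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<String> operators = List.of("Source", "Map", "Sink");
        for (boolean chained : new boolean[] {true, false}) {
            System.out.println("chained=" + chained);
            for (TaskMetrics t : run(operators, chained, 500)) {
                System.out.println("  " + t.name
                        + ": received=" + t.recordsReceived
                        + ", sent=" + t.recordsSent);
            }
        }
    }
}
```

In a real job, the closest experiment would probably be to call `env.disableOperatorChaining()` on the `StreamExecutionEnvironment` before building the pipeline. That should make traffic between the source, map, and sink tasks visible in the UI, at the cost of extra serialization between operators, so it is better suited to debugging than to production.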

Published: 2023-11-26 00:32:26
Link: https://www.elefans.com/category/jswz/34/1631944.html