【Flink Kafka】Flink程序连接Kafka没输出也不报错

编程入门 行业动态 更新时间:2024-10-21 15:29:02

【Flink Kafka】Flink程序连接Kafka没输出<a href=https://www.elefans.com/category/jswz/34/1766868.html style=也不报错"/>

【Flink Kafka】Flink程序连接Kafka没输出也不报错

Flink程序连接Kafka没输出也不报错

本人最近在使用Kafka作为数据源输出数据到Flink时遇到一个问题,那就是既没有结果输出,也没有报错

代码如下

package Sourceimport org.apache.flink.apimon.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.api.scala._
import java.util.Propertiesobject SourceFromKafka {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//配置项val properties = new Properties()properties.setProperty("bootstrap.servers", "master:9092")properties.setProperty("group.id", "consumer-group")properties.setProperty("key.deserializer", "org.apache.kafkamon.serialization.StringDeserializer")properties.setProperty("value.deserializer", "org.apache.kafkamon.serialization.StringDeserializer")properties.setProperty("auto.offset.reset", "latest")//这边是要读取Kafka的数据,所以算是一个消费者/*** 第一个泛型:读取过来的数据类型* 第一个参数:Kafka名* 第二个参数:序列化* 第三个参数:配置项*/val stream =env.addSource(new FlinkKafkaConsumer011[String]("first", new SimpleStringSchema(), properties))stream.print()env.execute()}
}

结果就和下图相似,不报错,也没有输出


这样的结果让人很困惑,于是我们准备看看更加详细的信息,我们在pom文件中添加下面的依赖

        <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency>

src/main/resources文件夹下创建一个名为log4j.properties的文件,在该文件里添加如下内容

log4j.rootLogger=info,console  log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=INFO
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n

这个时候在运行,我们就可以看到下面几行信息

[INFO ] 2022-03-23 10:34:33,892(26157) --> [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (5/6)] org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:607): Discovered coordinator slave2:9092 (id: 2147483645 rack: null) for group consumer-group.  
[INFO ] 2022-03-23 10:34:33,892(26157) --> [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (5/6)] org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:652): Marking the coordinator slave2:9092 (id: 2147483645 rack: null) dead for group consumer-group 

我们看到信息中出现了slave2这个正是我们安装Kafka集群的虚拟机的主机名,然后我们想到是否是windows上运行的kafka拿到的host是机器名而不是IP地址,因此我们赶紧去修改host文件,文件地址:C:\Windows\System32\drivers\etc\hosts


此时再运行程序,问题解决

更多推荐

【Flink Kafka】Flink程序连接Kafka没输出也不报错

本文发布于:2024-02-06 15:06:45,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1749811.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:也不   报错   程序   Flink   Kafka

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!