也不报错"/>
【Flink Kafka】Flink程序连接Kafka没输出也不报错
Flink程序连接Kafka没输出也不报错
本人最近在使用Kafka
作为数据源输出数据到Flink
时遇到一个问题,那就是既没有结果输出,也没有报错
代码如下
package Sourceimport org.apache.flink.apimon.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.api.scala._
import java.util.Propertiesobject SourceFromKafka {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//配置项val properties = new Properties()properties.setProperty("bootstrap.servers", "master:9092")properties.setProperty("group.id", "consumer-group")properties.setProperty("key.deserializer", "org.apache.kafkamon.serialization.StringDeserializer")properties.setProperty("value.deserializer", "org.apache.kafkamon.serialization.StringDeserializer")properties.setProperty("auto.offset.reset", "latest")//这边是要读取Kafka的数据,所以算是一个消费者/*** 第一个泛型:读取过来的数据类型* 第一个参数:Kafka名* 第二个参数:序列化* 第三个参数:配置项*/val stream =env.addSource(new FlinkKafkaConsumer011[String]("first", new SimpleStringSchema(), properties))stream.print()env.execute()}
}
结果就和下图相似,不报错,也没有输出
这样的结果让人很困惑,于是我们准备看看更加详细的信息,我们在pom
文件中添加下面的依赖
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency>
在src/main/resources
文件夹下创建一个名为log4j.properties
的文件,在该文件里添加如下内容
log4j.rootLogger=info,console log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=INFO
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n
这个时候在运行,我们就可以看到下面几行信息
[INFO ] 2022-03-23 10:34:33,892(26157) --> [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (5/6)] org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:607): Discovered coordinator slave2:9092 (id: 2147483645 rack: null) for group consumer-group.
[INFO ] 2022-03-23 10:34:33,892(26157) --> [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (5/6)] org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:652): Marking the coordinator slave2:9092 (id: 2147483645 rack: null) dead for group consumer-group
我们看到信息中出现了slave2
这个正是我们安装Kafka
集群的虚拟机的主机名,然后我们想到是否是windows
上运行的kafka
拿到的host
是机器名而不是IP
地址,因此我们赶紧去修改host
文件,文件地址:C:\Windows\System32\drivers\etc\hosts
此时再运行程序,问题解决
更多推荐
【Flink Kafka】Flink程序连接Kafka没输出也不报错
发布评论