admin管理员组文章数量:1642217
问题一:
22/06/23 12:08:58 ERROR hdfs.HDFSEventSink: process failed
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at com.googlemon.base.Preconditions.checkNotNull(Preconditions.java:204)
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:251)
at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:460)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:379)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
22/06/23 12:08:58 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:464)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at com.googlemon.base.Preconditions.checkNotNull(Preconditions.java:204)
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:251)
at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:460)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:379)
... 3 more
22/06/23 12:08:58 WARN source.SpoolDirectorySource: The channel is full, and cannot write data now. The source will try again after 4000 milliseconds
22/06/23 12:09:02 INFO avro.ReliableSpoolingFileEventReader: Last read was never committed - resetting mark position.
分析以上日志可知:
1. 22/06/23 12:08:58 ERROR hdfs.HDFSEventSink: process failed sink进程失败.
2. java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null flume的日志头信息中缺少timestamp.
3. 22/06/23 12:08:58 ERROR flume.SinkRunner: Unable to deliver event. sinkRunner组件无法发送event.
4. 22/06/23 12:08:58 WARN source.SpoolDirectorySource: The channel is full, and cannot write data now. The source will try again after 4000 milliseconds channel堆积event满了,现在source无法写入。4秒后再试.
从以上可知: 应该是sink的问题。跟timestamp的配置有关. source和channel没有问题.
以上是数据采集的目录,从这里可以看到,有一部分文件已经标识为 COMPLETED了,即source没有问题,但后面有几个文件没有变为COMPLETED,与前面的信息配置分析可知,这是因为channel满了,无法继续读取数据导致.
hdfs中一个文件都没有,说明sink工作失败。
查看我的配置文件:
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /tmp/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = = hdfs://node1:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 10
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = minute
#是否使用本地时间戳
#a3.sinks.k3.hdfs.useLocalTimeStamp = true #关键是这一句被 注释了
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
#a3.sinks.k3.hdfs.useLocalTimeStamp = true 这一句话被 注释了, 无法生成timestamp.
2. 改好后,运行又出现一错误:
java.URISyntaxException: Illegal character in scheme name at index 0: = hdfs://node1:8020/flume/upload/20220623/12/upload-.1655957677405.tmp
这是说hdfs的路径配置的第0个字母的 = 不合法. 然后到配置文件中一查,发现多写了一个 = .
去掉后运行结果如下.
生成了大量的上传文件, 这是因为 时间间隔没有配置。 再修改sink的配置信息如下:
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /tmp/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://node1:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 10
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = minute
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
#最小冗余数
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 1000
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
生成的文件数可控.
版权声明:本文标题:解决Flume数据采集中出现的几个问题 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://www.elefans.com/xitong/1729332108a1196521.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论