Storm runtime exception "No output fields defined for component:stream XxxBolt:null": tracking down a puzzling case

Updated: 2024-10-27 22:34:10


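Preface

The previous post, on the Storm runtime exception "No output fields defined for component:stream XxxBolt:null", found that multithreading was the cause. Other causes are possible as well, so today we trace the exception through the source code.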


Tracing back the clues

Error log:

Caused by: java.lang.IllegalArgumentException: No output fields defined for component:stream XxxBolt:null
    at backtype.storm.task.GeneralTopologyContext.getComponentOutputFields(GeneralTopologyContext.java:113) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
    at backtype.storm.tuple.TupleImpl.<init>(TupleImpl.java:53) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
    at backtype.storm.serialization.KryoTupleDeserializer.deserialize(KryoTupleDeserializer.java:54) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__4244.invoke(executor.clj:397) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
    at backtype.storm.disruptor$clojure_handler$reify__1668.onEvent(disruptor.clj:59) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:124) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
    ... 6 common frames omitted

According to the log, the exception is thrown by a method of GeneralTopologyContext. Let's take a look at it:

/**
 * Gets the declared output fields for the specified component/stream.
 */
public Fields getComponentOutputFields(String componentId, String streamId) {
    Fields ret = _componentToStreamToFields.get(componentId).get(streamId);
    if(ret==null) {
        throw new IllegalArgumentException("No output fields defined for component:stream " + componentId + ":" + streamId);
    }
    return ret;
}

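The logged message shows that streamId is null. Normally, when a bolt emits downstream it can specify a streamId; if it does not, Storm assigns the default stream ID ("default"). A null streamId here is therefore a strange anomaly.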
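To make the failure mode concrete, here is a minimal sketch of the same nested-map lookup in plain Java. StreamLookupSketch is a hypothetical stand-in, not Storm's GeneralTopologyContext, and it assumes the nested maps behave like java.util.HashMap, which returns null for a null key instead of throwing.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the lookup in GeneralTopologyContext; not Storm's actual class.
final class StreamLookupSketch {
    // componentId -> (streamId -> declared field names); assumed to be HashMaps
    private final Map<String, Map<String, List<String>>> componentToStreamToFields = new HashMap<>();

    // Registers the output fields a component declares for one stream.
    void declare(String componentId, String streamId, String... fields) {
        componentToStreamToFields
            .computeIfAbsent(componentId, k -> new HashMap<>())
            .put(streamId, Arrays.asList(fields));
    }

    // Same shape as the method quoted above: a missing entry becomes an IllegalArgumentException.
    List<String> getComponentOutputFields(String componentId, String streamId) {
        List<String> ret = componentToStreamToFields.get(componentId).get(streamId);
        if (ret == null) {
            throw new IllegalArgumentException(
                "No output fields defined for component:stream " + componentId + ":" + streamId);
        }
        return ret;
    }

    public static void main(String[] args) {
        StreamLookupSketch context = new StreamLookupSketch();
        context.declare("XxxBolt", "default", "word", "count");

        // A declared stream resolves normally.
        System.out.println(context.getComponentOutputFields("XxxBolt", "default"));

        // A null streamId finds no entry, so the message ends in "XxxBolt:null".
        try {
            context.getComponentOutputFields("XxxBolt", null);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under these assumptions the component itself is found (otherwise the first get would return null and the second would fail with a NullPointerException), and only the stream lookup misses. That matches the message in the log.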

Looking further down the stack trace, the failing call is in mk_task_receiver in executor.clj. Here is that function:

(defn mk-task-receiver [executor-data tuple-action-fn]
  (let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
        task-ids (:task-ids executor-data)
        debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))]
    (disruptor/clojure-handler
      (fn [tuple-batch sequence-id end-of-batch?]
        (fast-list-iter [[task-id msg] tuple-batch]
          (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
            (when debug? (log-message "Processing received message " tuple))
            (if task-id
              (tuple-action-fn task-id tuple)
              ;; null task ids are broadcast tuples
              (fast-list-iter [task-id task-ids]
                (tuple-action-fn task-id tuple)))))))))

The line number in the stack trace points to the line let [^TupleImpl tuple (if (instance? Tuple msg) ......)].

This is where the Tuple is deserialized: a TupleImpl is instantiated, which invokes its constructor:

public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
    this.values = values;
    this.taskId = taskId;
    this.streamId = streamId;
    this.id = id;
    this.context = context;

    String componentId = context.getComponentId(taskId);
    Fields schema = context.getComponentOutputFields(componentId, streamId);
    if(values.size()!=schema.size()) {
        throw new IllegalArgumentException("Tuple created with wrong number of fields. " +
            "Expected " + schema.size() + " fields but got " +
            values.size() + " fields");
    }
}

The constructor calls GeneralTopologyContext's getComponentOutputFields method, and the streamId passed in is null.
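The order of the two checks in the constructor matters: the schema lookup runs before the field-count comparison, so a null streamId fails first and the count check is never reached. The sketch below models just that ordering in plain Java. TupleSchemaCheckSketch, validate, and tryValidate are hypothetical names, and a single HashMap stands in for one component's stream-to-fields table.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the two checks performed when a tuple is constructed; not Storm code.
final class TupleSchemaCheckSketch {
    // Step 1: look up the schema for the stream. Step 2: compare the value count against it.
    static void validate(Map<String, List<String>> streamToFields,
                         String componentId, String streamId, List<Object> values) {
        List<String> schema = streamToFields.get(streamId);
        if (schema == null) {
            throw new IllegalArgumentException(
                "No output fields defined for component:stream " + componentId + ":" + streamId);
        }
        if (values.size() != schema.size()) {
            throw new IllegalArgumentException(
                "Tuple created with wrong number of fields. Expected " + schema.size()
                    + " fields but got " + values.size() + " fields");
        }
    }

    // Returns "ok" or the exception message, so the cases can be compared side by side.
    static String tryValidate(Map<String, List<String>> streamToFields,
                              String componentId, String streamId, List<Object> values) {
        try {
            validate(streamToFields, componentId, streamId, values);
            return "ok";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> streamToFields = new HashMap<>();
        streamToFields.put("default", Arrays.asList("word", "count"));

        List<Object> twoValues = Arrays.<Object>asList("storm", 1);
        List<Object> oneValue = Arrays.<Object>asList("storm");

        System.out.println(tryValidate(streamToFields, "XxxBolt", "default", twoValues)); // passes both checks
        System.out.println(tryValidate(streamToFields, "XxxBolt", "default", oneValue));  // fails the count check
        System.out.println(tryValidate(streamToFields, "XxxBolt", null, oneValue));       // fails the lookup first
    }
}
```

In this model the third call reports the missing stream even though the value count is also wrong, which is consistent with the stack trace ending in getComponentOutputFields rather than in the constructor's own check.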


So where does this streamId come from?


Clues

Like Spark, Storm uses a DAG engine. For the pros and cons of DAG engines, see "Advantages of the DAG (directed acyclic graph) as a big-data execution engine".

A DAG is a directed graph, and it is fully built by the time createTopology is called. The details are as follows:

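1. We normally build a topology with TopologyBuilder. Each setBolt call is followed by a grouping declaration, and that grouping records the streamId of the upstream stream the current bolt subscribes to.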

private BoltDeclarer grouping(String componentId, String streamId, Grouping grouping) {
    _commons.get(_boltId).put_to_inputs(new GlobalStreamId(
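The idea in step 1 can be modeled as follows. This is a simplified sketch rather than TopologyBuilder's implementation: GroupingSketch and its inputs map are hypothetical, and a plain "component:stream" string key stands in for GlobalStreamId. It assumes that a grouping declared without a stream falls back to the default stream ID "default", and that every subscription is stored under the pair (upstream component, stream).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of how a bolt's input subscriptions could be recorded; not Storm code.
final class GroupingSketch {
    static final String DEFAULT_STREAM_ID = "default";

    // "upstreamComponent:streamId" -> grouping type, in declaration order
    private final Map<String, String> inputs = new LinkedHashMap<>();

    // No stream given: subscribe to the upstream component's default stream.
    GroupingSketch shuffleGrouping(String componentId) {
        return shuffleGrouping(componentId, DEFAULT_STREAM_ID);
    }

    // Explicit stream: the streamId is stored as part of the subscription key.
    GroupingSketch shuffleGrouping(String componentId, String streamId) {
        inputs.put(componentId + ":" + streamId, "shuffle");
        return this;
    }

    Map<String, String> inputs() {
        return inputs;
    }

    public static void main(String[] args) {
        GroupingSketch bolt = new GroupingSketch()
            .shuffleGrouping("XxxSpout")            // recorded under XxxSpout:default
            .shuffleGrouping("XxxBolt", "errors");  // recorded under XxxBolt:errors
        System.out.println(bolt.inputs().keySet());
    }
}
```

In this model every subscription carries a non-null streamId from the moment the topology is declared, which is why a null streamId at deserialization time points to something going wrong later, at runtime.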

Published: 2024-02-08 21:33:42
Article link: https://www.elefans.com/category/jswz/34/1675296.html