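Storm runtime exception "No output fields defined for component:stream XxxBolt:null": tracking down the mystery
Preface
In the previous post, "Storm runtime exception: No output fields defined for component:stream XxxBolt:null", I found that the error was caused by multithreading. It can also have other causes, though, so today I'll dig further.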
Tracing the clues backward
Error log:
Caused by: java.lang.IllegalArgumentException: No output fields defined for component:stream XxxBolt:null
    at backtype.storm.task.GeneralTopologyContext.getComponentOutputFields(GeneralTopologyContext.java:113) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
    at backtype.storm.tuple.TupleImpl.<init>(TupleImpl.java:53) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
    at backtype.storm.serialization.KryoTupleDeserializer.deserialize(KryoTupleDeserializer.java:54) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__4244.invoke(executor.clj:397) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
    at backtype.storm.disruptor$clojure_handler$reify__1668.onEvent(disruptor.clj:59) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:124) ~[storm-core-0.9.3-rc1.jar:0.9.3-rc1]
    ... 6 common frames omitted
The log shows that the exception is thrown by a GeneralTopologyContext method. Let's look at it:
/**
 * Gets the declared output fields for the specified component/stream.
 */
public Fields getComponentOutputFields(String componentId, String streamId) {
    Fields ret = _componentToStreamToFields.get(componentId).get(streamId);
    if(ret==null) {
        throw new IllegalArgumentException("No output fields defined for component:stream " + componentId + ":" + streamId);
    }
    return ret;
}
The log message tells us that streamId is null. A bolt can name a streamId when it emits downstream. If it doesn't, Storm assigns the default stream ID, "default". A null streamId here is therefore an odd anomaly.
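The lookup is a two-level map: first by component id, then by stream id. Below is a minimal, Storm-free sketch of that behavior, for illustration only. The class OutputFieldsLookup and its declare method are hypothetical. Output fields are modeled as a List<String> instead of Storm's Fields. DEFAULT_STREAM_ID = "default" stands in for the default stream name mentioned above. A null streamId finds nothing in the inner map, so it surfaces as the same IllegalArgumentException.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the nested componentId -> streamId -> fields map
// that GeneralTopologyContext keeps in _componentToStreamToFields.
class OutputFieldsLookup {
    // Assumed to mirror Storm's default stream id.
    static final String DEFAULT_STREAM_ID = "default";

    private final Map<String, Map<String, List<String>>> componentToStreamToFields = new HashMap<>();

    // Records the fields a component declares on one stream.
    void declare(String componentId, String streamId, String... fields) {
        componentToStreamToFields
                .computeIfAbsent(componentId, k -> new HashMap<>())
                .put(streamId, Arrays.asList(fields));
    }

    // Same shape as getComponentOutputFields: a null or undeclared streamId
    // yields a null lookup result, which is turned into IllegalArgumentException.
    List<String> getComponentOutputFields(String componentId, String streamId) {
        List<String> ret = componentToStreamToFields.get(componentId).get(streamId);
        if (ret == null) {
            throw new IllegalArgumentException(
                    "No output fields defined for component:stream " + componentId + ":" + streamId);
        }
        return ret;
    }

    public static void main(String[] args) {
        OutputFieldsLookup ctx = new OutputFieldsLookup();
        ctx.declare("XxxBolt", DEFAULT_STREAM_ID, "word", "count");
        System.out.println(ctx.getComponentOutputFields("XxxBolt", "default")); // [word, count]
        try {
            ctx.getComponentOutputFields("XxxBolt", null);
        } catch (IllegalArgumentException e) {
            // No output fields defined for component:stream XxxBolt:null
            System.out.println(e.getMessage());
        }
    }
}
```

The declared stream resolves normally. The null stream produces exactly the message from the error log.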
Further down the stack, the failing call comes from mk-task-receiver in executor.clj. Here is that function:
(defn mk-task-receiver [executor-data tuple-action-fn]
  (let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
        task-ids (:task-ids executor-data)
        debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))]
    (disruptor/clojure-handler
      (fn [tuple-batch sequence-id end-of-batch?]
        (fast-list-iter [[task-id msg] tuple-batch]
          (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
            (when debug? (log-message "Processing received message " tuple))
            (if task-id
              (tuple-action-fn task-id tuple)
              ;; null task ids are broadcast tuples
              (fast-list-iter [task-id task-ids]
                (tuple-action-fn task-id tuple)))))))))
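The line number in the stack trace points at the binding let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]. The KryoTupleDeserializer.deserialize frame just above it shows that the message took the deserialization branch.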
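To make the control flow easier to follow, here is a hedged Java sketch of the loop above. TaskReceiverSketch, Entry and the functional parameters are hypothetical. The type parameter T stands in for TupleImpl, and the deserializer function stands in for KryoTupleDeserializer. Debug logging is omitted. It shows the two branches: only payloads that are not already tuples get deserialized, and a null task id fans the tuple out to every local task.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical Java rendering of the loop inside executor.clj's mk-task-receiver.
// T stands in for TupleImpl; the deserializer stands in for KryoTupleDeserializer.
class TaskReceiverSketch {
    // One queued entry: a target task id (null means broadcast) and a payload that is
    // either an already-built tuple or serialized bytes.
    static final class Entry {
        final Integer taskId;
        final Object msg;

        Entry(Integer taskId, Object msg) {
            this.taskId = taskId;
            this.msg = msg;
        }
    }

    static <T> void receive(List<Entry> batch,
                            List<Integer> localTaskIds,
                            Class<T> tupleClass,
                            Function<byte[], T> deserializer,
                            BiConsumer<Integer, T> tupleAction) {
        for (Entry e : batch) {
            // Payloads that are already tuples are used as-is; anything else is
            // deserialized, which is the branch where the reported exception surfaced.
            T tuple = tupleClass.isInstance(e.msg)
                    ? tupleClass.cast(e.msg)
                    : deserializer.apply((byte[]) e.msg);
            if (e.taskId != null) {
                tupleAction.accept(e.taskId, tuple);
            } else {
                // A null task id marks a broadcast tuple: hand it to every local task.
                for (Integer taskId : localTaskIds) {
                    tupleAction.accept(taskId, tuple);
                }
            }
        }
    }

    public static void main(String[] args) {
        List<Entry> batch = Arrays.asList(
                new Entry(7, "local"),
                new Entry(null, "remote".getBytes(StandardCharsets.UTF_8)));
        List<String> seen = new ArrayList<>();
        receive(batch, Arrays.asList(1, 2), String.class,
                bytes -> new String(bytes, StandardCharsets.UTF_8),
                (taskId, tuple) -> seen.add(taskId + ":" + tuple));
        System.out.println(seen); // [7:local, 1:remote, 2:remote]
    }
}
```

In the demo, a String plays the role of an in-process tuple and a byte[] plays the role of a serialized one. Only the byte[] entry goes through the deserializer.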
This is where the Tuple is deserialized. A TupleImpl is instantiated, which runs its constructor:
public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
    this.values = values;
    this.taskId = taskId;
    this.streamId = streamId;
    this.id = id;
    this.context = context;

    String componentId = context.getComponentId(taskId);
    Fields schema = context.getComponentOutputFields(componentId, streamId);
    if(values.size()!=schema.size()) {
        throw new IllegalArgumentException(
            "Tuple created with wrong number of fields. " +
            "Expected " + schema.size() + " fields but got " +
            values.size() + " fields");
    }
}
The constructor calls GeneralTopologyContext.getComponentOutputFields, and the streamId it passes in is null.
So where does this streamId come from?
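Here is a simplified model of the constructor's checks. It assumes a plain map from task id to component id in place of GeneralTopologyContext.getComponentId. TupleSchemaCheck and its methods are hypothetical names used only for this sketch. It shows that with a null streamId, construction fails at the schema lookup, before the field-count check is ever reached.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, stripped-down model of the checks in TupleImpl's constructor.
// Fields are modeled as List<String>; there is no Kryo or Storm dependency.
class TupleSchemaCheck {
    private final Map<Integer, String> taskToComponent = new HashMap<>();
    private final Map<String, Map<String, List<String>>> componentToStreamToFields = new HashMap<>();

    void addTask(int taskId, String componentId) {
        taskToComponent.put(taskId, componentId);
    }

    void declare(String componentId, String streamId, String... fields) {
        componentToStreamToFields
                .computeIfAbsent(componentId, k -> new HashMap<>())
                .put(streamId, Arrays.asList(fields));
    }

    // Same order as the constructor: taskId -> componentId -> declared fields -> size check.
    void validate(List<?> values, int taskId, String streamId) {
        String componentId = taskToComponent.get(taskId);
        List<String> schema = componentToStreamToFields.get(componentId).get(streamId);
        if (schema == null) {
            throw new IllegalArgumentException(
                    "No output fields defined for component:stream " + componentId + ":" + streamId);
        }
        if (values.size() != schema.size()) {
            throw new IllegalArgumentException(
                    "Tuple created with wrong number of fields. "
                    + "Expected " + schema.size() + " fields but got "
                    + values.size() + " fields");
        }
    }

    public static void main(String[] args) {
        TupleSchemaCheck ctx = new TupleSchemaCheck();
        ctx.addTask(3, "XxxBolt");
        ctx.declare("XxxBolt", "default", "word", "count");
        ctx.validate(Arrays.asList("storm", 1), 3, "default"); // passes silently
        try {
            ctx.validate(Arrays.asList("storm", 1), 3, null);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // fails at the schema lookup
        }
        try {
            ctx.validate(Arrays.asList("storm"), 3, "default");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // fails at the field-count check
        }
    }
}
```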
Clues
Like Spark, Storm uses a DAG engine. For the pros and cons of DAG engines, see "The advantages of DAG (directed acyclic graph) as a big-data execution engine".
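The DAG is a directed graph, and it is already fully built when createTopology is called. Specifically:
1. We usually build a topology with TopologyBuilder. Every setBolt call is followed by a grouping, and the grouping records the streamId on which the current bolt receives tuples from its upstream bolt: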
private BoltDeclarer grouping(String componentId, String streamId, Grouping grouping) {
    _commons.get(_boltId).put_to_inputs(new GlobalStreamId(
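Here is a minimal sketch of that bookkeeping using a simplified builder. MiniTopologyBuilder, Declarer and StreamRef are hypothetical names, StreamRef only loosely models Storm's GlobalStreamId, and only shuffle grouping is modeled. To my understanding, Storm's single-argument grouping methods fall back to the default stream id "default" in the same way, but treat that as an assumption to check against the Storm source.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified builder that only models how a grouping call
// stores the upstream (componentId, streamId) pair in the current bolt's inputs.
class MiniTopologyBuilder {
    // Assumed to mirror Storm's default stream id.
    static final String DEFAULT_STREAM_ID = "default";

    // Loosely modeled on Storm's GlobalStreamId: which upstream component, which stream.
    static final class StreamRef {
        final String componentId;
        final String streamId;

        StreamRef(String componentId, String streamId) {
            this.componentId = componentId;
            this.streamId = streamId;
        }

        @Override
        public String toString() {
            return componentId + ":" + streamId;
        }
    }

    // boltId -> the upstream streams that bolt subscribes to (the DAG's incoming edges).
    private final Map<String, List<StreamRef>> inputs = new HashMap<>();

    Declarer setBolt(String boltId) {
        inputs.put(boltId, new ArrayList<>());
        return new Declarer(boltId);
    }

    List<StreamRef> inputsOf(String boltId) {
        return inputs.get(boltId);
    }

    final class Declarer {
        private final String boltId;

        Declarer(String boltId) {
            this.boltId = boltId;
        }

        // No stream given: subscribe to the upstream component's default stream.
        Declarer shuffleGrouping(String componentId) {
            return shuffleGrouping(componentId, DEFAULT_STREAM_ID);
        }

        Declarer shuffleGrouping(String componentId, String streamId) {
            return grouping(componentId, streamId);
        }

        // The streamId is recorded at build time, alongside the bolt's other inputs.
        private Declarer grouping(String componentId, String streamId) {
            inputs.get(boltId).add(new StreamRef(componentId, streamId));
            return this;
        }
    }

    public static void main(String[] args) {
        MiniTopologyBuilder builder = new MiniTopologyBuilder();
        builder.setBolt("count")
               .shuffleGrouping("split")
               .shuffleGrouping("filter", "errors");
        System.out.println(builder.inputsOf("count")); // [split:default, filter:errors]
    }
}
```

The sketch only records subscriptions, that is, the edges of the DAG. It does not model emitting, serialization or task assignment.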