Deploying Storm build JAR



I developed a Java class which reads data from a Kafka queue and prints it out:

ZkHosts zkHosts = new ZkHosts("localhost:2181");
String topic_name = "test";
String consumer_group_id = "storm";
String zookeeper_root = "";
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic_name, zookeeper_root, consumer_group_id);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
/*kafkaConfig.forceFromStart=false; kafkaConfig.startOffsetTime =-2;*/
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);

TopologyBuilder builder = new TopologyBuilder();
//builder.setSpout("KafkaSpout", kafkaSpout, 1);
builder.setSpout("KafkaSpout", kafkaSpout);
builder.setBolt("PrinterBolt", new PrinterBolt()).globalGrouping("KafkaSpout");

Map<String, Object> conf = new HashMap<String, Object>();
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 20000);
conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30);

LocalCluster cluster = new LocalCluster();
try {
    cluster.submitTopology("KafkaConsumerTopology", conf, builder.createTopology());
    Thread.sleep(120000);
} catch (Exception e) {
    //throw new IllegalStateException("Couldn't initialize the topology", e);
    System.out.println(e.getMessage());
}

After coding, I run a Maven build to produce a JAR file and move the jar to an Amazon AWS cluster,

and then run a command like nohup java -cp uber-***-0.0.1-SNAPSHOT.jar com.***.&&&.kafka.App

But I am facing an error here. Could anyone tell me what mistake I am making in the deployment? I am thinking I have to do the following:

Do I need to deploy this jar file in the Storm config folder? I placed the jar in a separate folder on AWS (not in the Storm folder). How do I see the System.out output? Do I need to include any yml files in my project?

Please find the exception below:

29537 [Thread-14-KafkaSpout] ERROR backtype.storm.util - Async loop died!
java.lang.ExceptionInInitializerError: null
    at org.apache.log4j.Logger.getLogger(Logger.java:39) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at kafka.utils.Logging$class.logger(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at kafka.network.BlockingChannel.logger$lzycompute(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at kafka.network.BlockingChannel.logger(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at kafka.utils.Logging$class.debug(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at kafka.network.BlockingChannel.debug(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at kafka.network.BlockingChannel.connect(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at kafka.consumer.SimpleConsumer.connect(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at storm.kafka.PartitionManager.<init>(PartitionManager.java:83) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    at clojure.lang.AFn.run(AFn.java:24) [uber-iot-0.0.1-SNAPSHOT.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
    at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
    ... 22 common frames omitted
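The "Caused by" line names the underlying problem: both log4j-over-slf4j and slf4j-log4j12 ended up inside the uber jar. A common remedy is to exclude one of the bindings in the pom. This is only a sketch: it assumes the Kafka client dependency is what pulls in slf4j-log4j12 (storm-core ships log4j-over-slf4j), and the artifact name and version numbers are placeholders; run `mvn dependency:tree` to find the real source of the binding in your build.

```xml
<!-- Hypothetical exclusion: keep log4j-over-slf4j (from storm-core), drop the
     slf4j-log4j12 binding and log4j that the Kafka client may drag in. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```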

Accepted answer


@Matthias J. Sax and everyone, thanks for your help. The mistake I made here is that the deployment process I followed was wrong. To deploy the topology build, I have to follow the process below.

The jar has to be pushed into the Storm folder on AWS, and then the commands below have to be run to make it recognized by Storm:

rm -f *.out

(nohup bin/storm nimbus > nimubus.out)&

(nohup bin/storm supervisor > supervisor.out)&

(nohup bin/storm jar topos/IoT.jar com.bridgera.iot.test.App01 > IoT.out)&
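Each of the commands above follows the same pattern: run the daemon in a subshell, detach it from terminal hangups with nohup, and redirect its stdout into a .out log file that survives after logout. A minimal illustration of that pattern, using a harmless placeholder command in place of bin/storm:

```shell
# Same (nohup cmd > file)& pattern as above, with echo standing in for bin/storm.
(nohup echo "nimbus would start here" > demo.out) &
wait            # wait for the background subshell to finish
cat demo.out    # the redirected output is preserved in the log file
```

Checking these .out files (and Storm's own logs directory) is also how you see the System.out output the question asks about.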

Here I am telling Storm where it can find my jar and the main class from which it can find the topology builder.

Thanks, guys!
