Websphere MQ作为Apache Spark流的数据源

编程入门 行业动态 更新时间:2024-10-18 08:35:03
本文介绍了Websphere MQ作为Apache Spark流的数据源的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我正在研究将Websphere MQ用作火花流的数据源的可能性,因为在我们的一个用例中需要它. 我知道 MQTT 是支持MQ数据结构通信的协议,但是由于我是引发流传输的新手我同样需要一些工作示例. 是否有人尝试将MQ与Spark Streaming连接起来.请设计出最佳方法.

I was digging into the possibilities for Websphere MQ as a data source for spark-streaming becuase it is needed in one of our use case. I got to know that MQTT is the protocol that supports the communication from MQ data structures but since I am a newbie to spark streaming I need some working examples for the same. Did anyone try to connect the MQ with spark streaming. Please devise the best way for doing so.

推荐答案

所以,我在这里发布CustomMQReceiver的工作代码,该代码连接Websphere MQ并读取数据:

So, I am posting here the working code for CustomMQReceiver which connects the Websphere MQ and reads data :

public class CustomMQReciever extends Receiver<String> { String host = null; int port = -1; String qm=null; String qn=null; String channel=null; transient Gson gson=new Gson(); transient MQQueueConnection qCon= null; Enumeration enumeration =null; public CustomMQReciever(String host , int port, String qm, String channel, String qn) { super(StorageLevel.MEMORY_ONLY_2()); this.host = host; this.port = port; this.qm=qm; this.qn=qn; this.channel=channel; } public void onStart() { // Start the thread that receives data over a connection new Thread() { @Override public void run() { try { initConnection(); receive(); } catch (JMSException ex) { ex.printStackTrace(); } } }.start(); } public void onStop() { // There is nothing much to do as the thread calling receive() // is designed to stop by itself isStopped() returns false } /** Create a MQ connection and receive data until receiver is stopped */ private void receive() { System.out.print("Started receiving messages from MQ"); try { JMSMessage receivedMessage= null; while (!isStopped() && enumeration.hasMoreElements() ) { receivedMessage= (JMSMessage) enumeration.nextElement(); String userInput = convertStreamToString(receivedMessage); //System.out.println("Received data :'" + userInput + "'"); store(userInput); } // Restart in an attempt to connect again when server is active again //restart("Trying to connect again"); stop("No More Messages To read !"); qCon.close(); System.out.println("Queue Connection is Closed"); } catch(Exception e) { e.printStackTrace(); restart("Trying to connect again"); } catch(Throwable t) { // restart if there is any other error restart("Error receiving data", t); } } public void initConnection() throws JMSException { MQQueueConnectionFactory conFactory= new MQQueueConnectionFactory(); conFactory.setHostName(host); conFactory.setPort(port); conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP); conFactory.setQueueManager(qm); conFactory.setChannel(channel); qCon= (MQQueueConnection) conFactory.createQueueConnection(); MQQueueSession qSession=(MQQueueSession) qCon.createQueueSession(false, 1); MQQueue queue=(MQQueue) qSession.createQueue(qn); MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue); qCon.start(); enumeration= browser.getEnumeration(); } @Override public StorageLevel storageLevel() { return StorageLevel.MEMORY_ONLY_2(); } }

更多推荐

Websphere MQ作为Apache Spark流的数据源

本文发布于:2023-11-25 23:00:18,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1631687.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:数据源   MQ   Websphere   Spark   Apache

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!