I am looking into using WebSphere MQ as a data source for Spark Streaming, because one of our use cases needs it. I learned that MQTT is a protocol that supports communication with MQ, but since I am new to Spark Streaming I need some working examples. Has anyone connected MQ to Spark Streaming? Please suggest the best way to do this.
Recommended answer
Here is the working code for CustomMQReceiver, which connects to WebSphere MQ and reads data:
import java.util.Enumeration;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

import com.ibm.mq.jms.JMSC;
import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueBrowser;
import com.ibm.mq.jms.MQQueueConnection;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.mq.jms.MQQueueSession;

public class CustomMQReceiver extends Receiver<String> {

    String host = null;
    int port = -1;
    String qm = null;      // queue manager name
    String qn = null;      // queue name
    String channel = null;

    // Created on the worker in onStart(), so never serialized with the receiver
    transient MQQueueConnection qCon = null;
    transient Enumeration<?> enumeration = null;

    public CustomMQReceiver(String host, int port, String qm, String channel, String qn) {
        super(StorageLevel.MEMORY_ONLY_2());
        this.host = host;
        this.port = port;
        this.qm = qm;
        this.qn = qn;
        this.channel = channel;
    }

    @Override
    public void onStart() {
        // Start the thread that receives data over a connection
        new Thread() {
            @Override
            public void run() {
                try {
                    initConnection();
                    receive();
                } catch (JMSException ex) {
                    ex.printStackTrace();
                }
            }
        }.start();
    }

    @Override
    public void onStop() {
        // There is nothing much to do: the thread calling receive()
        // is designed to stop by itself once isStopped() returns true
    }

    /** Read messages from the MQ queue browser until the receiver is stopped */
    private void receive() {
        System.out.println("Started receiving messages from MQ");
        try {
            while (!isStopped() && enumeration.hasMoreElements()) {
                Message receivedMessage = (Message) enumeration.nextElement();
                store(convertStreamToString(receivedMessage));
            }
            // To keep trying instead of stopping, call restart("Trying to connect again") here
            stop("No more messages to read");
            qCon.close();
            System.out.println("Queue connection is closed");
        } catch (Exception e) {
            e.printStackTrace();
            restart("Trying to connect again");
        } catch (Throwable t) {
            // Restart if there is any other error
            restart("Error receiving data", t);
        }
    }

    // The original post does not show this helper. This version is an assumption:
    // it returns the body of text messages and falls back to toString() otherwise.
    private String convertStreamToString(Message message) throws JMSException {
        if (message instanceof TextMessage) {
            return ((TextMessage) message).getText();
        }
        return message.toString();
    }

    public void initConnection() throws JMSException {
        MQQueueConnectionFactory conFactory = new MQQueueConnectionFactory();
        conFactory.setHostName(host);
        conFactory.setPort(port);
        conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
        conFactory.setQueueManager(qm);
        conFactory.setChannel(channel);

        qCon = (MQQueueConnection) conFactory.createQueueConnection();
        MQQueueSession qSession =
                (MQQueueSession) qCon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        MQQueue queue = (MQQueue) qSession.createQueue(qn);
        MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
        qCon.start();
        enumeration = browser.getEnumeration();
    }

    @Override
    public StorageLevel storageLevel() {
        return StorageLevel.MEMORY_ONLY_2();
    }
}
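One thing worth keeping in mind about the receiver above: it reads through a JMS queue browser (qSession.createBrowser(queue)), and a browser only looks at messages, it does not take them off the queue. So if the receiver restarts, it will most likely see the same messages again. The plain-Java sketch below only models that difference. It uses an in-memory ArrayDeque as a stand-in for the MQ queue, and browse and consume are hypothetical helper names, not MQ or Spark API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory stand-in for an MQ queue; no MQ or Spark classes are involved.
class BrowseVsConsume {

    // Models a queue browser: visit every message, remove none.
    static List<String> browse(Deque<String> queue) {
        List<String> seen = new ArrayList<>();
        for (String message : queue) {
            seen.add(message);
        }
        return seen;
    }

    // Models a consumer: each message is removed as it is read.
    static List<String> consume(Deque<String> queue) {
        List<String> taken = new ArrayList<>();
        String message;
        while ((message = queue.poll()) != null) {
            taken.add(message);
        }
        return taken;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(List.of("m1", "m2", "m3"));

        // Browsing twice returns the same messages and leaves the queue untouched.
        System.out.println("first browse:  " + browse(queue));
        System.out.println("second browse: " + browse(queue));
        System.out.println("left on queue: " + queue.size());

        // Consuming drains the queue, so a second read would find nothing.
        System.out.println("consume:       " + consume(queue));
        System.out.println("left on queue: " + queue.size());
    }
}
```

If each message should be handed to Spark only once, reading with a JMS consumer created from the same session instead of a browser is the usual alternative. That swap is a suggestion on my side, not something the posted receiver does.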