我是jms的新手.目标是通过将侦听器实例附加到多个使用方,每个使用方使用其自己的会话并在单独的线程中运行,从而将消息传递到不同的使用方,从而在异步侦听器的onMessage方法中从队列中并发处理消息.并发处理.
I am new to jms. The goal is to process messages concurrently from a queue in an asynchronous listener's onMessage method by attaching a listener instance to multiple consumer's with each consumer using its own session and running in a separate thread, that way the messages are passed on to the different consumers for concurrent processing.
1)是否可以通过创建多个使用者来同时处理单个队列中的消息? 2)我想出了下面的代码,但是想让您想到下面的代码对于我想要完成的事情是否正确.
1) Is it possible to process messages concurrently from a single queue by creating multiple consumers ? 2) I came up with the below code, but would like to get your thoughts on whether the below code looks correct for what I want to accomplish.
public class QueueConsumer implements Runnable, MessageListener { public static void main(String[] args) { QueueConsumer consumer1 = new QueueConsumer(); QueueConsumer consumer2 = new QueueConsumer(); try { consumer1.init("oms", "US.Q.CHECKOUT-ORDER.1.0.JSON"); consumer2.init("oms","US.Q.CHECKOUT-ORDER.1.0.JSON"); } catch (JMSException ex) { ex.printStackTrace(); System.exit(-1); } Thread newThread1 = new Thread(consumer1); Thread newThread2 = new Thread(consumer1); newThread1.start(); newThread2.start(); } private static String connectionFactoryName = null; private static String queueName = null; private static ConnectionFactory qcf = null; private static Connection queueConnection = null; private Session ses = null; private Destination queue = null; private MessageConsumer msgConsumer = null; public static final Logger logger = LoggerFactory .getLogger(QueueConsumer.class); public QueueConsumer() { super(); } public void onMessage(Message msg) { if (msg instanceof TextMessage) { try { //process message } catch (JMSException ex) { ex.printStackTrace(); } } } public void run() { try { queueConnection.start(); } catch (JMSException e) { e.printStackTrace(); System.exit(-1); } while (!Thread.currentThread().isInterrupted()) { synchronized (this) { try { wait(); } catch (InterruptedException ex) { break; } } } } public void init(String factoryName, String queue2) throws JMSException { try { qcf = new JMSConnectionFactory(factoryName); queueConnection = qcf.createConnection(); ses = queueConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); queue = ses.createQueue(queue2); logger.info("Subscribing to destination: " + queue2); msgConsumer = ses.createConsumer(queue); msgConsumer.setMessageListener(this); System.out.println("Listening on queue " + queue2); } catch (Exception e) { e.printStackTrace(); System.exit(-1); } } private static void setConnectionFactoryName(String name) { connectionFactoryName = name; } private static String getQueueName() { return queueName; } private static void setQueueName(String name) { queueName = name; }}
推荐答案
我只看了一下,发现您将错误的使用者传递给第二个线程:
I only took a brief look and I noticed that you pass the wrong consumer to your second thread: Thread newThread2 = new Thread(consumer1); // has to pass consumer2
此外,某些变量(例如ConnectionFactory)是静态的,并且已多次初始化/覆盖.您只需要一个可以创建多个会话和/或使用者的连接.
beside of this, some variables such as ConnectionFactory are static and initialized multiple times/overriden. You only need one connection that could create multiple sessions and/or consumers.
更多推荐
单个队列:与多个使用者进行并发消息处理
发布评论