如果bean部署在同一个应用程序服务器中,那么更多bean的线程?(A thread for more beans if beans are deployed in the same applicat

编程入门 行业动态 更新时间:2024-10-12 12:29:15
如果bean部署在同一个应用程序服务器中,那么更多bean的线程?(A thread for more beans if beans are deployed in the same application server?)

我正在尝试使用Java EE实现(用于练习)主动复制算法。

我遇到了意想不到的问题。

我有这个功能(伪代码):

public boolean doOperation() in the frontEnd EJB{ sendJMSMessageToTopic(); //publish new command to the replica Thread.sleep(200); readFromQueue(); //read Ack from replica control if write quorum is reached commit or abort return state (success or not) }

问题是:

调用回调onMessage()但在doOperation的同一调用中未收到响应。 当我再次调用doOperation时,我读取了之前调用doOperation方法的响应。

我使用glassfish,我在同一个应用程序服务器中部署所有组件。 我使用一个主题将请求发布到副本,并使用队列从副本接收响应。 FrontEnd是Singleton,Replicas是Message Drive Bean。

更新:我

我的副本代码

/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package EJBData; import Service.ResponceMessage; import Service.Command; import Service.Product; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Resource; import javax.ejb.ActivationConfigProperty; import javax.ejb.MessageDriven; import javax.inject.Inject; import javax.jms.JMSConnectionFactory; import javax.jms.JMSContext; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Queue; /** * * @author melix */ @MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"), @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "TopicRepliche"), //@ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "durable"), @ActivationConfigProperty(propertyName = "clientId", propertyValue = "TopicRepliche"), @ActivationConfigProperty(propertyName = "subscriptionName", propertyValue = "TopicRepliche") }) public class Replica1 implements MessageListener { @Resource(mappedName = "QueueFrontEnd") private Queue queueFrontEnd; @Inject @JMSConnectionFactory("java:comp/DefaultJMSConnectionFactory") private JMSContext context; private final int R_CONTAINS_KEY=0; private final int W_REMOVE_KEY=1; private final int W_PUT=2; private final int R_GET=3; private final int R_UPDATE_PRICE=4; private final int R_GENERATE_JSON_FROM_MAP=5; private final int COD_REPLICA=0; private final ConcurrentHashMap<Integer,Product> item_map=new ConcurrentHashMap<>(); private final ConcurrentHashMap<Long,Command> pending_command=new ConcurrentHashMap<>(); private long last_cmd_executed=0; private long last_cmd_received=0; private synchronized void setLastCmdExecuted(long last_cmd_executed){ this.last_cmd_executed=last_cmd_executed; } private synchronized void setLastCmdReceived(long last_cmd_received){ this.last_cmd_received=last_cmd_received; } public boolean containsKey(int key){ return item_map.containsKey(key); } public boolean removeKey(int key){ item_map.remove(key); return true; } public boolean put(Product b){ item_map.put(Integer.parseInt(b.getId()),b); return true; } public Product get(int key){ return item_map.get(key); } public void updatePrice(){ for (Map.Entry pairs : item_map.entrySet()) { int key=(int)pairs.getKey(); Product p=(Product)pairs.getValue(); double price=p.getInitialPrice(); item_map.remove(key); System.out.println(price); if(price>=5) p.setInitialPrice(price-5); else p.setInitialPrice(0); item_map.put(key, p); } } public String generateJSONFromMap(){ String json="["; for (Map.Entry pairs : item_map.entrySet()) { Product p=(Product)pairs.getValue(); json+=p.toString(); json+=","; } if(json.endsWith(",")) json = json.substring(0,json.length() - 1); json+="]"; return json; } public Replica1() { } @Override public void onMessage(Message message) { Command c; if (message instanceof ObjectMessage) { try { Object object = ((ObjectMessage) message).getObject(); c=(Command)object; int command_type=c.getType(); Object res=null; switch(command_type){ case R_CONTAINS_KEY: res=containsKey((int)c.getInput()); break; case W_REMOVE_KEY: res=removeKey((int)c.getInput()); break; case W_PUT: res=put((Product)c.getInput()); break; case R_GET: res=get((int)c.getInput()); break; case R_UPDATE_PRICE: updatePrice(); break; case R_GENERATE_JSON_FROM_MAP: res=generateJSONFromMap(); break; } System.out.println("FROM REPLICA, ACK FOR"+c.getSqnCommand()); this.setLastCmdReceived(c.getSqnCommand()); sendAckToQueueFrontEnd(c.getSqnCommand(),res); } catch (JMSException ex) { Logger.getLogger(Replica1.class.getName()).log(Level.SEVERE, null, ex); } } } private void sendJMSMessageToQueueFrontEnd(String messageData) { context.createProducer().send(queueFrontEnd, messageData); } private void sendAckToQueueFrontEnd(long sqn_command,Object responce) { try { ResponceMessage ack=new ResponceMessage(true,false,sqn_command,COD_REPLICA); ack.setResponce(responce); ObjectMessage objectMessage=context.createObjectMessage(); objectMessage.setObject(ack); context.createProducer().send(queueFrontEnd,objectMessage); } catch (JMSException ex) { Logger.getLogger(Replica1.class.getName()).log(Level.SEVERE, null, ex); } } }

前端代码:(在我的代码中使用sendNewCommand()映射doOperation)

/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package EJBData; import EJBExecutor.ReaderLocal; import Service.Command; import Service.Product; import Service.ResponceMessage; import java.util.ArrayList; import java.util.LinkedList; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Resource; import javax.ejb.EJB; import javax.ejb.Stateless; import javax.ejb.Timeout; import javax.ejb.Timer; import javax.ejb.TimerConfig; import javax.ejb.TimerService; import javax.inject.Inject; import javax.jms.JMSConnectionFactory; import javax.jms.JMSContext; import javax.jms.JMSException; import javax.jms.ObjectMessage; import javax.jms.Topic; /** * * @author melix */ @Stateless public class AuctionFrontEnd implements AuctionFrontEndLocal{ @EJB private FrontEndConsumer frontEndConsumer; @EJB private ReaderLocal reader; @Resource(mappedName = "TopicRepliche") private Topic topicRepliche; @Inject @JMSConnectionFactory("java:comp/DefaultJMSConnectionFactory") private JMSContext context; @Resource TimerService service; private boolean isConsumerInit=false; private final int R_CONTAINS_KEY=0; private final int W_REMOVE_KEY=1; private final int W_PUT=2; private final int R_GET=3; private final int R_UPDATE_PRICE=4; private final int R_GENERATE_JSON_FROM_MAP=5; private final ConcurrentHashMap<Integer,Product> item_map=new ConcurrentHashMap<>(); private ArrayList<ConcurrentHashMap<Long,ResponceMessage>> listOfHashMap; private Queue<Command> pending_command = new LinkedList<Command>(); private final int WRITE_QUORUM=2; private final int READ_QUORUM=2; private final int NUM_REPLIC=3; private int last_key_inserted=0; private long num_command=0; public void initConsumer(){ frontEndConsumer.init(); } private synchronized void putCommandInQueue(Command c){ pending_command.add(c); } private synchronized Command removeCommandFromQueue(Command c){ if(!pending_command.isEmpty()) return pending_command.remove(); return null; } private synchronized void addAck(int cod_replica,long num_command,ResponceMessage m){ listOfHashMap.get(cod_replica).put(num_command,m); } private synchronized void addResponce(int cod_replica,long num_command,ResponceMessage m){ listOfHashMap.get(cod_replica+3).put(num_command,m); } @Override public synchronized void addAckToList(int cod_replica,long num_command,ResponceMessage m){ addAck(cod_replica,num_command,m); } @Override public synchronized void addResponceToList(int cod_replica,long num_command,ResponceMessage m){ addAck(cod_replica,num_command,m); } private synchronized long addNumCommand(){ this.num_command++; return num_command; } @Override public void mexReceived(String message){ System.out.println(message); } @Timeout public void handleTimeout(Timer timer) { if(!pending_command.isEmpty()){ Command c=pending_command.poll(); for(int i=0;i<NUM_REPLIC*2;i++){ if(listOfHashMap.get(i).containsKey(c.getSqnCommand())){ ResponceMessage m=listOfHashMap.get(i).get(c.getSqnCommand()); System.out.println("Ack per comando:"+m.getSqnCommand()+"from replica"+m.getCode_replica()); } } } timer.cancel(); } public void startTimer() { TimerConfig config = new TimerConfig(); config.setPersistent(false); Timer timer = service.createSingleActionTimer(200, config); } private ResponceMessage[] sendNewCommand(boolean isWriteOperation,int type,Object input){ int num_ack=0,i=0; Command c=new Command(isWriteOperation,addNumCommand(),type,input); ObjectMessage objectMessage=context.createObjectMessage(); try { objectMessage.setObject(c); } catch (JMSException ex) { Logger.getLogger(AuctionFrontEnd.class.getName()).log(Level.SEVERE, null, ex); } if(!isConsumerInit){ this.initConsumer(); isConsumerInit=true; } frontEndConsumer.cleanQueue(); sendJMSMessageToTopicRepliche(objectMessage); ResponceMessage[] m=new ResponceMessage[NUM_REPLIC]; ResponceMessage tmp = null; do{ tmp=frontEndConsumer.consume(); if(tmp!=null){ System.out.println("ACK CORRECT"+tmp.getSqnCommand()+";"+c.getSqnCommand()); if(tmp.getSqnCommand()==c.getSqnCommand()){ m[num_ack]=tmp; num_ack++; } } }while(tmp!=null); System.out.println("sono alla fine!"); if(isWriteOperation&&num_ack>=WRITE_QUORUM) return m; if(!isWriteOperation&&num_ack>=READ_QUORUM) return m; return null; } @Override public boolean containsKey(int key){ /*ResponceMessage[] m; m=sendNewCommand(true,R_CONTAINS_KEY,key); if(m!=null) return (boolean)m[0].getResponce(); else return false;*/ return item_map.containsKey(key); } @Override public boolean removeKey(int key){ //ResponceMessage[] m; //m=sendNewCommand(true,W_REMOVE_KEY,key); item_map.remove(key); return true; /*if(m!=null) return (boolean)m[0].getResponce(); else return false;*/ } @Override public boolean put(Product b){ ResponceMessage[] m; m=sendNewCommand(true,W_PUT,b); item_map.put(Integer.parseInt(b.getId()),b); last_key_inserted=Integer.parseInt(b.getId()); if(m!=null){ //last_key_inserted=Integer.parseInt(b.getId()); //return (boolean)m[0].getResponce(); if((boolean)m[0].getResponce()) System.out.println("V_TRUE"); else System.out.println("FALSE"); }//else // return false; return true; } @Override public Product get(int key){ //ResponceMessage[] m; //m=sendNewCommand(true,R_GET,key); return item_map.get(key); /*if(m!=null) return (Product)m[0].getResponce(); else return null;*/ } @Override public int getLastKeyInserted(){ return last_key_inserted; } @Override public void updatePrice(){ //ResponceMessage[] m; //m=sendNewCommand(true,R_UPDATE_PRICE,null); for (Map.Entry pairs : item_map.entrySet()) { int key=(int)pairs.getKey(); Product p=(Product)pairs.getValue(); double price=p.getInitialPrice(); item_map.remove(key); if(price>=5) p.setInitialPrice(price-5); else p.setInitialPrice(0); item_map.put(key, p); } } @Override public String generateJSONFromMap(){ //ResponceMessage[] m; //m=sendNewCommand(true,R_GENERATE_JSON_FROM_MAP,null); String json="["; for (Map.Entry pairs : item_map.entrySet()) { Product p=(Product)pairs.getValue(); json+=p.toString(); json+=","; } if(json.endsWith(",")) json = json.substring(0,json.length() - 1); json+="]"; return json; /* if(m!=null) return (String)m[0].getResponce(); else return null;*/ } private void sendJMSMessageToTopicRepliche(String messageData) { context.createProducer().send(topicRepliche, messageData); } private void sendJMSMessageToTopicRepliche(ObjectMessage messageData) { context.createProducer().send(topicRepliche, messageData); } public AuctionFrontEnd(){ final ConcurrentHashMap<Long,ResponceMessage> responce_list=new ConcurrentHashMap<>(); this.listOfHashMap = new ArrayList<>(); for(int i=0;i<2*NUM_REPLIC;i++){ this.listOfHashMap.add(new ConcurrentHashMap<Long,ResponceMessage>()); } } }

frontEndConsumer类:

/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package EJBData; import Service.Command; import Service.ResponceMessage; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Resource; import javax.ejb.Stateless; import javax.jms.ConnectionFactory; import javax.jms.JMSConsumer; import javax.jms.JMSContext; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Queue; /** * * @author melix */ @Stateless public class FrontEndConsumer { @Resource(lookup = "jms/__defaultConnectionFactory") private ConnectionFactory connectionFactory; @Resource(lookup = "QueueFrontEnd") private Queue queueFrontEnd; private JMSContext context; private JMSConsumer consumer; public void init(){ context = connectionFactory.createContext(); consumer = context.createConsumer(queueFrontEnd); } public void cleanQueue(){ while(consumer.receive(1000)!=null){ System.out.println("CANCELLO"); } } public ResponceMessage consume(){ Message m = consumer.receive(1000); if (m != null) { if(m instanceof ObjectMessage){ try { Object object = ((ObjectMessage)m).getObject(); ResponceMessage mex=(ResponceMessage)object; System.out.println("RICEVO"); return (ResponceMessage)mex; } catch (JMSException ex) { Logger.getLogger(FrontEndConsumer.class.getName()).log(Level.SEVERE, null, ex); } } } return null; } }

I am trying to implement ( for exercise ) , with Java EE , the algorithm of active replication.

I encountered an unexpected problem.

I have this function (pseudo-code):

public boolean doOperation() in the frontEnd EJB{ sendJMSMessageToTopic(); //publish new command to the replica Thread.sleep(200); readFromQueue(); //read Ack from replica control if write quorum is reached commit or abort return state (success or not) }

The problem is:

The callback onMessage() is called but the response is not received in the same invocation of the doOperation. When I call doOperation again I read the response of previous invocation of doOperation method.

I use glassfish, and i deploy all component in the same application server. I use a topic for publish the request to the replicas and a queue to receive the responce from the replicas. FrontEnd is Singleton and Replicas are Message Drive Bean.

UPDATE: I

My Replica Code

/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package EJBData; import Service.ResponceMessage; import Service.Command; import Service.Product; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Resource; import javax.ejb.ActivationConfigProperty; import javax.ejb.MessageDriven; import javax.inject.Inject; import javax.jms.JMSConnectionFactory; import javax.jms.JMSContext; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Queue; /** * * @author melix */ @MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"), @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "TopicRepliche"), //@ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "durable"), @ActivationConfigProperty(propertyName = "clientId", propertyValue = "TopicRepliche"), @ActivationConfigProperty(propertyName = "subscriptionName", propertyValue = "TopicRepliche") }) public class Replica1 implements MessageListener { @Resource(mappedName = "QueueFrontEnd") private Queue queueFrontEnd; @Inject @JMSConnectionFactory("java:comp/DefaultJMSConnectionFactory") private JMSContext context; private final int R_CONTAINS_KEY=0; private final int W_REMOVE_KEY=1; private final int W_PUT=2; private final int R_GET=3; private final int R_UPDATE_PRICE=4; private final int R_GENERATE_JSON_FROM_MAP=5; private final int COD_REPLICA=0; private final ConcurrentHashMap<Integer,Product> item_map=new ConcurrentHashMap<>(); private final ConcurrentHashMap<Long,Command> pending_command=new ConcurrentHashMap<>(); private long last_cmd_executed=0; private long last_cmd_received=0; private synchronized void setLastCmdExecuted(long last_cmd_executed){ this.last_cmd_executed=last_cmd_executed; } private synchronized void setLastCmdReceived(long last_cmd_received){ this.last_cmd_received=last_cmd_received; } public boolean containsKey(int key){ return item_map.containsKey(key); } public boolean removeKey(int key){ item_map.remove(key); return true; } public boolean put(Product b){ item_map.put(Integer.parseInt(b.getId()),b); return true; } public Product get(int key){ return item_map.get(key); } public void updatePrice(){ for (Map.Entry pairs : item_map.entrySet()) { int key=(int)pairs.getKey(); Product p=(Product)pairs.getValue(); double price=p.getInitialPrice(); item_map.remove(key); System.out.println(price); if(price>=5) p.setInitialPrice(price-5); else p.setInitialPrice(0); item_map.put(key, p); } } public String generateJSONFromMap(){ String json="["; for (Map.Entry pairs : item_map.entrySet()) { Product p=(Product)pairs.getValue(); json+=p.toString(); json+=","; } if(json.endsWith(",")) json = json.substring(0,json.length() - 1); json+="]"; return json; } public Replica1() { } @Override public void onMessage(Message message) { Command c; if (message instanceof ObjectMessage) { try { Object object = ((ObjectMessage) message).getObject(); c=(Command)object; int command_type=c.getType(); Object res=null; switch(command_type){ case R_CONTAINS_KEY: res=containsKey((int)c.getInput()); break; case W_REMOVE_KEY: res=removeKey((int)c.getInput()); break; case W_PUT: res=put((Product)c.getInput()); break; case R_GET: res=get((int)c.getInput()); break; case R_UPDATE_PRICE: updatePrice(); break; case R_GENERATE_JSON_FROM_MAP: res=generateJSONFromMap(); break; } System.out.println("FROM REPLICA, ACK FOR"+c.getSqnCommand()); this.setLastCmdReceived(c.getSqnCommand()); sendAckToQueueFrontEnd(c.getSqnCommand(),res); } catch (JMSException ex) { Logger.getLogger(Replica1.class.getName()).log(Level.SEVERE, null, ex); } } } private void sendJMSMessageToQueueFrontEnd(String messageData) { context.createProducer().send(queueFrontEnd, messageData); } private void sendAckToQueueFrontEnd(long sqn_command,Object responce) { try { ResponceMessage ack=new ResponceMessage(true,false,sqn_command,COD_REPLICA); ack.setResponce(responce); ObjectMessage objectMessage=context.createObjectMessage(); objectMessage.setObject(ack); context.createProducer().send(queueFrontEnd,objectMessage); } catch (JMSException ex) { Logger.getLogger(Replica1.class.getName()).log(Level.SEVERE, null, ex); } } }

The front end code: (doOperation is mapped with sendNewCommand() in my code)

/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package EJBData; import EJBExecutor.ReaderLocal; import Service.Command; import Service.Product; import Service.ResponceMessage; import java.util.ArrayList; import java.util.LinkedList; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Resource; import javax.ejb.EJB; import javax.ejb.Stateless; import javax.ejb.Timeout; import javax.ejb.Timer; import javax.ejb.TimerConfig; import javax.ejb.TimerService; import javax.inject.Inject; import javax.jms.JMSConnectionFactory; import javax.jms.JMSContext; import javax.jms.JMSException; import javax.jms.ObjectMessage; import javax.jms.Topic; /** * * @author melix */ @Stateless public class AuctionFrontEnd implements AuctionFrontEndLocal{ @EJB private FrontEndConsumer frontEndConsumer; @EJB private ReaderLocal reader; @Resource(mappedName = "TopicRepliche") private Topic topicRepliche; @Inject @JMSConnectionFactory("java:comp/DefaultJMSConnectionFactory") private JMSContext context; @Resource TimerService service; private boolean isConsumerInit=false; private final int R_CONTAINS_KEY=0; private final int W_REMOVE_KEY=1; private final int W_PUT=2; private final int R_GET=3; private final int R_UPDATE_PRICE=4; private final int R_GENERATE_JSON_FROM_MAP=5; private final ConcurrentHashMap<Integer,Product> item_map=new ConcurrentHashMap<>(); private ArrayList<ConcurrentHashMap<Long,ResponceMessage>> listOfHashMap; private Queue<Command> pending_command = new LinkedList<Command>(); private final int WRITE_QUORUM=2; private final int READ_QUORUM=2; private final int NUM_REPLIC=3; private int last_key_inserted=0; private long num_command=0; public void initConsumer(){ frontEndConsumer.init(); } private synchronized void putCommandInQueue(Command c){ pending_command.add(c); } private synchronized Command removeCommandFromQueue(Command c){ if(!pending_command.isEmpty()) return pending_command.remove(); return null; } private synchronized void addAck(int cod_replica,long num_command,ResponceMessage m){ listOfHashMap.get(cod_replica).put(num_command,m); } private synchronized void addResponce(int cod_replica,long num_command,ResponceMessage m){ listOfHashMap.get(cod_replica+3).put(num_command,m); } @Override public synchronized void addAckToList(int cod_replica,long num_command,ResponceMessage m){ addAck(cod_replica,num_command,m); } @Override public synchronized void addResponceToList(int cod_replica,long num_command,ResponceMessage m){ addAck(cod_replica,num_command,m); } private synchronized long addNumCommand(){ this.num_command++; return num_command; } @Override public void mexReceived(String message){ System.out.println(message); } @Timeout public void handleTimeout(Timer timer) { if(!pending_command.isEmpty()){ Command c=pending_command.poll(); for(int i=0;i<NUM_REPLIC*2;i++){ if(listOfHashMap.get(i).containsKey(c.getSqnCommand())){ ResponceMessage m=listOfHashMap.get(i).get(c.getSqnCommand()); System.out.println("Ack per comando:"+m.getSqnCommand()+"from replica"+m.getCode_replica()); } } } timer.cancel(); } public void startTimer() { TimerConfig config = new TimerConfig(); config.setPersistent(false); Timer timer = service.createSingleActionTimer(200, config); } private ResponceMessage[] sendNewCommand(boolean isWriteOperation,int type,Object input){ int num_ack=0,i=0; Command c=new Command(isWriteOperation,addNumCommand(),type,input); ObjectMessage objectMessage=context.createObjectMessage(); try { objectMessage.setObject(c); } catch (JMSException ex) { Logger.getLogger(AuctionFrontEnd.class.getName()).log(Level.SEVERE, null, ex); } if(!isConsumerInit){ this.initConsumer(); isConsumerInit=true; } frontEndConsumer.cleanQueue(); sendJMSMessageToTopicRepliche(objectMessage); ResponceMessage[] m=new ResponceMessage[NUM_REPLIC]; ResponceMessage tmp = null; do{ tmp=frontEndConsumer.consume(); if(tmp!=null){ System.out.println("ACK CORRECT"+tmp.getSqnCommand()+";"+c.getSqnCommand()); if(tmp.getSqnCommand()==c.getSqnCommand()){ m[num_ack]=tmp; num_ack++; } } }while(tmp!=null); System.out.println("sono alla fine!"); if(isWriteOperation&&num_ack>=WRITE_QUORUM) return m; if(!isWriteOperation&&num_ack>=READ_QUORUM) return m; return null; } @Override public boolean containsKey(int key){ /*ResponceMessage[] m; m=sendNewCommand(true,R_CONTAINS_KEY,key); if(m!=null) return (boolean)m[0].getResponce(); else return false;*/ return item_map.containsKey(key); } @Override public boolean removeKey(int key){ //ResponceMessage[] m; //m=sendNewCommand(true,W_REMOVE_KEY,key); item_map.remove(key); return true; /*if(m!=null) return (boolean)m[0].getResponce(); else return false;*/ } @Override public boolean put(Product b){ ResponceMessage[] m; m=sendNewCommand(true,W_PUT,b); item_map.put(Integer.parseInt(b.getId()),b); last_key_inserted=Integer.parseInt(b.getId()); if(m!=null){ //last_key_inserted=Integer.parseInt(b.getId()); //return (boolean)m[0].getResponce(); if((boolean)m[0].getResponce()) System.out.println("V_TRUE"); else System.out.println("FALSE"); }//else // return false; return true; } @Override public Product get(int key){ //ResponceMessage[] m; //m=sendNewCommand(true,R_GET,key); return item_map.get(key); /*if(m!=null) return (Product)m[0].getResponce(); else return null;*/ } @Override public int getLastKeyInserted(){ return last_key_inserted; } @Override public void updatePrice(){ //ResponceMessage[] m; //m=sendNewCommand(true,R_UPDATE_PRICE,null); for (Map.Entry pairs : item_map.entrySet()) { int key=(int)pairs.getKey(); Product p=(Product)pairs.getValue(); double price=p.getInitialPrice(); item_map.remove(key); if(price>=5) p.setInitialPrice(price-5); else p.setInitialPrice(0); item_map.put(key, p); } } @Override public String generateJSONFromMap(){ //ResponceMessage[] m; //m=sendNewCommand(true,R_GENERATE_JSON_FROM_MAP,null); String json="["; for (Map.Entry pairs : item_map.entrySet()) { Product p=(Product)pairs.getValue(); json+=p.toString(); json+=","; } if(json.endsWith(",")) json = json.substring(0,json.length() - 1); json+="]"; return json; /* if(m!=null) return (String)m[0].getResponce(); else return null;*/ } private void sendJMSMessageToTopicRepliche(String messageData) { context.createProducer().send(topicRepliche, messageData); } private void sendJMSMessageToTopicRepliche(ObjectMessage messageData) { context.createProducer().send(topicRepliche, messageData); } public AuctionFrontEnd(){ final ConcurrentHashMap<Long,ResponceMessage> responce_list=new ConcurrentHashMap<>(); this.listOfHashMap = new ArrayList<>(); for(int i=0;i<2*NUM_REPLIC;i++){ this.listOfHashMap.add(new ConcurrentHashMap<Long,ResponceMessage>()); } } }

The frontEndConsumer class:

/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package EJBData; import Service.Command; import Service.ResponceMessage; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Resource; import javax.ejb.Stateless; import javax.jms.ConnectionFactory; import javax.jms.JMSConsumer; import javax.jms.JMSContext; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Queue; /** * * @author melix */ @Stateless public class FrontEndConsumer { @Resource(lookup = "jms/__defaultConnectionFactory") private ConnectionFactory connectionFactory; @Resource(lookup = "QueueFrontEnd") private Queue queueFrontEnd; private JMSContext context; private JMSConsumer consumer; public void init(){ context = connectionFactory.createContext(); consumer = context.createConsumer(queueFrontEnd); } public void cleanQueue(){ while(consumer.receive(1000)!=null){ System.out.println("CANCELLO"); } } public ResponceMessage consume(){ Message m = consumer.receive(1000); if (m != null) { if(m instanceof ObjectMessage){ try { Object object = ((ObjectMessage)m).getObject(); ResponceMessage mex=(ResponceMessage)object; System.out.println("RICEVO"); return (ResponceMessage)mex; } catch (JMSException ex) { Logger.getLogger(FrontEndConsumer.class.getName()).log(Level.SEVERE, null, ex); } } } return null; } }

最满意答案

onMessage()方法不需要等待doOperation()完​​成。 JMS API启用异步通信,并在JMS提供程序的线程上调用onMessage()。

The onMessage() method shouldn't need to wait for doOperation() to finish. The JMS API enables asynchronous communication and onMessage() is called on the JMS provider's thread.

更多推荐

本文发布于:2023-07-30 14:27:00,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1338851.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:线程   应用程序   器中   更多   在同一个

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!