我正在尝试使用Java EE实现(用于练习)主动复制算法。
我遇到了意想不到的问题。
我有这个功能(伪代码):
public boolean doOperation() in the frontEnd EJB{ sendJMSMessageToTopic(); //publish new command to the replica Thread.sleep(200); readFromQueue(); //read Ack from replica control if write quorum is reached commit or abort return state (success or not) }问题是:
调用回调onMessage()但在doOperation的同一调用中未收到响应。 当我再次调用doOperation时,我读取了之前调用doOperation方法的响应。
我使用glassfish,我在同一个应用程序服务器中部署所有组件。 我使用一个主题将请求发布到副本,并使用队列从副本接收响应。 FrontEnd是Singleton,Replicas是Message Drive Bean。
更新:我
我的副本代码
/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package EJBData; import Service.ResponceMessage; import Service.Command; import Service.Product; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Resource; import javax.ejb.ActivationConfigProperty; import javax.ejb.MessageDriven; import javax.inject.Inject; import javax.jms.JMSConnectionFactory; import javax.jms.JMSContext; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Queue; /** * * @author melix */ @MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"), @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "TopicRepliche"), //@ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "durable"), @ActivationConfigProperty(propertyName = "clientId", propertyValue = "TopicRepliche"), @ActivationConfigProperty(propertyName = "subscriptionName", propertyValue = "TopicRepliche") }) public class Replica1 implements MessageListener { @Resource(mappedName = "QueueFrontEnd") private Queue queueFrontEnd; @Inject @JMSConnectionFactory("java:comp/DefaultJMSConnectionFactory") private JMSContext context; private final int R_CONTAINS_KEY=0; private final int W_REMOVE_KEY=1; private final int W_PUT=2; private final int R_GET=3; private final int R_UPDATE_PRICE=4; private final int R_GENERATE_JSON_FROM_MAP=5; private final int COD_REPLICA=0; private final ConcurrentHashMap<Integer,Product> item_map=new ConcurrentHashMap<>(); private final ConcurrentHashMap<Long,Command> pending_command=new ConcurrentHashMap<>(); private long last_cmd_executed=0; private long last_cmd_received=0; private synchronized void setLastCmdExecuted(long last_cmd_executed){ this.last_cmd_executed=last_cmd_executed; } private synchronized void setLastCmdReceived(long last_cmd_received){ this.last_cmd_received=last_cmd_received; } public boolean containsKey(int key){ return item_map.containsKey(key); } public boolean removeKey(int key){ item_map.remove(key); return true; } public boolean put(Product b){ item_map.put(Integer.parseInt(b.getId()),b); return true; } public Product get(int key){ return item_map.get(key); } public void updatePrice(){ for (Map.Entry pairs : item_map.entrySet()) { int key=(int)pairs.getKey(); Product p=(Product)pairs.getValue(); double price=p.getInitialPrice(); item_map.remove(key); System.out.println(price); if(price>=5) p.setInitialPrice(price-5); else p.setInitialPrice(0); item_map.put(key, p); } } public String generateJSONFromMap(){ String json="["; for (Map.Entry pairs : item_map.entrySet()) { Product p=(Product)pairs.getValue(); json+=p.toString(); json+=","; } if(json.endsWith(",")) json = json.substring(0,json.length() - 1); json+="]"; return json; } public Replica1() { } @Override public void onMessage(Message message) { Command c; if (message instanceof ObjectMessage) { try { Object object = ((ObjectMessage) message).getObject(); c=(Command)object; int command_type=c.getType(); Object res=null; switch(command_type){ case R_CONTAINS_KEY: res=containsKey((int)c.getInput()); break; case W_REMOVE_KEY: res=removeKey((int)c.getInput()); break; case W_PUT: res=put((Product)c.getInput()); break; case R_GET: res=get((int)c.getInput()); break; case R_UPDATE_PRICE: updatePrice(); break; case R_GENERATE_JSON_FROM_MAP: res=generateJSONFromMap(); break; } System.out.println("FROM REPLICA, ACK FOR"+c.getSqnCommand()); this.setLastCmdReceived(c.getSqnCommand()); sendAckToQueueFrontEnd(c.getSqnCommand(),res); } catch (JMSException ex) { Logger.getLogger(Replica1.class.getName()).log(Level.SEVERE, null, ex); } } } private void sendJMSMessageToQueueFrontEnd(String messageData) { context.createProducer().send(queueFrontEnd, messageData); } private void sendAckToQueueFrontEnd(long sqn_command,Object responce) { try { ResponceMessage ack=new ResponceMessage(true,false,sqn_command,COD_REPLICA); ack.setResponce(responce); ObjectMessage objectMessage=context.createObjectMessage(); objectMessage.setObject(ack); context.createProducer().send(queueFrontEnd,objectMessage); } catch (JMSException ex) { Logger.getLogger(Replica1.class.getName()).log(Level.SEVERE, null, ex); } } }前端代码:(在我的代码中使用sendNewCommand()映射doOperation)
/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package EJBData; import EJBExecutor.ReaderLocal; import Service.Command; import Service.Product; import Service.ResponceMessage; import java.util.ArrayList; import java.util.LinkedList; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Resource; import javax.ejb.EJB; import javax.ejb.Stateless; import javax.ejb.Timeout; import javax.ejb.Timer; import javax.ejb.TimerConfig; import javax.ejb.TimerService; import javax.inject.Inject; import javax.jms.JMSConnectionFactory; import javax.jms.JMSContext; import javax.jms.JMSException; import javax.jms.ObjectMessage; import javax.jms.Topic; /** * * @author melix */ @Stateless public class AuctionFrontEnd implements AuctionFrontEndLocal{ @EJB private FrontEndConsumer frontEndConsumer; @EJB private ReaderLocal reader; @Resource(mappedName = "TopicRepliche") private Topic topicRepliche; @Inject @JMSConnectionFactory("java:comp/DefaultJMSConnectionFactory") private JMSContext context; @Resource TimerService service; private boolean isConsumerInit=false; private final int R_CONTAINS_KEY=0; private final int W_REMOVE_KEY=1; private final int W_PUT=2; private final int R_GET=3; private final int R_UPDATE_PRICE=4; private final int R_GENERATE_JSON_FROM_MAP=5; private final ConcurrentHashMap<Integer,Product> item_map=new ConcurrentHashMap<>(); private ArrayList<ConcurrentHashMap<Long,ResponceMessage>> listOfHashMap; private Queue<Command> pending_command = new LinkedList<Command>(); private final int WRITE_QUORUM=2; private final int READ_QUORUM=2; private final int NUM_REPLIC=3; private int last_key_inserted=0; private long num_command=0; public void initConsumer(){ frontEndConsumer.init(); } private synchronized void putCommandInQueue(Command c){ pending_command.add(c); } private synchronized Command removeCommandFromQueue(Command c){ if(!pending_command.isEmpty()) return pending_command.remove(); return null; } private synchronized void addAck(int cod_replica,long num_command,ResponceMessage m){ listOfHashMap.get(cod_replica).put(num_command,m); } private synchronized void addResponce(int cod_replica,long num_command,ResponceMessage m){ listOfHashMap.get(cod_replica+3).put(num_command,m); } @Override public synchronized void addAckToList(int cod_replica,long num_command,ResponceMessage m){ addAck(cod_replica,num_command,m); } @Override public synchronized void addResponceToList(int cod_replica,long num_command,ResponceMessage m){ addAck(cod_replica,num_command,m); } private synchronized long addNumCommand(){ this.num_command++; return num_command; } @Override public void mexReceived(String message){ System.out.println(message); } @Timeout public void handleTimeout(Timer timer) { if(!pending_command.isEmpty()){ Command c=pending_command.poll(); for(int i=0;i<NUM_REPLIC*2;i++){ if(listOfHashMap.get(i).containsKey(c.getSqnCommand())){ ResponceMessage m=listOfHashMap.get(i).get(c.getSqnCommand()); System.out.println("Ack per comando:"+m.getSqnCommand()+"from replica"+m.getCode_replica()); } } } timer.cancel(); } public void startTimer() { TimerConfig config = new TimerConfig(); config.setPersistent(false); Timer timer = service.createSingleActionTimer(200, config); } private ResponceMessage[] sendNewCommand(boolean isWriteOperation,int type,Object input){ int num_ack=0,i=0; Command c=new Command(isWriteOperation,addNumCommand(),type,input); ObjectMessage objectMessage=context.createObjectMessage(); try { objectMessage.setObject(c); } catch (JMSException ex) { Logger.getLogger(AuctionFrontEnd.class.getName()).log(Level.SEVERE, null, ex); } if(!isConsumerInit){ this.initConsumer(); isConsumerInit=true; } frontEndConsumer.cleanQueue(); sendJMSMessageToTopicRepliche(objectMessage); ResponceMessage[] m=new ResponceMessage[NUM_REPLIC]; ResponceMessage tmp = null; do{ tmp=frontEndConsumer.consume(); if(tmp!=null){ System.out.println("ACK CORRECT"+tmp.getSqnCommand()+";"+c.getSqnCommand()); if(tmp.getSqnCommand()==c.getSqnCommand()){ m[num_ack]=tmp; num_ack++; } } }while(tmp!=null); System.out.println("sono alla fine!"); if(isWriteOperation&&num_ack>=WRITE_QUORUM) return m; if(!isWriteOperation&&num_ack>=READ_QUORUM) return m; return null; } @Override public boolean containsKey(int key){ /*ResponceMessage[] m; m=sendNewCommand(true,R_CONTAINS_KEY,key); if(m!=null) return (boolean)m[0].getResponce(); else return false;*/ return item_map.containsKey(key); } @Override public boolean removeKey(int key){ //ResponceMessage[] m; //m=sendNewCommand(true,W_REMOVE_KEY,key); item_map.remove(key); return true; /*if(m!=null) return (boolean)m[0].getResponce(); else return false;*/ } @Override public boolean put(Product b){ ResponceMessage[] m; m=sendNewCommand(true,W_PUT,b); item_map.put(Integer.parseInt(b.getId()),b); last_key_inserted=Integer.parseInt(b.getId()); if(m!=null){ //last_key_inserted=Integer.parseInt(b.getId()); //return (boolean)m[0].getResponce(); if((boolean)m[0].getResponce()) System.out.println("V_TRUE"); else System.out.println("FALSE"); }//else // return false; return true; } @Override public Product get(int key){ //ResponceMessage[] m; //m=sendNewCommand(true,R_GET,key); return item_map.get(key); /*if(m!=null) return (Product)m[0].getResponce(); else return null;*/ } @Override public int getLastKeyInserted(){ return last_key_inserted; } @Override public void updatePrice(){ //ResponceMessage[] m; //m=sendNewCommand(true,R_UPDATE_PRICE,null); for (Map.Entry pairs : item_map.entrySet()) { int key=(int)pairs.getKey(); Product p=(Product)pairs.getValue(); double price=p.getInitialPrice(); item_map.remove(key); if(price>=5) p.setInitialPrice(price-5); else p.setInitialPrice(0); item_map.put(key, p); } } @Override public String generateJSONFromMap(){ //ResponceMessage[] m; //m=sendNewCommand(true,R_GENERATE_JSON_FROM_MAP,null); String json="["; for (Map.Entry pairs : item_map.entrySet()) { Product p=(Product)pairs.getValue(); json+=p.toString(); json+=","; } if(json.endsWith(",")) json = json.substring(0,json.length() - 1); json+="]"; return json; /* if(m!=null) return (String)m[0].getResponce(); else return null;*/ } private void sendJMSMessageToTopicRepliche(String messageData) { context.createProducer().send(topicRepliche, messageData); } private void sendJMSMessageToTopicRepliche(ObjectMessage messageData) { context.createProducer().send(topicRepliche, messageData); } public AuctionFrontEnd(){ final ConcurrentHashMap<Long,ResponceMessage> responce_list=new ConcurrentHashMap<>(); this.listOfHashMap = new ArrayList<>(); for(int i=0;i<2*NUM_REPLIC;i++){ this.listOfHashMap.add(new ConcurrentHashMap<Long,ResponceMessage>()); } } }frontEndConsumer类:
/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package EJBData; import Service.Command; import Service.ResponceMessage; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Resource; import javax.ejb.Stateless; import javax.jms.ConnectionFactory; import javax.jms.JMSConsumer; import javax.jms.JMSContext; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Queue; /** * * @author melix */ @Stateless public class FrontEndConsumer { @Resource(lookup = "jms/__defaultConnectionFactory") private ConnectionFactory connectionFactory; @Resource(lookup = "QueueFrontEnd") private Queue queueFrontEnd; private JMSContext context; private JMSConsumer consumer; public void init(){ context = connectionFactory.createContext(); consumer = context.createConsumer(queueFrontEnd); } public void cleanQueue(){ while(consumer.receive(1000)!=null){ System.out.println("CANCELLO"); } } public ResponceMessage consume(){ Message m = consumer.receive(1000); if (m != null) { if(m instanceof ObjectMessage){ try { Object object = ((ObjectMessage)m).getObject(); ResponceMessage mex=(ResponceMessage)object; System.out.println("RICEVO"); return (ResponceMessage)mex; } catch (JMSException ex) { Logger.getLogger(FrontEndConsumer.class.getName()).log(Level.SEVERE, null, ex); } } } return null; } }I am trying to implement ( for exercise ) , with Java EE , the algorithm of active replication.
I encountered an unexpected problem.
I have this function (pseudo-code):
public boolean doOperation() in the frontEnd EJB{ sendJMSMessageToTopic(); //publish new command to the replica Thread.sleep(200); readFromQueue(); //read Ack from replica control if write quorum is reached commit or abort return state (success or not) }The problem is:
The callback onMessage() is called but the response is not received in the same invocation of the doOperation. When I call doOperation again I read the response of previous invocation of doOperation method.
I use glassfish, and i deploy all component in the same application server. I use a topic for publish the request to the replicas and a queue to receive the responce from the replicas. FrontEnd is Singleton and Replicas are Message Drive Bean.
UPDATE: I
My Replica Code
/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package EJBData; import Service.ResponceMessage; import Service.Command; import Service.Product; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Resource; import javax.ejb.ActivationConfigProperty; import javax.ejb.MessageDriven; import javax.inject.Inject; import javax.jms.JMSConnectionFactory; import javax.jms.JMSContext; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Queue; /** * * @author melix */ @MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"), @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "TopicRepliche"), //@ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "durable"), @ActivationConfigProperty(propertyName = "clientId", propertyValue = "TopicRepliche"), @ActivationConfigProperty(propertyName = "subscriptionName", propertyValue = "TopicRepliche") }) public class Replica1 implements MessageListener { @Resource(mappedName = "QueueFrontEnd") private Queue queueFrontEnd; @Inject @JMSConnectionFactory("java:comp/DefaultJMSConnectionFactory") private JMSContext context; private final int R_CONTAINS_KEY=0; private final int W_REMOVE_KEY=1; private final int W_PUT=2; private final int R_GET=3; private final int R_UPDATE_PRICE=4; private final int R_GENERATE_JSON_FROM_MAP=5; private final int COD_REPLICA=0; private final ConcurrentHashMap<Integer,Product> item_map=new ConcurrentHashMap<>(); private final ConcurrentHashMap<Long,Command> pending_command=new ConcurrentHashMap<>(); private long last_cmd_executed=0; private long last_cmd_received=0; private synchronized void setLastCmdExecuted(long last_cmd_executed){ this.last_cmd_executed=last_cmd_executed; } private synchronized void setLastCmdReceived(long last_cmd_received){ this.last_cmd_received=last_cmd_received; } public boolean containsKey(int key){ return item_map.containsKey(key); } public boolean removeKey(int key){ item_map.remove(key); return true; } public boolean put(Product b){ item_map.put(Integer.parseInt(b.getId()),b); return true; } public Product get(int key){ return item_map.get(key); } public void updatePrice(){ for (Map.Entry pairs : item_map.entrySet()) { int key=(int)pairs.getKey(); Product p=(Product)pairs.getValue(); double price=p.getInitialPrice(); item_map.remove(key); System.out.println(price); if(price>=5) p.setInitialPrice(price-5); else p.setInitialPrice(0); item_map.put(key, p); } } public String generateJSONFromMap(){ String json="["; for (Map.Entry pairs : item_map.entrySet()) { Product p=(Product)pairs.getValue(); json+=p.toString(); json+=","; } if(json.endsWith(",")) json = json.substring(0,json.length() - 1); json+="]"; return json; } public Replica1() { } @Override public void onMessage(Message message) { Command c; if (message instanceof ObjectMessage) { try { Object object = ((ObjectMessage) message).getObject(); c=(Command)object; int command_type=c.getType(); Object res=null; switch(command_type){ case R_CONTAINS_KEY: res=containsKey((int)c.getInput()); break; case W_REMOVE_KEY: res=removeKey((int)c.getInput()); break; case W_PUT: res=put((Product)c.getInput()); break; case R_GET: res=get((int)c.getInput()); break; case R_UPDATE_PRICE: updatePrice(); break; case R_GENERATE_JSON_FROM_MAP: res=generateJSONFromMap(); break; } System.out.println("FROM REPLICA, ACK FOR"+c.getSqnCommand()); this.setLastCmdReceived(c.getSqnCommand()); sendAckToQueueFrontEnd(c.getSqnCommand(),res); } catch (JMSException ex) { Logger.getLogger(Replica1.class.getName()).log(Level.SEVERE, null, ex); } } } private void sendJMSMessageToQueueFrontEnd(String messageData) { context.createProducer().send(queueFrontEnd, messageData); } private void sendAckToQueueFrontEnd(long sqn_command,Object responce) { try { ResponceMessage ack=new ResponceMessage(true,false,sqn_command,COD_REPLICA); ack.setResponce(responce); ObjectMessage objectMessage=context.createObjectMessage(); objectMessage.setObject(ack); context.createProducer().send(queueFrontEnd,objectMessage); } catch (JMSException ex) { Logger.getLogger(Replica1.class.getName()).log(Level.SEVERE, null, ex); } } }The front end code: (doOperation is mapped with sendNewCommand() in my code)
/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package EJBData; import EJBExecutor.ReaderLocal; import Service.Command; import Service.Product; import Service.ResponceMessage; import java.util.ArrayList; import java.util.LinkedList; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Resource; import javax.ejb.EJB; import javax.ejb.Stateless; import javax.ejb.Timeout; import javax.ejb.Timer; import javax.ejb.TimerConfig; import javax.ejb.TimerService; import javax.inject.Inject; import javax.jms.JMSConnectionFactory; import javax.jms.JMSContext; import javax.jms.JMSException; import javax.jms.ObjectMessage; import javax.jms.Topic; /** * * @author melix */ @Stateless public class AuctionFrontEnd implements AuctionFrontEndLocal{ @EJB private FrontEndConsumer frontEndConsumer; @EJB private ReaderLocal reader; @Resource(mappedName = "TopicRepliche") private Topic topicRepliche; @Inject @JMSConnectionFactory("java:comp/DefaultJMSConnectionFactory") private JMSContext context; @Resource TimerService service; private boolean isConsumerInit=false; private final int R_CONTAINS_KEY=0; private final int W_REMOVE_KEY=1; private final int W_PUT=2; private final int R_GET=3; private final int R_UPDATE_PRICE=4; private final int R_GENERATE_JSON_FROM_MAP=5; private final ConcurrentHashMap<Integer,Product> item_map=new ConcurrentHashMap<>(); private ArrayList<ConcurrentHashMap<Long,ResponceMessage>> listOfHashMap; private Queue<Command> pending_command = new LinkedList<Command>(); private final int WRITE_QUORUM=2; private final int READ_QUORUM=2; private final int NUM_REPLIC=3; private int last_key_inserted=0; private long num_command=0; public void initConsumer(){ frontEndConsumer.init(); } private synchronized void putCommandInQueue(Command c){ pending_command.add(c); } private synchronized Command removeCommandFromQueue(Command c){ if(!pending_command.isEmpty()) return pending_command.remove(); return null; } private synchronized void addAck(int cod_replica,long num_command,ResponceMessage m){ listOfHashMap.get(cod_replica).put(num_command,m); } private synchronized void addResponce(int cod_replica,long num_command,ResponceMessage m){ listOfHashMap.get(cod_replica+3).put(num_command,m); } @Override public synchronized void addAckToList(int cod_replica,long num_command,ResponceMessage m){ addAck(cod_replica,num_command,m); } @Override public synchronized void addResponceToList(int cod_replica,long num_command,ResponceMessage m){ addAck(cod_replica,num_command,m); } private synchronized long addNumCommand(){ this.num_command++; return num_command; } @Override public void mexReceived(String message){ System.out.println(message); } @Timeout public void handleTimeout(Timer timer) { if(!pending_command.isEmpty()){ Command c=pending_command.poll(); for(int i=0;i<NUM_REPLIC*2;i++){ if(listOfHashMap.get(i).containsKey(c.getSqnCommand())){ ResponceMessage m=listOfHashMap.get(i).get(c.getSqnCommand()); System.out.println("Ack per comando:"+m.getSqnCommand()+"from replica"+m.getCode_replica()); } } } timer.cancel(); } public void startTimer() { TimerConfig config = new TimerConfig(); config.setPersistent(false); Timer timer = service.createSingleActionTimer(200, config); } private ResponceMessage[] sendNewCommand(boolean isWriteOperation,int type,Object input){ int num_ack=0,i=0; Command c=new Command(isWriteOperation,addNumCommand(),type,input); ObjectMessage objectMessage=context.createObjectMessage(); try { objectMessage.setObject(c); } catch (JMSException ex) { Logger.getLogger(AuctionFrontEnd.class.getName()).log(Level.SEVERE, null, ex); } if(!isConsumerInit){ this.initConsumer(); isConsumerInit=true; } frontEndConsumer.cleanQueue(); sendJMSMessageToTopicRepliche(objectMessage); ResponceMessage[] m=new ResponceMessage[NUM_REPLIC]; ResponceMessage tmp = null; do{ tmp=frontEndConsumer.consume(); if(tmp!=null){ System.out.println("ACK CORRECT"+tmp.getSqnCommand()+";"+c.getSqnCommand()); if(tmp.getSqnCommand()==c.getSqnCommand()){ m[num_ack]=tmp; num_ack++; } } }while(tmp!=null); System.out.println("sono alla fine!"); if(isWriteOperation&&num_ack>=WRITE_QUORUM) return m; if(!isWriteOperation&&num_ack>=READ_QUORUM) return m; return null; } @Override public boolean containsKey(int key){ /*ResponceMessage[] m; m=sendNewCommand(true,R_CONTAINS_KEY,key); if(m!=null) return (boolean)m[0].getResponce(); else return false;*/ return item_map.containsKey(key); } @Override public boolean removeKey(int key){ //ResponceMessage[] m; //m=sendNewCommand(true,W_REMOVE_KEY,key); item_map.remove(key); return true; /*if(m!=null) return (boolean)m[0].getResponce(); else return false;*/ } @Override public boolean put(Product b){ ResponceMessage[] m; m=sendNewCommand(true,W_PUT,b); item_map.put(Integer.parseInt(b.getId()),b); last_key_inserted=Integer.parseInt(b.getId()); if(m!=null){ //last_key_inserted=Integer.parseInt(b.getId()); //return (boolean)m[0].getResponce(); if((boolean)m[0].getResponce()) System.out.println("V_TRUE"); else System.out.println("FALSE"); }//else // return false; return true; } @Override public Product get(int key){ //ResponceMessage[] m; //m=sendNewCommand(true,R_GET,key); return item_map.get(key); /*if(m!=null) return (Product)m[0].getResponce(); else return null;*/ } @Override public int getLastKeyInserted(){ return last_key_inserted; } @Override public void updatePrice(){ //ResponceMessage[] m; //m=sendNewCommand(true,R_UPDATE_PRICE,null); for (Map.Entry pairs : item_map.entrySet()) { int key=(int)pairs.getKey(); Product p=(Product)pairs.getValue(); double price=p.getInitialPrice(); item_map.remove(key); if(price>=5) p.setInitialPrice(price-5); else p.setInitialPrice(0); item_map.put(key, p); } } @Override public String generateJSONFromMap(){ //ResponceMessage[] m; //m=sendNewCommand(true,R_GENERATE_JSON_FROM_MAP,null); String json="["; for (Map.Entry pairs : item_map.entrySet()) { Product p=(Product)pairs.getValue(); json+=p.toString(); json+=","; } if(json.endsWith(",")) json = json.substring(0,json.length() - 1); json+="]"; return json; /* if(m!=null) return (String)m[0].getResponce(); else return null;*/ } private void sendJMSMessageToTopicRepliche(String messageData) { context.createProducer().send(topicRepliche, messageData); } private void sendJMSMessageToTopicRepliche(ObjectMessage messageData) { context.createProducer().send(topicRepliche, messageData); } public AuctionFrontEnd(){ final ConcurrentHashMap<Long,ResponceMessage> responce_list=new ConcurrentHashMap<>(); this.listOfHashMap = new ArrayList<>(); for(int i=0;i<2*NUM_REPLIC;i++){ this.listOfHashMap.add(new ConcurrentHashMap<Long,ResponceMessage>()); } } }The frontEndConsumer class:
/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package EJBData; import Service.Command; import Service.ResponceMessage; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Resource; import javax.ejb.Stateless; import javax.jms.ConnectionFactory; import javax.jms.JMSConsumer; import javax.jms.JMSContext; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Queue; /** * * @author melix */ @Stateless public class FrontEndConsumer { @Resource(lookup = "jms/__defaultConnectionFactory") private ConnectionFactory connectionFactory; @Resource(lookup = "QueueFrontEnd") private Queue queueFrontEnd; private JMSContext context; private JMSConsumer consumer; public void init(){ context = connectionFactory.createContext(); consumer = context.createConsumer(queueFrontEnd); } public void cleanQueue(){ while(consumer.receive(1000)!=null){ System.out.println("CANCELLO"); } } public ResponceMessage consume(){ Message m = consumer.receive(1000); if (m != null) { if(m instanceof ObjectMessage){ try { Object object = ((ObjectMessage)m).getObject(); ResponceMessage mex=(ResponceMessage)object; System.out.println("RICEVO"); return (ResponceMessage)mex; } catch (JMSException ex) { Logger.getLogger(FrontEndConsumer.class.getName()).log(Level.SEVERE, null, ex); } } } return null; } }最满意答案
onMessage()方法不需要等待doOperation()完成。 JMS API启用异步通信,并在JMS提供程序的线程上调用onMessage()。
The onMessage() method shouldn't need to wait for doOperation() to finish. The JMS API enables asynchronous communication and onMessage() is called on the JMS provider's thread.
更多推荐
发布评论