4 Replies Latest reply: Jan 12, 2012 3:13 AM by Hushen Savani RSS

Should HornetQ JMS code be written inside a POJO or SessionBean?

Hushen Savani Newbie

Hi Community,

 

     I have been using Hornetq-2.2.5 for my middleware application. Application is deployed on JBossAS-5.1.0-GA AppServer (Hornetq embedded in this JBossAS).  To send messages in a queue using Hornetq (using JMS Implementation), initially I have used a POJO which starts connection, creates sessions and producers, then it sends the message to the hornetq queue. I.e.

 

POJO:

 

public class HornetQ {
  
          private Context _ctx = null;
          private Session _session = null;
          private Queue _queue = null;
          private Connection _conn = null;
          private MessageProducer _producer = null;
          private String strLogPreffix = "HornetQ";
          private JMSQueueControl _queueControl = null;
          private MessageCounterInfo _messageCounter = null;
          private String resourcePath = null;
          private Hashtable configHT =  null; 
          private String JMX_URL = null; 
          private ObjectName on = null;
          private JMXConnector connector = null;
          private MBeanServerConnection mbsc = null;
          private long lFinalCounter = 0; 
  
          @Resource(mappedName = "java:JmsXA")
          private ConnectionFactory _cfConnFactory;
  


          public void startConnection(Context initialContecxt, String hornetqQueue) throws BusinessTaskException {//NamingException, JMSException { 
  
                    String strMethodName ="startConnection():: ";
  
                    System.out.println(strLogPreffix + strMethodName + "HornetQ Queue Name:: " + hornetqQueue);
  
                    try {
                              this._ctx = initialContecxt;
  
                              String queue = "";
                              if(hornetqQueue!=null) {
                                        //Set HornetQ Queue
                                        queue = "/queue/" + hornetqQueue;
                                        this._queue = (Queue)_ctx.lookup(queue);
  
                                        //Set Connection
                                        _cfConnFactory = (ConnectionFactory) _ctx.lookup("/ThroughputConnectionFactory");
                                        this._conn = _cfConnFactory.createConnection();
  
                                        //Set session
                                        this._session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
                                        _producer = _session.createProducer(_queue); 
                            _producer.setDisableMessageID(true);
                            _producer.setDisableMessageTimestamp(true);
                            
                            //Start Service
                            if(_conn!=null)
                                                  _conn.start();
                                        else
                                                  System.err.println("[ HornetQ:: startConnection() ]Error in Connection!");                            
                                 
                              }
                              else {
                                        System.err.println("[HornetQ :: startConnection() Queue Name is not provided.]");
                              } 
                    }
                    catch (Exception e) { 
                              e.printStackTrace();
                    }
          }


          public void stopConnection() throws BusinessTaskException {
                    String strMethodName ="stopConnection():: ";
  
                    try {
                              _ctx.close();
                  _conn.close();            
                  _session.close();
                  _producer.close();
        }catch (NamingException e) {
                  System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
                              e.printStackTrace();
                    }catch (JMSException e) {
                  System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
                              e.printStackTrace();
                    }
        
          }
  
          public void enqueueEntity(Object objEntities) throws BusinessTaskException {
       
                    String strMethodName ="enqueueEntity():: ";
  
                    int iMessagePriority = 0;
                    String strCurrentJob = null;
                    if(resourcePath==null || configHT==null) {
                              resourcePath = "messageserviceinterface_jndi";
                              configHT =  JndiConfigUtil.getInstance().getJNDIHT(resourcePath); 
                    }
                    try {
                              //System.out.println(strLogPreffix + strMethodName + "objEntities:: "+ objEntities);
                              if(objEntities!=null) {
                                        MDSMessage mdsMessage = (MDSMessage)objEntities;
                                        strCurrentJob = mdsMessage.getCurrentJob();
                                        if(strCurrentJob!=null)
                                                  iMessagePriority = Integer.parseInt((String)configHT.get(strCurrentJob.trim()));
                                        ObjectMessage objMessage = null;
                            objMessage = _session.createObjectMessage((Serializable) objEntities);
                            _producer.send(objMessage, DeliveryMode.NON_PERSISTENT, iMessagePriority, 0);                           
                                                        
                              }
                              else {
                                        System.out.println(strLogPreffix + strMethodName + "-->>> Cannot Enqueue NULL Message!");
                              }
  
                    }catch (JMSException e) {
                              // TODO: handle exception
                              System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
                              e.printStackTrace();
                    }
          }
  
          public void initMessageCounterSample() {
                    resourcePath = "messageserviceinterface_jndi";
                    configHT =  JndiConfigUtil.getInstance().getJNDIHT(resourcePath); 
                    JMX_URL = (String)configHT.get("jmx_url"); 


                    try {
                              on = ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(_queue.getQueueName());
                              connector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_URL), new HashMap());
                  mbsc = connector.getMBeanServerConnection();
                  _queueControl = (JMSQueueControl)MBeanServerInvocationHandler.newProxyInstance(mbsc, on, JMSQueueControl.class, false);                  
                  String counters = _queueControl.listMessageCounter();
                  _messageCounter = MessageCounterInfo.fromJSON(counters);   
    
                    }catch (Exception e) {
                              e.printStackTrace();
                    }
        
          }
  
          public long getMessageCount() throws Exception {
  
                    String counters = null;
  
                    counters = _queueControl.listMessageCounter();
                    _messageCounter = MessageCounterInfo.fromJSON(counters); 
  
                    lFinalCounter = lFinalCounter + _messageCounter.getCountDelta();
  
                    return lFinalCounter;
  
          }
  
  
   private void displayMessageCounter(MessageCounterInfo counter) {
      System.out.println("counter.getName():: " + counter.getName());
      System.out.println("counter.getUdpateTimestamp():: " + counter.getUdpateTimestamp());
      System.out.println("counter.getCount():: " + counter.getCount());
      System.out.println("counter.getCountDelta():: " + counter.getCountDelta());
      System.out.println("counter.getDepth():: " + counter.getDepth());
      System.out.println("counter.getDepthDelta():: " + counter.getDepthDelta());
      System.out.println("counter.getLastAddTimestamp():: " + counter.getLastAddTimestamp());
    } 
    
          public void resetMessageCounter() {
                    try { 
                              _queueControl.resetMessageCounter(); 
                              String counters = _queueControl.listMessageCounter(); 
                  _messageCounter = MessageCounterInfo.fromJSON(counters);
                  lFinalCounter = 0;
                              System.out.println("Counter for Queue " + _queue.getQueueName() + " has been reset!"); 
                    } catch (Exception e) {
                              System.out.println("The Counter Sample cannot be reset until it is initialized. Hint: Call initMessageCounterSample() method!");
                    } 
          }
}

 

I used to call startConnection(), enqueueEntity() then stopConnection() methods in sequence when sending each message. But, application started crashing after heavy load generated on the same. Then I came to know that starting/closing connections/sessions/producers etc on each message are considered to be an anti-pattern if used inside a POJO.

 

But, according to this wiki article, I came to know that if JMS Code is running inside a JEE Application Server, then switching connections/sessions/producers on each message is not an anti-pattern. Hence, I moved to SessionBean. I.e.:

 

SessionBean:

 

@AspectDomain("HornetQSessionBean")
@Stateless(name="HornetQSessionBean")
@Local(HornetQLocal.class)
@Remote(HornetQRemote.class)
@LocalBinding(jndiBinding="HornetQSessionBean")
@RemoteBinding(jndiBinding="HornetQSessionBeanRemote")
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.NOT_SUPPORTED)
@ResourceAdapter("hornetq-ra.rar")
public class HornetQSessionBean implements SessionBean, HornetQRemote, HornetQLocal { 
  
          private Queue _queue = null;
          private String strLogPreffix = "HornetQ";
          private JMSQueueControl _queueControl = null;
          private MessageCounterInfo _messageCounter = null;
          private String resourcePath = null;
          private Hashtable configHT =  null; 
          private String JMX_URL = null; 
          private ObjectName on = null;
          private JMXConnector connector = null;
          private MBeanServerConnection mbsc = null;
          private long lFinalCounter = 0; 
  
          @Resource(mappedName = "java:JmsXA")
          ConnectionFactory _cfConnFactory; 
  
           @Resource(mappedName = "queue/mocmMasterQueue")
           Queue mocmMasterQueue; 
  
           @Resource(mappedName = "queue/mocmProcessingQueue")
           Queue mocmProcessingQueue; 
  
           @Resource(mappedName = "queue/mocmManagementQueue")
           Queue mocmManagementQueue; 
           
           @Resource(mappedName = "queue/mocmTestQueue")
           Queue mocmTestQueue;
           
           @Resource(mappedName = "queue/mocmSuccessQueue")
           Queue mocmSuccessQueue;
           
           @Resource(mappedName = "queue/mocmFailureQueue")
           Queue mocmFailureQueue;
           
           @Resource(mappedName = "queue/mocmAuditQueue")
           Queue mocmAuditQueue; 


          public void enqueueEntity(String hornetqQueue,Object objEntities) throws BusinessTaskException {
                 
                    String strMethodName ="enqueueEntity():: ";
  
                    int iMessagePriority = 0;
                    String strCurrentJob = null;
  
                    Session _session = null; 
                    Connection _conn = null;
                    MessageProducer _producer = null;
  
                    if(resourcePath==null || configHT==null) {
                              resourcePath = "messageserviceinterface_jndi";
                              configHT =  JndiConfigUtil.getInstance().getJNDIHT(resourcePath); 
                    }
  
                    try {
  
                              if(objEntities!=null) {
  
                                        populateQueue(hornetqQueue);
  
                                        _conn = _cfConnFactory.createConnection();
  
                                        //Set session
                                        _session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                            _producer = _session.createProducer(_queue); 
                            _producer.setDisableMessageID(true);
                            _producer.setDisableMessageTimestamp(true);                                      
                               
                            //Set Message Priority
                                        MDSMessage mdsMessage = (MDSMessage)objEntities;
                                        strCurrentJob = mdsMessage.getCurrentJob();
                                        if(strCurrentJob!=null)
                                                  iMessagePriority = Integer.parseInt((String)configHT.get(strCurrentJob.trim()));
                                        ObjectMessage objMessage = null;
                            objMessage = _session.createObjectMessage((Serializable) objEntities);
                            
                            //Send Message
                            _conn.start();
                            _producer.send(objMessage, DeliveryMode.NON_PERSISTENT, iMessagePriority, 0);
                              }
                              else {
                                        System.err.println(strLogPreffix + strMethodName + "-->>> Cannot Enqueue NULL Message!");
                              }
  
                    }catch (JMSException e) {
                              System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e);
                              e.printStackTrace();
                    } finally
                {
                   try {
                      _conn.close();
                _session.close();
                      _producer.close();             
                      _queue = null;                      
                   }
            catch (JMSException e) {
                      e.printStackTrace();
            }            
         }               
          } 


          public long getMessageCount(String hornetqQueue) throws Exception {
  
                    String counters = _queueControl.listMessageCounter();
                    _messageCounter = MessageCounterInfo.fromJSON(counters); 
  
                    lFinalCounter = lFinalCounter + _messageCounter.getCountDelta();
  
                    return lFinalCounter;
  
          } 
  
          public void resetMessageCounter(String hornetqQueue) {
  
                    try {
                              _queueControl.resetMessageCounter(); 
                              String counters = _queueControl.listMessageCounter(); 
                  _messageCounter = MessageCounterInfo.fromJSON(counters);
                  lFinalCounter = 0;
                              System.out.println("Counter for Queue " + _queue.getQueueName() + " has been reset!"); 
                    } catch (Exception e) {
                              System.out.println("The Counter Sample cannot be reset until it is initialized. Hint: Call initMessageCounterSample(String hornetqQueue) method!");
                    } 
          }
  
          private void populateQueue(String hornetqQueue) {
  
                    if(hornetqQueue!=null) { 
  
                              if(hornetqQueue.equalsIgnoreCase("mocmMasterQueue"))
                              {
                                        _queue = mocmMasterQueue;
                              }else if(hornetqQueue.equalsIgnoreCase("mocmProcessingQueue"))
                              {
                                        _queue = mocmProcessingQueue;
                              }
                              else if(hornetqQueue.equalsIgnoreCase("mocmManagementQueue"))
                              {
                                        _queue = mocmManagementQueue;
                              }else if(hornetqQueue.equalsIgnoreCase("mocmTestQueue"))
                              {
                                        _queue = mocmTestQueue;
                              }else if(hornetqQueue.equalsIgnoreCase("mocmSuccessQueue"))
                              {
                                        _queue = mocmSuccessQueue;
                              }else if(hornetqQueue.equalsIgnoreCase("mocmFailureQueue"))
                              {
                                        _queue = mocmFailureQueue;
                              }else if(hornetqQueue.equalsIgnoreCase("mocmAuditQueue"))
                              {
                                        _queue = mocmAuditQueue;
                              } 
                    }
                    else
                              System.err.println("Cannot Continue. Please Provide Correct Queue Name!");
          }
  
          public void initMessageCounterSample(String hornetqQueue) {
  
                    if(resourcePath == null || configHT == null || JMX_URL == null) { 
                              resourcePath = "messageserviceinterface_jndi";
                              configHT =  JndiConfigUtil.getInstance().getJNDIHT(resourcePath); 
                              JMX_URL = (String)configHT.get("jmx_url");
                    }
  
                    populateQueue(hornetqQueue); 
  
                    try {
                              on = ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(_queue.getQueueName());
                              connector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_URL), new HashMap());
                  mbsc = connector.getMBeanServerConnection();
                  _queueControl = (JMSQueueControl)MBeanServerInvocationHandler.newProxyInstance(mbsc, on, JMSQueueControl.class, false);
                  String counters = _queueControl.listMessageCounter();
                  _messageCounter = MessageCounterInfo.fromJSON(counters); 
                    }catch(Exception e) {
                              e.printStackTrace();
                    }       
          }
  
          public void ejbActivate() throws EJBException, RemoteException {
                    // TODO Auto-generated method stub
  
          }


          public void ejbPassivate() throws EJBException, RemoteException {
                    // TODO Auto-generated method stub
  
          }


          public void ejbRemove() throws EJBException, RemoteException {
                    // TODO Auto-generated method stub
  
          }


          public void setSessionContext(SessionContext arg0) throws EJBException,
                              RemoteException {
                    // TODO Auto-generated method stub
  
          }
}

 

I want to discuss, whether migrating to session bean will really help in this case?

 

Thanks.