Should HornetQ JMS code be written inside a POJO or SessionBean?
hushen.savani Jan 11, 2012 9:28 AMHi Community,
I have been using Hornetq-2.2.5 for my middleware application. Application is deployed on JBossAS-5.1.0-GA AppServer (Hornetq embedded in this JBossAS). To send messages in a queue using Hornetq (using JMS Implementation), initially I have used a POJO which starts connection, creates sessions and producers, then it sends the message to the hornetq queue. I.e.
POJO:
public class HornetQ { private Context _ctx = null; private Session _session = null; private Queue _queue = null; private Connection _conn = null; private MessageProducer _producer = null; private String strLogPreffix = "HornetQ"; private JMSQueueControl _queueControl = null; private MessageCounterInfo _messageCounter = null; private String resourcePath = null; private Hashtable configHT = null; private String JMX_URL = null; private ObjectName on = null; private JMXConnector connector = null; private MBeanServerConnection mbsc = null; private long lFinalCounter = 0; @Resource(mappedName = "java:JmsXA") private ConnectionFactory _cfConnFactory; public void startConnection(Context initialContecxt, String hornetqQueue) throws BusinessTaskException {//NamingException, JMSException { String strMethodName ="startConnection():: "; System.out.println(strLogPreffix + strMethodName + "HornetQ Queue Name:: " + hornetqQueue); try { this._ctx = initialContecxt; String queue = ""; if(hornetqQueue!=null) { //Set HornetQ Queue queue = "/queue/" + hornetqQueue; this._queue = (Queue)_ctx.lookup(queue); //Set Connection _cfConnFactory = (ConnectionFactory) _ctx.lookup("/ThroughputConnectionFactory"); this._conn = _cfConnFactory.createConnection(); //Set session this._session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); _producer = _session.createProducer(_queue); _producer.setDisableMessageID(true); _producer.setDisableMessageTimestamp(true); //Start Service if(_conn!=null) _conn.start(); else System.err.println("[ HornetQ:: startConnection() ]Error in Connection!"); } else { System.err.println("[HornetQ :: startConnection() Queue Name is not provided.]"); } } catch (Exception e) { e.printStackTrace(); } } public void stopConnection() throws BusinessTaskException { String strMethodName ="stopConnection():: "; try { _ctx.close(); _conn.close(); _session.close(); _producer.close(); }catch (NamingException e) { System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e); e.printStackTrace(); }catch (JMSException e) { System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e); e.printStackTrace(); } } public void enqueueEntity(Object objEntities) throws BusinessTaskException { String strMethodName ="enqueueEntity():: "; int iMessagePriority = 0; String strCurrentJob = null; if(resourcePath==null || configHT==null) { resourcePath = "messageserviceinterface_jndi"; configHT = JndiConfigUtil.getInstance().getJNDIHT(resourcePath); } try { //System.out.println(strLogPreffix + strMethodName + "objEntities:: "+ objEntities); if(objEntities!=null) { MDSMessage mdsMessage = (MDSMessage)objEntities; strCurrentJob = mdsMessage.getCurrentJob(); if(strCurrentJob!=null) iMessagePriority = Integer.parseInt((String)configHT.get(strCurrentJob.trim())); ObjectMessage objMessage = null; objMessage = _session.createObjectMessage((Serializable) objEntities); _producer.send(objMessage, DeliveryMode.NON_PERSISTENT, iMessagePriority, 0); } else { System.out.println(strLogPreffix + strMethodName + "-->>> Cannot Enqueue NULL Message!"); } }catch (JMSException e) { // TODO: handle exception System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e); e.printStackTrace(); } } public void initMessageCounterSample() { resourcePath = "messageserviceinterface_jndi"; configHT = JndiConfigUtil.getInstance().getJNDIHT(resourcePath); JMX_URL = (String)configHT.get("jmx_url"); try { on = ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(_queue.getQueueName()); connector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_URL), new HashMap()); mbsc = connector.getMBeanServerConnection(); _queueControl = (JMSQueueControl)MBeanServerInvocationHandler.newProxyInstance(mbsc, on, JMSQueueControl.class, false); String counters = _queueControl.listMessageCounter(); _messageCounter = MessageCounterInfo.fromJSON(counters); }catch (Exception e) { e.printStackTrace(); } } public long getMessageCount() throws Exception { String counters = null; counters = _queueControl.listMessageCounter(); _messageCounter = MessageCounterInfo.fromJSON(counters); lFinalCounter = lFinalCounter + _messageCounter.getCountDelta(); return lFinalCounter; } private void displayMessageCounter(MessageCounterInfo counter) { System.out.println("counter.getName():: " + counter.getName()); System.out.println("counter.getUdpateTimestamp():: " + counter.getUdpateTimestamp()); System.out.println("counter.getCount():: " + counter.getCount()); System.out.println("counter.getCountDelta():: " + counter.getCountDelta()); System.out.println("counter.getDepth():: " + counter.getDepth()); System.out.println("counter.getDepthDelta():: " + counter.getDepthDelta()); System.out.println("counter.getLastAddTimestamp():: " + counter.getLastAddTimestamp()); } public void resetMessageCounter() { try { _queueControl.resetMessageCounter(); String counters = _queueControl.listMessageCounter(); _messageCounter = MessageCounterInfo.fromJSON(counters); lFinalCounter = 0; System.out.println("Counter for Queue " + _queue.getQueueName() + " has been reset!"); } catch (Exception e) { System.out.println("The Counter Sample cannot be reset until it is initialized. Hint: Call initMessageCounterSample() method!"); } } }
I used to call startConnection(), enqueueEntity() then stopConnection() methods in sequence when sending each message. But, application started crashing after heavy load generated on the same. Then I came to know that starting/closing connections/sessions/producers etc on each message are considered to be an anti-pattern if used inside a POJO.
But, according to this wiki article, I came to know that if JMS Code is running inside a JEE Application Server, then switching connections/sessions/producers on each message is not an anti-pattern. Hence, I moved to SessionBean. I.e.:
SessionBean:
@AspectDomain("HornetQSessionBean") @Stateless(name="HornetQSessionBean") @Local(HornetQLocal.class) @Remote(HornetQRemote.class) @LocalBinding(jndiBinding="HornetQSessionBean") @RemoteBinding(jndiBinding="HornetQSessionBeanRemote") @TransactionManagement(value= TransactionManagementType.CONTAINER) @TransactionAttribute(value= TransactionAttributeType.NOT_SUPPORTED) @ResourceAdapter("hornetq-ra.rar") public class HornetQSessionBean implements SessionBean, HornetQRemote, HornetQLocal { private Queue _queue = null; private String strLogPreffix = "HornetQ"; private JMSQueueControl _queueControl = null; private MessageCounterInfo _messageCounter = null; private String resourcePath = null; private Hashtable configHT = null; private String JMX_URL = null; private ObjectName on = null; private JMXConnector connector = null; private MBeanServerConnection mbsc = null; private long lFinalCounter = 0; @Resource(mappedName = "java:JmsXA") ConnectionFactory _cfConnFactory; @Resource(mappedName = "queue/mocmMasterQueue") Queue mocmMasterQueue; @Resource(mappedName = "queue/mocmProcessingQueue") Queue mocmProcessingQueue; @Resource(mappedName = "queue/mocmManagementQueue") Queue mocmManagementQueue; @Resource(mappedName = "queue/mocmTestQueue") Queue mocmTestQueue; @Resource(mappedName = "queue/mocmSuccessQueue") Queue mocmSuccessQueue; @Resource(mappedName = "queue/mocmFailureQueue") Queue mocmFailureQueue; @Resource(mappedName = "queue/mocmAuditQueue") Queue mocmAuditQueue; public void enqueueEntity(String hornetqQueue,Object objEntities) throws BusinessTaskException { String strMethodName ="enqueueEntity():: "; int iMessagePriority = 0; String strCurrentJob = null; Session _session = null; Connection _conn = null; MessageProducer _producer = null; if(resourcePath==null || configHT==null) { resourcePath = "messageserviceinterface_jndi"; configHT = JndiConfigUtil.getInstance().getJNDIHT(resourcePath); } try { if(objEntities!=null) { populateQueue(hornetqQueue); _conn = _cfConnFactory.createConnection(); //Set session _session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); _producer = _session.createProducer(_queue); _producer.setDisableMessageID(true); _producer.setDisableMessageTimestamp(true); //Set Message Priority MDSMessage mdsMessage = (MDSMessage)objEntities; strCurrentJob = mdsMessage.getCurrentJob(); if(strCurrentJob!=null) iMessagePriority = Integer.parseInt((String)configHT.get(strCurrentJob.trim())); ObjectMessage objMessage = null; objMessage = _session.createObjectMessage((Serializable) objEntities); //Send Message _conn.start(); _producer.send(objMessage, DeliveryMode.NON_PERSISTENT, iMessagePriority, 0); } else { System.err.println(strLogPreffix + strMethodName + "-->>> Cannot Enqueue NULL Message!"); } }catch (JMSException e) { System.err.println("[ " + strLogPreffix + " ] "+ strMethodName+ e.getClass().getName() + e); e.printStackTrace(); } finally { try { _conn.close(); _session.close(); _producer.close(); _queue = null; } catch (JMSException e) { e.printStackTrace(); } } } public long getMessageCount(String hornetqQueue) throws Exception { String counters = _queueControl.listMessageCounter(); _messageCounter = MessageCounterInfo.fromJSON(counters); lFinalCounter = lFinalCounter + _messageCounter.getCountDelta(); return lFinalCounter; } public void resetMessageCounter(String hornetqQueue) { try { _queueControl.resetMessageCounter(); String counters = _queueControl.listMessageCounter(); _messageCounter = MessageCounterInfo.fromJSON(counters); lFinalCounter = 0; System.out.println("Counter for Queue " + _queue.getQueueName() + " has been reset!"); } catch (Exception e) { System.out.println("The Counter Sample cannot be reset until it is initialized. Hint: Call initMessageCounterSample(String hornetqQueue) method!"); } } private void populateQueue(String hornetqQueue) { if(hornetqQueue!=null) { if(hornetqQueue.equalsIgnoreCase("mocmMasterQueue")) { _queue = mocmMasterQueue; }else if(hornetqQueue.equalsIgnoreCase("mocmProcessingQueue")) { _queue = mocmProcessingQueue; } else if(hornetqQueue.equalsIgnoreCase("mocmManagementQueue")) { _queue = mocmManagementQueue; }else if(hornetqQueue.equalsIgnoreCase("mocmTestQueue")) { _queue = mocmTestQueue; }else if(hornetqQueue.equalsIgnoreCase("mocmSuccessQueue")) { _queue = mocmSuccessQueue; }else if(hornetqQueue.equalsIgnoreCase("mocmFailureQueue")) { _queue = mocmFailureQueue; }else if(hornetqQueue.equalsIgnoreCase("mocmAuditQueue")) { _queue = mocmAuditQueue; } } else System.err.println("Cannot Continue. Please Provide Correct Queue Name!"); } public void initMessageCounterSample(String hornetqQueue) { if(resourcePath == null || configHT == null || JMX_URL == null) { resourcePath = "messageserviceinterface_jndi"; configHT = JndiConfigUtil.getInstance().getJNDIHT(resourcePath); JMX_URL = (String)configHT.get("jmx_url"); } populateQueue(hornetqQueue); try { on = ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(_queue.getQueueName()); connector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_URL), new HashMap()); mbsc = connector.getMBeanServerConnection(); _queueControl = (JMSQueueControl)MBeanServerInvocationHandler.newProxyInstance(mbsc, on, JMSQueueControl.class, false); String counters = _queueControl.listMessageCounter(); _messageCounter = MessageCounterInfo.fromJSON(counters); }catch(Exception e) { e.printStackTrace(); } } public void ejbActivate() throws EJBException, RemoteException { // TODO Auto-generated method stub } public void ejbPassivate() throws EJBException, RemoteException { // TODO Auto-generated method stub } public void ejbRemove() throws EJBException, RemoteException { // TODO Auto-generated method stub } public void setSessionContext(SessionContext arg0) throws EJBException, RemoteException { // TODO Auto-generated method stub } }
I want to discuss, whether migrating to session bean will really help in this case?
Thanks.