1 Reply Latest reply on May 16, 2013 2:21 AM by rfnel

    JBoss/EMS Connection Issue

    rfnel

      Hi All

       

      I'm trying to resolve a very strange issue.  My team recently migrated an old application from WebLogic 8.1 to JBoss 6.1 Final.  We use TIBCO EMS as our messaging provider, and we've set up a datasource as per the instructions specified on this page:  https://community.jboss.org/wiki/IntegrationWithTibcoEMS.  However, when we try to send messages to both a queue and a topic from an EJB (EJB2.1), the application seems to try and re-use whichever factory we looked up first, and throws an exception due to queue/topic specific methods being invoked on the wrong kind of session.  Strangely though, the issue only occurs on EJBs; if the exact same code is executed from a servlet, it runs without any error.

       

      I wrote some sample code to illustrate the issue.

       

      private void sendMessagesToQueue(String[] messages) {
              QueueConnectionFactory queueConnectionFactory = null;
              QueueConnection queueConnection = null;
              Queue queue = null;
              QueueSender queueSender = null;
              QueueSession queueSession = null;
      
              try {
                  InitialContext ctx = new InitialContext();
      
                  queueConnectionFactory = (QueueConnectionFactory) PortableRemoteObject.narrow(
                          ctx.lookup("TIBCOXAFactoryRef"), QueueConnectionFactory.class);
        
                  queueConnection = queueConnectionFactory.createQueueConnection();
      
      //            The below code, if uncommented, will work.            
      //            queueConnectionFactory = (QueueConnectionFactory) PortableRemoteObject.narrow(
      //            ctx.lookup("tibco/TibcoXAQueueConnectionFactory"), QueueConnectionFactory.class);
      
      
                  queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                  queue = (Queue) PortableRemoteObject.narrow(ctx.lookup("tibco/DemoQueue"), Queue.class);
                  queueSender = queueSession.createSender(queue);  //Works if sendMessagesToQueue() is called before sendMessagesToTopic().
                  queueConnection.start();
                  TextMessage theMessage;
                  for (String message : messages) {
                      theMessage = queueSession.createTextMessage(message);
                      queueSender.send(theMessage);
                  }
              } catch (javax.naming.NamingException ex) {
                  ex.printStackTrace();
              } catch (javax.jms.JMSException ex) {
                  ex.printStackTrace();
              } finally {
                  if (queueConnection != null) {
                      try {
                          queueConnection.close();
                      } catch (JMSException ex) {
                          ex.printStackTrace();
                      }
                  }
              }
          }
      
          private void sendMessagesToTopic(String[] messages) {
              TopicConnectionFactory topicConnectionFactory = null;
              TopicConnection topicConnection = null;
              Topic topic = null;
              TopicPublisher topicPublisher = null;
              TopicSession topicSession = null;
      
              try {
                  InitialContext ctx = new InitialContext();
        
                  topicConnectionFactory = (TopicConnectionFactory) PortableRemoteObject.narrow(
                          ctx.lookup("TIBCOXAFactoryRef"), TopicConnectionFactory.class);
      
                  topicConnection = topicConnectionFactory.createTopicConnection();
      
      //            The below code, if uncommented, will work.
      //            topicConnectionFactory = (TopicConnectionFactory) PortableRemoteObject.narrow(
      //            ctx.lookup("tibco/TibcoXATopicConnectionFactory"), TopicConnectionFactory.class);
      
                  topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
                  topic = (Topic) javax.rmi.PortableRemoteObject.narrow(ctx.lookup("tibco/DemoTopic"), Topic.class);
                  topicPublisher = topicSession.createPublisher(topic);  //Works if sendMessagesToTopic() is called before sendMessagesToQueue().
                  topicConnection.start();
                  TextMessage theMessage;
                  for (String message : messages) {
                      theMessage = topicSession.createTextMessage(message);
                      topicPublisher.send(theMessage);
                  }
              } catch (javax.naming.NamingException ex) {
                  ex.printStackTrace();
              } catch (javax.jms.JMSException ex) {
                  ex.printStackTrace();
              } finally {
                  if (topicConnection != null) {
                      try {
                          topicConnection.close();
                      } catch (JMSException ex) {
                          ex.printStackTrace();
                      }
                  }
              }
          }
      
      

       

      All of the JNDI lookups work (i.e. they don't throw any errors).  If either of the above methods are executed in isolation, they work.  However, if both of them are executed in the same EJB call, with sendMessagesToQueue() being called first, the following exception is thrown. 

       

      javax.jms.InvalidDestinationException: Attempting to use TopicSession methods on: org.jboss.resource.adapter.jms.JmsSession@5292f822
      10:25:38,949 ERROR [STDERR]      at org.jboss.resource.adapter.jms.JmsSession.getTopicSession(JmsSession.java:860)
      10:25:38,949 ERROR [STDERR]      at org.jboss.resource.adapter.jms.JmsSession.createPublisher(JmsSession.java:435)
      10:25:38,949 ERROR [STDERR]      at jms.demo.DemoEJBBean.sendMessagesToTopic(DemoEJBBean.java:107)
      10:25:38,949 ERROR [STDERR]      at jms.demo.DemoEJBBean.sendMessages(DemoEJBBean.java:46)
      10:25:38,950 ERROR [STDERR]      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      ...
      

       

      A similar exception occurs if sendMessagesToTopic() is called first, but then the message refers to queue methods being called on a topic connection.  I looked through JBoss code today, and it looks like JBoss creates connections, initializes them (setting only the fields relevant to the type of the connection; queue/topic) and caches them (for the active transaction).  Upon a subsequent lookup, it seems to return a cached connection, regardless of whether or not the connection is of the correct type.  I believe this might be due to the fact that the TIBCO connection seems to double as both a queue connection and a topic connection, and only the fields relevant to whichever type is created first are populated.  My notes about the specific JBoss classes that I looked at are at the office, but if anyone wants more information, I could probably provide more detail tomorrow.

       

      I've worked around the issue by updating my code to point directly to the TIBCO queue and topic connection factories in the TIBCO JNDI context, but I'm not convinced that it is an ideal solution.  It bypasses the TIBCO datasource that we've set up in JBoss.  Can anyone explain why JBoss does this, and whether or not there is a better way of accomplishing what I'm try to do?

       

      I'm not an expert or either EMS or JBoss, so please correct me if anything I say is complete nonsense.

       

      Thank you for your assistance.

       

      FYI, I also posted on JavaRanch a while back, but I haven't had any responses.

      http://www.coderanch.com/t/610849/EJB-JEE/java/JNDI-JBoss-TIBCO-EMS

       

      Cheers,

      rfnel

        • 1. Re: JBoss/EMS Connection Issue
          rfnel

          Good Day Everyone

           

          I believe that I've come up with a solution after going through the org.jboss.resource.adapter.jms.JmsManagedConnection source code (http://grepcode.com/file/repository.jboss.org/nexus/content/repositories/releases/org.jboss.jbossas/jboss-as-connector/6.0.0.20100429-M3/org/jboss/resource/adapter/jms/JmsManagedConnection.java#JmsManagedConnection). 

           

          It looks like a connection is initialized upon creation, with whichever fields are relevant to the type of the connection (either queue or topic).  The connection is then cached, and return by subsequent lookups from the same transaction.  My understanding of our TIBCO datasource is that it exposes a single provider that we can use to connect to both queues and topics; however, when it's initialized, it's initialized for whichever type is called first, because we explicitly ask for a queue connection or a topic connection.  After looking at the JBoss source, I realized that we can initailize a factory of type BOTH, which caters to both queue connections and topic connections.

           

          The code that I posted earlier is based on legacy code from an old application, but now that we've moved to JBoss, I think the code should be refactored to use generic connections.  I've updated the code in my sample application, and it seems to work.  See the new code below.

           

          private void sendMessagesToQueue(String[] messages) {
                  ConnectionFactory connectionFactory = null;
                  Connection connection = null;
                  Queue queue = null;
                  MessageProducer producer = null;
                  Session session = null;
          
                  try {
                      InitialContext ctx = new InitialContext();
          
                      connectionFactory = (TopicConnectionFactory) PortableRemoteObject.narrow(
                              ctx.lookup("TIBCOXAFactoryRef"), ConnectionFactory.class);
          
                      connection = connectionFactory.createConnection();
          
                      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                      queue = (Queue) PortableRemoteObject.narrow(ctx.lookup("tibco/DemoQueue"), Queue.class);
                      producer = session.createProducer(queue);
                      connection.start();
                      TextMessage theMessage;
                      for (String message : messages) {
                          theMessage = session.createTextMessage(message);
                          producer.send(theMessage);
                      }
                  } catch (javax.naming.NamingException ex) {
                      ex.printStackTrace();
                  } catch (javax.jms.JMSException ex) {
                      ex.printStackTrace();
                  } finally {
                      if (connection != null) {
                          try {
                              connection.close();
                          } catch (JMSException ex) {
                              ex.printStackTrace();
                          }
                      }
                  }
              }
          
              private void sendMessagesToTopic(String[] messages) {
                  ConnectionFactory connectionFactory = null;
                  Connection connection = null;
                  Topic topic = null;
                  MessageProducer producer = null;
                  Session session = null;
          
                  try {
                      InitialContext ctx = new InitialContext();
          
                      connectionFactory = (TopicConnectionFactory) PortableRemoteObject.narrow(
                              ctx.lookup("TIBCOXAFactoryRef"), ConnectionFactory.class);
          
                      connection = connectionFactory.createConnection();
          
                      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                      topic = (Topic) javax.rmi.PortableRemoteObject.narrow(ctx.lookup("tibco/DemoTopic"), Topic.class);
                      producer = session.createProducer(topic);
                      connection.start();
                      TextMessage theMessage;
                      for (String message : messages) {
                          theMessage = session.createTextMessage(message);
                          producer.send(theMessage);
                      }
                  } catch (javax.naming.NamingException ex) {
                      ex.printStackTrace();
                  } catch (javax.jms.JMSException ex) {
                      ex.printStackTrace();
                  } finally {
                      if (connection != null) {
                          try {
                              connection.close();
                          } catch (JMSException ex) {
                              ex.printStackTrace();
                          }
                      }
                  }
              }