11 Replies Latest reply on Oct 5, 2005 12:07 PM by adrian.brock

    Ordered delivery of redelivered messages

    fabiulin76

      I'm trying to change the JBoss 4.0.2 source code to obtain ordered delivery of redelivered messages, and I would like your comments on the code I wrote.

      I made changes in the following classes: "org.jboss.ejb.plugins.jms.JMSContainerInvoker" and "org.jboss.jms.asf.StdServerSession". In particular:

      - JMSContainerInvoker launches an instance of StdServerSession in a separate thread (it no longer uses the StdServerSessionPool or the ConnectionConsumer).
      - StdServerSession receives from the queue using the "receive()" method. If an error occurs in the MDB, it restarts receiving from the queue with the selector "JMS_JBOSS_REDELIVERY_COUNT>0", so that it receives only the rolled-back message until that message is either processed successfully or sent to the DLQ. After that, the receiver goes back to receiving in normal mode (without a selector).

      Please see below for details (I ran diff -u between the original JBoss 4.0.2 source files and the modified ones).

      At the moment this is NOT a "JBoss-ready" patch because it is tailored to my application (for example, it only works for queues, not for topics, and does not support message selectors), but with your help I hope we can turn it into one.

      Also, the modified invoker does not use the ServerSessionPool and does not work with selectors, so I would like some suggestions on how to patch the current JBoss code to add these changes.
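The receive loop described in this post can be sketched as a plain-Java simulation. This is not the patch itself and it uses no JMS classes: `SimQueue`, `Msg`, `OrderedRedeliveryLoop` and `drain` are hypothetical names invented for illustration, the `redeliveryCount` field merely stands in for the `JMS_JBOSS_REDELIVERY_COUNT` property, and the simulated broker deliberately re-queues a rolled-back message at the tail to make the worst case visible. The DLQ threshold check is also an assumption about how a DLQ handler might decide.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical in-memory stand-in for a queue; this is not the JBossMQ or JMS API. */
class SimQueue {
    static final class Msg {
        final String body;
        int redeliveryCount = 0; // plays the role of JMS_JBOSS_REDELIVERY_COUNT
        Msg(String body) { this.body = body; }
    }

    private final LinkedList<Msg> messages = new LinkedList<>();

    void send(String body) { messages.addLast(new Msg(body)); }

    /** Removes and returns the first message accepted by the selector, or null if none matches. */
    Msg receive(Predicate<Msg> selector) {
        for (Iterator<Msg> it = messages.iterator(); it.hasNext(); ) {
            Msg m = it.next();
            if (selector.test(m)) { it.remove(); return m; }
        }
        return null;
    }

    /** Rollback: the message returns to the TAIL (assumed worst case) with its count incremented. */
    void rollback(Msg m) { m.redeliveryCount++; messages.addLast(m); }
}

class OrderedRedeliveryLoop {
    private static final Predicate<SimQueue.Msg> ANY = m -> true;
    private static final Predicate<SimQueue.Msg> REDELIVERED_ONLY = m -> m.redeliveryCount > 0;

    /**
     * Drains the queue and returns a log entry per delivery attempt.
     * The handler returns true on success and false to simulate an MDB failure.
     */
    static List<String> drain(SimQueue queue, Predicate<String> handler, int maxRedeliveries) {
        List<String> log = new ArrayList<>();
        boolean redeliveryMode = false;
        while (true) {
            // Normal mode accepts any message; redelivery mode accepts only the rolled-back one.
            SimQueue.Msg msg = queue.receive(redeliveryMode ? REDELIVERED_ONLY : ANY);
            if (msg == null) break; // queue drained (a real receive() would block instead)
            if (msg.redeliveryCount >= maxRedeliveries) {
                log.add("DLQ:" + msg.body);   // give up on this message, resume normal mode
                redeliveryMode = false;
            } else if (handler.test(msg.body)) {
                log.add("OK:" + msg.body);
                redeliveryMode = false;
            } else {
                log.add("FAIL:" + msg.body);
                queue.rollback(msg);
                redeliveryMode = true;        // next receive uses the selector
            }
        }
        return log;
    }
}
```

With messages A, B, C queued and a handler that fails B twice, the log is OK:A, FAIL:B, FAIL:B, OK:B, OK:C: message C is never delivered ahead of B even though B was re-queued behind it. The sketch has a single consumer and no transactions, so it says nothing about the concurrent or externally controlled rollback cases.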

        • 1. Re: Ordered delivery of redelivered messages
          fabiulin76

          The first diff:

          --- JMSContainerInvoker.java.jboss4 2005-08-30 16:26:39.117824100 +0200
          +++ JMSContainerInvoker.java 2005-08-30 15:30:47.268100900 +0200
          @@ -6,8 +6,7 @@
          * See terms of license at gnu.org. *
          * *
          ***************************************/
          -
          -package org.jboss.ejb.plugins.jms;
          +package it.imolinfo;

          import java.lang.reflect.Method;
          import java.security.AccessController;
          @@ -38,6 +37,7 @@
          import org.jboss.deployment.DeploymentException;
          import org.jboss.ejb.Container;
          import org.jboss.ejb.EJBProxyFactory;
          +import org.jboss.ejb.plugins.jms.DLQHandler;
          import org.jboss.invocation.Invocation;
          import org.jboss.invocation.InvocationType;
          import org.jboss.jms.ConnectionFactoryHelper;
          @@ -207,6 +207,8 @@

          protected boolean createJBossMQDestination = true;

          + private StdServerSession session;
          + private Thread sessionThread;
          /**
          * Set the invoker meta data so that the ProxyFactory can initialize
          * properly
          @@ -749,16 +751,16 @@
          jndiSuffix);

          // set up the server session pool
          - pool = createSessionPool(
          - topic,
          - tConnection,
          - minPoolSize,
          - maxPoolSize,
          - keepAlive,
          - true, // tx
          - acknowledgeMode,
          - new MessageListenerImpl(this));
          -
          +// pool = createSessionPool(
          +// topic,
          +// tConnection,
          +// minPoolSize,
          +// maxPoolSize,
          +// keepAlive,
          +// true, // tx
          +// acknowledgeMode,
          +// new MessageListenerImpl(this));
          +
          int subscriptionDurablity = config.getSubscriptionDurability();
          activationConfig = getActivationConfigProperty("subscriptionDurability");
          if (activationConfig != null)
          @@ -867,24 +869,32 @@
          jndiSuffix);

          // set up the server session pool
          - pool = createSessionPool(
          - queue,
          - qConnection,
          - minPoolSize,
          - maxPoolSize,
          - keepAlive,
          - true, // tx
          - acknowledgeMode,
          - new MessageListenerImpl(this));
          +// pool = createSessionPool(
          +// queue,
          +// qConnection,
          +// minPoolSize,
          +// maxPoolSize,
          +// keepAlive,
          +// true, // tx
          +// acknowledgeMode,
          +// new MessageListenerImpl(this));
          log.debug("Server session pool: " + pool);
          + try {
          + session = new StdServerSession(qConnection,
          + new MessageListenerImpl(this), true, queue, dlqHandler);
          + sessionThread = new Thread(session);
          + sessionThread.start();
          + } catch (Exception e) {
          + log.error("Error: ", e);
          + }

          // create the connection consumer
          - connectionConsumer =
          - qConnection.createConnectionConsumer(queue,
          - messageSelector,
          - pool,
          - maxMessagesNr);
          - log.debug("Connection consumer: " + connectionConsumer);
          +// connectionConsumer =
          +// qConnection.createConnectionConsumer(queue,
          +// messageSelector,
          +// pool,
          +// maxMessagesNr);
          +// log.debug("Connection consumer: " + connectionConsumer);
          }
          catch (Throwable t)
          {
          @@ -1028,11 +1038,15 @@
          // clear the server session pool (if it is clearable)
          try
          {
          - if (pool instanceof StdServerSessionPool)
          - {
          - StdServerSessionPool p = (StdServerSessionPool) pool;
          - p.clear();
          - }
          +// if (pool instanceof StdServerSessionPool)
          +// {
          +// StdServerSessionPool p = (StdServerSessionPool) pool;
          +// p.clear();
          +// }
          +
          + session.close();
          + sessionThread.interrupt();
          +
          }
          catch (Throwable t)
          {
          @@ -1287,7 +1301,8 @@
          */
          protected void sendNotification(String event, Object userData)
          {
          - Notification notif = new Notification(event, getServiceName(), getNextNotificationSequenceNumber());
          + log.info("sendNotification(): event="+event+" serviceName="+getServiceName()+" sequenceNumber="+getNextNotificationSequenceNumber());
          + Notification notif = new Notification(event, /*"fakeName"*/ getServiceName() , getNextNotificationSequenceNumber());
          notif.setUserData(userData);
          sendNotification(notif);
          }
          @@ -1332,7 +1347,7 @@
          * container invoker.
          */
          class MessageListenerImpl
          - implements MessageListener
          +// implements MessageListener
          {
          /**
          * The container invoker.
          @@ -1355,7 +1370,7 @@
          * Process a message.
          * @param message The message to process.
          */
          - public void onMessage(final Message message)
          + public boolean onMessage(final Message message) throws Exception
          {
          // assert message != null;
          if (log.isTraceEnabled())
          @@ -1364,6 +1379,7 @@
          }

          Object id;
          + boolean dlqHandled = false;
          try
          {
          id = message.getJMSMessageID();
          @@ -1378,15 +1394,15 @@
          try
          {
          Transaction tx = tm.getTransaction();
          -
          + dlqHandled = dlqHandler.handleRedeliveredMessage(message, tx);
          // DLQHandling
          if (useDLQ && // Is Dead Letter Queue used at all
          message.getJMSRedelivered() && // Was message resent
          - dlqHandler.handleRedeliveredMessage(message, tx)) //Did the DLQ handler take care of the message
          + dlqHandled) //Did the DLQ handler take care of the message
          {
          // Message will be placed on Dead Letter Queue,
          // if redelivered to many times
          - return;
          + return dlqHandled;
          }

          invoker.invoke(id, // Object id - where used?
          @@ -1400,7 +1416,9 @@
          catch (Exception e)
          {
          log.error("Exception in JMSCI message listener", e);
          + throw e;
          }
          + return dlqHandled;
          }
          }

          • 2. Re: Ordered delivery of redelivered messages
            fabiulin76

            And the other diff:

            D:\ws01\Invoker_JBoss4\src\it\imolinfo>diff -u StdServerSession.java.jboss4 StdServerSession.java
            --- StdServerSession.java.jboss4 2005-08-30 15:26:09.839177700 +0200
            +++ StdServerSession.java 2005-08-30 15:28:10.162193700 +0200
            @@ -4,7 +4,8 @@
            * Distributable under LGPL license.
            * See terms of license at gnu.org.
            */
            -package org.jboss.jms.asf;
            +// package org.jboss.jms.asf;
            +package it.imolinfo;

            import javax.jms.JMSException;
            import javax.jms.Message;
            @@ -23,6 +24,18 @@
            import org.jboss.tm.TransactionManagerService;
            import org.jboss.tm.XidFactoryMBean;

            +import it.imolinfo.JMSContainerInvoker.MessageListenerImpl;
            +import javax.jms.Destination;
            +import javax.jms.Queue;
            +import javax.jms.QueueConnection;
            +import javax.jms.QueueReceiver;
            +import javax.jms.QueueSession;
            +import javax.jms.TextMessage;
            +import javax.jms.XAQueueConnection;
            +import javax.jms.XAQueueSession;
            +import org.jboss.ejb.plugins.jms.DLQHandler;
            +import org.jboss.tm.XidImpl;
            +
            /**
            * An implementation of ServerSession.
            *
            @@ -33,8 +46,8 @@
            * @author Hiram Chirino .
            * @version $Revision: 1.3 $
            */
            -public class StdServerSession
            - implements Runnable, ServerSession, MessageListener
            +public class StdServerSession
            + implements Runnable, ServerSession
            {
            /**
            * Instance logger.
            @@ -44,7 +57,7 @@
            /**
            * The server session pool which we belong to.
            */
            - private StdServerSessionPool serverSessionPool;
            +// private ServerSessionPool serverSessionPool;

            /**
            * Our session resource.
            @@ -71,10 +84,18 @@
            /**
            * The listener to delegate calls, to. In our case the container invoker.
            */
            - private MessageListener delegateListener;
            + private MessageListenerImpl delegateListener;

            private XidFactoryMBean xidFactory;

            + private Queue queue;
            + private QueueConnection connection;
            + private DLQHandler dlqHandler;
            + private long globalIdNumber = 1;
            + private byte[] baseGlobalIdBytes;
            + private byte[] noBranchQualifier = new byte[1]; // len > 0, per the XA spec
            + private boolean closing = false;
            +
            /**
            * Create a StdServerSession .
            *
            @@ -82,39 +103,45 @@
            * @param session Our session resource.
            * @param xaSession Our XA session resource.
            * @param delegateListener Listener to call when messages arrives.
            - * @param useLocalTX Will this session be used in a global TX (we can optimize with 1 phase commit)
            + * @param useLocalTX Will this session be used in a global TX (we can optimize with 1 phase commit)
            * @throws JMSException Transation manager was not found.
            * @exception JMSException Description of Exception
            */
            - StdServerSession(final StdServerSessionPool pool,
            - final Session session,
            - final XASession xaSession,
            - final MessageListener delegateListener,
            - boolean useLocalTX,
            - final XidFactoryMBean xidFactory)
            + StdServerSession( final QueueConnection con,
            + final MessageListenerImpl delegateListener,
            + boolean useLocalTX,
            + final Destination destination,
            + final DLQHandler dlqHandler)
            throws JMSException
            {
            // assert pool != null
            // assert session != null

            - this.serverSessionPool = pool;
            - this.session = session;
            - this.xaSession = xaSession;
            +// this.serverSessionPool = pool;
            +// this.session = session;
            +// this.xaSession = xaSession;
            this.delegateListener = delegateListener;
            + this.dlqHandler = dlqHandler;
            + this.queue = (Queue)destination;
            + this.connection = con;
            + if (con instanceof XAQueueConnection)
            + {
            + this.xaSession = ((XAQueueConnection)con).createXAQueueSession();
            + this.session = ((XAQueueSession)xaSession).getQueueSession();
            + }
            + else
            + {
            + // should never happen really
            + log.error("Connection was not reconizable: " + con);
            + throw new JMSException("Connection was not reconizable: " + con);
            + }
            if( xaSession == null )
            useLocalTX = false;
            this.useLocalTX = useLocalTX;
            - this.xidFactory = xidFactory;

            if (log.isDebugEnabled())
            - log.debug("initializing (pool, session, xaSession, useLocalTX): " +
            - pool + ", " + session + ", " + xaSession + ", " + useLocalTX);
            -
            - // Set out self as message listener
            - if (xaSession != null)
            - xaSession.setMessageListener(this);
            - else
            - session.setMessageListener(this);
            + log.debug("initializing (session, xaSession, useLocalTX): " +
            + ", " + session + ", " + xaSession + ", " + useLocalTX);

            InitialContext ctx = null;
            try
            @@ -178,8 +205,11 @@
            {
            if (xaSession != null)
            xaSession.run();
            - else
            - session.run();
            + else {
            + log.error("Error: You are not using XA transaction!");
            + session.run();
            + }
            + this.onMessage();
            }
            finally
            {
            @@ -205,13 +235,20 @@
            * leaving it this way since it keeps the code simpler and that case should
            * not be too common (JBossMQ provides XASessions).
            */
            - public void onMessage(Message msg)
            + public void onMessage()
            {
            + Message message = null;
            boolean trace = log.isTraceEnabled();
            + boolean isRedelivered = false;
            + boolean dlqHandled = false;
            + int id = Thread.currentThread().hashCode();
            + String selector = "JMS_JBOSS_REDELIVERY_COUNT>0";
            + QueueReceiver receiver;
            if( trace )
            - log.trace("onMessage running (pool, session, xaSession, useLocalTX): " +
            - ", " + session + ", " + xaSession + ", " + useLocalTX);
            + log.trace("onMessage running (session, xaSession, useLocalTX): " +
            + session + ", " + xaSession + ", " + useLocalTX);

            + while (!closing) {
            // Used if run with useLocalTX if true
            Xid localXid = null;
            boolean localRollbackFlag=false;
            @@ -222,8 +259,22 @@

            if (useLocalTX)
            {
            - // Use JBossMQ One Phase Commit to commit the TX
            - localXid = xidFactory.newXid();//new XidImpl();
            +// XidFactoryMBean xidFactoryObj = (XidFactoryMBean)getServer().getAttribute(xidFactory, "Instance");
            + // Use JBossMQ One Phase Commit to commit the TX
            + long localId = ++globalIdNumber;
            + String idString = Long.toString(localId);
            + String baseGlobalId = "localhost/";
            + baseGlobalIdBytes = baseGlobalId.getBytes();
            + int len = idString.length()+baseGlobalIdBytes.length;
            + byte[] globalId = new byte[len];
            + System.arraycopy(baseGlobalIdBytes, 0, globalId, 0, baseGlobalIdBytes.length);
            + // this method is deprecated, but does exactly what we need in a very fast way
            + // the default conversion from String.getBytes() is way too expensive
            + idString.getBytes(0, idString.length(), globalId, baseGlobalIdBytes.length);
            + localXid = new XidImpl(0x0101, globalId, noBranchQualifier, (int)localId, localId);
            +
            +// localXid = xidFactory.newXid();//new XidImpl();
            + log.info("[" + id +"] Xid: " + localXid);
            XAResource res = xaSession.getXAResource();
            res.start(localXid, XAResource.TMNOFLAGS);

            @@ -249,12 +300,63 @@
            }
            }
            //currentTransactionId = connection.spyXAResourceManager.startTx();
            -
            - // run the session
            - //session.run();
            - // Call delegate listener
            - delegateListener.onMessage(msg);
            - }
            + connection.start();
            +
            + if (!isRedelivered)
            + {
            + log.debug("[" + id +"] createReceiver(" + queue + ")...");
            + receiver = ((QueueSession) xaSession).createReceiver(queue);
            + } else {
            + log.debug("[" + id +"] createReceiver(" + queue +", "+ selector + ")...");
            + receiver = ((QueueSession) xaSession).createReceiver(queue,selector);
            + }
            + log.debug("[" + id +"] before receive()...");
            +
            + message = receiver.receive();
            +
            + log.debug("[" + id +"] after receive()...");
            +
            + if (message != null) {
            +
            + log.info( "[" + id +"] received message:" + message);
            +
            + try {
            + log.debug(" [" + id +"] onMessage(message)...");
            + dlqHandled = delegateListener.onMessage(message);
            +
            + if (dlqHandled) {
            + /**
            + * Message placed on the DLQ
            + */
            + /**
            + * With isRedelivered=false the invoker keeps
            + * dequeuing the messages that follow the one placed
            + * on the DLQ instead of stopping on the receive with selector.
            + * This is the typical behaviour of an invoker with
            + * quality of service.
            + */
            + isRedelivered = false;
            +//TODO: Here we could notify the invoker that delivery needs to be stopped
            +// For now the invoker stays blocked on the selector with redelivered set to true
            +// Try setting closing = true; ....
            +
            + } else {
            + isRedelivered = false;
            + }
            + }
            + catch (Exception e) {
            + log.error("[" + id +"] handling exception...", e);
            + isRedelivered = true;
            + // Rethrow the exception to trigger the rollback
            + throw e;
            + }
            + } else {
            + log.warn("[" + id +"] receive() returned a null message.");
            + log.info("closing = " + closing);
            + //TODO: The session was probably closed from outside; should I exit?
            +// undeploy = true;
            + }
            + }
            catch (Exception e)
            {
            log.error("session failed to run; setting rollback only", e);
            @@ -266,7 +368,7 @@
            }
            else
            {
            - // Mark for tollback TX via TM
            + // Mark for rollback TX via TM
            try
            {
            // The transaction will be rolledback in the finally
            @@ -285,6 +387,7 @@
            {
            try
            {
            + if (!closing) {
            if (useLocalTX)
            {
            if( localRollbackFlag == true )
            @@ -325,7 +428,7 @@
            // NO XASession? then manually rollback.
            // This is not so good but
            // it's the best we can do if we have no XASession.
            - if (xaSession == null && serverSessionPool.isTransacted())
            + if (xaSession == null /*&& serverSessionPool.isTransacted() by FC*/)
            {
            session.rollback();
            }
            @@ -342,12 +445,13 @@

            // NO XASession? then manually commit. This is not so good but
            // it's the best we can do if we have no XASession.
            - if (xaSession == null && serverSessionPool.isTransacted())
            + if (xaSession == null /*&& serverSessionPool.isTransacted() by FC*/)
            {
            session.commit();
            }
            }
            - }
            + }
            + }
            }
            catch (Exception e)
            {
            @@ -356,6 +460,7 @@
            }
            if( trace )
            log.trace("onMessage done");
            + }
            }

            /**
            @@ -369,13 +474,13 @@

            if (session != null)
            {
            - try
            - {
            - serverSessionPool.getExecutor().execute(this);
            - }
            - catch (InterruptedException ignore)
            - {
            - }
            +// try
            +// {
            +// serverSessionPool.getExecutor().execute(this);
            +// }
            +// catch (InterruptedException ignore)
            +// {
            +// }
            }
            else
            {
            @@ -388,6 +493,7 @@
            */
            void close()
            {
            + closing = true;
            if (session != null)
            {
            try
            @@ -422,7 +528,7 @@
            */
            void recycle()
            {
            - serverSessionPool.recycle(this);
            + this.close();
            }

            }

            • 3. Re: Ordered delivery of redelivered messages

              When I said *use* the developer forums, I didn't mean dump your code
              and ask for suggested fixes.

              We discuss requirements first before discussing implementation.
              We certainly don't fix your patch if you don't know how to get it to work yourself.

              As has been said many times before, there can be no guarantee of order
              in redelivery. The redelivery could be due to a transaction rollback, on a transaction
              the JMS and/or JBoss doesn't even control.

              You should also search the forum for previous discussions and look at the roadmap.
              e.g. http://jira.jboss.com/jira/browse/JBAS-1343
              and its associated discussion forum.
              Which would potentially allow you to set a read-ahead=0 at least for JBossMQ
              which is about as close as you are going to get to subverting the session pool.

              The MDB *MUST* work with JMS providers, not just JBossMQ. Any use of
              JBossMQ (or other JMS features) must be configurable/optional.

              Finally, we are no longer developing (only bug fixing) the JMSContainerInvoker since
              most JMS providers (including JBoss itself) are moving to JCA inbound resource
              adapters.
              So any features should work in both scenarios where possible:
              e.g. read the discussion here: http://jira.jboss.com/jira/browse/JBAS-2047

              • 4. Re: Ordered delivery of redelivered messages

                 

                "fabiulin76" wrote:

                I made changes in the following classes: "org.jboss.ejb.plugins.jms.JMSContainerInvoker" and "org.jboss.jms.asf.StdServerSession". In particular:


                You are free to configure your own "JMSContainerInvoker" and/or contribute an
                alternative implementation.
                But we are not going to rewrite/break this code to try to implement a feature
                that is fundamentally unsupported by JMS semantics.

                • 5. Re: Ordered delivery of redelivered messages
                  fabiulin76

                  OK, I did that only for the purpose of the diff.
                  In my tests I implemented a custom "JMSContainerInvoker" and configured it through the <invoker-proxy-bindings> section of the jboss.xml file.

                  • 6. Re: Ordered delivery of redelivered messages
                    fabiulin76

                     

                    "adrian@jboss.org" wrote:
                    Which would potentially allow you to set a read-ahead=0 at least for JBossMQ which is about as close as you are going to get to subverting the session pool.

                    Yes, even if this is not portable...

                    If I'm not wrong, JBossMQ already implements read-ahead of messages (default is MaxMessages=1 in the invoker-proxy-binding configuration) but you can't set read-ahead=0.

                    The proposed solution posted on http://www.mail-archive.com/jboss-user@lists.sourceforge.net/msg30909.html
                    (changing the order between receiving messages and waiting for ServerSession) seems interesting and could be discussed:

                    - the read-ahead behaviour should be parametric (true/false): in this way you only lose read-ahead when you are using QoS-SingletonMDB (i.e. MDB with ordered redelivery...).

                    - what are the consequences of changing the order?

                    • 7. Re: Ordered delivery of redelivered messages

                       

                      "fabiulin76" wrote:

                      If I'm not wrong, JBossMQ already implements read-ahead of messages (default is MaxMessages=1 in the invoker-proxy-binding configuration) but you can't set read-ahead=0.


                      No, MaxMessages is about reducing concurrency.
                      i.e. How many messages should I wait for before invoking the MDB in a loop
                      on one thread.
                      I personally don't think this is very useful.


                      - the read-ahead behaviour should be parametric (true/false)


                      Parameterized, but not a boolean. It should be a number.
                      e.g. For a client that wants to maximise throughput you could send messages in
                      "read-ahead" size blocks from server to client.

                      messageConsumer.setMessageListener(xxx);
                      Internally:
                      il.receive(); // Will actually pull down multiple messages, passing them to the message
                                    // listener one-by-one before going back to the server to get the next block
                      


                      read-ahead = 0 is just a special case of this
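The idea of read-ahead as a number, with 0 as a special case, can be sketched as a small plain-Java simulation. Nothing here is JBossMQ code: `ReadAheadSim`, `deliver` and `failOnce` are hypothetical names, `readAhead` is interpreted as the number of extra messages pulled per round trip beyond the one that is needed, and the sketch assumes the server restores a rolled-back message to the head of its queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical simulation of client-side read-ahead; not the JBossMQ implementation. */
class ReadAheadSim {
    /**
     * Delivers every message in `server` to `handler` and returns the order in which
     * the handler saw them. A handler result of false simulates a rollback.
     * Note: a handler that always fails for some message makes this loop run forever.
     */
    static List<String> deliver(Deque<String> server, int readAhead, Predicate<String> handler) {
        List<String> seen = new ArrayList<>();
        Deque<String> clientBuffer = new ArrayDeque<>();
        while (!server.isEmpty() || !clientBuffer.isEmpty()) {
            if (clientBuffer.isEmpty()) {
                // One round trip: the message we need plus `readAhead` extras.
                for (int i = 0; i <= readAhead && !server.isEmpty(); i++) {
                    clientBuffer.addLast(server.pollFirst());
                }
            }
            String message = clientBuffer.pollFirst();
            seen.add(message);
            if (!handler.test(message)) {
                // Rollback: assumed to put the message back at the head of the server queue.
                server.addFirst(message);
            }
        }
        return seen;
    }

    /** Builds a handler that fails the first time it sees `target` and succeeds afterwards. */
    static Predicate<String> failOnce(String target) {
        boolean[] alreadyFailed = {false};
        return message -> {
            if (message.equals(target) && !alreadyFailed[0]) {
                alreadyFailed[0] = true;
                return false;
            }
            return true;
        };
    }
}
```

For a server queue holding 1, 2, 3 and a handler built with `failOnce("1")`, `readAhead = 0` yields the delivery order 1, 1, 2, 3, while `readAhead = 2` yields 1, 2, 3, 1, because messages 2 and 3 were already buffered on the client when message 1 rolled back.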

                      • 8. Re: Ordered delivery of redelivered messages
                        fabiulin76

                        About read-ahead = 0.

                        I ran some tests, and it seems that changing the order between receiving messages and waiting for the ServerSession guarantees ordered redelivery of messages.

                        There is only one problem, related to the serverSession taken by the ConnectionConsumer (see the FIXME in StdServerSessionPool...).
                        If you try to undeploy, StdServerSessionPool can't clear that serverSession because it is not in the pool, so JBoss can't undeploy the application.

                        I solved it simply by adding serverSession.start() inside the if(close){...} statement. This causes the serverSession to start and then immediately recycle. This seems to me a good way to work around the problem, and the undeploy now succeeds.

                        • 9. Re: Ordered delivery of redelivered messages
                          fabiulin76

                          Any comment?

                          I would like to contribute my (small amount of) code and JUnit tests.

                          • 10. Re: Ordered delivery of redelivered messages
                            fabiulin76

                            There's another question: how to make read-ahead parameterized.

                            The Container Invoker could load the read-ahead value (0, 1, 2,...) from the proxy-factory-config, but how can this parameter be passed to the connection consumer? We can't modify the connection consumer constructor because the signature is defined by the JMS spec.

                            • 11. Re: Ordered delivery of redelivered messages

                              For better control over the protocol, you should look at using the JMS inbound resource adapter (warning: it is still "experimental").
                              Related (recent) discussions:
                              http://www.jboss.org/index.html?module=bb&op=viewtopic&t=70306
                              http://www.jboss.org/index.html?module=bb&op=viewtopic&t=66978