19 Replies Latest reply on Sep 30, 2010 11:38 AM by timfox

    How to guarantee delivery order to HornetQ queue

    bjs

      Hi,

       

      I've got a simple test app that sends persistent messages onto a queue (batching them with JMS transactions for speed), with another thread/session reading them off the same queue (using CLIENT_ACKNOWLEDGE). The messages are numbered so I can check for loss or redelivery. The other JMS providers I have run my app against have always delivered the messages in the same order I put them onto the queue, but HornetQ (although it's gloriously fast) is continually reordering groups of messages. I *believe*, though I could be wrong, that this violates the JMS spec?

       

      Anyway, how can I ensure that the order in which messages come off the queue is the same as the order in which they were sent?

       

      I tried adding a single fixed group id <group-id>BenGroup</group-id> to my connection factory, and I think that slightly decreased the number of out-of-order messages, but there are still plenty.

       

      Many thanks for your help

      Ben

       

      PS: I'm using the latest version, HornetQ 2.1.1 Final.

        • 1. Re: How to guarantee delivery order to HornetQ queue
          ataylor

          Yes, the JMS spec mandates that messages delivered to a consumer are delivered in the order they were received on the queue. There are caveats to this if you have multiple consumers or if message deliveries are cancelled. Maybe your test is doing one of these; if you post your code we will have a better idea of what's going on.

          • 2. Re: How to guarantee delivery order to HornetQ queue
            timfox

            +1, would need to see code here

            • 3. Re: How to guarantee delivery order to HornetQ queue
              bjs

              ataylor wrote:

              yes the JMS spec mandates that messages delivered to a consumer are delivered in the order they were received  on the queue. There are caveats for this if you have multiple consumers or if message deliveries are cancelled. Maybe your test is doing one of these, if you posted your code we would have a better idea of whats going on

              Thanks guys - my initial impression from the doc and my experiments was that serial message ordering might not even be intended unless message groups are used, but I'm relieved to hear that's not the case. Presumably it's safe to assume that if there are multiple consumers, then the order each consumer sees messages will be the same as the order they were added to the queue, though obviously each message will only be taken off the queue by one consumer and there are no ordering guarantees across the consumers. I'm aware all ordering guarantees disappear if there's a broker failure.
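To illustrate the per-consumer ordering assumption described above, here is a minimal sketch. It is an in-memory analogy built only on `java.util.concurrent`, not JMS or HornetQ code, and the class and method names (`PerConsumerOrderDemo`, `run`, `isAscending`) are made up for this example. Several threads drain one FIFO queue; each message goes to exactly one thread, and each thread should see its own share in ascending order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** In-memory analogy only: a shared FIFO queue drained by several consumer threads. */
public class PerConsumerOrderDemo {

    /** Fills a queue with ids 1..messages, drains it with the given number of threads, returns what each thread saw. */
    public static List<List<Integer>> run(int messages, int consumers) {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= messages; i++) {
            queue.add(i);
        }
        List<List<Integer>> seen = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            final List<Integer> mine = new ArrayList<>(); // only touched by one consumer thread
            seen.add(mine);
            threads.add(new Thread(() -> {
                Integer m;
                while ((m = queue.poll()) != null) { // poll() removes the current head of the queue
                    mine.add(m);
                }
            }));
        }
        for (Thread t : threads) {
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join(); // join() makes each thread's list safely visible to the caller
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumers", e);
        }
        return seen;
    }

    /** True if every id is strictly greater than the one before it. */
    public static boolean isAscending(List<Integer> ids) {
        for (int i = 1; i < ids.size(); i++) {
            if (ids.get(i) <= ids.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<List<Integer>> seen = run(10000, 3);
        int total = 0;
        for (List<Integer> ids : seen) {
            total += ids.size();
            System.out.println("consumer saw " + ids.size() + " messages, ascending=" + isAscending(ids));
        }
        System.out.println("total=" + total);
    }
}
```

How many messages each thread gets varies from run to run; only the per-thread ordering and the total are stable. A real broker adds consumer-side buffering and redelivery, so this says nothing about HornetQ's actual behaviour.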

               

              However, my test app doesn't have multiple consumers or any client/broker failures, yet I'm still seeing messages arrive out of order, even within the first 20 seconds or so of running the app.

               

              I'd appreciate it if you could take a look at this and help me figure out what's going on; doubtless you have robustness tests that check for this kind of bug, yet my code is pretty simple and I can't see any reason for it to behave this way.

               

              FYI I'm running 64-bit Windows 7, a default-configured standalone server (except I set connection-ttl to 20000 not 60000) and 64bit Java 1.6.0_18-b07.
              Here's an example of the errors I'm logging after the first ~7,000 messages were sent and received correctly (the important bit is the id numbers at the end of the messages):
              Wed Jul 07 15:27:11 BST 2010: Committing send batch of 2,527 messages after 51 ms (status: outOfOrderErrors=0, tx=15,557, rx=7,474, outstanding=8,083)
              Wed Jul 07 15:27:12 BST 2010: ERROR: Expected message "client start time Wed Jul 07 15:26:45 BST 2010, TestClient message 11,940" but got earlier or later message "sent at Wed Jul 07 15:27:11 BST 2010, client start time Wed Jul 07 15:26:45 BST 2010, TestClient message 12,819"
              Wed Jul 07 15:27:12 BST 2010: ERROR: Expected message "client start time Wed Jul 07 15:26:45 BST 2010, TestClient message 12,942" but got earlier or later message "sent at Wed Jul 07 15:27:11 BST 2010, client start time Wed Jul 07 15:26:45 BST 2010, TestClient message 11,940"
              Wed Jul 07 15:27:13 BST 2010: ERROR: Expected message "client start time Wed Jul 07 15:26:45 BST 2010, TestClient message 12,819" but got earlier or later message "sent at Wed Jul 07 15:27:11 BST 2010, client start time Wed Jul 07 15:26:45 BST 2010, TestClient message 12,942"
              Wed Jul 07 15:27:13 BST 2010: ERROR: Expected message "client start time Wed Jul 07 15:26:45 BST 2010, TestClient message 15,558" but got earlier or later message "sent at Wed Jul 07 15:27:11 BST 2010, client start time Wed Jul 07 15:26:45 BST 2010, TestClient message 18,077"
              Wed Jul 07 15:27:14 BST 2010: ERROR: Expected message "client start time Wed Jul 07 15:26:45 BST 2010, TestClient message 18,279" but got earlier or later message "sent at Wed Jul 07 15:27:11 BST 2010, client start time Wed Jul 07 15:26:45 BST 2010, TestClient message 15,558"
              Wed Jul 07 15:27:14 BST 2010: ERROR: Expected message "client start time Wed Jul 07 15:26:45 BST 2010, TestClient message 18,077" but got earlier or later message "sent at Wed Jul 07 15:27:12 BST 2010, client start time Wed Jul 07 15:26:45 BST 2010, TestClient message 18,279"
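For readers who want to see how errors like the ones above are produced, here is a minimal standalone sketch of the sequence check, using only the standard library. The class and method names (`SequenceChecker`, `parseId`, `check`) are invented for this example; the parsing mirrors the approach in the test app (take the text after the last space and strip the thousands separators that `MessageFormat` inserts).

```java
/** Standalone sketch of the ordering check: compares each received id with the next expected one. */
public class SequenceChecker {
    private long expected = 1;   // id we expect to receive next
    private int outOfOrder = 0;  // number of mismatches seen so far

    /** Extracts the trailing id, e.g. "... TestClient message 12,819" gives 12819. */
    public static long parseId(String text) {
        String tail = text.substring(text.lastIndexOf(' ') + 1);
        return Long.parseLong(tail.replace(",", ""));
    }

    /** Returns null if the message is the expected one, otherwise a short description of the mismatch. */
    public String check(String text) {
        long id = parseId(text);
        String problem = null;
        if (id != expected) {
            outOfOrder++;
            problem = "expected " + expected + " but got " + (id < expected ? "earlier" : "later") + " message " + id;
        }
        expected = id + 1; // resynchronise on whatever actually arrived
        return problem;
    }

    public int outOfOrderCount() {
        return outOfOrder;
    }

    public static void main(String[] args) {
        SequenceChecker checker = new SequenceChecker();
        // Messages 3 and 4 are swapped in this made-up delivery sequence.
        String[] received = {
            "TestClient message 1", "TestClient message 2", "TestClient message 4",
            "TestClient message 3", "TestClient message 5"
        };
        for (String text : received) {
            String problem = checker.check(text);
            if (problem != null) {
                System.out.println("ERROR: " + problem);
            }
        }
        System.out.println("outOfOrderErrors=" + checker.outOfOrderCount());
    }
}
```

Because the checker resynchronises on every mismatch, one swapped pair (or one pair of swapped blocks) is reported three times: once when the later block arrives early, once when the earlier block turns up, and once when delivery jumps forward again. That appears consistent with the errors in the log above arriving in groups of three.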

              I've spent some time today simplifying my sample app so I have something fairly short I can post here; I added a sleep statement in my ReceiveThread to reduce the time it takes for the problem to show up:

               

               

              {code}

              import java.io.PrintWriter;
              import java.io.StringWriter;
              import java.text.MessageFormat;
              import java.util.Date;
              import java.util.Hashtable;
              import java.util.UUID;

              import javax.jms.Connection;
              import javax.jms.ConnectionFactory;
              import javax.jms.DeliveryMode;
              import javax.jms.Destination;
              import javax.jms.ExceptionListener;
              import javax.jms.JMSException;
              import javax.jms.Message;
              import javax.jms.MessageConsumer;
              import javax.jms.MessageProducer;
              import javax.jms.Session;
              import javax.jms.TextMessage;
              import javax.naming.InitialContext;

              /**
               * Create a send session and a receive session, send messages with ascending sequence numbers onto a queue and receive from the
               * same queue, checking the numbers to detect message loss/redelivery/out-of-order.
               */
              public class SimpleSendReceiveClient {

                  public static final String QUEUE_NAME = "ExampleQueue";
                  public static final String BROKER = "jnp://localhost:1099";
                  static final Date START_TIME = new Date();

                  public static void main(String[] args) throws Exception {
                      flushOldMessages();
                      new ReceiveThread().start();
                      new SendThread().start();
                  }

                  static volatile int sentId = 0;
                  static volatile int receivedId = 0;
                  static volatile int outOfOrderErrors = 0;

                  static class SendThread extends Thread {
                      @Override
                      public void run() {
                          setName("SendThread");
                          try {
                              Connection connection = createConnection(); // TODO: we obviously need to close connections in a finally block in the real version
                              connection.start();

                              // transact and batch to avoid very slow round trip time delay on each send
                              Session s = connection.createSession(true, Session.SESSION_TRANSACTED);

                              MessageProducer p = s.createProducer(s.createQueue(QUEUE_NAME));
                              p.setDeliveryMode(DeliveryMode.PERSISTENT);

                              long lastBatchTime = System.currentTimeMillis();
                              int batchSize = 0;
                              while (true) {
                                  sentId++;
                                  String t = MessageFormat.format("sent at {0}, client start time {1}, TestClient message {2}", new Date().toString(), START_TIME.toString(), sentId);
                                  TextMessage m = s.createTextMessage(t);
                                  p.send(m);
                                  batchSize++;

                                  if (sentId == 1 || System.currentTimeMillis()-lastBatchTime > 50) {
                                      log("Committing send batch of {0} messages after {1} ms (status: outOfOrderErrors={2}, tx={3}, rx={4}, outstanding={5})",
                                              batchSize, System.currentTimeMillis()-lastBatchTime, outOfOrderErrors, sentId, receivedId, sentId-receivedId);
                                      lastBatchTime = System.currentTimeMillis();
                                      s.commit();
                                      batchSize = 0;
                                  }
                                  // wait for the first message to be received before we continue sending
                                  while (receivedId == 0) {
                                      Thread.sleep(50);
                                  }
                              }
                          } catch (Exception e) {
                              log("SendThread error: ", e);
                              SimpleSendReceiveClient.sleep(20000); // wait to see if any other errors surface in the meantime
                              System.exit(3);
                          }
                      }
                  }

                  static class ReceiveThread extends Thread {
                      @Override
                      public void run() {
                          setName("ReceiveThread");
                          try {
                              Connection connection = createConnection();
                              connection.start();

                              // start a new session for the main receive loop
                              Session s = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                              MessageConsumer c = s.createConsumer(s.createQueue(QUEUE_NAME));

                              long lastAcknowledgeTime = System.currentTimeMillis();
                              Message lastMessage = null;
                              int batch = 0;
                              while (true) {
                                  // it seems we have to specify a timeout here or HornetQ sometimes randomly starves itself of acknowledgements and receive() never returns
                                  TextMessage m = (TextMessage)c.receive(5000);
                                  if (m != null) {
                                      if (receivedId < 10 || receivedId % 100000==0)
                                          log("Received message: "+m.getText());

                                      lastMessage = m;
                                      batch++;
                                      String expectedMessage = MessageFormat.format("client start time {0}, TestClient message {1}", START_TIME.toString(), (receivedId+1));
                                      if (!m.getText().endsWith(expectedMessage)) {
                                          outOfOrderErrors++;
                                          log("ERROR: Expected message \"{0}\" but got earlier or later message \"{1}\"", expectedMessage, m.getText());
                                          //System.exit(100);
                                          receivedId = Integer.parseInt(m.getText().substring(m.getText().lastIndexOf(' ')+1).replace(",", ""));
                                      } else {
                                          receivedId++;
                                      }
                                  }
                                  if (lastMessage != null && System.currentTimeMillis()-lastAcknowledgeTime > 50) {
                                      log("Acknowledging {0} message(s) after {1} ms", batch, System.currentTimeMillis()-lastAcknowledgeTime);
                                      lastMessage.acknowledge();
                                      lastMessage = null;
                                      batch = 0;

                                      // for some reason, adding a sleep in here makes the out-of-order messages come much more quickly
                                      Thread.sleep(1000);

                                      lastAcknowledgeTime = System.currentTimeMillis();
                                  }
                              }
                          } catch (Exception e) {
                              log("ReceiveThread error: ", e);
                              SimpleSendReceiveClient.sleep(20000); // wait to see if any other errors surface in the meantime
                              System.exit(2);
                          }
                      }
                  }

                  private static void flushOldMessages() throws Exception {
                      Connection connection = createConnection();
                      try {
                          // start by flushing out any old messages from the queues by adding a chaser event
                          log("Flushing old messages...");
                          connection.start();
                          Session s = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                          UUID uuid = UUID.randomUUID();
                          MessageProducer p = s.createProducer(null);
                          p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                          p.send(s.createQueue(QUEUE_NAME), s.createTextMessage("Flush: "+uuid));

                          int flushed = 0;

                          for (Destination d: new Destination[]{s.createQueue(QUEUE_NAME)}) {
                              MessageConsumer c = s.createConsumer(d);
                              Message ackMessage = null;
                              while (true) {
                                  Message m = c.receive(50);
                                  if (m != null)
                                      ackMessage = m;

                                  if (m instanceof TextMessage)
                                      if (((TextMessage)m).getText().equals("Flush: "+uuid))
                                          break;
                                  if (m != null) {
                                      flushed++;
                                      if (flushed % 100 == 0)
                                          log("Flushed {0} old message(s) so far...", flushed);
                                  }
                              }
                              if (ackMessage != null)
                                  ackMessage.acknowledge();
                              c.close();
                          }

                          s.close();
                          log("Flushed {0} old message(s).", flushed);
                      } finally {
                          closeConnection(connection);
                      }
                  }

                  public static Connection createConnection() throws Exception {
                      Connection result;
                      Hashtable<Object,Object> env = new Hashtable<Object, Object>();
                      env.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
                      env.put("java.naming.provider.url", BROKER);
                      env.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
                      InitialContext initialContext = new InitialContext(env);
                      result = ((ConnectionFactory)initialContext.lookup("/ConnectionFactory")).createConnection();
                      initialContext.close();
                      final String threadName = Thread.currentThread().getName();
                      result.setExceptionListener(new ExceptionListener() {
                          @Override
                          public void onException(JMSException exception) {
                              log("JMS Exception listener \""+threadName+"\": ", exception);
                          }
                      });
                      return result;
                  }

                  private static void sleep(long millis) {
                      try {
                          Thread.sleep(millis);
                      } catch (InterruptedException e) {
                          log("Interrupted during sleep, ignoring");
                      }
                  }

                  private static void closeConnection(Connection c) {
                      if (c == null)
                          return;
                      try {
                          c.close();
                      } catch (Exception e) {
                          log("Error closing connection: ", e);
                      }
                  }

                  private static void log(String msg, Exception e) {
                      log(msg+stackTraceFromException(e));
                  }

                  private static void log(String msg, Object... params) {
                      if (params != null && params.length > 0)
                          msg = MessageFormat.format(msg, params);
                      System.out.println(new Date()+": "+msg);
                      System.out.flush();
                      System.err.flush();
                  }

                  private static String stackTraceFromException(Throwable t) {
                      if (t == null)
                          return "<null Throwable>";
                      StringWriter sw=new StringWriter();
                      t.printStackTrace(new PrintWriter(sw));
                      return sw.toString().replace("\n\r", "\n");
                  }
              }

              {code}

               

              Thanks for your help!

              • 4. Re: How to guarantee delivery order to HornetQ queue
                bjs

                FYI I've just confirmed that the same thing happens running the broker and client on 64-bit Red Hat (Enterprise Linux AS release 4)

                • 5. Re: How to guarantee delivery order to HornetQ queue
                  timfox

                  Does this only happen when you're sending with transactions?

                  • 6. Re: How to guarantee delivery order to HornetQ queue
                    bjs

                    I haven't had time to do much testing without sending in transactions... but so far I haven't been able to reproduce the problem if I don't use them.

                     

                    Of course, it's massively slower sending persistent messages if you don't batch them with transactions, so the timings will be very different... which could reduce the probability of noticing the reordering problem, even if it's still possible for it to occur.

                     

                    I've seen this behaviour on two separate machines/platforms now, reproducing very easily within a few seconds. I'd be interested to know if you/anyone else can see the same thing?

                     

                    Thanks for the help!

                    • 7. Re: How to guarantee delivery order to HornetQ queue
                      timfox

                      Ben Spiller wrote:

                       

                      I haven't had time to do much testing without sending in transactions... but so far I haven't been able to reproduce the problem if I don't use them.

                       

                      Of course, it's massively slower sending persistent messages if you don't batch them with transactions, so the timings will be very different...

                      Only if you have block-on-durable-send set to true.

                      • 8. Re: How to guarantee delivery order to HornetQ queue
                        bjs

                        True; so far (after a very few mins testing) it looks as though you're right, and the problem doesn't occur unless I use transactions (I'm still sending persistent messages, but setting block-on-durable-send=false as you suggested).

                         

                        It's not a great solution for me, however, as I need to know for sure when messages have been sent in order to guarantee at-least-once sending, and as I'm writing an adapter to support integration with any JMS provider I must stick to standard JMS so can't use the HornetQ asynchronous send notification feature (cool as it is!). And once I get past this issue I'm also planning to use XA transactions for situations where exactly-once reliability is needed - and I guess it's possible XA transactions could exhibit the same problem.

                         

                        Does this look like a genuine bug to you? Shall I file it in JIRA?

                         

                        In the project I'm working on I've tried a couple of other JMS providers so far, and come across lots of horrendous robustness bugs. I'd be so happy if I could at least get a working solution with HornetQ (which has great features and is blazingly fast)...

                         

                        Do you happen to know of any changes in recent versions of HornetQ that might have introduced a bug like this - maybe there's an older version I could try that might not have this bug?

                         

                        Cheers!

                        Ben

                        • 9. Re: How to guarantee delivery order to HornetQ queue
                          ataylor

                          I'll take a look at this today and confirm if it is a bug

                          • 10. Re: How to guarantee delivery order to HornetQ queue
                            bjs

                            Hi Andy, FYI it looks to me like there's a similar reordering problem using XA transactions instead of JMS session transactions.

                             

                            Did you have a chance to take a look at what might be going on?

                             

                            I wonder if there's an earlier version of HornetQ which might not suffer from this problem (has anything major changed in this area recently?)

                             

                            Thanks so much for the help

                            • 11. Re: How to guarantee delivery order to HornetQ queue
                              clebert.suconic

                              The only issue I could replicate was OutOfMemoryErrors...

                               

                              You are sending messages in a transaction, while ACKing them using AutoACK. Since you batch the sends in transactions, each AutoACK is equivalent to a TX with only one ACK, i.e. you will be building up messages.

                               

                              However... someone reported an issue on the journal to me, where writes get out of sync when an OutOfMemoryError occurs. Perhaps the out-of-order delivery you see is happening after OutOfMemory errors on the server side? That would actually help me with another investigation I'm doing.

                               

                              What would happen if you ACKed at the same rate you produced? I tried it here and I couldn't reproduce any issues.

                              • 12. Re: How to guarantee delivery order to HornetQ queue
                                bjs

                                Thanks Clebert... hmmm, I'm surprised you can't reproduce this. As I said, I consistently see it fail after just a few seconds on both Windows and Linux test machines, without any special configuration settings.

                                 

                                There are no errors in the log or console, and I don't think there's an out of memory issue for the process as a whole - server memory is:

                                Heap
                                 PSYoungGen      total 149760K, used 89357K [0x00000000da950000, 0x00000000e53f0000, 0x00000001053f0000)
                                  eden space 125184K, 59% used [0x00000000da950000,0x00000000df272868,0x00000000e2390000)
                                  from space 24576K, 58% used [0x00000000e2390000,0x00000000e31b0cb8,0x00000000e3b90000)
                                  to   space 23552K, 0% used [0x00000000e3cf0000,0x00000000e3cf0000,0x00000000e53f0000)
                                 PSOldGen        total 349568K, used 13733K [0x00000000853f0000, 0x000000009a950000, 0x00000000da950000)
                                  object space 349568K, 3% used [0x00000000853f0000,0x0000000086159580,0x000000009a950000)
                                 PSPermGen       total 27456K, used 21695K [0x000000007fff0000, 0x0000000081ac0000, 0x00000000853f0000)
                                  object space 27456K, 79% used [0x000000007fff0000,0x000000008151fca8,0x0000000081ac0000)
                                

                                 

                                The sample I posted does not use AUTO_ACKNOWLEDGE, but CLIENT_ACKNOWLEDGE (with the acks batched every 50ms); and although I did find the issue was easier to reproduce if I added a 1 second sleep after each acknowledgement (which is what I posted), I'm still seeing it if I comment that line out.

                                 

                                I've tried printing the number of outstanding messages (sentId-receivedId), and it's mostly around 1000-6000 messages (suggesting that on my machine the receiving isn't falling behind the sending by too much) - but sometimes rising higher (10,000 is the highest I've seen).

                                • 13. Re: How to guarantee delivery order to HornetQ queue
                                  clebert.suconic

                                  Jeff Mesnil was able to reproduce the issue.

                                   

                                  The issue is related to an edge case where the system enters and leaves page mode while a transaction is in progress. I could confirm this won't happen if the system does not enter page mode.

                                   

                                   

                                  I couldn't replicate the issue as I didn't have paging configured when I ran your test or Andy's test (a conversion of your test to our test suite). But now Jeff Mesnil is working on it.

                                   

                                   

                                  Here is the dev thread about it:

                                   

                                  http://community.jboss.org/thread/154061?tstart=0

                                  • 14. Re: How to guarantee delivery order to HornetQ queue
                                    jmesnil
                                    Hi Ben,
                                    I'm looking at your issue and we found an edge case which could explain why you receive out of order messages.
                                    However to confirm this, I'd like to know the configuration you used for the HornetQ server. Could you post your hornetq-configuration.xml?
                                    I'm especially interested in the address settings, to check whether the PAGE or BLOCK policy is set for a full address, and in the other associated settings.
                                    thanks,
                                    jeff