7 Replies Latest reply on Aug 21, 2013 9:54 AM by gaohoward

    Unsubscribing durable subscription via Stomp

    ameyamk

      Hi,

       

      I am opening this thread following a discussion with Clebert.

       

      I was trying to unsubscribe from one of my durable subscriptions (over Stomp).

       

      I found that this does not work: it does not actually remove the durable subscription on the queue (although it does kill the consumers on it).

       

      What I expect to happen on "unsubscribe":

       

      - Remove the durable subscription for the given subId.

      - Do not accumulate any new messages published to the topic for this subId

      - Delete any "unconsumed" messages for that subscription queue

      - Kill consumers on the subscriptions.

       

      I would love to get some ideas on what the originally intended functionality of unsubscribe was, and whether I am missing something.

       

      Thanks,

       

      Ameya

        • 2. Re: Unsubscribing durable subscription via Stomp
          clebert.suconic

          My concern is: what if you had multiple consumers on the same subscription? It seems that is possible with Stomp. The first time you called unsubscribe, it would delete the queue (if we did this).

           

           

          I wonder what Jeff Mesnil and Howard would think about it since they worked a lot on Stomp.

          • 3. Re: Unsubscribing durable subscription via Stomp
            jmesnil

            I had a quick look and indeed the code does not look right when unsubscribing a durable subscriber.

             

            When a client wants to create a durable subscriber, he must pass a (non-standard, iirc) Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME header. In that case, we create a core queue for it.

             

            When a client wants to unsubscribe the durable subscriber, he should pass the same header so that we can remove its core queue.
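For illustration, the exchange described above could look like the frames built below. This is a minimal sketch, not HornetQ code: the StompFrames class and its methods are made up for this example, the destination and names in the usage note are placeholders, and the wire name "durable-subscriber-name" is assumed to be the value behind Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME (check the constant in your version).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of HornetQ: builds raw STOMP frames as strings.
class StompFrames {
    // Assumed wire name of Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME.
    static final String DURABLE_SUBSCRIBER_NAME = "durable-subscriber-name";

    // A STOMP frame is: command line, header lines, blank line, body, NUL byte.
    static String frame(String command, Map<String, String> headers) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> h : headers.entrySet()) {
            sb.append(h.getKey()).append(':').append(h.getValue()).append('\n');
        }
        return sb.append('\n').append('\0').toString();
    }

    // SUBSCRIBE carrying the durable subscriber name.
    static String subscribe(String destination, String id, String durableName) {
        Map<String, String> h = new LinkedHashMap<>();
        h.put("destination", destination);
        h.put("id", id);
        h.put(DURABLE_SUBSCRIBER_NAME, durableName);
        return frame("SUBSCRIBE", h);
    }

    // UNSUBSCRIBE repeating the same durable subscriber name, so the server
    // can tell which core queue belongs to the subscription.
    static String unsubscribe(String id, String durableName) {
        Map<String, String> h = new LinkedHashMap<>();
        h.put("id", id);
        h.put(DURABLE_SUBSCRIBER_NAME, durableName);
        return frame("UNSUBSCRIBE", h);
    }
}
```

A client would send StompFrames.subscribe("jms.topic.news", "sub-1", "my-durable") when subscribing and StompFrames.unsubscribe("sub-1", "my-durable") when it wants the subscription removed.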

             

            howard, wdyt?

            • 4. Re: Unsubscribing durable subscription via Stomp
              ameyamk

              I did a bit more work on this; I am now passing the correct subscriber id etc. from the client. I am also calling deleteQueue in StompSession on unsubscribe.

               

              It looks something like this (I added bindingName):

               

                 public boolean unsubscribe(String destination, String id, String bindingName) throws Exception
                 {
                    Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
                    while (iterator.hasNext())
                    {
                       Entry<Long, StompSubscription> entry = iterator.next();
                       long consumerID = entry.getKey();
                       StompSubscription sub = entry.getValue();

                       if (id != null && id.equals(sub.getID()))
                       {
                          // Drop the subscription and close its consumer, as before.
                          iterator.remove();
                          session.closeConsumer(consumerID);

                          // New: also delete the core queue backing the durable subscription.
                          session.deleteQueue(new SimpleString(bindingName));
                          return true;
                       }
                    }
                    return false;
                 }

               

               

               

              This works as expected as long as there are no messages in the queue. If there are messages in the queue, it throws an NPE:

               

              java.lang.NullPointerException
                 at org.hornetq.core.server.impl.ScheduledDeliveryHandlerImpl.cancel(ScheduledDeliveryHandlerImpl.java:119)
                 at org.hornetq.core.server.impl.QueueImpl.deleteMatchingReferences(QueueImpl.java:948)
                 at org.hornetq.core.server.impl.QueueImpl.deleteAllReferences(QueueImpl.java:921)
                 at org.hornetq.core.server.impl.HornetQServerImpl.destroyQueue(HornetQServerImpl.java:1104)
                 at org.hornetq.core.server.impl.ServerSessionImpl.deleteQueue(ServerSessionImpl.java:491)
                 at org.hornetq.core.protocol.stomp.StompSession.unsubscribe(StompSession.java:315)
                 at org.hornetq.core.protocol.stomp.StompProtocolManager.onUnsubscribe(StompProtocolManager.java:407)
                 at org.hornetq.core.protocol.stomp.StompProtocolManager.handleBuffer(StompProtocolManager.java:203)
                 at org.hornetq.core.protocol.stomp.StompConnection.bufferReceived(StompConnection.java:274)
                 at org.hornetq.core.remoting.server.impl.RemotingServiceImpl$DelegatingBufferHandler.bufferReceived(RemotingServiceImpl.java:459)
                 at org.hornetq.core.remoting.impl.netty.HornetQChannelHandler.messageReceived(HornetQChannelHandler.java:73)
                 at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:100)
                 at org.jboss.netty.channel.StaticChannelPipeline.sendUpstream(StaticChannelPipeline.java:362)
                 at org.jboss.netty.channel.StaticChannelPipeline.sendUpstream(StaticChannelPipeline.java:357)
                 at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
                 at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
                 at org.jboss.netty.channel.socket.oio.OioWorker.run(OioWorker.java:90)
                 at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
                 at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46)
                 at org.jboss.netty.util.VirtualExecutorService$ChildExecutorRunnable.run(VirtualExecutorService.java:181)
                 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
                 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
                 at java.lang.Thread.run(Thread.java:662)

               

               

              Any help here would be appreciated.

              • 6. Re: Unsubscribing durable subscription via Stomp
                matthieuc

                Hi,

                 

                Following up on this post: I had the same issues as Ameya, and Ameya's solution was helpful when I wrote my own patch against the 2.2.20 release last year. Context: HornetQ production servers delivering a few hundred msg/sec into topics, with mixed STOMP / JMS consumers.

                STOMP consumers mostly using Net::STOMP::Client Perl module.

                 

                Behaviour observed on the 2.2.20 release and reproduced on the current 2.3.0-CR1:

                - selectors: When using a selector, excluded messages still appear to be routed to the queue, although they are not delivered to the client (observed from the MessageCount JMX attribute value). The selector value also does not appear in the JMX 'Filter' attribute. The latter is minor compared to the heap memory saturation caused by the former.

                - unsubscribe: The queue is not destroyed upon a successful unsubscribe. Incoming messages are still routed to it, which also causes heap memory saturation.
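The selector behaviour above can be pictured with a toy model. This is only a sketch under stated assumptions, not HornetQ code: ToyQueue and its methods are invented for illustration, messages are plain strings, and a selector is modelled as a Predicate. It shows why a filter applied only at delivery time lets excluded messages pile up, while a filter applied at routing time does not.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.function.Predicate;

// Toy model (hypothetical, not HornetQ): a queue with an optional routing filter.
class ToyQueue {
    private final Deque<String> messages = new ArrayDeque<>();
    private final Predicate<String> queueFilter; // applied at routing time

    ToyQueue(Predicate<String> queueFilter) {
        this.queueFilter = queueFilter;
    }

    // Routing: only messages accepted by the queue filter are stored.
    void route(String message) {
        if (queueFilter.test(message)) {
            messages.add(message);
        }
    }

    // Delivery: removes and returns the first stored message matching the
    // consumer's selector, or null if none matches. Non-matching messages
    // stay in the queue.
    String deliver(Predicate<String> consumerSelector) {
        for (Iterator<String> it = messages.iterator(); it.hasNext();) {
            String m = it.next();
            if (consumerSelector.test(m)) {
                it.remove();
                return m;
            }
        }
        return null;
    }

    // Rough analogue of the MessageCount JMX attribute.
    int messageCount() {
        return messages.size();
    }
}
```

With a selector that accepts only messages starting with "A", routing A1, B1, B2, A2 into new ToyQueue(m -> true) and draining it through the selector leaves B1 and B2 behind, whereas a queue built with the selector as its routing filter ends up empty.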

                 

                I changed slightly more than Ameya's patch did, and have been using this since October without trouble. The patch:

                - Assign the selector value to the queue, not the consumer, on the server side: the selector value can then be seen in the JMX Filter attribute, and excluded messages no longer appear to be routed to the queue.

                - Destroy the queue when the last consumer disconnects (tested with multiple consumers and several subscribe/unsubscribe cases before the final "unsubscribe, no more consumers" situation).
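The "destroy the queue only when the last consumer leaves" rule can be sketched as simple reference counting. This is a hedged illustration, not the actual patch: DurableSubscriptionRegistry and its methods are hypothetical names, and the sketch assumes that only an explicit unsubscribe (not a dropped connection) releases a consumer's hold on the queue.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping, not HornetQ code: counts consumers per durable queue.
class DurableSubscriptionRegistry {
    private final Map<String, Integer> consumersPerQueue = new HashMap<>();

    // Called when a consumer subscribes to the durable queue.
    void subscribe(String queueName) {
        consumersPerQueue.merge(queueName, 1, Integer::sum);
    }

    // Called on unsubscribe. Returns true only when this was the last
    // consumer, i.e. when the caller may safely delete the queue.
    boolean unsubscribe(String queueName) {
        Integer count = consumersPerQueue.get(queueName);
        if (count == null) {
            return false; // unknown subscription, nothing to delete
        }
        if (count > 1) {
            consumersPerQueue.put(queueName, count - 1);
            return false; // other consumers still attached
        }
        consumersPerQueue.remove(queueName);
        return true;
    }
}
```

With two consumers on the same subscription, the first unsubscribe returns false and leaves the queue alone; only the second returns true. That addresses the concern raised earlier in the thread about deleting the queue on the first unsubscribe.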

                 

                Now that I am starting to look at 2.3.0-CR1, I have ported this patch to that code base and was wondering whether these STOMP features could be considered useful and integrated into the main release at some point. I can provide a diff file in that case.

                 

                regards,

                 

                matthieu

                • 7. Re: Unsubscribing durable subscription via Stomp
                  gaohoward

                  I think you are right Jeff. (sorry for the late response, I missed it)

                   

                  Howard