-
2. Re: Unsubscribing durable subscription via Stomp
clebert.suconic Apr 3, 2012 4:50 PM (in response to ameyamk)
My concern is what happens if you have multiple consumers on the same subscription, which seems to be possible with Stomp. If we did this, the first unsubscribe call would delete the queue while the other consumers are still using it.
I wonder what Jeff Mesnil and Howard would think about it, since they worked a lot on Stomp.
-
3. Re: Unsubscribing durable subscription via Stomp
jmesnil Apr 4, 2012 2:37 AM (in response to clebert.suconic)
I had a quick look, and indeed the code does not look right when unsubscribing a durable subscriber.
When a client wants to create a durable subscriber, it must pass a (non-standard, iirc) Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME header. In that case, we create a core queue for it.
When a client wants to unsubscribe the durable subscriber, it should pass the same header so that we can remove its core queue.
Howard, wdyt?
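For anyone following along, here is a rough sketch of what the client side of that exchange could look like: the same durable subscriber header goes on both the SUBSCRIBE and the UNSUBSCRIBE frame. The literal header value "durable-subscriber-name" is my assumption for what Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME resolves to, so check it against your HornetQ version. The class name, destination, and ids are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DurableStompFrames {
    // ASSUMPTION: literal value of Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME.
    public static final String DURABLE_SUBSCRIBER_NAME = "durable-subscriber-name";

    /** Serializes a body-less STOMP frame: command, headers, blank line, NUL terminator. */
    public static String frame(String command, Map<String, String> headers) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> h : headers.entrySet()) {
            sb.append(h.getKey()).append(':').append(h.getValue()).append('\n');
        }
        return sb.append('\n').append('\0').toString();
    }

    /** SUBSCRIBE frame that asks the server to create a durable subscription. */
    public static String subscribe(String destination, String id, String durableName) {
        return frame("SUBSCRIBE", headers(destination, id, durableName));
    }

    /** UNSUBSCRIBE frame carrying the same durable name, so the server can find the queue. */
    public static String unsubscribe(String destination, String id, String durableName) {
        return frame("UNSUBSCRIBE", headers(destination, id, durableName));
    }

    private static Map<String, String> headers(String destination, String id, String durableName) {
        Map<String, String> h = new LinkedHashMap<>(); // keeps header order stable
        h.put("destination", destination);
        h.put("id", id);
        h.put(DURABLE_SUBSCRIBER_NAME, durableName);
        return h;
    }

    public static void main(String[] args) {
        // Hypothetical destination and ids; NUL is replaced so the frames print cleanly.
        System.out.print(subscribe("jms.topic.example", "sub-1", "my-durable").replace('\0', '\n'));
        System.out.print(unsubscribe("jms.topic.example", "sub-1", "my-durable").replace('\0', '\n'));
    }
}
```

The point is only that the unsubscribe request has to identify the durable subscription by the same name used at subscribe time; how the server maps that name to a core queue is not shown here.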
-
4. Re: Unsubscribing durable subscription via Stomp
ameyamk Apr 4, 2012 2:19 PM (in response to jmesnil)
I did a bit more work on this. I am now passing the correct subscriber id etc. from the client, and I am also calling deleteQueue in StompSession when handling an unsubscribe.
It looks something like this (I added bindingName):
public boolean unsubscribe(String destination, String id, String bindingName) throws Exception
{
   Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
   while (iterator.hasNext())
   {
      Map.Entry<Long, StompSubscription> entry = (Map.Entry<Long, StompSubscription>)iterator.next();
      long consumerID = entry.getKey();
      StompSubscription sub = entry.getValue();
      if (id != null && id.equals(sub.getID()))
      {
         iterator.remove();
         session.closeConsumer(consumerID);
         session.deleteQueue(new SimpleString(bindingName));
         return true;
      }
   }
   return false;
}
This works as expected as long as there are no messages in the queue. If there are messages in the queue, it throws an NPE:
java.lang.NullPointerException
at org.hornetq.core.server.impl.ScheduledDeliveryHandlerImpl.cancel(ScheduledDeliveryHandlerImpl.java:119)
at org.hornetq.core.server.impl.QueueImpl.deleteMatchingReferences(QueueImpl.java:948)
at org.hornetq.core.server.impl.QueueImpl.deleteAllReferences(QueueImpl.java:921)
at org.hornetq.core.server.impl.HornetQServerImpl.destroyQueue(HornetQServerImpl.java:1104)
at org.hornetq.core.server.impl.ServerSessionImpl.deleteQueue(ServerSessionImpl.java:491)
at org.hornetq.core.protocol.stomp.StompSession.unsubscribe(StompSession.java:315)
at org.hornetq.core.protocol.stomp.StompProtocolManager.onUnsubscribe(StompProtocolManager.java:407)
at org.hornetq.core.protocol.stomp.StompProtocolManager.handleBuffer(StompProtocolManager.java:203)
at org.hornetq.core.protocol.stomp.StompConnection.bufferReceived(StompConnection.java:274)
at org.hornetq.core.remoting.server.impl.RemotingServiceImpl$DelegatingBufferHandler.bufferReceived(RemotingServiceImpl.java:459)
at org.hornetq.core.remoting.impl.netty.HornetQChannelHandler.messageReceived(HornetQChannelHandler.java:73)
at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:100)
at org.jboss.netty.channel.StaticChannelPipeline.sendUpstream(StaticChannelPipeline.java:362)
at org.jboss.netty.channel.StaticChannelPipeline.sendUpstream(StaticChannelPipeline.java:357)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.oio.OioWorker.run(OioWorker.java:90)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46)
at org.jboss.netty.util.VirtualExecutorService$ChildExecutorRunnable.run(VirtualExecutorService.java:181)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Any help here would be appreciated.
-
6. Re: Unsubscribing durable subscription via Stomp
matthieuc Jan 31, 2013 10:20 AM (in response to clebert.suconic)
Hi,
Following up on this post: I had the same issues as Ameya, and the solution posted above was helpful when I wrote my own patch on top of the 2.2.20 release last year. Context: HornetQ production servers delivering a few hundred msg/sec into topics, with mixed STOMP / JMS consumers.
The STOMP consumers mostly use the Net::STOMP::Client Perl module.
Behaviour observed on the 2.2.20 release and reproduced on the current 2.3.0-CR1:
- selectors: When a selector is used, excluded messages still appear to be routed to the queue but are not delivered to the client (observed from the MessageCount JMX attribute value). The selector value also does not appear in the JMX 'Filter' attribute. The latter is minor compared to the heap memory saturation caused by the former.
- unsubscribe: The queue is not destroyed upon a successful unsubscribe. Incoming messages are still routed to it, which also causes heap memory saturation.
I changed slightly more than Ameya's patch did, and have been using the result since October without trouble. The patch:
- Assigns the selector value to the queue, not the consumer, on the server side. The selector value can then be seen in the JMX Filter attribute, and excluded messages no longer appear to be routed to the queue.
- Destroys the queue when the last consumer disconnects. (Tested with multiple consumers and several subscribe/unsubscribe cases before the final "unsubscribe, no more consumers" situation.)
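To illustrate the second point, here is a minimal sketch of the "delete only on the last unsubscribe" idea, which also covers the multiple-consumers concern raised earlier in the thread. This is not the actual patch: the class, the QueueDeleter interface, and the method names are hypothetical stand-ins, with QueueDeleter playing the role of a call such as session.deleteQueue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DurableQueueRefCount {
    /** Hypothetical stand-in for the server call that destroys a core queue. */
    public interface QueueDeleter {
        void deleteQueue(String queueName);
    }

    // Number of live consumers per durable subscription queue.
    private final Map<String, Integer> consumers = new HashMap<>();
    private final QueueDeleter deleter;

    public DurableQueueRefCount(QueueDeleter deleter) {
        this.deleter = deleter;
    }

    /** Records one more consumer on the given queue. */
    public synchronized void onSubscribe(String queueName) {
        consumers.merge(queueName, 1, Integer::sum);
    }

    /**
     * Records one consumer leaving. The queue is deleted only when no consumer
     * remains. Returns true if this call deleted the queue.
     */
    public synchronized boolean onUnsubscribe(String queueName) {
        Integer count = consumers.get(queueName);
        if (count == null) {
            return false; // unknown queue: nothing to do
        }
        if (count > 1) {
            consumers.put(queueName, count - 1);
            return false; // other consumers still attached
        }
        consumers.remove(queueName);
        deleter.deleteQueue(queueName);
        return true;
    }

    public static void main(String[] args) {
        List<String> deleted = new ArrayList<>();
        DurableQueueRefCount tracker = new DurableQueueRefCount(deleted::add);
        tracker.onSubscribe("client1.sub1");
        tracker.onSubscribe("client1.sub1");
        System.out.println(tracker.onUnsubscribe("client1.sub1")); // first consumer leaves
        System.out.println(tracker.onUnsubscribe("client1.sub1")); // last consumer leaves
        System.out.println(deleted);
    }
}
```

In a real server the count would more likely come from the queue's own consumer bookkeeping than from a separate map; the map just keeps the sketch self-contained.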
Now that I am starting to look at 2.3.0-CR1, I have ported this patch onto that code base and was wondering whether these STOMP features could be considered useful and integrated into the main release at some point. I can provide a diff file in that case.
Regards,
matthieu
-
7. Re: Unsubscribing durable subscription via Stomp
gaohoward Aug 21, 2013 9:54 AM (in response to jmesnil)
I think you are right, Jeff. (Sorry for the late response, I missed it.)
Howard