2 Replies Latest reply: May 9, 2012 7:54 AM by Adrien Ruffie

Queue ClientConsumer management

Adrien Ruffie Newbie

Hello all, I have a question. I'm trying to set up publish/subscribe with the core API. If I subscribe like this:

 

clientConsumer = clientSession.createConsumer(queueName, filter);

 

What happens if the queue has already been created? And if it hasn't been created yet, is the linked queue created automatically, without my calling:

 

clientSession.createQueue(address, queueName);

 

 

And as for unsubscribing: do I have to manage the queue myself? If no client consumers are bound to a queue any longer, do I need to remove it myself, or is it removed automatically? And when unsubscribing, do I only need to close the client session?

 

Can you clarify this for me, please? I have the following methods; what do you think?


    private final ClientSessionFactory clientSessionFactory;

    // Maps each queue name to the consumers bound to it, keyed by consumer UUID
    private final Map<String, Map<String, ClientConsumer>> queueConsumerMap;

 

 

    /**
     * Subscribes a consumer to the provided address.
     * This method creates the queue if it does not already exist,
     * then creates a {@link ClientConsumer} and binds it to the queue.
     * It returns the consumer subscription reference
     * as a {@link String}.
     *
     * @param address
     * @param filter
     */

    public String subscribe(final String address, final String filter) throws FrontlineException {

        LOGGER.info("Subscription request called for address: [" + address + "|" + filter + "]");
        ClientSession clientSession = null;
        ClientConsumer clientConsumer = null;
        final String queueName = address + "#Queue";

        try {
            clientSession = this.clientSessionFactory.createSession();
            LOGGER.debug("Client session correctly created");
        } catch (final HornetQException e) {
            LOGGER.error("Client session creation error occurred during subscription for address: [" + address + "|" + filter + "]", e);
        }

        // Queue not already bound
        if (!this.queueConsumerMap.containsKey(queueName)) {
            try {
                LOGGER.debug(queueName + " queue not already created");
                clientSession.createQueue(address, queueName);
                this.queueConsumerMap.put(queueName, new HashMap<String, ClientConsumer>());
                LOGGER.info("Queue correctly created for address: [" + address + "|" + queueName + "]");
            } catch (final HornetQException e) {
                LOGGER.error("Queue creation error occurred during subscription for address: [" + address + "|" + filter + "]", e);
            }
        }

        try {
            // Create a ClientConsumer to consume messages matching the filter from the queue with the given name.
            clientConsumer = clientSession.createConsumer(queueName, filter);
            LOGGER.info("Client consumer correctly created for queueName: [" + queueName + "|" + filter + "]");
        } catch (final HornetQException e) {
            LOGGER.error("Client consumer creation error occurred during subscription for address: [" + address + "|" + filter + "]", e);
        }

        try {
            clientSession.start();
            LOGGER.debug("Client session correctly started");
        } catch (final HornetQException e) {
            LOGGER.error("Client session starting error occurred during subscription for address: [" + address + "|" + filter + "]", e);
        }

        try {
            clientConsumer.setMessageHandler(new MessageListener(clientSession));
            LOGGER.debug("Message listener correctly set on client consumer");
        } catch (final HornetQException e) {
            LOGGER.error("Message handler binding error occurred during subscription for address: [" + address + "|" + filter + "]", e);
        }

        final String consumerUUID = UUID.randomUUID().toString();
        final String consumerSubscriptionRef = queueName + ":" + consumerUUID;
        LOGGER.debug("Consumer subscription reference correctly generated: " + consumerSubscriptionRef);
        this.queueConsumerMap.get(queueName).put(consumerUUID, clientConsumer);
        LOGGER.info(consumerUUID + " client consumer correctly bound");
        return consumerSubscriptionRef;
    }

 

 

    /**
     * Unsubscribes the consumer identified by the provided
     * consumer subscription reference. This method
     * removes the bound {@link ClientConsumer} from the queue
     * and finally removes the queue if no
     * {@link ClientConsumer} remains bound to it.
     *
     * @param consumerSubscriptionRef
     */

    public void unSubscribe(final String consumerSubscriptionRef) {

        ClientConsumer clientConsumer = null;

        LOGGER.info("Unsubscription request called for consumer subscription reference: " + consumerSubscriptionRef);
        // Index 0 is queueName and index 1 is consumer UUID
        final String[] subscriptionParts = consumerSubscriptionRef.split(":");

        // Look up the consumers bound to the queue named in the reference
        final Map<String, ClientConsumer> clientConsumerMap = this.queueConsumerMap.get(subscriptionParts[0]);
        if (clientConsumerMap != null) {
            LOGGER.debug("Queue found");
            clientConsumer = clientConsumerMap.get(subscriptionParts[1]);

            /*
             * If the client consumer is found, close its session
             * and remove it from the client consumer map
             */
            if (clientConsumer != null) {
                LOGGER.debug("Client consumer found");
                MessageListener messageListener = null;
                try {
                    messageListener = (MessageListener) clientConsumer.getMessageHandler();
                } catch (final HornetQException e) {
                    LOGGER.error("Message handler retrieval error occurred during unsubscription for consumer subscription reference: " + consumerSubscriptionRef, e);
                }
                if (messageListener != null) {
                    LOGGER.debug("Message listener found");
                    final ClientSession clientSession = messageListener.getClientSession();
                    if (clientSession != null) {
                        LOGGER.debug("Client session recovered");
                        try {
                            clientSession.close();
                            LOGGER.debug("Client session correctly closed");
                        } catch (final HornetQException e) {
                            LOGGER.error("Client session closing error occurred during unsubscription for consumer subscription reference: " + consumerSubscriptionRef, e);
                        }
                    } else {
                        LOGGER.debug("No client session recovered in the message listener of client consumer");
                    }
                }
                clientConsumerMap.remove(subscriptionParts[1]);
                LOGGER.debug("Client consumer " + consumerSubscriptionRef + " was found and removed");
                LOGGER.info("Unsubscription request correctly processed for consumer subscription reference: " + consumerSubscriptionRef);
            } else {
                LOGGER.debug("No client consumer found during unsubscription for consumer subscription reference: " + consumerSubscriptionRef);
            }

            // If no client consumer remains bound to the queue, remove its entry
            if (clientConsumerMap.isEmpty()) {
                this.queueConsumerMap.remove(subscriptionParts[0]);
                LOGGER.debug("No client consumer left, therefore " + subscriptionParts[0] + " queue was removed");
            }
        } else {
            LOGGER.debug("No queue found during unsubscription for consumer subscription reference: " + consumerSubscriptionRef);
        }
    }
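
The bookkeeping done by the two methods above (queue name to consumers, plus a "queueName:uuid" reference) could be sketched on its own as follows. This is only a sketch under assumptions: SubscriptionRegistry and Removal are hypothetical names, not HornetQ classes, and the consumer type is left generic so the snippet runs without the broker. It splits the reference at the last colon, because a UUID string never contains one, whereas an address might. It also reports when the last consumer of a queue is gone, which is the point where the caller would have to decide whether to delete the queue on the server.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper (not part of HornetQ): tracks the consumers bound to each queue. */
final class SubscriptionRegistry<C> {

    /** Outcome of remove(String). */
    static final class Removal<C> {
        final C consumer;         // null when the reference was unknown
        final boolean queueEmpty; // true when the last consumer of the queue was removed

        Removal(C consumer, boolean queueEmpty) {
            this.consumer = consumer;
            this.queueEmpty = queueEmpty;
        }
    }

    // queue name -> (consumer UUID -> consumer); inner maps are only touched inside compute calls
    private final ConcurrentHashMap<String, Map<String, C>> consumersByQueue = new ConcurrentHashMap<>();

    /** Registers a consumer and returns a reference of the form queueName:uuid. */
    String add(String queueName, C consumer) {
        Objects.requireNonNull(consumer, "consumer");
        String uuid = UUID.randomUUID().toString();
        consumersByQueue.compute(queueName, (name, consumers) -> {
            Map<String, C> updated = (consumers != null) ? consumers : new HashMap<String, C>();
            updated.put(uuid, consumer);
            return updated;
        });
        return queueName + ":" + uuid;
    }

    /** Removes the consumer behind the reference and drops the queue entry once it is empty. */
    Removal<C> remove(String subscriptionRef) {
        // Split at the LAST colon: the UUID has none, the queue name might.
        int separator = subscriptionRef.lastIndexOf(':');
        if (separator < 0) {
            return new Removal<>(null, false);
        }
        String queueName = subscriptionRef.substring(0, separator);
        String uuid = subscriptionRef.substring(separator + 1);

        AtomicReference<C> removed = new AtomicReference<>();
        AtomicBoolean emptied = new AtomicBoolean(false);
        consumersByQueue.computeIfPresent(queueName, (name, consumers) -> {
            removed.set(consumers.remove(uuid));
            if (consumers.isEmpty()) {
                emptied.set(true);
                return null; // returning null drops the queue entry
            }
            return consumers;
        });
        return new Removal<>(removed.get(), emptied.get());
    }
}
```

With something like this, unSubscribe would close the returned consumer and, when queueEmpty is true, take care of the server-side queue itself, since removing the map entry alone does not touch the broker.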

 

Do you have a publish/subscribe example for the core API? I haven't found one.

 

Thank you

 

  • 1. Re: Queue ClientConsumer management
    Andy Taylor Master

    "What happens if the queue has already been created? And if it hasn't been created yet, is the linked queue created automatically, without my calling:

    clientSession.createQueue(address, queueName);

    And as for unsubscribing: do I have to manage the queue myself? If no client consumers are bound to a queue any longer, do I need to remove it myself, or is it removed automatically? And when unsubscribing, do I only need to close the client session?"

    I'm not sure what you are asking here, but if you create a queue, then you also need to delete it, unless it's temporary, in which case it will be deleted when the session closes.
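
The rule stated above can be illustrated with a toy in-memory model. This is a sketch only: ToyBroker and ToySession are hypothetical stand-ins, not the HornetQ API, and queue names are made up. On a real ClientSession the corresponding calls would presumably be createQueue, createTemporaryQueue and deleteQueue; please check the exact signatures against the Javadoc of your HornetQ version.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy stand-in for a broker (hypothetical, not HornetQ): it only remembers which queues exist. */
final class ToyBroker {
    private final Set<String> queues = new HashSet<>();

    boolean hasQueue(String queueName) {
        return queues.contains(queueName);
    }

    ToySession openSession() {
        return new ToySession(this);
    }

    /** Toy stand-in for a client session. */
    static final class ToySession {
        private final ToyBroker broker;
        private final Set<String> temporaryQueues = new HashSet<>();

        ToySession(ToyBroker broker) {
            this.broker = broker;
        }

        /** A regular queue outlives the session until someone deletes it. */
        void createQueue(String queueName) {
            broker.queues.add(queueName);
        }

        /** A temporary queue is tied to the session that created it. */
        void createTemporaryQueue(String queueName) {
            broker.queues.add(queueName);
            temporaryQueues.add(queueName);
        }

        /** Explicit deletion, needed for regular queues. */
        void deleteQueue(String queueName) {
            broker.queues.remove(queueName);
            temporaryQueues.remove(queueName);
        }

        /** Closing the session removes only the temporary queues it created. */
        void close() {
            broker.queues.removeAll(temporaryQueues);
            temporaryQueues.clear();
        }
    }
}
```

In this model, closing a session leaves a regular queue in place, so the code that created it remains responsible for deleting it later.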

     

    "Do you have a publish/subscribe example for the core API? I haven't found one."

    With the core API there are no topics, just addresses and queues, so a subscription would be a queue on an address with a single consumer.
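
To make the "one queue per subscription" idea concrete, here is a small in-memory sketch of how messages sent to an address fan out to the queues bound to it. ToyAddressRouter is a hypothetical class written for illustration, not a HornetQ type, and the queue names are invented. It assumes that every queue bound to an address receives its own copy of each message, which is what makes one queue per subscriber behave like publish/subscribe.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical, not HornetQ): an address fans out to every queue bound to it. */
final class ToyAddressRouter {
    private final Map<String, List<String>> queuesByAddress = new HashMap<>();
    private final Map<String, Deque<String>> messagesByQueue = new HashMap<>();

    /** Binds a new queue to an address; one queue stands for one subscription. */
    void createQueue(String address, String queueName) {
        if (messagesByQueue.containsKey(queueName)) {
            // Toy-model choice: refuse duplicate queue names.
            throw new IllegalStateException("Queue already exists: " + queueName);
        }
        messagesByQueue.put(queueName, new ArrayDeque<String>());
        queuesByAddress.computeIfAbsent(address, a -> new ArrayList<String>()).add(queueName);
    }

    /** Routes a copy of the message to every queue bound to the address. */
    void send(String address, String message) {
        for (String queueName : queuesByAddress.getOrDefault(address, Collections.<String>emptyList())) {
            messagesByQueue.get(queueName).add(message);
        }
    }

    /** Takes the next message from a queue, or returns null if there is none. */
    String receive(String queueName) {
        Deque<String> messages = messagesByQueue.get(queueName);
        return (messages == null) ? null : messages.poll();
    }
}
```

Two subscribers with their own queues on the same address each see the message. Two consumers reading from the same queue would instead share its messages, because each message is taken from a queue only once.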

  • 2. Re: Queue ClientConsumer management
    Adrien Ruffie Newbie

    Thanks, Andy, for your response.

     

    OK, so for the publish/subscribe pattern it isn't:

    address --> queue 1 --> consumer 1, consumer 2, consumer n
            --> queue 2 --> consumer 3, consumer 2, consumer m
            --> queue n --> consumer 1, consumer 4, consumer 3

    but rather:

    address --> queue 1 --> consumer 1
            --> queue 2 --> consumer 2
            --> queue n --> consumer n

     

    So one subscription isn't represented by one consumer on one queue, but by one queue on one address?

     

     

    Regarding your response, "I'm not sure what you are asking here, but if you create a queue, then you also need to delete it, unless it's temporary, in which case it will be deleted when the session closes":

     

    Can I just call clientConsumer = clientSession.createConsumer(queueName, filter);

    without first calling clientSession.createQueue(address, queueName); ?

     

    Does createConsumer() also create the queue if queueName doesn't exist yet?

     

     

     

    Another problem: I have one ClientSession per consumer, so if I have 100000 consumers, I will have 100000 open ClientSessions. Do you have a better idea for this?