Queue ClientConsumer management
adryen31200 May 9, 2012 4:49 AM Hello all, I have a question! I'm trying to set up publish/subscribe with the core API. If I subscribe like this:
clientConsumer = clientSession.createConsumer(queueName, filter);
What happens if the queue has already been created? And if it has not been created yet, is the linked queue created automatically, without my having to call this myself:
clientSession.createQueue(address, queueName);
And what about unsubscription? Do I have to manage the queue myself? If no client consumer is bound to a queue any longer, do I need to remove it myself,
or is it removed automatically? And during unsubscription, do I only need to close the client session?
Can you clarify this for me, please? I have the following methods; what do you think?
// Factory used by subscribe() to open one ClientSession per subscription.
private final ClientSessionFactory clientSessionFactory;
// Local bookkeeping of subscriptions: the outer key is the queue name
// (address + "#Queue"); the inner map is keyed by the consumer UUID generated
// in subscribe() and holds the ClientConsumer bound to that queue.
private final Map<String, Map<String, ClientConsumer>> queueConsumerMap;
/**
 * Subscribes a new consumer to the provided address.
 *
 * <p>A dedicated {@code ClientSession} is opened for the subscription. If this
 * instance has not yet registered the queue {@code address + "#Queue"}, the queue
 * is created on the server first. A {@code ClientConsumer} with the given filter
 * is then bound to the queue, the session is started and a {@code MessageListener}
 * is attached to the consumer.
 *
 * <p>Any failure aborts the subscription: the session opened for it is closed
 * and a {@code FrontlineException} carrying the original cause is thrown, so
 * that no half-initialised consumer is ever registered.
 *
 * @param address address to subscribe to
 * @param filter  filter expression passed to {@code createConsumer}
 * @return the consumer subscription reference, formatted as
 *         {@code queueName + ":" + consumerUUID}; pass it to {@code unSubscribe}
 * @throws FrontlineException if the session, queue or consumer cannot be
 *         created, or the session cannot be started
 */
public String subscribe(final String address, final String filter) throws FrontlineException {
    LOGGER.info("Subscription request called for [address|filter]: [" + address + "|" + filter + "]");
    final String queueName = address + "#Queue";

    // NOTE(review): assumes FrontlineException offers a (String, Throwable)
    // constructor - confirm against its declaration.
    final ClientSession clientSession;
    try {
        clientSession = this.clientSessionFactory.createSession();
        LOGGER.debug("Client session correclty created");
    } catch (final HornetQException e) {
        LOGGER.error("Client session creation error occurs during subscription for [address|filter]: [" + address + "|" + filter + "]", e);
        throw new FrontlineException("Client session creation failed for address " + address, e);
    }

    // Set to true only once the consumer is fully registered; drives session cleanup below.
    boolean subscribed = false;
    try {
        Map<String, ClientConsumer> consumers = this.queueConsumerMap.get(queueName);
        // Queue not already bounded
        if (consumers == null) {
            try {
                LOGGER.debug(queueName + " queue not already created");
                clientSession.createQueue(address, queueName);
            } catch (final HornetQException e) {
                LOGGER.error("Queue creation error occurs during subscription for [address|filter]: [" + address + "|" + filter + "]", e);
                throw new FrontlineException("Queue creation failed for queue " + queueName, e);
            }
            consumers = new HashMap<String, ClientConsumer>();
            this.queueConsumerMap.put(queueName, consumers);
            LOGGER.info("Queue correctly create for [address|queueName]: [" + address + "|" + queueName + "]");
        }

        final ClientConsumer clientConsumer;
        try {
            // Create a ClientConsumer to consume messages matching the filter from the queue with the given name.
            clientConsumer = clientSession.createConsumer(queueName, filter);
            LOGGER.info("Client consumer correctly create for [queueName|filter]: [" + queueName + "|" + filter + "]");
        } catch (final HornetQException e) {
            LOGGER.error("Client consumer creation error occurs during subscription for [address|filter]: [" + address + "|" + filter + "]", e);
            throw new FrontlineException("Client consumer creation failed for queue " + queueName, e);
        }

        try {
            clientSession.start();
            LOGGER.debug("Client session correctly started");
        } catch (final HornetQException e) {
            LOGGER.error("Client session starting error occurs during subscription for [address|filter]: [" + address + "|" + filter + "]", e);
            throw new FrontlineException("Client session start failed for queue " + queueName, e);
        }

        try {
            // The listener keeps the session so that unSubscribe can recover and close it.
            clientConsumer.setMessageHandler(new MessageListener(clientSession));
            LOGGER.debug("Message listener correctly set to client consumer");
        } catch (final HornetQException e) {
            LOGGER.error("Message handler binding error occurs during subscription for [address|filter]: [" + address + "|" + filter + "]", e);
            throw new FrontlineException("Message handler binding failed for queue " + queueName, e);
        }

        final String consumerUUID = UUID.randomUUID().toString();
        final String consumerSubscriptionRef = queueName + ":" + consumerUUID;
        LOGGER.debug("Consumer subscription reference correctly generated: " + consumerSubscriptionRef);
        consumers.put(consumerUUID, clientConsumer);
        LOGGER.info(consumerUUID + " client consumer correctly bound");
        subscribed = true;
        return consumerSubscriptionRef;
    } finally {
        // On any failure, release the session opened for this subscription.
        if (!subscribed) {
            try {
                clientSession.close();
            } catch (final HornetQException e) {
                LOGGER.error("Client session closing error occurs after failed subscription for [address|filter]: [" + address + "|" + filter + "]", e);
            }
        }
    }
}
/**
 * Unsubscribes the consumer identified by the provided consumer subscription
 * reference (the value returned by {@code subscribe}, formatted as
 * {@code queueName + ":" + consumerUUID}).
 *
 * <p>The consumer is removed from the local bookkeeping and the session stored
 * in its {@code MessageListener} is closed. When the removed consumer was the
 * last one registered for the queue, the consumer is closed and the queue is
 * deleted on the server through that session before the session is closed, and
 * the queue entry is dropped from the local map.
 *
 * <p>Failures are logged and never propagated; a null, malformed or unknown
 * reference is logged and otherwise ignored.
 *
 * @param consumerSubscriptionRef reference returned by {@code subscribe}
 */
public void unSubscribe(final String consumerSubscriptionRef) {
    LOGGER.info("Unsubscription request called for consumer subscription reference: " + consumerSubscriptionRef);

    // The UUID never contains ':', but the address (hence the queue name) may,
    // so the reference is split on its LAST ':'.
    final int separatorIndex = consumerSubscriptionRef == null ? -1 : consumerSubscriptionRef.lastIndexOf(':');
    if (separatorIndex < 0) {
        LOGGER.error("Malformed consumer subscription reference: " + consumerSubscriptionRef);
        return;
    }
    final String queueName = consumerSubscriptionRef.substring(0, separatorIndex);
    final String consumerUUID = consumerSubscriptionRef.substring(separatorIndex + 1);

    final Map<String, ClientConsumer> clientConsumerMap = this.queueConsumerMap.get(queueName);
    if (clientConsumerMap == null) {
        LOGGER.debug("No queue found during unsubcription for consumer subscription reference: " + consumerSubscriptionRef);
        return;
    }
    LOGGER.debug("Queue found");

    final ClientConsumer clientConsumer = clientConsumerMap.remove(consumerUUID);
    if (clientConsumer == null) {
        LOGGER.debug("No client consumer found during unsubcription for consumer subscription reference: " + consumerSubscriptionRef);
    } else {
        LOGGER.debug("Client consumer found");
        final boolean lastConsumer = clientConsumerMap.isEmpty();

        // Recover the session that subscribe() stored in the consumer's listener.
        ClientSession clientSession = null;
        try {
            final Object handler = clientConsumer.getMessageHandler();
            if (handler instanceof MessageListener) {
                LOGGER.debug("Message listener found");
                clientSession = ((MessageListener) handler).getClientSession();
            }
        } catch (final HornetQException e) {
            LOGGER.error("Message handler getting error occurs during unsubscription for consumer subscription reference: " + consumerSubscriptionRef, e);
        }

        if (clientSession == null) {
            LOGGER.debug("No client session recovered in the message listener of client consumer");
        } else {
            LOGGER.debug("Client session recovered");
            if (lastConsumer) {
                // The server queue outlives its consumers: delete it explicitly,
                // otherwise a later subscribe() would fail on createQueue.
                // The consumer must be closed first, a queue with consumers cannot be deleted.
                try {
                    clientConsumer.close();
                    clientSession.deleteQueue(queueName);
                    LOGGER.debug(queueName + " queue deleted on server");
                } catch (final HornetQException e) {
                    LOGGER.error("Queue deletion error occurs during unsubscription for consumer subscription reference: " + consumerSubscriptionRef, e);
                }
            }
            try {
                clientSession.close();
                LOGGER.debug("Client session correclty closed");
            } catch (final HornetQException e) {
                LOGGER.error("Client session closing error occurs during unsubscription for consumer subscription reference: " + consumerSubscriptionRef, e);
            }
        }
        LOGGER.debug("Consumer client " + consumerSubscriptionRef + " found was removed");
        LOGGER.info("Unsubscription request correctly processed for consumer subscription reference: " + consumerSubscriptionRef);
    }

    // If no longer client consumer bounded to queue, forget it locally
    if (clientConsumerMap.isEmpty()) {
        this.queueConsumerMap.remove(queueName);
        LOGGER.debug("No longer consumer client found, therefore " + queueName + " queue was removed");
    }
}
Do you have a publish/subscribe example for the core API? I haven't found one on
Thank you