Version 8

    Message creator interface

     

    This interface allows clients to build JMS messages easily.

     

    import java.io.Serializable;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    
    public interface MessageCreator extends Serializable {
        public Message getMessage(Session session) throws JMSException;
    }
    

     

     

      Code

     

    import java.io.Serializable;
    import java.util.Date;
    
    import javax.ejb.EJBException;
    import javax.ejb.SessionBean;
    import javax.ejb.SessionContext;
    import javax.ejb.CreateException;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueSender;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    import javax.jms.Topic;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.TopicPublisher;
    import javax.jms.TopicSession;
    import javax.jms.Connection;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    
    /**
     * Helper sessionbean to send JMS messages.
     *
     * @ejb.bean name="JmsSender"
     *           type="Stateless"
     *           display-name="Jms Sender helpers"
     *           jndi-name="ejb/JmsSender"
     *           local-jndi-name="ejb/JmsSenderLocal"
     *           view-type="both"
     *
     * @ejb.transaction type = "Required"
     *
     * @ejb.util generate = "physical"
     *
     * @ejb.resource-ref res-ref-name = "jms/QueueConnectionFactory"
     *                   res-type = "javax.jms.QueueConnectionFactory"
     *                   res-auth = "Container"
     * @jboss.resource-ref res-ref-name = "jms/QueueConnectionFactory"
     *                     jndi-name = "java:/JmsXA"
     * @ejb.resource-ref res-ref-name = "jms/TopicConnectionFactory"
     *                   res-type = "javax.jms.TopicConnectionFactory"
     *                   res-auth = "Container"
     * @jboss.resource-ref res-ref-name = "jms/TopicConnectionFactory"
     *                     jndi-name = "java:/JmsXA"
     *
     */
    public class JmsSenderBean implements SessionBean {
        private static org.apache.log4j.Logger logger =
                org.apache.log4j.Logger.getLogger(JmsSenderBean.class.getName());
    
        private SessionContext _ctx;
        private transient QueueConnectionFactory queueConnectionFactory;
        private transient TopicConnectionFactory topicConnectionFactory;
    
        /**
         * Creates a new instance.
         *
         * @throws javax.ejb.CreateException if the bean couldn't be created
         * @ejb.create-method
         */
        public void ejbCreate() throws CreateException {
            try {
                init();
            } catch (NamingException e) {
                throw new CreateException("Component not found [" + e.getMessage() + "]");
            }
        }
    
        /**
         * Publish a message on a topic.
         *
         * @ejb.interface-method
         * @param messageCreator MessageCreator to create the message with.
         * @param topic The topic to publish on.
         */
        public void publish(MessageCreator messageCreator, Topic topic) throws JMSException {
            if (logger.isDebugEnabled()) {
                logger.debug("Publishing message [" + messageCreator + "] on topic [" + topic.getTopicName() + "]");
            }
    
            TopicConnection connection = null;
            TopicSession session = null;
            TopicPublisher publisher = null;
    
            try {
                connection = openTopicConnection();
                session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
                publisher = session.createPublisher(topic);
                publisher.publish(messageCreator.getMessage(session));
            } finally {
                closeTopicPublisher(publisher);
                closeSession(session);
                closeConnection(connection);
            }
        }
    
        /**
         * Publish text on a topic.
         *
         * @ejb.interface-method
         * @param message The message to publish.
         * @param topic The topic to publish on.
         */
        public void publish(String message, Topic topic) throws JMSException {
            if (logger.isDebugEnabled()) {
                logger.debug("Publishing text [" + message + "] on topic [" + topic.getTopicName() + "]");
            }
    
            publish(new TextMessageCreator(message), topic);
        }
    
        /**
         * Publish an object on a topic.
         *
         * @ejb.interface-method
         * @param message The message to publish.
         * @param topic The topic to publish on.
         */
        public void publish(Object message, Topic topic) throws JMSException {
            if (logger.isDebugEnabled()) {
                logger.debug("Publishing object [" + message + "] on topic [" + topic.getTopicName() + "]");
            }
    
            publish(new ObjectMessageCreator(message), topic);
        }
    
    
       /**
         * Send a Message to a queue with a scheduled delivery and returns the ID assigned by
         * the JMS provider.
         *
         * @ejb.interface-method
         * @param messageCreator MessageCreator to create the message with.
         * @param queue The queue to send to.
         * @param deliveryDate the date at which the message should be delivered. If null, no delivery date is set
         * @return the JMSMessageID asssigned to this message.
         */
        public String send(MessageCreator messageCreator, Queue queue, Date deliveryDate) throws JMSException {
            if (logger.isDebugEnabled()) {
                logger.debug("Sending message [" + messageCreator + "] to queue [" + queue.getQueueName() + "]");
            }
    
            QueueConnection connection = null;
            QueueSession session = null;
            QueueSender sender = null;
    
            try {
                connection = openQueueConnection();
                session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                sender = session.createSender(queue);
                Message msg = messageCreator.getMessage(session);
    
                if (deliveryDate != null) {
                    logger.debug("Message will be delivered on ["+deliveryDate+"]");
                    msg.setLongProperty("JMS_JBOSS_SCHEDULED_DELIVERY", deliveryDate.getTime());
                }
    
                sender.send(msg);
                return msg.getJMSMessageID();
            } finally {
                closeQueueSender(sender);
                closeSession(session);
                closeConnection(connection);
            }
        }
    
        /**
         * Send a Message to a queue and returns the ID assigned by the JMS provider.
         *
         * @ejb.interface-method
         * @param messageCreator MessageCreator to create the message with.
         * @param queue The queue to send to.
         * @return the JMSMessageID asssigned to this message.
         */
        public String send(MessageCreator messageCreator, Queue queue) throws JMSException {
            return send(messageCreator, queue, null);
        }
    
        /**
         * Send text to a queue and returns the ID assigned by the JMS provider.
         *
         * @ejb.interface-method
         * @param message The message to publish.
         * @param queue The queue to send to.
         * @return the JMSMessageID asssigned to this message.
         */
        public String send(String message, Queue queue) throws JMSException {
            return send(new TextMessageCreator(message), queue);
        }
    
        /**
         * Send an Object to a queue and returns its ID assigned by the JMS provider.
         *
         * @ejb.interface-method
         * @param message The message to publish.
         * @param queue The queue to send to.
         * @return the JMSMessageID asssigned to this message.
         */
        public String send(Object message, Queue queue) throws JMSException {
            return send(new ObjectMessageCreator(message), queue);
        }
    
        public void ejbActivate() {
            try {
                init();
            } catch (NamingException e) {
                throw new EJBException("Could not activate", e);
            }
        }
    
        public void ejbPassivate() {
        }
    
        public void ejbRemove() {
        }
    
        public void setSessionContext(SessionContext context) {
            this._ctx = context;
        }
    
        /**
         * Initializes the bean.
         */
        private void init() throws NamingException {
            Context jndi = new InitialContext();
            queueConnectionFactory = (QueueConnectionFactory) jndi.lookup("java:comp/env/jms/QueueConnectionFactory");
            topicConnectionFactory = (TopicConnectionFactory) jndi.lookup("java:comp/env/jms/TopicConnectionFactory");
        }
    
        /**
         * Retrieves a new JMS Connection from the pool
         * @return a <code>QueueConnection</code>
         * @throws JMSException if the connection could not be retrieved
         */
        private QueueConnection openQueueConnection() throws JMSException {
            return queueConnectionFactory.createQueueConnection();
           // queueConnection.start(); this is a pool we don't need to start the connection
        }
    
        /**
         * Retrieves a new JMS Connection from the pool
         * @return a <code>QueueConnection</code>
         * @throws JMSException if the connection could not be retrieved
         */
        private TopicConnection openTopicConnection() throws JMSException {
            return topicConnectionFactory.createTopicConnection();
        }
    
        /**
         * Closes the JMS connection.
         */
        private void closeConnection(Connection connection) {
            try {
                if (connection != null)
                    connection.close();
            } catch (JMSException e) {
                logger.warn("Could not close JMS connection", e);
            }
        }
    
        /**
         * Closes the JMS session.
         */
        private void closeSession(Session session) {
            try {
                if (session != null)
                    session.close();
            } catch (JMSException e) {
                logger.warn("Could not close JMS session", e);
            }
        }
    
        /**
         * Closes the JMS session.
         */
        private void closeQueueSender(QueueSender queueSender) {
            try {
                if (queueSender!= null)
                    queueSender.close();
            } catch (JMSException e) {
                logger.warn("Could not close queue sender", e);
            }
        }
    
        /**
         * Closes the JMS session.
         */
        private void closeTopicPublisher(TopicPublisher topicPublisher) {
            try {
                if (topicPublisher != null)
                    topicPublisher.close();
            } catch (JMSException e) {
                logger.warn("Could not close queue sender", e);
            }
        }
    
        private class TextMessageCreator implements MessageCreator {
            private String message;
    
            public TextMessageCreator(String message) {
                this.message = message;
            }
    
            public Message getMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
    
            public String toString() {
                return message;
            }
        }
    
        private class ObjectMessageCreator implements MessageCreator {
            private Serializable message;
    
            public ObjectMessageCreator(Object message) throws JMSException {
                if (!(message instanceof Serializable)) {
                    throw new JMSException("Object ["+message+"] is not serializable");
                }
    
                this.message = (Serializable) message;
            }
    
            public Message getMessage(Session session) throws JMSException {
                return session.createObjectMessage(message);
            }
    
            public String toString() {
                if (message != null)
                    return message.toString();
                else
                    return "null";
            }
        }
    }