Version 5

    Code

    
    import java.util.ArrayList;
    
    import javax.ejb.EJBException;
    import javax.ejb.SessionBean;
    import javax.ejb.SessionContext;
    import javax.ejb.CreateException;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueReceiver;
    import javax.jms.QueueSession;
    import javax.jms.Connection;
    import javax.jms.Session;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    
    
    /**
     * Helper sessionbean to receive JMS messages.
     *
     * @ejb.bean name="JmsReceiver"
     *           type="Stateless"
     *           display-name="Jms Receiver helpers"
     *           jndi-name="ejb/JmsReceiver"
     *           local-jndi-name="ejb/JmsReceiverLocal"
     *           view-type="both"
     *
     * @ejb.transaction type = "Required"
     *
     * @ejb.util generate = "physical"
     *
     * @ejb.resource-ref res-ref-name = "jms/QueueConnectionFactory"
     *                   res-type = "javax.jms.QueueConnectionFactory"
     *                   res-auth = "Container"
     * @jboss.resource-ref res-ref-name = "jms/QueueConnectionFactory"
     *                     jndi-name = "java:/JmsXA"
     *
     */
    public class JmsReceiverBean implements SessionBean {
        private static org.apache.log4j.Logger logger =
                org.apache.log4j.Logger.getLogger(JmsReceiverBean.class.getName());
    
        private SessionContext _ctx;
        private transient QueueConnectionFactory queueConnectionFactory;
    
        /**
         * Creates a new instance.
         *
         * @throws CreateException if the bean couldn't be created
         * @ejb.create-method
         */
        public void ejbCreate() throws CreateException {
            try {
                init();
            } catch (NamingException e) {
                throw new CreateException("Component not found [" + e.getMessage() + "]");
            }
        }
    
    
        /**
         * Gets a batch of messages on the specified queue mapping the specified selector
         * @param queue the queue to connect to
         * @param selector only messages with properties matching the message selector
         *                 expression are delivered. A value of null or an empty string
         *                 indicates that there is no message selector for the message
         *                 consumer.
         * @param batchSize the maximum number of <code>Message</code>s to return. If queue size is <= 0, then
         * all messages matching the selector will be returned.
         * @return an array of max <code>batchSize javax.jms.Message</code> matching
         *         the specified selector. An empty array is returned if there is no
         *         (more) messages
         * @throws JMSException if an error occured while retrieving the messages
         *
         * @ejb.interface-method
         */
        public Message[] receive(Queue queue, String selector, int batchSize) throws JMSException {
            QueueConnection connection = null;
            QueueSession session = null;
            QueueReceiver receiver = null;
    
            try {
                queueConnectionFactory.createQueueConnection();
                session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                receiver = session.createReceiver(queue, selector);
    
                // As from 3.2.4, you need to start the connection to receive messages with java:/JmsXA
                connection.start();
                // Comment the line above if you are running Jboss < 3.2.4
    
                ArrayList result = new ArrayList();
                Message msg = null;
    
                boolean unLimitedBatchSize = (batchSize <= 0);
    
                while ( ( (result.size() < batchSize) || unLimitedBatchSize)  && ((msg = receiver.receiveNoWait()) != null)) {
                    result.add(msg);
                }
    
                return (Message[]) result.toArray(new Message[result.size()]);
            } finally {
                closeReceiver(receiver);
                closeSession(session);
                closeConnection(connection);
            }
        }
    
        /**
         * Gets a batch of messages on the specified queue.
         * @param queue the queue to connect to
         * @param batchSize the maximum number of <code>Message</code>s to return. If batchSize <= 0, then
         * all messages will be returned.
         * @return an array of max <code>batchSize javax.jms.Message</code>. An
         *         empty array is returned if there is no (more) messages
         * @throws JMSException if an error occured while retrieving the messages
         *
         * @ejb.interface-method
         */
        public Message[] receive(Queue queue, int batchSize) throws JMSException {
            return receive(queue, null, batchSize);
        }
    
        /**
         * Gets the next message on the specified queue mapping the specified selector
         * @param queue the queue to connect to
         * @param selector only messages with properties matching the message selector
         *                 expression are delivered. A value of null or an empty string
         *                 indicates that there is no message selector for the message
         *                 consumer.
         * @return a <code>javax.jms.Message</code> matching the specified selector or
         *         <code>null</code> if there is no (more) messages
         * @throws JMSException if an error occured while retrieving the message
         *
         * @ejb.interface-method
         */
        public Message receive(Queue queue, String selector) throws JMSException {
            Message[] msg = receive(queue, selector, 1);
    
            if (msg.length > 0) {
                return msg[0];
            } else {
                return null;
            }
        }
    
        /**
         * Gets the next message on the specified queue.
         * @param queue the queue to connect to
         * @return a <code>javax.jms.Message</code> or <code>null</code> if there
         *         is no (more) messages
         * @throws JMSException if an error occured while retrieving the message
         *
         * @ejb.interface-method
         */
        public Message receive(Queue queue) throws JMSException {
            return receive(queue, null);
        }
    
        public void ejbActivate() {
            try {
                init();
            } catch (NamingException e) {
                throw new EJBException("Could not activate", e);
            }
        }
    
        public void ejbPassivate() {
        }
    
        public void ejbRemove() {
        }
    
        public void setSessionContext(SessionContext context) {
            this._ctx = context;
        }
    
        /**
         * Initializes the bean.
         */
        private void init() throws NamingException {
            Context jndi = new InitialContext();
            queueConnectionFactory = (QueueConnectionFactory) jndi.lookup("java:comp/env/jms/QueueConnectionFactory");
        }
    
        /**
         * Closes the JMS connection.
         */
        private void closeConnection(Connection connection) {
            try {
                if (connection != null)
                    connection.close();
            } catch (JMSException e) {
                logger.warn("Could not close JMS connection", e);
            }
        }
    
        /**
         * Closes the JMS session.
         */
        private void closeSession(Session session) {
            try {
                if (session != null)
                    session.close();
            } catch (JMSException e) {
                logger.warn("Could not close JMS session", e);
            }
        }
    
        /**
         * Closes the JMS session.
         */
        private void closeReceiver(QueueReceiver queueReceiver) {
            try {
                if (queueReceiver != null)
                    queueReceiver.close();
            } catch (JMSException e) {
                logger.warn("Could not close queue receiver", e);
            }
        }
    }