Spring and HornetQ standalone integration

I've been testing HornetQ 2.1.2.Final for a few days now, and I finally needed to integrate it with our Java SE symmetric cluster server. We're using Spring 3 IoC to instantiate and configure a few components, and we wanted to do the same with HornetQ, as migrating our code to a JBoss AS solution is not an option.

 

The following configuration embeds a symmetric clustered instance of HornetQ into our server:

 

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
  xmlns:util="http://www.springframework.org/schema/util"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd"
  default-init-method="init" default-destroy-method="destroy">
  
  <!-- Resolves the ${...} placeholders used throughout this file. Values come from
       three places: JVM system properties, the optional hornetq.properties file
       listed under "locations", and the built-in defaults under "properties". -->
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">    
    <description>Loads properties from multiple sources, with fallback capabilities</description>
    <property name="locations">
      <list>
        <value>file:${basePath}/etc/hornetq.properties</value>
      </list>
    </property>
    <!-- Placeholders that cannot be resolved here are left untouched instead of
         failing context startup, and a missing properties file is tolerated -->
    <property name="ignoreUnresolvablePlaceholders" value="true" />
    <property name="ignoreResourceNotFound" value="true" />
    <!-- OVERRIDE mode: JVM system properties are consulted first and take
         precedence over the properties file and the defaults below -->
    <property name="systemPropertiesMode">
      <util:constant
        static-field="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer.SYSTEM_PROPERTIES_MODE_OVERRIDE" />
    </property>
    <property name="properties">
      <props>        
        <!-- Default values: will be used if not present in properties file, or properties
             file is not found -->
        <prop key="hornetq.runpath">${basePath}/run/hornetq</prop>
        <prop key="data.dir">${hornetq.runpath}</prop>        
      </props>
    </property>
  </bean>
    
  <!-- JNDI Server Configuration ########################################## -->
  <bean name="Naming" class="org.jnp.server.NamingBeanImpl"
        init-method="start" destroy-method="stop">
    <!-- If true, it performs a java:/comp/env context lookup... and the server
         is not ready -->
    <property name="installJavaComp" value="false" />
  </bean>
            
  <bean name="JNDIServer" class="org.jnp.server.Main"
        init-method="start" destroy-method="stop">
    <property name="namingInfo" ref="Naming" />
    <property name="port" value="${conf.jndi.port:2099}" />
    <property name="bindAddress" value="${conf.jndi.bindingAddress:0.0.0.0}" />
    <property name="rmiPort" value="${conf.jndi.rmi.port:2098}" />
    <property name="rmiBindAddress" value="${conf.jndi.rmi.bindingAddress:0.0.0.0}" />
  </bean>
  
  <bean name="MBeanServer" class="java.lang.management.ManagementFactory"
        factory-method="getPlatformMBeanServer" />
        
  <!-- END JNDI Server Configuration ###################################### -->
  
  <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
    <description>HornetQ Core Server</description>
    <constructor-arg ref="Configuration" />
    <constructor-arg ref="MBeanServer" />
    <constructor-arg ref="HornetQSecurityManager" />
  </bean>
  
  <!-- Note init/destroy methods set as start/stop here, and not in Core Server, to
       avoid the latter starting before JMS Server -->
  <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl"
        init-method="start" destroy-method="stop">
    <description>HornetQ JMS Server</description>
    <constructor-arg ref="HornetQServer" />
    <constructor-arg ref="JMSConfiguration" />
  </bean>
  
  <!-- Security manager configuration: add user/roles here -->
  <!-- HornetQSecurityManagerBean just extends org.hornetq.spi.core.security.HornetQSecurityManagerImpl
       adding a constructor to ease user/roles configuration from Spring -->
  <bean name="HornetQSecurityManager" 
        class="es.tid.planb.hornetq.security.HornetQSecurityManagerBean">
    <constructor-arg>
      <!-- User/pass map -->
      <map>
         <entry key="guest" value="guest" />
      </map>
    </constructor-arg>
     <constructor-arg>
      <!-- User/roles map -->
      <map>
         <entry key="guest">
            <!-- Roles for user -->
            <list>
               <value>guest</value>
            </list>
         </entry>
      </map>
    </constructor-arg>
    <property name="defaultUser" value="guest" />
  </bean>  
  
  <!-- CONNECTORS : shared between HornetQ core and JMS servers -->
  <bean name="NettyInternalConnector" class="org.hornetq.api.core.TransportConfiguration">
      <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />
      <constructor-arg>
        <map>
           <entry key="host" value="${conf.hornetq.acceptor.internal.host:localhost}"/>
           <entry key="port" value="${conf.hornetq.acceptor.internal.port:5446}"/>
           <entry key="use-nio" value="${conf.hornetq.acceptor.internal.useNIO:true}"/>
        </map>
      </constructor-arg>
   </bean>
   
   <!-- Connector counterpart of the "netty-external" acceptor: it reads the same
        conf.hornetq.acceptor.external.* keys, so host/port always match the acceptor.
        NOTE(review): this connector is the one named in the broadcast group's
        connector pair and the one referenced by ExternalConnectionFactory. With the
        0.0.0.0 default, remote nodes/clients would presumably be handed a host they
        cannot connect to; confirm, and set conf.hornetq.acceptor.external.host to a
        reachable address when clustering across machines. -->
   <bean name="NettyExternalConnector" class="org.hornetq.api.core.TransportConfiguration">
      <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />
      <constructor-arg>
        <map>
           <entry key="host" value="${conf.hornetq.acceptor.external.host:0.0.0.0}"/>
           <entry key="port" value="${conf.hornetq.acceptor.external.port:5445}"/>
           <entry key="use-nio" value="${conf.hornetq.acceptor.external.useNIO:true}"/>           
        </map>
      </constructor-arg>
   </bean>
   
   <bean name="NettyInVMConnector" class="org.hornetq.api.core.TransportConfiguration">
      <constructor-arg value="org.hornetq.core.remoting.impl.invm.InVMConnectorFactory" />
      <constructor-arg>
        <map>
           <entry key="server-id" value="${conf.hornetq.acceptor.invm.serverId:0}"/>
        </map>
      </constructor-arg>
   </bean>
   <!-- END CONNECTORS -->
  
  <!-- Common HornetQ configuration --> 
  <bean name="Configuration" class="org.hornetq.core.config.impl.ConfigurationImpl">        
    <property name="clustered" value="true" />
    <property name="persistenceEnabled" value="${conf.hornetq.persistenceEnabled:true}" />
    <property name="JMXManagementEnabled" value="${conf.hornetq.jmxEnabled:true}" />
    <property name="pagingDirectory" value="${hornetq.runpath}/paging" />
    <property name="bindingsDirectory" value="${hornetq.runpath}/bindings" />
    <property name="largeMessagesDirectory" value="${hornetq.runpath}/large-messages" />
    <property name="journalDirectory" value="${hornetq.runpath}/journal" />
    <property name="journalMinFiles" value="${conf.hornetq.journalMinFiles:10}" />
    <property name="journalType">
      <util:constant static-field="${conf.hornetq.journal.type:org.hornetq.core.server.JournalType.NIO}" />
    </property>
    <property name="clusterUser" value="HORNETQ-CLUSTER-USER" />
    <property name="clusterPassword" value="HORNETQ-CLUSTER-PASS" />
    <property name="threadPoolMaxSize" value="${conf.hornetq.threadPoolMaxSize:-1}" />
    
    <!-- Security settings -->
    <!-- Disable security to get a small performance boost -->
    <property name="securityEnabled" value="${conf.hornetq.securityEnabled:false}" />
    <property name="securityRoles">
      <map>
         <!-- Entry keys are the matching parameters -->
         <entry key="#">
            <set>
               <bean class="org.hornetq.core.security.Role">
                  <!-- Name -->
                  <constructor-arg value="guest" />
                  <!-- send? -->
                  <constructor-arg value="true" />
                  <!-- consume? -->
                  <constructor-arg value="true" />
                  <!-- create durable queue? -->
                  <constructor-arg value="false" />
                  <!-- delete durable queue? -->
                  <constructor-arg value="false" />
                  <!-- create non-durable queue? -->
                  <constructor-arg value="true" />
                  <!-- delete non-durable queue? -->
                  <constructor-arg value="true" />
                  <!-- manage? -->
                  <constructor-arg value="false" />
               </bean>
            </set>
         </entry>
      </map>
    </property>
    
    <!-- Addresses settings -->
    <property name="addressesSettings">
      <map>
         <!-- Entry keys are the matching parameters -->
         <entry key="#">
            <bean class="org.hornetq.core.settings.impl.AddressSettings">
               <property name="deadLetterAddress" value="jms.queue.DLQ" />
               <property name="expiryAddress" value="jms.queue.ExpiryQueue" />
               <!-- In milliseconds -->
               <property name="redeliveryDelay" value="${conf.hornetq.addresses.redeliveryDelay:0}" />
               <property name="maxSizeBytes" value="${conf.hornetq.addresses.maxSizeBytes:20971520}" />
               <property name="messageCounterHistoryDayLimit" value="${conf.hornetq.addresses.dayLimit:10}" />
               <property name="addressFullMessagePolicy">
                  <util:constant static-field="org.hornetq.core.settings.impl.AddressFullMessagePolicy.PAGE" />
               </property>
            </bean>
         </entry>
      </map>
    </property>
        
    <!-- Already defined as named beans (see above) -->
    <property name="connectorConfigurations">
      <map>
         <entry key="netty-internal"><ref bean="NettyInternalConnector" /></entry>
         <entry key="netty-external"><ref bean="NettyExternalConnector" /></entry>
         <entry key="netty-invm"><ref bean="NettyInVMConnector" /></entry>
      </map>
    </property>
        
    <!-- Acceptors: match ports with connectors -->
    <property name="acceptorConfigurations">
      <set>
         <bean class="org.hornetq.api.core.TransportConfiguration">
            <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory" />
            <constructor-arg>
              <map>
                 <entry key="host" value="${conf.hornetq.acceptor.internal.host:localhost}"/>
                 <entry key="port" value="${conf.hornetq.acceptor.internal.port:5446}"/>
                 <entry key="use-nio" value="${conf.hornetq.acceptor.internal.useNIO:true}"/>
                 <entry key="direct-deliver" value="${conf.hornetq.acceptor.internal.directDeliver:false}" />
              </map>
            </constructor-arg>
            <!-- Name -->
            <constructor-arg value="netty-internal" />
         </bean>
         <bean class="org.hornetq.api.core.TransportConfiguration">
            <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory" />
            <constructor-arg>
              <map>
                 <entry key="host" value="${conf.hornetq.acceptor.external.host:0.0.0.0}"/>
                 <entry key="port" value="${conf.hornetq.acceptor.external.port:5445}"/>
                 <entry key="use-nio" value="${conf.hornetq.acceptor.external.useNIO:true}"/>
                 <entry key="direct-deliver" value="${conf.hornetq.acceptor.external.directDeliver:false}" />
              </map>
            </constructor-arg>
            <!-- Name -->
            <constructor-arg value="netty-external" />
         </bean>
         <bean class="org.hornetq.api.core.TransportConfiguration">
            <constructor-arg value="org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory" />
            <constructor-arg>
              <map>
                 <entry key="server-id" value="${conf.hornetq.acceptor.invm.serverId:0}"/>
              </map>
            </constructor-arg>
            <!-- Name -->
            <constructor-arg value="netty-invm" />
         </bean>
      </set>
    </property>
    
    <!-- Broadcast groups -->
    <property name="broadcastGroupConfigurations">
      <list>
         <bean class="org.hornetq.core.config.BroadcastGroupConfiguration">
            <!-- Group name -->
            <constructor-arg value="${conf.hornetq.broadcastGroup.name:hornetq-bg}" />
            <!-- Local bind address -->
            <constructor-arg value="${conf.hornetq.broadcastGroup.bindingAddress:0.0.0.0}" />
            <!-- Local bind port -->
            <constructor-arg value="${conf.hornetq.broadcastGroup.bindingPort:9875}" />
            <!-- Group address -->
            <constructor-arg value="${conf.hornetq.broadcastGroup.groupAddress:231.7.7.7}" />
            <!-- Group port -->
            <constructor-arg value="${conf.hornetq.broadcastGroup.groupPort:9876}" />
            <!-- Broadcast period -->
            <constructor-arg value="${conf.hornetq.broadcastGroup.broadcastPeriod:5000}" />
            <!-- Connector infos -->
            <constructor-arg>
              <list>
                <!-- A list of connector-backup connector name pairs -->
                <bean class="org.hornetq.api.core.Pair">
                  <constructor-arg value="netty-external" />
                  <constructor-arg><null/></constructor-arg>
                </bean>
              </list>
            </constructor-arg>
         </bean>
      </list>
    </property>
    
    <!-- Discovery groups -->
    <property name="discoveryGroupConfigurations">
      <map>
         <!-- Match map key with name constructor arg -->
         <entry key="${conf.hornetq.discoveryGroup.name}">
           <bean class="org.hornetq.core.config.DiscoveryGroupConfiguration">
              <!-- Group name -->
              <constructor-arg value="${conf.hornetq.discoveryGroup.name:hornetq-dg}" />
              <!-- Local bind address -->
              <constructor-arg value="${conf.hornetq.discoveryGroup.bindingAddress:0.0.0.0}" />
              <!-- Group address -->
              <constructor-arg value="${conf.hornetq.broadcastGroup.groupAddress:231.7.7.7}" />
              <!-- Group port -->
              <constructor-arg value="${conf.hornetq.broadcastGroup.groupPort:9876}" />
              <!-- Refresh timeout -->
              <constructor-arg value="${conf.hornetq.discoveryGroup.refreshTimeout:1000}" />
           </bean>
         </entry>
      </map>
    </property>
    
    <!-- Cluster connections -->
    <property name="clusterConfigurations">
      <list>      
        <bean class="org.hornetq.core.config.ClusterConnectionConfiguration">
          <constructor-arg value="${conf.hornetq.cluster.name:HORNETQ-CLUSTER}"/>
          <!-- Address : prefix of addresses being clustered-->
          <constructor-arg value="jms"/>
          <constructor-arg value="${conf.hornetq.cluster.retryInterval:500}"/>
          <constructor-arg value="${conf.hornetq.cluster.duplicateDetection:true}"/>
          <constructor-arg value="${conf.hornetq.cluster.forwardNoConsumers:false}"/>
          <constructor-arg value="${conf.hornetq.cluster.maxHops:1}"/>
          <constructor-arg value="${conf.hornetq.cluster.confirmationWindowSize:4096}" />
          <constructor-arg value="${conf.hornetq.discoveryGroup.name:hornetq-dg}"/>
        </bean>        
      </list>
    </property>
    
  </bean>
  <!-- End Core configuration -->
  
  <!-- JMS configuration -->
  <bean name="JMSConfiguration" class="org.hornetq.jms.server.config.impl.JMSConfigurationImpl">
    <!-- Connection factories -->
    <constructor-arg>
      <list>
               
         <bean class="org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl">
            <description>Connection factory for internal use</description>
            <!-- Name -->
            <constructor-arg value="ConnectionFactory"/>
            <!-- Transport configuration -->
            <constructor-arg ref="NettyInternalConnector"/>
            <!-- Bindings (vararg, use String[]) -->             
            <constructor-arg type="java.lang.String[]">
               <list>
                  <value>/ConnectionFactory</value>
                  <value>/connectionFactory</value>
               </list>
            </constructor-arg>
            <!-- Confirmation windows size: how many bytes clients can send before waiting for server ACKs -->
            <property name="confirmationWindowSize" value="${conf.hornetq.client.cf.internal.confirmationWindowSize:8192}" />
            <!-- Consumer window size: how many bytes to pre-fetch from server when consuming -->
            <property name="consumerWindowSize" value="${conf.hornetq.client.cf.internal.consumerWindowSize:1048576}" />
         </bean>
         
         <!-- TODO: check failover settings -->
         <bean class="org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl">
            <description>Connection factory for external clients</description>
            <constructor-arg value="ExternalConnectionFactory"/>
            <constructor-arg ref="NettyExternalConnector"/>
            <constructor-arg type="java.lang.String[]">
               <list>
                  <value>/ExternalConnectionFactory</value>
                  <value>/externalConnectionFactory</value>
                  <value>/externalconnectionfactory</value>
               </list>
            </constructor-arg>
            <!-- Client-side load balancing: add discovery group name -->
            <property name="discoveryGroupName" value="${conf.hornetq.discoveryGroup.name:hornetq-dg}" />
            <property name="confirmationWindowSize" value="${conf.hornetq.client.cf.external.confirmationWindowSize:8192}" />
            <property name="consumerWindowSize" value="${conf.hornetq.client.cf.external.consumerWindowSize:1048576}" />
            <!-- Retry/reconnect params -->
            <property name="reconnectAttempts" value="${conf.hornetq.client.cf.external.reconnectAttempts:-1}" />
            <property name="retryInterval" value="${conf.hornetq.client.cf.external.retryInterval:1000}" />
            <property name="maxRetryInterval" value="${conf.hornetq.client.cf.external.maxRetryInterval:20000}" />
            <property name="retryIntervalMultiplier" value="${conf.hornetq.client.cf.external.retryIntervalMultiplier:2.0}" />
            <property name="clientFailureCheckPeriod" value="${conf.hornetq.client.cf.external.clientFailureCheckPeriod:10000}" />
            <property name="failoverOnInitialConnection" value="${conf.hornetq.client.cf.external.failoverOnInitialConnection:false}" />
            <property name="failoverOnServerShutdown" value="${conf.hornetq.client.cf.external.failoverOnServerShutdown:true}" />
         </bean>
         
      </list>
    </constructor-arg>
    <!-- Queue configurations -->
    <constructor-arg>
      <list>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <!-- Name -->
          <constructor-arg value="TaskManagerQueue" />
          <!-- Selector, if any -->
          <constructor-arg><null/></constructor-arg>
          <!-- Durable? -->
          <constructor-arg value="true" />
          <!-- Bindings (varargs) -->
          <constructor-arg value="/queue/TaskManagerQueue" />
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="AsyncWorkerQueue" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg value="/queue/AsyncWorkerQueue" />
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="taskIO" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg type="java.lang.String[]">
            <list>
               <value>/queue/taskIO</value>
               <value>taskIO</value>
            </list>
          </constructor-arg>
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="taskrecv" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg type="java.lang.String[]">
            <list>
               <value>/queue/taskrecv</value>
               <value>taskrecv</value>
            </list>
          </constructor-arg>
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="EventDispatcherQueue" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg value="/queue/EventDispatcherQueue" />
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="EventGateQueue" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg value="/queue/EventGateQueue" />
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="HTTPGateQueue" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg value="/queue/HTTPGateQueue" />
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="LoopbackGateQueue" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg value="/queue/LoopbackGateQueue" />
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="ExternalJMSGateQueue" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg value="/queue/ExternalJMSGateQueue" />
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="SpringGateQueue" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg value="/queue/SpringGateQueue" />
        </bean>
      </list>
    </constructor-arg>
    <!-- Topic configurations -->
    <constructor-arg>
      <list>
      </list>
    </constructor-arg>
    
    <property name="context">
      <bean class="javax.naming.InitialContext" />
    </property>
  </bean>  

</beans>

 

A few notes about this configuration:

 

  • The JBoss JNP JNDI is also instantiated and configured. Our code relies on JNDI to perform the lookup of JMS connection factories and destinations.
  • I don't like the Spring p-namespace for property sets, but it can save a lot of space. Check it out here
  • Three connector/acceptor pairs are configured: one Netty-based pair for internal use (from within the server's JVM), one Netty-based pair for external clients, and one InVM pair (configured, but not currently used)
  • No backup server or backup connectors are (currently) configured
  • No topics configured (should be pretty similar to queue configuration, but using TopicConfigurationImpl)
  • There are no proper set/get methods for user and roles in org.hornetq.spi.core.security.HornetQSecurityManagerImpl, so we created a simple extension of it to ease configuration from Spring:

 

 

package es.tid.planb.hornetq.security;

import java.util.List;
import java.util.Map;

import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;

/**
 * Bean adaptor for {@link HornetQSecurityManagerImpl}
 * 
 * @author Eduardo Corral
 * @since 1.2
 */
public class HornetQSecurityManagerBean extends HornetQSecurityManagerImpl
{

    public HornetQSecurityManagerBean() {
        super();
    }
    
    /**
     * @param users Map of user and passwords
     * @param roles Map of user and list of roles
     */
    public HornetQSecurityManagerBean(Map<String, String> users, Map<String, List<String>> roles) {
        super();
        if (users != null && !users.isEmpty()) {
            for (Map.Entry<String, String> userEntry : users.entrySet()) {
                addUser(userEntry.getKey(), userEntry.getValue());
                if (roles.containsKey(userEntry.getKey())) {
                    for (String role : roles.get(userEntry.getKey())) {
                        addRole(userEntry.getKey(), role);
                    }
                }
            }
        }
    }
}