I've been testing HornetQ 2.1.2.Final for a few days now, and I finally needed to integrate it with our Java SE symmetric cluster server. We use Spring 3 IoC to instantiate and configure a few components, and we wanted to do the same with HornetQ, since migrating our code to a JBoss AS solution is not an option.
The following configuration embeds a symmetric clustered instance of HornetQ into our server:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd"
       default-init-method="init" default-destroy-method="destroy">

  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <description>Loads properties from multiple sources, with fallback capabilities</description>
    <property name="locations">
      <list>
        <value>file:${basePath}/etc/hornetq.properties</value>
      </list>
    </property>
    <property name="ignoreUnresolvablePlaceholders" value="true" />
    <property name="ignoreResourceNotFound" value="true" />
    <!-- Fallback to these properties if not found in locations or system properties -->
    <property name="systemPropertiesMode">
      <util:constant static-field="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer.SYSTEM_PROPERTIES_MODE_OVERRIDE" />
    </property>
    <property name="properties">
      <props>
        <!-- Default values: will be used if not present in properties file, or properties file is not found -->
        <prop key="hornetq.runpath">${basePath}/run/hornetq</prop>
        <prop key="data.dir">${hornetq.runpath}</prop>
      </props>
    </property>
  </bean>

  <!-- JNDI Server Configuration ########################################## -->
  <bean name="Naming" class="org.jnp.server.NamingBeanImpl" init-method="start" destroy-method="stop">
    <!-- If true, it performs a java:/comp/env context lookup... and the server is not ready -->
    <property name="installJavaComp" value="false" />
  </bean>

  <bean name="JNDIServer" class="org.jnp.server.Main" init-method="start" destroy-method="stop">
    <property name="namingInfo" ref="Naming" />
    <property name="port" value="${conf.jndi.port:2099}" />
    <property name="bindAddress" value="${conf.jndi.bindingAddress:0.0.0.0}" />
    <property name="rmiPort" value="${conf.jndi.rmi.port:2098}" />
    <property name="rmiBindAddress" value="${conf.jndi.rmi.bindingAddress:0.0.0.0}" />
  </bean>

  <bean name="MBeanServer" class="java.lang.management.ManagementFactory" factory-method="getPlatformMBeanServer" />
  <!-- END JNDI Server Configuration ###################################### -->

  <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
    <description>HornetQ Core Server</description>
    <constructor-arg ref="Configuration" />
    <constructor-arg ref="MBeanServer" />
    <constructor-arg ref="HornetQSecurityManager" />
  </bean>

  <!-- Note init/destroy methods set as start/stop here, and not in Core Server,
       to avoid the latter starting before JMS Server -->
  <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl" init-method="start" destroy-method="stop">
    <description>HornetQ JMS Server</description>
    <constructor-arg ref="HornetQServer" />
    <constructor-arg ref="JMSConfiguration" />
  </bean>

  <!-- Security manager configuration: add user/roles here -->
  <!-- HornetQSecurityManagerBean just extends org.hornetq.spi.core.security.HornetQSecurityManagerImpl
       adding a constructor to ease user/roles configuration from Spring -->
  <bean name="HornetQSecurityManager" class="es.tid.planb.hornetq.security.HornetQSecurityManagerBean">
    <constructor-arg>
      <!-- User/pass map -->
      <map>
        <entry key="guest" value="guest" />
      </map>
    </constructor-arg>
    <constructor-arg>
      <!-- User/roles map -->
      <map>
        <entry key="guest">
          <!-- Roles for user -->
          <list>
            <value>guest</value>
          </list>
        </entry>
      </map>
    </constructor-arg>
    <property name="defaultUser" value="guest" />
  </bean>

  <!-- CONNECTORS : shared between HornetQ core and JMS servers -->
  <bean name="NettyInternalConnector" class="org.hornetq.api.core.TransportConfiguration">
    <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />
    <constructor-arg>
      <map>
        <entry key="host" value="${conf.hornetq.acceptor.internal.host:localhost}"/>
        <entry key="port" value="${conf.hornetq.acceptor.internal.port:5446}"/>
        <entry key="use-nio" value="${conf.hornetq.acceptor.internal.useNIO:true}"/>
      </map>
    </constructor-arg>
  </bean>

  <bean name="NettyExternalConnector" class="org.hornetq.api.core.TransportConfiguration">
    <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />
    <constructor-arg>
      <map>
        <entry key="host" value="${conf.hornetq.acceptor.external.host:0.0.0.0}"/>
        <entry key="port" value="${conf.hornetq.acceptor.external.port:5445}"/>
        <entry key="use-nio" value="${conf.hornetq.acceptor.external.useNIO:true}"/>
      </map>
    </constructor-arg>
  </bean>

  <bean name="NettyInVMConnector" class="org.hornetq.api.core.TransportConfiguration">
    <constructor-arg value="org.hornetq.core.remoting.impl.invm.InVMConnectorFactory" />
    <constructor-arg>
      <map>
        <entry key="server-id" value="${conf.hornetq.acceptor.invm.serverId:0}"/>
      </map>
    </constructor-arg>
  </bean>
  <!-- END CONNECTORS -->

  <!-- Common HornetQ configuration -->
  <bean name="Configuration" class="org.hornetq.core.config.impl.ConfigurationImpl">
    <property name="clustered" value="true" />
    <property name="persistenceEnabled" value="${conf.hornetq.persistenceEnabled:true}" />
    <property name="JMXManagementEnabled" value="${conf.hornetq.jmxEnabled:true}" />
    <property name="pagingDirectory" value="${hornetq.runpath}/paging" />
    <property name="bindingsDirectory" value="${hornetq.runpath}/bindings" />
    <property name="largeMessagesDirectory" value="${hornetq.runpath}/large-messages" />
    <property name="journalDirectory" value="${hornetq.runpath}/journal" />
    <property name="journalMinFiles" value="${conf.hornetq.journalMinFiles:10}" />
    <property name="journalType">
      <util:constant static-field="${conf.hornetq.journal.type:org.hornetq.core.server.JournalType.NIO}" />
    </property>
    <property name="clusterUser" value="HORNETQ-CLUSTER-USER" />
    <property name="clusterPassword" value="HORNETQ-CLUSTER-PASS" />
    <property name="threadPoolMaxSize" value="${conf.hornetq.threadPoolMaxSize:-1}" />

    <!-- Security settings -->
    <!-- Disable security to get a small performance boost -->
    <property name="securityEnabled" value="${conf.hornetq.securityEnabled:false}" />
    <property name="securityRoles">
      <map>
        <!-- Entry keys are the matching parameters -->
        <entry key="#">
          <set>
            <bean class="org.hornetq.core.security.Role">
              <!-- Name -->
              <constructor-arg value="guest" />
              <!-- send? -->
              <constructor-arg value="true" />
              <!-- consume? -->
              <constructor-arg value="true" />
              <!-- create durable queue? -->
              <constructor-arg value="false" />
              <!-- delete durable queue? -->
              <constructor-arg value="false" />
              <!-- create non-durable queue? -->
              <constructor-arg value="true" />
              <!-- delete non-durable queue? -->
              <constructor-arg value="true" />
              <!-- manage? -->
              <constructor-arg value="false" />
            </bean>
          </set>
        </entry>
      </map>
    </property>

    <!-- Addresses settings -->
    <property name="addressesSettings">
      <map>
        <!-- Entry keys are the matching parameters -->
        <entry key="#">
          <bean class="org.hornetq.core.settings.impl.AddressSettings">
            <property name="deadLetterAddress" value="jms.queue.DLQ" />
            <property name="expiryAddress" value="jms.queue.ExpiryQueue" />
            <!-- In milliseconds -->
            <property name="redeliveryDelay" value="${conf.hornetq.addresses.redeliveryDelay:0}" />
            <property name="maxSizeBytes" value="${conf.hornetq.addresses.maxSizeBytes:20971520}" />
            <property name="messageCounterHistoryDayLimit" value="${conf.hornetq.addresses.dayLimit:10}" />
            <property name="addressFullMessagePolicy">
              <util:constant static-field="org.hornetq.core.settings.impl.AddressFullMessagePolicy.PAGE" />
            </property>
          </bean>
        </entry>
      </map>
    </property>

    <!-- Already defined as named beans (see above) -->
    <property name="connectorConfigurations">
      <map>
        <entry key="netty-internal"><ref bean="NettyInternalConnector" /></entry>
        <entry key="netty-external"><ref bean="NettyExternalConnector" /></entry>
        <entry key="netty-invm"><ref bean="NettyInVMConnector" /></entry>
      </map>
    </property>

    <!-- Acceptors: match ports with connectors -->
    <property name="acceptorConfigurations">
      <set>
        <bean class="org.hornetq.api.core.TransportConfiguration">
          <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory" />
          <constructor-arg>
            <map>
              <entry key="host" value="${conf.hornetq.acceptor.internal.host:localhost}"/>
              <entry key="port" value="${conf.hornetq.acceptor.internal.port:5446}"/>
              <entry key="use-nio" value="${conf.hornetq.acceptor.internal.useNIO:true}"/>
              <entry key="direct-deliver" value="${conf.hornetq.acceptor.internal.directDeliver:false}" />
            </map>
          </constructor-arg>
          <!-- Name -->
          <constructor-arg value="netty-internal" />
        </bean>
        <bean class="org.hornetq.api.core.TransportConfiguration">
          <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory" />
          <constructor-arg>
            <map>
              <entry key="host" value="${conf.hornetq.acceptor.external.host:0.0.0.0}"/>
              <entry key="port" value="${conf.hornetq.acceptor.external.port:5445}"/>
              <entry key="use-nio" value="${conf.hornetq.acceptor.external.useNIO:true}"/>
              <entry key="direct-deliver" value="${conf.hornetq.acceptor.external.directDeliver:false}" />
            </map>
          </constructor-arg>
          <!-- Name -->
          <constructor-arg value="netty-external" />
        </bean>
        <bean class="org.hornetq.api.core.TransportConfiguration">
          <constructor-arg value="org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory" />
          <constructor-arg>
            <map>
              <entry key="server-id" value="${conf.hornetq.acceptor.invm.serverId:0}"/>
            </map>
          </constructor-arg>
          <!-- Name -->
          <constructor-arg value="netty-invm" />
        </bean>
      </set>
    </property>

    <!-- Broadcast groups -->
    <property name="broadcastGroupConfigurations">
      <list>
        <bean class="org.hornetq.core.config.BroadcastGroupConfiguration">
          <!-- Group name -->
          <constructor-arg value="${conf.hornetq.broadcastGroup.name:hornetq-bg}" />
          <!-- Local bind address -->
          <constructor-arg value="${conf.hornetq.broadcastGroup.bindingAddress:0.0.0.0}" />
          <!-- Local bind port -->
          <constructor-arg value="${conf.hornetq.broadcastGroup.bindingPort:9875}" />
          <!-- Group address -->
          <constructor-arg value="${conf.hornetq.broadcastGroup.groupAddress:231.7.7.7}" />
          <!-- Group port -->
          <constructor-arg value="${conf.hornetq.broadcastGroup.groupPort:9876}" />
          <!-- Broadcast period -->
          <constructor-arg value="${conf.hornetq.broadcastGroup.broadcastPeriod:5000}" />
          <!-- Connector infos -->
          <constructor-arg>
            <list>
              <!-- A list of connector-backup connector name pairs -->
              <bean class="org.hornetq.api.core.Pair">
                <constructor-arg value="netty-external" />
                <constructor-arg><null/></constructor-arg>
              </bean>
            </list>
          </constructor-arg>
        </bean>
      </list>
    </property>

    <!-- Discovery groups -->
    <property name="discoveryGroupConfigurations">
      <map>
        <!-- Match map key with name constructor arg (same default, so both resolve to the same name) -->
        <entry key="${conf.hornetq.discoveryGroup.name:hornetq-dg}">
          <bean class="org.hornetq.core.config.DiscoveryGroupConfiguration">
            <!-- Group name -->
            <constructor-arg value="${conf.hornetq.discoveryGroup.name:hornetq-dg}" />
            <!-- Local bind address -->
            <constructor-arg value="${conf.hornetq.discoveryGroup.bindingAddress:0.0.0.0}" />
            <!-- Group address -->
            <constructor-arg value="${conf.hornetq.broadcastGroup.groupAddress:231.7.7.7}" />
            <!-- Group port -->
            <constructor-arg value="${conf.hornetq.broadcastGroup.groupPort:9876}" />
            <!-- Refresh timeout -->
            <constructor-arg value="${conf.hornetq.discoveryGroup.refreshTimeout:1000}" />
          </bean>
        </entry>
      </map>
    </property>

    <!-- Cluster connections -->
    <property name="clusterConfigurations">
      <list>
        <bean class="org.hornetq.core.config.ClusterConnectionConfiguration">
          <constructor-arg value="${conf.hornetq.cluster.name:HORNETQ-CLUSTER}"/>
          <!-- Address : prefix of addresses being clustered-->
          <constructor-arg value="jms"/>
          <constructor-arg value="${conf.hornetq.cluster.retryInterval:500}"/>
          <constructor-arg value="${conf.hornetq.cluster.duplicateDetection:true}"/>
          <constructor-arg value="${conf.hornetq.cluster.forwardNoConsumers:false}"/>
          <constructor-arg value="${conf.hornetq.cluster.maxHops:1}"/>
          <constructor-arg value="${conf.hornetq.cluster.confirmationWindowSize:4096}" />
          <constructor-arg value="${conf.hornetq.discoveryGroup.name:hornetq-dg}"/>
        </bean>
      </list>
    </property>
  </bean>
  <!-- End Core configuration -->

  <!-- JMS configuration -->
  <bean name="JMSConfiguration" class="org.hornetq.jms.server.config.impl.JMSConfigurationImpl">
    <!-- Connection factories -->
    <constructor-arg>
      <list>
        <bean class="org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl">
          <description>Connection factory for internal use</description>
          <!-- Name -->
          <constructor-arg value="ConnectionFactory"/>
          <!-- Transport configuration -->
          <constructor-arg ref="NettyInternalConnector"/>
          <!-- Bindings (vararg, use String[]) -->
          <constructor-arg type="java.lang.String[]">
            <list>
              <value>/ConnectionFactory</value>
              <value>/connectionFactory</value>
            </list>
          </constructor-arg>
          <!-- Confirmation windows size: how many bytes clients can send before waiting for server ACKs -->
          <property name="confirmationWindowSize" value="${conf.hornetq.client.cf.internal.confirmationWindowSize:8192}" />
          <!-- Consumer window size: how many bytes to pre-fetch from server when consuming -->
          <property name="consumerWindowSize" value="${conf.hornetq.client.cf.internal.consumerWindowSize:1048576}" />
        </bean>
        <!-- TODO: check failover settings -->
        <bean class="org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl">
          <description>Connection factory for external clients</description>
          <constructor-arg value="ExternalConnectionFactory"/>
          <constructor-arg ref="NettyExternalConnector"/>
          <constructor-arg type="java.lang.String[]">
            <list>
              <value>/ExternalConnectionFactory</value>
              <value>/externalConnectionFactory</value>
              <value>/externalconnectionfactory</value>
            </list>
          </constructor-arg>
          <!-- Client-side load balancing: add discovery group name -->
          <property name="discoveryGroupName" value="${conf.hornetq.discoveryGroup.name:hornetq-dg}" />
          <property name="confirmationWindowSize" value="${conf.hornetq.client.cf.external.confirmationWindowSize:8192}" />
          <property name="consumerWindowSize" value="${conf.hornetq.client.cf.external.consumerWindowSize:1048576}" />
          <!-- Retry/reconnect params -->
          <property name="reconnectAttempts" value="${conf.hornetq.client.cf.external.reconnectAttempts:-1}" />
          <property name="retryInterval" value="${conf.hornetq.client.cf.external.retryInterval:1000}" />
          <property name="maxRetryInterval" value="${conf.hornetq.client.cf.external.maxRetryInterval:20000}" />
          <property name="retryIntervalMultiplier" value="${conf.hornetq.client.cf.external.retryIntervalMultiplier:2.0}" />
          <property name="clientFailureCheckPeriod" value="${conf.hornetq.client.cf.external.clientFailureCheckPeriod:10000}" />
          <property name="failoverOnInitialConnection" value="${conf.hornetq.client.cf.external.failoverOnInitialConnection:false}" />
          <property name="failoverOnServerShutdown" value="${conf.hornetq.client.cf.external.failoverOnServerShutdown:true}" />
        </bean>
      </list>
    </constructor-arg>

    <!-- Queue configurations -->
    <constructor-arg>
      <list>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <!-- Name -->
          <constructor-arg value="TaskManagerQueue" />
          <!-- Selector, if any -->
          <constructor-arg><null/></constructor-arg>
          <!-- Durable? -->
          <constructor-arg value="true" />
          <!-- Bindings (varargs) -->
          <constructor-arg value="/queue/TaskManagerQueue" />
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="AsyncWorkerQueue" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg value="/queue/AsyncWorkerQueue" />
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="taskIO" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg type="java.lang.String[]">
            <list>
              <value>/queue/taskIO</value>
              <value>taskIO</value>
            </list>
          </constructor-arg>
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="taskrecv" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg type="java.lang.String[]">
            <list>
              <value>/queue/taskrecv</value>
              <value>taskrecv</value>
            </list>
          </constructor-arg>
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="EventDispatcherQueue" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg value="/queue/EventDispatcherQueue" />
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="EventGateQueue" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg value="/queue/EventGateQueue" />
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="HTTPGateQueue" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg value="/queue/HTTPGateQueue" />
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="LoopbackGateQueue" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg value="/queue/LoopbackGateQueue" />
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="ExternalJMSGateQueue" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg value="/queue/ExternalJMSGateQueue" />
        </bean>
        <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
          <constructor-arg value="SpringGateQueue" />
          <constructor-arg><null/></constructor-arg>
          <constructor-arg value="true" />
          <constructor-arg value="/queue/SpringGateQueue" />
        </bean>
      </list>
    </constructor-arg>

    <!-- Topic configurations -->
    <constructor-arg>
      <list>
      </list>
    </constructor-arg>

    <property name="context">
      <bean class="javax.naming.InitialContext" />
    </property>
  </bean>
</beans>
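Almost every value in the configuration above is a ${key:default} placeholder. As a rough illustration of the lookup order I expect from this setup (system properties first, because of SYSTEM_PROPERTIES_MODE_OVERRIDE, then the properties file, then the inline default after the colon), here is a simplified sketch in plain Java. It is not Spring's implementation: PlaceholderSketch and resolve are hypothetical names, and nested placeholders such as ${hornetq.runpath}/paging are not handled.

```java
import java.util.Properties;

// Simplified illustration only; Spring's PropertyPlaceholderConfigurer does much more.
class PlaceholderSketch {

    /** Resolves a single "${key}" or "${key:default}" expression. */
    static String resolve(String expr, Properties fileProps) {
        if (!expr.startsWith("${") || !expr.endsWith("}")) {
            return expr; // plain literal, nothing to resolve
        }
        String body = expr.substring(2, expr.length() - 1);
        int sep = body.indexOf(':'); // first colon separates key and default
        String key = sep < 0 ? body : body.substring(0, sep);
        String fallback = sep < 0 ? null : body.substring(sep + 1);

        String value = System.getProperty(key);   // 1. system property wins
        if (value == null) {
            value = fileProps.getProperty(key);   // 2. then the properties file
        }
        if (value == null) {
            value = fallback;                     // 3. then the inline default
        }
        // With ignoreUnresolvablePlaceholders=true an unresolved
        // placeholder is left in place as literal text.
        return value != null ? value : expr;
    }

    public static void main(String[] args) {
        Properties file = new Properties();
        file.setProperty("conf.hornetq.acceptor.external.port", "6445");
        // Value from the file replaces the inline default 5445
        System.out.println(resolve("${conf.hornetq.acceptor.external.port:5445}", file));
        // Key absent everywhere: the inline default is used
        System.out.println(resolve("${conf.jndi.port:2099}", file));
    }
}
```

One consequence worth keeping in mind: a placeholder with no inline default and no value anywhere stays in the bean definition as literal text, so any two places that must agree on a name need the same default.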
A few notes about this configuration:
- The JBoss JNP JNDI server is also instantiated and configured. Our code relies on JNDI to look up JMS connection factories and destinations.
- I don't like the Spring p-namespace for setting properties, so it is not used here, but it could save a lot of space in a configuration this size.
- Three connector/acceptor pairs are configured: one Netty pair for internal (within-JVM) use, one Netty pair for external clients, and one InVM pair (configured, but not currently used)
- No backup server or backup connectors are (currently) configured
- No topics are configured (this should be pretty similar to the queue configuration, but using TopicConfigurationImpl)
- There are no proper setters/getters for users and roles in org.hornetq.spi.core.security.HornetQSecurityManagerImpl, so we created a simple extension of it to ease configuration from Spring:
package es.tid.planb.hornetq.security;

import java.util.List;
import java.util.Map;

import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;

/**
 * Bean adaptor for {@link HornetQSecurityManagerImpl}
 *
 * @author Eduardo Corral
 * @since 1.2
 */
public class HornetQSecurityManagerBean extends HornetQSecurityManagerImpl {

    public HornetQSecurityManagerBean() {
        super();
    }

    /**
     * @param users Map of user and passwords
     * @param roles Map of user and list of roles
     */
    public HornetQSecurityManagerBean(Map<String, String> users, Map<String, List<String>> roles) {
        super();
        if (users == null || users.isEmpty()) {
            return;
        }
        for (Map.Entry<String, String> userEntry : users.entrySet()) {
            String user = userEntry.getKey();
            addUser(user, userEntry.getValue());
            // The roles map may be null or have no entry for this user
            List<String> userRoles = (roles != null) ? roles.get(user) : null;
            if (userRoles != null) {
                for (String role : userRoles) {
                    addRole(user, role);
                }
            }
        }
    }
}
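Going back to the JNDI note: since the connection factories and queues are bound in the JNP server, a client in another JVM can look them up by name. The following is a minimal sketch of the client side, assuming the JNP client classes are on the classpath and the server listens on the default port from the configuration (2099). JndiLookupSketch and its methods are hypothetical names; the environment property values are the usual ones for the JBoss JNP provider.

```java
import java.util.Hashtable;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

// Hypothetical helper, not part of HornetQ or JBoss Naming.
class JndiLookupSketch {

    /** Builds the JNDI environment for a JNP server at host:port. */
    static Hashtable<String, String> jnpEnvironment(String host, int port) {
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
        env.put(Context.PROVIDER_URL, "jnp://" + host + ":" + port);
        return env;
    }

    /** Looks up a single binding and closes the context afterwards. */
    static Object lookup(String host, int port, String name) throws NamingException {
        Context ctx = new InitialContext(jnpEnvironment(host, port));
        try {
            return ctx.lookup(name);
        } finally {
            ctx.close();
        }
    }
}
```

With the bindings from the configuration, a call such as JndiLookupSketch.lookup("localhost", 2099, "/ExternalConnectionFactory") should return an object that can be cast to javax.jms.ConnectionFactory, and "/queue/TaskManagerQueue" one that can be cast to javax.jms.Queue, provided the JMS API and HornetQ client jars are available to the caller.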