Topic messages not being delivered to all subscribers in cluster
mackerman Nov 14, 2011 4:46 PMHi, I have a scenario where I have 2 servers in a cluster, one topic, 2 producers, one on each server, and 2 consumers, one on each server. When I send a message to the topic on the first server, the consumer on the first server receives the message, but not the consumer on the second server. When I send a message to the topic on the second server, both consumers receive the message.
I am setting up the cluster using static connectors; the second server connects to the first server to establish the bridge, but from the behaviour above, it appears that the connection from the first server to the second is not being established or used correctly.
I need to be able to establish a cluster in a situation where not all hosts are known initially, so i'm relying on HornetQ to propagate cluster connections as they are established. According to the user manual, section 38.2.2:
Note that the consumers are hornetQ rest consumers.
My environment is HornetQ 2.2.7 & HornetQ Rest 2.2.6. I am running in JBoss AS7, though no messaging configuration is done in the AS7 standalone (or domain) configuration files due to problems getting Rest working with that configuration. In this configuration JNDI is not being used.
Here are snippets from my configuration files & source. In the scenario outlined above, the first server is 10.2.62.28 & the second 10.2.62.27.
hornetq-configuration.xml (note that the inVM connectors & acceptors are needed by hornetQ rest)
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<persistence-enabled>false</persistence-enabled>
<clustered>true</clustered>
<connectors>
<connector name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
</connector>
<connector name="netty-connector">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="port" value="5445"/>
</connector>
<connector name="netty-connector-28">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="port" value="5445"/>
<param key="host" value="10.2.62.28" />
</connector>
<connector name="netty-connector-27">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="port" value="5445"/>
<param key="host" value="10.2.62.27" />
</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
</acceptor>
<acceptor name="netty-acceptor">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="port" value="5445"/>
<param key="host" value="0.0.0.0" />
</acceptor>
</acceptors>
<cluster-connections>
<cluster-connection name="bob-cluster">
<address>jms</address>
<connector-ref>netty-connector-28</connector-ref>
<retry-interval>1000</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<forward-when-no-consumers>false</forward-when-no-consumers>
<max-hops>1</max-hops>
<static-connectors>
<connector-ref>netty-connector-27</connector-ref>
<connector-ref>netty-connector-28</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="#">
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
</configuration>
hornetq-jms.xml
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="ConnectionFactory">
<connectors>
<connector-ref connector-name="netty-connector" />
</connectors>
<entries>
<entry name="ConnectionFactory" />
</entries>
</connection-factory>
<topic name="asyncEvents">
<entry name="/topic/asyncEvents"/>
</topic>
<queue name="apiRequests">
<entry name="/queue/apiRequests"/>
</queue>
</configuration>
The code:
ConnectionFactory cf = (ConnectionFactory)getRegistry().lookup("ConnectionFactory"); | |||
Topic topic = (Topic)getRegistry().lookup("/topic/asyncEvents"); | |||
Connection connection = cf.createConnection(); | |||
connection.start(); | |||
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); | |||
producer = session.createProducer(topic); |
producer.setDisableMessageID(true); | ||
producer.setDisableMessageTimestamp(true); |
ObjectMessage message = session.createObjectMessage(); | |||||||||||
message.setStringProperty(org.hornetq.rest.HttpHeaderProperty.CONTENT_TYPE, | MediaType.APPLICATION_JSON); | ||||||||||
String strmsg = MessageSerializer.toJson(messages.get(confId)); | |||||||||||
message.setObject(strmsg); | |||||||||||
message.setStringProperty("confId", confId); | |||||||||||
producer.send(message); | |||||||||||
The console on server1 shows the bridge being setup:
13:33:30,103 INFO [org.hornetq.core.server.cluster.impl.BridgeImpl] (Thread-1 (group:HornetQ-server-threads646299910-484057683)) Connecting bridge sf.bob-cluster.6753eaf9-0b1f-11e1-86b2-0025b3289f31 to its destination [dd89d0b0-f9dd-11e0-aa73-0025b3d1b2ba]
13:33:30,171 INFO [org.hornetq.core.server.cluster.impl.BridgeImpl] (Thread-1 (group:HornetQ-server-threads646299910-484057683)) Bridge sf.bob-cluster.6753eaf9-0b1f-11e1-86b2-0025b3289f31 is connected [dd89d0b0-f9dd-11e0-aa73-0025b3d1b2ba-> sf.bob-cluster.6753eaf9-0b1f-11e1-86b2-0025b3289f31]
The console on server2 also shows a bridge being setup:
13:21:23,689 INFO [org.hornetq.core.server.cluster.impl.BridgeImpl] (Thread-1 (group:HornetQ-server-threads1495441375-1380431624)) Connecting bridge sf.bob-cluster.dd89d0b0-f9dd-11e0-aa73-0025b3d1b2ba to its destination [6753eaf9-0b1f-11e1-86b2-0025b3289f31]
13:21:23,798 INFO [org.hornetq.core.server.cluster.impl.BridgeImpl] (Thread-1 (group:HornetQ-server-threads1495441375-1380431624)) Bridge sf.bob-cluster.dd89d0b0-f9dd-11e0-aa73-0025b3d1b2ba is connected [6753eaf9-0b1f-11e1-86b2-0025b3289f31-> sf.bob-cluster.dd89d0b0-f9dd-11e0-aa73-0025b3d1b2ba]
Does anyone have any idea why this is not working as I need, and how to fix it?
thanks, Mitchell