
    How to configure HornetQ client with standalone server cluster (configured using JGroups TCP)

    pragalathan

      I have configured 2 HornetQ standalone servers in clustered mode using JGroups (TCP), as I can't use the default UDP. Below is the configuration.

       

      hornetq-configuration.xml:

       

          <broadcast-groups>
              <broadcast-group name="bg-group1">
                  <jgroups-file>jgroups-tcp.xml</jgroups-file>
                  <jgroups-channel>hornetq_broadcast_channel</jgroups-channel>
                  <connector-ref>netty</connector-ref>
              </broadcast-group>
          </broadcast-groups>
      
          <discovery-groups>
              <discovery-group name="dg-group1">
                  <jgroups-file>jgroups-tcp.xml</jgroups-file>
                  <jgroups-channel>hornetq_broadcast_channel</jgroups-channel>
                  <refresh-timeout>10000</refresh-timeout>
              </discovery-group>
          </discovery-groups>
      

      jgroups-tcp.xml:

       

      <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns="urn:org:jgroups"
              xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
          <TCP bind_port="7800"
               recv_buf_size="${tcp.recv_buf_size:5M}"
               send_buf_size="${tcp.send_buf_size:5M}"
               max_bundle_size="64K"
               max_bundle_timeout="30"
               use_send_queues="true"
               sock_conn_timeout="300"
      
               timer_type="new3"
               timer.min_threads="4"
               timer.max_threads="10"
               timer.keep_alive_time="3000"
               timer.queue_max_size="500"
      
               thread_pool.enabled="true"
               thread_pool.min_threads="2"
               thread_pool.max_threads="8"
               thread_pool.keep_alive_time="5000"
               thread_pool.queue_enabled="true"
               thread_pool.queue_max_size="10000"
               thread_pool.rejection_policy="discard"
      
               oob_thread_pool.enabled="true"
               oob_thread_pool.min_threads="1"
               oob_thread_pool.max_threads="8"
               oob_thread_pool.keep_alive_time="5000"
               oob_thread_pool.queue_enabled="false"
               oob_thread_pool.queue_max_size="100"
               oob_thread_pool.rejection_policy="discard"/>
      
          <TCPPING
                   initial_hosts="${jgroups.tcpping.initial_hosts:hornetq-server1-ip[7800], hornetq-server1-ip[7900], hornetq-server2-ip[7800], hornetq-server2-ip[7900]}"
                   port_range="1"/>
          <MERGE3  min_interval="10000"
                   max_interval="30000"/>
          <FD_SOCK/>
          <FD timeout="3000" max_tries="3" />
          <VERIFY_SUSPECT timeout="1500"  />
          <BARRIER />
          <pbcast.NAKACK2 use_mcast_xmit="false"
                         discard_delivered_msgs="true"/>
          <UNICAST3 />
          <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                         max_bytes="4M"/>
          <pbcast.GMS print_local_addr="true" join_timeout="2000"
                      view_bundling="true"/>
          <MFC max_credits="2M"
               min_threshold="0.4"/>
          <FRAG2 frag_size="60K"  />
          <!--RSVP resend_interval="2000" timeout="10000"/-->
          <pbcast.STATE_TRANSFER/>
      </config> 
      

       

      The servers work fine, i.e., if the live server goes down, the backup takes its place.

       

      Client producer:

       

       

          List<Configuration> configurations = ... // user defined class holding host/port per server
          // size the array to the number of configured servers (was hard-coded to 2)
          TransportConfiguration[] servers = new TransportConfiguration[configurations.size()];
          for (int i = 0; i < configurations.size(); i++) {
              Map<String, Object> map = new HashMap<>();
              map.put("host", configurations.get(i).getHost());
              map.put("port", configurations.get(i).getPort());
              servers[i] = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
          }
          // HA locator: receives the cluster topology from the first server it connects to
          ServerLocator locator = HornetQClient.createServerLocatorWithHA(servers);
          locator.setReconnectAttempts(5);
      
          factory = locator.createSessionFactory();
          session = factory.createSession();
          producer = session.createProducer(queueName); 
      

       

      Client Consumer:

          ClientSessionFactory factory = locator.createSessionFactory();
          for (int i = 1; i <= nReceivers; i++) {
              // autoCommitSends=true, autoCommitAcks=true, ackBatchSize=1
              ClientSession session = factory.createSession(true, true, 1);
              sessions.add(session);
              if (i == 1) {
                  Thread.sleep(10000); // wait for the client to download the cluster topology
              }
              session.start();
              ClientConsumer consumer = session.createConsumer(queueName);
              consumer.setMessageHandler(handler);
          }
      

      Issue:

       

      1. The client (producer) doesn't automatically fail over if the server it is connected to goes down while it is sending messages.
      2. The sessions created from the same client factory always connect to a single server (contrary to the documentation: http://docs.jboss.org/hornetq/2.3.0.beta1/docs/user-manual/html/clusters.html#clusters.client.loadbalancing).

       

      So it seems the client never gets the cluster information. I also can't find any documentation on configuring a client to use JGroups (is that needed?) to connect to a HornetQ cluster.

       

      Any help is appreciated.

        • 1. Re: How to configure HornetQ client with standalone server cluster (configured using JGroups TCP)
          jbertram

          Couple of things:

          1. If you can't use UDP for clustering, it's typically best just to use a static clustering configuration (look near the end of section 38.3.1).  It's quite a bit simpler (and certainly less verbose) than using JGroups; a minimal sketch follows this list.
          2. Fail-over and load-balancing are 100% independent of each other in HornetQ.  Fail-over is supported by a live-backup pair (i.e. an active-passive configuration).  Load-balancing is supported by > 1 live servers clustered together.  It's not clear to me exactly what you have configured, so I don't know whether you should expect fail-over to occur or not.  Please attach your full server configuration to help clarify.
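
          For reference, a static cluster connection replaces the broadcast and discovery groups with an explicit connector list. A minimal sketch along these lines (the server2-connector name and host are placeholders, not taken from your configuration):

              <connectors>
                  <connector name="server2-connector">
                      <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
                      <param key="host" value="hornetq-server2-ip"/>
                      <param key="port" value="5445"/>
                  </connector>
              </connectors>

              <cluster-connections>
                  <cluster-connection name="my-cluster">
                      <address>jms</address>
                      <connector-ref>netty</connector-ref>
                      <static-connectors>
                          <connector-ref>server2-connector</connector-ref>
                      </static-connectors>
                  </cluster-connection>
              </cluster-connections>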
          • 2. Re: How to configure HornetQ client with standalone server cluster (configured using JGroups TCP)
            pragalathan

            Thanks, Justin Bertram, for helping me.

             

            I would like to create this configuration (live & backup) as a package and deploy it in multiple environments in my organization. Hence I need to use UDP or JGroups to make the server IPs/names dynamic depending on where they are installed. Since we are on the cloud, UDP is disabled, so I'm using JGroups with TCP.

             

            Currently I have two live servers and two replicated backup servers. Each machine hosts one live and one backup. They are working fine, i.e., if a live server goes down, its backup takes its place automatically. So on the server side, both fail-over and load balancing (automatic distribution of messages) work fine.

             

            What is not working:

            1. If a live server goes down while a client is sending messages, then sending the subsequent messages fails on the client side. How do I make this transparent to client applications, so that the client sends the subsequent messages to the backup server (which has just replaced the live server) without requiring the application code/user to connect to the backup server?
            2. I would also like client-side load balancing of messages while receiving. However, the sessions created on the client side are not connected to all the servers; they always connect to the first server (http://docs.jboss.org/hornetq/2.4.0.Final/docs/user-manual/html/clusters.html#clusters.cluster-connections).

             

             

            live hornetq-configuration.xml

            
            
            <configuration xmlns="urn:hornetq"
                          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                          xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
            
                <paging-directory>${data.dir:../data}/paging</paging-directory>
            
                <bindings-directory>${data.dir:../data}/bindings</bindings-directory>
            
                <journal-directory>${data.dir:../data}/journal</journal-directory>
            
                <journal-min-files>10</journal-min-files>
            
                <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
            
                <shared-store>false</shared-store>
                <failover-on-shutdown>true</failover-on-shutdown>
                <check-for-live-server>true</check-for-live-server>
            
                <connectors>
                    <connector name="netty">
                        <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
                        <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                        <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
                    </connector>
            
                    <connector name="netty-throughput">
                        <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
                        <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                        <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
                        <param key="batch-delay" value="50"/>
                    </connector>
                </connectors>
            
                <acceptors>
                    <acceptor name="netty">
                        <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
                        <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                        <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
                    </acceptor>
            
                    <acceptor name="netty-throughput">
                        <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
                        <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                        <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
                        <param key="batch-delay" value="50"/>
                        <param key="direct-deliver" value="false"/>
                    </acceptor>
                </acceptors>
            
                <broadcast-groups>
                    <broadcast-group name="bg-group1">
                        <jgroups-file>jgroups-tcp.xml</jgroups-file>
                        <jgroups-channel>hornetq_broadcast_channel</jgroups-channel>
                        <connector-ref>netty</connector-ref>
                    </broadcast-group>
                </broadcast-groups>
            
                <discovery-groups>
                    <discovery-group name="dg-group1">
                        <jgroups-file>jgroups-tcp.xml</jgroups-file>
                        <jgroups-channel>hornetq_broadcast_channel</jgroups-channel>
                        <refresh-timeout>10000</refresh-timeout>
                    </discovery-group>
                </discovery-groups>
            
                <cluster-user>myClusterUser</cluster-user>
                <cluster-password>67137c70c1</cluster-password>
            
                <cluster-connections>
                    <cluster-connection name="myQCluster">
                        <address>jms</address>
                        <connector-ref>netty</connector-ref>
                        <check-period>1000</check-period>
                        <connection-ttl>5000</connection-ttl>
                        <min-large-message-size>50000</min-large-message-size>
                        <call-timeout>5000</call-timeout>
                        <retry-interval>500</retry-interval>
                        <retry-interval-multiplier>1.0</retry-interval-multiplier>
                        <max-retry-interval>5000</max-retry-interval>
                        <reconnect-attempts>-1</reconnect-attempts>
                        <use-duplicate-detection>true</use-duplicate-detection>
                        <forward-when-no-consumers>true</forward-when-no-consumers>
                        <max-hops>2</max-hops>
                        <confirmation-window-size>32000</confirmation-window-size>
                        <call-failover-timeout>30000</call-failover-timeout>
                        <notification-interval>1000</notification-interval>
                        <notification-attempts>2</notification-attempts>
                        <discovery-group-ref discovery-group-name="dg-group1"/>
                    </cluster-connection>
            
                </cluster-connections>
            
                <grouping-handler name="my-grouping-handler">
                    <type>LOCAL</type>
                    <address>jms</address>
                    <timeout>5000</timeout>
                </grouping-handler>
            
                <security-settings>
                    <security-setting match="#">
                        <permission type="createNonDurableQueue" roles="admin"/>
                        <permission type="deleteNonDurableQueue" roles="admin"/>
                        <permission type="createDurableQueue" roles="admin"/>
                        <permission type="deleteDurableQueue" roles="admin"/>
                        <permission type="consume" roles="admin,guest"/>
                        <permission type="send" roles="admin,guest"/>
                    </security-setting>
            
                    <security-setting match="jms.queue.hornetq.#">
                        <permission type="manage" roles="admin"/>
                        <permission type="createNonDurableQueue" roles="admin"/>
                        <permission type="deleteNonDurableQueue" roles="admin"/>
                        <permission type="createDurableQueue" roles="admin"/>
                        <permission type="deleteDurableQueue" roles="admin"/>
                        <permission type="consume" roles="admin"/>
                        <permission type="send" roles="admin"/>
                    </security-setting>
                   
                    <security-setting match="jms.#">
                        <redistribution-delay>10000</redistribution-delay>
                        <permission type="manage" roles="admin"/>
                        <permission type="createNonDurableQueue" roles="admin"/>
                        <permission type="deleteNonDurableQueue" roles="admin"/>
                        <permission type="createDurableQueue" roles="admin"/>
                        <permission type="deleteDurableQueue" roles="admin"/>
                        <permission type="consume" roles="admin"/>
                        <permission type="send" roles="admin"/>
                    </security-setting>
            
                </security-settings>
            
                <address-settings>
                    <!--default for catch all-->
                    <address-setting match="#">
                        <dead-letter-address>jms.queue.DLQ</dead-letter-address>
                        <expiry-address>jms.queue.ExpiryQueue</expiry-address>
                        <redelivery-delay>0</redelivery-delay>
                        <max-size-bytes>10485760</max-size-bytes>
                        <message-counter-history-day-limit>10</message-counter-history-day-limit>
                        <address-full-policy>BLOCK</address-full-policy>
                    </address-setting>
                    <address-setting match="jms.#">
                        <redistribution-delay>5000</redistribution-delay>
                    </address-setting>
                </address-settings>
            </configuration>
            
            
            

             

            backup hornetq-configuration.xml

            <configuration xmlns="urn:hornetq"
                           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                           xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
               
                <paging-directory>${data.dir:../data}/paging-backup</paging-directory>
               
                <bindings-directory>${data.dir:../data}/bindings-backup</bindings-directory>
               
                <journal-directory>${data.dir:../data}/journal-backup</journal-directory>
               
                <journal-min-files>10</journal-min-files>
               
                <large-messages-directory>${data.dir:../data}/large-messages-backup</large-messages-directory>
            
                <shared-store>false</shared-store>
                <backup>true</backup>
                <allow-failback>true</allow-failback>
               
                <connectors>      
                    <connector name="netty">
                        <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
                        <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                        <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
                    </connector>
                  
                    <connector name="netty-throughput">
                        <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
                        <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                        <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
                        <param key="batch-delay" value="50"/>
                    </connector>
                </connectors>
            
                <acceptors>
                    <acceptor name="netty">
                        <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
                        <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                        <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
                    </acceptor>
                  
                    <acceptor name="netty-throughput">
                        <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
                        <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                        <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
                        <param key="batch-delay" value="50"/>
                        <param key="direct-deliver" value="false"/>
                    </acceptor>
                </acceptors>
            
                <broadcast-groups>
                    <broadcast-group name="bg-group1">
                        <jgroups-file>jgroups-tcp.xml</jgroups-file>
                        <jgroups-channel>hornetq_broadcast_channel</jgroups-channel>
                        <connector-ref>netty</connector-ref>
                    </broadcast-group>
                </broadcast-groups>
            
                <discovery-groups>
                    <discovery-group name="dg-group1">
                        <jgroups-file>jgroups-tcp.xml</jgroups-file>
                        <jgroups-channel>hornetq_broadcast_channel</jgroups-channel>
                        <refresh-timeout>10000</refresh-timeout>
                    </discovery-group>
                </discovery-groups>
               
                <cluster-user>myClusterUser</cluster-user>
                <cluster-password>67137c70c1</cluster-password>
                  
                <cluster-connections>
                    <cluster-connection name="myQCluster">
                        <address>jms</address>
                        <connector-ref>netty</connector-ref>
                        <check-period>1000</check-period>
                        <connection-ttl>5000</connection-ttl>
                        <min-large-message-size>50000</min-large-message-size>
                        <call-timeout>5000</call-timeout>
                        <retry-interval>500</retry-interval>
                        <retry-interval-multiplier>1.0</retry-interval-multiplier>
                        <max-retry-interval>5000</max-retry-interval>
                        <reconnect-attempts>-1</reconnect-attempts>
                        <use-duplicate-detection>true</use-duplicate-detection>
                        <forward-when-no-consumers>true</forward-when-no-consumers>
                        <max-hops>2</max-hops>
                        <confirmation-window-size>32000</confirmation-window-size>
                        <call-failover-timeout>30000</call-failover-timeout>
                        <notification-interval>1000</notification-interval>
                        <notification-attempts>2</notification-attempts>
                        <discovery-group-ref discovery-group-name="dg-group1"/>
                    </cluster-connection>
               
                </cluster-connections>
               
                <grouping-handler name="my-grouping-handler">
                    <type>LOCAL</type>
                    <address>jms</address>
                    <timeout>5000</timeout>
                </grouping-handler>
            
                <security-settings>
                    <security-setting match="#">
                        <permission type="createNonDurableQueue" roles="admin"/>
                        <permission type="deleteNonDurableQueue" roles="admin"/>
                        <permission type="createDurableQueue" roles="admin"/>
                        <permission type="deleteDurableQueue" roles="admin"/>
                        <permission type="consume" roles="admin"/>
                        <permission type="send" roles="admin"/>
                    </security-setting>
                 
                    <security-setting match="jms.queue.hornetq.#">
                        <permission type="manage" roles="admin"/>
                        <permission type="createNonDurableQueue" roles="admin"/>
                        <permission type="deleteNonDurableQueue" roles="admin"/>
                        <permission type="createDurableQueue" roles="admin"/>
                        <permission type="deleteDurableQueue" roles="admin"/>
                        <permission type="consume" roles="admin"/>
                        <permission type="send" roles="admin"/>
                    </security-setting>
                    
                    <security-setting match="jms.#">
                        <redistribution-delay>10000</redistribution-delay>
                        <permission type="manage" roles="admin"/>
                        <permission type="createNonDurableQueue" roles="admin"/>
                        <permission type="deleteNonDurableQueue" roles="admin"/>
                        <permission type="createDurableQueue" roles="admin"/>
                        <permission type="deleteDurableQueue" roles="admin"/>
                        <permission type="consume" roles="admin"/>
                        <permission type="send" roles="admin"/>
                    </security-setting>
                    
                </security-settings>
            
                <address-settings>
                    <!--default for catch all-->
                    <address-setting match="#">
                        <dead-letter-address>jms.queue.DLQ</dead-letter-address>
                        <expiry-address>jms.queue.ExpiryQueue</expiry-address>
                        <redelivery-delay>0</redelivery-delay>
                        <max-size-bytes>10485760</max-size-bytes>       
                        <message-counter-history-day-limit>10</message-counter-history-day-limit>
                        <address-full-policy>BLOCK</address-full-policy>
                    </address-setting>
                    <address-setting match="jms.#">
                        <redistribution-delay>5000</redistribution-delay>
                    </address-setting>
                </address-settings>
            </configuration>
            
            
            
            • 3. Re: How to configure HornetQ client with standalone server cluster (configured using JGroups TCP)
              jbertram

              I would like to create this configuration (live & backup) as a package and deploy it in multiple environments in my organization. Hence I need to use UDP or JGroups to make the server IPs/names dynamic depending on where they are installed. Since we are on the cloud, UDP is disabled, so I'm using JGroups with TCP.

              I'm not clear on how your current JGroups configuration is any more "dynamic" than using HornetQ's static clustering.  Both use TCP, and both require a static list of servers.  For example, your JGroups configuration uses this:

               

                  <TCPPING
                           initial_hosts="${jgroups.tcpping.initial_hosts:hornetq-server1-ip[7800], hornetq-server1-ip[7900], hornetq-server2-ip[7800], hornetq-server2-ip[7900]}"
                           port_range="1"/>

               

              This configuration will change for each deployment.  In other words, it's not dynamic.

               

              We added JGroups support specifically for cloud use cases.  However, you wouldn't use TCP in this case.  You'd need to use the S3_PING protocol (or something similar) so that it really would be dynamic.
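
              For example, replacing TCPPING with S3_PING in the JGroups stack removes the static host list entirely. A rough sketch (the bucket name and credentials are placeholders; check the attribute names against your JGroups version):

                  <S3_PING location="my-hornetq-bucket"
                           access_key="MY_ACCESS_KEY"
                           secret_access_key="MY_SECRET_KEY"/>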

               

              1. If a live server goes down while a client is sending messages, then sending the subsequent messages fails on the client side. How do I make this transparent to client applications, so that the client sends the subsequent messages to the backup server (which has just replaced the live server) without requiring the application code/user to connect to the backup server?

              Fail-over should be mostly transparent if your server and client are configured properly.  We have documentation and examples that cover the configuration, but you should read section 39.2.1.2 of the documentation to understand edge cases dealing with fail-over and sending messages when using replication.
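
              To illustrate, the client-side settings that affect transparent fail-over live on the ServerLocator. A rough sketch (the values are illustrative, not a recommendation):

                  ServerLocator locator = HornetQClient.createServerLocatorWithHA(servers);
                  locator.setReconnectAttempts(-1);               // keep retrying instead of giving up
                  locator.setRetryInterval(500);                  // ms between reconnect attempts
                  locator.setRetryIntervalMultiplier(1.5);        // back off between attempts
                  locator.setFailoverOnInitialConnection(true);   // try the backup if the live is down at first connect
                  locator.setConfirmationWindowSize(1024 * 1024); // lets the client replay unconfirmed messages after fail-over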

               

              I would also like client-side load balancing of messages while receiving. However, the sessions created on the client side are not connected to all the servers; they always connect to the first server (http://docs.jboss.org/hornetq/2.4.0.Final/docs/user-manual/html/clusters.html#clusters.cluster-connections).

              Your configuration is for a live-backup pair.  This is an active-passive configuration, so you won't get load-balancing.  If you want both fail-over and load-balancing, then you need 2 live-backup pairs.

              • 4. Re: How to configure HornetQ client with standalone server cluster (configured using JGroups TCP)
                pragalathan

                Hi, thanks again for your reply.

                We are not on the Amazon cloud, so I don't have access to S3_PING. I do list the servers in the JGroups file (I didn't find a better way), but at least, I thought, those are only the initial servers, and if I add more servers with the same configuration, they will join the cluster automatically.

                This is the best I have got; I'm open to your suggestions if there is a better way.

                 

                Now, coming back to the actual (client-side) problems, I think there is a gap. As I said in my previous reply, I have two live servers and two replicated backups, i.e., two live-backup pairs, laid out as described below.

                Machine A: runs live1 and backup2 (of live2)

                Machine B: runs live2 and backup1 (of live1)

                 

                After searching a bit more, I have changed the ServerLocator creation code as follows.

                // discover the cluster over JGroups instead of passing a static server list
                JGroupsBroadcastGroupConfiguration jGroupsBroadcastGroupConfiguration = new JGroupsBroadcastGroupConfiguration("jgroups-tcp.xml", "hornetq_broadcast_channel");
                ServerLocator locator = HornetQClient.createServerLocatorWithHA(new DiscoveryGroupConfiguration(1000, 1000, jGroupsBroadcastGroupConfiguration));
                locator.setReconnectAttempts(5);
                // spread the session factories across the discovered live servers
                locator.setConnectionLoadBalancingPolicyClassName(org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy.class.getName());
                
                

                 

                 

                I changed the receiver-creation code as follows.

                try (ClientSessionFactory firstFactory = locator.createSessionFactory()) {
                    Thread.sleep(WAIT_TIME); // ensure the client has received the updated view of the cluster
                    for (int i = 1; i <= concurrencyLevel; i++) {
                        // each new factory is assigned to a live server by the round-robin policy
                        ClientSessionFactory factory = locator.createSessionFactory();
                        String address = factory.getConnection().getRemoteAddress();
                        ClientSession session = factory.createSession(true, true, 1);
                        session.start();
                
                        ClientConsumer consumer = session.createConsumer(queueName);
                        consumer.setMessageHandler(handler);
                        sessions.add(session);
                        consumers.add(consumer);
                        factories.add(factory);
                        logger.log(Level.INFO, "Started receiver #{0} on {1} for {2}", new Object[]{i, address, queueName});
                    }
                }
                

                 

                Now both of the issues below are resolved.

                1. If a live server goes down while the client is sending messages, the subsequent messages now go transparently to the backup server that replaced it, without any changes to the application code.
                2. Client-side load balancing while receiving now works: the sessions created on the client side connect across the servers instead of always connecting to the first one (http://docs.jboss.org/hornetq/2.4.0.Final/docs/user-manual/html/clusters.html#clusters.cluster-connections).

                Thanks for your time and pointers.