Netty Example of PingPong using Object

This example is intended to show my way of using Netty. It is inspired by the ObjectEcho example from Netty.

- It shows how to implement business logic for each connection (creating a new PongHandler on the server for each connection).

- It shows how to deal with private data (related to the previous point).

- It shows how to implement a protocol based on objects (efficiently, using Netty's ObjectEncoder and ObjectDecoder).

- It shows how to start, wait for and shut down the Netty part, on both the client and the server.

 

To shut down the server, my "simple" example just waits until the last authorized connection has been accepted

and then shuts down at the next connection attempt, by throwing an Exception from the getPipeline method.

This is not a good approach, but I did not want to complicate the example by adding a special message

just to show how to shut down "silently" (on both the server and the client).

However, the main idea is there...

 

For convenience, I attached the code of the example to this wiki page. The code is also shown below so that it can be read directly.

 

The root package of the project is assumed to be:

org.jboss.netty.example.pingpong

 

 

The PingPong Object itself:

package org.jboss.netty.example.pingpong;

import java.io.Serializable;

/**
 * Serializable message exchanged during a Ping-Pong session.
 * <p>
 * All data fields are public and mutable.
 *
 * @author frederic
 */
public class PingPong implements Serializable {

    /** Serialization version identifier. */
    private static final long serialVersionUID = 1L;

    /** Current rank of the ping-pong exchange. */
    public int rank;

    /** Array of bytes carried by the ping-pong message. */
    public byte[] status;

    /** Default 32-character string (for instance the MD5 of the array of bytes). */
    public String test1 = "12345678901234567890123456789012";

    /**
     * Builds a message from its rank and its array of bytes.
     *
     * @param rank   initial rank of the message
     * @param status array of bytes carried by the message (stored as is, not copied)
     */
    public PingPong(int rank, byte[] status) {
        this.status = status;
        this.rank = rank;
    }

    /**
     * @return the text {@code PingPong:} followed by the current rank
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("PingPong:");
        text.append(rank);
        return text.toString();
    }
}

The PingSerializeClient part:

package org.jboss.netty.example.pingpong;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

/**
 * Simple example of Ping-Pong Client using Serialization
 * @author frederic
 *
 */
public class PingSerializeClient {
    /**
     * Main class for Client taking from two to four arguments<br>
     * -host for server<br>
     * -port for server<br>
     * -number of message (default is 256)<br>
     * -size of array of bytes (default is 16384 = 16KB)
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // Print usage if no argument is specified.
        if (args.length < 2 || args.length > 4) {
            System.err
                    .println("Usage: " +
                            PingSerializeClient.class.getSimpleName() +
                            " <host> <port> [<number of messages>] [<size of array of bytes>]");
            return;
        }

        // Parse options.
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        int nbMessage;

        if (args.length >= 3) {
            nbMessage = Integer.parseInt(args[2]);
        } else {
            nbMessage = 256;
        }
        int size = 16384;
        if (args.length == 4) {
            size = Integer.parseInt(args[3]);
        }

        // *** Start the Netty configuration ***

        // Start client with Nb of active threads = 3 as maximum.
        ChannelFactory factory = new NioClientSocketChannelFactory(Executors
                .newCachedThreadPool(), Executors.newCachedThreadPool(), 3);
        // Create the bootstrap
        ClientBootstrap bootstrap = new ClientBootstrap(factory);
        // Create the global ChannelGroup
        ChannelGroup channelGroup = new DefaultChannelGroup(
                PingSerializeClient.class.getName());
        // Create the associated Handler
        PingHandler handler = new PingHandler(nbMessage, size);

        // Add the handler to the pipeline and set some options
        bootstrap.getPipeline().addLast("handler", handler);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("connectTimeoutMillis", 100);

        // *** Start the Netty running ***

        // Connect to the server, wait for the connection and get back the channel
        Channel channel = bootstrap.connect(new InetSocketAddress(host, port))
                .awaitUninterruptibly().getChannel();
        // Add the parent channel to the group
        channelGroup.add(channel);
        // Wait for the PingPong to finish
        PingPong pingPong = handler.getPingPong();
        System.out
                .println("Result: " + pingPong.toString() + " for 2x" +
                        nbMessage + " messages and " + size +
                        " bytes as size of array");

        // *** Start the Netty shutdown ***

        // Now close all channels
        System.out.println("close channelGroup");
        channelGroup.close().awaitUninterruptibly();
        // Now release resources
        System.out.println("close external resources");
        factory.releaseExternalResources();
    }
}

The PingHandler, Client Handler part:

package org.jboss.netty.example.pingpong;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Example of ChannelHandler for the Pong Client
 * @author frederic
 *
 */
@ChannelPipelineCoverage("one")
public class PingHandler extends SimpleChannelHandler {

    private static final Logger logger = Logger.getLogger(PingHandler.class
            .getName());

    /**
     * Number of message to do
     */
    private final int nbMessage;

    /**
     * Current rank (decreasing, 0 is the end of the game)
     */
    private int curMessage;

    /**
     * Is there any Ping to send (at least 1 at starting)
     */
    private final AtomicInteger isPing = new AtomicInteger(1);

    /**
     * Start date
     */
    private Date startDate = null;

    /**
     * Stop date
     */
    private Date stopDate = null;

    /**
     * Return value for the caller
     */
    final BlockingQueue<PingPong> answer = new LinkedBlockingQueue<PingPong>();

    /**
     * Ping object
     */
    PingPong pp;

    /**
     * Method to wait for the final PingPong object
     * @return the final PingPong object
     */
    public PingPong getPingPong() {
        for (;;) {
            try {
                return answer.take();
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }

    /**
     * Constructor
     * @param nbMessage
     * @param size
     */
    public PingHandler(int nbMessage, int size) {
        if (nbMessage < 0) {
            throw new IllegalArgumentException("nbMessage: " + nbMessage);
        }
        this.nbMessage = nbMessage;
        curMessage = nbMessage;
        pp = new PingPong(0, new byte[size]);
    }

    /**
     * Add the ObjectXxcoder to the Pipeline
     */
    @Override
    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
        e.getChannel().getPipeline().addFirst("decoder", new ObjectDecoder());
        e.getChannel().getPipeline().addAfter("decoder", "encoder",
                new ObjectEncoder());
    }

    /**
     * Starts the Ping-Pong
     */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
        logger.log(Level.INFO, "Start PingPong");
        startDate = new Date();
        generatePingTraffic(e);
    }

    /**
     * If write of Ping was not possible before, just do it now
     */
    @Override
    public void channelInterestChanged(ChannelHandlerContext ctx,
            ChannelStateEvent e) {
        generatePingTraffic(e);
    }

    /**
     * When the channel is closed, print result
     */
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
        stopDate = new Date();
        String MB = String.format("Memory Used: %8.3f MB",
                (Runtime.getRuntime().totalMemory() - Runtime.getRuntime()
                        .freeMemory()) / 1048576.0);
        String Mbs = String.format("%9.3f Mb/s",
                ((nbMessage - curMessage) * 1000 / (stopDate
                        .getTime() - startDate.getTime()) *
                        (pp.status.length + pp.test1.length() + 16) /
                        1048576.0 * 8));
        logger
                .log(
                        Level.INFO,
                        (nbMessage - curMessage) * 2 +
                                " PingPong in " +
                                (stopDate.getTime() - startDate
                                        .getTime()) +
                                " ms so " +
                                (nbMessage - curMessage) * 2 * 1000 / (stopDate
                                        .getTime() - startDate.getTime()) +
                                " msg/s (" + Mbs + ") with " +
                                pp.status.length + " bytes in array, " +
                                MB);
    }

    /**
     * When a Pong is received, starts to send the next Ping
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        PingPong pptmp = (PingPong) e.getMessage();
        if (pptmp != null) {
            pp = pptmp;
            isPing.incrementAndGet();
            generatePingTraffic(e);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        if (e.getCause() instanceof IOException) {
            logger.log(Level.WARNING, "IOException from downstream.");
        } else {
            logger.log(Level.WARNING, "Unexpected exception from downstream.",
                    e.getCause());
        }
        // Offer default object
        answer.offer(pp);
        Channels.close(e.getChannel());
    }

    /**
     * Called when Channel is connected or when the write is enabled again
     * @param e
     */
    private void generatePingTraffic(ChannelStateEvent e) {
        if (isPing.intValue() > 0) {
            Channel channel = e.getChannel();
            sendPingTraffic(channel);
        }
    }

    /**
     * Called when a Pong message was received
     * @param e
     */
    private void generatePingTraffic(MessageEvent e) {
        if (isPing.intValue() > 0) {
            Channel channel = e.getChannel();
            sendPingTraffic(channel);
        }
    }

    /**
     * Truly sends the Ping message if any (if not the last one)
     * @param channel
     */
    private void sendPingTraffic(Channel channel) {
        if ((channel.getInterestOps() & Channel.OP_WRITE) == 0) {
            PingPong sendpp = nextMessage();
            if (sendpp == null) {
                logger.log(Level.WARNING, "Close channel");
                channel.close().addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture future) {
                        answer.offer(pp);
                    }
                });
                return;
            }
            isPing.decrementAndGet();
            Channels.write(channel, sendpp);
        }
    }

    /**
     * Create the next Ping message if its not the las one.
     * @return the next Ping message or NULL if it is the last one.
     */
    private PingPong nextMessage() {
        if (curMessage == 0) {
            logger.log(Level.WARNING, "No more message");
            return null;
        }
        curMessage --;
        pp.rank ++;
        return pp;
    }
}

Now the PongSerializeServer part:

package org.jboss.netty.example.pingpong;

import java.net.InetSocketAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;

/**
 * Example of Pong Server with serialization
 * @author frederic
 *
 */
public class PongSerializeServer {
    /**
     * Take two arguments :<br>
     * -port to listen to<br>
     * -nb of connections before shutting down
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // Print usage if no argument is specified.
        if (args.length != 2) {
            System.err.println("Usage: " +
                    PongSerializeServer.class.getSimpleName() +
                    " <port> <nb of connections>");
            return;
        }
        // Parse options.
        int port = Integer.parseInt(args[0]);
        int nbconn = Integer.parseInt(args[1]);

        // *** Start the Netty configuration ***

        // Start server with Nb of active threads = 2*NB CPU + 1 as maximum.
        ChannelFactory factory = new NioServerSocketChannelFactory(Executors
                .newCachedThreadPool(), Executors.newCachedThreadPool(),
                Runtime.getRuntime().availableProcessors() * 2 + 1);

        ServerBootstrap bootstrap = new ServerBootstrap(factory);
        // Create the global ChannelGroup
        ChannelGroup channelGroup = new DefaultChannelGroup(
                PongSerializeServer.class.getName());
        // Create the blockingQueue to wait for a limited number of client
        BlockingQueue<Integer> answer = new LinkedBlockingQueue<Integer>();
        // 200 threads max, Memory limitation: 1MB by channel, 1GB global, 100 ms of timeout
        OrderedMemoryAwareThreadPoolExecutor pipelineExecutor = new OrderedMemoryAwareThreadPoolExecutor(
                200, 1048576, 1073741824, 100, TimeUnit.MILLISECONDS, Executors
                        .defaultThreadFactory());

        bootstrap.setPipelineFactory(new PongPipelineFactory(channelGroup,
                pipelineExecutor, answer, nbconn));
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        bootstrap.setOption("child.reuseAddress", true);
        bootstrap.setOption("child.connectTimeoutMillis", 100);
        bootstrap.setOption("readWriteFair", true);

        // *** Start the Netty running ***

        // Create the monitor
        ThroughputMonitor monitor = new ThroughputMonitor();
        // Add the parent channel to the group
        Channel channel = bootstrap.bind(new InetSocketAddress(port));
        channelGroup.add(channel);

        // Starts the monitor
        monitor.start();

        // Wait for the server to stop
        answer.take();

        // *** Start the Netty shutdown ***

        // End the monitor
        System.out.println("End of monitor");
        monitor.interrupt();
        // Now close all channels
        System.out.println("End of channel group");
        channelGroup.close().awaitUninterruptibly();
        // Close the executor for Pipeline
        System.out.println("End of pipeline executor");
        pipelineExecutor.shutdownNow();
        // Now release resources
        System.out.println("End of resources");
        factory.releaseExternalResources();
    }
}

The PongPipelineFactory for server part:

package org.jboss.netty.example.pingpong;

import java.util.concurrent.BlockingQueue;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;

/**
 * @author frederic
 *
 */
public class PongPipelineFactory implements ChannelPipelineFactory {

    private ChannelGroup channelGroup = null;

    private OrderedMemoryAwareThreadPoolExecutor pipelineExecutor = null;

    private BlockingQueue<Integer> answer = null;

    private int max = 100; // default is 100 max connections

    /**
     * Constructor
     * @param channelGroup
     * @param pipelineExecutor
     * @param answer
     * @param max max connection
     */
    public PongPipelineFactory(ChannelGroup channelGroup,
            OrderedMemoryAwareThreadPoolExecutor pipelineExecutor,
            BlockingQueue<Integer> answer, int max) {
        super();
        this.channelGroup = channelGroup;
        this.pipelineExecutor = pipelineExecutor;
        this.answer = answer;
        this.max = max;
    }

    /**
     * Initiate the Pipeline for the newly active connection with ObjectXxcoder.
     * @see org.jboss.netty.channel.ChannelPipelineFactory#getPipeline()
     */
    public ChannelPipeline getPipeline() throws Exception {
        if (max == 0) {
            // stop globally
            answer.add(new Integer(0));
            throw new Exception("End of server");
        }
        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addLast("decoder", new ObjectDecoder());
        pipeline.addLast("encoder", new ObjectEncoder());
        pipeline.addLast("pipelineExecutor", new ExecutionHandler(
                pipelineExecutor));
        PongHandler handler = new PongHandler(channelGroup);
        pipeline.addLast("handler", handler);
        max --;
        System.out.println("Continue... " + max);
        return pipeline;
    }
}

The PongHandler part for Server Handler:

package org.jboss.netty.example.pingpong;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;

/**
 * Example of ChannelHandler for the Pong Server
 * @author frederic
 *
 */
@ChannelPipelineCoverage("one")
public class PongHandler extends SimpleChannelHandler {

    private static final Logger logger = Logger.getLogger(PongHandler.class
            .getName());

    /**
     * Is there any Pong message to send
     */
    private final AtomicInteger isPong = new AtomicInteger(0);

    /**
     * Bytes monitor
     */
    public static final AtomicLong transferredBytes = new AtomicLong();

    /**
     * Pong object
     */
    private PingPong pp;

    /**
     * Channel Group
     */
    private ChannelGroup channelGroup = null;

    /**
     * Constructor
     * @param channelGroup
     */
    public PongHandler(ChannelGroup channelGroup) {
        this.channelGroup = channelGroup;
    }

    /**
     * Returns the number of transferred bytes
     * @return the number of transferred bytes
     */
    public static long getTransferredBytes() {
        return transferredBytes.get();
    }

    /* (non-Javadoc)
     * @see org.jboss.netty.channel.SimpleChannelHandler#channelConnected(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelStateEvent)
     */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
        channelGroup.add(ctx.getChannel());
    }

    /**
     * If write of Pong was not possible before, just do it now
     */
    @Override
    public void channelInterestChanged(ChannelHandlerContext ctx,
            ChannelStateEvent e) {
        generatePongTraffic(e);
    }

    /**
     * When a Ping message is received, send a new Pong
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        PingPong pptmp = (PingPong) e.getMessage();
        if (pptmp != null) {
            pp = pptmp;
            PongHandler.transferredBytes.addAndGet(pp.status.length +
                    pp.test1.length() + 16);
            isPong.incrementAndGet();
            generatePongTraffic(e);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        logger.log(Level.WARNING, "Unexpected exception from downstream.", e
                .getCause());
        Channels.close(e.getChannel());
    }

    /**
     * Used when write is possible
     * @param e
     */
    private void generatePongTraffic(ChannelStateEvent e) {
        if (isPong.intValue() > 0) {
            Channel channel = e.getChannel();
            sendPongTraffic(channel);
        }
    }

    /**
     * Used when a Ping message is received
     * @param e
     */
    private void generatePongTraffic(MessageEvent e) {
        if (isPong.intValue() > 0) {
            Channel channel = e.getChannel();
            sendPongTraffic(channel);
        }
    }

    /**
     * Truly send the Pong
     * @param channel
     */
    private void sendPongTraffic(Channel channel) {
        if ((channel.getInterestOps() & Channel.OP_WRITE) == 0) {
            pp.rank ++;
            isPong.decrementAndGet();
            Channels.write(channel, pp);
        }
    }
}

And finally the ThroughputMonitor:

/*
 * Copyright (C) 2008  Trustin Heuiseung Lee
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, 5th Floor, Boston, MA 02110-1301 USA
 */
package org.jboss.netty.example.pingpong;

/**
 * Class for Throughput Monitoring
 * @author fbregier
 *
 */
public class ThroughputMonitor extends Thread {

    /**
     * Constructor
     */
    public ThroughputMonitor() {
    }

    @Override
    public void run() {
        try {
            long oldCounter = PongHandler.getTransferredBytes();
            long startTime = System.currentTimeMillis();
            for (;;) {
                Thread.sleep(3000);

                long endTime = System.currentTimeMillis();
                long newCounter = PongHandler.getTransferredBytes();
                System.err.format("%4.3f MiB/s%n", (newCounter - oldCounter) *
                        1000 / (endTime - startTime) / 1048576.0);
                oldCounter = newCounter;
                startTime = endTime;
            }
        } catch (InterruptedException e) {
            // Stop monitoring asked
            return;
        }
    }
}

I hope this example can help some other users.

 

Frederic