0 Replies Latest reply on Nov 12, 2010 10:41 AM by gaohoward

    Implementation details of HornetQ's large message compression utility

    gaohoward

      HornetQ sends large messages through an InputStream, and the same holds for compressed ones. To send a compressed large message, we therefore need an InputStream from which the compressed bytes can be read. Here is how I implemented it:

       

      1. Create an OutputStream to receive the compressed bytes from GZIPOutputStream:

       

      public static class DynamicOutputStream extends OutputStream
      {
      ...
      }

       

      This class lets us read its internal buffer directly.
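      Since the class body is elided above, here is a minimal sketch of what such a sink could look like. The name DynamicOutputStreamSketch, the field names, and the getBuffer() convention (unread bytes, an empty array when nothing is waiting, null once the sink is closed and drained) are assumptions inferred from how read1() checks its result, not the actual HornetQ code.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;

// Hypothetical sketch only; the real DynamicOutputStream body is not shown in the post.
class DynamicOutputStreamSketch extends OutputStream
{
   private byte[] buffer = new byte[1024];
   private int count;       // number of unread bytes currently held in buffer
   private boolean closed;

   @Override
   public void write(int b) throws IOException
   {
      write(new byte[] { (byte) b }, 0, 1);
   }

   @Override
   public void write(byte[] src, int off, int len) throws IOException
   {
      if (closed)
      {
         throw new IOException("sink is closed");
      }
      if (count + len > buffer.length)
      {
         // Grow so that the pending bytes plus the new ones fit.
         buffer = Arrays.copyOf(buffer, Math.max(buffer.length * 2, count + len));
      }
      System.arraycopy(src, off, buffer, count, len);
      count += len;
   }

   @Override
   public void close()
   {
      closed = true;
   }

   // Assumed convention: unread bytes if any, an empty array if nothing is
   // waiting, and null once the sink is closed and fully drained.
   public byte[] getBuffer()
   {
      if (count > 0)
      {
         byte[] result = Arrays.copyOf(buffer, count);
         count = 0;
         return result;
      }
      return closed ? null : new byte[0];
   }

   public static void main(String[] args) throws IOException
   {
      DynamicOutputStreamSketch sink = new DynamicOutputStreamSketch();
      sink.write(new byte[] { 1, 2, 3 }, 0, 3);
      System.out.println(sink.getBuffer().length); // bytes drained from the sink
   }
}
```

      Draining on every getBuffer() call keeps the sink small, because compressed bytes leave it as soon as the reader asks for them.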

       

      2. Then pass this stream to GZIPOutputStream, like this:

       

      GZIPOutputStream zipOut = new GZIPOutputStream(new DynamicOutputStream());

       

      3. As a large message comes in, we write it into the GZIPOutputStream and then get the compressed byte stream from the DynamicOutputStream.
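      To illustrate why writing and reading need coordination, the sketch below uses a plain ByteArrayOutputStream as a stand-in for the DynamicOutputStream. It writes raw bytes into a GZIPOutputStream and records how many bytes have reached the sink before and after close(). The class and method names (GzipSinkDemo, sinkSizes, and so on) are made up for this example. The point is that compressed output does not arrive in step with the raw input: the compressor may hold data back until it has enough, or until it is closed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Illustrative only: ByteArrayOutputStream stands in for DynamicOutputStream.
class GzipSinkDemo
{
   // Returns { bytes in the sink before close(), bytes in the sink after close() }.
   static int[] sinkSizes(byte[] raw) throws IOException
   {
      ByteArrayOutputStream sink = new ByteArrayOutputStream();
      GZIPOutputStream zipOut = new GZIPOutputStream(sink);
      zipOut.write(raw, 0, raw.length);
      int before = sink.size();   // the compressor may still be holding data back
      zipOut.close();             // emits whatever is left, plus the GZIP trailer
      return new int[] { before, sink.size() };
   }

   static byte[] compress(byte[] raw) throws IOException
   {
      ByteArrayOutputStream sink = new ByteArrayOutputStream();
      GZIPOutputStream zipOut = new GZIPOutputStream(sink);
      zipOut.write(raw, 0, raw.length);
      zipOut.close();
      return sink.toByteArray();
   }

   static byte[] decompress(byte[] compressed) throws IOException
   {
      GZIPInputStream zipIn = new GZIPInputStream(new ByteArrayInputStream(compressed));
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] chunk = new byte[512];
      int n;
      while ((n = zipIn.read(chunk)) != -1)
      {
         out.write(chunk, 0, n);
      }
      return out.toByteArray();
   }

   public static void main(String[] args) throws IOException
   {
      byte[] raw = new byte[5000];
      for (int i = 0; i < raw.length; i++)
      {
         raw[i] = (byte) (i % 11);
      }
      int[] sizes = sinkSizes(raw);
      System.out.println("before close: " + sizes[0] + ", after close: " + sizes[1]);
      System.out.println("round trip ok: " + Arrays.equals(raw, decompress(compress(raw))));
   }
}
```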

       

      To coordinate the writing and reading, a GZipPipe class is created:

       

         public static class GZipPipe extends InputStream

       

      This class combines reading the compressed bytes and writing the raw large message to the GZIPOutputStream into a single action, as in its read1() method:
           
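            public byte[] read1() throws IOException
            {
               // Hand out any compressed bytes already waiting in the receiver.
               byte[] result = receiver.getBuffer();
               if (result == null)
               {
                  // A null buffer from the receiver is passed straight through to the caller.
                  return null;
               }
               else if (result.length > 0)
               {
                  return result;
               }

               // Nothing is buffered yet: feed raw data to the compressor until it emits output.
               int n = input.read(readBuffer);
               while (true)
               {
                  if (n > 0)
                  {
                     zipOut.write(readBuffer, 0, n);
                     result = receiver.getBuffer();
                     if ((result != null) && (result.length > 0))
                     {
                        break;
                     }
                     n = input.read(readBuffer);
                  }
                  else
                  {
                     // Raw input is exhausted: closing the compressor flushes the remaining bytes.
                     zipOut.close();
                     result = receiver.getBuffer();
                     break;
                  }
               }
               return result;
            }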
         }

       

      The method works as follows:

       

      First, it checks the receiver (a DynamicOutputStream) for compressed bytes. If any are available, it returns them as a byte array.
      If no compressed bytes are available, it keeps writing raw data into the GZIPOutputStream until compressed data becomes available.
      Once all the raw data has been written, it closes the GZIPOutputStream and returns the remaining compressed data that has not been read yet.
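      The three steps above can be sketched as a complete, self-contained InputStream. This is not the HornetQ GZipPipe: the class names GZipPipeDemo and GZipPipeSketch, the fill() helper, and the finished flag are invented for this example. It also swaps the DynamicOutputStream for a plain ByteArrayOutputStream that is drained with toByteArray() and reset(). The demo reads the pipe back through a GZIPInputStream to check that the compressed stream is valid.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GZipPipeDemo
{
   // Compresses raw through the pipe, then decompresses it again.
   static byte[] roundTrip(byte[] raw) throws IOException
   {
      InputStream compressed = new GZipPipeSketch(new ByteArrayInputStream(raw));
      GZIPInputStream zipIn = new GZIPInputStream(compressed);
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] chunk = new byte[256];
      int n;
      while ((n = zipIn.read(chunk)) != -1)
      {
         out.write(chunk, 0, n);
      }
      return out.toByteArray();
   }

   public static void main(String[] args) throws IOException
   {
      byte[] raw = new byte[100_000];
      for (int i = 0; i < raw.length; i++)
      {
         raw[i] = (byte) (i % 7);
      }
      System.out.println("round trip ok: " + Arrays.equals(raw, roundTrip(raw)));
   }
}

// Hypothetical pull-style compressing stream; a sketch, not the HornetQ class.
class GZipPipeSketch extends InputStream
{
   private final InputStream input;   // raw message source
   private final ByteArrayOutputStream receiver = new ByteArrayOutputStream();
   private final GZIPOutputStream zipOut;
   private final byte[] readBuffer = new byte[1024];
   private byte[] pending = new byte[0];   // compressed bytes not yet handed out
   private int pos;
   private boolean finished;               // true once zipOut has been closed

   GZipPipeSketch(InputStream input) throws IOException
   {
      this.input = input;
      this.zipOut = new GZIPOutputStream(receiver);   // the GZIP header lands in receiver here
   }

   // Refills pending; returns false once no more compressed data will arrive.
   private boolean fill() throws IOException
   {
      while (receiver.size() == 0)
      {
         if (finished)
         {
            return false;
         }
         int n = input.read(readBuffer);
         if (n > 0)
         {
            zipOut.write(readBuffer, 0, n);   // may or may not produce output yet
         }
         else
         {
            zipOut.close();                   // emits the remaining bytes and the trailer
            finished = true;
         }
      }
      pending = receiver.toByteArray();
      receiver.reset();
      pos = 0;
      return true;
   }

   @Override
   public int read() throws IOException
   {
      if (pos >= pending.length && !fill())
      {
         return -1;
      }
      return pending[pos++] & 0xFF;
   }

   @Override
   public int read(byte[] dst, int off, int len) throws IOException
   {
      if (len == 0)
      {
         return 0;
      }
      if (pos >= pending.length && !fill())
      {
         return -1;
      }
      int n = Math.min(len, pending.length - pos);
      System.arraycopy(pending, pos, dst, off, n);
      pos += n;
      return n;
   }
}
```

      Because the pipe only pulls raw data when the reader asks for more compressed bytes, memory use stays bounded by the read buffer plus whatever the compressor emits in one burst, whatever the size of the message.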