Concurrent channel copying algorithm

I've been studying the new java.util.concurrent package and, as part of that, reading Doug Lea's book Concurrent Programming in Java: Design Principles and Patterns.

The book is well written and enlightening, and the package is powerful and has some very cool utilities in it.

Inspired by an example Doug gives in both the book and the javadoc, I've written a concurrent version of the basic New I/O byte channel copying loop. This is the loop you write anytime you need to copy bytes from one channel to another. A proxy server would use this, for example, to copy bytes from one socket to another. Here it is in the traditional single-threaded form:

// This method is the canonical New I/O channel copying loop
// implemented with a single buffer and a single thread.
static void copy(ReadableByteChannel source,
		 WritableByteChannel destination) throws IOException {

    ByteBuffer buffer = ByteBuffer.allocateDirect(32 * 1024);

    while(source.read(buffer) != -1) {
	// Prepare the buffer to be drained
	buffer.flip();

	// Make sure that the buffer was fully drained
	while (buffer.hasRemaining()) {
	    destination.write(buffer);
	}

	// Make the buffer empty, ready for filling
	buffer.clear();
    }
}
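
One quick way to try the loop without touching the file system is to wrap in-memory streams with java.nio.channels.Channels. The sketch below repeats the copy() method from above so that it compiles on its own; the CopyDemo class and the roundTrip() helper are names I made up for illustration, and the 100 KB input is chosen only so that the 32 KB buffer is filled and drained more than once.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;

// Hypothetical demo class: only the copy() loop comes from the article.
class CopyDemo {
    // Same single-buffer, single-thread loop as above.
    static void copy(ReadableByteChannel source,
                     WritableByteChannel destination) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocateDirect(32 * 1024);
        while (source.read(buffer) != -1) {
            buffer.flip();                      // prepare to drain
            while (buffer.hasRemaining()) {
                destination.write(buffer);      // may take several writes
            }
            buffer.clear();                     // prepare to fill again
        }
    }

    // Copies the bytes through in-memory channels and returns what arrived.
    static byte[] roundTrip(byte[] input) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try {
            copy(Channels.newChannel(new ByteArrayInputStream(input)),
                 Channels.newChannel(sink));
        } catch (IOException e) {
            // Not expected for in-memory streams; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    public static void main(String[] args) {
        byte[] input = new byte[100 * 1024];    // larger than one buffer
        for (int i = 0; i < input.length; i++) input[i] = (byte) i;
        System.out.println(Arrays.equals(input, roundTrip(input)));
    }
}
```

In-memory channels never block, so this only checks correctness; it says nothing about the latency behavior that motivates the concurrent version.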

Now here's a concurrent version that does the same thing. (Presumably it is also faster: if there is enough I/O latency, the reader thread can run while the writer thread blocks, and vice versa.) This version uses two threads and two byte buffers. The trick is that the threads use a java.util.concurrent.Exchanger to swap buffers. When the reader thread has filled its buffer and the writer thread has drained its buffer, they swap them. When a thread exits because it has encountered EOF or an IOException, it swaps null to tell the other thread to exit, too.

The basic buffer-swapping code is simple. The rest of the code is New I/O stuff, and exception handling.
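
Before the full listing, here is the swap in isolation, as a minimal sketch with the I/O stripped out. It trades StringBuilder objects instead of ByteBuffers, and the ExchangerDemo class and relay() method are hypothetical names used only for this illustration. The protocol is the one described above: a full buffer goes out, an empty one comes back, and null means "I'm done".

```java
import java.util.concurrent.Exchanger;

// Hypothetical demo: the names here are illustrative, not from the listing below.
class ExchangerDemo {
    // Hands each message to a consumer thread through an Exchanger, then
    // exchanges null as the end-of-stream signal. Returns everything the
    // consumer collected.
    static String relay(String[] messages) {
        final Exchanger<StringBuilder> exchanger = new Exchanger<StringBuilder>();
        final StringBuilder collected = new StringBuilder();

        Thread consumer = new Thread(new Runnable() {
            public void run() {
                StringBuilder buffer = new StringBuilder();  // starts empty
                try {
                    for (;;) {
                        // Trade our empty buffer for a full one.
                        buffer = exchanger.exchange(buffer);
                        if (buffer == null) return;          // producer is done
                        collected.append(buffer);
                        buffer.setLength(0);                 // "drain" it
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();

        try {
            StringBuilder buffer = new StringBuilder();
            for (String message : messages) {
                buffer.append(message);                      // "fill" it
                buffer = exchanger.exchange(buffer);         // full out, empty back
            }
            exchanger.exchange(null);                        // tell the consumer to exit
            consumer.join();                                 // makes 'collected' safe to read
        } catch (InterruptedException e) {
            consumer.interrupt();
            Thread.currentThread().interrupt();
        }
        return collected.toString();
    }

    public static void main(String[] args) {
        System.out.println(relay(new String[] { "one ", "two ", "three" }));
    }
}
```

Each call to exchange() blocks until the other thread arrives, so neither side can get more than one buffer ahead of the other.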

// Here is a concurrent version of the same copying operation, using
// two threads and two buffers exchanged with an Exchanger.  In theory,
// this should be more efficient because I/O latencies will allow the
// two threads to run concurrently.
static void copyConcurrent(final ReadableByteChannel source,
			   final WritableByteChannel destination)
    throws IOException
{
    // The two threads use this to exchange full buffers for empty buffers.
    // One thread tells the other to exit by exchanging null with it.
    final Exchanger<ByteBuffer> exchanger = new Exchanger<ByteBuffer>();

    // This is the definition of the reader thread class
    class Reader extends Thread {
	public void run() {
	    try {
		// Start with an empty buffer
		ByteBuffer buffer = ByteBuffer.allocateDirect(32 * 1024);

		for(;;) {
		    // Invariant: buffer is empty and ready to fill here.

		    // Try to read some bytes.  Exit the loop if the 
		    // channel has no more bytes or on IOException
		    try {
			int numread = source.read(buffer);
			if (numread == -1) break;
		    }
		    catch(IOException e) {
			exception = e; // Remember for later queries
			break;
		    }

		    // Prepare the buffer to be drained
		    buffer.flip();

		    // Pass it to the writer thread, getting a new drained
		    // buffer to use in the next pass through the loop.
		    buffer = exchanger.exchange(buffer);

		    // If the writer gave us a null buffer, it has exited
		    // with an IOException, and we should too.
		    if (buffer == null) return;
		}

		// When we exit the loop because of EOF or IOException,
		// exchange null with the writer thread, so it will exit.
		exchanger.exchange(null);
	    }
	    // Exit if we're interrupted, and assume that the writer thread
	    // will also get interrupted.
	    catch(InterruptedException abort) {}
	}

	// We can't throw an exception from run(), but we can remember it
	IOException exception;
	public IOException getException() { return exception; }
    }

    // Here is the writer thread
    class Writer extends Thread {
	public void run() {
	    try {
		ByteBuffer buffer = ByteBuffer.allocateDirect(32 * 1024);

		for(;;) {
		    // Invariant: here we have an empty buffer we need to 
		    // exchange for a full one.
		    buffer = exchanger.exchange(buffer);

		    // A null buffer means the reader reached EOF or got
		    // an IOException.  Either way, we're done.
		    if (buffer == null) return;

		    // Otherwise, drain the buffer to the destination
		    while (buffer.hasRemaining()) {
			try { destination.write(buffer); }
			catch(IOException e) {
			    // If we get an IO exception, remember it and 
			    // let the reader thread know, so it doesn't
			    // block forever in exchange()
			    exception = e;
			    exchanger.exchange(null);
			    return;
			}
		    }

		    // Now make the buffer ready to be refilled so we
		    // can exchange it again at the top of the loop.
		    buffer.clear();
		}
	    }
	    catch(InterruptedException abort) {}
	}

	IOException exception;
	public IOException getException() { return exception; }
    }

    // Create one thread to read and one thread to write
    Reader reader = new Reader();
    Writer writer = new Writer();

    // Start both threads
    reader.start();	writer.start();

    // Wait for both threads to stop
    try { reader.join(); writer.join(); }
    catch(InterruptedException e) {
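	// If we get interrupted while waiting for the threads to exit,
	// then interrupt the threads we spawned, and restore our own
	// interrupt status so that the caller can see it.
	reader.interrupt(); writer.interrupt();
	Thread.currentThread().interrupt();
	return;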
    }

    // if either thread encountered an exception, rethrow that exception
    if (reader.getException() != null) throw reader.getException();
    if (writer.getException() != null) throw writer.getException();
}

Full disclosure time: I've tested this code for copying local files only. I have not exercised the code that handles IOException or InterruptedException. So let me know if you spot any bugs, and don't start calling this method from your production code!
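
One way to exercise the untested IOException path would be to hand the method a destination that fails on purpose. The sketch below is a hypothetical test helper, not part of the code above: FailingChannel accepts a fixed number of bytes, discards them, and then throws. The class name, the limit parameter, and the failsWhenDraining() helper are all assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical test helper: a channel that accepts up to 'limit' bytes
// and throws IOException on any write after that.
class FailingChannel implements WritableByteChannel {
    private final long limit;
    private long written = 0;
    private boolean open = true;

    FailingChannel(long limit) { this.limit = limit; }

    public int write(ByteBuffer src) throws IOException {
        if (written >= limit) {
            throw new IOException("simulated write failure");
        }
        // Consume (and discard) as many bytes as the limit still allows.
        int n = (int) Math.min(src.remaining(), limit - written);
        src.position(src.position() + n);
        written += n;
        return n;
    }

    public boolean isOpen() { return open; }
    public void close() { open = false; }

    long bytesWritten() { return written; }

    // Returns true if draining a buffer of 'size' bytes into a channel
    // limited to 'limit' bytes ends in an IOException.
    static boolean failsWhenDraining(int size, long limit) {
        FailingChannel channel = new FailingChannel(limit);
        ByteBuffer buffer = ByteBuffer.allocate(size);
        try {
            while (buffer.hasRemaining()) channel.write(buffer);
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWhenDraining(11, 10));
    }
}
```

Passing something like new FailingChannel(64 * 1024) as the destination of copyConcurrent(), with a source larger than 64 KB, should make the writer thread hit its catch block; the thing to check is that the call then throws the IOException instead of hanging in exchange().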

If you think this code is nifty, go buy Doug's book. It is well written, and aimed at professional programmers, not academic researchers. And it complements java.util.concurrent very nicely.

Finally, if you want to play around with this code, download it here.
