import java.util.concurrent.*;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;

/**
 * Two implementations of a channel-to-channel copy: a single-threaded loop
 * and a two-thread version that hands buffers back and forth through an
 * {@link Exchanger}.
 */
public class ChannelCopy {
    /** Capacity, in bytes, of every transfer buffer allocated by this class. */
    private static final int BUFFER_SIZE = 32 * 1024;

    /**
     * The canonical New I/O channel copying loop, implemented with a single
     * buffer and a single thread.
     *
     * @param source      channel to read from until it reports end-of-stream
     * @param destination channel that receives every byte read
     * @throws IOException if reading or writing fails
     */
    static void copy(ReadableByteChannel source, WritableByteChannel destination)
        throws IOException
    {
        ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
        while (source.read(buffer) != -1) {
            // Switch the buffer from filling mode to draining mode.
            buffer.flip();
            // A single write() may be partial, so loop until fully drained.
            while (buffer.hasRemaining()) {
                destination.write(buffer);
            }
            // Make the buffer empty again, ready for the next read.
            buffer.clear();
        }
    }

    /**
     * A concurrent version of the same copying operation, using two threads
     * and two buffers exchanged through an {@link Exchanger}. In theory this
     * should be more efficient, because I/O latencies allow the two threads
     * to run concurrently.
     *
     * <p>If the calling thread is interrupted while waiting for the worker
     * threads, both workers are interrupted, the caller's interrupt status is
     * restored, and an {@link InterruptedIOException} is thrown so that an
     * aborted copy is never mistaken for a completed one.
     *
     * @param source      channel to read from until it reports end-of-stream
     * @param destination channel that receives every byte read
     * @throws InterruptedIOException if the calling thread is interrupted
     *         before both worker threads have finished
     * @throws IOException if the reader or the writer thread hit an I/O
     *         error; the reader's exception is reported if both failed
     */
    static void copyConcurrent(final ReadableByteChannel source,
                               final WritableByteChannel destination)
        throws IOException
    {
        // The two threads use this to swap full buffers for empty ones.
        // One thread tells the other to exit by exchanging null with it.
        final Exchanger<ByteBuffer> exchanger = new Exchanger<ByteBuffer>();

        // Fills buffers from the source channel and hands them to the writer.
        class ReaderThread extends Thread {
            // run() cannot throw a checked exception, so remember it here.
            private IOException exception;

            @Override
            public void run() {
                try {
                    // Start with an empty buffer.
                    ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
                    for (;;) {
                        // Invariant: buffer is empty and ready to fill here.
                        // Leave the loop at end-of-stream or on IOException.
                        try {
                            int numread = source.read(buffer);
                            if (numread == -1) {
                                break;
                            }
                        } catch (IOException e) {
                            exception = e;  // Remember for later queries
                            break;
                        }

                        // Prepare the buffer to be drained.
                        buffer.flip();

                        // Pass it to the writer thread, getting a drained
                        // buffer back for the next pass through the loop.
                        buffer = exchanger.exchange(buffer);

                        // A null buffer means the writer has exited with an
                        // IOException, and we should exit too.
                        if (buffer == null) {
                            return;
                        }
                    }

                    // We left the loop because of EOF or an IOException:
                    // exchange null with the writer thread so it will exit.
                    exchanger.exchange(null);
                } catch (InterruptedException ignored) {
                    // Exit if we are interrupted, and assume that the writer
                    // thread will also get interrupted.
                }
            }

            public IOException getException() {
                return exception;
            }
        }

        // Receives full buffers from the reader and drains them to the
        // destination channel.
        class WriterThread extends Thread {
            // run() cannot throw a checked exception, so remember it here.
            private IOException exception;

            @Override
            public void run() {
                try {
                    ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
                    for (;;) {
                        // Invariant: here we hold an empty buffer that we
                        // need to exchange for a full one.
                        buffer = exchanger.exchange(buffer);

                        // A null buffer means the reader reached EOF or got
                        // an IOException. Either way, we are done.
                        if (buffer == null) {
                            return;
                        }

                        // Otherwise, drain the buffer to the destination.
                        try {
                            while (buffer.hasRemaining()) {
                                destination.write(buffer);
                            }
                        } catch (IOException e) {
                            // Remember the failure and let the reader thread
                            // know, so it does not block forever in exchange().
                            exception = e;
                            exchanger.exchange(null);
                            return;
                        }

                        // Make the buffer ready to be refilled so we can
                        // exchange it again at the top of the loop.
                        buffer.clear();
                    }
                } catch (InterruptedException ignored) {
                    // Exit if we are interrupted; the caller interrupts both
                    // worker threads together.
                }
            }

            public IOException getException() {
                return exception;
            }
        }

        // Create one thread to read and one thread to write, and start both.
        ReaderThread reader = new ReaderThread();
        WriterThread writer = new WriterThread();
        reader.start();
        writer.start();

        // Wait for both threads to stop.
        try {
            reader.join();
            writer.join();
        } catch (InterruptedException e) {
            // Interrupted while waiting: interrupt the threads we spawned,
            // restore our own interrupt status, and report the aborted copy
            // instead of returning as if it had completed.
            reader.interrupt();
            writer.interrupt();
            Thread.currentThread().interrupt();
            InterruptedIOException aborted =
                new InterruptedIOException("copy interrupted before completion");
            aborted.initCause(e);
            throw aborted;
        }

        // If either thread encountered an exception, rethrow that exception.
        if (reader.getException() != null) {
            throw reader.getException();
        }
        if (writer.getException() != null) {
            throw writer.getException();
        }
    }

    /**
     * Tests the concurrent copy as a file copy operation.
     *
     * @param args {@code args[0]} is the file to read and {@code args[1]} is
     *             the file to write
     * @throws IllegalArgumentException if exactly two arguments are not given
     * @throws IOException if either file cannot be opened or the copy fails
     */
    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            throw new IllegalArgumentException(
                "Usage: java ChannelCopy <source-file> <destination-file>");
        }

        // Closing a stream also closes the channel obtained from it, so the
        // finally blocks release both files even when the copy fails.
        FileInputStream in = new FileInputStream(args[0]);
        try {
            FileOutputStream out = new FileOutputStream(args[1]);
            try {
                FileChannel source = in.getChannel();
                FileChannel destination = out.getChannel();
                ChannelCopy.copyConcurrent(source, destination);
            } finally {
                out.close();
            }
        } finally {
            in.close();
        }
    }
}