Producer-Consumer using Java Threads – wait, notifyAll

April 27, 2010

Hello,

In Java, threads can communicate with each other using the wait(), notify() and notifyAll() methods of Object. Producer-Consumer is the classic example used to explain these APIs. The example below shows how inter-thread communication lets a reader consume shared data once a writer has made it available. I have also tried to demonstrate thread interruption, which lets us cancel a running task; here, typing "quit" interrupts the reader thread, which then exits the application.

import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
 * This Producer-Consumer example reads and writes one shared String variable.
 * It also uses thread interruption to stop the application.
 * The variables in main() are declared final so that they can be used in
 * the anonymous inner class that implements the user-input thread.
 *
 * @author Amit
 */
public class ConsumerProducerTest {

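	//Shared data; an empty string means there is nothing to read.
	//It is only read or written while holding the lock.
	static String data = "";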

	public static void main(String[] args) {

		//Declaring these variables as final so that
		//I can access them inside the Anonymous inner class
		//for user thread.
		final Object lock = new Object();

		final Thread readerThread = new Thread(new Reader(lock));
		final Thread writerThread = new Thread(new Writer(lock));
		writerThread.start();
		readerThread.start();

		//User Input Thread
		final BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
		new Thread(new Runnable() {
			public void run() {
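				while(true) {
					try {
						String str = in.readLine();
						if(str == null) {
							return; //End of input, nothing more to read
						}
						if("quit".equalsIgnoreCase(str)) {
							readerThread.interrupt();
						}
					} catch (Exception e) {
						e.printStackTrace();
						return; //Stop reading if the input stream fails
					}
				}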
			}

		}).start();
	}
}
class Reader implements Runnable {

	private final Object lock;

	Reader(Object lock) {
		this.lock = lock;
	}

	public void run() {
		System.out.println("Reader::run");
		while(true) {
			synchronized(lock) {
				if("".equals(ConsumerProducerTest.data)) {
					System.out.println("Reader is going to wait");
					try {
						lock.wait();
					} catch (InterruptedException e) {
						System.out.println("Reader thread interrupted !!");
						//Exit here too, otherwise a "quit" typed while waiting is lost
						System.exit(0);
					}
					System.out.println("After call to Reader wait");
				} else {
					System.out.println("Reader got the data - " + ConsumerProducerTest.data);
					ConsumerProducerTest.data = ""; //Clear the data
					//Wait for 10 seconds before notifying other threads.
					//Note that the lock is still held while sleeping.
					try {
						Thread.sleep(10000);
					} catch(InterruptedException ie) {
						System.out.println("Reader is interrupted while sleeping!!");
						System.exit(0);
					}
					System.out.println("Reader is notifying");
					lock.notifyAll();
					System.out.println("Reader done notifying");
				}
			}
		}
	}
}

class Writer implements Runnable {

	private final Object lock;

	Writer(Object lock) {
		this.lock = lock;
	}

	public void run() {
		System.out.println("Writer::run");
		while(true) {
			synchronized(lock) {
				if("".equals(ConsumerProducerTest.data)) {
					ConsumerProducerTest.data = "data-"+System.currentTimeMillis();
					System.out.println("Writer wrote the data - " + ConsumerProducerTest.data);
					System.out.println("Writer is notifying reader");
					lock.notifyAll();
					System.out.println("Writer done notifying");
				} else {
					try {
						System.out.println("Writer is going to wait");
						lock.wait();
						System.out.println("After call to Writer wait");
					} catch(InterruptedException ie) {
						System.out.println("Writer is interrupted !!");
					}
				}
			}
		}
	}
}
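
The same hand-off can also be written with the waiting logic kept inside the shared object. The sketch below is only one possible variation, not part of the original example: the names Slot, put, take, awaitEmpty and runDemo are made up for illustration, and null (rather than an empty string) is assumed to mean "no data". Each wait() sits in a while loop so the condition is rechecked after every wake-up, and the consumer is stopped with interrupt() and allowed to finish on its own instead of calling System.exit().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical one-slot hand-off; all names here are illustrative.
class SlotDemo {

    static final class Slot {
        private String data; // null means "empty" (an assumption of this sketch)

        // Block until the slot is empty, then store a value and wake all waiters.
        synchronized void put(String value) throws InterruptedException {
            while (data != null) {
                wait(); // releases the monitor while waiting
            }
            data = value;
            notifyAll();
        }

        // Block until a value is present, then remove it and wake all waiters.
        synchronized String take() throws InterruptedException {
            while (data == null) {
                wait();
            }
            String value = data;
            data = null;
            notifyAll();
            return value;
        }

        // Block until the slot has been emptied by a consumer.
        synchronized void awaitEmpty() throws InterruptedException {
            while (data != null) {
                wait();
            }
        }
    }

    static List<String> runDemo(int count) throws InterruptedException {
        final Slot slot = new Slot();
        final List<String> received = new ArrayList<String>();

        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        received.add(slot.take());
                    }
                } catch (InterruptedException e) {
                    // Interruption is the stop signal: leave the loop and end the thread.
                }
            }
        });
        consumer.start();

        for (int i = 0; i < count; i++) {
            slot.put("data-" + i);
        }
        slot.awaitEmpty();    // the last value has been taken out of the slot
        consumer.interrupt(); // cancel the consumer
        consumer.join();      // after join(), the consumer's writes to 'received' are visible
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(3));
    }
}
```

Because put() blocks until the previous value has been taken, the values arrive in the order they were produced. Calling awaitEmpty() before interrupt() is a deliberate choice in this sketch: it makes sure the last value has been picked up before the consumer is cancelled.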

Hope you find this useful.

Cheers!

Amit