Producer-Consumer using Java Threads – wait,notifyAll

April 27, 2010

Hello,

In Java , threads communicate with each other using “wait,notify and notifyAll” APIs.  Producer-Consumer is the classic example to explain these APIs. The example below tries to show how we can use inter-thread communication to read shared data when it is available. I have also tried to demonstrate use of Thread Interrupt using which we can cancel the running task, in this case exiting the application.

[sourcecode language=”java”]
import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
* @author Amit
* This Producer-Consumer example tries to read/write one shared String variable.
* This example also uses Thread interrupt concept to stop the application.
* The variable in main() are defined final so that we can use them in
* Anonymous inner class implementation of thread within main() method.
*/
public class ConsumerProducerTest {

static String data = new String();

public static void main(String args[]) {

//Declaring these variables as final so that
//I can access them inside the Anonymous inner class
//for user thread.
final Object lock = new Object();

final Thread readerThread = new Thread(new Reader(lock));
final Thread writerThread = new Thread(new Writer(lock));
writerThread.start();
readerThread.start();

//User Input Thread
final BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
new Thread(new Runnable() {
public void run() {
while(true) {
try {
String str = in.readLine();
if("quit".equalsIgnoreCase(str)) {
readerThread.interrupt();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

}).start();
}
}
class Reader implements Runnable {

private Object lock;

Reader(Object lock) {
this.lock = lock;
}

public void run() {
System.out.println("Reader::run");
while(true) {
synchronized(lock) {
if("".equals(ConsumerProducerTest.data)) {
System.out.println("Reader is going to wait");
try {
lock.wait();
} catch (InterruptedException e) {
System.out.println("Reader thread interrupted !!");
}
System.out.println("After call to Reader wait");
} else {
System.out.println("Reader got the data" + ConsumerProducerTest.data);
System.out.println("Reader is notifying");
ConsumerProducerTest.data = ""; //Clear the data
//Wait for 10 seconds before notifying other threads.
try {
Thread.sleep(10000);
} catch(InterruptedException ie) {
System.out.println("Reader is interrupted while sleeping!!");
System.exit(0);
}
lock.notifyAll();
System.out.println("Reader done notifying");
}
}
}
}
}

class Writer implements Runnable {

private Object lock;

Writer(Object lock) {
this.lock = lock;
}

public void run() {
System.out.println("Writer::run");
while(true) {
synchronized(lock) {
if("".equals(ConsumerProducerTest.data)) {
ConsumerProducerTest.data = "data-"+System.currentTimeMillis();
System.out.println("Writer wrote the data – " + ConsumerProducerTest.data);
System.out.println("Writer is notifying reader");
lock.notifyAll();
System.out.println("Writer done notifying");
} else {
try {
System.out.println("Writer is going to wait");
lock.wait();
System.out.println("After call to Writer wait");
} catch(InterruptedException ie) {
System.out.println("Writer is interrupted !!");
}
}
}
}
}
}
[/sourcecode]

Hope you find this useful.

Cheers !

Amit

Advertisements