Producer-Consumer using Java Threads – wait,notifyAll

April 27, 2010

Hello,

In Java , threads communicate with each other using “wait,notify and notifyAll” APIs.  Producer-Consumer is the classic example to explain these APIs. The example below tries to show how we can use inter-thread communication to read shared data when it is available. I have also tried to demonstrate use of Thread Interrupt using which we can cancel the running task, in this case exiting the application.

import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
 * @author Amit
 * This Producer-Consumer example tries to read/write one shared String variable.
 * This example also uses Thread interrupt concept to stop the application.
 * The variable in main() are defined final so that we can use them in
 * Anonymous inner class implementation of thread within main() method.
 */
public class ConsumerProducerTest {

	static String data = new String();

	public static void main(String args[]) {

		//Declaring these variables as final so that
		//I can access them inside the Anonymous inner class
		//for user thread.
		final Object lock = new Object();

		final Thread readerThread = new Thread(new Reader(lock));
		final Thread writerThread = new Thread(new Writer(lock));
		writerThread.start();
		readerThread.start();

		//User Input Thread
		final BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
		new Thread(new Runnable() {
			public void run() {
				while(true) {
					try {
						String str = in.readLine();
						if("quit".equalsIgnoreCase(str)) {
								readerThread.interrupt();
						}
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}

		}).start();
	}
}
class Reader implements Runnable {

	private Object lock;

	Reader(Object lock) {
		this.lock = lock;
	}

	public void run() {
		System.out.println("Reader::run");
		while(true) {
			synchronized(lock) {
				if("".equals(ConsumerProducerTest.data)) {
					System.out.println("Reader is going to wait");
					try {
						lock.wait();
					} catch (InterruptedException e) {
						System.out.println("Reader thread interrupted !!");
					}
					System.out.println("After call to Reader wait");
				} else {
					System.out.println("Reader got the data" + ConsumerProducerTest.data);
					System.out.println("Reader is notifying");
					ConsumerProducerTest.data = ""; //Clear the data
					//Wait for 10 seconds before notifying other threads.
					try {
						Thread.sleep(10000);
					} catch(InterruptedException ie) {
						System.out.println("Reader is interrupted while sleeping!!");
						System.exit(0);
					}
					lock.notifyAll();
					System.out.println("Reader done notifying");
				}
			}
		}
	}
}

class Writer implements Runnable {

	private Object lock;

	Writer(Object lock) {
		this.lock = lock;
	}

	public void run() {
		System.out.println("Writer::run");
		while(true) {
			synchronized(lock) {
				if("".equals(ConsumerProducerTest.data)) {
					ConsumerProducerTest.data = "data-"+System.currentTimeMillis();
						System.out.println("Writer wrote the data - " + ConsumerProducerTest.data);
						System.out.println("Writer is notifying reader");
						lock.notifyAll();
						System.out.println("Writer done notifying");
				} else {
					try {
						System.out.println("Writer is going to wait");
						lock.wait();
						System.out.println("After call to Writer wait");
					} catch(InterruptedException ie) {
						System.out.println("Writer is interrupted !!");
					}
				}
			}
		}
	}
}

Hope you find this useful.

Cheers !

Amit

Advertisements

Serialization – Compatible and Incompatible modifications to classes

April 15, 2010

Hello,
Java Serialization allows us to write an Object into byte stream and again re-construct its state whenever needed.
In between Serialization and De-serialization the class definition might get changed. Some modifications in
the class definition would result in java.io.InvalidClassException while De-serializing.
It is very important to understand these constraints. especially while dealing with Enterprise distributed Applications. As Distributed applications would always exchange the objects over the network stream using Serialization techniques.

Modifications those keep the Class compatible with Serialized version

  1. Add Fields
  2. Change Field from Static to Non-Static.
  3. Change Field from Transient to Non-Transient
  4. Add Class to the Object Tree


Modifications those make the Class incompatible with Serialized version

  1. Delete Field
  2. Change Field from Non-Static to Static
  3. Change Field from Non-Transient to Transient
  4. Changing the Class hierarchy
  5. Changing type of a Primitive Field

There are others changes as well those can be added to above list. They can be found here.
If you look at the above list the gist is “Addition of Field is permitted while Deletion cause Exception“.
Changing Field from Static to Non-static for transient to non-transient is kind of adding field to existing class.
While making non-static field to static and data-type of the field is like deleting the field which is incompatible.

This seems quite obvious from the Java Documentation but a beginner level would find this helpful, which is the reason
behind this post.

Cheers !

Amit