ASYNCHRONOUS COORDINATION

We have learned how to create multiple threads of execution, and then examined the use of locks and Java monitor locks to protect critical sections, where shared data is modified, against concurrent operations.

These two mechanisms do not by themselves coordinate multiple threads of control. Coordination requires some form of signalling between threads.

Producer-Consumer pattern

The most typical coordination problem between threads that work asynchronously is the producer-consumer pattern, which arises in many problems. It appears whenever one group of workers produces tasks and another group of workers consumes them, usually in arrival order, as in a queue.

[Figure: Producer Consumer]

Example: The bank queue

Consider the bank queue example we have covered before:

    class BankQueue {
        int nextToGive=0, nextToServe=0;
        synchronized int nextNumberToCustomer() {
            return ++nextToGive;
        }
        synchronized int nextToServe() throws Exception{
            if (nextToServe<nextToGive)
                return ++nextToServe;
            else 
                throw new Exception("No customers to serve");
        }
    }

This problem fits the producer-consumer pattern: the ticket machines are the producers, which produce entities to be served, and the bank clerks are the consumers, which consume entities from the queue.

The simple implementation throws an exception when there are no customers in the queue to serve. What is the bank clerk supposed to do in such a situation?

Coordination problem

The question above is an annoying one. One option for the consumer/clerk is to keep polling the queue until an entity/customer arrives:

    class Consumer implements Runnable{
        BankQueue queue;
        Consumer(BankQueue queue) { this.queue=queue;}
        public void run() {
            while (true) {
                try {
                    int n=queue.nextToServe();
                    System.console().format("Serving customer no: %d\n",n);
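                } catch (Exception e) {
                    // Nobody to serve yet: back off briefly, then poll again.
                    System.console().format("No one to serve\n");
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException ie) {
                        return; // stop polling if the thread is interrupted
                    }
                }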
            }
        }
    }
    class Producer implements Runnable{
        BankQueue queue;
        Producer(BankQueue queue) { this.queue=queue;}
        public void run() {
            while (true) 
                queue.nextNumberToCustomer();
        }
    }
    class Bank {
        public static void main(String[] args) {
            int numConsumers=10, numProducers=2;
            BankQueue theQueue=new BankQueue();
            for(int i=0;i<numConsumers;i++)
                new Thread(new Consumer(theQueue)).start();
            for(int i=0;i<numProducers;i++)
                new Thread(new Producer(theQueue)).start();
        }
    }

The solution in the Consumer class is ugly for several reasons. First, it involves polling, so the consumer threads keep the CPU busy to some extent even when there is nothing to do. More importantly, the sleep delay we added to reduce the CPU overhead of polling means the Consumer(s) learn of a customer/entity arrival only after a delay (up to 500 ms here), wasting time that could be spent on actual tasks.

Coordination using Java monitors

A better solution uses the Java monitor mechanism. Please read the documentation of the wait() and notify() methods in the Object class.

The wait() and notify() methods work as a pair. A thread blocked in a wait() call consumes no CPU, and a notify() call wakes up one such thread. Note that wait() must be called while holding the object's monitor lock (here, inside a synchronized method), and that the waiting thread releases this lock while it is blocked. When it is awakened by a notify(), it must first reacquire the monitor lock before it can continue, because it wakes up in the middle of a synchronized block/method (which is the case in our program). Since another thread may have changed the shared state in the meantime, and since wait() may also return without any notify(), the awaited condition should be re-checked in a loop after waking up.
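
Before applying this to the bank queue, the mechanism can be seen in isolation. The following is only a minimal sketch: the class names ReadyFlag and ReadyFlagDemo and their methods are hypothetical and used purely for illustration. One thread blocks in wait() until another thread sets a condition and calls notify().

```java
// Hypothetical one-shot flag, used only to illustrate wait()/notify().
class ReadyFlag {
    private boolean ready = false;

    // Blocks the caller, without using CPU, until markReady() has been called.
    synchronized void awaitReady() throws InterruptedException {
        while (!ready) {   // re-check: wait() may return before the condition holds
            wait();        // releases this object's monitor lock while blocked
        }
    }

    // Sets the condition and wakes up one waiting thread, if there is one.
    synchronized void markReady() {
        ready = true;
        notify();
    }
}

class ReadyFlagDemo {
    // Starts one waiter thread, signals it, and reports whether it got past the wait.
    static boolean runOnce() {
        ReadyFlag flag = new ReadyFlag();
        boolean[] finished = { false };
        Thread waiter = new Thread(() -> {
            try {
                flag.awaitReady();
                finished[0] = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        flag.markReady();          // works whether or not the waiter is already blocked
        try {
            waiter.join();         // join() makes the waiter's write to finished[0] visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished[0];
    }

    public static void main(String[] args) {
        System.out.println("waiter finished: " + runOnce());
    }
}
```

Because the condition is stored in a field and tested under the lock, it does not matter whether markReady() runs before or after the waiter reaches wait(). The bank queue below applies the same mechanism.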

    class BankQueue {
        int nextToGive=0, nextToServe=0;
        synchronized int nextNumberToCustomer() {
            notify();
            return ++nextToGive;
        }
        synchronized int nextToServe() throws InterruptedException{
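            // Re-check in a loop: another clerk may have taken the customer,
            // and wait() may return without a matching notify().
            while (!(nextToServe<nextToGive)){
                System.out.println("Waiting");
                wait();
            }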
            return ++nextToServe;
        }
    }

    class Consumer implements Runnable{
        BankQueue queue;
        Consumer(BankQueue queue) { this.queue=queue;}
        public void run() {
            try {
                while (true) {
                    int n=queue.nextToServe();
                    System.console().format("Serving customer no: %d\n",n);
                }
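            } catch(InterruptedException e) {
                // Interrupted while waiting: restore the flag and let the thread end.
                Thread.currentThread().interrupt();
            }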
        }
    }
    class Producer implements Runnable{
        BankQueue queue;
        Producer(BankQueue queue) { this.queue=queue;}
        public void run() {
            while (true) {
                int n=queue.nextNumberToCustomer();
                System.console().format("New customer no: %d\n",n);
            }
        }
    }
    class Bank {
        public static void main(String[] args) {
            int numConsumers=10, numProducers=2;
            BankQueue theQueue=new BankQueue();
            for(int i=0;i<numConsumers;i++)
                new Thread(new Consumer(theQueue)).start();
            for(int i=0;i<numProducers;i++)
                new Thread(new Producer(theQueue)).start();
        }
    }
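
Since the program above runs forever, it is hard to check that it behaves correctly. The following is a sketch of a finite variant, under some assumptions: the class FiniteBank, the method runAndCheck(), and the fixed counts are hypothetical additions, and the BankQueue here is a copy that guards wait() with a loop. It issues a fixed number of tickets and verifies that every customer number is served exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Copy of the bank queue; wait() is guarded by a loop on the condition.
class BankQueue {
    private int nextToGive = 0, nextToServe = 0;

    synchronized int nextNumberToCustomer() {
        int n = ++nextToGive;
        notify();                          // wake up one waiting clerk, if any
        return n;
    }

    synchronized int nextToServe() throws InterruptedException {
        while (nextToServe >= nextToGive) {
            wait();                        // no customer yet: block without polling
        }
        return ++nextToServe;
    }
}

// Hypothetical test harness: finite numbers of tickets so that all threads end.
class FiniteBank {
    static final int PRODUCERS = 2, CONSUMERS = 10, TICKETS_PER_PRODUCER = 50;

    // Returns true if every ticket number was served exactly once.
    static boolean runAndCheck() {
        int total = PRODUCERS * TICKETS_PER_PRODUCER;   // 100 customers
        int perConsumer = total / CONSUMERS;            // 10 each, divides evenly
        BankQueue queue = new BankQueue();
        AtomicIntegerArray served = new AtomicIntegerArray(total + 1);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < CONSUMERS; i++) {
            threads.add(new Thread(() -> {
                try {
                    for (int k = 0; k < perConsumer; k++)
                        served.incrementAndGet(queue.nextToServe());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (int i = 0; i < PRODUCERS; i++) {
            threads.add(new Thread(() -> {
                for (int k = 0; k < TICKETS_PER_PRODUCER; k++)
                    queue.nextNumberToCustomer();
            }));
        }
        for (Thread t : threads) t.start();
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        for (int n = 1; n <= total; n++)
            if (served.get(n) != 1) return false;       // missed or served twice
        return true;
    }

    public static void main(String[] args) {
        System.out.println("each customer served exactly once: " + runAndCheck());
    }
}
```

A single notify() per ticket is enough in this sketch because only clerks ever wait on the queue. Once producers can wait as well, as in the exercise below, it becomes necessary to think about which kind of thread a notify() may wake up.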

Exercise: The bounded buffer problem

The bounded buffer is a similar problem, but this time the buffer's capacity is bounded, so the producers may also need to wait.