Synchronization tools for coordinating between threads

Apart from the low level synchronization abstractions (synchronized/Lock/Semaphore), Java also provides a bunch of high level abstractions for synchronizing activities of two or more threads. Let's have a closer look at some of them

CountdownLatch

CountdownLatch is a simple yet useful utility which allows one or more threads to wait until a set of operations have been completed.

When we create an instance of CountDownLatch, as part of the constructor arguments we specify the number of threads it should wait for, all such thread are required to do count down by invoking countDown() on the latch object once they are completed or ready to do the job. As soon as count reaches zero, the waiting task starts running. Note that in order to wait, a thread has to explicitly invoke await() or its time out version on the latch object - invoking countDown() doesn't automatically cause a thread to wait. This provides us an interesting flexibility - the threads calling countDown() need not necessarily be same as the threads waiting. This flexibility makes it easier to solve a lot of problems with CountdownLatch as opposed to other synchronization aiding classes

A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset.

To initialize a CountDownLatch we can use this constructor and provide the count as constructor argument

 public CountDownLatch(int count)

To trigger a count-down, the following method needs to be invoked:

 public void countDown()

A call to countDown() decrements the count of the latch, releasing all waiting threads if the count reaches zero. If the current count is greater than zero then it is decremented. If the new count is zero then all waiting threads are re-enabled for thread scheduling. If the current count is already zero then nothing happens.

In order to wait on a latch, a thread needs to invoke either of the following methods :

 public void await() throws InterruptedException

 public boolean await(long timeout, TimeUnit unit) throws InterruptedException

Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted, or the specified waiting time elapses. If the current count is zero then this method returns immediately with the value true.

Note that for the timeout version of await, the return type is boolean and not void. It returns true if the count reached zero and false if the waiting time elapsed before the count reached zero.

It's generally safer to use the timeout version of the await method instead of the no-args version as in the absence of a timeout, in case of the worker thread runs into an Exception before calling countDown(), we might have a situation where count never reaches zero.

In this post when we discussed DeadLock and tried to simulate a deadlock, we used a sleep to ensure deadlock. Lets revisit the code and try to avoid a sleep (or any kind of delay) but still guarantee a deadlock using CountDownLatch

Existing Code:

class Dummy {
    synchronized void m1 (Dummy other) {
        System.out.println(Thread.currentThread().getName()+ " : Executing m1");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        other.m2();
    }

    synchronized void m2() {
        System.out.println(Thread.currentThread().getName()+ " : Executing m2");
    }
}
class DeadlockDemo implements Runnable {
    Dummy a;
    Dummy b;

    public DeadlockDemo(Dummy a, Dummy b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public void run() {
        a.m1(b);
    }
}

and the main method :

public static void main(String[] args) {
    Dummy a = new Dummy();
    Dummy b = new Dummy();

    Runnable r1 = new DeadlockDemo(a, b);
    Runnable r2 = new DeadlockDemo(b, a);
    new Thread(r1).start();
    new Thread(r2).start();
}

The reason we used a sleep() was to ensure that first thread doesn't acquire both the locks even before second thread hasn't acquired a single lock.

Now we can rewrite the class Dummy as :

class Dummy {
    static CountDownLatch latch = new CountDownLatch(2);
    synchronized void m1 (Dummy other) {
        System.out.println(Thread.currentThread().getName()+ " : Executing m1");
        try {
            latch.countDown();
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        other.m2();
    }

    synchronized void m2() {
        System.out.println(Thread.currentThread().getName()+ " : Executing m2");
    }
}

So we have created a latch with count 2 and each thread calls countDown() and await(), before invoking m2(). As a result, thread 1 cant proceed with m2() until thread 2 has invoked countDown() on the latch. So, we'll still have a deadlock but wont need a sleep (or some other form of delay)

The rest of the code remains exactly the same

CyclicBarrier

A CyclicBarrier is a synchronization aid very similar to a CountDownLatch but with the following differences :

  • While a CountDownLatch is one-shot, a CyclicBarrier, as the name suggests, can be reset/reused even after the waiting threads have been released

  • Unlike a CountDownLatch, where wait and countdown are two distinct events (and can be triggered by two different threads), in a CyclicBarrier, a call to await() implicitly triggers a count-down as well. This also means that the threads triggering the barrier count can't be different from the threads waiting

  • A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared-state before any of the parties continue

To construct a CyclicBarrier use either of the following constructors. The second one accepts an additional Runnable argument which is invoked once per barrier point

 public CyclicBarrier(int parties)
 public CyclicBarrier(int parties, Runnable barrierAction)

Just like a CountDownLatch, in order to wait for a barrier to be tripped, a thread has to invoke await (with or without timeout) on it

 public int await() throws InterruptedException, BrokenBarrierException
 public int await(long timeout, TimeUnit unit) throws InterruptedException,               BrokenBarrierException, TimeoutException

Note that the await methods return an int. The return value indicates the arrival index of the current thread ( 0 -> most recent ). Also unlike a CountDownLatch, a CyclicBarrier throws TimeoutException in case of a timeout

A CyclicBarrier can be reset to it's initial state by invoking the reset() method

 public void reset()

If the barrier is reset while thread(s) are waiting on it, the barrier is said to be broken and those threads get a BrokenBarrierException ( that's why await() in CyclicBarrier throws BrokenBarrierException ). Once a barrier is broken, reusing it isn't recommended as it might cause unpredictable results

To check if a barrier has already been reset or not, use :

 public boolean isBroken()

Here's a piece of code which demonstrates how a CyclicBarrier works :

So, we have a barrier of size 3.
We spawn three threads each of which wait on the barrier. Because of barrier count being 3, the barrier will be tripped and reset.
After a while (5 seconds), we spawn 3 more threads each of which wait on the barrier. Because of barrier count being 3, the barrier will be tripped again.

public static void main(String[] args) {

    Runnable r = new Runnable() {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " barrier tripped");
        }
    };

    CyclicBarrier barrier = new CyclicBarrier(3, r);

    Runnable task = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " is waiting on barrier");
                barrier.await();
                System.out.println("Barrier tripped "+Thread.currentThread().getName() + " can proceed now");
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    };

    new Thread(task, "T1").start();
    new Thread(task, "T2").start();
    new Thread(task, "T3").start();

    try {
        TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    new Thread(task, "T4").start();
    new Thread(task, "T5").start();
    new Thread(task, "T6").start();
}

Output:

 T1 is waiting on barrier
 T2 is waiting on barrier
 T3 is waiting on barrier
 T3 barrier tripped
 Barrier tripped T3 can proceed now
 Barrier tripped T2 can proceed now
 Barrier tripped T1 can proceed now
 T4 is waiting on barrier
 T5 is waiting on barrier
 T6 is waiting on barrier
 T6 barrier tripped
 Barrier tripped T6 can proceed now
 Barrier tripped T4 can proceed now
 Barrier tripped T5 can proceed now

So, from the output we can see that :

Once a barrier is tripped, it automatically gets reset (and is hence reusable)

The runnable argument to the barrier is executed by the same thread that trips the barrier.

Lets try one more exercise. We will change the code to make the Runnable implementation (that goes into the barrier through its constructor) get stuck in an infinite loop and take a thread dump to see the state of the threads

    Runnable r = new Runnable() {
        @Override
        public void run() {
            while(true) {
                // do nothing
            }
        }
    };

    CyclicBarrier barrier = new CyclicBarrier(3, r);

    Runnable task = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " is waiting on barrier");
                barrier.await();
                System.out.println("Barrier tripped "+Thread.currentThread().getName() + " can proceed now");
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    };

    new Thread(task, "T1").start();
    new Thread(task, "T2").start();
    new Thread(task, "T3").start();

    try {
        TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    new Thread(task, "T4").start();
    new Thread(task, "T5").start();
    new Thread(task, "T6").start();
}

Output:

T3 is waiting on barrier
T1 is waiting on barrier
T2 is waiting on barrier
T4 is waiting on barrier
T5 is waiting on barrier
T6 is waiting on barrier

If we connect the the JVM using a Profiling tool like JProfiler we can see that only one thread is running and others are waiting
VisualVM Screenshot

This proves that :

The last thread to trigger the barrier runs the Barrier trip Runnable. No new thread is spawned

Unless the Runnable associated with the barrier completes execution, waiting threads cant proceed

Unless the Runnable associated with the barrier completes execution, the barrier state isnt reset and even if same number of threads as the barrier count invokes await, the barrier isn't tripped again, nor is the Runnable executed again

Phaser

Phaser is a relatively new synchronization aid introduced in JDK. Just like latch and barrier, Phaser allows us to build logic in which threads need to wait on the barrier before going to the next step of execution. It is similar in construct to CountDownLatch and CyclicBarrier, but is much more powerful and flexible. Phaser's main advantage over synchronization aids is that unlike CountDownLatch and CyclicBarrier, the expected number of parties in Phaser is dynamic. We can coordinate multiple phases of execution, reusing a Phaser instance for each application phase. Each phase can have a different number of threads waiting for advancing to another phase.

Unlike the case for other barriers, the number of parties registered to synchronize on a phaser may vary over time. Tasks may be registered at any time and optionally deregistered upon any arrival. To participate in the coordination, the thread needs to register itself with the Phaser instance. Note that this only increases the number of registered parties, and we can’t check whether the current thread is registered

To create a Phaser, we can pass the number of parties in the constructor :

  public Phaser(int parties)

Or we can use the no-arg constructor as well. Invoking the no arg constructor is akin to calling Phaser(0). It creates a Phaser with no parties registered initially.

A thread can register itself with the Phaser by invoking any of the following methods:

 public int register()

Returns the arrival phase number to which this registration applied. If this value is negative, then this phaser has already terminated, in which case registration has no effect.

public int bulkRegister(int parties)

Similar to register, except that it bulk registers a number of parties at one go.

A thread signals that it arrived at the barrier by calling the arriveAndAwaitAdvance(), which is a blocking method. It is a usage error for an unregistered party to invoke this method.

 public int arriveAndAwaitAdvance()

When the number of arrived parties is equal to the number of registered parties, the execution of the program will continue, and the phase number will increase.

A thread can arrive and deregister itself upon arrival as well. arrive. Deregistration reduces the number of parties required to advance in future phases.

 public int arriveAndDeregister()

This method call returns immediately without blocking. When the thread finishes its job, we should call the arriveAndDeregister() method to signal that the current thread should no longer be accounted for in this particular phase.

We can get the current phase number by calling the getPhase() method.

public final int getPhase()

Phase Number is a positive integer incremented at every phase. If phase number is negative, that essentially means that the phaser has already terminated.

At any point, if we want to check the number of parties registered/arrived/yet-to-arrive, we can do so using one of the following methods :

 public int getRegisteredParties()
 public int getArrivedParties()
 public int getUnarrivedParties()

How can a phaser terminate?
A phaser terminates when the method onAdvance returns true. In default implementation of this method it returns true when all the partied have deregistered and number of registered parties becomes zero. We can check whether a phaser has terminated or not by calling method isTerminated on phaser instance. A phaser can also be force terminated. To force terminate a Phaser we can use :

public void forceTermination()

If the Phaser has already terminated, invoking this method has no impact

Note that, unlike CountDownLatch or CyclicBarrier, Phasers can be tiered (i.e arranged in a hierarchical tree structure). A Phaser can optionally accept a parent Phaser instance through it's constructor :

public Phaser(Phaser parent)
public Phaser(Phaser parent, int parties)

To get a reference to a Phaser's parent Phaser or the root Phaser in a hierarchy use the following methods :

 public Phaser getParent()
 public Phaser getRoot()

A phaser combines the best of both worlds from CountDownLatch and CyclicBarrier. It's reusable just like a CyclicBarrier and can distinguish between arrivals and waits just like a CountDownLatch. Unlike both CountDownLatch and CyclicBarrier, a Phaser supports dynamic number of registered parties.

The code snippet below depicts how a Phaser works.

We create a phaser with 3 registered parties. By default its in Phase 0
Now we start 3 threads which invoke arriveAndAwaitAdvance() on the phaser. So Phaser moves to phase 1, number of registered parties is sill 3
Now we start 2 threads who deregister from the phaser. Phaser is still in phase 1 but number of registered parties is 1
Now we start a thread which
invokes arriveAndAwaitAdvance() on the phaser. So phaser moves to phase 2
Now we start a thread which deregisters from the phaser. No parties are now registered with the phaser. So the phaser terminates and invoking getPhase() on it will return a negative integer now.

public static void main(String[] args) {

    Phaser phaser = new Phaser(3);
    System.out.println("Phase #"+phaser.getPhase()+ ", is_phaser_live: "+ !phaser.isTerminated() + ", registered_parties: "+ phaser.getRegisteredParties());

    Runnable task = new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " is waiting on phaser");
                phaser.arriveAndAwaitAdvance();
                System.out.println("Phase over "+Thread.currentThread().getName() + " can proceed now");
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    };

    new Thread(task, "T1").start();
    new Thread(task, "T2").start();
    new Thread(task, "T3").start();

    delay(2);
    System.out.println("Phase #"+phaser.getPhase()+ ", is_phaser_live: "+ !phaser.isTerminated() + ", registered_parties: "+ phaser.getRegisteredParties());


    Runnable deregisterTask = new Runnable() {            
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " is deregistering from phaser");
            phaser.arriveAndDeregister();
        }
    };
    new Thread(deregisterTask, "T4").start();
    new Thread(deregisterTask, "T5").start();
    delay(2);        
    System.out.println("Phase #"+phaser.getPhase()+ ", is_phaser_live: "+ !phaser.isTerminated() + ", registered_parties: "+ phaser.getRegisteredParties());

    new Thread(task, "T6").start();
    delay(2);
    System.out.println("Phase #"+phaser.getPhase()+ ", is_phaser_live: "+ !phaser.isTerminated() + ", registered_parties: "+ phaser.getRegisteredParties());

    new Thread(deregisterTask, "T7").start();

    delay(2);
    System.out.println("Phase #"+phaser.getPhase()+ ", is_phaser_live: "+ !phaser.isTerminated() + ", registered_parties: "+ phaser.getRegisteredParties());

}
private static void delay(int count) {
    try {
        TimeUnit.SECONDS.sleep(count);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }        
}

Output:

 Phase #0, is_phaser_live: true, registered_parties: 3
 T1 is waiting on phaser
 T2 is waiting on phaser
 T3 is waiting on phaser
 Phase over T3 can proceed now
 Phase over T1 can proceed now
 Phase over T2 can proceed now
 Phase #1, is_phaser_live: true, registered_parties: 3
 T4 is deregistering from phaser
 T5 is deregistering from phaser
 Phase #1, is_phaser_live: true, registered_parties: 1
 T6 is waiting on phaser
 Phase over T6 can proceed now
 Phase #2, is_phaser_live: true, registered_parties: 1
 T7 is deregistering from phaser
 Phase #-2147483645, is_phaser_live: false, registered_parties: 0

Exchanger

Exchanger class represents a kind of rendezvous point where two threads can exchange objects. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue.

Exchanger Diagram

Whenever a thread arrives at the exchange point, it must wait for the other thread to arrive. When the other pairing thread arrives the two threads proceed to exchange their objects.

Exchanger can be used in a Producer-Consumer scenarios where one thread will produce the data and exchange it with the consumer thread. Consumer thread in turn will pass the empty buffer to the producer thread.

To create an Exchanger instance, we can use the default no-arg constructor

 public Exchanger()

Two threads can exchange data by using the exchange method :

 public V exchange(V x) throws InterruptedException

If another thread is already waiting at the exchange point then it is resumed for thread scheduling purposes and receives the object passed in by the current thread. The current thread returns immediately, receiving the object passed to the exchange by that other thread.

If no other thread is already waiting at the exchange then the current thread is disabled for thread scheduling purposes and lies dormant until one of two things happens:

  • Some other thread enters the exchange; or

  • Some other thread interrupts the current thread.

The exchange method also comes with a timeout flavour:

 public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

In the event of a timeout, the method throws TimeoutException

Here's a code snippet demonstrating how exchanger works :

public static void main(String[] args) {

    Exchanger<Integer> exchanger = new Exchanger<>();

    Runnable producer = new Runnable() {            
        @Override
        public void run() {
            int value = 0;
            while(true) {
                System.out.println(Thread.currentThread().getName() + " produced "+ (++value));
                try {
                    exchanger.exchange(value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }; 
    Runnable consumer = new Runnable() {            
        @Override
        public void run() {
            int value = 0;
            while(true) {
                try {
                    value = exchanger.exchange(0);
                    System.out.println(Thread.currentThread().getName() + " consumed "+ value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    };
    new Thread(producer, "Producer").start();
    new Thread(consumer, "Consumer").start();
}

Output:

Producer produced 1
Consumer consumed 1
Producer produced 2
Consumer consumed 2
Producer produced 3
Consumer consumed 3
Producer produced 4
Consumer consumed 4
Producer produced 5
Consumer consumed 5

comments powered by Disqus