Thread signaling and coordination

Thread signaling and coordination in Java involve mechanisms that allow threads to notify each other about changes in their state, thus enabling smooth and efficient inter-thread communication.

Here are the available techniques for thread signaling and coordination:

  • wait(), notify(), and notifyAll() Methods
  • ReentrantLock and Condition Variables
  • CountDownLatch
  • CyclicBarrier
  • Semaphore

1. wait(), notify(), and notifyAll() Methods

These methods, part of the Object class, are fundamental for basic thread coordination:

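wait(): Makes the current thread release the object’s monitor and enter the waiting state until another thread invokes notify() or notifyAll() on the same object. Because a thread can also wake up spuriously, wait() should be called in a loop that rechecks the condition.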

notify(): Wakes up a single thread that is waiting on the object’s monitor.

notifyAll(): Wakes up all threads that are waiting on the object’s monitor.

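These methods must be called while the current thread holds the object’s monitor, that is, from within a synchronized block or method on that object. Otherwise they throw IllegalMonitorStateException.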

Program
//ProducerConsumerDemo.java
class SharedResource {
    private int data;
    private boolean isDataAvailable = false; // Track if data is available or not

    // Synchronized method for producer to produce data
    public synchronized void produce(int value) {
        while (isDataAvailable) {
            try {
                wait(); // Wait if data is already available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Restore the interrupt status
                return; // Give up: with the flag set, the next wait() would throw again immediately
            }
        }
        data = value; // Assign new data value
        isDataAvailable = true; // Mark that new data is available
        System.out.println("Produced: " + value); // Log the produced value
        notifyAll(); // Notify waiting consumer thread
    }

    // Synchronized method for consumer to consume data
    public synchronized int consume() {
        while (!isDataAvailable) {
            try {
                wait(); // Wait if no data is available yet
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Restore the interrupt status
                return -1; // Give up; -1 is a sentinel meaning nothing was consumed
            }
        }
        isDataAvailable = false; // Mark that the data has been consumed
        System.out.println("Consumed: " + data); // Log the consumed value
        notifyAll(); // Notify the producer that the data has been consumed
        return data; // Return the consumed data
    }
}

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        SharedResource resource = new SharedResource(); // Shared resource for both threads

        // Producer thread
        Runnable producerTask = () -> {
            for (int i = 1; i < 10; i++) {
                resource.produce(i); // Produce data
            }
        };

        // Consumer thread
        Runnable consumerTask = () -> {
            for (int i = 1; i < 10; i++) {
                resource.consume(); // Consume data
            }
        };

        // Create and start both producer and consumer threads
        Thread producerThread = new Thread(producerTask);
        Thread consumerThread = new Thread(consumerTask);
        
        // The start order does not matter: the while loops in produce() and consume() enforce the alternation
        consumerThread.start();
        producerThread.start();
    }
}

/*

C:\>javac ProducerConsumerDemo.java

C:\>java ProducerConsumerDemo
Produced: 1
Consumed: 1
Produced: 2
Consumed: 2
Produced: 3
Consumed: 3
Produced: 4
Consumed: 4
Produced: 5
Consumed: 5
Produced: 6
Consumed: 6
Produced: 7
Consumed: 7
Produced: 8
Consumed: 8
Produced: 9
Consumed: 9

*/
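
The rule that wait(), notify(), and notifyAll() may only be called while holding the object's monitor can be checked directly. The following is a minimal sketch; the class name MonitorOwnershipDemo and its two helper methods are hypothetical and exist only for illustration.

```java
// MonitorOwnershipDemo.java (hypothetical example)
public class MonitorOwnershipDemo {

    // Returns true if notify() is rejected when the monitor is not held.
    public static boolean notifyWithoutLockFails() {
        Object lock = new Object();
        try {
            lock.notify(); // Not inside synchronized (lock): this thread does not own the monitor
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Returns true if notify() completes normally when the monitor is held.
    public static boolean notifyWithLockSucceeds() {
        Object lock = new Object();
        synchronized (lock) {
            lock.notify(); // Legal: the monitor is held; with no waiting threads this does nothing
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("Rejected without lock: " + notifyWithoutLockFails());
        System.out.println("Accepted with lock: " + notifyWithLockSucceeds());
    }
}
```

Both calls are expected to report true: the first because the JVM throws IllegalMonitorStateException, the second because the call is made inside a synchronized block on the same object.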

2. ReentrantLock and Condition Variables

ReentrantLock is part of the java.util.concurrent.locks package and offers more flexibility than synchronized blocks, such as timed and interruptible lock acquisition. With ReentrantLock, you can create multiple Condition objects for finer control over thread coordination.

//ProducerConsumerUsingLocks.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class SharedResource {
    private int data;
    private boolean isDataAvailable = false;
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    // Producer method
    public void produce(int value) {
        lock.lock();  // Acquire lock
        try {
            // Wait while data is available (i.e., the consumer hasn't consumed yet)
            while (isDataAvailable) {
                condition.await();  // Release the lock and wait for the consumer to signal
            }
            // Produce data
            data = value;
            isDataAvailable = true;
            System.out.println("Produced: " + value);
            // Notify all waiting threads (consumer) that new data is available
            condition.signalAll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // Restore the interrupt status
        } finally {
            lock.unlock();  // Release the lock
        }
    }

    // Consumer method
    public int consume() {
        lock.lock();  // Acquire lock
        try {
            // Wait while no data is available (i.e., the producer hasn't produced yet)
            while (!isDataAvailable) {
                condition.await();  // Release the lock and wait for the producer to signal
            }
            // Consume data
            isDataAvailable = false;
            System.out.println("Consumed: " + data);
            // Notify all waiting threads (producer) that data has been consumed
            condition.signalAll();
            return data;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // Restore the interrupt status
            return -1;  // Handle appropriately
        } finally {
            lock.unlock();  // Release the lock
        }
    }
}

public class ProducerConsumerUsingLocks {
    public static void main(String[] args) {
        SharedResource resource = new SharedResource();  // Shared resource between producer and consumer

        // Producer task
        Runnable producerTask = () -> {
            for (int i = 1; i < 10; i++) {
                resource.produce(i);  // Produce data
            }
        };

        // Consumer task
        Runnable consumerTask = () -> {
            for (int i = 1; i < 10; i++) {
                resource.consume();  // Consume data
            }
        };

        // Create and start producer and consumer threads
        Thread producerThread = new Thread(producerTask);
        Thread consumerThread = new Thread(consumerTask);

        producerThread.start();
        consumerThread.start();

        try {
            producerThread.join();  // Wait for producer to finish
            consumerThread.join();  // Wait for consumer to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // Handle thread interruption
        }
    }
}

/*
C:\>javac ProducerConsumerUsingLocks.java

C:\>java ProducerConsumerUsingLocks
Produced: 1
Consumed: 1
Produced: 2
Consumed: 2
Produced: 3
Consumed: 3
Produced: 4
Consumed: 4
Produced: 5
Consumed: 5
Produced: 6
Consumed: 6
Produced: 7
Consumed: 7
Produced: 8
Consumed: 8
Produced: 9
Consumed: 9
*/
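
The program above uses a single Condition for both directions, so signalAll() wakes producers and consumers alike. A minimal sketch of the multiple-Condition approach could look like the following. The names TwoConditionDemo, BoundedBuffer, notFull, and notEmpty, as well as the capacity of 2, are assumptions made for this illustration and are not part of the original example.

```java
// TwoConditionDemo.java (hypothetical example)
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TwoConditionDemo {

    // A small bounded buffer guarded by one lock and two conditions.
    static class BoundedBuffer {
        private final Queue<Integer> items = new ArrayDeque<>();
        private final int capacity;
        private final Lock lock = new ReentrantLock();
        private final Condition notFull = lock.newCondition();  // Producers wait here
        private final Condition notEmpty = lock.newCondition(); // Consumers wait here

        BoundedBuffer(int capacity) {
            this.capacity = capacity;
        }

        void put(int value) throws InterruptedException {
            lock.lock();
            try {
                while (items.size() == capacity) {
                    notFull.await(); // Releases the lock while waiting for free space
                }
                items.add(value);
                notEmpty.signal(); // Wake one consumer only
            } finally {
                lock.unlock();
            }
        }

        int take() throws InterruptedException {
            lock.lock();
            try {
                while (items.isEmpty()) {
                    notEmpty.await(); // Releases the lock while waiting for an item
                }
                int value = items.remove();
                notFull.signal(); // Wake one producer only
                return value;
            } finally {
                lock.unlock();
            }
        }
    }

    // Sends 1..count through the buffer and returns the sum seen by the consumer.
    public static int transfer(int count) {
        BoundedBuffer buffer = new BoundedBuffer(2);
        int[] sum = new int[1]; // Written by the consumer, read after join()
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) sum[0] += buffer.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println("Sum consumed: " + transfer(9)); // 1 + 2 + ... + 9 = 45
    }
}
```

With separate conditions, a producer signals only threads waiting for data and a consumer signals only threads waiting for space, which avoids waking threads that cannot make progress.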

3. CountDownLatch

CountDownLatch is a synchronization aid that allows one or more threads to wait until a set of operations being performed by other threads completes.

//CountDownLatchDemo.java
import java.util.concurrent.CountDownLatch;
class Worker extends Thread {
    private final CountDownLatch latch;
    public Worker(CountDownLatch latch) {
        this.latch = latch;
    }
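    @Override
    public void run() {
        System.out.println("Worker is doing work");
        try {
            Thread.sleep(1000); // Simulate work
            System.out.println("Worker finished work");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupt status
        } finally {
            latch.countDown(); // Always count down so the waiting thread is never blocked forever
        }
    }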
}
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int numberOfWorkers = 3;
        CountDownLatch latch = new CountDownLatch(numberOfWorkers);

        for (int i = 0; i < numberOfWorkers; i++) {
            new Worker(latch).start();
        }

        latch.await(); // Main thread waits until all workers finish
        System.out.println("All workers have finished");
    }
}

/*
C:\>javac CountDownLatchDemo.java

C:\>java CountDownLatchDemo
Worker is doing work
Worker is doing work
Worker is doing work
Worker finished work
Worker finished work
Worker finished work
All workers have finished
*/
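
A latch can also work in the other direction: several threads wait on one latch until a single thread releases them. The sketch below uses a latch with a count of 1 as a start gate. The class name StartGateDemo and the method runWorkers are hypothetical, chosen only for this illustration.

```java
// StartGateDemo.java (hypothetical example)
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class StartGateDemo {

    // Starts n workers that all block on a start gate, opens the gate,
    // and returns how many workers got past it.
    public static int runWorkers(int n) {
        CountDownLatch startGate = new CountDownLatch(1); // Opened once by the main thread
        CountDownLatch done = new CountDownLatch(n);      // Counted down once per worker
        AtomicInteger passed = new AtomicInteger();

        for (int i = 0; i < n; i++) {
            new Thread(() -> {
                try {
                    startGate.await(); // Every worker waits here until the gate opens
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }

        startGate.countDown(); // Count reaches zero: all waiting workers are released
        try {
            done.await(); // Wait until every worker has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("Workers released: " + runWorkers(3));
    }
}
```

Note that a CountDownLatch cannot be reset: once its count reaches zero, every later await() returns immediately.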

4. CyclicBarrier

CyclicBarrier is a synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. The barrier is called cyclic because it can be reused after the waiting threads are released.

//CyclicBarrierDemo.java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
class Worker extends Thread {
    private final CyclicBarrier barrier;

    public Worker(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " is doing work");
            Thread.sleep(1000); // Simulate work
            System.out.println(Thread.currentThread().getName() + " finished work");
            barrier.await(); // Wait for all threads to reach this point
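        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Restore the interrupt status
        } catch (BrokenBarrierException e) {
            // The barrier was broken, e.g. because another worker was interrupted while waiting
            System.out.println(Thread.currentThread().getName() + " found the barrier broken");
        }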
    }
}
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        int numberOfWorkers = 3;
        CyclicBarrier barrier = new CyclicBarrier(numberOfWorkers, () -> {
            System.out.println("All workers have reached the barrier");
        });
        for (int i = 0; i < numberOfWorkers; i++) {
            new Worker(barrier).start();
        }
    }
}

/*
C:\>javac CyclicBarrierDemo.java

C:\>java CyclicBarrierDemo
Thread-1 is doing work
Thread-2 is doing work
Thread-0 is doing work
Thread-1 finished work
Thread-2 finished work
Thread-0 finished work
All workers have reached the barrier

*/
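
Unlike a CountDownLatch, the same CyclicBarrier instance can be used for several rounds. The following sketch has each worker pass the barrier once per phase and counts how often the barrier action runs. The class name BarrierReuseDemo and the method runPhases are hypothetical and serve only as an illustration.

```java
// BarrierReuseDemo.java (hypothetical example)
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierReuseDemo {

    // Runs the given number of workers through several phases on one barrier
    // and returns how many times the barrier action was executed.
    public static int runPhases(int workers, int phases) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all workers have arrived
        CyclicBarrier barrier = new CyclicBarrier(workers, trips::incrementAndGet);

        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int p = 0; p < phases; p++) {
                        // ... the work for phase p would go here ...
                        barrier.await(); // The barrier resets automatically for the next phase
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another worker failed while waiting; stop this worker as well
                }
            });
            threads[i].start();
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("Barrier tripped " + runPhases(3, 4) + " times");
    }
}
```

With 3 workers and 4 phases, the barrier action is expected to run 4 times, once per phase.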

5. Semaphore

A Semaphore controls access to a shared resource through a set number of permits: acquire() blocks until a permit is available, and release() hands a permit back.

//SemaphoreDemo.java 
import java.util.concurrent.Semaphore;

class ResourceUser extends Thread {
    private final Semaphore semaphore;

    public ResourceUser(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " trying to acquire the lock...");
        try {
            semaphore.acquire();  // Block until the single permit is available
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // Restore the interrupt status
            System.out.println(Thread.currentThread().getName() + " was interrupted!");
            return;  // No permit was acquired, so there is nothing to release
        }
        try {
            System.out.println(Thread.currentThread().getName() + " acquired the lock and is using the resource.");
            Thread.sleep(1000);  // Simulate resource usage
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // Restore the interrupt status
            System.out.println(Thread.currentThread().getName() + " was interrupted!");
        } finally {
            System.out.println(Thread.currentThread().getName() + " released the lock.");
            semaphore.release();  // Release the permit only after it was actually acquired
        }
    }
}

public class SemaphoreDemo {
    public static void main(String[] args) {
        // Semaphore with 1 permit, meaning only one thread can acquire the lock at a time
        Semaphore semaphore = new Semaphore(1);  // One permit for exclusive access

        for (int i = 0; i < 5; i++) {
            new ResourceUser(semaphore).start();
        }
    }
}

/*
C:\>javac SemaphoreDemo.java

C:\>java SemaphoreDemo
Thread-1 trying to acquire the lock...
Thread-1 acquired the lock and is using the resource.
Thread-3 trying to acquire the lock...
Thread-2 trying to acquire the lock...
Thread-4 trying to acquire the lock...
Thread-0 trying to acquire the lock...
Thread-1 released the lock.
Thread-3 acquired the lock and is using the resource.
Thread-3 released the lock.
Thread-2 acquired the lock and is using the resource.
Thread-2 released the lock.
Thread-4 acquired the lock and is using the resource.
Thread-4 released the lock.
Thread-0 acquired the lock and is using the resource.
Thread-0 released the lock.
*/
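
The program above uses a single permit, which makes the semaphore behave like a mutual-exclusion lock. With more permits, several threads may use the resource at the same time, but never more than the permit count. The sketch below measures the peak number of concurrent users. The class name PermitPoolDemo, the method maxConcurrent, and the 50 ms sleep are assumptions made for this illustration.

```java
// PermitPoolDemo.java (hypothetical example)
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class PermitPoolDemo {

    // Runs the given number of threads against a semaphore and returns
    // the highest number of threads that held a permit at the same time.
    public static int maxConcurrent(int permits, int threads) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger active = new AtomicInteger(); // Threads currently holding a permit
        AtomicInteger peak = new AtomicInteger();   // Highest value of active seen so far

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    semaphore.acquire(); // Blocks while all permits are taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // Nothing acquired, nothing to release
                }
                try {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(50); // Simulate resource usage
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    semaphore.release();
                }
            });
            workers[i].start();
        }

        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("Peak concurrent users: " + maxConcurrent(2, 6));
    }
}
```

The reported peak depends on thread scheduling, but it should never exceed the number of permits passed to the constructor.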

Java provides multiple ways to coordinate and signal between threads, each suited to different scenarios:

  • wait(), notify(), notifyAll(): Basic and low-level methods for synchronization within synchronized blocks or methods.
  • ReentrantLock and Condition: More flexible than synchronized blocks, with support for multiple wait conditions per lock.
  • CountDownLatch: Used for waiting until a set of operations is completed.
  • CyclicBarrier: Used for a group of threads to wait for each other.
  • Semaphore: Used to control access to a resource with a fixed number of permits.

Choosing the appropriate mechanism depends on the specific requirements of your application.
