Blocking Queues

BlockingQueue is a thread-safe queue designed to handle producer-consumer scenarios efficiently without the need for manual synchronization using wait() or notify(). It belongs to the java.util.concurrent package and provides built-in blocking behavior for adding and removing elements.


Important Features

  • Thread-safe: Automatically handles synchronization internally.

  • Blocking operations:

    • put(E e): Waits if the queue is full.

    • take(): Waits if the queue is empty.

  • Avoids explicit use of wait()/notify().


Java provides several implementations of blocking queues in the java.util.concurrent package, which are designed to be used in concurrent and multi-threaded environments.

Common Implementations

  1. ArrayBlockingQueue – Bounded queue backed by an array.

  2. LinkedBlockingQueue – Optionally bounded queue backed by linked nodes.

  3. PriorityBlockingQueue – Orders elements based on priority.

  4. DelayQueue – Delays elements until a delay has expired.

  5. SynchronousQueue – No internal capacity; transfer occurs only when another thread is ready to receive.


Example Syntax

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;

BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

// Producer
queue.put("item"); // waits if full

// Consumer
String item = queue.take(); // waits if emptyCode language: JavaScript (javascript)

ArrayBlockingQueue

The ArrayBlockingQueue is backed by an array and has a fixed capacity specified when it is created. Once created, the capacity cannot be changed. The key operations provided by ArrayBlockingQueue include:

  • put(E e): Adds the specified element to the queue if space is available, or waits until space is available.
  • take(): Retrieves and removes the head of the queue, waiting if necessary until an element becomes available.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ArrayBlockQueueDemo {

    // Define the size of the task queue
    private static final int QUEUE_SIZE = 10;
    private static final int NUM_WORKERS = 3;

    // Create the shared task queue
    private static final BlockingQueue<Integer> taskQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);

    public static void main(String[] args) {
        // Create and start worker threads
        for (int i = 0; i < NUM_WORKERS; i++) {
            new Thread(new Worker(taskQueue)).start();
        }

        // Create and start a thread to add tasks to the queue
        new Thread(new TaskGenerator(taskQueue)).start();
    }
}

// TaskGenerator class that adds tasks to the queue
class TaskGenerator implements Runnable {
    private final BlockingQueue<Integer> queue;
    private int taskId = 0;

    public TaskGenerator(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                queue.put(generateTask());
                System.out.println("Task Added: " + taskId);
                taskId++;
                Thread.sleep(300); // Simulate time taken to generate a task
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private int generateTask() {
        return taskId;
    }
}

// Worker class that processes tasks from the queue
class Worker implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Worker(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Integer task = queue.take();
                processTask(task);
                System.out.println("Task Processed: " + task);
                Thread.sleep(500); // Simulate time taken to process a task
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void processTask(Integer task) {
        // Simulate task processing
        System.out.println("Processing task: " + task);
    }
}
/*
C:\>javac ArrayBlockQueueDemo.java

C:\>java ArrayBlockQueueDemo
Processing task: 0
Task Processed: 0
Task Added: 0
Task Added: 1
Processing task: 1
Task Processed: 1
Task Added: 2
Processing task: 2
Task Processed: 2
Task Added: 3
Processing task: 3
Task Processed: 3
Task Added: 4
Processing task: 4
Task Processed: 4
Task Added: 5
Processing task: 5
Task Processed: 5
Task Added: 6
Processing task: 6
Task Processed: 6
Task Added: 7
Processing task: 7
Task Processed: 7
Task Added: 8
Processing task: 8
Task Processed: 8
Task Added: 9
Processing task: 9
Task Processed: 9
Task Added: 10
Processing task: 10
Task Processed: 10
Task Added: 11
Processing task: 11
Task Processed: 11
Task Added: 12
Processing task: 12
Task Processed: 12
Task Added: 13
Processing task: 13
Task Processed: 13
Task Added: 14
Processing task: 14
Task Processed: 14
Task Added: 15
Processing task: 15
Task Processed: 15
Task Added: 16
Processing task: 16
Task Processed: 16
Task Added: 17
Processing task: 17
Task Processed: 17
Task Added: 18
Processing task: 18
Task Processed: 18
Processing task: 19
Task Added: 19
Task Processed: 19
Task Added: 20
Processing task: 20
Task Processed: 20
Task Added: 21
Processing task: 21
Task Processed: 21
Task Added: 22
Processing task: 22
Task Processed: 22
Task Added: 23
Processing task: 23
Task Processed: 23
Task Added: 24
Processing task: 24
Task Processed: 24
Task Added: 25
Processing task: 25
Task Processed: 25
Task Added: 26
Processing task: 26
Task Processed: 26
Task Added: 27
Processing task: 27
Task Processed: 27
Task Added: 28
Processing task: 28
Task Processed: 28
Task Added: 29
Processing task: 29
Task Processed: 29
Task Added: 30
Processing task: 30
Task Processed: 30
Task Added: 31
Processing task: 31
Task Processed: 31
Task Added: 32
Processing task: 32
Task Processed: 32
Task Added: 33
Processing task: 33
Task Processed: 33
Task Added: 34
Processing task: 34
Task Processed: 34
Task Added: 35
Processing task: 35
Task Processed: 35
Task Added: 36
Processing task: 36
Task Processed: 36
Task Added: 37
Processing task: 37
Task Processed: 37
Task Added: 38
Processing task: 38
Task Processed: 38
Processing task: 39
Task Added: 39
Task Processed: 39
Processing task: 40
Task Added: 40
Task Processed: 40
Task Added: 41
Processing task: 41
Task Processed: 41
Processing task: 42
Task Processed: 42
Task Added: 42
Task Added: 43
Processing task: 43
Task Processed: 43
Processing task: 44
Task Added: 44
Task Processed: 44
Task Added: 45
Processing task: 45
Task Processed: 45
Processing task: 46
Task Processed: 46
Task Added: 46
Task Added: 47
Processing task: 47
Task Processed: 47
Processing task: 48
Task Added: 48
Task Processed: 48
Task Added: 49
Processing task: 49
Task Processed: 49
Task Added: 50
Processing task: 50
Task Processed: 50
Task Added: 51
Processing task: 51
Task Processed: 51
Task Added: 52
Processing task: 52
*/

LinkedBlockingQueue

The LinkedBlockingQueue is backed by a linked node structure and can optionally have a bounded capacity (if specified). If no capacity is specified, LinkedBlockingQueue has an Integer.MAX_VALUE capacity. The operations provided by LinkedBlockingQueue are similar to ArrayBlockingQueue, but it can dynamically grow as needed if it’s unbounded.

//LinkedBlockingQueueDemo.java
import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueDemo {

    // Define the size of the blocking queue
    private static final int QUEUE_CAPACITY = 10;

    // Create the shared blocking queue
    private static final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

    public static void main(String[] args) {
        // Create and start producer and consumer threads
        Thread producerThread = new Thread(new Producer(queue));
        Thread consumerThread = new Thread(new Consumer(queue));

        producerThread.start();
        consumerThread.start();
    }

    // Producer class that puts items into the queue
    static class Producer implements Runnable {
        private final LinkedBlockingQueue<Integer> queue;
        private int item = 0;

        public Producer(LinkedBlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    int producedItem = produce();
                    queue.put(producedItem);  // Add item to queue, blocks if queue is full
                    System.out.println("Produced: " + producedItem);

                    // Notify consumers if needed - not strictly necessary with LinkedBlockingQueue
                    synchronized (queue) {
                        queue.notifyAll();
                    }

                    Thread.sleep(500);  // Simulate time taken to produce an item
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private int produce() {
            return item++;
        }
    }

    // Consumer class that takes items from the queue
    static class Consumer implements Runnable {
        private final LinkedBlockingQueue<Integer> queue;

        public Consumer(LinkedBlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Integer item = queue.take();  // Remove item from queue, blocks if queue is empty
                    System.out.println("Consumed: " + item);
                    process(item);

                    // Notify producer if needed - not strictly necessary with LinkedBlockingQueue
                    synchronized (queue) {
                        queue.notifyAll();
                    }

                    Thread.sleep(1000);  // Simulate time taken to consume an item
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private void process(Integer item) {
            // Simulate processing the item
            System.out.println("Processing item: " + item);
        }
    }
}
/*
:\Users\AITS_CCF\Desktop\Bhava Advanced Concurrency\Bhava Edited\Thread Communication\04>javac LinkedBlockingQueueDemo.java

C:\Users\AITS_CCF\Desktop\Bhava Advanced Concurrency\Bhava Edited\Thread Communication\04>java LinkedBlockingQueueDemo
Consumed: 0
Processing item: 0
Produced: 0
Produced: 1
Consumed: 1
Processing item: 1
Produced: 2
Produced: 3
Consumed: 2
Processing item: 2
Produced: 4
Produced: 5
Consumed: 3
Processing item: 3
Produced: 6
Produced: 7
Consumed: 4
Processing item: 4
Produced: 8
Produced: 9
Consumed: 5
Processing item: 5
Produced: 10
Produced: 11
Consumed: 6
Processing item: 6
Produced: 12
Produced: 13
Consumed: 7
Processing item: 7
Produced: 14
Produced: 15
Consumed: 8
Processing item: 8
Produced: 16
Produced: 17
Consumed: 9
Processing item: 9
Produced: 18
Produced: 19
Consumed: 10
Processing item: 10
Produced: 20
Consumed: 11
Produced: 21
Processing item: 11
Consumed: 12
Produced: 22
Processing item: 12
Produced: 23
Consumed: 13
Processing item: 13
Consumed: 14
Produced: 24
Processing item: 14
Produced: 25
Consumed: 15
Processing item: 15
Consumed: 16
Produced: 26
Processing item: 16
Consumed: 17
Produced: 27
Processing item: 17
Consumed: 18
Produced: 28
*/

PriorityBlockingQueue

PriorityBlockingQueue is an unbounded blocking queue that uses a priority heap to order its elements. Elements are ordered according to their natural ordering (if they implement Comparable) or by a specified comparator provided at queue construction time.

We demonstrate a PriorityBlockingQueue where elements are custom objects (Task objects) that implement the Comparable interface to define their priority order based on their priority value.

//PriorityBlockingQueueDemo.java
import java.util.concurrent.PriorityBlockingQueue;

class Task implements Comparable<Task> {
    private String name;
    private int priority;
    
    // Special poison pill task to signal consumer to stop
    public static final Task POISON_PILL = new Task("POISON_PILL", Integer.MAX_VALUE);

    public Task(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }

    public String getName() {
        return name;
    }

    public int getPriority() {
        return priority;
    }

    @Override
    public int compareTo(Task other) {
        return Integer.compare(this.priority, other.priority);
    }
}

public class PriorityBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();

        // Producer thread adding tasks with different priorities
        Thread producer = new Thread(() -> {
            try {
                queue.put(new Task("Task 1", 3)); // Lowest priority
                queue.put(new Task("Task 2", 1)); // Highest priority
                queue.put(new Task("Task 3", 2)); // Medium priority
                queue.put(Task.POISON_PILL); // Poison pill to signal consumer to stop
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // Consumer thread processing tasks in priority order
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Task task = queue.take(); // Block until a task is available
                    if (task == Task.POISON_PILL) {
                        System.out.println("Received poison pill. Consumer is shutting down.");
                        break; // Exit loop once poison pill is received
                    }
                    System.out.println("Processing task: " + task.getName() + " with priority " + task.getPriority());
                    Thread.sleep(1000); // Simulate processing time
                }
            } catch (InterruptedException e) {
                System.out.println("Consumer thread interrupted.");
            }
        });

        producer.start();
        consumer.start();

        // Let the producer and consumer threads finish
        producer.join();
        consumer.join();
    }
}

/*

C:\>javac PriorityBlockingQueueDemo.java

C:\>java PriorityBlockingQueueDemo
Processing task: Task 2 with priority 1
Processing task: Task 3 with priority 2
Processing task: Task 1 with priority 3
Received poison pill. Consumer is shutting down.
*/

DelayQueue

DelayQueue is an unbounded blocking queue of elements that implement the Delayed interface. Elements can only be taken from the queue when their delay has expired.

//DelayQueueDemo.java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Example class representing a delayed element
class DelayedElement implements Delayed {
    private String name;
    private long delayTime; // Delay period in milliseconds
    private long expireTime; // Expiration time in milliseconds

    public DelayedElement(String name, long delayTime) {
        this.name = name;
        this.delayTime = delayTime;
        this.expireTime = System.currentTimeMillis() + delayTime;
    }

    public String getName() {
        return name;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = expireTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        long diff = this.getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS);
        return Long.compare(diff, 0);
    }
}

public class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedElement> queue = new DelayQueue<>();

        // Producer thread adding delayed elements
        Thread producer = new Thread(() -> {
            try {
                queue.put(new DelayedElement("Delayed Element 1", 2000)); // Delay of 2 seconds
                queue.put(new DelayedElement("Delayed Element 2", 4000)); // Delay of 4 seconds
                queue.put(new DelayedElement("Delayed Element 3", 6000)); // Delay of 6 seconds
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // Consumer thread processing delayed elements
        Thread consumer = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    DelayedElement element = queue.take(); // This will block until a delayed element's delay has expired
                    System.out.println("Processing delayed element: " + element.getName());
                }
            } catch (InterruptedException e) {
                System.out.println("Consumer thread interrupted, exiting...");
            }
        });

        producer.start();
        consumer.start();

        // Let the consumer run for some time
        Thread.sleep(8000);
        consumer.interrupt(); // Interrupt the consumer to stop the example gracefully

        producer.join();
        consumer.join();
    }
}


/*
C:\>javac DelayQueueDemo.java

C:\>java DelayQueueDemo
Processing delayed element: Delayed Element 1
Processing delayed element: Delayed Element 2
Processing delayed element: Delayed Element 3
Consumer thread interrupted, exiting...

*/

SynchronousQueue

ynchronousQueue is a blocking queue where each put operation must wait for a corresponding take operation by another thread, and vice versa. It has a size of zero, meaning each insert operation must wait for a corresponding remove operation by another thread, and vice versa.

import java.util.concurrent.SynchronousQueue;
//SynchronousQueueDemo.java
public class SynchronousQueueDemo{
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();

        // Producer thread putting data into the queue
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    System.out.println("Producing: " + i);
                    queue.put(i); // This will block until a consumer takes the data
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // Consumer thread taking data from the queue
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    int value = queue.take(); // This will block until a producer puts data
                    System.out.println("Consuming: " + value);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();

        producer.join();
        consumer.join();
    }
}

/*
C:\>javac SynchronousQueueDemo.java

C:\>java SynchronousQueueDemo
Producing: 1
Producing: 2
Consuming: 1
Producing: 3
Consuming: 2
Producing: 4
Consuming: 3
Producing: 5
Consuming: 4
Consuming: 5
*/

Blocking queues provide a clean, efficient, and thread-safe mechanism for coordinating communication between producer and consumer threads. By encapsulating complex synchronization logic, they eliminate the need for manual wait() and notify() calls, reducing the risk of common multithreading issues like deadlocks, race conditions, and missed signals. With various implementations such as ArrayBlockingQueue and LinkedBlockingQueue, they offer flexibility and performance tailored to different application needs. Overall, blocking queues are a powerful tool in Java’s concurrency framework, promoting safe and scalable concurrent programming.

Scroll to Top