BlockingQueue is a thread-safe queue designed to handle producer-consumer scenarios efficiently without the need for manual synchronization using wait()
or notify()
. It belongs to the java.util.concurrent
package and provides built-in blocking behavior for adding and removing elements.
Important Features
-
Thread-safe: Automatically handles synchronization internally.
-
Blocking operations:
-
put(E e)
: Waits if the queue is full. -
take()
: Waits if the queue is empty.
-
-
Avoids explicit use of
wait()
/notify()
.
Java provides several implementations of blocking queues in the java.util.concurrent package, which are designed to be used in concurrent and multi-threaded environments.
Common Implementations
-
ArrayBlockingQueue – Bounded queue backed by an array.
-
LinkedBlockingQueue – Optionally bounded queue backed by linked nodes.
-
PriorityBlockingQueue – Orders elements based on priority.
-
DelayQueue – Delays elements until a delay has expired.
-
SynchronousQueue – No internal capacity; transfer occurs only when another thread is ready to receive.
Example Syntax
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// Producer
queue.put("item"); // waits if full
// Consumer
String item = queue.take(); // waits if empty
Code language: JavaScript (javascript)
ArrayBlockingQueue
The ArrayBlockingQueue is backed by an array and has a fixed capacity specified when it is created. Once created, the capacity cannot be changed. The key operations provided by ArrayBlockingQueue include:
- put(E e): Adds the specified element to the queue if space is available, or waits until space is available.
- take(): Retrieves and removes the head of the queue, waiting if necessary until an element becomes available.
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; class ArrayBlockQueueDemo { // Define the size of the task queue private static final int QUEUE_SIZE = 10; private static final int NUM_WORKERS = 3; // Create the shared task queue private static final BlockingQueue<Integer> taskQueue = new ArrayBlockingQueue<>(QUEUE_SIZE); public static void main(String[] args) { // Create and start worker threads for (int i = 0; i < NUM_WORKERS; i++) { new Thread(new Worker(taskQueue)).start(); } // Create and start a thread to add tasks to the queue new Thread(new TaskGenerator(taskQueue)).start(); } } // TaskGenerator class that adds tasks to the queue class TaskGenerator implements Runnable { private final BlockingQueue<Integer> queue; private int taskId = 0; public TaskGenerator(BlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { try { while (true) { queue.put(generateTask()); System.out.println("Task Added: " + taskId); taskId++; Thread.sleep(300); // Simulate time taken to generate a task } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private int generateTask() { return taskId; } } // Worker class that processes tasks from the queue class Worker implements Runnable { private final BlockingQueue<Integer> queue; public Worker(BlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { try { while (true) { Integer task = queue.take(); processTask(task); System.out.println("Task Processed: " + task); Thread.sleep(500); // Simulate time taken to process a task } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void processTask(Integer task) { // Simulate task processing System.out.println("Processing task: " + task); } } /* C:\>javac ArrayBlockQueueDemo.java C:\>java ArrayBlockQueueDemo Processing task: 0 Task Processed: 0 Task Added: 0 Task Added: 1 Processing task: 1 Task Processed: 1 Task Added: 2 Processing task: 2 Task Processed: 2 Task Added: 3 Processing task: 3 Task Processed: 3 Task Added: 4 Processing task: 4 Task Processed: 4 Task Added: 5 Processing task: 5 Task Processed: 5 Task Added: 6 Processing task: 6 Task Processed: 6 Task Added: 7 Processing task: 7 Task Processed: 7 Task Added: 8 Processing task: 8 Task Processed: 8 Task Added: 9 Processing task: 9 Task Processed: 9 Task Added: 10 Processing task: 10 Task Processed: 10 Task Added: 11 Processing task: 11 Task Processed: 11 Task Added: 12 Processing task: 12 Task Processed: 12 Task Added: 13 Processing task: 13 Task Processed: 13 Task Added: 14 Processing task: 14 Task Processed: 14 Task Added: 15 Processing task: 15 Task Processed: 15 Task Added: 16 Processing task: 16 Task Processed: 16 Task Added: 17 Processing task: 17 Task Processed: 17 Task Added: 18 Processing task: 18 Task Processed: 18 Processing task: 19 Task Added: 19 Task Processed: 19 Task Added: 20 Processing task: 20 Task Processed: 20 Task Added: 21 Processing task: 21 Task Processed: 21 Task Added: 22 Processing task: 22 Task Processed: 22 Task Added: 23 Processing task: 23 Task Processed: 23 Task Added: 24 Processing task: 24 Task Processed: 24 Task Added: 25 Processing task: 25 Task Processed: 25 Task Added: 26 Processing task: 26 Task Processed: 26 Task Added: 27 Processing task: 27 Task Processed: 27 Task Added: 28 Processing task: 28 Task Processed: 28 Task Added: 29 Processing task: 29 Task Processed: 29 Task Added: 30 Processing task: 30 Task Processed: 30 Task Added: 31 Processing task: 31 Task Processed: 31 Task Added: 32 Processing task: 32 Task Processed: 32 Task Added: 33 Processing task: 33 Task Processed: 33 Task Added: 34 Processing task: 34 Task Processed: 34 Task Added: 35 Processing task: 35 Task Processed: 35 Task Added: 36 Processing task: 36 Task Processed: 36 Task Added: 37 Processing task: 37 Task Processed: 37 Task Added: 38 Processing task: 38 Task Processed: 38 Processing task: 39 Task Added: 39 Task Processed: 39 Processing task: 40 Task Added: 40 Task Processed: 40 Task Added: 41 Processing task: 41 Task Processed: 41 Processing task: 42 Task Processed: 42 Task Added: 42 Task Added: 43 Processing task: 43 Task Processed: 43 Processing task: 44 Task Added: 44 Task Processed: 44 Task Added: 45 Processing task: 45 Task Processed: 45 Processing task: 46 Task Processed: 46 Task Added: 46 Task Added: 47 Processing task: 47 Task Processed: 47 Processing task: 48 Task Added: 48 Task Processed: 48 Task Added: 49 Processing task: 49 Task Processed: 49 Task Added: 50 Processing task: 50 Task Processed: 50 Task Added: 51 Processing task: 51 Task Processed: 51 Task Added: 52 Processing task: 52 */
LinkedBlockingQueue
The LinkedBlockingQueue is backed by a linked node structure and can optionally have a bounded capacity (if specified). If no capacity is specified, LinkedBlockingQueue has an Integer.MAX_VALUE capacity. The operations provided by LinkedBlockingQueue are similar to ArrayBlockingQueue, but it can dynamically grow as needed if it’s unbounded.
//LinkedBlockingQueueDemo.java import java.util.concurrent.LinkedBlockingQueue; public class LinkedBlockingQueueDemo { // Define the size of the blocking queue private static final int QUEUE_CAPACITY = 10; // Create the shared blocking queue private static final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); public static void main(String[] args) { // Create and start producer and consumer threads Thread producerThread = new Thread(new Producer(queue)); Thread consumerThread = new Thread(new Consumer(queue)); producerThread.start(); consumerThread.start(); } // Producer class that puts items into the queue static class Producer implements Runnable { private final LinkedBlockingQueue<Integer> queue; private int item = 0; public Producer(LinkedBlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { try { while (true) { int producedItem = produce(); queue.put(producedItem); // Add item to queue, blocks if queue is full System.out.println("Produced: " + producedItem); // Notify consumers if needed - not strictly necessary with LinkedBlockingQueue synchronized (queue) { queue.notifyAll(); } Thread.sleep(500); // Simulate time taken to produce an item } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private int produce() { return item++; } } // Consumer class that takes items from the queue static class Consumer implements Runnable { private final LinkedBlockingQueue<Integer> queue; public Consumer(LinkedBlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { try { while (true) { Integer item = queue.take(); // Remove item from queue, blocks if queue is empty System.out.println("Consumed: " + item); process(item); // Notify producer if needed - not strictly necessary with LinkedBlockingQueue synchronized (queue) { queue.notifyAll(); } Thread.sleep(1000); // Simulate time taken to consume an item } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void process(Integer item) { // Simulate processing the item System.out.println("Processing item: " + item); } } } /* :\Users\AITS_CCF\Desktop\Bhava Advanced Concurrency\Bhava Edited\Thread Communication\04>javac LinkedBlockingQueueDemo.java C:\Users\AITS_CCF\Desktop\Bhava Advanced Concurrency\Bhava Edited\Thread Communication\04>java LinkedBlockingQueueDemo Consumed: 0 Processing item: 0 Produced: 0 Produced: 1 Consumed: 1 Processing item: 1 Produced: 2 Produced: 3 Consumed: 2 Processing item: 2 Produced: 4 Produced: 5 Consumed: 3 Processing item: 3 Produced: 6 Produced: 7 Consumed: 4 Processing item: 4 Produced: 8 Produced: 9 Consumed: 5 Processing item: 5 Produced: 10 Produced: 11 Consumed: 6 Processing item: 6 Produced: 12 Produced: 13 Consumed: 7 Processing item: 7 Produced: 14 Produced: 15 Consumed: 8 Processing item: 8 Produced: 16 Produced: 17 Consumed: 9 Processing item: 9 Produced: 18 Produced: 19 Consumed: 10 Processing item: 10 Produced: 20 Consumed: 11 Produced: 21 Processing item: 11 Consumed: 12 Produced: 22 Processing item: 12 Produced: 23 Consumed: 13 Processing item: 13 Consumed: 14 Produced: 24 Processing item: 14 Produced: 25 Consumed: 15 Processing item: 15 Consumed: 16 Produced: 26 Processing item: 16 Consumed: 17 Produced: 27 Processing item: 17 Consumed: 18 Produced: 28 */
PriorityBlockingQueue
PriorityBlockingQueue
is an unbounded blocking queue that uses a priority heap to order its elements. Elements are ordered according to their natural ordering (if they implement Comparable
) or by a specified comparator provided at queue construction time.
We demonstrate a PriorityBlockingQueue
where elements are custom objects (Task
objects) that implement the Comparable
interface to define their priority order based on their priority value.
//PriorityBlockingQueueDemo.java import java.util.concurrent.PriorityBlockingQueue; class Task implements Comparable<Task> { private String name; private int priority; // Special poison pill task to signal consumer to stop public static final Task POISON_PILL = new Task("POISON_PILL", Integer.MAX_VALUE); public Task(String name, int priority) { this.name = name; this.priority = priority; } public String getName() { return name; } public int getPriority() { return priority; } @Override public int compareTo(Task other) { return Integer.compare(this.priority, other.priority); } } public class PriorityBlockingQueueDemo { public static void main(String[] args) throws InterruptedException { PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(); // Producer thread adding tasks with different priorities Thread producer = new Thread(() -> { try { queue.put(new Task("Task 1", 3)); // Lowest priority queue.put(new Task("Task 2", 1)); // Highest priority queue.put(new Task("Task 3", 2)); // Medium priority queue.put(Task.POISON_PILL); // Poison pill to signal consumer to stop } catch (Exception e) { e.printStackTrace(); } }); // Consumer thread processing tasks in priority order Thread consumer = new Thread(() -> { try { while (true) { Task task = queue.take(); // Block until a task is available if (task == Task.POISON_PILL) { System.out.println("Received poison pill. Consumer is shutting down."); break; // Exit loop once poison pill is received } System.out.println("Processing task: " + task.getName() + " with priority " + task.getPriority()); Thread.sleep(1000); // Simulate processing time } } catch (InterruptedException e) { System.out.println("Consumer thread interrupted."); } }); producer.start(); consumer.start(); // Let the producer and consumer threads finish producer.join(); consumer.join(); } } /* C:\>javac PriorityBlockingQueueDemo.java C:\>java PriorityBlockingQueueDemo Processing task: Task 2 with priority 1 Processing task: Task 3 with priority 2 Processing task: Task 1 with priority 3 Received poison pill. Consumer is shutting down. */
DelayQueue
DelayQueue
is an unbounded blocking queue of elements that implement the Delayed
interface. Elements can only be taken from the queue when their delay has expired.
//DelayQueueDemo.java import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; // Example class representing a delayed element class DelayedElement implements Delayed { private String name; private long delayTime; // Delay period in milliseconds private long expireTime; // Expiration time in milliseconds public DelayedElement(String name, long delayTime) { this.name = name; this.delayTime = delayTime; this.expireTime = System.currentTimeMillis() + delayTime; } public String getName() { return name; } @Override public long getDelay(TimeUnit unit) { long diff = expireTime - System.currentTimeMillis(); return unit.convert(diff, TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed other) { long diff = this.getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS); return Long.compare(diff, 0); } } public class DelayQueueDemo { public static void main(String[] args) throws InterruptedException { DelayQueue<DelayedElement> queue = new DelayQueue<>(); // Producer thread adding delayed elements Thread producer = new Thread(() -> { try { queue.put(new DelayedElement("Delayed Element 1", 2000)); // Delay of 2 seconds queue.put(new DelayedElement("Delayed Element 2", 4000)); // Delay of 4 seconds queue.put(new DelayedElement("Delayed Element 3", 6000)); // Delay of 6 seconds } catch (Exception e) { e.printStackTrace(); } }); // Consumer thread processing delayed elements Thread consumer = new Thread(() -> { try { while (!Thread.currentThread().isInterrupted()) { DelayedElement element = queue.take(); // This will block until a delayed element's delay has expired System.out.println("Processing delayed element: " + element.getName()); } } catch (InterruptedException e) { System.out.println("Consumer thread interrupted, exiting..."); } }); producer.start(); consumer.start(); // Let the consumer run for some time Thread.sleep(8000); consumer.interrupt(); // Interrupt the consumer to stop the example gracefully producer.join(); consumer.join(); } } /* C:\>javac DelayQueueDemo.java C:\>java DelayQueueDemo Processing delayed element: Delayed Element 1 Processing delayed element: Delayed Element 2 Processing delayed element: Delayed Element 3 Consumer thread interrupted, exiting... */
SynchronousQueue
ynchronousQueue
is a blocking queue where each put
operation must wait for a corresponding take
operation by another thread, and vice versa. It has a size of zero, meaning each insert operation must wait for a corresponding remove operation by another thread, and vice versa.
import java.util.concurrent.SynchronousQueue; //SynchronousQueueDemo.java public class SynchronousQueueDemo{ public static void main(String[] args) throws InterruptedException { SynchronousQueue<Integer> queue = new SynchronousQueue<>(); // Producer thread putting data into the queue Thread producer = new Thread(() -> { try { for (int i = 1; i <= 5; i++) { System.out.println("Producing: " + i); queue.put(i); // This will block until a consumer takes the data } } catch (InterruptedException e) { e.printStackTrace(); } }); // Consumer thread taking data from the queue Thread consumer = new Thread(() -> { try { for (int i = 1; i <= 5; i++) { int value = queue.take(); // This will block until a producer puts data System.out.println("Consuming: " + value); } } catch (InterruptedException e) { e.printStackTrace(); } }); producer.start(); consumer.start(); producer.join(); consumer.join(); } } /* C:\>javac SynchronousQueueDemo.java C:\>java SynchronousQueueDemo Producing: 1 Producing: 2 Consuming: 1 Producing: 3 Consuming: 2 Producing: 4 Consuming: 3 Producing: 5 Consuming: 4 Consuming: 5 */
Blocking queues provide a clean, efficient, and thread-safe mechanism for coordinating communication between producer and consumer threads. By encapsulating complex synchronization logic, they eliminate the need for manual wait()
and notify()
calls, reducing the risk of common multithreading issues like deadlocks, race conditions, and missed signals. With various implementations such as ArrayBlockingQueue
and LinkedBlockingQueue
, they offer flexibility and performance tailored to different application needs. Overall, blocking queues are a powerful tool in Java’s concurrency framework, promoting safe and scalable concurrent programming.