BlockingQueue in Java
Introduction
A BlockingQueue in Java is a specialized queue that supports operations that wait for the queue to become non-empty when retrieving elements and wait for space to become available when storing elements. This makes it especially useful in multi-threaded environments for producer-consumer scenarios.
Java provides several implementations of BlockingQueue in the java.util.concurrent
package, allowing developers to efficiently manage concurrent data flow.
Why Use BlockingQueue?
Traditional queues like LinkedList
or ArrayDeque
do not provide built-in blocking behavior, leading to potential concurrency issues. BlockingQueue helps by:
-
Handling Synchronization Automatically: No need to manually implement synchronization.
-
Preventing Data Loss: Ensures proper handling of producer-consumer scenarios.
-
Avoiding CPU Wastage: Threads do not have to actively poll; they block until conditions are met.
-
Supporting Capacity Limits: Helps in avoiding memory overflow by limiting queue size.
Types of BlockingQueue in Java
Java provides different implementations of BlockingQueue
, each serving distinct purposes:
1. ArrayBlockingQueue
-
A bounded queue backed by an array.
-
Uses a fixed-size queue.
-
Useful for cases where the queue size is known beforehand.
import java.util.concurrent.*;
public class ArrayBlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
queue.put(1);
queue.put(2);
queue.put(3);
System.out.println("Removed: " + queue.take());
}
}
2. LinkedBlockingQueue
-
An optionally bounded queue.
-
Backed by a linked list.
-
Provides better throughput than
ArrayBlockingQueue
for large-scale tasks.
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
3. PriorityBlockingQueue
-
Unbounded queue with priority ordering.
-
Elements are sorted based on their natural order or a provided comparator.
BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
queue.put(10);
queue.put(5);
queue.put(20);
System.out.println(queue.take()); // Outputs 5
4. DelayQueue
-
Elements are available for retrieval only after a specified delay.
-
Useful in scheduling tasks.
import java.util.concurrent.*;
class DelayedTask implements Delayed {
private final long expiry;
private final String name;
public DelayedTask(String name, long delayInMillis) {
this.name = name;
this.expiry = System.currentTimeMillis() + delayInMillis;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expiry - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.expiry, ((DelayedTask) o).expiry);
}
@Override
public String toString() {
return name;
}
}
public class DelayQueueExample {
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedTask> queue = new DelayQueue<>();
queue.put(new DelayedTask("Task 1", 5000));
queue.put(new DelayedTask("Task 2", 2000));
System.out.println("Waiting for tasks...");
while (!queue.isEmpty()) {
System.out.println("Processing: " + queue.take());
}
}
}
5. SynchronousQueue
-
A queue of size 0; each insert operation must wait for a corresponding remove.
-
Useful for hand-off designs.
BlockingQueue<Integer> queue = new SynchronousQueue<>();
Methods of BlockingQueue
BlockingQueue provides various important methods:
Method | Description |
---|---|
put(E e) |
Inserts an element, waiting if necessary |
take() |
Retrieves and removes the head, waiting if necessary |
offer(E e, long timeout, TimeUnit unit) |
Tries to add an element, waiting for a time |
poll(long timeout, TimeUnit unit) |
Retrieves and removes, waiting for a time |
remainingCapacity() |
Returns the number of additional elements it can accept |
Producer-Consumer Example Using BlockingQueue
A classic producer-consumer example where a producer thread adds elements, and a consumer thread takes them out:
import java.util.concurrent.*;
class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) { this.queue = queue; }
public void run() {
try {
for (int i = 1; i <= 5; i++) {
queue.put(i);
System.out.println("Produced: " + i);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) { this.queue = queue; }
public void run() {
try {
while (true) {
int value = queue.take();
System.out.println("Consumed: " + value);
Thread.sleep(1500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ProducerConsumerExample {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
Conclusion
BlockingQueue is a powerful utility for managing concurrent tasks in Java. It simplifies multi-threaded programming by providing built-in synchronization and blocking mechanisms, making it ideal for scenarios like producer-consumer models, task scheduling, and more.
Using the right implementation of BlockingQueue based on your requirements ensures efficient, scalable, and reliable concurrent processing in Java applications.
Sign up here with your email
ConversionConversion EmoticonEmoticon