The BlockingQueue interface was introduced in Java 1.5. It supports flow control by blocking: a thread can wait when the queue is full or empty instead of failing. Implementations of BlockingQueue are thread-safe, and the individual queuing methods achieve their effects atomically using internal locks or other forms of concurrency control. The bulk Collection operations (addAll, containsAll, retainAll and removeAll) are not necessarily atomic unless an implementation says otherwise.
A thread that calls put() on a full queue is blocked until some other thread makes space, either by dequeuing one or more elements or by clearing the queue. Similarly, a thread that calls take() on an empty queue is blocked until some other thread inserts an item.
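Blocking is a property of put() and take(); the interface also has non-blocking counterparts that fail immediately instead of waiting. The following is a minimal sketch of that difference, assuming a capacity-1 ArrayBlockingQueue. The class name FlowControlDemo and its helper methods are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; names and the capacity of 1 are illustrative assumptions.
class FlowControlDemo {

    // offer() on a full queue returns false immediately instead of blocking.
    static boolean offerWhenFull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");              // fills the only slot
        return queue.offer("second");
    }

    // poll() on an empty queue returns null immediately instead of blocking.
    static String pollWhenEmpty() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        return queue.poll();
    }

    // add() on a full queue throws IllegalStateException instead of blocking.
    static boolean addWhenFullThrows() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");
        try {
            queue.add("second");
            return false;
        } catch (IllegalStateException queueFull) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("offer on full queue accepted: " + offerWhenFull());
        System.out.println("poll on empty queue returned: " + pollWhenEmpty());
        System.out.println("add on full queue threw:      " + addWhenFullThrows());
    }
}
```

Which variant to use depends on whether the caller can afford to wait: put() and take() suit dedicated producer and consumer threads, while offer() and poll() suit callers that must stay responsive.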
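BlockingQueue does not accept null elements: add(), put() and offer() throw a NullPointerException when passed null. null is reserved as the return value of poll() to signal that no element was available.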
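As a quick check of this rule, the sketch below offers null to two implementations and reports whether it was rejected. NullRejectionDemo and rejectsNull are hypothetical names used only for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class NullRejectionDemo {

    // Returns true if the queue refuses a null element with a NullPointerException.
    static boolean rejectsNull(BlockingQueue<String> queue) {
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("LinkedBlockingQueue rejects null: "
                + rejectsNull(new LinkedBlockingQueue<>()));
        System.out.println("ArrayBlockingQueue rejects null:  "
                + rejectsNull(new ArrayBlockingQueue<>(4)));
    }
}
```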
The BlockingQueue interface is part of the Java Collections Framework and lives in the java.util.concurrent package, along with other concurrency utility classes such as ConcurrentHashMap, Semaphore (a counting semaphore) and CopyOnWriteArrayList.
Java provides several BlockingQueue implementations such as:
- LinkedBlockingQueue
- ArrayBlockingQueue
- PriorityBlockingQueue
- SynchronousQueue
Let's look at each of these implementations with an example.
LinkedBlockingQueue
The LinkedBlockingQueue keeps its elements in a linked structure (linked nodes) and orders them FIFO (First In, First Out). It is optionally bounded: a capacity can be passed to the constructor, and if none is specified, Integer.MAX_VALUE is used as the upper bound.
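The bound can be observed through remainingCapacity(). This is a small sketch; the class name CapacityDemo and its methods are chosen only for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class CapacityDemo {

    // Free slots of a queue created without a capacity argument.
    static int defaultCapacity() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        return queue.remainingCapacity();
    }

    // Free slots of a bounded queue after a single element was inserted.
    static int remainingAfterOneInsert(int capacity) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        queue.offer("item");
        return queue.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE);
        System.out.println(remainingAfterOneInsert(3));
    }
}
```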
Here is how to instantiate and use a LinkedBlockingQueue. The BlockingQueueExample class in the example below starts a Producer and a Consumer in separate threads. The Producer inserts strings into the BlockingQueue, and the Consumer takes them out.
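import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) throws Exception {
        // No capacity argument: the bound defaults to Integer.MAX_VALUE.
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
        // Capacity of 1024: put() blocks once 1024 elements are queued.
        BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1024);

        new Thread(new Producer(bounded)).start();
        new Thread(new Consumer(bounded)).start();

        // Pause the main thread; the JVM stays alive until both worker threads finish.
        Thread.sleep(4000);
    }
}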
The Producer class inserts strings into the BlockingQueue:
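import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
    private final BlockingQueue<String> queue;

    public Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            queue.put("Item #1"); // put() blocks while the queue is full
            Thread.sleep(1000);
            queue.put("Item #2");
            Thread.sleep(1000);
            queue.put("Item #3");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
        }
    }
}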
The Consumer class takes the strings that the Producer class inserted into the BlockingQueue:
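import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private final BlockingQueue<String> queue;

    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            // take() blocks while the queue is empty.
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
        }
    }
}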
ArrayBlockingQueue
ArrayBlockingQueue is a bounded blocking queue that, like LinkedBlockingQueue, orders its elements FIFO (First In, First Out), but it stores them internally in an array. The upper bound must be set at instantiation time, and once the queue is instantiated it cannot be changed.
Here is how to instantiate and use an ArrayBlockingQueue:
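// put() and take() throw InterruptedException, so the enclosing method must declare or handle it.
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
queue.put("ArrayBlockingQueue item: #1");
System.out.println(queue.take());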
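To see the bound in action, the sketch below pushes several integers through an ArrayBlockingQueue with a capacity of 1, so the producer has to wait for the consumer before each further put() can complete. BoundedTransferDemo, transfer and the capacity of 1 are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the JDK.
class BoundedTransferDemo {

    // Sends 1..count through a single-slot queue and returns them in arrival order.
    static List<Integer> transfer(int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks while the single slot is occupied
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take()); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(3));
    }
}
```

With a single producer, the FIFO ordering means the consumer receives the numbers in the order they were put.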
PriorityBlockingQueue
Unlike LinkedBlockingQueue and ArrayBlockingQueue, PriorityBlockingQueue is an unbounded concurrent queue, so put() never blocks; only take() blocks, when the queue is empty. It follows the same ordering rules as the java.util.PriorityQueue class, and we cannot insert null into this queue.
Elements inserted into the PriorityBlockingQueue must either implement the java.lang.Comparable interface or be ordered by a java.util.Comparator passed to the constructor. The elements are then ordered according to whatever priority we define in that Comparable or Comparator implementation.
Note that the PriorityBlockingQueue does not enforce any specific behaviour for elements that have equal priority (compareTo() == 0 or compare() == 0). Also, an Iterator obtained from a PriorityBlockingQueue is not guaranteed to traverse the elements in priority order.
Here is how to instantiate and use a PriorityBlockingQueue:
// The String class implements java.lang.Comparable, so no Comparator is needed.
BlockingQueue<String> queue = new PriorityBlockingQueue<>();
queue.put("PriorityBlockingQueue item: #1");
System.out.println(queue.take());
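The single-element example above cannot show any ordering, so here is a sketch that inserts several strings and drains them by priority. It assumes a Comparator that orders by string length; PriorityOrderDemo and drainByLength are hypothetical names, and 11 is simply passed as the initial capacity.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class PriorityOrderDemo {

    // Inserts the words in the given order and removes them shortest first.
    static List<String> drainByLength(String... words) {
        Comparator<String> byLength = Comparator.comparingInt(String::length);
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(11, byLength);

        for (String word : words) {
            queue.add(word); // never blocks: the queue is unbounded
        }

        List<String> drained = new ArrayList<>();
        String head;
        while ((head = queue.poll()) != null) { // poll() returns the current minimum
            drained.add(head);
        }
        return drained;
    }

    public static void main(String[] args) {
        System.out.println(drainByLength("ccc", "a", "bb"));
    }
}
```

The words deliberately have distinct lengths, because the relative order of elements with equal priority is not specified.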
SynchronousQueue
The SynchronousQueue is a queue with no internal capacity, not even for a single element: each insert operation must wait for a corresponding remove operation by another thread, and vice versa. Its main operations are put() and take(). The other queue methods exist but behave as if the queue were always empty; for example, peek() always returns null and size() always returns 0.
Both put() and take() are blocking, i.e., a thread that tries to insert an element is blocked until another thread takes that element from the queue. Likewise, a thread that tries to take an element is blocked until another thread inserts one.
Although the SynchronousQueue has the interface of a queue, we should think of it as an exchange point for a single element between two threads.
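The hand-off can be sketched as follows, with one thread calling put() and another calling take(). HandoffDemo and its methods are hypothetical names used only for this illustration.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the JDK.
class HandoffDemo {

    // Without a waiting consumer, the non-blocking offer() has nowhere to put the element.
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("nobody is waiting");
    }

    // Hands one message from a producer thread to the calling thread.
    static String handOff(String message) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        Thread producer = new Thread(() -> {
            try {
                queue.put(message); // blocks until the consumer calls take()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            String received = queue.take(); // blocks until the producer calls put()
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("offer without consumer accepted: " + offerWithoutConsumer());
        System.out.println("received: " + handOff("hello"));
    }
}
```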