HomeJavaUnderstanding BlockingQueue in Java

Understanding BlockingQueue in Java

BlockingQueue interface in Java was first introduced in Java 1.5. BlockingQueue interface supports flow control by introducing blocking if either BlockingQueue is full or empty. All methods of BlockingQueue are atomic in nature and use internal locks or other forms of concurrency control.

Implementation of BlockingQueue is thread-safe, i.e., a thread trying to enqueue an element in a full queue is blocked until some other thread makes space in the queue, either by dequeuing one or more element or clearing the queue completely. Similarly it blocks a thread trying to delete from an empty queue until some other threads inserts an item.

Read More: Multithreading in Java

BlockingQueue does not accept null value.

The BlockingQueue interface is a part of java collections framework “java.util.concurrent” package along with various other concurrent Utility classes like ConcurrentHashMap, Counting Semaphore, CopyOnWriteArrrayList etc.

Java provides several BlockingQueue implementations such as:

  • LinkedBlockingQueue
  • ArrayBlockingQueue
  • PriorityBlockingQueue
  • SynchronousQueue

Now, let’s have a look at each of these specified BlockingQueue implementations with examples to have a clear understanding.

LinkedBlockingQueue

The LinkedBlockingQueue keeps the elements internally in a linked structure (linked nodes) and uses the FIFO (First In, First Out) order. This linked structure can optionally have an upper bound if desired. If no upper bound is specified,Integer.MAX_VALUE is used as the upper bound.

Here is how to instantiate and use a LinkedBlockingQueue:

The BlockingQueueExample class in the below example is starting a Producer and a Consumer in separate threads. The Producer inserts strings into the BlockingQueue, and the Consumer takes them out.

public class BlockingQueueExample {

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
        BlockingQueue<String> bounded   = new LinkedBlockingQueue<String>(1024);
        
        new Thread(new Producer(bounded)).start();
        new Thread(new Consumer(bounded)).start();

        Thread.sleep(4000);
    }
}

Producer class which is inserting Strings in the BlockingQueue.

public class Producer implements Runnable {
    protected BlockingQueue queue = null;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            queue.put("Item #1");
            Thread.sleep(1000);
            queue.put("Item #2");
            Thread.sleep(1000);
            queue.put("Item #3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Consumer class which is consuming the Strings which are inserted in the BlockingQueue via Producer class.

public class Consumer implements Runnable {
    protected BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

ArrayBlockingQueue

ArrayBlockingQueue is a bounded blocking queue similar to LinkedBlockingQueue that stores the elements internally in an array in FIFO (First In, First Out) order. The upper-bound for this BlockingQueue is set at the instantiation time, and once instantiated it cannot be changed.

Here is how to instantiate and use an ArrayBlockingQueue:

BlockingQueue queue = new ArrayBlockingQueue(1024);

queue.put("ArrayBlockingQueue item: #1");

System.out.println(queue.take());

PriorityBlockingQueue

Unlike LinkedBlockingQueue and ArrayBlockingQueue, PriorityBlockingQueue is an unbounded concurrent queue. It follows the same ordering rules as the java.util.PriorityQueue class and we cannot insert null into this queue.

The java.lang.Comparableinterface must be implemented by all elements that are being inserted into the PriorityBlockingQueue. The elements thus order themselves according to whatever priority we decide in our Comparable implementation.

Notice that the PriorityBlockingQueue does not enforce any specific behaviour for elements that have equal priority (compare() == 0). Also in case when we obtain an Iterator from a PriorityBlockingQueue, the Iterator does not guarantee to iterate the elements in priority order.

Here is how to instantiate and use an PriorityBlockingQueue:

BlockingQueue queue = new PriorityBlockingQueue();

//String class implements java.lang.Comparable
queue.put("PriorityBlockingQueue item: #1");

System.out.println(queue.take());

SynchronousQueue

The SynchronousQueue is a queue that can only contain a single element internally and it supports only two operations: put() and take().

Both of these operations are blocking, i.e., if a thread is trying to insert an element into the queue it gets blocked until another thread takes the element from the queue. Likewise, if a thread tries to take an element and no element is currently present, that thread is blocked until a thread insert an element into the queue.

Although the SynchronousQueue has an interface of a queue, we should think about it as an exchange point for a single element between two threads.


RELATED ARTICLES

Most Popular