Transfer Queue

A transfer queue is a special type of BlockingQueue (the TransferQueue interface extends BlockingQueue) in which producers may optionally wait until consumers have received the elements they hand over.

Like other blocking queues, a TransferQueue may be capacity bounded. If so, an attempted transfer operation may initially block waiting for available space, and/or subsequently block waiting for reception by a consumer. Note that in a queue with zero capacity, such as SynchronousQueue, put and transfer are effectively synonymous.

The JDK ships only one built-in TransferQueue implementation: LinkedTransferQueue, an unbounded TransferQueue backed by linked nodes. Note that, unlike in most collections, size() on a LinkedTransferQueue is not a constant-time operation, because it has to traverse the nodes.
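
To see the unbounded behavior in isolation, here is a minimal sketch (the class and method names are made up for illustration). Since a LinkedTransferQueue has no capacity limit, plain put() calls return without waiting for space or for a consumer.

```java
import java.util.concurrent.LinkedTransferQueue;

// Hypothetical demo class, not part of the JDK.
class UnboundedPutSketch {

    // Adds n elements with put() and returns the resulting size.
    static int fillWithPut(int n) {
        // Declared as LinkedTransferQueue: its put() does not declare InterruptedException.
        LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();
        for (int i = 0; i < n; i++) {
            queue.put(i); // never blocks, since the queue is unbounded
        }
        return queue.size(); // traverses the nodes, so this is not O(1)
    }

    public static void main(String[] args) {
        System.out.println(fillWithPut(1000)); // prints 1000
        // An unbounded queue always reports the maximum remaining capacity.
        System.out.println(new LinkedTransferQueue<Integer>().remainingCapacity() == Integer.MAX_VALUE); // prints true
    }
}
```

In other words, put() behaves like an ordinary asynchronous enqueue here; only the transfer family of methods adds the "wait for a consumer" behavior.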

When a producer sends a message to a consumer using the transfer() method, the producer thread stays blocked until the message is consumed.

 void transfer(E e)

If a consumer is already waiting to receive an element (blocked in a call to take() or a timed poll()), the element is handed over and the method call returns immediately. Otherwise the producer thread is blocked until a consumer takes the element out of the queue.
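
The hand-off can be observed with a small sketch like the one below. The class name, method name and the 200 ms wait are arbitrary choices for illustration: a producer thread calls transfer(), the main thread checks that it has not finished, and only then consumes the element.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

// Hypothetical demo class, not part of the JDK.
class TransferHandoffSketch {

    // Returns true if the producer was still inside transfer() before any
    // consumer arrived, and the consumer then received the element.
    static boolean producerWaitsForConsumer() throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();

        Thread producer = new Thread(() -> {
            try {
                queue.transfer("hello"); // returns only after a consumer receives it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");
        producer.start();

        producer.join(200);                        // wait at most 200 ms for the producer
        boolean stillWaiting = producer.isAlive(); // no consumer yet, so transfer() has not returned

        String received = queue.take();            // the consumer arrives
        producer.join();                           // transfer() can now complete

        return stillWaiting && "hello".equals(received);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(producerWaitsForConsumer()); // prints true
    }
}
```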

There are a few variants of the transfer method:

 boolean tryTransfer(E e)

tryTransfer returns immediately with a boolean value indicating whether a consumer was already waiting to receive the element. If it returns true, the element has been handed over to that consumer; if it returns false, the element is not enqueued at all.
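
Both outcomes can be sketched as follows. The class and method names are hypothetical, and the polling loop is just one simple way to wait until the consumer has registered itself with the queue.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

// Hypothetical demo class, not part of the JDK.
class TryTransferSketch {

    // No consumer is waiting: tryTransfer() returns false and nothing is enqueued.
    static boolean rejectedWithoutConsumer() {
        TransferQueue<Integer> queue = new LinkedTransferQueue<>();
        boolean handedOver = queue.tryTransfer(42);
        return !handedOver && queue.isEmpty();
    }

    // A consumer is already waiting in take(): tryTransfer() hands the element over.
    static boolean acceptedWithWaitingConsumer() throws InterruptedException {
        TransferQueue<Integer> queue = new LinkedTransferQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();

        while (!queue.hasWaitingConsumer()) {
            Thread.sleep(1); // poll until the consumer is registered as waiting
        }
        boolean handedOver = queue.tryTransfer(42);
        consumer.join(); // the consumer finishes once it has received the element
        return handedOver;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(rejectedWithoutConsumer());     // prints true
        System.out.println(acceptedWithWaitingConsumer()); // prints true
    }
}
```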

boolean tryTransfer(E e, long timeout, TimeUnit unit)

Similar to the other tryTransfer method, except that it waits up to the specified timeout for a consumer to receive the element. If the timeout elapses first, it returns false and the element is not left in the queue.
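
A rough sketch of the timed variant is shown below. The class name, method names and the concrete timeouts (100 ms, 50 ms, 5 s) are assumptions picked only to keep the demo short.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

// Hypothetical demo class, not part of the JDK.
class TimedTryTransferSketch {

    // Nobody consumes: the call gives up after the timeout and the queue ends up empty.
    static boolean timesOutWithoutConsumer() throws InterruptedException {
        TransferQueue<Integer> queue = new LinkedTransferQueue<>();
        boolean handedOver = queue.tryTransfer(1, 100, TimeUnit.MILLISECONDS);
        return !handedOver && queue.isEmpty();
    }

    // A consumer shows up about 50 ms later, well within the 5 s timeout.
    static boolean succeedsWithLateConsumer() throws InterruptedException {
        TransferQueue<Integer> queue = new LinkedTransferQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(50);
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "late-consumer");
        consumer.start();

        boolean handedOver = queue.tryTransfer(1, 5, TimeUnit.SECONDS);
        consumer.join();
        return handedOver;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(timesOutWithoutConsumer());  // prints true
        System.out.println(succeedsWithLateConsumer()); // prints true
    }
}
```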

Before enqueueing a message, a producer can check whether any consumers are waiting (or how many are currently waiting) to consume the message by invoking the following methods:

boolean hasWaitingConsumer()
int getWaitingConsumerCount()
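
As a quick illustration, the sketch below starts a few consumers, waits until all of them are blocked in take(), and reads both values. The class and method names are hypothetical. Keep in mind that both methods return only a snapshot, which may already be stale by the time the producer acts on it.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

// Hypothetical demo class, not part of the JDK.
class WaitingConsumerSketch {

    // Starts n consumers, waits until all of them are waiting in take(),
    // and returns the count reported at that moment (or -1 on a mismatch).
    static int countWaiting(int n) throws InterruptedException {
        TransferQueue<Integer> queue = new LinkedTransferQueue<>();
        Thread[] consumers = new Thread[n];
        for (int i = 0; i < n; i++) {
            consumers[i] = new Thread(() -> {
                try {
                    queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "consumer-" + i);
            consumers[i].start();
        }

        while (queue.getWaitingConsumerCount() < n) {
            Thread.sleep(1); // poll until every consumer is registered as waiting
        }
        int reported = queue.getWaitingConsumerCount();
        boolean anyWaiting = queue.hasWaitingConsumer();

        for (int i = 0; i < n; i++) {
            queue.put(i); // release the consumers so the threads can finish
        }
        for (Thread t : consumers) {
            t.join();
        }
        return anyWaiting ? reported : -1;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWaiting(3)); // prints 3
    }
}
```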

Let's explore the behavior of a transfer queue by trying out a few code snippets:

Multiple producers, no consumers

Let's try running the following code. Here we have 5 threads trying to transfer elements, but no consumer thread is running.

public static void main(String[] args) throws Exception {
    TransferQueue<Integer> queue = new LinkedTransferQueue<Integer>();
    AtomicInteger counter = new AtomicInteger(0);

    Runnable producer = () -> {
        try {
            queue.transfer(counter.incrementAndGet());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    for(int i=0; i<5; i++) {
        new Thread(producer, "T-"+i).start();
    }
}

As expected, the process does not terminate, because the producer threads never return from transfer(). Monitoring the threads through VisualVM confirms that all 5 threads remain blocked, waiting for a consumer.

Let's try the same scenario again with the timed tryTransfer():

public static void main(String[] args) throws Exception {
    TransferQueue<Integer> queue = new LinkedTransferQueue<Integer>();
    AtomicInteger counter = new AtomicInteger(0);

    Runnable producer = () -> {
        try {
            queue.tryTransfer(counter.incrementAndGet(), 2, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    for(int i=0; i<5; i++) {
        new Thread(producer, "T-"+i).start();
    }

    TimeUnit.MINUTES.sleep(1);        
    System.out.println(queue.size());

    TimeUnit.MINUTES.sleep(2);        
    System.out.println(queue.size());
}

Now we can see that the producer threads terminate once the 2-minute timeout expires.

Output:

5
0

Interestingly, size() is effectively doing an uncommitted read here. Even though none of the messages ended up enqueued, size() reported 5 while the producer threads were still blocked in the call to tryTransfer, and 0 once they had all timed out.

One producer, many consumers

Let's try running the following code. Each consumer sleeps for 10 ms before consuming a message. We start 4 consumers and then sleep for 10 milliseconds in the main thread, so that by then the consumers should already be invoking take() on the queue. We then transfer 5 values, starting a 5th consumer just before the 5th transfer. The first 4 transfers return almost immediately, whereas the 5th is slower by almost 10 ms because its consumer is still sleeping.

public static void main(String[] args) throws Exception {
    TransferQueue<Integer> queue = new LinkedTransferQueue<Integer>();

    Runnable consumer = () -> {
        try {
            TimeUnit.MILLISECONDS.sleep(10);
            queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    new Thread(consumer, "Consumer-1").start();
    new Thread(consumer, "Consumer-2").start();
    new Thread(consumer, "Consumer-3").start();
    new Thread(consumer, "Consumer-4").start();

    TimeUnit.MILLISECONDS.sleep(10);

    for(int i=0; i<5; i++) {
        System.out.println("Is consumer available : "+ queue.hasWaitingConsumer());
        System.out.println("Number of consumers available : "+ queue.getWaitingConsumerCount());
        if (i==4) {
            new Thread(consumer, "Consumer-5").start();
        }
        long start = System.nanoTime();
        queue.transfer(i);
        long end = System.nanoTime();
        System.out.println("transfer completed for "+i+ " in "+(end - start) + " ns");
    }

}

Output:

 Is consumer available : false
 Number of consumers available : 4
 transfer completed for 0 in 477485 ns
 Is consumer available : true
 Number of consumers available : 3
 transfer completed for 1 in 9371 ns
 Is consumer available : true
 Number of consumers available : 2
 transfer completed for 2 in 2232 ns
 Is consumer available : true
 Number of consumers available : 1
 transfer completed for 3 in 4016 ns
 Is consumer available : false
 Number of consumers available : 0
 transfer completed for 4 in 11217313 ns

Note the first two lines of the output: hasWaitingConsumer() reported false, yet getWaitingConsumerCount() reported 4 a moment later. Both methods return only a snapshot, and since the main thread and the consumers each sleep for 10 ms, the consumers most likely reached take() between the two calls.
