org.logicalcobwebs.concurrent
Interface Channel

All Superinterfaces:
Puttable, Takable
All Known Subinterfaces:
BoundedChannel
All Known Implementing Classes:
LinkedQueue

public interface Channel
extends Puttable, Takable

Main interface for buffers, queues, pipes, conduits, etc.

A Channel represents anything that you can put items into and take them out of. As with the Sync interface, both blocking (put(x), take()) and timeout-based (offer(x, msecs), poll(msecs)) policies are provided. Using a zero timeout for offer and poll results in a pure balking policy.
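
The balking behavior of zero timeouts can be illustrated with a minimal sketch. OneSlotChannel and BalkingDemo are hypothetical names invented for this illustration; the class only mirrors the offer/poll signatures of Channel and is not an implementation from this package. A capacity of one is assumed so that the second offer has nowhere to go.

```java
import java.util.LinkedList;

// Hypothetical stand-in with the same offer/poll signatures as Channel.
// It is NOT the library's implementation; it holds at most one item.
class OneSlotChannel {
  private final LinkedList<Object> items = new LinkedList<Object>();

  public synchronized boolean offer(Object item, long msecs) throws InterruptedException {
    if (item == null) throw new IllegalArgumentException("null item");
    if (Thread.interrupted()) throw new InterruptedException();
    long deadline = System.currentTimeMillis() + Math.max(0, msecs);
    while (!items.isEmpty()) {
      long left = deadline - System.currentTimeMillis();
      if (left <= 0) return false;   // balk: no room within the time bound
      wait(left);
    }
    items.addLast(item);
    notifyAll();
    return true;
  }

  public synchronized Object poll(long msecs) throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    long deadline = System.currentTimeMillis() + Math.max(0, msecs);
    while (items.isEmpty()) {
      long left = deadline - System.currentTimeMillis();
      if (left <= 0) return null;    // balk: nothing available within the time bound
      wait(left);
    }
    Object x = items.removeFirst();
    notifyAll();
    return x;
  }
}

class BalkingDemo {
  // Makes four zero-timeout calls and reports each outcome.
  static String run() {
    OneSlotChannel chan = new OneSlotChannel();
    try {
      boolean first = chan.offer("a", 0);   // slot free: accepted
      boolean second = chan.offer("b", 0);  // slot full: fails without waiting
      Object taken = chan.poll(0);          // removes "a"
      Object none = chan.poll(0);           // empty: null without waiting
      return first + "," + second + "," + taken + "," + none;
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();   // preserve interrupt status
      return "interrupted";
    }
  }
}
```

In this sketch BalkingDemo.run() returns "true,false,a,null": neither the failed offer nor the failed poll waits for the channel state to change.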

To aid in efforts to use Channels in a more typesafe manner, this interface extends Puttable and Takable. You can restrict arguments or instance variables to these types as a way of guaranteeing that producers never try to take, or consumers put. For example:

 class Producer implements Runnable {
   final Puttable chan;
   Producer(Puttable channel) { chan = channel; }
   public void run() {
     try {
       for(;;) { chan.put(produce()); }
     }
     catch (InterruptedException ex) {}
   }
   Object produce() { ... }
 }


 class Consumer implements Runnable {
   final Takable chan;
   Consumer(Takable channel) { chan = channel; }
   public void run() {
     try {
       for(;;) { consume(chan.take()); }
     }
     catch (InterruptedException ex) {}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     Channel chan = new SomeChannelImplementation();
     Producer p = new Producer(chan);
     Consumer c = new Consumer(chan);
     new Thread(p).start();
     new Thread(c).start();
   }
 }
 

A given channel implementation might or might not have bounded capacity or other insertion constraints, so in general, you cannot tell if a given put will block. However, Channels that are designed to have an element capacity (and so always block when full) should implement the BoundedChannel subinterface.

Channels may hold any kind of item. However, insertion of null is not in general supported. Implementations may (all currently do) throw IllegalArgumentExceptions upon attempts to insert null.

By design, the Channel interface does not support any methods to determine the current number of elements being held in the channel. This decision reflects the fact that in concurrent programming, such methods are so rarely useful that including them invites misuse; at best they could provide a snapshot of the current state, which could change immediately after being reported. It is better practice to instead use poll and offer to try to take and put elements without blocking. For example, to empty out the current contents of a channel, you could write:

  try {
    for (;;) {
       Object item = channel.poll(0);
       if (item != null)
         process(item);
       else
         break;
    }
  }
  catch(InterruptedException ex) { ... }
 

However, it is possible to determine whether an item exists in a Channel via peek, which returns but does NOT remove the next item that can be taken (or null if there is no such item). The peek operation has a limited range of applicability, and must be used with care. Unless it is known that a given thread is the only possible consumer of a channel, and that no time-out-based offer operations are ever invoked, there is no guarantee that the item returned by peek will be available for a subsequent take.

When appropriate, you can define an isEmpty method to return whether peek returns null.
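
As a minimal sketch of that idea, the class below defines isEmpty in terms of peek. PeekableChannel and IsEmptyDemo are hypothetical names used only for illustration, and the stand-in is an unbounded queue whose put never blocks (so, unlike Channel.put, it declares no InterruptedException). The result of isEmpty is only a snapshot and carries the same caveats as peek.

```java
import java.util.LinkedList;

// Hypothetical unbounded stand-in exposing only the methods used here.
// It is NOT one of this package's Channel implementations.
class PeekableChannel {
  private final LinkedList<Object> items = new LinkedList<Object>();

  // Never blocks in this sketch, so no InterruptedException is declared.
  public synchronized void put(Object item) {
    if (item == null) throw new IllegalArgumentException("null item");
    items.addLast(item);
  }

  // Returns the next item without removing it, or null if there is none.
  public synchronized Object peek() {
    return items.isEmpty() ? null : items.getFirst();
  }

  // Snapshot only: the answer may be stale as soon as it is returned.
  public boolean isEmpty() {
    return peek() == null;
  }
}

class IsEmptyDemo {
  static boolean[] run() {
    PeekableChannel chan = new PeekableChannel();
    boolean before = chan.isEmpty();                 // nothing inserted yet
    chan.put("x");
    boolean after = chan.isEmpty();                  // one item present
    boolean stillThere = chan.peek() == chan.peek(); // peek does not remove
    return new boolean[] { before, after, stillThere };
  }
}
```

Because null is never a legal element, a null result from peek can only mean that no item is currently available, which is what makes this definition of isEmpty unambiguous.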

Also, as a compromise, even though it does not appear in the interface, implementation classes that can readily compute the number of elements support a size() method. This allows careful use, for example in queue length monitors, appropriate to the particular implementation constraints and properties.

All channels allow multiple producers and/or consumers. They do not support any kind of close method to shut down operation or indicate completion of particular producer or consumer threads. If you need to signal completion, one way to do it is to create a class such as

 class EndOfStream {
    // Application-dependent field/methods
 }
 
Producers then put an instance of this class into the channel when they are done. The consumer side can check for it via
   Object x = aChannel.take();
   if (x instanceof EndOfStream)
     // special actions; perhaps terminate
   else
     // process normally
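
The fragments above can be combined into a runnable sketch. SentinelChannel and EndOfStreamDemo are hypothetical names invented for this illustration; the channel is an unbounded stand-in with Channel-style put and take, not a class from this package, and a single producer and a single consumer are assumed.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Marker object that a producer puts last to signal completion.
class EndOfStream { }

// Hypothetical unbounded stand-in with Channel-style put/take.
class SentinelChannel {
  private final LinkedList<Object> items = new LinkedList<Object>();

  public synchronized void put(Object item) throws InterruptedException {
    if (item == null) throw new IllegalArgumentException("null item");
    if (Thread.interrupted()) throw new InterruptedException();
    items.addLast(item);
    notifyAll();                      // wake any consumer waiting in take()
  }

  public synchronized Object take() throws InterruptedException {
    while (items.isEmpty()) wait();   // block until an item exists
    return items.removeFirst();
  }
}

class EndOfStreamDemo {
  // Runs one producer thread and consumes on the calling thread.
  static List<Object> collect() {
    final SentinelChannel chan = new SentinelChannel();
    Thread producer = new Thread(new Runnable() {
      public void run() {
        try {
          for (int i = 1; i <= 3; i++) chan.put(Integer.valueOf(i));
          chan.put(new EndOfStream());          // signal completion
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    });
    producer.start();

    List<Object> seen = new ArrayList<Object>();
    try {
      for (;;) {
        Object x = chan.take();
        if (x instanceof EndOfStream) break;    // special action: stop consuming
        seen.add(x);                            // process normally
      }
      producer.join();
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
    return seen;
  }
}
```

With one producer and one consumer, collect() returns the list [1, 2, 3]. With several consumers, a single marker stops only the consumer that takes it, so one marker per consumer (or re-inserting the marker before exiting) would be needed.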
 

In time-out based methods (poll(msecs) and offer(x, msecs)), time bounds are interpreted in a coarse-grained, best-effort fashion. Since there is no way in Java to escape out of a wait for a synchronized method/block, time bounds can sometimes be exceeded when there is a lot of contention for the channel. Additionally, some Channel semantics entail a "point of no return" where, once some parts of the operation have completed, others must follow, regardless of time bound.

Interruptions are in general handled as early as possible in all methods. Normally, InterruptedExceptions are thrown in put/take and offer(x, msecs)/poll(msecs) if interruption is detected upon entry to the method, as well as in any later context surrounding waits.

If a put returns normally, an offer returns true, or a take or poll returns non-null, the operation completed successfully. In all other cases, the operation fails cleanly -- the element is not put or taken.

As with Sync classes, spinloops are not directly supported, are not particularly recommended for routine use, but are not hard to construct. For example, here is an exponential backoff version:

 Object backOffTake(Channel q) throws InterruptedException {
   long waitTime = 0;
   for (;;) {
      Object x = q.poll(0);
      if (x != null)
        return x;
      else {
        Thread.sleep(waitTime);
        waitTime = 3 * waitTime / 2 + 1;
      }
    }
 }
 

Sample Usage. Here is a producer/consumer design where the channel is used to hold Runnable commands representing background tasks.

 class Service {
   private final Channel channel = ... some Channel implementation;

   private void backgroundTask(int taskParam) { ... }

   public void action(final int arg) {
     Runnable command =
       new Runnable() {
         public void run() { backgroundTask(arg); }
       };
     try { channel.put(command); }
     catch (InterruptedException ex) {
       Thread.currentThread().interrupt(); // ignore but propagate
     }
   }

   public Service() {
     Runnable backgroundLoop =
       new Runnable() {
         public void run() {
           for (;;) {
             try {
               Runnable task = (Runnable)(channel.take());
               task.run();
             }
             catch (InterruptedException ex) { return; }
           }
         }
       };
     new Thread(backgroundLoop).start();
   }
 }

 


See Also:
Sync, BoundedChannel

Method Summary
 boolean offer(java.lang.Object item, long msecs)
          Place item in channel only if it can be accepted within msecs milliseconds.
 java.lang.Object peek()
          Return, but do not remove, the object at the head of the Channel, or null if it is empty.
 java.lang.Object poll(long msecs)
          Return and remove an item from channel only if one is available within msecs milliseconds.
 void put(java.lang.Object item)
          Place item in the channel, possibly waiting indefinitely until it can be accepted.
 java.lang.Object take()
          Return and remove an item from channel, possibly waiting indefinitely until such an item exists.
 

Method Detail

put

void put(java.lang.Object item)
         throws java.lang.InterruptedException
Place item in the channel, possibly waiting indefinitely until it can be accepted. Channels implementing the BoundedChannel subinterface are generally guaranteed to block on puts upon reaching capacity, but other implementations may or may not block.

Specified by:
put in interface Puttable
Parameters:
item - the element to be inserted. Should be non-null.
Throws:
java.lang.InterruptedException - if the current thread has been interrupted at a point at which interruption is detected, in which case the element is guaranteed not to be inserted. Otherwise, on normal return, the element is guaranteed to have been inserted.

offer

boolean offer(java.lang.Object item,
              long msecs)
              throws java.lang.InterruptedException
Place item in channel only if it can be accepted within msecs milliseconds. The time bound is interpreted in a coarse-grained, best-effort fashion.

Specified by:
offer in interface Puttable
Parameters:
item - the element to be inserted. Should be non-null.
msecs - the number of milliseconds to wait. If less than or equal to zero, the method does not perform any timed waits, but might still require access to a synchronization lock, which can impose unbounded delay if there is a lot of contention for the channel.
Returns:
true if accepted, else false
Throws:
java.lang.InterruptedException - if the current thread has been interrupted at a point at which interruption is detected, in which case the element is guaranteed not to be inserted (i.e., is equivalent to a false return).

take

java.lang.Object take()
                      throws java.lang.InterruptedException
Return and remove an item from channel, possibly waiting indefinitely until such an item exists.

Specified by:
take in interface Takable
Returns:
some item from the channel. Different implementations may guarantee various properties (such as FIFO) about that item.
Throws:
java.lang.InterruptedException - if the current thread has been interrupted at a point at which interruption is detected, in which case the state of the channel is unchanged.

poll

java.lang.Object poll(long msecs)
                      throws java.lang.InterruptedException
Return and remove an item from channel only if one is available within msecs milliseconds. The time bound is interpreted in a coarse-grained, best-effort fashion.

Specified by:
poll in interface Takable
Parameters:
msecs - the number of milliseconds to wait. If less than or equal to zero, the operation does not perform any timed waits, but might still require access to a synchronization lock, which can impose unbounded delay if there is a lot of contention for the channel.
Returns:
some item, or null if no item becomes available within the time bound.
Throws:
java.lang.InterruptedException - if the current thread has been interrupted at a point at which interruption is detected, in which case the state of the channel is unchanged (i.e., is equivalent to a null return).

peek

java.lang.Object peek()
Return, but do not remove, the object at the head of the Channel, or null if it is empty.