Wednesday, February 15, 2006

The Long Way Around, An Exercise in Concurrent Programming

Adopting the mindset to fully utilize the classes in java.util.concurrent can be quite challenging. One of the side effects of concurrent programming is that it can require more code than it would take to achieve the same results using mutual exclusion locking.

Is concurrency worth writing more code? My current answer to the question is hell yes because I think most Java programmers need the practice it's going to take to get one's head around all the issues associated with concurrency. And if the programmer doesn't practice, how is he/she going to be able to fully exploit concurrency in the cases where it is decisive? With that in mind I would like to talk about some code that I've written, but first some background info.

Better event handling

I think Java's current event model sucks! It's biggest problem is that it tightly couples the firing of events with there processing. In other words, the thread that does the firing is usually the same thread that handles the event. This is not a very stable or scalable solution because all it takes to bring event firing/handling to its knees is one badly written event handler (think infinite loop or blocking I/O). It is for this reason that the Swing team introduce the Swing worker thread so the programmer could separate the handling of events from the firing of events without taking the main Swing thread hostage.

There is a better way based on asynchronous messages. It is not fundamentally new. It is based on how asynchronous network I/O is done on Unix operating systems. You tell the kernel your are interested in a set of events then you can either go about your business and periodically check if the things you are interested has happened (poll(2)) or you can sit around and wait until something you are interested in does happen (select(2)). The choice is yours. Plus you can be interested in as many things as you want without a) requiring a copy of yourself for each interest set or b) hogging the kernel. This is because all the kernel does is tell you that what you are interested in is ready, it is your job to do something useful with it. So how do we implement the functionality in Java? Queues!

To wait or not to wait? That is the question!

The basic idea is a class/service that fires events will provide methods for threads to receive notification of these events by registering java.util.Queue objects. You can use any Queue object but I highly recommend java.util.concurrent.BlockingQueue objects. Why BlockingQueue[s]? Simple! A BlockingQueue allows us to choose to wait indefinitely, wait temporarily, or not at all for events (take(), poll(long,TimeUnit), poll()). It's the decoupler see? It unbinds the firing of events from the processing of events. It's the service/class 's responsibility to fire the event and it's the registrant's responsibility to process them. Tada!

The code dammit! The code!

Before I present any code I need to put it in context so it makes sense to you because I'm not going to include a complete project, just snippets. The snippets are part of a class that provides availability testing services. What that means is if a thread tries to make a connection to some host on the network and the connection fails, the thread can register the host with the service and the service will continue to try connecting to the host until some pre-configured timeout has elapsed or it manages to establish a connection. The registrant will be notified of the outcome of the connection testing by the service via the notification mechanism I've been talking about.

Now that I've through all the background info lets look at some code:
    /** 

37    * Container for listeners interested all test results. 
38    */ 
39   private final Collection<Queue<Result>> allResults = new ConcurrentHashSet<Queue<Result>>(); 

40    
41   /** 
42    * Container for listeners interested in specific test results. 
43    */ 
44   private final ConcurrentMap<String, Collection<Queue<Result>>> hostResults = 

45           new ConcurrentHashMap<String, Collection<Queue<Result>>>(); 
46    
47   /** 
48    * Registers <tt>listener</tt> with <tt>this</tt> object to receive {@link Result} 

49    * notifications for all hosts. 
50    * 
51    * @param listener  The listener. 
52    * @return  <tt>true</tt> if the listener was registered, <tt>false</tt> otherwise. 

53    */ 
54   public boolean register( Queue<Result> listener ) 
55   { 
56       return allResults.add( listener ); 
57   } 

58    
59   /** 
60    * Registers <tt>listener</tt> with <tt>this</tt> object to receive {@link Result} 

61    * notifications for hosts in <tt>hosts</tt>. 
62    * 
63    * @param hosts The interest set. 
64    * @param listener  The listener. 

65    * @return  The registration results. 
66    */ 
67   public boolean[] register( String[] hosts, Queue<Result> listener ) 
68   { 

69       boolean[] registrations = new boolean[ hosts.length ]; 
70       for( int i = registrations.length; --i >= 0; ) 
71       { 

72           String host = hosts[ i ]; 
73           Collection<Queue<Result>> listeners = hostResults.get( host ); 
74           if( null == listeners ) 
75           { 

76               listeners = new ConcurrentHashSet<Queue<Result>>(); 
77               Collection<Queue<Result>> preempt = hostResults.putIfAbsent( host, listeners ); 
78               if( null != preempt ) 

79                   listeners = preempt; 
80           } 
81           registrations[ i ] = listeners.add( listener ); 
82       } 
83       return registrations; 
84   } 

85    
86   /** 
87    * Unregisters <tt>listener</tt>. 
88    * @param listener  The listener that was registered with this object via {@link #register(Queue)}. 

89    */ 
90   public void unregister( Queue<Result> listener ) 
91   { 
92       allResults.remove( listener ); 
93   } 

94    
95   /** 
96    * Unregister's <tt>listener</tt> from all <tt>hosts</tt>. 
97    * @param hosts 

98    * @param listener 
99    */ 
100  public void unregister( String[] hosts, Queue<Result> listener ) 
101  { 

102      for( String host : hosts ) 
103      { 
104          Collection<Queue<Result>> c = hostResults.get( host ); 
105          if( null != c ) 

106          { 
107              c.remove( listener ); 
108              if( c.isEmpty() ) 
109              { 
110                  Queue<Result> temp = new ConcurrentLinkedQueue<Result>(); 

111                  boolean b = register( temp ); 
112                  assert b : "IMPOSSIBLE!";// but i like to check anyway. 
113                  hostResults.remove( host ); 
114                  if( !c.isEmpty() ) 

115                  { 
116                      Collection<Queue<Result>> preempt = hostResults.putIfAbsent( host, c ); 
117                      if( null != preempt ) 
118                      { 

119                          preempt.addAll( c ); 
120                          unregister( temp ); 
121                          c = preempt; 
122                      } 
123   
124                      for( Result result : temp ) 

125                          if( result.host.equals( host ) ) 
126                              for( Queue<Result> queue : c ) 
127                                  queue.offer( result ); 
128                  } 

129                  unregister( temp ); 
130              } 
131          } 
132      } 
133  } 
134   
135  /** 

136   * Notifies registered listeners of a new event. 
137   * @param result    The event. 
138   */ 
139  private void fire( Result result ) 
140  { 

141      Collection<Queue<Result>> listeners = hostResults.get( result.host ); 
142      if( null != listeners ) 
143          for( Queue<Result> listener : listeners ) 

144              listener.offer( result ); 
145   
146      for( Queue<Result> results : allResults ) 
147          results.offer( result ); 
148  }

If you've simply skimmed pass the code you may want to go back and take a minute to digest it because I'm going to rewrite parts of it as it would have been written without the concurrency tools.

No concurrency

private final Collection<Queue<Result>> allResults = 

37       Collections.synchronizedCollection( new HashSet<Queue<Result>>() ); 
38    
39   private final Map<String, Collection<Queue<Result>>> hostResults = 

40       new Hashtable<String, Collection<Queue<Result>>>(); 
41    
42   public boolean register( Queue<Result> listener ) 
43   { 

44       return allResults.add( listener ); 
45   } 
46    
47   public boolean[] register( String[] hosts, Queue<Result> listener ) 
48   { 
49       boolean[] registrations = new boolean[ hosts.length ]; 

50       for( int i = registrations.length; --i >= 0; ) 
51       { 
52           String host = hosts[ i ]; 
53           Collection<Queue<Result>> listeners; 

54           synchronized( hostResults ) 
55           { 
56               listeners = hostResults.get( host ); 
57               if( null == listeners ) 
58               { 

59                   listeners = Collections.synchronizedCollection( new HashSet<Queue<Result>>() ); 
60                   hostResults.put( host, listeners ); 
61               } 
62           } 

63           registrations[ i ] = listeners.add( listener ); 
64       } 
65       return registrations; 
66   } 
67    
68   public void unregister( Queue<Result> listener ) 

69   { 
70       allResults.remove( listener ); 
71   } 
72    
73   public void unregister( String[] hosts, Queue<Result> listener ) 
74   { 

75       for( String host : hosts ) 
76       { 
77           synchronized( hostResults ) 
78           { 
79               Collection<Queue<Result>> c = hostResults.get( host ); 

80               if( null != c ) 
81               { 
82                   synchronized( c ) 
83                   { 
84                       c.remove( listener ); 

85                       if( c.isEmpty() ) 
86                           hostResults.remove( host ); 
87                   } 
88               } 
89           } 
90       } 

91   } 
92    
93   private void fire( Result result ) 
94   { 
95       Collection<Queue<Result>> listeners = hostResults.get( result.host ); 
96       if( null != listeners ) 

97       { 
98           synchronized( listeners ) 
99           { 
100              for( Queue<Result> listener : listeners ) 

101                  listener.offer( result ); 
102          } 
103      } 
104   
105      synchronized( allResults ) 
106      { 

107          for( Queue<Result> results : allResults ) 
108              results.offer( result ); 
109      } 
110  }

Let's compare

For the most part the non concurrent version looks pretty much like the concurrent version in terms of lines of code and complexity, except for public void unregister( String[], Queue).

It was because of this method that I started this blog entry in the first place, because I realized how much easier a synchronized version would have been to write. It just seemed to be the long way around for removing an entry from a Map. Lets walk through it.

First off, let me explain what it's supposed to be doing then (hopefully) it will make sense why it does what it does.

Basically, one host can have multiple listeners. So when all the listeners for a host have been unregistered we want to remove the mapping for the host. Otherwise, we are going to have an object retention bug on our hands. The problem is how do we remove the mapping in a thread safe way? Because we can't simply say,

if( c.isEmpty() )
    hostResults.remove( host );
and stop there because it is entirely possible that between the test and the removal another thread has registered for host. This is the kind of problem that mutual exclusion (synchronized) solves. But since we aren't using mutual exclusion we have to get creative. What we do is create a dummy Queue and register it to receive all events that get fired this way when we actual remove the mapping, if we are wrong about it being empty we can put it back and re-fire all the events that got fired between the time we removed the mapping and the time we put it back. The only draw back to all this (beside the additional lines of code) is that it is possible for a listener to receive duplicate events in the gap between,
preempt.addAll( c );
and
unregister( temp );

So is the additional complexity worth it? Maybe. If we did things the synchronized way the entire event notification mechanism can quickly become the bottle neck if a lot of threads are using it, especially if they are registering and unregistering for the same host. Now, we could replace the synchronized keyword with java.util.concurrent.locks.ReentrantLock[s]. It's a more scalable mutual exclusion mechanism than synchronized. But I've spoken to Doug Lea recently and he told me that the (Sun's) JVM team is working hard at making the synchronized keyword as performant as ReentrantLock. Ya gotta love that competition. I think everyone should email Doug and thank him and the rest of the jsr166 team for embarrassing the JVM team into trying to make contended monitor acquisition more performant.

One last thing

I want to point out there is no general purpose lock free solution/pattern to the problem that I was solving in the unregister method. I was able to get away with not using locks because of the class' support for listeners to be notified of all events regardless of host. This support wasn't put in there just for concurrency's sake but is simply a feature requirement of the class that I've leveraged to avoid locking.

1 comment:

  1. Anonymous1:24 AM

    hi HashiDiKo ,

    nice post. I will try and digest the code tomorrow.

    BR,
    ~A

    ReplyDelete