Thursday, July 12, 2007

Becoming a concurrency expert. Rule number not one: Relax.

The holy grail for a concurrency expert is wait-free code and, failing that, lock-free code, because both are generally more scalable than algorithms that use locks. The previous link and this one describe some of the benefits of non-blocking synchronization, but I'll give you a contrived example.

LF/WF algorithms can't deadlock! No locks, no deadlocks. Deadlocks are a bane to scalability because in the wild they are probabilistic events whose probability increases as concurrency increases. So you can imagine a situation where you have some code that runs smoothly on your old single-processor system, then you upgrade to a dual-core system and suddenly the code starts freezing every once in a while. You think to yourself, "gremlins," and continue along your merry way. Christmas comes early and you win a shiny new quad-core box from some tech event in Atlanta. So you are thinking, "4x the processing power, 4x the performance, yeah!" But instead your program is freezing all the time. Simply killing it and restarting it is not good enough anymore. You have a deadlock on your hands. You've gone from a single processor to a 4-way system and the scalability of the program has not followed suit. This is not atypical of a lot of Java programs in the wild because, though it may come as a shock to you (it sure as hell shocked me!), most Java programmers don't know spit about concurrency, even though Java has had concurrency baked in from birth! And now that multi-core systems are becoming the norm, concurrency bugs that have lain dormant for years are waking up and stinging users.
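
To make that concrete, here is a minimal sketch of the classic lock-ordering deadlock (the Account class and transfer method are hypothetical, purely for illustration):

class Account
{
    private int balance;

    void debit(int amount)  { balance -= amount; }
    void credit(int amount) { balance += amount; }

    // Thread 1 runs transfer(a, b) while thread 2 runs transfer(b, a):
    // each acquires its first lock, then blocks forever waiting for the
    // other's. On one processor the fatal interleaving is rare; on four
    // cores it becomes a matter of when, not if.
    static void transfer(Account from, Account to, int amount)
    {
        synchronized (from)
        {
            synchronized (to) // the other thread may already hold this lock
            {
                from.debit(amount);
                to.credit(amount);
            }
        }
    }
}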

It's possible, with a great deal of effort, to eliminate deadlocks in non-LF/WF code. So, (a) given that writing LF/WF code is as hard as or harder than writing deadlock-free code, and (b) assuming you are willing to solve any deadlock problems in the non-LF/WF code, is LF/WF worth it? As with all questions relating to trade-offs, the answer is "it depends". In this case, it depends on how much throughput is enough. In the WF case you are guaranteed system-wide throughput with starvation freedom, while in the LF case you are still guaranteed system-wide throughput but with the possibility that individual threads may starve. The bottom line is, progress is always being made. There is no such guarantee with blocking synchronization.
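
To make the two guarantees concrete, here is a minimal sketch of mine contrasting a lock-free operation with a wait-free one:

import java.util.concurrent.atomic.AtomicInteger;

class Counter
{
    private final AtomicInteger count = new AtomicInteger();

    // Lock-free: a CAS only fails because some other thread's CAS
    // succeeded, so the system as a whole always makes progress, but an
    // unlucky thread could, in theory, retry forever (starvation).
    int increment()
    {
        for (;;)
        {
            int current = count.get();
            if (count.compareAndSet(current, current + 1))
                return current + 1;
        }
    }

    // Wait-free: completes in a bounded number of steps no matter what
    // every other thread is doing.
    int get()
    {
        return count.get();
    }
}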

Because of the complexity associated with LF/WF algorithms, most programmers never tackle LF/WF head on. Contact with LF/WF algorithms and code comes in the form of using LF/WF data structures (e.g. java.util.concurrent.ConcurrentLinkedQueue). But it may surprise you that there may be opportunities in your own code to write LF/WF code.
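
For example, here is somebody else's lock-free code doing all the heavy lifting (ConcurrentLinkedQueue is based on Michael and Scott's non-blocking queue algorithm):

import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueDemo
{
    public static void main(String[] args)
    {
        // offer() and poll() are lock-free, so any number of producers and
        // consumers can hammer the queue concurrently with no synchronized
        // blocks in sight.
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
        queue.offer("work item");   // lock-free enqueue
        String item = queue.poll(); // lock-free dequeue; null if empty
        System.out.println(item);
    }
}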

Disclaimer:

I'm not advocating that everybody go through every line of code and try to make it LF/WF (though I do advocate going through every line of your code and making sure it's thread safe). You really, really need to have an extremely strong grasp of the Java Memory Model, especially the happens-before rules, before you can even begin to think about writing LF/WF code.

You should limit your LF/WF tinkering to critical paths only. Critical paths are hot sections of code (code that is executed frequently). You need two tools to fix critical paths. First, you need a Java profiler to tell you where the critical path is. The critical path is going to be the method call chain where the program spends the majority of its time while under load. The under-load distinction is extremely important because if you take a server application as an example and profile it when there is no load, the profiler is going to report that the app is spending most of its time in [something like] Socket.accept(), which doesn't tell you anything about the performance of the app. In your quest for the critical path, the best any Java profiler can do is tell you which methods are consuming the most time. It cannot peer into a method and tell you which specific line of code is slow or whether a lock is hot (a hot lock is one that is highly contended). This is where the second tool comes into play. You need a hardware profiler.

A hardware profiler differs from a Java profiler in that it shows events at the CPU level. Every modern CPU comes with all sorts of counters that enable programs to know what's going on inside the CPU. It can tell you things like cache hit/miss rates, stalls, lock acquisitions and releases, etc. Some operating systems come with hardware profilers baked right in. Solaris 10/OpenSolaris on SPARC is the gold standard when it comes to observability. mpstat, corestat, plockstat, and [the big daddy of them all] DTrace are some of the tools baked into Solaris 10/OpenSolaris that allow you to dig deep into the bowels of the system to figure out exactly what's going on. If you aren't running Solaris but are running Linux or Windows on AMD, you can use AMD's CodeAnalyst Performance Analyzer. Finally, if you are running Linux or Solaris (Intel or AMD), you can use Sun Studio 12 to get at the data. All the hardware profiler tools I've mentioned are free and/or open source, so you have no excuse not to have at least one installed.

So here are the steps you've completed so far:

  1. Profiled the app under load.
  2. Identified the critical methods (hotspots).
  3. Tuned the critical method(s) as best you can.
  4. Repeat steps 1-3 until you hit the point of diminishing returns.

At this point, if the throughput is where you want it, you can stop. You didn't have to write a lick of LF/WF code. Good for you. But what happens if you want to see how far you can really push the system? You start really cranking up the number of threads (assuming you have available processors to execute them concurrently). You take a look at the system's CPU utilization and it's redlining. You need to make sure it's actually doing useful work. So now it's time to fire up the hardware profiler to see what's going on in the CPU.

Aside:

Sun Studio 12 does the best job, of the tools listed, of associating CPU events back to the source code line(s) that produced them.

So you fire it up, and the first thing that jumps out at you is that you have a smokin' hot lock along your critical path. If you can't reduce the granularity of the lock or its scope, going LF/WF may be your only option.
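
Before concluding that you can't, it helps to know what shrinking a lock's scope looks like. Here is a hedged sketch (the EventLog class and expensiveFormat method are hypothetical):

import java.util.ArrayList;
import java.util.List;

class EventLog
{
    private final List<String> lines = new ArrayList<String>();

    // Before: the lock is held for the whole method, including the
    // expensive formatting work that touches no shared state.
    void appendHot(Object event)
    {
        synchronized (lines)
        {
            lines.add(expensiveFormat(event));
        }
    }

    // After: only the shared mutation is protected, so the lock is held
    // for a fraction of the time and contention drops accordingly.
    void appendCooler(Object event)
    {
        String line = expensiveFormat(event); // no shared state touched here
        synchronized (lines)
        {
            lines.add(line);
        }
    }

    private String expensiveFormat(Object event)
    {
        return String.valueOf(event); // stand-in for real formatting work
    }
}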

Aside:

It's entirely possible that you can neither reduce the granularity of the lock nor rewrite the critical section as LF/WF. At this point you are screwed. You are just going to have to buy another box (or virtualize) and spread the load. If you can't spread the load, you are royally screwed, so the best thing to do is degrade gracefully.

I've gone through all of that setup just so I can dump some code on you:

import java.net.InetAddress;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class RecipientLimits
{
    private static final long ONE_HOUR_MILLIS = TimeUnit.HOURS.toMillis(1);

    private static final long THREE_DAYS_MILLIS = TimeUnit.DAYS.toMillis(3);

    private static final long ONE_MONTH_MILLIS = TimeUnit.DAYS.toMillis(31);

    private final ConcurrentMap<InetAddress, Entry> hosts = new ConcurrentHashMap<InetAddress, Entry>();

    /**
     * Tracks the last time we pruned the table.
     */
    private final AtomicLong lastrun = new AtomicLong(System.currentTimeMillis());

    /**
     * Get the recipient limit for host.
     *
     * @param host The host.
     *
     * @return The maximum number of recipients the host will accept.
     */
    public int get(InetAddress host)
    {
        Entry e = hosts.get(host);
        if (null == e)
        {
            Entry t = hosts.putIfAbsent(host, e = get(host.getAddress()));
            if (null != t)
                e = t;
        }
        return e.limit();
    }

    /**
     * Confirms that host accepts limit recipients.
     *
     * @param host  The host.
     * @param limit The number of recipients that was accepted.
     */
    public void confirmed(InetAddress host, int limit)
    {
        Entry e = hosts.get(host);
        if (null != e)
            e.confirm(limit);
        prune();
    }

    /**
     * Indicates that host did not accept limit number of recipients.
     *
     * @param host  The host.
     * @param limit The limit.
     */
    public void denied(InetAddress host, int limit)
    {
        Entry e = hosts.get(host);
        if (null != e)
            e.decrement(limit);
        prune();
    }

    /**
     * Removes inactive hosts from the table.
     */
    private void prune()
    {
        long last = lastrun.get();
        long millis = System.currentTimeMillis();
        if (millis - last >= ONE_HOUR_MILLIS && lastrun.compareAndSet(last, millis))
        {
            for (Iterator<Entry> i = hosts.values().iterator(); i.hasNext();)
            {
                Entry e = i.next();
                if (0 == e.users.get() && millis - e.lastaccessed.get() >= THREE_DAYS_MILLIS)
                    i.remove();
            }
        }
    }

    /**
     * Look up host in the database.
     *
     * @param host The ip address of the host.
     *
     * @return A new Entry.
     */
    private Entry get(byte[] host)
    {
        //@todo don't forget to create an entry in the database if host does not already exist.
        return new Entry(1);
    }

    final class Entry
    {
        /**
         * The last time (milliseconds timestamp) this entry was accessed.
         */
        final AtomicLong lastaccessed;

        /**
         * The number of threads reading/writing this object.
         */
        final AtomicInteger users = new AtomicInteger();

        /**
         * Semaphore for updates.
         */
        private final AtomicInteger dflag = new AtomicInteger();

        /**
         * The recipient limit.
         */
        private final AtomicInteger limit;

        /**
         * Indicates when we've maxed out {@link #limit}.
         */
        private volatile boolean maxo;

        /**
         * The millisecond timestamp when we maxed out.
         *
         * This field is correctly synchronized because there is a happens-before
         * edge created by the write to this field followed by a write to
         * {@link #maxo} in {@link #decrement(int)}, and then the read of
         * {@link #maxo} followed by the read of this field in {@link #confirm(int)}.
         */
        private long maxtstamp;

        /**
         * Constructs a new Entry.
         *
         * @param limit The recipient limit.
         */
        Entry(int limit)
        {
            this.limit = new AtomicInteger(limit);
            lastaccessed = new AtomicLong(System.currentTimeMillis());
        }

        /**
         * @return The current limit.
         */
        int limit()
        {
            users.incrementAndGet();
            try
            {
                return limit.get();
            }
            finally
            {
                lastaccessed.compareAndSet(lastaccessed.get(), System.currentTimeMillis());
                users.decrementAndGet();
            }
        }

        /**
         * Confirm the limit.
         *
         * @param limit The value returned by {@link #limit()}.
         */
        void confirm(int limit)
        {
            users.incrementAndGet();
            try
            {
                if (0 == dflag.get()
                        && (!maxo || System.currentTimeMillis() - maxtstamp >= ONE_MONTH_MILLIS)
                        && this.limit.compareAndSet(limit, limit + 1))
                {
                    //@todo - talk to database
                    //@todo - use limit not limit + 1
                }
            }
            finally
            {
                lastaccessed.compareAndSet(lastaccessed.get(), System.currentTimeMillis());
                users.decrementAndGet();
            }
        }

        /**
         * Decrement the limit.
         *
         * @param x The value returned by {@link #limit()}.
         */
        void decrement(int x)
        {
            users.incrementAndGet();
            try
            {
                if (x > 1 && dflag.compareAndSet(0, x))
                {
                    try
                    {
                        if (limit.compareAndSet(x, x = x - 1))
                        {
                            boolean dbput = true;
                            if (dbput)
                            {
                                maxtstamp = System.currentTimeMillis();
                                maxo = true;
                            }
                        }
                    }
                    finally
                    {
                        dflag.set(0);
                    }
                }
            }
            finally
            {
                lastaccessed.compareAndSet(lastaccessed.get(), System.currentTimeMillis());
                users.decrementAndGet();
            }
        }
    }
}

Let's revisit the title of this post: "Becoming a concurrency expert. Rule number not one: Relax." I've emphasized relax because it is critically important to finding LF/WF opportunities in your own code. So what exactly does relaxing mean? The biggest thing it entails is realizing that you have very little control over the order in which threads execute, and being OK with that. If you try to be draconian about the order in which things happen, you will have to go single-threaded or use locks. So once you've relaxed and let go of draconian ordering, the only thing you have to worry about is ensuring that data races are benign. Notice I didn't say eliminate data races; I said "ensuring that data races are benign". The difference, of course, is relaxation. In LF/WF, data races are part of the design, because at some point in the code you are going to need to do a CAS (e.g. java.util.concurrent.atomic.AtomicBoolean.compareAndSet(false, true), java.util.concurrent.atomic.AtomicInteger.incrementAndGet(), etc.) and a CAS is a [CPU-supported] race condition waiting to happen. CAS isn't the only place where it's okay to let a race condition go unchallenged. Anywhere you can prove that the consequence of a data race is benign is an opportunity to relax.
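
Here is a hedged sketch of one such benign race, the unsynchronized hash-code caching idiom that java.lang.String itself uses (the Name class is hypothetical):

class Name
{
    private final String value;
    private int hash; // racy cache; 0 means "not computed yet"

    Name(String value) { this.value = value; }

    public int hashCode()
    {
        int h = hash;
        if (h == 0)
        {
            // Two threads may both see 0 and both compute the hash, but
            // the computation is deterministic, so every thread still
            // returns the correct answer. Worst case, work is repeated.
            h = value.hashCode();
            hash = h; // racy write, and that's fine
        }
        return h;
    }
}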

Let me back up quickly and talk briefly about ordering. I hope you don't think I said that ordering is not important or that you have absolutely no control, because that is not true. Let me repeat it: what you don't have control over is when a thread will run. That's [ultimately] the responsibility of the operating system. What that translates into is: you don't have control over when something executes, only what executes.

The new JMM strengthened the guarantees of volatile to prevent the reordering of volatile reads/writes in relation to non-volatile fields. In other words, you can use volatile fields to force code to execute in a certain order without the use of the synchronized keyword, which also means you can use a single volatile field to safely publish multiple non-volatile fields (an example of this is in the code above and is described in the JavaDoc comment for maxtstamp). This was not possible prior to Java 5. Before Java 5, if you wanted visibility guarantees for fields you had to (a) declare all of them volatile or (b) use a synchronized block.
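
Here is a minimal sketch of that piggybacking trick, the same one the maxtstamp/maxo pair in the code above relies on (the Publisher class is hypothetical):

class Publisher
{
    private int x;                  // plain fields...
    private int y;
    private volatile boolean ready; // ...published by this volatile write

    void write()
    {
        x = 1;
        y = 2;
        ready = true; // volatile write: everything above becomes visible
    }

    void read()
    {
        // The write to ready happens-before any read that sees true, so a
        // reader that observes the flag is guaranteed to see x and y too,
        // even though they aren't volatile.
        if (ready)
            System.out.println(x + y); // guaranteed to print 3
    }
}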

So there is a lot you can accomplish given the new volatile semantics but you can't do everything. Specifically, you can't make binding decisions with volatiles. So what's a binding decision?

if (some_volatile_condition)
{
    //Execute code under the assumption that some_volatile_condition is still true.
}
... is a binding decision, and in the absence of locking [before the read of some_volatile_condition] it is a bug, except, of course, when the code being executed results in a benign race condition. Examples of non-binding decisions can be found in Entry.confirm(int) and Entry.decrement(int).
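
For contrast, here is a hedged sketch (the Slots class is hypothetical) of the same shape of decision made non-binding: the CAS re-validates the assumption at the instant of the act and retries if it no longer holds:

import java.util.concurrent.atomic.AtomicInteger;

class Slots
{
    private volatile boolean open;                    // gate flag
    private final AtomicInteger free = new AtomicInteger(10);

    // Binding, and broken without a lock: 'open' can flip to false
    // between the test and whatever we do inside the block.
    void bindingAndBroken()
    {
        if (open)
        {
            // ... acting as if 'open' is still true -- a bug
        }
    }

    // Non-binding: if 'free' changed between the read and the CAS, the
    // CAS fails and we simply re-evaluate with fresh data.
    boolean claim()
    {
        for (;;)
        {
            int n = free.get();
            if (n == 0)
                return false; // nothing left to claim
            if (free.compareAndSet(n, n - 1))
                return true;  // decision validated at the moment of the act
        }
    }
}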

That's it for today. I'll pick the code apart in Part II. Thanx for stopping by.