Tuesday, September 18, 2007

[BUG] JE never stops logging ...

This is entry is a partial repost of a message I posted to Oracle's Berkeley DB JE Forum. The forum software does not allow for the proper formatting of source code and I personally hate reading unformatted source code. Therefore, I have reposted it here so people like me can't read the code right off the page.

The code

import java.io.File;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Random;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.evictor.Evictor;

import it.unimi.dsi.fastutil.ints.IntOpenHashSet;

public class CrushPunyJE
{
    private static final int NUM_DOMAINS = 1000;

    private static final int RECIPIENTS_PER_DOMAIN = 1000;

    public static void main(String[] args) throws Exception
    {
        crush(new File(args[0]));
    }

    private static void crush(File envHome) throws Exception
    {
        MyDbI dbi = MyDbI.openToFill(envHome);
        Cursor queue = dbi.queue().openCursor(null, null);
        Cursor recipients = dbi.recipients.openCursor(null, null);
        Cursor unsent = dbi.unsent.openCursor(null, null);
        DatabaseEntry ename = new DatabaseEntry();
        DatabaseEntry dname = new DatabaseEntry();

        try
        {
            Random rand = new Random();
            byte[] buff = new byte[512];
            for (int a = 0; a < NUM_DOMAINS; a++)
            {
                rand.nextBytes(buff);
                dname.setData(buff);
                for (int z = 0; z < RECIPIENTS_PER_DOMAIN; z++)
                {
                    rand.nextBytes(buff);
                    ename.setData(buff);
                    if (OperationStatus.SUCCESS == unsent.putNoDupData(dname, ename))
                    {
                        recipients.put(dname, ename);
                        queue.put(dname, ename);
                    }
                    else
                        z--;
                }
            }
        }
        finally
        {
            for (Cursor c : new Cursor[]{recipients, unsent, queue})
            {
                try
                {
                    c.close();
                }
                catch (DatabaseException e)
                {
                    e.printStackTrace();
                }
            }
            dbi.close();
        }

        final MyDbI dbi2 = MyDbI.open(envHome);
        final Runnable runnable = new Runnable()
        {
            public void run()
            {
                Evictor evil = (Evictor)getField(getField(dbi2.env(), "environmentImpl"), "evictor");
                while (true)
                {
                    evil.runOrPause(true);
                    try
                    {
                        Thread.sleep(10);
                    }
                    catch (InterruptedException e)
                    {
                    }
                }
            }
        };
        Thread t = new Thread(runnable);
        t.setDaemon(true);
        t.setPriority(Thread.MAX_PRIORITY);
        t.start();
        dbi2.clearQueue().sync();
        dbi2.close();
    }

    private static Object getField(Object o, String fieldName)
    {
        Field field = getField(o.getClass(), fieldName);
        if (null == field)
            throw new AssertionError("Field '" + fieldName + "' not found.");
        else
        {
            try
            {
                return field.get(o);
            }
            catch (IllegalAccessException ex)
            {
                throw new AssertionError("IllegalAccessException thrown while accessing: ".concat(fieldName));
            }
        }
    }

    private static Field getField(Class co, String name)
    {
        for (Class stop = Object.class; co != stop; co = co.getSuperclass())
        {
            for (Field field : co.getDeclaredFields())
            {
                if (name.equals(field.getName()))
                {
                    field.setAccessible(true);
                    return field;
                }
            }
        }
        return null;
    }
    //====================================================================================================================//
    //====================================== Inner Class Definitions Start Here ==========================================//
    //====================================================================================================================//

    private static class MyDbI
    {
        /**
         * A small cache of prime numbers.
         */
        private static final IntOpenHashSet PRIMES = new IntOpenHashSet();

        /**
         * The maximum number of lock tables.
         */
        private static final int MAX_LOCK_TABLES = 523;

        /**
         * The databases.
         */
        public final Database recipients, unsent;

        /**
         * The delivery queue.
         */
        private static final String QUEUE_DB = "queue";

        /**
         * The database name of the unsent recipients.
         */
        private static final String UNSENT_DB = "unsent";

        /**
         * The database name for the recipient list database.
         */
        private static final String RECIP_DB = "recipients";

        private static final int CACHE_SIZE = 1024 << 10 << 4;

        public Database queue;

        /**
         * The database environment object.
         */
        private final Environment env;

        private final boolean dw, readonly;

        private MyDbI(File home, EnvironmentConfig ecfg, DatabaseConfig dbc) throws DatabaseException
        {
            dw = dbc.getDeferredWrite();
            readonly = dbc.getReadOnly();
            dbc.setSortedDuplicates(true);

            env = new Environment(home, ecfg);

            Transaction txn = ecfg.getTransactional() ? env.beginTransaction(null, null) : null;

            if (dbc.getAllowCreate())
            {
                queue = env.openDatabase(txn, QUEUE_DB, dbc);
                unsent = env.openDatabase(txn, UNSENT_DB, dbc);
                recipients = env.openDatabase(txn, RECIP_DB, dbc);
            }
            else
            {
                List names = env.getDatabaseNames();
                queue = names.contains(QUEUE_DB) ? env.openDatabase(txn, QUEUE_DB, dbc) : null;
                unsent = names.contains(UNSENT_DB) ? env.openDatabase(txn, UNSENT_DB, dbc) : null;
                recipients = names.contains(RECIP_DB) ? env.openDatabase(txn, RECIP_DB, dbc) : null;
            }

            if (null != txn)
                txn.commit();
        }

        /**
         * @return The Environment object that backs this DbI.
         */
        public Environment env()
        {
            return env;
        }

        /**
         * Get the queue database.
         *
         * @return The queue database.
         */
        public synchronized Database queue()
        {
            return queue;
        }

        public synchronized MyDbI clearQueue() throws DatabaseException
        {
            DatabaseConfig config = queue.getConfig();
            queue.close();
            env.truncateDatabase(null, QUEUE_DB, false);
            queue = env.openDatabase(null, QUEUE_DB, config);
            return this;
        }

        /**
         * Synchronizes {@link #unsent} with {@link #queue}.
         *
         * @return this
         *
         * @throws DatabaseException If there is a database error.
         */
        public synchronized MyDbI sync() throws DatabaseException
        {
            if (readonly)
                throw new IllegalStateException("read-only mode");

            DatabaseEntry k = new DatabaseEntry();
            DatabaseEntry v = new DatabaseEntry();
            Cursor uc = unsent.openCursor(null, null);
            try
            {
                if (dw)
                {
                    while (OperationStatus.SUCCESS == uc.getNext(k, v, null))
                        if (OperationStatus.SUCCESS != queue.putNoDupData(null, k, v))
                            assert false : "Duplicate email addresses not allowed in queue.";
                }
                else
                {
                    boolean commit = false;
                    Transaction txn = env.beginTransaction(null, null);
                    try
                    {
                        Cursor qc = queue.openCursor(txn, null);
                        try
                        {
                            while (OperationStatus.SUCCESS == uc.getNext(k, v, null))
                                if (OperationStatus.SUCCESS != qc.putNoDupData(k, v))
                                    assert false : "Duplicate email addresses not allowed in queue.";
                            commit = true;
                        }
                        finally
                        {
                            qc.close();
                        }
                    }
                    finally
                    {
                        if (commit)
                            txn.commit();
                        else
                            txn.abort();
                    }
                }
            }
            finally
            {
                uc.close();
            }
            return this;
        }

        /**
         * Synchronizes {@link #unsent} with {@link #queue}.
         *
         * @return this
         *
         * @throws DatabaseException If there is a database error.
         */
        public synchronized MyDbI sync2() throws DatabaseException
        {
            if (readonly)
                throw new IllegalStateException("read-only mode");

            DatabaseEntry k = new DatabaseEntry();
            DatabaseEntry v = new DatabaseEntry();
            Cursor uc = unsent.openCursor(null, null);
            try
            {
                if (dw)
                {
                    while (OperationStatus.SUCCESS == uc.getNext(k, v, null))
                        if (OperationStatus.SUCCESS != queue.putNoDupData(null, k, v))
                            assert false : "Duplicate email addresses not allowed in queue.";
                }
                else
                {
                    boolean commit = false;
                    Transaction txn = env.beginTransaction(null, null);
                    try
                    {
                        Cursor qc = queue.openCursor(txn, null);
                        try
                        {
                            for (int x = 0; OperationStatus.SUCCESS == uc.getNext(k, v, null);)
                            {
                                if (OperationStatus.SUCCESS != qc.putNoDupData(k, v))
                                    assert false : "Duplicate email addresses not allowed in queue.";
                                commit = true;
                                if (++x == 1000)
                                {
                                    x = 0;
                                    commit = false;
                                    qc.close();
                                    txn.commit();
                                    txn = env.beginTransaction(null, null);
                                    qc = queue.openCursor(txn, null);
                                }
                            }
                        }
                        finally
                        {
                            qc.close();
                        }
                    }
                    finally
                    {
                        if (commit)
                            txn.commit();
                        else
                            txn.abort();
                    }
                }
            }
            finally
            {
                uc.close();
            }
            return this;
        }

        /**
         * Closes the databases and environment.
         */
        public synchronized void close()
        {
            try
            {
                for (Database db : new Database[]{unsent, unsent, recipients})
                    db.close();
            }
            catch (DatabaseException e)
            {
                e.printStackTrace();
            }
            try
            {
                env.close();
            }
            catch (DatabaseException e)
            {
                e.printStackTrace();
            }
        }

        /**
         * Use when populating the databases.
         *
         * @param home The home directory of the blast.
         *
         * @return An environment optimized for single threaded write only access.
         *
         * @throws DatabaseException If there is a problem opening the databases or the database environment.
         */
        public static MyDbI openToFill(File home) throws DatabaseException
        {
            DatabaseConfig dcfg = new DatabaseConfig();
            dcfg.setAllowCreate(true);
            dcfg.setDeferredWrite(true);
            return new MyDbI(home, getFillConfig(), dcfg);
        }

        /**
         * Use for normal blast database access.
         *
         * @param home The home directory of the blast.
         *
         *
         * @throws DatabaseException If there is a problem opening the databases or the database environment.
         */
        public static MyDbI open(File home) throws DatabaseException
        {
            DatabaseConfig config = new DatabaseConfig();
            config.setAllowCreate(true);
            config.setTransactional(true);
            return new MyDbI(home, getDefaultConfig(), config);
        }

        /**
         * @return The number of lock tables based on the number of CPUs.
         */
        public static int getLockTableSize()
        {
            int cpus = Runtime.getRuntime().availableProcessors();
            if (cpus < 4)
                return 1;
            for (cpus = Math.min(MAX_LOCK_TABLES, cpus); cpus > 0 && !PRIMES.contains(cpus);)
                cpus--;
            return Math.max(1, cpus);
        }

        /**
         * Creates a new EnvironmentConfig object suitable for single threaded, write intensive access.
         *
         * @return An EnvironmentConfig object suitable for a single write only thread.
         */
        private static EnvironmentConfig getFillConfig()
        {
            EnvironmentConfig config = getNormalConfig();
            config.setCacheSize(1024 << 10 << 4);
            config.setAllowCreate(true);
            config.setLocking(false);
            return config;
        }

        /**
         * Creates a new com.sleepycat.dbi.EnvironmentConfig object suitable for normal data access patterns.
         *
         * @return An com.sleepycat.dbi.EnvironmentConfig object suitable for normal data access patterns.
         */
        private static EnvironmentConfig getNormalConfig()
        {
            EnvironmentConfig config = new EnvironmentConfig();
            config.setConfigParam("je.log.faultReadSize", "4096");
            config.setConfigParam("je.lock.nLockTables", Integer.toString(getLockTableSize()));
            return config;
        }

        /**
         * Configures an environment for normal transactional access.
         *
         * @return A configuration for normal transactional access.
         */
        private static EnvironmentConfig getDefaultConfig()
        {
            EnvironmentConfig ecfg = getNormalConfig();
            ecfg.setCacheSize(CACHE_SIZE);
            ecfg.setTransactional(true);
            ecfg.setTxnNoSync(true);
            return ecfg;
        }

        static
        {
            PRIMES.add(2);
            PRIMES.add(3);
            PRIMES.add(5);
            PRIMES.add(7);
            PRIMES.add(11);
            PRIMES.add(13);
            PRIMES.add(17);
            PRIMES.add(19);
            PRIMES.add(23);
            PRIMES.add(29);
            PRIMES.add(31);
            PRIMES.add(37);
            PRIMES.add(41);
            PRIMES.add(43);
            PRIMES.add(47);
            PRIMES.add(53);
            PRIMES.add(59);
            PRIMES.add(61);
            PRIMES.add(67);
            PRIMES.add(71);
            PRIMES.add(73);
            PRIMES.add(79);
            PRIMES.add(83);
            PRIMES.add(89);
            PRIMES.add(97);
            PRIMES.add(101);
            PRIMES.add(103);
            PRIMES.add(107);
            PRIMES.add(109);
            PRIMES.add(113);
            PRIMES.add(127);
            PRIMES.add(131);
            PRIMES.add(137);
            PRIMES.add(139);
            PRIMES.add(149);
            PRIMES.add(151);
            PRIMES.add(157);
            PRIMES.add(163);
            PRIMES.add(167);
            PRIMES.add(173);
            PRIMES.add(179);
            PRIMES.add(181);
            PRIMES.add(191);
            PRIMES.add(193);
            PRIMES.add(197);
            PRIMES.add(199);
            PRIMES.add(229);
            PRIMES.add(241);
            PRIMES.add(241);
            PRIMES.add(241);
            PRIMES.add(271);
            PRIMES.add(283);
            PRIMES.add(283);
            PRIMES.add(313);
            PRIMES.add(313);
            PRIMES.add(313);
            PRIMES.add(349);
            PRIMES.add(349);
            PRIMES.add(349);
            PRIMES.add(349);
            PRIMES.add(421);
            PRIMES.add(433);
            PRIMES.add(463);
            PRIMES.add(463);
            PRIMES.add(463);
            PRIMES.add(523);
            PRIMES.trim();
        }
    }
}

Saturday, September 15, 2007

The Missing Minute

Sometime earlier today
A minute of mine went away
So I thought about main
But that thought was in vain
The minute was hiding okay?

I opened the profiler quick
And attach to the app in a nick
I collected a sample
It wasn't quite ample
The minute continued to tick.

I decided to give it a rest
In order to give it my best
I went for a drive
And in about five
My mind was finally unstressed.

But on the way home I found it
The place where the minute was grounded
It was caught in a latch
With a timer dispatch
But no signal handler around it.

Let this be a lesson to all
When debugging processes stall
Take a step back and maybe a nap
And the bug is likely to fall.