This entry is a partial repost of a message I posted to Oracle's Berkeley DB JE Forum. The forum software does not allow for the proper formatting of source code, and I personally hate reading unformatted source code. Therefore, I have reposted it here so people like me can read the code right off the page.
The code
import java.io.File;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Random;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.evictor.Evictor;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
public class CrushPunyJE
{
private static final int NUM_DOMAINS = 1000;
private static final int RECIPIENTS_PER_DOMAIN = 1000;
/**
 * Entry point: runs the stress scenario against the environment directory named by the first argument.
 *
 * @param args Command line arguments; {@code args[0]} is the environment home directory.
 *
 * @throws IllegalArgumentException If no environment home directory was given.
 * @throws Exception Whatever {@code crush} propagates.
 */
public static void main(String[] args) throws Exception
{
    // Fail with a usage message instead of a bare ArrayIndexOutOfBoundsException.
    if (null == args || args.length < 1)
        throw new IllegalArgumentException("usage: CrushPunyJE <environment home directory>");
    crush(new File(args[0]));
}
/**
 * Fills the environment at {@code envHome} with random records through the handle returned by
 * {@code MyDbI.openToFill}, closes it, reopens it with {@code MyDbI.open} and then runs
 * {@code clearQueue().sync()} while a maximum-priority daemon thread keeps calling {@code runOrPause(true)} on
 * the environment's {@code Evictor}.
 *
 * @param envHome The environment home directory.
 *
 * @throws Exception If opening, filling or synchronizing the databases fails.
 */
private static void crush(File envHome) throws Exception
{
    MyDbI dbi = MyDbI.openToFill(envHome);
    // Declared before the try so that every cursor that was actually opened is closed even when opening a
    // later one fails.
    Cursor queue = null;
    Cursor recipients = null;
    Cursor unsent = null;
    try
    {
        queue = dbi.queue().openCursor(null, null);
        recipients = dbi.recipients.openCursor(null, null);
        unsent = dbi.unsent.openCursor(null, null);
        DatabaseEntry ename = new DatabaseEntry();
        DatabaseEntry dname = new DatabaseEntry();
        Random rand = new Random();
        // The key and the data each need their own array: DatabaseEntry.setData(byte[]) keeps the array it is
        // given, so refilling a single shared buffer for the data would silently rewrite the key as well and
        // every record would end up under its own key instead of RECIPIENTS_PER_DOMAIN records per key.
        byte[] keyBuff = new byte[512];
        byte[] dataBuff = new byte[512];
        for (int a = 0; a < NUM_DOMAINS; a++)
        {
            rand.nextBytes(keyBuff);
            dname.setData(keyBuff);
            for (int z = 0; z < RECIPIENTS_PER_DOMAIN; z++)
            {
                rand.nextBytes(dataBuff);
                ename.setData(dataBuff);
                if (OperationStatus.SUCCESS == unsent.putNoDupData(dname, ename))
                {
                    recipients.put(dname, ename);
                    queue.put(dname, ename);
                }
                else
                {
                    // The random data collided with an existing duplicate: redo this slot.
                    z--;
                }
            }
        }
    }
    finally
    {
        for (Cursor c : new Cursor[]{recipients, unsent, queue})
        {
            if (null == c)
                continue;
            try
            {
                c.close();
            }
            catch (DatabaseException e)
            {
                e.printStackTrace();
            }
        }
        dbi.close();
    }
    final MyDbI dbi2 = MyDbI.open(envHome);
    try
    {
        final Runnable runnable = new Runnable()
        {
            public void run()
            {
                // The evictor is not reachable through the public API, so it is dug out reflectively.
                Evictor evil = (Evictor)getField(getField(dbi2.env(), "environmentImpl"), "evictor");
                while (true)
                {
                    evil.runOrPause(true);
                    try
                    {
                        Thread.sleep(10);
                    }
                    catch (InterruptedException e)
                    {
                        // Restore the interrupt status and stop rather than spinning without the pause.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        };
        Thread t = new Thread(runnable);
        t.setDaemon(true);
        t.setPriority(Thread.MAX_PRIORITY);
        t.start();
        dbi2.clearQueue().sync();
    }
    finally
    {
        // Close the second handle even when the synchronization fails.
        dbi2.close();
    }
}
/**
 * Reads the value of the field called {@code fieldName} from {@code o}. The field is looked up with
 * {@link #getField(Class, String)}, i.e. among the fields declared by the object's class and its superclasses,
 * regardless of visibility.
 *
 * @param o The object to read from.
 * @param fieldName The name of the field.
 *
 * @return The current value of the field.
 *
 * @throws AssertionError If no such field exists or it cannot be accessed; in the latter case the
 * IllegalAccessException is attached as the cause.
 */
private static Object getField(Object o, String fieldName)
{
    Field field = getField(o.getClass(), fieldName);
    if (null == field)
        throw new AssertionError("Field '" + fieldName + "' not found.");
    try
    {
        return field.get(o);
    }
    catch (IllegalAccessException ex)
    {
        // initCause keeps the original stack trace without needing the (String, Throwable) constructor.
        AssertionError error = new AssertionError("IllegalAccessException thrown while accessing: ".concat(fieldName));
        error.initCause(ex);
        throw error;
    }
}
/**
 * Finds a declared field by name, walking from {@code co} up through its superclasses and stopping before
 * {@code Object}. A field that is found is made accessible before it is returned.
 *
 * @param co The class to start searching at.
 * @param name The name of the field.
 *
 * @return The accessible field, or null if neither {@code co} nor any superclass below {@code Object} declares it.
 */
private static Field getField(Class<?> co, String name)
{
    // getSuperclass() returns null for interfaces, primitives and Object itself, so the walk has to stop on
    // null as well as on Object.class; otherwise such a class ends in a NullPointerException.
    for (Class<?> type = co; null != type && Object.class != type; type = type.getSuperclass())
    {
        for (Field field : type.getDeclaredFields())
        {
            if (name.equals(field.getName()))
            {
                field.setAccessible(true);
                return field;
            }
        }
    }
    return null;
}
//====================================================================================================================//
//====================================== Inner Class Definitions Start Here ==========================================//
//====================================================================================================================//
private static class MyDbI
{
/**
* A small cache of prime numbers. It is filled by the static initializer of this class and consulted by
* getLockTableSize(); it is not a complete list of the primes up to MAX_LOCK_TABLES.
*/
private static final IntOpenHashSet PRIMES = new IntOpenHashSet();
/**
* The maximum number of lock tables: getLockTableSize() caps the CPU count at this value before it searches
* PRIMES.
*/
private static final int MAX_LOCK_TABLES = 523;
/**
* The recipient list database and the unsent recipients database. Either is null when the constructor was
* given a configuration that does not allow creation and the database does not exist.
*/
public final Database recipients, unsent;
/**
* The database name of the delivery queue.
*/
private static final String QUEUE_DB = "queue";
/**
* The database name of the unsent recipients.
*/
private static final String UNSENT_DB = "unsent";
/**
* The database name for the recipient list database.
*/
private static final String RECIP_DB = "recipients";
/**
* The cache size handed to EnvironmentConfig.setCacheSize by getDefaultConfig():
* 1024 << 10 << 4 == 16777216, i.e. 16 MiB assuming the unit is bytes.
*/
private static final int CACHE_SIZE = 1024 << 10 << 4;
/**
* The delivery queue database. Not final because clearQueue() closes the handle and replaces it with a
* freshly opened one.
*/
public Database queue;
/**
* The database environment object.
*/
private final Environment env;
/**
* The deferred-write and read-only flags of the DatabaseConfig, captured by the constructor.
*/
private final boolean dw, readonly;
/**
 * Opens the environment in {@code home} together with the queue, unsent and recipients databases.
 * <p>
 * Sorted duplicates are switched on in {@code dbc} (the caller's object is modified). If {@code dbc} allows
 * creation all three databases are opened; otherwise a database is opened only when the environment already
 * lists it, and its field is left null when it does not. When {@code ecfg} is transactional the databases are
 * opened inside one transaction.
 * <p>
 * If anything fails after the environment has been opened, the transaction is aborted and the database handles
 * opened so far and the environment are closed before the exception propagates.
 *
 * @param home The environment home directory.
 * @param ecfg The environment configuration.
 * @param dbc The configuration used for all three databases.
 *
 * @throws DatabaseException If the environment or one of the databases cannot be opened.
 */
private MyDbI(File home, EnvironmentConfig ecfg, DatabaseConfig dbc) throws DatabaseException
{
    dw = dbc.getDeferredWrite();
    readonly = dbc.getReadOnly();
    dbc.setSortedDuplicates(true);
    env = new Environment(home, ecfg);
    // Locals first: the final fields are assigned once, after everything has succeeded.
    Database queueDb = null;
    Database unsentDb = null;
    Database recipientsDb = null;
    Transaction txn = null;
    boolean opened = false;
    try
    {
        txn = ecfg.getTransactional() ? env.beginTransaction(null, null) : null;
        if (dbc.getAllowCreate())
        {
            queueDb = env.openDatabase(txn, QUEUE_DB, dbc);
            unsentDb = env.openDatabase(txn, UNSENT_DB, dbc);
            recipientsDb = env.openDatabase(txn, RECIP_DB, dbc);
        }
        else
        {
            List<?> names = env.getDatabaseNames();
            queueDb = names.contains(QUEUE_DB) ? env.openDatabase(txn, QUEUE_DB, dbc) : null;
            unsentDb = names.contains(UNSENT_DB) ? env.openDatabase(txn, UNSENT_DB, dbc) : null;
            recipientsDb = names.contains(RECIP_DB) ? env.openDatabase(txn, RECIP_DB, dbc) : null;
        }
        if (null != txn)
            txn.commit();
        opened = true;
    }
    finally
    {
        if (!opened)
        {
            // Best-effort cleanup; failures here are only reported so that the original exception survives.
            if (null != txn)
            {
                try
                {
                    txn.abort();
                }
                catch (DatabaseException e)
                {
                    e.printStackTrace();
                }
            }
            for (Database db : new Database[]{queueDb, unsentDb, recipientsDb})
            {
                if (null == db)
                    continue;
                try
                {
                    db.close();
                }
                catch (DatabaseException e)
                {
                    e.printStackTrace();
                }
            }
            try
            {
                env.close();
            }
            catch (DatabaseException e)
            {
                e.printStackTrace();
            }
        }
    }
    queue = queueDb;
    unsent = unsentDb;
    recipients = recipientsDb;
}
/**
 * Gives access to the environment this object opened in its constructor.
 *
 * @return The Environment object that backs this DbI.
 */
public Environment env()
{
    return this.env;
}
/**
 * Returns the current handle of the queue database. The read happens under this object's monitor, the same
 * monitor clearQueue() holds while it swaps the handle.
 *
 * @return The queue database.
 */
public synchronized Database queue()
{
    final Database current = this.queue;
    return current;
}
/**
 * Empties the queue database: the current handle is closed, the database is truncated by name and a new handle
 * is opened with the configuration of the handle that was closed.
 *
 * @return this
 *
 * @throws DatabaseException If closing, truncating or reopening the queue database fails.
 */
public synchronized MyDbI clearQueue() throws DatabaseException
{
    // Remember the configuration before the handle goes away.
    final DatabaseConfig reopenWith = this.queue.getConfig();
    this.queue.close();
    // No explicit transaction is passed to either call.
    this.env.truncateDatabase(null, QUEUE_DB, false);
    this.queue = this.env.openDatabase(null, QUEUE_DB, reopenWith);
    return this;
}
/**
 * Synchronizes {@link #unsent} with {@link #queue} by copying every record of {@link #unsent} into
 * {@link #queue}.
 * <p>
 * In deferred-write mode the records are written directly through the queue database handle. Otherwise all
 * records are written through a cursor inside a single transaction, which is committed only after every record
 * was copied and the cursor was closed, and aborted in every other case. A record the queue does not accept
 * trips an assertion when assertions are enabled and is skipped otherwise.
 *
 * @return this
 *
 * @throws IllegalStateException If the databases were opened read-only.
 * @throws DatabaseException If there is a database error.
 */
public synchronized MyDbI sync() throws DatabaseException
{
    if (readonly)
        throw new IllegalStateException("read-only mode");
    DatabaseEntry k = new DatabaseEntry();
    DatabaseEntry v = new DatabaseEntry();
    Cursor uc = unsent.openCursor(null, null);
    try
    {
        if (dw)
        {
            while (OperationStatus.SUCCESS == uc.getNext(k, v, null))
            {
                // The put is done outside the assert so that it also runs with assertions disabled.
                OperationStatus status = queue.putNoDupData(null, k, v);
                assert OperationStatus.SUCCESS == status : "Duplicate email addresses not allowed in queue.";
            }
        }
        else
        {
            boolean commit = false;
            Transaction txn = env.beginTransaction(null, null);
            try
            {
                Cursor qc = queue.openCursor(txn, null);
                try
                {
                    while (OperationStatus.SUCCESS == uc.getNext(k, v, null))
                    {
                        OperationStatus status = qc.putNoDupData(k, v);
                        assert OperationStatus.SUCCESS == status : "Duplicate email addresses not allowed in queue.";
                    }
                }
                finally
                {
                    qc.close();
                }
                // Only reached when the copy AND the cursor close succeeded; a failing close now leads to an
                // abort instead of a commit attempt.
                commit = true;
            }
            finally
            {
                if (commit)
                    txn.commit();
                else
                    txn.abort();
            }
        }
    }
    finally
    {
        uc.close();
    }
    return this;
}
/**
 * Synchronizes {@link #unsent} with {@link #queue} like {@link #sync()}, but in the transactional case the
 * records are copied in batches of 1000, each batch in a transaction of its own.
 * <p>
 * A batch is committed only when all of its records were copied and its cursor was closed; a batch that fails
 * is aborted as a whole, while the batches committed before it stay committed. In deferred-write mode the
 * records are written directly through the queue database handle.
 *
 * @return this
 *
 * @throws IllegalStateException If the databases were opened read-only.
 * @throws DatabaseException If there is a database error.
 */
public synchronized MyDbI sync2() throws DatabaseException
{
    if (readonly)
        throw new IllegalStateException("read-only mode");
    DatabaseEntry k = new DatabaseEntry();
    DatabaseEntry v = new DatabaseEntry();
    Cursor uc = unsent.openCursor(null, null);
    try
    {
        if (dw)
        {
            while (OperationStatus.SUCCESS == uc.getNext(k, v, null))
            {
                // The put is done outside the assert so that it also runs with assertions disabled.
                OperationStatus status = queue.putNoDupData(null, k, v);
                assert OperationStatus.SUCCESS == status : "Duplicate email addresses not allowed in queue.";
            }
        }
        else
        {
            boolean more;
            do
            {
                more = copyBatch(uc, k, v, 1000);
            }
            while (more);
        }
    }
    finally
    {
        uc.close();
    }
    return this;
}
/**
 * Copies at most {@code limit} records from {@code source} into {@link #queue} inside one transaction. Every
 * batch owns its transaction and cursor, so each of them is closed and ended exactly once, also on failure.
 *
 * @param source The cursor to read from; it is advanced but not closed.
 * @param k Scratch entry for the key.
 * @param v Scratch entry for the data.
 * @param limit The maximum number of records to copy.
 *
 * @return true if the batch was filled completely, so the source may hold more records; false if the source
 * ran out of records.
 *
 * @throws DatabaseException If there is a database error; the batch is aborted in that case.
 */
private boolean copyBatch(Cursor source, DatabaseEntry k, DatabaseEntry v, int limit) throws DatabaseException
{
    boolean commit = false;
    boolean exhausted = false;
    Transaction txn = env.beginTransaction(null, null);
    try
    {
        Cursor qc = queue.openCursor(txn, null);
        try
        {
            for (int copied = 0; copied < limit; copied++)
            {
                if (OperationStatus.SUCCESS != source.getNext(k, v, null))
                {
                    exhausted = true;
                    break;
                }
                OperationStatus status = qc.putNoDupData(k, v);
                assert OperationStatus.SUCCESS == status : "Duplicate email addresses not allowed in queue.";
            }
        }
        finally
        {
            qc.close();
        }
        // Set once per batch, after the cursor is closed, so a failure in the middle never commits a partial batch.
        commit = true;
    }
    finally
    {
        if (commit)
            txn.commit();
        else
            txn.abort();
    }
    return !exhausted;
}
/**
 * Closes the queue, unsent and recipients databases and then the environment.
 * <p>
 * Databases that were never opened (null) are skipped. A failure to close one handle is printed and does not
 * prevent the remaining handles and the environment from being closed.
 */
public synchronized void close()
{
    // The queue handle has to be in this list: the environment is closed right afterwards.
    for (Database db : new Database[]{queue, unsent, recipients})
    {
        if (null == db)
            continue;
        try
        {
            db.close();
        }
        catch (DatabaseException e)
        {
            e.printStackTrace();
        }
    }
    try
    {
        env.close();
    }
    catch (DatabaseException e)
    {
        e.printStackTrace();
    }
}
/**
 * Opens the databases for populating them: the database configuration allows creation and uses deferred
 * write, and the environment configuration comes from getFillConfig().
 *
 * @param home The environment home directory.
 *
 * @return A MyDbI set up for filling the databases.
 *
 * @throws DatabaseException If there is a problem opening the databases or the database environment.
 */
public static MyDbI openToFill(File home) throws DatabaseException
{
    final DatabaseConfig fillDbConfig = new DatabaseConfig();
    fillDbConfig.setDeferredWrite(true);
    fillDbConfig.setAllowCreate(true);
    final EnvironmentConfig fillEnvConfig = getFillConfig();
    return new MyDbI(home, fillEnvConfig, fillDbConfig);
}
/**
 * Opens the databases for normal access: the database configuration allows creation and is transactional, and
 * the environment configuration comes from getDefaultConfig().
 *
 * @param home The environment home directory.
 *
 * @return A MyDbI with transactional databases.
 *
 * @throws DatabaseException If there is a problem opening the databases or the database environment.
 */
public static MyDbI open(File home) throws DatabaseException
{
    final DatabaseConfig txnDbConfig = new DatabaseConfig();
    txnDbConfig.setTransactional(true);
    txnDbConfig.setAllowCreate(true);
    final EnvironmentConfig txnEnvConfig = getDefaultConfig();
    return new MyDbI(home, txnEnvConfig, txnDbConfig);
}
/**
 * Derives the number of lock tables from the number of available processors.
 *
 * @return 1 when fewer than four processors are available; otherwise the largest value in PRIMES that does not
 * exceed the processor count capped at MAX_LOCK_TABLES, and never less than 1.
 */
public static int getLockTableSize()
{
    final int processors = Runtime.getRuntime().availableProcessors();
    if (processors < 4)
        return 1;
    // Walk downwards from the capped processor count until a cached prime is hit.
    int candidate = Math.min(MAX_LOCK_TABLES, processors);
    while (candidate > 0 && !PRIMES.contains(candidate))
    {
        candidate--;
    }
    return Math.max(1, candidate);
}
/**
 * Builds the environment configuration used while filling the databases: the settings of getNormalConfig()
 * plus the cache size, permission to create the environment, and locking switched off.
 *
 * @return An EnvironmentConfig object suitable for a single write only thread.
 */
private static EnvironmentConfig getFillConfig()
{
    final EnvironmentConfig fillConfig = getNormalConfig();
    fillConfig.setLocking(false);
    fillConfig.setAllowCreate(true);
    // CACHE_SIZE is the same 1024 << 10 << 4 value getDefaultConfig() uses.
    fillConfig.setCacheSize(CACHE_SIZE);
    return fillConfig;
}
/**
 * Builds the base environment configuration shared by getFillConfig() and getDefaultConfig(): it sets
 * je.log.faultReadSize to 4096 and je.lock.nLockTables to the result of getLockTableSize().
 *
 * @return A new com.sleepycat.je.EnvironmentConfig object carrying those two parameters.
 */
private static EnvironmentConfig getNormalConfig()
{
    final EnvironmentConfig base = new EnvironmentConfig();
    base.setConfigParam("je.log.faultReadSize", "4096");
    final String lockTables = String.valueOf(getLockTableSize());
    base.setConfigParam("je.lock.nLockTables", lockTables);
    return base;
}
/**
 * Builds the environment configuration for normal transactional access: the settings of getNormalConfig()
 * plus the CACHE_SIZE cache, transactions enabled and the no-sync transaction option switched on.
 *
 * @return A configuration for normal transactional access.
 */
private static EnvironmentConfig getDefaultConfig()
{
    final EnvironmentConfig txnConfig = getNormalConfig();
    txnConfig.setTransactional(true);
    txnConfig.setTxnNoSync(true);
    txnConfig.setCacheSize(CACHE_SIZE);
    return txnConfig;
}
static
{
PRIMES.add(2);
PRIMES.add(3);
PRIMES.add(5);
PRIMES.add(7);
PRIMES.add(11);
PRIMES.add(13);
PRIMES.add(17);
PRIMES.add(19);
PRIMES.add(23);
PRIMES.add(29);
PRIMES.add(31);
PRIMES.add(37);
PRIMES.add(41);
PRIMES.add(43);
PRIMES.add(47);
PRIMES.add(53);
PRIMES.add(59);
PRIMES.add(61);
PRIMES.add(67);
PRIMES.add(71);
PRIMES.add(73);
PRIMES.add(79);
PRIMES.add(83);
PRIMES.add(89);
PRIMES.add(97);
PRIMES.add(101);
PRIMES.add(103);
PRIMES.add(107);
PRIMES.add(109);
PRIMES.add(113);
PRIMES.add(127);
PRIMES.add(131);
PRIMES.add(137);
PRIMES.add(139);
PRIMES.add(149);
PRIMES.add(151);
PRIMES.add(157);
PRIMES.add(163);
PRIMES.add(167);
PRIMES.add(173);
PRIMES.add(179);
PRIMES.add(181);
PRIMES.add(191);
PRIMES.add(193);
PRIMES.add(197);
PRIMES.add(199);
PRIMES.add(229);
PRIMES.add(241);
PRIMES.add(241);
PRIMES.add(241);
PRIMES.add(271);
PRIMES.add(283);
PRIMES.add(283);
PRIMES.add(313);
PRIMES.add(313);
PRIMES.add(313);
PRIMES.add(349);
PRIMES.add(349);
PRIMES.add(349);
PRIMES.add(349);
PRIMES.add(421);
PRIMES.add(433);
PRIMES.add(463);
PRIMES.add(463);
PRIMES.add(463);
PRIMES.add(523);
PRIMES.trim();
}
}
}