Building a persistent queue with Berkeley DB

About Berkeley DB

Berkeley DB (hereinafter referred to as Bdb) is an embedded key-value database. Bdb comes in two editions: a native C/C++ edition, which can be used from many languages, and Berkeley DB Java Edition (hereinafter referred to as JE), which is written entirely in Java. JE runs inside the application's own process, so no client/server communication is needed. Because JE is easier to deploy and to embed in a Java program, I chose Berkeley DB Java Edition (originally developed by Sleepycat).

Berkeley DB programming interface

To use JE in a Java program, you first need to initialize a database environment.

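File home = new File(envHome);
EnvironmentConfig environmentConfig = new EnvironmentConfig();
environmentConfig.setTransactional(true);   // enable transaction support
environmentConfig.setAllowCreate(true);     // create the environment if it does not exist
environmentConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC); // example value only; choose a policy as described below
Environment environment = new Environment(home, environmentConfig);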

home is an existing directory; JE stores all of its database files in this directory. setTransactional enables transaction support. setAllowCreate tells JE to create the database environment if it does not exist yet. setDurability controls how Bdb writes data, and it has a significant effect on both performance and data safety. For performance reasons, Bdb does not write data back to disk immediately by default; the data is written back only when you synchronize manually or close the database. You can change this default behavior by setting a Durability. The predefined Durability policies include the following:

COMMIT_SYNC: on transaction commit, sync the data to disk and wait for the disk to finish. This is the safest policy and also the slowest.
COMMIT_WRITE_NO_SYNC: on transaction commit, hand the data to the operating system for writing but do not wait for the disk to finish. This is a compromise between safety and speed.

COMMIT_SYNC guarantees that no data is lost and that transactions stay intact whether the application or the whole system crashes. COMMIT_WRITE_NO_SYNC only guarantees that no data is lost when the application crashes; it cannot guarantee the integrity of a transaction when the system itself crashes.
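
To make the difference between "write" and "sync" concrete, here is a minimal sketch that uses only java.nio, not JE. It assumes the usual operating-system behavior: a plain write hands the bytes to the OS, where they survive a crash of the process, while force(true) additionally blocks until the storage device has them. The class name DurabilityDemo and the method append are made up for this illustration, and JE's own log writing is more involved than this.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class DurabilityDemo {

    /**
     * Appends one record to the file.
     * sync = true  roughly corresponds to COMMIT_SYNC: write, then wait for the disk.
     * sync = false roughly corresponds to COMMIT_WRITE_NO_SYNC: write to the OS only.
     */
    static void append(Path file, String record, boolean sync) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.APPEND)) {
            ByteBuffer buffer = ByteBuffer.wrap(
                    (record + "\n").getBytes(StandardCharsets.UTF_8));
            while (buffer.hasRemaining()) {
                channel.write(buffer);   // data is handed to the OS file cache
            }
            if (sync) {
                channel.force(true);     // block until the device has the data
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("durability-demo", ".log");
        append(file, "commit-1", true);   // safest, slowest
        append(file, "commit-2", false);  // faster, may be lost if the OS crashes
        System.out.println(Files.readAllLines(file, StandardCharsets.UTF_8));
        Files.delete(file);
    }
}
```

Both records are readable afterwards in the normal case; the two calls differ only in what is guaranteed if the machine goes down right after the call returns.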

JE offers programming interfaces at two levels. The high-level one is the Direct Persistence Layer (DPL), which suits scenarios where Java objects are saved and read directly. The DPL takes its metadata from annotations on the class and uses that configuration to save and read data. The low-level one is the Base API.

Direct Persistence Layer

The User class to be stored:

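import static com.sleepycat.persist.model.Relationship.MANY_TO_ONE;

import com.sleepycat.persist.model.Entity;
import com.sleepycat.persist.model.PrimaryKey;
import com.sleepycat.persist.model.SecondaryKey;

@Entity
public class User {

    @PrimaryKey(sequence = "SEQ_USER_ID")
    private Integer userId;

    @SecondaryKey(relate = MANY_TO_ONE)
    private String nick;

    // constructors (the DPL needs a no-argument one), getters and setters
}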

@PrimaryKey marks userId as the primary key, and its sequence attribute names a sequence from which the key is assigned automatically. @SecondaryKey defines an index that can be used for queries. MANY_TO_ONE means that nick values may repeat in the database: several users can have the same nick, so a query by nick returns a list.
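
The relationship between the two indexes can be pictured with ordinary in-memory maps. The sketch below is not the DPL; it is a plain-Java model of what a sequence-assigned primary key and a MANY_TO_ONE secondary key give you, under the assumption that the sequence simply hands out increasing integers. UserIndexModel and its methods are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class UserIndexModel {

    /** Minimal stand-in for the User entity. */
    static final class User {
        final int userId;
        final String nick;

        User(int userId, String nick) {
            this.userId = userId;
            this.nick = nick;
        }
    }

    private int nextId = 1;                                          // plays the role of SEQ_USER_ID
    private final Map<Integer, User> byId = new TreeMap<>();         // primary index
    private final Map<String, List<User>> byNick = new HashMap<>();  // secondary index

    /** Stores a user; the primary key is taken from the sequence. */
    User put(String nick) {
        User user = new User(nextId++, nick);
        byId.put(user.userId, user);
        byNick.computeIfAbsent(nick, k -> new ArrayList<>()).add(user);
        return user;
    }

    /** Primary-key lookup: at most one user, or null. */
    User get(int userId) {
        return byId.get(userId);
    }

    /** Secondary-key lookup: MANY_TO_ONE, so any number of users may match. */
    List<User> findByNick(String nick) {
        return Collections.unmodifiableList(
                byNick.getOrDefault(nick, Collections.emptyList()));
    }

    public static void main(String[] args) {
        UserIndexModel store = new UserIndexModel();
        store.put("alice");
        store.put("bob");
        store.put("alice");
        System.out.println("users with nick alice: " + store.findByNick("alice").size());
        System.out.println("nick of user 2: " + store.get(2).nick);
    }
}
```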

    private static EntityStore createStore(Environment environment) throws DatabaseException {
        StoreConfig storeConfig = new StoreConfig();
        storeConfig.setAllowCreate(true);   // assumed setting: create the store on first use
        return new EntityStore(environment, "store1", storeConfig);
    }

This creates an EntityStore.

PrimaryIndex<Integer, User> primaryKey = entityStore.getPrimaryIndex(Integer.class, User.class);

primaryKey.put(new User("user" + i));

With the PrimaryIndex obtained from the EntityStore, you can save, read, and query Java objects.

Base API

Similarly, first create a Database.

 DatabaseConfig myDbConfig = new DatabaseConfig();

 Database myDb = myEnv.openDatabase(null,        // txn handle
           dbName,       // database name
           myDbConfig);  // database configuration

To store data through a Database, both the key and the value have to be serialized into DatabaseEntry objects.
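
A DatabaseEntry is essentially a wrapper around a byte array, so serializing a key or a value means turning it into bytes and back. The following sketch shows one way to do that with the standard library only; the resulting arrays are what one would pass to a DatabaseEntry. The encoding choices (a big-endian long for the key, UTF-8 for the value) and the class name EntryCodec are assumptions made for this illustration. JE also ships its own binding classes for this job, such as the SerialBinding used in the queue code later in this article.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class EntryCodec {

    /** Encodes a long key as 8 big-endian bytes. */
    static byte[] keyToBytes(long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    /** Decodes a key produced by keyToBytes. */
    static long bytesToKey(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    /** Encodes a String value as UTF-8 bytes. */
    static byte[] valueToBytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a value produced by valueToBytes. */
    static String bytesToValue(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = keyToBytes(42L);
        byte[] value = valueToBytes("hello");
        System.out.println(key.length + " key bytes, " + value.length + " value bytes");
        System.out.println(bytesToKey(key) + " -> " + bytesToValue(value));
    }
}
```

For non-negative keys, the big-endian encoding keeps the byte-wise order of the arrays identical to the numeric order of the keys, which matters for a store that compares keys as bytes.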

Bdb JE Java Collections

JE provides a high-level Collections API that wraps the low-level access operations behind standard Java interfaces such as Map and List. It is better suited to building persistent data structures.

Building a persistent queue

import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.serial.SerialBinding;
import com.sleepycat.collections.StoredSortedMap;
import com.sleepycat.collections.TransactionRunner;
import com.sleepycat.collections.TransactionWorker;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import org.apache.log4j.Logger;   // assumed: the listing matches the log4j 1.x Logger API

// BdbEnvironment, TaskCallback, VipServiceException and VipErrorCode are the
// author's own classes and are not shown in this article.
public class BdbMessageQueue<T> {

    private static Logger log = Logger.getLogger(BdbMessageQueue.class);

    private static final String MESSAGE_STORE = "message_store";

    private Database messageDb;
    private StoredSortedMap<Long, T> messageMap;

    private TransactionRunner transactionRunner;

    // Monitor shared by EnqueueWorker and DequeueWorker
    private Object syncObject = new Object();

    public BdbMessageQueue(BdbEnvironment bdbEnvironment, String queueName)
            throws DatabaseException {
        try {
            // Set the Berkeley DB config for opening the store.
            DatabaseConfig dbConfig = new DatabaseConfig();
            // Assumed settings (not visible in the original listing): the
            // transactional workers below need a transactional database.
            dbConfig.setTransactional(true);
            dbConfig.setAllowCreate(true);

            // Open the Berkeley DB database that backs this queue.
            // It is opened with no duplicate keys allowed.
            messageDb = bdbEnvironment.getEnvironment().openDatabase(null, MESSAGE_STORE + queueName, dbConfig);

            EntryBinding messageKeyBinding =
                    new SerialBinding(bdbEnvironment.getJavaCatalog(), Long.class);
            EntryBinding messageValueBinding =
                    new SerialBinding(bdbEnvironment.getJavaCatalog(), Object.class);

            messageMap = new StoredSortedMap(messageDb, messageKeyBinding, messageValueBinding, true);

            // Create the transactionRunner for the transactional operations
            transactionRunner = new TransactionRunner(bdbEnvironment.getEnvironment());

        } catch (DatabaseException dbe) {
            throw new VipServiceException(VipErrorCode.DBD_ERROR, dbe);
        }
    }

    /**
     * Safely closes the Berkeley DB database.
     */
    public void close() {
        try {
            if (messageDb != null) messageDb.close();
        } catch (DatabaseException dbe) {
            throw new VipServiceException(VipErrorCode.DBD_ERROR, dbe);
        }
    }

    /**
     * Dequeues one message and passes it to the task.
     */
    public void dequeue(TaskCallback task) {
        try {
            DequeueWorker worker = new DequeueWorker(task);
            transactionRunner.run(worker);   // reconstructed: run the worker in a transaction
        } catch (Exception e) {
            throw new VipServiceException(VipErrorCode.DBD_QUEUE_ERROR, e);
        }
    }

    /**
     * Enqueues one message.
     * @param message the message to append to the queue
     */
    public void enqueue(T message) {
        try {
            EnqueueWorker worker = new EnqueueWorker(message);
            transactionRunner.run(worker);   // reconstructed: run the worker in a transaction
        } catch (Exception e) {
            throw new VipServiceException(VipErrorCode.DBD_QUEUE_ERROR, e);
        }
    }

    /**
     * Transaction worker for dequeuing, an inner class.
     */
    private class DequeueWorker implements TransactionWorker {

        private TaskCallback task;

        private DequeueWorker(TaskCallback task) {
            this.task = task;
        }

        public void doWork() throws Exception {

            Long firstKey;
            T message;

            synchronized (syncObject) {
                // Keep waiting until a message is available
                while ((firstKey = messageMap.firstKey()) == null ||
                        (message = messageMap.get(firstKey)) == null) {
                    syncObject.wait();   // reconstructed: the loop body is missing in the original listing
                }
            }

            // If the task throws a RuntimeException, the Bdb transaction is
            // rolled back and the message is not removed from the queue.
            // NOTE: TaskCallback is not shown in the original; process(...) is an assumed method name.
            task.process(message);
            messageMap.remove(firstKey);

            if (log.isDebugEnabled()) {
                log.debug(String.format("DequeueWorker dequeue %1$s. ", message));
            }
        }
    }

    /**
     * Transaction worker for enqueuing, an inner class.
     */
    private class EnqueueWorker implements TransactionWorker {
        private T message;

        private EnqueueWorker(T message) {
            this.message = message;
        }

        public void doWork() throws Exception {
            synchronized (syncObject) {
                Long lastKey = messageMap.lastKey();
                lastKey = (lastKey == null) ? 1L : lastKey + 1;
                messageMap.put(lastKey, message);
                syncObject.notify();   // reconstructed: wake up a waiting DequeueWorker
            }

            if (log.isDebugEnabled()) {
                log.debug(String.format("EnqueueWorker enqueue %1$s. ", message));
            }
        }
    }
}

The code is long; it uses StoredSortedMap from JE's high-level Collections API.
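
The queue logic itself does not depend on JE and can be tried out with an in-memory sorted map. The sketch below is a simplified model, not the class above: a TreeMap stands in for StoredSortedMap, there is no real transaction, and a rollback is modeled by simply not removing the message when the task throws. The names QueueModel and Task are hypothetical. TreeMap.firstKey() throws on an empty map, so the sketch checks isEmpty() where the code above checks for null, and for brevity it holds the lock while the task runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class QueueModel<T> {

    /** Hypothetical callback type, standing in for the task passed to dequeue. */
    interface Task<T> {
        void run(T message);
    }

    // Sorted by key, so the smallest key is always the oldest message.
    private final TreeMap<Long, T> messages = new TreeMap<>();

    /** Appends a message under the next free key (last key + 1, or 1 if empty). */
    synchronized void enqueue(T message) {
        long key = messages.isEmpty() ? 1L : messages.lastKey() + 1;
        messages.put(key, message);
        notifyAll();   // wake up consumers waiting in dequeue
    }

    /**
     * Waits for a message, runs the task on the oldest one, and removes it
     * only if the task returned normally. If the task throws, the message
     * stays at the head of the queue.
     */
    synchronized void dequeue(Task<T> task) throws InterruptedException {
        while (messages.isEmpty()) {
            wait();
        }
        Long firstKey = messages.firstKey();
        task.run(messages.get(firstKey));   // an exception here skips the remove
        messages.remove(firstKey);
    }

    synchronized int size() {
        return messages.size();
    }

    public static void main(String[] args) throws InterruptedException {
        QueueModel<String> queue = new QueueModel<>();
        queue.enqueue("first");
        queue.enqueue("second");

        try {
            queue.dequeue(m -> { throw new IllegalStateException("task failed on " + m); });
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + ", size is still " + queue.size());
        }

        List<String> processed = new ArrayList<>();
        queue.dequeue(processed::add);
        queue.dequeue(processed::add);
        System.out.println("processed in order: " + processed);
    }
}
```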

This article only shows the code. If you are interested in Berkeley DB, read Oracle's official documentation:

Getting Started Guide

Writing Transactional Applications

Java Collections Tutorial

Getting Started with BDB JE High Availability

Category: Java  Date: 2010-04-13