`
guoyunsky
  • 浏览: 838801 次
  • 性别: Icon_minigender_1
  • 来自: 上海
博客专栏
3d3a22a0-f00f-3227-8d03-d2bbe672af75
Heritrix源码分析
浏览量:203144
Group-logo
SQL的MapReduce...
浏览量:0
社区版块
存档分类
最新评论

基于Berkeley DB实现的持久化队列

阅读更多

         本博客属原创文章,欢迎转载!转载请务必注明出处:http://guoyunsky.iteye.com/blog/1169912

 

      队列很常见,但大部分的队列是将数据放入到内存.如果数据过多,就有内存溢出危险,而且长久占据着内存,也会影响性能.比如爬虫,将要抓取的URL放到内存,而URL过多,内存肯定要爆.在读Heritrix源码中,发现Heritrix是基于Bdb实现了一个持久化队列,于是我就将这块代码独立出来,平时使用也蛮爽的,现在拿出来共享.同时数据已经持久化,相比放在内存的一次性,可以循环累加使用.

      大家也知道BDB的高性能和嵌入式.但这个持久化队列我觉得比较适合单机.如果涉及到分布式,就不大适合了.毕竟分布式要通信,负载均衡,冗余等.可以用其他的数据库等替代.

      这里大概先说下实现原理,BDB是Key-Value型数据库,而队列是FIFO.所以这个持久化队列以位置作为BDB的Key,数据作为BDB的Value.然后用两个变量,分别记录队列两头的位置,也就是头部和尾部.当有数据插入的时候,就以尾部的位置为这个数据的Key.而当要取出数据时,以头部位置作为Key,获取这个Key的数据.原理大概如此,这个类也继承AbstractQueue,这里贴上代码.以下代码需引用bdb-je,common-io,junit.请在附件中下载

  1.       自定义的BDB环境类,可以缓存StoredClassCatalog并共享
package com.guoyun.util;

import java.io.File;

import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
/**
 * BDB数据库环境,可以缓存StoredClassCatalog并共享
 * 
 * @contributor guoyun
 */
public class BdbEnvironment extends Environment {
    StoredClassCatalog classCatalog; 
    Database classCatalogDB;
    
    /**
     * Constructor
     * 
     * @param envHome 数据库环境目录
     * @param envConfig config options  数据库换纪念馆配置
     * @throws DatabaseException
     */
    public BdbEnvironment(File envHome, EnvironmentConfig envConfig) throws DatabaseException {
        super(envHome, envConfig);
    }

    /**
     * 返回StoredClassCatalog
     * @return the cached class catalog
     */
    public StoredClassCatalog getClassCatalog() {
        if(classCatalog == null) {
            DatabaseConfig dbConfig = new DatabaseConfig();
            dbConfig.setAllowCreate(true);
            try {
                classCatalogDB = openDatabase(null, "classCatalog", dbConfig);
                classCatalog = new StoredClassCatalog(classCatalogDB);
            } catch (DatabaseException e) {
                // TODO Auto-generated catch block
                throw new RuntimeException(e);
            }
        }
        return classCatalog;
    }

    @Override
    public synchronized void close() throws DatabaseException {
        if(classCatalogDB!=null) {
            classCatalogDB.close();
        }
        super.close();
    }

}

 

       2.  基于BDB实现的持久化队列

package com.guoyun.util;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractQueue;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.io.FileUtils;

import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.serial.SerialBinding;
import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.collections.StoredMap;
import com.sleepycat.collections.StoredSortedMap;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DatabaseExistsException;
import com.sleepycat.je.DatabaseNotFoundException;
import com.sleepycat.je.EnvironmentConfig;
/**
 * 持久化队列,基于BDB实现,也继承Queue,以及可以序列化.但不等同于Queue的时,不再使用后需要关闭
 * 相比一般的内存Queue,插入和获取值需要多消耗一定的时间
 * 这里为什么是继承AbstractQueue而不是实现Queue接口,是因为只要实现offer,peek,poll几个方法即可,
 * 其他如remove,addAll,AbstractQueue会基于这几个方法去实现
 * 
 * @contributor guoyun
 * @param <E>
 */
public class BdbPersistentQueue<E extends Serializable> extends AbstractQueue<E> implements
        Serializable {
    private static final long serialVersionUID = 3427799316155220967L;
    private transient BdbEnvironment dbEnv;            // 数据库环境,无需序列化
    private transient Database queueDb;             // 数据库,用于保存值,使得支持队列持久化,无需序列化
    private transient StoredMap<Long,E> queueMap;   // 持久化Map,Key为指针位置,Value为值,无需序列化
    private transient String dbDir;                 // 数据库所在目录
    private transient String dbName;				// 数据库名字
    private AtomicLong headIndex;                   // 头部指针
    private AtomicLong tailIndex;                   // 尾部指针
    private transient E peekItem=null;              // 当前获取的值
    
    /**
     * 构造函数,传入BDB数据库
     * 
     * @param db
     * @param valueClass
     * @param classCatalog
     */
    public BdbPersistentQueue(Database db,Class<E> valueClass,StoredClassCatalog classCatalog){
        this.queueDb=db;
        this.dbName=db.getDatabaseName();
        headIndex=new AtomicLong(0);
        tailIndex=new AtomicLong(0);
        bindDatabase(queueDb,valueClass,classCatalog);
    }
    /**
     * 构造函数,传入BDB数据库位置和名字,自己创建数据库
     * 
     * @param dbDir
     * @param dbName
     * @param valueClass
     */
    public BdbPersistentQueue(String dbDir,String dbName,Class<E> valueClass){
        headIndex=new AtomicLong(0);
        tailIndex=new AtomicLong(0);
        this.dbDir=dbDir;
        this.dbName=dbName;
        createAndBindDatabase(dbDir,dbName,valueClass);
    }
    /**
     * 绑定数据库
     * 
     * @param db
     * @param valueClass
     * @param classCatalog
     */
    public void bindDatabase(Database db, Class<E> valueClass, StoredClassCatalog classCatalog){
        EntryBinding<E> valueBinding = TupleBinding.getPrimitiveBinding(valueClass);
        if(valueBinding == null) {
            valueBinding = new SerialBinding<E>(classCatalog, valueClass);   // 序列化绑定
        }
        queueDb = db;
        queueMap = new StoredSortedMap<Long,E>(
                db,                                             // db
                TupleBinding.getPrimitiveBinding(Long.class),   //Key
                valueBinding,                                   // Value
                true);                                          // allow write
    }
    /**
     * 创建以及绑定数据库
     * 
     * @param dbDir
     * @param dbName
     * @param valueClass
     * @throws DatabaseNotFoundException
     * @throws DatabaseExistsException
     * @throws DatabaseException
     * @throws IllegalArgumentException
     */
    private void createAndBindDatabase(String dbDir, String dbName,Class<E> valueClass) throws DatabaseNotFoundException,
    DatabaseExistsException,DatabaseException,IllegalArgumentException{
        File envFile = null;
        EnvironmentConfig envConfig = null;
        DatabaseConfig dbConfig = null;
        Database db=null;

        try {
            // 数据库位置
            envFile = new File(dbDir);
            
            // 数据库环境配置
            envConfig = new EnvironmentConfig();
            envConfig.setAllowCreate(true);
            envConfig.setTransactional(false);
            
            // 数据库配置
            dbConfig = new DatabaseConfig();
            dbConfig.setAllowCreate(true);
            dbConfig.setTransactional(false);
            dbConfig.setDeferredWrite(true);
            
            // 创建环境
            dbEnv = new BdbEnvironment(envFile, envConfig);
            // 打开数据库
            db = dbEnv.openDatabase(null, dbName, dbConfig);
            // 绑定数据库
            bindDatabase(db,valueClass,dbEnv.getClassCatalog());
             
        } catch (DatabaseNotFoundException e) {
            throw e;
        } catch (DatabaseExistsException e) {
            throw e;
        } catch (DatabaseException e) {
            throw e;
        } catch (IllegalArgumentException e) {
            throw e;
        }

        
    }
    
    /**
     * 值遍历器
     */
    @Override
    public Iterator<E> iterator() {
        return queueMap.values().iterator();
    }
    /**
     * 大小
     */
    @Override
    public int size() {
        synchronized(tailIndex){
            synchronized(headIndex){
                return (int)(tailIndex.get()-headIndex.get());
            }
        }
    }
    
    /**
     * 插入值
     */
    @Override
    public boolean offer(E e) {
        synchronized(tailIndex){
            queueMap.put(tailIndex.getAndIncrement(), e);   // 从尾部插入
        }
        return true;
    }
    
    /**
     * 获取值,从头部获取
     */
    @Override
    public E peek() {
        synchronized(headIndex){
            if(peekItem!=null){
                return peekItem;
            }
            E headItem=null;
            while(headItem==null&&headIndex.get()<tailIndex.get()){ // 没有超出范围
                headItem=queueMap.get(headIndex.get());
                if(headItem!=null){
                    peekItem=headItem;
                    continue;
                } 
                headIndex.incrementAndGet();    // 头部指针后移
            }
            return headItem;
        }
    }
    
    /**
     * 移出元素,移出头部元素
     */
    @Override
    public E poll() {
        synchronized(headIndex){
            E headItem=peek();
            if(headItem!=null){
                queueMap.remove(headIndex.getAndIncrement());
                peekItem=null;
                return headItem;
            }
        }
        return null;
    }
    
    /**
     * 关闭,也就是关闭所是用的BDB数据库但不关闭数据库环境
     */
    public void close(){
        try {
            if(queueDb!=null){
                queueDb.sync();
                queueDb.close();
            }
        } catch (DatabaseException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (UnsupportedOperationException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    /**
     * 清理,会清空数据库,并且删掉数据库所在目录,慎用.如果想保留数据,请调用close()
     */
    @Override
    public void clear() {
       try {
    	   close();
    	   if(dbEnv!=null&&queueDb!=null){
				dbEnv.removeDatabase(null, dbName==null?queueDb.getDatabaseName():dbName); 
                dbEnv.close();
           }
	    } catch (DatabaseNotFoundException e) {
	        // TODO Auto-generated catch block
	        e.printStackTrace();
	    } catch (DatabaseException e) {
	        // TODO Auto-generated catch block
	        e.printStackTrace();
	    } finally{
	    	try {
	    		if(this.dbDir!=null){
	    			FileUtils.deleteDirectory(new File(this.dbDir));
	    		}
				
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
	    }
    }
    
}

 

 

3. 测试类,测试数据准确性和性能

package com.guoyun.util;

import java.io.File;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import junit.framework.TestCase;

public class BdbPersistentQueueTest extends TestCase{
    Queue<String> memoryQueue;
    Queue<String> persistentQueue;
    
    @Override
    protected void setUp() throws Exception {
        super.setUp();
        memoryQueue=new LinkedBlockingQueue<String>();
        String dbDir="E:/java/test/bdbDir";
        File file=new File(dbDir);
        if(!file.exists()||!file.isDirectory()){
            file.mkdirs();
        }
        persistentQueue=new BdbPersistentQueue(dbDir,"pq",String.class);
    }

    @Override
    protected void tearDown() throws Exception {
        super.tearDown();
        memoryQueue.clear();
        memoryQueue=null;
        persistentQueue.clear();
        persistentQueue=null;
    }
    
    /**
     * 排放值
     * @param queue
     * @return      排放的数据个数
     */
    public int drain(Queue<String> queue){
        int count=0;
        while(true){
            try {
                queue.remove();
                count++;
            } catch (Exception e) {
                return count;
            }
        }
       
    }
    /**
     * 
     * @param queue
     * @param size
     */
    public void fill(Queue<String> queue,int size){
        for(int i=0;i<size;i++){
            queue.add(i+"");
        }
    }
    
    public void checkTime(int size){
        System.out.println("1.内存Queue插入和排空数据所耗时间");
        long time=0;
        long start=System.nanoTime();
        fill(memoryQueue,size);
        time=System.nanoTime()-start;
        System.out.println("\t填充 "+size+" 条数据耗时: "+(double)time/1000000+" 毫秒,单条耗时: "+((double)time/size)+" 纳秒");
        start=System.nanoTime();
        drain(memoryQueue);
        time=System.nanoTime()-start;
        System.out.println("\t排空 "+size+" 条数据耗时: "+(double)time/1000000+" 毫秒,单条耗时: "+((double)time/size)+" 纳秒");
        
        System.out.println("2.持久化Queue插入和排空数据所耗时间");
        start=System.nanoTime();
        fill(persistentQueue,size);
        time=System.nanoTime()-start;
        System.out.println("\t填充 "+size+" 条数据耗时: "+(double)time/1000000+" 毫秒,单条耗时: "+((double)time/size/1000000)+" 豪秒");
        start=System.nanoTime();
        drain(persistentQueue);
        time=System.nanoTime()-start;
        System.out.println("\t排空 "+size+" 条数据耗时: "+(double)time/1000000+" 毫秒,单条耗时: "+((double)time/size/1000)+" 豪秒");
        
    }
    
    /**
     * 十万数量级测试
     */
    public void testTime_tenThousand(){
        System.out.println("========测试1000000(十万)条数据=================");
        checkTime(100000);
    }
    
    
    /**
     * 百万数量级测试
     */
    public void testTime_mil(){
        System.out.println("========测试1000000(百万)条数据=================");
        checkTime(1000000);
    }
    

    /**
     * 千万数量级测试,注意要防止内存溢出
     */
    public void testTime_tenMil(){
        System.out.println("========测试10000000(千万)条数据=================");
        checkTime(10000000);
    }
    
    /**
     * 测试队列数据准确性
     * @param queue
     * @param queueName
     * @param size
     */
    public void checkDataExact(Queue<String> queue,String queueName,int size){
    	if(queue.size()!=size){
    		System.err.println("Error size of "+queueName);
    	}
    	String value=null;
    	for(int i=0;i<size;i++){
    		value=queue.remove();
    		if(!((i+"").equals(value))){
    			System.err.println("Error "+queueName+":"+i+"->"+value);
    		}
    	}
    }
    
    /**
     * 测试队列中数据的准确性,包括长度
     */
    public void testExact(){
    	int size=100;
    	fill(memoryQueue,size);
    	fill(persistentQueue,size);
    	
    	checkDataExact(memoryQueue,"MemoryQueue",100);
    	checkDataExact(persistentQueue,"PersistentQueue",100);
    	 
    }
    
}

 

4.测试性能

========测试1000000(十万)条数据=================
1.内存Queue插入和排空数据所耗时间
 填充 100000 条数据耗时: 53.550787 毫秒,单条耗时: 535.50787 纳秒
 排空 100000 条数据耗时: 27.09901 毫秒,单条耗时: 270.9901 纳秒
2.持久化Queue插入和排空数据所耗时间
 填充 100000 条数据耗时: 1399.644305 毫秒,单条耗时: 0.01399644305 豪秒
 排空 100000 条数据耗时: 2104.765179 毫秒,单条耗时: 21.04765179 豪秒

 持久化写入是内存写入的26倍,读取是77倍

========测试1000000(百万)条数据=================
1.内存Queue插入和排空数据所耗时间
 填充 1000000 条数据耗时: 699.105888 毫秒,单条耗时: 699.105888 纳秒
 排空 1000000 条数据耗时: 158.792281 毫秒,单条耗时: 158.792281 纳秒
2.持久化Queue插入和排空数据所耗时间
 填充 1000000 条数据耗时: 11978.132218 毫秒,单条耗时: 0.011978132218 豪秒
 排空 1000000 条数据耗时: 22355.617205 毫秒,单条耗时: 22.355617204999998 豪秒

 持久化写入是内存写入的17倍,读取是141倍

 

========测试10000000(千万)条数据=================
1.内存Queue插入和排空数据所耗时间
 填充 10000000 条数据耗时: 9678.377046 毫秒,单条耗时: 967.8377046 纳秒
 排空 10000000 条数据耗时: 1473.416825 毫秒,单条耗时: 147.3416825 纳秒
2.持久化Queue插入和排空数据所耗时间
 填充 10000000 条数据耗时: 151177.036391 毫秒,单条耗时: 0.0151177036391 豪秒
 排空 10000000 条数据耗时: 361642.655135 毫秒,单条耗时: 36.164265513500006 豪秒

 持久化写入是内存写入的15倍,读取是245倍

可以看出写入和遍历一条都是在毫秒级别,还有千万级的数据,BDB的性能着实牛逼.而且随着数据的增多,写的时间在缩短,读的时间在增长.

 

 

更多技术文章、感悟、分享、勾搭,请用微信扫描:

      

分享到:
评论
7 楼 ArtsCrafts 2015-04-28  
如果使用StoredMap,关闭之后 数据就没有了?
6 楼 bob_sense 2015-04-03  
:idea:    
5 楼 bob_sense 2015-04-03  
引用
4 楼 km1122 2014-07-25  
你好!BerkeleyDB使用StoredMap,结束后重新启动,获取不到数据?
3 楼 菜菜土人 2012-08-16  
transient 
2 楼 hailong886 2012-06-01  
如果要键值映射的话,还需要一个桥梁
1 楼 hailong886 2012-06-01  
大师级人物,顶起

相关推荐

    Berkeley DB4.8以上各版本

    Berkeley DB4.8以上各版本,已经亲测过哪些版本可与redhat6.5兼容,见附件名称备注。

    Berkeley DB Java 版 4.0.92

    Oracle Berkeley DB Java 版是一个开源的、可嵌入的事务存储引擎,是完全用 Java 编写的。与 Oracle Berkeley DB 类似,Oracle Berkeley DB Java 版在应用程序的地址空间中执行,没有客户端/服务器通信的开销,从而...

    Berkeley DB 读取样例

    嵌入式数据库Berkeley DB Java Edition Berkeley DB的使用 使用Berkeley DB的一般步骤 创建、打开、操作、关闭数据库环境Environment

    Berkeley DB

    Berkeley DB(BDB)是OpenLDAP后台数据库的默认配置,因此在安装OpenLDAP之前应先安装BDB。

    Berkeley DB数据库 6.2.32 64位

    Berkeley DB 6.2.32_64.msi Windows 64-bit binary installer Berkeley DB是一个嵌入式数据库,为应用程序提供可伸缩的、高性能的、有事务保护功能的数据管理服务。 主要特点: 嵌入式:直接链接到应用程序中,与...

    BerkeleyDB测试程序

    BerkeleyDB测试程序 包含散列文件入库,和读取的速度的测试

    基于Berkeley DB的机房环境监控系统

    机房控制系统中小机房环境监控短信报警方案1 标签: 机房短信报警,机房短信监控,机房环境监控,中小机房环境监控 本文主要介绍最实用的中小机房环境监控短信报警管理方案....便可实现实时的监控和管理,克服

    Berkeley DB 5.3.21.tar

    Berkeley DB 5.3.21.tar,你也可以去http://www.oracle.com/technetwork/products/berkeleydb/downloads/index.html下载最新版

    Berkeley DB文章集合

    Berkeley DB文章集合

    BerkeleyDB

    BerkeleyDB的java应用jar包

    Berkeley DB数据库最新版

    Berkeley DB6.0.20 Berkeley DB BDB Berkeley DB数据库

    Berkeley DB C++编程入门教

    介绍DB API的设置与使用的快速入门手册,目标是提供一个快速有效地机制,能让你进入Berkeley DB研发的世界。在本文中侧重于C++语言的研发人员,以及研究进城内数据管理解决方案的资深架构师。

    berkeley db使用手册

    berkeley db 使用手册

    Berkeley DB数据库支持事物的C++语言入门教程

    本文描述了如何在Berkeley DB中使用事务(Transaction)。它简要介绍了事务是如何保护你的应用的数据的...本书假设你已经了解BerkeleyDB的基本架构知识(这些知识在&lt;Getting Started with Berkeley DB Guide&gt;一书中。)

    sqlite PK Berkeley DB

    sqlite 和Berkeley db各方面 的比较

    BerkeleyDB Java Edition用户手册

    Java版本的Berkeley DB用户手册,找了好久

    BerkeleyDB-0.26

    BerkeleyDB和Sqlite是当前最流行的嵌入式开源数据库。

    SQL 开发人员 Oracle Berkeley DB 指南

    不是所有的 SQL 应用程序都应该在 Oracle Berkeley DB 实施( Oracle Berkeley DB 是一个开放源的嵌入式数据库引擎,提供了快速、可靠、本地的持久性,无需管理),但如果您有一系列相对固定的查询且很关心性能,...

    Berkeley DB 资料

    Berkeley DB 批量插入更新与删除用法示例 Berkeley 函数接口 Berkeley DB 由浅入深 SQL 开发人员 Oracle Berkeley DB 指南 Berkeley DB参考手册

Global site tag (gtag.js) - Google Analytics