Hbase学习

上一篇 / 下一篇  2012-09-24 14:56:16 / 个人分类:Hbase

  做了个监控sql小工具,将日常环境所产生的sql都存储到mysql里,最近日常自动化用例跑的比较勤,导致mysql数据量过大而查询很慢(几千万条数据),所以改用Hbase。
1、HBase 基本概念

Table

表,同关系数据库中的表的概念

Row Key

每一行的"主键"

Column Family

列族,一个Table由一个或多个Column Family组成

Column

列,可以理解为关系数据库中的列

Timestamp

时间戳,每次插入操作的时间,可以被指定

Versions

版本,HBase中允许每一列存在不同时间戳的数据版本

Region

HBase中一张表被按行切分成多个全局有序的区域,这些全局有序的区域被称为Region

Bytes

HBase中操作数据的基本类,用于生成不同数据类型的数据到byte数组,方便进行比较,hash,以及用来作为HashMap和HashSet的key方便进行比较

2、我的代码

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import com.taobao.vsearch.common.util.DateUtil;
import com.taobao.web.app1.bean.Biz;
import com.taobao.web.app1.bean.SqlMessage;
import com.taobao.web.app1.bean.hbase.QuerySqlHbase;
import com.taobao.web.app1.bean.hbase.SqlMessageHbaseBean;
import com.taobao.web.app1.bean.result.ResultSqlMessage;
import com.taobao.web.app1.service.BizService;

public class HBaseStoreManager {
    private static final Logger log = LoggerFactory
            .getLogger(HBaseStoreManager.class);
    private static Configuration conf;
    private int poolSize = 100;
    private int timeout = 2000;
    private HTablePool pool;
    private String tableName = "sql_message";
    private String family = "message";
    @Autowired
    private BizService bizService;

    public HBaseStoreManager() {
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    public void init() {
        conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM,
                "根据时间情况填写");
        conf.set(HConstants.HBASE_RPC_TIMEOUT_KEY, Integer.toString(timeout));
        conf.set("zookeeper.znode.parent", "/hbase-rd-test-0.90");
        pool = new HTablePool(conf, poolSize);

    }

    /**
     * family 为bizCode
     * */
    public void insertSql(SqlMessageHbaseBean sql) throws Exception {
        Put put = new Put(ByteBuffer.allocate(16).putLong(sql.getBizId())
                .putLong(sql.getGmtCreate().getTime()).array());
        put.add(Bytes.toBytes(family), Bytes.toBytes(sql.getBizCode()),
                Bytes.toBytes(sql.getInfo()));
        pool.getTable(tableName).put(put);
    }

    public ResultSqlMessage getSqlList(QuerySqlHbase query) throws Exception {
        Scan scan = new Scan();
        /**
         * 根据family和cloum来查询
         * */
        scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(query.getBizCode()));
        scan.setBatch(100);
        scan.setCaching(10);
        /**
         * 根据rowkey来查询
         * */
        // System.out.println(new Date(query.getStartTime().getTime())
        // .toLocaleString());
        // System.out.println(new Date(query.getEndTime().getTime())
        // .toLocaleString());
        scan.setStartRow(ByteBuffer.allocate(16).putLong(query.getBizId())
                .putLong(query.getStartTime().getTime()).array());
        scan.setStopRow(ByteBuffer.allocate(16).putLong(query.getBizId())
                .putLong(query.getEndTime().getTime()).array());
        /**
         * filter 使用正则表达式来 对value值进行过滤
         * */
        FilterList filterList = new FilterList();
        if (StringUtils.isNotBlank(query.getHasMessage())) {
            RegexStringComparator comp = new RegexStringComparator(".*"
                    + query.getHasMessage() + "*");
            SingleColumnValueFilter f1 = new SingleColumnValueFilter(
                    Bytes.toBytes(family), Bytes.toBytes(query.getBizCode()),
                    CompareOp.EQUAL, comp);
            filterList.addFilter(f1);
        }

        if (query.getFlag() != null && query.getFlag().length > 0) {
            String s = ".* [";
            for (int i = 0; i < query.getFlag().length; i++) {
                if (i == query.getFlag().length - 1) {
                    s = s + query.getFlag()[i];
                } else {
                    s = s + query.getFlag()[i] + "|";
                }
            }
            s = s + "] *";
            RegexStringComparator comp1 = new RegexStringComparator(s);

            SingleColumnValueFilter f2 = new SingleColumnValueFilter(
                    Bytes.toBytes(family), Bytes.toBytes(query.getBizCode()),
                    CompareOp.EQUAL, comp1);
            filterList.addFilter(f2);

        }
        if (filterList.getFilters().size() > 0) {
            scan.setFilter(filterList);
        }
        scan.setTimeRange(query.getStartTime().getTime(), query.getEndTime()
                .getTime());
        ResultScanner result = pool.getTable(tableName).getScanner(scan);
        ResultSqlMessage sqls = new ResultSqlMessage();
        List<SqlMessage> list = new ArrayList<SqlMessage>();

        for (Result r : result) {
            for (KeyValue kv : r.list()) {
                // System.out.println("family:" +
                // Bytes.toString(kv.getFamily()));
                // System.out.println("qualifier:"
                // + Bytes.toString(kv.getQualifier()));
                // System.out.println("value:" + Bytes.toString(kv.getValue()));
                SqlMessage sql = new SqlMessage();
                sql.setBizId(query.getBizId());
                sql.setMessage(Bytes.toString(kv.getValue()));
                list.add(sql);
            }
        }
        sqls.setCount(list.size());
        sqls.setModule(list);
        result.close();
        return sqls;
    }

    public void delSqlList(QuerySqlHbase query) throws Exception {
        Scan scan = new Scan();
        List<Delete> keys = new ArrayList<Delete>();
        List<Biz> list = bizService.queryAllBiz();
        for (int i = 0; i < list.size(); i++) {
            scan.setStartRow(ByteBuffer.allocate(16).putLong(list.get(i).getId())
                    .putLong(query.getStartTime().getTime()).array());
            scan.setStopRow(ByteBuffer.allocate(16).putLong(list.get(i).getId())
                    .putLong(query.getEndTime().getTime()).array());
            ResultScanner result = pool.getTable(tableName).getScanner(scan);

            for (Result r : result) {
                keys.add(new Delete(r.getRow()));
            }
            result.close();
        }
        pool.getTable(tableName).delete(keys);

    }

    public static void main(String[] args) throws Exception {
        HBaseStoreManager test = new HBaseStoreManager();
        test.init();
        /**
         * 创建表
         * */
         HBaseAdmin admin = new HBaseAdmin(conf);//
         HTableDescriptor desc = new HTableDescriptor(test.tableName);
         desc.addFamily(new HColumnDescriptor("message"));
         //HBaseAdmin负责跟表相关的操作如create,drop等
         admin.createTable(desc);
         //删除表
//         admin.disableTable("sql");
//         admin.deleteTable("sql");
        /**
         * 插入数据
         * */
         SqlMessageHbaseBean sql=new SqlMessageHbaseBean();
         sql.setBizId(1L);
         sql.setBizCode("media");
         sql.setGmtCreate(new Date());
         sql.setInfo("select * from media");
         test.insertSql(sql);
         String
         line="2012-09-18 11:02:38,181 {pstm-125654} SELECT COUNT(*) FROM (select t.id from picture_recommend_reward t where t.accepter_id=181743912 and t.status=1 ) as p";
         Date date = test.getDate(line);
         SqlMessageHbaseBean sql1=new SqlMessageHbaseBean();
         sql1.setBizCode("picturecenter");
         sql1.setBizId(2L);
         sql1.setGmtCreate(date);
         sql1.setInfo(line);
         test.insertSql(sql);
        QuerySqlHbase query = new QuerySqlHbase();
        query.setBizCode("picturecenter");
        query.setBizId(1L);
        query.setStartTime(DateUtil.parseDate("2012-9-17 20:00:00"));
        query.setEndTime(new Date());
        query.setHasMessage("2032738978");
        String[] flags = new String[4];
        flags[0] = "select";
        flags[1] = "delete";
        flags[2] = "update";
        flags[3] = "SELECT";
        query.setFlag(flags);
        System.out.println(test.getSqlList(query).getCount());
    }

    Date getDate(String url) throws Exception {
        String[] sqls = url.split(",");
        String time = sqls[0];
        String[] parsePatterns = { "yyyy-MM-dd HH:mm:ss" };
        Date date = DateUtils.parseDate(time, parsePatterns);
        return date;
    }

}


TAG:

 

评分:0

我来说两句

Open Toolbar