Hbase学习
上一篇 / 下一篇 2012-09-24 14:56:16 / 个人分类:Hbase
做了个监控sql小工具,将日常环境所产生的sql都存储到mysql里,最近日常自动化用例跑的比较勤,导致mysql数据量过大而查询很慢(几千万条数据),所以改用Hbase。
1、HBase 基本概念
1、HBase 基本概念
Table
表,同关系数据库中的表的概念
Row Key
每一行的"主键"
Column Family
列族,一个Table由一个或多个Column Family组成
Column
列,可以理解为关系数据库中的列
Timestamp
时间戳,每次插入操作的时间,可以被指定
Versions
版本,HBase中允许每一列存在不同时间戳的数据版本
Region
HBase中一张表被按行切分成多个全局有序的区域,这些全局有序的区域被称为Region
Bytes
HBase中操作数据的基本类,用于生成不同数据类型的数据到byte数组,方便进行比较,hash,以及用来作为HashMap和HashSet的key方便进行比较
2、我的代码
import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Date; import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import com.taobao.vsearch.common.util.DateUtil; import com.taobao.web.app1.bean.Biz; import com.taobao.web.app1.bean.SqlMessage; import com.taobao.web.app1.bean.hbase.QuerySqlHbase; import com.taobao.web.app1.bean.hbase.SqlMessageHbaseBean; import com.taobao.web.app1.bean.result.ResultSqlMessage; import com.taobao.web.app1.service.BizService; public class HBaseStoreManager { private static final Logger log = LoggerFactory .getLogger(HBaseStoreManager.class); private static Configuration conf; private int poolSize = 100; private int timeout = 2000; private HTablePool pool; private String tableName = "sql_message"; private String family = "message"; @Autowired private BizService bizService; public HBaseStoreManager() { } public void setTimeout(int timeout) { this.timeout = timeout; } public void setPoolSize(int poolSize) { this.poolSize = poolSize; } public void init() { conf = HBaseConfiguration.create(); conf.set(HConstants.ZOOKEEPER_QUORUM, "根据时间情况填写"); conf.set(HConstants.HBASE_RPC_TIMEOUT_KEY, Integer.toString(timeout)); conf.set("zookeeper.znode.parent", "/hbase-rd-test-0.90"); pool = new HTablePool(conf, poolSize); } /** * family 为bizCode * */ public void insertSql(SqlMessageHbaseBean sql) throws Exception { Put put = new Put(ByteBuffer.allocate(16).putLong(sql.getBizId()) .putLong(sql.getGmtCreate().getTime()).array()); put.add(Bytes.toBytes(family), Bytes.toBytes(sql.getBizCode()), Bytes.toBytes(sql.getInfo())); pool.getTable(tableName).put(put); } public ResultSqlMessage getSqlList(QuerySqlHbase query) throws Exception { Scan scan = new Scan(); /** * 根据family和cloum来查询 * */ scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(query.getBizCode())); scan.setBatch(100); scan.setCaching(10); /** * 根据rowkey来查询 * */ // System.out.println(new Date(query.getStartTime().getTime()) // .toLocaleString()); // System.out.println(new Date(query.getEndTime().getTime()) // .toLocaleString()); scan.setStartRow(ByteBuffer.allocate(16).putLong(query.getBizId()) .putLong(query.getStartTime().getTime()).array()); scan.setStopRow(ByteBuffer.allocate(16).putLong(query.getBizId()) .putLong(query.getEndTime().getTime()).array()); /** * filter 使用正则表达式来 对value值进行过滤 * */ FilterList filterList = new FilterList(); if (StringUtils.isNotBlank(query.getHasMessage())) { RegexStringComparator comp = new RegexStringComparator(".*" + query.getHasMessage() + "*"); SingleColumnValueFilter f1 = new SingleColumnValueFilter( Bytes.toBytes(family), Bytes.toBytes(query.getBizCode()), CompareOp.EQUAL, comp); filterList.addFilter(f1); } if (query.getFlag() != null && query.getFlag().length > 0) { String s = ".* ["; for (int i = 0; i < query.getFlag().length; i++) { if (i == query.getFlag().length - 1) { s = s + query.getFlag()[i]; } else { s = s + query.getFlag()[i] + "|"; } } s = s + "] *"; RegexStringComparator comp1 = new RegexStringComparator(s); SingleColumnValueFilter f2 = new SingleColumnValueFilter( Bytes.toBytes(family), Bytes.toBytes(query.getBizCode()), CompareOp.EQUAL, comp1); filterList.addFilter(f2); } if (filterList.getFilters().size() > 0) { scan.setFilter(filterList); } scan.setTimeRange(query.getStartTime().getTime(), query.getEndTime() .getTime()); ResultScanner result = pool.getTable(tableName).getScanner(scan); ResultSqlMessage sqls = new ResultSqlMessage(); List<SqlMessage> list = new ArrayList<SqlMessage>(); for (Result r : result) { for (KeyValue kv : r.list()) { // System.out.println("family:" + // Bytes.toString(kv.getFamily())); // System.out.println("qualifier:" // + Bytes.toString(kv.getQualifier())); // System.out.println("value:" + Bytes.toString(kv.getValue())); SqlMessage sql = new SqlMessage(); sql.setBizId(query.getBizId()); sql.setMessage(Bytes.toString(kv.getValue())); list.add(sql); } } sqls.setCount(list.size()); sqls.setModule(list); result.close(); return sqls; } public void delSqlList(QuerySqlHbase query) throws Exception { Scan scan = new Scan(); List<Delete> keys = new ArrayList<Delete>(); List<Biz> list = bizService.queryAllBiz(); for (int i = 0; i < list.size(); i++) { scan.setStartRow(ByteBuffer.allocate(16).putLong(list.get(i).getId()) .putLong(query.getStartTime().getTime()).array()); scan.setStopRow(ByteBuffer.allocate(16).putLong(list.get(i).getId()) .putLong(query.getEndTime().getTime()).array()); ResultScanner result = pool.getTable(tableName).getScanner(scan); for (Result r : result) { keys.add(new Delete(r.getRow())); } result.close(); } pool.getTable(tableName).delete(keys); } public static void main(String[] args) throws Exception { HBaseStoreManager test = new HBaseStoreManager(); test.init(); /** * 创建表 * */ HBaseAdmin admin = new HBaseAdmin(conf);// HTableDescriptor desc = new HTableDescriptor(test.tableName); desc.addFamily(new HColumnDescriptor("message")); //HBaseAdmin负责跟表相关的操作如create,drop等 admin.createTable(desc); //删除表 // admin.disableTable("sql"); // admin.deleteTable("sql"); /** * 插入数据 * */ SqlMessageHbaseBean sql=new SqlMessageHbaseBean(); sql.setBizId(1L); sql.setBizCode("media"); sql.setGmtCreate(new Date()); sql.setInfo("select * from media"); test.insertSql(sql); String line="2012-09-18 11:02:38,181 {pstm-125654} SELECT COUNT(*) FROM (select t.id from picture_recommend_reward t where t.accepter_id=181743912 and t.status=1 ) as p"; Date date = test.getDate(line); SqlMessageHbaseBean sql1=new SqlMessageHbaseBean(); sql1.setBizCode("picturecenter"); sql1.setBizId(2L); sql1.setGmtCreate(date); sql1.setInfo(line); test.insertSql(sql); QuerySqlHbase query = new QuerySqlHbase(); query.setBizCode("picturecenter"); query.setBizId(1L); query.setStartTime(DateUtil.parseDate("2012-9-17 20:00:00")); query.setEndTime(new Date()); query.setHasMessage("2032738978"); String[] flags = new String[4]; flags[0] = "select"; flags[1] = "delete"; flags[2] = "update"; flags[3] = "SELECT"; query.setFlag(flags); System.out.println(test.getSqlList(query).getCount()); } Date getDate(String url) throws Exception { String[] sqls = url.split(","); String time = sqls[0]; String[] parsePatterns = { "yyyy-MM-dd HH:mm:ss" }; Date date = DateUtils.parseDate(time, parsePatterns); return date; } } |
TAG:
我的栏目
标题搜索
日历
|
|||||||||
日 | 一 | 二 | 三 | 四 | 五 | 六 | |||
1 | 2 | 3 | 4 | 5 | 6 | ||||
7 | 8 | 9 | 10 | 11 | 12 | 13 | |||
14 | 15 | 16 | 17 | 18 | 19 | 20 | |||
21 | 22 | 23 | 24 | 25 | 26 | 27 | |||
28 | 29 | 30 |
我的存档
数据统计
- 访问量: 110844
- 日志数: 75
- 建立时间: 2011-10-31
- 更新时间: 2015-09-07