达观数据分析平台架构和Hive实践

发表于:2017-8-03 10:11

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:wangjj89621    来源:博客园

  3 Hive分析实践
  3.1 Schema设计
  没有通用的schema,只有合适的schema。在设计Hive的schema的时候,需要考虑到存储、业务上的高频查询造成的开销等等,设计适合自己的数据模型。
  设置分区表
  对于Hive来说,利用分区来设计表总是必要的,分区提供了一种隔离数据和优化查询的便利的方式。设置分区时,需要考虑被设置成分区的字段,按照时间分区一般而言就是一个好的方案,其好处在于其是按照不同时间粒度来确定合适大小的数据积累量,随着时间的推移,分区数量的增长是均匀的,分区的大小也是均匀的。
  避免小文件
  虽然分区有利于隔离数据和查询,设置过多过细的分区也会带来瓶颈,主要是因为过多的分区意味着文件的数目就越多,过多增长的小文件会给namecode带来巨大的性能压力。同时小文件过多会影响JOB的执行,hadoop会将一个job转换成多个task,即使对于每个小文件也需要一个task去单独处理,带来性能开销。因此,hive表设计的分区不应该过多过细,每个目录下的文件足够大,应该是文件系统中块大小的若干倍。
  选择文件格式
  Hive提供的默认文件存储格式有textfile、sequencefile、rcfile等。用户也可以通过实现接口来自定义输入输的文件格式。
  在实际应用中,textfile由于无压缩,磁盘及解析的开销都很大,一般很少使用。Sequencefile以键值对的形式存储的二进制的格式,其支持针对记录级别和块级别的压缩。rcfile是一种行列结合的存储方式(text file和sequencefile都是行表[row table]),其保证同一条记录在同一个hdfs块中,块以列式存储。rcfile的聚合运算不一定总是存在,但是rcfile的高压缩率确实减少文件大小,因此实际应用中,rcfile总是成为不二的选择,达观数据平台在选择文件存储格式时也大量选择了rcfile方案。
  3.2 统计分析
  本节将从排序和窗口函数两个方面的介绍Hive的统计分析功能。
  排名热门排名在实际的业务场景中经常遇见。例如最受欢迎的书籍、销量TOP100的商品等等。再实际情况下,我们不仅需要考虑各量化指标,还需要考虑置信度问题。
  最简单的排名:ORDER BY value LIMIT n
  上述查询仅仅考虑了量化指标,排名不够平滑,波动较大。
  各种排名方法众多,达观数据分析平台在进行item 排名多采用基于用户投票的排名算法。如基于威尔逊区间的排名算法,该算法可以较好的解决小样本的不准确问题。
  图:威尔逊区间
  窗口分析函数
  Hive提供了丰富了数学统计函数,同时也提供了用户自定义函数的接口,用户可以自定义UDF、UDAF、UDTF Hive 0.11版本开始提供窗口和分析函数(Windowing and Analytics Functions),包括LEAD、LAG、FIRST_VALUE、LAST_VALUE、RANK、ROW_NUMBER、PERCENT_RANK、CUBE、ROLLUP等。窗口函数与聚合函数一样,都是对表子集的操作,从结果上看,区别在于窗口函数的结果不会聚合,原有的每行记录依然会存在。窗口函数的典型分析应用包括:按分区聚合(排序,top n问题)、行间计算(时间序列分析)、关联计算(购物篮分析)。
  我们以一个简单的行间计算的例子说明窗口函数的应用(关于其他函数的具体说明,请参考hive文档)。用户阅读行为的统计分析需要从点击书籍行为中归纳统计出来。用户浏览日志结构如下表所示,每条记录为用户的单次点击行为。
  通过对连续的用户点击日志分析,通过Hive提供的窗口分析函数可以计算出用户各章节的阅读时间。
  SELECT userid, bookid, chapterid, end_time – start_time as read_time
  FROM
  (
      SELECT userid, bookid, chapterid, log_time as start_time, 
      lead(log_time,1,null) over(partition by userid, bookid order by log_time) as end_time  
      FROM user_read_log where pt=’2015-12-01’
  ) t;
  通过上述查询既可以找出2015-12-01日所有用户对每一章节的阅读时间。只能通过开发mr代码或者实现udaf来实现上述功能。
  窗口分析函数关键在于定义的窗口数据集及其对窗口的操作,通过over(窗口定义语句)来定义窗口。日常分析和实际应用中,经常会有窗口分析应用的场景,例如基于分区的排序、集合、统计等复杂操作。例如我们需要统计每个用户阅读时间最多的3本书:
  图:行间计算示意图及代码
  窗口函数使得Hive的具备了完整的数据分析功能,在实际的应用环境中,达观数据分析团队大量使用hive窗口分析函数来实现较为复杂的逻辑,提高开发和迭代效率。
  3.3 用户画像
  用户画像即基于真实数据的用户模型。简单来说,用户画像提取了用户的属性信息、行为信息,从而归纳统计出其人口学特征、偏好特征等。建立用户模型的首要任务就是提取特征,既包括用户基本特征,也包括行为特征和统计特征。
  用户模型本质上就是刻画用户兴趣的模型,而用户的兴趣模型是多维度、多尺度的。刻画用户模型还需要从时间上进行度量,甚至是进行多尺度的组合,根据用户行为统计时间的长短,可以将用户的偏好分为短期偏好和长期偏好。偏好的权重即为用户的偏好程度的度量。
  对用户偏好的描述,还需要考虑置信度的问题,例如对于一个阅读行为极其稀疏的用户来说,刻画其阅读类别偏好是毫无意义的。
  图:用户画像刻画
  3.4 反作弊分析
  众所周知,存在排名就可能存在作弊。搜索广告、索互联网刷单、刷榜现象层出不穷。一般来说,作弊的目的都是为了提高自己的排名,或者是降低对手的排名。利用Hive对数据进行分析可以过滤掉较明显的作弊数据,达到数据清洗的目的。
  例如对于一个刷榜作弊行为,需要作弊着不断刷日志行为来提高其排名,我们可以指定若干规则来过滤作弊数据。如同IP同物品同行为数目异常、同用户ID行为频次异常、同物品ID行为频次异常等等。如下图,如果相比于所有item的平均增长趋势,如果某item的增长趋势相对平均水平过大,那么其作弊的概率就比较高。
  图:作弊数据趋势与平均趋势数据对比
  作弊分析还需要结合业务需求和特点,采用合适的机器学习算法来进行更进一步的判断和过滤,达到反作弊的目标。
  4 Hive优化
  达观的数据仓库基于Hive搭建,每日需要处理大量的计算流程,Hive的稳定性和性能至关重要。众多的任务需要我们合理的调节分配集群资源,合理的配置各参数,合理的优化查询。Hive优化包含各个方面,如job个数优化、job的map/reducer个数优化、并行执行优化等等,本节将主要讨论HQL中的无时不在的JOIN的优化经验。
  4.1 Join语句
  对于上述的join语句,其中book_info表数量为千规模,
  INSERT OVERWRITE TABLE read_log_tmp
  SELECT a.userid,a.bookid,b.author 
  FROM user_read_log a JOIN book_info b ON a.bookid = b.bookid;
  该语句的执行计划为:
  图:map join的任务执行流程
  对于小数据量,hive会自动采取map join的方式来优化join,从mapreduce的编程模型来看,实现join的方式主要有map端join、reduce端join。Map端join利用hadoop 分布式缓存技术通过将小表变换成hashtable文件分发到各个task,map大表时可以直接判断hashtable来完成join,注意小表的hashtable是放在内存中的,在内存中作匹配,因此map join是一种非常快的join方式,也是一种常见的优化方式。如果小表够小,那么就可以以map join的方式来完成join完成。Hive通过设置hive.auto.convert.join=true(默认值)来自动完成map join的优化,而无需显示指示map join。缺省情况下map join的优化是打开的。
  Reduce端join需要reducer来完成join过程,对于上述join代码,reduce 端join的mr流程如下,
  图:reduce端join的mapreduce过程
  相比于map join, reduce 端join无法再map过程中过滤任何记录,只能将join的两张表的所有数据按照join key进行shuffle/sort,并按照join key的hash值将<key,value>对分发到特定的reducer。Reducer对于所有的键值对执行join操作,例如0号(bookid的hash值为0)reducer收到的键值对如下,其中T1、T2表示记录的来源表,起到标识作用:
  图:reduce端join的reducer join
  Reducer端join无法避免的reduce截断以及传输的大量数据都会给集群网络带来压力,从上图可以看出所有hash(bookid) % reducer_number等于0的key-value对都会通过shuffle被分发到0号reducer,如果分到0号reducer的记录数目远大于其他reducer的记录数目,显然0号的reducer的数据处理量将会远大于其他reducer,因此处理时间也会远大于其他reducer,甚至会带来内存等其他问题,这就是数据倾斜问题。对于join造成的数据倾斜问题我们可以通过设置参数set Hive.optimize.skewjoin=true,让hive自己尝试解决join过程中产生的倾斜问题。
  4.2 Group by
  语句我们对user_read_log表按userid goup by语句来继续探讨数据倾斜问题,首先我们explain group by语句:
  explain select userid,count(*) from user_read_log group by userid
  图:goup by的执行计划
  Group by的执行计划按照userid的hash值分发数据,同时在map端也做了本地reduce,group by的shuffle过程是按照hash(userid)来分发的,实际应用中日志中很多用户都是未注册用户或者未登录,userid字段为空的记录数远大于userid不为空的记录数,当所有的空userid记录都分发到特定某一个reducer后,也会带来严重的数据倾斜问题。造成数据倾斜的主要原因在于分发到某个或某几个reducer的数据量远大于其他reducer的数据量。
  对于group by造成的数据倾斜问题,我们可以通过设置参数
  set hive.map.aggr=true (开启map端combiner);
  set hive.groupby.skewindata=true;
  这个参数的作用是做reduce操作的时候,拿到的key并不是所有相同值给同一个Reduce,而是随机分发,然后reduce做聚合,做完之后再做一轮MR,拿前面聚合过的数据再算结果。虽然多了一轮MR任务,但是可以有效的减少数据倾斜问题可能带来的危险。
  Hive解决数据倾斜
  正确的设置Hive参数可以在某种程度上避免的数据倾斜问题,合适的查询语句也可以避免数据倾斜问题。要尽早的过滤数据和裁剪数据,减少后续处理的数据量,使得join key的数据分布较为均匀,将空字段随机赋予值,这样既可以均匀分发倾斜的数据:
  select userid,name from user_info a
  join (
      select case when userid is null then cast(rand(47)*100000 as int)
      else userid 
      from user_read_log
  ) b on a.userid = b.userid
  如果用户在定义schema的时候就已经预料到表数据可能会存在严重的数据倾斜问题,Hive自0.10.0引入了skew table的概念,如建表语句
  CREATE TABLE user_read_log (userid int,bookid, …)
  SKEWED BY (userid) ON (null) [STORED AS DIRECTORIES];
  需要注意的是,skew table只是将倾斜特别严重的列的分开存储为不同的文件,每个制定的倾斜值制定为一个文件或者目录,因此在查询的时候可以通过过滤倾斜值来避免数据倾斜问题:
  select userid,name from user_info a
  join (
  select userid from user_read_log where pt=’2015’ and userid is not null
  ) b on a.userid = b.userid
  可以看出,如果不加过滤条件,倾斜问题还是会存在,通过对skew table加过滤条件的好处是避免了mapper的表扫描过滤操作。
  4.3 Join的物理优化
  Hive内部实现了MapJoinResolver(处理MapJoin)、SkewJoinResolver(处理倾斜join)、CommonJoinResolver(处理普通Join)等类来实现join的查询物理优化(/org/apache/hadoop/hive/ql/optimizer/physical)。
  CommonJoinResolver类负责将普通Join转换成MapJoin,Hive通过这个类来实现mapjoin的自动优化。对于表A和表B的join查询,会产生3个分支:
  1) 以表A作为大表进行Mapjoin;
  2) 以表A作为大表进行Mapjoin;
  3) Map-reduce join
  由于不知道输入数据规模,因此编译时并不会决定走那个分支,而是在运行时判断走那个分支。需要注意的是要像完成上述自动转换,需要将hive.auto.convert.join.noconditionaltask设置为true(默认值),同时可以手工控制转载进内存的小表的大小(hive.auto.convert.join.noconditionaltask.size)。
  MapJoinResolver 类负责迭代各个mr任务,检查每个任务是否存在map join操作,如果有,会将local map work转换成local map join work。
  SkewJoinResolver类负责迭代有join操作的reducer任务,一旦单个reducer产生了倾斜,那么就会将倾斜值得数据写入hdfs,然后用一个新的map join的任务来处理倾斜值的计算。虽然多了一轮mr任务,但是由于采用的map join,效率也是很高的。良好的mr模式和执行流程总是至关重要的。
  5 总结
  本文详细介绍了达观大数据分析平台的基本架构和原理,基于hadoop/hive的大数据分析平台使海量数据的存储、分析、挖掘逐步成为现实,并带来意想不到的益处。作为数据分析平台主力军的Hive仍然处在不断的发展之中,将HQL理解成Mapreduce程序、理解Hadoop的核心能力是更好的使用和优化Hive的根本。达观数据团队也将紧跟技术发展潮流,结合自身的业务需求,采取合理的框架架构,提升数据平台的处理能力。
22/2<12
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号