大数据框架整理

发表于:2017-1-22 09:55

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:水分子Andy    来源:51Testing软件测试网采编

分享:
  kafka
  1、kafka和jms的区别
  2、kafka的topic理解
  topic是逻辑存在的,真正在物理磁盘中的体现是partitioner,一个topic可以对应多个partition,不同的paritition存放在不同的broker中,以提高并发存储能力。
  3、partitioner
  partition是topic信息在屋里存储中的具体体现,在磁盘中它是一个文件夹,名字是topic名字_partition编号。4、segment
  每个partition对对应多个segment文件,默认大小是1G,为了快速定位到指定的offset位置。
  5、kafka为什么这么快
  1/使用了操作系统使用的pagecache缓存,缓存大,缓存到一定量的数据时,以顺序写入的方    式写入到磁盘中。
  因为:磁盘顺序写入的方式非常的快=>600MB/s,而随机存储只有100kb/s左右。
  2/使用操作系统的sendfile技术。在读取信息发送的时候,不需要经过用户区,而是在os端直接发送,可以减少很多步骤。
  6、为什么要多个partitioner7、为什么每个partitioner需要切分为多个segment文件
  8、kafka的HA
  对partitioner分区进行备份,利用zookeeper的选举机制选择leader。数据的生产存储和消费读取都是有leader负责,其他的replicatition只是负责备份而已。
  9、kafka如何用shell脚本来讲一个文件读写进去?10、kafka如何用JavaAPI实现生产者和消费者?
  大数据一站式解决方案:Scala和Spark部分
  scala回顾
  1、如何定义变量
  2、如何定义函数、方法,如何在将函数作为方法的参数传入进去?
  3、条件判断语句,循环控制语句
  4、集合操作:Array、list、set、tuple、map    (注意:可变和不可变的区别)5、样例类的使用6、trit、抽象类的使用7、主构造器和辅助构造器的使用
  8、scala的高级特性
  高阶函数:作为值得函数、匿名函数、闭包、柯里化
  隐式转换:一个类对象中,如果他没有摸一个功能,但是我们有想要它实现,可以使用英式转换的方式。
  object MyPredef{
  //定义隐式转换方法
  implicit def fileReadToRichFile(file: File)=new RichFile(file)
  }
  使用:
  import MyPredef._9、Actor
  写起来像多线程,用起来像socket10、akka
  ActorSystem.actorOf()创建一个Actor,
  创建的同时,就是执行Actor中的prestart方法,去初始化一些信息。
  Spark RDD
  1、SparkRDD叫做:弹性分布式数据集,其实就是一个类,用来描述:任务的数据从哪里读取、用那个算进行计算、得到的结果有存放在哪里、RDD之间的依赖关系是款以来还是窄依赖
  2、RDD有五个特点
  一系列分区
  每个算子作用在每个分区上
  一系列依赖关系
  最有位置(如果从HDFS上读取数据)
  3、RDD的两种算子Transformation和Action
  Transformation是懒加载,只是定义了这个算子的任务,该如何做,但是还没有做。
  Action是立即执行,当执行到Action时,会触发DAGSchudle切分stage,切分完成后,有TaskScheduler将任务通过DriverActor发送到executor中执行。
  4、RDD的几个复杂的Transformation
  ->combineByKey(x=>x,(a:List[String],b:String) => a :+ b,
  (m:List[String],n:List[String])=> m ++ n)
  第一个参数表示分组后的第一个值如何处理,
  第二个参数表示后续的值和前一个值如何处理,
  第三个参数表示,map端处理完成后,在reduce端如何对这些list进行处理。
  ->aggregate("初始量,可以是String也可以是int")(第一个func,第二个func)
  初始量作用于没一个分区,第一个func作用于map端,第二个func作用于reduce端。
  ->reduceByKey(_+_)  作用于map端和reduce端,可以进行局部聚合。
  其实reduceByKey和aggregateByKey在底层都调用了combineByKey方法来实现响应的功能。
  ->mapPartitions
  对每一个分区进行操作,直接在里面使用匿名函数即可
  当然如果逻辑非常复杂也是可以考虑在外面先定义好这个函数之后在传输进去。
  rdd1.mapPartitions((it:Iterator[String]) => {
  it.toList.map(x => (x,1)).iterator
  })
  ->mapPartitionsWithIndex
  首先定义一个函数,当然也可以写在里面作为匿名函数
  val func = (index:Int, it:Iterator[Int]) => {
  it.toList.map(x => ("index:" + index, x)).iterator
  }
  rdd1.mapPartitionsWithIndex(func).collect
  5、RDD自定义Partitioner
  //自定义分区器,重写里面的getPartition方法和numPartitions方法。
  //构造这个对象的时候,就把所有情况的信息传输过来,然后在里面进行分类处理。
  class HostPartition(hostArr:Array[String]) extends Partitioner{
  //对所有的数据进行分类,每一种类型对应一个int编号。所以使用map比较合适。
  val map = new mutable.HashMap[String,Int]()
  for(index <- 0 until(hostArr.length)){
  map.put(hostArr(index),index)
  }
  //重写getPartition的方法。
  override def getPartition(key: Any): Int = {
  map.getOrElse(key.toString,0)
  }
  override def numPartitions: Int = hostArr.length
  }
  应用:
  val hostPartition: HostPartition = new HostPartition(hostList)
  val allPartitionRDD: RDD[(String, (String, Int))] = host_url_count.partitionBy(hostPartition)
  6、自定义排序规则  ==>定义一个
  case class Gril(yanzhi:Int,nianling:Int) extends Ordered[Gril] with Serializable{
  override def compare(that: Gril): Int = {
  val yanzhiResult: Int = this.yanzhi.compareTo(that.yanzhi)
  if(yanzhiResult == 0){
  return this.nianling.compareTo(that.nianling)
  }
  return yanzhiResult
  }
  }
  应用:
  val rdd2: RDD[(String, Int, Int)] = rdd1.sortBy(msg => Gril(msg._2,msg._3))
  Spark的SQLContext 1、Spark整合Hive和HDFS 只需要将Hive的hive-site.xml ; hadoop的core-site.xml和hdfs-site.xml拷贝到Spark的conf目录下即可。Spark就知道如何使用hive的表,同时也知道去哪个NameNode哪里都数据了。
  2、DataFrame是什么?
  是一个分布式数据集,对RDD的封装。RDD有的方法他基本上都有
  3、DataFrame如何创建?
  三种方式:->RDD + case class
  ->RDD + structType
  ->sqlContext.read.format.options(Map())
  4、DataFrame首先需要注册成表结构之后才可以使用sqlContext来操作。
  dF.registerTempTable("person")
  5、使用sqlContext  ==> 返回一个DataFrame
  sqlContext.sql("select * from person")
  6、DataFrame将数据写入到HDFS或者mysql中
  val prop = new Properties()
  prop.put("user", "root")
  prop.put("password", "815325")
  //如果数据库中没有这个表,那么他也会创建一张表(很强大)
  resultDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/bigdata","result",prop)
22/2<12
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号