Hadoop默认的partitioner是HashPartitioner,它的实现是这样的:
|
它将每个Key的HashCode对总reducer数取模,转换成partion index。
个人理解这样做有两个目的:
● 所有相同Key的数据在一个Reducer内处理
● 尽量均匀的将数据分配到各个Reducer
但毫无疑问,HashPartitioner不能保证它的Partion之间的有序。
为了保证Partion之间的有序,TeraSort定义了一个TotalOrderPartitioner。
TotalOrderPartitioner首先要解决的问题是,partitioner发生在map里,而每个mapper只处理它自己的一份split数据,它如何知道它所处理的数据在全局所有输入数据里的位置?
回溯到源头,InputFormat有数据的全局观。TeraSort定义的TeraInputFormat有一个重要的功能,就是把对全部数据形成一个摘要文件,以提供给之后的partitioner使用。为了保证保证效率,TeraSort采用抽样来实现摘要。
运行TeraSort后,它做的第一件事情是对输入数据进行抽样,抽样频率由设置项抽样总条数terasort.partitions.sample决定,默认值为100000条。对输入记录分10个区间(或更小)来分批采样,直到采到足够条数;采样完成后对这些抽样点进行排序,然后对排序后记录均分成partitions个区间,最终将这些区间分割点写到文件,文件名为_partition.lst。这个文件会被加到distributed cache里,目的是为了能被hadoop分发到将来运行mapper的每一个TT上去。
有了所有数据的摘要信息,后面的partitioner做起来就有依据了。当它处理一条mapper的输出记录时,它可以按照一种映射算法,依据每条记录的Key与_partition.lst记录的对应信息做比较,将它划分到某一个partition,从而保证partition之间的有序性。
假设我们从_partition.lst得到的Key组合为sample[];某一条记录的Key值为key,如果查找到sample[i-1] <= key < sample[i] , 那么这条记录会被分配到第i个partition,也就是第i个reducer来处理。这样,partition之间也就有序了。
在TeraSort中,构建了一个trie来实现对Key的查找归类(Partitioner的过程其实就是归类,然后把每一类交给一个reducer处理)。TeraSort默认使用2层的Trie,意味着它只用Key的前两个字节与与分割点比较; Trie的非叶子节点有256个子节点(对应着Key的每一个字节的binary code,有256种可能)。
需要提到的是,TeraSort输出的replica数设置是1份,而不是Hadoop默认使用的3份。为什么?因为SortBenchmark没有规定结果要存多份副本,而设置成1份,Hadoop会就近存在本地(如果这个reducer的TT上也同时有DN)。这可节省了不少网络和磁盘消耗,间接的提高了TeraSort的执行效率。