hadoop中mapreduce部分执行流程
上一篇 / 下一篇 2011-11-27 16:28:08 / 个人分类:测试经验
查看( 796 ) /
评论( 1 )
[p=30, 2, left]最近看了hadoop的mapreduce部分代码,看了之后总结了一下,算是成果吧。以下是程序执行的主要流程,其中参考了网上的一些文章。[/p]51Testing软件测试网X"p)N+N-n5c#Y(X
)wy"dI$c%mQB6DU0 [p=30, 2, left]概括[/p]
6c ` mD7N)c8I7E/R1}0 [p=30, 2, left]Hadoop包括hdfs与mapreduce两部分,在试用期期间我主要看了mapreduce部分,即hadoop执行作业的部分。[/p]
[p=30, 2, left]mapreduce整体上可以分为这么几条执行的线索,jobclient,JobTracker与TaskTracker。[/p]
4Z(^;J_,|:B Mf)O+v0
E|"yh%gu"C0 51Testing软件测试网 S8^#vl)Ga$Bv"o
[p=30, 2, left]下图简单的描述了三者之间的关系:[/p]
1@7D J"PY:UKA0
6e@b+j \'g0
0gg-E8oITOA0 [p=30, 2, left]2.2 TaskInProgress[/p][p=30, 2, left]JobTracker启动任务时通过每一个TaskInProgress来launchTask,这时会把Task对象(即MapTask和ReduceTask)序列化写入相应的TaskTracker服务中,TaskTracker收到后会创建对应的TaskInProgress(此TaskInProgress实现非JobTracker中使用的TaskInProgress,作用类似)用于监控和调度该Task。启动具体的Task进程是通过TaskInProgress管理的TaskRunner对象来运行的。TaskRunner会自动装载job jar,并设置好环境变量后启动一个独立的java child进程来执行Task,即MapTask或者ReduceTask,但它们不一定运行在同一个TaskTracker中。[/p]
f6i pV3oeW3Q:Z0 51Testing软件测试网 sDy)kZkH
[p=30, 2, left]2.3 MapTask和ReduceTask[/p][p=30, 2, left]一个完整的job会自动依次执行Mapper、Combiner(在JobConf指定了Combiner时执行)和Reducer,其中Mapper和Combiner是由MapTask调用执行,Reducer则由ReduceTask调用,Combiner实际也是Reducer接口类的实现。Mapper会根据job jar中定义的输入数据集按<key1,value1>对读入,处理完成生成临时的<key2,value2>对,如果定义了Combiner,MapTask会在Mapper完成调用该Combiner将相同key的值做合并处理,以减少输出结果集。MapTask的任务全完成即交给ReduceTask进程调用Reducer处理,生成最终结果<key3,value3>对。[/p]
I*W7Ra \`0
%@:?.y R1U0
HP)OxeD0 JobTracker发送心跳heartbeat询问有没有任务可做,如果有,让其派发任务给它执行。如果JobTracker的作业队列不为空,51Testing软件测试网B Tj;y+F!c-L"H
则TaskTracker发送的心跳将会获得JobTracker给它派发的任务。这是一道pull过程。slave节点的TaskTracker接到任务后在其本地发起Task,执行任务。以下是简略示意图:[/p]51Testing软件测试网tg,d_Q{`P`
51Testing软件测试网 zi4k.^.bz6h`~'@4o
51Testing软件测试网s6AKvX%V
[p=30, 2, left]下图比较详细的解释了程序的流程:[/p]51Testing软件测试网-^i;NA6]]"l
%AHF4lk2`0
i.DsaS/iXc0
读取hadoop配置[/p][p=30, 2, left]Job job = new Job(conf, "作业名称"); //
f k|N"m4Z?0[3X0 实例化一道作业[/p][p=30, 2, left]job.setMapperClass(Mapper类型);[/p][p=30, 2, left]job.setCombinerClass(Combiner类型);[/p][p=30, 2, left]job.setReducerClass(Reducer类型);[/p][p=30, 2, left]job.setOutputKeyClass(输出Key的类型);[/p][p=30, 2, left]job.setOutputValueClass(输出Value的类型);[/p][p=30, 2, left]FileInputFormat.addInputPath(job, new Path(输入hdfs路径));[/p][p=30, 2, left]FileOutputFormat.setOutputPath(job, new Path(输出hdfs路径));[/p][p=30, 2, left]//
c c'Q [8YdT.a5Y0 其它初始化配置[/p][p=30, 2, left]JobClient.runJob(job);[/p][p=30, 2, left]4.1
*@8_} _1Q D!W0 配置Job[/p][p=30, 2, left]JobConf是用户描述一个job的接口。下面的信息是MapReduce过程中一些较关键的定制信息:[/p]
!V%h6K5d:^pV"j+k0
1N"F^)^KA4u3N0
?2WVn?*c Lx]0 [p=30, 2, left]4.2 JobClient.runJob():运行Job并分解输入数据集[/p]
B:cn&\7h,jz4k!?0 [p=30, 2, left]runJob()提交作业,如何等待返回的状态,根据状态返回不同的结构给客户端。[/p][p=30, 2, left]其中runJob()使用submitJob(job)方法向
-].}7uS]0 master提交作业。[/p][p=30, 2, left]submitJob(Job)方法的流程[/p]
s5a#MW1FtOI0
5i)x9iF$sq[ ER0
\8it~ E)Pm0 51Testing软件测试网*pz+W1?ZWg
0`"_f^1Z,l'F$KB!g&A0 [p=30, 2, left]一个MapReduce的Job会通过JobClient类根据用户在JobConf类中定义的InputFormat实现类来将输入的数据集分解成一批小的数据集,每一个小数据集会对应创建一个MapTask来处理。JobClient会使用缺省的FileInputFormat类调用FileInputFormat.getSplits()方法生成小数据集,如果判断数据文件是isSplitable()的话,会将大的文件分解成小的FileSplit,当然只是记录文件在HDFS里的路径及偏移量和Split大小。这些信息会统一打包到jobFile的jar中。[/p]
^#R}*j.hfzY Z0
Z%RB V_k%S0
S2p6u3^,GSi:w2{0 [p=30, 2, left]向hadoop分布系统文件系统hdfs依次上传三个文件: job.jar, job.split和job.xml。 [/p][p=30, 2, left]job.xml:
9JVd1FW0oq!h0 作业配置,例如Mapper, Combiner, Reducer的类型,输入输出格式的类型等。51Testing软件测试网?NZ.B)u
[/p][p=30, 2, left]job.jar: jar包,里面包含了执行此任务需要的各种类,比如51Testing软件测试网5T7Y?Rf6f*AS(|xp
Mapper,Reducer等实现。
JAb7K T^EL.Y0 [/p][p=30, 2, left]job.split:
7T9m?`.`-y\1_0 文件分块的相关信息,比如有数据分多少个块,块的大小(默认64m)等。51Testing软件测试网+I mUqPs
[/p][p=30, 2, left]这三个文件在hdfs上的路径由hadoop-default.xml文件中的mapreduce系统路径mapred.system.dir属性
uL,C;?^$h3b.Zk+W0 + jobid决定。mapred.system.dir属性默认是/tmp/hadoop-user_name/mapred/system。写完这三个文 件之后,51Testing软件测试网*Z"n V h8O ~
此方法会通过RPC调用master节点上的JobTracker.submitJob(job)方法,等待返回状态,此时作业已经提交完成。[/p][p=30, 2, left]接下来转到JobTracker上执行。[/p][p=30, 2, left](事实上这里还涉及到一些相关的类与方法)[/p][p=30, 2, left]4.351Testing软件测试网Tz;k2])F+un&z.M H
提交Job[/p][p=30, 2, left]jobFile的提交过程是通过RPC(远程进程调用)模块来实现的。大致过程是,JobClient类中通过RPC实现的Proxy接口调用JobTracker的submitJob()方法,而JobTracker必须实现JobSubmissionProtocol接口。[/p][p=30, 2, left]JobTracker创建job成功后会给JobClient传回一个JobStatus对象用于记录job的状态信息,如执行时间、Map和Reduce任务完成的比例等。JobClient会根据这个JobStatus对象创建一个NetworkedJob的RunningJob对象,用于定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。[/p][p=30, 2, left]与创建Job过程相关的类和方法如下图所示[/p]51Testing软件测试网5k2B%Dc"]G%DI `
(nL~$M v/d0 51Testing软件测试网g5p(H#X*h2|.tSy
JobTracker启动[/p][p=30, 2, left]JobTracker类中有一个main()函数,在软件启动的时候执行此main()函数启动JobTracker进程,main()中生成一个JobTracker的对象,然后通过tracker.offerService()语句启动服务,即启动一些线程,下面是几个主要的线程:[/p][p=30, 2, left]taskScheduler:一个抽象类,被JobTracker用于安排执行在TaskTrackers上的task任务,它使用一个或多个JobInProgressListeners接收jobs的通知。另外一个任务是调用JobInProgress.initTask()为job初始化tasks。启动,提交作业,设置配置参数,终止等方法。[/p]51Testing软件测试网Ic T8\(bj^
[p=30, 2, left]completedJobsStoreThread对应completedJobStatusStore;CompletedJobStatusStore类:把JobInProgress中的job信息存储到DFS中;提供一些读取状态信息的方法;是一个守护进程,用于删除DFS中的保存时间超过规定时间的job status删除,[/p]
M1ic!RQ*|%t*Z5?#o/}1S0 [p=30, 2, left]interTrackerServer,抽象类Server类型的实例。一个IPC (Inter-Process Communication,进程间通信)服务器,IPC调用一个以一个参数的形式调用Writable,然后返回一个Writable作为返回值,在某个端口上运行。提供了call,listener,responder,connection,handle类。包括start(),stop(),join(),getListenerAddress(),call()等方法。[/p][p=30, 2, left]这些线程启动之后,便可开始工作了。[/p]51Testing软件测试网m1Jj3F$I[BF?.d
51Testing软件测试网$pTnRN
51Testing软件测试网4H){'gM-e|l6L#P
[p=30, 2, left]job是统一由JobTracker来调度的,把具体的Task分发给各个TaskTracker节点来执行。下面来详细解析执行过程,首先先从JobTracker收到JobClient的提交请求开始。[/p]
%IT a8R:^0|3zx0 收到请求[/p][p=30, 2, left]当JobTracker接收到新的job请求(即submitJob()函数被调用)后,会创建一个JobInProgress对象并通过它来管理和调度任务。JobInProgress在创建的时候会初始化一系列与任务有关的参数,调用到FileSystem,把在JobClient端上传的所有任务文件下载到本地的文件系统中的临时目录里。这其中包括上传的*.jar文件包、记录配置信息的xml、记录分割信息的文件。[/p][p=30, 2, left]5.2 JobTracker.JobInitThread
o9y2s;y1b4D0 通知初始化线程[/p][p=30, 2, left]JobTracker
$rYoh(`#?_0 中的**类EagerTaskInitializationListener负责任务Task的初始化。JobTracker使用jobAdded(job)加入job到EagerTaskInitializationListener中一个专门管理需要初始化的队列里,即一个list成员变量jobInitQueue里。resortInitQueue方法根据作业的优先级排序。然后调用notifyAll()函数,会唤起一个用于初始化job的线程JobInitThread来处理???。JobInitThread收到信号后即取出最靠前的job,即优先级别最高的job,调用TaskTrackerManager的initJob最终调用JobInProgress.initTasks()执行真正的初始化工作。[/p][p=30, 2, left]5.3 JobInProgress.initTasks()
+c'A)W0d8C`q#`'A0 初始化TaskInProgress[/p][p=30, 2, left]任务Task分两种: MapTask
B4g9]:s.g.??0 和reduceTask,它们的管理对象都是TaskInProgress51Testing软件测试网UG*H$g$af%mN
。[/p][p=30, 2, left]首先JobInProgress会创建Map的监控对象。在initTasks()函数里通过调用JobClient的readSplitFile()获得已分解的输入数据的RawSplit列表,然后根据这个列表创建对应数目的Map执行管理对象TaskInProgress。在这个过程中,还会记录该RawSplit块对应的所有在HDFS里的blocks所在的DataNode节点的host,这个会在RawSplit创建时通过FileSplit的getLocations()函数获取,该函数会调用DistributedFileSystem的getFileCacheHints()获得。当然如果是存储在本地文件系统中,即使用LocalFileSystem时当然只有一个location即“localhost”了。[/p][p=30, 2, left]创建这些TaskInProgress对象完毕后,initTasks()方法会通 过createCache()方法为这些TaskInProgress对象产生一个未执行任务的Map缓存nonRunningMapCache。slave端的51Testing软件测试网X] J7LJ-w
TaskTracker向master发送心跳时,就可以直接从这个cache中取任务去执行。[/p][p=30, 2, left]其次JobInProgress会创建Reduce的监控对象,这个比较简单,根据JobConf里指定的Reduce数目创建,缺省只创建1个Reduce任务。监控和调度Reduce任务的是TaskInProgress类,不过构造方法有所不同,TaskInProgress会根据不同参数分别创建具体的MapTask或者ReduceTask。同样地,initTasks()也会通过createCache()方法产生nonRunningReduces成员。[/p][p=30, 2, left]JobInProgress创建完TaskInProgress后,最后构造JobStatus并记录job正在执行中,然后再调用JobHistory.JobInfo.logStarted()记录job的执行日志。到这里JobTracker里初始化job的过程全部结束。[/p]
W D!~ fr Zb5OLmQ0
PH^7an"We_0 [p=30, 2, left]5.3.2 JobTracker调度Job[/p][p=30, 2, left]hadoop默认的调度器是FIFO策略的JobQueueTaskScheduler,它有两个成员变量 jobQueueJobInProgressListener与上面说的eagerTaskInitializationListener。JobQueueJobInProgressListener是JobTracker的另一个**类,它包含了一个映射,用来管理和调度所有的JobInProgress。jobAdded(job)同时会加入job到JobQueueJobInProgressListener中的映射。[/p][p=30, 2, left]JobQueueTaskScheduler最重要的方法是assignTasks51Testing软件测试网b8tG&Wdeb z[
,他实现了工作调度。具体实现:JobTracker
[T0}(huI r~ m ~l0 接到TaskTracker
1j%H}T9Ym0 的heartbeat()51Testing软件测试网^'F@xw:KY"R![%h
调用后,首先会检查上一个心跳响应是否完成,是没要求启动或重启任务,如果一切正常,则会处理心跳。首先它会检查
BbH#BQ&k0 TaskTracker
;I1R+^]5Lw0 端还可以做多少个
u^{'I$y0 map
0A[D0wa?*o.Y0 和
)u,o#B"Q Q ^&[8Tk:Y0 reduce51Testing软件测试网[#XthC3F};[)l
任务,将要派发的任务数是否超出这个数,是否超出集群的任务平均剩余可负载数。如果都没超出,则为此51Testing软件测试网&yis,d|
TaskTracker
U)[4Q2yD?1i?0 分配一个51Testing软件测试网 ]u(e6H,S|0G
MapTask51Testing软件测试网8[z W [ E1v;d qK
或51Testing软件测试网$L3hVt5vyK
ReduceTask
7M2rc/n!f rOy:U j0 。产生Map51Testing软件测试网+l)P]%y+X+F;L-^w-h3X,m
任务使用51Testing软件测试网 DGB^ ~'a'P
JobInProgress51Testing软件测试网,Bg.?;bHv
的51Testing软件测试网[m!uje
obtainNewMapTask()51Testing软件测试网-wB-U lr(f
方法,实质上最后调用了
mHll vvC0 JobInProgress
(X'Q4X;b cH txO0 的
Y B!J$w cm%Vj&~(]0 findNewMapTask()51Testing软件测试网Lb%?cZ$V;k
访问
s]HQqL `%u0 nonRunningMapCache。[/p][p=30, 2, left]上面讲解任务初始化时说过,createCache()方法会在网络拓扑结构上挂上需要执行的TaskInProgress。findNewMapTask()从近到远一层一层地寻找,首先是同一节点,然后在寻找同一机柜上的节点,接着寻找相同数据中心下的节点,直到找了maxLevel层结束。这样的话,在JobTracker给TaskTracker派发任务的时候,可以迅速找到最近的TaskTracker,让它执行任务。[/p][p=30, 2, left]最终生成一个Task类对象,该对象被封装在一个LanuchTaskAction51Testing软件测试网 P0uz}yhP
中,发回给TaskTracker,让它去执行任务。[/p][p=30, 2, left]产生
$K.i o9]O_C0 Reduce
3KoC;DZwwFm8@~i0 任务过程类似,使用51Testing软件测试网E&W9sX"I @Zh0s
JobInProgress.obtainNewReduceTask()51Testing软件测试网0v;g3H/D*ZC X
方法,实质上最后调用了
E5?o^"HT:`P0 JobInProgress
)wy"dI$c%mQB6DU0 [p=30, 2, left]概括[/p]
6c ` mD7N)c8I7E/R1}0 [p=30, 2, left]Hadoop包括hdfs与mapreduce两部分,在试用期期间我主要看了mapreduce部分,即hadoop执行作业的部分。[/p]
- [p=30, 2, left]mapreduce中几个主要的概念[/p]
[p=30, 2, left]mapreduce整体上可以分为这么几条执行的线索,jobclient,JobTracker与TaskTracker。[/p]
- [p=30, 2, left]JobClient[/p]
4Z(^;J_,|:B Mf)O+v0
- [p=30, 2, left]JobTracker[/p]
- [p=30, 2, left]TaskTracker[/p]
E|"yh%gu"C0 51Testing软件测试网 S8^#vl)Ga$Bv"o
[p=30, 2, left]下图简单的描述了三者之间的关系:[/p]
1@7D J"PY:UKA0
- [p=30, 2, left]数据结构[/p]
6e@b+j \'g0
0gg-E8oITOA0 [p=30, 2, left]2.2 TaskInProgress[/p][p=30, 2, left]JobTracker启动任务时通过每一个TaskInProgress来launchTask,这时会把Task对象(即MapTask和ReduceTask)序列化写入相应的TaskTracker服务中,TaskTracker收到后会创建对应的TaskInProgress(此TaskInProgress实现非JobTracker中使用的TaskInProgress,作用类似)用于监控和调度该Task。启动具体的Task进程是通过TaskInProgress管理的TaskRunner对象来运行的。TaskRunner会自动装载job jar,并设置好环境变量后启动一个独立的java child进程来执行Task,即MapTask或者ReduceTask,但它们不一定运行在同一个TaskTracker中。[/p]
f6i pV3oeW3Q:Z0 51Testing软件测试网 sDy)kZkH
[p=30, 2, left]2.3 MapTask和ReduceTask[/p][p=30, 2, left]一个完整的job会自动依次执行Mapper、Combiner(在JobConf指定了Combiner时执行)和Reducer,其中Mapper和Combiner是由MapTask调用执行,Reducer则由ReduceTask调用,Combiner实际也是Reducer接口类的实现。Mapper会根据job jar中定义的输入数据集按<key1,value1>对读入,处理完成生成临时的<key2,value2>对,如果定义了Combiner,MapTask会在Mapper完成调用该Combiner将相同key的值做合并处理,以减少输出结果集。MapTask的任务全完成即交给ReduceTask进程调用Reducer处理,生成最终结果<key3,value3>对。[/p]
I*W7Ra \`0
%@:?.y R1U0
- [p=30, 2, left]整体流程[/p]
HP)OxeD0 JobTracker发送心跳heartbeat询问有没有任务可做,如果有,让其派发任务给它执行。如果JobTracker的作业队列不为空,51Testing软件测试网B Tj;y+F!c-L"H
则TaskTracker发送的心跳将会获得JobTracker给它派发的任务。这是一道pull过程。slave节点的TaskTracker接到任务后在其本地发起Task,执行任务。以下是简略示意图:[/p]51Testing软件测试网tg,d_Q{`P`
51Testing软件测试网 zi4k.^.bz6h`~'@4o
51Testing软件测试网s6AKvX%V
[p=30, 2, left]下图比较详细的解释了程序的流程:[/p]51Testing软件测试网-^i;NA6]]"l
%AHF4lk2`0
i.DsaS/iXc0
- [p=30, 2, left]Jobclient[/p]
读取hadoop配置[/p][p=30, 2, left]Job job = new Job(conf, "作业名称"); //
f k|N"m4Z?0[3X0 实例化一道作业[/p][p=30, 2, left]job.setMapperClass(Mapper类型);[/p][p=30, 2, left]job.setCombinerClass(Combiner类型);[/p][p=30, 2, left]job.setReducerClass(Reducer类型);[/p][p=30, 2, left]job.setOutputKeyClass(输出Key的类型);[/p][p=30, 2, left]job.setOutputValueClass(输出Value的类型);[/p][p=30, 2, left]FileInputFormat.addInputPath(job, new Path(输入hdfs路径));[/p][p=30, 2, left]FileOutputFormat.setOutputPath(job, new Path(输出hdfs路径));[/p][p=30, 2, left]//
c c'Q [8YdT.a5Y0 其它初始化配置[/p][p=30, 2, left]JobClient.runJob(job);[/p][p=30, 2, left]4.1
*@8_} _1Q D!W0 配置Job[/p][p=30, 2, left]JobConf是用户描述一个job的接口。下面的信息是MapReduce过程中一些较关键的定制信息:[/p]
!V%h6K5d:^pV"j+k0
1N"F^)^KA4u3N0
?2WVn?*c Lx]0 [p=30, 2, left]4.2 JobClient.runJob():运行Job并分解输入数据集[/p]
B:cn&\7h,jz4k!?0 [p=30, 2, left]runJob()提交作业,如何等待返回的状态,根据状态返回不同的结构给客户端。[/p][p=30, 2, left]其中runJob()使用submitJob(job)方法向
-].}7uS]0 master提交作业。[/p][p=30, 2, left]submitJob(Job)方法的流程[/p]
s5a#MW1FtOI0
5i)x9iF$sq[ ER0
\8it~ E)Pm0 51Testing软件测试网*pz+W1?ZWg
0`"_f^1Z,l'F$KB!g&A0 [p=30, 2, left]一个MapReduce的Job会通过JobClient类根据用户在JobConf类中定义的InputFormat实现类来将输入的数据集分解成一批小的数据集,每一个小数据集会对应创建一个MapTask来处理。JobClient会使用缺省的FileInputFormat类调用FileInputFormat.getSplits()方法生成小数据集,如果判断数据文件是isSplitable()的话,会将大的文件分解成小的FileSplit,当然只是记录文件在HDFS里的路径及偏移量和Split大小。这些信息会统一打包到jobFile的jar中。[/p]
^#R}*j.hfzY Z0
Z%RB V_k%S0
S2p6u3^,GSi:w2{0 [p=30, 2, left]向hadoop分布系统文件系统hdfs依次上传三个文件: job.jar, job.split和job.xml。 [/p][p=30, 2, left]job.xml:
9JVd1FW0oq!h0 作业配置,例如Mapper, Combiner, Reducer的类型,输入输出格式的类型等。51Testing软件测试网?NZ.B)u
[/p][p=30, 2, left]job.jar: jar包,里面包含了执行此任务需要的各种类,比如51Testing软件测试网5T7Y?Rf6f*AS(|xp
Mapper,Reducer等实现。
JAb7K T^EL.Y0 [/p][p=30, 2, left]job.split:
7T9m?`.`-y\1_0 文件分块的相关信息,比如有数据分多少个块,块的大小(默认64m)等。51Testing软件测试网+I mUqPs
[/p][p=30, 2, left]这三个文件在hdfs上的路径由hadoop-default.xml文件中的mapreduce系统路径mapred.system.dir属性
uL,C;?^$h3b.Zk+W0 + jobid决定。mapred.system.dir属性默认是/tmp/hadoop-user_name/mapred/system。写完这三个文 件之后,51Testing软件测试网*Z"n V h8O ~
此方法会通过RPC调用master节点上的JobTracker.submitJob(job)方法,等待返回状态,此时作业已经提交完成。[/p][p=30, 2, left]接下来转到JobTracker上执行。[/p][p=30, 2, left](事实上这里还涉及到一些相关的类与方法)[/p][p=30, 2, left]4.351Testing软件测试网Tz;k2])F+un&z.M H
提交Job[/p][p=30, 2, left]jobFile的提交过程是通过RPC(远程进程调用)模块来实现的。大致过程是,JobClient类中通过RPC实现的Proxy接口调用JobTracker的submitJob()方法,而JobTracker必须实现JobSubmissionProtocol接口。[/p][p=30, 2, left]JobTracker创建job成功后会给JobClient传回一个JobStatus对象用于记录job的状态信息,如执行时间、Map和Reduce任务完成的比例等。JobClient会根据这个JobStatus对象创建一个NetworkedJob的RunningJob对象,用于定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。[/p][p=30, 2, left]与创建Job过程相关的类和方法如下图所示[/p]51Testing软件测试网5k2B%Dc"]G%DI `
(nL~$M v/d0 51Testing软件测试网g5p(H#X*h2|.tSy
- [p=30, 2, left]JobTracker[/p]
JobTracker启动[/p][p=30, 2, left]JobTracker类中有一个main()函数,在软件启动的时候执行此main()函数启动JobTracker进程,main()中生成一个JobTracker的对象,然后通过tracker.offerService()语句启动服务,即启动一些线程,下面是几个主要的线程:[/p][p=30, 2, left]taskScheduler:一个抽象类,被JobTracker用于安排执行在TaskTrackers上的task任务,它使用一个或多个JobInProgressListeners接收jobs的通知。另外一个任务是调用JobInProgress.initTask()为job初始化tasks。启动,提交作业,设置配置参数,终止等方法。[/p]51Testing软件测试网Ic T8\(bj^
[p=30, 2, left]completedJobsStoreThread对应completedJobStatusStore;CompletedJobStatusStore类:把JobInProgress中的job信息存储到DFS中;提供一些读取状态信息的方法;是一个守护进程,用于删除DFS中的保存时间超过规定时间的job status删除,[/p]
M1ic!RQ*|%t*Z5?#o/}1S0 [p=30, 2, left]interTrackerServer,抽象类Server类型的实例。一个IPC (Inter-Process Communication,进程间通信)服务器,IPC调用一个以一个参数的形式调用Writable,然后返回一个Writable作为返回值,在某个端口上运行。提供了call,listener,responder,connection,handle类。包括start(),stop(),join(),getListenerAddress(),call()等方法。[/p][p=30, 2, left]这些线程启动之后,便可开始工作了。[/p]51Testing软件测试网m1Jj3F$I[BF?.d
51Testing软件测试网$pTnRN
51Testing软件测试网4H){'gM-e|l6L#P
[p=30, 2, left]job是统一由JobTracker来调度的,把具体的Task分发给各个TaskTracker节点来执行。下面来详细解析执行过程,首先先从JobTracker收到JobClient的提交请求开始。[/p]
- [p=30, 2, left]JobTracker初始化Job[/p]
%IT a8R:^0|3zx0 收到请求[/p][p=30, 2, left]当JobTracker接收到新的job请求(即submitJob()函数被调用)后,会创建一个JobInProgress对象并通过它来管理和调度任务。JobInProgress在创建的时候会初始化一系列与任务有关的参数,调用到FileSystem,把在JobClient端上传的所有任务文件下载到本地的文件系统中的临时目录里。这其中包括上传的*.jar文件包、记录配置信息的xml、记录分割信息的文件。[/p][p=30, 2, left]5.2 JobTracker.JobInitThread
o9y2s;y1b4D0 通知初始化线程[/p][p=30, 2, left]JobTracker
$rYoh(`#?_0 中的**类EagerTaskInitializationListener负责任务Task的初始化。JobTracker使用jobAdded(job)加入job到EagerTaskInitializationListener中一个专门管理需要初始化的队列里,即一个list成员变量jobInitQueue里。resortInitQueue方法根据作业的优先级排序。然后调用notifyAll()函数,会唤起一个用于初始化job的线程JobInitThread来处理???。JobInitThread收到信号后即取出最靠前的job,即优先级别最高的job,调用TaskTrackerManager的initJob最终调用JobInProgress.initTasks()执行真正的初始化工作。[/p][p=30, 2, left]5.3 JobInProgress.initTasks()
+c'A)W0d8C`q#`'A0 初始化TaskInProgress[/p][p=30, 2, left]任务Task分两种: MapTask
B4g9]:s.g.??0 和reduceTask,它们的管理对象都是TaskInProgress51Testing软件测试网UG*H$g$af%mN
。[/p][p=30, 2, left]首先JobInProgress会创建Map的监控对象。在initTasks()函数里通过调用JobClient的readSplitFile()获得已分解的输入数据的RawSplit列表,然后根据这个列表创建对应数目的Map执行管理对象TaskInProgress。在这个过程中,还会记录该RawSplit块对应的所有在HDFS里的blocks所在的DataNode节点的host,这个会在RawSplit创建时通过FileSplit的getLocations()函数获取,该函数会调用DistributedFileSystem的getFileCacheHints()获得。当然如果是存储在本地文件系统中,即使用LocalFileSystem时当然只有一个location即“localhost”了。[/p][p=30, 2, left]创建这些TaskInProgress对象完毕后,initTasks()方法会通 过createCache()方法为这些TaskInProgress对象产生一个未执行任务的Map缓存nonRunningMapCache。slave端的51Testing软件测试网X] J7LJ-w
TaskTracker向master发送心跳时,就可以直接从这个cache中取任务去执行。[/p][p=30, 2, left]其次JobInProgress会创建Reduce的监控对象,这个比较简单,根据JobConf里指定的Reduce数目创建,缺省只创建1个Reduce任务。监控和调度Reduce任务的是TaskInProgress类,不过构造方法有所不同,TaskInProgress会根据不同参数分别创建具体的MapTask或者ReduceTask。同样地,initTasks()也会通过createCache()方法产生nonRunningReduces成员。[/p][p=30, 2, left]JobInProgress创建完TaskInProgress后,最后构造JobStatus并记录job正在执行中,然后再调用JobHistory.JobInfo.logStarted()记录job的执行日志。到这里JobTracker里初始化job的过程全部结束。[/p]
W D!~ fr Zb5OLmQ0
PH^7an"We_0 [p=30, 2, left]5.3.2 JobTracker调度Job[/p][p=30, 2, left]hadoop默认的调度器是FIFO策略的JobQueueTaskScheduler,它有两个成员变量 jobQueueJobInProgressListener与上面说的eagerTaskInitializationListener。JobQueueJobInProgressListener是JobTracker的另一个**类,它包含了一个映射,用来管理和调度所有的JobInProgress。jobAdded(job)同时会加入job到JobQueueJobInProgressListener中的映射。[/p][p=30, 2, left]JobQueueTaskScheduler最重要的方法是assignTasks51Testing软件测试网b8tG&Wdeb z[
,他实现了工作调度。具体实现:JobTracker
[T0}(huI r~ m ~l0 接到TaskTracker
1j%H}T9Ym0 的heartbeat()51Testing软件测试网^'F@xw:KY"R![%h
调用后,首先会检查上一个心跳响应是否完成,是没要求启动或重启任务,如果一切正常,则会处理心跳。首先它会检查
BbH#BQ&k0 TaskTracker
;I1R+^]5Lw0 端还可以做多少个
u^{'I$y0 map
0A[D0wa?*o.Y0 和
)u,o#B"Q Q ^&[8Tk:Y0 reduce51Testing软件测试网[#XthC3F};[)l
任务,将要派发的任务数是否超出这个数,是否超出集群的任务平均剩余可负载数。如果都没超出,则为此51Testing软件测试网&yis,d|
TaskTracker
U)[4Q2yD?1i?0 分配一个51Testing软件测试网 ]u(e6H,S|0G
MapTask51Testing软件测试网8[z W [ E1v;d qK
或51Testing软件测试网$L3hVt5vyK
ReduceTask
7M2rc/n!f rOy:U j0 。产生Map51Testing软件测试网+l)P]%y+X+F;L-^w-h3X,m
任务使用51Testing软件测试网 DGB^ ~'a'P
JobInProgress51Testing软件测试网,Bg.?;bHv
的51Testing软件测试网[m!uje
obtainNewMapTask()51Testing软件测试网-wB-U lr(f
方法,实质上最后调用了
mHll vvC0 JobInProgress
(X'Q4X;b cH txO0 的
Y B!J$w cm%Vj&~(]0 findNewMapTask()51Testing软件测试网Lb%?cZ$V;k
访问
s]HQqL `%u0 nonRunningMapCache。[/p][p=30, 2, left]上面讲解任务初始化时说过,createCache()方法会在网络拓扑结构上挂上需要执行的TaskInProgress。findNewMapTask()从近到远一层一层地寻找,首先是同一节点,然后在寻找同一机柜上的节点,接着寻找相同数据中心下的节点,直到找了maxLevel层结束。这样的话,在JobTracker给TaskTracker派发任务的时候,可以迅速找到最近的TaskTracker,让它执行任务。[/p][p=30, 2, left]最终生成一个Task类对象,该对象被封装在一个LanuchTaskAction51Testing软件测试网 P0uz}yhP
中,发回给TaskTracker,让它去执行任务。[/p][p=30, 2, left]产生
$K.i o9]O_C0 Reduce
3KoC;DZwwFm8@~i0 任务过程类似,使用51Testing软件测试网E&W9sX"I @Zh0s
JobInProgress.obtainNewReduceTask()51Testing软件测试网0v;g3H/D*ZC X
方法,实质上最后调用了
E5?o^"HT:`P0 JobInProgress