The DBMapper class:
package com.simope.mr.db;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class DBMapper extends MapReduceBase implements
        Mapper<LongWritable, TeacherRecord, LongWritable, Text> {

    public void map(LongWritable key, TeacherRecord value,
            OutputCollector<LongWritable, Text> collector, Reporter reporter)
            throws IOException {
        // Emit the teacher's id as the key and the tab-separated record as the value
        collector.collect(new LongWritable(value.id), new Text(value.toString()));
    }
}
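The TeacherRecord class referenced above is not shown in the original. For DBInputFormat it must implement both Writable and DBWritable. Here is a minimal sketch, assuming the four columns used by the jobs below (id, name, age, departmentID) and a tab-separated toString() that matches what DBReducer splits on:

package com.simope.mr.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

// Assumed implementation; the original post does not show this class
public class TeacherRecord implements Writable, DBWritable {

    int id;
    String name;
    int age;
    int departmentID;

    // Writable: deserialize fields between map and reduce
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
        this.age = in.readInt();
        this.departmentID = in.readInt();
    }

    // Writable: serialize fields between map and reduce
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.name);
        out.writeInt(this.age);
        out.writeInt(this.departmentID);
    }

    // DBWritable: populate fields from a JDBC result row
    public void readFields(ResultSet result) throws SQLException {
        this.id = result.getInt(1);
        this.name = result.getString(2);
        this.age = result.getInt(3);
        this.departmentID = result.getInt(4);
    }

    // DBWritable: bind fields to an INSERT statement (unused when only reading)
    public void write(PreparedStatement stmt) throws SQLException {
        stmt.setInt(1, this.id);
        stmt.setString(2, this.name);
        stmt.setInt(3, this.age);
        stmt.setInt(4, this.departmentID);
    }

    // Tab-separated form consumed by DBReducer: "name\tage\tdepartmentID"
    public String toString() {
        return this.name + "\t" + this.age + "\t" + this.departmentID;
    }
}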
The DBReducer class:
package com.simope.mr.db;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class DBReducer extends MapReduceBase implements
        Reducer<LongWritable, Text, StudentRecord, Text> {

    @Override
    public void reduce(LongWritable key, Iterator<Text> values,
            OutputCollector<StudentRecord, Text> output, Reporter reporter)
            throws IOException {
        // Keys are unique here, so each key carries exactly one value:
        // a tab-separated line "name\tage\tdepartmentID"
        String[] infoArr = values.next().toString().split("\t");

        StudentRecord s = new StudentRecord();
        // s.id = Integer.parseInt(infoArr[0]); // id is auto-increment, so the database assigns it
        s.name = infoArr[0];
        s.age = Integer.parseInt(infoArr[1]);
        s.departmentID = Integer.parseInt(infoArr[2]);

        output.collect(s, new Text(s.name));
    }
}
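StudentRecord is likewise not shown in the original. Since DBReducer emits it as an output key, it would need to implement DBWritable (and Writable) so DBOutputFormat can write it back to a table. A minimal sketch, assuming name, age, and departmentID columns with the auto-increment id left to the database:

package com.simope.mr.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

// Assumed implementation; the original post does not show this class
public class StudentRecord implements Writable, DBWritable {

    String name;
    int age;
    int departmentID;
    // id is omitted: the column is auto-increment, so the database assigns it

    // Writable: deserialize fields between map and reduce
    public void readFields(DataInput in) throws IOException {
        this.name = Text.readString(in);
        this.age = in.readInt();
        this.departmentID = in.readInt();
    }

    // Writable: serialize fields between map and reduce
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, this.name);
        out.writeInt(this.age);
        out.writeInt(this.departmentID);
    }

    // DBWritable: populate fields from a JDBC result row (unused when only writing)
    public void readFields(ResultSet result) throws SQLException {
        this.name = result.getString(1);
        this.age = result.getInt(2);
        this.departmentID = result.getInt(3);
    }

    // DBWritable: bind fields to the INSERT statement built by DBOutputFormat
    public void write(PreparedStatement stmt) throws SQLException {
        stmt.setString(1, this.name);
        stmt.setInt(2, this.age);
        stmt.setInt(3, this.departmentID);
    }
}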
The DBJob class (reads the contents of a database table and writes the data to an HDFS file): database table -> HDFS file
package com.simope.mr.db;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;

/**
 * Reads rows from a database table and writes them to an HDFS file.
 *
 * @author JimLy
 * @since 2016-02-02
 */
public class DBJob {

    public static void main(String[] args) throws IOException {
        JobConf jobConf = new JobConf(DBJob.class);

        jobConf.setOutputKeyClass(LongWritable.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setInputFormat(DBInputFormat.class);

        FileOutputFormat.setOutputPath(jobConf, new Path("/usr/output/db"));

        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");

        String[] fields = {"id", "name", "age", "departmentID"};

        // Query the teacher table of the my_hd database:
        // (record class, table name, WHERE conditions, ORDER BY column, field names)
        DBInputFormat.setInput(jobConf, TeacherRecord.class, "teacher", null, "id", fields);

        jobConf.setMapperClass(DBMapper.class);
        jobConf.setReducerClass(IdentityReducer.class);

        JobClient.runJob(jobConf);
    }
}
The DB2Job class (same as DBJob, but selects the input rows with an explicit SQL query and count query instead of a table name): database table -> HDFS file
package com.simope.mr.db;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;

/**
 * Reads rows selected by an SQL query from the database and writes them to an HDFS file.
 *
 * @author JimLy
 * @since 2016-02-02
 */
public class DB2Job {

    public static void main(String[] args) throws IOException {
        JobConf jobConf = new JobConf(DB2Job.class);

        jobConf.setOutputKeyClass(LongWritable.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setInputFormat(DBInputFormat.class);

        FileOutputFormat.setOutputPath(jobConf, new Path("/usr/output/db"));

        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");

        // String[] fields = {"id", "name", "age", "departmentID"};
        String inputQuery = "SELECT * FROM teacher WHERE id != 4";
        String inputCountQuery = "SELECT COUNT(1) FROM teacher WHERE id != 4";

        // Query data from the teacher table of the my_hd database;
        // the count query lets Hadoop compute input splits
        DBInputFormat.setInput(jobConf, TeacherRecord.class, inputQuery, inputCountQuery);

        jobConf.setMapperClass(DBMapper.class);
        jobConf.setReducerClass(IdentityReducer.class);

        JobClient.runJob(jobConf);
    }
}
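Note that neither job above actually uses DBReducer; it only makes sense in the reverse direction, HDFS file -> database table, with DBOutputFormat. The original does not show such a job, so the following is a hypothetical sketch (the class name DB3Job, the input path, the student table, and the "name\tage\tdepartmentID" line format are all assumptions):

package com.simope.mr.db;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;

// Hypothetical job: load an HDFS text file into the student table
public class DB3Job {

    public static void main(String[] args) throws IOException {
        JobConf jobConf = new JobConf(DB3Job.class);

        // Map output: (line offset, line text); reduce output: (StudentRecord, name)
        jobConf.setMapOutputKeyClass(LongWritable.class);
        jobConf.setMapOutputValueClass(Text.class);
        jobConf.setOutputKeyClass(StudentRecord.class);
        jobConf.setOutputValueClass(Text.class);

        // Assumed input path holding lines of the form "name\tage\tdepartmentID"
        jobConf.setInputFormat(TextInputFormat.class);
        FileInputFormat.setInputPaths(jobConf, new Path("/usr/input/db"));

        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");

        // Write each StudentRecord into the student table's three columns;
        // setOutput also sets DBOutputFormat as the job's output format
        DBOutputFormat.setOutput(jobConf, "student", "name", "age", "departmentID");

        jobConf.setMapperClass(IdentityMapper.class);
        jobConf.setReducerClass(DBReducer.class);

        JobClient.runJob(jobConf);
    }
}

Whichever direction the data flows, the MySQL JDBC driver jar (mysql-connector-java) must be on the job's classpath, or the tasks will fail to load com.mysql.jdbc.Driver.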