Hadoop 与 MySQL 数据库的那点事

发表于:2016-2-03 09:03

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:背着理想去流浪    来源:51Testing软件测试网采编

分享:
  DBMapper类:
package com.simope.mr.db;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class DBMapper extends MapReduceBase implements
Mapper<LongWritable, TeacherRecord, LongWritable, Text> {
public void map(LongWritable key, TeacherRecord value,
OutputCollector<LongWritable, Text> collector, Reporter reporter)
throws IOException {
collector.collect(new LongWritable(value.id),
new Text(value.toString()));
}
}
  DBReducer类:
package com.simope.mr.db;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
/**
 * Reducer that parses tab-separated text values into {@link StudentRecord} output keys,
 * emitting the student's name as the output value.
 *
 * <p>Each value is parsed as {@code name \t age \t departmentID}. The {@code id} column
 * is not set because it is auto-increment in the database.
 * NOTE(review): the mapper in this package emits {@code TeacherRecord.toString()};
 * confirm that its field order matches the indices parsed here.
 */
public class DBReducer extends MapReduceBase
        implements Reducer<LongWritable, Text, StudentRecord, Text> {

    /** Minimum number of tab-separated fields needed to build a StudentRecord. */
    private static final int MIN_FIELDS = 3;

    /**
     * Converts every value grouped under {@code key} into a StudentRecord.
     *
     * @param key    grouping key (only used in error messages)
     * @param values tab-separated records to convert
     * @param output receives one (StudentRecord, name) pair per input value
     * @param reporter progress reporter (unused)
     * @throws IOException if a value has fewer than {@value #MIN_FIELDS} fields,
     *                     or if the collector fails
     */
    @Override
    public void reduce(LongWritable key, Iterator<Text> values,
            OutputCollector<StudentRecord, Text> output, Reporter reporter)
            throws IOException {
        // Consume every value: reading only the first would silently drop
        // any further records that share this key.
        while (values.hasNext()) {
            String line = values.next().toString();
            String[] fields = line.split("\t");
            if (fields.length < MIN_FIELDS) {
                throw new IOException("Malformed record for key " + key + ": expected at least "
                        + MIN_FIELDS + " tab-separated fields but got " + fields.length
                        + ": " + line);
            }
            StudentRecord student = new StudentRecord();
            // id is auto-increment in the database, so it is intentionally not set here.
            student.name = fields[0];
            student.age = Integer.parseInt(fields[1]);
            student.departmentID = Integer.parseInt(fields[2]);
            output.collect(student, new Text(student.name));
        }
    }
}
  DBJob类:(读取数据库表内容,并将数据写入 HDFS 文件中)数据库表 → HDFS 文件
package com.simope.mr.db;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
/**
 * Job that reads all rows of the {@code teacher} table in the {@code my_hd} MySQL
 * database through {@link DBInputFormat} (ordered by {@code id}) and writes them to
 * an HDFS output directory.
 *
 * <p>{@link DBMapper} emits (id, record text) pairs and {@link IdentityReducer} passes
 * them through unchanged.
 *
 * <p>Usage: {@code DBJob [outputDir]}; the output directory defaults to
 * {@code /usr/output/db}.
 *
 * @author JimLy
 * @since 2016-02-02
 */
public class DBJob {

    /** Output directory used when none is supplied on the command line. */
    private static final String DEFAULT_OUTPUT_DIR = "/usr/output/db";

    public static void main(String[] args) throws IOException {
        JobConf jobConf = new JobConf(DBJob.class);
        jobConf.setOutputKeyClass(LongWritable.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setInputFormat(DBInputFormat.class);

        // Optional override of the output directory; behaves as before when no argument is given.
        String outputDir = args.length > 0 ? args[0] : DEFAULT_OUTPUT_DIR;
        FileOutputFormat.setOutputPath(jobConf, new Path(outputDir));

        // NOTE(review): host and credentials are hard-coded; consider moving them to configuration.
        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver", "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");
        String[] fields = {"id", "name", "age", "departmentID"};
        // Read the teacher table of my_hd: no WHERE condition, ordered by id.
        DBInputFormat.setInput(jobConf, TeacherRecord.class, "teacher", null, "id", fields);

        jobConf.setMapperClass(DBMapper.class);
        jobConf.setReducerClass(IdentityReducer.class);
        JobClient.runJob(jobConf);
    }
}
  DB2Job类:(通过自定义 SQL 查询读取数据库表内容,并将数据写入 HDFS 文件中)数据库表 → HDFS 文件
package com.simope.mr.db;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
/**
 * Job that reads the rows of the {@code teacher} table in the {@code my_hd} MySQL
 * database whose {@code id} is not 4, using a free-form SQL query with
 * {@link DBInputFormat}, and writes them to an HDFS output directory.
 *
 * <p>{@link DBMapper} emits (id, record text) pairs and {@link IdentityReducer} passes
 * them through unchanged.
 *
 * <p>Usage: {@code DB2Job [outputDir]}; the output directory defaults to
 * {@code /usr/output/db}.
 *
 * @author JimLy
 * @since 2016-02-02
 */
public class DB2Job {

    /** Output directory used when none is supplied on the command line. */
    private static final String DEFAULT_OUTPUT_DIR = "/usr/output/db";

    /** Columns read into TeacherRecord; same list and order that DBJob requests. */
    private static final String COLUMNS = "id, name, age, departmentID";

    /** Row filter shared by the data query and the count query so they cannot diverge. */
    private static final String CONDITION = "id != 4";

    public static void main(String[] args) throws IOException {
        JobConf jobConf = new JobConf(DB2Job.class);
        jobConf.setOutputKeyClass(LongWritable.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setInputFormat(DBInputFormat.class);

        // Optional override of the output directory; behaves as before when no argument is given.
        String outputDir = args.length > 0 ? args[0] : DEFAULT_OUTPUT_DIR;
        FileOutputFormat.setOutputPath(jobConf, new Path(outputDir));

        // NOTE(review): host and credentials are hard-coded; consider moving them to configuration.
        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver", "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");

        // DBInputFormat splits a free-form query by appending LIMIT/OFFSET per split, so the
        // query needs a deterministic ORDER BY; otherwise map tasks may read overlapping or
        // missing rows. Explicit columns avoid depending on the table's physical column order.
        String inputQuery = "SELECT " + COLUMNS + " FROM teacher WHERE " + CONDITION + " ORDER BY id";
        String inputCountQuery = "SELECT COUNT(1) FROM teacher WHERE " + CONDITION;
        DBInputFormat.setInput(jobConf, TeacherRecord.class, inputQuery, inputCountQuery);

        jobConf.setMapperClass(DBMapper.class);
        jobConf.setReducerClass(IdentityReducer.class);
        JobClient.runJob(jobConf);
    }
}
32/3<123>
精选软件测试好文,快来阅读吧~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号