话不多说,直接上手!
在myeclipse上安装好hadoop插件后直接开始
【1:】准备工作
新建java工程,导入jar,开启hadoop服务器
【2:】
mapreduce的Demo通常分为三个主要模块run(运行)、map(分)、reduce(合)
该案例以统计一个大文件的单词出现个数为例(以空格隔开)
WcMapper代码
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* @author mis
*输入数据类型
* KEYIN 键的输入 是LongWritable 基本固定的类型,
* VALUEIN 值的输入 是Text 基本固定的类型,
*输出数据类型
* KEYOUT 键的输出 是Text 根据业务的不同来决定,这里是统计单词次数,所以输出的key应该是字符串类型,
* VALUEOUT 值的输出 是IntWritable 根据业务的不同来决定,这里是统计单词次数,所以输出的value应该是int类型,
*/
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
//每次调用map方法会传入split中的一行数据key:改行数据所在文件中的位置下标 value:这行数据
protected void map(LongWritable key, Text value, Context context)
throws java.io.IOException ,InterruptedException {
String line = value.toString();
StringTokenizer st = new StringTokenizer(line);//StringTokenizer默认按照空格来切
while(st.hasMoreTokens()){
String world = st.nextToken();
context.write(new Text(world), new IntWritable(1));//map输出
}
};
}
WcReduce代码
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
*
* @author mis
*输入数据类型
* KEYIN 键的输入 是text reduce的键输入类型对应map的键输出类型,
* VALUEIN 值的输入 是IntWritable reduce的值输入类型对应map的值输出类型,
*输出数据类型
* KEYOUT 键的输出 是Text 根据业务的不同来决定,这里是统计单词次数,所以输出的key应该是字符串类型,
* VALUEOUT 值的输出 是IntWritable 根据业务的不同来决定,这里是统计单词次数,所以输出的value应该是int类型,
*/
public class WcReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
//重写reduce方法
protected void reduce(Text key,Iterable<IntWritable> iterator, Context context)
throws java.io.IOException ,InterruptedException {
int sum = 0 ;
for(IntWritable i:iterator){
sum+=i.get();
}
context.write(key, new IntWritable(sum));
};
}
JobRun代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class JobRun {
public static void main(String[] args){
Configuration conf = new Configuration();
//设置mapper的配置,既就是hadoop/conf/mapred-site.xml的配置信息
conf.set("mapred.job.tracker", "hadoop01:9001");
try {
//新建一个Job工作
Job job = new Job(conf);
//设置运行类
job.setJarByClass(JobRun.class);
//设置要执行的mapper类(自己书写的)
job.setMapperClass(WcMapper.class);
//设置要执行的reduce类(自己书写的)
job.setReducerClass(WcReduce.class);
//设置输出key的类型
job.setMapOutputKeyClass(Text.class);
//设置输出value的类型
job.setMapOutputValueClass(IntWritable.class);
//设置ruduce任务的个数,默认个数为一个(一般reduce的个数越多效率越高)
//job.setNumReduceTasks(2);
//mapreduce 输入数据的文件/目录
FileInputFormat.addInputPath(job, new Path("/usr/input/wc/w_seer1111"));
//mapreduce 执行后输出的数据目录
FileOutputFormat.setOutputPath(job, new Path("/usr/output/all"));
//执行完毕退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
运行日志:
分析结果:
结果部分展示:
运行解析过程:
MR运行流程: