For example, given a child-parent table, we want to output the grandchild-grandparent table.
Input:
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Output:
Tom Alice
Tom Jesse
Jone Alice
Jone Jesse
Tom Mary
Tom Ben
Jone Mary
Jone Ben
From the single input table we build two tables, a child-parent table and a parent-child table, and a natural join of the two gives the result. The key to the program is constructing the left and right tables in the map phase, which is done by tagging each emitted record with a flag that marks which table it belongs to.
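For example, with the input above each line produces two tagged records in the map phase, so the group that reaches reduce for the key Lucy contains (in some order):

Lucy 1 Tom
Lucy 1 Jone
Lucy 2 Mary
Lucy 2 Ben

Records tagged 1 fill the grandchild list (Tom, Jone), records tagged 2 fill the grandparent list (Mary, Ben), and their cross product yields Tom Mary, Tom Ben, Jone Mary and Jone Ben.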
Code
package com.hy.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
public class SingleJoin {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is "child parent", separated by a space
            String[] fields = value.toString().split(" ");
            String childName = fields[0];
            String parentName = fields[1];
            // Right table: key = parent, tagged "1", so the child becomes a grandchild candidate
            context.write(new Text(parentName), new Text("1 " + childName));
            // Left table: key = child, tagged "2", so the parent becomes a grandparent candidate
            context.write(new Text(childName), new Text("2 " + parentName));
        }
    }
    public static class IntSumReducer
            extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> grandchild = new ArrayList<String>();
            List<String> grandparent = new ArrayList<String>();
            for (Text val : values) {
                String[] tmp = val.toString().split(" ");
                if (tmp[0].equals("1"))
                    grandchild.add(tmp[1]);   // from the right table: a child of the key person
                else
                    grandparent.add(tmp[1]);  // from the left table: a parent of the key person
            }
            // The key person links grandchildren to grandparents only if both lists are non-empty
            if (grandchild.size() != 0 && grandparent.size() != 0) {
                for (String gc : grandchild) {
                    for (String gp : grandparent) {
                        context.write(new Text(gc), new Text(gp));
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {
        BasicConfigurator.configure();
        Configuration conf = new Configuration();
        conf.set("mapreduce.cluster.local.dir", "/Users/hy/hadoop/var");
        Job job = Job.getInstance(conf, "Single join");
        job.setJarByClass(SingleJoin.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileSystem fs = FileSystem.get(conf);
        // Delete the output directory if it already exists
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }
        if (!job.waitForCompletion(true))
            System.exit(1);
        // Print the result
        BufferedReader br = new BufferedReader(
                new InputStreamReader(fs.open(new Path(args[1] + "/part-r-00000"))));
        try {
            String line = br.readLine();
            while (line != null) {
                System.out.println(line);
                line = br.readLine();
            }
        } finally {
            br.close();
        }
    }
}
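One way to run the job, as a sketch: package the class into a jar and submit it with the input file and an output directory as the two arguments (the jar name and HDFS paths below are placeholders, not taken from the original setup):

hadoop jar singlejoin.jar com.hy.hadoop.SingleJoin /user/hy/input/child-parent.txt /user/hy/output/singlejoin

The program deletes the output directory if it already exists and prints part-r-00000 to stdout once the job finishes.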