第6章 MapReduce应用开发-MRUnit编写

MRUnit 是一个测试库,可以方便地测试 Mapper 与 Reducer 的运行逻辑是否符合预期。

1、Mapper单元测试

如下为V1版本Mapper:

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

    /**
     * V1 mapper: cuts the year (characters 15-18) and the signed temperature
     * (characters 87-91) out of a fixed-width NCDC line and emits (year, temperature).
     * <p>
     * Deliberately naive: it does not recognise the "+9999" missing-value marker, so such a
     * record is emitted as temperature 9999, and lines shorter than 92 characters or with a
     * non-numeric temperature field make it throw.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        if (value != null) {
            String record = value.toString();
            Text yearKey = new Text(record.substring(15, 19));
            IntWritable temperatureValue = new IntWritable(Integer.parseInt(record.substring(87, 92)));
            context.write(yearKey, temperatureValue);
        }
    }
}
public class MaxTemperatureMapperTest {

    /**
     * Builds a driver that feeds {@code record} (with key offset 0) to a fresh
     * MaxTemperatureMapper. Callers add zero or more expected outputs and call runTest();
     * with no withOutput call, the driver expects the mapper to emit nothing.
     */
    private static MapDriver<LongWritable, Text, Text, IntWritable> driverFor(String record) {
        MapDriver<LongWritable, Text, Text, IntWritable> driver = new MapDriver<>();
        return driver.withMapper(new MaxTemperatureMapper())
                .withInput(new LongWritable(0), new Text(record));
    }

    /** Year "1950" (chars 15-18) and temperature "-0011" (chars 87-91) give (1950, -11). */
    @Test
    public void processValidRecord() throws IOException {
        driverFor("0043011990999991950051518004+68750+023550FM-12+0382"
                + "99999V0203201N00261220001CN9999999N9-00111+99999999999")
                .withOutput(new Text("1950"), new IntWritable(-11))
                .runTest();
    }

    /**
     * Temperature "+9999" marks a missing reading, so nothing should be emitted.
     * The V1 mapper emits (1950, 9999) for this record and therefore fails this test.
     */
    @Test
    public void ignoreMissingTemperatureRecord() throws IOException {
        driverFor("0043011990999991950051518004+68750+023550FM-12+0382"
                + "99999V0203201N00261220001CN9999999N9+99991+99999999999")
                .runTest();
    }

    /**
     * NOTE(review): for this record chars 87-91 are "01957", so the mapper parses a
     * "temperature" of 1957 and emits (1957, 1957). The test pins that current behaviour;
     * it does not assert that the malformed record is rejected.
     */
    @Test
    public void processMalformedTemperatureRecord() throws IOException {
        driverFor("0335999999433181957042302005+37950+139117SAO  +0004"
                + "RJSN V02011359003150070356999999433201957010100005+353")
                .withOutput(new Text("1957"), new IntWritable(1957))
                .runTest();
    }
}

MapDriver 根据 withOutput 的调用次数,可以检查 0 条、1 条或多条输出记录(不调用 withOutput 即表示期望没有输出)。在 NCDC 数据中,+9999 表示气温值缺失;V1 版本的 mapper 没有处理这种情况,会把它当作正常气温输出 (1950, 9999),因此 ignoreMissingTemperatureRecord 测试不会通过。

可以将Mapper中的记录解析逻辑抽取为一个解析类,如下为V2版本Mapper:

public class NcdcRecordParse {

    /** Value stored in the temperature field when the reading is missing ("+9999"). */
    private static final int MISSING_TEMPERATURE = 9999;

    /** Quality codes accepted by {@link #isValidTemperature()}; compiled once instead of per call. */
    private static final java.util.regex.Pattern VALID_QUALITY = java.util.regex.Pattern.compile("[01459]");

    // Fixed-width character offsets (0-based, end-exclusive) of the fields this parser reads.
    private static final int YEAR_START = 15;
    private static final int YEAR_END = 19;
    private static final int TEMPERATURE_START = 87;
    private static final int TEMPERATURE_END = 92;
    private static final int QUALITY_END = 93;

    // Initialised to the same "invalid" state parse() resets to, so isValidTemperature()
    // is safe to call (and returns false) before any record has been parsed.
    private String year = "";
    private int temperature;
    private String quality = "";

    public NcdcRecordParse() {
    }

    public NcdcRecordParse(String record) {
        parse(record);
    }

    public NcdcRecordParse(Text record) {
        parse(record);
    }

    /**
     * Parses a Hadoop {@code Text} record; a {@code null} record is treated as unparseable.
     *
     * @param record the raw NCDC line, may be {@code null}
     */
    public void parse(Text record) {
        parse(record == null ? null : record.toString());
    }

    /**
     * Parses one fixed-width NCDC line into year, temperature and quality code.
     * <p>
     * A {@code null}, blank, truncated (shorter than 93 characters) or non-numeric-temperature
     * record does not throw; it leaves the parser in an invalid state where
     * {@link #isValidTemperature()} returns {@code false} and {@link #getYear()} returns "".
     *
     * @param record the raw NCDC line, may be {@code null}
     */
    public void parse(String record) {
        // Reset first: the mapper reuses one parser instance, so a rejected record must not
        // leave values from the previous record behind. An empty quality code never matches
        // VALID_QUALITY, which is what makes every early return below "invalid".
        this.year = "";
        this.temperature = 0;
        this.quality = "";

        if (record == null || record.trim().isEmpty() || record.length() < QUALITY_END) {
            return;
        }

        int parsedTemperature;
        try {
            // Skip an explicit '+' sign; a '-' sign stays and is parsed as a negative value.
            int start = record.charAt(TEMPERATURE_START) == '+' ? TEMPERATURE_START + 1 : TEMPERATURE_START;
            parsedTemperature = Integer.parseInt(record.substring(start, TEMPERATURE_END));
        } catch (NumberFormatException ignored) {
            // Malformed temperature field: keep the invalid state instead of failing the task.
            return;
        }

        this.year = record.substring(YEAR_START, YEAR_END);
        this.temperature = parsedTemperature;
        this.quality = record.substring(TEMPERATURE_END, QUALITY_END);
    }

    /**
     * Reports whether the last parsed record carries a usable temperature.
     *
     * @return {@code true} if the temperature is not the 9999 missing marker and the quality
     *         code is one of 0, 1, 4, 5, 9; a genuine reading of 0 is valid
     */
    public boolean isValidTemperature() {
        return this.temperature != MISSING_TEMPERATURE && VALID_QUALITY.matcher(quality).matches();
    }

    /** @return the 4-character year of the last parsed record, or "" if it was unparseable */
    public String getYear() {
        return year;
    }

    /** @return the temperature of the last parsed record (0 if it was unparseable) */
    public int getTemperature() {
        return temperature;
    }
}
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /** One parser reused for every record this mapper sees; each parse() call overwrites its fields. */
    private final NcdcRecordParse parser = new NcdcRecordParse();

    /**
     * V2 mapper: delegates field extraction to NcdcRecordParse and emits (year, temperature)
     * only when the parser reports a valid temperature.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        parser.parse(value);
        if (!parser.isValidTemperature()) {
            return; // rejected by NcdcRecordParse.isValidTemperature(): emit nothing
        }
        Text yearKey = new Text(parser.getYear());
        IntWritable temperatureValue = new IntWritable(parser.getTemperature());
        context.write(yearKey, temperatureValue);
    }
}
@Test
    public void processingValidRecord() throws IOException {
        // Body elided in this excerpt; presumably the same record and expectation as
        // processValidRecord in the V1 test class — confirm before compiling.
        ...
    }

    /**
     * A '+'-signed reading ("+0011" at chars 87-91, quality '1') for year 1950 is emitted as
     * the positive temperature 11.
     */
    @Test
    public void processPositiveTemperatureRecord() throws IOException {
        MaxTemperatureMapper mapper = new MaxTemperatureMapper();
        Text record = new Text("0043011990999991950051518004+68750+023550FM-12+0382"
                + "99999V0203201N00261220001CN9999999N9+00111+99999999999");

        MapDriver<LongWritable, Text, Text, IntWritable> driver = new MapDriver<>();
        driver.withMapper(mapper)
                .withInput(new LongWritable(0), record)
                .withOutput(new Text("1950"), new IntWritable(11))
                .runTest();
    }

    @Test
    public void ignoreMissingTemperatureRecord() throws IOException {
        // Body elided in this excerpt; presumably the same "+9999" (missing temperature)
        // record as the V1 test, run with no expected output — confirm before compiling.
        ...
    }

    /**
     * The temperature "+1230" parses fine, but the quality code at char 92 is '2', which is
     * not in the accepted set [01459]. The mapper must therefore emit nothing for this record.
     */
    @Test
    public void ignoreSuspectRecord() throws IOException {
        MaxTemperatureMapper mapper = new MaxTemperatureMapper();
        Text record = new Text("0043011990999991950051518004+68750+023550FM-12+0382"
                + "99999V0203201N00261220001CN9999999N9+12302+99999999999");

        MapDriver<LongWritable, Text, Text, IntWritable> driver = new MapDriver<>();
        // No withOutput(...): the driver asserts that zero records are produced.
        driver.withMapper(mapper)
                .withInput(new LongWritable(0), record)
                .runTest();
    }
2、Reducer单元测试
public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Emits {@code (key, max(values))}. If {@code values} were empty, Integer.MIN_VALUE would be
     * written. Taking a maximum is associative and commutative, which is what lets the driver
     * also register this class as the combiner.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable candidate : values) {
            maxValue = Math.max(maxValue, candidate.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}
    /**
     * The reducer must emit the largest value seen for a key: max(30, 25, 33) = 33.
     */
    @Test
    public void returnsMaximumIntegerInValues() throws IOException {
        MaxTemperatureReducer reducer = new MaxTemperatureReducer();

        Text key = new Text("1950");
        // Plain typed list instead of a raw, double-brace anonymous ArrayList subclass.
        List<IntWritable> values = new ArrayList<>();
        values.add(new IntWritable(30));
        values.add(new IntWritable(25));
        values.add(new IntWritable(33));

        // Fully parameterised driver so key/value types are checked at compile time.
        new ReduceDriver<Text, IntWritable, Text, IntWritable>()
                .withReducer(reducer)
                .withInput(key, values)
                .withOutput(key, new IntWritable(33))
                .runTest();
    }
3、Driver测试

任务启动入口:

public class MaxTemperatureDriver extends Configured implements Tool {

    /**
     * Configures and runs the max-temperature job.
     *
     * @param strings the arguments left after generic options: {@code <input> <output>}
     * @return 0 if the job succeeded, 1 if it failed, -1 on wrong usage
     * @throws Exception if job submission or execution fails
     */
    @Override
    public int run(String[] strings) throws Exception {
        if (strings.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        Job job = Job.getInstance(getConf());

        job.setJobName("MaxTemperature");
        job.setJarByClass(MaxTemperatureDriver.class);

        FileInputFormat.addInputPath(job, new Path(strings[0]));
        FileOutputFormat.setOutputPath(job, new Path(strings[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        // Taking a maximum is associative and commutative, so the reducer doubles as the combiner.
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        // Map output: (year, temperature).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Final output: (year, max temperature). The value class must be set with
        // setOutputValueClass; calling setOutputKeyClass twice would overwrite the key type.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
        System.exit(exitCode);
    }
}

1)使用本地作业运行器

@Test
    public void test() throws Exception {
        // Run on the local filesystem with the local job runner. The 1 MB map-side sort buffer
        // presumably keeps memory use small in tests.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        conf.set("mapreduce.framework.name", "local");
        conf.setInt("mapreduce.task.io.sort.mb", 1);

        // Remove results left over from a previous run before starting the job.
        Path output = new Path("output");
        FileSystem localFs = FileSystem.get(conf);
        if (localFs.exists(output)) {
            localFs.delete(output, true);
        }
        Path input = new Path("input/ncdc");

        MaxTemperatureDriver driver = new MaxTemperatureDriver();
        driver.setConf(conf);
        String[] jobArgs = {input.toString(), output.toString()};
        Assert.assertThat(driver.run(jobArgs), CoreMatchers.is(0));

        checkOutput(conf, output);
    }

    /**
     * Path filter that rejects every entry whose file name begins with an underscore, so only
     * the job's data files remain for the output checks.
     */
    static class OutputLogFilter implements PathFilter {

        @Override
        public boolean accept(Path path) {
            final String fileName = path.getName();
            boolean isUnderscoreEntry = fileName.startsWith("_");
            return !isUnderscoreEntry;
        }
    }

    /**
     * Asserts that the job wrote exactly one data file under {@code output} and that its lines
     * match the classpath resource {@code /expected.txt} exactly, with no extra lines.
     *
     * @param conf   configuration used to resolve the filesystem holding {@code output}
     * @param output the job output directory
     * @throws IOException if listing or reading the files fails
     */
    public void checkOutput(Configuration conf, Path output) throws IOException {
        FileSystem fileSystem = FileSystem.get(conf);
        FileStatus[] status = fileSystem.listStatus(output, new OutputLogFilter());
        Assert.assertThat(status.length, CoreMatchers.is(1));

        Path[] paths = FileUtil.stat2Paths(status);
        // getResourceAsStream returns null for a missing resource; fail with a clear message
        // instead of an NPE deep inside the reader.
        InputStream expectedStream = getClass().getResourceAsStream("/expected.txt");
        Assert.assertNotNull("classpath resource /expected.txt not found", expectedStream);

        // try-with-resources: both readers are closed even when an assertion fails mid-way.
        try (BufferedReader expectedReader = asBufferedReader(expectedStream);
             BufferedReader outputReader = asBufferedReader(fileSystem.open(paths[0]))) {
            String expectedLine;
            while ((expectedLine = expectedReader.readLine()) != null) {
                Assert.assertEquals(expectedLine, outputReader.readLine());
            }
            Assert.assertNull("job output has more lines than expected.txt", outputReader.readLine());
        }
    }

    /**
     * Wraps a byte stream in a line reader that decodes UTF-8 (the encoding Hadoop's
     * TextOutputFormat writes) instead of the platform default charset, which differs on
     * e.g. Chinese-locale Windows. Assumes expected.txt is UTF-8 (or plain ASCII) as well.
     *
     * @param is the stream to wrap; closing the returned reader closes it
     * @return a buffered UTF-8 reader over {@code is}
     */
    private BufferedReader asBufferedReader(InputStream is) {
        return new BufferedReader(new InputStreamReader(is, java.nio.charset.StandardCharsets.UTF_8));
    }

2)使用mini集群运行
在 mini 集群上,节点管理器会启动独立的 JVM 来执行任务,因此调试比使用本地作业运行器更困难。

public class MaxTemperatureDriverMiniTest extends ClusterMapReduceTestCase {

    /**
     * Fallback directory for mini-cluster data and logs, used only when the corresponding
     * system properties are not supplied (e.g. via -Dtest.build.data=...). Derived from
     * java.io.tmpdir so the test does not depend on a machine-specific drive such as D:.
     */
    private static final String DEFAULT_TEST_DIR =
            new java.io.File(System.getProperty("java.io.tmpdir"), "hadoop-testdata").getAbsolutePath();

    @Override
    protected void setUp() throws Exception {
        setPropertyIfAbsent("test.build.data", DEFAULT_TEST_DIR);
        setPropertyIfAbsent("hadoop.log.dir", DEFAULT_TEST_DIR);

        super.setUp();
    }

    /** Sets a system property only if it has not already been set. */
    private static void setPropertyIfAbsent(String name, String value) {
        if (System.getProperty(name) == null) {
            System.setProperty(name, value);
        }
    }

    /**
     * Copies the local NCDC sample into the mini cluster, runs the driver on it, and checks
     * that the single output file contains exactly the expected per-year maxima.
     */
    public void test() throws Exception {
        FileSystem fileSystem = getFileSystem();
        Path local = new Path("input/ncdc");
        Path input = getInputDir();
        Path output = getOutputDir();
        fileSystem.copyFromLocalFile(local, input);

        JobConf conf = createJobConf();
        MaxTemperatureDriver driver = new MaxTemperatureDriver();
        driver.setConf(conf);
        int exitCode = ToolRunner.run(driver, new String[]{input.toString(), output.toString()});
        // assertEquals takes (expected, actual) so failure messages read the right way round.
        Assert.assertEquals(0, exitCode);

        FileStatus[] status = fileSystem.listStatus(output, new OutputLogFilter());
        Assert.assertEquals(1, status.length);

        Path[] paths = FileUtil.stat2Paths(status);
        // try-with-resources closes the stream even if an assertion fails; output text is UTF-8.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                fileSystem.open(paths[0]), java.nio.charset.StandardCharsets.UTF_8))) {
            Assert.assertEquals("1949\t111", reader.readLine());
            Assert.assertEquals("1950\t22", reader.readLine());
            Assert.assertNull(reader.readLine());
        }
    }

    /** Rejects output entries whose file name begins with an underscore. */
    static class OutputLogFilter implements PathFilter {

        @Override
        public boolean accept(Path path) {
            return !path.getName().startsWith("_");
        }
    }
}

注意:在 Windows 环境下运行需要 winutils 支持,可从 GitHub 下载,并用其中的 bin 目录替换本地 hadoop/bin。winutils 用于在 Windows 上执行 Linux 风格的命令,例如:
winutils.exe ls -F D:\...\dfs\data\data1。在 Win7 下,MaxTemperatureDriverMiniTest 运行失败:执行创建容器的命令 winutils.exe task create -m -1 -c -1 container_1559097916661_0001_01_000001 "cmd /c C:/.../default_container_executor.cmd" 时报 CreateTask error (5),搜索后未找到解决方案。(Windows 系统错误码 5 对应 ERROR_ACCESS_DENIED,即"拒绝访问",问题可能与权限有关。)其中,default_container_executor.cmd 会调用同目录下的 launch_container.cmd 脚本,后者启动 JVM 运行 org.apache.hadoop.mapreduce.v2.app.MRAppMaster。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,612评论 5 471
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,345评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,625评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,022评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,974评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,227评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,688评论 3 392
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,358评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,490评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,402评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,446评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,126评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,721评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,802评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,013评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,504评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,080评论 2 341

推荐阅读更多精彩内容