背景:
前些天笔者面临这样一个问题,在hdfs上有一个目录存放着一些文件,定期要通过mr的api将这些文件转换为HBase的HFile。但是文件中可能会存在一些可以检测出来的脏数据,现在希望能够在生成HFile的同时,统计每次任务脏数据的比例,超过一定阈值的时候就发告警。
现有框架的处理方法与问题:
MR:
将生成HFile与统计脏数据视为两个MR任务,分别计算,两次提交。
Spark:
按照如下RDD血缘图,cache RDD1,并进行两次计算分别得到RDD2与RDD3,然后在将两个衍生出来的子RDD持久化到HDFS之类的存储系统上。
通过MR处理,HDFS上的文件将被读取两次,虽然在Spark的计算模型中,可以通过cache方法,将数据尽可能的放在内存中,但是在转化为RDD2与RDD3的过程中仍然会有两次内存IO(当然很有可能因为内存存不下,成为了磁盘IO)。哪种IO相对于CPU而言都慢了不止一个级别,因此能不能有一种方法像下面的图这样,将两种运算在上游放在一起(在一个map中同时统计脏数据与生成HBase的Cell),将不同的结果发送给不同的下游呢?
这样一来,两次不同的下游计算(往往是Reduce或者子RDD),可以绑定同一个上游计算(往往是Map或者父RDD),而上游计算又只会有一次IO。但是现有的计算框架,好像都不支持一个上游运算与多个下游运算绑定。
自己想到的解决办法:
下面以MR运算框架为例,谈谈自己的解决办法。现在的MR框架中,input dir、output dir与shuffle context是与一个job绑定的;我们可以将input dir与map绑定,output dir 和shuffle context与reduce绑定.
将现有map端的api修改为如下形式:
修改前(对应现在的Mapper类):
void map(KEYIN key, VALUEIN value, Context context);
void run(Context context);
void cleanup(Context context);
void setup(Context context)
修改后(不妨叫这个类为NewMapper):
void map(KEYIN key, VALUEIN value, List reduceContexts);
void run(Context context);
void cleanup(Context context);
void setup(Context context)
像笔者提到的问题可以用如下伪代码解决
public class CombinedMapper extendsNewMapper {
privateint dirtyrows = 0;
privateint totalrows = 0;
privateList reduceContexts = null;
void map(KEYIN key, VALUEIN value, List reduceContexts ) {
totalrows++;
if(dirtyrow(key)) {
dirtyrows++;
//脏数据就直接过滤了
return;
}
contextForHFile.write(*******);
}
void setup(Context context) {
reduceContexts= context.getReduceContexts();
contextForHFile = reduceContexts.getContextForHFile();
contextForCounter= reduceContexts.getContextForCounter();
}
void run(Context context) {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(),context.getCurrentValue(), reduceContexts);
}
contextForCounter.write(****);
cleanup(context);
}
}
Reducer端的代码无需任何改动,只是在初始化job的时候可能需要按照如下方法初始化job
job.addReducer(ReducerClass1.class).addReducer(ReducerClass2.class);
OutputFormat.setOutputdir(Reducer1.class,outputdir1);
OutputFormat.setOutputdir(Reducer2.class,outputdir2);
表示上游计算绑定多个下游计算。这样一来,可以在一次IO中完成两种不同的运算。
缺点:
笔者设计的对现有计算框架的补充,虽然可以减少IO,比如现在的场景是要对一个很大的数据集用两种完全不同的方法做分析,肯定是大有裨益的。但是缺点也是很明显的,那就是耦合度变大,上游的一个子模块失败可能影响整体计算,比如上面生成HFile的任务如果导致Map程序不能跑通就会导致统计脏数据的任务也失败。
可是耦合这种东西可能真的是“过犹不及”吧,一个零耦合的东西既没有存在的必要也没有存在的可能。而且笔者的意思并不是修改现有的计算框架api,而是增加一种api来支持想减少IO的场景,以前的代码是完全不用修改的。
后续:
从MR的观点来看,笔者做的补充可以说是让一个任务支持多种Reduce,但是其实MR计算框架对多种Map的支持也不是很好,比如我现在想同时处理TXT文件与parquet文件再生成HFile。但是spark可以通过像下图的做法,对不同的RDD做不同的transformation然后再将新的RDD做union来支持“一个任务,多种Map”。也许将来也可以通过类似的办法让一个MR任务支持多个Map输入吧。