1.基础:wordCount
2.三个重要自定义接口:partitioner、combiner、自定义排序(WritableComparator)
partitioner用于自定义maptask执行结果分区,按照分区结果启动相应数量reduce,默认使用对key进行hash的方式分区。(例子:对手机流量统计同时按照归属地进行分区)自定义一个partitioner继承抽象类:Partitioner然后在job对象中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class),即可在maptask处理数据时(将数据写入缓冲区)对数据进行自定义分区。
combiner用于处理maptask到reduceTask之间的中间结果,Combiner将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。使用combiner要注意不能破坏最终的结果,不适用于求平均值这种情况,具体问题具体分析。(例子:结合wordCount即可,可以发现每个mapTask执行结果变成了类似reduceTask执行结果<hello,n(n>1)>)。
自定义排序:比如需要以某个bean作为key并按照bean中的某个属性进行排序,需对这个bean实现WritableComparable接口,自定义排序逻辑。(对手机流量统计结果按照总流量大小进行排序输出)。
3.coding
3.1 两表join算法的实现
select a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id
在Reduce端实现:
思路:
自定义join后的实体类型infoBean,包含两个表join后的属性,外加一个flag标识是哪个表的数据
mapTask:通过文件名判断是哪种数据(order还是product),切分行,赋值给infoBean,构造k-v键值对,key为join条件
reduceTask:可知一个productBean会对应多个orderBean,在reduce阶段,一次读一组key相同的数据,通过flag区分是哪种bean(orderBean必然是一组,productBean则是一个),对orderBean进行遍历,将所缺的product数据由productBean,set进去即可。
在Map端实现:解决数据倾斜的问题
根据join条件,比如有很大一部分pid分区后涌入一个reduce,而其他pid只有少数,却也涌入其他reduce,就会造成数据量大的reduce处理起来较慢,并发效率低的情况。解决方式是在Map端实现,适用于关联表中有小表的情形;可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度。
思路:
先在mapper类中预先定义好小表,进行join------引入实际场景中的解决方案:一次加载数据库或者用distributedcache
在mapper类中重写 setup方法,该方法在map任务初始化时调用,在此处将小表读取放入本地缓存。在map方法中只读取大表,然后将小表和其关联即可,无需reduce。
指定需要缓存一个文件到所有的maptask运行节点工作目录的方法:
3.2 找出共同好友(difficult)
思路:
重点:好友关系是单向的,如何得到一组数据,以两个用户为key,value为两者的共同好友。
第一步 :map
读一行 A:B,C,D,F,E,O
输出 <B,A><C,A><D,A><F,A><E,A><O,A> (B在A的好友列表、C在A的好友列表,D在A的好友列表。。。)
第二步:reduce
拿到的数据比如<C,A><C,B><C,E><C,F><C,G>......
输出:<C A,B,E,F,G…….> ——>C为key,value为,列表中有C作为好友的人,即,A,B,E,F,G…都有C作为好友。
第三步:map,以第二步输出作为输入
读入一行<C A,B,E,F,G…….>
输出<A-B,C><A-E,C><A-F,C>……还要把A,B,E,F,G…….先排序防止A-B和B-A这种情况
从而得到两两共同好友C
第四步:reduce
读入数据 <A-B,C><A-B,F><A-B,G>.......
输出: A-B C,F,G,.....
A-B为key,收集到key对应的所有value,就是这两两的共同好友
3.3 流量统计
1.对流量日志中的用户统计总上、下行流量
map:切分数据,对日志进行处理,将电话号码作为key,将上行流量、下行流量、总流量生成一个bean作为value。
reduce:一个key,得到一组bean,然后统计总流量。
2.统计流量且按照流量大小倒序排序
使用第一步的结果作为输入,使bean作为key,电话号码作为value,实现WritableComparable接口,自定义排序方法,这样就自动排序了。因为每个bean都是不同的,所以对于reduce来说,不存在一个key对应一组value,所以reduce一次只处理一个k-v,将v作为key,key作v,得到排序后的结果。
3.根据号码归属地对数据分区,将数据写到不同数据
实现方法:自定义Partitioner,实现getPartition方法,然后在客户端程序定义
job.setPartitionerClass(ProvincePartitioner.class);
自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task
job.setNumReduceTasks(n);
这样就可以按照自定义方式对数据进行分区处理了
注意:如果reduceTask的数量>= getPartition的结果数 ,则会多产生几个空的输出文件part-r-000xx
如果 1<reduceTask的数量<getPartition的结果数 ,则有一部分分区数据无处安放,会Exception!!!
如果 reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000
3.4 web日志预处理
需求:对web访问日志中的各字段识别切分,去除日志中不合法的记录
定义一个流量数据的bean
定义一个数据校验函数,当数据不全或者数据中日志的请求状态字段为400,设置为非法记录。
使用mapper对数据进行处理,每次读取一行,bean为key,当bean不合法,不写入。