MapReduce算法模式-数据聚合(最大值/最小时/总和)

美妙的函数.jpg
开会时间尽量写写这部分的内容

数据聚合

数据聚合的过程是在分析统计过程中十分重要的过程,包括熟悉的最大值/最小值,求平均值,中值等,在本文章中需要实现的是最大值/最小时,并计算总和的过程;

模拟的是usrid相同的情况下,时间上最早出现时间和最晚出现时间,并统计出现的频率;

输入文件的值:
123 -- 2017-10-22 12:33:78
123 -- 2017-12-21 12:22:30
234 -- 2017-08-22 11:17:20
2546 -- 2008-10-08 21:19:40
map的输出的中间结果:
123     2017-12-21 00:22:30 2017-12-21 00:22:30 1
123     2017-10-22 00:34:18 2017-10-22 00:34:18 1
234     2017-08-22 11:17:20 2017-08-22 11:17:20 1
2546    2008-10-08 21:19:40 2008-10-08 21:19:40 1
reduce的输出的结果值:
123     2017-10-22 00:34:18 2017-12-21 00:22:30 2
234     2017-08-22 11:17:20 2017-08-22 11:17:20 1
2546    2008-10-08 21:19:40 2008-10-08 21:19:40 1

总体的过程类为:

NumbericalObject:定义map和reduce的value的数据类型
NumbericalObjectMapper:统计生成key和value的值(value的值是构造生成的,保持starttime和endtime一致)
NUmbericalObjectReducer:统计max和min的值
NumbericalObjectMain:主函数

各个类的函数

NumbericalObject:

package Numberical.MaxMinMidObject;

import org.apache.hadoop.io.Writable;


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/*
    数值聚合操作:
    模拟商品评论列表:
    原始数据-->用户ID:评论时间
    所要实现的通过用户ID,判断出所有数据对应的开始时间和结束时间,统计这个时间段内用户评论的次数
     */
public class NumbericalObject implements Writable{
    /*
    定义相应的时间变量
     */
    private Date startTime = new Date();
    private Date endTime = new Date();
    private Long count;

    private final static SimpleDateFormat simpletime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    public void setStartTime(Date startTime) {
        this.startTime = startTime;
    }

    public void setEndTime(Date endTime) {
        this.endTime = endTime;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    public Date getStartTime() {

        return startTime;
    }

    public Date getEndTime() {
        return endTime;
    }

    public Long getCount() {
        return count;
    }

    //序列化对象
    public void write(DataOutput dataOutput) throws IOException {
        //使用writeLong的原因是因为Date的值在序列话的时候可以转换成时间戳的形式,这样位数大于8位,应该用Long的形式
        dataOutput.writeLong(startTime.getTime());
        dataOutput.writeLong(endTime.getTime());
        dataOutput.writeLong(count);
    }
    //反序列化对象
    public void readFields(DataInput dataInput) throws IOException {
        startTime = new Date(dataInput.readLong());
        endTime = new Date(dataInput.readLong());
        count = dataInput.readLong();
    }
    //实现writeable接口必须实现write的方法,实现序列化和反序列化的过程,因为这个参数作为了map或reduce的数据的key或value的值了
    //需要实现toString的方法,这样才能把序列化的结果输出到文件中
    @Override
    public String toString(){
        String str = simpletime.format(startTime)+"\t"+simpletime.format(endTime)+"\t"+count;
        return str;
    }
}

NUmbericalObjectMapper:

package Numberical.MaxMinMidObject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.StringTokenizer;

public class NumbericalObjectMapper {
    //删除相应的已经存在的目录
    /*
    Configuration:定义的配置文件
    Path:定义的文件的路径
     */
    public static void delete(Configuration conf, Path path) throws Exception{
        FileSystem fileSystem = FileSystem.get(new URI(path.toString()),conf);
        Path fspath=path;
        if(fileSystem.exists(fspath)){
            fileSystem.delete(fspath,true);
        }
    }

    public static class NumbericalMap extends Mapper<Object,Text,Text,NumbericalObject>{
        private static final Logger log =LoggerFactory.getLogger("LOG");
        private Text Numbericalkey = new Text();
        private NumbericalObject NumbericalValue = new NumbericalObject();
        protected void map(Object key,Text value,Context context) throws IOException,InterruptedException{
            log.info("时间是:");
            String[] valueInfo = value.toString().split("--");

            Numbericalkey.set(valueInfo[0]);
            SimpleDateFormat data1 = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
            SimpleDateFormat data2 = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");

            try {
                NumbericalValue.setStartTime(data1.parse(valueInfo[1].toString()));
            }catch (Exception e){
                e.printStackTrace();
                }
            try {
                NumbericalValue.setEndTime(data2.parse(valueInfo[1].toString()));
            }catch (Exception e){
                e.printStackTrace();
            }
            NumbericalValue.setCount(Long.valueOf(1));

            context.write(Numbericalkey,NumbericalValue);

        }
    }
}

NumbericalObjectReducer:

package Numberical.MaxMinMidObject;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class NumbericalObjectReducer {

    public static class NumbericalCombine extends Reducer<Text,NumbericalObject,Text,NumbericalObject>{
        //先处理时间的字段的情况
        //key+value[starttime,endtime] -->key
        //key值要进行处理,min(starttime),max(endtime)
        //value[count++] --value

        private NumbericalObject numbericalObject = new NumbericalObject();
        protected void reduce(Text key,Iterable<NumbericalObject> value,Context context) throws IOException,InterruptedException{
            boolean flag =false;
            Long sum=0L;
            for(NumbericalObject val:value){
                if(flag == false){
                    numbericalObject.setStartTime(val.getStartTime());
                    numbericalObject.setEndTime(val.getEndTime());
                    numbericalObject.setCount(val.getCount());
                }
                flag=true;
                if (val.getStartTime().compareTo(numbericalObject.getStartTime())<0){
                    //min
                    numbericalObject.setStartTime(val.getStartTime());
                }
                if (val.getEndTime().compareTo(numbericalObject.getEndTime())>0){
                    //max
                    numbericalObject.setEndTime(val.getEndTime());
                }
                sum+=val.getCount();
                numbericalObject.setCount(sum);
            }
            context.write(key,numbericalObject);
        }
    }
}

NumericalObjectMain:

package Numberical.MaxMinMidObject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NumbericalObjectMain {
    private static final Logger log = LoggerFactory.getLogger("LOG");
    public static void main(String[] args) throws Exception{

        Configuration conf =new Configuration();
        Path path1 =new Path("NumbericalData/numberical");
        Path path2 =new Path("outputNumberical");
        NumbericalObjectMapper.delete(conf,path2);
        Job job=Job.getInstance(conf,"Numberical");

        FileInputFormat.setInputPaths(job,path1);
        FileOutputFormat.setOutputPath(job,path2);

        job.setJarByClass(NumbericalObjectMain.class);

        job.setMapperClass(NumbericalObjectMapper.NumbericalMap.class);
        job.setCombinerClass(NumbericalObjectReducer.NumbericalCombine.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NumbericalObject.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NumbericalObject.class);

        job.waitForCompletion(true);
    }
}

相应的程序的目录和输出结果:


结果和输出.png
PS:最近时间挺忙的,加班累到狗,难得有时间静下来写点自己想写的东西,希望以后有时间多写很多好的文章!!!
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,905评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,140评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,791评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,483评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,476评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,516评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,905评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,560评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,778评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,557评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,635评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,338评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,925评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,898评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,142评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,818评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,347评论 2 342

推荐阅读更多精彩内容