flink 学习笔记 — 基于 Flink 实时数仓的简单实践

回顾

    Flink 因其高吞吐、低延时、有状态、高容错的特性越来越受到数据从业者的青睐,它弥补了Storm、Spark 的很多不足。作为一个实时计算的框架,Flink 在实时数仓中发挥着越来越重要的作用。这里将以简单的案例,实现一个简单的数仓数据流。

简单分层

    我们知道,在离线数仓建设中,会把仓库结构分为不同的层次来存储不同的数据,离线数仓大体可以分为:ODS层、DWD层、DWS层、数据集市以及应用层。当然,数仓的建设的最终目的是提供稳定高可用的数据应用。这里实时数仓也可类比离线数仓做简单的分层。如下图,实时数仓同样可以分为:ODS层、DWD层、DWS层以及最终提供应用数据给各个服务使用。

简单架构.png
  • 如上图,通常我们的数据来至服务中的 SDK 打点、不同业务方的业务库、以及服务中的日志文件。这些数据通常通过某种方式发送到 Kafka 中,这些数据就是所说的 ODS 层。当然,这里 mysql 中的数据可以通过 Flink 直接进行读取处理。事实上,mysql 中的数据通常是较小的维表,可以放在 Flink 中的状态中。也可通过 Flink 进行分发到其他地方。
  • ODS 层的数据通过 Flink 计算之后,会被粗粒度的加工成所需要的明细层以及部分维度表。
  • 在明细数据的基础上,通过某种规则计算得到我们需要的汇总层,事实上,所谓汇总层,也就是接近应用层的应用数据。

简单案例实践

    这里通过一个简单的案例来说明一下整个实时流中的数据走向。需求是通过计算实时更新每个用户在应用中的点击量,然后将计算结果写入 redis 中。

  • 定义用户行为 POJO 类对象:
package bean;

import java.io.Serializable;

// 用户行为数据
public class UserAction implements Serializable {
    public String userId;
    public String articleId;
    public String action;


    public UserAction() {
    }

    public UserAction(String userId, String articleId, String action) {
        this.userId = userId;
        this.articleId = articleId;
        this.action = action;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getArticleId() {
        return articleId;
    }

    public void setArticleId(String articleId) {
        this.articleId = articleId;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    @Override
    public String toString() {
        return "UserAction{" +
                "userId='" + userId + '\'' +
                ", articleId='" + articleId + '\'' +
                ", action='" + action + '\'' +
                '}';
    }
}
  • 定义统计结果映射类对象:
package bean;

import java.io.Serializable;

// 统计结果
public class ActionStat implements Serializable {
    public String userId;
    public Long count;

    public ActionStat() {
    }

    public ActionStat(String userId, Long count) {
        this.userId = userId;
        this.count = count;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public Long getCount() {
        return count;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return "ActionStat{" +
                "userId='" + userId + '\'' +
                ", count=" + count +
                '}';
    }
}

  • 定义配置,因为这里我们使用到 Kafka 和 Redis,因此只需要添加他们的配置就 ok 了。
package bean;

public class CONSTANT {

    // 连接 kafka 相关信息
    public static String BROKERS = "";
    public static String GROUPID = "";

    // 连接 redis
    public static String REDIS_HOST = "";
    public static int REDIS_PORT = 6379;
    public static String PASSWORD = "";

}

  • OK,在完成以上基础类对象之后,开始我们的程序启动类:
package task;

import bean.ActionStat;
import bean.CONSTANT;
import bean.UserAction;
import com.alibaba.fastjson.JSONObject;
import kafka.KafkaConsumer;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import util.MyRedisMapper;


// 处理数据并写入到 redis
public class SinkToRedis {

    public static void main(String[] args) throws Exception {
        // 初始化 flink 环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 从 kafka 获取数据

        String brokers;
        String groupId;
        String topic;

        ParameterTool param = ParameterTool.fromArgs(args);
        if (param.equals(null)) {
            brokers = param.get("brokers");
            groupId = param.get("groupId");
            topic = param.get("topic");
        } else {
            brokers = "";
            groupId = "";
            topic = "";
        }

        // 消费 kafka,接入数据源
        DataStream<String> dataStream = env.addSource(KafkaConsumer.consumer(brokers, groupId, topic));

        SingleOutputStreamOperator<ActionStat> userStat = dataStream.map(new MyMap())
                .filter(user -> (user.userId != null && user.articleId != null && "AppClick".equals(user.action)))
                .keyBy("userId")
                .timeWindow(Time.milliseconds(5000))
                .aggregate(new AggDiY());

        userStat.print();

        // 初始化 redis 配置
        FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
                .setHost(CONSTANT.REDIS_HOST)
                .setPort(CONSTANT.REDIS_PORT)
                .setPassword(CONSTANT.PASSWORD)
                .setDatabase(0)
                .build();

        userStat.addSink(new RedisSink<>(jedisPoolConfig, new MyRedisMapper(RedisCommand.SET)));

        env.execute("filnk-test");
    }

    // 按 userid 统计
    static class AggDiY implements AggregateFunction<UserAction, ActionStat, ActionStat> {

        Long count = 0L;

        @Override
        public ActionStat createAccumulator() {
            return new ActionStat();
        }

        @Override
        public ActionStat add(UserAction value, ActionStat accumulator) {
            accumulator.userId = value.userId;
            accumulator.count = ++count;
            return accumulator;
        }

        @Override
        public ActionStat getResult(ActionStat accumulator) {
            return accumulator;
        }

        @Override
        public ActionStat merge(ActionStat a, ActionStat b) {
            a.count = a.count + b.count;
            return a;
        }
    }

    // 按 userid 统计
    static class MyMap implements MapFunction<String, UserAction> {

        @Override
        public UserAction map(String value) throws Exception {

            JSONObject jsonObject = JSONObject.parseObject(value);
            JSONObject content = JSONObject.parseObject(jsonObject.getString("content"));
            if (jsonObject.getString("content") != null) {
                JSONObject properties = JSONObject.parseObject(content.getString("properties"));
                String userId = properties.getString("userId");
                String articleId = properties.getString("article_id");
                String action = content.getString("event");

                UserAction us = new UserAction(userId, articleId, action);
                return us;
            }
            return null;
        }
    }
}

    这里简单实现了从数据 ODS 层(也即是 Kafka)获取数据,通过 Flink 计算处理,将想要的结果最终写入 Redis 中供其他应用使用。当然,这里没有涉及到复杂的明细表以及维表的关联等操作。后续会不断的进行完善。

小结

    实时数仓是目前 Flink 应用较多的场景,理解整个数据流的走向是建设实时数仓的基础。在我们不断的使用中,会逐渐的用到明细表、维表,同时也会用到较为难懂的状态。当然,随着开发者能力的不断提升,完成这些亦会逐渐的得心应手。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容

  • 1、需求背景 根据目前大数据这一块的发展,已经不局限于离线的分析,挖掘数据潜在的价值,数据的时效性最近几年变得刚需...
    愤怒的谜团阅读 9,351评论 6 39
  • 1 我觉得一开始我是想上班的,因为刚步入大学的时候,我就开始迫切的期待毕业的来临。 记得那年高考发挥失常,我去不了...
    闭壁清野阅读 1,363评论 3 4
  • 老师的观念很新鲜,父母的爱就是为分离而爱的,让孩子18岁能独立起来。 孩子是父母的复印件,原件出问题复印件就易出问...
    6_刘欣阅读 63评论 0 0
  • 每日累积: 1.诵读国学经典(1.《论语》颜渊第十二;2.《孝经》全文;3.《黄帝内经》五藏别论篇第十一;4.《易...
    蒲公英的蒲阅读 87评论 0 1
  • 在人人骑着共享单车出行,旅行使用airbnb订房,出门预约高级专车的时代,共享一词正在风口飘扬,以共享单车为例,一...
    草莓布丁猫阅读 115评论 0 0