Flink最佳实践

解析命令行参数和在Flink应用程序中传递参数

几乎所有的Flink应用程序,包括批处理和流处理,都依赖于外部配置参数,这些参数被用来指定输入和输出源(如路径或者地址),系统参数(并发数,运行时配置)和应用程序的可配参数(通常用在自定义函数中)。

Flink提供了一个简单的叫做ParameterToolutilityParameterTool提供了一些基础的工具来解决这些问题,当然你也可以不用这里所有描述的ParameterTool,其他框架如:Commons CLIargparse4j在Flink中也是支持的。

获取你的配置值并传入ParameterTool中

ParameterTool提供了一系列预定义的静态方法来读取配置信息,ParameterTool内部是一个Map<String, String>,所以很容易与你自己的配置形式相集成。

从.properties文件中获取

下面方法将去读取一个Properties文件,并返回若干key/value对:

String propertiesFile = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
从命令行参数中获取

下面会从命令行中获取像--input hdfs:///mydata --elements 42这种形式的参数:

public static void main(String[] args) {
    ParameterTool parameter = ParameterTool.fromArgs(args);
    // .. regular code ..
从系统属性中获取

当启动一个JVM时,你可以给它传递一些系统属性如:-Dinput=hdfs:///mydata,你也可以用这些系统属性来初始化ParameterTool:

ParameterTool parameter = ParameterTool.fromSystemProperties();

在程序中使用ParameterTool的参数

既然我们已经从其他地方(方法如上)拿到了配置参数,我们就可以以各种形式来使用它们了。

直接从ParameterTool中获取

ParameterTool本身有方法来获取这些值:

ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()
// .. there are more methods available.

你可以在提交应用程序的客户端main()方法中直接使用这些方法返回的值,例如:你可以按如下方法来设置一个算子的并发度:

ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

因为ParameterTool是可序列化的,所以你可以将它传递给函数本身;

ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));

之后在函数内部使用ParameterTool来获取命令行参数。

将参数以Configuration对象的形式传递给函数

下面的例子展示了如何将参数以Configuration对象的形式传递给用户自定义函数。

ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text
        .flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())

在Tokenizer内部,Configuration对象可以通过open(Configuration conf)方法来获取;

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void open(Configuration parameters) throws Exception {
  parameters.getInteger("myInt", -1);
  // .. do
注册全局参数

在ExecutionConfig中注册为全作业参数的参数,可以被JobManager的web端以及用户自定义函数中以配置值的形式访问.
注册全局参数:

ParameterTool parameters = ParameterTool.fromArgs(args);

// 创建一个执行环境
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

在用户自定义的富函数中获取它们:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
  ParameterTool parameters = (ParameterTool)
      getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
  parameters.getRequired("input");
  // .. do more ..

命名大型的TupleX类型

对于有很多字段的数据类型我们建议采用POJOs(普通Java对象),而不是TupleX;同时,POJOs还可以用来为大型的Tuple命名。、
例如:
不使用:Tuple11<String, String, ..., String> var = new ...;
因为继承大型Tuple类来创建一个自定义的类型会比直接使用大型Tuple简单得多:

CustomType var = new ...;

public static class CustomType extends Tuple11<String, String, ..., String> {
    // constructor matching super
}

使用Logback而不是Log4j

注意:本文档适用于Flink 0.10之后的版本
Apache Flink使用slf4j来作为logging 抽象,也建议用户在他们自定义的方法中也使用slf4j。Slf4j是一个编译时的logging接口,在运行时可以使用不同的logging实现,例如:log4j或者Logback
默认情况下Flink依赖Log4j,本也描述了如何在Flink中使用Logback。有用户反馈它们通过本指南,可以使用Graylog来建立集中式日志收集。
使用下面的代码来获取一个logger实例:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyClass implements MapFunction {
    private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
    // ...
在IDE之外运行Flink时使用Logback或者在一个Java应用程序中使用Logback

在所有情况下类会在一个由依赖管理器如Maven创建的classpath中执行,Flink会将log4j推到classpath中。
因此,你需要将log4j从Flink的依赖中剔除,下面的描述假定有一个跟Maven工程:
按如下方式来修改你的工程:

<dependencies>
  <!-- Add the two required logback dependencies -->
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.1.3</version>
  </dependency>
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.1.3</version>
  </dependency>

  <!-- Add the log4j -> sfl4j (-> logback) bridge into the classpath
   Hadoop is logging to log4j! -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.7.7</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.3.0</version>
    <exclusions>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.0</version>
    <exclusions>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.3.0</version>
    <exclusions>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
</dependencies>

在<dependencies>部分进行如下修改;
  1、将所有的log4j依赖从所有的Flink dependencies中剔除:这会导致Maven忽略Flink对log4j的传递依赖。
  2、将slf4j-log4j12 artifact从Flink依赖中剔除:因为我们将用slf4j到logback的绑定,所以我们需要删除slf4j到log4j的绑定。
  3、添加Logback依赖:logback-core和logback-classic
  4、添加log4j-over-slf4j依赖:log4j-over-slf4j是一种允许旧应用程序直接使用Log4j API来调用Slf4j接口的工具。Flink依赖Hadoop,而Hadoop是使用Log4j来记录日志的

请注意:你需要手动添加exclusion到所有你添加到pom文件中的Flink依赖。
你可能还需要检查一下其他非Flink依赖是否也是log4j的绑定,你可以使用mvn dependency:true来分析你的工程依赖。

当Flink运行在集群中是使用Logback

本指南适用于当Flink运行在YARN或者standalong集群时。

为了在Flink中使用Logback而不是Log4j,你需要将 log4j-1.2.xx.jar 和 sfl4j-log4j12-xxx.jar从 lib/目录中删除
接下来,你需要将下面的jar文件添加到 lib/目录下:
  logback-classic.jar
  logback-core.jar
  log4j-over-slf4j.jar

注意:当使用单任务的YARN集群时,你需要明确的设置 lib/目录!

将自定义logger的Flink提交到YARN中的命令是:./bin/flink run -yt $FLINK_HOME/lib <... remaining arguments ...>

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,905评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,140评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,791评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,483评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,476评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,516评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,905评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,560评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,778评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,557评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,635评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,338评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,925评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,898评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,142评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,818评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,347评论 2 342

推荐阅读更多精彩内容