解析命令行参数和在Flink应用程序中传递参数
几乎所有的Flink应用程序,包括批处理和流处理,都依赖于外部配置参数,这些参数被用来指定输入和输出源(如路径或者地址),系统参数(并发数,运行时配置)和应用程序的可配参数(通常用在自定义函数中)。
Flink提供了一个简单的叫做ParameterTool
的utility
,ParameterTool
提供了一些基础的工具来解决这些问题,当然你也可以不用这里所有描述的ParameterTool
,其他框架如:Commons CLI
和argparse4j
在Flink中也是支持的。
获取你的配置值并传入ParameterTool中
ParameterTool
提供了一系列预定义的静态方法来读取配置信息,ParameterTool
内部是一个Map<String, String>
,所以很容易与你自己的配置形式相集成。
从.properties文件中获取
下面方法将去读取一个Properties文件,并返回若干key/value对:
String propertiesFile = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
从命令行参数中获取
下面会从命令行中获取像--input hdfs:///mydata --elements 42
这种形式的参数:
public static void main(String[] args) {
ParameterTool parameter = ParameterTool.fromArgs(args);
// .. regular code ..
从系统属性中获取
当启动一个JVM时,你可以给它传递一些系统属性如:-Dinput=hdfs:///mydata
,你也可以用这些系统属性来初始化ParameterTool:
ParameterTool parameter = ParameterTool.fromSystemProperties();
在程序中使用ParameterTool的参数
既然我们已经从其他地方(方法如上)拿到了配置参数,我们就可以以各种形式来使用它们了。
直接从ParameterTool中获取
ParameterTool本身有方法来获取这些值:
ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()
// .. there are more methods available.
你可以在提交应用程序的客户端main()方法中直接使用这些方法返回的值,例如:你可以按如下方法来设置一个算子的并发度:
ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);
因为ParameterTool是可序列化的,所以你可以将它传递给函数本身;
ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));
之后在函数内部使用ParameterTool来获取命令行参数。
将参数以Configuration对象的形式传递给函数
下面的例子展示了如何将参数以Configuration对象的形式传递给用户自定义函数。
ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
在Tokenizer内部,Configuration对象可以通过open(Configuration conf)方法来获取;
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void open(Configuration parameters) throws Exception {
parameters.getInteger("myInt", -1);
// .. do
注册全局参数
在ExecutionConfig中注册为全作业参数的参数,可以被JobManager的web端以及用户自定义函数中以配置值的形式访问.
注册全局参数:
ParameterTool parameters = ParameterTool.fromArgs(args);
// 创建一个执行环境
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
在用户自定义的富函数中获取它们:
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
ParameterTool parameters = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
parameters.getRequired("input");
// .. do more ..
命名大型的TupleX类型
对于有很多字段的数据类型我们建议采用POJOs
(普通Java
对象),而不是TupleX
;同时,POJOs
还可以用来为大型的Tuple
命名。、
例如:
不使用:Tuple11<String, String, ..., String> var = new ...;
因为继承大型Tuple类来创建一个自定义的类型会比直接使用大型Tuple简单得多:
CustomType var = new ...;
public static class CustomType extends Tuple11<String, String, ..., String> {
// constructor matching super
}
使用Logback而不是Log4j
注意:本文档适用于Flink 0.10之后的版本
Apache Flink使用slf4j来作为logging 抽象,也建议用户在他们自定义的方法中也使用slf4j。Slf4j是一个编译时的logging接口,在运行时可以使用不同的logging实现,例如:log4j或者Logback。
默认情况下Flink依赖Log4j,本也描述了如何在Flink中使用Logback。有用户反馈它们通过本指南,可以使用Graylog来建立集中式日志收集。
使用下面的代码来获取一个logger实例:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyClass implements MapFunction {
private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
// ...
在IDE之外运行Flink时使用Logback或者在一个Java应用程序中使用Logback
在所有情况下类会在一个由依赖管理器如Maven创建的classpath中执行,Flink会将log4j推到classpath中。
因此,你需要将log4j从Flink的依赖中剔除,下面的描述假定有一个跟Maven工程:
按如下方式来修改你的工程:
<dependencies>
<!-- Add the two required logback dependencies -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.3</version>
</dependency>
<!-- Add the log4j -> sfl4j (-> logback) bridge into the classpath
Hadoop is logging to log4j! -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.3.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.3.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>1.3.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
在<dependencies>部分进行如下修改;
1、将所有的log4j依赖从所有的Flink dependencies
中剔除:这会导致Maven忽略Flink对log4j的传递依赖。
2、将slf4j-log4j12 artifact从Flink依赖中剔除:因为我们将用slf4j到logback的绑定,所以我们需要删除slf4j到log4j的绑定。
3、添加Logback依赖:logback-core和logback-classic
4、添加log4j-over-slf4j依赖:log4j-over-slf4j是一种允许旧应用程序直接使用Log4j API来调用Slf4j接口的工具。Flink依赖Hadoop,而Hadoop是使用Log4j来记录日志的
请注意:你需要手动添加exclusion到所有你添加到pom文件中的Flink依赖。
你可能还需要检查一下其他非Flink依赖是否也是log4j的绑定,你可以使用mvn dependency:true
来分析你的工程依赖。
当Flink运行在集群中是使用Logback
本指南适用于当Flink运行在YARN或者standalong集群时。
为了在Flink中使用Logback而不是Log4j,你需要将 log4j-1.2.xx.jar 和 sfl4j-log4j12-xxx.jar从 lib/目录中删除
接下来,你需要将下面的jar文件添加到 lib/目录下:
logback-classic.jar
logback-core.jar
log4j-over-slf4j.jar
注意:当使用单任务的YARN集群时,你需要明确的设置 lib/目录!
将自定义logger的Flink提交到YARN中的命令是:./bin/flink run -yt $FLINK_HOME/lib <... remaining arguments ...>