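Passing parameters to Flink function classes works differently for the DataStream and DataSet APIs. For DataSet, you can use a class constructor, withParameters(Configuration), global parameters, broadcast variables, and so on; for details see https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/. For DataStream, the usual choices are a class constructor and ParameterTool; see https://ci.apache.org/projects/flink/flink-docs-stable/dev/best_practices.html
The main approaches are described below.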
1. Class constructor
Add a constructor that takes the parameter to the class whose method needs it, and store the value in a field.
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

// String is used for the input and output types here; substitute your own types.
public class TestFlatMap extends RichFlatMapFunction<String, String> {
    private static final long serialVersionUID = 1L;

    // Set in the constructor and serialized together with the function object.
    private final String dc;

    public TestFlatMap(String dc) {
        this.dc = dc;
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        try {
            // Illustrative use of the parameter: prefix each record with dc.
            out.collect(dc + ":" + value);
        } catch (Exception e) {
            System.out.println("flat map error, " + value);
        }
    }
}
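The constructor approach works because Flink function interfaces are Serializable: the function object, including the fields set in its constructor, is serialized on the client and shipped to the workers. The sketch below mimics that round trip with plain Java serialization and no Flink dependency. The names `ConstructorParamDemo`, `DcTagger`, and `roundTrip` are hypothetical and exist only for this illustration; `DcTagger` stands in for a function such as `TestFlatMap`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ConstructorParamDemo {

    // Hypothetical stand-in for a Flink function: Serializable, with a constructor parameter.
    static class DcTagger implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String dc;

        DcTagger(String dc) {
            this.dc = dc;
        }

        String tag(String value) {
            return dc + ":" + value;
        }
    }

    // Serializes and deserializes an object in memory, roughly what happens
    // when a function instance is shipped from the client to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        DcTagger copy = roundTrip(new DcTagger("dc1"));
        // The deserialized copy still carries the constructor argument.
        System.out.println(copy.tag("event")); // prints "dc1:event"
    }
}
```

The practical consequence is that every field set in the constructor has to be serializable itself. Anything that is not, such as a connection or a client object, is better created in open() than passed through the constructor.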
2. ParameterTool
Register the parsed arguments as global job parameters:
ParameterTool parameters = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
Then read them inside a rich function:
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        ParameterTool parameters = (ParameterTool)
            getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        parameters.getRequired("input");
        // .. do more ..
    }
}
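The "input" key read by getRequired comes from the command line, for example a job started with `--input /tmp/words.txt`. The sketch below is a simplified stand-in written with the JDK only, not Flink's implementation: it handles just `--key value` pairs, while the real ParameterTool covers more cases. `ArgsSketch` and its two methods are hypothetical names, and the path is an example value. It shows the two ideas involved: arguments become a key/value map, and a required key that is missing fails immediately instead of returning null.

```java
import java.util.HashMap;
import java.util.Map;

public class ArgsSketch {

    // Simplified stand-in for argument parsing: only "--key value" pairs are handled.
    // A trailing key without a value is ignored in this sketch.
    static Map<String, String> fromArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but got: " + args[i]);
            }
            params.put(args[i].substring(2), args[i + 1]);
        }
        return params;
    }

    // Fail-fast lookup: a missing required key is an error, not a silent null.
    static String getRequired(Map<String, String> params, String key) {
        String value = params.get(key);
        if (value == null) {
            throw new IllegalStateException("Missing required parameter: " + key);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> params = fromArgs(new String[] {"--input", "/tmp/words.txt"});
        System.out.println(getRequired(params, "input")); // prints "/tmp/words.txt"
    }
}
```

In a real job it is usually cheaper to look the parameter up once in open() and keep it in a field than to fetch it for every record inside flatMap.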
3. withParameters(Configuration)
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Integer input matches the calling code; the String output type is illustrative.
public class TestMap extends RichMapFunction<Integer, String> {
    private static final long serialVersionUID = 1L;

    private String dc;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Receives the Configuration passed via withParameters(config).
        dc = parameters.getString("param", "");
    }

    // A RichMapFunction implements map(), not flatMap().
    @Override
    public String map(Integer value) throws Exception {
        return dc + ":" + value;
    }
}
Calling code (DataSet API):
DataSet<Integer> toMap = env.fromElements(1, 2, 3);
Configuration config = new Configuration();
config.setString("param", "test");
toMap.map(new TestMap()).withParameters(config);