除了从DataStream
操作的结果中获取主数据流之外,你还可以产生任意数量额外的侧输出结果流。侧输出结果流的数据类型不需要与主数据流的类型一致,不同侧输出流的类型也可以不同。当您想要拆分数据流时(通常必须复制流),然后从每个流过滤出您不想拥有的数据,此操作将非常有用。
当使用侧输出流时,你首先得定义一个OutputTag
,这个OutputTag
将用来标识一个侧输出流:
Java 代码:
// this needs to be an anonymous inner class, so that we can analyze the type
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
Scala代码:
val outputTag = OutputTag[String]("side-output")
注意,OutputTag
是根据侧输出流所包含的元素的类型来输入的。
数据发送到侧输出流只能从一个ProcessFunction
中发出,你可以使用Context
参数来发送数据到一个通过OutputTag
标记的侧输出流中:
Java 代码:
DataStream<Integer> input = ...;
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = input
.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(
Integer value,
Context ctx,
Collector<Integer> out) throws Exception {
// 将数据发送到常规输出中
out.collect(value);
// 将数据发送到侧输出中
ctx.output(outputTag, "sideout-" + String.valueOf(value));
}
});
Scala代码:
val input: DataStream[Int] = ...
val outputTag = OutputTag[String]("side-output")
val mainDataStream = input
.process(new ProcessFunction[Int, Int] {
override def processElement(
value: Int,
ctx: ProcessFunction[Int, Int]#Context,
out: Collector[Int]): Unit = {
// 将数据发送到常规输出中
out.collect(value)
// 将数据发送到侧输出中
ctx.output(outputTag, "sideout-" + String.valueOf(value))
}
})
你可以在DataStream
操作的结果中使用getSideOutput(OutputTag)
来获取侧输出,这里为您提供一个DataStream
类型,用于输出端输出流的结果:
Java 代码:
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = ...;
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
Scala代码:
val outputTag = OutputTag[String]("side-output")
val mainDataStream = ...
val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)