package com.ctgu.flink.project;
import com.ctgu.flink.entity.OrderEvent;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.List;
import java.util.Map;
/**
 * Flink CEP job that separates paid orders from orders whose payment timed out.
 *
 * <p>The job reads {@code data/OrderLog.csv}, parses every line into an {@link OrderEvent},
 * keys the stream by order id and applies the pattern "a {@code create} event followed by a
 * {@code pay} event within 15 minutes". Complete matches are printed with the prefix
 * {@code pay}; partial matches that time out are emitted to the {@code pay-timeout} side
 * output and printed with the prefix {@code timeout}.
 *
 * <p>Both outputs are {@code Tuple4(orderId, createTimestamp, payOrTimeoutTimestamp, label)}.
 */
public class Flink_Sql_CEP_Order {

    /** Lines with fewer comma-separated fields than this are skipped before parsing. */
    private static final int MIN_FIELD_COUNT = 4;

    /**
     * How far (in event time) a record may lag behind the highest timestamp seen so far and
     * still be processed in order.
     *
     * <p>The order log is not strictly ascending: it contains a pay event a few seconds older
     * than records that precede it. With a zero bound such an event can end up behind the
     * current watermark; CEP treats events behind the watermark as late and does not process
     * them, which would turn a paid order into a false timeout.
     */
    private static final Duration MAX_OUT_OF_ORDERNESS = Duration.ofSeconds(10);

    /**
     * Builds and runs the job, then prints the elapsed wall-clock time in whole seconds.
     *
     * @param args unused
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<OrderEvent> orderDataStream = env.readTextFile("data/OrderLog.csv")
                // Skip lines that do not carry all fields; numeric parsing errors still fail the job.
                .filter(line -> line.split(",").length >= MIN_FIELD_COUNT)
                .map(Flink_Sql_CEP_Order::parseOrderEvent)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(MAX_OUT_OF_ORDERNESS)
                                .withTimestampAssigner((event, timestamp) -> event.getTimestamp()));

        // Match per order: the pattern is evaluated independently for every order id.
        PatternStream<OrderEvent> patternStream =
                CEP.pattern(orderDataStream.keyBy(OrderEvent::getOrderId), buildPayPattern());

        // Anonymous subclass ({}) so the generic element type of the side output is retained.
        OutputTag<Tuple4<Long, Long, Long, String>> outputTag =
                new OutputTag<Tuple4<Long, Long, Long, String>>("pay-timeout") {};

        SingleOutputStreamOperator<Tuple4<Long, Long, Long, String>> select =
                patternStream.select(outputTag, new TimeoutSelectFunction(), new MyPatternSelectFunction());

        select.print("pay");
        select.getSideOutput(outputTag).print("timeout");

        // NOTE(review): the job name does not mention CEP; kept unchanged in case it is referenced elsewhere.
        env.execute("Table SQL");

        // The label is Chinese for "elapsed time"; the value is whole seconds (integer division).
        System.out.println("耗时: " + (System.currentTimeMillis() - start) / 1000);
    }

    /**
     * Parses one CSV line into an {@link OrderEvent}.
     *
     * <p>Fields 0 and 3 are parsed as longs; fields 1 and 2 are passed through as strings.
     * Field 3 is multiplied by 1000 — presumably converting epoch seconds to the milliseconds
     * used for event-time timestamps.
     *
     * @param line a line with at least {@link #MIN_FIELD_COUNT} comma-separated fields
     * @return the parsed event
     * @throws NumberFormatException if field 0 or field 3 is not a valid long
     */
    private static OrderEvent parseOrderEvent(String line) {
        String[] fields = line.split(",");
        return new OrderEvent(
                Long.parseLong(fields[0]), fields[1], fields[2], Long.parseLong(fields[3]) * 1000);
    }

    /**
     * Builds the pattern "create, later followed by pay, within 15 minutes".
     *
     * <p>{@code followedBy} is used, so other events may occur between the two stages.
     */
    private static Pattern<OrderEvent, OrderEvent> buildPayPattern() {
        return Pattern.<OrderEvent>begin("create")
                .where(eventTypeIs("create"))
                .followedBy("pay")
                .where(eventTypeIs("pay"))
                .within(Time.minutes(15));
    }

    /** Returns a condition accepting events whose event type equals {@code eventType}. */
    private static SimpleCondition<OrderEvent> eventTypeIs(final String eventType) {
        return new SimpleCondition<OrderEvent>() {
            @Override
            public boolean filter(OrderEvent orderEvent) throws Exception {
                // Constant-first equals: a null event type simply does not match.
                return eventType.equals(orderEvent.getEventType());
            }
        };
    }

    /**
     * Converts a timed-out partial match (a {@code create} without a matching {@code pay})
     * into {@code (orderId, createTimestamp, timeoutTimestamp, "timeout: " + timeoutTimestamp)}.
     */
    public static class TimeoutSelectFunction
            implements PatternTimeoutFunction<OrderEvent, Tuple4<Long, Long, Long, String>> {
        @Override
        public Tuple4<Long, Long, Long, String> timeout(
                Map<String, List<OrderEvent>> map, long timeoutTimestamp) throws Exception {
            OrderEvent create = map.get("create").get(0);
            return new Tuple4<>(create.getOrderId(), create.getTimestamp(), timeoutTimestamp,
                    "timeout: " + timeoutTimestamp);
        }
    }

    /**
     * Converts a complete match into
     * {@code (orderId, createTimestamp, payTimestamp, "payed")}.
     */
    public static class MyPatternSelectFunction
            implements PatternSelectFunction<OrderEvent, Tuple4<Long, Long, Long, String>> {
        @Override
        public Tuple4<Long, Long, Long, String> select(Map<String, List<OrderEvent>> map) throws Exception {
            OrderEvent create = map.get("create").get(0);
            OrderEvent pay = map.get("pay").get(0);
            return new Tuple4<>(create.getOrderId(), create.getTimestamp(),
                    pay.getTimestamp(), "payed");
        }
    }
}
Test data (sample input for data/OrderLog.csv):
34729,create,,1558430842
34730,create,,1558430843
34729,pay,sd76f87d6,1558430844
34730,pay,3hu3k2432,1558430845
34731,create,,1558430846
34731,pay,35jue34we,1558430849
34732,create,,1558430852
34733,create,,1558430855
34734,create,,1558430859
34732,pay,32h3h4b4t,1558430861
34735,create,,1558430862
34734,pay,435kjb45d,1558430863
34733,pay,766lk5nk4,1558430864
34736,create,,1558430866
34737,create,,1558430868
34735,pay,5k432k4n,1558430869
34738,create,,1558430871
34739,create,,1558430874
34736,pay,435kjb45s,1558430875
34740,create,,1558430877
34741,create,,1558430882
34742,create,,1558430884
34743,create,,1558430885
34744,create,,1558430886
34745,create,,1558430889
34746,create,,1558430892
34747,create,,1558430893
34748,create,,1558430895
34746,pay,3243hr9h9,1558430895
34738,pay,43jhin3k4,1558430896
34745,pay,8xz09ddsaf,1558430896
34741,pay,88df0wn92,1558430896
34749,create,,1558430899
34747,pay,329d09f9f,1558430893
34743,pay,3hefw8jf,1558430900
34750,create,,1558430901
34737,pay,324jnd45s,1558430902
34751,create,,1558430902
34744,pay,499dfano2,1558430903
34752,create,,1558430905
34742,pay,435kjb4432,1558430906
34753,create,,1558430906
34739,pay,98x0f8asd,1558430907
34754,create,,1558430908
34755,create,,1558430911
34740,pay,392094j32,1558430913
34756,create,,1558430913
34753,pay,8c6vs8dd,1558430913
34757,create,,1558430915
34749,pay,324n0239,1558430916
34755,pay,8x0zvy8w3,1558430918
34758,create,,1558430921
34759,create,,1558430922
34752,pay,rnp435rk,1558430925
34760,create,,1558430926
34761,create,,1558430927
34762,create,,1558430933
34748,pay,809saf0ff,1558430934
34763,create,,1558430936
34764,create,,1558430938
34765,create,,1558430940
34751,pay,24309dsf,1558430941
34750,pay,sad90df3,1558430941
34761,pay,902dsqw45,1558430943
34766,create,,1558430944
34767,create,,1558430949
34768,pay,88snrn932,1558430950
34759,pay,9203kmfn,1558430950
34754,pay,3245nbo7,1558430950
34758,pay,32499fd9w,1558430950
34760,pay,390mf2398,1558430960
34757,pay,d8938034,1558430962
34762,pay,84309dw31r,1558430983
34763,pay,sddf9809ew,1558431068
34764,pay,832jksmd9,1558431079
34765,pay,m23sare32e,1558431082
34766,pay,92nr903msa,1558431095
34767,pay,sdafen9932,1558432021