Flink对于CEP的大概实现

  1. CEP和状态机
  2. 状态机的表示和如何作用在流上

Flink中CEP的一般代码结构如下:

val input = env.fromElements(
  new Event(1, "barfoo", 1.0),
  new Event(2, "start", 2.0),
  new Event(3, "foobar", 3.0),
  new SubEvent(4, "foo", 4.0, 1.0),
  new Event(5, "middle", 5.0),
  new SubEvent(6, "middle", 6.0, 2.0),
  new SubEvent(7, "bar", 3.0, 3.0),
  new Event(42, "42", 42.0),
  new Event(8, "end", 1.0)
)

val pattern: Pattern[Event, Event] = Pattern.begin[Event]("start")
  .where(new SimpleCondition[Event] {
    override def filter(e: Event): Boolean = {
      e.name.equals("start")
    }
  })
  .followedByAny("middle").subtype[SubEvent](classOf[SubEvent])
  .where(new SimpleCondition[SubEvent] {
    override def filter(e: SubEvent): Boolean = {
      e.name.equals("middle")
    }
  })
  .followedByAny("end")
  .where(new SimpleCondition[Event] {
    override def filter(e: Event): Boolean = {
      e.name.equals("end")
    }
  })

val patternStream = CEP.pattern(input, pattern)
val result = patternStream.process(
  new PatternProcessFunction[Event, String] {
    // 此处因为数据放在一个map里面了, 丧失了先后顺序需要特别注意
    override def processMatch(matchResult: util.Map[String, util.List[Event]],
      ctx: PatternProcessFunction.Context, out: Collector[String]): Unit = {
      val info = matchResult.asScala.map{ case (k, v) =>
        (k, v.asScala.mkString(","))
      }.mkString(";")

      out.collect(info)
    }
  }
)

result.print()
env.execute("cep demo")

从上面可以看出入口是

  1. 一个一般的DataStream
  2. 然后进过一个Pattern, 得到一个 PatternStream,
  3. 最后再通过调用 PatternStream#process 又变成一个一般的DataStream

1. PatternStream#process

现在我们具体看下process 到底做了什么

public <R> SingleOutputStreamOperator<R> process(
    final PatternProcessFunction<T, R> patternProcessFunction,
    final TypeInformation<R> outTypeInfo) {

  return builder.build(
    outTypeInfo,
    builder.clean(patternProcessFunction));
}
<OUT, K> SingleOutputStreamOperator<OUT> build(
  final TypeInformation<OUT> outTypeInfo,
  final PatternProcessFunction<IN, OUT> processFunction) {

  final TypeSerializer<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
  final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;

  final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;
  final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);

  final CepOperator<IN, K, OUT> operator = new CepOperator<>(
    inputSerializer,
    isProcessingTime,
    nfaFactory,
    comparator,
    pattern.getAfterMatchSkipStrategy(),
    processFunction,
    lateDataOutputTag);

  final SingleOutputStreamOperator<OUT> patternStream;
  if (inputStream instanceof KeyedStream) {
    KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;

    patternStream = keyedStream.transform(
      "CepOperator",
      outTypeInfo,
      operator);
  } else {
    KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();

    patternStream = inputStream.keyBy(keySelector).transform(
      "GlobalCepOperator",
      outTypeInfo,
      operator
    ).forceNonParallel();
  }

  return patternStream;
}

从上面可以看出具体的计算其实还是封装进了CepOperator 里面了

2. CepOperator

数据存储对象:

private transient ValueState<NFAState> computationStates;
private transient MapState<Long, List<IN>> elementQueueState;
private transient SharedBuffer<IN> partialMatches;

对每个元素的处理情况:

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
  if (isProcessingTime) {
    if (comparator == null) {
        // there can be no out of order elements in processing time
      NFAState nfaState = getNFAState();
      long timestamp = getProcessingTimeService().getCurrentProcessingTime();
      advanceTime(nfaState, timestamp);
      processEvent(nfaState, element.getValue(), timestamp);
      updateNFA(nfaState);
    } else {
      long currentTime = timerService.currentProcessingTime();
      bufferEvent(element.getValue(), currentTime);
      // register a timer for the next millisecond to sort and emit buffered data
      timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, currentTime + 1);
    }
  } else {
    long timestamp = element.getTimestamp();
    IN value = element.getValue();
      // In event-time processing we assume correctness of the watermark.
      // Events with timestamp smaller than or equal with the last seen watermark are considered late.
      // Late events are put in a dedicated side output, if the user has specified one.

    if (timestamp > lastWatermark) {
        // we have an event with a valid timestamp, so
        // we buffer it until we receive the proper watermark.
      saveRegisterWatermarkTimer();
      bufferEvent(value, timestamp);
    } else if (lateDataOutputTag != null) {
      output.collect(lateDataOutputTag, element);
    }
  }
}

从上面可以看出当isProcessingTime && comparator == null 的时候, 会进行数据的及时处理

// 找出超时的元素
private void advanceTime(NFAState nfaState, long timestamp) throws Exception {
  try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
    Collection<Tuple2<Map<String, List<IN>>, Long>> timedOut =
    nfa.advanceTime(sharedBufferAccessor, nfaState, timestamp);
    if (!timedOut.isEmpty()) {
      processTimedOutSequences(timedOut);
    }
  }
}

// 处理每条数据
private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {
  try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
    Collection<Map<String, List<IN>>> patterns =
    nfa.process(sharedBufferAccessor, nfaState, event, timestamp, afterMatchSkipStrategy, cepTimerService);
    processMatchedSequences(patterns, timestamp);
  }
}

其他的都是调用 bufferEvent 并同时注册一个定时器, 来处理这些缓存起来的数据

bufferEvent 将数据都放入了elementQueueState

private void bufferEvent(IN event, long currentTime) throws Exception {
  List<IN> elementsForTimestamp =  elementQueueState.get(currentTime);
  if (elementsForTimestamp == null) {
    elementsForTimestamp = new ArrayList<>();
  }

  if (getExecutionConfig().isObjectReuseEnabled()) {
      // copy the StreamRecord so that it cannot be changed
    elementsForTimestamp.add(inputSerializer.copy(event));
  } else {
    elementsForTimestamp.add(event);
  }
  elementQueueState.put(currentTime, elementsForTimestamp);
}

又因为CepOperator继承了 Triggerable 并实现了 onEventTimeonProcessingTime, 所以上面的定时器触发的时候就可以调用这2个实现来处理数据了

private PriorityQueue<Long> getSortedTimestamps() throws Exception {
  PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>();
  for (Long timestamp : elementQueueState.keys()) {
    sortedTimestamps.offer(timestamp);
  }
  return sortedTimestamps;
}

@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {

    // 1) get the queue of pending elements for the key and the corresponding NFA,
    // 2) process the pending elements in event time order and custom comparator if exists
    //    by feeding them in the NFA
    // 3) advance the time to the current watermark, so that expired patterns are discarded.
    // 4) update the stored state for the key, by only storing the new NFA and MapState iff they
    //    have state to be used later.
    // 5) update the last seen watermark.

    // STEP 1
  PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
  NFAState nfaState = getNFAState();

    // STEP 2
  while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {
    long timestamp = sortedTimestamps.poll();
    advanceTime(nfaState, timestamp);
    try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {
      elements.forEachOrdered(
        event -> {
          try {
            processEvent(nfaState, event, timestamp);
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
        );
    }
    elementQueueState.remove(timestamp);
  }

    // STEP 3
  advanceTime(nfaState, timerService.currentWatermark());

    // STEP 4
  updateNFA(nfaState);

  if (!sortedTimestamps.isEmpty() || !partialMatches.isEmpty()) {
    saveRegisterWatermarkTimer();
  }

    // STEP 5
  updateLastSeenWatermark(timerService.currentWatermark());
}

@Override
public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
    // 1) get the queue of pending elements for the key and the corresponding NFA,
    // 2) process the pending elements in process time order and custom comparator if exists
    //    by feeding them in the NFA
    // 3) update the stored state for the key, by only storing the new NFA and MapState iff they
    //    have state to be used later.

    // STEP 1
  PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
  NFAState nfa = getNFAState();

    // STEP 2
  while (!sortedTimestamps.isEmpty()) {
    long timestamp = sortedTimestamps.poll();
    advanceTime(nfa, timestamp);
    try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {
      elements.forEachOrdered(
        event -> {
          try {
            processEvent(nfa, event, timestamp);
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
        );
    }
    elementQueueState.remove(timestamp);
  }

    // STEP 3
  updateNFA(nfa);
}

3. NFA

从上面的代码可以看出代码的核心处理都放在了NFA里面了

NFA的具体论文参见 Efficient Pattern Matching over Event Streams

对于开发人员来说我们需要关注NFA的大概实现逻辑和解决的核心问题就可以了

上面调用的NFA方法有2个:

  1. advanceTime
  2. process

3.1 NFACompiler

NFA的初始化使用到了NFACompiler

final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);

final CepOperator<IN, K, OUT> operator = new CepOperator<>(
  inputSerializer,
  isProcessingTime,
  nfaFactory,
  comparator,
  pattern.getAfterMatchSkipStrategy(),
  processFunction,
  lateDataOutputTag);

该类将pattern 进行处理, 得到一个NFAFactory 并将其传入了CepOperator, 而不是将pattern传入了进去

public static <T> NFAFactory<T> compileFactory(
  final Pattern<T, ?> pattern,
  boolean timeoutHandling) {
  if (pattern == null) {
      // return a factory for empty NFAs
    return new NFAFactoryImpl<>(0, Collections.<State<T>>emptyList(), timeoutHandling);
  } else {
    final NFAFactoryCompiler<T> nfaFactoryCompiler = new NFAFactoryCompiler<>(pattern);
    nfaFactoryCompiler.compileFactory();
    return new NFAFactoryImpl<>(nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling);
  }
}

compileFactory函数里面会真正将patternstates 关联起来, 这里的states 也会在下面初始化NFA的时候使用到, 并且不再变化

void compileFactory() {
  if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
    throw new MalformedPatternException("NotFollowedBy is not supported as a last part of a Pattern!");
  }
  checkPatternNameUniqueness();
  checkPatternSkipStrategy();

      // we're traversing the pattern from the end to the beginning --> the first state is the final state
  State<T> sinkState = createEndingState();
      // add all the normal states
  sinkState = createMiddleStates(sinkState);
      // add the beginning state
  createStartState(sinkState);
}

CepOperator#open里面创建NFA

@Override
public NFA<T> createNFA() {
    return new NFA<>(states, windowTime, timeoutHandling);
}

3.2 NFA#process

NFA 中它自身的成员变量 states (即上文提到的) 是静态的, 不变的, 而我们的代码会随着数据的不断变化整个缓存的数据会处于不同的状态这些状态的变动都是由NFAState 来维护的

由于新来的数据, 当这个数据进入状态机的不同地方, 会产生不同的后续状态, 因此需要用当前的数据来驱动当前状态机的所有状态, 此时真正的数据都在SharedBuffer里面, 并通过sharedBufferAccessor 来访问/修改

以下是代码的实现逻辑看起来很复杂, 具体的逻辑可以参看上面提到的论文,能有个大概的了解

private Collection<Map<String, List<T>>> doProcess(
  final SharedBufferAccessor<T> sharedBufferAccessor,
  final NFAState nfaState,
  final EventWrapper event,
  final AfterMatchSkipStrategy afterMatchSkipStrategy,
  final TimerService timerService) throws Exception {

  final PriorityQueue<ComputationState> newPartialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
  final PriorityQueue<ComputationState> potentialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);

    // iterate over all current computations
  // 一个新的event进来, 需要遍历整个`partialMatches`, 而每个match都会根据当前的event
  // 会计算newComputationStates
  for (ComputationState computationState : nfaState.getPartialMatches()) {
    final Collection<ComputationState> newComputationStates = computeNextStates(
      sharedBufferAccessor,
      computationState,
      event,
      timerService);

    if (newComputationStates.size() != 1) {
      nfaState.setStateChanged();
    } else if (!newComputationStates.iterator().next().equals(computationState)) {
      nfaState.setStateChanged();
    }

      //delay adding new computation states in case a stop state is reached and we discard the path.
    final Collection<ComputationState> statesToRetain = new ArrayList<>();
      //if stop state reached in this path
    boolean shouldDiscardPath = false;
    for (final ComputationState newComputationState : newComputationStates) {

      if (isFinalState(newComputationState)) {
        potentialMatches.add(newComputationState);
      } else if (isStopState(newComputationState)) {
          //reached stop state. release entry for the stop state
        shouldDiscardPath = true;
        sharedBufferAccessor.releaseNode(newComputationState.getPreviousBufferEntry());
      } else {
          // add new computation state; it will be processed once the next event arrives
        statesToRetain.add(newComputationState);
      }
    }

    if (shouldDiscardPath) {
        // a stop state was reached in this branch. release branch which results in removing previous event from
        // the buffer
      for (final ComputationState state : statesToRetain) {
        sharedBufferAccessor.releaseNode(state.getPreviousBufferEntry());
      }
    } else {
      newPartialMatches.addAll(statesToRetain);
    }
  }

  if (!potentialMatches.isEmpty()) {
    nfaState.setStateChanged();
  }

  // 在这里会拿出真正匹配到的数据
  List<Map<String, List<T>>> result = new ArrayList<>();
  if (afterMatchSkipStrategy.isSkipStrategy()) {
    processMatchesAccordingToSkipStrategy(sharedBufferAccessor,
      nfaState,
      afterMatchSkipStrategy,
      potentialMatches,
      newPartialMatches,
      result);
  } else {
    for (ComputationState match : potentialMatches) {
      Map<String, List<T>> materializedMatch =
      sharedBufferAccessor.materializeMatch(
        sharedBufferAccessor.extractPatterns(
          match.getPreviousBufferEntry(),
          match.getVersion()).get(0)
        );

      result.add(materializedMatch);
      sharedBufferAccessor.releaseNode(match.getPreviousBufferEntry());
    }
  }
  // 修改nfaState的状态
  nfaState.setNewPartialMatches(newPartialMatches);

  return result;
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容