240 发简信
IP属地:山东
  • 求这个一站式辅助功能解决工具,感谢感谢!!

    Android:辅助功能之自动抢红包

    hi大家好。新年又来了,微信群里又是各种红包横飞。作为技术人员的我们却大可不必担心一不小心,手速慢了点,又错过了几十万。我们可以通过安卓的辅助功能来实现自己的微信自动抢红包,...

  • ```
    public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {

    @Nullable private CheckpointBarrierHandler checkpointBarrierHandler;

    private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge();
    ```
    大概是在这里吧,OneInputStreamTask代表一个阶段,初始化了一个final WatermarkGauge,每个阶段共享一个WatermarkGauge对象,刷新了volatile currentWatermark,各个线程里的channel立马就拿到了最新的的值。就达到了`flush所有channel上的watermark`的目的。

    watermark原理之watermark的下发

    watermark 如何下发的? 一、 数据读取 A. AbstractStreamTaskNetworkInput:该类是用于读取上游数据 对象类别 含义:StreamS...

  • ```
    @Override
    public void emitWatermark(Watermark watermark) throws Exception {
    // flush水印
    watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
    // 下发水印
    operator.processWatermark(watermark);
    }
    ```
    ```
    private volatile long currentWatermark = 1657898217001L;

    public void setCurrentWatermark(long watermark) {
    currentWatermark = watermark;
    }
    ```
    flush 水印即是更新这里的currentWatermark,它是volatile,各线程立马可见。这里是在做UI统计的时候会调用到,但是该phase的各channel是怎么刷新的,说实话,还是不太明白,但它确实做到了。

    watermark原理之watermark的下发

    watermark 如何下发的? 一、 数据读取 A. AbstractStreamTaskNetworkInput:该类是用于读取上游数据 对象类别 含义:StreamS...

  • 120
    watermark原理之watermark的下发

    watermark 如何下发的? 一、 数据读取 A. AbstractStreamTaskNetworkInput:该类是用于读取上游数据 对象类别 含义:StreamS...

  • ```
    // 因为当前所有channel空闲,并且channel都无法继续更新水印,应该flush所有channel上的watermark,
    // 意味着我们设置channel 中最大的watermark作为新水印
    // 因为在其他变为idle的时候仍然会下发min watermark,所以只需要在最后一个channel变成idle且等于上次min watermark的时候flush
    if (channelStatuses[channelIndex].watermark == lastOutputWatermark) {
    findAndOutputMaxWatermarkAcrossAllChannels(output);
    }
    ```
    写得很好。但对于这段代码,只看到当所有通道都空闲后,取最大水印下发,但并没有看到对当前所有通道进行flush的动作。

    watermark原理之watermark的下发

    watermark 如何下发的? 一、 数据读取 A. AbstractStreamTaskNetworkInput:该类是用于读取上游数据 对象类别 含义:StreamS...

  • private List<People> sameList(List<People> oldArrayList, List<People> newArrayList) {
    List<People> resultList = newArrayList.stream()
    .filter(item -> oldArrayList.stream().map(e -> e.getCode())
    .collect(Collectors.toList()).contains(item.getCode()))
    .collect(Collectors.toList());
    return resultList;
    }

    大哥,你这两个list如果都是上千的数据,一下就是几百万次循环,得把
    oldArrayList.stream().map(e -> e.getCode())
    .collect(Collectors.toList())
    单独拿出来

    两个对象List根据属性取交集和差集

    背景介绍 咸鱼君最近做了个需求, excel导入功能, 其中 需要对已导入条目的做“更新” 未导入的条目做“新增” 其余的做“删除” 细品需求 无非是对excel的数据和数据...

  • 简书吃相?

    “27岁离过婚和35岁未婚的女人,你会娶谁?”这个男人的回答惹怒整个朋友圈

    来源|妈妈网育儿(ID:mmwyuer) 27岁离过婚的女人,和35岁没结过婚的女人,你愿意娶谁? 有人做过一期街头采访,很多男士的回答,看得我三观震碎。 有男士说,当然要选...

  • 你哪个版本啊
    launcher.setConf(SparkLauncher.EXECUTOR_CORES, 16);
    这样写不会报错吗

    java中使用SparkLauncher提交spark应用

    将开发好的spark application(对于java/scala来说是jar)提交到spark集群执行的方式通常包括两种,一种是通常使用的spark submit脚本(...