偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

聊聊Flink:這次把Flink的觸發(fā)器(Trigger)、移除器(Evictor)講透

開發(fā) 前端
窗口的計(jì)算觸發(fā)依賴于窗口觸發(fā)器,每種類型的窗口都有對應(yīng)的窗口觸發(fā)機(jī)制,都有一個默認(rèn)的窗口觸發(fā)器,觸發(fā)器的作用就是去控制什么時候來觸發(fā)計(jì)算。flink內(nèi)部定義多種觸發(fā)器,每種觸發(fā)器對應(yīng)于不同的WindowAssigner。

一、觸發(fā)器(Trigger)

Trigger 決定了一個窗口(由 window assigner 定義)何時可以被 window function 處理。每個 WindowAssigner 都有一個默認(rèn)的 Trigger。如果默認(rèn) trigger 無法滿足你的需要,你可以在 trigger(…) 調(diào)用中指定自定義的 trigger。

1.1 Flink中預(yù)置的Trigger

窗口的計(jì)算觸發(fā)依賴于窗口觸發(fā)器,每種類型的窗口都有對應(yīng)的窗口觸發(fā)機(jī)制,都有一個默認(rèn)的窗口觸發(fā)器,觸發(fā)器的作用就是去控制什么時候來觸發(fā)計(jì)算。flink內(nèi)部定義多種觸發(fā)器,每種觸發(fā)器對應(yīng)于不同的WindowAssigner。常見的觸發(fā)器如下:

  • EventTimeTrigger:通過對比EventTime和窗口的Endtime確定是否觸發(fā)窗口計(jì)算,如果EventTime大于Window EndTime則觸發(fā),否則不觸發(fā),窗口將繼續(xù)等待。
  • ProcessTimeTrigger:通過對比ProcessTime和窗口EndTme確定是否觸發(fā)窗口,如果ProcessTime大于EndTime則觸發(fā)計(jì)算,否則窗口繼續(xù)等待。
  • ProcessingTimeoutTrigger:可以將任何觸發(fā)器轉(zhuǎn)變?yōu)槌瑫r觸發(fā)器。
  • ContinuousEventTimeTrigger:根據(jù)間隔時間周期性觸發(fā)窗口或者Window的結(jié)束時間小于當(dāng)前EndTime觸發(fā)窗口計(jì)算。
  • ContinuousProcessingTimeTrigger:根據(jù)間隔時間周期性觸發(fā)窗口或者Window的結(jié)束時間小于當(dāng)前ProcessTime觸發(fā)窗口計(jì)算。
  • CountTrigger:根據(jù)接入數(shù)據(jù)量是否超過設(shè)定的闕值判斷是否觸發(fā)窗口計(jì)算。
  • DeltaTrigger:根據(jù)接入數(shù)據(jù)計(jì)算出來的Delta指標(biāo)是否超過指定的Threshold去判斷是否觸發(fā)窗口計(jì)算。
  • PurgingTrigger:可以將任意觸發(fā)器作為參數(shù)轉(zhuǎn)換為Purge類型的觸發(fā)器,計(jì)算完成后數(shù)據(jù)將被清理。
  • NeverTrigger:任何時候都不觸發(fā)窗口計(jì)算

1.2 Trigger的抽象類

Trigger 接口提供了五個方法來響應(yīng)不同的事件:

  • onElement() 方法在每個元素被加入窗口時調(diào)用。
  • onEventTime() 方法在注冊的 event-time timer 觸發(fā)時調(diào)用。
  • onProcessingTime() 方法在注冊的 processing-time timer 觸發(fā)時調(diào)用。
  • canMerge() 方法判斷是否可以合并。
  • onMerge() 方法與有狀態(tài)的 trigger 相關(guān)。該方法會在兩個窗口合并時, 將窗口對應(yīng) trigger 的狀態(tài)進(jìn)行合并,比如使用會話窗口時。
  • clear() 方法處理在對應(yīng)窗口被移除時所需的邏輯。

觸發(fā)器接口的源碼如下:

@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

    private static final long serialVersionUID = -4104633972991191369L;

    /**
     * Called for every element that gets added to a pane. The result of this will determine whether
     * the pane is evaluated to emit results.
     *
     * @param element The element that arrived.
     * @param timestamp The timestamp of the element that arrived.
     * @param window The window to which the element is being added.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
            throws Exception;

    /**
     * Called when a processing-time timer that was set using the trigger context fires.
     *
     * @param time The timestamp at which the timer fired.
     * @param window The window for which the timer fired.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
            throws Exception;

    /**
     * Called when an event-time timer that was set using the trigger context fires.
     *
     * @param time The timestamp at which the timer fired.
     * @param window The window for which the timer fired.
     * @param ctx A context object that can be used to register timer callbacks.
     */
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)
            throws Exception;

    /**
     * Returns true if this trigger supports merging of trigger state and can therefore be used with
     * a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.
     *
     * <p>If this returns {@code true} you must properly implement {@link #onMerge(Window,
     * OnMergeContext)}
     */
    public boolean canMerge() {
        return false;
    }

    /**
     * Called when several windows have been merged into one window by the {@link
     * org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.
     *
     * @param window The new window that results from the merge.
     * @param ctx A context object that can be used to register timer callbacks and access state.
     */
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }

    /**
     * Clears any state that the trigger might still hold for the given window. This is called when
     * a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and
     * {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as
     * state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
     */
    public abstract void clear(W window, TriggerContext ctx) throws Exception;

    // ------------------------------------------------------------------------

    /**
     * A context object that is given to {@link Trigger} methods to allow them to register timer
     * callbacks and deal with state.
     */
    public interface TriggerContext {
        // ...
    }

    /**
     * Extension of {@link TriggerContext} that is given to {@link Trigger#onMerge(Window,
     * OnMergeContext)}.
     */
    public interface OnMergeContext extends TriggerContext {
        <S extends MergingState<?, ?>> void mergePartitionedState(
                StateDescriptor<S, ?> stateDescriptor);
    }
}

關(guān)于上述方法,需要注意三件事:

(1)前三個方法返回TriggerResult枚舉類型,其包含四個枚舉值:

  • CONTINUE:表示對窗口不執(zhí)行任何操作。即不觸發(fā)窗口計(jì)算,也不刪除元素。
  • FIRE:觸發(fā)窗口計(jì)算,但是保留窗口元素。
  • PURGE:不觸發(fā)窗口計(jì)算,丟棄窗口,并且刪除窗口的元素。
  • FIRE_AND_PURGE:觸發(fā)窗口計(jì)算,輸出結(jié)果,然后將窗口中的數(shù)據(jù)和窗口進(jìn)行清除。

源碼如下:

public enum TriggerResult {

    // 不觸發(fā),也不刪除元素
    CONTINUE(false, false),

    // 觸發(fā)窗口,窗口出發(fā)后刪除窗口中的元素
    FIRE_AND_PURGE(true, true),

    // 觸發(fā)窗口,但是保留窗口元素
    FIRE(true, false),

    // 不觸發(fā)窗口,丟棄窗口,并且刪除窗口的元素
    PURGE(false, true);

    // ------------------------------------------------------------------------

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return fire;
    }

    public boolean isPurge() {
        return purge;
    }
}

(2) 每一個窗口分配器都擁有一個屬于自己的 Trigger,Trigger上會有定時器,用來決定一個窗口何時能夠被計(jì)算或清除,當(dāng)定時器觸發(fā)后,會調(diào)用對應(yīng)的回調(diào)返回,返回TriggerResult。Trigger的返回結(jié)果可以是 continue(不做任何操作),fire(處理窗口數(shù)據(jù)),purge(移除窗口和窗口中的數(shù)據(jù)),或者 fire + purge。一個Trigger的調(diào)用結(jié)果只是fire的話,那么會計(jì)算窗口并保留窗口原樣,也就是說窗口中的數(shù)據(jù)仍然保留不變,等待下次Trigger fire的時候再次執(zhí)行計(jì)算。一個窗口可以被重復(fù)計(jì)算多次知道它被 purge 了。在purge之前,窗口會一直占用著內(nèi)存。

1.3 ProcessingTimeTrigger源碼分析

@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {}

    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // only register a timer if the time is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the time is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    /** Creates a new trigger that fires once system time passes the end of the window. */
    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }
}

在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())將會注冊一個ProcessingTime定時器,時間參數(shù)是window.maxTimestamp(),也就是窗口的最終時間,當(dāng)時間到達(dá)這個窗口最終時間,定時器觸發(fā)并調(diào)用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,觸發(fā)窗口中數(shù)據(jù)的計(jì)算,但是會保留窗口元素。

需要注意的是ProcessingTimeTrigger類只會在窗口的最終時間到達(dá)的時候觸發(fā)窗口函數(shù)的計(jì)算,計(jì)算完成后并不會清除窗口中的數(shù)據(jù),這些數(shù)據(jù)存儲在內(nèi)存中,除非調(diào)用PURGE或FIRE_AND_PURGE,否則數(shù)據(jù)將一直存在內(nèi)存中。實(shí)際上,F(xiàn)link中提供的Trigger類,除了PurgingTrigger類,其他的都不會對窗口中的數(shù)據(jù)進(jìn)行清除。

EventTimeTriggerr在onElement設(shè)置的定時器:

圖片圖片

EventTime通過registerEventTimeTimer注冊定時器,在內(nèi)部Watermark達(dá)到或超過Timer設(shè)定的時間戳?xí)r觸發(fā)。

二、移除器(Evictor)

2.1 Evictor扮演的角色

圖片圖片

當(dāng)一個元素進(jìn)入stream中之后,一般要經(jīng)歷Window(開窗)、Trigger(觸發(fā)器)、Evitor(移除器)、Windowfunction(窗口計(jì)算操作),具體過程如下:


  • Window中的WindowAssigner(窗口分配器)定義了數(shù)據(jù)應(yīng)該被分配到哪個窗口中,每一個 WindowAssigner都會有一個默認(rèn)的Trigger,如果用戶在代碼中指定了窗口的trigger,默認(rèn)的 trigger 將會被覆蓋。
  • Trigger上會有定時器,用來決定一個窗口何時能夠被計(jì)算或清除。Trigger的返回結(jié)果可以是 continue(不做任何操作),fire(處理窗口數(shù)據(jù)),purge(移除窗口和窗口中的數(shù)據(jù)),或者 fire + purge。一個Trigger的調(diào)用結(jié)果只是fire的話,那么會計(jì)算窗口并保留窗口原樣,也就是說窗口中的數(shù)據(jù)仍然保留不變,等待下次Trigger fire的時候再次執(zhí)行計(jì)算。一個窗口可以被重復(fù)計(jì)算多次知道它被 purge 了。在purge之前,窗口會一直占用著內(nèi)存。
  • 當(dāng)Trigger fire了,窗口中的元素集合就會交給Evictor(如果指定了的話)。Evictor 主要用來遍歷窗口中的元素列表,并決定最先進(jìn)入窗口的多少個元素需要被移除。剩余的元素會交給用戶指定的函數(shù)進(jìn)行窗口的計(jì)算。如果沒有 Evictor 的話,窗口中的所有元素會一起交給WindowFunction進(jìn)行計(jì)算。
  • WindowFunction收到了窗口的元素(可能經(jīng)過了 Evictor 的過濾),并計(jì)算出窗口的結(jié)果值,并發(fā)送給下游。窗口計(jì)算操作有很多,比如預(yù)定義的sum(),min(),max(),還有 ReduceFunction,WindowFunction。WindowFunction 是最通用的計(jì)算函數(shù),其他的預(yù)定義的函數(shù)基本都是基于該函數(shù)實(shí)現(xiàn)的。

現(xiàn)在,大致了解了Evitor(移除器)扮演的角色和移除器在流中的哪個位置,讓我們繼續(xù)看為何使用Evictor。

Evictor接口定義如下:

圖片圖片

evictBefore()包含要在窗口函數(shù)之前應(yīng)用的清除邏輯,而evictAfter()包含要在窗口函數(shù)之后應(yīng)用的清除邏輯。應(yīng)用窗口函數(shù)之前清除的元素將不會被窗口函數(shù)處理。

窗格是具有相同Key和相同窗口的元素組成的桶,即同一個窗口中相同Key的元素一定屬于同一個窗格。一個元素可以在多個窗格中(當(dāng)一個元素被分配給多個窗口時),這些窗格都有自己的清除器實(shí)例。

注:window默認(rèn)沒有evictor,一旦把window指定Evictor,該window會由EvictWindowOperator類來負(fù)責(zé)操作。

2.2 Flink內(nèi)置的Evitor

  • CountEvictor:保留窗口中用戶指定的元素?cái)?shù)量,并丟棄窗口緩沖區(qū)剩余的元素。
  • DeltaEvictor:依次計(jì)算窗口緩沖區(qū)中的最后一個元素與其余每個元素之間的delta值,若delta值大于等于指定的閾值,則該元素會被移除。使用DeltaEvictor清除器需要指定兩個參數(shù),一個是double類型的閾值;另一個是DeltaFunction接口的實(shí)例,DeltaFunction用于指定具體的delta值計(jì)算邏輯。
  • TimeEvictor:傳入一個以毫秒為單位的時間間隔參數(shù)(例如以size表示),對于給定的窗口,取窗口中元素的最大時間戳(例如以max表示),使用TimeEvictor清除器將刪除所有時間戳小于或等于max-size的元素(即清除從窗口開頭到指定的截止時間之間的元素)。

2.2.1 CountEvictor

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    if (size <= maxCount) {
        // 小于最大數(shù)量,不做處理
        return;
    } else {
        int evictedCount = 0;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
            iterator.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break;
            } else {
                // 移除前size - maxCount個元素,只剩下最后maxCount個元素
                iterator.remove();
            }
        }
    }
}

2.2.2 DeltaEvictor

DeltaEvictor通過計(jì)算DeltaFunction的值(依次傳入每個元素和最后一個元素),并將其與threshold進(jìn)行對比,如果DeltaFunction計(jì)算結(jié)果大于等于threshold,則該元素會被移除。DeltaEvictor的實(shí)現(xiàn)如下:

private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {

    // 獲取最后一個元素
    TimestampedValue<T> lastElement = Iterables.getLast(elements);

    for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){
        TimestampedValue<T> element = iterator.next();
        // 依次計(jì)算每個元素和最后一個元素的delta值,同時和threshold的值進(jìn)行比較
        // 若計(jì)算結(jié)果大于threshold值或者是相等,則該元素會被移除
        if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
            iterator.remove();
        }
    }
}

2.2.3 TimeEvictor

TimeEvictor以時間為判斷標(biāo)準(zhǔn),決定元素是否會被移除。TimeEvictor會獲取窗口中所有元素的最大時間戳currentTime,currentTime減去窗口大小(windowSize) 可得到能保留最久的元素的時間戳evictCutoff,然后再遍歷窗口中的元素,如果元素的時間戳小于evictCutoff,就執(zhí)行移除操作,否則不移除。具體邏輯如下圖所示:

TimeEvictor的代碼實(shí)現(xiàn)如下:

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {

    // 如果element沒有timestamp,直接返回
    if (!hasTimestamp(elements)) {
        return;
    }

    // 獲取elements中最大的時間戳(到來最晚的元素的時間)
    long currentTime = getMaxTimestamp(elements);
    // 截止時間為: 到來最晚的元素的時間 - 窗口大?。梢岳斫鉃楸A糇罱亩嗑玫脑兀?    long evictCutoff = currentTime - windowSize;

    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
        TimestampedValue<Object> record = iterator.next();

        // 清除所有時間戳小于截止時間的元素
        if (record.getTimestamp() <= evictCutoff) {
            iterator.remove();
        }
    }
}


責(zé)任編輯:武曉燕 來源: 老周聊架構(gòu)
相關(guān)推薦

2010-05-04 09:44:12

Oracle Trig

2024-02-27 08:05:32

Flink分區(qū)機(jī)制數(shù)據(jù)傳輸

2011-05-20 14:06:25

Oracle觸發(fā)器

2024-01-29 08:07:42

FlinkYARN架構(gòu)

2024-04-09 07:50:59

Flink語義Watermark

2011-04-14 13:54:22

Oracle觸發(fā)器

2011-05-19 14:29:49

Oracle觸發(fā)器語法

2010-10-12 10:04:15

MySQL觸發(fā)器

2010-05-31 18:06:07

MySQL 觸發(fā)器

2011-03-28 10:05:57

sql觸發(fā)器代碼

2009-09-18 14:31:33

CLR觸發(fā)器

2009-04-26 22:27:54

觸發(fā)器密碼修改數(shù)據(jù)庫

2009-10-22 17:18:20

CLR觸發(fā)器

2009-04-07 13:56:03

SQL Server觸發(fā)器實(shí)例

2010-10-11 14:52:43

Mysql觸發(fā)器

2010-05-18 15:36:44

MySQL觸發(fā)器

2009-11-18 13:15:06

Oracle觸發(fā)器

2021-07-30 10:33:57

MySQL觸發(fā)器數(shù)據(jù)

2011-03-03 09:30:24

downmoonsql登錄觸發(fā)器

2010-10-12 10:24:58

mysql觸發(fā)器
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號