Apache Beam 中的迴圈計時器

Apache Beam 的基本元件讓您可以建立適用於各種使用案例的表達式資料管道。其中一個特定的使用案例是時間序列資料分析,其中跨視窗邊界的連續序列非常重要。當您處理這種類型的資料時,會出現一些有趣的挑戰,在本部落格中,我們將更詳細地探討其中一個挑戰,並使用「迴圈計時器」模式來使用 Timer API(部落格文章)。

在串流模式下使用 Beam,您可以取得資料串流並建立分析轉換,以產生資料結果。但是對於時間序列資料,資料的缺失是有用的資訊。那麼,我們如何在沒有資料的情況下產生結果呢?

讓我們用一個更具體的例子來說明需求。假設您有一個簡單的管道,每分鐘計算來自 IoT 裝置的事件數量。我們希望在特定時間間隔內沒有看到任何資料時產生值 0。那麼,為什麼這會變得棘手呢?嗯,建立一個在事件到達時計數事件的簡單管道很容易,但是當沒有事件時,就沒有什麼可以計數的了!

讓我們建立一個簡單的管道來使用

  // We will start our timer at 1 sec from the fixed upper boundary of our
  // minute window
  Instant now = Instant.parse("2000-01-01T00:00:59Z");

  // ----- Create some dummy data

  // Create 3 elements, incrementing by 1 minute and leaving a time gap between
  // element 2 and element 3
  TimestampedValue<KV<String, Integer>> time_1 =
    TimestampedValue.of(KV.of("Key_A", 1), now);

  TimestampedValue<KV<String, Integer>> time_2 =
    TimestampedValue.of(KV.of("Key_A", 2),
    now.plus(Duration.standardMinutes(1)));

  // No Value for start time + 2 mins
  TimestampedValue<KV<String, Integer>> time_3 =
    TimestampedValue.of(KV.of("Key_A", 3),
    now.plus(Duration.standardMinutes(3)));

  // Create pipeline
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
    .as(PipelineOptions.class);

  Pipeline p = Pipeline.create(options);

  // Apply a fixed window of duration 1 min and Sum the results
  p.apply(Create.timestamped(time_1, time_2, time_3))
   .apply(
      Window.<KV<String,Integer>>into(
FixedWindows.<Integer>of(Duration.standardMinutes(1))))
        .apply(Sum.integersPerKey())
        .apply(ParDo.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {

          @ProcessElement public void process(ProcessContext c) {
            LOG.info("Value is {} timestamp is {}", c.element(), c.timestamp());
          }
       }));

  p.run();

執行該管道將產生以下輸出

INFO  LoopingTimer  - Value is KV{Key_A, 1} timestamp is 2000-01-01T00:00:59.999Z
INFO  LoopingTimer  - Value is KV{Key_A, 3} timestamp is 2000-01-01T00:03:59.999Z
INFO  LoopingTimer  - Value is KV{Key_A, 2} timestamp is 2000-01-01T00:01:59.999Z

注意:輸出的順序不一致是預期的,但是 key-window 元組的計算是正確的。

如預期的那樣,我們在每個時間間隔視窗中都看到了輸出,該視窗的資料點的時間戳記介於視窗的最小值和最大值之間。時間戳記為 00:00:59、00:01:59 和 00:03:59 的資料點落入了以下時間間隔視窗中。

  • [00:00:00, 00:00:59.999)
  • [00:01:00, 00:01:59.999)
  • [00:03:00, 00:03:59.999)

但是,由於 00:02:00 和 00:02:59 之間沒有資料,因此沒有為時間間隔視窗 [00:02:00,00:02:59.999) 產生任何值。

我們如何讓 Beam 為該遺失的視窗輸出值?首先,讓我們逐步說明一些不使用 Timer API 的選項。

選項 1:外部心跳

我們可以利用外部系統為每個時間間隔發出一個值,並將其注入到 Beam 使用的資料串流中。這個簡單的選項將任何複雜性都移出了 Beam 管道。但是,使用外部系統意味著我們需要監視此系統並與 Beam 管道同時執行其他維護任務。

選項 2:在 Beam 管道中使用產生的來源

我們可以利用產生來源來使用此程式碼片段發出值

pipeline.apply(GenerateSequence.
            from(0).withRate(1,Duration.standardSeconds(1L)))

然後,我們可以

  1. 使用 DoFn 將值轉換為零。
  2. 將此值與實際來源攤平。
  3. 產生一個 PCollection,其中每個時間間隔都有刻度。

這也是一種在每個時間間隔中產生值的簡單方法。

選項 1 和 2 多個鍵的問題

當管道處理單個鍵時,選項 1 和選項 2 都能正常運作。現在,讓我們處理以下情況:不是 1 個 IoT 裝置,而是有成千上萬甚至數十萬個具有唯一鍵的裝置。為了使選項 1 或選項 2 在此情況下發揮作用,我們需要執行一個額外的步驟:建立一個 FanOut DoFn。每個刻度都需要分發到所有可能的鍵,因此我們需要建立一個 FanOut DoFn,該 DoFn 取得虛擬值並為每個可用的鍵產生一個鍵值對。

例如,假設我們有 3 個用於 3 個 IoT 裝置的鍵 {key1,key2,key3}。當我們從 GenerateSequence 取得第一個元素時,使用選項 2 中概述的方法,我們需要在 DoFn 中建立一個迴圈,以產生 3 個鍵值對。這些對成為每個 IoT 裝置的心跳值。

當我們需要處理大量的 IoT 裝置時,事情會變得更加有趣,因為鍵的列表會動態變化。我們需要新增一個轉換,執行 Distinct 操作,並將產生的資料作為側輸入饋送到 FanOut DoFn 中。

選項 3:使用 Beam 計時器實作心跳

那麼,計時器如何提供協助呢?讓我們看一下新的轉換

編輯:迴圈計時器狀態從 Boolean 變更為 Long,以允許最小值檢查。

public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>, KV<String, Integer>> {

    Instant stopTimerTime;

    LoopingStatefulTimer(Instant stopTime){
      this.stopTimerTime = stopTime;
    }

    @StateId("loopingTimerTime")
    private final StateSpec<ValueState<Long>> loopingTimerTime =
        StateSpecs.value(BigEndianLongCoder.of());

    @StateId("key")
    private final StateSpec<ValueState<String>> key =
        StateSpecs.value(StringUtf8Coder.of());

    @TimerId("loopingTimer")
    private final TimerSpec loopingTimer =
        TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement public void process(ProcessContext c, @StateId("key") ValueState<String> key,
        @StateId("loopingTimerTime") ValueState<Long> loopingTimerTime,
        @TimerId("loopingTimer") Timer loopingTimer) {

      // If the timer has been set already, or if the value is smaller than
      // the current element + window duration, do not set
      Long currentTimerValue = loopingTimerTime.read();
      Instant nextTimerTimeBasedOnCurrentElement = c.timestamp().plus(Duration.standardMinutes(1));

      if (currentTimerValue == null || currentTimerValue >
          nextTimerTimeBasedOnCurrentElement.getMillis()) {
        loopingTimer.set(nextTimerTimeBasedOnCurrentElement);
        loopingTimerTime.write(nextTimerTimeBasedOnCurrentElement.getMillis());
      }

      // We need this value so that we can output a value for the correct key in OnTimer
      if (key.read() == null) {
        key.write(c.element().getKey());
      }

      c.output(c.element());
    }

    @OnTimer("loopingTimer")
    public void onTimer(
        OnTimerContext c,
        @StateId("key") ValueState<String> key,
        @TimerId("loopingTimer") Timer loopingTimer) {

      LOG.info("Timer @ {} fired", c.timestamp());
      c.output(KV.of(key.read(), 0));

      // If we do not put in a “time to live” value, then the timer would loop forever
      Instant nextTimer = c.timestamp().plus(Duration.standardMinutes(1));
      if (nextTimer.isBefore(stopTimerTime)) {
        loopingTimer.set(nextTimer);
      } else {
        LOG.info(
            "Timer not being set as exceeded Stop Timer value {} ",
            stopTimerTime);
      }
    }
  }

狀態 API 需要保留兩個資料值

  1. 一個 Boolean timeRunning 值,用於避免在計時器已在執行時重設計時器。
  2. 一個「key」狀態物件值,允許我們儲存我們正在使用的鍵。此資訊稍後將在 OnTimer 事件中需要。

我們還有一個 ID 為 **loopingTimer** 的計時器,該計時器充當我們的每個時間間隔鬧鐘。請注意,計時器是一個事件計時器。它根據浮水印觸發,而不是根據管道執行時的時間經過來觸發。

接下來,讓我們解包 @ProcessElement 區塊中發生的事情

進入此區塊的第一個元素將

  1. timerRunner 的狀態設定為 True。
  2. 將鍵值對中的鍵值寫入鍵 StateValue 中。
  3. 程式碼設定計時器的值,以在元素的的時間戳記之後一分鐘觸發。請注意,此時間戳記允許的最大值為 XX:XX:59.999。這會將最大警報值放置在下一個時間間隔的上限。
  4. 最後,我們使用 c.output@ProcessElement 區塊輸出資料。

在 @OnTimer 區塊中,會發生以下情況

  1. 程式碼發出一個值,該值帶有從我們的鍵 StateValue 中提取的鍵和值 0。事件的時間戳記對應於計時器觸發的事件時間。
  2. 除非我們已超過 stopTimerTime 值,否則我們會從現在起設定一個新的計時器一分鐘。您的使用案例通常會有更複雜的停止條件,但我們在此處使用一個簡單的條件,以使說明的程式碼保持簡單。停止條件的主題將在稍後更詳細地討論。

就是這樣,讓我們將轉換新增回管道中

  // Apply a fixed window of duration 1 min and Sum the results
  p.apply(Create.timestamped(time_1, time_2, time_3)).apply(
    Window.<KV<String, Integer>>into(FixedWindows.<Integer>of(Duration.standardMinutes(1))))
    // We use a combiner to reduce the number of calls in keyed state
    // from all elements to 1 per FixedWindow
    .apply(Sum.integersPerKey())
    .apply(Window.into(new GlobalWindows()))
    .apply(ParDo.of(new LoopingStatefulTimer(Instant.parse("2000-01-01T00:04:00Z"))))
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(Sum.integersPerKey())
    .apply(ParDo.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {

      @ProcessElement public void process(ProcessContext c) {

        LOG.info("Value is {} timestamp is {}", c.element(), c.timestamp());

     }
  }));
  1. 在管道的第一部分,我們建立 FixedWindows 並將每個鍵的值減少到單個 Sum。
  2. 接下來,我們將輸出重新視窗化到 GlobalWindow 中。由於狀態和計時器是按視窗計算的,因此必須在視窗邊界內設定它們。我們希望迴圈計時器跨越所有固定視窗,因此我們在全域視窗中進行設定。
  3. 然後,我們新增 LoopingStatefulTimer DoFn。
  4. 最後,我們重新套用 FixedWindows 並加總值。

即使管道的 Source 在時間間隔視窗的最小值和最大值邊界中發出值,此管道也會確保每個時間間隔視窗都有值零。這表示我們可以標記資料的缺失。

您可能會質疑為什麼我們使用兩個帶有多個 Sum.integersPerKey 的縮減器。為什麼不只使用一個?從功能上來說,使用一個也會產生正確的結果。但是,放置兩個 Sum.integersPerKey 會為我們帶來良好的效能優勢。它可以將每個時間間隔的元素數量從許多減少到只有一個。這可以減少在 @ProcessElement 呼叫期間讀取狀態 API 的次數。

以下是執行修改後的管道的記錄輸出

INFO  LoopingTimer  - Timer @ 2000-01-01T00:01:59.999Z fired
INFO  LoopingTimer  - Timer @ 2000-01-01T00:02:59.999Z fired
INFO  LoopingTimer  - Timer @ 2000-01-01T00:03:59.999Z fired
INFO  LoopingTimer  - Timer not being set as exceeded Stop Timer value 2000-01-01T00:04:00.000Z
INFO  LoopingTimer  - Value is KV{Key_A, 1} timestamp is 2000-01-01T00:00:59.999Z
INFO  LoopingTimer  - Value is KV{Key_A, 0} timestamp is 2000-01-01T00:02:59.999Z
INFO  LoopingTimer  - Value is KV{Key_A, 2} timestamp is 2000-01-01T00:01:59.999Z
INFO  LoopingTimer  - Value is KV{Key_A, 3} timestamp is 2000-01-01T00:03:59.999Z

耶!即使來源資料集中該時間間隔中沒有元素,我們現在也具有來自時間間隔 [00:01:00, 00:01:59.999) 的輸出。

在本部落格中,我們涵蓋了時間序列使用案例的一些有趣領域,並逐步說明了幾個選項,包括 Timer API 的進階使用案例。祝大家迴圈愉快!

注意:迴圈計時器是 Timer API 的一個有趣的新使用案例,執行器將需要為其所有更進階的功能集新增支援。您今天可以使用 DirectRunner 實驗這種模式。對於其他執行器,請留意他們的版本說明,了解關於在生產中處理此使用案例的支援。

(功能矩陣)

特定於執行器的注意事項:Google Cloud Dataflow 執行器的 Drain 功能不支援迴圈計時器(連結到矩陣)