部落格
2019/06/11
Apache Beam 中的迴圈計時器
Apache Beam 的基本元件讓您可以建立適用於各種使用案例的表達式資料管道。其中一個特定的使用案例是時間序列資料分析,其中跨視窗邊界的連續序列非常重要。當您處理這種類型的資料時,會出現一些有趣的挑戰,在本部落格中,我們將更詳細地探討其中一個挑戰,並使用「迴圈計時器」模式來使用 Timer API(部落格文章)。
在串流模式下使用 Beam,您可以取得資料串流並建立分析轉換,以產生資料結果。但是對於時間序列資料,資料的缺失是有用的資訊。那麼,我們如何在沒有資料的情況下產生結果呢?
讓我們用一個更具體的例子來說明需求。假設您有一個簡單的管道,每分鐘計算來自 IoT 裝置的事件數量。我們希望在特定時間間隔內沒有看到任何資料時產生值 0。那麼,為什麼這會變得棘手呢?嗯,建立一個在事件到達時計數事件的簡單管道很容易,但是當沒有事件時,就沒有什麼可以計數的了!
讓我們建立一個簡單的管道來使用
// We will start our timer at 1 sec from the fixed upper boundary of our
// minute window
Instant now = Instant.parse("2000-01-01T00:00:59Z");
// ----- Create some dummy data
// Create 3 elements, incrementing by 1 minute and leaving a time gap between
// element 2 and element 3
TimestampedValue<KV<String, Integer>> time_1 =
TimestampedValue.of(KV.of("Key_A", 1), now);
TimestampedValue<KV<String, Integer>> time_2 =
TimestampedValue.of(KV.of("Key_A", 2),
now.plus(Duration.standardMinutes(1)));
// No Value for start time + 2 mins
TimestampedValue<KV<String, Integer>> time_3 =
TimestampedValue.of(KV.of("Key_A", 3),
now.plus(Duration.standardMinutes(3)));
// Create pipeline
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(PipelineOptions.class);
Pipeline p = Pipeline.create(options);
// Apply a fixed window of duration 1 min and Sum the results
p.apply(Create.timestamped(time_1, time_2, time_3))
.apply(
Window.<KV<String,Integer>>into(
FixedWindows.<Integer>of(Duration.standardMinutes(1))))
.apply(Sum.integersPerKey())
.apply(ParDo.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
@ProcessElement public void process(ProcessContext c) {
LOG.info("Value is {} timestamp is {}", c.element(), c.timestamp());
}
}));
p.run();
執行該管道將產生以下輸出
INFO LoopingTimer - Value is KV{Key_A, 1} timestamp is 2000-01-01T00:00:59.999Z
INFO LoopingTimer - Value is KV{Key_A, 3} timestamp is 2000-01-01T00:03:59.999Z
INFO LoopingTimer - Value is KV{Key_A, 2} timestamp is 2000-01-01T00:01:59.999Z
注意:輸出的順序不一致是預期的,但是 key-window 元組的計算是正確的。
如預期的那樣,我們在每個時間間隔視窗中都看到了輸出,該視窗的資料點的時間戳記介於視窗的最小值和最大值之間。時間戳記為 00:00:59、00:01:59 和 00:03:59 的資料點落入了以下時間間隔視窗中。
- [00:00:00, 00:00:59.999)
- [00:01:00, 00:01:59.999)
- [00:03:00, 00:03:59.999)
但是,由於 00:02:00 和 00:02:59 之間沒有資料,因此沒有為時間間隔視窗 [00:02:00,00:02:59.999) 產生任何值。
我們如何讓 Beam 為該遺失的視窗輸出值?首先,讓我們逐步說明一些不使用 Timer API 的選項。
選項 1:外部心跳
我們可以利用外部系統為每個時間間隔發出一個值,並將其注入到 Beam 使用的資料串流中。這個簡單的選項將任何複雜性都移出了 Beam 管道。但是,使用外部系統意味著我們需要監視此系統並與 Beam 管道同時執行其他維護任務。
選項 2:在 Beam 管道中使用產生的來源
我們可以利用產生來源來使用此程式碼片段發出值
pipeline.apply(GenerateSequence.
from(0).withRate(1,Duration.standardSeconds(1L)))
然後,我們可以
- 使用 DoFn 將值轉換為零。
- 將此值與實際來源攤平。
- 產生一個 PCollection,其中每個時間間隔都有刻度。
這也是一種在每個時間間隔中產生值的簡單方法。
選項 1 和 2 多個鍵的問題
當管道處理單個鍵時,選項 1 和選項 2 都能正常運作。現在,讓我們處理以下情況:不是 1 個 IoT 裝置,而是有成千上萬甚至數十萬個具有唯一鍵的裝置。為了使選項 1 或選項 2 在此情況下發揮作用,我們需要執行一個額外的步驟:建立一個 FanOut DoFn。每個刻度都需要分發到所有可能的鍵,因此我們需要建立一個 FanOut DoFn,該 DoFn 取得虛擬值並為每個可用的鍵產生一個鍵值對。
例如,假設我們有 3 個用於 3 個 IoT 裝置的鍵 {key1,key2,key3}。當我們從 GenerateSequence 取得第一個元素時,使用選項 2 中概述的方法,我們需要在 DoFn 中建立一個迴圈,以產生 3 個鍵值對。這些對成為每個 IoT 裝置的心跳值。
當我們需要處理大量的 IoT 裝置時,事情會變得更加有趣,因為鍵的列表會動態變化。我們需要新增一個轉換,執行 Distinct 操作,並將產生的資料作為側輸入饋送到 FanOut DoFn 中。
選項 3:使用 Beam 計時器實作心跳
那麼,計時器如何提供協助呢?讓我們看一下新的轉換
編輯:迴圈計時器狀態從 Boolean 變更為 Long,以允許最小值檢查。
public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>, KV<String, Integer>> {
Instant stopTimerTime;
LoopingStatefulTimer(Instant stopTime){
this.stopTimerTime = stopTime;
}
@StateId("loopingTimerTime")
private final StateSpec<ValueState<Long>> loopingTimerTime =
StateSpecs.value(BigEndianLongCoder.of());
@StateId("key")
private final StateSpec<ValueState<String>> key =
StateSpecs.value(StringUtf8Coder.of());
@TimerId("loopingTimer")
private final TimerSpec loopingTimer =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement public void process(ProcessContext c, @StateId("key") ValueState<String> key,
@StateId("loopingTimerTime") ValueState<Long> loopingTimerTime,
@TimerId("loopingTimer") Timer loopingTimer) {
// If the timer has been set already, or if the value is smaller than
// the current element + window duration, do not set
Long currentTimerValue = loopingTimerTime.read();
Instant nextTimerTimeBasedOnCurrentElement = c.timestamp().plus(Duration.standardMinutes(1));
if (currentTimerValue == null || currentTimerValue >
nextTimerTimeBasedOnCurrentElement.getMillis()) {
loopingTimer.set(nextTimerTimeBasedOnCurrentElement);
loopingTimerTime.write(nextTimerTimeBasedOnCurrentElement.getMillis());
}
// We need this value so that we can output a value for the correct key in OnTimer
if (key.read() == null) {
key.write(c.element().getKey());
}
c.output(c.element());
}
@OnTimer("loopingTimer")
public void onTimer(
OnTimerContext c,
@StateId("key") ValueState<String> key,
@TimerId("loopingTimer") Timer loopingTimer) {
LOG.info("Timer @ {} fired", c.timestamp());
c.output(KV.of(key.read(), 0));
// If we do not put in a “time to live” value, then the timer would loop forever
Instant nextTimer = c.timestamp().plus(Duration.standardMinutes(1));
if (nextTimer.isBefore(stopTimerTime)) {
loopingTimer.set(nextTimer);
} else {
LOG.info(
"Timer not being set as exceeded Stop Timer value {} ",
stopTimerTime);
}
}
}
狀態 API 需要保留兩個資料值
- 一個 Boolean
timeRunning
值,用於避免在計時器已在執行時重設計時器。 - 一個「key」狀態物件值,允許我們儲存我們正在使用的鍵。此資訊稍後將在
OnTimer
事件中需要。
我們還有一個 ID 為 **loopingTimer**
的計時器,該計時器充當我們的每個時間間隔鬧鐘。請注意,計時器是一個事件計時器。它根據浮水印觸發,而不是根據管道執行時的時間經過來觸發。
接下來,讓我們解包 @ProcessElement 區塊中發生的事情
進入此區塊的第一個元素將
- 將
timerRunner
的狀態設定為 True。 - 將鍵值對中的鍵值寫入鍵 StateValue 中。
- 程式碼設定計時器的值,以在元素的的時間戳記之後一分鐘觸發。請注意,此時間戳記允許的最大值為 XX:XX:59.999。這會將最大警報值放置在下一個時間間隔的上限。
- 最後,我們使用
c.output
從@ProcessElement
區塊輸出資料。
在 @OnTimer 區塊中,會發生以下情況
- 程式碼發出一個值,該值帶有從我們的鍵 StateValue 中提取的鍵和值 0。事件的時間戳記對應於計時器觸發的事件時間。
- 除非我們已超過
stopTimerTime
值,否則我們會從現在起設定一個新的計時器一分鐘。您的使用案例通常會有更複雜的停止條件,但我們在此處使用一個簡單的條件,以使說明的程式碼保持簡單。停止條件的主題將在稍後更詳細地討論。
就是這樣,讓我們將轉換新增回管道中
// Apply a fixed window of duration 1 min and Sum the results
p.apply(Create.timestamped(time_1, time_2, time_3)).apply(
Window.<KV<String, Integer>>into(FixedWindows.<Integer>of(Duration.standardMinutes(1))))
// We use a combiner to reduce the number of calls in keyed state
// from all elements to 1 per FixedWindow
.apply(Sum.integersPerKey())
.apply(Window.into(new GlobalWindows()))
.apply(ParDo.of(new LoopingStatefulTimer(Instant.parse("2000-01-01T00:04:00Z"))))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(Sum.integersPerKey())
.apply(ParDo.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
@ProcessElement public void process(ProcessContext c) {
LOG.info("Value is {} timestamp is {}", c.element(), c.timestamp());
}
}));
- 在管道的第一部分,我們建立 FixedWindows 並將每個鍵的值減少到單個 Sum。
- 接下來,我們將輸出重新視窗化到 GlobalWindow 中。由於狀態和計時器是按視窗計算的,因此必須在視窗邊界內設定它們。我們希望迴圈計時器跨越所有固定視窗,因此我們在全域視窗中進行設定。
- 然後,我們新增 LoopingStatefulTimer DoFn。
- 最後,我們重新套用 FixedWindows 並加總值。
即使管道的 Source 在時間間隔視窗的最小值和最大值邊界中發出值,此管道也會確保每個時間間隔視窗都有值零。這表示我們可以標記資料的缺失。
您可能會質疑為什麼我們使用兩個帶有多個 Sum.integersPerKey
的縮減器。為什麼不只使用一個?從功能上來說,使用一個也會產生正確的結果。但是,放置兩個 Sum.integersPerKey
會為我們帶來良好的效能優勢。它可以將每個時間間隔的元素數量從許多減少到只有一個。這可以減少在 @ProcessElement
呼叫期間讀取狀態 API 的次數。
以下是執行修改後的管道的記錄輸出
INFO LoopingTimer - Timer @ 2000-01-01T00:01:59.999Z fired
INFO LoopingTimer - Timer @ 2000-01-01T00:02:59.999Z fired
INFO LoopingTimer - Timer @ 2000-01-01T00:03:59.999Z fired
INFO LoopingTimer - Timer not being set as exceeded Stop Timer value 2000-01-01T00:04:00.000Z
INFO LoopingTimer - Value is KV{Key_A, 1} timestamp is 2000-01-01T00:00:59.999Z
INFO LoopingTimer - Value is KV{Key_A, 0} timestamp is 2000-01-01T00:02:59.999Z
INFO LoopingTimer - Value is KV{Key_A, 2} timestamp is 2000-01-01T00:01:59.999Z
INFO LoopingTimer - Value is KV{Key_A, 3} timestamp is 2000-01-01T00:03:59.999Z
耶!即使來源資料集中該時間間隔中沒有元素,我們現在也具有來自時間間隔 [00:01:00, 00:01:59.999) 的輸出。
在本部落格中,我們涵蓋了時間序列使用案例的一些有趣領域,並逐步說明了幾個選項,包括 Timer API 的進階使用案例。祝大家迴圈愉快!
注意:迴圈計時器是 Timer API 的一個有趣的新使用案例,執行器將需要為其所有更進階的功能集新增支援。您今天可以使用 DirectRunner 實驗這種模式。對於其他執行器,請留意他們的版本說明,了解關於在生產中處理此使用案例的支援。
(功能矩陣)
特定於執行器的注意事項:Google Cloud Dataflow 執行器的 Drain 功能不支援迴圈計時器(連結到矩陣)