部落格
2016/10/20
在 Apache Beam 中測試無界限的管線
Beam 程式設計模型統一了批次和串流管線的撰寫方式。我們最近引入了一個新的 PTransform 來為將在無界限資料集上執行,並且必須處理無序和延遲資料的管線編寫測試。
浮水印 (Watermarks)、視窗 (Windows) 和觸發器 (Triggers) 構成了 Beam 程式設計模型的核心部分 – 它們分別決定了您的資料如何分組、輸入何時完成以及何時產生結果。這適用於所有管線,無論它們是處理有界限還是無界限的輸入。如果您不熟悉 Beam 模型中的浮水印、視窗和觸發器,串流 101 和 串流 102 是一個很好的入門點。從這些文章中可以得出一個重要的結論:在具有間歇性故障和斷線使用者的實際串流情境中,資料可能會無序到達或延遲到達。儘管存在這些挑戰,Beam 的基礎元件為使用者提供了一種執行有用、強大且正確計算的方法。
作為 Beam 管線的作者,我們需要涵蓋關鍵失敗情境和邊緣案例的全面測試,才能真正確信管線已準備好投入生產。Beam SDK 中的現有測試基礎結構允許編寫測試,以檢查執行時管線的內容。然而,為可能接收延遲資料或多次觸發的管線編寫單元測試,從歷史上來看,其複雜程度從複雜到不可能不等,因為從無界限來源讀取的管線在沒有外部干預的情況下不會關閉,而僅從有界限來源讀取的管線則無法測試延遲資料的行為,也不能測試大多數推測性的觸發器。如果沒有其他工具,則無法輕鬆測試使用自訂觸發器並處理無序資料的管線。
這篇部落格文章介紹了我們新的框架,用於針對行動遊戲範例系列中 LeaderBoard 管線處理延遲和無序資料的管線編寫測試。
LeaderBoard 和行動遊戲範例
LeaderBoard 是 Beam 行動遊戲範例(和逐步解說)的一部分,它會持續記錄使用者和團隊的分數。使用者分數是在程式的整個生命週期內計算的,而團隊分數則是在固定視窗內計算的,預設持續時間為一小時。LeaderBoard 管線會根據管線的設定觸發和允許延遲,適當地產生推測性和延遲的窗格。LeaderBoard 管線的預期輸出會根據元素相對於浮水印到達的時間和處理時間的進度而有所不同,這以前無法在測試中控制。
撰寫決定性測試以模擬非決定性
Beam 測試基礎結構提供了 PAssert 方法,該方法會斷言管線中 PCollection 的內容屬性。我們擴展了此基礎結構,加入了 TestStream,這是一個 PTransform,它會執行一系列事件,包括將其他元素新增到管線、推進 TestStream 的浮水印,以及推進管線處理時間時鐘。TestStream 允許測試觀察觸發器對管線產生的輸出的影響。
執行從 TestStream 讀取的管線時,讀取會等待每個事件的所有後果完成,然後繼續下一個事件,從而確保當處理時間推進時,基於處理時間的觸發器會按需觸發。使用此轉換,可以在管線上觀察觸發和允許延遲的效果,包括對推測性和延遲窗格以及捨棄資料的反應。
元素時序
元素會在浮水印之前、同時或之後到達,這會將它們分類為「早期」、「準時」和「延遲」類別。「延遲」元素可以進一步細分為「無法觀察到」、「可觀察到」和「可捨棄」延遲,具體取決於它們被指派到的視窗和最大允許延遲(由視窗策略指定)。具有這些時序的元素會發射到窗格中,這些窗格可以是「EARLY」、「ON-TIME」或「LATE」,具體取決於發射窗格時浮水印的位置。
使用 TestStream,我們可以編寫測試來證明推測性窗格在其觸發條件滿足後輸出,浮水印的推進會導致產生準時窗格,而延遲到達的資料會在最大允許延遲之前到達時產生改進,並在其之後捨棄。
以下範例說明如何使用 TestStream 向管線提供一系列事件,其中元素的到達會穿插著對浮水印的更新和處理時間的推進。每個事件都會執行到完成,然後才會發生其他事件。
在圖表中,「實際」時間(事件)發生時間會隨著圖形向右移動而進展。管線接收它們的時間會隨著圖形向上移動而進展。浮水印由波浪形的紅色線條表示,每個星爆都是觸發器的觸發和相關的窗格。

一切都準時到達
例如,如果我們建立一個 TestStream,其中所有資料都在浮水印之前到達,並將結果 PCollection 作為輸入提供給 CalculateTeamScores PTransform
TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
.addElements(new GameActionInfo("sky", "blue", 12, new Instant(0L)),
new GameActionInfo("navy", "blue", 3, new Instant(0L)),
new GameActionInfo("navy", "blue", 3, new Instant(0L).plus(Duration.standardMinutes(3))))
// Move the watermark past the end the end of the window
.advanceWatermarkTo(new Instant(0L).plus(TEAM_WINDOW_DURATION)
.plus(Duration.standardMinutes(1)))
.advanceWatermarkToInfinity();
PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
.apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));
然後我們可以斷言結果 PCollection 包含已到達的元素

某些元素延遲到達,但在視窗結束前到達
我們也可以在浮水印之後,但在視窗結束之前將資料新增到 TestStream(如下圖紅色浮水印的左側所示),這展示了「無法觀察到延遲」的資料 – 也就是說,延遲到達的資料,但由於它在浮水印通過視窗結束之前到達,因此被系統升級為準時資料
TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
.addElements(new GameActionInfo("sky", "blue", 3, new Instant(0L)),
new GameActionInfo("navy", "blue", 3, new Instant(0L).plus(Duration.standardMinutes(3))))
// Move the watermark up to "near" the end of the window
.advanceWatermarkTo(new Instant(0L).plus(TEAM_WINDOW_DURATION)
.minus(Duration.standardMinutes(1)))
.addElements(new GameActionInfo("sky", "blue", 12, Duration.ZERO))
.advanceWatermarkToInfinity();
PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
.apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));

元素延遲到達,並在視窗結束後到達
藉由在新增延遲資料之前將浮水印進一步推進到時間中,我們可以展示觸發行為,該行為會導致系統發射一個準時窗格,然後在延遲資料到達後,發射一個改進結果的窗格。
TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
.addElements(new GameActionInfo("sky", "blue", 3, new Instant(0L)),
new GameActionInfo("navy", "blue", 3, new Instant(0L).plus(Duration.standardMinutes(3))))
// Move the watermark up to "near" the end of the window
.advanceWatermarkTo(new Instant(0L).plus(TEAM_WINDOW_DURATION)
.minus(Duration.standardMinutes(1)))
.addElements(new GameActionInfo("sky", "blue", 12, Duration.ZERO))
.advanceWatermarkToInfinity();
PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
.apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));

// An on-time pane is emitted with the events that arrived before the window closed
PAssert.that(teamScores)
.inOnTimePane(window)
.containsInAnyOrder(KV.of("blue", 6));
// The final pane contains the late refinement
PAssert.that(teamScores)
.inFinalPane(window)
.containsInAnyOrder(KV.of("blue", 18));
p.run();
元素延遲到達,並且在視窗結束加上允許的延遲之後到達
如果我們將浮水印進一步推向未來,超出設定的最大允許延遲,我們可以證明系統會捨棄延遲元素。
TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
.addElements(new GameActionInfo("sky", "blue", 3, Duration.ZERO),
new GameActionInfo("navy", "blue", 3, Duration.standardMinutes(3)))
// Move the watermark up to "near" the end of the window
.advanceWatermarkTo(new Instant(0).plus(TEAM_WINDOW_DURATION)
.plus(ALLOWED_LATENESS)
.plus(Duration.standardMinutes(1)))
.addElements(new GameActionInfo(
"sky",
"blue",
12,
new Instant(0).plus(TEAM_WINDOW_DURATION).minus(Duration.standardMinutes(1))))
.advanceWatermarkToInfinity();
PCollection<KV<String, Integer>> teamScores = p.apply(createEvents)
.apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS));

元素在視窗結束前到達,並且經過一些處理時間
使用其他方法,我們可以藉由推進 TestStream 的處理時間來展示推測性觸發器的行為。如果我們將元素新增到輸入 PCollection,偶爾推進處理時間時鐘,並套用 CalculateUserScores
TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class))
.addElements(new GameActionInfo("scarlet", "red", 3, new Instant(0L)),
new GameActionInfo("scarlet", "red", 2, new Instant(0L).plus(Duration.standardMinutes(1))))
.advanceProcessingTime(Duration.standardMinutes(12))
.addElements(new GameActionInfo("oxblood", "red", 2, new Instant(0L)).plus(Duration.standardSeconds(22)),
new GameActionInfo("scarlet", "red", 4, new Instant(0L).plus(Duration.standardMinutes(2))))
.advanceProcessingTime(Duration.standardMinutes(15))
.advanceWatermarkToInfinity();
PCollection<KV<String, Integer>> userScores =
p.apply(createEvents).apply(new CalculateUserScores(ALLOWED_LATENESS));

TestStream - 底層原理
TestStream 依賴於我們引入的管線概念,稱為靜止,以利用現有的執行器基礎結構,同時保證執行器何時呼叫根轉換。這包含有關待處理元素和觸發器的屬性,即
- 不允許有觸發器觸發但尚未觸發
- 所有元素都緩衝在狀態中,或無法進展,直到側邊輸入可用
簡而言之,這表示,在沒有輸入浮水印或處理時間推進,或者沒有將其他元素新增到管線的情況下,管線將不會進展。每當 TestStream PTransform 執行動作時,執行器都不得重新調用相同的執行個體,直到管線靜止。這可確保 TestStream 指定的事件「按順序」發生,這可確保輸入浮水印和系統時鐘不會在其希望保留的元素之前推進。
DirectRunner 已被修改為使用靜止作為訊號,表示它應向管線新增更多工作,而該執行器中 TestStream 的實作會利用此事實來執行每個事件的單一輸出。DirectRunner 實作也會直接控制執行器的系統時鐘,確保即使管線內存在數分鐘的處理時間觸發器,測試也能及時完成。
DirectRunner 中支援 TestStream 轉換。對於大多數使用者來說,使用 TestPipeline 和 PAsserts 編寫的測試在使用 TestStream 時將自動發揮作用。
摘要
在 PAssert 中加入 TestStream 和特定於視窗和窗格的匹配器,已啟用對產生推測性和延遲窗格的管線的測試。這允許在 Java SDK 中直接表達所有樣式的管線的測試。如果您有任何問題或意見,我們很樂意在郵件清單中聽到它們。