執行模型

Beam 模型允許執行器以不同的方式執行您的管線。您可能會因為執行器的選擇而觀察到各種效果。此頁面描述這些效果,以便您更好地了解 Beam 管線的執行方式。

元素處理

機器之間元素的序列化和通訊是管線分散式執行中最昂貴的操作之一。避免這種序列化可能需要在失敗後重新處理元素,或者可能會限制輸出到其他機器的分發。

序列化和通訊

執行器可能會為了通訊目的和諸如持久性等其他原因而在機器之間序列化元素。

執行器可能會決定以多種方式在轉換之間傳輸元素,例如

執行器可能序列化和保留元素的一些情況如下

  1. 當作為有狀態的 DoFn 的一部分使用時,執行器可能會將值保留到某些狀態機制。
  2. 當提交處理結果時,執行器可能會將輸出保留為檢查點。

捆綁和持久性

Beam 管線通常側重於「可輕易平行化」的問題。因此,API 強調平行處理元素,這使得表達諸如「將序號分配給 PCollection 中的每個元素」之類的操作變得困難。這是故意的,因為此類演算法更容易出現可擴展性問題。

平行處理所有元素也有一些缺點。具體而言,它使得無法批次處理任何操作,例如將元素寫入接收器或在處理過程中檢查點進度。

PCollection 中的元素不是同時處理所有元素,而是以捆綁方式處理。將集合劃分為捆綁是任意的,並由執行器選擇。這允許執行器在每個元素之後保留結果,以及在發生失敗時必須重試所有內容之間選擇適當的折衷方案。例如,串流執行器可能傾向於處理和提交小的捆綁包,而批次執行器可能傾向於處理較大的捆綁包。

資料分割和跨階段執行

Beam 管線中元素處理的分割和平行化取決於兩件事

Beam 管線從來源讀取資料 (例如 KafkaIOBigQueryIOJdbcIO 或您自己的來源實作)。要在 Beam 中實作來源,必須將其實作為可分割的 DoFn。可分割的 DoFn 為執行器提供介面以方便分割工作。

當在 Beam 中執行基於鍵的操作 (例如 GroupByKeyCombineReshuffle.perKey 和有狀態的 DoFn) 時,Beam 執行器會執行稱為重新洗牌1的資料序列化和傳輸。重新洗牌允許將相同鍵的資料元素一起處理。

執行器重新洗牌資料的方式對於批次和串流執行模式可能略有不同。

1不要與某些執行器中的 shuffle 操作混淆。

管線執行中的資料順序

Beam 模型沒有定義關於執行器處理元素或跨 PTransforms 傳輸元素的順序的嚴格指南。執行器可以自由地以不同的形式實作資料傳輸語義。

存在一些使用案例,其中使用者管線可能需要依賴於管線執行中的特定排序語義。功能矩陣文件記錄了執行器的鍵順序傳遞行為。

考慮一個 Beam 工作程式處理來自同一 Beam 轉換的一系列捆綁,並考慮一個將資料從此階段輸出到下游 PCollectionPTransform。最後,考慮此工作程式以特定順序發出的兩個具有相同鍵的事件 (在同一捆綁中或作為不同捆綁的一部分)。

如果 Beam 執行器保證這兩個事件將被直接下游的 PTransform 以相同的順序觀察到,而與資料傳輸方法無關,則我們說 Beam 執行器支援鍵順序傳遞

此特性在具有鍵限制平行處理的執行器和操作中將保持為真。

轉換內部和之間的失敗和平行處理

在本節中,我們討論如何平行處理輸入集合中的元素,以及在發生失敗時如何重試轉換。

單一轉換內的資料平行處理

當執行單個 ParDo 時,執行器可能會將九個元素的範例輸入集合分成兩個捆綁,如圖 1 所示。

Bundle A contains five elements. Bundle B contains four elements.

圖 1:執行器將輸入集合分成兩個捆綁。

ParDo 執行時,工作程式可能會平行處理這兩個捆綁,如圖 2 所示。

Two workers process the two bundles in parallel. Worker one processes bundle A. Worker two processes bundle B.

圖 2:兩個工作程式平行處理這兩個捆綁。

由於元素無法分割,因此轉換的最大平行處理取決於集合中元素的數量。在圖 3 中,輸入集合有九個元素,因此最大平行處理為九。

Nine workers process a nine element input collection in parallel.

圖 3:九個工作程式平行處理九個元素的輸入集合。

注意:可分割的 ParDo 允許跨多個捆綁 (bundle) 分割單個輸入的處理。此功能仍在開發中。

轉換之間的相依平行處理

如果執行器選擇在不更改捆綁的情況下,在產生轉換的輸出元素上執行消耗轉換,則依序排列的 ParDo 轉換可能是相依並行的。在圖 4 中,如果必須在同一個工作節點上處理給定元素的 ParDo1 輸出,則 ParDo1ParDo2相依並行的

ParDo1 processes an input collection that contains bundles A and B. ParDo2 then processes the output collection from ParDo1, which contains bundles C and D.

圖 4:依序排列的兩個轉換及其對應的輸入集合。

圖 5 顯示了這些相依並行的轉換可能如何執行。第一個工作節點在捆綁 A 中的元素上執行 ParDo1 (產生捆綁 C),然後在捆綁 C 中的元素上執行 ParDo2。同樣地,第二個工作節點在捆綁 B 中的元素上執行 ParDo1 (產生捆綁 D),然後在捆綁 D 中的元素上執行 ParDo2

Worker one executes ParDo1 on bundle A and Pardo2 on bundle C. Worker two executes ParDo1 on bundle B and ParDo2 on bundle D.

圖 5:兩個工作節點執行相依並行的 ParDo 轉換。

以這種方式執行轉換允許執行器避免在工作節點之間重新分配元素,從而節省通信成本。但是,最大平行度現在取決於相依並行步驟中第一個步驟的最大平行度。

單一轉換內的失敗

如果捆綁中元素的處理失敗,則整個捆綁失敗。必須重試捆綁中的元素(否則整個管線會失敗),儘管它們不需要以相同的捆綁方式重試。

在此範例中,我們將使用圖 1 中的 ParDo,它有一個包含九個元素的輸入集合,並分為兩個捆綁。

在圖 6 中,第一個工作節點成功處理了捆綁 A 中的所有五個元素。第二個工作節點處理捆綁 B 中的四個元素:前兩個元素已成功處理,第三個元素的處理失敗,還有一個元素仍在等待處理。

我們看到執行器重試了捆綁 B 中的所有元素,並且第二次處理成功完成。請注意,重試不一定會發生在與原始處理嘗試相同的工作節點上,如圖所示。

Worker two fails to process an element in bundle B. Worker one finishes processing bundle A and then successfully retries to execute bundle B.

圖 6:捆綁 B 中一個元素的處理失敗,另一個工作節點重試整個捆綁。

由於我們在處理輸入捆綁中的元素時遇到失敗,因此我們必須重新處理輸入捆綁中的所有元素。這表示執行器必須丟棄整個捆綁的輸出(包括任何狀態變更和設定的計時器),因為它包含的所有結果都將重新計算。

請注意,如果失敗的轉換是 ParDo,則 DoFn 實例會被拆解並捨棄。

耦合失敗:轉換之間的失敗

如果 ParDo2 中元素的處理失敗導致 ParDo1 重新執行,則這兩個步驟被稱為是共同失敗的

在此範例中,我們將使用圖 4 中的兩個 ParDo

在圖 7 中,工作節點 2 成功地在捆綁 B 中的所有元素上執行了 ParDo1。但是,該工作節點無法處理捆綁 D 中的一個元素,因此 ParDo2 失敗(顯示為紅色 X)。因此,執行器必須丟棄並重新計算 ParDo2 的輸出。由於執行器將 ParDo1ParDo2 一起執行,因此也必須丟棄 ParDo1 的輸出捆綁,並且必須重試輸入捆綁中的所有元素。這兩個 ParDo 是共同失敗的。

Worker two fails to process en element in bundle D, so all elements in both bundle B and bundle D must be retried.

圖 7:捆綁 D 中一個元素的處理失敗,因此重試輸入捆綁中的所有元素。

請注意,重試不一定具有與原始嘗試相同的處理時間,如圖所示。

所有遇到耦合失敗的 DoFn 都會被終止,並且必須被拆解,因為它們不遵循正常的 DoFn 生命周期。

以這種方式執行轉換允許執行器避免在轉換之間持久化元素,從而節省持久化成本。