設計您的管線
本頁面可協助您設計 Apache Beam 管線。 其中包含有關如何判斷管線結構、如何選擇要套用至資料的轉換,以及如何判斷輸入和輸出方法等資訊。
在閱讀本節之前,建議您先熟悉Beam 程式設計指南中的資訊。
設計管線時要考量的因素
在設計 Beam 管線時,請考慮幾個基本問題
- 您的輸入資料儲存在哪裡?您有多少組輸入資料? 這將決定您需要在管線開頭套用哪些類型的
Read
轉換。 - 您的資料長什麼樣子? 可能是純文字、格式化的記錄檔,或資料庫表格中的列。 某些 Beam 轉換僅適用於鍵/值配對的
PCollection
;您需要判斷您的資料是否已鍵入,以及如何以最佳方式在管線的PCollection
中表示它。 - 您想要對您的資料做什麼? Beam SDK 中的核心轉換是通用轉換。 了解您需要如何變更或操作資料,將決定您如何建置像 ParDo 等核心轉換,或何時使用 Beam SDK 中包含的預先撰寫轉換。
- 您的輸出資料長什麼樣子,以及應該放到哪裡? 這將決定您需要在管線末尾套用哪些類型的
Write
轉換。
基本管線
最簡單的管線代表操作的線性流程,如圖 1 所示。
圖 1:線性管線。
但是,您的管線可能會複雜得多。 管線代表步驟的有向非循環圖。 它可以有多個輸入來源、多個輸出接收器,並且其操作 (PTransform
) 可以讀取和輸出多個 PCollection
。 以下範例顯示您的管線可以採用的不同形狀。
分支 PCollection
重要的是要了解轉換不會耗用 PCollection
;而是會考慮 PCollection
的每個個別元素,並建立新的 PCollection
作為輸出。 這樣,您就可以對同一個 PCollection
中的不同元素執行不同的操作。
多個轉換處理相同的 PCollection
您可以將相同的 PCollection
作為多個轉換的輸入,而無需耗用輸入或變更它。
圖 2 中的管線是分支管線。 該管線從資料庫表格讀取其輸入(表示為字串的名字),並建立表格列的 PCollection
。 然後,該管線將多個轉換套用至相同的 PCollection
。 轉換 A 會擷取以字母「A」開頭的 PCollection
中所有名稱,而轉換 B 會擷取以字母「B」開頭的 PCollection
中所有名稱。 轉換 A 和 B 都有相同的輸入 PCollection
。
圖 2:分支管線。 兩個轉換套用至單個資料庫表格列的 PCollection。
以下範例程式碼會將兩個轉換套用至單個輸入集合。
PCollection<String> dbRowCollection = ...;
PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("A")){
c.output(c.element());
}
}
}));
PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("B")){
c.output(c.element());
}
}
}));
產生多個輸出的單一轉換
分支管線的另一種方法是透過使用標記輸出,讓單一轉換輸出至多個 PCollection
。 產生多個輸出的轉換會處理輸入的每個元素一次,並輸出到零或多個 PCollection
。
圖 3 說明了上述相同的範例,但有一個轉換會產生多個輸出。 以「A」開頭的名稱會新增至主要輸出 PCollection
,而以「B」開頭的名稱會新增至額外輸出 PCollection
。
圖 3:具有產生多個 PCollection 輸出的轉換的管線。
如果我們比較圖 2 和圖 3 中的管線,您可以看到它們以不同的方式執行相同的操作。 圖 2 中的管線包含兩個轉換,它們會處理相同輸入 PCollection
中的元素。 一個轉換使用以下邏輯
if (starts with 'A') { outputToPCollectionA }
而另一個轉換使用
if (starts with 'B') { outputToPCollectionB }
因為每個轉換都會讀取整個輸入的 PCollection
,因此輸入 PCollection
中的每個元素都會被處理兩次。
圖 3 中的管線以不同的方式執行相同的操作 - 只使用一個轉換,該轉換使用以下邏輯
if (starts with 'A') { outputToPCollectionA } else if (starts with 'B') { outputToPCollectionB }
其中輸入 PCollection
中的每個元素都只會被處理一次。
以下程式碼範例應用了一個轉換,該轉換會處理每個元素一次並輸出兩個集合。
// Define two TupleTags, one for each output.
final TupleTag<String> startsWithATag = new TupleTag<String>(){};
final TupleTag<String> startsWithBTag = new TupleTag<String>(){};
PCollectionTuple mixedCollection =
dbRowCollection.apply(ParDo
.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().startsWith("A")) {
// Emit to main output, which is the output with tag startsWithATag.
c.output(c.element());
} else if(c.element().startsWith("B")) {
// Emit to output with tag startsWithBTag.
c.output(startsWithBTag, c.element());
}
}
})
// Specify main output. In this example, it is the output
// with tag startsWithATag.
.withOutputTags(startsWithATag,
// Specify the output with tag startsWithBTag, as a TupleTagList.
TupleTagList.of(startsWithBTag)));
// Get subset of the output with tag startsWithATag.
mixedCollection.get(startsWithATag).apply(...);
// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);
您可以使用任一種機制來產生多個輸出的 PCollection
。如果將處理邏輯組合到一個 ParDo
中在邏輯上沒有意義,則建議使用第一種選項。但是,如果每個元素的轉換計算耗時,則使用第二種選項(產生多個輸出的單一轉換)更有意義,而且如果您計劃將來增加更多輸出類型,則更具擴展性。
合併 PCollection
通常,在您通過多個轉換將 PCollection
分支為多個 PCollection
之後,您會希望將部分或所有結果的 PCollection
合併回一起。您可以使用下列其中一種方法來做到這一點
- Flatten - 您可以使用 Beam SDK 中的
Flatten
轉換來合併多個相同類型的PCollection
。 - Join - 您可以使用 Beam SDK 中的
CoGroupByKey
轉換來在兩個PCollection
之間執行關聯式聯結。PCollection
必須是鍵控的(即它們必須是鍵/值對的集合),並且它們必須使用相同的鍵類型。
圖 4 中的範例是上面章節中圖 2 範例的延續。在分支為兩個 PCollection
後,一個包含以 'A' 開頭的名稱,另一個包含以 'B' 開頭的名稱,管線將兩者合併為一個單一的 PCollection
,現在其中包含所有以 'A' 或 'B' 開頭的名稱。在此處,使用 Flatten
是有意義的,因為要合併的 PCollection
都包含相同的類型。
圖 4:使用 Flatten 轉換將兩個集合合併為一個集合的管線。
以下程式碼範例應用 Flatten
來合併兩個集合。
//merge the two PCollections with Flatten
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
.apply(Flatten.<String>pCollections());
// continue with the new merged PCollection
mergedCollectionWithFlatten.apply(...);
多個來源
您的管線可以從一個或多個來源讀取其輸入。如果您的管線從多個來源讀取,並且這些來源的資料相關,則將輸入聯結在一起可能很有用。在以下圖 5 中說明的範例中,管線從資料庫表讀取名稱和地址,並從 Kafka 主題讀取名稱和訂單號。然後,管線使用 CoGroupByKey
來聯結此資訊,其中鍵是名稱;結果的 PCollection
包含名稱、地址和訂單的所有組合。
圖 5:執行兩個輸入集合關聯式聯結的管線。
以下程式碼範例應用 Join
來聯結兩個輸入集合。
PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);
PCollection<KV<String, String>> userOrder = pipeline.apply(KafkaIO.<String, String>read()...);
final TupleTag<String> addressTag = new TupleTag<String>();
final TupleTag<String> orderTag = new TupleTag<String>();
// Merge collection values into a CoGbkResult collection.
PCollection<KV<String, CoGbkResult>> joinedCollection =
KeyedPCollectionTuple.of(addressTag, userAddress)
.and(orderTag, userOrder)
.apply(CoGroupByKey.<String>create());
joinedCollection.apply(...);
下一步
上次更新於 2024/10/31
您找到您要尋找的所有內容了嗎?
這些內容都有用且清楚嗎?您有任何想要變更的內容嗎?請告訴我們!