設計您的管線

本頁面可協助您設計 Apache Beam 管線。 其中包含有關如何判斷管線結構、如何選擇要套用至資料的轉換,以及如何判斷輸入和輸出方法等資訊。

在閱讀本節之前,建議您先熟悉Beam 程式設計指南中的資訊。

設計管線時要考量的因素

在設計 Beam 管線時,請考慮幾個基本問題

基本管線

最簡單的管線代表操作的線性流程,如圖 1 所示。

A linear pipeline starts with one input collection, sequentially appliesthree transforms, and ends with one output collection.

圖 1:線性管線。

但是,您的管線可能會複雜得多。 管線代表步驟的有向非循環圖。 它可以有多個輸入來源、多個輸出接收器,並且其操作 (PTransform) 可以讀取和輸出多個 PCollection。 以下範例顯示您的管線可以採用的不同形狀。

分支 PCollection

重要的是要了解轉換不會耗用 PCollection;而是會考慮 PCollection 的每個個別元素,並建立新的 PCollection 作為輸出。 這樣,您就可以對同一個 PCollection 中的不同元素執行不同的操作。

多個轉換處理相同的 PCollection

您可以將相同的 PCollection 作為多個轉換的輸入,而無需耗用輸入或變更它。

圖 2 中的管線是分支管線。 該管線從資料庫表格讀取其輸入(表示為字串的名字),並建立表格列的 PCollection。 然後,該管線將多個轉換套用至相同PCollection。 轉換 A 會擷取以字母「A」開頭的 PCollection 中所有名稱,而轉換 B 會擷取以字母「B」開頭的 PCollection 中所有名稱。 轉換 A 和 B 都有相同的輸入 PCollection

The pipeline applies two transforms to a single input collection. Eachtransform produces an output collection.

圖 2:分支管線。 兩個轉換套用至單個資料庫表格列的 PCollection。

以下範例程式碼會將兩個轉換套用至單個輸入集合。

PCollection<String> dbRowCollection = ...;

PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    if(c.element().startsWith("A")){
      c.output(c.element());
    }
  }
}));

PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>(){
  @ProcessElement
  public void processElement(ProcessContext c) {
    if(c.element().startsWith("B")){
      c.output(c.element());
    }
  }
}));

產生多個輸出的單一轉換

分支管線的另一種方法是透過使用標記輸出,讓單一轉換輸出至多個 PCollection。 產生多個輸出的轉換會處理輸入的每個元素一次,並輸出到零或多個 PCollection

圖 3 說明了上述相同的範例,但有一個轉換會產生多個輸出。 以「A」開頭的名稱會新增至主要輸出 PCollection,而以「B」開頭的名稱會新增至額外輸出 PCollection

The pipeline applies one transform that produces multiple output collections.

圖 3:具有產生多個 PCollection 輸出的轉換的管線。

如果我們比較圖 2 和圖 3 中的管線,您可以看到它們以不同的方式執行相同的操作。 圖 2 中的管線包含兩個轉換,它們會處理相同輸入 PCollection 中的元素。 一個轉換使用以下邏輯

if (starts with 'A') { outputToPCollectionA }

而另一個轉換使用

if (starts with 'B') { outputToPCollectionB }

因為每個轉換都會讀取整個輸入的 PCollection,因此輸入 PCollection 中的每個元素都會被處理兩次。

圖 3 中的管線以不同的方式執行相同的操作 - 只使用一個轉換,該轉換使用以下邏輯

if (starts with 'A') { outputToPCollectionA } else if (starts with 'B') { outputToPCollectionB }

其中輸入 PCollection 中的每個元素都只會被處理一次。

以下程式碼範例應用了一個轉換,該轉換會處理每個元素一次並輸出兩個集合。

// Define two TupleTags, one for each output.
final TupleTag<String> startsWithATag = new TupleTag<String>(){};
final TupleTag<String> startsWithBTag = new TupleTag<String>(){};

PCollectionTuple mixedCollection =
    dbRowCollection.apply(ParDo
        .of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            if (c.element().startsWith("A")) {
              // Emit to main output, which is the output with tag startsWithATag.
              c.output(c.element());
            } else if(c.element().startsWith("B")) {
              // Emit to output with tag startsWithBTag.
              c.output(startsWithBTag, c.element());
            }
          }
        })
        // Specify main output. In this example, it is the output
        // with tag startsWithATag.
        .withOutputTags(startsWithATag,
        // Specify the output with tag startsWithBTag, as a TupleTagList.
                        TupleTagList.of(startsWithBTag)));

// Get subset of the output with tag startsWithATag.
mixedCollection.get(startsWithATag).apply(...);

// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);

您可以使用任一種機制來產生多個輸出的 PCollection。如果將處理邏輯組合到一個 ParDo 中在邏輯上沒有意義,則建議使用第一種選項。但是,如果每個元素的轉換計算耗時,則使用第二種選項(產生多個輸出的單一轉換)更有意義,而且如果您計劃將來增加更多輸出類型,則更具擴展性。

合併 PCollection

通常,在您通過多個轉換將 PCollection 分支為多個 PCollection 之後,您會希望將部分或所有結果的 PCollection 合併回一起。您可以使用下列其中一種方法來做到這一點

圖 4 中的範例是上面章節中圖 2 範例的延續。在分支為兩個 PCollection 後,一個包含以 'A' 開頭的名稱,另一個包含以 'B' 開頭的名稱,管線將兩者合併為一個單一的 PCollection,現在其中包含所有以 'A' 或 'B' 開頭的名稱。在此處,使用 Flatten 是有意義的,因為要合併的 PCollection 都包含相同的類型。

The pipeline merges two collections into one collection with the Flatten transform.

圖 4:使用 Flatten 轉換將兩個集合合併為一個集合的管線。

以下程式碼範例應用 Flatten 來合併兩個集合。

//merge the two PCollections with Flatten
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
    .apply(Flatten.<String>pCollections());

// continue with the new merged PCollection
mergedCollectionWithFlatten.apply(...);

多個來源

您的管線可以從一個或多個來源讀取其輸入。如果您的管線從多個來源讀取,並且這些來源的資料相關,則將輸入聯結在一起可能很有用。在以下圖 5 中說明的範例中,管線從資料庫表讀取名稱和地址,並從 Kafka 主題讀取名稱和訂單號。然後,管線使用 CoGroupByKey 來聯結此資訊,其中鍵是名稱;結果的 PCollection 包含名稱、地址和訂單的所有組合。

The pipeline joins two input collections into one collection with the Join transform.

圖 5:執行兩個輸入集合關聯式聯結的管線。

以下程式碼範例應用 Join 來聯結兩個輸入集合。

PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);

PCollection<KV<String, String>> userOrder = pipeline.apply(KafkaIO.<String, String>read()...);

final TupleTag<String> addressTag = new TupleTag<String>();
final TupleTag<String> orderTag = new TupleTag<String>();

// Merge collection values into a CoGbkResult collection.
PCollection<KV<String, CoGbkResult>> joinedCollection =
  KeyedPCollectionTuple.of(addressTag, userAddress)
                       .and(orderTag, userOrder)
                       .apply(CoGroupByKey.<String>create());

joinedCollection.apply(...);

下一步