建立您的管線

您的 Beam 程式會表達從頭到尾的資料處理管線。本節說明如何使用 Beam SDK 中的類別來建立管線。若要使用 Beam SDK 中的類別建構管線,您的程式需要執行下列一般步驟

建立您的管線物件

Beam 程式通常會從建立 Pipeline 物件開始。

在 Beam SDK 中,每個管線都以 Pipeline 類型的明確物件表示。每個 Pipeline 物件都是一個獨立的實體,其中封裝了管線操作的資料以及套用至該資料的轉換。

若要建立管線,請宣告 Pipeline 物件,並傳遞一些組態選項

// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();

// Then create the pipeline.
Pipeline p = Pipeline.create(options);

將資料讀取到您的管線中

若要建立管線的初始 PCollection,您需要將根轉換套用到您的管線物件。根轉換會從外部資料來源或您指定的某些本機資料建立 PCollection

Beam SDK 中有兩種根轉換:ReadCreateRead 轉換會從外部來源讀取資料,例如文字檔或資料庫表格。Create 轉換會從記憶體中的 java.util.Collection 建立 PCollection

下列範例程式碼示範如何apply TextIO.Read 根轉換,以從文字檔讀取資料。該轉換會套用到 Pipeline 物件 p,並以 PCollection<String> 的形式傳回管線資料集

PCollection<String> lines = p.apply(
  "ReadLines", TextIO.read().from("gs://some/inputData.txt"));

套用轉換來處理管線資料

您可以使用 Beam SDK 中提供的各種轉換來操作您的資料。若要執行此操作,請將轉換套用到管線的 PCollection,方法是在您想要處理的每個 PCollection 上呼叫 apply 方法,並將所需的轉換物件當作引數傳遞。

下列程式碼示範如何將轉換apply到字串的 PCollection。該轉換是使用者定義的自訂轉換,會反轉每個字串的內容,並輸出包含反轉字串的新 PCollection

輸入是一個名為 wordsPCollection<String>;程式碼會將名為 ReverseWordsPTransform 物件的執行個體傳遞給 apply,並將傳回值儲存為名為 reversedWordsPCollection<String>

PCollection<String> words = ...;

PCollection<String> reversedWords = words.apply(new ReverseWords());

寫入或輸出最終的管線資料

一旦您的管線套用了其所有轉換,您通常需要輸出結果。若要輸出管線的最終 PCollection,您需要將 Write 轉換套用到該 PCollectionWrite 轉換可以將 PCollection 的元素輸出到外部資料接收器,例如資料庫表格。您可以在管線中的任何時間使用 Write 來輸出 PCollection,但您通常會在管線的末尾寫出資料。

下列範例程式碼示範如何apply TextIO.Write 轉換,以將 StringPCollection 寫入文字檔

PCollection<String> filteredWords = ...;

filteredWords.apply("WriteMyFile", TextIO.write().to("gs://some/outputData.txt"));

執行您的管線

建構管線後,請使用 run 方法來執行管線。管線會非同步執行:您建立的程式會將管線的規格傳送至管線執行器,然後該執行器會建構並執行實際的管線操作序列。

p.run();

run 方法是非同步的。如果您希望改為進行封鎖執行,請在執行管線時附加 waitUntilFinish 方法

p.run().waitUntilFinish();

下一步是什麼