Apache Beam Java SDK 快速入門

此快速入門會示範如何使用 Direct Runner 執行以 Apache Beam Java SDK 編寫的範例管道。Direct Runner 會在您的本機電腦上執行管道。

如果您有興趣貢獻 Apache Beam Java 程式碼庫,請參閱貢獻指南

本頁內容

設定您的開發環境

使用 sdkman 安裝 Java 開發套件 (JDK)。

# Install sdkman
curl -s "https://get.sdkman.io" | bash

# Install Java 17
sdk install java 17.0.5-tem

您可以使用 GradleApache Maven 來執行此快速入門

# Install Gradle
sdk install gradle

# Install Maven
sdk install maven

複製 GitHub 儲存庫

複製或下載 apache/beam-starter-java GitHub 儲存庫,並變更到 beam-starter-java 目錄。

git clone https://github.com/apache/beam-starter-java.git
cd beam-starter-java

執行快速入門

Gradle:若要使用 Gradle 執行快速入門,請執行下列命令

gradle run --args='--inputText=Greetings'

Maven:若要使用 Maven 執行快速入門,請執行下列命令

mvn compile exec:java -Dexec.args=--inputText='Greetings'

輸出類似以下

Hello
World!
Greetings

這些行可能會以不同的順序出現。

探索程式碼

此快速入門的主要程式碼檔案是 App.java (GitHub)。程式碼會執行下列步驟

  1. 建立 Beam 管道。
  2. 建立初始 PCollection
  3. 將轉換套用到 PCollection
  4. 使用 Direct Runner 執行管道。

建立管道

程式碼首先會建立 Pipeline 物件。Pipeline 物件會建立要執行的轉換圖。

var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
var pipeline = Pipeline.create(options);

PipelineOptions 物件可讓您設定管道的各種選項。此範例中顯示的 fromArgs 方法會剖析命令列引數,讓您透過命令列設定管道選項。

建立初始 PCollection

PCollection 抽象化表示可能分散式、多元素的資料集。Beam 管道需要資料來源來填入初始 PCollection。來源可以是有限的 (具有已知的固定大小) 或無限的 (具有無限大小)。

此範例使用 Create.of 方法,從記憶體中的字串陣列建立 PCollection。產生的 PCollection 包含字串「Hello」、「World!」和使用者提供的輸入字串。

return pipeline
  .apply("Create elements", Create.of(Arrays.asList("Hello", "World!", inputText)))

將轉換套用到 PCollection

轉換可以變更、篩選、分組、分析或以其他方式處理 PCollection 中的元素。此範例使用 MapElements 轉換,將集合的元素對應到新的集合

.apply("Print elements",
    MapElements.into(TypeDescriptors.strings()).via(x -> {
      System.out.println(x);
      return x;
    }));

其中

在此範例中,對應函式是一個 lambda,只會傳回原始值。它也會將值列印到 System.out 作為副作用。

執行管道

先前章節中顯示的程式碼會定義一個管道,但尚未處理任何資料。若要處理資料,您需要執行管道

pipeline.run().waitUntilFinish();

Beam 執行器會在特定平台上執行 Beam 管道。此範例使用 Direct Runner,如果您未指定執行器,它會是預設執行器。Direct Runner 會在本機電腦上執行管道。它是為了測試和開發而設計,而不是為了效率而最佳化。如需更多資訊,請參閱使用 Direct Runner

對於生產工作負載,您通常會使用分散式執行器,在 Apache Flink、Apache Spark 或 Google Cloud Dataflow 等巨量資料處理系統上執行管道。這些系統支援大規模並行處理。

下一步

如果您遇到任何問題,請隨時聯絡我們