Java 的 WordCount 快速入門

此快速入門將示範如何設定 Java 開發環境,並使用您選擇的執行器,執行使用 Apache Beam Java SDK 撰寫的範例管道

如果您有興趣貢獻 Apache Beam Java 程式碼庫,請參閱貢獻指南

本頁內容

設定您的開發環境

  1. 下載並安裝 Java 開發套件 (JDK) 8、11 或 17 版。確認已設定 JAVA_HOME 環境變數,並且指向您的 JDK 安裝位置。
  2. 依照您的作業系統的安裝指南下載並安裝 Apache Maven
  3. 選用:如果您想要將您的 Maven 專案轉換為 Gradle,請安裝 Gradle

取得範例程式碼

  1. 產生一個 Maven 範例專案,該專案會針對最新的 Beam 版本建置

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=2.60.0 \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
       
    mvn archetype:generate `
      -D archetypeGroupId=org.apache.beam `
      -D archetypeArtifactId=beam-sdks-java-maven-archetypes-examples `
      -D archetypeVersion=2.60.0 `
      -D groupId=org.example `
      -D artifactId=word-count-beam `
      -D version="0.1" `
      -D package=org.apache.beam.examples `
      -D interactiveMode=false
       

    Maven 會在 word-count-beam 目錄中建立一個新的專案。

  2. 變更為 word-count-beam

    cd word-count-beam/
       
    cd .\word-count-beam
       
    此目錄包含一個 pom.xml 和一個包含範例管道的 src 目錄。

  3. 列出範例管道

    ls src/main/java/org/apache/beam/examples/
       
    dir .\src\main\java\org\apache\beam\examples
       
    您應該會看到以下範例

    本教學課程中使用的範例 WordCount.java 定義了一個 Beam 管道,該管道會計算輸入檔案中的字詞(預設為包含莎士比亞「李爾王」的 .txt 檔案)。若要深入了解範例,請參閱WordCount 範例逐步解說

選用:從 Maven 轉換為 Gradle

以下步驟說明如何將建置從 Maven 轉換為 Gradle,以用於下列執行器

其他執行器的轉換過程類似。如需其他指引,請參閱從 Apache Maven 遷移建置

  1. 在包含 pom.xml 檔案的目錄中,執行自動化的 Maven 到 Gradle 轉換
    gradle init
       
    系統會詢問您是否要產生 Gradle 建置。輸入 yes。系統也會提示您選擇 DSL (Groovy 或 Kotlin)。在本教學課程中,請輸入 2 以選擇 Kotlin。
  2. 開啟產生的 build.gradle.kts 檔案,並進行下列變更
    1. repositories 中,將 mavenLocal() 取代為 mavenCentral()
    2. repositories 中,為 Confluent Kafka 相依性宣告一個儲存庫
      maven {
          url = uri("https://packages.confluent.io/maven/")
      }
            
    3. 在建置指令碼的結尾,新增下列條件相依性
      if (project.hasProperty("dataflow-runner")) {
          dependencies {
              runtimeOnly("org.apache.beam:beam-runners-google-cloud-dataflow-java:2.60.0")
          }
      }
            
    4. 在建置指令碼的結尾,新增下列工作
      tasks.register<JavaExec>("execute") {
        mainClass.set(System.getProperty("mainClass"))
        classpath = sourceSets.main.get().runtimeClasspath
      }
            
  3. 建置您的專案
    gradle build
       

取得範例文字

如果您計劃使用 DataflowRunner,您可以略過此步驟。執行器會直接從 Google Cloud Storage 中提取文字。

  1. word-count-beam 目錄中,建立一個名為 sample.txt 的檔案。
  2. 將一些文字新增至該檔案。在此範例中,使用莎士比亞李爾王的文字。

執行管道

單一 Beam 管道可以在多個 Beam 執行器上執行。DirectRunner 非常適合入門,因為它會在您的機器上執行,而且不需要特定的設定。如果您只是在試用 Beam,而且不確定要使用哪一個執行器,請使用DirectRunner

執行管道的一般流程如下

  1. 完成任何執行器特定的設定。
  2. 建置您的命令列
    1. 使用 --runner=<執行器> 指定執行器(預設為 DirectRunner)。
    2. 新增任何執行器特定的必要選項。
    3. 選擇執行器可以存取的輸入檔案和輸出位置。(例如,如果您在外部叢集上執行管道,您就無法存取本機檔案。)
  3. 執行命令。

若要執行 WordCount 管道

  1. 請依照您執行器的設定步驟

    DirectRunner 將在無需額外設定的情況下運作。

  2. 執行下方對應的 Maven 或 Gradle 命令。

使用 Maven 執行 WordCount

針對 Unix Shell

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--inputFile=sample.txt --output=counts" -Pdirect-runner
mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                 --inputFile=sample.txt --output=/tmp/counts" -Pflink-runner
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--runner=SparkRunner --inputFile=sample.txt --output=counts" -Pspark-runner
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--runner=DataflowRunner --project=<your-gcp-project> \
                 --region=<your-gcp-region> \
                 --gcpTempLocation=gs://<your-gcs-bucket>/tmp \
                 --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
    -Pdataflow-runner
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--inputFile=sample.txt --output=/tmp/counts --runner=SamzaRunner" -Psamza-runner
mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
    --runner=NemoRunner --inputFile=`pwd`/sample.txt --output=counts
mvn package -Pjet-runner
java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
    --runner=JetRunner --jetLocalMode=3 --inputFile=`pwd`/sample.txt --output=counts

針對 Windows PowerShell

mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.WordCount `
 -D exec.args="--inputFile=sample.txt --output=counts" -P direct-runner
mvn package exec:java -D exec.mainClass=org.apache.beam.examples.WordCount `
 -D exec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=.\target\word-count-beam-bundled-0.1.jar `
               --inputFile=C:\path\to\quickstart\sample.txt --output=C:\tmp\counts" -P flink-runner
mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.WordCount `
 -D exec.args="--runner=SparkRunner --inputFile=sample.txt --output=counts" -P spark-runner
mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.WordCount `
 -D exec.args="--runner=DataflowRunner --project=<your-gcp-project> `
               --region=<your-gcp-region> \
               --gcpTempLocation=gs://<your-gcs-bucket>/tmp `
               --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" `
 -P dataflow-runner
mvn compile exec:java -D exec.mainClass=org.apache.beam.examples.WordCount `
    -D exec.args="--inputFile=sample.txt --output=/tmp/counts --runner=SamzaRunner" -P samza-runner
mvn package -P nemo-runner -DskipTests
java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount `
    --runner=NemoRunner --inputFile=`pwd`/sample.txt --output=counts
mvn package -P jet-runner
java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount `
    --runner=JetRunner --jetLocalMode=3 --inputFile=$pwd/sample.txt --output=counts

使用 Gradle 執行 WordCount

針對 Unix Shell

gradle clean execute -DmainClass=org.apache.beam.examples.WordCount \
    --args="--inputFile=sample.txt --output=counts"
TODO: document FlinkCluster on Gradle: https://github.com/apache/beam/issues/21499
TODO: document Spark on Gradle: https://github.com/apache/beam/issues/21502
gradle clean execute -DmainClass=org.apache.beam.examples.WordCount \
    --args="--project=<your-gcp-project> --inputFile=gs://apache-beam-samples/shakespeare/* \
    --output=gs://<your-gcs-bucket>/counts --runner=DataflowRunner" -Pdataflow-runner
TODO: document Samza on Gradle: https://github.com/apache/beam/issues/21500
TODO: document Nemo on Gradle: https://github.com/apache/beam/issues/21503
TODO: document Jet on Gradle: https://github.com/apache/beam/issues/21501

檢查結果

管道完成後,您可以檢視輸出。可能會有多個以 count 為字首的輸出檔案。輸出檔案的數量由執行器決定,讓執行器可以彈性地執行有效率的分散式執行。

  1. 在 Unix Shell 中檢視輸出檔案
    ls counts*
       
    ls /tmp/counts*
       
    ls counts*
       
    gsutil ls gs://<your-gcs-bucket>/counts*
       
    ls /tmp/counts*
       
    ls counts*
       
    ls counts*
       
    輸出檔案包含獨特的字詞和每個字詞的出現次數。
  2. 在 Unix Shell 中檢視輸出內容
    more counts*
       
    more /tmp/counts*
       
    more counts*
       
    gsutil cat gs://<your-gcs-bucket>/counts*
       
    more /tmp/counts*
       
    more counts*
       
    more counts*
       
    為了讓執行器能夠最佳化效率,無法保證元素的順序。但輸出應該會如下所示
    ...
    Think: 3
    slower: 1
    Having: 1
    revives: 1
    these: 33
    wipe: 1
    arrives: 1
    concluded: 1
    begins: 3
    ...
    

下一步

如果您遇到任何問題,請隨時與我們聯絡