概述

Hazelcast Jet 執行器可用於使用 Hazelcast Jet 執行 Beam 管線。

Jet 執行器和 Jet 適用於大型連續作業,並提供

請務必注意,Jet 執行器目前處於實驗性狀態,無法利用 Jet 中的許多功能

Beam 能力矩陣記錄了 Jet 執行器支援的功能。

使用 Hazelcast Jet 執行器執行 WordCount

產生 Beam 範例專案

只需按照Java 快速入門頁面的指示進行操作

在本地 Jet 集群上執行 WordCount

在 Beam 範例專案中發出以下命令,以啟動新的 Jet 集群並在其上執行 WordCount 範例。

    $ mvn package exec:java \
        -DskipTests \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="\
            --runner=JetRunner \
            --jetLocalMode=3 \
            --inputFile=pom.xml \
            --output=counts" \
        -Pjet-runner

在遠端 Jet 集群上執行 WordCount

當從原型產生時,Beam 範例專案來自特定的已發佈 Beam 版本(這就是 archetypeVersion 屬性的用途)。每個包含 Jet 執行器的 Beam 版本(即從 2.14.0 開始)都使用特定版本的 Jet。因此,當我們啟動獨立的 Jet 集群並嘗試在其上執行 Beam 範例時,需要確保兩者相容。請參閱下表,了解各個 Beam 版本建議使用的 Jet 版本。

Beam 版本相容的 Jet 版本
2.20.0 或更新版本4.x
2.14.0 - 2.19.03.x
2.13.0 或更舊版本不適用

Hazelcast Jet 網站下載與您使用的 Beam 相容的最新 Hazelcast Jet 版本。

下載完成後,您需要啟動 Jet 集群。最簡單的方法是使用下載的 Jet 發行版本隨附的 jet-start 腳本啟動 Jet 集群成員。成員使用自動探索功能 自動探索功能來形成集群。讓我們啟動由兩個成員組成的集群

$ cd hazelcast-jet
$ bin/jet-start.sh &
$ bin/jet-start.sh &
$ cd hazelcast-jet
$ bin/jet-start &
$ bin/jet-start &

檢查集群是否已啟動並執行

$ bin/jet.sh cluster
$ bin/jet cluster

您應該會看到類似的內容

State: ACTIVE
Version: 3.0
Size: 2

ADDRESS                  UUID
[192.168.0.117]:5701     76bea7ba-f032-4c25-ad04-bdef6782f481
[192.168.0.117]:5702     03ecfaa2-be16-41b6-b5cf-eea584d7fb86
State: ACTIVE
Version: 4.0
Size: 2

ADDRESS                  UUID
[192.168.0.117]:5701     b9937bba-32aa-48ba-8e32-423aafed763b
[192.168.0.117]:5702     dfeadfb2-3ba5-4d1c-95e7-71a1a3ca4937

將目錄變更為 Beam 範例專案,並發出以下命令以提交並在遠端 Jet 集群上執行您的管線。請務必將輸入檔案(包含要計數的單字)分發到集群執行的所有機器。否則,單字計數作業將無法讀取資料。

    $ mvn package exec:java \
        -DskipTests \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="\
            --runner=JetRunner \
            --jetServers=192.168.0.117:5701,192.168.0.117:5702 \
            --codeJarPathname=target/word-count-beam-bundled-0.1.jar \
            --inputFile=<INPUT_FILE_AVAILABLE_ON_ALL_CLUSTER_MEMBERS> \
            --output=/tmp/counts" \
        -Pjet-runner

Jet 執行器的管線選項

欄位描述預設值
runner要使用的管線執行器。此選項可讓您在執行階段決定管線執行器。設定為 JetRunner 以使用 Jet 執行。
jetGroupNamejetClusterName要加入的 Hazelcast 群組名稱,本質上是執行器將使用的 Jet 集群 ID。透過群組,可以建立多個集群,每個集群都有自己的群組,並且不會干擾其他集群。 執行器將使用的 Hazelcast 集群名稱。jet
jetServersJet 集群成員的位址清單,當執行器不啟動自己的 Jet 集群,而是使用外部、獨立啟動的集群時需要。格式為逗號分隔的 IP/主機名稱-埠對清單,例如:192.168.0.117:5701,192.168.0.117:5702127.0.0.1:5701
codeJarPathname只有在使用外部 Jet 集群時才需要的屬性,指定包含需要在集群上執行的所有程式碼(因此至少是管線和執行器程式碼)的胖 JAR 的位置。值是 new java.io.File() 接受為參數的任何字串。沒有預設值。
jetLocalMode執行器應在本地啟動的 Jet 集群成員數。如果為 0,則執行器將使用外部集群。如果大於 0,則執行器將使用自己啟動的集群。0
jetDefaultParallelismJet 成員的本機平行處理,即將在每個 Jet 集群成員上建立的 DAG 的每個頂點的處理器數。2
jetProcessorsCooperative布林值旗標,指定是否允許 DoFn 的 Jet 處理器具有協作性(即使用綠色執行緒而不是專用的 OS 執行緒)。如果設定為 true,則除了沒有輸出的處理器(因此假設它們是同步器)之外,所有此類處理器都將是協作的。false