使用 Apache Samza 執行器
Apache Samza 執行器可用於使用 Apache Samza 執行 Beam 管道。Samza 執行器在 Samza 應用程式中執行 Beam 管道,並可以在本地執行。該應用程式可以進一步建置為 .tgz 檔案,並部署到 YARN 叢集或具有 Zookeeper 的 Samza 獨立叢集。
Samza 執行器和 Samza 適用於大型、有狀態的串流作業,並提供
- 對本地狀態的一流支援(使用 RocksDB 儲存)。這允許為高頻率串流作業快速存取狀態。
- 具有增量狀態檢查點而非完整快照的容錯能力。這使 Samza 能夠擴展到具有非常大狀態的應用程式。
- 一個完全非同步的處理引擎,可提高遠程呼叫的效率。
- 彈性的部署模型,可在任何具有 Zookeeper 的託管環境中執行應用程式。
- 諸如 Canary、升級和回滾等功能,可支援具有最小停機時間的極大型部署。
Beam 功能矩陣記錄了 Samza 執行器目前支援的功能。
Samza 執行器的先決條件和設定
Samza 執行器建基於 1.0 以上的 Samza 版本。
指定您的依賴項
您可以透過將以下內容新增至您的 pom.xml
來指定對 Samza 執行器的依賴項
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-samza</artifactId>
<version>2.60.0</version>
<scope>runtime</scope>
</dependency>
<!-- Samza dependencies -->
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-api</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-core_2.11</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kafka_2.11</artifactId>
<version>${samza.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv_2.11</artifactId>
<version>${samza.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv-rocksdb_2.11</artifactId>
<version>${samza.version}</version>
<scope>runtime</scope>
</dependency>
使用 Samza 執行器執行管道
如果您在本機執行您的管道或將其部署到具有所有 jar 和資源檔案的獨立叢集,則無需封裝。例如,以下命令執行 WordCount 範例
$ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Psamza-runner \
-Dexec.args="--runner=SamzaRunner \
--inputFile=/path/to/input \
--output=/path/to/counts"
要將您的管道部署到 YARN 叢集,請參閱部署範例 Samza 作業的說明。首先,您需要將您的應用程式 jar 和資源檔案封裝到 .tgz
封存檔案中,並使其可供 Yarn 容器下載。在您的設定中,您需要指定此 TGZ 檔案位置的 URI
yarn.package.path=${your_job_tgz_URI}
job.name=${your_job_name}
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.coordinator.system=${job_coordinator_system}
job.default.system=${job_default_system}
有關設定的更多詳細資訊,請參閱Samza 設定參考。
設定檔將透過設定命令列參數 --configFilePath=/path/to/config.properties
傳入。如此一來,您可以在 Yarn 資源管理器中執行 Beam 管道的主類別,而 Samza 執行器將在後端提交一個 Yarn 作業。
請查看我們在 Github 上的 Samza Beam 範例
Samza 執行器的管道選項
使用 Samza 執行器執行管道時,您可以使用以下管道選項。
欄位 | 描述 | 預設值 |
---|---|---|
runner | 要使用的管道執行器。此選項可讓您在執行時確定管道執行器。 | 設定為 SamzaRunner 以使用 Samza 執行。 |
configFilePath | 使用屬性檔案設定 Samza。 | empty ,即使用本地執行。 |
configFactory | 從設定檔路徑讀取設定檔的工廠。 | PropertiesConfigFactory ,將設定檔讀取為屬性檔案。 |
configOverride | 以程式方式設定的設定檔覆寫。 | empty ,即使用設定檔或本地執行。 |
jobInstance | 作業的實例名稱。 | 1 |
samzaExecutionEnvironment | Samza 應用程式執行環境。如需更多詳細資訊,請參閱 SamzaExecutionEnvironment 。 | LOCAL |
watermarkInterval | 檢查浮水印的間隔(以毫秒為單位)。 | 1000 |
systemBufferSize | 給定系統要緩衝的最大訊息數。 | 5000 |
eventTimerBufferSize | 在 PTransform 的記憶體中緩衝的最大事件時間計時器數。 | 5000 |
maxSourceParallelism | 任何資料來源允許的最大平行度。 | 1 |
storeBatchGetSize | 狀態儲存的批次取得大小限制。 | 10000 |
enableMetrics | 在 Samza 執行器中啟用/停用 Beam 指標。 | true |
stateDurable | 狀態持久化的設定。 | false |
maxBundleSize | 一個 Bundle 中的最大元素數。 | 1 (預設情況下,自動綁定已停用) |
maxBundleTimeMs | 在完成一個 Bundle 之前要等待的最長時間(以毫秒為單位)。 | 1000 |
監控您的作業
您可以使用從 Beam 和 Samza 發出的指標來監控您的管道作業,例如 Beam 來源指標(如 elements_read
和 backlog_elements
)以及 Samza 作業指標(如 job-healthy
和 process-envelopes
)。完整的 Samza 指標清單位於 Samza 指標參考中。您可以在開發中使用 JMX 來檢視您作業的指標,並將指標傳送到繪圖系統(如 Graphite)。有關更多詳細資訊,請參閱Samza 指標。
對於正在執行的 Samza YARN 作業,您可以使用 YARN Web UI 來監控作業狀態並檢查日誌。
上次更新時間為 2024/10/31
您找到您要找的所有內容了嗎?
所有內容都有用且清楚嗎?有任何您想要變更的內容嗎?請告訴我們!