使用 Direct Runner

Direct Runner 在您的機器上執行管道,其設計目的是盡可能驗證管道是否符合 Apache Beam 模型。 Direct Runner 不專注於有效率的管道執行,而是執行額外的檢查,以確保使用者不會依賴模型不保證的語意。 其中一些檢查包括

使用 Direct Runner 進行測試和開發有助於確保管道在不同的 Beam 執行器上都具有穩健性。 此外,當管道在遠端叢集上執行時,除錯失敗的執行可能是一項不小的任務。 相反,對管道程式碼執行本機單元測試通常更快且更簡單。 在本機對管道進行單元測試也允許您使用偏好的本機除錯工具。

以下是一些關於如何測試管道的資源。

Direct Runner 並非設計用於生產管道,因為它是針對正確性而非效能進行最佳化。 Direct Runner 必須將所有使用者資料放入記憶體中,而 Flink 和 Spark 執行器如果記憶體不足,則可以將資料溢出到磁碟。 因此,Flink 和 Spark 執行器能夠執行較大的管道,並且更適合生產工作負載。

Direct Runner 的先決條件和設定

指定您的依賴項

使用 Java 時,您必須在 pom.xml 中指定您對 Direct Runner 的依賴項。

<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-direct-java</artifactId>
   <version>2.60.0</version>
   <scope>runtime</scope>
</dependency>

本節不適用於 Beam SDK for Python。

Direct Runner 的管道選項

如需有關如何設定管道選項的常規說明,請參閱程式設計指南

從命令列執行管道時,請將 runner 設定為 directDirectRunner。 其他管道選項的預設值通常就足夠了。

請參閱 DirectOptions DirectOptions 介面的參考文件,以了解預設值和其他管道組態選項。

其他資訊和注意事項

記憶體考量

本機執行受本機環境中可用記憶體的限制。 強烈建議您使用足以放入本機記憶體的小型資料集執行管道。 您可以使用 CreateCreate 轉換來建立小型記憶體資料集,或者您可以使用 ReadRead 轉換來處理小型本機或遠端檔案。

串流執行

Python DirectRunner 的串流支援有限。 如需已知問題,請參閱:https://github.com/apache/beam/issues/24528

如果您的管道使用無界資料來源或接收器,則必須將 streaming 選項設定為 true

平行執行

Python FnApiRunner 支援多執行緒和多處理模式。

設定平行度

工作執行緒的數量由 targetParallelism 管道選項定義。 預設情況下,targetParallelism 是可用處理器數量和 3 中較大者。

執行緒或子程序數量由設定 direct_num_workers 管道選項定義。 從 2.22.0 開始,支援 direct_num_workers = 0。 當 direct_num_workers 設定為 0 時,它會將執行緒/子程序的數量設定為執行管道的機器核心數。

設定執行模式

在 Beam 2.19.0 和更新版本中,您可以使用 direct_running_mode 管道選項來設定執行模式。 direct_running_mode 可以是 ['in_memory''multi_threading''multi_processing'] 之一。

in_memory:執行器和工作者的通訊在記憶體中進行 (不是透過 gRPC)。 這是預設模式。

multi_threading:執行器和工作者透過 gRPC 進行通訊,每個工作者都在一個執行緒中執行。

multi_processing:執行器和工作者透過 gRPC 進行通訊,每個工作者都在一個子程序中執行。

在將管道部署到遠端執行器之前

雖然在 direct runner 上進行測試很方便,但它仍然可能與遠端執行器的行為不同,超出 Beam 模型語意範圍,特別是針對執行階段環境相關的問題。 一般而言,建議在完全部署到生產環境之前,先在目標遠端執行器上以小規模測試您的管道。