使用 Google Cloud Dataflow 執行器
- Java SDK
- Python SDK
Google Cloud Dataflow 執行器使用Cloud Dataflow 受管理服務。當您使用 Cloud Dataflow 服務執行管線時,執行器會將您的可執行程式碼和相依性上傳到 Google Cloud Storage 儲存空間,並建立 Cloud Dataflow 工作,該工作會在 Google Cloud Platform 中的受管理資源上執行您的管線。
Cloud Dataflow 執行器和服務適用於大規模、連續的工作,並提供
Beam 功能矩陣記錄了 Cloud Dataflow 執行器支援的功能。
Cloud Dataflow 執行器的先決條件與設定
若要使用 Cloud Dataflow 執行器,您必須完成Cloud Dataflow 快速入門中「開始之前」章節中為您選擇的語言設定。
- 選取或建立 Google Cloud Platform 主控台專案。
- 為您的專案啟用計費功能。
- 啟用必要的 Google Cloud API:Cloud Dataflow、Compute Engine、Stackdriver Logging、Cloud Storage、Cloud Storage JSON 和 Cloud Resource Manager。如果您在管線程式碼中使用其他 API(例如 BigQuery、Cloud Pub/Sub 或 Cloud Datastore),您可能需要啟用這些 API。
- 使用 Google Cloud Platform 進行驗證。
- 安裝 Google Cloud SDK。
- 建立 Cloud Storage 儲存空間。
指定您的相依性
使用 Java 時,您必須在 pom.xml
中指定您對 Cloud Dataflow 執行器的相依性。
本節不適用於適用於 Python 的 Beam SDK。
自行執行的 JAR
本節不適用於適用於 Python 的 Beam SDK。
在某些情況下,例如使用排程器(例如 Apache AirFlow)啟動管線時,您必須擁有獨立的應用程式。除了加入先前章節中顯示的現有相依性之外,您還可以透過在 pom.xml 的 Project 章節中明確加入下列相依性來封裝自行執行的 JAR。
然後,在 Maven JAR 外掛程式中加入 mainClass 名稱。
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin.version}</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>YOUR_MAIN_CLASS_NAME</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
執行 mvn package -Pdataflow-runner
後,執行 ls target
,您應該會看到下列輸出(假設您的 artifactId 為 beam-examples
,版本為 1.0.0)。
若要在 Cloud Dataflow 上執行自行執行的 JAR,請使用下列命令。
Cloud Dataflow 執行器的管線選項
使用 Cloud Dataflow 執行器 (Java) 執行管線時,請考慮這些常見的管線選項。 使用 Cloud Dataflow 執行器 (Python) 執行管線時,請考慮這些常見的管線選項。
欄位 | 說明 | 預設值 |
---|---|---|
runner | 要使用的管線執行器。此選項可讓您在執行階段判斷管線執行器。 | 設定為 dataflow 或 DataflowRunner 以在 Cloud Dataflow 服務上執行。 |
project | 您的 Google Cloud 專案的專案 ID。 | 如果未設定,則預設為目前環境中的預設專案。預設專案是透過 gcloud 設定。 |
region | 要建立工作的 Google Compute Engine 區域。 | 如果未設定,則預設為目前環境中的預設區域。預設區域是透過 gcloud 設定。 |
streaming | 是否啟用或停用串流模式;如果啟用則為 true 。如果執行具有無界限 PCollection 的管線,則設定為 true 。 | false |
tempLocation temp_location | 選用。 必要。暫存檔案的路徑。必須是以 gs:// 開頭的有效 Google Cloud Storage URL。如果已設定,則 tempLocation 會用作 gcpTempLocation 的預設值。 | 無預設值。 |
gcpTempLocation | 暫存檔案的 Cloud Storage 儲存空間路徑。必須是以 gs:// 開頭的有效 Cloud Storage URL。 | 如果未設定,則預設為 tempLocation 的值,前提是 tempLocation 是有效的 Cloud Storage URL。如果 tempLocation 不是有效的 Cloud Storage URL,您必須設定 gcpTempLocation 。 |
stagingLocation staging_location | 選用。用於暫存二進位檔案和任何暫存檔案的 Cloud Storage 儲存空間路徑。必須是以 gs:// 開頭的有效 Cloud Storage URL。 | 如果未設定,則預設為 gcpTempLocation 內的暫存目錄。 如果未設定,則預設為 temp_location 內的暫存目錄。 |
save_main_session | 儲存主要工作階段狀態,以便可以還原 __main__ (例如,互動式工作階段)中定義的 pickle 函式和類別。某些工作流程不需要工作階段狀態,例如,如果它們的所有函式/類別都在正確的模組(而非 __main__ )中定義,而且這些模組可以在工作者中匯入。 | false |
sdk_location | 覆寫從中下載 Beam SDK 的預設位置。此值可以是 URL、Cloud Storage 路徑,或是 SDK tarball 的本機路徑。工作流程提交會從此位置下載或複製 SDK tarball。如果設定為字串 default ,則會使用標準 SDK 位置。如果為空白,則不會複製任何 SDK。 | default |
如需其他管線組態選項,請參閱 DataflowPipelineOptions PipelineOptions
介面(和任何子介面)的參考文件。
額外資訊與注意事項
監控您的工作
當您的管線執行時,您可以使用Dataflow 監控介面或Dataflow 命令列介面,監控工作的進度、檢視執行詳細資料,並接收管線結果的更新。
封鎖執行
若要封鎖直到您的工作完成,請在從 pipeline.run()
傳回的 PipelineResult
上呼叫 waitToFinish
wait_until_finish
。Cloud Dataflow 執行器在等待時會列印工作狀態更新和主控台訊息。雖然結果已連線到現有工作,但請注意,從命令列按下 Ctrl+C 不會取消您的工作。若要取消工作,您可以使用Dataflow 監控介面或Dataflow 命令列介面。
串流執行
如果您的管線使用無界限的資料來源或接收器,您必須將 streaming
選項設定為 true
。
使用串流執行時,請記住下列考量事項。
串流管線不會終止,除非使用者明確取消。您可以從Dataflow 監控介面或使用Dataflow 命令列介面(gcloud dataflow jobs cancel 命令)取消您的串流工作。
串流工作預設使用
n1-standard-2
或更高的 Google Compute Engine 機器類型。您不得覆寫此設定,因為n1-standard-2
是執行串流工作所需的最低機器類型。串流執行的價格與批次執行不同。
上次更新於 2024/10/31
您是否已找到您要尋找的所有內容?
這些內容是否都有用且清楚?是否有任何您想要變更的地方?請告訴我們!