Java 多語言管線快速入門
此頁面提供使用 Apache Beam SDK for Java 建立多語言管線的高階概述。如需更完整的討論,請參閱多語言管線。
多語言管線是一種以一種 Beam SDK 語言建置,並使用來自另一種 Beam SDK 語言的一或多個轉換的管線。這些來自另一個 SDK 的轉換稱為跨語言轉換。多語言支援讓管線元件更容易在 Beam SDK 之間共用,並增加所有 SDK 可用的轉換池。
在以下範例中,多語言管線是使用 Beam Java SDK 建置的,而跨語言轉換是使用 Beam Python SDK 建置的。
先決條件
此快速入門基於 Java 範例管線 PythonDataframeWordCount,用於計算莎士比亞文本中的單字。如果您想執行該管線,可以複製或下載 Beam 儲存庫,並從原始碼建置範例。
若要建置和執行範例,您需要安裝 Beam Java SDK 2.41.0 或更高版本的 Java 環境,以及 Python 環境。如果您尚未設定這些環境,請先完成Apache Beam Java SDK 快速入門和Apache Beam Python SDK 快速入門。
若要使用可攜式 DirectRunner 執行,您需要在本機安裝 Docker,且 Docker 精靈應該正在執行。Dataflow 不需要此步驟。
若要在 Dataflow 上執行,您需要啟用計費的 Google Cloud 專案和Google Cloud Storage 儲存貯體。
此範例依賴 Python pandas 套件 1.4.0 或更高版本,該版本不適用於早於 3.8 的 Python 版本。因此,請確保您系統中安裝的預設 Python 版本為 3.8 或更高版本。
指定跨語言轉換
Java 範例管線使用 Python DataframeTransform 作為跨語言轉換。此轉換是 Beam Dataframe API 的一部分,用於處理類似 pandas 的 DataFrame 物件。
若要套用跨語言轉換,您的管線必須指定它。Python 轉換是透過其完整限定名稱來識別的。例如,DataframeTransform
可以在 apache_beam.dataframe.transforms
套件中找到,因此其完整限定名稱為 apache_beam.dataframe.transforms.DataframeTransform
。範例管線 PythonDataframeWordCount 將此完整限定名稱傳遞至 PythonExternalTransform。
注意:範例管線旨在示範使用任意 Python 跨語言轉換的 Java 多語言管線的開發。若要在 Java 中生產使用 Dataframe API,您應該改用更高階的 DataframeTransform。
以下是範例中的完整管線定義
static void runWordCount(WordCountOptions options) {
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(ParDo.of(new ExtractWordsFn()))
.setRowSchema(ExtractWordsFn.SCHEMA)
.apply(
PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
"apache_beam.dataframe.transforms.DataframeTransform",
options.getExpansionService())
.withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
.withKwarg("include_indexes", true))
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
}
PythonExternalTransform
是用於叫用外部 Python 轉換的包裝函式。from
方法接受兩個字串:1) 完整限定轉換名稱;2) 擴充服務的可選位址和連接埠號碼。此方法會傳回可在 Java 管線中直接使用的 Python 跨語言轉換的 Stub。withKwarg
會指定用於具現化 Python 跨語言轉換的關鍵字引數。在此案例中,會叫用 withKwarg
兩次,以指定 func
引數和 include_indexes
引數,而這些引數會傳遞至 DataframeTransform
。PythonExternalTransform
也提供其他方式來指定 Python 跨語言轉換的 args 和 kwargs。
若要了解此管線的運作方式,仔細查看第一個 withKwarg
叫用會很有幫助
.withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
PythonCallableSource.of
的引數是 Python lambda 函數的字串表示法。DataframeTransform
會採用 Python 可呼叫的引數,以將其套用至 PCollection
,就像它是一個 Dataframe 一樣。withKwarg
方法可讓您在 Java 管線中指定 Python 可呼叫的項目。若要深入了解將函數傳遞至 DataframeTransform
,請參閱在管線中嵌入 DataFrames。
執行 Java 管線
如果您想要自訂環境或使用預設 Beam SDK 中不可用的轉換,您可能需要執行自己的擴充服務。在這種情況下,請在執行管線之前啟動擴充服務。
在執行管線之前,請務必為您選取的 Beam 執行器執行執行器特定設定。
使用 Maven Archetype (Beam 2.43.0 或更高版本) 與 Dataflow 執行器一起執行
- 查看相關 Beam 版本的 Beam 範例 Maven 原型。
export BEAM_VERSION=<Beam version>
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=$BEAM_VERSION \
-DgroupId=org.example \
-DartifactId=multi-language-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
- 執行管線。
export GCP_PROJECT=<GCP project>
export GCP_BUCKET=<GCP bucket>
export GCP_REGION=<GCP region>
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.multilanguage.PythonDataframeWordCount \
-Dexec.args="--runner=DataflowRunner --project=$GCP_PROJECT \
--region=$GCP_REGION \
--gcpTempLocation=gs://$GCP_BUCKET/multi-language-beam/tmp \
--output=gs://$GCP_BUCKET/multi-language-beam/output" \
-Pdataflow-runner
在 HEAD 使用 Dataflow 執行器執行
以下指令碼會使用 Cloud Storage 儲存貯體中的範例文字,在 Dataflow 上執行範例多語言管線。您需要根據您的環境調整指令碼。
export GCP_PROJECT=<project>
export OUTPUT_BUCKET=<bucket>
export GCP_REGION=<region>
export TEMP_LOCATION=gs://$OUTPUT_BUCKET/tmp
./gradlew :examples:multi-language:pythonDataframeWordCount --args=" \
--runner=DataflowRunner \
--project=$GCP_PROJECT \
--output=gs://${OUTPUT_BUCKET}/count \
--region=${GCP_REGION}"
管線會將結果輸出到 gs://$OUTPUT_BUCKET/count-00000-of-00001 的檔案中。
使用 DirectRunner 執行
注意:多語言管線需要使用可攜式執行器。可攜式 DirectRunner 仍處於實驗階段,並不支援所有 Beam 功能。
- 使用已安裝最新版 Beam Python SDK 的 Python 虛擬環境。請參閱此處以取得指示。
- 執行可攜式 DirectRunner 的工作伺服器(在 Python 中實作)。
export JOB_SERVER_PORT=<port>
python -m apache_beam.runners.portability.local_job_service_main -p $JOB_SERVER_PORT
在另一個 Shell 中,移至 Beam HEAD Git 複製。
建置用於本機管線執行的 Beam Java SDK 容器(本指南要求您的 JAVA_HOME 設定為 Java 11)。
./gradlew :sdks:java:container:java11:docker -Pjava11Home=$JAVA_HOME
- 執行管線。
export JOB_SERVER_PORT=<port> # Same port as before
export OUTPUT_FILE=<local relative path>
./gradlew :examples:multi-language:pythonDataframeWordCount --args=" \
--runner=PortableRunner \
--jobEndpoint=localhost:$JOB_SERVER_PORT \
--output=$OUTPUT_FILE"
注意 此輸出會寫入 Python Docker 容器的本機檔案系統。若要藉由寫入 GCS 來驗證輸出,您需要為
output
選項指定可公開存取的 GCS 路徑,因為可攜式 DirectRunner 目前無法正確轉發本機認證以存取 GCS。
進階:啟動擴充服務
當為多語言管道建立任務時,Beam 會使用擴充服務來擴充複合轉換。每個遠端 SDK 至少需要一個擴充服務。
在一般情況下,如果您的系統上安裝了受支援版本的 Python,您可以讓 PythonExternalTransform
處理建立和啟動擴充服務的詳細資訊。但是,如果您想要自訂環境或使用預設 Beam SDK 中不可用的轉換,您可能需要執行自己的擴充服務。
例如,要啟動 Python 轉換的標準擴充服務,ExpansionServiceServicer,請按照以下步驟操作
按照這些說明啟動新的虛擬環境。
安裝包含
gcp
和dataframe
套件的 Apache Beam。
pip install 'apache-beam[gcp,dataframe]'
- 執行以下命令
python -m apache_beam.runners.portability.expansion_service_main -p <PORT> --fully_qualified_name_glob "*"
此命令會執行 expansion_service_main.py,該程式會啟動標準擴充服務。當您使用 Gradle 執行 Java 管道時,可以使用 expansionService
選項指定擴充服務。例如:--expansionService=localhost:<PORT>
。
下一步
若要深入瞭解 Beam 對跨語言管道的支援,請參閱多語言管道。若要深入瞭解 Beam DataFrame API,請參閱Beam DataFrames 概述。