Python 多語言管道快速入門
此頁面提供使用 Apache Beam SDK for Python 建立多語言管道的高階概述。如需更全面地了解此主題,請參閱多語言管道。
此快速入門中顯示的程式碼可在可執行的範例集合中取得。
若要建置並執行多語言 Python 管道,您需要一個安裝了 Beam SDK 的 Python 環境。如果沒有設定環境,請先完成Apache Beam Python SDK 快速入門。
「多語言管道」是以一種 Beam SDK 語言建置的管道,並使用來自另一種 Beam SDK 語言的一個或多個轉換。這些「其他語言」的轉換稱為「跨語言轉換」。其目的是讓管道元件更容易在 Beam SDK 之間共用,並擴大所有 SDK 可用的轉換池。在下列範例中,多語言管道是以 Beam Python SDK 建置的,而跨語言轉換則是以 Beam Java SDK 建置的。
建立跨語言轉換
這是一個簡單的 Java 轉換,JavaPrefix,它會在輸入字串中新增前置詞
public class JavaPrefix extends PTransform<PCollection<String>, PCollection<String>> {
final String prefix;
public JavaPrefix(String prefix) {
this.prefix = prefix;
}
class AddPrefixDoFn extends DoFn<String, String> {
@ProcessElement
public void process(@Element String input, OutputReceiver<String> o) {
o.output(prefix + input);
}
}
@Override
public PCollection<String> expand(PCollection<String> input) {
return input
.apply(
"AddPrefix",
ParDo.of(new AddPrefixDoFn()));
}
}
若要將其作為跨語言轉換使用,您必須新增設定物件和建構器。
注意:從 Beam 2.34.0 開始,Python SDK 使用者可以使用一些 Java 轉換,而無需撰寫額外的 Java 程式碼。若要了解更多資訊,請參閱建立跨語言 Java 轉換。
設定物件是一個簡單的 Java 物件 (POJO),其中包含轉換所需的欄位。這是一個範例,JavaPrefixConfiguration
public class JavaPrefixConfiguration {
String prefix;
public void setPrefix(String prefix) {
this.prefix = prefix;
}
}
建構器類別 (在下方實作為JavaPrefixBuilder) 必須實作ExternalTransformBuilder 並覆寫 buildExternal
,它會使用設定物件。
public class JavaPrefixBuilder implements
ExternalTransformBuilder<JavaPrefixConfiguration, PCollection<String>, PCollection<String>> {
@Override
public PTransform<PCollection<String>, PCollection<String>> buildExternal(
JavaPrefixConfiguration configuration) {
return new JavaPrefix(configuration.prefix);
}
}
您還需要新增註冊器類別,以便向擴充服務註冊您的轉換。
@AutoService(ExternalTransformRegistrar.class)
public class JavaPrefixRegistrar implements ExternalTransformRegistrar {
final String URN = "beam:transform:my.beam.test:javaprefix:v1";
@Override
public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
return ImmutableMap.of(URN,new JavaPrefixBuilder());
}
}
如此處在JavaPrefixRegistrar中所示,註冊器必須實作ExternalTransformRegistrar,它有一個方法 knownBuilderInstances
。這會傳回一個將唯一的 URN 對應至您的建構器執行個體的對應。您可以使用AutoService註解,以向擴充服務註冊此類別。
選擇擴充服務
在為多語言管道建置工作時,Beam 會使用擴充服務來擴充複合轉換。每個遠端 SDK 至少必須有一個擴充服務。
在大多數情況下,您可以使用預設的 Java ExpansionService。此服務採用單一參數,以指定擴充服務的連接埠。然後由 Python 管道提供位址。
在執行多語言管道之前,您需要建置 Java 跨語言轉換並啟動擴充服務。當您啟動擴充服務時,您需要將相依性新增至類別路徑。您可以使用多個 JAR,但通常建立單一陰影 JAR 會比較容易。Python 和 Java 相依性都將由 Python SDK 為執行器進行分段。
執行擴充服務的步驟會因您的建置工具而異。假設您已建置一個名為 java-prefix-bundled-0.1.jar 的 JAR,則可以使用類似以下的命令來啟動服務,其中 12345
是擴充服務將在其上執行的連接埠
java -jar java-prefix-bundled-0.1.jar 12345
如需執行範例擴充服務的指示,請參閱此 README。
建立 Python 管道
您的 Python 管道現在可以使用ExternalTransform API 來設定您的跨語言轉換。以下是addprefix.py中的一個範例
with beam.Pipeline(options=pipeline_options) as p:
input = p | 'Read' >> ReadFromText(input_path).with_output_types(str)
java_output = (
input
| 'JavaPrefix' >> beam.ExternalTransform(
'beam:transform:my.beam.test:javaprefix:v1',
ImplicitSchemaPayloadBuilder({'prefix': 'java:'}),
"localhost:12345"))
def python_prefix(record):
return 'python:%s' % record
output = java_output | 'PythonPrefix' >> beam.Map(python_prefix)
output | 'Write' >> WriteToText(output_path)
ExternalTransform
採用三個參數
- 跨語言轉換的 URN
- 有效負載 (可以是位元組字串或PayloadBuilder)
- 擴充服務
URN 只是轉換的唯一 Beam 識別碼,而擴充服務已討論過。PayloadBuilder 是一個新的概念,接下來將討論。
注意:為確保您的 URN 不會與來自其他轉換的 URN 發生衝突,請遵循選取跨語言轉換的 URN中所述的 URN 慣例。
提供有效負載建構器
上述的 Python 管道範例提供了一個ImplicitSchemaPayloadBuilder作為 ExternalTransform
的第二個引數。ImplicitSchemaPayloadBuilder
會建置一個從提供的數值產生綱要的有效負載。在此案例中,提供的數值包含在下列的鍵值對中:{'prefix': 'java:'}
。JavaPrefix
轉換預期會有一個 prefix
引數,而有效負載建構器會傳入字串 java:
,它會附加到每個輸入元素。
有效負載建構器可協助建置擴充請求中轉換的有效負載。您可以改用NamedTupleBasedPayloadBuilder (它會根據具名元組綱要建置有效負載),或是AnnotationBasedPayloadBuilder (它會根據型別註解建置綱要) 來取代 ImplicitSchemaPayloadBuilder
。如需可用有效負載建構器的完整清單,請參閱transforms.external API 參考。
使用標準元素型別
在多語言界限,您必須使用所有 Beam SDK 都了解的元素型別。這些是Beam 標準編碼器所表示的型別
BYTES
STRING_UTF8
KV
BOOL
VARINT
DOUBLE
ITERABLE
TIMER
WINDOWED_VALUE
ROW
對於任意結構型別 (例如,任意 Java 物件),請使用 ROW
(PCollection<Row>
)。您可能需要開發產生 PCollection<Row>
的新 Java 複合轉換。您可以在複合跨語言轉換中使用 SDK 特有的編碼器,只要其他 SDK 所耗用的 PCollection 未使用這些編碼器即可。
執行管道
執行 Python 管道的確切命令會因您的環境而異。假設您的管道程式碼位於名為 addprefix.py 的檔案中,則步驟應該與下方類似。如需更多資訊,請參閱addprefix.py 中的註解。
使用直接執行器執行
在下列命令中,input1
是包含文字行的檔案
python addprefix.py --runner DirectRunner --environment_type=DOCKER --input input1 --output output
使用 Dataflow 執行器執行
下列指令碼會使用來自 Cloud Storage 儲存貯體的範例文字,在 Dataflow 上執行多語言管道。您需要調整指令碼以符合您的環境。
#!/bin/bash
export GCP_PROJECT=<project>
export GCS_BUCKET=<bucket>
export TEMP_LOCATION=gs://$GCS_BUCKET/tmp
export GCP_REGION=<region>
export JOB_NAME="javaprefix-`date +%Y%m%d-%H%M%S`"
export NUM_WORKERS="1"
# other commands, e.g. changing into the appropriate directory
gsutil rm gs://$GCS_BUCKET/javaprefix/*
python addprefix.py \
--runner DataflowRunner \
--temp_location $TEMP_LOCATION \
--project $GCP_PROJECT \
--region $GCP_REGION \
--job_name $JOB_NAME \
--num_workers $NUM_WORKERS \
--input "gs://dataflow-samples/shakespeare/kinglear.txt" \
--output "gs://$GCS_BUCKET/javaprefix/output"