Python 多語言管道快速入門

此頁面提供使用 Apache Beam SDK for Python 建立多語言管道的高階概述。如需更全面地了解此主題,請參閱多語言管道

此快速入門中顯示的程式碼可在可執行的範例集合中取得。

若要建置並執行多語言 Python 管道,您需要一個安裝了 Beam SDK 的 Python 環境。如果沒有設定環境,請先完成Apache Beam Python SDK 快速入門

「多語言管道」是以一種 Beam SDK 語言建置的管道,並使用來自另一種 Beam SDK 語言的一個或多個轉換。這些「其他語言」的轉換稱為「跨語言轉換」。其目的是讓管道元件更容易在 Beam SDK 之間共用,並擴大所有 SDK 可用的轉換池。在下列範例中,多語言管道是以 Beam Python SDK 建置的,而跨語言轉換則是以 Beam Java SDK 建置的。

建立跨語言轉換

這是一個簡單的 Java 轉換,JavaPrefix,它會在輸入字串中新增前置詞

public class JavaPrefix extends PTransform<PCollection<String>, PCollection<String>> {

  final String prefix;

  public JavaPrefix(String prefix) {
    this.prefix = prefix;
  }

  class AddPrefixDoFn extends DoFn<String, String> {

    @ProcessElement
    public void process(@Element String input, OutputReceiver<String> o) {
      o.output(prefix + input);
    }
  }

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    return input
        .apply(
            "AddPrefix",
            ParDo.of(new AddPrefixDoFn()));
  }
}

若要將其作為跨語言轉換使用,您必須新增設定物件和建構器。

注意:從 Beam 2.34.0 開始,Python SDK 使用者可以使用一些 Java 轉換,而無需撰寫額外的 Java 程式碼。若要了解更多資訊,請參閱建立跨語言 Java 轉換

設定物件是一個簡單的 Java 物件 (POJO),其中包含轉換所需的欄位。這是一個範例,JavaPrefixConfiguration

public class JavaPrefixConfiguration {

  String prefix;

  public void setPrefix(String prefix) {
    this.prefix = prefix;
  }
}

建構器類別 (在下方實作為JavaPrefixBuilder) 必須實作ExternalTransformBuilder 並覆寫 buildExternal,它會使用設定物件。

public class JavaPrefixBuilder implements
    ExternalTransformBuilder<JavaPrefixConfiguration, PCollection<String>, PCollection<String>> {

    @Override
    public PTransform<PCollection<String>, PCollection<String>> buildExternal(
        JavaPrefixConfiguration configuration) {
      return new JavaPrefix(configuration.prefix);
    }
}

您還需要新增註冊器類別,以便向擴充服務註冊您的轉換。

@AutoService(ExternalTransformRegistrar.class)
public class JavaPrefixRegistrar implements ExternalTransformRegistrar {

  final String URN = "beam:transform:my.beam.test:javaprefix:v1";

  @Override
  public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
    return ImmutableMap.of(URN,new JavaPrefixBuilder());
  }
}

如此處在JavaPrefixRegistrar中所示,註冊器必須實作ExternalTransformRegistrar,它有一個方法 knownBuilderInstances。這會傳回一個將唯一的 URN 對應至您的建構器執行個體的對應。您可以使用AutoService註解,以向擴充服務註冊此類別。

選擇擴充服務

在為多語言管道建置工作時,Beam 會使用擴充服務來擴充複合轉換。每個遠端 SDK 至少必須有一個擴充服務。

在大多數情況下,您可以使用預設的 Java ExpansionService。此服務採用單一參數,以指定擴充服務的連接埠。然後由 Python 管道提供位址。

在執行多語言管道之前,您需要建置 Java 跨語言轉換並啟動擴充服務。當您啟動擴充服務時,您需要將相依性新增至類別路徑。您可以使用多個 JAR,但通常建立單一陰影 JAR 會比較容易。Python 和 Java 相依性都將由 Python SDK 為執行器進行分段。

執行擴充服務的步驟會因您的建置工具而異。假設您已建置一個名為 java-prefix-bundled-0.1.jar 的 JAR,則可以使用類似以下的命令來啟動服務,其中 12345 是擴充服務將在其上執行的連接埠

java -jar java-prefix-bundled-0.1.jar 12345

如需執行範例擴充服務的指示,請參閱此 README

建立 Python 管道

您的 Python 管道現在可以使用ExternalTransform API 來設定您的跨語言轉換。以下是addprefix.py中的一個範例

with beam.Pipeline(options=pipeline_options) as p:
  input = p | 'Read' >> ReadFromText(input_path).with_output_types(str)

  java_output = (
      input
      | 'JavaPrefix' >> beam.ExternalTransform(
            'beam:transform:my.beam.test:javaprefix:v1',
            ImplicitSchemaPayloadBuilder({'prefix': 'java:'}),
            "localhost:12345"))

  def python_prefix(record):
    return 'python:%s' % record

  output = java_output | 'PythonPrefix' >> beam.Map(python_prefix)
  output | 'Write' >> WriteToText(output_path)

ExternalTransform 採用三個參數

URN 只是轉換的唯一 Beam 識別碼,而擴充服務已討論過。PayloadBuilder 是一個新的概念,接下來將討論。

注意:為確保您的 URN 不會與來自其他轉換的 URN 發生衝突,請遵循選取跨語言轉換的 URN中所述的 URN 慣例。

提供有效負載建構器

上述的 Python 管道範例提供了一個ImplicitSchemaPayloadBuilder作為 ExternalTransform 的第二個引數。ImplicitSchemaPayloadBuilder 會建置一個從提供的數值產生綱要的有效負載。在此案例中,提供的數值包含在下列的鍵值對中:{'prefix': 'java:'}JavaPrefix 轉換預期會有一個 prefix 引數,而有效負載建構器會傳入字串 java:,它會附加到每個輸入元素。

有效負載建構器可協助建置擴充請求中轉換的有效負載。您可以改用NamedTupleBasedPayloadBuilder (它會根據具名元組綱要建置有效負載),或是AnnotationBasedPayloadBuilder (它會根據型別註解建置綱要) 來取代 ImplicitSchemaPayloadBuilder。如需可用有效負載建構器的完整清單,請參閱transforms.external API 參考

使用標準元素型別

在多語言界限,您必須使用所有 Beam SDK 都了解的元素型別。這些是Beam 標準編碼器所表示的型別

對於任意結構型別 (例如,任意 Java 物件),請使用 ROW (PCollection<Row>)。您可能需要開發產生 PCollection<Row> 的新 Java 複合轉換。您可以在複合跨語言轉換中使用 SDK 特有的編碼器,只要其他 SDK 所耗用的 PCollection 未使用這些編碼器即可。

執行管道

執行 Python 管道的確切命令會因您的環境而異。假設您的管道程式碼位於名為 addprefix.py 的檔案中,則步驟應該與下方類似。如需更多資訊,請參閱addprefix.py 中的註解

使用直接執行器執行

在下列命令中,input1 是包含文字行的檔案

python addprefix.py --runner DirectRunner --environment_type=DOCKER --input input1 --output output

使用 Dataflow 執行器執行

下列指令碼會使用來自 Cloud Storage 儲存貯體的範例文字,在 Dataflow 上執行多語言管道。您需要調整指令碼以符合您的環境。

#!/bin/bash
export GCP_PROJECT=<project>
export GCS_BUCKET=<bucket>
export TEMP_LOCATION=gs://$GCS_BUCKET/tmp
export GCP_REGION=<region>
export JOB_NAME="javaprefix-`date +%Y%m%d-%H%M%S`"
export NUM_WORKERS="1"

# other commands, e.g. changing into the appropriate directory

gsutil rm gs://$GCS_BUCKET/javaprefix/*

python addprefix.py \
    --runner DataflowRunner \
    --temp_location $TEMP_LOCATION \
    --project $GCP_PROJECT \
    --region $GCP_REGION \
    --job_name $JOB_NAME \
    --num_workers $NUM_WORKERS \
    --input "gs://dataflow-samples/shakespeare/kinglear.txt" \
    --output "gs://$GCS_BUCKET/javaprefix/output"