Beam YAML API

Beam YAML 是一種宣告式語法,可使用 YAML 檔案描述 Apache Beam 管線。您可以使用 Beam YAML 來撰寫和執行 Beam 管線,而無需編寫任何程式碼。

概觀

Beam 提供了一個強大的模型來建立複雜的資料處理管線。但是,開始使用 Beam 程式設計可能會很有挑戰性,因為它需要在支援的 Beam SDK 語言之一中編寫程式碼。您需要了解 API、設定專案、管理相依性以及執行其他程式設計任務。

Beam YAML 可以更輕鬆地開始建立 Beam 管線。您可以不用編寫程式碼,而是使用任何文字編輯器建立 YAML 檔案。然後您提交 YAML 檔案以由執行器執行。

Beam YAML 語法設計為人類可讀,但也適合做為工具的中繼表示法。例如,管線撰寫 GUI 可以輸出 YAML,或者譜系分析工具可以使用 YAML 管線規格。

Beam YAML 仍在開發中,但已包含的任何功能都被視為穩定。歡迎透過 dev@apache.beam.org 提供意見回饋。

先決條件

Beam YAML 剖析器目前包含在 Apache Beam Python SDK 中。您不需要編寫 Python 程式碼來使用 Beam YAML,但您需要 SDK 才能在本地執行管線。

我們建議建立一個虛擬環境,以便所有套件都安裝在隔離且獨立的環境中。設定好 Python 環境後,請依照下列步驟安裝 SDK

pip install apache_beam[yaml,gcp]

此外,一些提供的轉換(例如 SQL 轉換)是以 Java 實作,並且需要可用的 Java 直譯器。當您執行具有這些轉換的管線時,所需的成品會自動從 Apache Maven 儲存庫下載。

開始使用

使用文字編輯器建立一個名為 pipeline.yaml 的檔案。將以下文字貼到檔案中並儲存

pipeline:
  transforms:
    - type: Create
      config:
        elements: [1, 2, 3]
    - type: LogForTesting
      input: Create

此檔案定義一個具有兩個轉換的簡單管線

執行管線

若要執行管線,請執行下列 Python 命令

python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml

輸出應包含類似以下的日誌陳述式

INFO:root:{"element": 1}
INFO:root:{"element": 2}
INFO:root:{"element": 3}

在 Dataflow 中執行管線

您可以使用 gcloud CLI 將 YAML 管線提交到 Dataflow。若要從 YAML 檔案建立 Dataflow 工作,請使用 gcloud dataflow yaml run 命令

gcloud dataflow yaml run $JOB_NAME \
  --yaml-pipeline-file=pipeline.yaml \
  --region=$REGION

當您使用 gcloud CLI 時,您不需要在本地安裝 Beam SDK。

視覺化管線

您可以使用 apache_beam.runners.render 模組將管線執行圖呈現為 PNG 檔案,如下所示

  1. 安裝 Graphviz

  2. 執行以下命令

    python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml \
      --runner=apache_beam.runners.render.RenderRunner \
      --render_output=out.png
    

範例:讀取 CSV 資料

以下管線從一組 CSV 檔案讀取資料,並以 JSON 格式寫入資料。此管線假設 CSV 檔案具有標題列。欄名會變成 JSON 欄位名稱。

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: WriteToJson
      config:
        path: /path/to/output.json
      input: ReadFromCsv

新增篩選器

Filter 轉換會篩選記錄。它會保留符合布林述詞的輸入記錄,並捨棄不符合述詞的記錄。以下範例會保留 col3 的值大於 100 的記錄

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
      input: ReadFromCsv
    - type: WriteToJson
      config:
        path: /path/to/output.json
      input: Filter

新增對應函數

Beam YAML 支援各種對應函數。以下範例使用 Sql 轉換,依 col1 分組並輸出每個鍵的計數。

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
      input: ReadFromCsv
    - type: Sql
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
      input: Filter
    - type: WriteToJson
      config:
        path: /path/to/output.json
      input: Sql

模式

本節說明 Beam YAML 中一些常見的模式。

已命名轉換

您可以命名管線中的轉換,以協助監控和偵錯。如果管線包含多個相同類型的轉換,名稱也可用於消除轉換的歧義。

pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadMyData
      config:
        path: /path/to/input*.csv
    - type: Filter
      name: KeepBigRecords
      config:
        language: python
        keep: "col3 > 100"
      input: ReadMyData
    - type: Sql
      name: MySqlTransform
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
      input: KeepBigRecords
    - type: WriteToJson
      name: WriteTheOutput
      config:
        path: /path/to/output.json
      input: MySqlTransform

鏈結轉換

如果管線是線性的(沒有分支或合併),您可以將管線指定為 chain 類型。在 chain 類型管線中,您不需要指定輸入。輸入是從它們在 YAML 檔案中出現的順序隱含的

pipeline:
  type: chain

  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
    - type: Sql
      name: MySqlTransform
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
    - type: WriteToJson
      config:
        path: /path/to/output.json

來源和接收器轉換

作為語法糖,您可以將管線中的第一個和最後一個轉換分別命名為 sourcesink。此慣例不會變更產生的管線,但它會發出來源和接收器轉換的意圖。

pipeline:
  type: chain

  source:
    type: ReadFromCsv
    config:
      path: /path/to/input*.csv

  transforms:
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"

    - type: Sql
      name: MySqlTransform
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"

  sink:
    type: WriteToJson
    config:
      path: /path/to/output.json

非線性管線

Beam YAML 支援任意的非線性管線。以下管線讀取兩個來源,將它們聯結起來,並寫入兩個輸出

pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadLeft
      config:
        path: /path/to/left*.csv

    - type: ReadFromCsv
      name: ReadRight
      config:
        path: /path/to/right*.csv

    - type: Sql
      config:
        query: select A.col1, B.col2 from A join B using (col3)
      input:
        A: ReadLeft
        B: ReadRight

    - type: WriteToJson
      name: WriteAll
      input: Sql
      config:
        path: /path/to/all.json

    - type: Filter
      name: FilterToBig
      input: Sql
      config:
        language: python
        keep: "col2 > 100"

    - type: WriteToCsv
      name: WriteBig
      input: FilterToBig
      config:
        path: /path/to/big.csv

由於管線不是線性的,您必須明確宣告每個轉換的輸入。但是,您可以在非線性管線中巢狀化 chain。鏈結是管線內的線性子路徑。

以下範例建立一個名為 ExtraProcessingForBigRows 的鏈結。鏈結從 Sql 轉換取得輸入,並套用幾個額外的篩選器以及接收器。請注意,在鏈結內,不需要指定輸入。

pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadLeft
      config:
        path: /path/to/left*.csv

    - type: ReadFromCsv
      name: ReadRight
      config:
        path: /path/to/right*.csv

    - type: Sql
      config:
        query: select A.col1, B.col2 from A join B using (col3)
      input:
        A: ReadLeft
        B: ReadRight

    - type: WriteToJson
      name: WriteAll
      input: Sql
      config:
        path: /path/to/all.json

    - type: chain
      name: ExtraProcessingForBigRows
      input: Sql
      transforms:
        - type: Filter
          config:
            language: python
            keep: "col2 > 100"
        - type: Filter
          config:
            language: python
            keep: "len(col1) > 10"
        - type: Filter
          config:
            language: python
            keep: "col1 > 'z'"
      sink:
        type: WriteToCsv
        config:
          path: /path/to/big.csv

視窗化

此 API 可用於定義串流和批次管線。為了在串流管線中有意義地彙總元素,通常需要某種類型的視窗化。可以使用所有其他 Beam SDK 中可用的相同 WindowInto 轉換來宣告 Beam 的視窗化觸發

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: JSON
        schema:
          type: object
          properties:
            col1: {type: string}
            col2: {type: integer}
            col3: {type: number}
    - type: WindowInto
      windowing:
        type: fixed
        size: 60s
    - type: SomeGroupingTransform
      config:
        arg: ...
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
options:
  streaming: true

您可以不用使用明確的 WindowInto 作業,而是使用指定的視窗化標記轉換,這會導致其輸入(因此轉換本身)套用該視窗化。

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
    - type: SomeGroupingTransform
      config:
        arg: ...
      windowing:
        type: sliding
        size: 60s
        period: 10s
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
options:
  streaming: true

請注意,Sql 作業本身通常是一種彙總形式,並且套用視窗化(或使用已視窗化的輸入)會導致每個視窗完成所有分組。

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
      windowing:
        type: sessions
        gap: 60s
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
options:
  streaming: true

指定的視窗化會套用至所有輸入,在本例中會產生每個視窗的聯結。

pipeline:
  transforms:
    - type: ReadFromPubSub
      name: ReadLeft
      config:
        topic: leftTopic
        format: ...
        schema: ...

    - type: ReadFromPubSub
      name: ReadRight
      config:
        topic: rightTopic
        format: ...
        schema: ...

    - type: Sql
      config:
        query: select A.col1, B.col2 from A join B using (col3)
      input:
        A: ReadLeft
        B: ReadRight
      windowing:
        type: fixed
        size: 60s
options:
  streaming: true

對於沒有輸入的轉換,指定的視窗化會改為套用至其輸出。根據 Beam 模型,視窗化會由所有使用作業繼承。這對於 Read 等根作業特別有用。

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
      windowing:
        type: fixed
        size: 60s
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
options:
  streaming: true

也可以在管線(或複合)的最上層指定視窗化,這是將相同的視窗化套用至所有未另行指定自己的視窗化的根作業的簡寫。此方法是在管線中的任何地方套用視窗的有效方法。

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
  windowing:
    type: fixed
    size: 60
options:
  streaming: true

請注意,所有這些視窗化規格也與 sourcesink 語法相容

pipeline:
  type: chain

  source:
    type: ReadFromPubSub
    config:
      topic: myPubSubTopic
      format: ...
      schema: ...
    windowing:
      type: fixed
      size: 10s

  transforms:
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"

  sink:
    type: WriteToCsv
    config:
      path: /path/to/output.json
    windowing:
      type: fixed
      size: 5m

options:
  streaming: true

供應商

儘管我們的目標是提供一套大型的內建轉換,但人們不可避免地會想要撰寫自己的轉換。透過利用擴充服務和架構轉換的供應商概念,可以實現這一點。

例如,您可以建置一個 jar,以提供跨語言轉換架構轉換,然後在轉換中使用它,如下所示

pipeline:
  type: chain
  source:
    type: ReadFromCsv
    config:
      path: /path/to/input*.csv

  transforms:
    - type: MyCustomTransform
      config:
        arg: whatever

  sink:
    type: WriteToJson
    config:
      path: /path/to/output.json

providers:
  - type: javaJar
    config:
       jar: /path/or/url/to/myExpansionService.jar
    transforms:
       MyCustomTransform: "urn:registered:in:expansion:service"

也可以使用語法提供任意 Python 轉換

providers:
  - type: pythonPackage
    config:
       packages:
           - my_pypi_package>=version
           - /path/to/local/package.zip
    transforms:
       MyCustomTransform: "pkg.subpkg.PTransformClassOrCallable"

管線選項

管線選項用於設定管線的不同方面,例如將執行管線的管線執行器,以及所選執行器所需的任何執行器特定組態。若要設定管線選項,請在 YAML 檔案的結尾附加選項區塊。例如

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
      windowing:
        type: fixed
        size: 60s
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: JSON
options:
  streaming: true

其他資源