Beam YAML API
Beam YAML 是一種宣告式語法,可使用 YAML 檔案描述 Apache Beam 管線。您可以使用 Beam YAML 來撰寫和執行 Beam 管線,而無需編寫任何程式碼。
概觀
Beam 提供了一個強大的模型來建立複雜的資料處理管線。但是,開始使用 Beam 程式設計可能會很有挑戰性,因為它需要在支援的 Beam SDK 語言之一中編寫程式碼。您需要了解 API、設定專案、管理相依性以及執行其他程式設計任務。
Beam YAML 可以更輕鬆地開始建立 Beam 管線。您可以不用編寫程式碼,而是使用任何文字編輯器建立 YAML 檔案。然後您提交 YAML 檔案以由執行器執行。
Beam YAML 語法設計為人類可讀,但也適合做為工具的中繼表示法。例如,管線撰寫 GUI 可以輸出 YAML,或者譜系分析工具可以使用 YAML 管線規格。
Beam YAML 仍在開發中,但已包含的任何功能都被視為穩定。歡迎透過 dev@apache.beam.org 提供意見回饋。
先決條件
Beam YAML 剖析器目前包含在 Apache Beam Python SDK 中。您不需要編寫 Python 程式碼來使用 Beam YAML,但您需要 SDK 才能在本地執行管線。
我們建議建立一個虛擬環境,以便所有套件都安裝在隔離且獨立的環境中。設定好 Python 環境後,請依照下列步驟安裝 SDK
pip install apache_beam[yaml,gcp]
此外,一些提供的轉換(例如 SQL 轉換)是以 Java 實作,並且需要可用的 Java 直譯器。當您執行具有這些轉換的管線時,所需的成品會自動從 Apache Maven 儲存庫下載。
開始使用
使用文字編輯器建立一個名為 pipeline.yaml
的檔案。將以下文字貼到檔案中並儲存
pipeline:
transforms:
- type: Create
config:
elements: [1, 2, 3]
- type: LogForTesting
input: Create
此檔案定義一個具有兩個轉換的簡單管線
Create
轉換會建立一個集合。config
的值是組態設定的字典。在這種情況下,elements
指定集合的成員。其他轉換類型具有其他組態設定。LogForTesting
轉換會記錄每個輸入元素。此轉換不需要config
設定。input
鍵指定LogForTesting
從Create
轉換接收輸入。
執行管線
若要執行管線,請執行下列 Python 命令
python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml
輸出應包含類似以下的日誌陳述式
INFO:root:{"element": 1}
INFO:root:{"element": 2}
INFO:root:{"element": 3}
在 Dataflow 中執行管線
您可以使用 gcloud CLI 將 YAML 管線提交到 Dataflow。若要從 YAML 檔案建立 Dataflow 工作,請使用 gcloud dataflow yaml run
命令
gcloud dataflow yaml run $JOB_NAME \
--yaml-pipeline-file=pipeline.yaml \
--region=$REGION
當您使用 gcloud
CLI 時,您不需要在本地安裝 Beam SDK。
視覺化管線
您可以使用 apache_beam.runners.render
模組將管線執行圖呈現為 PNG 檔案,如下所示
安裝 Graphviz。
執行以下命令
python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml \ --runner=apache_beam.runners.render.RenderRunner \ --render_output=out.png
範例:讀取 CSV 資料
以下管線從一組 CSV 檔案讀取資料,並以 JSON 格式寫入資料。此管線假設 CSV 檔案具有標題列。欄名會變成 JSON 欄位名稱。
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: WriteToJson
config:
path: /path/to/output.json
input: ReadFromCsv
新增篩選器
Filter
轉換會篩選記錄。它會保留符合布林述詞的輸入記錄,並捨棄不符合述詞的記錄。以下範例會保留 col3
的值大於 100 的記錄
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: Filter
config:
language: python
keep: "col3 > 100"
input: ReadFromCsv
- type: WriteToJson
config:
path: /path/to/output.json
input: Filter
新增對應函數
Beam YAML 支援各種對應函數。以下範例使用 Sql
轉換,依 col1
分組並輸出每個鍵的計數。
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: Filter
config:
language: python
keep: "col3 > 100"
input: ReadFromCsv
- type: Sql
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
input: Filter
- type: WriteToJson
config:
path: /path/to/output.json
input: Sql
模式
本節說明 Beam YAML 中一些常見的模式。
已命名轉換
您可以命名管線中的轉換,以協助監控和偵錯。如果管線包含多個相同類型的轉換,名稱也可用於消除轉換的歧義。
pipeline:
transforms:
- type: ReadFromCsv
name: ReadMyData
config:
path: /path/to/input*.csv
- type: Filter
name: KeepBigRecords
config:
language: python
keep: "col3 > 100"
input: ReadMyData
- type: Sql
name: MySqlTransform
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
input: KeepBigRecords
- type: WriteToJson
name: WriteTheOutput
config:
path: /path/to/output.json
input: MySqlTransform
鏈結轉換
如果管線是線性的(沒有分支或合併),您可以將管線指定為 chain
類型。在 chain
類型管線中,您不需要指定輸入。輸入是從它們在 YAML 檔案中出現的順序隱含的
pipeline:
type: chain
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: Filter
config:
language: python
keep: "col3 > 100"
- type: Sql
name: MySqlTransform
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
- type: WriteToJson
config:
path: /path/to/output.json
來源和接收器轉換
作為語法糖,您可以將管線中的第一個和最後一個轉換分別命名為 source
和 sink
。此慣例不會變更產生的管線,但它會發出來源和接收器轉換的意圖。
pipeline:
type: chain
source:
type: ReadFromCsv
config:
path: /path/to/input*.csv
transforms:
- type: Filter
config:
language: python
keep: "col3 > 100"
- type: Sql
name: MySqlTransform
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
sink:
type: WriteToJson
config:
path: /path/to/output.json
非線性管線
Beam YAML 支援任意的非線性管線。以下管線讀取兩個來源,將它們聯結起來,並寫入兩個輸出
pipeline:
transforms:
- type: ReadFromCsv
name: ReadLeft
config:
path: /path/to/left*.csv
- type: ReadFromCsv
name: ReadRight
config:
path: /path/to/right*.csv
- type: Sql
config:
query: select A.col1, B.col2 from A join B using (col3)
input:
A: ReadLeft
B: ReadRight
- type: WriteToJson
name: WriteAll
input: Sql
config:
path: /path/to/all.json
- type: Filter
name: FilterToBig
input: Sql
config:
language: python
keep: "col2 > 100"
- type: WriteToCsv
name: WriteBig
input: FilterToBig
config:
path: /path/to/big.csv
由於管線不是線性的,您必須明確宣告每個轉換的輸入。但是,您可以在非線性管線中巢狀化 chain
。鏈結是管線內的線性子路徑。
以下範例建立一個名為 ExtraProcessingForBigRows
的鏈結。鏈結從 Sql
轉換取得輸入,並套用幾個額外的篩選器以及接收器。請注意,在鏈結內,不需要指定輸入。
pipeline:
transforms:
- type: ReadFromCsv
name: ReadLeft
config:
path: /path/to/left*.csv
- type: ReadFromCsv
name: ReadRight
config:
path: /path/to/right*.csv
- type: Sql
config:
query: select A.col1, B.col2 from A join B using (col3)
input:
A: ReadLeft
B: ReadRight
- type: WriteToJson
name: WriteAll
input: Sql
config:
path: /path/to/all.json
- type: chain
name: ExtraProcessingForBigRows
input: Sql
transforms:
- type: Filter
config:
language: python
keep: "col2 > 100"
- type: Filter
config:
language: python
keep: "len(col1) > 10"
- type: Filter
config:
language: python
keep: "col1 > 'z'"
sink:
type: WriteToCsv
config:
path: /path/to/big.csv
視窗化
此 API 可用於定義串流和批次管線。為了在串流管線中有意義地彙總元素,通常需要某種類型的視窗化。可以使用所有其他 Beam SDK 中可用的相同 WindowInto
轉換來宣告 Beam 的視窗化和觸發。
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: JSON
schema:
type: object
properties:
col1: {type: string}
col2: {type: integer}
col3: {type: number}
- type: WindowInto
windowing:
type: fixed
size: 60s
- type: SomeGroupingTransform
config:
arg: ...
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
您可以不用使用明確的 WindowInto
作業,而是使用指定的視窗化標記轉換,這會導致其輸入(因此轉換本身)套用該視窗化。
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
- type: SomeGroupingTransform
config:
arg: ...
windowing:
type: sliding
size: 60s
period: 10s
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
請注意,Sql
作業本身通常是一種彙總形式,並且套用視窗化(或使用已視窗化的輸入)會導致每個視窗完成所有分組。
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
windowing:
type: sessions
gap: 60s
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
指定的視窗化會套用至所有輸入,在本例中會產生每個視窗的聯結。
pipeline:
transforms:
- type: ReadFromPubSub
name: ReadLeft
config:
topic: leftTopic
format: ...
schema: ...
- type: ReadFromPubSub
name: ReadRight
config:
topic: rightTopic
format: ...
schema: ...
- type: Sql
config:
query: select A.col1, B.col2 from A join B using (col3)
input:
A: ReadLeft
B: ReadRight
windowing:
type: fixed
size: 60s
options:
streaming: true
對於沒有輸入的轉換,指定的視窗化會改為套用至其輸出。根據 Beam 模型,視窗化會由所有使用作業繼承。這對於 Read 等根作業特別有用。
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
windowing:
type: fixed
size: 60s
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true
也可以在管線(或複合)的最上層指定視窗化,這是將相同的視窗化套用至所有未另行指定自己的視窗化的根作業的簡寫。此方法是在管線中的任何地方套用視窗的有效方法。
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
windowing:
type: fixed
size: 60
options:
streaming: true
請注意,所有這些視窗化規格也與 source
和 sink
語法相容
pipeline:
type: chain
source:
type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
windowing:
type: fixed
size: 10s
transforms:
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
sink:
type: WriteToCsv
config:
path: /path/to/output.json
windowing:
type: fixed
size: 5m
options:
streaming: true
供應商
儘管我們的目標是提供一套大型的內建轉換,但人們不可避免地會想要撰寫自己的轉換。透過利用擴充服務和架構轉換的供應商概念,可以實現這一點。
例如,您可以建置一個 jar,以提供跨語言轉換或架構轉換,然後在轉換中使用它,如下所示
pipeline:
type: chain
source:
type: ReadFromCsv
config:
path: /path/to/input*.csv
transforms:
- type: MyCustomTransform
config:
arg: whatever
sink:
type: WriteToJson
config:
path: /path/to/output.json
providers:
- type: javaJar
config:
jar: /path/or/url/to/myExpansionService.jar
transforms:
MyCustomTransform: "urn:registered:in:expansion:service"
也可以使用語法提供任意 Python 轉換
providers:
- type: pythonPackage
config:
packages:
- my_pypi_package>=version
- /path/to/local/package.zip
transforms:
MyCustomTransform: "pkg.subpkg.PTransformClassOrCallable"
管線選項
管線選項用於設定管線的不同方面,例如將執行管線的管線執行器,以及所選執行器所需的任何執行器特定組態。若要設定管線選項,請在 YAML 檔案的結尾附加選項區塊。例如
pipeline:
type: chain
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
windowing:
type: fixed
size: 60s
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: JSON
options:
streaming: true