Beam YAML 對應
Beam YAML 具有執行簡單轉換的能力,可用於將資料轉換為正確的格式。其中最簡單的是 MapToFields
,它會根據輸入欄位定義建立具有新欄位的記錄。
欄位重新命名
若要重新命名欄位,可以寫入
- type: MapToFields
config:
fields:
new_col1: col1
new_col2: col2
結果將產生一個輸出,其中每個記錄都有兩個欄位 new_col1
和 new_col2
,其值分別為 col1
和 col2
的值(這是輸入架構中的兩個欄位名稱)。
可以指定 append 參數,表示應保留原始欄位,類似於 SQL select 陳述式中使用 *
的方式。例如
- type: MapToFields
config:
append: true
fields:
new_col1: col1
new_col2: col2
將輸出具有 new_col1
和 new_col2
作為額外欄位的記錄。指定 append 欄位時,也可以捨棄欄位,例如
- type: MapToFields
config:
append: true
drop:
- col3
fields:
new_col1: col1
new_col2: col2
它除了輸出兩個新欄位之外,還包含所有原始欄位,但不包含 col3。
對應函式
當然,您可能希望執行不僅僅是捨棄和重新命名欄位的轉換。Beam YAML 能夠內嵌簡單的 UDF。這需要語言規範。例如,我們可以提供一個引用輸入欄位的 Python 表達式
- type: MapToFields
config:
language: python
fields:
new_col: "col1.upper()"
another_col: "col2 + col3"
此外,您可以提供一個完整的 Python 可呼叫物件,將該列作為引數來執行更複雜的對應(請參閱 PythonCallableSource 以取得可接受的格式)。因此,您可以寫入
- type: MapToFields
config:
language: python
fields:
new_col:
callable: |
import re
def my_mapping(row):
if re.match("[0-9]+", row.col1) and row.col2 > 0:
return "good"
else:
return "bad"
一旦達到一定的複雜程度,最好將其打包為相依性,然後僅使用完整限定名稱來引用它,例如
- type: MapToFields
config:
language: python
fields:
new_col:
callable: pkg.module.fn
也可以將函式邏輯儲存在檔案中,並指向函式名稱,例如
- type: MapToFields
config:
language: python
fields:
new_col:
path: /path/to/some/udf.py
name: my_mapping
目前,除了 Python 之外,也支援 Java、SQL 和 JavaScript(實驗性)表達式。
Java
使用 Java 對應時,即使是簡單的表達式,也必須宣告 UDF 類型,例如
- type: MapToFields
config:
language: java
fields:
new_col:
expression: col1.toUpperCase()
對於可呼叫的 UDF,Java 要求將該函式宣告為實作 java.util.function.Function
的類別,例如
- type: MapToFields
config:
language: java
fields:
new_col:
callable: |
import org.apache.beam.sdk.values.Row;
import java.util.function.Function;
public class MyFunction implements Function<Row, String> {
public String apply(Row row) {
return row.getString("col1").toUpperCase();
}
}
SQL
當 SQL 用於 MapToFields
UDF 時,它本質上是 SQL SELECT
陳述式。
例如,查詢 SELECT UPPER(col1) AS new_col, "col2 + col3" AS another_col FROM PCOLLECTION
看起來會像
- type: MapToFields
config:
language: sql
fields:
new_col: "UPPER(col1)"
another_col: "col2 + col3"
請記住,任何不打算包含在輸出中的欄位都應新增至 drop
欄位。
如果想要選取與 保留 SQL 關鍵字 衝突的欄位,則必須使用反引號將欄位括起來。例如,假設傳入的 PCollection 具有欄位「timestamp」,則必須寫入
- type: MapToFields
config:
language: sql
fields:
new_col: "`timestamp`"
注意:drop
中定義的欄位對應標籤和欄位不需要逸出。只有 UDF 本身需要是有效的 SQL 陳述式。
通用
如果未指定語言,則表達式集會限制為預先存在的欄位以及整數、浮點數或字串常值。例如
- type: MapToFields
config:
fields:
new_col: col1
int_literal: 389
float_litera: 1.90216
str_literal: '"example"' # note the double quoting
FlatMap
有時,可能需要為每個輸入記錄發出一個以上的記錄。這可以透過對應至可迭代類型,然後對應後續的 Explode
作業來完成,例如
- type: MapToFields
config:
language: python
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "col2 + col3"
- type: Explode
config:
fields: new_col
將為每個輸入記錄產生三個輸出記錄。
如果要展開一個以上的記錄,則必須指定是否應取所有欄位的交叉乘積。例如
- type: MapToFields
config:
language: python
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "[col2 - 1, col2, col2 + 1]"
- type: Explode
config:
fields: [new_col, another_col]
cross_product: true
將發出九個記錄,而
- type: MapToFields
config:
language: python
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "[col2 - 1, col2, col2 + 1]"
- type: Explode
config:
fields: [new_col, another_col]
cross_product: false
將只發出三個記錄。
如果相關欄位已經是可迭代類型,則可以單獨使用 Explode
作業。
- type: Explode
config:
fields: [col1]
篩選
有時,可能需要僅保留符合特定條件的記錄。這可以使用 Filter
轉換來完成,例如
- type: Filter
config:
keep: "col2 > 0"
對於比現有欄位和數值常值之間的簡單比較更複雜的任何內容,都必須提供 language
參數,例如
- type: Filter
config:
language: python
keep: "col2 + col3 > 0"
對於更複雜的篩選函式,您可以提供一個完整的 Python 可呼叫物件,將該列作為引數來執行更複雜的對應(請參閱 PythonCallableSource 以取得可接受的格式)。因此,您可以寫入
- type: Filter
config:
language: python
keep:
callable: |
import re
def my_filter(row):
return re.match("[0-9]+", row.col1) and row.col2 > 0
一旦達到一定的複雜程度,最好將其打包為相依性,然後僅使用完整限定名稱來引用它,例如
- type: Filter
config:
language: python
keep:
callable: pkg.module.fn
也可以將函式邏輯儲存在檔案中,並指向函式名稱,例如
- type: Filter
config:
language: python
keep:
path: /path/to/some/udf.py
name: my_filter
目前,除了 Python 之外,也支援 Java、SQL 和 JavaScript(實驗性)表達式。
Java
使用 Java 篩選時,即使是簡單的表達式,也必須宣告 UDF 類型,例如
- type: Filter
config:
language: java
keep:
expression: col2 > 0
對於可呼叫的 UDF,Java 要求將該函式宣告為實作 java.util.function.Function
的類別,例如
- type: Filter
config:
language: java
keep:
callable: |
import org.apache.beam.sdk.values.Row;
import java.util.function.Function;
import java.util.regex.Pattern;
public class MyFunction implements Function<Row, Boolean> {
public Boolean apply(Row row) {
Pattern pattern = Pattern.compile("[0-9]+");
return pattern.matcher(row.getString("col1")).matches() && row.getInt64("col2") > 0;
}
}
SQL
與對應函式類似,當 SQL 用於 MapToFields
UDF 時,它本質上是 SQL WHERE
陳述式。
例如,查詢 SELECT * FROM PCOLLECTION WHERE col2 > 0
看起來會像
- type: Filter
config:
language: sql
keep: "col2 > 0"
如果想要篩選與 保留 SQL 關鍵字 衝突的欄位,則必須使用反引號將欄位括起來。例如,假設傳入的 PCollection 具有欄位「timestamp」,則必須寫入
- type: Filter
config:
language: sql
keep: "`timestamp` > 0"
分割
將不同的元素傳送到不同的位置也很有用(類似於在其他 SDK 中使用側輸出所做的事情)。雖然可以使用一組 Filter
作業來完成,但是如果每個元素都有單一目的地,則使用 Partition
轉換會更自然,它會將每個元素傳送到唯一的輸出。例如,這會將所有 col1
等於 "a"
的元素傳送到輸出 Partition.a
。
- type: Partition
input: input
config:
by: col1
outputs: ['a', 'b', 'c']
- type: SomeTransform
input: Partition.a
config:
param: ...
- type: AnotherTransform
input: Partition.b
config:
param: ...
也可以將目的地指定為函式,例如
- type: Partition
input: input
config:
by: "'even' if col2 % 2 == 0 else 'odd'"
language: python
outputs: ['even', 'odd']
您可以選擇性地提供一個包羅萬象的輸出,它將捕獲所有不在具名輸出中的元素(否則將會出現錯誤)
- type: Partition
input: input
config:
by: col1
outputs: ['a', 'b', 'c']
unknown_output: 'other'
有時,您想要將 PCollection 分割為不一定不相交的多個 PCollection。若要將元素傳送到多個(或無)輸出,可以使用可迭代的欄位,並在 Partition
前面加上 Explode
。
- type: Explode
input: input
config:
fields: col1
- type: Partition
input: Explode
config:
by: col1
outputs: ['a', 'b', 'c']
類型
Beam 會嘗試推斷對應中涉及的類型,但有時這是不可能的。在這些情況下,您可以明確表示預期的輸出類型,例如
- type: MapToFields
config:
language: python
fields:
new_col:
expression: "col1.upper()"
output_type: string
預期的類型是以 json 架構表示法給定的,此外,頂層基本類型可以給定為文字字串,而不是要求使用 {type: 'basic_type_name'}
巢狀結構。
- type: MapToFields
config:
language: python
fields:
new_col:
expression: "col1.upper()"
output_type: string
another_col:
expression: "beam.Row(a=col1, b=[col2])"
output_type:
type: 'object'
properties:
a:
type: 'string'
b:
type: 'array'
items:
type: 'number'
這對於解決無法處理 beam:logical:pythonsdk_any:v1
類型的錯誤尤其有用。