Beam YAML 聚合
Beam YAML 具有執行聚合以群組和合併跨記錄值的能力。這是透過 Combine
轉換類型完成的。
例如,可以寫成
- type: Combine
config:
group_by: col1
combine:
total:
value: col2
fn:
type: sum
如果函數沒有設定需求,可以直接以字串形式提供
- type: Combine
config:
group_by: col1
combine:
total:
value: col2
fn: sum
如果輸出欄位名稱與輸入欄位名稱相同,則可以進一步簡化此操作
- type: Combine
config:
group_by: col1
combine:
col2: sum
可以一次聚合多個欄位
- type: Combine
config:
group_by: col1
combine:
col2: sum
col3: max
和/或依多個欄位分組
- type: Combine
config:
group_by: [col1, col2]
combine:
col3: sum
或完全不分組(這將導致具有單一輸出的全域合併)
- type: Combine
config:
group_by: []
combine:
col2: sum
col3: max
視窗化聚合
與所有轉換一樣,Combine
可以採用視窗化參數
- type: Combine
windowing:
type: fixed
size: 60s
config:
group_by: col1
combine:
col2: sum
col3: max
如果未提供視窗化規格,它會繼承上游的視窗化參數,例如
- type: WindowInto
windowing:
type: fixed
size: 60s
- type: Combine
config:
group_by: col1
combine:
col2: sum
col3: max
等效於先前的範例。
自訂聚合函數
可以透過設定語言參數來使用 Python 中定義的聚合函數。
- type: Combine
config:
language: python
group_by: col1
combine:
biggest:
value: "col2 + col2"
fn:
type: 'apache_beam.transforms.combiners.TopCombineFn'
config:
n: 10
SQL 樣式聚合
透過將語言設定為 SQL,可以提供完整的 SQL 片段作為合併函數。
- type: Combine
config:
language: sql
group_by: col1
combine:
num_values: "count(*)"
total: "sum(col2)"
當然,也可以使用 Sql
轉換類型並直接提供查詢。