Beam YAML 聚合

Beam YAML 具有執行聚合以群組和合併跨記錄值的能力。這是透過 Combine 轉換類型完成的。

例如,可以寫成

- type: Combine
  config:
    group_by: col1
    combine:
      total:
        value: col2
        fn:
          type: sum

如果函數沒有設定需求,可以直接以字串形式提供

- type: Combine
  config:
    group_by: col1
    combine:
      total:
        value: col2
        fn: sum

如果輸出欄位名稱與輸入欄位名稱相同,則可以進一步簡化此操作

- type: Combine
  config:
    group_by: col1
    combine:
      col2: sum

可以一次聚合多個欄位

- type: Combine
  config:
    group_by: col1
    combine:
      col2: sum
      col3: max

和/或依多個欄位分組

- type: Combine
  config:
    group_by: [col1, col2]
    combine:
      col3: sum

或完全不分組(這將導致具有單一輸出的全域合併)

- type: Combine
  config:
    group_by: []
    combine:
      col2: sum
      col3: max

視窗化聚合

與所有轉換一樣,Combine 可以採用視窗化參數

- type: Combine
  windowing:
    type: fixed
    size: 60s
  config:
    group_by: col1
    combine:
      col2: sum
      col3: max

如果未提供視窗化規格,它會繼承上游的視窗化參數,例如

- type: WindowInto
  windowing:
    type: fixed
    size: 60s
- type: Combine
  config:
    group_by: col1
    combine:
      col2: sum
      col3: max

等效於先前的範例。

自訂聚合函數

可以透過設定語言參數來使用 Python 中定義的聚合函數。

- type: Combine
  config:
    language: python
    group_by: col1
    combine:
      biggest:
        value: "col2 + col2"
        fn:
          type: 'apache_beam.transforms.combiners.TopCombineFn'
          config:
            n: 10

SQL 樣式聚合

透過將語言設定為 SQL,可以提供完整的 SQL 片段作為合併函數。

- type: Combine
  config:
    language: sql
    group_by: col1
    combine:
      num_values: "count(*)"
      total: "sum(col2)"

當然,也可以使用 Sql 轉換類型並直接提供查詢。