Beam YAML 對應

Beam YAML 具有執行簡單轉換的能力,可用於將資料轉換為正確的格式。其中最簡單的是 MapToFields,它會根據輸入欄位定義建立具有新欄位的記錄。

欄位重新命名

若要重新命名欄位,可以寫入

- type: MapToFields
  config:
    fields:
      new_col1: col1
      new_col2: col2

結果將產生一個輸出,其中每個記錄都有兩個欄位 new_col1new_col2,其值分別為 col1col2 的值(這是輸入架構中的兩個欄位名稱)。

可以指定 append 參數,表示應保留原始欄位,類似於 SQL select 陳述式中使用 * 的方式。例如

- type: MapToFields
  config:
    append: true
    fields:
      new_col1: col1
      new_col2: col2

將輸出具有 new_col1new_col2 作為額外欄位的記錄。指定 append 欄位時,也可以捨棄欄位,例如

- type: MapToFields
  config:
    append: true
    drop:
      - col3
    fields:
      new_col1: col1
      new_col2: col2

它除了輸出兩個新欄位之外,還包含所有原始欄位,但不包含 col3。

對應函式

當然,您可能希望執行不僅僅是捨棄和重新命名欄位的轉換。Beam YAML 能夠內嵌簡單的 UDF。這需要語言規範。例如,我們可以提供一個引用輸入欄位的 Python 表達式

- type: MapToFields
  config:
    language: python
    fields:
      new_col: "col1.upper()"
      another_col: "col2 + col3"

此外,您可以提供一個完整的 Python 可呼叫物件,將該列作為引數來執行更複雜的對應(請參閱 PythonCallableSource 以取得可接受的格式)。因此,您可以寫入

- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        callable: |
          import re
          def my_mapping(row):
            if re.match("[0-9]+", row.col1) and row.col2 > 0:
              return "good"
            else:
              return "bad"

一旦達到一定的複雜程度,最好將其打包為相依性,然後僅使用完整限定名稱來引用它,例如

- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        callable: pkg.module.fn

也可以將函式邏輯儲存在檔案中,並指向函式名稱,例如

- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        path: /path/to/some/udf.py
        name: my_mapping

目前,除了 Python 之外,也支援 Java、SQL 和 JavaScript(實驗性)表達式。

Java

使用 Java 對應時,即使是簡單的表達式,也必須宣告 UDF 類型,例如

- type: MapToFields
  config:
    language: java
    fields:
      new_col:
        expression: col1.toUpperCase()

對於可呼叫的 UDF,Java 要求將該函式宣告為實作 java.util.function.Function 的類別,例如

- type: MapToFields
  config:
    language: java
    fields:
      new_col:
        callable: |
          import org.apache.beam.sdk.values.Row;
          import java.util.function.Function;
          public class MyFunction implements Function<Row, String> {
            public String apply(Row row) {
              return row.getString("col1").toUpperCase();
            }
          }

SQL

當 SQL 用於 MapToFields UDF 時,它本質上是 SQL SELECT 陳述式。

例如,查詢 SELECT UPPER(col1) AS new_col, "col2 + col3" AS another_col FROM PCOLLECTION 看起來會像

- type: MapToFields
  config:
    language: sql
    fields:
      new_col: "UPPER(col1)"
      another_col: "col2 + col3"

請記住,任何不打算包含在輸出中的欄位都應新增至 drop 欄位。

如果想要選取與 保留 SQL 關鍵字 衝突的欄位,則必須使用反引號將欄位括起來。例如,假設傳入的 PCollection 具有欄位「timestamp」,則必須寫入

- type: MapToFields
  config:
    language: sql
    fields:
      new_col: "`timestamp`"

注意drop 中定義的欄位對應標籤和欄位不需要逸出。只有 UDF 本身需要是有效的 SQL 陳述式。

通用

如果未指定語言,則表達式集會限制為預先存在的欄位以及整數、浮點數或字串常值。例如

- type: MapToFields
  config:
    fields:
      new_col: col1
      int_literal: 389
      float_litera: 1.90216
      str_literal: '"example"'  # note the double quoting

FlatMap

有時,可能需要為每個輸入記錄發出一個以上的記錄。這可以透過對應至可迭代類型,然後對應後續的 Explode 作業來完成,例如

- type: MapToFields
  config:
    language: python
    fields:
      new_col: "[col1.upper(), col1.lower(), col1.title()]"
      another_col: "col2 + col3"
- type: Explode
  config:
    fields: new_col

將為每個輸入記錄產生三個輸出記錄。

如果要展開一個以上的記錄,則必須指定是否應取所有欄位的交叉乘積。例如

- type: MapToFields
  config:
    language: python
    fields:
      new_col: "[col1.upper(), col1.lower(), col1.title()]"
      another_col: "[col2 - 1, col2, col2 + 1]"
- type: Explode
  config:
    fields: [new_col, another_col]
    cross_product: true

將發出九個記錄,而

- type: MapToFields
  config:
    language: python
    fields:
      new_col: "[col1.upper(), col1.lower(), col1.title()]"
      another_col: "[col2 - 1, col2, col2 + 1]"
- type: Explode
  config:
    fields: [new_col, another_col]
    cross_product: false

將只發出三個記錄。

如果相關欄位已經是可迭代類型,則可以單獨使用 Explode 作業。

- type: Explode
  config:
    fields: [col1]

篩選

有時,可能需要僅保留符合特定條件的記錄。這可以使用 Filter 轉換來完成,例如

- type: Filter
  config:
    keep: "col2 > 0"

對於比現有欄位和數值常值之間的簡單比較更複雜的任何內容,都必須提供 language 參數,例如

- type: Filter
  config:
    language: python
    keep: "col2 + col3 > 0"

對於更複雜的篩選函式,您可以提供一個完整的 Python 可呼叫物件,將該列作為引數來執行更複雜的對應(請參閱 PythonCallableSource 以取得可接受的格式)。因此,您可以寫入

- type: Filter
  config:
    language: python
    keep:
      callable: |
        import re
        def my_filter(row):
          return re.match("[0-9]+", row.col1) and row.col2 > 0

一旦達到一定的複雜程度,最好將其打包為相依性,然後僅使用完整限定名稱來引用它,例如

- type: Filter
  config:
    language: python
    keep:
      callable: pkg.module.fn

也可以將函式邏輯儲存在檔案中,並指向函式名稱,例如

- type: Filter
  config:
    language: python
    keep:
      path: /path/to/some/udf.py
      name: my_filter

目前,除了 Python 之外,也支援 Java、SQL 和 JavaScript(實驗性)表達式。

Java

使用 Java 篩選時,即使是簡單的表達式,也必須宣告 UDF 類型,例如

- type: Filter
  config:
    language: java
    keep:
      expression: col2 > 0

對於可呼叫的 UDF,Java 要求將該函式宣告為實作 java.util.function.Function 的類別,例如

- type: Filter
  config:
    language: java
    keep:
      callable: |
        import org.apache.beam.sdk.values.Row;
        import java.util.function.Function;
        import java.util.regex.Pattern;
        public class MyFunction implements Function<Row, Boolean> {
          public Boolean apply(Row row) {
            Pattern pattern = Pattern.compile("[0-9]+");
            return pattern.matcher(row.getString("col1")).matches() && row.getInt64("col2") > 0;
          }
        }

SQL

對應函式類似,當 SQL 用於 MapToFields UDF 時,它本質上是 SQL WHERE 陳述式。

例如,查詢 SELECT * FROM PCOLLECTION WHERE col2 > 0 看起來會像

- type: Filter
  config:
    language: sql
    keep: "col2 > 0"

如果想要篩選與 保留 SQL 關鍵字 衝突的欄位,則必須使用反引號將欄位括起來。例如,假設傳入的 PCollection 具有欄位「timestamp」,則必須寫入

- type: Filter
  config:
    language: sql
    keep: "`timestamp` > 0"

分割

將不同的元素傳送到不同的位置也很有用(類似於在其他 SDK 中使用側輸出所做的事情)。雖然可以使用一組 Filter 作業來完成,但是如果每個元素都有單一目的地,則使用 Partition 轉換會更自然,它會將每個元素傳送到唯一的輸出。例如,這會將所有 col1 等於 "a" 的元素傳送到輸出 Partition.a

- type: Partition
  input: input
  config:
    by: col1
    outputs: ['a', 'b', 'c']

- type: SomeTransform
  input: Partition.a
  config:
    param: ...

- type: AnotherTransform
  input: Partition.b
  config:
    param: ...

也可以將目的地指定為函式,例如

- type: Partition
  input: input
  config:
    by: "'even' if col2 % 2 == 0 else 'odd'"
    language: python
    outputs: ['even', 'odd']

您可以選擇性地提供一個包羅萬象的輸出,它將捕獲所有不在具名輸出中的元素(否則將會出現錯誤)

- type: Partition
  input: input
  config:
    by: col1
    outputs: ['a', 'b', 'c']
    unknown_output: 'other'

有時,您想要將 PCollection 分割為不一定不相交的多個 PCollection。若要將元素傳送到多個(或無)輸出,可以使用可迭代的欄位,並在 Partition 前面加上 Explode

- type: Explode
  input: input
  config:
    fields: col1

- type: Partition
  input: Explode
  config:
    by: col1
    outputs: ['a', 'b', 'c']

類型

Beam 會嘗試推斷對應中涉及的類型,但有時這是不可能的。在這些情況下,您可以明確表示預期的輸出類型,例如

- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        expression: "col1.upper()"
        output_type: string

預期的類型是以 json 架構表示法給定的,此外,頂層基本類型可以給定為文字字串,而不是要求使用 {type: 'basic_type_name'} 巢狀結構。

- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        expression: "col1.upper()"
        output_type: string
      another_col:
        expression: "beam.Row(a=col1, b=[col2])"
        output_type:
          type: 'object'
          properties:
            a:
              type: 'string'
            b:
              type: 'array'
              items:
                type: 'number'

這對於解決無法處理 beam:logical:pythonsdk_any:v1 類型的錯誤尤其有用。