從 YAML 使用 PyTransform

Beam YAML 提供透過 PyTransform 型別輕鬆調用 Python 轉換的能力,只需依完整限定名稱參考它們即可。例如,

- type: PyTransform
  config:
    constructor: apache_beam.pkg.module.SomeTransform
    args: [1, 'foo']
    kwargs:
       baz: 3

將會調用轉換 apache_beam.pkg.mod.SomeTransform(1, 'foo', baz=3)。此完整限定名稱可以是任何 PTransform 類別或其他傳回 PTransform 的可呼叫物件。但是,請注意,不接受或傳回結構化資料的 PTransform 可能不那麼容易從 YAML 使用。在非傳回結構的轉換之後恢復結構化屬性,可以透過在 MapToFields 上使用 callable 選項來完成,該選項將整個元素作為輸入,例如:

- type: PyTransform
  config:
    constructor: apache_beam.pkg.module.SomeTransform
    args: [1, 'foo']
    kwargs:
       baz: 3
- type: MapToFields
  config:
    language: python
    fields:
      col1:
        callable: 'lambda element: element.col1'
        output_type: string
      col2:
        callable: 'lambda element: element.col2'
        output_type: integer

這可以用來呼叫 Beam SDK 中的任意轉換,例如:

pipeline:
  transforms:
    - type: PyTransform
      name: ReadFromTsv
      input: {}
      config:
        constructor: apache_beam.io.ReadFromCsv
        kwargs:
           path: '/path/to/*.tsv'
           sep: '\t'
           skip_blank_lines: True
           true_values: ['yes']
           false_values: ['no']
           comment: '#'
           on_bad_lines: 'skip'
           binary: False
           splittable: False

使用 __constructor__ 內嵌定義轉換

如果所需的轉換不存在,也可以內嵌定義它。這可以使用特殊的 __constructor__ 關鍵字完成,類似於跨語言轉換的完成方式。

使用 __constuctor__ 關鍵字,可以定義一個 Python 可呼叫物件,該物件在調用時會「傳回」所需的轉換。第一個引數(或 source 關鍵字引數,如果沒有位置引數)會被解讀為 Python 程式碼。例如

- type: PyTransform
  config:
    constructor: __constructor__
    kwargs:
      source: |
        def create_my_transform(inc):
          return beam.Map(lambda x: beam.Row(a=x.col2 + inc))

      inc: 10

將會將 beam.Map(lambda x: beam.Row(a=x.col2 + 10)) 套用至傳入的 PCollection。

由於類別物件可以作為自己的建構函式調用,因此這允許內嵌定義 beam.PTransform,例如:

- type: PyTransform
  config:
    constructor: __constructor__
    kwargs:
      source: |
        class MyPTransform(beam.PTransform):
          def __init__(self, inc):
            self._inc = inc
          def expand(self, pcoll):
            return pcoll | beam.Map(lambda x: beam.Row(a=x.col2 + self._inc))

      inc: 10

其運作方式完全如預期的那樣。

使用 __callable__ 內嵌定義轉換

__callable__ 關鍵字的工作方式類似,但不是定義傳回適用 PTransform 的可呼叫物件,而是簡單地定義要作為可呼叫物件執行的展開。這類似於 BeamPython 的 ptransform.ptransform_fn 修飾詞。

在這種情況下,可以簡單地寫成

- type: PyTransform
  config:
    constructor: __callable__
    kwargs:
      source: |
        def my_ptransform(pcoll, inc):
          return pcoll | beam.Map(lambda x: beam.Row(a=x.col2 + inc))

      inc: 10

外部轉換

也可以透過 python 提供者調用其他地方定義的 PTransforms,例如

pipeline:
  transforms:
    - ...
    - type: MyTransform
      config:
        kwarg: whatever

providers:
  - ...
  - type: python
    input: ...
    config:
      packages:
        - 'some_pypi_package>=version'
    transforms:
      MyTransform: 'pkg.module.MyTransform'

這些也可以內嵌定義,帶或不帶相依性,例如:

pipeline:
  transforms:
    - ...
    - type: ToCase
      input: ...
      config:
        upper: True

providers:
  - type: python
    config: {}
    transforms:
      'ToCase': |
        @beam.ptransform_fn
        def ToCase(pcoll, upper):
          if upper:
            return pcoll | beam.Map(lambda x: str(x).upper())
          else:
            return pcoll | beam.Map(lambda x: str(x).lower())