Beam YAML 轉換索引

AssertEqual

斷言輸入內容是否完全符合提供的元素。

這主要用於測試;如果此轉換的輸入與配置參數中給定的 `elements` 集合不完全相同,則會導致整個管道失敗。

與 Create 相同,YAML/JSON 樣式的映射會被解釋為 Beam 列,例如:

type: AssertEqual
input: SomeTransform
config:
  elements:
     - {a: 0, b: "foo"}
     - {a: 1, b: "bar"}

將確保 `SomeTransform` 產生恰好兩個值分別為 `(a=0, b="foo")` 和 `(a=1, b="bar")` 的元素。

配置

用法

type: AssertEqual
input: ...
config:
  elements:
  - element
  - element
  - ...

AssignTimestamps

為其輸入的每個元素分配一個新的時間戳記。

當讀取時間戳記嵌入其中的記錄時,這可能很有用,例如,對於各種文件類型或其他預設將所有時間戳記設置為無限過去的來源。

請注意,時間戳記應僅向前設置,因為向後設置可能不會阻止已提前的水印,並且數據可能會變得過於延遲而被丟棄。

支援的語言:通用、javascript、python

配置

用法

type: AssignTimestamps
input: ...
config:
  timestamp: timestamp
  language: "language"
  error_handling:
    output: "output"

Combine

群組和組合共享通用欄位的記錄。

內建的組合函數為 `sum`、`max`、`min`、`all`、`any`、`mean`、`count`、`group`、`concat`,但也可以使用自訂的聚合函數。

另請參閱有關 YAML 聚合 的文件。

支援的語言:calcite、通用、javascript、python、sql

配置

用法

type: Combine
input: ...
config:
  group_by:
  - "group_by"
  - "group_by"
  - ...
  combine:
    a:
      a: combine_value_a_value_a
      b: combine_value_a_value_b
      c: ...
    b:
      a: combine_value_b_value_a
      b: combine_value_b_value_b
      c: ...
    c: ...
  language: "language"

Create

建立一個包含指定元素集合的集合。

此轉換始終產生 schema'd 數據。例如:

type: Create
config:
  elements: [1, 2, 3]

將產生一個具有三個元素的輸出,其 schema 為 Row(element=int),而 YAML/JSON 樣式的映射將直接被解釋為 Beam 列,例如:

type: Create
config:
  elements:
     - {first: 0, second: {str: "foo", values: [1, 2, 3]}}
     - {first: 1, second: {str: "bar", values: [4, 5, 6]}}

將產生 (int, Row(string, List[int])) 形式的 schema。

這也可以表示為 YAML

type: Create
config:
  elements:
    - first: 0
      second:
        str: "foo"
         values: [1, 2, 3]
    - first: 1
      second:
        str: "bar"
         values: [4, 5, 6]

配置

用法

type: Create
config:
  elements:
  - element
  - element
  - ...
  reshuffle: true|false

Explode

展開(又稱取消巢狀/扁平化)一個或多個欄位,產生多列。

給定一個或多個可迭代類型的欄位,產生多列,每個欄位的值對應一列。例如,當在第二個欄位上展開時,`('a', [1, 2, 3])` 形式的列將展開為 `('a', 1)`、`('a', 2')` 和 `('a', 3)`。

當與 MapToFields 轉換配對時,這類似於 `FlatMap`。

請參閱有關 YAML 映射函數 的更完整的文件。

配置

用法

type: Explode
input: ...
config:
  fields: fields
  cross_product: true|false
  error_handling:
    a: error_handling_value_a
    b: error_handling_value_b
    c: ...

Filter

僅保留符合指定條件的記錄。

請參閱有關 YAML 過濾 的更完整的文件。

支援的語言:calcite、通用、java、javascript、python、sql

配置

用法

type: Filter
input: ...
config:
  keep: keep
  language: "language"
  error_handling:
    output: "output"

Flatten

將多個 PCollection 扁平化為單個 PCollection。

結果 PCollection 的元素將是所有輸入的所有元素的(不相交)聯集。

請注意,在 YAML 中,轉換始終可以採用將被隱式扁平化的輸入列表。

配置

沒有配置參數。

用法

type: Flatten
input: ...
config: ...

Join

使用指定的條件聯結兩個或多個輸入。

例如:

type: Join
input:
  input1: SomeTransform
  input2: AnotherTransform
  input3: YetAnotherTransform
config:
  type: inner
  equalities:
    - input1: colA
      input2: colB
    - input2: colX
      input3: colY
  fields:
    input1: [colA, colB, colC]
    input2: {new_name: colB}

將在三個輸入上執行內部聯結,滿足 `input1.colA = input2.colB` 和 `input2.colX = input3.colY` 的條件,輸出具有來自 `input1` 的 `colA`、`colB` 和 `colC` 的列、作為名為 `new_name` 的欄位的 `input2.colB` 的值以及來自 `input3` 的所有欄位的列。

配置

用法

type: Join
input: ...
config:
  equalities: equalities
  type: type
  fields:
    a: fields_value_a
    b: fields_value_b
    c: ...

LogForTesting

記錄其輸入 PCollection 的每個元素。

此轉換的輸出是其輸入的副本,以便在鏈式管道中輕鬆使用。

配置

用法

type: LogForTesting
input: ...
config:
  level: "level"
  prefix: "prefix"

MLTransform

配置

用法

type: MLTransform
input: ...
config:
  write_artifact_location: "write_artifact_location"
  read_artifact_location: "read_artifact_location"
  transforms:
  - transforms
  - transforms
  - ...

MapToFields

建立具有根據輸入欄位定義的新欄位的記錄。

請參閱有關 YAML 映射函數 的更完整的文件。

支援的語言:calcite、通用、java、javascript、python、sql

配置

用法

type: MapToFields
input: ...
config:
  language: language
  error_handling:
    output: "output"
  mapping_args: mapping_args

Partition

將輸入拆分為多個不同的輸出。

每個輸入元素將根據 `by` 配置參數中給定的欄位或函數進入不同的輸出。

支援的語言:通用、javascript、python

配置

用法

type: Partition
input: ...
config:
  by: by
  outputs:
  - "outputs"
  - "outputs"
  - ...
  unknown_output: "unknown_output"
  error_handling:
    a: error_handling_value_a
    b: error_handling_value_b
    c: ...
  language: "language"

PyTransform

由完整限定名稱識別的 Python PTransform。

這允許導入、建構和應用任何 Beam Python 轉換。這對於使用尚未透過 YAML 介面公開的轉換可能很有用。但是,請注意,如果此轉換不接受或產生 Beam 列,則可能需要轉換。

例如:

type: PyTransform
config:
   constructor: apache_beam.pkg.mod.SomeClass
   args: [1, 'foo']
   kwargs:
     baz: 3

可用於存取轉換 `apache_beam.pkg.mod.SomeClass(1, 'foo', baz=3)`。

另請參閱有關 內聯 Python 的文件。

配置

用法

type: PyTransform
input: ...
config:
  constructor: "constructor"
  args:
  - arg
  - arg
  - ...
  kwargs:
    a: kwargs_value_a
    b: kwargs_value_b
    c: ...

Sql

配置

用法

type: Sql
input: ...
config: ...

WindowInto

一個視窗轉換,將視窗分配給 PCollection 的每個元素。

分配的視窗將影響所有下游的聚合操作,這些操作將按視窗和鍵進行聚合。

請參閱關於視窗化的 Beam 文件以了解更多詳細資訊。

大小、偏移量、週期和間隔(如果適用)必須使用時間單位後綴 'ms'、's'、'm'、'h' 或 'd' 分別表示毫秒、秒、分鐘、小時或天。 如果未指定時間單位,則預設為 's'。

例如:

windowing:
   type: fixed
   size: 30s

請注意,任何 Yaml 轉換都可以具有視窗化參數,該參數將應用於其輸入(如果有)或輸出(如果沒有輸入),這表示通常不需要明確的 WindowInto 操作。

配置

用法

type: WindowInto
input: ...
config:
  windowing: windowing

ReadFromAvro

一個用於從 avro 檔案讀取記錄的 PTransform

結果 PCollection 的每個記錄都將包含從來源讀取的單一記錄。屬於簡單類型的記錄將映射到具有包含記錄值的單一 record 欄位的 Beam 列。屬於 Avro 類型 RECORD 的記錄將映射到符合包含這些記錄的 Avro 檔案中結構描述的 Beam 列。

配置

用法

type: ReadFromAvro
config:
  path: path

WriteToAvro

一個用於寫入 avro 檔案的 PTransform

如果輸入具有結構描述,將會自動產生對應的 avro 結構描述,並用於寫入輸出記錄。

配置

用法

type: WriteToAvro
input: ...
config:
  path: path

ReadFromBigQuery

從 BigQuery 讀取資料。

必須設定 table 或 query 其中一個。 如果設定 query,則不應設定 row_restriction 或 fields。

配置

用法

type: ReadFromBigQuery
config:
  table: "table"
  query: "query"
  row_restriction: "row_restriction"
  fields:
  - "field"
  - "field"
  - ...

WriteToBigQuery

配置

用法

type: WriteToBigQuery
input: ...
config: ...

ReadFromCsv

一個將逗號分隔值 (csv) 檔案讀取到 PCollection 的 PTransform。

配置

用法

type: ReadFromCsv
config:
  path: "path"
  delimiter: delimiter
  comment: comment

WriteToCsv

一個將具有結構描述的 PCollection 作為(一組)逗號分隔值 (csv) 檔案寫入的 PTransform。

配置

用法

type: WriteToCsv
input: ...
config:
  path: "path"
  delimiter: delimiter

ReadFromJdbc

配置

用法

type: ReadFromJdbc
config: ...

WriteToJdbc

配置

用法

type: WriteToJdbc
input: ...
config: ...

ReadFromJson

一個將 json 值從檔案讀取到 PCollection 的 PTransform。

配置

用法

type: ReadFromJson
config:
  path: "path"

WriteToJson

一個將 PCollection 作為 json 值寫入檔案的 PTransform。

配置

用法

type: WriteToJson
input: ...
config:
  path: "path"

ReadFromKafka

配置

用法

type: ReadFromKafka
config: ...

WriteToKafka

配置

用法

type: WriteToKafka
input: ...
config: ...

ReadFromMySql

配置

用法

type: ReadFromMySql
config: ...

WriteToMySql

配置

用法

type: WriteToMySql
input: ...
config: ...

ReadFromOracle

配置

用法

type: ReadFromOracle
config: ...

WriteToOracle

配置

用法

type: WriteToOracle
input: ...
config: ...

ReadFromParquet

一個用於讀取 Parquet 檔案的 PTransform

配置

用法

type: ReadFromParquet
config:
  path: path

WriteToParquet

一個用於寫入 Parquet 檔案的 PTransform

配置

用法

type: WriteToParquet
input: ...
config:
  path: path

ReadFromPostgres

配置

用法

type: ReadFromPostgres
config: ...

WriteToPostgres

配置

用法

type: WriteToPostgres
input: ...
config: ...

ReadFromPubSub

從 Cloud Pub/Sub 讀取訊息。

配置

用法

type: ReadFromPubSub
config:
  topic: "topic"
  subscription: "subscription"
  format: "format"
  schema: schema
  attributes:
  - "attribute"
  - "attribute"
  - ...
  attributes_map: "attributes_map"
  id_attribute: "id_attribute"
  timestamp_attribute: "timestamp_attribute"
  error_handling:
    output: "output"

WriteToPubSub

將訊息寫入 Cloud Pub/Sub。

配置

用法

type: WriteToPubSub
input: ...
config:
  topic: "topic"
  format: "format"
  schema: schema
  attributes:
  - "attribute"
  - "attribute"
  - ...
  attributes_map: "attributes_map"
  id_attribute: "id_attribute"
  timestamp_attribute: "timestamp_attribute"
  error_handling:
    output: "output"

ReadFromPubSubLite

配置

用法

type: ReadFromPubSubLite
config: ...

WriteToPubSubLite

配置

用法

type: WriteToPubSubLite
input: ...
config: ...

ReadFromSpanner

配置

用法

type: ReadFromSpanner
config: ...

WriteToSpanner

配置

用法

type: WriteToSpanner
input: ...
config: ...

ReadFromSqlServer

配置

用法

type: ReadFromSqlServer
config: ...

WriteToSqlServer

配置

用法

type: WriteToSqlServer
input: ...
config: ...

ReadFromText

從文字檔案讀取行。

結果 PCollection 由具有名為「line」的單一字串欄位的列組成。

配置

用法

type: ReadFromText
config:
  path: "path"

WriteToText

將 PCollection 寫入到(一組)文字檔案。

輸入必須是其結構描述恰好有一個欄位的 PCollection。

配置

用法

type: WriteToText
input: ...
config:
  path: "path"