Beam YAML 轉換索引
AssertEqual
斷言輸入內容是否完全符合提供的元素。
這主要用於測試;如果此轉換的輸入與配置參數中給定的 `elements` 集合不完全相同,則會導致整個管道失敗。
與 Create 相同,YAML/JSON 樣式的映射會被解釋為 Beam 列,例如:
type: AssertEqual
input: SomeTransform
config:
elements:
- {a: 0, b: "foo"}
- {a: 1, b: "bar"}
將確保 `SomeTransform` 產生恰好兩個值分別為 `(a=0, b="foo")` 和 `(a=1, b="bar")` 的元素。
配置
- elements
Array[?]
: 應該屬於 PCollection 的元素集合。YAML/JSON 樣式的映射將被解釋為 Beam 列。
用法
type: AssertEqual
input: ...
config:
elements:
- element
- element
- ...
AssignTimestamps
為其輸入的每個元素分配一個新的時間戳記。
當讀取時間戳記嵌入其中的記錄時,這可能很有用,例如,對於各種文件類型或其他預設將所有時間戳記設置為無限過去的來源。
請注意,時間戳記應僅向前設置,因為向後設置可能不會阻止已提前的水印,並且數據可能會變得過於延遲而被丟棄。
支援的語言:通用、javascript、python
配置
-
timestamp
?
(選填) : 一個欄位、可呼叫物件或表示式,用於指定新的時間戳記。 -
language
string
(選填) : 時間戳記表示式的語言。 -
error_handling
Row
: 如何處理時間戳記評估期間的錯誤。Row 欄位
- output
string
- output
用法
type: AssignTimestamps
input: ...
config:
timestamp: timestamp
language: "language"
error_handling:
output: "output"
Combine
群組和組合共享通用欄位的記錄。
內建的組合函數為 `sum`、`max`、`min`、`all`、`any`、`mean`、`count`、`group`、`concat`,但也可以使用自訂的聚合函數。
另請參閱有關 YAML 聚合 的文件。
支援的語言:calcite、通用、javascript、python、sql
配置
-
group_by
Array[string]
-
combine
Map[string, Map[string, ?]]
-
language
string
(選填)
用法
type: Combine
input: ...
config:
group_by:
- "group_by"
- "group_by"
- ...
combine:
a:
a: combine_value_a_value_a
b: combine_value_a_value_b
c: ...
b:
a: combine_value_b_value_a
b: combine_value_b_value_b
c: ...
c: ...
language: "language"
Create
建立一個包含指定元素集合的集合。
此轉換始終產生 schema'd 數據。例如:
type: Create
config:
elements: [1, 2, 3]
將產生一個具有三個元素的輸出,其 schema 為 Row(element=int),而 YAML/JSON 樣式的映射將直接被解釋為 Beam 列,例如:
type: Create
config:
elements:
- {first: 0, second: {str: "foo", values: [1, 2, 3]}}
- {first: 1, second: {str: "bar", values: [4, 5, 6]}}
將產生 (int, Row(string, List[int])) 形式的 schema。
這也可以表示為 YAML
type: Create
config:
elements:
- first: 0
second:
str: "foo"
values: [1, 2, 3]
- first: 1
second:
str: "bar"
values: [4, 5, 6]
配置
-
elements
Array[?]
: 應該屬於 PCollection 的元素集合。YAML/JSON 樣式的映射將被解釋為 Beam 列。基本型別將會被映射到具有單一 "element" 欄位的列。 -
reshuffle
boolean
(選填) : (選填)如果集合中有多個元素,是否引入 reshuffle(以可能重新分配工作)。預設為 True。
用法
type: Create
config:
elements:
- element
- element
- ...
reshuffle: true|false
Explode
展開(又稱取消巢狀/扁平化)一個或多個欄位,產生多列。
給定一個或多個可迭代類型的欄位,產生多列,每個欄位的值對應一列。例如,當在第二個欄位上展開時,`('a', [1, 2, 3])` 形式的列將展開為 `('a', 1)`、`('a', 2')` 和 `('a', 3)`。
當與 MapToFields 轉換配對時,這類似於 `FlatMap`。
請參閱有關 YAML 映射函數 的更完整的文件。
配置
-
fields
?
(選填) : 要展開的欄位列表。 -
cross_product
boolean
(選填) : 如果指定了多個欄位,則表示是否應產生組合的完整叉積,還是第一個欄位的第一個元素對應於第二個欄位的第一個元素,依此類推。例如,當 `cross_product` 設置為 `true` 時,列 `(['a', 'b'], [1, 2])` 將展開為四列 `('a', 1)`、`('a', 2)`、`('b', 1)` 和 `('b', 2)`,但當設置為 `false` 時,僅展開為兩列 `('a', 1)` 和 `('b', 2)`。僅當指定多列時才有意義(且為必填)。 -
error_handling
Map[string, ?]
(選填) : (選填) 是否以及如何處理迭代期間的錯誤。
用法
type: Explode
input: ...
config:
fields: fields
cross_product: true|false
error_handling:
a: error_handling_value_a
b: error_handling_value_b
c: ...
Filter
僅保留符合指定條件的記錄。
請參閱有關 YAML 過濾 的更完整的文件。
支援的語言:calcite、通用、java、javascript、python、sql
配置
-
keep
?
(選填) -
language
string
(選填) -
error_handling
Row
Row 欄位
- output
string
- output
用法
type: Filter
input: ...
config:
keep: keep
language: "language"
error_handling:
output: "output"
Flatten
將多個 PCollection 扁平化為單個 PCollection。
結果 PCollection 的元素將是所有輸入的所有元素的(不相交)聯集。
請注意,在 YAML 中,轉換始終可以採用將被隱式扁平化的輸入列表。
配置
沒有配置參數。
用法
type: Flatten
input: ...
config: ...
Join
使用指定的條件聯結兩個或多個輸入。
例如:
type: Join
input:
input1: SomeTransform
input2: AnotherTransform
input3: YetAnotherTransform
config:
type: inner
equalities:
- input1: colA
input2: colB
- input2: colX
input3: colY
fields:
input1: [colA, colB, colC]
input2: {new_name: colB}
將在三個輸入上執行內部聯結,滿足 `input1.colA = input2.colB` 和 `input2.colX = input3.colY` 的條件,輸出具有來自 `input1` 的 `colA`、`colB` 和 `colC` 的列、作為名為 `new_name` 的欄位的 `input2.colB` 的值以及來自 `input3` 的所有欄位的列。
配置
-
equalities
?
(選填) : 要聯結的條件。應該相等以滿足聯結條件的欄位集合的列表。對於跨所有輸入在相同欄位上聯結的簡單情況,其中欄位名稱相同,可以將欄位名稱指定為相等性,而不必為每個輸入列出它。 -
type
?
(選填) : 聯結的類型。可以是 ["inner", "left", "right", "outer"] 中的字串值,指定要執行的聯結類型。對於具有多個要聯結的輸入且需要不同聯結類型的場景,請指定要外部聯結的輸入。例如,`{outer: [input1, input2]}` 表示 `input1` 和 `input2` 將使用指定的條件進行外部聯結,而其他輸入將進行內部聯結。 -
fields
Map[string, ?]
(選填) : 要輸出的欄位。一個以輸入別名為鍵,輸入中要輸出的欄位列表為值的映射。映射中的值可以是字典,其中新欄位名稱為鍵,原始欄位名稱為值(例如 new_field_name: field_name),也可以是要輸出且帶有原始名稱的欄位列表(例如 `[col1, col2, col3]`),或表示將輸出輸入中所有欄位的 '*'。如果未指定,則將輸出所有輸入的所有欄位。
用法
type: Join
input: ...
config:
equalities: equalities
type: type
fields:
a: fields_value_a
b: fields_value_b
c: ...
LogForTesting
記錄其輸入 PCollection 的每個元素。
此轉換的輸出是其輸入的副本,以便在鏈式管道中輕鬆使用。
配置
-
level
string
(選填) : ERROR、INFO 或 DEBUG 之一,映射到相應的特定語言的日誌記錄級別 -
prefix
string
(選填) : 一個可選的識別符號,將被前置到正在記錄的元素
用法
type: LogForTesting
input: ...
config:
level: "level"
prefix: "prefix"
MLTransform
配置
-
write_artifact_location
string
(選填) -
read_artifact_location
string
(選填) -
transforms
Array[?]
(選填)
用法
type: MLTransform
input: ...
config:
write_artifact_location: "write_artifact_location"
read_artifact_location: "read_artifact_location"
transforms:
- transforms
- transforms
- ...
MapToFields
建立具有根據輸入欄位定義的新欄位的記錄。
請參閱有關 YAML 映射函數 的更完整的文件。
支援的語言:calcite、通用、java、javascript、python、sql
配置
-
language
?
(選填) -
error_handling
Row
Row 欄位
- output
string
- output
-
mapping_args
?
(選填)
用法
type: MapToFields
input: ...
config:
language: language
error_handling:
output: "output"
mapping_args: mapping_args
Partition
將輸入拆分為多個不同的輸出。
每個輸入元素將根據 `by` 配置參數中給定的欄位或函數進入不同的輸出。
支援的語言:通用、javascript、python
配置
-
by
?
(選填) : 一個欄位、可呼叫物件或表示式,用於指定此元素的目標輸出。應返回一個字串,該字串是 `outputs` 參數的成員。如果同時設置了 `unknown_output`,則也會接受其他返回值,否則會引發錯誤。 -
outputs
Array[string]
: 此輸入正在被分割到的輸出集合。 -
unknown_output
string
(選填) : (選填)如果設置,則表示未分配給 `outputs` 參數中列出的輸出的任何元素的目標輸出。 -
error_handling
Map[string, ?]
(選填) : (選填) 是否以及如何處理分割期間的錯誤。 -
language
string
(選填) : (選填) `by` 表示式的語言。
用法
type: Partition
input: ...
config:
by: by
outputs:
- "outputs"
- "outputs"
- ...
unknown_output: "unknown_output"
error_handling:
a: error_handling_value_a
b: error_handling_value_b
c: ...
language: "language"
PyTransform
由完整限定名稱識別的 Python PTransform。
這允許導入、建構和應用任何 Beam Python 轉換。這對於使用尚未透過 YAML 介面公開的轉換可能很有用。但是,請注意,如果此轉換不接受或產生 Beam 列,則可能需要轉換。
例如:
type: PyTransform
config:
constructor: apache_beam.pkg.mod.SomeClass
args: [1, 'foo']
kwargs:
baz: 3
可用於存取轉換 `apache_beam.pkg.mod.SomeClass(1, 'foo', baz=3)`。
另請參閱有關 內聯 Python 的文件。
配置
-
constructor
string
: 用於建構轉換的可呼叫物件的完整限定名稱。通常這是一個類別,例如 `apache_beam.pkg.mod.SomeClass`,但它也可以是返回 PTransform 的函數或任何其他可呼叫物件。 -
args
Array[?]
(選填) : 要作為位置引數傳遞給可呼叫物件的參數列表。 -
kwargs
Map[string, ?]
(選用) : 要傳遞給可呼叫物件的關鍵字參數列表。
用法
type: PyTransform
input: ...
config:
constructor: "constructor"
args:
- arg
- arg
- ...
kwargs:
a: kwargs_value_a
b: kwargs_value_b
c: ...
Sql
配置
用法
type: Sql
input: ...
config: ...
WindowInto
一個視窗轉換,將視窗分配給 PCollection 的每個元素。
分配的視窗將影響所有下游的聚合操作,這些操作將按視窗和鍵進行聚合。
請參閱關於視窗化的 Beam 文件以了解更多詳細資訊。
大小、偏移量、週期和間隔(如果適用)必須使用時間單位後綴 'ms'、's'、'm'、'h' 或 'd' 分別表示毫秒、秒、分鐘、小時或天。 如果未指定時間單位,則預設為 's'。
例如:
windowing:
type: fixed
size: 30s
請注意,任何 Yaml 轉換都可以具有視窗化參數,該參數將應用於其輸入(如果有)或輸出(如果沒有輸入),這表示通常不需要明確的 WindowInto 操作。
配置
- windowing
?
(選用) : 要執行的視窗化類型和參數
用法
type: WindowInto
input: ...
config:
windowing: windowing
ReadFromAvro
一個用於從 avro 檔案讀取記錄的 PTransform
。
結果 PCollection 的每個記錄都將包含從來源讀取的單一記錄。屬於簡單類型的記錄將映射到具有包含記錄值的單一 record
欄位的 Beam 列。屬於 Avro 類型 RECORD
的記錄將映射到符合包含這些記錄的 Avro 檔案中結構描述的 Beam 列。
配置
- path
?
(選用)
用法
type: ReadFromAvro
config:
path: path
WriteToAvro
一個用於寫入 avro 檔案的 PTransform
。
如果輸入具有結構描述,將會自動產生對應的 avro 結構描述,並用於寫入輸出記錄。
配置
- path
?
(選用)
用法
type: WriteToAvro
input: ...
config:
path: path
ReadFromBigQuery
從 BigQuery 讀取資料。
必須設定 table 或 query 其中一個。 如果設定 query,則不應設定 row_restriction 或 fields。
配置
-
table
string
(選用) : 要從中讀取的資料表,指定為DATASET.TABLE
或PROJECT:DATASET.TABLE
。 -
query
string
(選用) : 用來代替 table 參數的查詢。 -
row_restriction
string
(選用) : 可選的 SQL 文字篩選語句,類似於查詢中的 WHERE 子句。 不支援聚合。 限制為最大長度 1 MB。 -
fields
Array[string]
(選用)
用法
type: ReadFromBigQuery
config:
table: "table"
query: "query"
row_restriction: "row_restriction"
fields:
- "field"
- "field"
- ...
WriteToBigQuery
配置
用法
type: WriteToBigQuery
input: ...
config: ...
ReadFromCsv
一個將逗號分隔值 (csv) 檔案讀取到 PCollection 的 PTransform。
配置
-
path
string
: 要從中讀取檔案的路徑。 此路徑可以包含萬用字元,例如*
和?
。 -
delimiter
?
(選用) -
comment
?
(選用)
用法
type: ReadFromCsv
config:
path: "path"
delimiter: delimiter
comment: comment
WriteToCsv
一個將具有結構描述的 PCollection 作為(一組)逗號分隔值 (csv) 檔案寫入的 PTransform。
配置
-
path
string
: 要寫入的檔案路徑。 寫入的檔案將以此字首開頭,然後是根據file_naming
參數的分片識別碼(請參閱num_shards
)。 -
delimiter
?
(選用)
用法
type: WriteToCsv
input: ...
config:
path: "path"
delimiter: delimiter
ReadFromJdbc
配置
用法
type: ReadFromJdbc
config: ...
WriteToJdbc
配置
用法
type: WriteToJdbc
input: ...
config: ...
ReadFromJson
一個將 json 值從檔案讀取到 PCollection 的 PTransform。
配置
- path
string
: 要從中讀取檔案的路徑。 此路徑可以包含萬用字元,例如*
和?
。
用法
type: ReadFromJson
config:
path: "path"
WriteToJson
一個將 PCollection 作為 json 值寫入檔案的 PTransform。
配置
- path
string
: 要寫入的檔案路徑。 寫入的檔案將以此字首開頭,然後是根據file_naming
參數的分片識別碼(請參閱num_shards
)。
用法
type: WriteToJson
input: ...
config:
path: "path"
ReadFromKafka
配置
用法
type: ReadFromKafka
config: ...
WriteToKafka
配置
用法
type: WriteToKafka
input: ...
config: ...
ReadFromMySql
配置
用法
type: ReadFromMySql
config: ...
WriteToMySql
配置
用法
type: WriteToMySql
input: ...
config: ...
ReadFromOracle
配置
用法
type: ReadFromOracle
config: ...
WriteToOracle
配置
用法
type: WriteToOracle
input: ...
config: ...
ReadFromParquet
一個用於讀取 Parquet 檔案的 PTransform
。
配置
- path
?
(選用)
用法
type: ReadFromParquet
config:
path: path
WriteToParquet
一個用於寫入 Parquet 檔案的 PTransform
。
配置
- path
?
(選用)
用法
type: WriteToParquet
input: ...
config:
path: path
ReadFromPostgres
配置
用法
type: ReadFromPostgres
config: ...
WriteToPostgres
配置
用法
type: WriteToPostgres
input: ...
config: ...
ReadFromPubSub
從 Cloud Pub/Sub 讀取訊息。
配置
-
topic
string
(選用) : Cloud Pub/Sub 主題,格式為 "projects//topics/ "。 如果提供,subscription 必須為 None。 -
subscription
string
(選用) : 要使用的現有 Cloud Pub/Sub 訂閱,格式為 "projects//subscriptions/ "。 如果未指定,將從指定的主題建立臨時訂閱。 如果提供,topic 必須為 None。 -
format
string
: 訊息酬載的預期格式。 目前支援的格式為- RAW:產生具有單一
payload
欄位的記錄,其內容是 pubsub 訊息的原始位元組。 - AVRO:使用給定的 Avro 結構描述剖析記錄。
- JSON:使用給定的 JSON 結構描述剖析記錄。
- RAW:產生具有單一
-
schema
?
(選用) : 給定格式的結構描述規範。 -
attributes
Array[string]
(選用) : 其值將作為其他欄位攤平到輸出訊息中的屬性鍵清單。 例如,如果格式為raw
且 attributes 為["a", "b"]
,則此讀取將產生Row(payload=..., a=..., b=...)
形式的元素。 -
attributes_map
string
(選用) -
id_attribute
string
(選用) : 輸入 Pub/Sub 訊息上用作唯一記錄識別碼的屬性。 當指定時,此屬性的值(可以是唯一識別記錄的任何字串)將用於訊息的重複資料刪除。 如果未提供,則我們無法保證 Pub/Sub 串流上不會傳遞重複資料。 在此情況下,串流的重複資料刪除將會盡力而為。 -
timestamp_attribute
string
(選用) : 用作元素時間戳記的訊息值。 如果為 None,則使用訊息發佈時間作為時間戳記。時間戳記值應採用以下兩種格式之一
- 表示自 Unix epoch 以來毫秒數的數值。
- UTC 時區的 RFC 3339 格式字串。 範例:
2015-10-29T23:41:41.123Z
。 時間戳記的次秒元件是可選的,並且可以忽略前三個數字以外的數字(即小於毫秒的時間單位)。
-
error_handling
Row
Row 欄位
- output
string
- output
用法
type: ReadFromPubSub
config:
topic: "topic"
subscription: "subscription"
format: "format"
schema: schema
attributes:
- "attribute"
- "attribute"
- ...
attributes_map: "attributes_map"
id_attribute: "id_attribute"
timestamp_attribute: "timestamp_attribute"
error_handling:
output: "output"
WriteToPubSub
將訊息寫入 Cloud Pub/Sub。
配置
-
topic
string
: Cloud Pub/Sub 主題,格式為 "/topics// ". -
format
string
: 如何格式化訊息酬載。 目前支援的格式為- RAW:預期具有單一欄位(不包括與屬性相關的欄位)的訊息,其內容用作 pubsub 訊息的原始位元組。
- AVRO:使用給定的 Avro 結構描述編碼記錄,可以從輸入 PCollection 結構描述推斷出來。
- JSON:使用給定的 JSON 結構描述格式化記錄,可以從輸入 PCollection 結構描述推斷出來。
-
schema
?
(選用) : 給定格式的結構描述規範。 -
attributes
Array[string]
(選用) : 其值將作為 PubSub 訊息屬性提取的屬性鍵清單。 例如,如果格式為raw
且 attributes 為["a", "b"]
,則Row(any_field=..., a=..., b=...)
形式的元素將產生酬載具有 any_field 內容且屬性將填入a
和b
值的 PubSub 訊息。 -
attributes_map
string
(選用) -
id_attribute
string
(選用) : 如果設定,將會為每個 Cloud Pub/Sub 訊息設定一個具有給定名稱和唯一值的屬性。 然後,可以在 ReadFromPubSub PTransform 中使用此屬性來刪除重複的訊息。 -
timestamp_attribute
string
(選用) : 如果設定,將會為每個 Cloud Pub/Sub 訊息設定一個具有給定名稱和訊息發佈時間的屬性值。 -
error_handling
Row
Row 欄位
- output
string
- output
用法
type: WriteToPubSub
input: ...
config:
topic: "topic"
format: "format"
schema: schema
attributes:
- "attribute"
- "attribute"
- ...
attributes_map: "attributes_map"
id_attribute: "id_attribute"
timestamp_attribute: "timestamp_attribute"
error_handling:
output: "output"
ReadFromPubSubLite
配置
用法
type: ReadFromPubSubLite
config: ...
WriteToPubSubLite
配置
用法
type: WriteToPubSubLite
input: ...
config: ...
ReadFromSpanner
配置
用法
type: ReadFromSpanner
config: ...
WriteToSpanner
配置
用法
type: WriteToSpanner
input: ...
config: ...
ReadFromSqlServer
配置
用法
type: ReadFromSqlServer
config: ...
WriteToSqlServer
配置
用法
type: WriteToSqlServer
input: ...
config: ...
ReadFromText
從文字檔案讀取行。
結果 PCollection 由具有名為「line」的單一字串欄位的列組成。
配置
- path
string
: 要從中讀取檔案的路徑。 此路徑可以包含萬用字元,例如*
和?
。
用法
type: ReadFromText
config:
path: "path"
WriteToText
將 PCollection 寫入到(一組)文字檔案。
輸入必須是其結構描述恰好有一個欄位的 PCollection。
配置
- path
string
: 要寫入的檔案路徑。 寫入的檔案將以此字首開頭,後接分片識別碼。
用法
type: WriteToText
input: ...
config:
path: "path"