Beam YAML 錯誤處理
隨著管道規模越大,越常遇到格式錯誤、無法處理正確的先決條件,或在處理過程中中斷的「異常」資料。通常任何此類記錄都會導致管道永久失敗,但通常希望允許管道繼續執行,將錯誤記錄重新導向到另一個路徑進行特殊處理,或僅記錄下來以供稍後離線分析。這通常稱為「死信佇列」模式。
如果轉換支援帶有 output
欄位的 error_handling
組態參數,則 Beam YAML 對此模式有特殊支援。output
參數是一個名稱,必須將其引用為將處理錯誤的另一個轉換的輸入 (例如,將它們寫出)。例如,以下程式碼會將所有「良好」的已處理記錄寫入一個檔案,並將任何「錯誤」的記錄連同有關遇到什麼錯誤的中繼資料寫入另一個檔案。
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: MapToFields
input: ReadFromCsv
config:
language: python
fields:
col1: col1
# This could raise a divide-by-zero error.
ratio: col2 / col3
error_handling:
output: my_error_output
- type: WriteToJson
input: MapToFields
config:
path: /path/to/output.json
- type: WriteToJson
name: WriteErrorsToJson
input: MapToFields.my_error_output
config:
path: /path/to/errors.json
請注意,在宣告 error_handling
的情況下,必須取用 MapToFields.my_error_output
;忽略它將會是一個錯誤。任何使用都是可以的,例如,將錯誤記錄記錄到 stdout 就足夠了 (雖然不建議用於穩健的管道)。
另請注意,錯誤輸出的確切格式仍在最終確定中。它們可以安全地列印並寫入輸出,但它們的精確結構描述可能會在未來版本的 Beam 中變更,因此不應過於依賴。目前,它至少有一個 element
欄位,其中包含導致錯誤的元素。
某些轉換允許在其 error_handling 組態中包含額外的引數,例如,對於 Python 函數,可以給定一個 threshold
,它會限制在將整個管道視為失敗之前,可能出現的錯誤記錄的相對數量
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: MapToFields
input: ReadFromCsv
config:
language: python
fields:
col1: col1
# This could raise a divide-by-zero error.
ratio: col2 / col3
error_handling:
output: my_error_output
# If more than 10% of records throw an error, stop the pipeline.
threshold: 0.1
- type: WriteToJson
input: MapToFields
config:
path: /path/to/output.json
- type: WriteToJson
name: WriteErrorsToJson
input: MapToFields.my_error_output
config:
path: /path/to/errors.json
如果需要,可以對這些失敗的記錄進行任意進一步的處理,例如:
pipeline:
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: MapToFields
name: ComputeRatio
input: ReadFromCsv
config:
language: python
fields:
col1: col1
# This could raise a divide-by-zero error.
ratio: col2 / col3
error_handling:
output: my_error_output
- type: MapToFields
name: ComputeRatioForBadRecords
input: ComputeRatio.my_error_output
config:
language: python
fields:
col1: col1
ratio: col2 / (col3 + 1)
error_handling:
output: still_bad
- type: WriteToJson
# Takes as input everything from the "success" path of both transforms.
input: [ComputeRatio, ComputeRatioForBadRecords]
config:
path: /path/to/output.json
- type: WriteToJson
name: WriteErrorsToJson
# These failed the first and the second transform.
input: ComputeRatioForBadRecords.still_bad
config:
path: /path/to/errors.json
使用 chain
語法時,所需的錯誤取用可以在 extra_transforms
區塊中發生。
pipeline:
type: chain
transforms:
- type: ReadFromCsv
config:
path: /path/to/input*.csv
- type: MapToFields
name: SomeStep
config:
language: python
fields:
col1: col1
# This could raise a divide-by-zero error.
ratio: col2 / col3
error_handling:
output: errors
- type: MapToFields
name: AnotherStep
config:
language: python
fields:
col1: col1
# This could raise a divide-by-zero error.
inverse_ratio: 1 / ratio
error_handling:
output: errors
- type: WriteToJson
config:
path: /path/to/output.json
extra_transforms:
- type: WriteToJson
name: WriteErrors
input: [SomeStep.errors, AnotherStep.errors]
config:
path: /path/to/errors.json