Beam YAML 錯誤處理

隨著管道規模越大,越常遇到格式錯誤、無法處理正確的先決條件,或在處理過程中中斷的「異常」資料。通常任何此類記錄都會導致管道永久失敗,但通常希望允許管道繼續執行,將錯誤記錄重新導向到另一個路徑進行特殊處理,或僅記錄下來以供稍後離線分析。這通常稱為「死信佇列」模式。

如果轉換支援帶有 output 欄位的 error_handling 組態參數,則 Beam YAML 對此模式有特殊支援。output 參數是一個名稱,必須將其引用為將處理錯誤的另一個轉換的輸入 (例如,將它們寫出)。例如,以下程式碼會將所有「良好」的已處理記錄寫入一個檔案,並將任何「錯誤」的記錄連同有關遇到什麼錯誤的中繼資料寫入另一個檔案。

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      input: ReadFromCsv
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          ratio: col2 / col3
        error_handling:
          output: my_error_output

    - type: WriteToJson
      input: MapToFields
      config:
        path: /path/to/output.json

    - type: WriteToJson
      name: WriteErrorsToJson
      input: MapToFields.my_error_output
      config:
        path: /path/to/errors.json

請注意,在宣告 error_handling 的情況下,必須取用 MapToFields.my_error_output;忽略它將會是一個錯誤。任何使用都是可以的,例如,將錯誤記錄記錄到 stdout 就足夠了 (雖然不建議用於穩健的管道)。

另請注意,錯誤輸出的確切格式仍在最終確定中。它們可以安全地列印並寫入輸出,但它們的精確結構描述可能會在未來版本的 Beam 中變更,因此不應過於依賴。目前,它至少有一個 element 欄位,其中包含導致錯誤的元素。

某些轉換允許在其 error_handling 組態中包含額外的引數,例如,對於 Python 函數,可以給定一個 threshold,它會限制在將整個管道視為失敗之前,可能出現的錯誤記錄的相對數量

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      input: ReadFromCsv
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          ratio: col2 / col3
        error_handling:
          output: my_error_output
          # If more than 10% of records throw an error, stop the pipeline.
          threshold: 0.1

    - type: WriteToJson
      input: MapToFields
      config:
        path: /path/to/output.json

    - type: WriteToJson
      name: WriteErrorsToJson
      input: MapToFields.my_error_output
      config:
        path: /path/to/errors.json

如果需要,可以對這些失敗的記錄進行任意進一步的處理,例如:

pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      name: ComputeRatio
      input: ReadFromCsv
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          ratio: col2 / col3
        error_handling:
          output: my_error_output

    - type: MapToFields
      name: ComputeRatioForBadRecords
      input: ComputeRatio.my_error_output
      config:
        language: python
        fields:
          col1: col1
          ratio: col2 / (col3 + 1)
        error_handling:
          output: still_bad

    - type: WriteToJson
      # Takes as input everything from the "success" path of both transforms.
      input: [ComputeRatio, ComputeRatioForBadRecords]
      config:
        path: /path/to/output.json

    - type: WriteToJson
      name: WriteErrorsToJson
      # These failed the first and the second transform.
      input: ComputeRatioForBadRecords.still_bad
      config:
        path: /path/to/errors.json

使用 chain 語法時,所需的錯誤取用可以在 extra_transforms 區塊中發生。

pipeline:
  type: chain
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv

    - type: MapToFields
      name: SomeStep
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          ratio: col2 / col3
        error_handling:
          output: errors

    - type: MapToFields
      name: AnotherStep
      config:
        language: python
        fields:
          col1: col1
          # This could raise a divide-by-zero error.
          inverse_ratio: 1 / ratio
        error_handling:
          output: errors

    - type: WriteToJson
      config:
        path: /path/to/output.json

  extra_transforms:
    - type: WriteToJson
      name: WriteErrors
      input: [SomeStep.errors, AnotherStep.errors]
      config:
        path: /path/to/errors.json