Beam 中的單元測試:一份帶有主觀意見的指南

測試仍然是軟體工程最基本的組成部分之一。在這篇部落格文章中,我們將闡明 Apache Beam 為測試提供的一些結構。我們涵蓋了一套有主觀意見的最佳實務,來為您的資料管道編寫單元測試。這篇文章不包含整合測試,您需要另外撰寫這些測試。本文中的所有程式碼片段都包含在這個筆記本中。此外,要查看展現最佳實務的測試,請查看Beam 起始專案,其中包含展現最佳實務的測試。

最佳實務

在測試 Beam 管道時,我們建議以下最佳實務

  1. 不要為 Beam 程式庫中已支援的連接器編寫單元測試,例如 ReadFromBigQueryWriteToText。這些連接器已經在 Beam 的測試套件中進行測試,以確保功能正確。它們會為單元測試增加不必要的成本和相依性。

  2. 當使用 MapFlatMapFilter 時,請確保您的函式經過充分測試。當使用 Map(your_function) 時,您可以假設您的函式將按預期運作。

  3. 對於更複雜的轉換,例如 ParDo、側輸入、時間戳記檢查等,請將整個轉換視為一個單元並進行測試。

  4. 如果需要,請使用 Mocking 來 Mock 您 DoFn 中可能存在的任何 API 呼叫。Mocking 的目的是廣泛測試您的功能,即使此測試需要來自 API 呼叫的特定回應。

    1. 請務必將您的 API 呼叫模組化為個別函式,而不是直接在 DoFn 中進行 API 呼叫。當 Mock 外部 API 呼叫時,此步驟可提供更清晰的體驗。

範例 1

以下列管道作為範例。假設函式 median_house_value_per_bedroom 已在程式碼的其他位置進行單元測試,則您不必編寫單獨的單元測試來在此管道的上下文中測試此函式。您可以信任 Map 基本型別會如預期般運作(這說明了先前說明的第 2 點)。

# The following code computes the median house value per bedroom.

with beam.Pipeline() as p1:
   result = (
       p1
       | ReadFromText("/content/sample_data/california_housing_test.csv",skip_header_lines=1)
       | beam.Map(median_house_value_per_bedroom)
       | WriteToText("/content/example2")
   )

範例 2

以下列函式作為範例。函式 median_house_value_per_bedroommultiply_by_factor 在其他地方進行測試,但整個由複合轉換組成的管道沒有進行測試。

with beam.Pipeline() as p2:
    result = (
        p2
        | ReadFromText("/content/sample_data/california_housing_test.csv",skip_header_lines=1)
        | beam.Map(median_house_value_per_bedroom)
        | beam.Map(multiply_by_factor)
        | beam.CombinePerKey(sum)
        | WriteToText("/content/example3")
    )

先前程式碼的最佳實務是建立一個轉換,其中包含 ReadFromTextWriteToText 之間的所有函式。此步驟將轉換邏輯與 I/O 分開,讓您可以對轉換邏輯進行單元測試。以下範例是先前程式碼的重構

def transform_data_set(pcoll):
  return (pcoll
          | beam.Map(median_house_value_per_bedroom)
          | beam.Map(multiply_by_factor)
          | beam.CombinePerKey(sum))

# Define a new class that inherits from beam.PTransform.
class MapAndCombineTransform(beam.PTransform):
  def expand(self, pcoll):
    return transform_data_set(pcoll)

with beam.Pipeline() as p2:
   result = (
       p2
       | ReadFromText("/content/sample_data/california_housing_test.csv",skip_header_lines=1)
       | MapAndCombineTransform()
       | WriteToText("/content/example3")
   )

此程式碼顯示先前範例的對應單元測試

import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestBeam(unittest.TestCase):

# This test corresponds to example 3, and is written to confirm the pipeline works as intended.
  def test_transform_data_set(self):
    expected=[(1, 10570.185786231425), (2, 13.375337533753376), (3, 13.315649867374006)]
    input_elements = [
      '-122.050000,37.370000,27.000000,3885.000000,661.000000,1537.000000,606.000000,6.608500,344700.000000',
      '121.05,99.99,23.30,39.5,55.55,41.01,10,34,74.30,91.91',
      '122.05,100.99,24.30,40.5,56.55,42.01,11,35,75.30,92.91',
      '-120.05,39.37,29.00,4085.00,681.00,1557.00,626.00,6.8085,364700.00'
    ]
    with beam.Pipeline() as p2:
      result = (
                p2
                | beam.Create(input_elements)
                | beam.Map(MapAndCombineTransform())
        )
      assert_that(result,equal_to(expected))

範例 3

假設我們編寫一個從 JSON 檔案讀取資料的管道,將其傳遞給一個用於解析的自訂函式(會進行外部 API 呼叫),然後將其寫入自訂目的地(例如,如果我們需要執行一些自訂資料格式化,以便為下游應用程式準備資料)。

管道具有以下結構

# The following packages are used to run the example pipelines.

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

class MyDoFn(beam.DoFn):
  def process(self,element):
          returned_record = MyApiCall.get_data("http://my-api-call.com")
          if len(returned_record)!=10:
            raise ValueError("Length of record does not match expected length")
          yield returned_record

with beam.Pipeline() as p3:
  result = (
          p3
          | ReadFromText("/content/sample_data/anscombe.json")
          | beam.ParDo(MyDoFn())
          | WriteToText("/content/example1")
  )

此測試檢查 API 回應是否為長度錯誤的記錄,如果測試失敗,則會擲回預期的錯誤。

!pip install mock  # Install the 'mock' module.
# Import the mock package for mocking functionality.
from unittest.mock import Mock,patch
# from MyApiCall import get_data
import mock


# MyApiCall is a function that calls get_data to fetch some data by using an API call.
@patch('MyApiCall.get_data')
def test_error_message_wrong_length(self, mock_get_data):
 response = ['field1','field2']
 mock_get_data.return_value = Mock()
 mock_get_data.return_value.json.return_value=response

 input_elements = ['-122.050000,37.370000,27.000000,3885.000000,661.000000,1537.000000,606.000000,6.608500,344700.000000'] #input length 9
 with self.assertRaisesRegex(ValueError,
                             "Length of record does not match expected length'"):
     p3 = beam.Pipeline()
     result = p3 | beam.create(input_elements) | beam.ParDo(MyDoFn())
     result

其他測試最佳實務

  1. 測試您引發的所有錯誤訊息。
  2. 涵蓋您資料中可能存在的任何邊緣案例。
  3. 範例 1 可以使用 Lambda 函式編寫 beam.Map 步驟,而不是使用 beam.Map(median_house_value_per_bedroom)
beam.Map(lambda x: x.strip().split(',')) | beam.Map(lambda x: float(x[8])/float(x[4])

建議使用 beam.Map(median_house_value_per_bedroom) 將 Lambda 分隔到協助程式函式中,以獲得更易於測試的程式碼,因為對函式的變更將會模組化。

  1. 使用 assert_that 陳述式來確保 PCollection 值正確匹配,如先前的範例所示。

有關在 Beam 和 Dataflow 上測試的更多指南,請參閱Google Cloud 文件。有關 Beam 中單元測試的更多範例,請參閱 base_test.py 程式碼

特別感謝 Robert Bradshaw、Danny McCormick、XQ Hu、Surjit Singh 和 Rebecca Spzer,他們協助完善了這篇文章中的想法。