部落格
2024/09/13
Beam 中的單元測試:一份帶有主觀意見的指南
測試仍然是軟體工程最基本的組成部分之一。在這篇部落格文章中,我們將闡明 Apache Beam 為測試提供的一些結構。我們涵蓋了一套有主觀意見的最佳實務,來為您的資料管道編寫單元測試。這篇文章不包含整合測試,您需要另外撰寫這些測試。本文中的所有程式碼片段都包含在這個筆記本中。此外,要查看展現最佳實務的測試,請查看Beam 起始專案,其中包含展現最佳實務的測試。
最佳實務
在測試 Beam 管道時,我們建議以下最佳實務
不要為 Beam 程式庫中已支援的連接器編寫單元測試,例如
ReadFromBigQuery
和WriteToText
。這些連接器已經在 Beam 的測試套件中進行測試,以確保功能正確。它們會為單元測試增加不必要的成本和相依性。當使用
Map
、FlatMap
或Filter
時,請確保您的函式經過充分測試。當使用Map(your_function)
時,您可以假設您的函式將按預期運作。對於更複雜的轉換,例如
ParDo
、側輸入、時間戳記檢查等,請將整個轉換視為一個單元並進行測試。如果需要,請使用 Mocking 來 Mock 您 DoFn 中可能存在的任何 API 呼叫。Mocking 的目的是廣泛測試您的功能,即使此測試需要來自 API 呼叫的特定回應。
- 請務必將您的 API 呼叫模組化為個別函式,而不是直接在
DoFn
中進行 API 呼叫。當 Mock 外部 API 呼叫時,此步驟可提供更清晰的體驗。
- 請務必將您的 API 呼叫模組化為個別函式,而不是直接在
範例 1
以下列管道作為範例。假設函式 median_house_value_per_bedroom
已在程式碼的其他位置進行單元測試,則您不必編寫單獨的單元測試來在此管道的上下文中測試此函式。您可以信任 Map
基本型別會如預期般運作(這說明了先前說明的第 2 點)。
# The following code computes the median house value per bedroom.
with beam.Pipeline() as p1:
result = (
p1
| ReadFromText("/content/sample_data/california_housing_test.csv",skip_header_lines=1)
| beam.Map(median_house_value_per_bedroom)
| WriteToText("/content/example2")
)
範例 2
以下列函式作為範例。函式 median_house_value_per_bedroom
和 multiply_by_factor
在其他地方進行測試,但整個由複合轉換組成的管道沒有進行測試。
with beam.Pipeline() as p2:
result = (
p2
| ReadFromText("/content/sample_data/california_housing_test.csv",skip_header_lines=1)
| beam.Map(median_house_value_per_bedroom)
| beam.Map(multiply_by_factor)
| beam.CombinePerKey(sum)
| WriteToText("/content/example3")
)
先前程式碼的最佳實務是建立一個轉換,其中包含 ReadFromText
和 WriteToText
之間的所有函式。此步驟將轉換邏輯與 I/O 分開,讓您可以對轉換邏輯進行單元測試。以下範例是先前程式碼的重構
def transform_data_set(pcoll):
return (pcoll
| beam.Map(median_house_value_per_bedroom)
| beam.Map(multiply_by_factor)
| beam.CombinePerKey(sum))
# Define a new class that inherits from beam.PTransform.
class MapAndCombineTransform(beam.PTransform):
def expand(self, pcoll):
return transform_data_set(pcoll)
with beam.Pipeline() as p2:
result = (
p2
| ReadFromText("/content/sample_data/california_housing_test.csv",skip_header_lines=1)
| MapAndCombineTransform()
| WriteToText("/content/example3")
)
此程式碼顯示先前範例的對應單元測試
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestBeam(unittest.TestCase):
# This test corresponds to example 3, and is written to confirm the pipeline works as intended.
def test_transform_data_set(self):
expected=[(1, 10570.185786231425), (2, 13.375337533753376), (3, 13.315649867374006)]
input_elements = [
'-122.050000,37.370000,27.000000,3885.000000,661.000000,1537.000000,606.000000,6.608500,344700.000000',
'121.05,99.99,23.30,39.5,55.55,41.01,10,34,74.30,91.91',
'122.05,100.99,24.30,40.5,56.55,42.01,11,35,75.30,92.91',
'-120.05,39.37,29.00,4085.00,681.00,1557.00,626.00,6.8085,364700.00'
]
with beam.Pipeline() as p2:
result = (
p2
| beam.Create(input_elements)
| beam.Map(MapAndCombineTransform())
)
assert_that(result,equal_to(expected))
範例 3
假設我們編寫一個從 JSON 檔案讀取資料的管道,將其傳遞給一個用於解析的自訂函式(會進行外部 API 呼叫),然後將其寫入自訂目的地(例如,如果我們需要執行一些自訂資料格式化,以便為下游應用程式準備資料)。
管道具有以下結構
# The following packages are used to run the example pipelines.
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
class MyDoFn(beam.DoFn):
def process(self,element):
returned_record = MyApiCall.get_data("http://my-api-call.com")
if len(returned_record)!=10:
raise ValueError("Length of record does not match expected length")
yield returned_record
with beam.Pipeline() as p3:
result = (
p3
| ReadFromText("/content/sample_data/anscombe.json")
| beam.ParDo(MyDoFn())
| WriteToText("/content/example1")
)
此測試檢查 API 回應是否為長度錯誤的記錄,如果測試失敗,則會擲回預期的錯誤。
!pip install mock # Install the 'mock' module.
# Import the mock package for mocking functionality.
from unittest.mock import Mock,patch
# from MyApiCall import get_data
import mock
# MyApiCall is a function that calls get_data to fetch some data by using an API call.
@patch('MyApiCall.get_data')
def test_error_message_wrong_length(self, mock_get_data):
response = ['field1','field2']
mock_get_data.return_value = Mock()
mock_get_data.return_value.json.return_value=response
input_elements = ['-122.050000,37.370000,27.000000,3885.000000,661.000000,1537.000000,606.000000,6.608500,344700.000000'] #input length 9
with self.assertRaisesRegex(ValueError,
"Length of record does not match expected length'"):
p3 = beam.Pipeline()
result = p3 | beam.create(input_elements) | beam.ParDo(MyDoFn())
result
其他測試最佳實務
- 測試您引發的所有錯誤訊息。
- 涵蓋您資料中可能存在的任何邊緣案例。
- 範例 1 可以使用 Lambda 函式編寫
beam.Map
步驟,而不是使用beam.Map(median_house_value_per_bedroom)
beam.Map(lambda x: x.strip().split(',')) | beam.Map(lambda x: float(x[8])/float(x[4])
建議使用 beam.Map(median_house_value_per_bedroom)
將 Lambda 分隔到協助程式函式中,以獲得更易於測試的程式碼,因為對函式的變更將會模組化。
- 使用
assert_that
陳述式來確保PCollection
值正確匹配,如先前的範例所示。
有關在 Beam 和 Dataflow 上測試的更多指南,請參閱Google Cloud 文件。有關 Beam 中單元測試的更多範例,請參閱 base_test.py
程式碼。
特別感謝 Robert Bradshaw、Danny McCormick、XQ Hu、Surjit Singh 和 Rebecca Spzer,他們協助完善了這篇文章中的想法。