在 Apache Beam 中測試 I/O 轉換

測試 Apache Beam I/O 轉換的範例和設計模式

注意:本指南仍在進行中。有一個未解決的問題需要完成本指南:BEAM-1025

簡介

本文件說明 Beam 社群根據我們過去撰寫 I/O 轉換的經驗所建議的一組測試。如果您希望將您的 I/O 轉換貢獻給 Beam 社群,我們會要求您實作這些測試。

雖然編寫單元測試和整合測試是標準做法,但可能有很多定義。我們的定義如下

關於效能基準測試的注意事項

我們不主張編寫單獨的測試專門用於效能基準測試。相反地,我們建議設定整合測試,這些測試可以接受必要的參數以涵蓋許多不同的測試情境。

例如,如果整合測試是按照以下準則編寫的,則整合測試可以在不同的執行器(本地或叢集配置)上,以及針對具有小型資料集的小型執行個體或具有較大資料集的大型生產就緒叢集的資料儲存執行。這可以涵蓋各種情境 - 其中一個是效能基準測試。

測試平衡 - 單元測試與整合測試

使用整合測試很容易涵蓋大量的程式碼,但之後很難找到測試失敗的原因,而且測試也比較不可靠。

但是,有一組有價值的錯誤是透過測試發現的,這些測試會測試多個工作者讀取/寫入具有多個節點的資料儲存執行個體(例如,讀取複本等)。這些情境很難透過單元測試找到,而且我們發現它們通常會在 I/O 轉換中造成錯誤。

我們的測試策略是平衡這兩種矛盾的需求。我們建議在單元測試中盡可能多地進行測試,並編寫一個可以在各種配置中執行的單一小型整合測試。

範例

Java

Python

單元測試

目標

非目標

實作單元測試

關於為所有轉換撰寫單元測試的一般指南,可以在PTransform 風格指南中找到。我們在下面擴展了一些重點。

如果您正在使用 Source API,請確保徹底地對您的程式碼進行單元測試。輕微的實作錯誤可能會導致資料損壞或資料遺失(例如跳過或重複記錄),這可能會讓您的使用者難以偵測。此外,請考慮使用 SourceTestUtilssource_test_utils - 這是測試 Source 實作的關鍵部分。

如果您未使用 Source API,您可以使用 TestPipelinePAssertassert_that 來協助您進行測試。

如果您正在實作寫入功能,您可以使用 TestPipeline 來寫入測試資料,然後使用非 Beam 客戶端讀取並驗證它。

使用偽造

在單元測試中,不要使用 mock(為每個測試預先設定每個呼叫的精確回應),而是使用 fakes。對於 I/O 轉換測試,使用 fakes 的首選方法是使用預先存在的服務的記憶體內/可嵌入版本,但如果不存在,請考慮實作您自己的版本。事實證明,fakes 是「您可以獲得所需的測試條件」和「您不必編寫一百萬個精確的 mock 函式呼叫」之間的正確組合。

網路失敗

為了幫助測試和關注點分離,與網路互動的程式碼應該在與 I/O 轉換不同的類別中處理。建議的設計模式是,一旦您的 I/O 轉換確定讀取或寫入不再可能,它就會拋出例外。

這允許 I/O 轉換的單元測試表現得好像它們具有完美的網路連線,而且它們不需要重試/以其他方式處理網路連線問題。

批次處理

如果您的 I/O 轉換允許批次讀取/寫入,您必須在測試中強制執行批次處理。在您的 I/O 轉換上具有可設定的批次大小選項可以讓這件事很容易發生。這些必須標記為僅供測試。

I/O 轉換整合測試

我們目前沒有 Python I/O 整合測試或非邊界或最終一致的資料儲存整合測試的範例。我們歡迎在這方面的貢獻 - 請聯絡 Beam dev@ 郵件列表以獲取更多資訊。

目標

整合測試、資料儲存和 Kubernetes

為了在真實條件下測試 I/O 轉換,您必須連線到資料儲存實例。

Beam 社群在 Kubernetes 中託管用於整合測試的資料儲存。為了在 Beam 的持續整合環境中執行整合測試,它必須具有設定資料儲存實例的 Kubernetes 腳本。

但是,在本地工作時,沒有使用 Kubernetes 的要求。所有測試基礎架構都允許您傳入連線資訊,因此開發人員可以使用他們偏好的託管基礎架構進行本地開發。

在本機上執行整合測試

您始終可以在自己的機器上執行 IO 整合測試。執行整合測試的高階步驟是

  1. 設定與正在執行的測試相對應的資料儲存。
  2. 執行測試,從剛剛建立的資料儲存傳遞連線資訊給它。
  3. 清除資料儲存。

資料儲存設定/清除

如果您使用 Kubernetes 腳本來託管資料儲存,請確保您可以使用 kubectl 在本地連線到您的叢集。如果您已經設定了自己的資料儲存,您只需要執行以下列表中的步驟 3。

  1. 設定與您要執行的測試相對應的資料儲存。您可以在 .test-infra/kubernetes 中找到目前所有支援的資料儲存的 Kubernetes 腳本。
    1. 在某些情況下,有一個專用的設定腳本 (*.sh)。在其他情況下,您只需執行 kubectl create -f [scriptname] 即可建立資料儲存。您也可以讓 kubernetes.sh 腳本為您執行一些標準步驟。
    2. 慣例規定會有
      1. 資料儲存本身的 yml 腳本,以及一個 NodePort 服務。NodePort 服務會為從同一個子網路連線到 Kubernetes 叢集機器的任何人開啟資料儲存的連接埠。當在 Minikube Kubernetes 引擎上執行腳本時,此類腳本通常很有用。
      2. 一個單獨的腳本,帶有 LoadBalancer 服務。此類服務將為資料儲存公開一個外部 ip。當需要外部存取時(例如在 Jenkins 上),需要此類腳本。
    3. 範例
      1. 對於 JDBC,您可以設定 Postgres:kubectl create -f .test-infra/kubernetes/postgres/postgres.yml
      2. 對於 Elasticsearch,您可以執行設定腳本:bash .test-infra/kubernetes/elasticsearch/setup.sh
  2. 確定服務的 IP 位址
    1. NodePort 服務:kubectl get pods -l 'component=elasticsearch' -o jsonpath={.items[0].status.podIP}
    2. LoadBalancer 服務:kubectl get svc elasticsearch-external -o jsonpath='{.status.loadBalancer.ingress[0].ip}'
  3. 使用 integrationTest gradle 任務和測試類別中的說明來執行測試(例如,請參閱 JdbcIOIT.java 中的說明)。
  4. 告訴 Kubernetes 刪除 Kubernetes 腳本中指定的資源
    1. JDBC:kubectl delete -f .test-infra/kubernetes/postgres/postgres.yml
    2. Elasticsearch:bash .test-infra/kubernetes/elasticsearch/teardown.sh

執行特定測試

integrationTest 是一個用於執行 IO 整合測試的專用 gradle 任務。

在 Cloud Dataflow 執行器上的使用範例

./gradlew integrationTest -p sdks/java/io/hadoop-format -DintegrationTestPipelineOptions='["--project=GOOGLE_CLOUD_PROJECT", "--tempRoot=GOOGLE_STORAGE_BUCKET", "--numberOfRecords=1000", "--postgresPort=5432", "--postgresServerName=SERVER_NAME", "--postgresUsername=postgres", "--postgresPassword=PASSWORD", "--postgresDatabaseName=postgres", "--postgresSsl=false", "--runner=TestDataflowRunner"]' -DintegrationTestRunner=dataflow --tests=org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOIT

在 HDFS 檔案系統和 Direct 執行器上的使用範例

注意:只有當 /etc/hosts 檔案包含帶有 hadoop namenode 和 hadoop datanodes 外部 IP 的條目時,以下設定才有效。請參閱以下說明:小型叢集設定檔大型叢集設定檔

export HADOOP_USER_NAME=root

./gradlew integrationTest -p sdks/java/io/file-based-io-tests -DintegrationTestPipelineOptions='["--numberOfRecords=1000", "--filenamePrefix=hdfs://HDFS_NAMENODE:9000/XMLIOIT", "--hdfsConfiguration=[{\"fs.defaultFS\":\"hdfs://HDFS_NAMENODE:9000\",\"dfs.replication\":1,\"dfs.client.use.datanode.hostname\":\"true\" }]" ]' -DintegrationTestRunner=direct -Dfilesystem=hdfs --tests org.apache.beam.sdk.io.xml.XmlIOIT

參數說明

選項功能
-p sdks/java/io/file-based-io-tests/指定要測試的 I/O 的專案子模組。
-DintegrationTestPipelineOptions直接將管道選項傳遞給正在執行的測試。
-DintegrationTestRunner用於執行測試的執行器。目前可能的選項是:direct、dataflow。
-Dfilesystem(可選,如果適用)用於執行測試的檔案系統。目前可能的選項是:gcs、hdfs、s3。如果未提供,將使用本機檔案系統。
--tests指定要執行的測試(對類別/測試方法的完整限定參照)。

在提取請求上執行整合測試

大多數 IO 整合測試都有專用的 Jenkins 工作,這些工作會定期執行以收集指標並避免回歸。由於 ghprb 外掛程式,也可以在 Github Pull Request 的註解中輸入特定詞組後,按需觸發這些工作。這樣,您可以檢查您對特定 IO 的貢獻是改進還是使情況更糟(希望不會!)。

若要執行 IO 整合測試,請在您的 Pull Request 中輸入以下註解

測試詞組
JdbcIOIT執行 Java JdbcIO 效能測試
MongoDBIOIT執行 Java MongoDBIO 效能測試
HadoopFormatIOIT執行 Java HadoopFormatIO 效能測試
TextIO - 本機檔案系統執行 Java TextIO 效能測試
TextIO - HDFS執行 Java TextIO 效能測試 HDFS
壓縮 TextIO - 本機檔案系統執行 Java CompressedTextIO 效能測試
壓縮 TextIO - HDFS執行 Java CompressedTextIO 效能測試 HDFS
AvroIO - 本機檔案系統執行 Java AvroIO 效能測試
AvroIO - HDFS執行 Java AvroIO 效能測試 HDFS
TFRecordIO - 本機檔案系統執行 Java TFRecordIO 效能測試
ParquetIO - 本機檔案系統執行 Java ParquetIO 效能測試
XmlIO - 本機檔案系統執行 Java XmlIO 效能測試
XmlIO - HDFS在 HDFS 上執行 Java XmlIO 效能測試

每個工作定義都可以在 .test-infra/jenkins 中找到。如果您在 Pull Request 中修改/新增了新的 Jenkins 工作定義,請在執行整合測試之前執行 seed 工作(註解:「執行 seed 工作」)。

效能測試儀表板

如前所述,我們透過收集定期執行的 Jenkins 工作中的測試執行時間來測量 IOIT 的效能。隨後的結果會儲存在資料庫 (BigQuery) 中,因此我們可以以繪圖的形式顯示它們。

此處提供收集所有結果的儀表板:效能測試儀表板

實作整合測試

實作整合測試需要三個元件

以下將詳細討論這兩個部分。

測試程式碼

這些是整合測試程式碼使用的慣例

這些原則的端對端範例可以在 JdbcIOIT 中找到。

Kubernetes 指令碼

如同在整合測試、資料儲存和 Kubernetes中所討論的,為了讓您的測試在 Beam 的持續整合伺服器上執行,您需要實作一個 Kubernetes 腳本來建立資料儲存的實例。

如果您需要這方面的協助或有其他問題,請聯繫 Beam dev@ mailing list,社群或許能夠協助您。

建立 Beam 資料儲存 Kubernetes 腳本的指南

  1. 您應該定義兩個 Kubernetes 腳本。
    • 這是實作第一項的最佳已知方法。
    • 第一個腳本將包含主要的資料儲存實例腳本 (StatefulSet),以及一個公開資料儲存的 NodePort 服務。這將是 Beam Jenkins 持續整合伺服器執行的腳本。
    • 第二個腳本將定義一個額外的 LoadBalancer 服務,用於在 Kubernetes 叢集位於另一個網路時,向資料儲存公開外部 IP 位址。此檔案的名稱通常會加上 '-for-local-dev' 後綴。
  2. 您必須確保在 Pod 崩潰後會重新建立。
    • 如果您直接使用 pod,如果 Pod 崩潰或發生某些原因導致叢集移動您的 Pod 容器,它將不會被重新建立。
    • 在大多數情況下,您會想要使用 StatefulSet,因為它支援在重新啟動之間持續存在的持久磁碟,並且使用特定的持久磁碟時,有一個與 Pod 相關聯的穩定網路識別符。DeploymentReplicaSet 也可能有用,但可能在較少的情況下,因為它們沒有這些功能。
  3. 您應該為資料儲存的小型和大型實例建立不同的腳本。
    • 小型和大型整合測試中所討論的,這似乎是支援同時具有小型和大型資料儲存以進行整合測試的最佳方法。
  4. 您必須使用來自可信任來源的 Docker 映像檔,並釘選 Docker 映像檔的版本。
    • 您應該優先使用以下順序的映像檔
      1. 由資料來源/接收器建立者提供的映像檔(如果他們官方維護)。對於 Apache 專案,這將是官方 Apache 儲存庫。
      2. 官方 Docker 映像檔,因為它們具有安全性修復和保證的維護。
      3. 非官方的 Docker 映像檔,或來自具有良好維護者的其他供應商的映像檔(例如 quay.io)。

Jenkins 工作

您可以在.test-infra/jenkins 目錄中找到現有的 IOIT jenkins 工作定義範例。尋找名為 job_PerformanceTest_*.groovy 的檔案。最突出的範例是

請注意,有一個實用程式類別可以幫助輕鬆建立工作,而不會忘記重要的步驟或重複程式碼。有關詳細資訊,請參閱Kubernetes.groovy

小規模和大規模整合測試

Apache Beam 希望它可以在多種組態中執行整合測試

您可以透過以下方式執行此操作

  1. 建立兩個 Kubernetes 腳本:一個用於資料儲存的小型實例,另一個用於大型實例。
  2. 讓您的測試採用一個管道選項,該選項決定是否產生少量或大量的測試資料(其中小型和大型是適合您資料儲存的大小)

一個例子是 HadoopFormatIO 的測試。