在 Apache Beam 中測試 I/O 轉換
測試 Apache Beam I/O 轉換的範例和設計模式
- Java SDK
- Python SDK
注意:本指南仍在進行中。有一個未解決的問題需要完成本指南:BEAM-1025。
簡介
本文件說明 Beam 社群根據我們過去撰寫 I/O 轉換的經驗所建議的一組測試。如果您希望將您的 I/O 轉換貢獻給 Beam 社群,我們會要求您實作這些測試。
雖然編寫單元測試和整合測試是標準做法,但可能有很多定義。我們的定義如下
- 單元測試
- 目標:僅驗證轉換的正確性 - 核心行為、邊角案例等。
- 使用的資料儲存:資料儲存的記憶體版本(如果可用),否則您需要編寫一個 偽造
- 資料集大小:微小(數十到數百行)
- 整合測試
- 目標:捕獲與執行器/資料儲存的真實版本互動時發生的問題
- 使用的資料儲存:實際的執行個體,在測試之前預先設定
- 資料集大小:小到中等(1000 行到數十 GB)
關於效能基準測試的注意事項
我們不主張編寫單獨的測試專門用於效能基準測試。相反地,我們建議設定整合測試,這些測試可以接受必要的參數以涵蓋許多不同的測試情境。
例如,如果整合測試是按照以下準則編寫的,則整合測試可以在不同的執行器(本地或叢集配置)上,以及針對具有小型資料集的小型執行個體或具有較大資料集的大型生產就緒叢集的資料儲存執行。這可以涵蓋各種情境 - 其中一個是效能基準測試。
測試平衡 - 單元測試與整合測試
使用整合測試很容易涵蓋大量的程式碼,但之後很難找到測試失敗的原因,而且測試也比較不可靠。
但是,有一組有價值的錯誤是透過測試發現的,這些測試會測試多個工作者讀取/寫入具有多個節點的資料儲存執行個體(例如,讀取複本等)。這些情境很難透過單元測試找到,而且我們發現它們通常會在 I/O 轉換中造成錯誤。
我們的測試策略是平衡這兩種矛盾的需求。我們建議在單元測試中盡可能多地進行測試,並編寫一個可以在各種配置中執行的單一小型整合測試。
範例
Java
- BigtableIO 的測試實作被認為是目前單元測試
Source
的最佳實務範例 - JdbcIO 有目前撰寫整合測試的最佳實務範例。
- ElasticsearchIO 示範了對有界讀取/寫入的測試
- MqttIO 和 AmpqpIO 示範了無界讀取/寫入
Python
- avroio_test 提供了測試液體分片、
source_test_utils
、assert_that
和equal_to
的範例
單元測試
目標
- 驗證 I/O 轉換中程式碼的正確性。
- 驗證 I/O 轉換與其連接的資料儲存的參考實作(其中「參考實作」表示偽造或記憶體版本)一起使用時是否能正確運作。
- 能夠快速執行,只需要一台機器,且記憶體/磁碟佔用空間合理地小,且沒有非本地網路存取(最好完全沒有)。目標是在幾秒鐘內完成測試 - 任何超過 20 秒的測試都應該與 beam dev 郵件列表討論。
- 驗證 I/O 轉換可以處理網路故障。
非目標
- 測試外部資料儲存中的問題 - 這可能會導致非常複雜的測試。
實作單元測試
關於為所有轉換撰寫單元測試的一般指南,可以在PTransform 風格指南中找到。我們在下面擴展了一些重點。
如果您正在使用 Source
API,請確保徹底地對您的程式碼進行單元測試。輕微的實作錯誤可能會導致資料損壞或資料遺失(例如跳過或重複記錄),這可能會讓您的使用者難以偵測。此外,請考慮使用 SourceTestUtils
source_test_utils
- 這是測試 Source
實作的關鍵部分。
如果您未使用 Source
API,您可以使用 TestPipeline
和 PAssert
assert_that
來協助您進行測試。
如果您正在實作寫入功能,您可以使用 TestPipeline
來寫入測試資料,然後使用非 Beam 客戶端讀取並驗證它。
使用偽造
在單元測試中,不要使用 mock(為每個測試預先設定每個呼叫的精確回應),而是使用 fakes。對於 I/O 轉換測試,使用 fakes 的首選方法是使用預先存在的服務的記憶體內/可嵌入版本,但如果不存在,請考慮實作您自己的版本。事實證明,fakes 是「您可以獲得所需的測試條件」和「您不必編寫一百萬個精確的 mock 函式呼叫」之間的正確組合。
網路失敗
為了幫助測試和關注點分離,與網路互動的程式碼應該在與 I/O 轉換不同的類別中處理。建議的設計模式是,一旦您的 I/O 轉換確定讀取或寫入不再可能,它就會拋出例外。
這允許 I/O 轉換的單元測試表現得好像它們具有完美的網路連線,而且它們不需要重試/以其他方式處理網路連線問題。
批次處理
如果您的 I/O 轉換允許批次讀取/寫入,您必須在測試中強制執行批次處理。在您的 I/O 轉換上具有可設定的批次大小選項可以讓這件事很容易發生。這些必須標記為僅供測試。
I/O 轉換整合測試
我們目前沒有 Python I/O 整合測試或非邊界或最終一致的資料儲存整合測試的範例。我們歡迎在這方面的貢獻 - 請聯絡 Beam dev@ 郵件列表以獲取更多資訊。
目標
- 允許資料儲存、I/O 轉換和執行器之間的互動進行端對端測試,模擬真實世界的情況。
- 允許小規模和大規模測試。
- 自我包含:除了測試可以修改的資料儲存之外,需要最少的初始設定或現有外部狀態。
- 任何人都可以在持續整合伺服器上執行 Beam 執行的同一組 I/O 轉換整合測試。
整合測試、資料儲存和 Kubernetes
為了在真實條件下測試 I/O 轉換,您必須連線到資料儲存實例。
Beam 社群在 Kubernetes 中託管用於整合測試的資料儲存。為了在 Beam 的持續整合環境中執行整合測試,它必須具有設定資料儲存實例的 Kubernetes 腳本。
但是,在本地工作時,沒有使用 Kubernetes 的要求。所有測試基礎架構都允許您傳入連線資訊,因此開發人員可以使用他們偏好的託管基礎架構進行本地開發。
在本機上執行整合測試
您始終可以在自己的機器上執行 IO 整合測試。執行整合測試的高階步驟是
- 設定與正在執行的測試相對應的資料儲存。
- 執行測試,從剛剛建立的資料儲存傳遞連線資訊給它。
- 清除資料儲存。
資料儲存設定/清除
如果您使用 Kubernetes 腳本來託管資料儲存,請確保您可以使用 kubectl 在本地連線到您的叢集。如果您已經設定了自己的資料儲存,您只需要執行以下列表中的步驟 3。
- 設定與您要執行的測試相對應的資料儲存。您可以在 .test-infra/kubernetes 中找到目前所有支援的資料儲存的 Kubernetes 腳本。
- 在某些情況下,有一個專用的設定腳本 (*.sh)。在其他情況下,您只需執行
kubectl create -f [scriptname]
即可建立資料儲存。您也可以讓 kubernetes.sh 腳本為您執行一些標準步驟。 - 慣例規定會有
- 資料儲存本身的 yml 腳本,以及一個
NodePort
服務。NodePort
服務會為從同一個子網路連線到 Kubernetes 叢集機器的任何人開啟資料儲存的連接埠。當在 Minikube Kubernetes 引擎上執行腳本時,此類腳本通常很有用。 - 一個單獨的腳本,帶有 LoadBalancer 服務。此類服務將為資料儲存公開一個外部 ip。當需要外部存取時(例如在 Jenkins 上),需要此類腳本。
- 資料儲存本身的 yml 腳本,以及一個
- 範例
- 對於 JDBC,您可以設定 Postgres:
kubectl create -f .test-infra/kubernetes/postgres/postgres.yml
- 對於 Elasticsearch,您可以執行設定腳本:
bash .test-infra/kubernetes/elasticsearch/setup.sh
- 對於 JDBC,您可以設定 Postgres:
- 在某些情況下,有一個專用的設定腳本 (*.sh)。在其他情況下,您只需執行
- 確定服務的 IP 位址
- NodePort 服務:
kubectl get pods -l 'component=elasticsearch' -o jsonpath={.items[0].status.podIP}
- LoadBalancer 服務:
kubectl get svc elasticsearch-external -o jsonpath='{.status.loadBalancer.ingress[0].ip}'
- NodePort 服務:
- 使用
integrationTest
gradle 任務和測試類別中的說明來執行測試(例如,請參閱 JdbcIOIT.java 中的說明)。 - 告訴 Kubernetes 刪除 Kubernetes 腳本中指定的資源
- JDBC:
kubectl delete -f .test-infra/kubernetes/postgres/postgres.yml
- Elasticsearch:
bash .test-infra/kubernetes/elasticsearch/teardown.sh
- JDBC:
執行特定測試
integrationTest
是一個用於執行 IO 整合測試的專用 gradle 任務。
在 Cloud Dataflow 執行器上的使用範例
./gradlew integrationTest -p sdks/java/io/hadoop-format -DintegrationTestPipelineOptions='["--project=GOOGLE_CLOUD_PROJECT", "--tempRoot=GOOGLE_STORAGE_BUCKET", "--numberOfRecords=1000", "--postgresPort=5432", "--postgresServerName=SERVER_NAME", "--postgresUsername=postgres", "--postgresPassword=PASSWORD", "--postgresDatabaseName=postgres", "--postgresSsl=false", "--runner=TestDataflowRunner"]' -DintegrationTestRunner=dataflow --tests=org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOIT
在 HDFS 檔案系統和 Direct 執行器上的使用範例
注意:只有當 /etc/hosts 檔案包含帶有 hadoop namenode 和 hadoop datanodes 外部 IP 的條目時,以下設定才有效。請參閱以下說明:小型叢集設定檔和 大型叢集設定檔。
export HADOOP_USER_NAME=root
./gradlew integrationTest -p sdks/java/io/file-based-io-tests -DintegrationTestPipelineOptions='["--numberOfRecords=1000", "--filenamePrefix=hdfs://HDFS_NAMENODE:9000/XMLIOIT", "--hdfsConfiguration=[{\"fs.defaultFS\":\"hdfs://HDFS_NAMENODE:9000\",\"dfs.replication\":1,\"dfs.client.use.datanode.hostname\":\"true\" }]" ]' -DintegrationTestRunner=direct -Dfilesystem=hdfs --tests org.apache.beam.sdk.io.xml.XmlIOIT
參數說明
選項 | 功能 |
-p sdks/java/io/file-based-io-tests/ | 指定要測試的 I/O 的專案子模組。 |
-DintegrationTestPipelineOptions | 直接將管道選項傳遞給正在執行的測試。 |
-DintegrationTestRunner | 用於執行測試的執行器。目前可能的選項是:direct、dataflow。 |
-Dfilesystem | (可選,如果適用)用於執行測試的檔案系統。目前可能的選項是:gcs、hdfs、s3。如果未提供,將使用本機檔案系統。 |
--tests | 指定要執行的測試(對類別/測試方法的完整限定參照)。 |
在提取請求上執行整合測試
大多數 IO 整合測試都有專用的 Jenkins 工作,這些工作會定期執行以收集指標並避免回歸。由於 ghprb 外掛程式,也可以在 Github Pull Request 的註解中輸入特定詞組後,按需觸發這些工作。這樣,您可以檢查您對特定 IO 的貢獻是改進還是使情況更糟(希望不會!)。
若要執行 IO 整合測試,請在您的 Pull Request 中輸入以下註解
測試 | 詞組 |
JdbcIOIT | 執行 Java JdbcIO 效能測試 |
MongoDBIOIT | 執行 Java MongoDBIO 效能測試 |
HadoopFormatIOIT | 執行 Java HadoopFormatIO 效能測試 |
TextIO - 本機檔案系統 | 執行 Java TextIO 效能測試 |
TextIO - HDFS | 執行 Java TextIO 效能測試 HDFS |
壓縮 TextIO - 本機檔案系統 | 執行 Java CompressedTextIO 效能測試 |
壓縮 TextIO - HDFS | 執行 Java CompressedTextIO 效能測試 HDFS |
AvroIO - 本機檔案系統 | 執行 Java AvroIO 效能測試 |
AvroIO - HDFS | 執行 Java AvroIO 效能測試 HDFS |
TFRecordIO - 本機檔案系統 | 執行 Java TFRecordIO 效能測試 |
ParquetIO - 本機檔案系統 | 執行 Java ParquetIO 效能測試 |
XmlIO - 本機檔案系統 | 執行 Java XmlIO 效能測試 |
XmlIO - HDFS | 在 HDFS 上執行 Java XmlIO 效能測試 |
每個工作定義都可以在 .test-infra/jenkins 中找到。如果您在 Pull Request 中修改/新增了新的 Jenkins 工作定義,請在執行整合測試之前執行 seed 工作(註解:「執行 seed 工作」)。
效能測試儀表板
如前所述,我們透過收集定期執行的 Jenkins 工作中的測試執行時間來測量 IOIT 的效能。隨後的結果會儲存在資料庫 (BigQuery) 中,因此我們可以以繪圖的形式顯示它們。
此處提供收集所有結果的儀表板:效能測試儀表板
實作整合測試
實作整合測試需要三個元件
- 測試程式碼:執行實際測試的程式碼:與 I/O 轉換互動、讀取和寫入資料,以及驗證資料。
- Kubernetes 腳本:設定測試程式碼將使用的資料儲存的 Kubernetes 腳本。
- Jenkins 工作:執行設定資料來源、執行測試以及在測試後清除所有必要步驟的 Jenkins Job DSL 腳本。
以下將詳細討論這兩個部分。
測試程式碼
這些是整合測試程式碼使用的慣例
- 您的測試應使用管道選項來接收連線資訊。
- 對於 Java,io/common 目錄中有一個共用的管道選項物件。這表示,如果同一個資料儲存有兩個測試(例如,對於
Elasticsearch
和HadoopFormatIO
測試),這些測試會共用相同的管道選項。
- 對於 Java,io/common 目錄中有一個共用的管道選項物件。這表示,如果同一個資料儲存有兩個測試(例如,對於
- 以程式方式產生測試資料並參數化用於測試的資料量。
- 對於 Java,可以結合
CountingInput
+TestRow
來產生任何規模的確定性測試資料。
- 對於 Java,可以結合
- 針對您的測試使用先寫入再讀取的樣式。
- 在單個
Test
中,執行管道以使用您的 I/O 轉換進行寫入,然後執行另一個管道以使用您的 I/O 轉換進行讀取。 - 資料的唯一驗證應該是讀取的結果。請勿以任何其他方式驗證寫入資料庫的資料。
- 以有效率的方式驗證所有列的實際內容。一種簡單的方法是取得列的雜湊並將它們組合在一起。
HashingFn
可以讓這變得簡單,而且TestRow
具有預先計算的雜湊。 - 為了方便除錯,請使用
PAssert
的containsInAnyOrder
來驗證所有列子集的內容。
- 在單個
- 測試應假設它們可能會在同一個資料庫實例上多次和/或同時執行。
- 清除測試資料:在
@AfterClass
中執行此操作以確保它會執行。 - 每個執行(時間戳記是一種簡單的方法)和每個方法(在適當時)使用唯一的資料表名稱。
- 清除測試資料:在
這些原則的端對端範例可以在 JdbcIOIT 中找到。
Kubernetes 指令碼
如同在整合測試、資料儲存和 Kubernetes中所討論的,為了讓您的測試在 Beam 的持續整合伺服器上執行,您需要實作一個 Kubernetes 腳本來建立資料儲存的實例。
如果您需要這方面的協助或有其他問題,請聯繫 Beam dev@ mailing list,社群或許能夠協助您。
建立 Beam 資料儲存 Kubernetes 腳本的指南
- 您應該定義兩個 Kubernetes 腳本。
- 這是實作第一項的最佳已知方法。
- 第一個腳本將包含主要的資料儲存實例腳本 (
StatefulSet
),以及一個公開資料儲存的NodePort
服務。這將是 Beam Jenkins 持續整合伺服器執行的腳本。 - 第二個腳本將定義一個額外的
LoadBalancer
服務,用於在 Kubernetes 叢集位於另一個網路時,向資料儲存公開外部 IP 位址。此檔案的名稱通常會加上 '-for-local-dev' 後綴。
- 您必須確保在 Pod 崩潰後會重新建立。
- 如果您直接使用
pod
,如果 Pod 崩潰或發生某些原因導致叢集移動您的 Pod 容器,它將不會被重新建立。 - 在大多數情況下,您會想要使用
StatefulSet
,因為它支援在重新啟動之間持續存在的持久磁碟,並且使用特定的持久磁碟時,有一個與 Pod 相關聯的穩定網路識別符。Deployment
和ReplicaSet
也可能有用,但可能在較少的情況下,因為它們沒有這些功能。
- 如果您直接使用
- 您應該為資料儲存的小型和大型實例建立不同的腳本。
- 如小型和大型整合測試中所討論的,這似乎是支援同時具有小型和大型資料儲存以進行整合測試的最佳方法。
- 您必須使用來自可信任來源的 Docker 映像檔,並釘選 Docker 映像檔的版本。
- 您應該優先使用以下順序的映像檔
- 由資料來源/接收器建立者提供的映像檔(如果他們官方維護)。對於 Apache 專案,這將是官方 Apache 儲存庫。
- 官方 Docker 映像檔,因為它們具有安全性修復和保證的維護。
- 非官方的 Docker 映像檔,或來自具有良好維護者的其他供應商的映像檔(例如 quay.io)。
- 您應該優先使用以下順序的映像檔
Jenkins 工作
您可以在.test-infra/jenkins 目錄中找到現有的 IOIT jenkins 工作定義範例。尋找名為 job_PerformanceTest_*.groovy 的檔案。最突出的範例是
請注意,有一個實用程式類別可以幫助輕鬆建立工作,而不會忘記重要的步驟或重複程式碼。有關詳細資訊,請參閱Kubernetes.groovy。
小規模和大規模整合測試
Apache Beam 希望它可以在多種組態中執行整合測試
- 小規模
- 在執行器的單個 worker 上執行(應該是可能的,但不是必需的)。
- 資料儲存應設定為使用單個節點。
- 資料集可以非常小(1000 列)。
- 大規模
- 在執行器的多個 worker 上執行。
- 資料儲存應設定為使用多個節點。
- 在這種情況下使用的資料集較大(數十 GB)。
您可以透過以下方式執行此操作
- 建立兩個 Kubernetes 腳本:一個用於資料儲存的小型實例,另一個用於大型實例。
- 讓您的測試採用一個管道選項,該選項決定是否產生少量或大量的測試資料(其中小型和大型是適合您資料儲存的大小)
一個例子是 HadoopFormatIO 的測試。
上次更新於 2024/10/31
您是否找到了您要找的所有內容?
所有內容是否實用且清晰?您是否有任何想要變更的地方?請告訴我們!