測試您的管線
測試您的管線是在開發有效的資料處理解決方案時特別重要的一個步驟。Beam 模型間接的特性,您的使用者程式碼在其中建構要遠端執行的管線圖表,這可能會讓除錯失敗的執行變得不簡單。通常,對您的管線程式碼執行本機單元測試比除錯管線的遠端執行更快且更簡單。
在您選擇的執行器上執行管線之前,先在本機單元測試您的管線程式碼通常是識別和修正管線程式碼錯誤的最佳方法。在本機單元測試您的管線,也能讓您使用您熟悉/最愛的本機除錯工具。
您可以使用 DirectRunner 或 PrismRunner。兩者都是對測試和本機開發有幫助的本機執行器。
在本機測試您的管線之後,您可以使用您選擇的執行器以小規模進行測試。例如,使用 Flink 執行器搭配本機或遠端 Flink 叢集。
Beam SDK 提供許多方法來單元測試您的管線程式碼,從最低層級到最高層級都有。從最低層級到最高層級,這些是
- 您可以測試管線中使用的個別函式。
- 您可以將整個 轉換當作一個單元來測試。
- 您可以對整個管線執行端對端測試。
為了支援單元測試,Java 的 Beam SDK 在 testing package 中提供許多測試類別。您可以使用這些測試作為參考和指南。
測試轉換
若要測試您建立的轉換,您可以使用以下模式
- 建立
TestPipeline
。 - 建立一些靜態、已知的測試輸入資料。
- 使用
Create
轉換來建立輸入資料的PCollection
。 - 將您的轉換
Apply
至輸入PCollection
,並儲存產生的輸出PCollection
。 - 使用
PAssert
及其子類別來驗證輸出PCollection
是否包含您預期的元素。
TestPipeline
TestPipeline 是 Beam Java SDK 中包含的類別,專門用於測試轉換。
TestPipeline 是 Beam Python SDK 中包含的類別,專門用於測試轉換。
對於測試,當您建立管線物件時,請使用TestPipeline
來取代 Pipeline
。與 Pipeline.create
不同,TestPipeline.create
會在內部處理設定 PipelineOptions
。您可以如下建立 TestPipeline
import "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
// Override TestMain with ptest.Main,
// once per package.
func TestMain(m *testing.M) {
ptest.Main(m)
}
func TestPipeline(t *testing.T) {
...
// The Go SDK doesn't use a TestPipeline concept,
// and recommends using the ptest harness
// to wrap pipeline construction.
pr := ptest.BuildAndRun(t, func(s beam.Scope) {
...
})
...
}
注意:請在 這篇部落格文章中閱讀有關在 Beam 中測試無界限管線的內容。
使用 Create 轉換
您可以使用 Create
轉換,從標準的記憶體內集合類別 (例如 Java 或 Python List
) 建立 PCollection
。如需更多資訊,請參閱建立 PCollection。
PAssert
PAssert 是 Beam Java SDK 中包含的類別,它會對 PCollection
的內容進行斷言。您可以使用 PAssert
來驗證 PCollection
是否包含一組特定的預期元素。
對於給定的 PCollection
,您可以使用 PAssert
來驗證內容,如下所示
任何使用 PAssert
的 Java 程式碼都必須連結 JUnit
和 Hamcrest
。如果您使用 Maven,您可以透過將下列相依性新增至專案的 pom.xml
檔案來連結 Hamcrest
如需這些類別如何運作的更多資訊,請參閱 org.apache.beam.sdk.testing 套件文件。
複合轉換的範例測試
下列程式碼顯示複合轉換的完整測試。測試會將 Count
轉換套用至 String
元素的輸入 PCollection
。測試會使用 Create
轉換從 List<String>
建立輸入 PCollection
。
public class CountTest {
// Our static input data, which will make up the initial PCollection.
static final String[] WORDS_ARRAY = new String[] {
"hi", "there", "hi", "hi", "sue", "bob",
"hi", "sue", "", "", "ZOW", "bob", ""};
static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
public void testCount() {
// Create a test pipeline.
Pipeline p = TestPipeline.create();
// Create an input PCollection.
PCollection<String> input = p.apply(Create.of(WORDS));
// Apply the Count transform under test.
PCollection<KV<String, Long>> output =
input.apply(Count.<String>perElement());
// Assert on the results.
PAssert.that(output)
.containsInAnyOrder(
KV.of("hi", 4L),
KV.of("there", 1L),
KV.of("sue", 2L),
KV.of("bob", 2L),
KV.of("", 3L),
KV.of("ZOW", 1L));
// Run the pipeline.
p.run();
}
}
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
class CountTest(unittest.TestCase):
def test_count(self):
# Our static input data, which will make up the initial PCollection.
WORDS = [
"hi", "there", "hi", "hi", "sue", "bob",
"hi", "sue", "", "", "ZOW", "bob", ""
]
# Create a test pipeline.
with TestPipeline() as p:
# Create an input PCollection.
input = p | beam.Create(WORDS)
# Apply the Count transform under test.
output = input | beam.combiners.Count.PerElement()
# Assert on the results.
assert_that(
output,
equal_to([
("hi", 4),
("there", 1),
("sue", 2),
("bob", 2),
("", 3),
("ZOW", 1)]))
# The pipeline will run and verify the results.
import (
"testing"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)
// formatFn takes a key value pair and puts them
// into a single string for comparison.
func formatFn(w string, c int) string {
return fmt.Sprintf("%s: %d", w, c)
}
// Register the functional DoFn to ensure execution on workers.
func init() {
register.Function2x1(formatFn)
}
func TestCountWords(t *testing.T) {
// The pipeline will run and verify the results.
ptest.BuildAndRun(t, func(s beam.Scope) {
words := []string{"hi", "there", "hi", "hi", "sue", "bob",
"hi", "sue", "", "", "ZOW", "bob", ""}
wantCounts := []string{"hi: 5", "there: 1", "sue: 2", "bob: 2"}
// Create a PCollection from the words static input data.
input := beam.CreateList(s, words)
// Apply the Count transform under test.
output := stats.Count(s, col)
formatted := beam.ParDo(s, formatFn, output)
// Assert that the output PCollection matches the wantCounts data.
passert.Equals(s, formatted, wantCounts...)
})
}
端對端測試管線
您可以使用 Beam SDK 中的測試類別 (例如 Beam Java SDK 中的 TestPipeline
和 PAssert
) 來端對端測試整個管線。通常,若要測試整個管線,您會執行下列操作
- 針對管線的每個輸入資料來源,建立一些已知的靜態測試輸入資料。
- 建立一些與您在管線最終輸出
PCollection
中預期的內容相符的靜態測試輸出資料。 - 建立
TestPipeline
來取代標準的Pipeline.create
。 - 取代管線的
Read
轉換,使用Create
轉換從您的靜態輸入資料建立一或多個PCollection
。 - 套用管線的轉換。
- 取代管線的
Write
轉換,使用PAssert
來驗證管線產生的最終PCollection
內容是否與您的靜態輸出資料中預期的值相符。
測試 WordCount 管線
下列範例程式碼顯示如何測試 WordCount 範例管線。WordCount
通常從文字檔讀取輸入資料的行;相反地,測試會建立包含一些文字行的 List<String>
,並使用 Create
轉換來建立初始的 PCollection
。
WordCount
的最終轉換 (來自複合轉換 CountWords
) 會產生格式化的字數計數 PCollection<String>
,適合列印。我們的測試管線不是將該 PCollection
寫入輸出文字檔,而是使用 PAssert
來驗證 PCollection
的元素是否與包含我們預期輸出資料的靜態 String
陣列的元素相符。
public class WordCountTest {
// Our static input data, which will comprise the initial PCollection.
static final String[] WORDS_ARRAY = new String[] {
"hi there", "hi", "hi sue bob",
"hi sue", "", "bob hi"};
static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
// Our static output data, which is the expected data that the final PCollection must match.
static final String[] COUNTS_ARRAY = new String[] {
"hi: 5", "there: 1", "sue: 2", "bob: 2"};
// Example test that tests the pipeline's transforms.
public void testCountWords() throws Exception {
Pipeline p = TestPipeline.create();
// Create a PCollection from the WORDS static input data.
PCollection<String> input = p.apply(Create.of(WORDS));
// Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
PCollection<String> output = input.apply(new CountWords());
// Assert that the output PCollection matches the COUNTS_ARRAY known static output data.
PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);
// Run the pipeline.
p.run();
}
}
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
class CountWords(beam.PTransform):
# CountWords transform omitted for conciseness.
# Full transform can be found here - https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_debugging.py
class WordCountTest(unittest.TestCase):
# Our input data, which will make up the initial PCollection.
WORDS = [
"hi", "there", "hi", "hi", "sue", "bob",
"hi", "sue", "", "", "ZOW", "bob", ""
]
# Our output data, which is the expected data that the final PCollection must match.
EXPECTED_COUNTS = ["hi: 5", "there: 1", "sue: 2", "bob: 2"]
# Example test that tests the pipeline's transforms.
def test_count_words(self):
with TestPipeline() as p:
# Create a PCollection from the WORDS static input data.
input = p | beam.Create(WORDS)
# Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
output = input | CountWords()
# Assert that the output PCollection matches the EXPECTED_COUNTS data.
assert_that(output, equal_to(EXPECTED_COUNTS), label='CheckOutput')
# The pipeline will run and verify the results.
package wordcount
import (
"testing"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)
// CountWords and formatFn are omitted for conciseness.
// Code for the Full transforms can be found here:
// https://github.com/apache/beam/blob/master/sdks/go/examples/debugging_wordcount/debugging_wordcount.go
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { ... }
func formatFn(w string, c int) string { ... }
func TestCountWords(t *testing.T) {
// The pipeline will run and verify the results.
ptest.BuildAndRun(t, func(s beam.Scope) {
words := []string{"hi", "there", "hi", "hi", "sue", "bob",
"hi", "sue", "", "", "ZOW", "bob", ""}
wantCounts := []string{"hi: 5", "there: 1", "sue: 2", "bob: 2"}
// Create a PCollection from the words static input data.
input := beam.CreateList(s, words)
// Run ALL the pipeline's transforms
// (in this case, the CountWords composite transform).
output := CountWords(s, input)
formatted := beam.ParDo(s, formatFn, output)
// Assert that the output PCollection matches
// the wantCounts data.
passert.Equals(s, formatted, wantCounts...)
})
}
上次更新於 2024/10/31
您是否找到所需的一切?
這些資訊對您是否有幫助且清楚明瞭呢?您是否有任何想變更的地方?請告訴我們!