Test Your Pipeline

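Testing your pipeline is a particularly important step in developing an effective data processing solution. Because the Beam model is indirect (your user code constructs a pipeline graph that is then executed remotely), debugging a failed run can be a non-trivial task. It is often faster and simpler to run local unit tests on your pipeline code than to debug a pipeline's remote execution.

Unit testing your pipeline code locally, before you run the pipeline on the runner of your choice, is often the best way to find and fix bugs in that code. Testing locally also lets you use your favorite local debugging tools.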

You can use DirectRunner or PrismRunner. Both are local runners that are helpful for testing and local development.

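After you test your pipeline locally, you can test it at small scale on the runner of your choice. For example, use the Flink runner with a local or remote Flink cluster.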

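The Beam SDKs provide several ways to unit test your pipeline code. From the lowest level to the highest, you can test the individual functions used in your pipeline, test an entire transform as a unit, and test a whole pipeline end to end.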
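
At the lowest level, the plain functions that a transform wraps can usually be tested as ordinary code, with no pipeline or runner involved. The following is a minimal sketch in Python. `extract_words` and `format_count` are hypothetical helpers invented for this illustration (they are not part of any Beam SDK); they stand in for logic you might place inside a DoFn or a Map step.

```python
import re
import unittest


def extract_words(line):
    """Hypothetical helper: split a line of text into words."""
    return re.findall(r"[A-Za-z']+", line)


def format_count(word, count):
    """Hypothetical helper: render a (word, count) pair for printing."""
    return f"{word}: {count}"


class HelperFunctionTest(unittest.TestCase):

    def test_extract_words(self):
        self.assertEqual(extract_words("hi sue bob"), ["hi", "sue", "bob"])
        # An empty line yields no words at all.
        self.assertEqual(extract_words(""), [])

    def test_format_count(self):
        self.assertEqual(format_count("hi", 5), "hi: 5")
```

Saved in a test module, this can be run with `python -m unittest`. Tests of this kind run quickly and fail with precise messages, which makes them a useful first line of defense before any pipeline-level test.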

To support unit testing, the Beam SDK for Java provides a number of test classes in the testing package. You can use these tests as references and guides.

Testing Transforms

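To test a transform you've created, you can use the following pattern: create a TestPipeline, use the Create transform to build a PCollection from static, known test input data, apply your transform to that PCollection, and use PAssert to verify that the output PCollection contains the elements you expect.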

TestPipeline

TestPipeline is a class included in the Beam Java and Python SDKs specifically for testing transforms.

For tests, use TestPipeline in place of Pipeline when you create the pipeline object. Unlike Pipeline.create, TestPipeline.create handles setting PipelineOptions internally.

You create a TestPipeline as follows:

// Java
Pipeline p = TestPipeline.create();

# Python
with TestPipeline() as p:
    ...

// Go
import "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"

// Override TestMain with ptest.Main,
// once per package.
func TestMain(m *testing.M) {
	ptest.Main(m)
}

func TestPipeline(t *testing.T) {
	...
	// The Go SDK doesn't use a TestPipeline concept,
	// and recommends using the ptest harness
	// to wrap pipeline construction.
	pr := ptest.BuildAndRun(t, func(s beam.Scope) {
		...
	})
	...
}

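Note: Read about testing unbounded pipelines in Beam in this blog post.

Using the Create Transform

You can use the Create transform to create a PCollection out of a standard in-memory collection class, such as a Java or Python List. See Creating a PCollection for more information.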

PAssert

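PAssert is a class included in the Beam Java SDK that makes assertions about the contents of a PCollection. You can use PAssert to verify that a PCollection contains a specific set of expected elements.

For a given PCollection, you can use PAssert to verify the contents as follows: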

// Java
PCollection<String> output = ...;

// Check whether a PCollection contains some elements in any order.
PAssert.that(output)
    .containsInAnyOrder(
        "elem1",
        "elem3",
        "elem2");

# Python
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

output = ...

# Check whether a PCollection contains some elements in any order.
assert_that(
    output,
    equal_to(["elem1", "elem3", "elem2"]))

// Go
import "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"

output := ... // beam.PCollection

// Check whether a PCollection contains some elements in any order.
passert.EqualsList(s, output, []string{"elem1", "elem3", "elem2"})
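
An "in any order" assertion ignores the order of the elements but not how many times each one occurs, so the two collections are in effect compared as multisets. The sketch below illustrates that idea in plain Python. `same_elements_any_order` is a hypothetical helper written for this illustration, not a Beam API, and it assumes the elements are hashable.

```python
from collections import Counter


def same_elements_any_order(actual, expected):
    """Return True if both iterables hold the same elements with the same
    multiplicities, regardless of order (hypothetical helper)."""
    return Counter(actual) == Counter(expected)


# Order does not matter.
print(same_elements_any_order(
    ["elem1", "elem3", "elem2"], ["elem1", "elem2", "elem3"]))  # True

# Duplicates do matter: an extra "elem1" makes the collections differ.
print(same_elements_any_order(
    ["elem1", "elem1", "elem2"], ["elem1", "elem2"]))  # False
```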

Any Java code that uses PAssert must link in JUnit and Hamcrest. If you're using Maven, you can link in Hamcrest by adding the following dependency to your project's pom.xml file:

<dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest</artifactId>
    <version>2.2</version>
    <scope>test</scope>
</dependency>

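For more information on how these classes work, see the org.apache.beam.sdk.testing package documentation.

An Example Test for a Composite Transform

The following code shows a complete test for a composite transform. The test applies the Count transform to an input PCollection of String elements. The test uses the Create transform to create the input PCollection from a List<String>.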

// Java
public class CountTest {

  // Our static input data, which will make up the initial PCollection.
  static final String[] WORDS_ARRAY = new String[] {
  "hi", "there", "hi", "hi", "sue", "bob",
  "hi", "sue", "", "", "ZOW", "bob", ""};

  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

  @Test
  public void testCount() {
    // Create a test pipeline.
    Pipeline p = TestPipeline.create();

    // Create an input PCollection.
    PCollection<String> input = p.apply(Create.of(WORDS));

    // Apply the Count transform under test.
    PCollection<KV<String, Long>> output =
      input.apply(Count.<String>perElement());

    // Assert on the results.
    PAssert.that(output)
      .containsInAnyOrder(
          KV.of("hi", 4L),
          KV.of("there", 1L),
          KV.of("sue", 2L),
          KV.of("bob", 2L),
          KV.of("", 3L),
          KV.of("ZOW", 1L));

    // Run the pipeline.
    p.run();
  }
}

# Python
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

class CountTest(unittest.TestCase):

  def test_count(self):
    # Our static input data, which will make up the initial PCollection.
    WORDS = [
      "hi", "there", "hi", "hi", "sue", "bob",
      "hi", "sue", "", "", "ZOW", "bob", ""
    ]
    # Create a test pipeline.
    with TestPipeline() as p:

      # Create an input PCollection.
      input = p | beam.Create(WORDS)

      # Apply the Count transform under test.
      output = input | beam.combiners.Count.PerElement()

      # Assert on the results.
      assert_that(
        output,
        equal_to([
            ("hi", 4),
            ("there", 1),
            ("sue", 2),
            ("bob", 2),
            ("", 3),
            ("ZOW", 1)]))

      # The pipeline will run and verify the results.

// Go
import (
	"fmt"
	"testing"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)

// formatFn takes a key value pair and puts them
// into a single string for comparison.
func formatFn(w string, c int) string {
	return fmt.Sprintf("%s: %d", w, c)
}

// Register the functional DoFn to ensure execution on workers.
func init() {
	register.Function2x1(formatFn)
}

func TestCountWords(t *testing.T) {
	// The pipeline will run and verify the results.
	ptest.BuildAndRun(t, func(s beam.Scope) {
		words := []string{"hi", "there", "hi", "hi", "sue", "bob",
			"hi", "sue", "", "", "ZOW", "bob", ""}

		// The empty string appears three times and formats as ": 3".
		wantCounts := []string{"hi: 4", "there: 1", "sue: 2", "bob: 2", ": 3", "ZOW: 1"}

		// Create a PCollection from the words static input data.
		input := beam.CreateList(s, words)

		// Apply the Count transform under test.
		output := stats.Count(s, input)
		formatted := beam.ParDo(s, formatFn, output)

		// Assert that the output PCollection matches the wantCounts data.
		passert.EqualsList(s, formatted, wantCounts)
	})
}
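
Expected counts like these are easy to get wrong by hand, so it can help to cross-check them independently of the pipeline. The following is a minimal sketch using Python's `collections.Counter` on the same static input. It is only a sanity check on the test data, not a substitute for running the transform under test.

```python
from collections import Counter

WORDS = [
    "hi", "there", "hi", "hi", "sue", "bob",
    "hi", "sue", "", "", "ZOW", "bob", "",
]

# Count each distinct element, including the empty string.
expected_counts = Counter(WORDS)

for word, count in sorted(expected_counts.items()):
    print(repr(word), count)
```

For this input, "hi" is counted 4 times and the empty string 3 times, which agrees with the values asserted in the Java and Python tests above.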

Testing a Pipeline End-to-End

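You can use the test classes in the Beam SDKs (such as TestPipeline and PAssert in the Beam SDK for Java) to test an entire pipeline end-to-end. Typically, to test an entire pipeline, you replace the pipeline's input source with static test data fed through the Create transform, apply the pipeline's transforms, and replace the final write with a PAssert against static expected output.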

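Testing the WordCount Pipeline

The following example code shows how one might test the WordCount example pipeline. WordCount usually reads lines from a text file as its input data; the test instead creates a List<String> containing some lines of text and uses a Create transform to create the initial PCollection.

WordCount's final transform (from the composite transform CountWords) produces a PCollection<String> of formatted word counts suitable for printing. Rather than write that PCollection to an output text file, our test pipeline uses PAssert to verify that the elements of the PCollection match those of a static String array containing our expected output data.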

// Java
public class WordCountTest {

    // Our static input data, which will comprise the initial PCollection.
    static final String[] WORDS_ARRAY = new String[] {
      "hi there", "hi", "hi sue bob",
      "hi sue", "", "bob hi"};

    static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

    // Our static output data, which is the expected data that the final PCollection must match.
    static final String[] COUNTS_ARRAY = new String[] {
        "hi: 5", "there: 1", "sue: 2", "bob: 2"};

    // Example test that tests the pipeline's transforms.
    @Test
    public void testCountWords() throws Exception {
      Pipeline p = TestPipeline.create();

      // Create a PCollection from the WORDS static input data.
      PCollection<String> input = p.apply(Create.of(WORDS));

      // Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
      PCollection<String> output = input.apply(new CountWords());

      // Assert that the output PCollection matches the COUNTS_ARRAY known static output data.
      PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);

      // Run the pipeline.
      p.run();
    }
}

# Python
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

class CountWords(beam.PTransform):
    # CountWords transform omitted for conciseness.
    # Full transform can be found here - https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_debugging.py
    ...

class WordCountTest(unittest.TestCase):

  # Our input data (lines of text), which will make up the initial PCollection.
  WORDS = [
      "hi there", "hi", "hi sue bob",
      "hi sue", "", "bob hi"
  ]

  # Our output data, which is the expected data that the final PCollection must match.
  EXPECTED_COUNTS = ["hi: 5", "there: 1", "sue: 2", "bob: 2"]

  # Example test that tests the pipeline's transforms.

  def test_count_words(self):
    with TestPipeline() as p:

      # Create a PCollection from the WORDS static input data.
      input = p | beam.Create(self.WORDS)

      # Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
      output = input | CountWords()

      # Assert that the output PCollection matches the EXPECTED_COUNTS data.
      assert_that(output, equal_to(self.EXPECTED_COUNTS), label='CheckOutput')

    # The pipeline will run and verify the results.

// Go
package wordcount

import (
	"testing"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

// CountWords and formatFn are omitted for conciseness.
// Code for the Full transforms can be found here:
// https://github.com/apache/beam/blob/master/sdks/go/examples/debugging_wordcount/debugging_wordcount.go

func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { ... }

func formatFn(w string, c int) string { ... }

func TestCountWords(t *testing.T) {
	// The pipeline will run and verify the results.
	ptest.BuildAndRun(t, func(s beam.Scope) {
		words := []string{"hi there", "hi", "hi sue bob",
			"hi sue", "", "bob hi"}

		wantCounts := []string{"hi: 5", "there: 1", "sue: 2", "bob: 2"}

		// Create a PCollection from the words static input data.
		input := beam.CreateList(s, words)

		// Run ALL the pipeline's transforms
		// (in this case, the CountWords composite transform).
		output := CountWords(s, input)
		formatted := beam.ParDo(s, formatFn, output)

		// Assert that the output PCollection matches
		// the wantCounts data.
		passert.EqualsList(s, formatted, wantCounts)
	})
}
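
The same substitution pattern (static input in place of the read, an assertion in place of the write) can be tried without a runner when the counting logic is an ordinary function. The sketch below is a plain-Python stand-in, not the Beam CountWords transform. `count_words` and its word-splitting regular expression are assumptions made for illustration.

```python
import re
from collections import Counter

# Static input: lines of text, as a text-file source would produce them.
LINES = ["hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"]

# Static expected output: formatted counts, as would be written to a sink.
EXPECTED_COUNTS = ["hi: 5", "there: 1", "sue: 2", "bob: 2"]


def count_words(lines):
    """Hypothetical stand-in: split lines into words, count, and format."""
    counts = Counter(
        word for line in lines for word in re.findall(r"[A-Za-z']+", line))
    return [f"{word}: {count}" for word, count in counts.items()]


# Compare without regard to order, as an in-any-order assertion would.
assert sorted(count_words(LINES)) == sorted(EXPECTED_COUNTS)
```

Keeping the logic under test separate from the input and output steps is what makes this substitution possible, both in this stand-in and in a real pipeline test.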