Apache Beam WordCount 範例

WordCount 範例示範如何設定一個處理管道,該管道可以讀取文字、將文字行符記化為個別的單字,並對每個單字執行頻率計數。Beam SDK 包含一系列這四個逐步更詳細的 WordCount 範例,它們彼此建立。所有範例的輸入文字都是一組莎士比亞的文本。

每個 WordCount 範例都會介紹 Beam 程式設計模型中的不同概念。從了解最簡單的範例 MinimalWordCount 開始。一旦您熟悉建立管道的基本原則,請繼續學習其他範例中的更多概念。

MinimalWordCount 範例

MinimalWordCount 示範了一個簡單的管道,該管道使用 Direct Runner 從文字檔讀取、套用轉換來符記化並計算單字,以及將資料寫入輸出文字檔。

此範例會硬式編碼其輸入和輸出檔案的位置,並且不執行任何錯誤檢查;它僅旨在向您展示建立 Beam 管道的「基本要素」。這種缺乏參數化的特性使得此特定管道在不同執行器之間的移植性不如標準 Beam 管道。在後面的範例中,我們將參數化管道的輸入和輸出來源,並展示其他最佳實務。

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.MinimalWordCount
python -m apache_beam.examples.wordcount_minimal --input YOUR_INPUT_FILE --output counts
$ go install github.com/apache/beam/sdks/v2/go/examples/minimal_wordcount
$ minimal_wordcount

若要檢視 Java 中的完整程式碼,請參閱 MinimalWordCount

若要檢視 Python 中的完整程式碼,請參閱 wordcount_minimal.py

若要檢視 Go 中的完整程式碼,請參閱 minimal_wordcount.go

重要概念

以下各節將使用 MinimalWordCount 管道中的相關程式碼摘錄來詳細說明這些概念。

建立管道

在本範例中,程式碼首先建立 PipelineOptions 物件。此物件可讓我們為管道設定各種選項,例如將執行我們管道的管道執行器以及所選執行器所需的任何執行器特定設定。在本範例中,我們以程式設計方式設定這些選項,但更常見的是,使用命令列引數來設定 PipelineOptions

您可以為執行管道指定執行器,例如 DataflowRunnerSparkRunner。如果您省略指定執行器,如本範例所示,您的管道將使用 DirectRunner 在本機執行。在下一節中,我們將指定管道的執行器。

 // Create a PipelineOptions object. This object lets us set various execution
 // options for our pipeline, such as the runner you wish to use. This example
 // will run with the DirectRunner by default, based on the class path configured
 // in its dependencies.
 PipelineOptions options = PipelineOptionsFactory.create();
from apache_beam.options.pipeline_options import PipelineOptions

input_file = 'gs://dataflow-samples/shakespeare/kinglear.txt'
output_path = 'gs://my-bucket/counts.txt'

beam_options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
)

下一步是使用我們剛建構的選項建立 Pipeline 物件。Pipeline 物件會建立要執行的轉換圖形,並與該特定管道相關聯。

第一步是建立 Pipeline 物件。它會建立要執行的轉換圖形,並與該特定管道相關聯。範圍允許分組為複合轉換。

Pipeline p = Pipeline.create(options);
pipeline = beam.Pipeline(options=beam_options)
p := beam.NewPipeline()
s := p.Root()

套用管道轉換

MinimalWordCount 管道包含多個轉換,用於將資料讀取到管道中、操作或以其他方式轉換資料,以及寫出結果。轉換可以由單一操作組成,或者可以包含多個巢狀轉換(這是複合轉換)。

每個轉換都會採用某種輸入資料並產生一些輸出資料。輸入和輸出資料通常由 SDK 類別 PCollection 表示。PCollection 是一個由 Beam SDK 提供的特殊類別,您可以使用它來表示幾乎任何大小的資料集,包括無界限的資料集。

The MinimalWordCount pipeline data flow.

圖 1:MinimalWordCount 管道資料流。

MinimalWordCount 管道包含五個轉換

  1. 文字檔 Read 轉換會套用到 Pipeline 物件本身,並產生 PCollection 作為輸出。輸出 PCollection 中的每個元素都代表輸入檔中的一行文字。此範例使用儲存在可公開存取的 Google Cloud Storage 值區 ("gs://") 中的輸入資料。
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
pipeline
| beam.io.ReadFromText(input_file)
lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/*")
  1. 此轉換會分割 PCollection<String> 中的行,其中每個元素都是莎士比亞合集中個別的單字。或者,可以使用 ParDo 轉換,在每個元素上呼叫 DoFn(定義為內嵌匿名類別),將文字行符記化為個別的單字。此轉換的輸入是先前的 TextIO.Read 轉換產生的文字行 PCollectionParDo 轉換會輸出新的 PCollection,其中每個元素都代表文字中的個別單字。
    .apply("ExtractWords", FlatMapElements
        .into(TypeDescriptors.strings())
        .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
# The Flatmap transform is a simplified version of ParDo.

| 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
words := beam.ParDo(s, func(line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        emit(word)
    }
}, lines)
  1. SDK 提供的 Count 轉換是一個通用轉換,它採用任何類型的 PCollection,並傳回鍵/值對的 PCollection。每個鍵都代表輸入集合中的唯一元素,每個值都代表該鍵在輸入集合中出現的次數。

    在此管道中,Count 的輸入是先前的 ParDo 產生的個別單字 PCollection,輸出是鍵/值對的 PCollection,其中每個鍵代表文字中的唯一單字,而相關值是每個單字的出現次數。

.apply(Count.<String>perElement())
| beam.combiners.Count.PerElement()
counted := stats.Count(s, words)
  1. 下一個轉換會將每個唯一單字和出現次數的鍵/值對格式化為可列印的字串,適合寫入輸出檔案。

    map 轉換是一個較高層級的複合轉換,它封裝了一個簡單的 ParDo。對於輸入 PCollection 中的每個元素,map 轉換會套用一個函式,該函式會產生一個輸出元素。

.apply("FormatResults", MapElements
    .into(TypeDescriptors.strings())
    .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
| beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
formatted := beam.ParDo(s, func(w string, c int) string {
    return fmt.Sprintf("%s: %v", w, c)
}, counted)
  1. 文字檔寫入轉換。此轉換會採用格式化字串的最終 PCollection 作為輸入,並將每個元素寫入輸出文字檔。輸入 PCollection 中的每個元素都代表產生的輸出檔案中的一行文字。
.apply(TextIO.write().to("wordcounts"));
| beam.io.WriteToText(output_path)
textio.Write(s, "wordcounts.txt", formatted)

請注意,Write 轉換會產生 PDone 類型的瑣碎結果值,在本例中,該值將被忽略。

請注意,Write 轉換不會傳回任何 PCollection。

執行管道

藉由呼叫 run 方法來執行管道,該方法會將您的管道傳送到您在 PipelineOptions 中指定的管道執行器執行。

將管道傳遞給執行器來執行管道。

p.run().waitUntilFinish();
with beam.Pipeline(...) as p:
  [construction]
# p.run() automatically called
direct.Execute(context.Background(), p)

請注意,run 方法是非同步的。對於阻塞式執行,請在呼叫 run 所傳回的結果物件上呼叫 waitUntilFinish wait_until_finish 方法。

在 Playground 中試用完整範例

WordCount 範例

此 WordCount 範例介紹了一些建議的程式設計實務,這些實務可以讓您的管道更容易讀取、撰寫和維護。雖然不是明確要求的,但它們可以使管道的執行更具彈性、有助於測試管道,並有助於使管道的程式碼可重複使用。

本節假設您已充分了解建立管道的基本概念。如果您覺得自己還沒達到那個程度,請閱讀上面的章節:MinimalWordCount

若要在 Java 中執行此範例

依照Java WordCount 快速入門所述,設定您的開發環境並產生 Maven 原型。然後使用其中一個執行器執行管道。

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner

You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
                  --project=YOUR_PROJECT --region=GCE_REGION \
                  --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
     -Pdataflow-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--inputFile=pom.xml --output=counts --runner=SamzaRunner" -Psamza-runner
$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
     --runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts
$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WordCount \
     --runner=JetRunner --jetLocalMode=3 --inputFile=`pwd`/pom.xml --output=counts

若要檢視 Java 的完整程式碼,請參閱 WordCount

若要在 Python 中執行此範例

python -m apache_beam.examples.wordcount --input YOUR_INPUT_FILE --output counts
# Running Beam Python on a distributed Flink cluster requires additional configuration.
# See /documentation/runners/flink/ for more information.
python -m apache_beam.examples.wordcount --input /path/to/inputfile \
                                         --output /path/to/write/counts \
                                         --runner SparkRunner
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
                                         --output gs://YOUR_GCS_BUCKET/counts \
                                         --runner DataflowRunner \
                                         --project YOUR_GCP_PROJECT \
                                         --region YOUR_GCP_REGION \
                                         --temp_location gs://YOUR_GCS_BUCKET/tmp/
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.

若要檢視 Python 的完整程式碼,請參閱 wordcount.py

若要在 Go 中執行此範例

$ go install github.com/apache/beam/sdks/v2/go/examples/wordcount
$ wordcount --input <PATH_TO_INPUT_FILE> --output counts
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
$ go install github.com/apache/beam/sdks/v2/go/examples/wordcount
# As part of the initial setup, for non linux users - install package unix before run
$ go get -u golang.org/x/sys/unix
$ wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
            --output gs://<your-gcs-bucket>/counts \
            --runner dataflow \
            --project your-gcp-project \
            --region your-gcp-region \
            --temp_location gs://<your-gcs-bucket>/tmp/ \
            --staging_location gs://<your-gcs-bucket>/binaries/ \
            --worker_harness_container_image=apache/beam_go_sdk:latest
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.

若要檢視 Go 的完整程式碼,請參閱 wordcount.go

新概念

以下章節將詳細說明這些關鍵概念,並將管道程式碼分解為較小的部分。

指定明確的 DoFn

當使用 ParDo 轉換時,您需要指定套用至輸入 PCollection 中每個元素的處理操作。此處理操作是 SDK 類別 DoFn 的子類別。您可以為每個 ParDo 內嵌建立 DoFn 子類別,作為匿名內部類別的實例,如同先前的範例 (MinimalWordCount) 中所做的。但是,在全域層級定義 DoFn 通常是一個好主意,這使其更容易進行單元測試,並可使 ParDo 程式碼更具可讀性。

當使用 ParDo 轉換時,您需要指定套用至輸入 PCollection 中每個元素的處理操作。此處理操作可以是具名函式或具有特殊命名方法的結構。您可以使用匿名函式 (但不能使用閉包)。但是,在全域層級定義 DoFn 通常是一個好主意,這使其更容易進行單元測試,並可使 ParDo 程式碼更具可讀性。

// In this example, ExtractWordsFn is a DoFn that is defined as a static class:

static class ExtractWordsFn extends DoFn<String, String> {
    ...

    @ProcessElement
    public void processElement(ProcessContext c) {
        ...
    }
}
# In this example, the DoFns are defined as classes:


class FormatAsTextFn(beam.DoFn):
  def process(self, element):
    word, count = element
    yield '%s: %s' % (word, count)

formatted = counts | beam.ParDo(FormatAsTextFn())
// In this example, extractFn is a DoFn that is defined as a function:

func extractFn(ctx context.Context, line string, emit func(string)) {
   ...
}

建立複合轉換

如果您有一個包含多個轉換或 ParDo 步驟的處理操作,您可以將其建立為 PTransform 的子類別。建立 PTransform 子類別可讓您封裝複雜的轉換,可以使您的管道結構更清晰和模組化,並使單元測試更容易。

如果您有一個包含多個轉換或 ParDo 步驟的處理操作,您可以使用一般的 Go 函式來封裝它們。您還可以進一步使用具名的子範圍將它們分組為可見於監控的複合轉換。

在此範例中,兩個轉換被封裝為 PTransform 子類別 CountWordsCountWords 包含執行 ExtractWordsFnParDo 和 SDK 提供的 Count 轉換。

在此範例中,兩個轉換被封裝為 CountWords 函式。

當定義 CountWords 時,我們指定其最終的輸入和輸出;輸入是提取操作的 PCollection<String>,輸出是由計數操作產生的 PCollection<KV<String, Long>>

public static class CountWords extends PTransform<PCollection<String>,
    PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

    return wordCounts;
  }
}

public static void main(String[] args) throws IOException {
  Pipeline p = ...

  p.apply(...)
   .apply(new CountWords())
   ...
}
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >>
      beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))

      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement())

counts = lines | CountWords()
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
	s = s.Scope("CountWords")

	// Convert lines of text into individual words.
	col := beam.ParDo(s, extractFn, lines)

	// Count the number of times each word occurs.
	return stats.Count(s, col)
}

使用可參數化的 PipelineOptions

您可以在執行管道時硬式編碼各種執行選項。但是,更常見的方法是透過命令列引數剖析定義您自己的組態選項。透過命令列定義您的組態選項可使程式碼更容易在不同的執行器之間移植。

新增要由命令列剖析器處理的引數,並為它們指定預設值。然後您可以在管道程式碼中存取選項值。

您可以使用標準的 flag 套件來達到此目的。

public static interface WordCountOptions extends PipelineOptions {
  @Description("Path of the file to read from")
  @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
  String getInputFile();
  void setInputFile(String value);
  ...
}

public static void main(String[] args) {
  WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
      .as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  ...
}
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = pipeline | beam.io.ReadFromText(args.input_file)
var input = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt", "File(s) to read.")

func main() {
    ...
    p := beam.NewPipeline()
    s := p.Root()

    lines := textio.Read(s, *input)
    ...

在 Playground 中試用完整範例

DebuggingWordCount 範例

DebuggingWordCount 範例示範了一些檢測您的管道程式碼的最佳實務。

若要在 Java 中執行此範例

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
     -Dexec.args="--output=counts" -Pdirect-runner
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --output=/tmp/counts" -Pflink-runner

You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
     -Dexec.args="--runner=SparkRunner --output=counts" -Pspark-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
   -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://<your-gcs-bucket>/tmp \
                --project=YOUR_PROJECT --region=GCE_REGION \
                --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
     -Pdataflow-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
     -Dexec.args="--runner=SamzaRunner --output=counts" -Psamza-runner
$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.DebuggingWordCount \
     --runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts
$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.DebuggingWordCount \
     --runner=JetRunner --jetLocalMode=3 --output=counts

若要檢視 Java 的完整程式碼,請參閱 DebuggingWordCount

若要在 Python 中執行此範例

python -m apache_beam.examples.wordcount_debugging --input YOUR_INPUT_FILE --output counts
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.wordcount_debugging --input gs://dataflow-samples/shakespeare/kinglear.txt \
                                         --output gs://YOUR_GCS_BUCKET/counts \
                                         --runner DataflowRunner \
                                         --project YOUR_GCP_PROJECT \
                                         --temp_location gs://YOUR_GCS_BUCKET/tmp/
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.

若要檢視 Python 的完整程式碼,請參閱 wordcount_debugging.py

若要在 Go 中執行此範例

$ go install github.com/apache/beam/sdks/v2/go/examples/debugging_wordcount
$ debugging_wordcount --input <PATH_TO_INPUT_FILE> --output counts
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
$ go install github.com/apache/beam/sdks/v2/go/examples/debugging_wordcount
# As part of the initial setup, for non linux users - install package unix before run
$ go get -u golang.org/x/sys/unix
$ debugging_wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
                      --output gs://<your-gcs-bucket>/counts \
                      --runner dataflow \
                      --project your-gcp-project \
                      --region your-gcp-region \
                      --temp_location gs://<your-gcs-bucket>/tmp/ \
                      --staging_location gs://<your-gcs-bucket>/binaries/ \
                      --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.

若要檢視 Go 的完整程式碼,請參閱 debugging_wordcount.go

新概念

以下章節將詳細說明這些關鍵概念,並將管道程式碼分解為較小的部分。

記錄

每個執行器可能會選擇以自己的方式處理記錄。

// This example uses .trace and .debug:

public class DebuggingWordCount {

  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
    ...

    @ProcessElement
    public void processElement(ProcessContext c) {
      if (...) {
        ...
        LOG.debug("Matched: " + c.element().getKey());
      } else {
        ...
        LOG.trace("Did not match: " + c.element().getKey());
      }
    }
  }
}
# [START example_wordcount_debugging_aggregators]
import logging

class FilterTextFn(beam.DoFn):
  """A DoFn that filters for a specific key based on a regular expression."""
  def __init__(self, pattern):
    self.pattern = pattern
    # A custom metric can track values in your pipeline as it runs. Create
    # custom metrics matched_word and unmatched_words.
    self.matched_words = Metrics.counter(self.__class__, 'matched_words')
    self.umatched_words = Metrics.counter(self.__class__, 'umatched_words')

  def process(self, element):
    word, _ = element
    if re.match(self.pattern, word):
      # Log at INFO level each element we match. When executing this pipeline
      # using the Dataflow service, these log lines will appear in the Cloud
      # Logging UI.
      logging.info('Matched %s', word)

      # Add 1 to the custom metric counter matched_words
      self.matched_words.inc()
      yield element
    else:
      # Log at the "DEBUG" level each element that is not matched. Different
      # log levels can be used to control the verbosity of logging providing
      # an effective mechanism to filter less important information. Note
      # currently only "INFO" and higher level logs are emitted to the Cloud
      # Logger. This log message will not be visible in the Cloud Logger.
      logging.debug('Did not match %s', word)

      # Add 1 to the custom metric counter umatched_words
      self.umatched_words.inc()
type filterFn struct {
    ...
}

func (f *filterFn) ProcessElement(ctx context.Context, word string, count int, emit func(string, int)) {
    if f.re.MatchString(word) {
         // Log at the "INFO" level each element that we match.
         log.Infof(ctx, "Matched: %v", word)
         emit(word, count)
    } else {
        // Log at the "DEBUG" level each element that is not matched.
        log.Debugf(ctx, "Did not match: %v", word)
    }
}

Direct Runner

當使用 DirectRunner 執行您的管道時,您可以將記錄訊息直接列印到您的本機主控台。 如果您使用 Beam SDK for Java,則必須將 Slf4j 新增到您的類別路徑。

Cloud Dataflow Runner

當使用 DataflowRunner 執行您的管道時,您可以使用 Stackdriver Logging。Stackdriver Logging 會將所有 Cloud Dataflow 工作者的記錄彙整到 Google Cloud Platform 主控台中的單一位置。您可以使用 Stackdriver Logging 來搜尋和存取 Cloud Dataflow 已啟動以完成您工作的所有工作者的記錄。您管道的 DoFn 實例中的記錄陳述式將在您的管道執行時出現在 Stackdriver Logging 中。

您也可以控制工作者的記錄層級。預設情況下,執行使用者程式碼的 Cloud Dataflow 工作者會設定為以「INFO」記錄層級和更高層級記錄到 Stackdriver Logging。您可以透過指定以下內容來覆寫特定記錄命名空間的記錄層級:--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}。例如,在執行使用 Cloud Dataflow 服務的管道時,透過指定 --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"},Stackdriver Logging 除了預設的「INFO」或更高層級記錄外,還將包含該套件的「DEBUG」或更高層級記錄。

可以透過指定 --defaultWorkerLogLevel=<TRACE, DEBUG, INFO, WARN, ERROR 其中之一> 來覆寫預設的 Cloud Dataflow 工作者記錄組態。例如,當使用 Cloud Dataflow 服務執行管道時,透過指定 --defaultWorkerLogLevel=DEBUG,Cloud Logging 將包含所有「DEBUG」或更高層級的記錄。請注意,將預設工作者記錄層級變更為 TRACE 或 DEBUG 會大幅增加記錄輸出量。

Apache Spark Runner

注意:此章節尚未新增。有一個關於此的未解決問題 (Issue 18076)。

注意:此章節尚未新增。有一個關於此的未解決問題 (Issue 18075)。

Apache Nemo Runner

當使用 NemoRunner 執行您的管道時,大多數記錄訊息會直接列印到您的本機主控台。您應該將 Slf4j 新增到您的類別路徑,以充分利用記錄。為了觀察驅動程式和執行器端的記錄,您應該觀察 Apache REEF 建立的資料夾。例如,當透過本機執行階段執行您的管道時,會在您的工作目錄上建立一個名為 REEF_LOCAL_RUNTIME 的資料夾,並且可以在該目錄下找到記錄和指標資訊。

使用斷言測試您的管道

PAssertassert_that 是一組方便的 PTransforms,其風格類似於 Hamcrest 的集合比對器,可用於編寫管道層級測試以驗證 PCollection 的內容。斷言最適合在具有小型資料集的單元測試中使用。

passert 套件包含方便的 PTransforms,可用於編寫管道層級測試以驗證 PCollection 的內容。斷言最適合在具有小型資料集的單元測試中使用。

以下範例驗證篩選後的字組集符合我們預期的計數。斷言不會產生任何輸出,並且僅當滿足所有預期時,管道才會成功。

以下範例驗證兩個集合包含相同的值。斷言不會產生任何輸出,並且僅當滿足所有預期時,管道才會成功。

public static void main(String[] args) {
  ...
  List<KV<String, Long>> expectedResults = Arrays.asList(
        KV.of("Flourish", 3L),
        KV.of("stomach", 1L));
  PAssert.that(filteredWords).containsInAnyOrder(expectedResults);
  ...
}
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

with TestPipeline() as p:
  assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
...
passert.Equals(s, formatted, "Flourish: 3", "stomach: 1")

請參閱 DebuggingWordCountTest 以取得單元測試範例。

在 Playground 中試用完整範例

WindowedWordCount 範例

WindowedWordCount 範例如同先前的範例一樣計算文字中的字組,但引入了幾個進階概念。

新概念

以下章節將詳細說明這些關鍵概念,並將管道程式碼分解為較小的部分。

若要在 Java 中執行此範例

$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
     -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner

You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
     -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
   -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
                --project=YOUR_PROJECT --region=GCE_REGION \
                --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
     -Pdataflow-runner
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
     -Dexec.args="--runner=SamzaRunner --inputFile=pom.xml --output=counts" -Psamza-runner
$ mvn package -Pnemo-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WindowedWordCount \
     --runner=NemoRunner --inputFile=`pwd`/pom.xml --output=counts
$ mvn package -P jet-runner && java -cp target/word-count-beam-bundled-0.1.jar org.apache.beam.examples.WindowedWordCount \
     --runner=JetRunner --jetLocalMode=3 --inputFile=`pwd`/pom.xml --output=counts

若要檢視 Java 的完整程式碼,請參閱 WindowedWordCount

若要在 Python 中執行此範例

此管道使用 PROJECT:DATASET.TABLEDATASET.TABLE 格式,將結果寫入 BigQuery 表格 --output_table 參數。

python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE --output_table PROJECT:DATASET.TABLE
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE \
                                         --output_table PROJECT:DATASET.TABLE \
                                         --runner DataflowRunner \
                                         --project YOUR_GCP_PROJECT \
                                         --temp_location gs://YOUR_GCS_BUCKET/tmp/
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.

若要檢視 Python 的完整程式碼,請參閱 windowed_wordcount.py

若要在 Go 中執行此範例

$ go install github.com/apache/beam/sdks/v2/go/examples/windowed_wordcount
$ windowed_wordcount --input <PATH_TO_INPUT_FILE> --output counts
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
$ go install github.com/apache/beam/sdks/v2/go/examples/windowed_wordcount
# As part of the initial setup, for non linux users - install package unix before run
$ go get -u golang.org/x/sys/unix
$ windowed_wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
            --output gs://<your-gcs-bucket>/counts \
            --runner dataflow \
            --project your-gcp-project \
            --temp_location gs://<your-gcs-bucket>/tmp/ \
            --staging_location gs://<your-gcs-bucket>/binaries/ \
            --worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.
This runner is not yet available for the Go SDK.

若要檢視 Go 的完整程式碼,請參閱 windowed_wordcount.go

無界限和有界限的資料集

Beam 可讓您建立單一管道,可以處理有界和無界資料集。如果您的資料集具有固定數量的元素,則它是 有界資料集,並且可以一起處理所有資料。對於有界資料集,要問的問題是「我是否擁有所有資料?」如果資料持續到達 (例如 行動遊戲範例中的無盡遊戲分數串流),則它是 無界資料集。無界資料集永遠無法在任何時間進行處理,因此必須使用持續執行的串流管道來處理資料。資料集僅會完整到特定點,因此要問的問題是「我擁有資料直到哪個點?」Beam 使用視窗化將持續更新的資料集劃分為有限大小的邏輯視窗。如果您的輸入是無界的,則必須使用支援串流的執行器。

如果您的管道的輸入是有界的,則所有下游的 PCollection 也將是有界的。同樣地,如果輸入是無界的,則管道的所有下游 PCollection 也將是無界的,儘管單獨的分支可能是獨立有界的。

回想一下,此範例的輸入是一組莎士比亞的文本,這是一組有限的資料。因此,此範例從文字檔讀取有界資料

public static void main(String[] args) throws IOException {
    Options options = ...
    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> input = pipeline
      .apply(TextIO.read().from(options.getInputFile()))
def main(arvg=None):
  parser = argparse.ArgumentParser()
  parser.add_argument('--input-file',
                      dest='input_file',
                      default='/Users/home/words-example.txt')
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  p = beam.Pipeline(options=pipeline_options)
  lines  = p | 'read' >> ReadFromText(known_args.input_file)
func main() {
   ...
   p := beam.NewPipeline()
   s := p.Root()

   lines := textio.Read(s, *input)
   ...
}

將時間戳記新增至資料

PCollection 中的每個元素都有一個關聯的時間戳記。每個元素的時間戳記最初由建立 PCollection 的來源指派。某些建立無界 PCollection 的來源可以為每個新元素指派一個與讀取或新增元素時相對應的時間戳記。您可以使用 DoFn 手動指派或調整時間戳記;但是,您只能將時間戳記向前移動。

在此範例中,輸入是有界的。為了範例的目的,名為 AddTimestampsFnDoFn 方法 (由 ParDo 叫用) 將為 PCollection 中的每個元素設定時間戳記。

.apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));
beam.Map(AddTimestampFn(min_timestamp, max_timestamp))
timestampedLines := beam.ParDo(s, &addTimestampFn{Min: mtime.Now()}, lines)

以下是 AddTimestampFn 的程式碼,AddTimestampFn 是一個由 ParDo 叫用的 DoFn,它會設定時間戳記的資料元素 (假設元素本身)。例如,如果元素是記錄行,則此 ParDo 可以從記錄字串中剖析時間並將其設定為元素的時間戳記。莎士比亞的作品中沒有固有的時間戳記,因此在此範例中,我們僅為了說明這個概念而虛構了隨機時間戳記。輸入文字的每一行都會在 2 小時的時間段內取得隨機關聯的時間戳記。

static class AddTimestampFn extends DoFn<String, String> {
  private final Instant minTimestamp;
  private final Instant maxTimestamp;

  AddTimestampFn(Instant minTimestamp, Instant maxTimestamp) {
    this.minTimestamp = minTimestamp;
    this.maxTimestamp = maxTimestamp;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    Instant randomTimestamp =
      new Instant(
          ThreadLocalRandom.current()
          .nextLong(minTimestamp.getMillis(), maxTimestamp.getMillis()));

    /**
     * Concept #2: Set the data element with that timestamp.
     */
    c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
  }
}
class AddTimestampFn(beam.DoFn):

  def __init__(self, min_timestamp, max_timestamp):
     self.min_timestamp = min_timestamp
     self.max_timestamp = max_timestamp

  def process(self, element):
    return window.TimestampedValue(
       element,
       random.randint(self.min_timestamp, self.max_timestamp))
type addTimestampFn struct {
	Min beam.EventTime `json:"min"`
}

func (f *addTimestampFn) ProcessElement(x beam.X) (beam.EventTime, beam.X) {
	timestamp := f.Min.Add(time.Duration(rand.Int63n(2 * time.Hour.Nanoseconds())))
	return timestamp, x
}

請注意,使用 beam.X「類型變數」可讓轉換用於任何類型。

視窗化

Beam 使用稱為視窗化的概念,將 PCollection 細分為有界元素的集合。彙總多個元素的 PTransforms 會將每個 PCollection 處理為一系列多個有限視窗,即使整個集合本身可能是無限大小的 (無界的)。

WindowedWordCount 範例會套用固定時間視窗化,其中每個視窗代表固定的時間間隔。此範例的固定視窗大小預設為 1 分鐘 (您可以使用命令列選項變更此值)。

PCollection<String> windowedWords = input
  .apply(Window.<String>into(
    FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
windowed_words = input | beam.WindowInto(window.FixedWindows(60 * window_size_minutes))
windowedLines := beam.WindowInto(s, window.NewFixedWindows(time.Minute), timestampedLines)

在視窗化的 PCollection 上重複使用 PTransform

您也可以重複使用為處理視窗化 PCollection 建立的現有 PTransforms。

PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
word_counts = windowed_words | CountWords()
counted := wordcount.CountWords(s, windowedLines)

在 Playground 中試用完整範例

StreamingWordCount 範例

StreamingWordCount 範例是一個串流管道,它從 Pub/Sub 訂閱或主題讀取 Pub/Sub 訊息,並對每個訊息中的字組執行頻率計數。與 WindowedWordCount 類似,此範例會套用固定時間視窗化,其中每個視窗代表固定的時間間隔。此範例的固定視窗大小為 15 秒。管道會輸出在每個 15 秒視窗中看到的字組的頻率計數。

新概念

若要在 Java 中執行此範例

注意: StreamingWordCount 尚未適用於 Java SDK。

若要在 Python 中執行此範例

python -m apache_beam.examples.streaming_wordcount \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.streaming_wordcount \
  --runner DataflowRunner \
  --project YOUR_GCP_PROJECT \
  --region YOUR_GCP_REGION \
  --temp_location gs://YOUR_GCS_BUCKET/tmp/ \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.
This runner is not yet available for the Python SDK.

若要查看完整的 Python 程式碼,請參閱 streaming_wordcount.py

若要在 Go 中執行此範例

注意: StreamingWordCount 目前不適用於 Go SDK。此問題目前已開啟 ( Issue 18879 )。

讀取無界限的資料集

此範例使用無邊界資料集作為輸入。程式碼使用 beam.io.ReadFromPubSub 從 Pub/Sub 訂閱或主題讀取 Pub/Sub 訊息。

  // This example is not currently available for the Beam SDK for Java.
  # Read from Pub/Sub into a PCollection.
  if known_args.input_subscription:
    data = p | beam.io.ReadFromPubSub(
        subscription=known_args.input_subscription)
  else:
    data = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
  lines = data | 'DecodeString' >> beam.Map(lambda d: d.decode('utf-8'))
  // This example is not currently available for the Beam SDK for Go.

寫入無界限的結果

當輸入為無邊界時,輸出 PCollection 也會是如此。因此,您必須確保為結果選擇適當的 I/O。某些 I/O 僅支援邊界輸出,而其他 I/O 則同時支援邊界和無邊界輸出。

此範例使用無邊界的 PCollection,並將結果串流至 Google Pub/Sub。程式碼會格式化結果,並使用 beam.io.WriteToPubSub 將結果寫入 Pub/Sub 主題。

  // This example is not currently available for the Beam SDK for Java.
  # Write to Pub/Sub
  _ = (output
    | 'EncodeString' >> Map(lambda s: s.encode('utf-8'))
    | beam.io.WriteToPubSub(known_args.output_topic))
  // This example is not currently available for the Beam SDK for Go.

下一步

如果您遇到任何問題,請隨時 與我們聯繫