Apache Beam WordCount 範例
- Java SDK
- Python SDK
- Go SDK
WordCount 範例示範如何設定一個處理管道,該管道可以讀取文字、將文字行符記化為個別的單字,並對每個單字執行頻率計數。Beam SDK 包含一系列這四個逐步更詳細的 WordCount 範例,它們彼此建立。所有範例的輸入文字都是一組莎士比亞的文本。
每個 WordCount 範例都會介紹 Beam 程式設計模型中的不同概念。從了解最簡單的範例 MinimalWordCount 開始。一旦您熟悉建立管道的基本原則,請繼續學習其他範例中的更多概念。
- MinimalWordCount 示範了建立管道所涉及的基本原則。
- WordCount 介紹了一些在建立可重複使用和可維護的管道時更常見的最佳實務。
- DebuggingWordCount 介紹了記錄和偵錯實務。
- WindowedWordCount 示範了如何使用 Beam 的程式設計模型來處理有界限和無界限的資料集。
MinimalWordCount 範例
MinimalWordCount 示範了一個簡單的管道,該管道使用 Direct Runner 從文字檔讀取、套用轉換來符記化並計算單字,以及將資料寫入輸出文字檔。
此範例會硬式編碼其輸入和輸出檔案的位置,並且不執行任何錯誤檢查;它僅旨在向您展示建立 Beam 管道的「基本要素」。這種缺乏參數化的特性使得此特定管道在不同執行器之間的移植性不如標準 Beam 管道。在後面的範例中,我們將參數化管道的輸入和輸出來源,並展示其他最佳實務。
若要檢視 Java 中的完整程式碼,請參閱 MinimalWordCount。
若要檢視 Python 中的完整程式碼,請參閱 wordcount_minimal.py。
若要檢視 Go 中的完整程式碼,請參閱 minimal_wordcount.go。
重要概念
- 建立管道
- 將轉換套用到管道
- 讀取輸入(在本範例中:讀取文字檔)
- 套用 ParDo 轉換
- 套用 SDK 提供的轉換(在本範例中:Count)
- 寫入輸出(在本範例中:寫入文字檔)
- 執行管道
以下各節將使用 MinimalWordCount 管道中的相關程式碼摘錄來詳細說明這些概念。
建立管道
在本範例中,程式碼首先建立 PipelineOptions
物件。此物件可讓我們為管道設定各種選項,例如將執行我們管道的管道執行器以及所選執行器所需的任何執行器特定設定。在本範例中,我們以程式設計方式設定這些選項,但更常見的是,使用命令列引數來設定 PipelineOptions
。
您可以為執行管道指定執行器,例如 DataflowRunner
或 SparkRunner
。如果您省略指定執行器,如本範例所示,您的管道將使用 DirectRunner
在本機執行。在下一節中,我們將指定管道的執行器。
// Create a PipelineOptions object. This object lets us set various execution
// options for our pipeline, such as the runner you wish to use. This example
// will run with the DirectRunner by default, based on the class path configured
// in its dependencies.
PipelineOptions options = PipelineOptionsFactory.create();
from apache_beam.options.pipeline_options import PipelineOptions
input_file = 'gs://dataflow-samples/shakespeare/kinglear.txt'
output_path = 'gs://my-bucket/counts.txt'
beam_options = PipelineOptions(
runner='DataflowRunner',
project='my-project-id',
job_name='unique-job-name',
temp_location='gs://my-bucket/temp',
)
下一步是使用我們剛建構的選項建立 Pipeline
物件。Pipeline 物件會建立要執行的轉換圖形,並與該特定管道相關聯。
第一步是建立 Pipeline
物件。它會建立要執行的轉換圖形,並與該特定管道相關聯。範圍允許分組為複合轉換。
套用管道轉換
MinimalWordCount 管道包含多個轉換,用於將資料讀取到管道中、操作或以其他方式轉換資料,以及寫出結果。轉換可以由單一操作組成,或者可以包含多個巢狀轉換(這是複合轉換)。
每個轉換都會採用某種輸入資料並產生一些輸出資料。輸入和輸出資料通常由 SDK 類別 PCollection
表示。PCollection
是一個由 Beam SDK 提供的特殊類別,您可以使用它來表示幾乎任何大小的資料集,包括無界限的資料集。
圖 1:MinimalWordCount 管道資料流。
MinimalWordCount 管道包含五個轉換
- 文字檔
Read
轉換會套用到Pipeline
物件本身,並產生PCollection
作為輸出。輸出PCollection
中的每個元素都代表輸入檔中的一行文字。此範例使用儲存在可公開存取的 Google Cloud Storage 值區 ("gs://") 中的輸入資料。
- 此轉換會分割
PCollection<String>
中的行,其中每個元素都是莎士比亞合集中個別的單字。或者,可以使用 ParDo 轉換,在每個元素上呼叫DoFn
(定義為內嵌匿名類別),將文字行符記化為個別的單字。此轉換的輸入是先前的TextIO.Read
轉換產生的文字行PCollection
。ParDo
轉換會輸出新的PCollection
,其中每個元素都代表文字中的個別單字。
SDK 提供的
Count
轉換是一個通用轉換,它採用任何類型的PCollection
,並傳回鍵/值對的PCollection
。每個鍵都代表輸入集合中的唯一元素,每個值都代表該鍵在輸入集合中出現的次數。在此管道中,
Count
的輸入是先前的ParDo
產生的個別單字PCollection
,輸出是鍵/值對的PCollection
,其中每個鍵代表文字中的唯一單字,而相關值是每個單字的出現次數。
下一個轉換會將每個唯一單字和出現次數的鍵/值對格式化為可列印的字串,適合寫入輸出檔案。
map 轉換是一個較高層級的複合轉換,它封裝了一個簡單的
ParDo
。對於輸入PCollection
中的每個元素,map 轉換會套用一個函式,該函式會產生一個輸出元素。
- 文字檔寫入轉換。此轉換會採用格式化字串的最終
PCollection
作為輸入,並將每個元素寫入輸出文字檔。輸入PCollection
中的每個元素都代表產生的輸出檔案中的一行文字。
請注意,Write
轉換會產生 PDone
類型的瑣碎結果值,在本例中,該值將被忽略。
請注意,Write
轉換不會傳回任何 PCollection。
執行管道
藉由呼叫 run
方法來執行管道,該方法會將您的管道傳送到您在 PipelineOptions
中指定的管道執行器執行。
將管道傳遞給執行器來執行管道。
請注意,run
方法是非同步的。對於阻塞式執行,請在呼叫 run
所傳回的結果物件上呼叫 waitUntilFinish
wait_until_finish
方法。
在 Playground 中試用完整範例
WordCount 範例
此 WordCount 範例介紹了一些建議的程式設計實務,這些實務可以讓您的管道更容易讀取、撰寫和維護。雖然不是明確要求的,但它們可以使管道的執行更具彈性、有助於測試管道,並有助於使管道的程式碼可重複使用。
本節假設您已充分了解建立管道的基本概念。如果您覺得自己還沒達到那個程度,請閱讀上面的章節:MinimalWordCount。
若要在 Java 中執行此範例
依照Java WordCount 快速入門所述,設定您的開發環境並產生 Maven 原型。然後使用其中一個執行器執行管道。
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
--inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
--project=YOUR_PROJECT --region=GCE_REGION \
--inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
-Pdataflow-runner
若要檢視 Java 的完整程式碼,請參閱 WordCount。
若要在 Python 中執行此範例
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://YOUR_GCS_BUCKET/counts \
--runner DataflowRunner \
--project YOUR_GCP_PROJECT \
--region YOUR_GCP_REGION \
--temp_location gs://YOUR_GCS_BUCKET/tmp/
若要檢視 Python 的完整程式碼,請參閱 wordcount.py。
若要在 Go 中執行此範例
$ go install github.com/apache/beam/sdks/v2/go/examples/wordcount
# As part of the initial setup, for non linux users - install package unix before run
$ go get -u golang.org/x/sys/unix
$ wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner dataflow \
--project your-gcp-project \
--region your-gcp-region \
--temp_location gs://<your-gcs-bucket>/tmp/ \
--staging_location gs://<your-gcs-bucket>/binaries/ \
--worker_harness_container_image=apache/beam_go_sdk:latest
若要檢視 Go 的完整程式碼,請參閱 wordcount.go。
新概念
- 使用明確的
DoFn
應用ParDo
- 建立複合轉換
- 使用可參數化的
PipelineOptions
以下章節將詳細說明這些關鍵概念,並將管道程式碼分解為較小的部分。
指定明確的 DoFn
當使用 ParDo
轉換時,您需要指定套用至輸入 PCollection
中每個元素的處理操作。此處理操作是 SDK 類別 DoFn
的子類別。您可以為每個 ParDo
內嵌建立 DoFn
子類別,作為匿名內部類別的實例,如同先前的範例 (MinimalWordCount) 中所做的。但是,在全域層級定義 DoFn
通常是一個好主意,這使其更容易進行單元測試,並可使 ParDo
程式碼更具可讀性。
當使用 ParDo
轉換時,您需要指定套用至輸入 PCollection
中每個元素的處理操作。此處理操作可以是具名函式或具有特殊命名方法的結構。您可以使用匿名函式 (但不能使用閉包)。但是,在全域層級定義 DoFn
通常是一個好主意,這使其更容易進行單元測試,並可使 ParDo
程式碼更具可讀性。
建立複合轉換
如果您有一個包含多個轉換或 ParDo
步驟的處理操作,您可以將其建立為 PTransform
的子類別。建立 PTransform
子類別可讓您封裝複雜的轉換,可以使您的管道結構更清晰和模組化,並使單元測試更容易。
如果您有一個包含多個轉換或 ParDo
步驟的處理操作,您可以使用一般的 Go 函式來封裝它們。您還可以進一步使用具名的子範圍將它們分組為可見於監控的複合轉換。
在此範例中,兩個轉換被封裝為 PTransform
子類別 CountWords
。 CountWords
包含執行 ExtractWordsFn
的 ParDo
和 SDK 提供的 Count
轉換。
在此範例中,兩個轉換被封裝為 CountWords
函式。
當定義 CountWords
時,我們指定其最終的輸入和輸出;輸入是提取操作的 PCollection<String>
,輸出是由計數操作產生的 PCollection<KV<String, Long>>
。
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.<String>perElement());
return wordCounts;
}
}
public static void main(String[] args) throws IOException {
Pipeline p = ...
p.apply(...)
.apply(new CountWords())
...
}
使用可參數化的 PipelineOptions
您可以在執行管道時硬式編碼各種執行選項。但是,更常見的方法是透過命令列引數剖析定義您自己的組態選項。透過命令列定義您的組態選項可使程式碼更容易在不同的執行器之間移植。
新增要由命令列剖析器處理的引數,並為它們指定預設值。然後您可以在管道程式碼中存取選項值。
您可以使用標準的 flag
套件來達到此目的。
public static interface WordCountOptions extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
String getInputFile();
void setInputFile(String value);
...
}
public static void main(String[] args) {
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(WordCountOptions.class);
Pipeline p = Pipeline.create(options);
...
}
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
lines = pipeline | beam.io.ReadFromText(args.input_file)
在 Playground 中試用完整範例
DebuggingWordCount 範例
DebuggingWordCount 範例示範了一些檢測您的管道程式碼的最佳實務。
若要在 Java 中執行此範例
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
-Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
--output=/tmp/counts" -Pflink-runner
You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.DebuggingWordCount \
-Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://<your-gcs-bucket>/tmp \
--project=YOUR_PROJECT --region=GCE_REGION \
--inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
-Pdataflow-runner
若要檢視 Java 的完整程式碼,請參閱 DebuggingWordCount。
若要在 Python 中執行此範例
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.wordcount_debugging --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://YOUR_GCS_BUCKET/counts \
--runner DataflowRunner \
--project YOUR_GCP_PROJECT \
--temp_location gs://YOUR_GCS_BUCKET/tmp/
若要檢視 Python 的完整程式碼,請參閱 wordcount_debugging.py。
若要在 Go 中執行此範例
$ go install github.com/apache/beam/sdks/v2/go/examples/debugging_wordcount
# As part of the initial setup, for non linux users - install package unix before run
$ go get -u golang.org/x/sys/unix
$ debugging_wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner dataflow \
--project your-gcp-project \
--region your-gcp-region \
--temp_location gs://<your-gcs-bucket>/tmp/ \
--staging_location gs://<your-gcs-bucket>/binaries/ \
--worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
若要檢視 Go 的完整程式碼,請參閱 debugging_wordcount.go。
新概念
- 記錄
- 透過
PAssert
測試您的管道
以下章節將詳細說明這些關鍵概念,並將管道程式碼分解為較小的部分。
記錄
每個執行器可能會選擇以自己的方式處理記錄。
// This example uses .trace and .debug:
public class DebuggingWordCount {
public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
...
@ProcessElement
public void processElement(ProcessContext c) {
if (...) {
...
LOG.debug("Matched: " + c.element().getKey());
} else {
...
LOG.trace("Did not match: " + c.element().getKey());
}
}
}
}
# [START example_wordcount_debugging_aggregators]
import logging
class FilterTextFn(beam.DoFn):
"""A DoFn that filters for a specific key based on a regular expression."""
def __init__(self, pattern):
self.pattern = pattern
# A custom metric can track values in your pipeline as it runs. Create
# custom metrics matched_word and unmatched_words.
self.matched_words = Metrics.counter(self.__class__, 'matched_words')
self.umatched_words = Metrics.counter(self.__class__, 'umatched_words')
def process(self, element):
word, _ = element
if re.match(self.pattern, word):
# Log at INFO level each element we match. When executing this pipeline
# using the Dataflow service, these log lines will appear in the Cloud
# Logging UI.
logging.info('Matched %s', word)
# Add 1 to the custom metric counter matched_words
self.matched_words.inc()
yield element
else:
# Log at the "DEBUG" level each element that is not matched. Different
# log levels can be used to control the verbosity of logging providing
# an effective mechanism to filter less important information. Note
# currently only "INFO" and higher level logs are emitted to the Cloud
# Logger. This log message will not be visible in the Cloud Logger.
logging.debug('Did not match %s', word)
# Add 1 to the custom metric counter umatched_words
self.umatched_words.inc()
type filterFn struct {
...
}
func (f *filterFn) ProcessElement(ctx context.Context, word string, count int, emit func(string, int)) {
if f.re.MatchString(word) {
// Log at the "INFO" level each element that we match.
log.Infof(ctx, "Matched: %v", word)
emit(word, count)
} else {
// Log at the "DEBUG" level each element that is not matched.
log.Debugf(ctx, "Did not match: %v", word)
}
}
Direct Runner
當使用 DirectRunner
執行您的管道時,您可以將記錄訊息直接列印到您的本機主控台。 如果您使用 Beam SDK for Java,則必須將 Slf4j
新增到您的類別路徑。
Cloud Dataflow Runner
當使用 DataflowRunner
執行您的管道時,您可以使用 Stackdriver Logging。Stackdriver Logging 會將所有 Cloud Dataflow 工作者的記錄彙整到 Google Cloud Platform 主控台中的單一位置。您可以使用 Stackdriver Logging 來搜尋和存取 Cloud Dataflow 已啟動以完成您工作的所有工作者的記錄。您管道的 DoFn
實例中的記錄陳述式將在您的管道執行時出現在 Stackdriver Logging 中。
您也可以控制工作者的記錄層級。預設情況下,執行使用者程式碼的 Cloud Dataflow 工作者會設定為以「INFO」記錄層級和更高層級記錄到 Stackdriver Logging。您可以透過指定以下內容來覆寫特定記錄命名空間的記錄層級:--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}
。例如,在執行使用 Cloud Dataflow 服務的管道時,透過指定 --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}
,Stackdriver Logging 除了預設的「INFO」或更高層級記錄外,還將包含該套件的「DEBUG」或更高層級記錄。
可以透過指定 --defaultWorkerLogLevel=<TRACE, DEBUG, INFO, WARN, ERROR 其中之一>
來覆寫預設的 Cloud Dataflow 工作者記錄組態。例如,當使用 Cloud Dataflow 服務執行管道時,透過指定 --defaultWorkerLogLevel=DEBUG
,Cloud Logging 將包含所有「DEBUG」或更高層級的記錄。請注意,將預設工作者記錄層級變更為 TRACE 或 DEBUG 會大幅增加記錄輸出量。
Apache Spark Runner
注意:此章節尚未新增。有一個關於此的未解決問題 (Issue 18076)。
Apache Flink Runner
注意:此章節尚未新增。有一個關於此的未解決問題 (Issue 18075)。
Apache Nemo Runner
當使用 NemoRunner
執行您的管道時,大多數記錄訊息會直接列印到您的本機主控台。您應該將 Slf4j
新增到您的類別路徑,以充分利用記錄。為了觀察驅動程式和執行器端的記錄,您應該觀察 Apache REEF 建立的資料夾。例如,當透過本機執行階段執行您的管道時,會在您的工作目錄上建立一個名為 REEF_LOCAL_RUNTIME
的資料夾,並且可以在該目錄下找到記錄和指標資訊。
使用斷言測試您的管道
PAssert
assert_that
是一組方便的 PTransforms,其風格類似於 Hamcrest 的集合比對器,可用於編寫管道層級測試以驗證 PCollection 的內容。斷言最適合在具有小型資料集的單元測試中使用。
passert
套件包含方便的 PTransforms,可用於編寫管道層級測試以驗證 PCollection 的內容。斷言最適合在具有小型資料集的單元測試中使用。
以下範例驗證篩選後的字組集符合我們預期的計數。斷言不會產生任何輸出,並且僅當滿足所有預期時,管道才會成功。
以下範例驗證兩個集合包含相同的值。斷言不會產生任何輸出,並且僅當滿足所有預期時,管道才會成功。
請參閱 DebuggingWordCountTest 以取得單元測試範例。
在 Playground 中試用完整範例
WindowedWordCount 範例
WindowedWordCount 範例如同先前的範例一樣計算文字中的字組,但引入了幾個進階概念。
新概念
- 無界限和有界限的資料集
- 將時間戳記新增至資料
- 視窗化
- 在視窗化的 PCollection 上重複使用 PTransform
以下章節將詳細說明這些關鍵概念,並將管道程式碼分解為較小的部分。
若要在 Java 中執行此範例
$ mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
-Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
--inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
You can monitor the running job by visiting the Flink dashboard at http://<flink master>:8081
$ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
-Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
--project=YOUR_PROJECT --region=GCE_REGION \
--inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
-Pdataflow-runner
若要檢視 Java 的完整程式碼,請參閱 WindowedWordCount。
若要在 Python 中執行此範例
此管道使用 PROJECT:DATASET.TABLE
或 DATASET.TABLE
格式,將結果寫入 BigQuery 表格 --output_table
參數。
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE \
--output_table PROJECT:DATASET.TABLE \
--runner DataflowRunner \
--project YOUR_GCP_PROJECT \
--temp_location gs://YOUR_GCS_BUCKET/tmp/
若要檢視 Python 的完整程式碼,請參閱 windowed_wordcount.py。
若要在 Go 中執行此範例
$ go install github.com/apache/beam/sdks/v2/go/examples/windowed_wordcount
# As part of the initial setup, for non linux users - install package unix before run
$ go get -u golang.org/x/sys/unix
$ windowed_wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://<your-gcs-bucket>/counts \
--runner dataflow \
--project your-gcp-project \
--temp_location gs://<your-gcs-bucket>/tmp/ \
--staging_location gs://<your-gcs-bucket>/binaries/ \
--worker_harness_container_image=apache-docker-beam-snapshots-docker.bintray.io/beam/go:20180515
若要檢視 Go 的完整程式碼,請參閱 windowed_wordcount.go。
無界限和有界限的資料集
Beam 可讓您建立單一管道,可以處理有界和無界資料集。如果您的資料集具有固定數量的元素,則它是 有界資料集,並且可以一起處理所有資料。對於有界資料集,要問的問題是「我是否擁有所有資料?」如果資料持續到達 (例如 行動遊戲範例中的無盡遊戲分數串流),則它是 無界資料集。無界資料集永遠無法在任何時間進行處理,因此必須使用持續執行的串流管道來處理資料。資料集僅會完整到特定點,因此要問的問題是「我擁有資料直到哪個點?」Beam 使用視窗化將持續更新的資料集劃分為有限大小的邏輯視窗。如果您的輸入是無界的,則必須使用支援串流的執行器。
如果您的管道的輸入是有界的,則所有下游的 PCollection 也將是有界的。同樣地,如果輸入是無界的,則管道的所有下游 PCollection 也將是無界的,儘管單獨的分支可能是獨立有界的。
回想一下,此範例的輸入是一組莎士比亞的文本,這是一組有限的資料。因此,此範例從文字檔讀取有界資料
def main(arvg=None):
parser = argparse.ArgumentParser()
parser.add_argument('--input-file',
dest='input_file',
default='/Users/home/words-example.txt')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
p = beam.Pipeline(options=pipeline_options)
lines = p | 'read' >> ReadFromText(known_args.input_file)
將時間戳記新增至資料
PCollection
中的每個元素都有一個關聯的時間戳記。每個元素的時間戳記最初由建立 PCollection
的來源指派。某些建立無界 PCollection 的來源可以為每個新元素指派一個與讀取或新增元素時相對應的時間戳記。您可以使用 DoFn
手動指派或調整時間戳記;但是,您只能將時間戳記向前移動。
在此範例中,輸入是有界的。為了範例的目的,名為 AddTimestampsFn
的 DoFn
方法 (由 ParDo
叫用) 將為 PCollection
中的每個元素設定時間戳記。
以下是 AddTimestampFn
的程式碼,AddTimestampFn
是一個由 ParDo
叫用的 DoFn
,它會設定時間戳記的資料元素 (假設元素本身)。例如,如果元素是記錄行,則此 ParDo
可以從記錄字串中剖析時間並將其設定為元素的時間戳記。莎士比亞的作品中沒有固有的時間戳記,因此在此範例中,我們僅為了說明這個概念而虛構了隨機時間戳記。輸入文字的每一行都會在 2 小時的時間段內取得隨機關聯的時間戳記。
static class AddTimestampFn extends DoFn<String, String> {
private final Instant minTimestamp;
private final Instant maxTimestamp;
AddTimestampFn(Instant minTimestamp, Instant maxTimestamp) {
this.minTimestamp = minTimestamp;
this.maxTimestamp = maxTimestamp;
}
@ProcessElement
public void processElement(ProcessContext c) {
Instant randomTimestamp =
new Instant(
ThreadLocalRandom.current()
.nextLong(minTimestamp.getMillis(), maxTimestamp.getMillis()));
/**
* Concept #2: Set the data element with that timestamp.
*/
c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
}
}
請注意,使用 beam.X
「類型變數」可讓轉換用於任何類型。
視窗化
Beam 使用稱為視窗化的概念,將 PCollection
細分為有界元素的集合。彙總多個元素的 PTransforms 會將每個 PCollection
處理為一系列多個有限視窗,即使整個集合本身可能是無限大小的 (無界的)。
WindowedWordCount 範例會套用固定時間視窗化,其中每個視窗代表固定的時間間隔。此範例的固定視窗大小預設為 1 分鐘 (您可以使用命令列選項變更此值)。
在視窗化的 PCollection 上重複使用 PTransform
您也可以重複使用為處理視窗化 PCollection 建立的現有 PTransforms。
在 Playground 中試用完整範例
StreamingWordCount 範例
StreamingWordCount 範例是一個串流管道,它從 Pub/Sub 訂閱或主題讀取 Pub/Sub 訊息,並對每個訊息中的字組執行頻率計數。與 WindowedWordCount 類似,此範例會套用固定時間視窗化,其中每個視窗代表固定的時間間隔。此範例的固定視窗大小為 15 秒。管道會輸出在每個 15 秒視窗中看到的字組的頻率計數。
新概念
- 讀取無界限的資料集
- 寫入無界限的結果
若要在 Java 中執行此範例
注意: StreamingWordCount 尚未適用於 Java SDK。
若要在 Python 中執行此範例
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
python -m apache_beam.examples.streaming_wordcount \
--runner DataflowRunner \
--project YOUR_GCP_PROJECT \
--region YOUR_GCP_REGION \
--temp_location gs://YOUR_GCS_BUCKET/tmp/ \
--input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
--output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
--streaming
若要查看完整的 Python 程式碼,請參閱 streaming_wordcount.py。
若要在 Go 中執行此範例
注意: StreamingWordCount 目前不適用於 Go SDK。此問題目前已開啟 ( Issue 18879 )。
讀取無界限的資料集
此範例使用無邊界資料集作為輸入。程式碼使用 beam.io.ReadFromPubSub
從 Pub/Sub 訂閱或主題讀取 Pub/Sub 訊息。
寫入無界限的結果
當輸入為無邊界時,輸出 PCollection
也會是如此。因此,您必須確保為結果選擇適當的 I/O。某些 I/O 僅支援邊界輸出,而其他 I/O 則同時支援邊界和無邊界輸出。
此範例使用無邊界的 PCollection
,並將結果串流至 Google Pub/Sub。程式碼會格式化結果,並使用 beam.io.WriteToPubSub
將結果寫入 Pub/Sub 主題。
下一步
- 請瀏覽 行動遊戲範例逐步解說 中的行動遊戲範例。
- 請透過我們的 學習資源 自行進度學習。
- 深入了解我們最喜歡的一些 影片和 Podcast。
- 加入 Beam users@ 電子郵件清單。
如果您遇到任何問題,請隨時 與我們聯繫!
上次更新時間:2024/10/31
您是否找到了想要的一切?
這些資訊是否實用且清楚?您是否想要修改任何內容?請告訴我們!