Python 串流管線
Python 串流管線執行從 Beam SDK 2.5.0 版本開始提供(有一些限制)。
為什麼要使用串流執行?
如果您的管線從串流或持續更新的資料來源(例如 Cloud Pub/Sub)讀取,Beam 會建立一個無界的 PCollection。執行器必須使用持續執行的串流工作來處理無界的 PCollection,因為整個集合永遠無法一次處理。大小和界限有關於有界和無界集合的更多資訊。
修改管線以使用串流處理
若要修改批次管線以支援串流,您必須進行以下程式碼變更
- 使用支援從無界來源讀取的 I/O 連接器。
- 使用支援寫入無界來源的 I/O 連接器。
- 選擇視窗化策略。
適用於 Python 的 Beam SDK 包含兩個支援無界 PCollection 的 I/O 連接器:Google Cloud Pub/Sub(讀取和寫入)和 Google BigQuery(寫入)。
以下程式碼片段顯示了修改批次 WordCount 範例以支援串流所需的程式碼變更
這些批次 WordCount 程式碼片段來自 wordcount.py。此程式碼使用 TextIO I/O 連接器從有界集合讀取和寫入。
lines = p | 'read' >> ReadFromText(known_args.input)
...
counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
.with_output_types(six.text_type))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(count_ones))
...
output = counts | 'format' >> beam.Map(format_result)
# Write the output using a "Write" transform that has side effects.
output | 'write' >> WriteToText(known_args.output)
這些串流 WordCount 程式碼片段來自 streaming_wordcount.py。此程式碼使用從無界來源(Cloud Pub/Sub)讀取和寫入的 I/O 連接器,並指定一個固定的視窗化策略。
lines = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
...
counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
.with_output_types(six.text_type))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| beam.WindowInto(window.FixedWindows(15, 0))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(count_ones))
...
output = counts | 'format' >> beam.Map(format_result)
# Write to Pub/Sub
output | beam.io.WriteStringsToPubSub(known_args.output_topic)
執行串流管線
若要執行範例串流 WordCount 管線,您必須擁有 Cloud Pub/Sub 輸入主題和輸出主題。若要建立主題、訂閱和從主題提取以進行測試,您可以使用Cloud Pub/Sub 快速入門中的命令。
以下簡單的 bash 腳本將輸入文字檔的行饋送到您的輸入主題
cat <YOUR_LOCAL_TEXT_FILE> | while read line; do gcloud pubsub topics publish <YOUR_INPUT_TOPIC_NAME> --message "$line"; done
或者,您可以從公開可用的 Cloud Pub/Sub 串流讀取,例如 projects/pubsub-public-data/topics/taxirides-realtime
。但是,您必須建立自己的輸出主題以測試寫入。
以下命令執行 streaming_wordcount.py 範例串流管線。指定您的 Cloud Pub/Sub 專案和輸入主題 (--input_topic
),以及輸出 Cloud Pub/Sub 專案和主題 (--output_topic
)。
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]
# DataflowRunner requires the --streaming option
python -m apache_beam.examples.streaming_wordcount \
--runner DataflowRunner \
--project YOUR_GCP_PROJECT \
--region YOUR_GCP_REGION \
--temp_location gs://YOUR_GCS_BUCKET/tmp/ \
--input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
--output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
--streaming
請查看您的執行器的文件,了解有關執行串流管線的任何其他執行器特定資訊
不支援的功能
Python 串流執行目前不支援以下功能
- 自訂來源 API
- 使用者定義的自訂合併
WindowFn
(使用 fnapi) - 對於可攜式執行器,請參閱可攜性支援表。