Python 串流管線

Python 串流管線執行從 Beam SDK 2.5.0 版本開始提供(有一些限制)。

為什麼要使用串流執行?

如果您的管線從串流或持續更新的資料來源(例如 Cloud Pub/Sub)讀取,Beam 會建立一個無界的 PCollection。執行器必須使用持續執行的串流工作來處理無界的 PCollection,因為整個集合永遠無法一次處理。大小和界限有關於有界和無界集合的更多資訊。

修改管線以使用串流處理

若要修改批次管線以支援串流,您必須進行以下程式碼變更

適用於 Python 的 Beam SDK 包含兩個支援無界 PCollection 的 I/O 連接器:Google Cloud Pub/Sub(讀取和寫入)和 Google BigQuery(寫入)。

以下程式碼片段顯示了修改批次 WordCount 範例以支援串流所需的程式碼變更

這些批次 WordCount 程式碼片段來自 wordcount.py。此程式碼使用 TextIO I/O 連接器從有界集合讀取和寫入。

  lines = p | 'read' >> ReadFromText(known_args.input)
  ...

  counts = (lines
            | 'split' >> (beam.ParDo(WordExtractingDoFn())
                          .with_output_types(six.text_type))
            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
            | 'group' >> beam.GroupByKey()
            | 'count' >> beam.Map(count_ones))
  ...

  output = counts | 'format' >> beam.Map(format_result)

  # Write the output using a "Write" transform that has side effects.
  output | 'write' >> WriteToText(known_args.output)

這些串流 WordCount 程式碼片段來自 streaming_wordcount.py。此程式碼使用從無界來源(Cloud Pub/Sub)讀取和寫入的 I/O 連接器,並指定一個固定的視窗化策略。

  lines = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
  ...

  counts = (lines
            | 'split' >> (beam.ParDo(WordExtractingDoFn())
                          .with_output_types(six.text_type))
            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
            | beam.WindowInto(window.FixedWindows(15, 0))
            | 'group' >> beam.GroupByKey()
            | 'count' >> beam.Map(count_ones))

  ...

  output = counts | 'format' >> beam.Map(format_result)

  # Write to Pub/Sub
  output | beam.io.WriteStringsToPubSub(known_args.output_topic)

執行串流管線

若要執行範例串流 WordCount 管線,您必須擁有 Cloud Pub/Sub 輸入主題和輸出主題。若要建立主題、訂閱和從主題提取以進行測試,您可以使用Cloud Pub/Sub 快速入門中的命令。

以下簡單的 bash 腳本將輸入文字檔的行饋送到您的輸入主題

cat <YOUR_LOCAL_TEXT_FILE> | while read line; do gcloud pubsub topics publish <YOUR_INPUT_TOPIC_NAME> --message "$line"; done

或者,您可以從公開可用的 Cloud Pub/Sub 串流讀取,例如 projects/pubsub-public-data/topics/taxirides-realtime。但是,您必須建立自己的輸出主題以測試寫入。

以下命令執行 streaming_wordcount.py 範例串流管線。指定您的 Cloud Pub/Sub 專案和輸入主題 (--input_topic),以及輸出 Cloud Pub/Sub 專案和主題 (--output_topic)。

# DirectRunner requires the --streaming option
python -m apache_beam.examples.streaming_wordcount \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming
See /documentation/runners/spark/ for more information.
# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]

# DataflowRunner requires the --streaming option
python -m apache_beam.examples.streaming_wordcount \
  --runner DataflowRunner \
  --project YOUR_GCP_PROJECT \
  --region YOUR_GCP_REGION \
  --temp_location gs://YOUR_GCS_BUCKET/tmp/ \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming

請查看您的執行器的文件,了解有關執行串流管線的任何其他執行器特定資訊

不支援的功能

Python 串流執行目前不支援以下功能