Apache Beam Python SDK 快速入門

此快速入門將示範如何使用 範例管線,該管線使用 Apache Beam Python SDK,並使用 Direct Runner。Direct Runner 會在本機電腦上執行管線。

如果您有興趣為 Apache Beam Python 程式碼庫做出貢獻,請參閱貢獻指南

本頁內容

設定您的開發環境

Apache Beam 的目標是支援已發佈且尚未達到生命週期終止的 Python 版本,但可能需要幾個版本,Apache Beam 才會完全支援最新發佈的 Python 次要版本。

所需的最低 Python 版本列在 apache-beam 專案頁面的Meta區段下的Requires中。所有支援的 Python 版本列表列在頁面底部的Classifiers區段下的Programming Language中。

執行以下命令以檢查您的 Python 版本

python3 --version

如果您沒有 Python 直譯器,您可以從 Python 下載頁面下載並安裝。

如果您需要安裝其他 Python 版本(除了您已有的版本之外),您可以在我們的 開發人員 Wiki 中找到一些建議。

複製 GitHub 儲存庫

複製或下載 apache/beam-starter-python GitHub 儲存庫,並切換到 beam-starter-python 目錄。

git clone https://github.com/apache/beam-starter-python.git
cd beam-starter-python

建立並啟用虛擬環境

虛擬環境是一個目錄樹,其中包含自己的 Python 發行版本。我們建議使用虛擬環境,以便您的專案的所有相依性都安裝在隔離且獨立的環境中。若要設定虛擬環境,請執行以下命令

# Create a new Python virtual environment.
python3 -m venv env

# Activate the virtual environment.
source env/bin/activate

如果這些命令在您的平台上無法運作,請參閱 venv 文件。

安裝專案相依性

執行以下命令,從 requirements.txt 檔案安裝專案的相依性

pip install -e .

執行快速入門

執行以下命令

python main.py --input-text="Greetings"

輸出結果類似如下

Hello
World!
Greetings

這些行可能會以不同的順序出現。

執行以下命令以停用虛擬環境

deactivate

探索程式碼

此快速入門的主要程式碼檔案是 app.py (GitHub)。此程式碼執行以下步驟

  1. 建立 Beam 管線。
  2. 建立初始 PCollection
  3. PCollection 套用轉換。
  4. 使用 Direct Runner 執行管線。

建立管線

此程式碼首先建立一個 Pipeline 物件。Pipeline 物件會建立要執行的轉換圖形。

with beam.Pipeline(options=beam_options) as pipeline:

此處顯示的 beam_option 變數是一個 PipelineOptions 物件,用於設定管線的選項。如需詳細資訊,請參閱設定管線選項

建立初始 PCollection

PCollection 抽象概念代表潛在的分散式多元素資料集。Beam 管線需要一個資料來源來填入初始 PCollection。來源可以是有限的(具有已知、固定的大小)或無限的(具有無限的大小)。

此範例使用 Create 方法從記憶體中的字串陣列建立 PCollection。產生的 PCollection 包含字串「Hello」、「World!」和使用者提供的輸入字串。

pipeline
| "Create elements" >> beam.Create(["Hello", "World!", input_text])

注意:管道運算子 | 用於串聯轉換。

對 PCollection 套用轉換

轉換可以變更、篩選、群組、分析或以其他方式處理 PCollection 中的元素。此範例使用 Map 轉換,它會將集合的元素對應到新的集合

| "Print elements" >> beam.Map(print)

執行管線

若要執行管線,您可以呼叫 Pipeline.run 方法

pipeline.run.wait_until_finish()

但是,透過將 Pipeline 物件包裝在 with 陳述式中,會自動叫用 run 方法。

with beam.Pipeline(options=beam_options) as pipeline:
    # ...
    # run() is called automatically

Beam 執行器會在特定平台上執行 Beam 管線。如果您未指定執行器,則預設值為 Direct Runner。Direct Runner 會在本機電腦上執行管線。它適用於測試和開發,而不是為了效率而最佳化。如需詳細資訊,請參閱使用 Direct Runner

對於生產工作負載,您通常會使用分散式執行器,該執行器會在 Apache Flink、Apache Spark 或 Google Cloud Dataflow 等巨量資料處理系統上執行管線。這些系統支援大規模並行處理。

下一步

如果您遇到任何問題,請隨時聯絡我們