Beam DataFrames 概觀

在 Colab 中執行 在 Colab 中執行





Apache Beam Python SDK 提供了一個 DataFrame API,用於處理類似 pandas 的 DataFrame 物件。此功能可讓您將 PCollection 轉換為 DataFrame,然後使用 pandas DataFrame API 上可用的標準方法與 DataFrame 互動。DataFrame API 建構在 pandas 實作之上,pandas DataFrame 方法會在資料集的子集上平行調用。Beam DataFrames 與 pandas DataFrames 之間的最大差異在於,操作是由 Beam API 延遲的,以支援 Beam 平行處理模型。(若要進一步了解 DataFrame 實作之間的差異,請參閱與 pandas 的差異。)

您可以將 Beam DataFrames 視為 Beam 管線的領域特定語言 (DSL)。與Beam SQL 類似,DataFrames 是內建於 Beam Python SDK 中的 DSL。使用此 DSL,您可以建立管線,而無需參考標準的 Beam 建構,例如 ParDoCombinePerKey

Beam DataFrame API 旨在提供 Beam 管線中熟悉的程式設計介面。在某些情況下,DataFrame API 也可以透過延遲到高效的向量化 pandas 實作來提高管線效率。

什麼是 DataFrame?

如果您不熟悉 pandas DataFrames,您可以從閱讀 10 分鐘掌握 pandas 開始,其中顯示如何匯入和使用 pandas 套件。pandas 是一個用於資料操作和分析的開放原始碼 Python 程式庫。它提供了簡化處理關聯或標記資料的資料結構。這些資料結構之一是 DataFrame,其中包含二維表格資料,並為資料提供標記的列和欄。

先決條件

若要使用 Beam DataFrames,您需要安裝 Beam Python 2.26.0 或更高版本(如需完整的設定說明,請參閱Apache Beam Python SDK 快速入門)和支援的 pandas 版本。在 Beam 2.34.0 及更新版本中,最簡單的方式是使用「dataframe」額外功能

pip install apache_beam[dataframe]

請注意,當在分散式執行器上執行 DataFrame API 管線時,應在 Worker 上安裝相同pandas 版本。請參考 base_image_requirements.txt,了解您正在使用的 Python 版本和 Beam 發行版本,以查看 Worker 預設會使用哪個版本的 pandas

使用 DataFrames

您可以使用 DataFrames,如下列範例所示,該範例會從 CSV 檔案讀取紐約市的計程車資料、執行分組聚合,並將輸出寫回 CSV

from apache_beam.dataframe.io import read_csv

with pipeline as p:
  rides = p | read_csv(input_path)

  # Count the number of passengers dropped off per LocationID
  agg = rides.groupby('DOLocationID').passenger_count.sum()
  agg.to_csv(output_path)

pandas 可以從 CSV 資料的第一列推斷欄名,這就是 passenger_countDOLocationID 的來源。

在此範例中,唯一傳統的 Beam 類型是 Pipeline 執行個體。否則,此範例完全以 DataFrame API 編寫。這是可行的,因為 Beam DataFrame API 包含自己的 IO 操作(例如,read_csvto_csv),這些操作基於 pandas 原生實作。read_*to_* 操作支援檔案模式和任何與 Beam 相容的檔案系統。分組是透過按鍵分組來完成的,並且可以在最終寫入之前應用任意的 pandas 操作(在本例中為 sum),而最終寫入會透過 to_csv 來執行。

Beam DataFrame API 的目標是與原生 pandas 實作相容,但在與 pandas 的差異中詳細說明了一些注意事項。

在管線中嵌入 DataFrames

若要在較大的管線中使用 DataFrames API,您可以將 PCollection 轉換為 DataFrame、處理 DataFrame,然後將 DataFrame 轉換回 PCollection。為了將 PCollection 轉換為 DataFrame 並轉換回來,您必須使用附加了結構描述的 PCollection。附加了結構描述的 PCollection 也稱為結構描述感知 PCollection。若要進一步了解如何將結構描述附加至 PCollection,請參閱建立結構描述

以下範例會建立一個結構描述感知 PCollection,使用 to_dataframe 將其轉換為 DataFrame,處理 DataFrame,然後使用 to_pcollection 將 DataFrame 轉換回 PCollection

from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection
...


    # Read the text file[pattern] into a PCollection.
    lines = p | 'Read' >> ReadFromText(known_args.input)

    words = (
        lines
        | 'Split' >> beam.FlatMap(
            lambda line: re.findall(r'[\w]+', line)).with_output_types(str)
        # Map to Row objects to generate a schema suitable for conversion
        # to a dataframe.
        | 'ToRows' >> beam.Map(lambda word: beam.Row(word=word)))

    df = to_dataframe(words)
    df['count'] = 1
    counted = df.groupby('word').sum()
    counted.to_csv(known_args.output)

    # Deferred DataFrames can also be converted back to schema'd PCollections
    counted_pc = to_pcollection(counted, include_indexes=True)


您可以在 GitHub 上找到完整的 wordcount 範例,以及其他範例 DataFrame 管線

也可以透過將函式傳遞給 DataframeTransform 來使用 DataFrame API

from apache_beam.dataframe.transforms import DataframeTransform

with beam.Pipeline() as p:
  ...
  | beam.Select(DOLocationID=lambda line: int(..),
                passenger_count=lambda line: int(..))
  | DataframeTransform(lambda df: df.groupby('DOLocationID').sum())
  | beam.Map(lambda row: f"{row.DOLocationID},{row.passenger_count}")
  ...

DataframeTransformSqlTransform 來自 Beam SQL DSL。其中 SqlTransform 將 SQL 查詢轉換為 PTransform,DataframeTransform 是一個 PTransform,它會套用一個接受並傳回 DataFrames 的函式。DataframeTransform 在您具有可以同時在 Beam 和一般 pandas DataFrames 上呼叫的獨立函式時,特別有用。

DataframeTransform 可以按名稱和關鍵字接受和傳回多個 PCollections,如下列範例所示

output = (pc1, pc2) | DataframeTransform(lambda df1, df2: ...)

output = {'a': pc, ...} | DataframeTransform(lambda a, ...: ...)

pc1, pc2 = {'a': pc} | DataframeTransform(lambda a: expr1, expr2)

{...} = {a: pc} | DataframeTransform(lambda a: {...})