部落格
2020/12/16
DataFrame API 預覽版現已推出!
我們很高興宣布 Beam Python SDK 新的 DataFrame API 預覽版已在 Beam 2.26.0 中推出。 就像 SqlTransform
(Java, Python) 一樣,DataFrame API 讓 Beam 使用者能夠以比以往更簡潔的方式表達複雜的關聯邏輯。
更具表現力的 API
Beam 新的 DataFrame API 旨在與知名的 Pandas DataFrame API 相容,但有一些細節如下所述。 使用這個新的 API,可以非常簡潔地表達一個簡單的 pipeline,該 pipeline 從 CSV 讀取紐約市計程車乘車數據、執行分組聚合,並將輸出寫入 CSV。
from apache_beam.dataframe.io import read_csv
with beam.Pipeline() as p:
df = p | read_csv("gs://apache-beam-samples/nyc_taxi/2019/*.csv",
usecols=['passenger_count' , 'DOLocationID'])
# Count the number of passengers dropped off per LocationID
agg = df.groupby('DOLocationID').sum()
agg.to_csv(output)
將其與使用 CombinePerKey
實現的傳統 Beam python pipeline 的相同邏輯進行比較
with beam.Pipeline() as p:
(p | beam.io.ReadFromText("gs://apache-beam-samples/nyc_taxi/2019/*.csv",
skip_header_lines=1)
| beam.Map(lambda line: line.split(','))
# Parse CSV, create key - value pairs
| beam.Map(lambda splits: (int(splits[8] or 0), # DOLocationID
int(splits[3] or 0))) # passenger_count
# Sum values per key
| beam.CombinePerKey(sum)
| beam.MapTuple(lambda loc_id, pc: f'{loc_id},{pc}')
| beam.io.WriteToText(known_args.output))
DataFrame 範例更容易快速檢查和理解,因為它允許您簡潔地表達分組聚合,而無需使用底層的 CombinePerKey
。
除了更具表現力之外,使用 DataFrame API 寫入的 pipeline 通常比傳統的 Beam pipeline 更有效率。 這是因為 DataFrame API 盡可能延遲到非常有效率的柱狀 Pandas 實作。
DataFrames 作為 DSL
您可能已經知道 Beam SQL,它是一個使用 Beam 的 Java SDK 建構的領域特定語言 (DSL)。 SQL 被認為是一種 DSL,因為可以使用 SQL 完全表達完整的 pipeline,包括 IO 和複雜操作。
同樣地,DataFrame API 是一個使用 Python SDK 建構的 DSL。 您可以看到上面的範例是在沒有傳統 Beam 建構 (例如 IO、ParDo 或 CombinePerKey) 的情況下寫入的。 事實上,唯一傳統的 Beam 類型是 Pipeline 實例! 否則,此 pipeline 完全是使用 DataFrame API 寫入的。 這是因為 DataFrame API 不僅僅是實作 Pandas 的計算操作,它還包括基於 Pandas 原生實作的 IO (pd.read_{csv,parquet,...}
和 pd.DataFrame.to_{csv,parquet,...}
)。
與 SQL 類似,也可以透過使用 schemas 將 DataFrame API 嵌入到更大的 pipeline 中。 可以將有 schema 感知的 PCollection 轉換為 DataFrame、進行處理,然後將結果轉換回另一個有 schema 感知的 PCollection。 例如,如果您想要使用傳統的 Beam IO 而不是 DataFrame IO 中的一個,您可以像這樣重寫上面的 pipeline
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection
with beam.Pipeline() as p:
...
schema_pc = (p | beam.ReadFromText(..)
# Use beam.Select to assign a schema
| beam.Select(DOLocationID=lambda line: int(...),
passenger_count=lambda line: int(...)))
df = to_dataframe(schema_pc)
agg = df.groupby('DOLocationID').sum()
agg_pc = to_pcollection(pc)
# agg_pc has a schema based on the structure of agg
(agg_pc | beam.Map(lambda row: f'{row.DOLocationID},{row.passenger_count}')
| beam.WriteToText(..))
也可以透過將函數傳遞給 DataframeTransform
來使用 DataFrame API
from apache_beam.dataframe.transforms import DataframeTransform
with beam.Pipeline() as p:
...
| beam.Select(DOLocationID=lambda line: int(..),
passenger_count=lambda line: int(..))
| DataframeTransform(lambda df: df.groupby('DOLocationID').sum())
| beam.Map(lambda row: f'{row.DOLocationID},{row.passenger_count}')
...
注意事項
如上所述,Beam 的 DataFrame API 和 Pandas API 之間存在一些差異。 最顯著的差異是 Beam DataFrame API 是延遲的,就像 Beam API 的其他部分一樣。 這表示您無法 print()
DataFrame 實例來檢查資料,因為我們尚未計算資料! 計算直到 pipeline run()
後才會發生。 在那之前,我們只知道結果的形狀/schema (即欄的名稱和類型),而不是結果本身。
嘗試使用某些 Pandas 操作時,您可能會看到一些常見的例外狀況
- NotImplementedError: 表示這是一個我們尚未有時間查看的操作或引數。 我們已盡可能在新的 API 的預覽版中提供許多 Pandas 操作,但仍有很長的路要走。
- WontImplementError: 表示這是一個我們不打算在近期內支援的操作或引數,因為它與 Beam 模型不相容。 產生此錯誤的最大類別操作是那些對順序敏感的操作 (例如 shift、cummax、cummin、head、tail 等)。 這些無法輕易地對應到 Beam,因為代表分散式資料集的 PCollection 是無序的。 請注意,即使未來可能會實作其中一些操作 - 我們實際上有一些關於如何支援對順序敏感的操作的想法 - 但還有一段路要走。
最後,務必注意,這是一個新功能的預覽版,將在接下來的幾個 Beam 版本中加強。 我們希望您現在就試用一下並給我們一些回饋,但我們目前不建議將其用於生產工作負載。
如何參與
參與這項工作的最簡單方法是試用 DataFrames 並告訴我們您的想法! 您可以將問題發送到 user@beam.apache.org,或在 jira 中提交錯誤報告和功能請求。 特別是,如果您發現我們尚未實作但您覺得有用的操作,這將非常有幫助,以便我們可以優先處理它。
如果您想了解更多關於 DataFrame API 如何在底層運作的資訊並參與開發,我們建議您查看 設計文件 和我們的 Beam 高峰會演講。 從那裡來看,最好的協助方法是解決那些尚未實作的操作。 我們正在 BEAM-9547 中協調該工作。