與 pandas 的差異

Apache Beam DataFrame API 旨在成為 pandas 的直接替代品,但有一些差異需要注意。本頁描述了 Beam 和 pandas API 之間的差異,並提供了使用 Beam DataFrame API 的提示。有關 Beam DataFrame API 中支援哪些操作和引數的完整參考,請參閱 apache_beam.dataframe.frames API 參考

使用 pandas 來源

Beam 操作始終與管線相關聯。若要將來源資料讀取到 Beam DataFrame 中,您必須將來源套用到管線物件。例如,若要從 CSV 檔案讀取輸入,您可以使用 read_csv,如下所示

df = p | beam.dataframe.io.read_csv(...)

這與 pandas 的 read_csv 類似,但 df 是一個代表檔案內容的延遲 Beam DataFrame。輸入檔名可以是 fileio.MatchFiles 所理解的任何檔案模式。

如需將來源和接收器與 DataFrame API 搭配使用的範例,請參閱 taxiride.py

不支援的操作類別

以下各節描述 Beam DataFrame API 尚未支援或支援但有注意事項的操作類別。如果適用,會建議替代方案。

無法平行化的操作

範例:DeferredDataFrame.quantileDeferredDataFrame.mode

為了支援分散式處理,Beam 會平行在資料子集上調用 DataFrame 操作。某些 DataFrame 操作無法平行化,而且這些操作預設會引發 NonParallelOperation 錯誤。

替代方案

如果您想要使用無法平行化的操作,您可以使用 beam.dataframe.allow_non_parallel_operations 區塊來保護它。例如

from apache_beam import dataframe

with dataframe.allow_non_parallel_operations():
  quantiles = df.quantile()

請注意,這會在單一節點上收集整個輸入資料集,因此有記憶體不足的風險。只有在您確定輸入資料夠小,可以在單一工作節點上處理時,才應該使用此替代方案。

產生非延遲欄位的操作

範例:DeferredDataFrame.pivotDeferredDataFrame.transposeDeferredSeries.factorize

Beam DataFrame 操作是延遲的,但結果 DataFrame 的結構描述並非如此,這表示結果欄位必須可以在不存取資料的情況下計算。某些 DataFrame 操作無法支援此用法,因此無法實作。這些操作會引發 WontImplementError

目前沒有此問題的替代方案。但是,未來 Beam Dataframe 可能會支援在類別欄位上執行非延遲欄位操作。此工作正在 Issue 20958 中追蹤。

產生非延遲值或繪圖的操作

範例:DeferredSeries.to_listDeferredSeries.arrayDeferredDataFrame.plot

由於 Beam 是一個延遲的 API,因此實作會產生非延遲值或繪圖的 DataFrame 操作是不可行的。如果調用這些操作,它們會引發 WontImplementError

未來可能會透過與 Interactive Beam 更緊密的整合來支援這些操作。若要追蹤此問題的進度,請追蹤 Issue 21638。如果您認為我們應該優先處理這項工作,您也可以與我們聯繫讓我們知道。

替代方案

如果您使用 Interactive Beam,您可以使用 collect 將資料集帶入本機記憶體,然後執行這些操作。

順序敏感的操作

範例:DeferredDataFrame.headDeferredSeries.diffDeferredDataFrame.interpolate

Beam PCollection 本質上是無序的,因此不支援對列順序敏感的 pandas 操作。這些操作會引發 WontImplementError

未來可能會支援對順序敏感的操作。若要追蹤此問題的進度,請追蹤 Issue 20862。如果您認為我們應該優先處理這項工作,您也可以與我們聯繫讓我們知道。

替代方案

如果您使用 Interactive Beam,您可以使用 collect 將資料集帶入本機記憶體,然後執行這些操作。

或者,可能有辦法重寫您的程式碼,使其不對順序敏感。例如,pandas 使用者經常呼叫對順序敏感的 head 操作來查看資料,但如果您只想檢視元素子集,您也可以使用 sample,這不需要您先收集資料。同樣地,您可以使用 nlargest 而不是 sort_values(...)

產生延遲純量的操作

某些 DataFrame 操作會產生延遲純量。在 Beam 中,實際值的計算會被延遲,因此這些值無法用於控制流程。例如,您可以使用 Series.sum 計算總和,但您無法立即根據結果進行分支,因為結果資料不會立即提供。Series.is_unique 是一個類似的範例。將延遲純量用於分支邏輯或真值測試會引發 TypeError

尚未實作的操作

Beam DataFrame API 實作了許多常用的 pandas DataFrame 操作,而且我們正在積極努力支援其餘的操作。但是,pandas 具有龐大的 API,而且仍然存在差距 (Issue 20318)。如果您調用尚未實作的操作,它會引發 NotImplementedError。如果您遇到您認為應該優先處理的遺失操作,請告訴我們

使用 Interactive Beam 存取完整的 pandas API

Interactive Beam 是一個設計用於互動式筆記本中的模組。這個依慣例匯入為 ib 的模組,提供了一個 ib.collect 函數,該函數會將 PCollection 或延遲的 DataFrame 作為 pandas DataFrame 帶入本機記憶體。在使用 ib.collect 來具體化延遲的 DataFrame 之後,您將能夠執行 pandas API 中的任何操作,而不僅僅是 Beam 中支援的操作。

在 Colab 中執行 在 Colab 中執行





若要開始在筆記本中使用 Beam,請參閱試用 Apache Beam