ParDo
![]() |
用於一般平行處理的轉換。ParDo
轉換會考慮輸入 PCollection
中的每個元素,對該元素執行一些處理函數 (您的使用者程式碼),並將零或多個元素發射到輸出 PCollection
。
請參閱 Beam 程式設計指南以了解更多資訊。
範例
在以下範例中,我們將探索如何建立自訂 DoFn
並存取時間戳記和視窗資訊。
範例 1:使用簡單 DoFn 的 ParDo
以下範例定義了一個簡單的 DoFn
類別,名為 SplitWords
,它將 delimiter
儲存為物件欄位。process
方法會針對每個元素呼叫一次,並且可以產生零或多個輸出元素。
範例 2:具有時間戳記和視窗資訊的 ParDo
在此範例中,我們將新參數新增至 process
方法,以便在執行時間繫結參數值。
beam.DoFn.TimestampParam
將時間戳記資訊繫結為apache_beam.utils.timestamp.Timestamp
物件。beam.DoFn.WindowParam
將視窗資訊繫結為適當的apache_beam.transforms.window.*Window
物件。
範例 3:具有 DoFn 方法的 ParDo
DoFn
可以使用許多方法自訂,這些方法可以協助建立更複雜的行為。您可以使用 setup
和 teardown
自訂 worker 在啟動和關閉時所執行的動作。您也可以自訂在元素套件開始和完成時所執行的動作,使用 start_bundle
和 finish_bundle
。
DoFn.setup()
:每當 worker 上還原序列化DoFn
執行個體時呼叫。這表示每個 worker 可以呼叫多次,因為可能會建立給定DoFn
子類別的多個執行個體 (例如,由於平行化,或由於閒置一段時間後進行垃圾收集)。這是連線至資料庫執行個體、開啟網路連線或其他資源的好地方。另請參閱DoFn.SetupContextParam
,以了解如何透過內容管理員完成此作業。DoFn.start_bundle()
:在針對套件中的第一個元素呼叫process
之前,針對每個元素套件呼叫一次。這是開始追蹤套件元素的好地方。另請參閱DoFn.BundleContextParam
,以了解如何透過內容管理員完成此作業。DoFn.process(element, *args, **kwargs)
:針對每個元素呼叫一次,可以產生零或多個元素。其他*args
或**kwargs
可以透過beam.ParDo()
傳遞。[必要]DoFn.finish_bundle()
:在針對套件中的最後一個元素呼叫process
之後,針對每個元素套件呼叫一次,可以產生零或多個元素。這是對套件中的元素執行批次呼叫的好地方,例如執行資料庫查詢。例如,您可以在
start_bundle
中初始化批次,將元素新增至process
中的批次,而不是產生它們,然後在finish_bundle
上對這些元素執行批次查詢,並產生所有結果。請注意,從
finish_bundle
產生的元素必須為apache_beam.utils.windowed_value.WindowedValue
類型。您需要以 Unix 時間戳記的形式提供時間戳記,您可以從相關的已處理元素取得時間戳記。您也需要提供視窗,您可以從相關的已處理元素取得視窗,如下例所示。DoFn.teardown()
:當DoFn
執行個體關閉時,每個DoFn
執行個體呼叫一次 (盡可能)。這是關閉資料庫執行個體、關閉網路連線或其他資源的好地方。請注意,
teardown
會盡可能呼叫,且不保證。例如,如果 worker 損毀,則可能不會呼叫teardown
。
已知問題
- [Issue 19394]
DoFn.teardown()
指標遺失。
相關轉換
![]() |
上次更新於 2024/10/31
您是否找到您要找的所有內容?
它是否都實用且清楚?是否有任何您想要變更的內容?請告訴我們!