概述:開發新的 I/O 連接器
針對需要連線至 內建 I/O 連接器 不支援的資料儲存區的使用者指南
若要連線至 Beam 現有 I/O 連接器不支援的資料儲存區,您必須建立自訂 I/O 連接器。連接器通常包含來源和接收器。所有 Beam 來源和接收器都是複合轉換;不過,自訂 I/O 的實作取決於您的使用案例。以下是開始使用的建議步驟
閱讀此概述並選擇您的實作。如有任何問題,可以寄電子郵件至 Beam 開發郵件清單。此外,您可以檢查是否有人正在處理相同的 I/O 連接器。
如果您計畫將 I/O 連接器貢獻給 Beam 社群,請參閱 Apache Beam 貢獻指南。
請閱讀 PTransform 樣式指南,以取得其他樣式指南建議。
來源
針對有界限(批次)來源,目前有兩種選項可建立 Beam 來源
使用
Splittable DoFn
。使用
ParDo
和GroupByKey
。
Splittable DoFn
是建議的選項,因為它是最新的有界限和無界限來源的來源架構。這表示在新的系統中取代 Source
API ( BoundedSource 和 UnboundedSource)。請閱讀 可分割 DoFn 程式設計指南,以了解如何撰寫一個可分割 DoFn。如需詳細資訊,請參閱多 SDK 連接器工作的路線圖。
針對 Java 和 Python 無界限 (串流) 來源,您必須使用 Splittable DoFn
,它支援對串流管道有用的功能,包括檢查點、控制浮水印和追蹤待辦項目。
何時使用可分割 DoFn 介面
如果您不確定是否要使用 Splittable DoFn
,請隨時寄電子郵件至 Beam 開發郵件清單,我們可以討論您案例的具體優缺點。
在某些情況下,實作 Splittable DoFn
可能有必要或會帶來較佳的效能
無界限來源:
ParDo
不適用於從無界限來源讀取。ParDo
不支援檢查點或類似於對串流資料來源有用的重複資料刪除機制。進度和大小估計:
ParDo
無法向執行器提供關於進度或正在讀取資料大小的提示。如果沒有資料的大小估計或讀取進度,執行器就沒有任何方法可以猜測您的讀取有多大。因此,如果執行器嘗試動態配置工作站,則沒有關於您管道可能需要多少工作站的線索。動態工作重新平衡:
ParDo
不支援動態工作重新平衡,有些讀取器會使用動態工作重新平衡來改善作業的處理速度。根據您的資料來源,動態工作重新平衡可能無法實現。初始分割以增加平行處理:
ParDo
沒有執行初始分割的能力。
例如,如果您想要從每個檔案包含許多記錄的新檔案格式讀取,或者如果您想要從支援依排序索引鍵順序讀取作業的索引鍵值儲存區讀取。
使用 SDF 的 I/O 範例
Java 範例
- Kafka:Apache Kafka (開放原始碼分散式事件串流平台) 的 I/O 連接器。
- Watch:使用輪詢函數,為每個輸入產生一組不斷增加的輸出,直到滿足每個輸入的終止條件。
- Parquet:Apache Parquet (開放原始碼欄狀儲存格式) 的 I/O 連接器。
- HL7v2:HL7v2 訊息 (提供組織內部發生的事件相關資料的臨床訊息格式) 的 I/O 連接器,為 Google 的 Cloud Healthcare API 的一部分。
- BoundedSource 包裝函式:將現有 BoundedSource 實作轉換為可分割 DoFn 的包裝函式。
- UnboundedSource 包裝器:一個包裝器,將現有的 UnboundedSource 實作轉換為可分割的 DoFn。
Python 範例
- BoundedSourceWrapper:一個包裝器,將現有的 BoundedSource 實作轉換為可分割的 DoFn。
使用 ParDo 和 GroupByKey
對於可以平行讀取資料的資料儲存或檔案類型,您可以將此過程視為一個小型管道。這通常包含兩個步驟
將資料分割成數個部分以平行讀取
從每個部分讀取資料
每個步驟都會是一個 ParDo
,中間夾著一個 GroupByKey
。GroupByKey
是一個實作細節,但對於大多數執行器來說,GroupByKey
允許執行器在某些情況下使用不同數量的 worker。
決定如何分割要讀取的資料成數個區塊
讀取資料,這通常能從更多 worker 中獲益
此外,GroupByKey
也允許在支援此功能的執行器上進行動態工作重新平衡。
以下是一些讀取轉換實作的範例,這些實作在可以平行讀取資料時使用「以小型管道方式讀取」模型
從檔案 glob 讀取:例如,讀取 “~/data/**” 中的所有檔案。
- 取得檔案路徑
ParDo
:以檔案 glob 作為輸入。產生一個字串的PCollection
,每個字串都是一個檔案路徑。 - 讀取
ParDo
:給定檔案路徑的PCollection
,讀取每個路徑,產生一個記錄的PCollection
。
- 取得檔案路徑
從 NoSQL 資料庫讀取(例如 Apache HBase):這些資料庫通常允許平行讀取範圍。
- 決定鍵範圍
ParDo
:以資料庫的連線資訊和要讀取的鍵範圍作為輸入。產生一個可以有效平行讀取的鍵範圍的PCollection
。 - 讀取鍵範圍
ParDo
:給定鍵範圍的PCollection
,讀取鍵範圍,產生一個記錄的PCollection
。
- 決定鍵範圍
對於無法平行讀取的資料儲存或檔案,讀取是一個簡單的任務,可以使用單個 ParDo
+GroupByKey
完成。例如
從資料庫查詢讀取:傳統 SQL 資料庫查詢通常只能依序讀取。在這種情況下,
ParDo
將建立與資料庫的連線並讀取批次的記錄,產生這些記錄的PCollection
。從 gzip 檔案讀取:gzip 檔案必須依序讀取,因此無法平行化讀取。在這種情況下,
ParDo
將開啟檔案並依序讀取,產生來自檔案的記錄的PCollection
。
接收器
要建立 Beam sink,我們建議您使用將接收到的記錄寫入資料儲存的 ParDo
。要開發更複雜的 sink(例如,在執行器重試失敗時支援資料去重複),請使用 ParDo
、GroupByKey
和其他可用的 Beam 轉換。許多資料服務都最佳化為一次寫入批次的元素,因此在寫入之前將元素分組為批次可能是有意義的。持續性連線也可以在 DoFn 的 setUp
或 startBundle
方法中初始化,而不是在收到每個元素時初始化。還應注意,在大型分散式系統中,工作可能會 失敗和/或重試,因此盡可能使外部互動具有冪等性是較佳的。
對於基於檔案的 sink,您可以使用 Java 和 Python SDK 提供的 FileBasedSink
抽象。Beam 的 FileSystems
公用程式類別對於讀取和寫入檔案也很有用。請參閱我們的語言特定實作指南以了解更多詳細資訊
上次更新於 2024/10/31
您是否找到了您要尋找的所有內容?
所有內容是否有用且清楚?您想更改任何內容嗎?請告訴我們!