為 Java 開發 I/O 連接器
重要:使用 Splittable DoFn
開發新的 I/O。如需更多詳細資訊,請閱讀新的 I/O 連接器概述。
若要連線到 Beam 現有 I/O 連接器不支援的資料儲存區,您必須建立自訂 I/O 連接器,該連接器通常包含來源和接收器。所有 Beam 來源和接收器都是複合轉換;然而,您的自訂 I/O 的實作取決於您的使用案例。在開始之前,請閱讀新的 I/O 連接器概述,以了解開發新 I/O 連接器、可用的實作選項,以及如何為您的使用案例選擇正確選項的概述。
本指南涵蓋如何使用 Java 的 Source
和 FileBasedSink
介面。Python SDK 提供相同的功能,但使用略有不同的 API。請參閱為 Python 開發 I/O 連接器,以取得特定於 Python SDK 的資訊。
基本程式碼需求
Beam 執行器使用您提供的類別,以平行方式使用多個工作執行個體讀取和/或寫入資料。因此,您為 Source
和 FileBasedSink
子類別提供的程式碼必須符合一些基本需求
可序列化性:您的
Source
或FileBasedSink
子類別,無論是有界或無界,都必須是可序列化的。執行器可能會建立多個Source
或FileBasedSink
子類別的執行個體,以傳送給多個遠端工作站,以便平行讀取或寫入。不可變性:您的
Source
或FileBasedSink
子類別必須是有效不可變的。所有私有欄位都必須宣告為 final,且所有集合類型的私有變數都必須是有效不可變的。如果您的類別有 setter 方法,這些方法必須傳回已修改相關欄位的物件的獨立副本。只有在您使用需要實作來源或接收器的昂貴運算的延遲評估時,才應在
Source
或FileBasedSink
子類別中使用可變狀態;在這種情況下,您必須宣告所有可變的執行個體變數為 transient。執行緒安全性:您的程式碼必須是執行緒安全的。如果您建置來源以與動態工作重新平衡搭配使用,則讓您的程式碼成為執行緒安全至關重要。Beam SDK 提供輔助類別來簡化此操作。如需更多詳細資訊,請參閱將您的 BoundedSource 與動態工作重新平衡搭配使用。
可測試性:徹底單元測試您的所有
Source
和FileBasedSink
子類別至關重要,尤其是當您建置類別以與動態工作重新平衡等進階功能搭配使用時。輕微的實作錯誤可能會導致難以偵測到的資料損毀或資料遺失 (例如跳過或重複記錄)。為了協助測試
BoundedSource
實作,您可以使用 SourceTestUtils 類別。SourceTestUtils
包含自動驗證BoundedSource
實作某些屬性的公用程式。您可以使用SourceTestUtils
,透過相對較少的程式碼行,使用各種輸入來提高實作的測試覆蓋率。如需使用SourceTestUtils
的範例,請參閱AvroSourceTest 和TextIOReadTest 原始碼。
此外,請參閱PTransform 樣式指南以取得 Beam 的轉換樣式指引。
實作 Source 介面
若要為您的管線建立資料來源,您必須提供格式特定的邏輯,告訴執行器如何從您的輸入來源讀取資料,以及如何將您的資料來源分割成多個部分,以便多個工作執行個體可以平行讀取您的資料。如果您要建立讀取無界資料的資料來源,您必須提供額外的邏輯來管理您來源的浮水印和選用的檢查點。
透過建立下列類別來提供您來源的邏輯
如果您想要讀取有限 (批次) 資料集,請建立
BoundedSource
的子類別;如果您想要讀取無限 (串流) 資料集,則請建立UnboundedSource
的子類別。這些子類別會描述您想要讀取的資料,包括資料的位置和參數 (例如,要讀取的資料量)。Source.Reader
的子類別。每個來源都必須有相關聯的 Reader,以擷取從該Source
讀取時所涉及的所有狀態。這可以包含檔案控制代碼、RPC 連線和其他取決於您要讀取之資料格式特定需求的參數。Reader
類別階層會鏡像 Source 階層。如果您正在擴充BoundedSource
,則需要提供相關聯的BoundedReader
。如果您正在擴充UnboundedSource
,則需要提供相關聯的UnboundedReader
。一或多個面向使用者的包裝複合轉換 (
PTransform
),會包裝讀取作業。PTransform 包裝函式討論為何您應避免公開您的來源。
實作 Source 子類別
您必須建立 BoundedSource
或 UnboundedSource
的子類別,具體取決於您的資料是有限批次還是無限串流。在任何一種情況下,您的 Source
子類別都必須覆寫超類別中的抽象方法。當使用您的資料來源時,執行器可能會呼叫這些方法。例如,當從有限來源讀取時,執行器會使用這些方法來估計您的資料集大小並將其分割以進行平行讀取。
您的 Source
子類別也應管理有關您資料來源的基本資訊,例如位置。例如,Beam 的 DatastoreIO 類別中的範例 Source
實作會將主機、datasetID 和查詢作為引數。連接器會使用這些值從 Cloud Datastore 取得資料。
BoundedSource
BoundedSource
代表一個有限的資料集,Beam 執行器可能會從中讀取,並且可能以平行方式進行。BoundedSource
包含一組抽象方法,執行器會使用這些方法來分割資料集,以便由多個工作程序讀取。
若要實作 BoundedSource
,您的子類別必須覆寫下列抽象方法:
split
:執行器會使用此方法將您的有限資料分割成給定大小的捆綁包。getEstimatedSizeBytes
:執行器會使用此方法來估計您的資料總大小,以位元組為單位。createReader
:為此BoundedSource
建立相關聯的BoundedReader
。
您可以在 Beam 針對 Cloud BigTable (BigtableIO.java) 和 BigQuery (BigQuerySourceBase.java) 的實作中,查看如何實作 BoundedSource
和所需抽象方法的模型。
UnboundedSource
UnboundedSource
代表執行器可能會從中讀取的無限資料串流,並且可能以平行方式進行。UnboundedSource
包含一組抽象方法,執行器會使用這些方法來支援平行串流讀取;這些方法包括用於失敗復原的檢查點、用於防止資料重複的記錄 ID,以及用於估計管線下游部分資料完整性的浮水印。
若要實作 UnboundedSource
,您的子類別必須覆寫下列抽象方法:
split
:執行器會使用此方法產生UnboundedSource
物件的清單,這些物件代表服務應從中平行讀取的子串流實例數量。getCheckpointMarkCoder
:執行器會使用此方法取得您來源檢查點的編碼器 (如果有的話)。requiresDeduping
:執行器會使用此方法來判斷資料是否需要明確移除重複記錄。如果此方法傳回 true,則執行器會自動插入一個步驟以從您來源的輸出中移除重複項。僅當您的來源為每個記錄提供記錄 ID 時,此方法才應傳回 true。請參閱UnboundedReader.getCurrentRecordId
,以了解何時應執行此操作。createReader
:為此UnboundedSource
建立相關聯的UnboundedReader
。
實作 Reader 子類別
您必須建立 BoundedReader
或 UnboundedReader
的子類別,以由您來源子類別的 createReader
方法傳回。執行器會使用您的 Reader
(無論是有界還是無界) 中的方法來實際讀取您的資料集。
BoundedReader
和 UnboundedReader
具有類似的基本介面,您需要定義這些介面。此外,還有一些 UnboundedReader
特有的其他方法,您需要實作這些方法才能處理無界資料,以及一個可選方法,如果您希望您的 BoundedReader
利用動態工作重新平衡,則可以實作該方法。當使用 UnboundedReader
時,start()
和 advance()
方法的語意也存在細微差異。
BoundedReader 和 UnboundedReader 共用的 Reader 方法
執行器會使用下列方法來讀取使用 BoundedReader
或 UnboundedReader
的資料:
start
:初始化Reader
並前進到要讀取的第一筆記錄。當執行器開始讀取您的資料時,此方法只會被呼叫一次,並且是放置初始化所需昂貴操作的好地方。advance
:將讀取器前進到下一個有效的記錄。如果沒有更多輸入可用,則此方法必須傳回 false。BoundedReader
在 advance 傳回 false 後應停止讀取,但UnboundedReader
可以在您的串流中有更多資料可用時,在未來的呼叫中傳回 true。getCurrent
:傳回目前位置的資料記錄,上次由 start 或 advance 讀取。getCurrentTimestamp
:傳回目前資料記錄的時間戳記。僅當您的來源讀取具有內在時間戳記的資料時,您才需要覆寫getCurrentTimestamp
。執行器會使用此值來設定產生的輸出PCollection
中每個元素的內在時間戳記。
UnboundedReader 獨有的 Reader 方法
除了基本的 Reader
介面之外,UnboundedReader
還有一些用於管理從無界資料來源讀取的其他方法:
getCurrentRecordId
:傳回目前記錄的唯一識別碼。執行器會使用這些記錄 ID 來篩選掉重複記錄。如果您的資料在每個記錄中都有邏輯 ID,您可以讓此方法傳回它們;否則,您可以使用至少 128 位元雜湊傳回記錄內容的雜湊。使用 Java 的Object.hashCode()
是不正確的,因為 32 位元雜湊通常不足以防止衝突,而且無法保證hasCode()
在跨程序之間保持穩定。如果您的來源使用唯一識別每個記錄的檢查點機制,則實作
getCurrentRecordId
是可選的。例如,如果您的分割是檔案,而檢查點是已讀取所有資料的檔案位置,則您不需要記錄 ID。但是,如果上游系統將資料寫入您的來源時偶爾會產生重複記錄,而您的來源可能會讀取這些記錄,則記錄 ID 仍然很有用。getWatermark
:傳回您的Reader
提供的浮水印。浮水印是您的Reader
要讀取的未來元素的近似時間戳記下限。執行器會使用浮水印來估計資料的完整性。浮水印用於視窗和觸發器中。getCheckpointMark
:執行器會使用此方法在您的資料串流中建立檢查點。檢查點表示UnboundedReader
的進度,可用於失敗復原。不同的資料串流可能會使用不同的檢查點方法;某些來源可能需要確認接收到的記錄,而其他來源可能會使用位置檢查點。您需要針對最合適的檢查點機制調整此方法。例如,您可能會讓此方法傳回最近已確認的記錄。getCheckpointMark
是可選的;如果您的資料沒有有意義的檢查點,則您不需要實作它。但是,如果您選擇不在您的來源中實作檢查點,則您的管線中可能會遇到重複的資料或資料遺失,具體取決於您的資料來源是否嘗試在發生錯誤時重新傳送記錄。
您可以透過在從您的來源讀取時指定 .withMaxNumRecords
或 .withMaxReadTime
,從 UnboundedSource
讀取有界的 PCollection
。.withMaxNumRecords
會從您的無界來源讀取固定的最大記錄數,而 .withMaxReadTime
會從您的無界來源讀取固定的最大時間長度。
將您的 BoundedSource 與動態工作重新平衡搭配使用
如果您的來源提供有界資料,您可以透過實作方法 splitAtFraction
,讓您的 BoundedReader
使用動態工作重新平衡。執行器可能會在給定讀取器上與 start 或 advance 同時呼叫 splitAtFraction
,以便可以分割您 Source
中的剩餘資料並將其重新分配給其他工作程序。
當您實作 splitAtFraction
時,您的程式碼必須產生一組互斥的分割,其中這些分割的聯集符合總資料集。
如果您實作 splitAtFraction
,則您必須以執行緒安全的方式實作 splitAtFraction
和 getFractionConsumed
,否則可能會發生資料遺失。您也應徹底單元測試您的實作,以避免資料重複或資料遺失。
為了確保您的程式碼是執行緒安全的,請在使用 splitAtFraction
和 getFractionConsumed
時,使用 RangeTracker
執行緒安全協助程式物件來管理資料來源中的位置。
我們強烈建議您使用 SourceTestUtils
類別來單元測試您的 splitAtFraction
實作。SourceTestUtils
包含許多用於測試您 splitAtFraction
實作的方法,包括詳盡的自動測試。
便利的 Source 和 Reader 基底類別
Beam SDK 包含一些方便的抽象基底類別,可協助您建立使用通用資料儲存格式 (例如檔案) 的 Source
和 Reader
類別。
FileBasedSource
如果您的資料來源使用檔案,您可以從 FileBasedSource
和 FileBasedReader
抽象基底類別衍生您的 Source
和 Reader
類別。FileBasedSource
是一個有限的來源子類別,它實作與檔案互動的 Beam 來源的通用程式碼,包括:
- 檔案模式擴充
- 循序記錄讀取
- 分割點
使用 FileBasedSink 抽象
如果您的資料來源使用檔案,您可以實作 FileBasedSink
抽象來建立基於檔案的接收器。對於其他接收器,請使用 ParDo
、GroupByKey
以及 Beam SDK for Java 提供的其他轉換。如需詳細資訊,請參閱開發 I/O 連接器概觀。
當使用 FileBasedSink
介面時,您必須提供格式特定的邏輯,告訴執行器如何將您的管線 PCollection
中的有界資料寫入輸出接收器。執行器會使用多個工作程序以平行方式寫入資料捆綁包。
透過實作下列類別,提供您基於檔案的接收器的邏輯:
抽象基底類別
FileBasedSink
的子類別。FileBasedSink
描述您的管線可以平行寫入的位置或資源。為了避免將您的接收器暴露給最終使用者,您的FileBasedSink
子類別應受到保護或設為私有。面向使用者的包裝
PTransform
,作為邏輯的一部分,它會呼叫 WriteFiles 並將您的FileBasedSink
作為參數傳遞。使用者不應需要直接呼叫WriteFiles
。
FileBasedSink
抽象基底類別實作 Beam 接收器與檔案互動的通用程式碼,包括:
- 設定檔案標頭和頁尾
- 循序記錄寫入
- 設定輸出 MIME 類型
FileBasedSink
及其子類別支援將檔案寫入任何 Beam 支援的 FileSystem
實作。請參閱以下 Beam 提供的 FileBasedSink
實作範例:
PTransform 包裝函式
當您建立終端使用者會使用的來源或接收器時,請避免暴露您的來源或接收器程式碼。為了避免將您的來源和接收器暴露給終端使用者,您新的類別應該設定為 protected 或 private。然後,實作一個使用者導向的包裝器 PTransform
。透過將您的來源或接收器以轉換的形式暴露,您的實作會被隱藏起來,並且可以任意地複雜或簡單。不暴露實作細節的最大好處是,您可以在不破壞使用者現有實作的情況下,稍後新增額外的功能。
例如,如果您的使用者管道使用 read
從您的來源讀取,並且您想在管道中插入一個重新分片 (reshard),則所有使用者都需要自行新增重新分片(使用 GroupByKey
轉換)。為了解決這個問題,我們建議您將來源公開為一個複合的 PTransform
,它會執行讀取操作和重新分片。
有關使用 PTransform
進行包裝的更多資訊,請參閱 Beam 的 PTransform 風格指南。
上次更新時間:2024/10/31
您是否找到了所有您需要的資訊?
內容是否都實用且清晰?您希望更改任何內容嗎?請告訴我們!