為 Java 開發 I/O 連接器

重要:使用 Splittable DoFn 開發新的 I/O。如需更多詳細資訊,請閱讀新的 I/O 連接器概述

若要連線到 Beam 現有 I/O 連接器不支援的資料儲存區,您必須建立自訂 I/O 連接器,該連接器通常包含來源和接收器。所有 Beam 來源和接收器都是複合轉換;然而,您的自訂 I/O 的實作取決於您的使用案例。在開始之前,請閱讀新的 I/O 連接器概述,以了解開發新 I/O 連接器、可用的實作選項,以及如何為您的使用案例選擇正確選項的概述。

本指南涵蓋如何使用 Java 的 SourceFileBasedSink 介面。Python SDK 提供相同的功能,但使用略有不同的 API。請參閱為 Python 開發 I/O 連接器,以取得特定於 Python SDK 的資訊。

基本程式碼需求

Beam 執行器使用您提供的類別,以平行方式使用多個工作執行個體讀取和/或寫入資料。因此,您為 SourceFileBasedSink 子類別提供的程式碼必須符合一些基本需求

  1. 可序列化性:您的 SourceFileBasedSink 子類別,無論是有界或無界,都必須是可序列化的。執行器可能會建立多個 SourceFileBasedSink 子類別的執行個體,以傳送給多個遠端工作站,以便平行讀取或寫入。

  2. 不可變性:您的 SourceFileBasedSink 子類別必須是有效不可變的。所有私有欄位都必須宣告為 final,且所有集合類型的私有變數都必須是有效不可變的。如果您的類別有 setter 方法,這些方法必須傳回已修改相關欄位的物件的獨立副本。

    只有在您使用需要實作來源或接收器的昂貴運算的延遲評估時,才應在 SourceFileBasedSink 子類別中使用可變狀態;在這種情況下,您必須宣告所有可變的執行個體變數為 transient。

  3. 執行緒安全性:您的程式碼必須是執行緒安全的。如果您建置來源以與動態工作重新平衡搭配使用,則讓您的程式碼成為執行緒安全至關重要。Beam SDK 提供輔助類別來簡化此操作。如需更多詳細資訊,請參閱將您的 BoundedSource 與動態工作重新平衡搭配使用

  4. 可測試性:徹底單元測試您的所有 SourceFileBasedSink 子類別至關重要,尤其是當您建置類別以與動態工作重新平衡等進階功能搭配使用時。輕微的實作錯誤可能會導致難以偵測到的資料損毀或資料遺失 (例如跳過或重複記錄)。

    為了協助測試 BoundedSource 實作,您可以使用 SourceTestUtils 類別。SourceTestUtils 包含自動驗證 BoundedSource 實作某些屬性的公用程式。您可以使用 SourceTestUtils,透過相對較少的程式碼行,使用各種輸入來提高實作的測試覆蓋率。如需使用 SourceTestUtils 的範例,請參閱AvroSourceTestTextIOReadTest 原始碼。

此外,請參閱PTransform 樣式指南以取得 Beam 的轉換樣式指引。

實作 Source 介面

若要為您的管線建立資料來源,您必須提供格式特定的邏輯,告訴執行器如何從您的輸入來源讀取資料,以及如何將您的資料來源分割成多個部分,以便多個工作執行個體可以平行讀取您的資料。如果您要建立讀取無界資料的資料來源,您必須提供額外的邏輯來管理您來源的浮水印和選用的檢查點。

透過建立下列類別來提供您來源的邏輯

實作 Source 子類別

您必須建立 BoundedSourceUnboundedSource 的子類別,具體取決於您的資料是有限批次還是無限串流。在任何一種情況下,您的 Source 子類別都必須覆寫超類別中的抽象方法。當使用您的資料來源時,執行器可能會呼叫這些方法。例如,當從有限來源讀取時,執行器會使用這些方法來估計您的資料集大小並將其分割以進行平行讀取。

您的 Source 子類別也應管理有關您資料來源的基本資訊,例如位置。例如,Beam 的 DatastoreIO 類別中的範例 Source 實作會將主機、datasetID 和查詢作為引數。連接器會使用這些值從 Cloud Datastore 取得資料。

BoundedSource

BoundedSource 代表一個有限的資料集,Beam 執行器可能會從中讀取,並且可能以平行方式進行。BoundedSource 包含一組抽象方法,執行器會使用這些方法來分割資料集,以便由多個工作程序讀取。

若要實作 BoundedSource,您的子類別必須覆寫下列抽象方法:

您可以在 Beam 針對 Cloud BigTable (BigtableIO.java) 和 BigQuery (BigQuerySourceBase.java) 的實作中,查看如何實作 BoundedSource 和所需抽象方法的模型。

UnboundedSource

UnboundedSource 代表執行器可能會從中讀取的無限資料串流,並且可能以平行方式進行。UnboundedSource 包含一組抽象方法,執行器會使用這些方法來支援平行串流讀取;這些方法包括用於失敗復原的檢查點、用於防止資料重複的記錄 ID,以及用於估計管線下游部分資料完整性的浮水印

若要實作 UnboundedSource,您的子類別必須覆寫下列抽象方法:

實作 Reader 子類別

您必須建立 BoundedReaderUnboundedReader 的子類別,以由您來源子類別的 createReader 方法傳回。執行器會使用您的 Reader (無論是有界還是無界) 中的方法來實際讀取您的資料集。

BoundedReaderUnboundedReader 具有類似的基本介面,您需要定義這些介面。此外,還有一些 UnboundedReader 特有的其他方法,您需要實作這些方法才能處理無界資料,以及一個可選方法,如果您希望您的 BoundedReader 利用動態工作重新平衡,則可以實作該方法。當使用 UnboundedReader 時,start()advance() 方法的語意也存在細微差異。

BoundedReader 和 UnboundedReader 共用的 Reader 方法

執行器會使用下列方法來讀取使用 BoundedReaderUnboundedReader 的資料:

UnboundedReader 獨有的 Reader 方法

除了基本的 Reader 介面之外,UnboundedReader 還有一些用於管理從無界資料來源讀取的其他方法:

您可以透過在從您的來源讀取時指定 .withMaxNumRecords.withMaxReadTime,從 UnboundedSource 讀取有界的 PCollection.withMaxNumRecords 會從您的無界來源讀取固定的最大記錄數,而 .withMaxReadTime 會從您的無界來源讀取固定的最大時間長度。

將您的 BoundedSource 與動態工作重新平衡搭配使用

如果您的來源提供有界資料,您可以透過實作方法 splitAtFraction,讓您的 BoundedReader 使用動態工作重新平衡。執行器可能會在給定讀取器上與 start 或 advance 同時呼叫 splitAtFraction,以便可以分割您 Source 中的剩餘資料並將其重新分配給其他工作程序。

當您實作 splitAtFraction 時,您的程式碼必須產生一組互斥的分割,其中這些分割的聯集符合總資料集。

如果您實作 splitAtFraction,則您必須以執行緒安全的方式實作 splitAtFractiongetFractionConsumed,否則可能會發生資料遺失。您也應徹底單元測試您的實作,以避免資料重複或資料遺失。

為了確保您的程式碼是執行緒安全的,請在使用 splitAtFractiongetFractionConsumed 時,使用 RangeTracker 執行緒安全協助程式物件來管理資料來源中的位置。

我們強烈建議您使用 SourceTestUtils 類別來單元測試您的 splitAtFraction 實作。SourceTestUtils 包含許多用於測試您 splitAtFraction 實作的方法,包括詳盡的自動測試。

便利的 Source 和 Reader 基底類別

Beam SDK 包含一些方便的抽象基底類別,可協助您建立使用通用資料儲存格式 (例如檔案) 的 SourceReader 類別。

FileBasedSource

如果您的資料來源使用檔案,您可以從 FileBasedSourceFileBasedReader 抽象基底類別衍生您的 SourceReader 類別。FileBasedSource 是一個有限的來源子類別,它實作與檔案互動的 Beam 來源的通用程式碼,包括:

使用 FileBasedSink 抽象

如果您的資料來源使用檔案,您可以實作 FileBasedSink 抽象來建立基於檔案的接收器。對於其他接收器,請使用 ParDoGroupByKey 以及 Beam SDK for Java 提供的其他轉換。如需詳細資訊,請參閱開發 I/O 連接器概觀

當使用 FileBasedSink 介面時,您必須提供格式特定的邏輯,告訴執行器如何將您的管線 PCollection 中的有界資料寫入輸出接收器。執行器會使用多個工作程序以平行方式寫入資料捆綁包。

透過實作下列類別,提供您基於檔案的接收器的邏輯:

FileBasedSink 抽象基底類別實作 Beam 接收器與檔案互動的通用程式碼,包括:

FileBasedSink 及其子類別支援將檔案寫入任何 Beam 支援的 FileSystem 實作。請參閱以下 Beam 提供的 FileBasedSink 實作範例:

PTransform 包裝函式

當您建立終端使用者會使用的來源或接收器時,請避免暴露您的來源或接收器程式碼。為了避免將您的來源和接收器暴露給終端使用者,您新的類別應該設定為 protected 或 private。然後,實作一個使用者導向的包裝器 PTransform。透過將您的來源或接收器以轉換的形式暴露,您的實作會被隱藏起來,並且可以任意地複雜或簡單。不暴露實作細節的最大好處是,您可以在不破壞使用者現有實作的情況下,稍後新增額外的功能。

例如,如果您的使用者管道使用 read 從您的來源讀取,並且您想在管道中插入一個重新分片 (reshard),則所有使用者都需要自行新增重新分片(使用 GroupByKey 轉換)。為了解決這個問題,我們建議您將來源公開為一個複合的 PTransform,它會執行讀取操作和重新分片。

有關使用 PTransform 進行包裝的更多資訊,請參閱 Beam 的 PTransform 風格指南