為 Python 開發 I/O 連接器
重要:請使用 Splittable DoFn
開發新的 I/O。如需更多詳細資訊,請閱讀新的 I/O 連接器概述。
若要連線到 Beam 現有 I/O 連接器不支援的資料儲存區,您必須建立自訂的 I/O 連接器,通常由來源和接收器組成。所有 Beam 來源和接收器都是複合轉換;但是,自訂 I/O 的實作取決於您的使用案例。在開始之前,請閱讀新的 I/O 連接器概述,以瞭解開發新的 I/O 連接器、可用的實作選項,以及如何為您的使用案例選擇正確的選項。
本指南涵蓋使用 Python 的Source 和 FileBasedSink 介面。Java SDK 提供相同的功能,但使用稍微不同的 API。如需 Java SDK 特有的資訊,請參閱為 Java 開發 I/O 連接器。
基本程式碼需求
Beam 執行器使用您提供的類別,以平行方式使用多個工作執行個體來讀取和/或寫入資料。因此,您為 Source
和 FileBasedSink
子類別提供的程式碼必須符合一些基本需求
可序列化:您的
Source
或FileBasedSink
子類別必須是可序列化的。服務可能會建立多個Source
或FileBasedSink
子類別的執行個體,並將其傳送至多個遠端工作者,以促進平行讀取或寫入。來源和接收器物件序列化的方式是執行器特定的。不可變性:您的
Source
或FileBasedSink
子類別必須實際上是不可變的。您只有在使用需要實作來源的昂貴計算的延遲評估時,才應該在Source
或FileBasedSink
子類別中使用可變狀態。執行緒安全:您的程式碼必須是執行緒安全的。適用於 Python 的 Beam SDK 提供
RangeTracker
類別,以簡化此操作。可測試性:詳盡地單元測試所有
Source
和FileBasedSink
子類別至關重要。小錯誤的實作可能會導致資料損毀或資料遺失 (例如略過或重複記錄),而且很難偵測到。您可以使用source_test_utils 模組中提供的測試工具和公用程式方法,來開發來源測試。
此外,請參閱PTransform 樣式指南,以瞭解 Beam 的轉換樣式指引。
實作 Source 介面
若要為管道建立新的資料來源,您需要提供特定格式的邏輯,以告知服務如何從輸入來源讀取資料,以及如何將資料來源分割成多個部分,以便多個工作執行個體可以平行讀取資料。
透過建立下列類別來提供新來源的邏輯
BoundedSource
的子類別。BoundedSource
是一個來源,會讀取有限數量的輸入記錄。該類別描述您想要讀取的資料,包括資料的位置和參數 (例如要讀取多少資料)。RangeTracker
的子類別。RangeTracker
是一個執行緒安全物件,用於管理給定位置類型的範圍。- 一或多個面向使用者包裝函式複合轉換 (
PTransform
),用於包裝讀取作業。PTransform 包裝函式討論為何應該避免公開來源,並逐步說明如何建立包裝函式。
您可以在apache_beam.io.iobase 模組中找到這些類別。
實作 BoundedSource 子類別
BoundedSource
代表服務讀取的有限資料集,可能會以平行方式讀取。BoundedSource
包含一組方法,服務會使用這些方法來分割資料集,以供多個遠端工作者讀取。
若要實作 BoundedSource
,您的子類別必須覆寫下列方法
estimate_size
:服務會使用此方法來估計您的資料總大小,以位元組為單位。此估計是以外部儲存大小為準,在執行解壓縮或其他處理之前。split
:服務會使用此方法將您的有限資料分割成給定大小的捆綁包。get_range_tracker
:服務會使用此方法來取得給定位置範圍的RangeTracker
,並使用這些資訊來報告進度並執行來源的動態分割。read
:此方法會傳回一個迭代器,該迭代器會根據給定的RangeTracker
物件所定義的邊界,從來源讀取資料。
實作 RangeTracker 子類別
RangeTracker
是一個執行緒安全物件,用於管理 BoundedSource
讀取器的目前範圍和目前位置,並保護對它們的並行存取。
若要實作 RangeTracker
,您應先熟悉下列定義
基於位置的來源 - 基於位置的來源可以使用有序類型的位置範圍來描述,而來源讀取的記錄可以使用該類型的位置來描述。例如,對於檔案中的記錄,位置可以是記錄的起始位元組偏移量。在這種情況下,記錄的位置類型為
long
。基於位置的來源的主要要求是結合律:讀取位置範圍「[A, B)」中的記錄和讀取位置範圍「[B, C)」中的記錄,應與讀取位置範圍「[A, C)」中的記錄得到相同的記錄,其中「A」<=「B」<=「C」。此屬性確保無論將位置範圍分割成多少個任意子範圍,它們描述的記錄總集保持不變。
另一個重要的屬性是來源的範圍如何與來源中記錄的位置相關。在許多來源中,每筆記錄都可以使用唯一的起始位置來識別。在這種情況下
- 來源「[A, B)」傳回的所有記錄都必須在此範圍內具有起始位置。
- 除了最後一筆記錄之外,所有記錄都應在此範圍內結束。最後一筆記錄可能會或可能不會延伸超出範圍的末端。
- 記錄不得重疊。
此類來源應將「讀取『[A, B)』」定義為「從『A』或之後開始的第一筆記錄讀取,直到但不包括『B』或之後開始的第一筆記錄」。
此類來源的一些範例包括從文字檔案讀取行或 CSV、從資料庫讀取鍵和值等。
分割點的概念允許擴展定義,以處理某些記錄無法使用唯一起始位置識別的來源。
分割點 - 分割點描述的是當從位置 A (含) 到無限大 (即 [A, 無限大)) 讀取範圍時,傳回的第一筆記錄。
某些來源可能具有無法直接定址的記錄。例如,假設一種由一連串壓縮區塊組成的檔案格式。可以為每個區塊指派一個偏移量,但如果不解壓縮該區塊,則無法直接定址區塊內的記錄。讓我們將這種假設格式稱為 CBF (壓縮區塊格式)。
許多此類格式仍然可以滿足結合律。例如,在 CBF 中,讀取 [A, B) 可能表示「讀取起始偏移量在 [A, B) 中的所有區塊中的所有記錄」。
為了支援此類複雜格式,Beam 引入了分割點的概念。如果存在位置 A,使得當讀取範圍 [A, 無限大) 時,該記錄是第一個傳回的記錄,則該記錄是一個分割點。在 CBF 中,唯一的分割點將是每個區塊中的第一筆記錄。
分割點允許我們在下列情況中定義記錄的位置和來源範圍的意義
- 對於位於分割點的記錄,其位置定義為最大的 A,使得讀取範圍為 [A, 無限大) 的來源會傳回此記錄。
- 其他記錄的位置僅需為非遞減即可。
- 讀取來源 [A, B) 必須傳回從 A 或之後的第一個分割點開始的記錄,直到但不包括 B 或之後的第一個分割點。特別是,這表示來源傳回的第一筆記錄必須始終是一個分割點。
- 分割點的位置必須是唯一的。
因此,對於將來源的完整範圍分解為位置範圍的任何情況,記錄的總集將是來源中的完整記錄集,而且每筆記錄都將只讀取一次。
已耗用位置 - 已耗用位置是指已讀取的記錄。
當讀取來源時,而且從來源讀取的記錄會傳遞到管道中的下游轉換時,我們說來源中的位置正在被耗用。當讀取器已讀取一筆記錄 (或向呼叫者承諾將傳回一筆記錄) 時,將記錄的起始位置 (含) 之前的位置視為已耗用。
動態分割只能在未耗用的位置發生。如果讀取器剛傳回檔案中偏移量 42 的記錄,則動態分割只能在偏移量 43 或之後發生。否則,該記錄可能會被讀取兩次 (由目前的讀取器和新工作的讀取器讀取)。
RangeTracker 方法
若要實作 RangeTracker
,您的子類別必須覆寫下列方法
start_position
:傳回目前範圍的起始位置 (含)。stop_position
:傳回目前範圍的結束位置 (不含)。try_claim
:此方法用於判斷分割點的記錄是否在範圍內。此方法應透過將上次耗用的位置更新為來源讀取之記錄的給定起始position
來修改RangeTracker
的內部狀態。如果給定位置落在目前範圍內,則此方法傳回 true。set_current_position
:此方法會將上次耗用的位置更新為來源讀取之記錄的給定起始位置。您可以針對不在分割點開始的記錄呼叫此方法,而且這應修改RangeTracker
的內部狀態。如果記錄在分割點開始,則您必須呼叫try_claim
,而不是此方法。position_at_fraction
:給定範圍 [0.0, 1.0) 內的比例,此方法將傳回與位置範圍 [self.start_position
,self.stop_position
) 相比,給定比例的位置。try_split
:此方法會嘗試將目前範圍分割成建議位置附近的兩個部分。允許在不同的位置分割,但在大多數情況下,它將在建議的位置分割。
此方法會將目前範圍 [self.start_position
, self.stop_position
) 分割成「主要」部分 [self.start_position
, split_position
) 和「剩餘」部分 [split_position
, self.stop_position
),前提是 split_position
尚未被耗用。
如果 split_position
已被耗用,則此方法會傳回 None
。否則,它會將目前範圍更新為主要範圍,並傳回一個元組 (split_position
, split_fraction
)。split_fraction
應為範圍 [self.start_position
, split_position
) 的大小與原始 (分割前) 範圍 [self.start_position
, self.stop_position
) 的大小相比的比例。
fraction_consumed
:傳回來源中已耗用位置的大約比例。
注意:類別 iobase.RangeTracker
的方法可能會由多個執行緒叫用,因此必須使此類別具有執行緒安全性,例如,使用單一鎖定物件。
方便的 Source 基底類別
適用於 Python 的 Beam SDK 包含一些方便的抽象基底類別,可協助您輕鬆建立新的來源。
FileBasedSource
FileBasedSource
是一個用於開發新檔案類型來源的架構。您可以從 FileBasedSource 類別衍生您的 BoundedSource
類別。
若要為新的檔案類型建立來源,您需要建立 FileBasedSource
的子類別。FileBasedSource
的子類別必須實作 FileBasedSource.read_records()
方法。
如需 FileBasedSource
的實作範例,請參閱 AvroSource。
從新的 Source 讀取
下列範例 CountingSource
示範 BoundedSource
的實作,並使用 SDK 提供的 RangeTracker
,稱為 OffsetRangeTracker
。
class CountingSource(iobase.BoundedSource):
def __init__(self, count):
self.records_read = Metrics.counter(self.__class__, 'recordsRead')
self._count = count
def estimate_size(self):
return self._count
def get_range_tracker(self, start_position, stop_position):
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = self._count
return OffsetRangeTracker(start_position, stop_position)
def read(self, range_tracker):
for i in range(range_tracker.start_position(),
range_tracker.stop_position()):
if not range_tracker.try_claim(i):
return
self.records_read.inc()
yield i
def split(self, desired_bundle_size, start_position=None, stop_position=None):
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = self._count
bundle_start = start_position
while bundle_start < stop_position:
bundle_stop = min(stop_position, bundle_start + desired_bundle_size)
yield iobase.SourceBundle(
weight=(bundle_stop - bundle_start),
source=self,
start_position=bundle_start,
stop_position=bundle_stop)
bundle_start = bundle_stop
若要在您的管道中從來源讀取資料,請使用 Read
轉換
注意:當您建立終端使用者將會使用的來源時,我們建議您不要像上述範例中示範的那樣公開來源本身的程式碼。請改用包裝的 PTransform
。PTransform 包裝函式討論了為什麼您應該避免公開來源,並逐步說明如何建立包裝函式。
使用 FileBasedSink 抽象化
如果您的資料來源使用檔案,您可以實作 FileBasedSink 抽象,以建立基於檔案的接收器。對於其他接收器,請使用 ParDo
、GroupByKey
和適用於 Python 的 Beam SDK 提供的其他轉換。如需更多詳細資訊,請參閱開發 I/O 連接器總覽。
當使用 FileBasedSink
介面時,您必須提供格式特定的邏輯,以告知執行器如何將管道 PCollection
中的有界資料寫入輸出接收器。執行器會使用多個工作站平行寫入資料捆綁包。
藉由實作下列類別,為您的基於檔案的接收器提供邏輯
抽象基底類別
FileBasedSink
的子類別。FileBasedSink
描述您的管道可以平行寫入的位置或資源。為避免將接收器公開給終端使用者,請在建立FileBasedSink
子類別時使用_
字首。使用者面向的包裝函式
PTransform
,其會將Write
作為邏輯的一部分進行呼叫,並將您的FileBasedSink
作為參數傳遞。使用者不應需要直接呼叫Write
。
FileBasedSink
抽象基底類別會實作與和檔案互動的 Beam 接收器通用的程式碼,包括
- 設定檔案標頭和頁尾
- 循序記錄寫入
- 設定輸出 MIME 類型
FileBasedSink
及其子類別支援將檔案寫入任何 Beam 支援的 FileSystem
實作。如需範例,請參閱下列 Beam 提供的 FileBasedSink
實作
PTransform 包裝函式
當您建立終端使用者將會使用的來源或接收器時,請避免公開您的來源或接收器程式碼。為避免將您的來源和接收器公開給終端使用者,您的新類別應使用 _
字首。然後,實作使用者面向的包裝函式 PTransform
。藉由將您的來源或接收器公開為轉換,您的實作會被隱藏,而且可以任意複雜或簡單。不公開實作詳細資料的最大好處是,您稍後可以新增其他功能,而不會破壞使用者現有的實作。
例如,如果您的使用者的管道使用 beam.io.Read
從您的來源讀取,而您想要在管道中插入重新分割,則所有使用者都需要自行新增重新分割 (使用 GroupByKey
轉換)。為解決此問題,我們建議您將來源公開為執行讀取操作和重新分割的複合 PTransform
。
如需有關使用 PTransform
包裝的其他資訊,請參閱 Beam 的 PTransform 樣式指南。
下列範例會變更上述章節中的來源和接收器,使其不會公開給終端使用者。對於來源,將 CountingSource
重新命名為 _CountingSource
。然後,建立稱為 ReadFromCountingSource
的包裝函式 PTransform
最後,從來源讀取
對於接收器,將 SimpleKVSink
重新命名為 _SimpleKVSink
。然後,建立稱為 WriteToKVSink
的包裝函式 PTransform
class WriteToKVSink(PTransform):
def __init__(self, simplekv, url, final_table_name):
self._simplekv = simplekv
super().__init__()
self._url = url
self._final_table_name = final_table_name
def expand(self, pcoll):
return pcoll | iobase.Write(
_SimpleKVSink(self._simplekv, self._url, self._final_table_name))
最後,寫入接收器
上次更新時間為 2024/10/31
您找到您要找的所有內容了嗎?
這一切都有用且清晰嗎?您有任何想要變更的地方嗎?請告訴我們!