為 Python 開發 I/O 連接器

重要:請使用 Splittable DoFn 開發新的 I/O。如需更多詳細資訊,請閱讀新的 I/O 連接器概述

若要連線到 Beam 現有 I/O 連接器不支援的資料儲存區,您必須建立自訂的 I/O 連接器,通常由來源和接收器組成。所有 Beam 來源和接收器都是複合轉換;但是,自訂 I/O 的實作取決於您的使用案例。在開始之前,請閱讀新的 I/O 連接器概述,以瞭解開發新的 I/O 連接器、可用的實作選項,以及如何為您的使用案例選擇正確的選項。

本指南涵蓋使用 Python 的Source 和 FileBasedSink 介面。Java SDK 提供相同的功能,但使用稍微不同的 API。如需 Java SDK 特有的資訊,請參閱為 Java 開發 I/O 連接器

基本程式碼需求

Beam 執行器使用您提供的類別,以平行方式使用多個工作執行個體來讀取和/或寫入資料。因此,您為 SourceFileBasedSink 子類別提供的程式碼必須符合一些基本需求

  1. 可序列化:您的 SourceFileBasedSink 子類別必須是可序列化的。服務可能會建立多個 SourceFileBasedSink 子類別的執行個體,並將其傳送至多個遠端工作者,以促進平行讀取或寫入。來源和接收器物件序列化的方式是執行器特定的。

  2. 不可變性:您的 SourceFileBasedSink 子類別必須實際上是不可變的。您只有在使用需要實作來源的昂貴計算的延遲評估時,才應該在 SourceFileBasedSink 子類別中使用可變狀態。

  3. 執行緒安全:您的程式碼必須是執行緒安全的。適用於 Python 的 Beam SDK 提供 RangeTracker 類別,以簡化此操作。

  4. 可測試性:詳盡地單元測試所有 SourceFileBasedSink 子類別至關重要。小錯誤的實作可能會導致資料損毀或資料遺失 (例如略過或重複記錄),而且很難偵測到。您可以使用source_test_utils 模組中提供的測試工具和公用程式方法,來開發來源測試。

此外,請參閱PTransform 樣式指南,以瞭解 Beam 的轉換樣式指引。

實作 Source 介面

若要為管道建立新的資料來源,您需要提供特定格式的邏輯,以告知服務如何從輸入來源讀取資料,以及如何將資料來源分割成多個部分,以便多個工作執行個體可以平行讀取資料。

透過建立下列類別來提供新來源的邏輯

您可以在apache_beam.io.iobase 模組中找到這些類別。

實作 BoundedSource 子類別

BoundedSource 代表服務讀取的有限資料集,可能會以平行方式讀取。BoundedSource 包含一組方法,服務會使用這些方法來分割資料集,以供多個遠端工作者讀取。

若要實作 BoundedSource,您的子類別必須覆寫下列方法

實作 RangeTracker 子類別

RangeTracker 是一個執行緒安全物件,用於管理 BoundedSource 讀取器的目前範圍和目前位置,並保護對它們的並行存取。

若要實作 RangeTracker,您應先熟悉下列定義

RangeTracker 方法

若要實作 RangeTracker,您的子類別必須覆寫下列方法

此方法會將目前範圍 [self.start_position, self.stop_position) 分割成「主要」部分 [self.start_position, split_position) 和「剩餘」部分 [split_position, self.stop_position),前提是 split_position 尚未被耗用。

如果 split_position 已被耗用,則此方法會傳回 None。否則,它會將目前範圍更新為主要範圍,並傳回一個元組 (split_position, split_fraction)。split_fraction 應為範圍 [self.start_position, split_position) 的大小與原始 (分割前) 範圍 [self.start_position, self.stop_position) 的大小相比的比例。

注意:類別 iobase.RangeTracker 的方法可能會由多個執行緒叫用,因此必須使此類別具有執行緒安全性,例如,使用單一鎖定物件。

方便的 Source 基底類別

適用於 Python 的 Beam SDK 包含一些方便的抽象基底類別,可協助您輕鬆建立新的來源。

FileBasedSource

FileBasedSource 是一個用於開發新檔案類型來源的架構。您可以從 FileBasedSource 類別衍生您的 BoundedSource 類別。

若要為新的檔案類型建立來源,您需要建立 FileBasedSource 的子類別。FileBasedSource 的子類別必須實作 FileBasedSource.read_records() 方法。

如需 FileBasedSource 的實作範例,請參閱 AvroSource

從新的 Source 讀取

下列範例 CountingSource 示範 BoundedSource 的實作,並使用 SDK 提供的 RangeTracker,稱為 OffsetRangeTracker

class CountingSource(iobase.BoundedSource):
  def __init__(self, count):
    self.records_read = Metrics.counter(self.__class__, 'recordsRead')
    self._count = count

  def estimate_size(self):
    return self._count

  def get_range_tracker(self, start_position, stop_position):
    if start_position is None:
      start_position = 0
    if stop_position is None:
      stop_position = self._count

    return OffsetRangeTracker(start_position, stop_position)

  def read(self, range_tracker):
    for i in range(range_tracker.start_position(),
                   range_tracker.stop_position()):
      if not range_tracker.try_claim(i):
        return
      self.records_read.inc()
      yield i

  def split(self, desired_bundle_size, start_position=None, stop_position=None):
    if start_position is None:
      start_position = 0
    if stop_position is None:
      stop_position = self._count

    bundle_start = start_position
    while bundle_start < stop_position:
      bundle_stop = min(stop_position, bundle_start + desired_bundle_size)
      yield iobase.SourceBundle(
          weight=(bundle_stop - bundle_start),
          source=self,
          start_position=bundle_start,
          stop_position=bundle_stop)
      bundle_start = bundle_stop

若要在您的管道中從來源讀取資料,請使用 Read 轉換

with beam.Pipeline() as pipeline:
  numbers = pipeline | 'ProduceNumbers' >> beam.io.Read(CountingSource(count))

注意:當您建立終端使用者將會使用的來源時,我們建議您不要像上述範例中示範的那樣公開來源本身的程式碼。請改用包裝的 PTransformPTransform 包裝函式討論了為什麼您應該避免公開來源,並逐步說明如何建立包裝函式。

使用 FileBasedSink 抽象化

如果您的資料來源使用檔案,您可以實作 FileBasedSink 抽象,以建立基於檔案的接收器。對於其他接收器,請使用 ParDoGroupByKey 和適用於 Python 的 Beam SDK 提供的其他轉換。如需更多詳細資訊,請參閱開發 I/O 連接器總覽

當使用 FileBasedSink 介面時,您必須提供格式特定的邏輯,以告知執行器如何將管道 PCollection 中的有界資料寫入輸出接收器。執行器會使用多個工作站平行寫入資料捆綁包。

藉由實作下列類別,為您的基於檔案的接收器提供邏輯

FileBasedSink 抽象基底類別會實作與和檔案互動的 Beam 接收器通用的程式碼,包括

FileBasedSink 及其子類別支援將檔案寫入任何 Beam 支援的 FileSystem 實作。如需範例,請參閱下列 Beam 提供的 FileBasedSink 實作

PTransform 包裝函式

當您建立終端使用者將會使用的來源或接收器時,請避免公開您的來源或接收器程式碼。為避免將您的來源和接收器公開給終端使用者,您的新類別應使用 _ 字首。然後,實作使用者面向的包裝函式 PTransform。藉由將您的來源或接收器公開為轉換,您的實作會被隱藏,而且可以任意複雜或簡單。不公開實作詳細資料的最大好處是,您稍後可以新增其他功能,而不會破壞使用者現有的實作。

例如,如果您的使用者的管道使用 beam.io.Read 從您的來源讀取,而您想要在管道中插入重新分割,則所有使用者都需要自行新增重新分割 (使用 GroupByKey 轉換)。為解決此問題,我們建議您將來源公開為執行讀取操作和重新分割的複合 PTransform

如需有關使用 PTransform 包裝的其他資訊,請參閱 Beam 的 PTransform 樣式指南

下列範例會變更上述章節中的來源和接收器,使其不會公開給終端使用者。對於來源,將 CountingSource 重新命名為 _CountingSource。然後,建立稱為 ReadFromCountingSource 的包裝函式 PTransform

class ReadFromCountingSource(PTransform):
  def __init__(self, count):
    super().__init__()
    self._count = count

  def expand(self, pcoll):
    return pcoll | iobase.Read(_CountingSource(self._count))

最後,從來源讀取

with beam.Pipeline() as pipeline:
  numbers = pipeline | 'ProduceNumbers' >> ReadFromCountingSource(count)

對於接收器,將 SimpleKVSink 重新命名為 _SimpleKVSink。然後,建立稱為 WriteToKVSink 的包裝函式 PTransform

class WriteToKVSink(PTransform):
  def __init__(self, simplekv, url, final_table_name):
    self._simplekv = simplekv
    super().__init__()
    self._url = url
    self._final_table_name = final_table_name

  def expand(self, pcoll):
    return pcoll | iobase.Write(
        _SimpleKVSink(self._simplekv, self._url, self._final_table_name))

最後,寫入接收器

with beam.Pipeline(options=PipelineOptions()) as pipeline:
  kvs = pipeline | 'CreateKVs' >> beam.core.Create(KVs)
  kvs | 'WriteToSimpleKV' >> WriteToKVSink(
      simplekv, 'http://url_to_simple_kv/', final_table_name)