使用共享物件快取資料

快取是一種軟體元件,用於儲存資料,以便可以更快地服務未來對該資料的要求。若要存取快取,您可以使用側邊輸入、具狀態的 DoFn 和對外部服務的呼叫。Python SDK 在共享模組中提供另一種選項。此選項可能比側邊輸入更有效率,比具狀態的 DoFn 更簡單,而且比呼叫外部服務的效能更高,因為它不必為每個元素或元素組合存取外部服務。如需使用 Beam SDK 快取資料的策略的詳細資訊,請參閱 2022 年 Beam Summit 的會議 使用 Beam SDK 在 Dataflow 中快取資料的策略

本頁的範例示範如何在有界和無界 PCollection 物件中使用 shared 模組Shared 類別來擴充元素。範例中使用兩個資料集:ordercustomer。order 記錄包含客戶 ID,這些客戶 ID 會透過對應客戶記錄來新增客戶屬性。

在批次管道上建立快取

在此範例中,客戶快取會以字典形式載入 EnrichOrderFnsetup 方法中。快取用於將客戶屬性新增至 order 記錄。由於 Python 字典不支援弱參考,而且 Shared 物件會封裝對共享資源單例執行個體的弱參考,因此請建立包裝函式類別。

# The wrapper class is needed for a dictionary, because it does not support weak references.
class WeakRefDict(dict):
    pass

class EnrichOrderFn(beam.DoFn):
    def __init__(self):
        self._customers = {}
        self._shared_handle = shared.Shared()

    def setup(self):
        # setup is a good place to initialize transient in-memory resources.
        self._customer_lookup = self._shared_handle.acquire(self.load_customers)

    def load_customers(self):
        self._customers = expensive_remote_call_to_load_customers()
        return WeakRefDict(self._customers)

    def process(self, element):
        attr = self._customer_lookup.get(element["customer_id"], {})
        yield {**element, **attr}

在串流管道上建立快取並定期更新

由於假設客戶快取會隨著時間變更,因此您需要定期重新整理它。若要重新載入共享物件,請變更 acquire 方法的 tag 引數。在此範例中,重新整理會在 start_bundle 方法中實作,其中會將目前的標記值與與現有共享物件相關聯的值進行比較。set_tag 方法會傳回在最大陳舊秒數內相同的標記值。因此,如果標記值大於現有標記值,則會觸發重新整理客戶快取。

# The wrapper class is needed for a dictionary, because it does not support weak references.
class WeakRefDict(dict):
    pass

class EnrichOrderFn(beam.DoFn):
    def __init__(self):
        self._max_stale_sec = 60
        self._customers = {}
        self._shared_handle = shared.Shared()

    def setup(self):
        # setup is a good place to initialize transient in-memory resources.
        self._customer_lookup = self._shared_handle.acquire(
            self.load_customers, self.set_tag()
        )

    def set_tag(self):
        # A single tag value is returned within a period, which is upper-limited by the max stale second.
        current_ts = datetime.now().timestamp()
        return current_ts - (current_ts % self._max_stale_sec)

    def load_customers(self):
        # Assign the tag value of the current period for comparison.
        self._customers = expensive_remote_call_to_load_customers(tag=self.set_tag())
        return WeakRefDict(self._customers)

    def start_bundle(self):
        # Update the shared object when the current tag value exceeds the existing value.
        if self.set_tag() > self._customers["tag"]:
            self._customer_lookup = self._shared_handle.acquire(
                self.load_customers, self.set_tag()
            )

    def process(self, element):
        attr = self._customer_lookup.get(element["customer_id"], {})
        yield {**element, **attr}