使用共享物件快取資料
快取是一種軟體元件,用於儲存資料,以便可以更快地服務未來對該資料的要求。若要存取快取,您可以使用側邊輸入、具狀態的 DoFn
和對外部服務的呼叫。Python SDK 在共享模組中提供另一種選項。此選項可能比側邊輸入更有效率,比具狀態的 DoFn
更簡單,而且比呼叫外部服務的效能更高,因為它不必為每個元素或元素組合存取外部服務。如需使用 Beam SDK 快取資料的策略的詳細資訊,請參閱 2022 年 Beam Summit 的會議 使用 Beam SDK 在 Dataflow 中快取資料的策略。
本頁的範例示範如何在有界和無界 PCollection
物件中使用 shared 模組
的 Shared
類別來擴充元素。範例中使用兩個資料集:order 和 customer。order 記錄包含客戶 ID,這些客戶 ID 會透過對應客戶記錄來新增客戶屬性。
在批次管道上建立快取
在此範例中,客戶快取會以字典形式載入 EnrichOrderFn
的 setup
方法中。快取用於將客戶屬性新增至 order 記錄。由於 Python 字典不支援弱參考,而且 Shared
物件會封裝對共享資源單例執行個體的弱參考,因此請建立包裝函式類別。
# The wrapper class is needed for a dictionary, because it does not support weak references.
class WeakRefDict(dict):
pass
class EnrichOrderFn(beam.DoFn):
def __init__(self):
self._customers = {}
self._shared_handle = shared.Shared()
def setup(self):
# setup is a good place to initialize transient in-memory resources.
self._customer_lookup = self._shared_handle.acquire(self.load_customers)
def load_customers(self):
self._customers = expensive_remote_call_to_load_customers()
return WeakRefDict(self._customers)
def process(self, element):
attr = self._customer_lookup.get(element["customer_id"], {})
yield {**element, **attr}
在串流管道上建立快取並定期更新
由於假設客戶快取會隨著時間變更,因此您需要定期重新整理它。若要重新載入共享物件,請變更 acquire
方法的 tag
引數。在此範例中,重新整理會在 start_bundle
方法中實作,其中會將目前的標記值與與現有共享物件相關聯的值進行比較。set_tag
方法會傳回在最大陳舊秒數內相同的標記值。因此,如果標記值大於現有標記值,則會觸發重新整理客戶快取。
# The wrapper class is needed for a dictionary, because it does not support weak references.
class WeakRefDict(dict):
pass
class EnrichOrderFn(beam.DoFn):
def __init__(self):
self._max_stale_sec = 60
self._customers = {}
self._shared_handle = shared.Shared()
def setup(self):
# setup is a good place to initialize transient in-memory resources.
self._customer_lookup = self._shared_handle.acquire(
self.load_customers, self.set_tag()
)
def set_tag(self):
# A single tag value is returned within a period, which is upper-limited by the max stale second.
current_ts = datetime.now().timestamp()
return current_ts - (current_ts % self._max_stale_sec)
def load_customers(self):
# Assign the tag value of the current period for comparison.
self._customers = expensive_remote_call_to_load_customers(tag=self.set_tag())
return WeakRefDict(self._customers)
def start_bundle(self):
# Update the shared object when the current tag value exceeds the existing value.
if self.set_tag() > self._customers["tag"]:
self._customer_lookup = self._shared_handle.acquire(
self.load_customers, self.set_tag()
)
def process(self, element):
attr = self._customer_lookup.get(element["customer_id"], {})
yield {**element, **attr}
上次更新於 2024/10/31
您是否找到了您要找的所有內容?
是否一切都實用且清楚?您想要變更任何內容嗎?請告訴我們!