多模型管線

Apache Beam 可讓您開發多模型管線。此範例示範如何擷取和轉換輸入資料、透過模型執行,然後將第一個模型的結果傳遞到第二個模型。此頁面說明多模型管線的運作方式,並概述您需要知道的內容才能建立一個管線。

在閱讀本節之前,建議您熟悉管線開發生命週期中的資訊。

如何使用 Beam 建立多模型管線

典型的機器學習工作流程包含一系列資料轉換步驟,例如資料擷取、資料處理任務、推論和後處理。Apache Beam 可讓您將所有這些步驟一起協調,方法是將它們封裝在單一的 Apache Beam 有向非循環圖 (DAG) 中,讓您可以建立彈性且可擴充的端對端機器學習系統。

若要在 Apache Beam 管線中部署您的機器學習模型,請使用RunInferenceAPI,這有助於將您的模型整合為 DAG 中的 PTransform 步驟。在單一 DAG 中組合多個 RunInference 轉換,即可建立由多個 ML 模型組成的管線。透過這種方式,Apache Beam 支援複雜 ML 系統的開發。

您可以使用不同的模式在 Apache Beam 中建立多模型管線。此頁面將探討 A/B 模式和階層模式。

A/B 模式

A/B 模式描述一個多個 ML 模型平行運作的框架。此模式的一個應用是測試不同機器學習模型的效能,並判斷新模型是否優於現有模型。這也稱為「冠軍/挑戰者」方法。通常,您會定義一個業務指標來比較控制模型的效能與目前的模型。

一個例子可能是推薦引擎模型,您有一個現有模型可以根據使用者的偏好和活動歷史記錄推薦廣告。在決定部署新模型時,您可以將傳入的使用者流量分成兩個分支,其中一半的使用者會接觸到新模型,另一半則會接觸到目前的模型。

之後,您可以測量這兩組使用者在定義期間內廣告的平均點擊率 (CTR),以判斷新模型的效能是否優於現有模型。

import apache_beam as beam

with beam.Pipeline() as pipeline:
   userset_a_traffic, userset_b_traffic =
     (pipeline | 'ReadFromStream' >> beam.ReadFromStream('stream_source')
               | ‘Partition’ >> beam.partition(split_dataset, 2, ratio=[5, 5])
     )

model_a_predictions = userset_a_traffic | RunInference(<model_handler_A>)
model_b_predictions = userset_b_traffic | RunInference(<model_handler_B>)

其中 beam.partition 用於將資料來源分割成 50/50 的分割區。如需有關資料分割的詳細資訊,請參閱Partition

階層模式

當問題的解決方案涉及一系列 ML 模型時,會使用階層模式。在這種情況下,模型的輸出通常會使用 PTransform 轉換為合適的格式,然後再將其傳遞到另一個模型。

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('a_source')
   model_a_predictions = data | RunInference(<model_handler_A>)
   model_b_predictions = model_a_predictions | beam.ParDo(post_processing()) | RunInference(<model_handler_B>)

使用影像字幕和排名範例的集成模型筆記本顯示用於產生和排名影像字幕的階層管線的端對端範例。該解決方案包含兩個開放原始碼模型

  1. 字幕產生模型 (BLIP),可從輸入影像產生候選影像字幕。
  2. 字幕排名模型 (CLIP),使用影像和候選字幕來排名字幕,順序為最能描述影像的字幕。

使用多個不同訓練的模型

您可以使用 KeyedModelHandler 將多個不同的模型載入 RunInference 轉換。使用關聯的鍵來決定要將哪個模型用於哪個資料。下列範例會使用 config1 載入模型。該模型用於推論所有與 key1 關聯的範例。它會使用 config2 載入第二個模型。該模型用於所有與 key2key3 關聯的範例。

from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler([
  KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
  KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>))
])
with pipeline as p:
   data = p | beam.Create([
      ('key1', torch.tensor([[1,2,3],[4,5,6],...])),
      ('key2', torch.tensor([[1,2,3],[4,5,6],...])),
      ('key3', torch.tensor([[1,2,3],[4,5,6],...])),
   ])
   predictions = data | RunInference(keyed_model_handler)

如需更詳細的範例,請參閱筆記本 使用多個不同訓練模型執行 ML 推論

同時載入多個模型會增加記憶體不足錯誤 (OOMs) 的風險。 依預設,KeyedModelHandler 不會限制同時載入到記憶體中的模型數量。 如果模型無法全部放入記憶體中,您的管道可能會因記憶體不足錯誤而失敗。 若要避免此問題,請使用 max_models_per_worker_hint 參數設定可同時載入到記憶體中的最大模型數量。

以下範例在每個 SDK 工作站程序中,一次最多載入兩個模型。 它會卸載目前未使用的模型。

mhs = [
  KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
  KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>)),
  KeyModelMapping(['key4'], PytorchModelHandlerTensor(<config3>)),
  KeyModelMapping(['key5', 'key6', 'key7'], PytorchModelHandlerTensor(<config4>)),
]
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)

在特定機器上有多個 SDK 工作站程序的執行器,最多會在該機器上載入 max_models_per_worker_hint*<工作站程序數量> 個模型。

請為模型和其他轉換的額外記憶體需求保留足夠空間。 因為模型卸載後記憶體可能不會立即釋放,因此建議保留額外緩衝區。

注意:模型數量多但 max_models_per_worker_hint 值小時,可能會導致記憶體抖動,在這種情況下,大量執行時間會用於在記憶體中換入和換出模型。 為了減少記憶體抖動的可能性和影響,如果您使用的是分散式執行器,請在推論步驟之前插入 GroupByKey 轉換。 GroupByKey 轉換可確保具有相同鍵和模型的元素位於相同的工作站上,從而減少抖動。

如需更多資訊,請參閱 KeyedModelHander