關於 Beam ML

Pydoc Pydoc




Javadoc Javadoc

您可以使用 Apache Beam 來

AI/ML 工作負載

您可以使用 Apache Beam 進行資料驗證、資料預先處理、模型驗證以及模型部署和推論。

Overview of AI/ML building blocks and where Apache Beam can be used

  1. 資料擷取:傳入的新資料會儲存在您的檔案系統或資料庫中,或是發佈到訊息佇列。
  2. 資料驗證:收到資料後,請檢查資料的品質。例如,您可能想要偵測離群值並計算標準差和類別分佈。
  3. 資料預先處理:驗證資料後,轉換資料,使其準備好用於訓練您的模型。
  4. 模型訓練:當您的資料準備好時,請訓練您的 AI/ML 模型。此步驟通常會重複多次,具體取決於您訓練的模型品質。
  5. 模型驗證:在您部署模型之前,請驗證其效能和準確度。
  6. 模型部署:部署您的模型,使用它對新的或現有的資料執行推論。

為了使您的模型保持最新狀態並在資料增長和發展時表現良好,請多次執行這些步驟。此外,您還可以將 ML ops 應用到您的專案中,以自動化整個模型和資料生命週期的 AI/ML 工作流程。使用協調器來自動化此流程並處理專案中不同建置區塊之間的轉換。

使用 RunInference

RunInference API 是一個針對機器學習推論進行最佳化的 PTransform,可讓您在管道中有效率地使用 ML 模型。此 API 包括以下功能

支援和限制

BatchElements PTransform

為了利用許多模型實作的向量化推論最佳化,BatchElements 轉換會用作對模型進行預測之前的中間步驟。此轉換會將元素批次處理在一起。然後將批次處理的元素與 RunInference 特定框架的轉換一起套用。例如,對於 numpy ndarrays,我們呼叫 numpy.stack(),而對於 torch Tensor 元素,我們呼叫 torch.stack()

若要自訂 beam.BatchElements 的設定,請在 ModelHandler 中覆寫 batch_elements_kwargs 函式。例如,使用 min_batch_size 來設定每個批次的最小元素數量,或使用 max_batch_size 來設定每個批次的最大元素數量。

更多資訊,請參閱 BatchElements 轉換的文件

共用協助程式類別

在 RunInference 實作中使用 Shared 類別,可以讓每個程序只載入模型一次,並與該程序中建立的所有 DoFn 實例共享。此功能可減少記憶體消耗和模型載入時間。更多資訊,請參閱 Shared 類別的文件

修改 Python 管道以使用 ML 模型

若要使用 RunInference 轉換,請將以下程式碼新增至您的管道

from apache_beam.ml.inference.base import RunInference
with pipeline as p:
   predictions = ( p |  'Read' >> beam.ReadFromSource('a_source')
                     | 'RunInference' >> RunInference(<model_handler>)

model_handler 替換為模型處理程式設定程式碼。

若要匯入模型,您需要設定一個 ModelHandler 物件,該物件會封裝底層模型。您匯入哪個模型處理程式取決於框架和包含輸入的資料結構類型。ModelHandler 物件還允許您使用 env_vars 關鍵字引數設定推論所需的環境變數。以下範例顯示一些您可能想要匯入的模型處理程式。

from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor
from tfx_bsl.public.beam.run_inference import CreateModelHandler

使用預先訓練的模型

本節提供搭配 PyTorch、Scikit-learn 和 TensorFlow 使用預訓練模型的需求。

PyTorch

您需要提供一個檔案路徑,該檔案包含模型的已儲存權重。此路徑必須可供管道存取。若要搭配 RunInference API 和 PyTorch 框架使用預訓練模型,請完成以下步驟

  1. 下載預訓練權重,並將其託管在管道可以存取的位置。
  2. 透過使用以下程式碼將模型權重路徑傳遞給 PyTorch ModelHandlerstate_dict_path=<權重路徑>

請參閱 此筆記本,其中說明如何使用 Apache Beam 執行 PyTorch 模型。

Scikit-learn

您需要提供一個檔案路徑,該檔案包含已 pickle 的 Scikit-learn 模型。此路徑必須可供管道存取。若要搭配 RunInference API 和 Scikit-learn 框架使用預訓練模型,請完成以下步驟

  1. 下載已 pickle 的模型類別,並將其託管在管道可以存取的位置。
  2. 透過使用以下程式碼將模型路徑傳遞給 Sklearn ModelHandlermodel_uri=<已 pickle 檔案路徑>model_file_type: <ModelFileType>,您可以在其中指定 ModelFileType.PICKLEModelFileType.JOBLIB,具體取決於模型的序列化方式。

請參閱 此筆記本,其中說明如何使用 Apache Beam 執行 Scikit-learn 模型。

TensorFlow

若要搭配 RunInference API 使用 TensorFlow,您有兩個選項

  1. 使用 Apache Beam SDK 中的內建 TensorFlow 模型處理程式 - TFModelHandlerNumpyTFModelHandlerTensor
    • 根據模型輸入的類型,分別針對 numpy 輸入使用 TFModelHandlerNumpy,針對 tf.Tensor 輸入使用 TFModelHandlerTensor
    • 使用 tensorflow 2.7 或更高版本。
    • 透過使用 model_uri=<已訓練模型路徑> 將模型路徑傳遞給 TensorFlow ModelHandler
    • 或者,您可以傳遞已訓練模型已儲存權重的路徑、使用 create_model_fn=<function> 建置模型的函式,並設定 model_type=ModelType.SAVED_WEIGHTS。請參閱 此筆記本,其中說明如何使用內建模型處理程式執行 Tensorflow 模型。
  2. 使用 tfx_bsl
    • 如果您的模型輸入類型為 tf.Example,請使用此方法。
    • 使用 tfx_bsl 1.10.0 或更高版本。
    • 使用 tfx_bsl.public.beam.run_inference.CreateModelHandler() 建立模型處理程式。
    • 將模型處理程式與 apache_beam.ml.inference.base.RunInference 轉換搭配使用。請參閱 此筆記本,其中說明如何使用 Apache Beam 和 tfx-bsl 執行 TensorFlow 模型。

使用自訂模型

如果您想要使用未由任何支援的框架指定的模型,RunInference API 的設計相當彈性,可讓您使用任何自訂機器學習模型。您只需要建立自己的 ModelHandlerKeyedModelHandler,其中包含載入模型並使用它來執行推論的邏輯即可。

您可以在 此筆記本中找到一個簡單的範例。load_model 方法顯示如何使用常用的 spaCy 套件載入模型,而 run_inference 則顯示如何在批次範例中執行推論。

RunInference 模式

本節提供一些模式和最佳實務,您可以使用這些模式和最佳實務來讓您的推論管道更簡單、更穩健且更有效率。

使用索引鍵模型處理常式物件

如果範例附加了索引鍵,請將 KeyedModelHandler 包裝在 ModelHandler 物件周圍

from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler(PytorchModelHandlerTensor(...))
with pipeline as p:
   data = p | beam.Create([
      ('img1', torch.tensor([[1,2,3],[4,5,6],...])),
      ('img2', torch.tensor([[1,2,3],[4,5,6],...])),
      ('img3', torch.tensor([[1,2,3],[4,5,6],...])),
   ])
   predictions = data | RunInference(keyed_model_handler)

如果您不確定您的資料是否已建立索引鍵,可以使用 MaybeKeyedModelHandler

您也可以使用 KeyedModelHandler 根據其關聯的索引鍵載入多個不同的模型。以下範例使用 config1 載入模型。該模型用於與 key1 關聯的所有範例的推論。它使用 config2 載入第二個模型。該模型用於與 key2key3 關聯的所有範例。

from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler([
  KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
  KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>))
])
with pipeline as p:
   data = p | beam.Create([
      ('key1', torch.tensor([[1,2,3],[4,5,6],...])),
      ('key2', torch.tensor([[1,2,3],[4,5,6],...])),
      ('key3', torch.tensor([[1,2,3],[4,5,6],...])),
   ])
   predictions = data | RunInference(keyed_model_handler)

如需更詳細的範例,請參閱筆記本 使用多個經過不同訓練的模型執行 ML 推論

同時載入多個模型會增加記憶體不足錯誤 (OOM) 的風險。依預設,KeyedModelHandler 不會限制同時載入到記憶體中的模型數量。如果模型無法全部放入記憶體,您的管道可能會因記憶體不足錯誤而失敗。若要避免此問題,請使用 max_models_per_worker_hint 參數設定可同時載入到記憶體中的最大模型數。

以下範例每次最多在每個 SDK 工作站程序中載入兩個模型。它會卸載目前未使用的模型。

mhs = [
  KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
  KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>)),
  KeyModelMapping(['key4'], PytorchModelHandlerTensor(<config3>)),
  KeyModelMapping(['key5', 'key6', 'key7'], PytorchModelHandlerTensor(<config4>)),
]
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)

在特定機器上有多個 SDK 工作站程序的執行器,最多會將 max_models_per_worker_hint*<num worker processes> 個模型載入到機器上。

為模型和來自其他轉換的任何額外記憶體需求留下足夠的空間。因為記憶體可能不會在模型卸載後立即釋放,建議保留額外的緩衝區。

注意:擁有許多模型但 max_models_per_worker_hint 很小可能會導致記憶體抖動,其中大量執行時間用於在記憶體中交換模型。若要減少記憶體抖動的可能性和影響,如果您使用的是分散式執行器,請在推論步驟之前插入 GroupByKey 轉換。GroupByKey 轉換可確保具有相同索引鍵和模型的元素共置在同一個工作站上,進而減少抖動。

更多資訊,請參閱 KeyedModelHander

使用 PredictionResult 物件

在 Apache Beam 中進行預測時,輸出 PCollection 同時包含輸入範例的索引鍵和推論。在輸出中包含這兩個項目可讓您找到決定預測的輸入。

PredictionResult 物件是一個 NamedTuple,其中包含輸入和推論,分別命名為 exampleinference。當索引鍵與輸入資料一起傳遞到 RunInference 轉換時,輸出 PCollection 會傳回一個 Tuple[str, PredictionResult],即索引鍵和 PredictionResult 物件。您的管道會在 RunInference 轉換後的步驟中與 PredictionResult 物件互動。

class PostProcessor(beam.DoFn):
    def process(self, element: Tuple[str, PredictionResult]):
       key, prediction_result = element
       inputs = prediction_result.example
       predictions = prediction_result.inference

       # Post-processing logic
       result = ...

       yield (key, result)

with pipeline as p:
    output = (
        p | 'Read' >> beam.ReadFromSource('a_source')
                | 'PyTorchRunInference' >> RunInference(<keyed_model_handler>)
                | 'ProcessOutput' >> beam.ParDo(PostProcessor()))

如果您需要明確使用此物件,請在您的管道中加入以下程式碼行以匯入該物件

from apache_beam.ml.inference.base import PredictionResult

更多資訊,請參閱 PredictionResult 文件

自動模型重新整理

若要自動更新 RunInference PTransform 使用的模型而無需停止管道,請將 ModelMetadata 側輸入 PCollection 傳遞給 RunInference 輸入參數 model_metadata_pcoll

ModelMetdata 是一個 NamedTuple,其中包含

使用案例

側輸入 PCollection 必須遵循 AsSingleton 檢視,以避免錯誤。

注意:如果主 PCollection 發出輸入,且側輸入尚未接收輸入,則會緩衝主 PCollection,直到側輸入更新為止。當全域視窗側輸入具有資料驅動的觸發條件(例如 AfterCountAfterProcessingTime)時,可能會發生這種情況。在側輸入更新之前,請發出用於傳遞各自的 ModelHandler 作為側輸入的預設或初始模型 ID。

預先處理和後處理您的記錄

使用 RunInference,您可以將前處理和後處理操作新增至您的轉換。若要套用前處理操作,請在您的模型處理程式上使用 with_preprocess_fn

inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))

若要套用後處理操作,請在您的模型處理程式上使用 with_postprocess_fn

inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))

您也可以鏈結多個前處理和後處理操作

inference = pcoll | RunInference(
    model_handler.with_preprocess_fn(
      lambda x : do_something(x)
    ).with_preprocess_fn(
      lambda x : do_something_else(x)
    ).with_postprocess_fn(
      lambda x : do_something_after_inference(x)
    ).with_postprocess_fn(
      lambda x : do_something_else_after_inference(x)
    ))

前處理函式會在批次處理和推論之前執行。此函式會將您的輸入 PCollection 對應至模型處理程式的基本輸入類型。如果您套用多個前處理函式,它們會依據最後套用到第一個套用的順序在您的原始 PCollection 上執行。

後處理函式會在推論之後執行。此函式會將基本模型處理程式的輸出類型對應至您想要的輸出類型。如果您套用多個後處理函式,它們會依據第一個套用到最後一個套用的順序在您的原始推論結果上執行。

處理錯誤

為了在使用 RunInference 時能穩健地處理錯誤,您可以使用死信佇列。死信佇列會將失敗的記錄輸出到一個單獨的 PCollection 中,以便進一步處理。然後,可以分析這個 PCollection 並將其發送到儲存系統,以便在那裡進行審查並重新提交到管道,或將其丟棄。RunInference 內建支援死信佇列。您可以將 with_exception_handling 應用於您的 RunInference 轉換來使用死信佇列。

main, other = pcoll | RunInference(model_handler).with_exception_handling()
other.failed_inferences | beam.Map(print) # insert logic to handle failed records here

您也可以將此模式應用於具有相關預處理和後處理操作的 RunInference 轉換。

main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()
other.failed_preprocessing[0] | beam.Map(print) # handles failed preprocess operations, indexed in the order in which they were applied
other.failed_inferences | beam.Map(print) # handles failed inferences
other.failed_postprocessing[0] | beam.Map(print) # handles failed postprocess operations, indexed in the order in which they were applied

從 Java 管道執行推論

RunInference API 可透過 Apache Beam 的多語言管道框架,在 Beam Java SDK 2.41.0 及更新版本中使用。如需 Java 包裝轉換的相關資訊,請參閱 RunInference.java。若要試用,請參閱Java Sklearn Mnist 分類範例。此外,請參閱從 Java SDK 使用 RunInference,以了解使用 RunInference API 以及來自 Beam Java SDK 管道的預處理和後處理的複合 Python 轉換範例。

自訂推論

RunInference API 目前不支援使用遠端推論呼叫,例如 Natural Language API 或 Cloud Vision API。因此,若要將這些遠端 API 與 Apache Beam 搭配使用,您需要編寫自訂的推論呼叫。Apache Beam 筆記本中的遠端推論展示如何使用 beam.DoFn 實作自訂遠端推論呼叫。當您為真實專案實作遠端推論時,請考慮以下因素:

多模型管道

使用 RunInference 轉換將多個推論模型新增到您的管道。多模型管道對於 A/B 測試或建構由執行權杖化、句子分割、詞性標記、命名實體提取、語言偵測、共指解析等模型組成的級聯模型非常有用。如需更多資訊,請參閱多模型管道

A/B 模式

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('a_source')
   model_a_predictions = data | RunInference(<model_handler_A>)
   model_b_predictions = data | RunInference(<model_handler_B>)

其中 model_handler_Amodel_handler_B 是模型處理程式設定程式碼。

串聯模式

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('a_source')
   model_a_predictions = data | RunInference(<model_handler_A>)
   model_b_predictions = model_a_predictions | beam.Map(some_post_processing) | RunInference(<model_handler_B>)

其中 model_handler_Amodel_handler_B 是模型處理程式設定程式碼。

針對不同的模型需求使用資源提示

當在單一管道中使用多個模型時,不同的模型可能具有不同的記憶體或工作者 SKU 需求。資源提示可讓您向執行器提供有關管道中每個步驟的運算資源需求資訊。

例如,以下程式碼片段透過每個 RunInference 呼叫的提示來擴充先前的級聯模式,以指定 RAM 和硬體加速器需求。

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('a_source')
   model_a_predictions = data | RunInference(<model_handler_A>).with_resource_hints(min_ram="20GB")
   model_b_predictions = model_a_predictions
      | beam.Map(some_post_processing)
      | RunInference(<model_handler_B>).with_resource_hints(
         min_ram="4GB",
         accelerator="type:nvidia-tesla-k80;count:1;install-nvidia-driver")

如需有關資源提示的更多資訊,請參閱資源提示

模型驗證

模型驗證可讓您根據先前未見過的資料集來基準化模型的效能。您可以提取選定的指標、建立視覺化、記錄中繼資料,並比較不同模型的效能,最終目標是驗證您的模型是否準備好部署。Beam 提供在您的管道中直接對 TensorFlow 模型執行模型評估的支援。

ML 模型評估頁面展示如何使用TensorFlow 模型分析 (TFMA)將模型評估整合為管道的一部分。

疑難排解

如果您的管道或作業遇到問題,本節會列出您可能會遇到的問題,並提供如何修復這些問題的建議。

無法批次處理張量元素

RunInference 使用動態批次處理。但是,RunInference API 無法批次處理不同大小的張量元素,因此傳遞給 RunInference 轉換的樣本必須具有相同的維度或長度。如果您提供不同大小的影像或不同長度的文字嵌入,可能會發生以下錯誤:

File "/beam/sdks/python/apache_beam/ml/inference/pytorch_inference.py", line 232, in run_inference batched_tensors = torch.stack(key_to_tensor_list[key]) RuntimeError: stack expects each tensor to be equal size, but got [12] at entry 0 and [10] at entry 1 [while running 'PyTorchRunInference/ParDo(_RunInferenceDoFn)']

若要避免此問題,請使用相同大小的元素,或停用批次處理。

選項 1:使用相同大小的元素

使用相同大小的元素或調整輸入大小。對於電腦視覺應用程式,調整影像輸入的大小,使其具有相同的維度。對於具有不同長度文字的自然語言處理 (NLP) 應用程式,調整文字或文字嵌入的大小,使其具有相同的長度。當處理不同長度的文字時,可能無法調整大小。在這種情況下,您可以停用批次處理(請參閱選項 2)。

選項 2:停用批次處理

透過覆寫 ModelHandler 中的 batch_elements_kwargs 函數,並將最大批次大小 (max_batch_size) 設定為 1 來停用批次處理:max_batch_size=1。如需更多資訊,請參閱BatchElements PTransforms。如需範例,請參閱我們的語言建模範例

Pydoc Pydoc




Javadoc Javadoc