在 Beam 中進行大型語言模型推論

在 Apache Beam 2.40.0 中,Beam 引入了 RunInference API,可讓您在 Beam 管道中部署機器學習模型。RunInference 轉換會使用機器學習 (ML) 模型對範例的 PCollection 執行推論。該轉換會輸出包含輸入範例和輸出預測的 PCollection。如需詳細資訊,請參閱此處的 RunInference 這裡。您也可以在 GitHub 上找到推論範例

將 RunInference 與非常大的模型搭配使用

只要它們可以安裝在您的硬體上,RunInference 就可以在任意大小的模型上正常運作。

記憶體管理

RunInference 具有數種機制可減少記憶體使用量。例如,依預設,RunInference 在每個程序中最多載入每個模型的一個副本 (而不是每個執行緒一個副本)。

然而,許多 Beam 執行器會同時在每台機器上執行多個 Beam 程序。這可能會導致問題,因為多次載入大型模型 (如 LLM) 的記憶體佔用量可能太大,無法安裝在單一機器中。對於記憶體密集型模型,RunInference 提供了一種更智慧地在多個程序之間共享記憶體以減少整體記憶體佔用量的方法。若要啟用此模式,使用者只需在模型組態中將參數 large_model 設定為 True (請參閱下方的範例),而 Beam 將會負責記憶體管理。當使用自訂模型處理器時,您可以覆寫 share_model_across_processes 函式或 model_copies 函式以達到類似的效果。

使用 T5 執行範例管道

此範例示範如何使用管道中的 RunInference 執行 T5 語言模型的推論。T5 是一個編碼器-解碼器模型,預先在無監督和監督任務的多任務混合中訓練。每個任務都會轉換為文字到文字格式。此範例使用 T5-11B,其中包含 110 億個參數且大小為 45 GB。為了在各種任務中良好運作,T5 會在輸入中加入與每個任務對應的不同前置字元。例如,對於翻譯,輸入將會是:將英文翻譯成德文:…,而對於摘要,輸入將會是:摘要:…。如需有關 T5 的詳細資訊,請參閱 HuggingFace 文件中的 T5 概觀

若要使用此模型執行推論,請先安裝 apache-beam 2.40 或更高版本

pip install apache-beam -U

接下來,安裝 requirements.txt 中列出的必要套件,並傳遞必要的引數。您可以透過以下步驟從 Hugging Face Hub 下載 T5-11b 模型

import torch
from transformers import T5ForConditionalGeneration

model = T5ForConditionalGeneration.from_pretrained("path/to/cloned/t5-11b")
torch.save(model.state_dict(), "path/to/save/state_dict.pth")

您可以在 GitHub 上檢視程式碼

  1. 在您的本機電腦上
python main.py --runner DirectRunner \
               --model_state_dict_path <local or remote path to state_dict> \
               --model_name t5-11b

您需要有 45 GB 的可用磁碟空間才能執行此範例。

  1. 在使用 Dataflow 的 Google Cloud 上
python main.py --runner DataflowRunner \
                --model_state_dict_path <gs://path/to/saved/state_dict.pth> \
                --model_name t5-11b \
                --project <PROJECT_ID> \
                --region <REGION> \
                --requirements_file requirements.txt \
                --staging_location <gs://path/to/staging/location>
                --temp_location <gs://path/to/temp/location> \
                --experiments "use_runner_v2,no_use_multiple_sdk_containers" \
                --machine_type=n1-highmem-16 \
                --disk_size_gb=200

您也可以傳遞其他組態參數,如此處所述。

管道步驟

管道包含以下步驟

  1. 讀取輸入。
  2. 使用 tokenizer 將文字編碼為轉換器可讀取的符號 ID 整數。
  3. 使用 RunInference 取得輸出。
  4. 解碼 RunInference 輸出並列印它。

以下程式碼片段包含四個步驟

    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            | "CreateInputs" >> beam.Create(task_sentences)
            | "Preprocess" >> beam.ParDo(Preprocess(tokenizer=tokenizer))
            | "RunInference" >> RunInference(model_handler=model_handler)
            | "PostProcess" >> beam.ParDo(Postprocess(tokenizer=tokenizer))
        )

在管道的第三個步驟中,我們使用 RunInference。若要使用它,您必須先定義一個 ModelHandler。RunInference 提供 PyTorchTensorFlowScikit-Learn 的模型處理器。由於此範例使用 PyTorch 模型,因此會使用 PyTorchModelHandlerTensor 模型處理器。

  gen_fn = make_tensor_model_fn('generate')

  model_handler = PytorchModelHandlerTensor(
      state_dict_path=args.model_state_dict_path,
      model_class=T5ForConditionalGeneration,
      model_params={"config": AutoConfig.from_pretrained(args.model_name)},
      device="cpu",
      inference_fn=gen_fn,
      large_model=True)

ModelHandler 需要以下參數,例如

大型模型疑難排解

Pickling 錯誤

當使用 large_model=True 在不同程序之間共享模型,或使用自訂模型處理器時,Beam 會跨程序邊界傳送輸入和輸出資料。為了做到這一點,它使用一種稱為 序列化 (pickling) 的方法。例如,如果您呼叫 output=model.my_inference_fn(input_1, input_2),則 input_1input_2output 都需要被序列化。模型本身不需要被序列化,因為它不會跨程序邊界傳遞。

雖然大多數物件都可以順利序列化,但如果其中一個物件無法序列化,您可能會遇到類似 error: can't pickle fasttext_pybind.fasttext objects 的錯誤。要解決這個問題,有幾種選擇:

首先,如果可以的話,您可以選擇不跨程序共享模型。這會增加額外的記憶體壓力,但在某些情況下可能是可以接受的。

其次,使用自訂模型處理器,您可以封裝您的模型以接收和返回可序列化的類型。例如,如果您的模型處理器如下所示:

class MyModelHandler():
   def load_model(self):
      return model_loading_logic()

   def run_inference(self, batch: Sequence[str], model, inference_args):
      unpickleable_object = Unpickleable(batch)
      unpickleable_returned = model.predict(unpickleable_object)
      my_output = int(unpickleable_returned[0])
      return my_output

您可以使用模型封裝器來封裝無法序列化的部分。由於模型封裝器將位於推論程序中,只要它只接收/返回可序列化的物件,這就會有效。

class MyWrapper():
   def __init__(self, model):
      self._model = model

   def predict(self, batch: Sequence[str]):
      unpickleable_object = Unpickleable(batch)
      unpickleable_returned = model.predict(unpickleable_object)
      return int(prediction[0])

class MyModelHandler():
   def load_model(self):
      return MyWrapper(model_loading_logic())

   def run_inference(self, batch: Sequence[str], model: MyWrapper, inference_args):
      return model.predict(unpickleable_object)

Beam 中的 RAG 和提示工程

Beam 也是一個出色的工具,可使用檢索增強生成 (Retrieval Augmented Generation,RAG) 來提高 LLM 提示的品質。檢索增強生成是一種透過將大型語言模型 (LLM) 連接到外部知識來源來增強它們的技術。這使得 LLM 可以存取和處理即時資訊,提高其回應的準確性、相關性和事實性。

Beam 有幾種機制可以簡化此過程:

  1. Beam 的 MLTransform 提供了一個嵌入套件來生成 RAG 所需的嵌入。如果您有一個沒有嵌入處理器的模型,您也可以使用 RunInference 來生成嵌入。
  2. Beam 的 Enrichment 轉換 可以輕鬆地在外部儲存系統(例如 向量資料庫)中查找嵌入或其他資訊。

總體而言,您可以使用這些步驟來執行 RAG:

管線 1 - 生成知識庫

  1. 使用 Beam 的 IO 連接器之一,從外部來源擷取資料。
  2. 使用 MLTransform 在該資料上生成嵌入。
  3. 使用 ParDo 將這些嵌入寫入向量資料庫。

管線 2 - 使用知識庫執行 RAG

  1. 使用 Beam 的 IO 連接器之一,從外部來源擷取資料。
  2. 使用 MLTransform 在該資料上生成嵌入。
  3. 使用 Enrichment,從您的向量資料庫中取得額外的嵌入來豐富該資料。
  4. 使用豐富後的資料,透過 RunInference 來提示您的 LLM。
  5. 使用 Beam 的 IO 連接器之一,將該資料寫入您想要的接收器。

若要檢視執行 RAG 的範例管線,請參閱 https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/rag_usecase/beam_rag_notebook.ipynb