異常偵測範例
異常偵測範例示範如何設定異常偵測管道,該管道會即時從 Pub/Sub 讀取文字,然後使用訓練過的 HDBSCAN 叢集模型偵測異常。
異常偵測的資料集
此範例使用名為 emotion 的資料集,其中包含 20,000 則英文 Twitter 訊息,並具有 6 種基本情緒:憤怒、恐懼、喜悅、愛、悲傷和驚訝。資料集有三個分割:訓練(用於訓練)、驗證和測試(用於效能評估)。因為它包含資料集的文字和類別(類別),所以它是一個監督式資料集。您可以使用 Hugging Face 資料集頁面來存取此資料集。
以下文字顯示來自資料集訓練分割的範例
文字 | 情緒類型 |
---|---|
我正在抓緊時間發文,我感到貪婪是錯誤的 | 憤怒 |
我一直對壁爐感到懷舊,我會知道它仍然在產權上 | 愛 |
我一直在服用或建議劑量的毫克或次數,我入睡的速度快了很多,但我也覺得很有趣 | 恐懼 |
在前往丹麥的乘船旅行中 | 喜悅 |
我覺得你知道基本上就像科幻領域中的假貨 | 悲傷 |
我開始每週出現幾次幻覺,被移動的人和人物、聲音和震動所折磨 | 恐懼 |
異常偵測演算法
HDBSCAN 是一種叢集演算法,透過將 DBSCAN 轉換為階層式叢集演算法,然後根據叢集的穩定性提取平坦叢集來延伸 DBSCAN。訓練完成後,如果新的資料點是離群值,則模型會預測 -1
,否則它會預測現有叢集之一。
擷取到 Pub/Sub
將資料擷取到 Pub/Sub 中,以便在叢集時,模型可以從 Pub/Sub 讀取推文。Pub/Sub 是一種訊息傳遞服務,用於在應用程式和服務之間交換事件資料。串流分析和資料整合管道會使用 Pub/Sub 來擷取和散佈資料。
您可以在 GitHub 中看到將資料擷取到 Pub/Sub 的完整範例程式碼
擷取管道的檔案結構如下圖所示
write_data_to_pubsub_pipeline/
├── pipeline/
│ ├── __init__.py
│ ├── options.py
│ └── utils.py
├── __init__.py
├── config.py
├── main.py
└── setup.py
pipeline/utils.py
包含用於載入情緒資料集以及兩個用於資料轉換的 beam.DoFn
的程式碼。
pipeline/options.py
包含用於設定 Dataflow 管道的管道選項。
config.py
定義多次使用的變數,例如 Google Cloud PROJECT_ID 和 NUM_WORKERS。
setup.py
定義管道執行的套件和需求。
main.py
包含管道程式碼和用於執行管道的其他函數。
執行管道
若要執行管道,請安裝必要的套件。在此範例中,您需要存取 Google Cloud 專案,並且需要在 config.py
檔案中設定 Google Cloud 變數,例如 PROJECT_ID
、REGION
、PubSub TOPIC_ID
和其他變數。
- 在本機電腦上:
python main.py
- 在 GCP 上用於 Dataflow:
python main.py --mode cloud
write_data_to_pubsub_pipeline
包含四個不同的轉換
- 使用 Hugging Face 資料集載入情緒資料集(為了簡單起見,我們從三個類別而不是六個類別中取樣)。
- 將每段文字與唯一的識別碼 (UID) 建立關聯。
- 將文字轉換為 Pub/Sub 預期的格式。
- 將格式化的訊息寫入 Pub/Sub。
串流資料上的異常偵測
將資料擷取至 Pub/Sub 後,執行異常偵測管線。此管線會從 Pub/Sub 讀取串流訊息,使用語言模型將文字轉換為嵌入向量,並將嵌入向量饋送到已訓練好的分群模型,以預測訊息是否為異常。此管線的一個先決條件是必須有一個基於資料集訓練分割所訓練的 HDBSCAN 分群模型。
您可以在 GitHub 中找到異常偵測的完整範例程式碼
下圖顯示了 anomaly_detection 管線的檔案結構
anomaly_detection_pipeline/
├── pipeline/
│ ├── __init__.py
│ ├── options.py
│ └── transformations.py
├── __init__.py
├── config.py
├── main.py
└── setup.py
pipeline/transformations.py
包含不同 beam.DoFn
的程式碼以及管線中使用的其他函式。
pipeline/options.py
包含用於設定 Dataflow 管道的管道選項。
config.py
定義了多次使用的變數,例如 Google Cloud PROJECT_ID 和 NUM_WORKERS。
setup.py
定義管道執行的套件和需求。
main.py
包含管線程式碼和用於執行管線的其他函式。
執行管道
安裝所需的套件,並將資料推送至 Pub/Sub。在此範例中,您需要存取 Google Cloud 專案,並需要在 config.py
檔案中設定 Google Cloud 變數,例如 PROJECT_ID
、REGION
、PubSub SUBSCRIPTION_ID
等。
- 在本機電腦上:
python main.py
- 在 GCP 上用於 Dataflow:
python main.py --mode cloud
該管線包含以下步驟
- 從 Pub/Sub 讀取訊息。
- 將 Pub/Sub 訊息轉換為字典的
PCollection
,其中鍵為 UID,值為 Twitter 文字。 - 使用 Tokenizer 將文字編碼為 Transformer 可讀取的 Token ID 整數。
- 使用 RunInference 從基於 Transformer 的語言模型取得向量嵌入。
- 標準化嵌入。
- 使用 RunInference 從已訓練的 HDBSCAN 分群模型取得異常預測。
- 將預測寫入 BigQuery,以便在需要時重新訓練分群模型。
- 如果偵測到異常,則發送電子郵件警報。
以下程式碼片段顯示了管線的前兩個步驟
下一節將描述以下管線步驟
- 將文字 Token 化
- 使用 RunInference 取得嵌入
- 從 HDBSCAN 模型取得預測
從語言模型取得嵌入
為了使用文字資料進行分群,首先將文字對應到適用於統計分析的數值向量。此範例使用名為 sentence-transformers/stsb-distilbert-base/stsb-distilbert-base 的基於 Transformer 的語言模型。此模型將句子和段落對應到 768 維的密集向量空間,您可以使用它來執行分群或語義搜尋等任務。
由於語言模型預期的是 Token 化輸入,而不是原始文字,因此請先將文字 Token 化。Token 化是一個預處理任務,它轉換文字,以便可以將其饋送到模型以取得預測。
在這裡,tokenize_sentence
是一個函式,它接受一個包含文字和 ID 的字典,將文字 Token 化,並傳回文字和 ID 以及 Token 化輸出的一個元組。
Tokenizer = AutoTokenizer.from_pretrained(cfg.TOKENIZER_NAME)
def tokenize_sentence(input_dict):
"""
Takes a dictionary with a text and an id, tokenizes the text, and
returns a tuple of the text and id and the tokenized text
Args:
input_dict: a dictionary with the text and id of the sentence
Returns:
A tuple of the text and id, and a dictionary of the tokens.
"""
text, uid = input_dict["text"], input_dict["id"]
tokens = Tokenizer([text], padding=True, truncation=True, return_tensors="pt")
tokens = {key: torch.squeeze(val) for key, val in tokens.items()}
return (text, uid), tokens
然後將 Token 化輸出傳遞到語言模型以取得嵌入。為了從語言模型取得嵌入,我們使用 Apache Beam 的 RunInference()
。
embedding_model_handler
為我們定義 PytorchNoBatchModelHandler
作為 PytorchModelHandler
的包裝器,以將批次大小限制為一。
# Can be removed once: https://github.com/apache/beam/issues/21863 is fixed
class PytorchNoBatchModelHandler(PytorchModelHandlerKeyedTensor):
"""Wrapper to PytorchModelHandler to limit batch size to 1.
The tokenized strings generated from BertTokenizer may have different
lengths, which doesn't work with torch.stack() in current RunInference
implementation since stack() requires tensors to be the same size.
Restricting max_batch_size to 1 means there is only 1 example per `batch`
in the run_inference() call.
"""
def batch_elements_kwargs(self):
return {"max_batch_size": 1}
由於 DistilBertModel
的 forward()
不會傳回嵌入,因此我們自訂義 model_class ModelWrapper
來取得向量嵌入。
class ModelWrapper(DistilBertModel):
"""Wrapper to DistilBertModel to get embeddings when calling
forward function."""
def forward(self, **kwargs):
output = super().forward(**kwargs)
sentence_embedding = (
self.mean_pooling(output,
kwargs["attention_mask"]).detach().cpu().numpy())
return sentence_embedding
# Mean Pooling - Take attention mask into account for correct averaging
def mean_pooling(self, model_output, attention_mask):
"""
Calculates the mean of token embeddings
Args:
model_output: The output of the model.
attention_mask: This is a tensor that contains 1s for all input tokens and
0s for all padding tokens.
Returns:
The mean of the token embeddings.
"""
token_embeddings = model_output[
0] # First element of model_output contains all token embeddings
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float())
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
input_mask_expanded.sum(1), min=1e-9)
在取得每段 Twitter 文字的嵌入後,會將嵌入標準化,因為已訓練的模型預期的是標準化的嵌入。
取得預測
然後將標準化的嵌入轉發到已訓練的 HDBSCAN 模型以取得預測。
其中 clustering_model_handler
為
我們定義 CustomSklearnModelHandlerNumpy
作為 SklearnModelHandlerNumpy
的包裝器,以將批次大小限制為一,並覆寫 run_inference
,以便使用 hdbscan.approximate_predict()
來取得異常預測。
class CustomSklearnModelHandlerNumpy(SklearnModelHandlerNumpy):
# limit batch size to 1 can be removed once: https://github.com/apache/beam/issues/21863 is fixed
def batch_elements_kwargs(self):
"""Limit batch size to 1 for inference"""
return {"max_batch_size": 1}
# run_inference can be removed once: https://github.com/apache/beam/issues/22572 is fixed
def run_inference(self, batch, model, inference_args=None):
"""Runs inferences on a batch of numpy arrays.
Args:
batch: A sequence of examples as numpy arrays. They should
be single examples.
model: A numpy model or pipeline. Must implement predict(X).
Where the parameter X is a numpy array.
inference_args: Any additional arguments for an inference.
Returns:
An Iterable of type PredictionResult.
"""
_validate_inference_args(inference_args)
vectorized_batch = np.vstack(batch)
predictions = hdbscan.approximate_predict(model, vectorized_batch)
return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
在取得模型預測後,將來自 RunInference
的輸出解碼為字典。接下來,將預測儲存在 BigQuery 資料表中以進行分析、更新 HDBSCAN 模型,並且如果預測為異常,則發送電子郵件警報。
_ = (
predictions
| "Decode Prediction" >> beam.ParDo(DecodePrediction())
| "Write to BQ" >> beam.io.WriteToBigQuery(
table=cfg.TABLE_URI,
schema=cfg.TABLE_SCHEMA,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
))
_ = predictions | "Alert by Email" >> beam.ParDo(TriggerEmailAlert())
上次更新時間:2024/10/31
您是否找到了您要找的所有內容?
這一切都實用且清楚嗎?您想更改任何內容嗎?請告訴我們!