BigQuery ML 整合
透過此頁面的範例,我們將示範如何使用 TFX Basic Shared Libraries (tfx_bsl) 將從 BigQuery ML (BQML) 匯出的模型整合到您的 Apache Beam 管道中。
大致上,以下各節將更詳細地說明下列步驟
- 建立和訓練您的 BigQuery ML 模型
- 匯出您的 BigQuery ML 模型
- 建立使用全新 BigQuery ML 模型的轉換
- Java SDK
- Python SDK
建立和訓練您的 BigQuery ML 模型
為了能夠使用 tfx_bsl 將您的 BQML 模型納入 Apache Beam 管道,它必須採用 TensorFlow SavedModel 格式。您可以在此處找到將不同模型類型對應到其 BQML 匯出模型格式的概述。
為了簡單起見,我們將在 BQML 快速入門指南中訓練邏輯迴歸模型(簡化版),使用公開可用的 Google Analytics 範例資料集(這是一個日期分片資料表 - 或者,您可能會遇到 分割資料表)。您可以在 此處找到所有可以使用 BQML 建立的模型概述。
在建立 BigQuery 資料集之後,您可以繼續建立模型,該模型完全在 SQL 中定義
CREATE MODEL IF NOT EXISTS `bqml_tutorial.sample_model`
OPTIONS(model_type='logistic_reg', input_label_cols=["label"]) AS
SELECT
IF(totals.transactions IS NULL, 0, 1) AS label,
IFNULL(geoNetwork.country, "") AS country
FROM
`bigquery-public-data.google_analytics_sample.ga_sessions_*`
WHERE
_TABLE_SUFFIX BETWEEN '20160801' AND '20170630'
該模型將根據 2016-08-01 到 2017-06-30 之間收集的資料,預測是否會根據訪客的國家/地區進行購買。
匯出您的 BigQuery ML 模型
為了將您的模型納入 Apache Beam 管道中,您需要匯出它。這樣做的先決條件是安裝 bq
命令列工具和建立 Google Cloud Storage 儲存貯體來儲存您匯出的模型。
使用以下命令匯出模型
bq extract -m bqml_tutorial.sample_model gs://some/gcs/path
建立使用您 BigQuery ML 模型的 Apache Beam 轉換
在本節中,我們將建構一個 Apache Beam 管道,該管道將使用我們剛建立和匯出的 BigQuery ML 模型。該模型可以使用 Google Cloud AI Platform Prediction 來服務 - 為此,請參閱AI Platform 模式。在這種情況下,我們將說明如何使用 tfx_bsl 程式庫進行本機預測(在您的 Apache Beam 工作程式上)。
首先,需要將模型下載到您將開發其餘管道的本機目錄(例如,serving_dir/sample_model/1
)。
然後,您可以像平常一樣開始開發管道。我們將使用來自 tfx_bsl 程式庫的 RunInference
PTransform,並且會將其指向儲存模型的本機目錄(請參閱程式碼範例中的 model_path
變數)。該轉換會以 tf.train.Example
類型的元素作為輸入,並輸出 tensorflow_serving.apis.prediction_log_pb2.PredictionLog
類型的元素。根據模型的簽名,您可以從輸出中擷取值;在我們的例子中,我們根據關於邏輯迴歸模型的檔案,在 extract_prediction
函數中擷取 label_probs
、label_values
和 predicted_label
。
import apache_beam
import tensorflow as tf
from google.protobuf import text_format
from tensorflow.python.framework import tensor_util
from tfx_bsl.beam import run_inference
from tfx_bsl.public.beam import RunInference
from tfx_bsl.public.proto import model_spec_pb2
inputs = tf.train.Example(features=tf.train.Features(
feature={
'os': tf.train.Feature(bytes_list=tf.train.BytesList(b"Microsoft"))
})
)
model_path = "serving_dir/sample_model/1"
def extract_prediction(response):
yield response.predict_log.response.outputs['label_values'].string_val,
tensor_util.MakeNdarray(response.predict_log.response.outputs['label_probs']),
response.predict_log.response.outputs['predicted_label'].string_val
with beam.Pipeline() as p:
res = (
p
| beam.Create([inputs])
| RunInference(
model_spec_pb2.InferenceSpecType(
saved_model_spec=model_spec_pb2.SavedModelSpec(
model_path=model_path,
signature_name=['serving_default'])))
| beam.ParDo(extract_prediction)
上次更新於 2024/10/31
您是否找到所有您要尋找的資訊?
這些資訊是否對您有幫助且清楚?是否有任何您想更改的地方?請告訴我們!