BigQuery ML 整合

透過此頁面的範例,我們將示範如何使用 TFX Basic Shared Libraries (tfx_bsl) 將從 BigQuery ML (BQML) 匯出的模型整合到您的 Apache Beam 管道中。

大致上,以下各節將更詳細地說明下列步驟

  1. 建立和訓練您的 BigQuery ML 模型
  2. 匯出您的 BigQuery ML 模型
  3. 建立使用全新 BigQuery ML 模型的轉換

建立和訓練您的 BigQuery ML 模型

為了能夠使用 tfx_bsl 將您的 BQML 模型納入 Apache Beam 管道,它必須採用 TensorFlow SavedModel 格式。您可以在此處找到將不同模型類型對應到其 BQML 匯出模型格式的概述。

為了簡單起見,我們將在 BQML 快速入門指南中訓練邏輯迴歸模型(簡化版),使用公開可用的 Google Analytics 範例資料集(這是一個日期分片資料表 - 或者,您可能會遇到 分割資料表)。您可以在 此處找到所有可以使用 BQML 建立的模型概述。

在建立 BigQuery 資料集之後,您可以繼續建立模型,該模型完全在 SQL 中定義

CREATE MODEL IF NOT EXISTS `bqml_tutorial.sample_model`
OPTIONS(model_type='logistic_reg', input_label_cols=["label"]) AS
SELECT
  IF(totals.transactions IS NULL, 0, 1) AS label,
  IFNULL(geoNetwork.country, "") AS country
FROM
  `bigquery-public-data.google_analytics_sample.ga_sessions_*`
WHERE
  _TABLE_SUFFIX BETWEEN '20160801' AND '20170630'

該模型將根據 2016-08-01 到 2017-06-30 之間收集的資料,預測是否會根據訪客的國家/地區進行購買。

匯出您的 BigQuery ML 模型

為了將您的模型納入 Apache Beam 管道中,您需要匯出它。這樣做的先決條件是安裝 bq 命令列工具建立 Google Cloud Storage 儲存貯體來儲存您匯出的模型。

使用以下命令匯出模型

bq extract -m bqml_tutorial.sample_model gs://some/gcs/path

建立使用您 BigQuery ML 模型的 Apache Beam 轉換

在本節中,我們將建構一個 Apache Beam 管道,該管道將使用我們剛建立和匯出的 BigQuery ML 模型。該模型可以使用 Google Cloud AI Platform Prediction 來服務 - 為此,請參閱AI Platform 模式。在這種情況下,我們將說明如何使用 tfx_bsl 程式庫進行本機預測(在您的 Apache Beam 工作程式上)。

首先,需要將模型下載到您將開發其餘管道的本機目錄(例如,serving_dir/sample_model/1)。

然後,您可以像平常一樣開始開發管道。我們將使用來自 tfx_bsl 程式庫的 RunInference PTransform,並且會將其指向儲存模型的本機目錄(請參閱程式碼範例中的 model_path 變數)。該轉換會以 tf.train.Example 類型的元素作為輸入,並輸出 tensorflow_serving.apis.prediction_log_pb2.PredictionLog 類型的元素。根據模型的簽名,您可以從輸出中擷取值;在我們的例子中,我們根據關於邏輯迴歸模型的檔案,在 extract_prediction 函數中擷取 label_probslabel_valuespredicted_label

import apache_beam
import tensorflow as tf
from google.protobuf import text_format
from tensorflow.python.framework import tensor_util
from tfx_bsl.beam import run_inference
from tfx_bsl.public.beam import RunInference
from tfx_bsl.public.proto import model_spec_pb2


inputs = tf.train.Example(features=tf.train.Features(
            feature={
                'os': tf.train.Feature(bytes_list=tf.train.BytesList(b"Microsoft"))
            })
          )

model_path = "serving_dir/sample_model/1"

def extract_prediction(response):
  yield response.predict_log.response.outputs['label_values'].string_val,
        tensor_util.MakeNdarray(response.predict_log.response.outputs['label_probs']),
        response.predict_log.response.outputs['predicted_label'].string_val

with beam.Pipeline() as p:
    res = (
        p
        | beam.Create([inputs])
        | RunInference(
            model_spec_pb2.InferenceSpecType(
                saved_model_spec=model_spec_pb2.SavedModelSpec(
                    model_path=model_path,
                    signature_name=['serving_default'])))
        | beam.ParDo(extract_prediction)
Implemented in Python.