AI 平台整合模式

此頁面描述具有 Google Cloud AI Platform 轉換的管道中的常見模式。

分析文字的結構和含義

本節說明如何使用 Google Cloud Natural Language API 執行文字分析。

Beam 提供一個名為 AnnotateText 的 PTransform。轉換會採用類型為 Document 的 PCollection。每個 Document 物件包含有關文字的各種資訊。這包括內容、它是純文字還是 HTML、選用的語言提示和其他設定。AnnotateText 會產生從 API 傳回的 AnnotateTextResponse 類型的回應物件。AnnotateTextResponse 是一個 protobuf 訊息,其中包含許多屬性,其中一些屬性是複雜的結構。

以下是一個管道的範例,該管道會建立字串的記憶體中 PCollection、將每個字串變更為 Document 物件,並叫用 Natural Language API。然後,對於每個回應物件,會呼叫一個函數來擷取分析的某些結果。

features = nlp.types.AnnotateTextRequest.Features(
    extract_entities=True,
    extract_document_sentiment=True,
    extract_entity_sentiment=True,
    extract_syntax=True,
)

with beam.Pipeline() as pipeline:
  responses = (
      pipeline
      | beam.Create([
          'My experience so far has been fantastic! '
          'I\'d really recommend this product.'
      ])
      | beam.Map(lambda x: nlp.Document(x, type='PLAIN_TEXT'))
      | nlp.AnnotateText(features))

  _ = (
      responses
      | beam.Map(extract_sentiments)
      | 'Parse sentiments to JSON' >> beam.Map(json.dumps)
      | 'Write sentiments' >> beam.io.WriteToText('sentiments.txt'))

  _ = (
      responses
      | beam.Map(extract_entities)
      | 'Parse entities to JSON' >> beam.Map(json.dumps)
      | 'Write entities' >> beam.io.WriteToText('entities.txt'))

  _ = (
      responses
      | beam.Map(analyze_dependency_tree)
      | 'Parse adjacency list to JSON' >> beam.Map(json.dumps)
      | 'Write adjacency list' >> beam.io.WriteToText('adjancency_list.txt'))
AnnotateTextRequest.Features features =
    AnnotateTextRequest.Features.newBuilder()
        .setExtractEntities(true)
        .setExtractDocumentSentiment(true)
        .setExtractEntitySentiment(true)
        .setExtractSyntax(true)
        .build();
AnnotateText annotateText = AnnotateText.newBuilder().setFeatures(features).build();

PCollection<AnnotateTextResponse> responses =
    p.apply(
            Create.of(
                "My experience so far has been fantastic, "
                    + "I\'d really recommend this product."))
        .apply(
            MapElements.into(TypeDescriptor.of(Document.class))
                .via(
                    (SerializableFunction<String, Document>)
                        input ->
                            Document.newBuilder()
                                .setContent(input)
                                .setType(Document.Type.PLAIN_TEXT)
                                .build()))
        .apply(annotateText);

responses
    .apply(MapElements.into(TypeDescriptor.of(TextSentiments.class)).via(extractSentiments))
    .apply(
        MapElements.into(TypeDescriptors.strings())
            .via((SerializableFunction<TextSentiments, String>) TextSentiments::toJson))
    .apply(TextIO.write().to("sentiments.txt"));

responses
    .apply(
        MapElements.into(
                TypeDescriptors.maps(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via(extractEntities))
    .apply(MapElements.into(TypeDescriptors.strings()).via(mapEntitiesToJson))
    .apply(TextIO.write().to("entities.txt"));

responses
    .apply(
        MapElements.into(
                TypeDescriptors.lists(
                    TypeDescriptors.maps(
                        TypeDescriptors.strings(),
                        TypeDescriptors.lists(TypeDescriptors.strings()))))
            .via(analyzeDependencyTree))
    .apply(MapElements.into(TypeDescriptors.strings()).via(mapDependencyTreesToJson))
    .apply(TextIO.write().to("adjacency_list.txt"));

擷取情感

這是從 API 傳回的回應物件的一部分。可以在 sentences 屬性中找到句子層級的情感。sentences 的行為類似標準 Python 序列,因此所有核心語言功能(如反覆運算或切片)都將起作用。可以在 document_sentiment 屬性中找到整體情感。

sentences {
  text {
    content: "My experience so far has been fantastic!"
  }
  sentiment {
    magnitude: 0.8999999761581421
    score: 0.8999999761581421
  }
}
sentences {
  text {
    content: "I\'d really recommend this product."
    begin_offset: 41
  }
  sentiment {
    magnitude: 0.8999999761581421
    score: 0.8999999761581421
  }
}

...many lines omitted

document_sentiment {
  magnitude: 1.899999976158142
  score: 0.8999999761581421
}

用於擷取句子層級和文件層級情感資訊的函數顯示在下一個程式碼片段中。

return {
    'sentences': [{
        sentence.text.content: sentence.sentiment.score
    } for sentence in response.sentences],
    'document_sentiment': response.document_sentiment.score,
}
extractSentiments =
(SerializableFunction<AnnotateTextResponse, TextSentiments>)
    annotateTextResponse -> {
      TextSentiments sentiments = new TextSentiments();
      sentiments.setDocumentSentiment(
          annotateTextResponse.getDocumentSentiment().getMagnitude());
      Map<String, Float> sentenceSentimentsMap =
          annotateTextResponse.getSentencesList().stream()
              .collect(
                  Collectors.toMap(
                      (Sentence s) -> s.getText().getContent(),
                      (Sentence s) -> s.getSentiment().getMagnitude()));
      sentiments.setSentenceSentiments(sentenceSentimentsMap);
      return sentiments;
    };

程式碼片段會迴圈處理 sentences,並針對每個句子擷取情感分數。

輸出為

{"sentences": [{"My experience so far has been fantastic!": 0.8999999761581421}, {"I'd really recommend this product.": 0.8999999761581421}], "document_sentiment": 0.8999999761581421}

擷取實體

下一個函數會檢查實體的回應,並傳回這些實體的名稱和類型。

return [{
    'name': entity.name,
    'type': nlp.enums.Entity.Type(entity.type).name,
} for entity in response.entities]
extractEntities =
(SerializableFunction<AnnotateTextResponse, Map<String, String>>)
    annotateTextResponse ->
        annotateTextResponse.getEntitiesList().stream()
            .collect(
                Collectors.toMap(Entity::getName, (Entity e) -> e.getType().toString()));

可以在 entities 屬性中找到實體。就像之前一樣,entities 是一個序列,這就是為什麼列表推導式是一個可行的選擇。最棘手的部分是解釋實體的類型。Natural Language API 將實體類型定義為列舉。在回應物件中,實體類型會以整數傳回。這就是為什麼使用者必須實例化 naturallanguageml.enums.Entity.Type 才能存取人類可讀的名稱。

輸出為

[{"name": "experience", "type": "OTHER"}, {"name": "product", "type": "CONSUMER_GOOD"}]

存取句子依存樹

下列程式碼會迴圈處理句子,並針對每個句子建立一個鄰接列表,表示一個依存樹。如需依存樹的詳細資訊,請參閱形態學與依存樹

from collections import defaultdict
adjacency_lists = []

index = 0
for sentence in response.sentences:
  adjacency_list = defaultdict(list)
  sentence_begin = sentence.text.begin_offset
  sentence_end = sentence_begin + len(sentence.text.content) - 1

  while index < len(response.tokens) and \
      response.tokens[index].text.begin_offset <= sentence_end:
    token = response.tokens[index]
    head_token_index = token.dependency_edge.head_token_index
    head_token_text = response.tokens[head_token_index].text.content
    adjacency_list[head_token_text].append(token.text.content)
    index += 1
  adjacency_lists.append(adjacency_list)
analyzeDependencyTree =
    (SerializableFunction<AnnotateTextResponse, List<Map<String, List<String>>>>)
        response -> {
          List<Map<String, List<String>>> adjacencyLists = new ArrayList<>();
          int index = 0;
          for (Sentence s : response.getSentencesList()) {
            Map<String, List<String>> adjacencyMap = new HashMap<>();
            int sentenceBegin = s.getText().getBeginOffset();
            int sentenceEnd = sentenceBegin + s.getText().getContent().length() - 1;
            while (index < response.getTokensCount()
                && response.getTokens(index).getText().getBeginOffset() <= sentenceEnd) {
              Token token = response.getTokensList().get(index);
              int headTokenIndex = token.getDependencyEdge().getHeadTokenIndex();
              String headTokenContent =
                  response.getTokens(headTokenIndex).getText().getContent();
              List<String> adjacencyList =
                  adjacencyMap.getOrDefault(headTokenContent, new ArrayList<>());
              adjacencyList.add(token.getText().getContent());
              adjacencyMap.put(headTokenContent, adjacencyList);
              index++;
            }
            adjacencyLists.add(adjacencyMap);
          }
          return adjacencyLists;
        };

輸出如下。為了提高可讀性,索引會替換為它們所指的文字

[
  {
    "experience": [
      "My"
    ],
    "been": [
      "experience",
      "far",
      "has",
      "been",
      "fantastic",
      "!"
    ],
    "far": [
      "so"
    ]
  },
  {
    "recommend": [
      "I",
      "'d",
      "really",
      "recommend",
      "product",
      "."
    ],
    "product": [
      "this"
    ]
  }
]

取得預測

本節說明如何使用 Google Cloud AI Platform Prediction,針對雲端託管機器學習模型的新資料進行預測。

tfx_bsl 是一個具有名為 RunInference 的 Beam PTransform 的程式庫。RunInference 能夠執行推論,該推論可以使用外部服務端點來接收資料。使用服務端點時,轉換會採用類型為 tf.train.Example 的 PCollection,並針對每批元素,將請求傳送至 AI Platform Prediction。會自動計算批次的大小。如需 Beam 如何找到最佳批次大小的詳細資訊,請參閱 BatchElements 的文件字串。目前,轉換不支援使用 tf.train.SequenceExample 作為輸入,但工作正在進行中。

轉換會產生類型為 PredictionLog 的 PCollection,其中包含預測。

在開始之前,請將 TensorFlow 模型部署到 AI Platform Prediction。雲端服務會管理以有效率且可擴展的方式處理預測請求所需之基礎架構。請注意,轉換僅支援 TensorFlow 模型。如需詳細資訊,請參閱匯出 SavedModel 以進行預測

部署機器學習模型後,準備一份要取得預測的執行個體清單。若要傳送二進位資料,請確定輸入的名稱以 _bytes 結尾。這會在傳送請求之前,將資料進行 base64 編碼。

範例

以下是一個管道的範例,該管道會從檔案讀取輸入執行個體、將 JSON 物件轉換為 tf.train.Example 物件,並將資料傳送至 AI Platform Prediction。檔案的內容可以如下所示

{"input": "the quick brown"}
{"input": "la bruja le"}

此範例會建立 tf.train.BytesList 執行個體,因此它需要位元組式字串作為輸入。但是,轉換也支援其他資料類型,例如 tf.train.FloatListtf.train.Int64List

這是程式碼

import json

import apache_beam as beam

import tensorflow as tf
from tfx_bsl.beam.run_inference import RunInference
from tfx_bsl.proto import model_spec_pb2

def convert_json_to_tf_example(json_obj):
  samples = json.loads(json_obj)
  for name, text in samples.items():
      value = tf.train.Feature(bytes_list=tf.train.BytesList(
        value=[text.encode('utf-8')]))
      feature = {name: value}
      return tf.train.Example(features=tf.train.Features(feature=feature))

with beam.Pipeline() as p:
     _ = (p
         | beam.io.ReadFromText('gs://my-bucket/samples.json')
         | beam.Map(convert_json_to_tf_example)
         | RunInference(
             model_spec_pb2.InferenceEndpoint(
                 model_endpoint_spec=model_spec_pb2.AIPlatformPredictionModelSpec(
                     project_id='my-project-id',
                     model_name='my-model-name',
                     version_name='my-model-version'))))
// Getting predictions is not yet available for Java. [https://github.com/apache/beam/issues/20001]