用於資料處理的 ML轉換

Pydoc Pydoc




使用 MLTransform 在鍵值資料上套用常見的機器學習 (ML) 處理任務。Apache Beam 提供 ML 資料處理轉換,您可以與 MLTransform 一起使用。如需可用的資料處理轉換的完整清單,請參閱 GitHub 中的 tft.py 檔案

若要使用 MLTransform 定義資料處理轉換,請建立以 columns 作為輸入參數的資料處理轉換的執行個體。指定 columns 中的資料會經過轉換,並輸出至 beam.Row 物件。

以下範例示範如何使用 MLTransform,透過使用整個資料集中最小值和最大值,將資料正規化為介於 0 和 1 之間。MLTransform 使用 ScaleTo01 轉換。

scale_to_z_score_transform = ScaleToZScore(columns=['x', 'y'])
with beam.Pipeline() as p:
  (data | MLTransform(write_artifact_location=artifact_location).with_transform(scale_to_z_score_transform))

在此範例中,MLTransform 接收 write_artifact_location 的值。然後,MLTransform 會使用此位置值來寫入轉換產生的成品。若要傳遞資料處理轉換,您可以使用 MLTransformwith_transform 方法或清單。

MLTransform(transforms=transforms, write_artifact_location=write_artifact_location)

傳遞至 MLTransform 的轉換會依序套用至資料集。MLTransform 預期會收到字典,並傳回具有 NumPy 陣列的已轉換列物件。

範例

以下範例示範如何建立使用 MLTransform 來預處理資料的管道。

MLTransform 可以在資料集上執行完整傳遞,這在您只需要在分析整個資料集之後轉換單一元素時非常有用。前兩個範例需要在資料集上完整傳遞才能完成資料轉換。

範例 1

此範例建立一個使用 MLTransform 將資料縮放到介於 0 和 1 之間的管道。此範例會取得整數清單,並使用轉換 ScaleTo01 將其轉換為 0 到 1 的範圍。

import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ScaleTo01
import tempfile

data = [
    {
        'x': [1, 5, 3]
    },
    {
        'x': [4, 2, 8]
    },
]

artifact_location = tempfile.mkdtemp()
scale_to_0_1_fn = ScaleTo01(columns=['x'])

with beam.Pipeline() as p:
  transformed_data = (
      p
      | beam.Create(data)
      | MLTransform(write_artifact_location=artifact_location).with_transform(
          scale_to_0_1_fn)
      | beam.Map(print))

輸出

Row(x=array([0.       , 0.5714286, 0.2857143], dtype=float32))
Row(x=array([0.42857143, 0.14285715, 1.        ], dtype=float32))

範例 2

此範例建立一個使用 MLTransform 在整個資料集上計算詞彙,並將索引指派給每個唯一詞彙項目的管道。它會取得字串清單、計算整個資料集的詞彙,然後將唯一索引套用至每個詞彙項目。

import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary
import tempfile

artifact_location = tempfile.mkdtemp()
data = [
    {
        'x': ['I', 'love', 'Beam']
    },
    {
        'x': ['Beam', 'is', 'awesome']
    },
]
compute_and_apply_vocabulary_fn = ComputeAndApplyVocabulary(columns=['x'])
with beam.Pipeline() as p:
  transformed_data = (
      p
      | beam.Create(data)
      | MLTransform(write_artifact_location=artifact_location).with_transform(
          compute_and_apply_vocabulary_fn)
      | beam.Map(print))

輸出

Row(x=array([4, 1, 0]))
Row(x=array([0, 2, 3]))

範例 3

此範例建立一個使用 MLTransform 在整個資料集上計算詞彙,並將索引指派給每個唯一詞彙項目的管道。此管道會取得單一元素作為輸入,而不是元素清單。

import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary
import tempfile
data = [
    {
        'x': 'I'
    },
    {
        'x': 'love'
    },
    {
        'x': 'Beam'
    },
    {
        'x': 'Beam'
    },
    {
        'x': 'is'
    },
    {
        'x': 'awesome'
    },
]
artifact_location = tempfile.mkdtemp()
compute_and_apply_vocabulary_fn = ComputeAndApplyVocabulary(columns=['x'])
with beam.Pipeline() as p:
  transformed_data = (
      p
      | beam.Create(data)
      | MLTransform(write_artifact_location=artifact_location).with_transform(
          compute_and_apply_vocabulary_fn)
      | beam.Map(print))

輸出

Row(x=array([4]))
Row(x=array([1]))
Row(x=array([0]))
Row(x=array([0]))
Row(x=array([2]))
Row(x=array([3]))