資料探索

幾種 Apache Beam 資料處理適用於 AI/ML 專案

資料處理可以分為兩個主要主題。此範例首先檢查資料探索,然後檢查 ML 中使用資料預處理和驗證的資料管道。不涵蓋資料後處理,因為它與預處理類似。後處理僅在管道的順序和類型上有所不同。

初始資料探索

Pandas 是一種用於執行資料探索的常用工具。Pandas 是一個用於 Python 的資料分析和操作工具。它使用 DataFrames,這是一種包含二維表格資料的資料結構,並為資料提供標記的列和欄。Apache Beam Python SDK 提供了一個 DataFrame API,用於處理類似 Pandas 的 DataFrame 物件。

Beam DataFrame API 旨在在 Apache Beam 管道中提供對熟悉程式設計介面的存取。此 API 允許您執行資料探索。您可以重複使用資料預處理管道的程式碼。使用 DataFrame API,您可以透過調用標準 Pandas 命令來建立複雜的資料處理管道。

您可以將 DataFrame API 與 Beam 互動式執行器JupyterLab 筆記本 中結合使用。使用筆記本來迭代開發管道並顯示個別管道步驟的結果。

以下是在筆記本中 Apache Beam 中進行資料探索的範例

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

p = beam.Pipeline(InteractiveRunner())
beam_df = p | beam.dataframe.io.read_csv(input_path)

# Investigate columns and data types
beam_df.dtypes

# Generate descriptive statistics
ib.collect(beam_df.describe())

# Investigate missing values
ib.collect(beam_df.isnull())

如需完整端對端範例,該範例使用 Apache Beam 和 DataFrame API 為您的 AI/ML 專案實作資料探索和資料預處理,請參閱 用於 AI/ML 的 Beam DataFrame API 教學

用於 ML 的資料管道

典型的資料預處理管道包含以下步驟

  1. 讀取和寫入資料:從您的檔案系統、資料庫或訊息佇列讀取和寫入資料。Apache Beam 具有豐富的 IO 連接器 集,可用於擷取和寫入資料。
  2. 資料清理:在 ML 模型中使用資料之前,請篩選並清理您的資料。您可能會移除重複或不相關的資料、更正資料集中的錯誤、篩選掉不需要的離群值或處理遺失的資料。
  3. 資料轉換:您的資料需要符合您的模型需要訓練的預期輸入。您可能需要正規化、單熱編碼、縮放或向量化您的資料。
  4. 資料擴充:您可能想要使用外部資料來源來擴充您的資料,以使您的資料更有意義或更容易讓 ML 模型解讀。例如,您可能想要將城市名稱或地址轉換為一組座標。
  5. 資料驗證與指標:確保您的資料符合一組特定的要求,這些要求可以在您的管道中進行驗證。報告您資料中的指標,例如類別分佈。

您可以使用 Apache Beam 管道來實作所有這些步驟。此範例顯示一個管道,示範先前提及的所有步驟

import apache_beam as beam
from apache_beam.metrics import Metrics

with beam.Pipeline() as pipeline:
  # Create data
  input_data = (
      pipeline
      | beam.Create([
         {'age': 25, 'height': 176, 'weight': 60, 'city': 'London'},
         {'age': 61, 'height': 192, 'weight': 95, 'city': 'Brussels'},
         {'age': 48, 'height': 163, 'weight': None, 'city': 'Berlin'}]))

  # Clean data
  def filter_missing_data(row):
    return row['weight'] is not None

  cleaned_data = input_data | beam.Filter(filter_missing_data)

  # Transform data
  def scale_min_max_data(row):
    row['age'] = (row['age']/100)
    row['height'] = (row['height']-150)/50
    row['weight'] = (row['weight']-50)/50
    yield row

  transformed_data = cleaned_data | beam.FlatMap(scale_min_max_data)

  # Enrich data
  side_input = pipeline | beam.io.ReadFromText('coordinates.csv')
  def coordinates_lookup(row, coordinates):
    row['coordinates'] = coordinates.get(row['city'], (0, 0))
    del row['city']
    yield row

  enriched_data = (
      transformed_data
      | beam.FlatMap(coordinates_lookup, coordinates=beam.pvalue.AsDict(side_input)))

  # Metrics
  counter = Metrics.counter('main', 'counter')

  def count_data(row):
    counter.inc()
    yield row

  output_data = enriched_data | beam.FlatMap(count_data)

  # Write data
  output_data | beam.io.WriteToText('output.csv')