資料探索
幾種 Apache Beam 資料處理適用於 AI/ML 專案
- 資料探索:在您開始部署專案或資料變更時,了解您的資料(屬性、分佈、統計資料)。
- 資料預處理:轉換您的資料,使其準備好用於訓練您的模型。
- 資料後處理:在執行推論之後,您可能需要轉換模型的輸出,使其有意義。
- 資料驗證:檢查您的資料品質,以偵測離群值並計算標準差和類別分佈。
資料處理可以分為兩個主要主題。此範例首先檢查資料探索,然後檢查 ML 中使用資料預處理和驗證的資料管道。不涵蓋資料後處理,因為它與預處理類似。後處理僅在管道的順序和類型上有所不同。
初始資料探索
Pandas 是一種用於執行資料探索的常用工具。Pandas 是一個用於 Python 的資料分析和操作工具。它使用 DataFrames,這是一種包含二維表格資料的資料結構,並為資料提供標記的列和欄。Apache Beam Python SDK 提供了一個 DataFrame API,用於處理類似 Pandas 的 DataFrame 物件。
Beam DataFrame API 旨在在 Apache Beam 管道中提供對熟悉程式設計介面的存取。此 API 允許您執行資料探索。您可以重複使用資料預處理管道的程式碼。使用 DataFrame API,您可以透過調用標準 Pandas 命令來建立複雜的資料處理管道。
您可以將 DataFrame API 與 Beam 互動式執行器 在 JupyterLab 筆記本 中結合使用。使用筆記本來迭代開發管道並顯示個別管道步驟的結果。
以下是在筆記本中 Apache Beam 中進行資料探索的範例
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
p = beam.Pipeline(InteractiveRunner())
beam_df = p | beam.dataframe.io.read_csv(input_path)
# Investigate columns and data types
beam_df.dtypes
# Generate descriptive statistics
ib.collect(beam_df.describe())
# Investigate missing values
ib.collect(beam_df.isnull())
如需完整端對端範例,該範例使用 Apache Beam 和 DataFrame API 為您的 AI/ML 專案實作資料探索和資料預處理,請參閱 用於 AI/ML 的 Beam DataFrame API 教學。
用於 ML 的資料管道
典型的資料預處理管道包含以下步驟
- 讀取和寫入資料:從您的檔案系統、資料庫或訊息佇列讀取和寫入資料。Apache Beam 具有豐富的 IO 連接器 集,可用於擷取和寫入資料。
- 資料清理:在 ML 模型中使用資料之前,請篩選並清理您的資料。您可能會移除重複或不相關的資料、更正資料集中的錯誤、篩選掉不需要的離群值或處理遺失的資料。
- 資料轉換:您的資料需要符合您的模型需要訓練的預期輸入。您可能需要正規化、單熱編碼、縮放或向量化您的資料。
- 資料擴充:您可能想要使用外部資料來源來擴充您的資料,以使您的資料更有意義或更容易讓 ML 模型解讀。例如,您可能想要將城市名稱或地址轉換為一組座標。
- 資料驗證與指標:確保您的資料符合一組特定的要求,這些要求可以在您的管道中進行驗證。報告您資料中的指標,例如類別分佈。
您可以使用 Apache Beam 管道來實作所有這些步驟。此範例顯示一個管道,示範先前提及的所有步驟
import apache_beam as beam
from apache_beam.metrics import Metrics
with beam.Pipeline() as pipeline:
# Create data
input_data = (
pipeline
| beam.Create([
{'age': 25, 'height': 176, 'weight': 60, 'city': 'London'},
{'age': 61, 'height': 192, 'weight': 95, 'city': 'Brussels'},
{'age': 48, 'height': 163, 'weight': None, 'city': 'Berlin'}]))
# Clean data
def filter_missing_data(row):
return row['weight'] is not None
cleaned_data = input_data | beam.Filter(filter_missing_data)
# Transform data
def scale_min_max_data(row):
row['age'] = (row['age']/100)
row['height'] = (row['height']-150)/50
row['weight'] = (row['weight']-50)/50
yield row
transformed_data = cleaned_data | beam.FlatMap(scale_min_max_data)
# Enrich data
side_input = pipeline | beam.io.ReadFromText('coordinates.csv')
def coordinates_lookup(row, coordinates):
row['coordinates'] = coordinates.get(row['city'], (0, 0))
del row['city']
yield row
enriched_data = (
transformed_data
| beam.FlatMap(coordinates_lookup, coordinates=beam.pvalue.AsDict(side_input)))
# Metrics
counter = Metrics.counter('main', 'counter')
def count_data(row):
counter.inc()
yield row
output_data = enriched_data | beam.FlatMap(count_data)
# Write data
output_data | beam.io.WriteToText('output.csv')
上次更新時間:2024/10/31
您是否找到了您要找的所有內容?
所有內容是否實用且清楚?您有任何想要更改的內容嗎?請告訴我們!