Use Bigtable to enrich data

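In Apache Beam 2.54.0 and later versions, the enrichment transform includes a built-in enrichment handler for Bigtable. The following example demonstrates how to create a pipeline that uses the enrichment transform with the BigTableEnrichmentHandler handler.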

The data stored in the Bigtable cluster uses the following format:

Row key  product:product_id  product:product_name  product:product_stock
1        1                   pixel 5               2
2        2                   pixel 6               4
3        3                   pixel 7               20
4        4                   pixel 8               10

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

project_id = 'apache-beam-testing'
instance_id = 'beam-test'
table_id = 'bigtable-enrichment-test'
row_key = 'product_id'

data = [
    beam.Row(sale_id=1, customer_id=1, product_id=1, quantity=1),
    beam.Row(sale_id=3, customer_id=3, product_id=2, quantity=3),
    beam.Row(sale_id=5, customer_id=5, product_id=4, quantity=2)
]

bigtable_handler = BigTableEnrichmentHandler(
    project_id=project_id,
    instance_id=instance_id,
    table_id=table_id,
    row_key=row_key)

with beam.Pipeline() as p:
  _ = (
      p
      | "Create" >> beam.Create(data)
      | "Enrich W/ BigTable" >> Enrichment(bigtable_handler)
      | "Print" >> beam.Map(print))

Output:

Row(sale_id=1, customer_id=1, product_id=1, quantity=1, product={'product_id': '1', 'product_name': 'pixel 5', 'product_stock': '2'})
Row(sale_id=3, customer_id=3, product_id=2, quantity=3, product={'product_id': '2', 'product_name': 'pixel 6', 'product_stock': '4'})
Row(sale_id=5, customer_id=5, product_id=4, quantity=2, product={'product_id': '4', 'product_name': 'pixel 8', 'product_stock': '10'})
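
To make the relationship between the input rows, the table contents, and the output above more concrete, here is a plain-Python sketch of the lookup-and-merge step. It does not use Beam or Bigtable. The `TABLE` dictionary and the `enrich` function are hypothetical stand-ins introduced only for illustration, and the real handler's internals may differ.

```python
# Hypothetical in-memory stand-in for the Bigtable table described earlier.
# Keys are row keys; each value maps a column family to its
# {column qualifier: value} pairs. All values are strings, as in the output.
TABLE = {
    "1": {"product": {"product_id": "1", "product_name": "pixel 5", "product_stock": "2"}},
    "2": {"product": {"product_id": "2", "product_name": "pixel 6", "product_stock": "4"}},
    "3": {"product": {"product_id": "3", "product_name": "pixel 7", "product_stock": "20"}},
    "4": {"product": {"product_id": "4", "product_name": "pixel 8", "product_stock": "10"}},
}


def enrich(request: dict, row_key_field: str = "product_id") -> dict:
    """Return a copy of `request` merged with the matching table row, if any."""
    # The request field named by `row_key_field` supplies the row key.
    row_key = str(request[row_key_field])
    # A missing row contributes nothing in this sketch.
    response = TABLE.get(row_key, {})
    # Merge the request fields with the fetched column families.
    return {**request, **response}


sales = [
    {"sale_id": 1, "customer_id": 1, "product_id": 1, "quantity": 1},
    {"sale_id": 3, "customer_id": 3, "product_id": 2, "quantity": 3},
    {"sale_id": 5, "customer_id": 5, "product_id": 4, "quantity": 2},
]

enriched = [enrich(sale) for sale in sales]
for row in enriched:
    print(row)
```

Each enriched record keeps its original fields and gains one `product` entry, named after the column family, which mirrors the shape of the rows printed by the pipeline.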

