從 Apache Spark 開始

如果您已經了解 Apache Spark,使用 Beam 應該很容易。基本概念相同,而且 API 也類似。

Spark 將結構化資料儲存在 Spark DataFrames 中,將非結構化資料儲存在 彈性分散式資料集 (RDD) 中。本指南我們使用 RDD。

Spark RDD 代表元素的集合,而在 Beam 中稱為平行集合 (PCollection)。Beam 中的 PCollection 保證任何順序。

同樣地,Beam 中的轉換稱為平行轉換 (PTransform)。

以下是一些常見操作及其在 PySpark 和 Beam 之間的對應範例。

概觀

這是一個簡單的 PySpark 管道範例,它取得從一到四的數字,將它們乘以二,將所有值加總,然後列印結果。

import pyspark

sc = pyspark.SparkContext()
result = (
    sc.parallelize([1, 2, 3, 4])
    .map(lambda x: x * 2)
    .reduce(lambda x, y: x + y)
)
print(result)

在 Beam 中,您使用管道運算子 | 將資料導入管道,例如 data | beam.Map(...),而不是像 data.map(...) 那樣串聯方法,但它們的功能相同。

以下是 Beam 中等效管道的樣子。

import apache_beam as beam

with beam.Pipeline() as pipeline:
    result = (
        pipeline
        | beam.Create([1, 2, 3, 4])
        | beam.Map(lambda x: x * 2)
        | beam.CombineGlobally(sum)
        | beam.Map(print)
    )

ℹ️ 請注意,我們在 Map 轉換內呼叫了 print。這是因為我們只能從 PTransform 內存取 PCollection 的元素。若要在本機檢查資料,您可以使用 InteractiveRunner

另一個要注意的是,Beam 管道是以延遲方式建構的。這表示當您使用 | 導入資料時,您只宣告轉換和您希望發生的順序,但實際計算並不會發生。管道會在 with beam.Pipeline() as pipeline 內容關閉後執行。

ℹ️ 當 with beam.Pipeline() as pipeline 內容關閉時,它會隱式呼叫 pipeline.run(),觸發計算發生。

然後管道會傳送到您選擇的 執行器,並處理資料。

ℹ️ 管道可以在本機使用 DirectRunner 執行,或在分散式執行器中執行,例如 Flink、Spark 或 Dataflow。Spark 執行器與 PySpark 無關。

可以使用右移運算子 >> 將標籤選擇性地新增至轉換,例如 data | '我的描述' >> beam.Map(...)。這可以作為註解,並使您的管道更容易除錯。

這是新增標籤後管道的樣子。

import apache_beam as beam

with beam.Pipeline() as pipeline:
    result = (
        pipeline
        | 'Create numbers' >> beam.Create([1, 2, 3, 4])
        | 'Multiply by two' >> beam.Map(lambda x: x * 2)
        | 'Sum everything' >> beam.CombineGlobally(sum)
        | 'Print results' >> beam.Map(print)
    )

設定

以下是如何在 PySpark 和 Beam 中開始使用的比較。

PySparkBeam
安裝$ pip install pyspark$ pip install apache-beam
匯入import pysparkimport apache_beam as beam
建立
本機管道
sc = pyspark.SparkContext() as sc
# 您的管道程式碼在此。
with beam.Pipeline() as pipeline
    # 您的管道程式碼在此。
建立值values = sc.parallelize([1, 2, 3, 4])values = pipeline | beam.Create([1, 2, 3, 4])
建立
鍵值對
pairs = sc.parallelize([
    ('key1', 'value1'),
    ('key2', 'value2'),
    ('key3', 'value3'),
])
pairs = pipeline | beam.Create([
    ('key1', 'value1'),
    ('key2', 'value2'),
    ('key3', 'value3'),
])
執行
本機管道
$ spark-submit spark_pipeline.py$ python beam_pipeline.py

轉換

以下是 PySpark 和 Beam 中一些常見轉換的對應項目。

PySparkBeam
Mapvalues.map(lambda x: x * 2)values | beam.Map(lambda x: x * 2)
Filtervalues.filter(lambda x: x % 2 == 0)values | beam.Filter(lambda x: x % 2 == 0)
FlatMapvalues.flatMap(lambda x: range(x))values | beam.FlatMap(lambda x: range(x))
依鍵分組pairs.groupByKey()pairs | beam.GroupByKey()
Reducevalues.reduce(lambda x, y: x+y)values | beam.CombineGlobally(sum)
依鍵縮減pairs.reduceByKey(lambda x, y: x+y)pairs | beam.CombinePerKey(sum)
Distinctvalues.distinct()values | beam.Distinct()
Countvalues.count()values | beam.combiners.Count.Globally()
依鍵計數pairs.countByKey()pairs | beam.combiners.Count.PerKey()
取最小values.takeOrdered(3)values | beam.combiners.Top.Smallest(3)
取最大values.takeOrdered(3, lambda x: -x)values | beam.combiners.Top.Largest(3)
隨機取樣values.takeSample(False, 3)values | beam.combiners.Sample.FixedSizeGlobally(3)
聯集values.union(otherValues)(values, otherValues) | beam.Flatten()
Co-grouppairs.cogroup(otherPairs){'Xs': pairs, 'Ys': otherPairs} | beam.CoGroupByKey()

ℹ️ 若要深入了解 Beam 中可用的轉換,請查看Python 轉換庫

使用計算值

由於我們是在潛在的分散式環境中工作,因此我們無法保證我們計算的結果在任何給定機器上都可用。

在 PySpark 中,我們可以使用 data.collect() 或其他彙總,例如 reduce()count() 等,從元素集合 (RDD) 取得結果。

以下是一個範例,可將數字縮放到零到一之間的範圍。

import pyspark

sc = pyspark.SparkContext()
values = sc.parallelize([1, 2, 3, 4])
min_value = values.reduce(min)
max_value = values.reduce(max)

# We can simply use `min_value` and `max_value` since it's already a Python `int` value from `reduce`.
scaled_values = values.map(lambda x: (x - min_value) / (max_value - min_value))

# But to access `scaled_values`, we need to call `collect`.
print(scaled_values.collect())

在 Beam 中,所有轉換的結果都會產生 PCollection。我們使用側邊輸入將 PCollection 輸入轉換並存取其值。

任何接受函式的轉換(例如 Map)都可以接受側邊輸入。如果我們只需要單一值,可以使用 beam.pvalue.AsSingleton,並將它們當作 Python 值存取。如果我們需要多個值,可以使用 beam.pvalue.AsIter,並將它們當作iterable存取。

import apache_beam as beam

with beam.Pipeline() as pipeline:
    values = pipeline | beam.Create([1, 2, 3, 4])
    min_value = values | beam.CombineGlobally(min)
    max_value = values | beam.CombineGlobally(max)

    # To access `min_value` and `max_value`, we need to pass them as a side input.
    scaled_values = values | beam.Map(
        lambda x, minimum, maximum: (x - minimum) / (maximum - minimum),
        minimum=beam.pvalue.AsSingleton(min_value),
        maximum=beam.pvalue.AsSingleton(max_value),
    )

    scaled_values | beam.Map(print)

ℹ️ 在 Beam 中,我們需要明確傳遞側邊輸入,但我們的好處是縮減或彙總必放入記憶體中。延遲計算側邊輸入也允許我們只計算 values 一次,而不是針對每個不同的縮減 (或需要 RDD 的明確快取)。

下一步

如果您遇到任何問題,請隨時與我們聯絡