從 Apache Spark 開始
如果您已經了解 Apache Spark,使用 Beam 應該很容易。基本概念相同,而且 API 也類似。
Spark 將結構化資料儲存在 Spark DataFrames 中,將非結構化資料儲存在 彈性分散式資料集 (RDD) 中。本指南我們使用 RDD。
Spark RDD 代表元素的集合,而在 Beam 中稱為平行集合 (PCollection)。Beam 中的 PCollection 不保證任何順序。
同樣地,Beam 中的轉換稱為平行轉換 (PTransform)。
以下是一些常見操作及其在 PySpark 和 Beam 之間的對應範例。
概觀
這是一個簡單的 PySpark 管道範例,它取得從一到四的數字,將它們乘以二,將所有值加總,然後列印結果。
在 Beam 中,您使用管道運算子 |
將資料導入管道,例如 data | beam.Map(...)
,而不是像 data.map(...)
那樣串聯方法,但它們的功能相同。
以下是 Beam 中等效管道的樣子。
ℹ️ 請注意,我們在
Map
轉換內呼叫了
另一個要注意的是,Beam 管道是以延遲方式建構的。這表示當您使用 |
導入資料時,您只宣告轉換和您希望發生的順序,但實際計算並不會發生。管道會在 with beam.Pipeline() as pipeline
內容關閉後執行。
ℹ️ 當
with beam.Pipeline() as pipeline
內容關閉時,它會隱式呼叫pipeline.run()
,觸發計算發生。
然後管道會傳送到您選擇的 執行器,並處理資料。
ℹ️ 管道可以在本機使用 DirectRunner 執行,或在分散式執行器中執行,例如 Flink、Spark 或 Dataflow。Spark 執行器與 PySpark 無關。
可以使用右移運算子 >>
將標籤選擇性地新增至轉換,例如 data | '我的描述' >> beam.Map(...)
。這可以作為註解,並使您的管道更容易除錯。
這是新增標籤後管道的樣子。
設定
以下是如何在 PySpark 和 Beam 中開始使用的比較。
PySpark | Beam | |
---|---|---|
安裝 | $ pip install pyspark | $ pip install apache-beam |
匯入 | import pyspark | import apache_beam as beam |
建立 本機管道 | sc = pyspark.SparkContext() as sc # 您的管道程式碼在此。 | with beam.Pipeline() as pipeline # 您的管道程式碼在此。 |
建立值 | values = sc.parallelize([1, 2, 3, 4]) | values = pipeline | beam.Create([1, 2, 3, 4]) |
建立 鍵值對 | pairs = sc.parallelize([ ('key1', 'value1'), ('key2', 'value2'), ('key3', 'value3'), ]) | pairs = pipeline | beam.Create([ ('key1', 'value1'), ('key2', 'value2'), ('key3', 'value3'), ]) |
執行 本機管道 | $ spark-submit spark_pipeline.py | $ python beam_pipeline.py |
轉換
以下是 PySpark 和 Beam 中一些常見轉換的對應項目。
PySpark | Beam | |
---|---|---|
Map | values.map(lambda x: x * 2) | values | beam.Map(lambda x: x * 2) |
Filter | values.filter(lambda x: x % 2 == 0) | values | beam.Filter(lambda x: x % 2 == 0) |
FlatMap | values.flatMap(lambda x: range(x)) | values | beam.FlatMap(lambda x: range(x)) |
依鍵分組 | pairs.groupByKey() | pairs | beam.GroupByKey() |
Reduce | values.reduce(lambda x, y: x+y) | values | beam.CombineGlobally(sum) |
依鍵縮減 | pairs.reduceByKey(lambda x, y: x+y) | pairs | beam.CombinePerKey(sum) |
Distinct | values.distinct() | values | beam.Distinct() |
Count | values.count() | values | beam.combiners.Count.Globally() |
依鍵計數 | pairs.countByKey() | pairs | beam.combiners.Count.PerKey() |
取最小 | values.takeOrdered(3) | values | beam.combiners.Top.Smallest(3) |
取最大 | values.takeOrdered(3, lambda x: -x) | values | beam.combiners.Top.Largest(3) |
隨機取樣 | values.takeSample(False, 3) | values | beam.combiners.Sample.FixedSizeGlobally(3) |
聯集 | values.union(otherValues) | (values, otherValues) | beam.Flatten() |
Co-group | pairs.cogroup(otherPairs) | {'Xs': pairs, 'Ys': otherPairs} | beam.CoGroupByKey() |
ℹ️ 若要深入了解 Beam 中可用的轉換,請查看Python 轉換庫。
使用計算值
由於我們是在潛在的分散式環境中工作,因此我們無法保證我們計算的結果在任何給定機器上都可用。
在 PySpark 中,我們可以使用 data.collect()
或其他彙總,例如 reduce()
、count()
等,從元素集合 (RDD) 取得結果。
以下是一個範例,可將數字縮放到零到一之間的範圍。
import pyspark
sc = pyspark.SparkContext()
values = sc.parallelize([1, 2, 3, 4])
min_value = values.reduce(min)
max_value = values.reduce(max)
# We can simply use `min_value` and `max_value` since it's already a Python `int` value from `reduce`.
scaled_values = values.map(lambda x: (x - min_value) / (max_value - min_value))
# But to access `scaled_values`, we need to call `collect`.
print(scaled_values.collect())
在 Beam 中,所有轉換的結果都會產生 PCollection。我們使用側邊輸入將 PCollection 輸入轉換並存取其值。
任何接受函式的轉換(例如 Map
)都可以接受側邊輸入。如果我們只需要單一值,可以使用 beam.pvalue.AsSingleton
,並將它們當作 Python 值存取。如果我們需要多個值,可以使用 beam.pvalue.AsIter
,並將它們當作iterable
存取。
import apache_beam as beam
with beam.Pipeline() as pipeline:
values = pipeline | beam.Create([1, 2, 3, 4])
min_value = values | beam.CombineGlobally(min)
max_value = values | beam.CombineGlobally(max)
# To access `min_value` and `max_value`, we need to pass them as a side input.
scaled_values = values | beam.Map(
lambda x, minimum, maximum: (x - minimum) / (maximum - minimum),
minimum=beam.pvalue.AsSingleton(min_value),
maximum=beam.pvalue.AsSingleton(max_value),
)
scaled_values | beam.Map(print)
ℹ️ 在 Beam 中,我們需要明確傳遞側邊輸入,但我們的好處是縮減或彙總不必放入記憶體中。延遲計算側邊輸入也允許我們只計算
values
一次,而不是針對每個不同的縮減 (或需要 RDD 的明確快取)。
下一步
- 查看Python 轉換庫中所有可用的轉換。
- 了解如何在程式碼 I/O 區段的程式設計指南中讀取和寫入檔案。
- 在 WordCount 範例逐步解說中,逐步解說其他 WordCount 範例。
- 透過我們的 學習資源進行自我進度的導覽。
- 深入了解我們最喜歡的一些 影片和 Podcast。
- 加入 Beam users@ 通訊論壇。
- 如果您有興趣為 Apache Beam 程式碼庫貢獻,請參閱貢獻指南。
如果您遇到任何問題,請隨時與我們聯絡!
上次更新於 2024/10/31
您是否找到您要尋找的所有內容?
這些資訊是否都實用且清楚?您是否想要變更任何內容?請告訴我們!