Apache Beam Java SDK 擴充功能
Join-library
Join-library 提供內部聯結、外部左聯結和外部右聯結函數。目的是將最常見的聯結簡化為簡單的函數呼叫。
這些函數是通用的,並支援任何 Beam 支援型別的聯結。聯結函數的輸入是 PCollections
的 Key
/ Value
。左側和右側的 PCollection
都需要相同的鍵型別。所有聯結函數都傳回 Key
/ Value
,其中 Key
是聯結鍵,而值是 Key
/ Value
,其中鍵是左側的值,右側是值。
對於外部聯結,使用者必須提供一個代表 null
的值,因為 null
無法序列化。
使用範例
PCollection<KV<String, String>> leftPcollection = ...
PCollection<KV<String, Long>> rightPcollection = ...
PCollection<KV<String, KV<String, Long>>> joinedPcollection =
Join.innerJoin(leftPcollection, rightPcollection);
Sorter
此模組提供 SortValues
轉換,它採用 PCollection<KV<K, Iterable<KV<K2, V>>>>
並產生 PCollection<KV<K, Iterable<KV<K2, V>>>>
,其中,對於每個主鍵 K
,配對的 Iterable<KV<K2, V>>
已按次要鍵 (K2
) 的位元組編碼排序。它是一個有效率且可擴展的迭代器排序器,即使它們很大(不適合放入記憶體)。
注意事項
- 此轉換僅執行值排序;每個鍵附帶的迭代器會被排序,但不同鍵之間沒有任何關係,因為 Beam 不支援
PCollection
中不同元素之間的任何定義關係。
- 每個
Iterable<KV<K2, V>>
都會使用本機記憶體和磁碟在單個工作程式上排序。這表示當在不同的管線中使用時,SortValues
可能會成為效能和/或擴展性的瓶頸。例如,不建議使用者在單個元素的PCollection
上使用SortValues
來全域排序大型PCollection
。如果排序溢出到磁碟,則使用的磁碟空間位元組數量的 (粗略) 估計值為numRecords * (numSecondaryKeyBytesPerRecord + numValueBytesPerRecord + 16) * 3
。
選項
- 使用者可以藉由建立
BufferedExternalSorter.Options
的自訂實例來傳遞到SortValues.create
,以自訂如果排序需要溢出到磁碟時使用的暫存位置以及要使用的最大記憶體量。
SortValues
的使用範例
PCollection<KV<String, KV<String, Integer>>> input = ...
// Group by primary key, bringing <SecondaryKey, Value> pairs for the same key together.
PCollection<KV<String, Iterable<KV<String, Integer>>>> grouped =
input.apply(GroupByKey.<String, KV<String, Integer>>create());
// For every primary key, sort the iterable of <SecondaryKey, Value> pairs by secondary key.
PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted =
grouped.apply(
SortValues.<String, String, Integer>create(BufferedExternalSorter.options()));