CoGroupByKey

Javadoc Javadoc


依據其鍵聚合所有輸入元素,並允許下游處理使用與該鍵相關聯的所有值。雖然 GroupByKey 會對單一輸入集合執行此操作,因此僅對單一類型的輸入值執行此操作,但 CoGroupByKey 會對多個輸入集合進行操作。因此,每個鍵的結果是在每個輸入集合中與該鍵相關聯的值的元組。

請參閱 Beam 程式設計指南 以取得更多資訊。

範例

範例 1:假設您有兩個不同的檔案包含使用者資料;一個檔案具有名稱和電子郵件地址,另一個檔案具有名稱和電話號碼。

您可以使用使用者名稱作為共同鍵,其他資料作為相關聯的值來聯結這兩個資料集。聯結後,您會有一個資料集,其中包含與每個名稱相關聯的所有資訊(電子郵件地址和電話號碼)。

PCollection<KV<UID, Integer>> pt1 = /* ... */;
PCollection<KV<UID, String>> pt2 = /* ... */;

final TupleTag<Integer> t1 = new TupleTag<>();
final TupleTag<String> t2 = new TupleTag<>();
PCollection<KV<UID, CoGBKResult>> result =
  KeyedPCollectionTuple.of(t1, pt1).and(t2, pt2)
    .apply(CoGroupByKey.create());
result.apply(ParDo.of(new DoFn<KV<K, CoGbkResult>, /* some result */>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<K, CoGbkResult> e = c.element();
    CoGbkResult result = e.getValue();
    // Retrieve all integers associated with this key from pt1
    Iterable<Integer> allIntegers = result.getAll(t1);
    // Retrieve the string associated with this key from pt2.
    // Note: This will fail if multiple values had the same key in pt2.
    String string = e.getOnly(t2);
    ...
}));

範例 2