Euphoria Java 8 DSL

什麼是 Euphoria

易於使用的 Java 8 API,建立於 Beam 的 Java SDK 之上。API 提供了資料轉換的高階抽象,專注於 Java 8 的語言特性(例如 lambda 和串流)。它與現有的 Beam SDK 完全可互操作,並且可以來回轉換。它允許透過使用(可選的)基於 Kryo 的編碼器、lambda 和高階運算子進行快速原型設計,並且可以無縫整合到現有的 Beam Pipelines 中。

Euphoria API 專案於 2014 年啟動,明確目標是為 Seznam.cz 的資料基礎架構提供主要建構區塊。2015 年,DataFlow 白皮書啟發了原始作者更進一步,並為串流和批次處理提供統一的 API。該 API 已於 2016 年開源,目前仍在積極開發中。由於 Beam 社群的目標非常相似,我們決定將該 API 作為 Beam Java SDK 的高階 DSL 貢獻出來,並與社群分享我們的努力。

Euphoria DSL 的整合仍在進行中,並在 BEAM-3900 中追蹤。

WordCount 範例

讓我們從一個小範例開始。

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);

// Use Kryo as coder fallback
KryoCoderProvider.of().registerTo(pipeline);

// Source of data loaded from Beam IO.
PCollection<String> input =
    pipeline
        .apply(Create.of(textLineByLine))
        .setTypeDescriptor(TypeDescriptor.of(String.class));

// zero, one, or more output elements. From input lines we will get data set of words.
PCollection<String> words =
    FlatMap.named("TOKENIZER")
        .of(lines)
        .using(
            (String line, Collector<String> context) -> {
              for (String word : Splitter.onPattern("\\s+").split(line)) {
                context.collect(word);
              }
            })
        .output();

// Now we can count input words - the operator ensures that all values for the same
// key (word in this case) end up being processed together. Then it counts number of appearances
// of the same key in 'words' PCollection and emits it to output.
PCollection<KV<String, Long>> counted =
    CountByKey.named("COUNT")
        .of(words)
        .keyBy(w -> w)
        .output();

// Format output.
PCollection<String> output =
    MapElements.named("FORMAT")
        .of(counted)
        .using(p -> p.getKey() + ": " + p.getValue())
        .output();

// Now we can again use Beam transformation. In this case we save words and their count
// into the text file.
output
    .apply(TextIO.write()
    .to("counted_words"));

pipeline.run();

Euphoria 指南

Euphoria API 由一組運算子組成,可讓您根據應用程式需求建構 Pipeline

輸入與輸出

輸入資料可以透過 Beam 的 IO 供應到 PCollection 中,與 Beam 中的方式相同。

PCollection<String> input =
  pipeline
    .apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"))
    .setTypeDescriptor(TypeDescriptor.of(String.class));

新增運算子

Euphoria API 的真正威力在於其運算子套件。每個運算子消耗一個或多個輸入,並產生一個輸出 PCollection。讓我們看看簡單的 MapElements 範例。

PCollection<Integer> input = ...

PCollection<String> mappedElements =
  MapElements
    .named("Int2Str")
    .of(input)
    .using(String::valueOf)
    .output();
該運算子消耗 input,它將給定的 lambda 表達式(String::valueOf)應用於 input 的每個元素,並傳回對應的 PCollection。開發人員在建立運算子時會經過一系列步驟的引導,因此運算子的宣告非常簡單。要開始建立運算子,只需寫下其名稱和 '.'(點)。您的 IDE 會提供提示。

建構任何運算子的第一步是透過 named() 方法給它一個名稱。該名稱會在系統中傳播,並且可以在以後除錯時使用。

Coders 與類型

Beam 的 Java SDK 要求開發人員為自訂元素類型提供 Coder,以便能夠具體化元素。Euphoria 允許使用 Kryo 作為序列化的方式。Kryo 位於 :sdks:java:extensions:kryo 模組中。

//gradle
dependencies {
    compile "org.apache.beam:sdks:java:extensions:kryo:${beam.version}"
}
//maven
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-extensions-kryo</artifactId>
  <version>${beam.version}</version>
</dependency>

您只需要建立 KryoCoderProvider 並將其註冊到您的 Pipeline。有兩種方法可以做到這一點。

在原型設計時,您可能決定不太關心編碼器,然後建立不向 Kryo 進行任何類別註冊的 KryoCoderProvider

//Register `KryoCoderProvider` which attempt to use `KryoCoder` to every non-primitive type
KryoCoderProvider.of().registerTo(pipeline);
這樣的 KryoCoderProvider 將為每個非原始元素類型傳回 KryoCoder。當然,這會降低效能,因為 Kryo 無法有效地序列化未知類型的實例。但它可以提高管線開發的速度。預設情況下會啟用此行為,並且可以在透過 KryoOptions 建立 Pipeline 時停用。
PipelineOptions options = PipelineOptionsFactory.create();
options.as(KryoOptions.class).setKryoRegistrationRequired(true);

第二種更友善效能的方式是註冊 Kryo 將序列化的所有類型。有時,註冊 Kryo 自身的序列化器也是一個好主意。Euphoria 允許您透過實作自己的 KryoRegistrar 並在建立 KryoCoderProvider 時使用它來做到這一點。

//Do not allow `KryoCoderProvider` to return `KryoCoder` for unregistered types
options.as(KryoOptions.class).setKryoRegistrationRequired(true);

KryoCoderProvider.of(
        (kryo) -> { //KryoRegistrar of your uwn
          kryo.register(KryoSerializedElementType.class); //other may follow
        })
    .registerTo(pipeline);
Beam 使用元素的類型解析編碼器。當元素類型由 lambda 實作描述時,類型資訊在執行階段不可用。這是由於類型擦除和 lambda 表達式的動態性質所致。因此,有一種可選的方式可以在每次在運算子建構期間引入新類型時提供 TypeDescriptor
PCollection<Integer> input = ...

MapElements
  .named("Int2Str")
  .of(input)
  .using(String::valueOf, TypeDescriptors.strings())
  .output();
當使用者未提供 TypeDescriptors 時,Euphoria 運算子將使用 TypeDescriptor<Object>。因此,如果 KryoOptions 允許,KryoCoderProvider 可能會為每個未知類型的元素傳回 KryoCoder<Object>。當使用 .setKryoRegistrationRequired(true) 時,提供 TypeDescriptors 將成為強制性的。

度量與累加器

有關作業內部的統計資訊在分散式作業的開發過程中非常有用。Euphoria 稱它們為累加器。它們可以透過環境 Context 存取,該環境可以從 Collector 中取得,無論何時使用它。當預期運算子會產生零到多個輸出元素時,通常會出現這種情況。例如,在 FlatMap 的情況下。

Pipeline pipeline = ...
PCollection<String> dataset = ..

PCollection<String> mapped =
FlatMap
  .named("FlatMap1")
  .of(dataset)
  .using(
    (String value, Collector<String> context) -> {
      context.getCounter("my-counter").increment();
        context.collect(value);
    })
  .output();
MapElements 也允許透過提供 UnaryFunctionEnv(新增第二個內容引數)的實作,而不是 UnaryFunctor 來存取 Context
Pipeline pipeline = ...
PCollection<String> dataset = ...

PCollection<String> mapped =
  MapElements
    .named("MapThem")
    .of(dataset)
    .using(
      (input, context) -> {
        // use simple counter
        context.getCounter("my-counter").increment();
        return input.toLowerCase();
        })
      .output();
累加器在背景中轉換為 Beam 度量,因此它們可以以相同的方式檢視。轉換後的度量的命名空間設定為運算子的名稱。

視窗化

Euphoria 遵循與 Beam Java SDK 相同的視窗化原則。每個 shuffle 運算子(需要在網路上 shuffle 資料的運算子)都允許您設定它。與 Beam 中相同的參數是必要的。WindowFnTriggerWindowingStrategy 和其他。使用者在建構運算子時會被引導設定所有強制性和幾個可選參數,或者都不設定。視窗化會向下傳播到 Pipeline

PCollection<KV<Integer, Long>> countedElements =
  CountByKey.of(input)
      .keyBy(e -> e)
      .windowBy(FixedWindows.of(Duration.standardSeconds(1)))
      .triggeredBy(DefaultTrigger.of())
      .discardingFiredPanes()
      .withAllowedLateness(Duration.standardSeconds(5))
      .withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY)
      .withTimestampCombiner(TimestampCombiner.EARLIEST)
      .output();

如何取得 Euphoria

Euphoria 位於 Apache Beam 專案的 dsl-euphoria 分支、beam-sdks-java-extensions-euphoria 模組中。要建構 euphoria 子專案,請呼叫

./gradlew beam-sdks-java-extensions-euphoria:build

運算子參考

運算子基本上是更高階的資料轉換,可讓您以簡單的方式建構資料處理作業的商業邏輯。本節中記錄了所有 Euphoria 運算子,包括範例。為了簡潔起見,沒有應用視窗化的範例。有關更多詳細資訊,請參閱視窗化章節

CountByKey

計算具有相同鍵的元素數量。需要輸入資料集透過給定的鍵提取器 (UnaryFunction) 映射到鍵,然後計算這些鍵的數量。輸出以 KV<K, Long> 形式發出 (K 是鍵的類型),其中每個 KV 包含鍵和輸入資料集中該鍵的元素數量。

// suppose input: [1, 2, 4, 1, 1, 3]
PCollection<KV<Integer, Long>> output =
  CountByKey.of(input)
    .keyBy(e -> e)
    .output();
// Output will contain:  [KV(1, 3), KV(2, 1), KV(3, 1), (4, 1)]

Distinct

輸出不同的元素 (基於 equals 方法)。它接受一個可選的 UnaryFunction 映射器參數,該參數將元素映射到輸出類型。

// suppose input: [1, 2, 3, 3, 2, 1]
Distinct.named("unique-integers-only")
  .of(input)
  .output();
// Output will contain:  1, 2, 3
 
帶有映射器的 Distinct
// suppose keyValueInput: [KV(1, 100L), KV(3, 100_000L), KV(42, 10L), KV(1, 0L), KV(3, 0L)]
Distinct.named("unique-keys-only")
  .of(keyValueInput)
  .projected(KV::getKey)
  .output();
// Output will contain kvs with keys:  1, 3, 42 with some arbitrary values associated with given keys

Join

表示兩個 (左和右) 資料集在給定鍵上的內部聯結,產生新的資料集。鍵是從兩個資料集透過單獨的提取器提取的,因此左側和右側的元素可以具有不同的類型,分別表示為 LeftTRightT。聯結本身由使用者提供的 BinaryFunctor 執行,該函式使用來自共享相同鍵的兩個資料集的元素。並輸出聯結的結果 (OutputT)。運算子發出 KV<K, OutputT> 類型的輸出資料集。

// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
  Join.named("join-length-to-words")
    .of(left, right)
    .by(le -> le, String::length) // key extractors
    .using((Integer l, String r, Collector<String> c) -> c.collect(l + "+" + r))
    .output();
// joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"), KV(4, "4+duck"),
// KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X")]

LeftJoin

表示兩個 (左和右) 資料集在給定鍵上的左聯結,產生單一新的資料集。鍵是從兩個資料集透過單獨的提取器提取的,因此左側和右側的元素可以具有不同的類型,分別表示為 LeftTRightT。聯結本身由使用者提供的 BinaryFunctor 執行,該函式使用來自共享相同鍵的兩個資料集的元素,其中右側元素是可選的。並輸出聯結的結果 (OutputT)。運算子發出 KV<K, OutputT> 類型的輸出資料集。

// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
  LeftJoin.named("left-join-length-to-words")
      .of(left, right)
      .by(le -> le, String::length) // key extractors
      .using(
          (Integer l, Optional<String> r, Collector<String> c) ->
              c.collect(l + "+" + r.orElse(null)))
      .output();
// joined will contain: [KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"),
// KV(3, "3+rat"), KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"),
// KV(3, "3+rat"), KV(1, "1+X")]
Euphoria 支援針對 LeftJoin 的名為「BroadcastHashJoin」的效能優化。當聯結兩個資料集時,如果其中一個資料集可以放入記憶體中 (在 LeftJoin 中,右側資料集必須放入記憶體),廣播聯結可能會非常有效率。如何使用「廣播雜湊聯結」在 翻譯 部分中進行說明。

RightJoin

表示兩個 (左和右) 資料集在給定鍵上的右聯結,產生單一新的資料集。鍵是從兩個資料集透過單獨的提取器提取的,因此左側和右側的元素可以具有不同的類型,分別表示為 LeftTRightT。聯結本身由使用者提供的 BinaryFunctor 執行,該函式使用來自共享相同鍵的兩個資料集的元素,其中左側元素是可選的。並輸出聯結的結果 (OutputT)。運算子發出 KV<K, OutputT> 類型的輸出資料集。

// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
  RightJoin.named("right-join-length-to-words")
    .of(left, right)
    .by(le -> le, String::length) // key extractors
    .using(
      (Optional<Integer> l, String r, Collector<String> c) ->
        c.collect(l.orElse(null) + "+" + r))
    .output();
    // joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"),
    // KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X"),
    // KV(8, "null+elephant"), KV(5, "null+mouse")]
Euphoria 支援針對 RightJoin 的名為「BroadcastHashJoin」的效能優化。當聯結兩個資料集時,如果其中一個資料集可以放入記憶體中 (在 RightJoin 中,左側資料集必須放入記憶體),廣播聯結可能會非常有效率。如何使用「廣播雜湊聯結」在 翻譯 部分中進行說明。

FullJoin

表示兩個 (左和右) 資料集在給定鍵上的完整外部聯結,產生單一新的資料集。鍵是從兩個資料集透過單獨的提取器提取的,因此左側和右側的元素可以具有不同的類型,分別表示為 LeftTRightT。聯結本身由使用者提供的 BinaryFunctor 執行,該函式使用來自共享相同鍵的兩個資料集的元素,其中兩個元素都是可選的。並輸出聯結的結果 (OutputT)。運算子發出 KV<K, OutputT> 類型的輸出資料集。

// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
  FullJoin.named("join-length-to-words")
    .of(left, right)
    .by(le -> le, String::length) // key extractors
    .using(
      (Optional<Integer> l, Optional<String> r, Collector<String> c) ->
        c.collect(l.orElse(null) + "+" + r.orElse(null)))
    .output();
// joined will contain: [ KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"), KV(3, "3+rat"),
// KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"),KV(1, "1+X"),
//  KV(1, "null+elephant"), KV(5, "null+mouse")]

MapElements

將輸入類型 InputT 的一個輸入元素轉換為另一個 (可能相同) OutputT 類型的一個輸出元素。轉換是透過使用者指定的 UnaryFunction 完成的。

// suppose inputs contains: [ 0, 1, 2, 3, 4, 5]
PCollection<String> strings =
  MapElements.named("int2str")
    .of(input)
    .using(i -> "#" + i)
    .output();
// strings will contain: [ "#0", "#1", "#2", "#3", "#4", "#5"]

FlatMap

將輸入類型 InputT 的一個輸入元素轉換為零個或多個另一個 (可能相同) OutputT 類型的輸出元素。轉換是透過使用者指定的 UnaryFunctor 完成的,其中 Collector<OutputT> 用於發出輸出元素。請注意與 MapElements 的相似性,後者始終只能發出一個元素。

// suppose words contain: ["Brown", "fox", ".", ""]
PCollection<String> letters =
  FlatMap.named("str2char")
    .of(words)
    .using(
      (String s, Collector<String> collector) -> {
        for (int i = 0; i < s.length(); i++) {
          char c = s.charAt(i);
          collector.collect(String.valueOf(c));
        }
      })
    .output();
// characters will contain: ["B", "r", "o", "w", "n",  "f", "o", "x", "."]
FlatMap 可用於確定元素的時間戳記。這是在建置時提供 ExtractEventTime 時間提取器的實作來完成的。有一個專用的 AssignEventTime 運算子來為元素指派時間戳記。請考慮使用它,您的程式碼可能會更具可讀性。
// suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp
PCollection<SomeEventObject> timeStampedEvents =
  FlatMap.named("extract-event-time")
    .of(events)
    .using( (SomeEventObject e, Collector<SomeEventObject> c) -> c.collect(e))
    .eventTimeBy(SomeEventObject::getEventTimeInMillis)
    .output();
//Euphoria will now know event time for each event

Filter

Filter 會捨棄所有未通過給定條件的元素。條件由使用者以 UnaryPredicate 的實作形式提供。輸入和輸出元素的類型相同。

// suppose nums contains: [0,  1, 2, 3, 4, 5, 6, 7, 8, 9]
PCollection<Integer> divisibleBythree =
  Filter.named("divisibleByThree").of(nums).by(e -> e % 3 == 0).output();
//divisibleBythree will contain: [ 0, 3, 6, 9]

ReduceByKey

透過使用者提供的 reduce 函數,對具有相同鍵的 InputT 類型元素執行聚合。鍵是從每個元素透過 UnaryFunction 提取的,該函式會採用輸入元素並輸出類型為 K 的鍵。元素可以選擇性地映射到 V 類型的值,它會在元素洗牌之前發生,因此它可以對效能產生正面影響。

最後,具有相同鍵的元素會由使用者定義的 ReduceFunctorReduceFunctionCombinableReduceFunction 聚合。它們在它們接受的引數數量以及輸出被解釋的方式上有所不同。ReduceFunction 基本上是一個將元素 Stream 作為輸入並輸出一個聚合結果的函數。ReduceFunctor 採用第二個 Collector,允許存取 Context。當提供 CombinableReduceFunction 時,會在洗牌之前執行部分縮減,因此透過網路傳輸的資料較少。

以下範例展示了 ReduceByKey 運算子的基本用法,包括值提取。

//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
  ReduceByKey.named("to-letters-counts")
    .of(animals)
    .keyBy(String::length) // length of animal name will be used as grouping key
    // we need to count each animal name once, so why not to optimize each string to 1
    .valueBy(e -> 1)
    .reduceBy(Stream::count)
    .output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]

現在假設我們要使用計數器追蹤我們的 ReduceByKey 內部狀態。

//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
  ReduceByKey.named("to-letters-couts")
    .of(animals)
    .keyBy(String::length) // length of animal name will be used as grouping key
    // we need to count each animal name once, so why not to optimize each string to 1
    .valueBy(e -> 1)
    .reduceBy(
      (Stream<Integer> s, Collector<Long> collector) -> {
        collector.collect(s.count());
        collector.asContext().getCounter("num-of-keys").increment();
      })
      .output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]

同樣的範例,但使用最佳化的可組合輸出。

//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
  ReduceByKey.named("to-letters-couts")
    .of(animals)
    .keyBy(String::length) // length of animal name will e used as grouping key
    // we need to count each animal name once, so why not to optimize each string to 1
    .valueBy(e -> 1L)
    .combineBy(s -> s.mapToLong(l -> l).sum()) //Stream::count will not be enough
    .output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
請注意,提供的 CombinableReduceFunction 必須是關聯的且可交換的,才能真正可組合。因此,它可用於在洗牌之前計算部分結果。然後將部分結果合併為一個。這就是為什麼簡單的 Stream::count 在此範例中無法運作,不像前一個範例。

Euphoria 的目標是使程式碼易於編寫和閱讀。因此,已經有一些支援以 Fold 或折疊函數的形式編寫可組合的 reduce 函數。它允許使用者僅提供縮減邏輯 (BinaryFunction) 並從中建立 CombinableReduceFunction。提供的 BinaryFunction 仍然必須是關聯的。

//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLenght =
  ReduceByKey.named("to-letters-couts")
    .of(animals)
    .keyBy(String::length) // length of animal name will be used as grouping key
    // we need to count each animal name once, so why not to optimize each string to 1
    .valueBy(e -> 1L)
    .combineBy(Fold.of((l1, l2) -> l1 + l2))
    .output();
// countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]

ReduceWindow

縮減 視窗 中的所有元素。運算子對應於所有元素具有相同鍵的 ReduceByKey,因此實際鍵僅由視窗定義。

//suppose input contains [ 1, 2, 3, 4, 5, 6, 7, 8 ]
//lets assign time-stamp to each input element
PCollection<Integer> withEventTime = AssignEventTime.of(input).using(i -> 1000L * i).output();

PCollection<Integer> output =
  ReduceWindow.of(withEventTime)
    .combineBy(Fold.of((i1, i2) -> i1 + i2))
    .windowBy(FixedWindows.of(Duration.millis(5000)))
    .triggeredBy(DefaultTrigger.of())
    .discardingFiredPanes()
    .output();
//output will contain: [ 10, 26 ]

SumByKey

將具有相同鍵的元素加總。需要輸入資料集透過給定的鍵提取器 (UnaryFunction) 映射到鍵。透過值提取器,也是一個輸出到 LongUnaryFunction,映射到值。然後這些值會依鍵分組並加總。輸出以 KV<K, Long> 形式發出 (K 是鍵的類型),其中每個 KV 包含鍵和輸入資料集中該鍵的元素數量。

//suppose input contains: [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ]
PCollection<KV<Integer, Long>> output =
  SumByKey.named("sum-odd-and-even")
    .of(input)
    .keyBy(e -> e % 2)
    .valueBy(e -> (long) e)
    .output();
// output will contain: [ KV.of(0, 20L), KV.of(1, 25L)]

Union

合併至少兩個相同類型的資料集,而不保證元素的順序。

//suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ]
//suppose rodents contains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ]
PCollection<String> animals =
  Union.named("to-animals")
    .of(cats, rodents)
    .output();
// animal will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver"

TopPerKey

為每個鍵發出一個評分最高的元素。類型為 K 的鍵由給定的 UnaryFunction 提取。另一個 UnaryFunction 提取器允許將輸入元素轉換為類型為 V 的值。頂部元素的選擇是基於分數,該分數是透過使用者提供的稱為分數計算器的 UnaryFunction 從每個元素獲得的。分數類型表示為 ScoreT,並且需要擴充 Comparable<ScoreT>,以便可以直接比較兩個元素的分數。輸出資料集元素的類型為 Triple<K, V, ScoreT>

// suppose 'animals contain: [ "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", "duck", "caterpillar" ]
PCollection<Triple<Character, String, Integer>> longestNamesByLetter =
  TopPerKey.named("longest-animal-names")
    .of(animals)
    .keyBy(name -> name.charAt(0)) // first character is the key
    .valueBy(UnaryFunction.identity()) // value type is the same as input element type
    .scoreBy(String::length) // length defines score, note that Integer implements Comparable<Integer>
    .output();
//longestNamesByLetter wil contain: [ ('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant", 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ]
TopPerKey 是一個洗牌運算子,因此它允許定義視窗。

AssignEventTime

當應用 視窗 時,Euphoria 需要知道如何從元素中提取時間戳記。AssignEventTime 透過給定的 ExtractEventTime 函數的實作來告知 Euphoria 如何執行此操作。

// suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp
PCollection<SomeEventObject> timeStampedEvents =
  AssignEventTime.named("extract-event-time")
    .of(events)
    .using(SomeEventObject::getEventTimeInMillis)
    .output();
//Euphoria will now know event time for each event

轉換

Euphoria API 是在 Beam Java SDK 之上建置的。該 API 在背景中會透明地轉換為 Beam 的 PTransforms

Euphoria API 轉換為 Beam Java SDK 的事實讓我們可以選擇微調轉換本身。Operator 的轉換是透過 OperatorTranslator 的實作來實現的。Euphoria 使用 TranslationProvider 來決定應使用哪個轉換器。Euphoria API 的使用者可以透過擴充 EuphoriaOptions,透過 TranslationProvider 提供自己的 OperatorTranslator。Euphoria 已經包含一些有用的實作。

TranslationProviders

GenericTranslatorProvider

一般 TranslationProvider。允許透過三種不同的方式註冊 OperatorTranslator

GenericTranslatorProvider.newBuilder()
  .register(FlatMap.class, new FlatMapTranslator<>()) // register by operator class
  .register(
    Join.class,
    (Join op) -> {
      String name = ((Optional<String>) op.getName()).orElse("");
      return name.toLowerCase().startsWith("broadcast");
    },
    new BroadcastHashJoinTranslator<>()) // register by class and predicate
  .register(
    op -> op instanceof CompositeOperator,
    new CompositeOperatorTranslator<>()) // register by predicate only
  .build();

GenericTranslatorProvider 是預設提供者,請參閱 GenericTranslatorProvider.createWithDefaultTranslators()

CompositeProvider

按照給定的順序實作 TranslationProvider 的鏈接。反過來,這允許將使用者定義的 TranslationProvider 與 Euphoria API 已經提供的 TranslationProvider 組合在一起。

CompositeProvider.of(
  CustomTranslatorProvider.of(), // first ask CustomTranslatorProvider for translator
  GenericTranslatorProvider.createWithDefaultTranslators()); // then ask default provider if needed

運算子轉換器

每個 Operator 都需要轉換為 Java Beam SDK。這是透過 OperatorTranslator 的實作來完成的。Euphoria API 包含每個隨附的 Operator 實作的轉換器。某些運算子可能具有在某些情況下適用的替代轉換。Join 通常可以有多種實作。我們在這裡只描述最有趣的實作。

BroadcastHashJoinTranslator

當一側的整個資料集適合目標執行器的記憶體時,能夠轉換 LeftJoinRightJoin。因此,它可以透過 Beam 的側輸入分發。從而提高效能。

CompositeOperatorTranslator

某些運算子是複合的。這表示它們實際上是其他運算子的包裝鏈。CompositeOperatorTranslator 可確保它們在轉換過程中被分解為基本運算子。

詳細資訊

大多數轉換都發生在 org.apache.beam.sdk.extensions.euphoria.core.translate 套件中。其中最有趣的類別是

該套件還包含每種支援的運算子類型的 OperatorTranslator 的實作 (JoinTranslatorFlatMapTranslatorReduceByKeyTranslator)。並非每個運算子都需要有自己的轉換器。它們中的一些可以由其他運算子組成。這就是為什麼運算子可以實作 CompositeOperator,這讓它們可以選擇擴充到一組其他 Euphoria 運算子。

翻譯過程的設計考量了彈性。我們希望允許不同的方式將較高層次的 Euphoria 運算子翻譯成 Beam SDK 的基本元素。這使得可以根據使用者選擇或自動獲取的一些資料知識來進一步優化效能。

不支援的功能

原始的 Euphoria 包含一些 Beam 移植版本尚未支援的功能和運算子。以下為尚未支援的功能列表