Euphoria Java 8 DSL
什麼是 Euphoria
易於使用的 Java 8 API,建立於 Beam 的 Java SDK 之上。API 提供了資料轉換的高階抽象,專注於 Java 8 的語言特性(例如 lambda 和串流)。它與現有的 Beam SDK 完全可互操作,並且可以來回轉換。它允許透過使用(可選的)基於 Kryo 的編碼器、lambda 和高階運算子進行快速原型設計,並且可以無縫整合到現有的 Beam Pipelines
中。
Euphoria API 專案於 2014 年啟動,明確目標是為 Seznam.cz 的資料基礎架構提供主要建構區塊。2015 年,DataFlow 白皮書啟發了原始作者更進一步,並為串流和批次處理提供統一的 API。該 API 已於 2016 年開源,目前仍在積極開發中。由於 Beam 社群的目標非常相似,我們決定將該 API 作為 Beam Java SDK 的高階 DSL 貢獻出來,並與社群分享我們的努力。
Euphoria DSL 的整合仍在進行中,並在 BEAM-3900 中追蹤。
WordCount 範例
讓我們從一個小範例開始。
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
// Use Kryo as coder fallback
KryoCoderProvider.of().registerTo(pipeline);
// Source of data loaded from Beam IO.
PCollection<String> input =
pipeline
.apply(Create.of(textLineByLine))
.setTypeDescriptor(TypeDescriptor.of(String.class));
// zero, one, or more output elements. From input lines we will get data set of words.
PCollection<String> words =
FlatMap.named("TOKENIZER")
.of(lines)
.using(
(String line, Collector<String> context) -> {
for (String word : Splitter.onPattern("\\s+").split(line)) {
context.collect(word);
}
})
.output();
// Now we can count input words - the operator ensures that all values for the same
// key (word in this case) end up being processed together. Then it counts number of appearances
// of the same key in 'words' PCollection and emits it to output.
PCollection<KV<String, Long>> counted =
CountByKey.named("COUNT")
.of(words)
.keyBy(w -> w)
.output();
// Format output.
PCollection<String> output =
MapElements.named("FORMAT")
.of(counted)
.using(p -> p.getKey() + ": " + p.getValue())
.output();
// Now we can again use Beam transformation. In this case we save words and their count
// into the text file.
output
.apply(TextIO.write()
.to("counted_words"));
pipeline.run();
Euphoria 指南
Euphoria API 由一組運算子組成,可讓您根據應用程式需求建構 Pipeline
。
輸入與輸出
輸入資料可以透過 Beam 的 IO 供應到 PCollection
中,與 Beam 中的方式相同。
新增運算子
Euphoria API 的真正威力在於其運算子套件。每個運算子消耗一個或多個輸入,並產生一個輸出 PCollection
。讓我們看看簡單的 MapElements
範例。
input
,它將給定的 lambda 表達式(String::valueOf
)應用於 input
的每個元素,並傳回對應的 PCollection
。開發人員在建立運算子時會經過一系列步驟的引導,因此運算子的宣告非常簡單。要開始建立運算子,只需寫下其名稱和 '.'(點)。您的 IDE 會提供提示。建構任何運算子的第一步是透過 named()
方法給它一個名稱。該名稱會在系統中傳播,並且可以在以後除錯時使用。
Coders 與類型
Beam 的 Java SDK 要求開發人員為自訂元素類型提供 Coder
,以便能夠具體化元素。Euphoria 允許使用 Kryo 作為序列化的方式。Kryo 位於 :sdks:java:extensions:kryo
模組中。
//gradle
dependencies {
compile "org.apache.beam:sdks:java:extensions:kryo:${beam.version}"
}
//maven
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-kryo</artifactId>
<version>${beam.version}</version>
</dependency>
您只需要建立 KryoCoderProvider
並將其註冊到您的 Pipeline
。有兩種方法可以做到這一點。
在原型設計時,您可能決定不太關心編碼器,然後建立不向 Kryo 進行任何類別註冊的 KryoCoderProvider
。
KryoCoderProvider
將為每個非原始元素類型傳回 KryoCoder
。當然,這會降低效能,因為 Kryo 無法有效地序列化未知類型的實例。但它可以提高管線開發的速度。預設情況下會啟用此行為,並且可以在透過 KryoOptions
建立 Pipeline
時停用。第二種更友善效能的方式是註冊 Kryo 將序列化的所有類型。有時,註冊 Kryo 自身的序列化器也是一個好主意。Euphoria 允許您透過實作自己的 KryoRegistrar
並在建立 KryoCoderProvider
時使用它來做到這一點。
TypeDescriptor
。TypeDescriptors
時,Euphoria 運算子將使用 TypeDescriptor<Object>
。因此,如果 KryoOptions
允許,KryoCoderProvider
可能會為每個未知類型的元素傳回 KryoCoder<Object>
。當使用 .setKryoRegistrationRequired(true)
時,提供 TypeDescriptors
將成為強制性的。度量與累加器
有關作業內部的統計資訊在分散式作業的開發過程中非常有用。Euphoria 稱它們為累加器。它們可以透過環境 Context
存取,該環境可以從 Collector
中取得,無論何時使用它。當預期運算子會產生零到多個輸出元素時,通常會出現這種情況。例如,在 FlatMap
的情況下。
MapElements
也允許透過提供 UnaryFunctionEnv
(新增第二個內容引數)的實作,而不是 UnaryFunctor
來存取 Context
。視窗化
Euphoria 遵循與 Beam Java SDK 相同的視窗化原則。每個 shuffle 運算子(需要在網路上 shuffle 資料的運算子)都允許您設定它。與 Beam 中相同的參數是必要的。WindowFn
、Trigger
、WindowingStrategy
和其他。使用者在建構運算子時會被引導設定所有強制性和幾個可選參數,或者都不設定。視窗化會向下傳播到 Pipeline
。
PCollection<KV<Integer, Long>> countedElements =
CountByKey.of(input)
.keyBy(e -> e)
.windowBy(FixedWindows.of(Duration.standardSeconds(1)))
.triggeredBy(DefaultTrigger.of())
.discardingFiredPanes()
.withAllowedLateness(Duration.standardSeconds(5))
.withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY)
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.output();
如何取得 Euphoria
Euphoria 位於 Apache Beam 專案的 dsl-euphoria
分支、beam-sdks-java-extensions-euphoria
模組中。要建構 euphoria
子專案,請呼叫
./gradlew beam-sdks-java-extensions-euphoria:build
運算子參考
運算子基本上是更高階的資料轉換,可讓您以簡單的方式建構資料處理作業的商業邏輯。本節中記錄了所有 Euphoria 運算子,包括範例。為了簡潔起見,沒有應用視窗化的範例。有關更多詳細資訊,請參閱視窗化章節。
CountByKey
計算具有相同鍵的元素數量。需要輸入資料集透過給定的鍵提取器 (UnaryFunction
) 映射到鍵,然後計算這些鍵的數量。輸出以 KV<K, Long>
形式發出 (K
是鍵的類型),其中每個 KV
包含鍵和輸入資料集中該鍵的元素數量。
Distinct
輸出不同的元素 (基於 equals 方法)。它接受一個可選的 UnaryFunction
映射器參數,該參數將元素映射到輸出類型。
Distinct
。Join
表示兩個 (左和右) 資料集在給定鍵上的內部聯結,產生新的資料集。鍵是從兩個資料集透過單獨的提取器提取的,因此左側和右側的元素可以具有不同的類型,分別表示為 LeftT
和 RightT
。聯結本身由使用者提供的 BinaryFunctor
執行,該函式使用來自共享相同鍵的兩個資料集的元素。並輸出聯結的結果 (OutputT
)。運算子發出 KV<K, OutputT>
類型的輸出資料集。
// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
Join.named("join-length-to-words")
.of(left, right)
.by(le -> le, String::length) // key extractors
.using((Integer l, String r, Collector<String> c) -> c.collect(l + "+" + r))
.output();
// joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"), KV(4, "4+duck"),
// KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X")]
LeftJoin
表示兩個 (左和右) 資料集在給定鍵上的左聯結,產生單一新的資料集。鍵是從兩個資料集透過單獨的提取器提取的,因此左側和右側的元素可以具有不同的類型,分別表示為 LeftT
和 RightT
。聯結本身由使用者提供的 BinaryFunctor
執行,該函式使用來自共享相同鍵的兩個資料集的元素,其中右側元素是可選的。並輸出聯結的結果 (OutputT
)。運算子發出 KV<K, OutputT>
類型的輸出資料集。
// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
LeftJoin.named("left-join-length-to-words")
.of(left, right)
.by(le -> le, String::length) // key extractors
.using(
(Integer l, Optional<String> r, Collector<String> c) ->
c.collect(l + "+" + r.orElse(null)))
.output();
// joined will contain: [KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"),
// KV(3, "3+rat"), KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"),
// KV(3, "3+rat"), KV(1, "1+X")]
LeftJoin
的名為「BroadcastHashJoin」的效能優化。當聯結兩個資料集時,如果其中一個資料集可以放入記憶體中 (在 LeftJoin
中,右側資料集必須放入記憶體),廣播聯結可能會非常有效率。如何使用「廣播雜湊聯結」在 翻譯 部分中進行說明。RightJoin
表示兩個 (左和右) 資料集在給定鍵上的右聯結,產生單一新的資料集。鍵是從兩個資料集透過單獨的提取器提取的,因此左側和右側的元素可以具有不同的類型,分別表示為 LeftT
和 RightT
。聯結本身由使用者提供的 BinaryFunctor
執行,該函式使用來自共享相同鍵的兩個資料集的元素,其中左側元素是可選的。並輸出聯結的結果 (OutputT
)。運算子發出 KV<K, OutputT>
類型的輸出資料集。
// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
RightJoin.named("right-join-length-to-words")
.of(left, right)
.by(le -> le, String::length) // key extractors
.using(
(Optional<Integer> l, String r, Collector<String> c) ->
c.collect(l.orElse(null) + "+" + r))
.output();
// joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"),
// KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X"),
// KV(8, "null+elephant"), KV(5, "null+mouse")]
RightJoin
的名為「BroadcastHashJoin」的效能優化。當聯結兩個資料集時,如果其中一個資料集可以放入記憶體中 (在 RightJoin
中,左側資料集必須放入記憶體),廣播聯結可能會非常有效率。如何使用「廣播雜湊聯結」在 翻譯 部分中進行說明。FullJoin
表示兩個 (左和右) 資料集在給定鍵上的完整外部聯結,產生單一新的資料集。鍵是從兩個資料集透過單獨的提取器提取的,因此左側和右側的元素可以具有不同的類型,分別表示為 LeftT
和 RightT
。聯結本身由使用者提供的 BinaryFunctor
執行,該函式使用來自共享相同鍵的兩個資料集的元素,其中兩個元素都是可選的。並輸出聯結的結果 (OutputT
)。運算子發出 KV<K, OutputT>
類型的輸出資料集。
// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
FullJoin.named("join-length-to-words")
.of(left, right)
.by(le -> le, String::length) // key extractors
.using(
(Optional<Integer> l, Optional<String> r, Collector<String> c) ->
c.collect(l.orElse(null) + "+" + r.orElse(null)))
.output();
// joined will contain: [ KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"), KV(3, "3+rat"),
// KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"),KV(1, "1+X"),
// KV(1, "null+elephant"), KV(5, "null+mouse")]
MapElements
將輸入類型 InputT
的一個輸入元素轉換為另一個 (可能相同) OutputT
類型的一個輸出元素。轉換是透過使用者指定的 UnaryFunction
完成的。
FlatMap
將輸入類型 InputT
的一個輸入元素轉換為零個或多個另一個 (可能相同) OutputT
類型的輸出元素。轉換是透過使用者指定的 UnaryFunctor
完成的,其中 Collector<OutputT>
用於發出輸出元素。請注意與 MapElements
的相似性,後者始終只能發出一個元素。
// suppose words contain: ["Brown", "fox", ".", ""]
PCollection<String> letters =
FlatMap.named("str2char")
.of(words)
.using(
(String s, Collector<String> collector) -> {
for (int i = 0; i < s.length(); i++) {
char c = s.charAt(i);
collector.collect(String.valueOf(c));
}
})
.output();
// characters will contain: ["B", "r", "o", "w", "n", "f", "o", "x", "."]
FlatMap
可用於確定元素的時間戳記。這是在建置時提供 ExtractEventTime
時間提取器的實作來完成的。有一個專用的 AssignEventTime
運算子來為元素指派時間戳記。請考慮使用它,您的程式碼可能會更具可讀性。// suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp
PCollection<SomeEventObject> timeStampedEvents =
FlatMap.named("extract-event-time")
.of(events)
.using( (SomeEventObject e, Collector<SomeEventObject> c) -> c.collect(e))
.eventTimeBy(SomeEventObject::getEventTimeInMillis)
.output();
//Euphoria will now know event time for each event
Filter
Filter
會捨棄所有未通過給定條件的元素。條件由使用者以 UnaryPredicate
的實作形式提供。輸入和輸出元素的類型相同。
ReduceByKey
透過使用者提供的 reduce 函數,對具有相同鍵的 InputT
類型元素執行聚合。鍵是從每個元素透過 UnaryFunction
提取的,該函式會採用輸入元素並輸出類型為 K
的鍵。元素可以選擇性地映射到 V
類型的值,它會在元素洗牌之前發生,因此它可以對效能產生正面影響。
最後,具有相同鍵的元素會由使用者定義的 ReduceFunctor
、ReduceFunction
或 CombinableReduceFunction
聚合。它們在它們接受的引數數量以及輸出被解釋的方式上有所不同。ReduceFunction
基本上是一個將元素 Stream
作為輸入並輸出一個聚合結果的函數。ReduceFunctor
採用第二個 Collector
,允許存取 Context
。當提供 CombinableReduceFunction
時,會在洗牌之前執行部分縮減,因此透過網路傳輸的資料較少。
以下範例展示了 ReduceByKey
運算子的基本用法,包括值提取。
//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
ReduceByKey.named("to-letters-counts")
.of(animals)
.keyBy(String::length) // length of animal name will be used as grouping key
// we need to count each animal name once, so why not to optimize each string to 1
.valueBy(e -> 1)
.reduceBy(Stream::count)
.output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
現在假設我們要使用計數器追蹤我們的 ReduceByKey
內部狀態。
//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
ReduceByKey.named("to-letters-couts")
.of(animals)
.keyBy(String::length) // length of animal name will be used as grouping key
// we need to count each animal name once, so why not to optimize each string to 1
.valueBy(e -> 1)
.reduceBy(
(Stream<Integer> s, Collector<Long> collector) -> {
collector.collect(s.count());
collector.asContext().getCounter("num-of-keys").increment();
})
.output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
同樣的範例,但使用最佳化的可組合輸出。
//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
ReduceByKey.named("to-letters-couts")
.of(animals)
.keyBy(String::length) // length of animal name will e used as grouping key
// we need to count each animal name once, so why not to optimize each string to 1
.valueBy(e -> 1L)
.combineBy(s -> s.mapToLong(l -> l).sum()) //Stream::count will not be enough
.output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
CombinableReduceFunction
必須是關聯的且可交換的,才能真正可組合。因此,它可用於在洗牌之前計算部分結果。然後將部分結果合併為一個。這就是為什麼簡單的 Stream::count
在此範例中無法運作,不像前一個範例。Euphoria 的目標是使程式碼易於編寫和閱讀。因此,已經有一些支援以 Fold
或折疊函數的形式編寫可組合的 reduce 函數。它允許使用者僅提供縮減邏輯 (BinaryFunction
) 並從中建立 CombinableReduceFunction
。提供的 BinaryFunction
仍然必須是關聯的。
//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLenght =
ReduceByKey.named("to-letters-couts")
.of(animals)
.keyBy(String::length) // length of animal name will be used as grouping key
// we need to count each animal name once, so why not to optimize each string to 1
.valueBy(e -> 1L)
.combineBy(Fold.of((l1, l2) -> l1 + l2))
.output();
// countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
ReduceWindow
縮減 視窗 中的所有元素。運算子對應於所有元素具有相同鍵的 ReduceByKey
,因此實際鍵僅由視窗定義。
//suppose input contains [ 1, 2, 3, 4, 5, 6, 7, 8 ]
//lets assign time-stamp to each input element
PCollection<Integer> withEventTime = AssignEventTime.of(input).using(i -> 1000L * i).output();
PCollection<Integer> output =
ReduceWindow.of(withEventTime)
.combineBy(Fold.of((i1, i2) -> i1 + i2))
.windowBy(FixedWindows.of(Duration.millis(5000)))
.triggeredBy(DefaultTrigger.of())
.discardingFiredPanes()
.output();
//output will contain: [ 10, 26 ]
SumByKey
將具有相同鍵的元素加總。需要輸入資料集透過給定的鍵提取器 (UnaryFunction
) 映射到鍵。透過值提取器,也是一個輸出到 Long
的 UnaryFunction
,映射到值。然後這些值會依鍵分組並加總。輸出以 KV<K, Long>
形式發出 (K
是鍵的類型),其中每個 KV
包含鍵和輸入資料集中該鍵的元素數量。
Union
合併至少兩個相同類型的資料集,而不保證元素的順序。
//suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ]
//suppose rodents contains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ]
PCollection<String> animals =
Union.named("to-animals")
.of(cats, rodents)
.output();
// animal will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver"
TopPerKey
為每個鍵發出一個評分最高的元素。類型為 K
的鍵由給定的 UnaryFunction
提取。另一個 UnaryFunction
提取器允許將輸入元素轉換為類型為 V
的值。頂部元素的選擇是基於分數,該分數是透過使用者提供的稱為分數計算器的 UnaryFunction
從每個元素獲得的。分數類型表示為 ScoreT
,並且需要擴充 Comparable<ScoreT>
,以便可以直接比較兩個元素的分數。輸出資料集元素的類型為 Triple<K, V, ScoreT>
。
// suppose 'animals contain: [ "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", "duck", "caterpillar" ]
PCollection<Triple<Character, String, Integer>> longestNamesByLetter =
TopPerKey.named("longest-animal-names")
.of(animals)
.keyBy(name -> name.charAt(0)) // first character is the key
.valueBy(UnaryFunction.identity()) // value type is the same as input element type
.scoreBy(String::length) // length defines score, note that Integer implements Comparable<Integer>
.output();
//longestNamesByLetter wil contain: [ ('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant", 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ]
TopPerKey
是一個洗牌運算子,因此它允許定義視窗。AssignEventTime
當應用 視窗 時,Euphoria 需要知道如何從元素中提取時間戳記。AssignEventTime
透過給定的 ExtractEventTime
函數的實作來告知 Euphoria 如何執行此操作。
// suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp
PCollection<SomeEventObject> timeStampedEvents =
AssignEventTime.named("extract-event-time")
.of(events)
.using(SomeEventObject::getEventTimeInMillis)
.output();
//Euphoria will now know event time for each event
轉換
Euphoria API 是在 Beam Java SDK 之上建置的。該 API 在背景中會透明地轉換為 Beam 的 PTransforms
。
Euphoria API 轉換為 Beam Java SDK 的事實讓我們可以選擇微調轉換本身。Operator
的轉換是透過 OperatorTranslator
的實作來實現的。Euphoria 使用 TranslationProvider
來決定應使用哪個轉換器。Euphoria API 的使用者可以透過擴充 EuphoriaOptions
,透過 TranslationProvider
提供自己的 OperatorTranslator
。Euphoria 已經包含一些有用的實作。
TranslationProviders
GenericTranslatorProvider
一般 TranslationProvider
。允許透過三種不同的方式註冊 OperatorTranslator
。
- 透過運算子類別註冊特定於運算子的轉換器。
- 透過運算子類別和使用者定義的額外述詞註冊特定於運算子的轉換器。
- 透過使用者定義的述詞註冊一般 (不特定於一種運算子類型) 轉換器。註冊順序很重要,因為
GenericTranslatorProvider
會傳回第一個合適的轉換器。
GenericTranslatorProvider.newBuilder()
.register(FlatMap.class, new FlatMapTranslator<>()) // register by operator class
.register(
Join.class,
(Join op) -> {
String name = ((Optional<String>) op.getName()).orElse("");
return name.toLowerCase().startsWith("broadcast");
},
new BroadcastHashJoinTranslator<>()) // register by class and predicate
.register(
op -> op instanceof CompositeOperator,
new CompositeOperatorTranslator<>()) // register by predicate only
.build();
GenericTranslatorProvider
是預設提供者,請參閱 GenericTranslatorProvider.createWithDefaultTranslators()
。
CompositeProvider
按照給定的順序實作 TranslationProvider
的鏈接。反過來,這允許將使用者定義的 TranslationProvider
與 Euphoria API 已經提供的 TranslationProvider
組合在一起。
運算子轉換器
每個 Operator
都需要轉換為 Java Beam SDK。這是透過 OperatorTranslator
的實作來完成的。Euphoria API 包含每個隨附的 Operator
實作的轉換器。某些運算子可能具有在某些情況下適用的替代轉換。Join
通常可以有多種實作。我們在這裡只描述最有趣的實作。
BroadcastHashJoinTranslator
當一側的整個資料集適合目標執行器的記憶體時,能夠轉換 LeftJoin
和 RightJoin
。因此,它可以透過 Beam 的側輸入分發。從而提高效能。
CompositeOperatorTranslator
某些運算子是複合的。這表示它們實際上是其他運算子的包裝鏈。CompositeOperatorTranslator
可確保它們在轉換過程中被分解為基本運算子。
詳細資訊
大多數轉換都發生在 org.apache.beam.sdk.extensions.euphoria.core.translate
套件中。其中最有趣的類別是
OperatorTranslator
- 定義 Euphoria 到 Beam 轉換的內部 API 的介面。TranslatorProvider
- 提供自訂轉換器的方法。OperatorTransform
- 管理 Euphoria 運算子到 Beam 的PTransform
的實際轉換和/或擴充。EuphoriaOptions
- 一個PipelineOptions
,允許設定自訂的TranslatorProvider
。
該套件還包含每種支援的運算子類型的 OperatorTranslator
的實作 (JoinTranslator
、FlatMapTranslator
、ReduceByKeyTranslator
)。並非每個運算子都需要有自己的轉換器。它們中的一些可以由其他運算子組成。這就是為什麼運算子可以實作 CompositeOperator
,這讓它們可以選擇擴充到一組其他 Euphoria 運算子。
翻譯過程的設計考量了彈性。我們希望允許不同的方式將較高層次的 Euphoria 運算子翻譯成 Beam SDK 的基本元素。這使得可以根據使用者選擇或自動獲取的一些資料知識來進一步優化效能。
不支援的功能
原始的 Euphoria 包含一些 Beam 移植版本尚未支援的功能和運算子。以下為尚未支援的功能列表
- 原始 Euphoria 中的
ReduceByKey
允許對輸出值(每個鍵)進行排序。這也尚未翻譯成 Beam,因此不支援。