側邊輸入模式
此頁面的範例會向您展示常見的 Beam 側邊輸入模式。側邊輸入是您的 DoFn
在每次處理輸入 PCollection
中的元素時可以存取的額外輸入。如需更多資訊,請參閱程式設計指南中關於側邊輸入的章節。
如果您嘗試透過對遠端服務執行鍵值查找來擴充資料,您可能需要先考慮擴充轉換,它可以抽象化側邊輸入的一些細節,並提供額外的好處,例如用戶端節流。
- Java SDK
- Python SDK
緩慢更新全域視窗側邊輸入
您可以從全域視窗檢索側邊輸入,以便在具有非全域視窗(如 FixedWindow
)的管道作業中使用它們。
若要在具有非全域視窗的管道中緩慢更新全域視窗側邊輸入
撰寫一個
DoFn
,定期將資料從有界來源提取到全域視窗中。a. 使用
GenerateSequence
來源轉換定期發出一個值。b. 實例化一個資料驅動觸發器,該觸發器在每個元素上啟動,並從有界來源提取資料。
c. 觸發觸發器以將資料傳遞到全域視窗中。
建立下游轉換的側邊輸入。側邊輸入應適合放入記憶體中。
全域視窗側邊輸入在處理時間觸發,因此主管道會非確定性地將側邊輸入與事件時間中的元素相匹配。
例如,以下程式碼範例使用 Map
來建立 DoFn
。Map
變成一個 View.asSingleton
側邊輸入,該側邊輸入在每個計數器滴答時重建。側邊輸入每 5 秒更新一次,以示範工作流程。在實際情況中,側邊輸入通常每幾個小時或每天更新一次。
public static void sideInputPatterns() {
// This pipeline uses View.asSingleton for a placeholder external service.
// Run in debug mode to see the output.
Pipeline p = Pipeline.create();
// Create a side input that updates every 5 seconds.
// View as an iterable, not singleton, so that if we happen to trigger more
// than once before Latest.globally is computed we can handle both elements.
PCollectionView<Iterable<Map<String, String>>> mapIterable =
p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
.apply(
ParDo.of(
new DoFn<Long, Map<String, String>>() {
@ProcessElement
public void process(
@Element Long input,
@Timestamp Instant timestamp,
OutputReceiver<Map<String, String>> o) {
// Replace map with test data from the placeholder external service.
// Add external reads here.
o.output(PlaceholderExternalService.readTestData(timestamp));
}
}))
.apply(
Window.<Map<String, String>>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes())
.apply(Latest.globally())
.apply(View.asIterable());
// Consume side input. GenerateSequence generates test data.
// Use a real source (like PubSubIO or KafkaIO) in production.
p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(Sum.longsGlobally().withoutDefaults())
.apply(
ParDo.of(
new DoFn<Long, KV<Long, Long>>() {
@ProcessElement
public void process(ProcessContext c, @Timestamp Instant timestamp) {
Iterable<Map<String, String>> si = c.sideInput(mapIterable);
// Take an element from the side input iterable (likely length 1)
Map<String, String> keyMap = si.iterator().next();
c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
LOG.info(
"Value is {} with timestamp {}, using key A from side input with time {}.",
c.element(),
timestamp.toString(DateTimeFormat.forPattern("HH:mm:ss")),
keyMap.get("Key_A"));
}
})
.withSideInputs(mapIterable));
p.run();
}
/** Placeholder class that represents an external service generating test data. */
public static class PlaceholderExternalService {
public static Map<String, String> readTestData(Instant timestamp) {
Map<String, String> map = new HashMap<>();
map.put("Key_A", timestamp.toString(DateTimeFormat.forPattern("HH:mm:ss")));
return map;
}
}
使用視窗化緩慢更新側邊輸入
您可以定期將側邊輸入資料讀取到不同的 PCollection 視窗中。當您將側邊輸入套用到您的主要輸入時,每個主要輸入視窗都會自動與單個側邊輸入視窗相匹配。這保證了單個視窗持續時間的一致性,這表示主輸入上的每個視窗都將與單個版本的側邊輸入資料相匹配。
若要定期將側邊輸入資料讀取到不同的 PCollection 視窗中
- 使用 PeriodicImpulse 或 PeriodicSequence PTransform 來
- 在所需的處理時間間隔內產生無限的元素序列
- 將它們指派給不同的視窗。
- 使用 SDF Read 或 ReadAll PTransform 在 PCollection 元素到達時觸發以擷取資料。
- 套用側邊輸入。
PCollectionView<List<Long>> sideInput =
p.apply(
"SIImpulse",
PeriodicImpulse.create()
.startAt(startAt)
.stopAt(stopAt)
.withInterval(interval1)
.applyWindowing())
.apply(
"FileToRead",
ParDo.of(
new DoFn<Instant, String>() {
@DoFn.ProcessElement
public void process(@Element Instant notUsed, OutputReceiver<String> o) {
o.output(fileToRead);
}
}))
.apply(FileIO.matchAll())
.apply(FileIO.readMatches())
.apply(TextIO.readFiles())
.apply(
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void process(@Element String src, OutputReceiver<String> o) {
o.output(src);
}
}))
.apply(Combine.globally(Count.<String>combineFn()).withoutDefaults())
.apply(View.asList());
PCollection<Instant> mainInput =
p.apply(
"MIImpulse",
PeriodicImpulse.create()
.startAt(startAt.minus(Duration.standardSeconds(1)))
.stopAt(stopAt.minus(Duration.standardSeconds(1)))
.withInterval(interval2)
.applyWindowing());
// Consume side input. GenerateSequence generates test data.
// Use a real source (like PubSubIO or KafkaIO) in production.
PCollection<Long> result =
mainInput.apply(
"generateOutput",
ParDo.of(
new DoFn<Instant, Long>() {
@ProcessElement
public void process(ProcessContext c) {
c.output((long) c.sideInput(sideInput).size());
}
})
.withSideInputs(sideInput));
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms import window
# from apache_beam.utils.timestamp import MAX_TIMESTAMP
# last_timestamp = MAX_TIMESTAMP to go on indefninitely
# Any user-defined function.
# cross join is used as an example.
def cross_join(left, rights):
for x in rights:
yield (left, x)
# Create pipeline.
pipeline = beam.Pipeline()
side_input = (
pipeline
| 'PeriodicImpulse' >> PeriodicImpulse(
first_timestamp, last_timestamp, interval, True)
| 'MapToFileName' >> beam.Map(lambda x: src_file_pattern + str(x))
| 'ReadFromFile' >> beam.io.ReadAllFromText())
main_input = (
pipeline
| 'MpImpulse' >> beam.Create(sample_main_input_elements)
|
'MapMpToTimestamped' >> beam.Map(lambda src: TimestampedValue(src, src))
| 'WindowMpInto' >> beam.WindowInto(
window.FixedWindows(main_input_windowing_interval)))
result = (
main_input
| 'ApplyCrossJoin' >> beam.FlatMap(
cross_join, rights=beam.pvalue.AsIter(side_input)))
上次更新於 2024/10/31
您是否找到了您要找的所有內容?
這些內容是否全部有用且清楚?您想更改任何內容嗎?請告訴我們!