Side input patterns

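The samples on this page show common Beam side input patterns. A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection. For more information, see the section on side inputs in the programming guide.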

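If you are trying to enrich your data by doing a key-value lookup against a remote service, consider the Enrichment transform first. It abstracts away some of the details of side inputs and provides additional benefits, such as client-side throttling.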

Slowly updating global window side inputs

You can retrieve side inputs from the global window and use them in a pipeline job that has non-global windows, such as a FixedWindow.

To slowly update global window side inputs in pipelines with non-global windows:

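  1. Write a DoFn that periodically pulls data from a bounded source into a global window.

    a. Use the GenerateSequence source transform to periodically emit a value.

    b. Instantiate a data-driven trigger that activates on each element and pulls data from a bounded source.

    c. Fire the trigger to pass the data into the global window.

  2. Create the side input for downstream transforms. The side input should fit into memory.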

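The global window side input triggers on processing time, so the main pipeline non-deterministically matches the side input to elements in event time.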
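
To see why the match is non-deterministic, the following plain-Python sketch models the behavior without using Beam. It assumes a side input that is refreshed at fixed processing-time instants, and a main element that reads whichever snapshot is newest at the moment it happens to be processed. The name snapshot_seen and the refresh schedule in REFRESH_TIMES are hypothetical and exist only for illustration.

```python
from bisect import bisect_right

# Hypothetical model, not the Beam API: processing-time instants (in seconds)
# at which the global-window side input fired and produced a new snapshot.
REFRESH_TIMES = [0, 5, 10, 15]


def snapshot_seen(processing_time):
    """Return the refresh time of the newest snapshot available at processing_time."""
    index = bisect_right(REFRESH_TIMES, processing_time) - 1
    if index < 0:
        raise ValueError("no side input snapshot is available yet")
    return REFRESH_TIMES[index]


# The same element (event time 4s) can be processed at different wall-clock
# times, for example after a backlog, and can therefore observe different
# side input versions.
event_time = 4
on_time = snapshot_seen(processing_time=4.2)   # processed promptly
delayed = snapshot_seen(processing_time=11.0)  # processed after a delay

print(f"event_time={event_time}s processed on time -> snapshot from {on_time}s")
print(f"event_time={event_time}s processed late    -> snapshot from {delayed}s")
```

In this model the element's event time never enters the lookup. Only the processing time does, which is the reason the pairing of element and side input version is not reproducible across runs.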

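For example, the following code sample uses a Map to create a DoFn. The Map becomes a side input view (the sample uses View.asIterable) that is rebuilt on each counter tick. The side input updates every 5 seconds to demonstrate the workflow. In a real-world scenario, the side input would typically update every few hours or once per day.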

  public static void sideInputPatterns() {
    // This pipeline builds a side input view from a placeholder external service.
    // Run in debug mode to see the output.
    Pipeline p = Pipeline.create();

    // Create a side input that updates every 5 seconds.
    // View as an iterable, not singleton, so that if we happen to trigger more
    // than once before Latest.globally is computed we can handle both elements.
    PCollectionView<Iterable<Map<String, String>>> mapIterable =
        p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
            .apply(
                ParDo.of(
                    new DoFn<Long, Map<String, String>>() {

                      @ProcessElement
                      public void process(
                          @Element Long input,
                          @Timestamp Instant timestamp,
                          OutputReceiver<Map<String, String>> o) {
                        // Replace map with test data from the placeholder external service.
                        // Add external reads here.
                        o.output(PlaceholderExternalService.readTestData(timestamp));
                      }
                    }))
            .apply(
                Window.<Map<String, String>>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes())
            .apply(Latest.globally())
            .apply(View.asIterable());

    // Consume side input. GenerateSequence generates test data.
    // Use a real source (like PubSubIO or KafkaIO) in production.
    p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
        .apply(Sum.longsGlobally().withoutDefaults())
        .apply(
            ParDo.of(
                    new DoFn<Long, KV<Long, Long>>() {

                      @ProcessElement
                      public void process(ProcessContext c, @Timestamp Instant timestamp) {
                        Iterable<Map<String, String>> si = c.sideInput(mapIterable);
                        // Take an element from the side input iterable (likely length 1)
                        Map<String, String> keyMap = si.iterator().next();
                        c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());

                        LOG.info(
                            "Value is {} with timestamp {}, using key A from side input with time {}.",
                            c.element(),
                            timestamp.toString(DateTimeFormat.forPattern("HH:mm:ss")),
                            keyMap.get("Key_A"));
                      }
                    })
                .withSideInputs(mapIterable));

    p.run();
  }

  /** Placeholder class that represents an external service generating test data. */
  public static class PlaceholderExternalService {

    public static Map<String, String> readTestData(Instant timestamp) {

      Map<String, String> map = new HashMap<>();

      map.put("Key_A", timestamp.toString(DateTimeFormat.forPattern("HH:mm:ss")));

      return map;
    }
  }

Slowly updating side input using windowing

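You can read side input data periodically into distinct PCollection windows. When you apply the side input to your main input, each main input window is automatically matched to a single side input window. This guarantees consistency for the duration of that window: each window on the main input is matched to a single version of the side input data.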
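
The matching rule can be illustrated with a small plain-Python model. It assumes fixed windows on both inputs, and it assumes that a main input window maps to the side input window that contains the main window's last timestamp. The helper names and the window sizes are hypothetical, and the code does not use the Beam API.

```python
def fixed_window(timestamp, size):
    """Return the [start, end) fixed window of the given size containing timestamp."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def matching_side_window(main_window, side_size):
    """Map a main input window to exactly one side input window.

    Assumption: the side window is the one containing the last instant of the
    main window. The end is exclusive, so with integer timestamps that
    instant is end - 1.
    """
    _, main_end = main_window
    return fixed_window(main_end - 1, side_size)


# Hypothetical sizes: main input in 10-second windows, side input read into
# 60-second windows.
MAIN_SIZE = 10
SIDE_SIZE = 60

for ts in (3, 42, 59, 61):
    main = fixed_window(ts, MAIN_SIZE)
    side = matching_side_window(main, SIDE_SIZE)
    print(f"element at {ts}s: main window {main} -> side window {side}")
```

Because the lookup depends only on the main window and not on the individual element, every element in the same main window sees the same side input window, and therefore the same version of the data.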

To read side input data periodically into distinct PCollection windows:

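  1. Use the PeriodicImpulse or PeriodicSequence PTransform to:
    • Generate an infinite sequence of elements at the required processing time interval.
    • Assign them to separate windows.
  2. Fetch the data with an SDF Read or ReadAll PTransform that is triggered by the arrival of each PCollection element.
  3. Apply the side input.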
PCollectionView<List<Long>> sideInput =
    p.apply(
            "SIImpulse",
            PeriodicImpulse.create()
                .startAt(startAt)
                .stopAt(stopAt)
                .withInterval(interval1)
                .applyWindowing())
        .apply(
            "FileToRead",
            ParDo.of(
                new DoFn<Instant, String>() {
                  @DoFn.ProcessElement
                  public void process(@Element Instant notUsed, OutputReceiver<String> o) {
                    o.output(fileToRead);
                  }
                }))
        .apply(FileIO.matchAll())
        .apply(FileIO.readMatches())
        .apply(TextIO.readFiles())
        .apply(
            ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void process(@Element String src, OutputReceiver<String> o) {
                    o.output(src);
                  }
                }))
        .apply(Combine.globally(Count.<String>combineFn()).withoutDefaults())
        .apply(View.asList());

PCollection<Instant> mainInput =
    p.apply(
        "MIImpulse",
        PeriodicImpulse.create()
            .startAt(startAt.minus(Duration.standardSeconds(1)))
            .stopAt(stopAt.minus(Duration.standardSeconds(1)))
            .withInterval(interval2)
            .applyWindowing());

// Consume the side input. PeriodicImpulse generates the test main input here.
// Use a real source (like PubSubIO or KafkaIO) in production.
PCollection<Long> result =
    mainInput.apply(
        "generateOutput",
        ParDo.of(
                new DoFn<Instant, Long>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    c.output((long) c.sideInput(sideInput).size());
                  }
                })
            .withSideInputs(sideInput));

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.window import TimestampedValue

# from apache_beam.utils.timestamp import MAX_TIMESTAMP
# last_timestamp = MAX_TIMESTAMP to go on indefinitely

# Any user-defined function.
# cross join is used as an example.
def cross_join(left, rights):
  for x in rights:
    yield (left, x)

# Create pipeline.
pipeline = beam.Pipeline()
side_input = (
    pipeline
    | 'PeriodicImpulse' >> PeriodicImpulse(
        first_timestamp, last_timestamp, interval, apply_windowing=True)
    | 'MapToFileName' >> beam.Map(lambda x: src_file_pattern + str(x))
    | 'ReadFromFile' >> beam.io.ReadAllFromText())

main_input = (
    pipeline
    | 'MpImpulse' >> beam.Create(sample_main_input_elements)
    | 'MapMpToTimestamped' >> beam.Map(lambda src: TimestampedValue(src, src))
    | 'WindowMpInto' >> beam.WindowInto(
        window.FixedWindows(main_input_windowing_interval)))

result = (
    main_input
    | 'ApplyCrossJoin' >> beam.FlatMap(
        cross_join, rights=beam.pvalue.AsIter(side_input)))