Apache Beam 行動遊戲管線範例

本節提供一系列 Apache Beam 管線範例的逐步解說,這些範例展示了比基本 WordCount 範例更複雜的功能。本節中的管線會處理來自假設的遊戲資料,使用者可以在他們的行動電話上玩遊戲。這些管線示範了越來越複雜的處理;例如,第一個管線示範如何執行批次分析作業以取得相對簡單的分數資料,而後續的管線則使用 Beam 的視窗和觸發功能來提供低延遲的資料分析以及關於使用者遊玩模式的更複雜的情報。

注意:這些範例假設您對 Beam 程式設計模型有一定的熟悉程度。如果您還沒有熟悉,我們建議您先熟悉程式設計模型文件,並在繼續之前執行基本的範例管線。另請注意,這些範例使用 Java 8 lambda 語法,因此需要 Java 8。但是,您可以使用 Java 7 建立具有相同功能的管線。

注意:這些範例假設您對 Beam 程式設計模型有一定的熟悉程度。如果您還沒有熟悉,我們建議您先熟悉程式設計模型文件,並在繼續之前執行基本的範例管線。

注意:MobileGaming 目前不適用於 Go SDK。有一個關於此的開放問題 (Issue 18806)。

每次使用者玩我們的假設行動遊戲時,他們都會產生一個資料事件。每個資料事件都包含下列資訊

當使用者完成一個遊戲例子時,他們的手機會將資料事件傳送到遊戲伺服器,資料會記錄並儲存在檔案中。一般來說,資料會在完成後立即傳送到遊戲伺服器。但是,有時網路的各個點可能會發生延遲。另一種可能的情況是,當使用者的手機與伺服器失去連線時(例如在飛機上或在網路覆蓋範圍之外),使用者會「離線」玩遊戲。當使用者的手機重新與遊戲伺服器連線時,手機會傳送所有累積的遊戲資料。在這些情況下,某些資料事件可能會延遲且順序錯誤地到達。

下圖顯示了理想情況(事件發生時即處理)與現實情況(處理之前通常會有時間延遲)。

There is often a time delay before processing events.

圖 1:X 軸表示事件時間:遊戲事件實際發生的時間。Y 軸表示處理時間:遊戲事件被處理的時間。理想情況下,事件應在其發生時立即處理,如圖中的虛線所示。但是,在現實中,情況並非如此,它看起來更像理想線上方紅色彎曲線所描繪的那樣。

遊戲伺服器接收到的資料事件可能會比使用者產生這些事件的時間晚很多。這種時間差異(稱為偏差)可能會對那些會考量每個分數產生時間的計算管線產生處理上的影響。例如,這類管線可能會追蹤一天中每個小時產生的分數,或者它們會計算使用者連續玩遊戲的時間長度 — 這兩者都取決於每個資料記錄的事件時間。

由於我們的一些範例管線使用資料檔案(例如來自遊戲伺服器的記錄)作為輸入,因此每個遊戲的事件時間戳記可能會嵌入到資料中 — 也就是說,它是每個資料記錄中的一個欄位。這些管線需要從每個資料記錄中剖析事件時間戳記,然後再從輸入檔案中讀取。

對於從無界來源讀取無界遊戲資料的管線,資料來源會將每個 PCollection 元素的固有 時間戳記 設定為適當的事件時間。

行動遊戲範例管線的複雜性各不相同,從簡單的批次分析到可以執行即時分析和濫用偵測的更複雜管線。本節將引導您完成每個範例,並示範如何使用 Beam 的視窗和觸發等功能來擴展管線的功能。

UserScore:批次中的基本分數處理

UserScore 管線是用於處理行動遊戲資料的最簡單範例。UserScore 會決定在有限資料集(例如,遊戲伺服器上儲存的一天分數)中每個使用者的總分。像 UserScore 這樣的管線最好在收集所有相關資料後定期執行。例如,UserScore 可以作為夜間作業執行,以處理當天收集的資料。

注意:有關完整的範例管線程式,請參閱 GitHub 上的 UserScore

注意:有關完整的範例管線程式,請參閱 GitHub 上的 UserScore

UserScore 的功能是什麼?

在一天的計分資料中,每個使用者 ID 可能有多個記錄(如果使用者在分析視窗中玩多次遊戲),每個記錄都有自己的分數值和時間戳記。如果我們想要決定使用者在一天中玩的所有遊戲例子的總分,我們的管線需要將每個使用者的所有記錄分組在一起。

當管線處理每個事件時,事件分數會被加入該特定使用者的總和中。

UserScore 只會剖析每個記錄中它需要的資料,特別是使用者 ID 和分數值。管線不會考量任何記錄的事件時間;它只會處理您在執行管線時指定的輸入檔案中的所有資料。

注意:若要有效地使用 UserScore 管線,您需要確保您提供的輸入資料已按所需的事件時間週期分組 — 也就是說,您指定一個輸入檔案,其中僅包含您關心的那一天中的資料。

UserScore 的基本管線流程執行以下操作

  1. 從文字檔案讀取當天的分數資料。
  2. 透過依使用者 ID 將每個遊戲事件分組並合併分數值,來總和每個唯一使用者的分數值,以取得該特定使用者的總分。
  3. 將結果資料寫入文字檔案。

下圖顯示了管線分析期間內幾位使用者的分數資料。在圖表中,每個資料點都是一個事件,該事件會產生一個使用者/分數對。

A pipeline processes score data for three users.

圖 2:三位使用者的分數資料。

此範例使用批次處理,圖表的 Y 軸表示處理時間:管線首先處理 Y 軸上較低的事件,然後處理軸上較高的事件。圖表的 X 軸表示每個遊戲事件的事件時間,如該事件的時間戳記所示。請注意,管線並未按照發生的順序(根據其時間戳記)處理圖中的各個事件。

從輸入檔案讀取分數事件後,管線會將所有這些使用者/分數對分組在一起,並將分數值總和為每個唯一使用者的總值。UserScore 會將該步驟的核心邏輯封裝為 使用者定義的複合轉換 ExtractAndSumScore

public static class ExtractAndSumScore
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {

  private final String field;

  ExtractAndSumScore(String field) {
    this.field = field;
  }

  @Override
  public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> gameInfo) {

    return gameInfo
        .apply(
            MapElements.into(
                    TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
                .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore())))
        .apply(Sum.integersPerKey());
  }
}
class ExtractAndSumScore(beam.PTransform):
  """A transform to extract key/score information and sum the scores.
  The constructor argument `field` determines whether 'team' or 'user' info is
  extracted.
  """
  def __init__(self, field):
    # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
    # super().__init__()
    beam.PTransform.__init__(self)
    self.field = field

  def expand(self, pcoll):
    return (
        pcoll
        | beam.Map(lambda elem: (elem[self.field], elem['score']))
        | beam.CombinePerKey(sum))

ExtractAndSumScore 的編寫方式更通用,您可以傳入您想要依其將資料分組的欄位(在我們的遊戲中,依唯一使用者或唯一團隊)。這表示我們可以在其他管線中重複使用 ExtractAndSumScore,例如,依團隊將分數資料分組。

以下是 UserScore 的主要方法,展示我們如何應用管道的所有三個步驟

public static void main(String[] args) throws Exception {
  // Begin constructing a pipeline configured by commandline flags.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  // Read events from a text file and parse them.
  pipeline
      .apply(TextIO.read().from(options.getInput()))
      .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
      // Extract and sum username/score pairs from the event data.
      .apply("ExtractUserScore", new ExtractAndSumScore("user"))
      .apply(
          "WriteUserScoreSums", new WriteToText<>(options.getOutput(), configureOutput(), false));

  // Run the batch pipeline.
  pipeline.run().waitUntilFinish();
}
def run(argv=None, save_main_session=True):
  """Main entry point; defines and runs the user_score pipeline."""
  parser = argparse.ArgumentParser()

  # The default maps to two large Google Cloud Storage files (each ~12GB)
  # holding two subsequent day's worth (roughly) of data.
  parser.add_argument(
      '--input',
      type=str,
      default='gs://apache-beam-samples/game/small/gaming_data.csv',
      help='Path to the data file(s) containing game data.')
  parser.add_argument(
      '--output', type=str, required=True, help='Path to the output file(s).')

  args, pipeline_args = parser.parse_known_args(argv)

  options = PipelineOptions(pipeline_args)

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  options.view_as(SetupOptions).save_main_session = save_main_session

  with beam.Pipeline(options=options) as p:

    def format_user_score_sums(user_score):
      (user, score) = user_score
      return 'user: %s, total_score: %s' % (user, score)


    (  # pylint: disable=expression-not-assigned
        p
        | 'ReadInputText' >> beam.io.ReadFromText(args.input)
        | 'UserScore' >> UserScore()
        | 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
        | 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))

限制

如範例中所示,UserScore 管道有一些限制

從下一個管道範例開始,我們將討論如何使用 Beam 的功能來解決這些限制。

HourlyTeamScore:使用視窗在批次中進行進階處理

HourlyTeamScore 管道擴展了 UserScore 管道中使用的基本批次分析原則,並改進了它的一些限制。HourlyTeamScore 執行更精細的分析,不僅使用 Beam SDK 中的其他功能,還會考慮到遊戲資料的更多方面。例如,HourlyTeamScore 可以篩選掉不屬於相關分析期間的資料。

UserScore 一樣,HourlyTeamScore 最適合被視為在收集所有相關資料後定期執行的作業(例如每天一次)。該管道從檔案中讀取固定的資料集,並將結果寫回文字檔案寫入 Google Cloud BigQuery 表格

注意:如需完整的範例管道程式,請參閱 GitHub 上的 HourlyTeamScore

注意:如需完整的範例管道程式,請參閱 GitHub 上的 HourlyTeamScore

HourlyTeamScore 的功能是什麼?

HourlyTeamScore 計算固定資料集中(例如一天的資料)每個團隊每小時的總分。

在下面,我們將詳細了解 HourlyTeamScore 中的每一個增強功能

固定時間視窗

使用固定時間視窗可讓管道提供更好的資訊,了解資料集中事件在分析期間的累積方式。在我們的案例中,它會告訴我們每個團隊在一天中的哪個時間活躍,以及該團隊在這些時間獲得多少分數。

下圖顯示了在應用固定時間視窗後,管道如何處理一天中單一團隊的計分資料

A pipeline processes score data for two teams.

圖 3:兩個團隊的分數資料。每個團隊的分數都會根據這些分數在事件時間中發生的時間劃分為邏輯視窗。

請注意,隨著處理時間的推進,總和現在是每個視窗;每個視窗代表一天中分數發生的事件時間的一小時。

注意:如上圖所示,使用視窗會為每個間隔產生獨立的總計(在此案例中,每小時)。HourlyTeamScore 不會提供每個小時的整個資料集的執行總計,它提供僅在該小時內發生的所有事件的總分。

Beam 的視窗功能使用附加到 PCollection 每個元素的固有時間戳記資訊。因為我們希望管道根據事件時間進行視窗化,所以我們必須先提取嵌入在每個資料記錄中的時間戳記,並將其應用於分數資料的 PCollection 中的對應元素。然後,管道可以應用視窗化函數PCollection 分割成邏輯視窗。

HourlyTeamScore 使用 WithTimestampsWindow 轉換來執行這些操作。

HourlyTeamScore 使用 window.py 中找到的 FixedWindows 轉換來執行這些操作。

以下程式碼顯示了這一點

// Add an element timestamp based on the event log, and apply fixed windowing.
.apply(
    "AddEventTimestamps",
    WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
.apply(
    "FixedWindowsTeam",
    Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration()))))
# Add an element timestamp based on the event log, and apply fixed
# windowing.
| 'AddEventTimestamps' >> beam.Map(
    lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
| 'FixedWindowsTeam' >> beam.WindowInto(
    beam.window.FixedWindows(self.window_duration_in_seconds))

請注意,管道用來指定視窗化的轉換與實際的資料處理轉換(例如 ExtractAndSumScores)不同。此功能在設計 Beam 管道時提供了一些彈性,因為您可以使用不同的視窗化特性在資料集上執行現有的轉換。

根據事件時間進行篩選

HourlyTeamScore 使用篩選來移除資料集中時間戳記不在相關分析期間內的任何事件(即,它們不是在我們感興趣的當天產生的)。這可以防止管道錯誤地包含任何資料,例如在前一天離線產生,但在當天傳送到遊戲伺服器的資料。

它也讓管道可以包含相關的延遲資料,即時間戳記有效,但在分析期間結束後才到達的資料事件。例如,如果我們的管道截止時間是凌晨 12:00,我們可能會在凌晨 2:00 執行管道,但會篩選掉任何時間戳記指示發生在凌晨 12:00 截止時間之後的事件。在凌晨 12:01 到凌晨 2:00 之間延遲到達,但時間戳記指示發生在凌晨 12:00 截止時間之前的資料事件將包含在管道處理中。

HourlyTeamScore 使用 Filter 轉換來執行此操作。當您應用 Filter 時,您會指定一個謂詞,每個資料記錄都會與之進行比較。通過比較的資料記錄會包含在內,而未通過比較的事件則會排除在外。在我們的案例中,謂詞是我們指定的截止時間,我們只比較資料的一部分,即時間戳記欄位。

以下程式碼顯示了 HourlyTeamScore 如何使用 Filter 轉換來篩選在相關分析期間之前或之後發生的事件

.apply(
    "FilterStartTime",
    Filter.by(
        (GameActionInfo gInfo) -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
.apply(
    "FilterEndTime",
    Filter.by(
        (GameActionInfo gInfo) -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
| 'FilterStartTime' >>
beam.Filter(lambda elem: elem['timestamp'] > self.start_timestamp)
| 'FilterEndTime' >>
beam.Filter(lambda elem: elem['timestamp'] < self.stop_timestamp)

計算每個團隊、每個視窗的分數

HourlyTeamScore 使用與 UserScore 管道相同的 ExtractAndSumScores 轉換,但會傳遞不同的鍵(團隊,而不是使用者)。此外,由於管道在將固定時間 1 小時視窗化應用於輸入資料之後應用 ExtractAndSumScores,因此資料會依團隊和視窗分組。您可以在 HourlyTeamScore 的主要方法中看到轉換的完整順序

public static void main(String[] args) throws Exception {
  // Begin constructing a pipeline configured by commandline flags.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  final Instant stopMinTimestamp = new Instant(minFmt.parseMillis(options.getStopMin()));
  final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));

  // Read 'gaming' events from a text file.
  pipeline
      .apply(TextIO.read().from(options.getInput()))
      // Parse the incoming data.
      .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))

      // Filter out data before and after the given times so that it is not included
      // in the calculations. As we collect data in batches (say, by day), the batch for the day
      // that we want to analyze could potentially include some late-arriving data from the
      // previous day.
      // If so, we want to weed it out. Similarly, if we include data from the following day
      // (to scoop up late-arriving events from the day we're analyzing), we need to weed out
      // events that fall after the time period we want to analyze.
      // [START DocInclude_HTSFilters]
      .apply(
          "FilterStartTime",
          Filter.by(
              (GameActionInfo gInfo) -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
      .apply(
          "FilterEndTime",
          Filter.by(
              (GameActionInfo gInfo) -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
      // [END DocInclude_HTSFilters]

      // [START DocInclude_HTSAddTsAndWindow]
      // Add an element timestamp based on the event log, and apply fixed windowing.
      .apply(
          "AddEventTimestamps",
          WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
      .apply(
          "FixedWindowsTeam",
          Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration()))))
      // [END DocInclude_HTSAddTsAndWindow]

      // Extract and sum teamname/score pairs from the event data.
      .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
      .apply(
          "WriteTeamScoreSums", new WriteToText<>(options.getOutput(), configureOutput(), true));

  pipeline.run().waitUntilFinish();
}
class HourlyTeamScore(beam.PTransform):
  def __init__(self, start_min, stop_min, window_duration):
    # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
    # super().__init__()
    beam.PTransform.__init__(self)
    self.start_timestamp = str2timestamp(start_min)
    self.stop_timestamp = str2timestamp(stop_min)
    self.window_duration_in_seconds = window_duration * 60

  def expand(self, pcoll):
    return (
        pcoll
        | 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn())

        # Filter out data before and after the given times so that it is not
        # included in the calculations. As we collect data in batches (say, by
        # day), the batch for the day that we want to analyze could potentially
        # include some late-arriving data from the previous day. If so, we want
        # to weed it out. Similarly, if we include data from the following day
        # (to scoop up late-arriving events from the day we're analyzing), we
        # need to weed out events that fall after the time period we want to
        # analyze.
        # [START filter_by_time_range]
        | 'FilterStartTime' >>
        beam.Filter(lambda elem: elem['timestamp'] > self.start_timestamp)
        | 'FilterEndTime' >>
        beam.Filter(lambda elem: elem['timestamp'] < self.stop_timestamp)
        # [END filter_by_time_range]

        # [START add_timestamp_and_window]
        # Add an element timestamp based on the event log, and apply fixed
        # windowing.
        | 'AddEventTimestamps' >> beam.Map(
            lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
        | 'FixedWindowsTeam' >> beam.WindowInto(
            beam.window.FixedWindows(self.window_duration_in_seconds))
        # [END add_timestamp_and_window]

        # Extract and sum teamname/score pairs from the event data.
        | 'ExtractAndSumScore' >> ExtractAndSumScore('team'))


def run(argv=None, save_main_session=True):
  """Main entry point; defines and runs the hourly_team_score pipeline."""
  parser = argparse.ArgumentParser()

  # The default maps to two large Google Cloud Storage files (each ~12GB)
  # holding two subsequent day's worth (roughly) of data.
  parser.add_argument(
      '--input',
      type=str,
      default='gs://apache-beam-samples/game/gaming_data*.csv',
      help='Path to the data file(s) containing game data.')
  parser.add_argument(
      '--dataset',
      type=str,
      required=True,
      help='BigQuery Dataset to write tables to. '
      'Must already exist.')
  parser.add_argument(
      '--table_name',
      default='leader_board',
      help='The BigQuery table name. Should not already exist.')
  parser.add_argument(
      '--window_duration',
      type=int,
      default=60,
      help='Numeric value of fixed window duration, in minutes')
  parser.add_argument(
      '--start_min',
      type=str,
      default='1970-01-01-00-00',
      help='String representation of the first minute after '
      'which to generate results in the format: '
      'yyyy-MM-dd-HH-mm. Any input data timestamped '
      'prior to that minute won\'t be included in the '
      'sums.')
  parser.add_argument(
      '--stop_min',
      type=str,
      default='2100-01-01-00-00',
      help='String representation of the first minute for '
      'which to generate results in the format: '
      'yyyy-MM-dd-HH-mm. Any input data timestamped '
      'after to that minute won\'t be included in the '
      'sums.')

  args, pipeline_args = parser.parse_known_args(argv)

  options = PipelineOptions(pipeline_args)

  # We also require the --project option to access --dataset
  if options.view_as(GoogleCloudOptions).project is None:
    parser.print_usage()
    print(sys.argv[0] + ': error: argument --project is required')
    sys.exit(1)

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  options.view_as(SetupOptions).save_main_session = save_main_session

  with beam.Pipeline(options=options) as p:
    (  # pylint: disable=expression-not-assigned
        p
        | 'ReadInputText' >> beam.io.ReadFromText(args.input)
        | 'HourlyTeamScore' >> HourlyTeamScore(
            args.start_min, args.stop_min, args.window_duration)
        | 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
        | 'WriteTeamScoreSums' >> WriteToBigQuery(
            args.table_name,
            args.dataset,
            {
                'team': 'STRING',
                'total_score': 'INTEGER',
                'window_start': 'STRING',
            },
            options.view_as(GoogleCloudOptions).project))

限制

如撰寫的那樣,HourlyTeamScore 仍然有限制

LeaderBoard:使用即時遊戲資料進行串流處理

我們可以幫助解決 UserScoreHourlyTeamScore 管道中存在的延遲問題的一種方法是從無界來源讀取分數資料。LeaderBoard 管道透過從產生無限量資料的無界來源讀取遊戲分數資料,而不是從遊戲伺服器上的檔案讀取,來引入串流處理。

LeaderBoard 管道也示範如何根據處理時間事件時間來處理遊戲分數資料。LeaderBoard 輸出有關個人使用者分數和團隊分數的資料,每個資料都以不同的時間範圍為準。

由於 LeaderBoard 管道在產生資料時從無界來源讀取遊戲資料,您可以將該管道視為與遊戲程序同時運行的持續作業。因此,LeaderBoard 可以提供低延遲的見解,了解使用者在任何給定時刻如何玩遊戲 — 這很有用,例如,如果我們想要提供即時的網路排行榜,讓使用者可以在玩遊戲時追蹤他們與其他使用者的進度。

注意:如需完整的範例管道程式,請參閱 GitHub 上的 LeaderBoard

注意:如需完整的範例管道程式,請參閱 GitHub 上的 LeaderBoard

LeaderBoard 的功能是什麼?

LeaderBoard 管道讀取發佈到無界來源的遊戲資料,該來源會近乎即時地產生無限量的資料,並使用該資料執行兩個單獨的處理任務

在下面,我們將詳細了解這兩個任務。

根據處理時間計算使用者分數

我們希望管道在處理時間的每十分鐘輸出每個使用者的執行總分。此計算不會考慮使用者遊戲實例何時產生實際分數;它只會輸出管道至今已接收到的該使用者所有分數的總和。延遲資料會在它恰好在管道執行時到達時包含在計算中。

因為我們希望在每次更新計算時使用管道中已到達的所有資料,所以我們讓管道考慮在單一全域視窗中的所有使用者分數資料。單一全域視窗是無界的,但我們可以透過使用處理時間 觸發,為每十分鐘的計算指定一種暫時的截止點。

當我們為單一全域視窗指定一個十分鐘的處理時間觸發器時,管道會在每次觸發器觸發時有效地取得視窗內容的「快照」。這個快照發生在收到資料後十分鐘。如果沒有資料抵達,管道會在元素抵達後 10 分鐘取得下一個「快照」。由於我們使用的是單一全域視窗,每個快照都包含*到那時為止*收集的所有資料。下圖顯示在單一全域視窗上使用處理時間觸發器的效果

A pipeline processes score data for three users.

圖 4:三位使用者的分數資料。每位使用者的分數都集中在單一的全域視窗中,並使用一個觸發器,在收到資料後十分鐘產生快照以進行輸出。

隨著處理時間的推進和更多分數被處理,觸發器會輸出每位使用者更新後的總和。

以下程式碼範例顯示 LeaderBoard 如何設定處理時間觸發器來輸出使用者分數的資料

/**
 * Extract user/score pairs from the event stream using processing time, via global windowing. Get
 * periodic updates on all users' running scores.
 */
@VisibleForTesting
static class CalculateUserScores
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
  private final Duration allowedLateness;

  CalculateUserScores(Duration allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> input) {
    return input
        .apply(
            "LeaderboardUserGlobalWindow",
            Window.<GameActionInfo>into(new GlobalWindows())
                // Get periodic results every ten minutes.
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)))
                .accumulatingFiredPanes()
                .withAllowedLateness(allowedLateness))
        // Extract and sum username/score pairs from the event data.
        .apply("ExtractUserScore", new ExtractAndSumScore("user"));
  }
}
class CalculateUserScores(beam.PTransform):
  """Extract user/score pairs from the event stream using processing time, via
  global windowing. Get periodic updates on all users' running scores.
  """
  def __init__(self, allowed_lateness):
    # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
    # super().__init__()
    beam.PTransform.__init__(self)
    self.allowed_lateness_seconds = allowed_lateness * 60

  def expand(self, pcoll):
    # NOTE: the behavior does not exactly match the Java example
    # TODO: allowed_lateness not implemented yet in FixedWindows
    # TODO: AfterProcessingTime not implemented yet, replace AfterCount
    return (
        pcoll
        # Get periodic results every ten events.
        | 'LeaderboardUserGlobalWindows' >> beam.WindowInto(
            beam.window.GlobalWindows(),
            trigger=trigger.Repeatedly(trigger.AfterCount(10)),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
        # Extract and sum username/score pairs from the event data.
        | 'ExtractAndSumScore' >> ExtractAndSumScore('user'))

LeaderBoard視窗累加模式設定為在觸發器觸發時累加視窗窗格。此累加模式是透過在設定觸發器時呼叫 .accumulatingFiredPanes 使用 accumulation_mode=trigger.AccumulationMode.ACCUMULATING 來設定的,並導致管道將先前發出的資料與自上次觸發器觸發以來抵達的任何新資料累加在一起。這確保了 LeaderBoard 是使用者分數的執行總和,而不是個別總和的集合。

根據事件時間計算團隊分數

我們希望我們的管道在每小時的遊戲時間內也輸出每個隊伍的總分。與使用者分數計算不同,對於團隊分數,我們關心每個分數實際發生的*事件*時間,因為我們希望單獨考慮每小時的遊戲時間。我們也希望在每小時進展時提供推測性更新,並允許任何遲到資料的實例——在給定小時的資料被認為完成後才抵達的資料——包含在我們的計算中。

因為我們單獨考慮每小時,我們可以像在 HourlyTeamScore 中一樣,將固定時間視窗化應用於我們的輸入資料。為了提供推測性更新和遲到資料的更新,我們將指定其他觸發器參數。觸發器將導致每個視窗以我們指定的間隔(在本例中,每五分鐘)計算並發出結果,並且在視窗被認為「完成」後繼續觸發,以考慮遲到資料。就像使用者分數計算一樣,我們將觸發器設定為累加模式,以確保我們獲得每個小時長視窗的執行總和。

推測性更新和遲到資料的觸發器有助於解決時間偏差的問題。管道中的事件不一定按照它們根據時間戳實際發生的順序處理;它們可能會無序或遲到地抵達管道(在我們的例子中,因為它們是在使用者的手機與網路斷線時產生的)。Beam 需要一種方法來確定它何時可以合理地假設它已經擁有給定視窗中「所有」的資料:這稱為*水位線*。

在理想的世界中,所有資料都會在發生時立即處理,因此處理時間將等於(或至少與事件時間有線性關係)事件時間。然而,由於分散式系統包含一些固有的不準確性(例如我們遲報的手機),Beam 通常使用啟發式水位線。

下圖顯示了正在進行的處理時間與兩個隊伍中每個分數的事件時間之間的關係

A pipeline processes score data by team, windowed by event time.

圖 5:按隊伍劃分的分數資料,按事件時間劃分視窗。基於處理時間的觸發器會導致視窗發出推測性的早期結果,並包含遲到的結果。

圖中的虛線是「理想」的水位線:Beam 對何時可以合理地認為給定視窗中的所有資料都已抵達的概念。不規則的實線表示實際的水位線,由資料來源確定。

抵達實線水位線以上的資料是*遲到資料*——這是一個分數事件,被延遲了(可能是離線產生的),並且在它所屬的視窗關閉後才抵達。我們管道的遲到觸發器確保了此遲到資料仍然包含在總和中。

以下程式碼範例顯示 LeaderBoard 如何應用固定時間視窗化和適當的觸發器,使我們的管道執行我們想要的計算

// Extract team/score pairs from the event stream, using hour-long windows by default.
@VisibleForTesting
static class CalculateTeamScores
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
  private final Duration teamWindowDuration;
  private final Duration allowedLateness;

  CalculateTeamScores(Duration teamWindowDuration, Duration allowedLateness) {
    this.teamWindowDuration = teamWindowDuration;
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> infos) {
    return infos
        .apply(
            "LeaderboardTeamFixedWindows",
            Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
                // We will get early (speculative) results as well as cumulative
                // processing of late data.
                .triggering(
                    AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(FIVE_MINUTES))
                        .withLateFirings(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(TEN_MINUTES)))
                .withAllowedLateness(allowedLateness)
                .accumulatingFiredPanes())
        // Extract and sum teamname/score pairs from the event data.
        .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
  }
}
class CalculateTeamScores(beam.PTransform):
  """Calculates scores for each team within the configured window duration.

  Extract team/score pairs from the event stream, using hour-long windows by
  default.
  """
  def __init__(self, team_window_duration, allowed_lateness):
    # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
    # super().__init__()
    beam.PTransform.__init__(self)
    self.team_window_duration = team_window_duration * 60
    self.allowed_lateness_seconds = allowed_lateness * 60

  def expand(self, pcoll):
    # NOTE: the behavior does not exactly match the Java example
    # TODO: allowed_lateness not implemented yet in FixedWindows
    # TODO: AfterProcessingTime not implemented yet, replace AfterCount
    return (
        pcoll
        # We will get early (speculative) results as well as cumulative
        # processing of late data.
        | 'LeaderboardTeamFixedWindows' >> beam.WindowInto(
            beam.window.FixedWindows(self.team_window_duration),
            trigger=trigger.AfterWatermark(
                trigger.AfterCount(10), trigger.AfterCount(20)),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
        # Extract and sum teamname/score pairs from the event data.
        | 'ExtractAndSumScore' >> ExtractAndSumScore('team'))

總而言之,這些處理策略使我們能夠解決 UserScoreHourlyTeamScore 管道中存在的延遲和完整性問題,同時仍然使用相同的基本轉換來處理資料——事實上,這兩個計算仍然使用我們在 UserScoreHourlyTeamScore 管道中使用的相同 ExtractAndSumScore 轉換。

GameStats:濫用偵測和使用分析

雖然 LeaderBoard 示範了如何使用基本視窗化和觸發器來執行低延遲和靈活的資料分析,但我們可以使用更進階的視窗化技術來執行更全面的分析。這可能包括一些旨在偵測系統濫用(如垃圾郵件)或深入了解使用者行為的計算。GameStats 管道建立在 LeaderBoard 中的低延遲功能之上,以示範如何使用 Beam 來執行這種進階分析。

LeaderBoard 一樣,GameStats 從無界來源讀取資料。最好將其視為一個持續進行的工作,在使用者玩遊戲時提供對遊戲的深入了解。

注意: 有關完整的範例管道程式,請參閱GitHub 上的 GameStats

注意: 有關完整的範例管道程式,請參閱GitHub 上的 GameStats

GameStats 的功能是什麼?

LeaderBoard 一樣,GameStats 計算每隊每小時的總分。但是,管道還執行兩種更複雜的分析

下面,我們將更詳細地研究這些功能。

濫用偵測

假設我們遊戲中的得分取決於使用者在手機上「點擊」的速度。GameStats 的濫用偵測會分析每個使用者的分數資料,以偵測使用者是否具有異常高的「點擊率」,因此分數也異常高。這可能表示遊戲是由一個運作速度明顯快於人類的機器人所玩。

為了確定分數是否「異常」高,GameStats 會計算該固定時間視窗中每個分數的平均值,然後根據平均分數乘以任意權重因子(在我們的案例中為 2.5)來檢查每個個別分數。因此,任何超過平均值 2.5 倍的分數都被視為垃圾郵件的產物。GameStats 管道會追蹤「垃圾郵件」使用者的清單,並將這些使用者從團隊排行榜的團隊分數計算中過濾掉。

由於平均值取決於管道資料,我們需要計算它,然後在後續的 ParDo 轉換中使用該計算資料,以過濾掉超過加權值的分數。為此,我們可以將計算出的平均值作為側輸入傳遞到篩選 ParDo

以下程式碼範例顯示了處理濫用偵測的複合轉換。該轉換使用 Sum.integersPerKey 轉換來對每個使用者的所有分數求和,然後使用 Mean.globally 轉換來確定所有使用者的平均分數。一旦計算完成(作為 PCollectionView 單例),我們可以使用 .withSideInputs 將其傳遞給篩選 ParDo

public static class CalculateSpammyUsers
    extends PTransform<PCollection<KV<String, Integer>>, PCollection<KV<String, Integer>>> {
  private static final Logger LOG = LoggerFactory.getLogger(CalculateSpammyUsers.class);
  private static final double SCORE_WEIGHT = 2.5;

  @Override
  public PCollection<KV<String, Integer>> expand(PCollection<KV<String, Integer>> userScores) {

    // Get the sum of scores for each user.
    PCollection<KV<String, Integer>> sumScores =
        userScores.apply("UserSum", Sum.integersPerKey());

    // Extract the score from each element, and use it to find the global mean.
    final PCollectionView<Double> globalMeanScore =
        sumScores.apply(Values.create()).apply(Mean.<Integer>globally().asSingletonView());

    // Filter the user sums using the global mean.
    PCollection<KV<String, Integer>> filtered =
        sumScores.apply(
            "ProcessAndFilter",
            ParDo
                // use the derived mean total score as a side input
                .of(
                    new DoFn<KV<String, Integer>, KV<String, Integer>>() {
                      private final Counter numSpammerUsers =
                          Metrics.counter("main", "SpammerUsers");

                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        Integer score = c.element().getValue();
                        Double gmc = c.sideInput(globalMeanScore);
                        if (score > (gmc * SCORE_WEIGHT)) {
                          LOG.info(
                              "user "
                                  + c.element().getKey()
                                  + " spammer score "
                                  + score
                                  + " with mean "
                                  + gmc);
                          numSpammerUsers.inc();
                          c.output(c.element());
                        }
                      }
                    })
                .withSideInputs(globalMeanScore));
    return filtered;
  }
}
class CalculateSpammyUsers(beam.PTransform):
  """Filter out all but those users with a high clickrate, which we will
  consider as 'spammy' users.

  We do this by finding the mean total score per user, then using that
  information as a side input to filter out all but those user scores that are
  larger than (mean * SCORE_WEIGHT).
  """
  SCORE_WEIGHT = 2.5

  def expand(self, user_scores):
    # Get the sum of scores for each user.
    sum_scores = (user_scores | 'SumUsersScores' >> beam.CombinePerKey(sum))

    # Extract the score from each element, and use it to find the global mean.
    global_mean_score = (
        sum_scores
        | beam.Values()
        | beam.CombineGlobally(beam.combiners.MeanCombineFn())\
            .as_singleton_view())

    # Filter the user sums using the global mean.
    filtered = (
        sum_scores
        # Use the derived mean total score (global_mean_score) as a side input.
        | 'ProcessAndFilter' >> beam.Filter(
            lambda key_score, global_mean:\
                key_score[1] > global_mean * self.SCORE_WEIGHT,
            global_mean_score))
    return filtered

濫用偵測轉換會產生一個疑似為垃圾機器人的使用者視圖。在管道的稍後階段,我們使用該視圖來過濾掉任何此類使用者,當我們計算每小時的團隊分數時,再次使用側輸入機制。以下程式碼範例顯示了我們在將分數視窗化為固定視窗和提取團隊分數之間插入垃圾郵件過濾器的位置

// Calculate the total score per team over fixed windows,
// and emit cumulative updates for late data. Uses the side input derived above-- the set of
// suspected robots-- to filter out scores from those users from the sum.
// Write the results to BigQuery.
rawEvents
    .apply(
        "WindowIntoFixedWindows",
        Window.into(
            FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
    // Filter out the detected spammer users, using the side input derived above.
    .apply(
        "FilterOutSpammers",
        ParDo.of(
                new DoFn<GameActionInfo, GameActionInfo>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    // If the user is not in the spammers Map, output the data element.
                    if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
                      c.output(c.element());
                    }
                  }
                })
            .withSideInputs(spammersView))
    // Extract and sum teamname/score pairs from the event data.
    .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
# Calculate the total score per team over fixed windows, and emit cumulative
# updates for late data. Uses the side input derived above --the set of
# suspected robots-- to filter out scores from those users from the sum.
# Write the results to BigQuery.
(  # pylint: disable=expression-not-assigned
    raw_events
    | 'WindowIntoFixedWindows' >> beam.WindowInto(
        beam.window.FixedWindows(fixed_window_duration))

    # Filter out the detected spammer users, using the side input derived
    # above
    | 'FilterOutSpammers' >> beam.Filter(
        lambda elem, spammers: elem['user'] not in spammers, spammers_view)
    # Extract and sum teamname/score pairs from the event data.
    | 'ExtractAndSumScore' >> ExtractAndSumScore('team')

分析使用模式

我們可以透過檢查每個遊戲分數的事件時間,並將具有相似事件時間的分數分組到*會話*中,來深入了解使用者何時在玩我們的遊戲以及玩了多久。GameStats 使用 Beam 內建的會話視窗化函數,根據使用者分數發生的時間將其分組到會話中。

當您設定會話視窗化時,您需要指定事件之間的*最小間隙持續時間*。所有到達時間間隔小於最小間隙持續時間的事件都會被分組到同一個視窗中。到達時間差大於間隙的事件會被分組到不同的視窗中。根據我們如何設定最小間隙持續時間,我們可以安全地假設同一個會話視窗中的分數是同一個(相對)不間斷遊戲時間的一部分。不同視窗中的分數表示使用者停止玩遊戲至少最小間隙時間,然後才稍後返回。

下圖顯示了資料在分組到會話視窗中時的外觀。與固定視窗不同,會話視窗*對於每個使用者都不同*,並且取決於每個使用者的個別遊戲模式

User sessions with a minimum gap duration.

圖 6:具有最小間隙持續時間的使用者會話。每個使用者都有不同的會話,具體取決於他們玩了多少次遊戲以及他們在遊戲之間的休息時間有多長。

我們可以利用會話視窗化的資料來確定所有使用者不間斷的平均遊戲時間長度,以及他們在每個會話期間獲得的總分。我們可以在程式碼中執行此操作,首先應用會話視窗,計算每個使用者和會話的分數總和,然後使用轉換來計算每個個別會話的長度

// Detect user sessions-- that is, a burst of activity separated by a gap from further
// activity. Find and record the mean session lengths.
// This information could help the game designers track the changing user engagement
// as their set of games changes.
userEvents
    .apply(
        "WindowIntoSessions",
        Window.<KV<String, Integer>>into(
                Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
    // For this use, we care only about the existence of the session, not any particular
    // information aggregated over it, so the following is an efficient way to do that.
    .apply(Combine.perKey(x -> 0))
    // Get the duration per session.
    .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
# Detect user sessions-- that is, a burst of activity separated by a gap
# from further activity. Find and record the mean session lengths.
# This information could help the game designers track the changing user
# engagement as their set of game changes.
(  # pylint: disable=expression-not-assigned
    user_events
    | 'WindowIntoSessions' >> beam.WindowInto(
        beam.window.Sessions(session_gap),
        timestamp_combiner=beam.window.TimestampCombiner.OUTPUT_AT_EOW)

    # For this use, we care only about the existence of the session, not any
    # particular information aggregated over it, so we can just group by key
    # and assign a "dummy value" of None.
    | beam.CombinePerKey(lambda _: None)

    # Get the duration of the session
    | 'UserSessionActivity' >> beam.ParDo(UserSessionActivity())

這會給我們一組使用者會話,每個會話都附加了一個持續時間。然後,我們可以透過將資料重新視窗化為固定時間視窗,然後計算每個小時結束的所有會話的平均值,來計算*平均*會話長度

// Re-window to process groups of session sums according to when the sessions complete.
.apply(
    "WindowToExtractSessionMean",
    Window.into(
        FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
// Find the mean session duration in each window.
.apply(Mean.<Integer>globally().withoutDefaults())
// Write this info to a BigQuery table.
.apply(
    "WriteAvgSessionLength",
    new WriteWindowedToBigQuery<>(
        options.as(GcpOptions.class).getProject(),
        options.getDataset(),
        options.getGameStatsTablePrefix() + "_sessions",
        configureSessionWindowWrite()));
# Re-window to process groups of session sums according to when the
# sessions complete
| 'WindowToExtractSessionMean' >> beam.WindowInto(
    beam.window.FixedWindows(user_activity_window_duration))

# Find the mean session duration in each window
| beam.CombineGlobally(
    beam.combiners.MeanCombineFn()).without_defaults()
| 'FormatAvgSessionLength' >>
beam.Map(lambda elem: {'mean_duration': float(elem)})
| 'WriteAvgSessionLength' >> WriteToBigQuery(
    args.table_name + '_sessions',
    args.dataset, {
        'mean_duration': 'FLOAT',
    },
    options.view_as(GoogleCloudOptions).project))

我們可以利用產生的資訊來找出,例如,使用者一天中哪個時段玩的時間最長,或者哪一段時間更有可能看到較短的遊戲會話。

後續步驟

如果您遇到任何問題,請隨時聯繫我們