Apache Beam 行動遊戲管線範例
- Java SDK
- Python SDK
本節提供一系列 Apache Beam 管線範例的逐步解說,這些範例展示了比基本 WordCount 範例更複雜的功能。本節中的管線會處理來自假設的遊戲資料,使用者可以在他們的行動電話上玩遊戲。這些管線示範了越來越複雜的處理;例如,第一個管線示範如何執行批次分析作業以取得相對簡單的分數資料,而後續的管線則使用 Beam 的視窗和觸發功能來提供低延遲的資料分析以及關於使用者遊玩模式的更複雜的情報。
注意:這些範例假設您對 Beam 程式設計模型有一定的熟悉程度。如果您還沒有熟悉,我們建議您先熟悉程式設計模型文件,並在繼續之前執行基本的範例管線。另請注意,這些範例使用 Java 8 lambda 語法,因此需要 Java 8。但是,您可以使用 Java 7 建立具有相同功能的管線。
注意:這些範例假設您對 Beam 程式設計模型有一定的熟悉程度。如果您還沒有熟悉,我們建議您先熟悉程式設計模型文件,並在繼續之前執行基本的範例管線。
注意:MobileGaming 目前不適用於 Go SDK。有一個關於此的開放問題 (Issue 18806)。
每次使用者玩我們的假設行動遊戲時,他們都會產生一個資料事件。每個資料事件都包含下列資訊
- 正在玩遊戲的使用者的唯一 ID。
- 使用者所屬團隊的團隊 ID。
- 該特定遊戲例子的分數值。
- 記錄特定遊戲例子發生時間的時間戳記 – 這是每個遊戲資料事件的事件時間。
當使用者完成一個遊戲例子時,他們的手機會將資料事件傳送到遊戲伺服器,資料會記錄並儲存在檔案中。一般來說,資料會在完成後立即傳送到遊戲伺服器。但是,有時網路的各個點可能會發生延遲。另一種可能的情況是,當使用者的手機與伺服器失去連線時(例如在飛機上或在網路覆蓋範圍之外),使用者會「離線」玩遊戲。當使用者的手機重新與遊戲伺服器連線時,手機會傳送所有累積的遊戲資料。在這些情況下,某些資料事件可能會延遲且順序錯誤地到達。
下圖顯示了理想情況(事件發生時即處理)與現實情況(處理之前通常會有時間延遲)。
圖 1:X 軸表示事件時間:遊戲事件實際發生的時間。Y 軸表示處理時間:遊戲事件被處理的時間。理想情況下,事件應在其發生時立即處理,如圖中的虛線所示。但是,在現實中,情況並非如此,它看起來更像理想線上方紅色彎曲線所描繪的那樣。
遊戲伺服器接收到的資料事件可能會比使用者產生這些事件的時間晚很多。這種時間差異(稱為偏差)可能會對那些會考量每個分數產生時間的計算管線產生處理上的影響。例如,這類管線可能會追蹤一天中每個小時產生的分數,或者它們會計算使用者連續玩遊戲的時間長度 — 這兩者都取決於每個資料記錄的事件時間。
由於我們的一些範例管線使用資料檔案(例如來自遊戲伺服器的記錄)作為輸入,因此每個遊戲的事件時間戳記可能會嵌入到資料中 — 也就是說,它是每個資料記錄中的一個欄位。這些管線需要從每個資料記錄中剖析事件時間戳記,然後再從輸入檔案中讀取。
對於從無界來源讀取無界遊戲資料的管線,資料來源會將每個 PCollection 元素的固有 時間戳記 設定為適當的事件時間。
行動遊戲範例管線的複雜性各不相同,從簡單的批次分析到可以執行即時分析和濫用偵測的更複雜管線。本節將引導您完成每個範例,並示範如何使用 Beam 的視窗和觸發等功能來擴展管線的功能。
UserScore:批次中的基本分數處理
UserScore
管線是用於處理行動遊戲資料的最簡單範例。UserScore
會決定在有限資料集(例如,遊戲伺服器上儲存的一天分數)中每個使用者的總分。像 UserScore
這樣的管線最好在收集所有相關資料後定期執行。例如,UserScore
可以作為夜間作業執行,以處理當天收集的資料。
注意:有關完整的範例管線程式,請參閱 GitHub 上的 UserScore。
注意:有關完整的範例管線程式,請參閱 GitHub 上的 UserScore。
UserScore 的功能是什麼?
在一天的計分資料中,每個使用者 ID 可能有多個記錄(如果使用者在分析視窗中玩多次遊戲),每個記錄都有自己的分數值和時間戳記。如果我們想要決定使用者在一天中玩的所有遊戲例子的總分,我們的管線需要將每個使用者的所有記錄分組在一起。
當管線處理每個事件時,事件分數會被加入該特定使用者的總和中。
UserScore
只會剖析每個記錄中它需要的資料,特別是使用者 ID 和分數值。管線不會考量任何記錄的事件時間;它只會處理您在執行管線時指定的輸入檔案中的所有資料。
注意:若要有效地使用
UserScore
管線,您需要確保您提供的輸入資料已按所需的事件時間週期分組 — 也就是說,您指定一個輸入檔案,其中僅包含您關心的那一天中的資料。
UserScore
的基本管線流程執行以下操作
- 從文字檔案讀取當天的分數資料。
- 透過依使用者 ID 將每個遊戲事件分組並合併分數值,來總和每個唯一使用者的分數值,以取得該特定使用者的總分。
- 將結果資料寫入文字檔案。
下圖顯示了管線分析期間內幾位使用者的分數資料。在圖表中,每個資料點都是一個事件,該事件會產生一個使用者/分數對。

圖 2:三位使用者的分數資料。
此範例使用批次處理,圖表的 Y 軸表示處理時間:管線首先處理 Y 軸上較低的事件,然後處理軸上較高的事件。圖表的 X 軸表示每個遊戲事件的事件時間,如該事件的時間戳記所示。請注意,管線並未按照發生的順序(根據其時間戳記)處理圖中的各個事件。
從輸入檔案讀取分數事件後,管線會將所有這些使用者/分數對分組在一起,並將分數值總和為每個唯一使用者的總值。UserScore
會將該步驟的核心邏輯封裝為 使用者定義的複合轉換 ExtractAndSumScore
public static class ExtractAndSumScore
extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
private final String field;
ExtractAndSumScore(String field) {
this.field = field;
}
@Override
public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> gameInfo) {
return gameInfo
.apply(
MapElements.into(
TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
.via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore())))
.apply(Sum.integersPerKey());
}
}
class ExtractAndSumScore(beam.PTransform):
"""A transform to extract key/score information and sum the scores.
The constructor argument `field` determines whether 'team' or 'user' info is
extracted.
"""
def __init__(self, field):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
self.field = field
def expand(self, pcoll):
return (
pcoll
| beam.Map(lambda elem: (elem[self.field], elem['score']))
| beam.CombinePerKey(sum))
ExtractAndSumScore
的編寫方式更通用,您可以傳入您想要依其將資料分組的欄位(在我們的遊戲中,依唯一使用者或唯一團隊)。這表示我們可以在其他管線中重複使用 ExtractAndSumScore
,例如,依團隊將分數資料分組。
以下是 UserScore
的主要方法,展示我們如何應用管道的所有三個步驟
public static void main(String[] args) throws Exception {
// Begin constructing a pipeline configured by commandline flags.
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
// Read events from a text file and parse them.
pipeline
.apply(TextIO.read().from(options.getInput()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
// Extract and sum username/score pairs from the event data.
.apply("ExtractUserScore", new ExtractAndSumScore("user"))
.apply(
"WriteUserScoreSums", new WriteToText<>(options.getOutput(), configureOutput(), false));
// Run the batch pipeline.
pipeline.run().waitUntilFinish();
}
def run(argv=None, save_main_session=True):
"""Main entry point; defines and runs the user_score pipeline."""
parser = argparse.ArgumentParser()
# The default maps to two large Google Cloud Storage files (each ~12GB)
# holding two subsequent day's worth (roughly) of data.
parser.add_argument(
'--input',
type=str,
default='gs://apache-beam-samples/game/small/gaming_data.csv',
help='Path to the data file(s) containing game data.')
parser.add_argument(
'--output', type=str, required=True, help='Path to the output file(s).')
args, pipeline_args = parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=options) as p:
def format_user_score_sums(user_score):
(user, score) = user_score
return 'user: %s, total_score: %s' % (user, score)
( # pylint: disable=expression-not-assigned
p
| 'ReadInputText' >> beam.io.ReadFromText(args.input)
| 'UserScore' >> UserScore()
| 'FormatUserScoreSums' >> beam.Map(format_user_score_sums)
| 'WriteUserScoreSums' >> beam.io.WriteToText(args.output))
限制
如範例中所示,UserScore
管道有一些限制
由於某些分數資料可能由離線玩家產生,並在每日截止時間後傳送,因此對於遊戲資料,
UserScore
管道產生的結果資料可能不完整。UserScore
只會在管道執行時處理輸入檔案中存在的固定輸入集。UserScore
會在處理時間處理輸入檔案中存在的所有資料事件,並且不會根據事件時間檢查或以其他方式錯誤檢查事件。因此,結果可能包含一些事件時間超出相關分析期間的值,例如前一天的延遲記錄。由於
UserScore
僅在收集所有資料後執行,因此在使用者產生資料事件(事件時間)和計算結果(處理時間)之間存在高延遲。UserScore
也只會報告整天的總結果,而不會提供有關一天中資料如何累積的任何更細緻的資訊。
從下一個管道範例開始,我們將討論如何使用 Beam 的功能來解決這些限制。
HourlyTeamScore:使用視窗在批次中進行進階處理
HourlyTeamScore
管道擴展了 UserScore
管道中使用的基本批次分析原則,並改進了它的一些限制。HourlyTeamScore
執行更精細的分析,不僅使用 Beam SDK 中的其他功能,還會考慮到遊戲資料的更多方面。例如,HourlyTeamScore
可以篩選掉不屬於相關分析期間的資料。
與 UserScore
一樣,HourlyTeamScore
最適合被視為在收集所有相關資料後定期執行的作業(例如每天一次)。該管道從檔案中讀取固定的資料集,並將結果寫回文字檔案寫入 Google Cloud BigQuery 表格。
注意:如需完整的範例管道程式,請參閱 GitHub 上的 HourlyTeamScore。
注意:如需完整的範例管道程式,請參閱 GitHub 上的 HourlyTeamScore。
HourlyTeamScore 的功能是什麼?
HourlyTeamScore
計算固定資料集中(例如一天的資料)每個團隊每小時的總分。
HourlyTeamScore
並非一次對整個資料集進行操作,而是將輸入資料分割成邏輯視窗,並在這些視窗上執行計算。這允許HourlyUserScore
提供每個視窗的計分資料資訊,其中每個視窗代表固定時間間隔(例如每小時一次)的遊戲分數進度。HourlyTeamScore
會根據資料事件的事件時間(如內嵌時間戳記所示)是否在相關分析期間內,來篩選資料事件。基本上,該管道會檢查每個遊戲事件的時間戳記,並確保它在我們要分析的範圍內(在此案例中為當天)。前幾天的資料事件將被捨棄,不包含在分數總計中。這使得HourlyTeamScore
比UserScore
更強大,更不容易產生錯誤的結果資料。它也允許該管道處理時間戳記在相關分析期間內的延遲到達資料。
在下面,我們將詳細了解 HourlyTeamScore
中的每一個增強功能
固定時間視窗
使用固定時間視窗可讓管道提供更好的資訊,了解資料集中事件在分析期間的累積方式。在我們的案例中,它會告訴我們每個團隊在一天中的哪個時間活躍,以及該團隊在這些時間獲得多少分數。
下圖顯示了在應用固定時間視窗後,管道如何處理一天中單一團隊的計分資料

圖 3:兩個團隊的分數資料。每個團隊的分數都會根據這些分數在事件時間中發生的時間劃分為邏輯視窗。
請注意,隨著處理時間的推進,總和現在是每個視窗;每個視窗代表一天中分數發生的事件時間的一小時。
注意:如上圖所示,使用視窗會為每個間隔產生獨立的總計(在此案例中,每小時)。
HourlyTeamScore
不會提供每個小時的整個資料集的執行總計,它提供僅在該小時內發生的所有事件的總分。
Beam 的視窗功能使用附加到 PCollection
每個元素的固有時間戳記資訊。因為我們希望管道根據事件時間進行視窗化,所以我們必須先提取嵌入在每個資料記錄中的時間戳記,並將其應用於分數資料的 PCollection
中的對應元素。然後,管道可以應用視窗化函數將 PCollection
分割成邏輯視窗。
HourlyTeamScore
使用 WithTimestamps 和 Window 轉換來執行這些操作。
HourlyTeamScore
使用 window.py 中找到的 FixedWindows
轉換來執行這些操作。
以下程式碼顯示了這一點
請注意,管道用來指定視窗化的轉換與實際的資料處理轉換(例如 ExtractAndSumScores
)不同。此功能在設計 Beam 管道時提供了一些彈性,因為您可以使用不同的視窗化特性在資料集上執行現有的轉換。
根據事件時間進行篩選
HourlyTeamScore
使用篩選來移除資料集中時間戳記不在相關分析期間內的任何事件(即,它們不是在我們感興趣的當天產生的)。這可以防止管道錯誤地包含任何資料,例如在前一天離線產生,但在當天傳送到遊戲伺服器的資料。
它也讓管道可以包含相關的延遲資料,即時間戳記有效,但在分析期間結束後才到達的資料事件。例如,如果我們的管道截止時間是凌晨 12:00,我們可能會在凌晨 2:00 執行管道,但會篩選掉任何時間戳記指示發生在凌晨 12:00 截止時間之後的事件。在凌晨 12:01 到凌晨 2:00 之間延遲到達,但時間戳記指示發生在凌晨 12:00 截止時間之前的資料事件將包含在管道處理中。
HourlyTeamScore
使用 Filter
轉換來執行此操作。當您應用 Filter
時,您會指定一個謂詞,每個資料記錄都會與之進行比較。通過比較的資料記錄會包含在內,而未通過比較的事件則會排除在外。在我們的案例中,謂詞是我們指定的截止時間,我們只比較資料的一部分,即時間戳記欄位。
以下程式碼顯示了 HourlyTeamScore
如何使用 Filter
轉換來篩選在相關分析期間之前或之後發生的事件
計算每個團隊、每個視窗的分數
HourlyTeamScore
使用與 UserScore
管道相同的 ExtractAndSumScores
轉換,但會傳遞不同的鍵(團隊,而不是使用者)。此外,由於管道在將固定時間 1 小時視窗化應用於輸入資料之後應用 ExtractAndSumScores
,因此資料會依團隊和視窗分組。您可以在 HourlyTeamScore
的主要方法中看到轉換的完整順序
public static void main(String[] args) throws Exception {
// Begin constructing a pipeline configured by commandline flags.
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
final Instant stopMinTimestamp = new Instant(minFmt.parseMillis(options.getStopMin()));
final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));
// Read 'gaming' events from a text file.
pipeline
.apply(TextIO.read().from(options.getInput()))
// Parse the incoming data.
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
// Filter out data before and after the given times so that it is not included
// in the calculations. As we collect data in batches (say, by day), the batch for the day
// that we want to analyze could potentially include some late-arriving data from the
// previous day.
// If so, we want to weed it out. Similarly, if we include data from the following day
// (to scoop up late-arriving events from the day we're analyzing), we need to weed out
// events that fall after the time period we want to analyze.
// [START DocInclude_HTSFilters]
.apply(
"FilterStartTime",
Filter.by(
(GameActionInfo gInfo) -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
.apply(
"FilterEndTime",
Filter.by(
(GameActionInfo gInfo) -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
// [END DocInclude_HTSFilters]
// [START DocInclude_HTSAddTsAndWindow]
// Add an element timestamp based on the event log, and apply fixed windowing.
.apply(
"AddEventTimestamps",
WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
.apply(
"FixedWindowsTeam",
Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration()))))
// [END DocInclude_HTSAddTsAndWindow]
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new ExtractAndSumScore("team"))
.apply(
"WriteTeamScoreSums", new WriteToText<>(options.getOutput(), configureOutput(), true));
pipeline.run().waitUntilFinish();
}
class HourlyTeamScore(beam.PTransform):
def __init__(self, start_min, stop_min, window_duration):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
self.start_timestamp = str2timestamp(start_min)
self.stop_timestamp = str2timestamp(stop_min)
self.window_duration_in_seconds = window_duration * 60
def expand(self, pcoll):
return (
pcoll
| 'ParseGameEventFn' >> beam.ParDo(ParseGameEventFn())
# Filter out data before and after the given times so that it is not
# included in the calculations. As we collect data in batches (say, by
# day), the batch for the day that we want to analyze could potentially
# include some late-arriving data from the previous day. If so, we want
# to weed it out. Similarly, if we include data from the following day
# (to scoop up late-arriving events from the day we're analyzing), we
# need to weed out events that fall after the time period we want to
# analyze.
# [START filter_by_time_range]
| 'FilterStartTime' >>
beam.Filter(lambda elem: elem['timestamp'] > self.start_timestamp)
| 'FilterEndTime' >>
beam.Filter(lambda elem: elem['timestamp'] < self.stop_timestamp)
# [END filter_by_time_range]
# [START add_timestamp_and_window]
# Add an element timestamp based on the event log, and apply fixed
# windowing.
| 'AddEventTimestamps' >> beam.Map(
lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
| 'FixedWindowsTeam' >> beam.WindowInto(
beam.window.FixedWindows(self.window_duration_in_seconds))
# [END add_timestamp_and_window]
# Extract and sum teamname/score pairs from the event data.
| 'ExtractAndSumScore' >> ExtractAndSumScore('team'))
def run(argv=None, save_main_session=True):
"""Main entry point; defines and runs the hourly_team_score pipeline."""
parser = argparse.ArgumentParser()
# The default maps to two large Google Cloud Storage files (each ~12GB)
# holding two subsequent day's worth (roughly) of data.
parser.add_argument(
'--input',
type=str,
default='gs://apache-beam-samples/game/gaming_data*.csv',
help='Path to the data file(s) containing game data.')
parser.add_argument(
'--dataset',
type=str,
required=True,
help='BigQuery Dataset to write tables to. '
'Must already exist.')
parser.add_argument(
'--table_name',
default='leader_board',
help='The BigQuery table name. Should not already exist.')
parser.add_argument(
'--window_duration',
type=int,
default=60,
help='Numeric value of fixed window duration, in minutes')
parser.add_argument(
'--start_min',
type=str,
default='1970-01-01-00-00',
help='String representation of the first minute after '
'which to generate results in the format: '
'yyyy-MM-dd-HH-mm. Any input data timestamped '
'prior to that minute won\'t be included in the '
'sums.')
parser.add_argument(
'--stop_min',
type=str,
default='2100-01-01-00-00',
help='String representation of the first minute for '
'which to generate results in the format: '
'yyyy-MM-dd-HH-mm. Any input data timestamped '
'after to that minute won\'t be included in the '
'sums.')
args, pipeline_args = parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
# We also require the --project option to access --dataset
if options.view_as(GoogleCloudOptions).project is None:
parser.print_usage()
print(sys.argv[0] + ': error: argument --project is required')
sys.exit(1)
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=options) as p:
( # pylint: disable=expression-not-assigned
p
| 'ReadInputText' >> beam.io.ReadFromText(args.input)
| 'HourlyTeamScore' >> HourlyTeamScore(
args.start_min, args.stop_min, args.window_duration)
| 'TeamScoresDict' >> beam.ParDo(TeamScoresDict())
| 'WriteTeamScoreSums' >> WriteToBigQuery(
args.table_name,
args.dataset,
{
'team': 'STRING',
'total_score': 'INTEGER',
'window_start': 'STRING',
},
options.view_as(GoogleCloudOptions).project))
限制
如撰寫的那樣,HourlyTeamScore
仍然有限制
HourlyTeamScore
在資料事件發生(事件時間)和產生結果(處理時間)之間仍然存在高延遲,因為作為批次管道,它需要等待所有資料事件都存在後才能開始處理。
LeaderBoard:使用即時遊戲資料進行串流處理
我們可以幫助解決 UserScore
和 HourlyTeamScore
管道中存在的延遲問題的一種方法是從無界來源讀取分數資料。LeaderBoard
管道透過從產生無限量資料的無界來源讀取遊戲分數資料,而不是從遊戲伺服器上的檔案讀取,來引入串流處理。
LeaderBoard
管道也示範如何根據處理時間和事件時間來處理遊戲分數資料。LeaderBoard
輸出有關個人使用者分數和團隊分數的資料,每個資料都以不同的時間範圍為準。
由於 LeaderBoard
管道在產生資料時從無界來源讀取遊戲資料,您可以將該管道視為與遊戲程序同時運行的持續作業。因此,LeaderBoard
可以提供低延遲的見解,了解使用者在任何給定時刻如何玩遊戲 — 這很有用,例如,如果我們想要提供即時的網路排行榜,讓使用者可以在玩遊戲時追蹤他們與其他使用者的進度。
注意:如需完整的範例管道程式,請參閱 GitHub 上的 LeaderBoard。
注意:如需完整的範例管道程式,請參閱 GitHub 上的 LeaderBoard。
LeaderBoard 的功能是什麼?
LeaderBoard
管道讀取發佈到無界來源的遊戲資料,該來源會近乎即時地產生無限量的資料,並使用該資料執行兩個單獨的處理任務
LeaderBoard
計算每個唯一使用者的總分,並為處理時間的每十分鐘發佈推測性結果。也就是說,在接收到資料十分鐘後,管道會輸出管道至今已處理的每個使用者的總分。此計算提供接近即時的執行「排行榜」,無論實際的遊戲事件何時產生。LeaderBoard
計算管道執行的每小時的團隊分數。例如,如果我們想獎勵每小時遊戲得分最高的團隊,這會很有用。團隊分數計算會使用固定時間視窗,根據資料到達管道時的事件時間(由時間戳記指示),將輸入資料分割成一小時長的有限視窗。此外,團隊分數計算會使用 Beam 的觸發機制,為每小時提供推測性結果(每五分鐘更新一次,直到該小時結束),並捕獲任何延遲資料並將其新增到所屬的特定一小時視窗。
在下面,我們將詳細了解這兩個任務。
根據處理時間計算使用者分數
我們希望管道在處理時間的每十分鐘輸出每個使用者的執行總分。此計算不會考慮使用者遊戲實例何時產生實際分數;它只會輸出管道至今已接收到的該使用者所有分數的總和。延遲資料會在它恰好在管道執行時到達時包含在計算中。
因為我們希望在每次更新計算時使用管道中已到達的所有資料,所以我們讓管道考慮在單一全域視窗中的所有使用者分數資料。單一全域視窗是無界的,但我們可以透過使用處理時間 觸發,為每十分鐘的計算指定一種暫時的截止點。
當我們為單一全域視窗指定一個十分鐘的處理時間觸發器時,管道會在每次觸發器觸發時有效地取得視窗內容的「快照」。這個快照發生在收到資料後十分鐘。如果沒有資料抵達,管道會在元素抵達後 10 分鐘取得下一個「快照」。由於我們使用的是單一全域視窗,每個快照都包含*到那時為止*收集的所有資料。下圖顯示在單一全域視窗上使用處理時間觸發器的效果

圖 4:三位使用者的分數資料。每位使用者的分數都集中在單一的全域視窗中,並使用一個觸發器,在收到資料後十分鐘產生快照以進行輸出。
隨著處理時間的推進和更多分數被處理,觸發器會輸出每位使用者更新後的總和。
以下程式碼範例顯示 LeaderBoard
如何設定處理時間觸發器來輸出使用者分數的資料
/**
* Extract user/score pairs from the event stream using processing time, via global windowing. Get
* periodic updates on all users' running scores.
*/
@VisibleForTesting
static class CalculateUserScores
extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
private final Duration allowedLateness;
CalculateUserScores(Duration allowedLateness) {
this.allowedLateness = allowedLateness;
}
@Override
public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> input) {
return input
.apply(
"LeaderboardUserGlobalWindow",
Window.<GameActionInfo>into(new GlobalWindows())
// Get periodic results every ten minutes.
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)))
.accumulatingFiredPanes()
.withAllowedLateness(allowedLateness))
// Extract and sum username/score pairs from the event data.
.apply("ExtractUserScore", new ExtractAndSumScore("user"));
}
}
class CalculateUserScores(beam.PTransform):
"""Extract user/score pairs from the event stream using processing time, via
global windowing. Get periodic updates on all users' running scores.
"""
def __init__(self, allowed_lateness):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
self.allowed_lateness_seconds = allowed_lateness * 60
def expand(self, pcoll):
# NOTE: the behavior does not exactly match the Java example
# TODO: allowed_lateness not implemented yet in FixedWindows
# TODO: AfterProcessingTime not implemented yet, replace AfterCount
return (
pcoll
# Get periodic results every ten events.
| 'LeaderboardUserGlobalWindows' >> beam.WindowInto(
beam.window.GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterCount(10)),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
# Extract and sum username/score pairs from the event data.
| 'ExtractAndSumScore' >> ExtractAndSumScore('user'))
LeaderBoard
將視窗累加模式設定為在觸發器觸發時累加視窗窗格。此累加模式是透過在設定觸發器時呼叫 .accumulatingFiredPanes
使用 accumulation_mode=trigger.AccumulationMode.ACCUMULATING
來設定的,並導致管道將先前發出的資料與自上次觸發器觸發以來抵達的任何新資料累加在一起。這確保了 LeaderBoard
是使用者分數的執行總和,而不是個別總和的集合。
根據事件時間計算團隊分數
我們希望我們的管道在每小時的遊戲時間內也輸出每個隊伍的總分。與使用者分數計算不同,對於團隊分數,我們關心每個分數實際發生的*事件*時間,因為我們希望單獨考慮每小時的遊戲時間。我們也希望在每小時進展時提供推測性更新,並允許任何遲到資料的實例——在給定小時的資料被認為完成後才抵達的資料——包含在我們的計算中。
因為我們單獨考慮每小時,我們可以像在 HourlyTeamScore
中一樣,將固定時間視窗化應用於我們的輸入資料。為了提供推測性更新和遲到資料的更新,我們將指定其他觸發器參數。觸發器將導致每個視窗以我們指定的間隔(在本例中,每五分鐘)計算並發出結果,並且在視窗被認為「完成」後繼續觸發,以考慮遲到資料。就像使用者分數計算一樣,我們將觸發器設定為累加模式,以確保我們獲得每個小時長視窗的執行總和。
推測性更新和遲到資料的觸發器有助於解決時間偏差的問題。管道中的事件不一定按照它們根據時間戳實際發生的順序處理;它們可能會無序或遲到地抵達管道(在我們的例子中,因為它們是在使用者的手機與網路斷線時產生的)。Beam 需要一種方法來確定它何時可以合理地假設它已經擁有給定視窗中「所有」的資料:這稱為*水位線*。
在理想的世界中,所有資料都會在發生時立即處理,因此處理時間將等於(或至少與事件時間有線性關係)事件時間。然而,由於分散式系統包含一些固有的不準確性(例如我們遲報的手機),Beam 通常使用啟發式水位線。
下圖顯示了正在進行的處理時間與兩個隊伍中每個分數的事件時間之間的關係

圖 5:按隊伍劃分的分數資料,按事件時間劃分視窗。基於處理時間的觸發器會導致視窗發出推測性的早期結果,並包含遲到的結果。
圖中的虛線是「理想」的水位線:Beam 對何時可以合理地認為給定視窗中的所有資料都已抵達的概念。不規則的實線表示實際的水位線,由資料來源確定。
抵達實線水位線以上的資料是*遲到資料*——這是一個分數事件,被延遲了(可能是離線產生的),並且在它所屬的視窗關閉後才抵達。我們管道的遲到觸發器確保了此遲到資料仍然包含在總和中。
以下程式碼範例顯示 LeaderBoard
如何應用固定時間視窗化和適當的觸發器,使我們的管道執行我們想要的計算
// Extract team/score pairs from the event stream, using hour-long windows by default.
@VisibleForTesting
static class CalculateTeamScores
extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
private final Duration teamWindowDuration;
private final Duration allowedLateness;
CalculateTeamScores(Duration teamWindowDuration, Duration allowedLateness) {
this.teamWindowDuration = teamWindowDuration;
this.allowedLateness = allowedLateness;
}
@Override
public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> infos) {
return infos
.apply(
"LeaderboardTeamFixedWindows",
Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
// We will get early (speculative) results as well as cumulative
// processing of late data.
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(FIVE_MINUTES))
.withLateFirings(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(TEN_MINUTES)))
.withAllowedLateness(allowedLateness)
.accumulatingFiredPanes())
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new ExtractAndSumScore("team"));
}
}
class CalculateTeamScores(beam.PTransform):
"""Calculates scores for each team within the configured window duration.
Extract team/score pairs from the event stream, using hour-long windows by
default.
"""
def __init__(self, team_window_duration, allowed_lateness):
# TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
# super().__init__()
beam.PTransform.__init__(self)
self.team_window_duration = team_window_duration * 60
self.allowed_lateness_seconds = allowed_lateness * 60
def expand(self, pcoll):
# NOTE: the behavior does not exactly match the Java example
# TODO: allowed_lateness not implemented yet in FixedWindows
# TODO: AfterProcessingTime not implemented yet, replace AfterCount
return (
pcoll
# We will get early (speculative) results as well as cumulative
# processing of late data.
| 'LeaderboardTeamFixedWindows' >> beam.WindowInto(
beam.window.FixedWindows(self.team_window_duration),
trigger=trigger.AfterWatermark(
trigger.AfterCount(10), trigger.AfterCount(20)),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
# Extract and sum teamname/score pairs from the event data.
| 'ExtractAndSumScore' >> ExtractAndSumScore('team'))
總而言之,這些處理策略使我們能夠解決 UserScore
和 HourlyTeamScore
管道中存在的延遲和完整性問題,同時仍然使用相同的基本轉換來處理資料——事實上,這兩個計算仍然使用我們在 UserScore
和 HourlyTeamScore
管道中使用的相同 ExtractAndSumScore
轉換。
GameStats:濫用偵測和使用分析
雖然 LeaderBoard
示範了如何使用基本視窗化和觸發器來執行低延遲和靈活的資料分析,但我們可以使用更進階的視窗化技術來執行更全面的分析。這可能包括一些旨在偵測系統濫用(如垃圾郵件)或深入了解使用者行為的計算。GameStats
管道建立在 LeaderBoard
中的低延遲功能之上,以示範如何使用 Beam 來執行這種進階分析。
與 LeaderBoard
一樣,GameStats
從無界來源讀取資料。最好將其視為一個持續進行的工作,在使用者玩遊戲時提供對遊戲的深入了解。
注意: 有關完整的範例管道程式,請參閱GitHub 上的 GameStats。
注意: 有關完整的範例管道程式,請參閱GitHub 上的 GameStats。
GameStats 的功能是什麼?
與 LeaderBoard
一樣,GameStats
計算每隊每小時的總分。但是,管道還執行兩種更複雜的分析
GameStats
執行濫用偵測系統,該系統對分數資料執行一些簡單的統計分析,以確定哪些使用者(如果有)可能是垃圾郵件發送者或機器人。然後,它使用可疑垃圾郵件/機器人使用者的清單,將機器人從每小時的團隊分數計算中過濾掉。GameStats
透過使用會話視窗化將共享相似事件時間的遊戲資料分組在一起,來分析使用模式。這使我們可以深入了解使用者傾向於玩多久,以及遊戲時間如何隨時間變化。
下面,我們將更詳細地研究這些功能。
濫用偵測
假設我們遊戲中的得分取決於使用者在手機上「點擊」的速度。GameStats
的濫用偵測會分析每個使用者的分數資料,以偵測使用者是否具有異常高的「點擊率」,因此分數也異常高。這可能表示遊戲是由一個運作速度明顯快於人類的機器人所玩。
為了確定分數是否「異常」高,GameStats
會計算該固定時間視窗中每個分數的平均值,然後根據平均分數乘以任意權重因子(在我們的案例中為 2.5)來檢查每個個別分數。因此,任何超過平均值 2.5 倍的分數都被視為垃圾郵件的產物。GameStats
管道會追蹤「垃圾郵件」使用者的清單,並將這些使用者從團隊排行榜的團隊分數計算中過濾掉。
由於平均值取決於管道資料,我們需要計算它,然後在後續的 ParDo
轉換中使用該計算資料,以過濾掉超過加權值的分數。為此,我們可以將計算出的平均值作為側輸入傳遞到篩選 ParDo
。
以下程式碼範例顯示了處理濫用偵測的複合轉換。該轉換使用 Sum.integersPerKey
轉換來對每個使用者的所有分數求和,然後使用 Mean.globally
轉換來確定所有使用者的平均分數。一旦計算完成(作為 PCollectionView
單例),我們可以使用 .withSideInputs
將其傳遞給篩選 ParDo
。
public static class CalculateSpammyUsers
extends PTransform<PCollection<KV<String, Integer>>, PCollection<KV<String, Integer>>> {
private static final Logger LOG = LoggerFactory.getLogger(CalculateSpammyUsers.class);
private static final double SCORE_WEIGHT = 2.5;
@Override
public PCollection<KV<String, Integer>> expand(PCollection<KV<String, Integer>> userScores) {
// Get the sum of scores for each user.
PCollection<KV<String, Integer>> sumScores =
userScores.apply("UserSum", Sum.integersPerKey());
// Extract the score from each element, and use it to find the global mean.
final PCollectionView<Double> globalMeanScore =
sumScores.apply(Values.create()).apply(Mean.<Integer>globally().asSingletonView());
// Filter the user sums using the global mean.
PCollection<KV<String, Integer>> filtered =
sumScores.apply(
"ProcessAndFilter",
ParDo
// use the derived mean total score as a side input
.of(
new DoFn<KV<String, Integer>, KV<String, Integer>>() {
private final Counter numSpammerUsers =
Metrics.counter("main", "SpammerUsers");
@ProcessElement
public void processElement(ProcessContext c) {
Integer score = c.element().getValue();
Double gmc = c.sideInput(globalMeanScore);
if (score > (gmc * SCORE_WEIGHT)) {
LOG.info(
"user "
+ c.element().getKey()
+ " spammer score "
+ score
+ " with mean "
+ gmc);
numSpammerUsers.inc();
c.output(c.element());
}
}
})
.withSideInputs(globalMeanScore));
return filtered;
}
}
class CalculateSpammyUsers(beam.PTransform):
"""Filter out all but those users with a high clickrate, which we will
consider as 'spammy' users.
We do this by finding the mean total score per user, then using that
information as a side input to filter out all but those user scores that are
larger than (mean * SCORE_WEIGHT).
"""
SCORE_WEIGHT = 2.5
def expand(self, user_scores):
# Get the sum of scores for each user.
sum_scores = (user_scores | 'SumUsersScores' >> beam.CombinePerKey(sum))
# Extract the score from each element, and use it to find the global mean.
global_mean_score = (
sum_scores
| beam.Values()
| beam.CombineGlobally(beam.combiners.MeanCombineFn())\
.as_singleton_view())
# Filter the user sums using the global mean.
filtered = (
sum_scores
# Use the derived mean total score (global_mean_score) as a side input.
| 'ProcessAndFilter' >> beam.Filter(
lambda key_score, global_mean:\
key_score[1] > global_mean * self.SCORE_WEIGHT,
global_mean_score))
return filtered
濫用偵測轉換會產生一個疑似為垃圾機器人的使用者視圖。在管道的稍後階段,我們使用該視圖來過濾掉任何此類使用者,當我們計算每小時的團隊分數時,再次使用側輸入機制。以下程式碼範例顯示了我們在將分數視窗化為固定視窗和提取團隊分數之間插入垃圾郵件過濾器的位置
// Calculate the total score per team over fixed windows,
// and emit cumulative updates for late data. Uses the side input derived above-- the set of
// suspected robots-- to filter out scores from those users from the sum.
// Write the results to BigQuery.
rawEvents
.apply(
"WindowIntoFixedWindows",
Window.into(
FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
// Filter out the detected spammer users, using the side input derived above.
.apply(
"FilterOutSpammers",
ParDo.of(
new DoFn<GameActionInfo, GameActionInfo>() {
@ProcessElement
public void processElement(ProcessContext c) {
// If the user is not in the spammers Map, output the data element.
if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
c.output(c.element());
}
}
})
.withSideInputs(spammersView))
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new ExtractAndSumScore("team"))
# Calculate the total score per team over fixed windows, and emit cumulative
# updates for late data. Uses the side input derived above --the set of
# suspected robots-- to filter out scores from those users from the sum.
# Write the results to BigQuery.
( # pylint: disable=expression-not-assigned
raw_events
| 'WindowIntoFixedWindows' >> beam.WindowInto(
beam.window.FixedWindows(fixed_window_duration))
# Filter out the detected spammer users, using the side input derived
# above
| 'FilterOutSpammers' >> beam.Filter(
lambda elem, spammers: elem['user'] not in spammers, spammers_view)
# Extract and sum teamname/score pairs from the event data.
| 'ExtractAndSumScore' >> ExtractAndSumScore('team')
分析使用模式
我們可以透過檢查每個遊戲分數的事件時間,並將具有相似事件時間的分數分組到*會話*中,來深入了解使用者何時在玩我們的遊戲以及玩了多久。GameStats
使用 Beam 內建的會話視窗化函數,根據使用者分數發生的時間將其分組到會話中。
當您設定會話視窗化時,您需要指定事件之間的*最小間隙持續時間*。所有到達時間間隔小於最小間隙持續時間的事件都會被分組到同一個視窗中。到達時間差大於間隙的事件會被分組到不同的視窗中。根據我們如何設定最小間隙持續時間,我們可以安全地假設同一個會話視窗中的分數是同一個(相對)不間斷遊戲時間的一部分。不同視窗中的分數表示使用者停止玩遊戲至少最小間隙時間,然後才稍後返回。
下圖顯示了資料在分組到會話視窗中時的外觀。與固定視窗不同,會話視窗*對於每個使用者都不同*,並且取決於每個使用者的個別遊戲模式
圖 6:具有最小間隙持續時間的使用者會話。每個使用者都有不同的會話,具體取決於他們玩了多少次遊戲以及他們在遊戲之間的休息時間有多長。
我們可以利用會話視窗化的資料來確定所有使用者不間斷的平均遊戲時間長度,以及他們在每個會話期間獲得的總分。我們可以在程式碼中執行此操作,首先應用會話視窗,計算每個使用者和會話的分數總和,然後使用轉換來計算每個個別會話的長度
// Detect user sessions-- that is, a burst of activity separated by a gap from further
// activity. Find and record the mean session lengths.
// This information could help the game designers track the changing user engagement
// as their set of games changes.
userEvents
.apply(
"WindowIntoSessions",
Window.<KV<String, Integer>>into(
Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
.withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
// For this use, we care only about the existence of the session, not any particular
// information aggregated over it, so the following is an efficient way to do that.
.apply(Combine.perKey(x -> 0))
// Get the duration per session.
.apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))
# Detect user sessions-- that is, a burst of activity separated by a gap
# from further activity. Find and record the mean session lengths.
# This information could help the game designers track the changing user
# engagement as their set of game changes.
( # pylint: disable=expression-not-assigned
user_events
| 'WindowIntoSessions' >> beam.WindowInto(
beam.window.Sessions(session_gap),
timestamp_combiner=beam.window.TimestampCombiner.OUTPUT_AT_EOW)
# For this use, we care only about the existence of the session, not any
# particular information aggregated over it, so we can just group by key
# and assign a "dummy value" of None.
| beam.CombinePerKey(lambda _: None)
# Get the duration of the session
| 'UserSessionActivity' >> beam.ParDo(UserSessionActivity())
這會給我們一組使用者會話,每個會話都附加了一個持續時間。然後,我們可以透過將資料重新視窗化為固定時間視窗,然後計算每個小時結束的所有會話的平均值,來計算*平均*會話長度
// Re-window to process groups of session sums according to when the sessions complete.
.apply(
"WindowToExtractSessionMean",
Window.into(
FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
// Find the mean session duration in each window.
.apply(Mean.<Integer>globally().withoutDefaults())
// Write this info to a BigQuery table.
.apply(
"WriteAvgSessionLength",
new WriteWindowedToBigQuery<>(
options.as(GcpOptions.class).getProject(),
options.getDataset(),
options.getGameStatsTablePrefix() + "_sessions",
configureSessionWindowWrite()));
# Re-window to process groups of session sums according to when the
# sessions complete
| 'WindowToExtractSessionMean' >> beam.WindowInto(
beam.window.FixedWindows(user_activity_window_duration))
# Find the mean session duration in each window
| beam.CombineGlobally(
beam.combiners.MeanCombineFn()).without_defaults()
| 'FormatAvgSessionLength' >>
beam.Map(lambda elem: {'mean_duration': float(elem)})
| 'WriteAvgSessionLength' >> WriteToBigQuery(
args.table_name + '_sessions',
args.dataset, {
'mean_duration': 'FLOAT',
},
options.view_as(GoogleCloudOptions).project))
我們可以利用產生的資訊來找出,例如,使用者一天中哪個時段玩的時間最長,或者哪一段時間更有可能看到較短的遊戲會話。
後續步驟
如果您遇到任何問題,請隨時聯繫我們!
上次更新於 2024/10/31
您是否找到了您要尋找的所有內容?
所有內容都有用且清晰嗎?您有任何想要更改的內容嗎?請告訴我們!