Nexmark 基準測試套件
這是什麼
Nexmark 是一組受 Nexmark 研究論文中的「連續資料串流」查詢啟發的管線套件
這些是在線上拍賣系統的三個實體模型上執行的多個查詢
- Person 代表提交拍賣物品和/或對拍賣出價的人。
- Auction 代表拍賣中的物品。
- Bid 代表對拍賣中物品的出價。
查詢
這些查詢練習了 Beam 模型的多個面向
- Query1 或 CURRENCY_CONVERSION:以歐元計算的出價金額是多少?說明簡單的對應。
- Query2 或 SELECTION:具有特定拍賣編號的拍賣有哪些?說明簡單的篩選。
- Query3 或 LOCAL_ITEM_SUGGESTION:誰在特定的美國州販售?說明增量聯結(使用每個金鑰狀態和計時器)和篩選。
- Query4 或 AVERAGE_PRICE_FOR_CATEGORY:每個拍賣類別的平均售價是多少?說明複雜的聯結(使用自訂視窗函數)和聚合。
- Query5 或 HOT_ITEMS:哪些拍賣在最後一段時間內看到最多出價?說明滑動視窗和合併器。
- Query6 或 AVERAGE_SELLING_PRICE_BY_SELLER:每個賣家最後 10 次已結束拍賣的平均售價是多少?與 Query4 共用相同的「得標出價」核心,並說明專用的合併器。
- Query7 或 HIGHEST_BID:每個時段的最高出價是多少?刻意使用側輸入來展示扇出。
- Query8 或 MONITOR_NEW_USERS:誰在最後一段時間內進入系統並建立拍賣?說明簡單的聯結。
我們增加了原始查詢,再增加了五個
- Query0 或 PASSTHROUGH:直接傳遞。讓我們能夠測量監控負擔。
- Query9 或 WINNING_BIDS:得標出價。Query4 和 Query6 共用的通用子查詢。
- Query10 或 LOG_TO_SHARDED_FILES:將所有事件記錄到 GCS 檔案。說明具有大型觸發側邊效果的視窗。
- Query11 或 USER_SESSIONS:使用者在每個活動階段中進行了多少次出價?說明階段視窗。
- Query12 或 PROCESSING_TIME_WINDOWS:使用者在固定的處理時間限制內進行了多少次出價?說明在全域視窗中以處理時間工作,與所有其他查詢在非全域視窗中以事件時間工作相比。
- BOUNDED_SIDE_INPUT_JOIN:將串流聯結到有界的側輸入,為基本串流擴充建模。
基準測試工作負載組態
以下是一些基準測試工作負載的調整旋鈕(請參閱 NexmarkConfiguration.java)。
這些組態項目可以傳遞至啟動命令列。
事件產生(預設值)
- 產生 100,000 個事件
- 100 個產生器執行緒
- SIN 曲線中的事件速率
- 10,000 的初始事件速率
- 10,000 的事件速率步進
- 100 個並行拍賣
- 1000 個並行出價/建立拍賣的人
視窗(預設值)
- 大小 10 秒
- 滑動期間 5 秒
- 浮水印保留 0 秒
事件比例(預設值)
- 熱門拍賣 = ½
- 熱門出價者 = ¼
- 熱門賣家 = ¼
技術
- 人為 CPU 負載
- 人為 IO 負載
Nexmark 輸出
以下是在(本機)直接執行器上使用 SMOKE 套件以串流模式執行 Nexmark 基準測試的範例輸出
Performance: Conf Runtime(sec) Events(/sec) Results 0000 5,5 18138,9 100000 0001 4,2 23657,4 92000 0002 2,2 45683,0 351 0003 3,9 25348,5 444 0004 1,6 6207,3 40 0005 5,0 20173,5 12 0006 0,9 11376,6 401 0007 121,4 823,5 1 0008 2,5 40273,9 6000 0009 0,9 10695,2 298 0010 4,0 25025,0 1 0011 4,4 22655,2 1919 0012 3,5 28208,7 1919
基準測試啟動組態
Nexmark 啟動器接受 --runner
引數,這與使用 Beam PipelineOptions 管理其命令列引數的程式一樣。除了這個之外,必須設定必要的相依性。
透過 Gradle 執行時,以下兩個參數會控制執行
-P nexmark.args
The command line to pass to the Nexmark main program.
-P nexmark.runner
The Gradle project name of the runner, such as ":runners:direct-java" or
":runners:flink:1.13. The project names can be found in the root
`settings.gradle.kts`.
測試資料是根據需要確定性地合成的。測試資料可以在與查詢本身相同的管線中合成,或者可以發佈到 Pub/Sub 或 Kafka。
查詢結果可能是
- 發佈到 Pub/Sub 或 Kafka。
- 以純文字形式寫入文字檔。
- 使用 Avro 編碼寫入文字檔。
- 傳送至 BigQuery。
- 捨棄。
通用組態參數
決定是批次還是串流
--streaming=true
事件產生器的數量
--numEventGenerators=4
可以依名稱或編號執行查詢(編號仍然存在,以便向後相容,只有查詢 0 到 12 有編號)
執行查詢 N
--query=N
執行名為 PASSTHROUGH 的查詢
--query=PASSTHROUGH
可用套件
可以使用此組態參數選擇要執行的套件
--suite=SUITE
可用的套件為
- DEFAULT:使用查詢 0 測試預設組態。
- SMOKE:使用預設組態執行所有查詢。
- STRESS:類似 smoke,但針對 100 萬個事件。
- FULL_THROTTLE:類似 SMOKE,但有 1 億個事件。
Google Cloud Dataflow 執行器特定組態
--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
--project=<your project> \
--zone=<your zone> \
--workerMachineType=n1-highmem-8 \
--stagingLocation=gs://<a gs path for staging> \
--runner=DataflowRunner \
--tempLocation=gs://<a gs path for temporary files> \
--filesToStage=target/beam-sdks-java-nexmark-2.60.0.jar
直接執行器特定組態
--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
Flink 執行器特定組態
--manageResources=false --monitorJobs=true \
--flinkMaster=[local] --parallelism=#numcores
Spark 執行器特定組態
--manageResources=false --monitorJobs=true \
--sparkMaster=local \
-Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true
Kafka 來源/接收器組態參數
設定 Kafka 主機/IP(例如,「localhost:9092」)
--bootstrapServers=<kafka host/ip>
將結果寫入 Kafka 主題
--sinkType=KAFKA
設定將用於基準測試結果的主題名稱
--kafkaResultsTopic=<topic name>
將事件寫入和/或讀取至 Kafka 主題
--sourceType=KAFKA
設定將用於基準測試事件的主題名稱
--kafkaTopic=<topic name>
目前狀態
這些表格包含在不同執行器中執行的查詢狀態。Google Cloud Dataflow 狀態即將推出。
批次 / 合成 / 本機
查詢 | 直接 | Spark | Flink |
---|---|---|---|
0 | 確定 | 確定 | 確定 |
1 | 確定 | 確定 | 確定 |
2 | 確定 | 確定 | 確定 |
3 | 確定 | 確定 | 確定 |
4 | 確定 | 確定 | 確定 |
5 | 確定 | 確定 | 確定 |
6 | 確定 | 確定 | 確定 |
7 | 確定 | 確定 | 確定 |
8 | 確定 | 確定 | 確定 |
9 | 確定 | 確定 | 確定 |
10 | 確定 | 確定 | 確定 |
11 | 確定 | 確定 | 確定 |
12 | 確定 | 確定 | 確定 |
BOUNDED_SIDE_INPUT_JOIN | 確定 | 確定 | 確定 |
串流 / 合成 / 本機
查詢 | 直接 | Spark Issue 18416 | Flink |
---|---|---|---|
0 | 確定 | 確定 | 確定 |
1 | 確定 | 確定 | 確定 |
2 | 確定 | 確定 | 確定 |
3 | 確定 | Issue 18074, BEAM-3961 | 確定 |
4 | 確定 | 確定 | 確定 |
5 | 確定 | 確定 | 確定 |
6 | 確定 | 確定 | 確定 |
7 | 確定 | BEAM-2112 | 確定 |
8 | 確定 | 確定 | 確定 |
9 | 確定 | 確定 | 確定 |
10 | 確定 | 確定 | 確定 |
11 | 確定 | 確定 | 確定 |
12 | 確定 | 確定 | 確定 |
BOUNDED_SIDE_INPUT_JOIN | 確定 | BEAM-2112 | 確定 |
批次 / 合成 / 叢集
即將推出
串流 / 合成 / 叢集
即將推出
執行 Nexmark
在 DirectRunner (本機) 上執行 SMOKE 套件
DirectRunner 是預設值,因此不需要傳遞 -Pnexmark.runner
。在這裡,我們為了最大程度的清晰度而這樣做。
直接執行器沒有單獨的批次和串流模式,但 Nexmark 啟動有。
這些參數會保留 DirectRunner 的許多額外安全檢查,因此 SMOKE 套件可以確保 Nexmark 套件中沒有任何損壞。
批次模式
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:direct-java" \
-Pnexmark.args="
--runner=DirectRunner
--streaming=false
--suite=SMOKE
--manageResources=false
--monitorJobs=true
--enforceEncodability=true
--enforceImmutability=true"
串流模式
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:direct-java" \
-Pnexmark.args="
--runner=DirectRunner
--streaming=true
--suite=SMOKE
--manageResources=false
--monitorJobs=true
--enforceEncodability=true
--enforceImmutability=true"
在 SparkRunner (本機) 上執行 SMOKE 套件
在 Nexmark gradle 啟動中,SparkRunner 是特殊情況。工作會提供 SparkRunner 建置時所依據的 Spark 版本,並設定記錄。
批次模式
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:spark:3" \
-Pnexmark.args="
--runner=SparkRunner
--suite=SMOKE
--streamTimeout=60
--streaming=false
--manageResources=false
--monitorJobs=true"
串流模式
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:spark:3" \
-Pnexmark.args="
--runner=SparkRunner
--suite=SMOKE
--streamTimeout=60
--streaming=true
--manageResources=false
--monitorJobs=true"
在 FlinkRunner (本機) 上執行 SMOKE 套件
批次模式
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:flink:1.13" \
-Pnexmark.args="
--runner=FlinkRunner
--suite=SMOKE
--streamTimeout=60
--streaming=false
--manageResources=false
--monitorJobs=true
--flinkMaster=[local]"
串流模式
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:flink:1.13" \
-Pnexmark.args="
--runner=FlinkRunner
--suite=SMOKE
--streamTimeout=60
--streaming=true
--manageResources=false
--monitorJobs=true
--flinkMaster=[local]"
在 Google Cloud Dataflow 上執行 SMOKE 套件
先設定這些,讓以下命令有效
PROJECT=<your project>
ZONE=<your zone>
STAGING_LOCATION=gs://<a GCS path for staging>
PUBSUB_TOPCI=<existing pubsub topic>
啟動
./gradlew :sdks:java:testing:nexmark:run \
-Pnexmark.runner=":runners:google-cloud-dataflow-java" \
-Pnexmark.args="
--runner=DataflowRunner
--suite=SMOKE
--streamTimeout=60
--streaming=true
--manageResources=false
--monitorJobs=true
--project=${PROJECT}
--zone=${ZONE}
--workerMachineType=n1-highmem-8
--stagingLocation=${STAGING_LOCATION}
--sourceType=PUBSUB
--pubSubMode=PUBLISH_ONLY
--pubsubTopic=${PUBSUB_TOPIC}
--resourceNameMode=VERBATIM
--manageResources=false
--numEventGenerators=64
--numWorkers=16
--maxNumWorkers=16
--firstEventRate=100000
--nextEventRate=100000
--ratePeriodSec=3600
--isRateLimited=true
--avgPersonByteSize=500
--avgAuctionByteSize=500
--avgBidByteSize=500
--probDelayedEvent=0.000001
--occasionalDelaySec=3600
--numEvents=0
--useWallclockEventTime=true
--usePubsubPublishTime=true
--experiments=enable_custom_pubsub_sink"
使用 Apache Hadoop YARN 在 Spark 叢集上執行查詢 0
建置套件
./gradlew :sdks:java:testing:nexmark:assemble
提交至叢集
spark-submit \
--class org.apache.beam.sdk.nexmark.Main \
--master yarn-client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
sdks/java/testing/nexmark/build/libs/beam-sdks-java-nexmark-2.60.0-spark.jar \
--runner=SparkRunner \
--query=0 \
--streamTimeout=60 \
--streaming=false \
--manageResources=false \
--monitorJobs=true"
Nexmark 儀表板
以下儀表板用作 CI 機制,以偵測 Beam 元件上是否有未回歸的現象。它們不應該是執行器或引擎的基準測試比較。尤其是因為
- 執行器的參數不同
- Nexmark 在本機(大多數時間為內嵌)模式下與執行器一起執行
- Nexmark 在執行所有 CI 和組建的共用機器上執行。
- 執行器對 Beam 模型有不同的支援
- 執行器具有不同的優勢,使得比較很困難
- 有些執行器設計為面向批次,有些則面向串流
- 有些設計為次秒級延遲,有些則支援自動縮放
儀表板內容
在每次主版的提交時,都會執行 Nexmark 套件,並在圖表上建立圖形。所有指標儀表板都託管在 metrics.beam.apache.org。
有 2 種儀表板
- 一個用於效能(查詢的執行時間)
- 一個用於輸出 PCollection 的大小(應該是恆定的)
這些執行器有儀表板(其他執行器稍後推出)
- Spark
- Flink
- Direct Runner
- Dataflow
每個儀表板包含
- 批次模式下的圖表
- 串流模式下的圖表
- 所有查詢的圖表。