Nexmark 基準測試套件

這是什麼

Nexmark 是一組受 Nexmark 研究論文中的「連續資料串流」查詢啟發的管線套件

這些是在線上拍賣系統的三個實體模型上執行的多個查詢

查詢

這些查詢練習了 Beam 模型的多個面向

我們增加了原始查詢,再增加了五個

基準測試工作負載組態

以下是一些基準測試工作負載的調整旋鈕(請參閱 NexmarkConfiguration.java)。

這些組態項目可以傳遞至啟動命令列。

事件產生(預設值)

視窗(預設值)

事件比例(預設值)

技術

Nexmark 輸出

以下是在(本機)直接執行器上使用 SMOKE 套件以串流模式執行 Nexmark 基準測試的範例輸出

Performance:
  Conf       Runtime(sec)         Events(/sec)         Results
  0000                5,5              18138,9          100000
  0001                4,2              23657,4           92000
  0002                2,2              45683,0             351
  0003                3,9              25348,5             444
  0004                1,6               6207,3              40
  0005                5,0              20173,5              12
  0006                0,9              11376,6             401
  0007              121,4                823,5               1
  0008                2,5              40273,9            6000
  0009                0,9              10695,2             298
  0010                4,0              25025,0               1
  0011                4,4              22655,2            1919
  0012                3,5              28208,7            1919

基準測試啟動組態

Nexmark 啟動器接受 --runner 引數,這與使用 Beam PipelineOptions 管理其命令列引數的程式一樣。除了這個之外,必須設定必要的相依性。

透過 Gradle 執行時,以下兩個參數會控制執行

-P nexmark.args
    The command line to pass to the Nexmark main program.

-P nexmark.runner
The Gradle project name of the runner, such as ":runners:direct-java" or
":runners:flink:1.13. The project names can be found in the root
    `settings.gradle.kts`.

測試資料是根據需要確定性地合成的。測試資料可以在與查詢本身相同的管線中合成,或者可以發佈到 Pub/Sub 或 Kafka。

查詢結果可能是

通用組態參數

決定是批次還是串流

--streaming=true

事件產生器的數量

--numEventGenerators=4

可以依名稱或編號執行查詢(編號仍然存在,以便向後相容,只有查詢 0 到 12 有編號)

執行查詢 N

--query=N

執行名為 PASSTHROUGH 的查詢

--query=PASSTHROUGH

可用套件

可以使用此組態參數選擇要執行的套件

--suite=SUITE

可用的套件為

Google Cloud Dataflow 執行器特定組態

--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
--project=<your project> \
--zone=<your zone> \
--workerMachineType=n1-highmem-8 \
--stagingLocation=gs://<a gs path for staging> \
--runner=DataflowRunner \
--tempLocation=gs://<a gs path for temporary files> \
--filesToStage=target/beam-sdks-java-nexmark-2.60.0.jar

直接執行器特定組態

--manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
--manageResources=false --monitorJobs=true \
--flinkMaster=[local] --parallelism=#numcores

Spark 執行器特定組態

--manageResources=false --monitorJobs=true \
--sparkMaster=local \
-Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true

Kafka 來源/接收器組態參數

設定 Kafka 主機/IP(例如,「localhost:9092」)

--bootstrapServers=<kafka host/ip>

將結果寫入 Kafka 主題

--sinkType=KAFKA

設定將用於基準測試結果的主題名稱

--kafkaResultsTopic=<topic name>

將事件寫入和/或讀取至 Kafka 主題

--sourceType=KAFKA

設定將用於基準測試事件的主題名稱

--kafkaTopic=<topic name>

目前狀態

這些表格包含在不同執行器中執行的查詢狀態。Google Cloud Dataflow 狀態即將推出。

批次 / 合成 / 本機

查詢直接SparkFlink
0確定確定確定
1確定確定確定
2確定確定確定
3確定確定確定
4確定確定確定
5確定確定確定
6確定確定確定
7確定確定確定
8確定確定確定
9確定確定確定
10確定確定確定
11確定確定確定
12確定確定確定
BOUNDED_SIDE_INPUT_JOIN確定確定確定

串流 / 合成 / 本機

查詢直接Spark Issue 18416Flink
0確定確定確定
1確定確定確定
2確定確定確定
3確定Issue 18074, BEAM-3961確定
4確定確定確定
5確定確定確定
6確定確定確定
7確定BEAM-2112確定
8確定確定確定
9確定確定確定
10確定確定確定
11確定確定確定
12確定確定確定
BOUNDED_SIDE_INPUT_JOIN確定BEAM-2112確定

批次 / 合成 / 叢集

即將推出

串流 / 合成 / 叢集

即將推出

執行 Nexmark

在 DirectRunner (本機) 上執行 SMOKE 套件

DirectRunner 是預設值,因此不需要傳遞 -Pnexmark.runner。在這裡,我們為了最大程度的清晰度而這樣做。

直接執行器沒有單獨的批次和串流模式,但 Nexmark 啟動有。

這些參數會保留 DirectRunner 的許多額外安全檢查,因此 SMOKE 套件可以確保 Nexmark 套件中沒有任何損壞。

批次模式

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:direct-java" \
    -Pnexmark.args="
        --runner=DirectRunner
        --streaming=false
        --suite=SMOKE
        --manageResources=false
        --monitorJobs=true
        --enforceEncodability=true
        --enforceImmutability=true"

串流模式

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:direct-java" \
    -Pnexmark.args="
        --runner=DirectRunner
        --streaming=true
        --suite=SMOKE
        --manageResources=false
        --monitorJobs=true
        --enforceEncodability=true
        --enforceImmutability=true"

在 SparkRunner (本機) 上執行 SMOKE 套件

在 Nexmark gradle 啟動中,SparkRunner 是特殊情況。工作會提供 SparkRunner 建置時所依據的 Spark 版本,並設定記錄。

批次模式

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:spark:3" \
    -Pnexmark.args="
        --runner=SparkRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=false
        --manageResources=false
        --monitorJobs=true"

串流模式

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:spark:3" \
    -Pnexmark.args="
        --runner=SparkRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=true
        --manageResources=false
        --monitorJobs=true"

在 FlinkRunner (本機) 上執行 SMOKE 套件

批次模式

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:flink:1.13" \
    -Pnexmark.args="
        --runner=FlinkRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=false
        --manageResources=false
        --monitorJobs=true
        --flinkMaster=[local]"

串流模式

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:flink:1.13" \
    -Pnexmark.args="
        --runner=FlinkRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=true
        --manageResources=false
        --monitorJobs=true
        --flinkMaster=[local]"

在 Google Cloud Dataflow 上執行 SMOKE 套件

先設定這些,讓以下命令有效

PROJECT=<your project>
ZONE=<your zone>
STAGING_LOCATION=gs://<a GCS path for staging>
PUBSUB_TOPCI=<existing pubsub topic>

啟動

./gradlew :sdks:java:testing:nexmark:run \
    -Pnexmark.runner=":runners:google-cloud-dataflow-java" \
    -Pnexmark.args="
        --runner=DataflowRunner
        --suite=SMOKE
        --streamTimeout=60
        --streaming=true
        --manageResources=false
        --monitorJobs=true
        --project=${PROJECT}
        --zone=${ZONE}
        --workerMachineType=n1-highmem-8
        --stagingLocation=${STAGING_LOCATION}
        --sourceType=PUBSUB
        --pubSubMode=PUBLISH_ONLY
        --pubsubTopic=${PUBSUB_TOPIC}
        --resourceNameMode=VERBATIM
        --manageResources=false
        --numEventGenerators=64
        --numWorkers=16
        --maxNumWorkers=16
        --firstEventRate=100000
        --nextEventRate=100000
        --ratePeriodSec=3600
        --isRateLimited=true
        --avgPersonByteSize=500
        --avgAuctionByteSize=500
        --avgBidByteSize=500
        --probDelayedEvent=0.000001
        --occasionalDelaySec=3600
        --numEvents=0
        --useWallclockEventTime=true
        --usePubsubPublishTime=true
        --experiments=enable_custom_pubsub_sink"

使用 Apache Hadoop YARN 在 Spark 叢集上執行查詢 0

建置套件

./gradlew :sdks:java:testing:nexmark:assemble

提交至叢集

spark-submit \
    --class org.apache.beam.sdk.nexmark.Main \
    --master yarn-client \
    --driver-memory 512m \
    --executor-memory 512m \
    --executor-cores 1 \
    sdks/java/testing/nexmark/build/libs/beam-sdks-java-nexmark-2.60.0-spark.jar \
        --runner=SparkRunner \
        --query=0 \
        --streamTimeout=60 \
        --streaming=false \
        --manageResources=false \
        --monitorJobs=true"

Nexmark 儀表板

以下儀表板用作 CI 機制,以偵測 Beam 元件上是否有未回歸的現象。它們不應該是執行器或引擎的基準測試比較。尤其是因為

儀表板內容

在每次主版的提交時,都會執行 Nexmark 套件,並在圖表上建立圖形。所有指標儀表板都託管在 metrics.beam.apache.org

有 2 種儀表板

這些執行器有儀表板(其他執行器稍後推出)

每個儀表板包含