使用 Apache Spark 執行器

Apache Spark 執行器可以用來使用 Apache Spark 執行 Beam 管道。Spark 執行器可以像原生的 Spark 應用程式一樣執行 Spark 管道;部署一個用於本機模式的獨立應用程式,在 Spark 的獨立資源管理器上執行,或使用 YARN 或 Mesos。

Spark 執行器在 Apache Spark 之上執行 Beam 管道,提供

Beam 能力矩陣記錄了目前 Spark 執行器支援的功能。

Spark 執行器的三種形式

Spark 執行器有三種形式

  1. 一個傳統執行器,僅支援 Java(和其他基於 JVM 的語言),並且基於 Spark RDD/DStream
  2. 一個結構化串流 Spark 執行器,僅支援 Java(和其他基於 JVM 的語言),並且基於 Spark 資料集和 Apache Spark 結構化串流框架。

注意:它仍然是實驗性的,它對 Beam 模型的覆蓋範圍是局部的。目前它僅支援批次模式。

  1. 一個可移植執行器,支援 Java、Python 和 Go

本指南分為兩部分,以記錄 Spark 執行器的不可移植和可移植功能。請使用下面的切換器來選擇合適的執行器

該使用哪種執行器:可移植或不可移植執行器?

Beam 及其執行器最初僅支援基於 JVM 的語言(例如 Java/Scala/Kotlin)。Python 和 Go SDK 是後來添加的。為了支援執行使用其他語言編寫的管道,執行器的架構必須進行重大更改。

如果您的應用程式僅使用 Java,那麼您目前應該使用基於 Java 的執行器之一。如果您想在 Spark 上使用 Beam 執行 Python 或 Go 管道,則需要使用可移植執行器。有關可移植性的更多資訊,請造訪可移植性頁面

Spark 執行器的先決條件和設定

Spark 執行器目前支援 Spark 的 3.2.x 分支。

注意:對 Spark 2.4.x 的支援已在 Beam 2.46.0 中移除。

您可以透過將以下內容添加到您的 pom.xml 來新增對最新版本的 Spark 執行器的依賴項

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark-3</artifactId>
  <version>2.60.0</version>
</dependency>

使用您的應用程式部署 Spark

在某些情況下,例如在本機模式/獨立模式下執行時,您的(獨立)應用程式需要透過在 pom.xml 中明確新增以下依賴項來打包 Spark

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

並使用 maven shade 外掛程式對應用程式 jar 進行遮蔽

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <createDependencyReducedPom>false</createDependencyReducedPom>
    <filters>
      <filter>
        <artifact>*:*</artifact>
        <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <shadedArtifactAttached>true</shadedArtifactAttached>
        <shadedClassifierName>shaded</shadedClassifierName>
        <transformers>
          <transformer
            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>

執行 mvn package 後,執行 ls target,您應該會看到(假設您的 artifactId 是 beam-examples,版本是 1.0.0

beam-examples-1.0.0-shaded.jar

要針對獨立叢集執行,只需執行


對於基於 RDD/DStream 的執行器

spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkRunner


對於基於結構化串流的執行器

spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkStructuredStreamingRunner

您需要在您的執行環境中安裝 Docker。要使用 Python 開發 Apache Beam,您必須安裝 Apache Beam Python SDK:pip install apache_beam。有關如何建立 Python 管道,請參閱Python 文件

pip install apache_beam

從 Beam 2.20.0 開始,預先建置的 Spark 任務服務 Docker 映像可在 Docker Hub 上取得。

對於較舊的 Beam 版本,您需要一份 Apache Beam 的原始程式碼副本。您可以在下載頁面下載它。

  1. 啟動任務服務端點
    • 使用 Docker(首選):docker run --net=host apache/beam_spark_job_server:latest
    • 或從 Beam 原始程式碼:./gradlew :runners:spark:3:job-server:runShadow

任務服務是您提交 Beam 管道的中心實例。任務服務將為管道建立一個 Spark 任務並執行該任務。要在 Spark 叢集上執行任務,需要向 Beam 任務服務提供 Spark 主機位址。

  1. 透過使用 PortableRunner、將 job_endpoint 設定為 localhost:8099(這是任務服務的預設位址)和將 environment_type 設定為 LOOPBACK,將 Python 管道提交到上述端點。例如

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"
])
with beam.Pipeline(options) as p:
    ...

在預先部署的 Spark 叢集上執行

在已具有 Spark 部署的叢集上部署您的 Beam 管道(Spark 類別在容器類別路徑中可用)不需要任何額外的依賴項。有關不同部署模式的更多詳細資訊,請參閱:獨立YARNMesos

  1. 啟動一個 Spark 叢集,預設情況下,該叢集會在 7077 連接埠上公開主機。

  1. 啟動將與 Spark 主機連接的任務服務
    • 使用 Docker(首選):docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://127.0.0.1:7077
    • 或從 Beam 原始程式碼:./gradlew :runners:spark:3:job-server:runShadow -PsparkMasterUrl=spark://127.0.0.1:7077

  1. 如上所述提交管道。但請注意,environment_type=LOOPBACK 僅適用於本機測試。有關詳細資訊,請參閱此處

(請注意,根據您的叢集設定,您可能需要變更 environment_type 選項。有關詳細資訊,請參閱此處。)

在 Dataproc 叢集(以 YARN 為後端)上執行

要執行以 Python、Go 和其他支援語言編寫的 Beam 任務,您可以使用 Beam 的 Spark 執行器頁面上描述的 SparkRunnerPortableRunner(另請參閱可移植性框架路線圖)。

以下範例從以 Yarn 為後端的 Dataproc 叢集的主節點執行可移植的 Python Beam 任務。

注意:此範例在 Dataproc 2.0、Spark 3.1.2 和 Beam 2.37.0 下成功執行。

  1. 建立一個啟用 Docker 元件的 Dataproc 叢集。
gcloud dataproc clusters create CLUSTER_NAME \
    --optional-components=DOCKER \
    --image-version=DATAPROC_IMAGE_VERSION \
    --region=REGION \
    --enable-component-gateway \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --properties spark:spark.master.rest.enabled=true
  1. 建立 Cloud Storage 儲存貯體。
gsutil mb BUCKET_NAME
  1. 在您的本機環境中為任務安裝必要的 Python 程式庫。
python -m pip install apache-beam[gcp]==BEAM_VERSION
  1. 將文字計數範例管道與稍後可執行以執行管道所需的所有依賴項、工件等捆綁到一個 jar 中。
python -m apache_beam.examples.wordcount \
    --runner=SparkRunner \
    --output_executable_path=OUTPUT_JAR_PATH \
    --output=gs://BUCKET_NAME/python-wordcount-out \
    --spark_version=3
  1. 將 Spark 任務提交到 Dataproc 叢集的主節點。
gcloud dataproc jobs submit spark \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --class=org.apache.beam.runners.spark.SparkPipelineRunner \
        --jars=OUTPUT_JAR_PATH
  1. 檢查結果是否已寫入您的儲存貯體。
gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID

Spark 執行器的管道選項

使用 Spark 執行器執行管道時,您應考慮以下管道選項。


對於基於 RDD/DStream 的執行器

欄位說明預設值
runner要使用的管道執行器。此選項可讓您在執行時決定管道執行器。設定為 SparkRunner 以使用 Spark 執行。
sparkMasterSpark 主機的 URL。這相當於設定 SparkConf#setMaster(String),可以是 local[x] 以使用 x 個核心在本機執行、spark://host:port 以連線到 Spark 獨立叢集、mesos://host:port 以連線到 Mesos 叢集,或 yarn 以連線到 yarn 叢集。local[4]
storageLevel在批次管線中快取 RDD 時使用的 StorageLevel。Spark Runner 會自動快取重複評估的 RDD。這是一個僅限批次的屬性,因為 Beam 中的串流管線是有狀態的,這需要 Spark DStream 的 StorageLevelMEMORY_ONLYMEMORY_ONLY
batchIntervalMillisStreamingContextbatchDuration - 設定 Spark 的批次間隔。1000
enableSparkMetricSinks啟用將指標回報至 Spark 的指標接收器。true
cacheDisabled停用整個管線重複使用 PCollections 的快取。當重新計算 RDD 比儲存更快時,這會很有用。false


對於基於結構化串流的執行器

欄位說明預設值
runner要使用的管道執行器。此選項可讓您在執行時決定管道執行器。設定為 SparkStructuredStreamingRunner 以使用 Spark 結構化串流執行。
sparkMasterSpark 主機的 URL。這相當於設定 SparkConf#setMaster(String),可以是 local[x] 以使用 x 個核心在本機執行、spark://host:port 以連線到 Spark 獨立叢集、mesos://host:port 以連線到 Mesos 叢集,或 yarn 以連線到 yarn 叢集。local[4]
testMode啟用測試模式,提供有用的偵錯資訊:Catalyst 執行計畫和 Beam DAG 列印false
enableSparkMetricSinks啟用將指標回報至 Spark 的指標接收器。true
checkpointDir用於串流復原的檢查點目錄,在批次中會被忽略。為了耐用性,需要使用可靠的檔案系統,例如 HDFS/S3/GS。/tmp 中的本機目錄
filesToStage要傳送至所有工作節點並放在類別路徑上的 Jar 檔案。來自類別路徑的所有檔案
EnableSparkMetricSinks啟用/停用將聚合器值傳送至 Spark 的指標接收器true
欄位說明
--runner要使用的管道執行器。此選項可讓您在執行時決定管道執行器。設定為 PortableRunner 以使用 Spark 執行。
--job_endpoint要使用的工作服務端點。應採用 hostname:port 的格式,例如 localhost:3000設定為符合您的工作服務端點(預設為 localhost:8099)

其他注意事項

使用 spark-submit

當將 Spark 應用程式提交到叢集時,通常(且建議)使用 Spark 安裝提供的 spark-submit 腳本。上述描述的 PipelineOptions 並非要取代 spark-submit,而是要對其進行補充。傳遞任何上述選項可以作為 application-arguments 之一完成,並且設定 –master 具有優先權。如需更多關於如何一般使用 spark-submit 的資訊,請查看 Spark 文件

監控您的任務

您可以使用 Spark Web 介面來監控正在執行的 Spark 工作。預設情況下,這可以在驅動程式節點上的連接埠 4040 取得。如果您在本機電腦上執行 Spark,則將為 https://127.0.0.1:4040。Spark 還有一個歷史伺服器,可以在事後檢視

指標也可以透過 REST API 取得。Spark 提供了一個 指標系統,可讓將 Spark 指標回報至各種接收器。Spark runner 使用相同的指標系統回報使用者定義的 Beam 聚合器,目前支援 GraphiteSinkCSVSink。為 Spark 支援的其他接收器提供支援既簡單又直接。

可攜式 runner 尚不支援 Spark 指標。

串流執行


對於基於 RDD/DStream 的執行器
如果您的管線使用 UnboundedSource,Spark Runner 會自動設定串流模式。強制使用串流模式主要用於測試,不建議使用。

對於基於結構化串流的執行器
Spark 結構化串流 runner 中尚未實作串流模式。

Spark 可攜式 runner 尚不支援串流。

使用提供的 SparkContext 和 StreamingListeners


對於基於 RDD/DStream 的執行器
如果您想使用提供的 SparkContext 執行 Spark 工作,例如當使用 spark-jobserver 時,或使用 StreamingListeners,您不能使用 SparkPipelineOptions(上下文或監聽器無論如何都不能作為命令列參數傳遞)。相反地,您應該使用 SparkContextOptions,它只能以程式設計方式使用,而不是常見的 PipelineOptions 實作。

對於基於結構化串流的執行器
Spark 結構化串流 runner 不支援提供的 SparkSession 和 StreamingListeners。

Spark 可攜式 runner 不支援提供的 SparkContext 和 StreamingListeners。

Kubernetes

提交沒有任務伺服器的 Beam 任務

若要在不啟動額外工作伺服器的情況下,直接在 Spark Kubernetes 叢集上提交 beam 工作,您可以執行

spark-submit --master MASTER_URL \
  --conf spark.kubernetes.driver.podTemplateFile=driver_pod_template.yaml \
  --conf spark.kubernetes.executor.podTemplateFile=executor_pod_template.yaml \
  --class org.apache.beam.runners.spark.SparkPipelineRunner \
  --conf spark.kubernetes.container.image=apache/spark:v3.3.2 \
  ./wc_job.jar

類似於在 Dataproc 上執行 beam 工作,您可以將工作 jar 打包如下。此範例使用 SDK harnessPROCESS 類型來透過程序執行工作。

python -m beam_example_wc \
    --runner=SparkRunner \
    --output_executable_path=./wc_job.jar \
    --environment_type=PROCESS \
    --environment_config='{\"command\": \"/opt/apache/beam/boot\"}' \
    --spark_version=3

以下是 Kubernetes 執行程式 pod 範本的範例,需要 initContainer 下載 beam SDK harness 以執行 beam 管線。

spec:
  containers:
    - name: spark-kubernetes-executor
      volumeMounts:
      - name: beam-data
        mountPath: /opt/apache/beam/
  initContainers:
  - name: init-beam
    image: apache/beam_python3.7_sdk
    command:
    - cp
    - /opt/apache/beam/boot
    - /init-container/data/boot
    volumeMounts:
    - name: beam-data
      mountPath: /init-container/data
  volumes:
  - name: beam-data
    emptyDir: {}

提交帶有任務伺服器的 Beam 任務

一個關於設定 Spark 以使用工作伺服器執行 Apache beam 工作的 範例