使用 Apache Spark 執行器
Apache Spark 執行器可以用來使用 Apache Spark 執行 Beam 管道。Spark 執行器可以像原生的 Spark 應用程式一樣執行 Spark 管道;部署一個用於本機模式的獨立應用程式,在 Spark 的獨立資源管理器上執行,或使用 YARN 或 Mesos。
Spark 執行器在 Apache Spark 之上執行 Beam 管道,提供
- 批次和串流(以及組合)管道。
- 與 RDD 和 DStream 提供的相同的容錯 保證。
- Spark 提供的相同的 安全性 功能。
- 使用 Spark 的指標系統內建指標報告,該系統也會報告 Beam Aggregators。
- 透過 Spark 的廣播變數原生支援 Beam 的側輸入。
Beam 能力矩陣記錄了目前 Spark 執行器支援的功能。
Spark 執行器的三種形式
Spark 執行器有三種形式
- 一個傳統執行器,僅支援 Java(和其他基於 JVM 的語言),並且基於 Spark RDD/DStream
- 一個結構化串流 Spark 執行器,僅支援 Java(和其他基於 JVM 的語言),並且基於 Spark 資料集和 Apache Spark 結構化串流框架。
注意:它仍然是實驗性的,它對 Beam 模型的覆蓋範圍是局部的。目前它僅支援批次模式。
- 一個可移植執行器,支援 Java、Python 和 Go
本指南分為兩部分,以記錄 Spark 執行器的不可移植和可移植功能。請使用下面的切換器來選擇合適的執行器
該使用哪種執行器:可移植或不可移植執行器?
Beam 及其執行器最初僅支援基於 JVM 的語言(例如 Java/Scala/Kotlin)。Python 和 Go SDK 是後來添加的。為了支援執行使用其他語言編寫的管道,執行器的架構必須進行重大更改。
如果您的應用程式僅使用 Java,那麼您目前應該使用基於 Java 的執行器之一。如果您想在 Spark 上使用 Beam 執行 Python 或 Go 管道,則需要使用可移植執行器。有關可移植性的更多資訊,請造訪可移植性頁面。
- 不可移植 (Java)
- 可移植 (Java/Python/Go)
Spark 執行器的先決條件和設定
Spark 執行器目前支援 Spark 的 3.2.x 分支。
注意:對 Spark 2.4.x 的支援已在 Beam 2.46.0 中移除。
您可以透過將以下內容添加到您的 pom.xml 來新增對最新版本的 Spark 執行器的依賴項
使用您的應用程式部署 Spark
在某些情況下,例如在本機模式/獨立模式下執行時,您的(獨立)應用程式需要透過在 pom.xml 中明確新增以下依賴項來打包 Spark
並使用 maven shade 外掛程式對應用程式 jar 進行遮蔽
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>shaded</shadedClassifierName>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
執行 mvn package
後,執行 ls target
,您應該會看到(假設您的 artifactId 是 beam-examples
,版本是 1.0.0
)
要針對獨立叢集執行,只需執行
對於基於 RDD/DStream 的執行器
對於基於結構化串流的執行器
您需要在您的執行環境中安裝 Docker。要使用 Python 開發 Apache Beam,您必須安裝 Apache Beam Python SDK:pip install apache_beam
。有關如何建立 Python 管道,請參閱Python 文件。
從 Beam 2.20.0 開始,預先建置的 Spark 任務服務 Docker 映像可在 Docker Hub 上取得。
對於較舊的 Beam 版本,您需要一份 Apache Beam 的原始程式碼副本。您可以在下載頁面下載它。
- 啟動任務服務端點
- 使用 Docker(首選):
docker run --net=host apache/beam_spark_job_server:latest
- 或從 Beam 原始程式碼:
./gradlew :runners:spark:3:job-server:runShadow
- 使用 Docker(首選):
任務服務是您提交 Beam 管道的中心實例。任務服務將為管道建立一個 Spark 任務並執行該任務。要在 Spark 叢集上執行任務,需要向 Beam 任務服務提供 Spark 主機位址。
- 透過使用
PortableRunner
、將job_endpoint
設定為localhost:8099
(這是任務服務的預設位址)和將environment_type
設定為LOOPBACK
,將 Python 管道提交到上述端點。例如
在預先部署的 Spark 叢集上執行
在已具有 Spark 部署的叢集上部署您的 Beam 管道(Spark 類別在容器類別路徑中可用)不需要任何額外的依賴項。有關不同部署模式的更多詳細資訊,請參閱:獨立、YARN 或 Mesos。
- 啟動一個 Spark 叢集,預設情況下,該叢集會在 7077 連接埠上公開主機。
- 啟動將與 Spark 主機連接的任務服務
- 使用 Docker(首選):
docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://127.0.0.1:7077
- 或從 Beam 原始程式碼:
./gradlew :runners:spark:3:job-server:runShadow -PsparkMasterUrl=spark://127.0.0.1:7077
- 使用 Docker(首選):
- 如上所述提交管道。但請注意,
environment_type=LOOPBACK
僅適用於本機測試。有關詳細資訊,請參閱此處。
(請注意,根據您的叢集設定,您可能需要變更 environment_type
選項。有關詳細資訊,請參閱此處。)
在 Dataproc 叢集(以 YARN 為後端)上執行
要執行以 Python、Go 和其他支援語言編寫的 Beam 任務,您可以使用 Beam 的 Spark 執行器頁面上描述的 SparkRunner
和 PortableRunner
(另請參閱可移植性框架路線圖)。
以下範例從以 Yarn 為後端的 Dataproc 叢集的主節點執行可移植的 Python Beam 任務。
注意:此範例在 Dataproc 2.0、Spark 3.1.2 和 Beam 2.37.0 下成功執行。
- 建立一個啟用 Docker 元件的 Dataproc 叢集。
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform \ --properties spark:spark.master.rest.enabled=true
--optional-components
:Docker。--image-version
:叢集的映像版本,決定叢集上安裝的 Spark 版本(例如,請參閱最新和前四個 2.0.x 映像發行版本中列出的 Apache Spark 元件版本)。--region
:支援的 Dataproc 區域。--enable-component-gateway
:啟用對 網路介面的存取。--scopes
:啟用對同一專案中 GCP 服務的 API 存取。--properties
:為某些元件新增特定設定,此處啟用 spark.master.rest 以使用任務提交到叢集。
- 建立 Cloud Storage 儲存貯體。
gsutil mb BUCKET_NAME
- 在您的本機環境中為任務安裝必要的 Python 程式庫。
python -m pip install apache-beam[gcp]==BEAM_VERSION
- 將文字計數範例管道與稍後可執行以執行管道所需的所有依賴項、工件等捆綁到一個 jar 中。
python -m apache_beam.examples.wordcount \ --runner=SparkRunner \ --output_executable_path=OUTPUT_JAR_PATH \ --output=gs://BUCKET_NAME/python-wordcount-out \ --spark_version=3
--runner
(必要):SparkRunner
。--output_executable_path
(必要):要建立的捆綁 jar 的路徑。--output
(必要):應寫入輸出位置。--spark_version
(選用):選擇 spark 版本 3(預設)或 2(已棄用!)。
- 將 Spark 任務提交到 Dataproc 叢集的主節點。
gcloud dataproc jobs submit spark \ --cluster=CLUSTER_NAME \ --region=REGION \ --class=org.apache.beam.runners.spark.SparkPipelineRunner \ --jars=OUTPUT_JAR_PATH
--cluster
:已建立的 Dataproc 叢集的名稱。--region
:支援的 Dataproc 區域。--class
:您的應用程式的進入點。--jars
:捆綁 jar 的路徑,包括您的應用程式和所有依賴項。
- 檢查結果是否已寫入您的儲存貯體。
gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Spark 執行器的管道選項
使用 Spark 執行器執行管道時,您應考慮以下管道選項。
對於基於 RDD/DStream 的執行器
欄位 | 說明 | 預設值 |
---|---|---|
runner | 要使用的管道執行器。此選項可讓您在執行時決定管道執行器。 | 設定為 SparkRunner 以使用 Spark 執行。 |
sparkMaster | Spark 主機的 URL。這相當於設定 SparkConf#setMaster(String) ,可以是 local[x] 以使用 x 個核心在本機執行、spark://host:port 以連線到 Spark 獨立叢集、mesos://host:port 以連線到 Mesos 叢集,或 yarn 以連線到 yarn 叢集。 | local[4] |
storageLevel | 在批次管線中快取 RDD 時使用的 StorageLevel 。Spark Runner 會自動快取重複評估的 RDD。這是一個僅限批次的屬性,因為 Beam 中的串流管線是有狀態的,這需要 Spark DStream 的 StorageLevel 為 MEMORY_ONLY 。 | MEMORY_ONLY |
batchIntervalMillis | StreamingContext 的 batchDuration - 設定 Spark 的批次間隔。 | 1000 |
enableSparkMetricSinks | 啟用將指標回報至 Spark 的指標接收器。 | true |
cacheDisabled | 停用整個管線重複使用 PCollections 的快取。當重新計算 RDD 比儲存更快時,這會很有用。 | false |
對於基於結構化串流的執行器
欄位 | 說明 | 預設值 |
---|---|---|
runner | 要使用的管道執行器。此選項可讓您在執行時決定管道執行器。 | 設定為 SparkStructuredStreamingRunner 以使用 Spark 結構化串流執行。 |
sparkMaster | Spark 主機的 URL。這相當於設定 SparkConf#setMaster(String) ,可以是 local[x] 以使用 x 個核心在本機執行、spark://host:port 以連線到 Spark 獨立叢集、mesos://host:port 以連線到 Mesos 叢集,或 yarn 以連線到 yarn 叢集。 | local[4] |
testMode | 啟用測試模式,提供有用的偵錯資訊:Catalyst 執行計畫和 Beam DAG 列印 | false |
enableSparkMetricSinks | 啟用將指標回報至 Spark 的指標接收器。 | true |
checkpointDir | 用於串流復原的檢查點目錄,在批次中會被忽略。為了耐用性,需要使用可靠的檔案系統,例如 HDFS/S3/GS。 | /tmp 中的本機目錄 |
filesToStage | 要傳送至所有工作節點並放在類別路徑上的 Jar 檔案。 | 來自類別路徑的所有檔案 |
EnableSparkMetricSinks | 啟用/停用將聚合器值傳送至 Spark 的指標接收器 | true |
欄位 | 說明 | 值 |
---|---|---|
--runner | 要使用的管道執行器。此選項可讓您在執行時決定管道執行器。 | 設定為 PortableRunner 以使用 Spark 執行。 |
--job_endpoint | 要使用的工作服務端點。應採用 hostname:port 的格式,例如 localhost:3000 | 設定為符合您的工作服務端點(預設為 localhost:8099) |
其他注意事項
使用 spark-submit
當將 Spark 應用程式提交到叢集時,通常(且建議)使用 Spark 安裝提供的 spark-submit
腳本。上述描述的 PipelineOptions
並非要取代 spark-submit
,而是要對其進行補充。傳遞任何上述選項可以作為 application-arguments
之一完成,並且設定 –master
具有優先權。如需更多關於如何一般使用 spark-submit
的資訊,請查看 Spark 文件。
監控您的任務
您可以使用 Spark Web 介面來監控正在執行的 Spark 工作。預設情況下,這可以在驅動程式節點上的連接埠 4040
取得。如果您在本機電腦上執行 Spark,則將為 https://127.0.0.1:4040
。Spark 還有一個歷史伺服器,可以在事後檢視。
指標也可以透過 REST API 取得。Spark 提供了一個 指標系統,可讓將 Spark 指標回報至各種接收器。Spark runner 使用相同的指標系統回報使用者定義的 Beam 聚合器,目前支援 GraphiteSink 和 CSVSink。為 Spark 支援的其他接收器提供支援既簡單又直接。
可攜式 runner 尚不支援 Spark 指標。
串流執行
對於基於 RDD/DStream 的執行器
如果您的管線使用 UnboundedSource
,Spark Runner 會自動設定串流模式。強制使用串流模式主要用於測試,不建議使用。
對於基於結構化串流的執行器
Spark 結構化串流 runner 中尚未實作串流模式。
Spark 可攜式 runner 尚不支援串流。
使用提供的 SparkContext 和 StreamingListeners
對於基於 RDD/DStream 的執行器
如果您想使用提供的 SparkContext
執行 Spark 工作,例如當使用 spark-jobserver 時,或使用 StreamingListeners
,您不能使用 SparkPipelineOptions
(上下文或監聽器無論如何都不能作為命令列參數傳遞)。相反地,您應該使用 SparkContextOptions
,它只能以程式設計方式使用,而不是常見的 PipelineOptions
實作。
對於基於結構化串流的執行器
Spark 結構化串流 runner 不支援提供的 SparkSession 和 StreamingListeners。
Spark 可攜式 runner 不支援提供的 SparkContext 和 StreamingListeners。
Kubernetes
提交沒有任務伺服器的 Beam 任務
若要在不啟動額外工作伺服器的情況下,直接在 Spark Kubernetes 叢集上提交 beam 工作,您可以執行
spark-submit --master MASTER_URL \
--conf spark.kubernetes.driver.podTemplateFile=driver_pod_template.yaml \
--conf spark.kubernetes.executor.podTemplateFile=executor_pod_template.yaml \
--class org.apache.beam.runners.spark.SparkPipelineRunner \
--conf spark.kubernetes.container.image=apache/spark:v3.3.2 \
./wc_job.jar
類似於在 Dataproc 上執行 beam 工作,您可以將工作 jar 打包如下。此範例使用 SDK harness 的 PROCESS
類型來透過程序執行工作。
python -m beam_example_wc \
--runner=SparkRunner \
--output_executable_path=./wc_job.jar \
--environment_type=PROCESS \
--environment_config='{\"command\": \"/opt/apache/beam/boot\"}' \
--spark_version=3
以下是 Kubernetes 執行程式 pod 範本的範例,需要 initContainer
下載 beam SDK harness 以執行 beam 管線。
spec:
containers:
- name: spark-kubernetes-executor
volumeMounts:
- name: beam-data
mountPath: /opt/apache/beam/
initContainers:
- name: init-beam
image: apache/beam_python3.7_sdk
command:
- cp
- /opt/apache/beam/boot
- /init-container/data/boot
volumeMounts:
- name: beam-data
mountPath: /init-container/data
volumes:
- name: beam-data
emptyDir: {}
提交帶有任務伺服器的 Beam 任務
一個關於設定 Spark 以使用工作伺服器執行 Apache beam 工作的 範例。
上次更新於 2024/10/31
您是否找到您要的所有資訊?
所有資訊是否有用且清楚?是否有任何您想變更的內容?請告訴我們!