概述

Apache Flink 執行器可用於使用 Apache Flink 執行 Beam 管線。對於執行,您可以選擇叢集執行模式(例如 Yarn/Kubernetes/Mesos)或本機嵌入式執行模式,這對於測試管線很有用。

Flink 執行器和 Flink 適用於大規模、連續的作業,並提供

使用 Apache Flink 執行器

重要的是要了解 Flink 執行器有兩種版本

  1. 原始的經典執行器,僅支援 Java(和其他基於 JVM 的語言)
  2. 較新的可攜式執行器,支援 Java/Python/Go

您可能會問為什麼會有兩個執行器?

Beam 及其執行器最初僅支援基於 JVM 的語言(例如 Java/Scala/Kotlin)。Python 和 Go SDK 是稍後才添加的。必須顯著更改執行器的架構,才能支援執行以其他語言編寫的管線。

如果您的應用程式僅使用 Java,那麼您目前應該使用經典執行器。最終,可攜式執行器將取代經典執行器,因為它包含用於在未來執行 Java、Python、Go 和更多語言的通用框架。

如果您想在 Flink 上使用 Beam 執行 Python 管線,則需要使用可攜式執行器。有關可攜性的更多資訊,請瀏覽可攜性頁面

因此,本指南分為多個部分,以記錄 Flink 執行器的經典和可攜式功能。此外,Python 提供方便的封裝器來處理執行器的完整生命週期,因此進一步分為自動(建議)或手動管理可攜性元件。請使用下面的切換器為執行器選擇適當的模式

先決條件和設定

如果您想使用 Flink 執行器的本機執行模式,則不必完成任何叢集設定。您可以直接執行您的 Beam 管線。請務必將執行器設定為FlinkRunnerPortableRunner

若要使用 Flink 執行器在叢集上執行,您必須按照 Flink 設定快速入門來設定 Flink 叢集。

相依性

您必須在 pom.xmlbuild.gradle 中指定您對 Flink 執行器的依賴性。使用下面相容性表格中的 Beam 版本和構件 ID。例如

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-flink-1.18</artifactId>
  <version>2.60.0</version>
</dependency>

您的執行環境中需要安裝 Docker。若要執行嵌入式 Flink 叢集或將 Flink 執行器用於 Python < 3.6,您的執行環境中還需要有 Java。

您的執行環境中需要安裝 Docker。

若要在 Flink 叢集上執行管線,您需要將您的程式與所有相依性一起封裝在所謂的 fat jar 中。如何執行此操作取決於您的建置系統,但是如果您按照Beam 快速入門進行操作,則這是您必須執行的命令

$ mvn package -Pflink-runner

在此命令的 target 資料夾中尋找輸出 JAR。

Beam 快速入門 Maven 專案設定為使用 Maven Shade 外掛程式來建立 fat jar,而 -Pflink-runner 引數確保包含對 Flink 執行器的依賴性。

若要執行管線,最簡單的選項是使用 Flink 的一部分 flink 命令

$ bin/flink run -c org.apache.beam.examples.WordCount /path/to/your.jar –runner=FlinkRunner –other-parameters

或者,您也可以使用 Maven 的 exec 命令。例如,若要執行 WordCount 範例

mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner \
      --inputFile=/path/to/pom.xml \
      --output=/path/to/counts \
      --flinkMaster=<flink master url> \
      --filesToStage=target/word-count-beam-bundled-0.1.jar"

如果您的本機電腦上正在執行 Flink JobManager,您可以為 flinkMaster 提供 localhost:8081。否則,將為該作業啟動一個嵌入式 Flink 叢集。

若要在 Flink 上執行管線,請將執行器設定為 FlinkRunner,並將 flink_master 設定為 Flink 叢集的 master URL。此外,還可以選擇將 environment_type 設定為 LOOPBACK。例如,在啟動本機 Flink 叢集後,可以執行

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",
    "--environment_type=LOOPBACK"
])
with beam.Pipeline(options) as p:
    ...

若要在嵌入式 Flink 叢集上執行,只需省略 flink_master 選項,系統將為該作業自動啟動和關閉嵌入式 Flink 叢集。

對於舊版 Python,也可能需要選擇性的 flink_version 選項。

從 Beam 2.18.0 開始,預先建置的 Flink Job Service Docker 映像可在 Docker Hub 上取得:Flink 1.16Flink 1.17Flink 1.18

若要在嵌入式 Flink 叢集上執行管線

(1)啟動 JobService 端點:docker run --net=host apache/beam_flink1.10_job_server:latest

JobService 是您將 Beam 管線提交到的中央執行個體。JobService 將為管線建立 Flink 作業,並執行該作業。

(2)使用 PortableRunner 將 Python 管線提交至上述端點,job_endpoint 設定為 localhost:8099(這是 JobService 的預設位址)。還可以選擇將 environment_type 設定為 LOOPBACK。例如

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"
])
with beam.Pipeline(options) as p:
    ...

若要在單獨的Flink 叢集上執行

(1)啟動一個公開 Rest 介面的 Flink 叢集(例如,預設為 localhost:8081)。

(2)使用 Flink Rest 端點啟動 JobService:docker run --net=host apache/beam_flink1.10_job_server:latest --flink-master=localhost:8081

(3)如上所述提交管線。

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"
])
with beam.Pipeline(options=options) as p:
    ...

請注意,environment_type=LOOPBACK 僅適用於本機測試,不適用於遠端叢集。有關詳細資訊,請參閱此處

其他資訊和注意事項

監控您的作業

您可以使用 Flink JobManager 儀表板或其 Rest 介面來監控正在執行的 Flink 作業。預設情況下,這可在 JobManager 節點的連接埠 8081 上取得。如果您在本機電腦上安裝了 Flink,則該位址會是 https://127.0.0.1:8081。注意:當您使用 [local] 模式時,將會啟動一個嵌入式 Flink 叢集,不會提供儀表板。

串流執行

如果您的管線使用無界資料來源或接收器,則 Flink 執行器會自動切換到串流模式。您可以使用 --streaming 旗標強制執行串流模式。

注意:當使用無界來源且未啟用檢查點時,執行器會列印警告訊息。許多來源(例如 PubSubIO)依賴其檢查點來進行確認,這僅能在為 FlinkRunner 啟用檢查點時完成。若要啟用檢查點,請將checkpointingIntervalcheckpointing_interval設定為所需的檢查點間隔(以毫秒為單位)。

當您使用 Flink 執行器執行管線時,您可以設定這些管線選項。

以下 Flink 特定的管線選項清單是從 FlinkPipelineOptions 參考類別自動產生

allowNonRestoredState旗標,指出如果儲存點包含不再是管線一部分的運算子的狀態,是否允許非還原狀態。預設值:false
attachedMode指定管線是以附加模式還是分離模式提交預設值:true
autoBalanceWriteFilesShardingEnabled旗標,指示是否應啟用 WriteFiles 轉換的自動平衡分片。這在串流用例中可能很有用,在串流用例中,管線需要將相當多的事件寫入檔案,通常分為 N 個分片。Flink 上的預設行為是,某些工作者將收到比其他工作者更多的分片來處理。這會導致工作者在處理待辦事項和記憶體使用方面失去平衡。啟用此功能將使分片在可用的工作者之間均勻分佈,從而提高輸送量和記憶體使用穩定性。預設值:false
autoWatermarkInterval自動水位線發射的間隔(以毫秒為單位)。
checkpointTimeoutMillis檢查點在被捨棄之前可能需要的最大時間(以毫秒為單位)。預設值:-1
checkpointingInterval觸發執行中管線檢查點的間隔(以毫秒為單位)。預設值:不進行檢查點。預設值:-1
checkpointingMode定義一致性保證的檢查點模式。預設值:EXACTLY_ONCE
disableMetrics在 Flink 執行器中停用 Beam 指標預設值:false
enableStableInputDrain允許對包含 RequiresStableInput 運算子的 Flink 管線進行排空操作。請注意,在排空時,如果 DoFn 運算子中存在任何與處理相關的失敗,則可能會違反 RequiresStableInput 合約。預設值:false
executionModeForBatch批次管線的資料交換的 Flink 模式。參考 {@link org.apache.flink.api.common.ExecutionMode}。如果管線遭到封鎖,請將其設定為 BATCH_FORCED,請參閱 https://issues.apache.org/jira/browse/FLINK-10672預設值:PIPELINED
executionRetryDelay設定執行之間的延遲(以毫秒為單位)。值 {@code -1} 表示應使用預設值。預設值:-1
externalizedCheckpointsEnabled啟用或停用外部化檢查點。與 CheckpointingInterval 結合使用預設值:false
failOnCheckpointingErrors設定任務在檢查點程序中遇到錯誤時的預期行為。若設定為 true,任務在檢查點錯誤時會失敗。若設定為 false,任務只會拒絕檢查點並繼續執行。預設值:true
fasterCopy移除運算子之間不必要的深層複製。請參閱 https://issues.apache.org/jira/browse/BEAM-11146預設值:false
fileInputSplitMaxSizeMB設定從檔案系統讀取資料時輸入分割的最大大小。0 表示無最大大小限制。預設值:0
finishBundleBeforeCheckpointing若設定,將在檢查運算子的狀態之前完成目前的 bundle 並清除所有輸出。預設情況下,會立即開始檢查點並緩衝剩餘的 bundle 輸出作為檢查點的一部分。此設定可能會影響檢查點對齊。預設值:false
flinkConfDir包含 Flink YAML 設定檔的目錄。這些屬性將會設定給提交到 Flink 的所有任務,並且優先於 FLINK_CONF_DIR 中的設定。
flinkMaster應執行 Pipeline 的 Flink Master 位址。可以是 "host:port" 的形式,或是 [local]、[collection] 或 [auto] 等特殊值之一。預設值:[auto]
forceUnalignedCheckpointEnabled強制啟用未對齊的檢查點,特別是允許迭代任務使用。預設值:false
jobCheckIntervalInSecs在分離模式下,使用 waitUntilFinish 方法設定任務檢查間隔(以秒為單位),預設為 5 秒。預設值:5
latencyTrackingInterval從來源端傳送延遲追蹤標記到接收端的間隔時間(以毫秒為單位)。間隔值 <= 0 會停用此功能。預設值:0
maxBundleSize一個 bundle 中元素的最大數量。串流任務的預設值為 1000,批次任務則為 1,000,000。預設值:MaxBundleSizeFactory
maxBundleTimeMills在最終確定一個 bundle 之前等待的最大時間(以毫秒為單位)。串流的預設值為 1000,批次則為 10,000。預設值:MaxBundleTimeFactory
maxParallelism要使用的 Pipeline 全域最大並行度。最大並行度指定動態調整的上限,以及用於分區狀態的金鑰組數量。預設值:-1
minPauseBetweenCheckpoints觸發下一個檢查點之前最短的暫停時間(以毫秒為單位)。預設值:-1
numConcurrentCheckpoints最大並行檢查點數量。預設值為 1 (= 無並行檢查點)。預設值:1
numberOfExecutionRetries設定失敗的任務重新執行的次數。值為零會有效地停用容錯功能。值為 -1 表示應使用系統預設值(如設定中定義)。預設值:-1
objectReuse設定重複使用物件的行為。預設值:false
operatorChaining設定運算子鏈結的行為。預設值:true
parallelism將運算分配到工作節點時要使用的並行度。如果未設定並行度,則會使用設定的 Flink 預設值,如果找不到則使用 1。預設值:-1
reIterableGroupByKeyResult指出 GBK 結果是否需要可重複疊代的旗標。可重複疊代結果表示單一金鑰的所有值都必須放入記憶體,因為目前不支援溢寫到磁碟。預設值:false
reportCheckpointDuration若非空值,則在提供的度量命名空間中回報每個 ParDo 階段的檢查點持續時間。
retainExternalizedCheckpointsOnCancellation設定取消時外部化檢查點的行為。預設值:false
savepointPath儲存點還原路徑。若指定,將從提供的路徑還原串流 Pipeline。
shutdownSourcesAfterIdleMs關閉已閒置設定毫秒數的來源。來源關閉後,不再可能執行檢查點。一旦所有輸入都處理完畢,關閉來源最終會導致 Pipeline 關閉(=Flink 任務完成)。除非明確設定,否則當啟用檢查點時,預設值為 Long.MAX_VALUE,而停用檢查點時則為 0。請參閱 https://issues.apache.org/jira/browse/FLINK-2491 以了解此問題的進度。預設值:-1
stateBackend用於儲存 Beam 狀態的狀態後端。使用 'rocksdb' 或 'filesystem'。
stateBackendFactory設定在串流模式中使用的狀態後端工廠。預設為 Flink 叢集的 state.backend 設定。
stateBackendStoragePath用於持久化狀態後端資料的狀態後端路徑。用於初始化狀態後端。
unalignedCheckpointEnabled若設定,未對齊的檢查點會包含作為檢查點狀態一部分的處理中資料(即儲存在緩衝區中的資料),允許檢查點障礙超越這些緩衝區。因此,檢查點持續時間不再取決於目前的輸送量,因為檢查點障礙實際上不再嵌入資料串流中。預設值:false
allow_non_restored_state旗標,指出如果儲存點包含不再是管線一部分的運算子的狀態,是否允許非還原狀態。預設值:false
attached_mode指定管線是以附加模式還是分離模式提交預設值:true
auto_balance_write_files_sharding_enabled旗標,指示是否應啟用 WriteFiles 轉換的自動平衡分片。這在串流用例中可能很有用,在串流用例中,管線需要將相當多的事件寫入檔案,通常分為 N 個分片。Flink 上的預設行為是,某些工作者將收到比其他工作者更多的分片來處理。這會導致工作者在處理待辦事項和記憶體使用方面失去平衡。啟用此功能將使分片在可用的工作者之間均勻分佈,從而提高輸送量和記憶體使用穩定性。預設值:false
auto_watermark_interval自動水位線發射的間隔(以毫秒為單位)。
checkpoint_timeout_millis檢查點在被捨棄之前可能需要的最大時間(以毫秒為單位)。預設值:-1
checkpointing_interval觸發執行中管線檢查點的間隔(以毫秒為單位)。預設值:不進行檢查點。預設值:-1
checkpointing_mode定義一致性保證的檢查點模式。預設值:EXACTLY_ONCE
disable_metrics在 Flink 執行器中停用 Beam 指標預設值:false
enable_stable_input_drain允許對包含 RequiresStableInput 運算子的 Flink 管線進行排空操作。請注意,在排空時,如果 DoFn 運算子中存在任何與處理相關的失敗,則可能會違反 RequiresStableInput 合約。預設值:false
execution_mode_for_batch批次管線的資料交換的 Flink 模式。參考 {@link org.apache.flink.api.common.ExecutionMode}。如果管線遭到封鎖,請將其設定為 BATCH_FORCED,請參閱 https://issues.apache.org/jira/browse/FLINK-10672預設值:PIPELINED
execution_retry_delay設定執行之間的延遲(以毫秒為單位)。值 {@code -1} 表示應使用預設值。預設值:-1
externalized_checkpoints_enabled啟用或停用外部化檢查點。與 CheckpointingInterval 結合使用預設值:false
fail_on_checkpointing_errors設定任務在檢查點程序中遇到錯誤時的預期行為。若設定為 true,任務在檢查點錯誤時會失敗。若設定為 false,任務只會拒絕檢查點並繼續執行。預設值:true
faster_copy移除運算子之間不必要的深層複製。請參閱 https://issues.apache.org/jira/browse/BEAM-11146預設值:false
file_input_split_max_size_m_b設定從檔案系統讀取資料時輸入分割的最大大小。0 表示無最大大小限制。預設值:0
finish_bundle_before_checkpointing若設定,將在檢查運算子的狀態之前完成目前的 bundle 並清除所有輸出。預設情況下,會立即開始檢查點並緩衝剩餘的 bundle 輸出作為檢查點的一部分。此設定可能會影響檢查點對齊。預設值:false
flink_conf_dir包含 Flink YAML 設定檔的目錄。這些屬性將會設定給提交到 Flink 的所有任務,並且優先於 FLINK_CONF_DIR 中的設定。
flink_master應執行 Pipeline 的 Flink Master 位址。可以是 "host:port" 的形式,或是 [local]、[collection] 或 [auto] 等特殊值之一。預設值:[auto]
force_unaligned_checkpoint_enabled強制啟用未對齊的檢查點,特別是允許迭代任務使用。預設值:false
job_check_interval_in_secs在分離模式下,使用 waitUntilFinish 方法設定任務檢查間隔(以秒為單位),預設為 5 秒。預設值:5
latency_tracking_interval從來源端傳送延遲追蹤標記到接收端的間隔時間(以毫秒為單位)。間隔值 <= 0 會停用此功能。預設值:0
max_bundle_size一個 bundle 中元素的最大數量。串流任務的預設值為 1000,批次任務則為 1,000,000。預設值:MaxBundleSizeFactory
max_bundle_time_mills在最終確定一個 bundle 之前等待的最大時間(以毫秒為單位)。串流的預設值為 1000,批次則為 10,000。預設值:MaxBundleTimeFactory
max_parallelism要使用的 Pipeline 全域最大並行度。最大並行度指定動態調整的上限,以及用於分區狀態的金鑰組數量。預設值:-1
min_pause_between_checkpoints觸發下一個檢查點之前最短的暫停時間(以毫秒為單位)。預設值:-1
num_concurrent_checkpoints最大並行檢查點數量。預設值為 1 (= 無並行檢查點)。預設值:1
number_of_execution_retries設定失敗的任務重新執行的次數。值為零會有效地停用容錯功能。值為 -1 表示應使用系統預設值(如設定中定義)。預設值:-1
object_reuse設定重複使用物件的行為。預設值:false
operator_chaining設定運算子鏈結的行為。預設值:true
parallelism將運算分配到工作節點時要使用的並行度。如果未設定並行度,則會使用設定的 Flink 預設值,如果找不到則使用 1。預設值:-1
re_iterable_group_by_key_result指出 GBK 結果是否需要可重複疊代的旗標。可重複疊代結果表示單一金鑰的所有值都必須放入記憶體,因為目前不支援溢寫到磁碟。預設值:false
report_checkpoint_duration若非空值,則在提供的度量命名空間中回報每個 ParDo 階段的檢查點持續時間。
retain_externalized_checkpoints_on_cancellation設定取消時外部化檢查點的行為。預設值:false
savepoint_path儲存點還原路徑。若指定,將從提供的路徑還原串流 Pipeline。
shutdown_sources_after_idle_ms關閉已閒置設定毫秒數的來源。來源關閉後,不再可能執行檢查點。一旦所有輸入都處理完畢,關閉來源最終會導致 Pipeline 關閉(=Flink 任務完成)。除非明確設定,否則當啟用檢查點時,預設值為 Long.MAX_VALUE,而停用檢查點時則為 0。請參閱 https://issues.apache.org/jira/browse/FLINK-2491 以了解此問題的進度。預設值:-1
state_backend用於儲存 Beam 狀態的狀態後端。使用 'rocksdb' 或 'filesystem'。
state_backend_factory設定在串流模式中使用的狀態後端工廠。預設為 Flink 叢集的 state.backend 設定。
state_backend_storage_path用於持久化狀態後端資料的狀態後端路徑。用於初始化狀態後端。
unaligned_checkpoint_enabled若設定,未對齊的檢查點會包含作為檢查點狀態一部分的處理中資料(即儲存在緩衝區中的資料),允許檢查點障礙超越這些緩衝區。因此,檢查點持續時間不再取決於目前的輸送量,因為檢查點障礙實際上不再嵌入資料串流中。預設值:false

有關一般的 Beam Pipeline 選項,請參閱 PipelineOptions 參考資料。

Flink 叢集版本必須與 FlinkRunner 使用的次要版本相符。次要版本是版本字串中的前兩個數字,例如在 1.18.0 中,次要版本是 1.18

我們會嘗試追蹤 Beam 發佈時 Apache Flink 的最新版本。Beam 支援 Flink 版本的時間與 Flink 社群支援的時間相同。Flink 社群支援最新的兩個次要版本。當停止支援 Flink 版本時,也可能會從 Beam 中移除該版本並將其標示為已淘汰。若要找出哪個版本的 Flink 與 Beam 相容,請參閱下表

Flink 版本Artifact Id支援的 Beam 版本
1.19.xbeam-runners-flink-1.19≥ 2.61.0
1.18.xbeam-runners-flink-1.18≥ 2.57.0
1.17.xbeam-runners-flink-1.17≥ 2.56.0
1.16.xbeam-runners-flink-1.162.47.0 - 2.60.0
1.15.xbeam-runners-flink-1.152.40.0 - 2.60.0
1.14.xbeam-runners-flink-1.142.38.0 - 2.56.0
1.13.xbeam-runners-flink-1.132.31.0 - 2.55.0
1.12.xbeam-runners-flink-1.122.27.0 - 2.55.0
1.11.xbeam-runners-flink-1.112.25.0 - 2.38.0
1.10.xbeam-runners-flink-1.102.21.0 - 2.30.0
1.9.xbeam-runners-flink-1.92.17.0 - 2.29.0
1.8.xbeam-runners-flink-1.82.13.0 - 2.29.0
1.7.xbeam-runners-flink-1.72.10.0 - 2.20.0
1.6.xbeam-runners-flink-1.62.10.0 - 2.16.0
1.5.xbeam-runners-flink_2.112.6.0 - 2.16.0
1.4.x,搭配 Scala 2.11beam-runners-flink_2.112.3.0 - 2.5.0
1.3.x,搭配 Scala 2.10beam-runners-flink_2.102.1.x - 2.2.0
1.2.x,搭配 Scala 2.10beam-runners-flink_2.102.0.0

若要取得正確的 Flink 版本,請參閱 Flink 下載頁面

如需更多資訊,Flink 文件可能會很有幫助。

Beam 功能

Beam 功能矩陣記錄了傳統 Flink Runner 的功能。

可攜式功能矩陣記錄了可攜式 Flink Runner 的功能。