概述
Apache Flink 執行器可用於使用 Apache Flink 執行 Beam 管線。對於執行,您可以選擇叢集執行模式(例如 Yarn/Kubernetes/Mesos)或本機嵌入式執行模式,這對於測試管線很有用。
Flink 執行器和 Flink 適用於大規模、連續的作業,並提供
- 一個以串流為先的執行時環境,同時支援批次處理和資料串流程式
- 一個同時支援極高輸送量和低事件延遲的執行時環境
- 具有精確一次處理保證的容錯能力
- 串流程式中的自然回壓
- 自訂記憶體管理,可在記憶體內和核心外資料處理演算法之間進行高效且穩健的切換
- 與 YARN 和 Apache Hadoop 生態系統的其他元件整合
使用 Apache Flink 執行器
重要的是要了解 Flink 執行器有兩種版本
- 原始的經典執行器,僅支援 Java(和其他基於 JVM 的語言)
- 較新的可攜式執行器,支援 Java/Python/Go
您可能會問為什麼會有兩個執行器?
Beam 及其執行器最初僅支援基於 JVM 的語言(例如 Java/Scala/Kotlin)。Python 和 Go SDK 是稍後才添加的。必須顯著更改執行器的架構,才能支援執行以其他語言編寫的管線。
如果您的應用程式僅使用 Java,那麼您目前應該使用經典執行器。最終,可攜式執行器將取代經典執行器,因為它包含用於在未來執行 Java、Python、Go 和更多語言的通用框架。
如果您想在 Flink 上使用 Beam 執行 Python 管線,則需要使用可攜式執行器。有關可攜性的更多資訊,請瀏覽可攜性頁面。
因此,本指南分為多個部分,以記錄 Flink 執行器的經典和可攜式功能。此外,Python 提供方便的封裝器來處理執行器的完整生命週期,因此進一步分為自動(建議)或手動管理可攜性元件。請使用下面的切換器為執行器選擇適當的模式
- 經典(Java)
- 可攜式(Python)
- 可攜式(Java/Python/Go)
先決條件和設定
如果您想使用 Flink 執行器的本機執行模式,則不必完成任何叢集設定。您可以直接執行您的 Beam 管線。請務必將執行器設定為FlinkRunner
PortableRunner
。
若要使用 Flink 執行器在叢集上執行,您必須按照 Flink 設定快速入門來設定 Flink 叢集。
相依性
您必須在 pom.xml
或 build.gradle
中指定您對 Flink 執行器的依賴性。使用下面相容性表格中的 Beam 版本和構件 ID。例如
您的執行環境中需要安裝 Docker。若要執行嵌入式 Flink 叢集或將 Flink 執行器用於 Python < 3.6,您的執行環境中還需要有 Java。
您的執行環境中需要安裝 Docker。
在 Flink 叢集上執行 Beam 管線
若要在 Flink 叢集上執行管線,您需要將您的程式與所有相依性一起封裝在所謂的 fat jar 中。如何執行此操作取決於您的建置系統,但是如果您按照Beam 快速入門進行操作,則這是您必須執行的命令
在此命令的 target
資料夾中尋找輸出 JAR。
Beam 快速入門 Maven 專案設定為使用 Maven Shade 外掛程式來建立 fat jar,而 -Pflink-runner
引數確保包含對 Flink 執行器的依賴性。
若要執行管線,最簡單的選項是使用 Flink 的一部分 flink
命令
$ bin/flink run -c org.apache.beam.examples.WordCount /path/to/your.jar –runner=FlinkRunner –other-parameters
或者,您也可以使用 Maven 的 exec 命令。例如,若要執行 WordCount 範例
如果您的本機電腦上正在執行 Flink JobManager
,您可以為 flinkMaster
提供 localhost:8081
。否則,將為該作業啟動一個嵌入式 Flink 叢集。
若要在 Flink 上執行管線,請將執行器設定為 FlinkRunner
,並將 flink_master
設定為 Flink 叢集的 master URL。此外,還可以選擇將 environment_type
設定為 LOOPBACK
。例如,在啟動本機 Flink 叢集後,可以執行
若要在嵌入式 Flink 叢集上執行,只需省略 flink_master
選項,系統將為該作業自動啟動和關閉嵌入式 Flink 叢集。
對於舊版 Python,也可能需要選擇性的 flink_version
選項。
從 Beam 2.18.0 開始,預先建置的 Flink Job Service Docker 映像可在 Docker Hub 上取得:Flink 1.16。Flink 1.17。Flink 1.18。
若要在嵌入式 Flink 叢集上執行管線
(1)啟動 JobService 端點:docker run --net=host apache/beam_flink1.10_job_server:latest
JobService 是您將 Beam 管線提交到的中央執行個體。JobService 將為管線建立 Flink 作業,並執行該作業。
(2)使用 PortableRunner
將 Python 管線提交至上述端點,job_endpoint
設定為 localhost:8099
(這是 JobService 的預設位址)。還可以選擇將 environment_type
設定為 LOOPBACK
。例如
若要在單獨的Flink 叢集上執行
(1)啟動一個公開 Rest 介面的 Flink 叢集(例如,預設為 localhost:8081
)。
(2)使用 Flink Rest 端點啟動 JobService:docker run --net=host apache/beam_flink1.10_job_server:latest --flink-master=localhost:8081
。
(3)如上所述提交管線。
請注意,environment_type=LOOPBACK
僅適用於本機測試,不適用於遠端叢集。有關詳細資訊,請參閱此處。
其他資訊和注意事項
監控您的作業
您可以使用 Flink JobManager 儀表板或其 Rest 介面來監控正在執行的 Flink 作業。預設情況下,這可在 JobManager 節點的連接埠 8081
上取得。如果您在本機電腦上安裝了 Flink,則該位址會是 https://127.0.0.1:8081
。注意:當您使用 [local]
模式時,將會啟動一個嵌入式 Flink 叢集,不會提供儀表板。
串流執行
如果您的管線使用無界資料來源或接收器,則 Flink 執行器會自動切換到串流模式。您可以使用 --streaming
旗標強制執行串流模式。
注意:當使用無界來源且未啟用檢查點時,執行器會列印警告訊息。許多來源(例如 PubSubIO
)依賴其檢查點來進行確認,這僅能在為 FlinkRunner
啟用檢查點時完成。若要啟用檢查點,請將checkpointingInterval
checkpointing_interval
設定為所需的檢查點間隔(以毫秒為單位)。
Flink 執行器的管線選項
當您使用 Flink 執行器執行管線時,您可以設定這些管線選項。
以下 Flink 特定的管線選項清單是從 FlinkPipelineOptions 參考類別自動產生
allowNonRestoredState | 旗標,指出如果儲存點包含不再是管線一部分的運算子的狀態,是否允許非還原狀態。 | 預設值:false |
attachedMode | 指定管線是以附加模式還是分離模式提交 | 預設值:true |
autoBalanceWriteFilesShardingEnabled | 旗標,指示是否應啟用 WriteFiles 轉換的自動平衡分片。這在串流用例中可能很有用,在串流用例中,管線需要將相當多的事件寫入檔案,通常分為 N 個分片。Flink 上的預設行為是,某些工作者將收到比其他工作者更多的分片來處理。這會導致工作者在處理待辦事項和記憶體使用方面失去平衡。啟用此功能將使分片在可用的工作者之間均勻分佈,從而提高輸送量和記憶體使用穩定性。 | 預設值:false |
autoWatermarkInterval | 自動水位線發射的間隔(以毫秒為單位)。 | |
checkpointTimeoutMillis | 檢查點在被捨棄之前可能需要的最大時間(以毫秒為單位)。 | 預設值:-1 |
checkpointingInterval | 觸發執行中管線檢查點的間隔(以毫秒為單位)。預設值:不進行檢查點。 | 預設值:-1 |
checkpointingMode | 定義一致性保證的檢查點模式。 | 預設值:EXACTLY_ONCE |
disableMetrics | 在 Flink 執行器中停用 Beam 指標 | 預設值:false |
enableStableInputDrain | 允許對包含 RequiresStableInput 運算子的 Flink 管線進行排空操作。請注意,在排空時,如果 DoFn 運算子中存在任何與處理相關的失敗,則可能會違反 RequiresStableInput 合約。 | 預設值:false |
executionModeForBatch | 批次管線的資料交換的 Flink 模式。參考 {@link org.apache.flink.api.common.ExecutionMode}。如果管線遭到封鎖,請將其設定為 BATCH_FORCED,請參閱 https://issues.apache.org/jira/browse/FLINK-10672 | 預設值:PIPELINED |
executionRetryDelay | 設定執行之間的延遲(以毫秒為單位)。值 {@code -1} 表示應使用預設值。 | 預設值:-1 |
externalizedCheckpointsEnabled | 啟用或停用外部化檢查點。與 CheckpointingInterval 結合使用 | 預設值:false |
failOnCheckpointingErrors | 設定任務在檢查點程序中遇到錯誤時的預期行為。若設定為 true,任務在檢查點錯誤時會失敗。若設定為 false,任務只會拒絕檢查點並繼續執行。 | 預設值:true |
fasterCopy | 移除運算子之間不必要的深層複製。請參閱 https://issues.apache.org/jira/browse/BEAM-11146 | 預設值:false |
fileInputSplitMaxSizeMB | 設定從檔案系統讀取資料時輸入分割的最大大小。0 表示無最大大小限制。 | 預設值:0 |
finishBundleBeforeCheckpointing | 若設定,將在檢查運算子的狀態之前完成目前的 bundle 並清除所有輸出。預設情況下,會立即開始檢查點並緩衝剩餘的 bundle 輸出作為檢查點的一部分。此設定可能會影響檢查點對齊。 | 預設值:false |
flinkConfDir | 包含 Flink YAML 設定檔的目錄。這些屬性將會設定給提交到 Flink 的所有任務,並且優先於 FLINK_CONF_DIR 中的設定。 | |
flinkMaster | 應執行 Pipeline 的 Flink Master 位址。可以是 "host:port" 的形式,或是 [local]、[collection] 或 [auto] 等特殊值之一。 | 預設值:[auto] |
forceUnalignedCheckpointEnabled | 強制啟用未對齊的檢查點,特別是允許迭代任務使用。 | 預設值:false |
jobCheckIntervalInSecs | 在分離模式下,使用 waitUntilFinish 方法設定任務檢查間隔(以秒為單位),預設為 5 秒。 | 預設值:5 |
latencyTrackingInterval | 從來源端傳送延遲追蹤標記到接收端的間隔時間(以毫秒為單位)。間隔值 <= 0 會停用此功能。 | 預設值:0 |
maxBundleSize | 一個 bundle 中元素的最大數量。串流任務的預設值為 1000,批次任務則為 1,000,000。 | 預設值:MaxBundleSizeFactory |
maxBundleTimeMills | 在最終確定一個 bundle 之前等待的最大時間(以毫秒為單位)。串流的預設值為 1000,批次則為 10,000。 | 預設值:MaxBundleTimeFactory |
maxParallelism | 要使用的 Pipeline 全域最大並行度。最大並行度指定動態調整的上限,以及用於分區狀態的金鑰組數量。 | 預設值:-1 |
minPauseBetweenCheckpoints | 觸發下一個檢查點之前最短的暫停時間(以毫秒為單位)。 | 預設值:-1 |
numConcurrentCheckpoints | 最大並行檢查點數量。預設值為 1 (= 無並行檢查點)。 | 預設值:1 |
numberOfExecutionRetries | 設定失敗的任務重新執行的次數。值為零會有效地停用容錯功能。值為 -1 表示應使用系統預設值(如設定中定義)。 | 預設值:-1 |
objectReuse | 設定重複使用物件的行為。 | 預設值:false |
operatorChaining | 設定運算子鏈結的行為。 | 預設值:true |
parallelism | 將運算分配到工作節點時要使用的並行度。如果未設定並行度,則會使用設定的 Flink 預設值,如果找不到則使用 1。 | 預設值:-1 |
reIterableGroupByKeyResult | 指出 GBK 結果是否需要可重複疊代的旗標。可重複疊代結果表示單一金鑰的所有值都必須放入記憶體,因為目前不支援溢寫到磁碟。 | 預設值:false |
reportCheckpointDuration | 若非空值,則在提供的度量命名空間中回報每個 ParDo 階段的檢查點持續時間。 | |
retainExternalizedCheckpointsOnCancellation | 設定取消時外部化檢查點的行為。 | 預設值:false |
savepointPath | 儲存點還原路徑。若指定,將從提供的路徑還原串流 Pipeline。 | |
shutdownSourcesAfterIdleMs | 關閉已閒置設定毫秒數的來源。來源關閉後,不再可能執行檢查點。一旦所有輸入都處理完畢,關閉來源最終會導致 Pipeline 關閉(=Flink 任務完成)。除非明確設定,否則當啟用檢查點時,預設值為 Long.MAX_VALUE,而停用檢查點時則為 0。請參閱 https://issues.apache.org/jira/browse/FLINK-2491 以了解此問題的進度。 | 預設值:-1 |
stateBackend | 用於儲存 Beam 狀態的狀態後端。使用 'rocksdb' 或 'filesystem'。 | |
stateBackendFactory | 設定在串流模式中使用的狀態後端工廠。預設為 Flink 叢集的 state.backend 設定。 | |
stateBackendStoragePath | 用於持久化狀態後端資料的狀態後端路徑。用於初始化狀態後端。 | |
unalignedCheckpointEnabled | 若設定,未對齊的檢查點會包含作為檢查點狀態一部分的處理中資料(即儲存在緩衝區中的資料),允許檢查點障礙超越這些緩衝區。因此,檢查點持續時間不再取決於目前的輸送量,因為檢查點障礙實際上不再嵌入資料串流中。 | 預設值:false |
allow_non_restored_state | 旗標,指出如果儲存點包含不再是管線一部分的運算子的狀態,是否允許非還原狀態。 | 預設值:false |
attached_mode | 指定管線是以附加模式還是分離模式提交 | 預設值:true |
auto_balance_write_files_sharding_enabled | 旗標,指示是否應啟用 WriteFiles 轉換的自動平衡分片。這在串流用例中可能很有用,在串流用例中,管線需要將相當多的事件寫入檔案,通常分為 N 個分片。Flink 上的預設行為是,某些工作者將收到比其他工作者更多的分片來處理。這會導致工作者在處理待辦事項和記憶體使用方面失去平衡。啟用此功能將使分片在可用的工作者之間均勻分佈,從而提高輸送量和記憶體使用穩定性。 | 預設值:false |
auto_watermark_interval | 自動水位線發射的間隔(以毫秒為單位)。 | |
checkpoint_timeout_millis | 檢查點在被捨棄之前可能需要的最大時間(以毫秒為單位)。 | 預設值:-1 |
checkpointing_interval | 觸發執行中管線檢查點的間隔(以毫秒為單位)。預設值:不進行檢查點。 | 預設值:-1 |
checkpointing_mode | 定義一致性保證的檢查點模式。 | 預設值:EXACTLY_ONCE |
disable_metrics | 在 Flink 執行器中停用 Beam 指標 | 預設值:false |
enable_stable_input_drain | 允許對包含 RequiresStableInput 運算子的 Flink 管線進行排空操作。請注意,在排空時,如果 DoFn 運算子中存在任何與處理相關的失敗,則可能會違反 RequiresStableInput 合約。 | 預設值:false |
execution_mode_for_batch | 批次管線的資料交換的 Flink 模式。參考 {@link org.apache.flink.api.common.ExecutionMode}。如果管線遭到封鎖,請將其設定為 BATCH_FORCED,請參閱 https://issues.apache.org/jira/browse/FLINK-10672 | 預設值:PIPELINED |
execution_retry_delay | 設定執行之間的延遲(以毫秒為單位)。值 {@code -1} 表示應使用預設值。 | 預設值:-1 |
externalized_checkpoints_enabled | 啟用或停用外部化檢查點。與 CheckpointingInterval 結合使用 | 預設值:false |
fail_on_checkpointing_errors | 設定任務在檢查點程序中遇到錯誤時的預期行為。若設定為 true,任務在檢查點錯誤時會失敗。若設定為 false,任務只會拒絕檢查點並繼續執行。 | 預設值:true |
faster_copy | 移除運算子之間不必要的深層複製。請參閱 https://issues.apache.org/jira/browse/BEAM-11146 | 預設值:false |
file_input_split_max_size_m_b | 設定從檔案系統讀取資料時輸入分割的最大大小。0 表示無最大大小限制。 | 預設值:0 |
finish_bundle_before_checkpointing | 若設定,將在檢查運算子的狀態之前完成目前的 bundle 並清除所有輸出。預設情況下,會立即開始檢查點並緩衝剩餘的 bundle 輸出作為檢查點的一部分。此設定可能會影響檢查點對齊。 | 預設值:false |
flink_conf_dir | 包含 Flink YAML 設定檔的目錄。這些屬性將會設定給提交到 Flink 的所有任務,並且優先於 FLINK_CONF_DIR 中的設定。 | |
flink_master | 應執行 Pipeline 的 Flink Master 位址。可以是 "host:port" 的形式,或是 [local]、[collection] 或 [auto] 等特殊值之一。 | 預設值:[auto] |
force_unaligned_checkpoint_enabled | 強制啟用未對齊的檢查點,特別是允許迭代任務使用。 | 預設值:false |
job_check_interval_in_secs | 在分離模式下,使用 waitUntilFinish 方法設定任務檢查間隔(以秒為單位),預設為 5 秒。 | 預設值:5 |
latency_tracking_interval | 從來源端傳送延遲追蹤標記到接收端的間隔時間(以毫秒為單位)。間隔值 <= 0 會停用此功能。 | 預設值:0 |
max_bundle_size | 一個 bundle 中元素的最大數量。串流任務的預設值為 1000,批次任務則為 1,000,000。 | 預設值:MaxBundleSizeFactory |
max_bundle_time_mills | 在最終確定一個 bundle 之前等待的最大時間(以毫秒為單位)。串流的預設值為 1000,批次則為 10,000。 | 預設值:MaxBundleTimeFactory |
max_parallelism | 要使用的 Pipeline 全域最大並行度。最大並行度指定動態調整的上限,以及用於分區狀態的金鑰組數量。 | 預設值:-1 |
min_pause_between_checkpoints | 觸發下一個檢查點之前最短的暫停時間(以毫秒為單位)。 | 預設值:-1 |
num_concurrent_checkpoints | 最大並行檢查點數量。預設值為 1 (= 無並行檢查點)。 | 預設值:1 |
number_of_execution_retries | 設定失敗的任務重新執行的次數。值為零會有效地停用容錯功能。值為 -1 表示應使用系統預設值(如設定中定義)。 | 預設值:-1 |
object_reuse | 設定重複使用物件的行為。 | 預設值:false |
operator_chaining | 設定運算子鏈結的行為。 | 預設值:true |
parallelism | 將運算分配到工作節點時要使用的並行度。如果未設定並行度,則會使用設定的 Flink 預設值,如果找不到則使用 1。 | 預設值:-1 |
re_iterable_group_by_key_result | 指出 GBK 結果是否需要可重複疊代的旗標。可重複疊代結果表示單一金鑰的所有值都必須放入記憶體,因為目前不支援溢寫到磁碟。 | 預設值:false |
report_checkpoint_duration | 若非空值,則在提供的度量命名空間中回報每個 ParDo 階段的檢查點持續時間。 | |
retain_externalized_checkpoints_on_cancellation | 設定取消時外部化檢查點的行為。 | 預設值:false |
savepoint_path | 儲存點還原路徑。若指定,將從提供的路徑還原串流 Pipeline。 | |
shutdown_sources_after_idle_ms | 關閉已閒置設定毫秒數的來源。來源關閉後,不再可能執行檢查點。一旦所有輸入都處理完畢,關閉來源最終會導致 Pipeline 關閉(=Flink 任務完成)。除非明確設定,否則當啟用檢查點時,預設值為 Long.MAX_VALUE,而停用檢查點時則為 0。請參閱 https://issues.apache.org/jira/browse/FLINK-2491 以了解此問題的進度。 | 預設值:-1 |
state_backend | 用於儲存 Beam 狀態的狀態後端。使用 'rocksdb' 或 'filesystem'。 | |
state_backend_factory | 設定在串流模式中使用的狀態後端工廠。預設為 Flink 叢集的 state.backend 設定。 | |
state_backend_storage_path | 用於持久化狀態後端資料的狀態後端路徑。用於初始化狀態後端。 | |
unaligned_checkpoint_enabled | 若設定,未對齊的檢查點會包含作為檢查點狀態一部分的處理中資料(即儲存在緩衝區中的資料),允許檢查點障礙超越這些緩衝區。因此,檢查點持續時間不再取決於目前的輸送量,因為檢查點障礙實際上不再嵌入資料串流中。 | 預設值:false |
有關一般的 Beam Pipeline 選項,請參閱 PipelineOptions 參考資料。
Flink 版本相容性
Flink 叢集版本必須與 FlinkRunner 使用的次要版本相符。次要版本是版本字串中的前兩個數字,例如在 1.18.0
中,次要版本是 1.18
。
我們會嘗試追蹤 Beam 發佈時 Apache Flink 的最新版本。Beam 支援 Flink 版本的時間與 Flink 社群支援的時間相同。Flink 社群支援最新的兩個次要版本。當停止支援 Flink 版本時,也可能會從 Beam 中移除該版本並將其標示為已淘汰。若要找出哪個版本的 Flink 與 Beam 相容,請參閱下表
Flink 版本 | Artifact Id | 支援的 Beam 版本 |
---|---|---|
1.19.x | beam-runners-flink-1.19 | ≥ 2.61.0 |
1.18.x | beam-runners-flink-1.18 | ≥ 2.57.0 |
1.17.x | beam-runners-flink-1.17 | ≥ 2.56.0 |
1.16.x | beam-runners-flink-1.16 | 2.47.0 - 2.60.0 |
1.15.x | beam-runners-flink-1.15 | 2.40.0 - 2.60.0 |
1.14.x | beam-runners-flink-1.14 | 2.38.0 - 2.56.0 |
1.13.x | beam-runners-flink-1.13 | 2.31.0 - 2.55.0 |
1.12.x | beam-runners-flink-1.12 | 2.27.0 - 2.55.0 |
1.11.x | beam-runners-flink-1.11 | 2.25.0 - 2.38.0 |
1.10.x | beam-runners-flink-1.10 | 2.21.0 - 2.30.0 |
1.9.x | beam-runners-flink-1.9 | 2.17.0 - 2.29.0 |
1.8.x | beam-runners-flink-1.8 | 2.13.0 - 2.29.0 |
1.7.x | beam-runners-flink-1.7 | 2.10.0 - 2.20.0 |
1.6.x | beam-runners-flink-1.6 | 2.10.0 - 2.16.0 |
1.5.x | beam-runners-flink_2.11 | 2.6.0 - 2.16.0 |
1.4.x,搭配 Scala 2.11 | beam-runners-flink_2.11 | 2.3.0 - 2.5.0 |
1.3.x,搭配 Scala 2.10 | beam-runners-flink_2.10 | 2.1.x - 2.2.0 |
1.2.x,搭配 Scala 2.10 | beam-runners-flink_2.10 | 2.0.0 |
若要取得正確的 Flink 版本,請參閱 Flink 下載頁面。
如需更多資訊,Flink 文件可能會很有幫助。
Beam 功能
Beam 功能矩陣記錄了傳統 Flink Runner 的功能。
可攜式功能矩陣記錄了可攜式 Flink Runner 的功能。
上次更新日期:2024/10/31
您找到所有您要尋找的內容了嗎?
內容是否有用且清楚?您有任何想要變更的地方嗎?請告訴我們!