Runner 編寫指南

本指南將逐步說明如何實作新的 Runner。目標讀者是擁有資料處理系統並希望使用它來執行 Beam Pipeline 的人。本指南從基礎開始,協助你評估未來的工作。接著,各章節將會越來越詳細,以作為你開發 Runner 的資源。

涵蓋的主題

實作 Beam 原始操作

除了編碼和持久化資料(假設你的引擎已經以某種方式完成了)之外,你需要做的大部分工作是實作 Beam 原始操作。本節詳細介紹了每個原始操作,涵蓋了你可能不知道的所有內容以及提供的支援程式碼。

這些原始操作是為 Pipeline 作者設計的,而不是為 Runner 作者設計的。每一個都代表了不同的概念操作模式(外部 IO、逐元素、分組、窗口化、聯合),而不是特定的實作決策。根據使用者實例化的方式,相同的原始操作可能需要非常不同的實作方式。例如,使用狀態或計時器的 ParDo 可能需要鍵分割,而具有投機觸發的 GroupByKey 可能需要更昂貴或複雜的實作。

如果你還沒有實作這些功能,該怎麼辦?

沒關係!你不必一次完成所有工作,甚至有些功能可能對你的 Runner 永遠沒有意義。我們在 Beam 網站上維護一個能力矩陣,以便你可以告訴使用者你支援哪些功能。當你收到一個 Pipeline 時,你應該遍歷它並確定你是否可以執行你找到的每個 DoFn。如果你無法執行 Pipeline 中的某些 DoFn(或者如果你的 Runner 缺少任何其他要求),你應該拒絕該 Pipeline。在你的原生環境中,這可能看起來像是拋出一個 UnsupportedOperationException。Runner API RPC 將明確這一點,以實現跨語言的可移植性。

實作 Impulse 原始操作

Impulse 是一個 PTransform,它不接收任何輸入,並且在 Pipeline 的生命週期中只產生一個輸出,該輸出應該是具有最小時間戳記的全局窗口中的空位元組。當使用標準窗口化值編碼器進行編碼時,它的編碼值為 7f df 3b 64 5a 1c ac 09 00 00 00 01 0f 00

雖然 Impulse 通常不會被使用者調用,但它是唯一的根原始操作,而其他根操作(如 ReadCreate)是由一個 Impulse 後跟一系列(可能是可分割的)ParDo 組成的複合操作。

實作 ParDo 原始操作

ParDo 原始操作描述了 PCollection 的逐元素轉換。ParDo 是最複雜的原始操作,因為它是描述每個元素處理的地方。除了來自函數式編程的標準 mapflatMap 等非常簡單的操作之外,ParDo 還支援多個輸出、側輸入、初始化、刷新、拆解和狀態處理。

應用於每個元素的 UDF 稱為 DoFnDoFn 的確切 API 可能因語言/SDK 而異,但通常遵循相同的模式,因此我們可以使用虛擬程式碼進行討論。我也會經常參考 Java 支援程式碼,因為我熟悉它,而且我們目前和未來的大多數 Runner 都是基於 Java 的。

通常,與其一次對整個輸入資料集應用一系列 ParDo,不如將多個 ParDo 融合到一個可執行的階段中,該階段包含一整系列的映射操作(通常是一個 DAG)。除了 ParDo 之外,窗口化操作、本地(GBK 前或後)組合操作和其他映射操作也可以融合到這些階段中。

由於 DoFn 可能會以與 Runner 本身不同的語言執行程式碼,或者需要不同的環境,因此 Beam 提供了跨程序呼叫這些程式碼的能力。這是 Beam Fn API 的關鍵,有關更多詳細資訊,請參閱下文。然而,當環境相容時,Runner 可以完全在程序中調用此使用者程式碼(為了簡單起見或提高效率)。

Bundles (叢集)

為了正確性,DoFn *應該* 表示逐元素的函數,但在大多數 SDK 中,這是一個長時間存在的物件,它會處理稱為 bundles 的小組元素。

你的 Runner 決定在一個 bundle 中包含多少個元素以及哪些元素,甚至可以在處理過程中動態決定目前的 bundle 已「結束」。bundle 的處理方式與 DoFn 生命週期的其餘部分相關。

通常,盡可能使用最大的 bundles 來提高吞吐量,以便初始化和最終化的成本可以在許多元素上攤銷。但如果你的資料是以串流形式到達,那麼你將需要終止一個 bundle 以達到適當的延遲,因此 bundles 可能只有幾個元素。

bundle 是 Beam 中的提交單位。如果在處理 bundle 時遇到錯誤,則必須由 Runner 丟棄該 bundle 的所有先前輸出(包括對狀態或計時器的任何修改),並重試整個 bundle。成功完成 bundle 後,必須以原子方式提交其輸出,以及任何狀態/計時器修改和水印更新。

DoFn 的生命週期

在許多 SDK 中,DoFn 都有幾個方法,例如 setupstart_bundlefinish_bundleteardown 等,除了標準的逐元素 process 呼叫之外。一般而言,當從標準 bundle 處理器(透過 FnAPI 或直接使用 BundleProcessor(java (python))呼叫一個或多個 DoFn 時,應為你處理 此生命週期 的正確調用。與 SDK 無關的 Runner 永遠不必直接擔心這些細節。

側輸入

主要設計文件:https://s.apache.org/beam-side-inputs-1-pager

側輸入是 PCollection 窗口的全局視圖。這與主要輸入不同,後者一次處理一個元素。SDK/使用者充分準備 PCollection,Runner 將其具體化,然後 Runner 將其饋送到 DoFn

與由 Runner *推* 送到 ParDo 的主要輸入資料(通常透過 FnApi 資料通道)不同,側輸入資料由 ParDo 從 Runner *拉* 取(通常透過 FnAPI 狀態通道)。

側輸入透過特定的 access_pattern 存取。目前在 StandardSideInputTypes proto 中列舉了兩個存取模式:beam:side_input:iterable:v1 表示 Runner 必須傳回與特定窗口對應的 PCollection 中的所有值,以及 beam:side_input:multimap:v1 表示 Runner 必須傳回與特定鍵和窗口對應的所有值。能夠有效地服務這些存取模式可能會影響 Runner 如何具體化此 PCollection。

可以透過查看 ParDo 轉換的 ParDoPayload 中的 side_inputs 映射來偵測側輸入。ParDo 操作本身負責調用 window_mapping_fn(在調用 Runner 之前)和 view_fn(在 Runner 傳回的值上),因此 Runner 不需要關心這些欄位。

當需要側輸入,但側輸入對於給定的窗口沒有相關資料時,必須延遲該窗口中的元素,直到側輸入有一些資料或水印已充分推進,以至於我們可以確定該窗口不會有任何資料。PushBackSideInputDoFnRunner 是實作此操作的一個範例。

狀態與計時器

主要設計文件:https://s.apache.org/beam-state

ParDo 包含狀態和計時器時,它在你的 Runner 上的執行通常會非常不同。特別是,狀態必須在 bundle 完成時持久化,並在未來 bundle 中檢索。設定的計時器也必須在水印充分推進時注入到未來的 bundle 中。

狀態和計時器按鍵和窗口進行分割,也就是說,處理給定鍵的 DoFn 必須在共享此鍵的所有元素中具有一致的狀態和計時器視圖。你可能需要或想要顯式地洗牌資料以支援此操作。一旦水印通過了窗口的末尾(加上允許的延遲時間,如果有的話),則可以捨棄與此窗口相關的狀態。

狀態設定和檢索在 FnAPI 狀態通道上執行,而計時器設定和觸發在 FnAPI 資料通道上發生。

可分割的 DoFn

主要設計文件:https://s.apache.org/splittable-do-fn

可分割的 DoFnParDo 的一種泛化,適用於可以平行處理的高扇出映射。這類操作的典型範例是從檔案讀取,其中單一檔案名稱(作為輸入元素)可以映射到該檔案中包含的所有元素。DoFn 被認為是可分割的,因為代表單一檔案的元素可以被分割(例如,分割成該檔案的範圍),以便由不同的工作節點處理(例如,讀取)。這個基本元件的強大之處在於,這些分割可以動態進行,而不是僅僅靜態地(即預先)進行,避免了過度或分割不足的問題。

關於可分割的 DoFn 的完整說明超出了本文檔的範圍,但這裡有一個關於其執行的簡要概述。

可分割的 DoFn 可以透過在元素內部以及元素之間進行分割來參與動態分割協定。動態分割是由執行器在控制通道上發出 ProcessBundleSplitRequest 訊息所觸發的。SDK 將承諾僅處理指示元素的某一部分,並在 ProcessBundleSplitResponse 中將剩餘部分(即未處理的部分)的描述返回給執行器,以便由執行器排程(例如,在不同的工作節點上或作為不同 bundle 的一部分)。

可分割的 DoFn 也可以啟動自己的分割,表示它已經處理了一個元素,但目前只能處理到這個程度(例如,當追蹤檔案時),但還有更多剩餘。這種情況最常發生在讀取無界來源時。在這種情況下,一組代表延遲工作的元素會傳回在 ProcessBundleResponseresidual_roots 欄位中。在未來,執行器必須使用 residual_roots 中給定的元素重新調用這些相同的操作。

實作 GroupByKey (和窗口) 原始操作

GroupByKey 操作(有時簡稱為 GBK)會將鍵值對的 PCollection 按鍵和視窗分組,並根據 PCollection 的觸發配置發出結果。

它比簡單地將具有相同鍵的元素放置在一起要複雜得多,並且使用了 PCollection 視窗策略中的許多欄位。

依編碼的位元組分組

對於鍵和視窗,您的執行器將它們視為「僅僅是位元組」。因此,您需要以與這些位元組分組一致的方式進行分組,即使您對所涉及的類型有一些特殊的了解。

您正在處理的元素將是鍵值對,並且您需要提取鍵。因此,鍵值對的格式是標準化並在所有 SDK 之間共享的。有關二進位格式的文件,請參閱 Java 中的 KvCoder 或 Python 中的 TupleCoder

窗口合併

除了按鍵分組之外,您的執行器還必須按元素的視窗分組。WindowFn 可以選擇宣告它會基於每個鍵合併視窗。例如,如果相同鍵的 Session Window 重疊,則會將它們合併。因此,您的執行器必須在分組期間調用 WindowFn 的合併方法。

透過 GroupByKeyOnly + GroupAlsoByWindow 實作

Java 和 Python 程式碼庫包含支援程式碼,用於實現完整 GroupByKey 操作的一種特別常見的方法:首先對鍵進行分組,然後按視窗進行分組。對於合併視窗,這基本上是必需的,因為合併是針對每個鍵的。

通常,按時間戳記順序呈現一組值可以更有效地將這些值分組到它們最終的視窗中。

捨棄延遲資料

主要設計文件:https://s.apache.org/beam-lateness

如果輸入 PCollection 的浮水印已超過視窗的末尾,且至少超過了輸入 PCollection 的允許延遲時間,則 PCollection 中的視窗會過期。

過期視窗的資料可以隨時刪除,並且應在 GroupByKey 時刪除。如果您使用 GroupAlsoByWindow,則在此轉換執行之前。如果您在 GroupByKeyOnly 之前刪除資料,則可以減少洗牌的資料量,但只能在非合併視窗的情況下安全地進行,因為顯示為過期的視窗可能會合併而變得不過期。

觸發

主要設計文件:https://s.apache.org/beam-triggers

輸入 PCollection 的觸發器和累積模式指定了 GroupByKey 操作應何時以及如何發出輸出。

在 Java 中,有很多支援程式碼用於在 GroupAlsoByWindow 實作、ReduceFnRunner (舊名稱) 和 TriggerStateMachine 中執行觸發器,這是一種將所有觸發器實作為元素和計時器上的事件驅動機器的顯而易見的方法。在 Python 中,這由 TriggerDriver 類別支援。

TimestampCombiner

當從多個輸入產生聚合輸出時,GroupByKey 操作必須為組合選擇時間戳記。為此,首先 WindowFn 有機會移動時間戳記 - 這對於確保浮水印不會阻止滑動視窗之類的視窗的進度是必要的(詳細資訊超出本文檔的範圍)。然後,需要合併移動的時間戳記 - 這由 TimestampCombiner 指定,它可以選擇其輸入的最小值或最大值,或者僅忽略輸入並選擇視窗的末尾。

實作 Window 原始操作

視窗基本元件會應用 WindowFn UDF,將每個輸入元素放入其輸出 PCollection 的一個或多個視窗中。請注意,基本元件通常還會為 PCollection 配置視窗策略的其他方面,但您的執行器收到的完整建構的圖形將已經為每個 PCollection 設定了完整的視窗策略。

若要實作此基本元件,您需要在每個元素上調用提供的 WindowFn,它將傳回該元素將成為輸出 PCollection 一部分的一組視窗。

大多數執行器透過將這些視窗變更的映射與 DoFn 融合來實作此功能。

實作考量

「視窗」只是第二個分組鍵,它具有「最大時間戳記」。它可以是任何任意的使用者定義類型。WindowFn 提供視窗類型的編碼器。

Beam 的支援程式碼提供 WindowedValue,它是多個視窗中元素的壓縮表示形式。您可能想使用這個,或者您自己的壓縮表示形式。請記住,它只是同時表示多個元素;沒有所謂的「在多個視窗中」的元素。

對於全域視窗中的值,您可能想要使用更進一步的壓縮表示形式,它根本不會包含視窗。

我們提供具有這些最佳化的編碼器,例如 PARAM_WINDOWED_VALUE,可用於減少序列化資料的大小。

未來,此基本元件可能會被淘汰,因為如果增強 ParDo 的功能以允許輸出到新視窗,則可以將其實作為 ParDo。

實作 Flatten 原始操作

這是一個很簡單的功能 - 接受一組有限的 PCollection 作為輸入,並輸出它們的包聯集,同時保持視窗完整。

為了使此操作有意義,SDK 有責任確保視窗策略是相容的。

另請注意,並非要求所有 PCollection 的編碼器都相同。如果您的執行器想要強制要求 (為了避免繁瑣的重新編碼),您必須自己強制執行。或者,您也可以將快速路徑實作為最佳化。

特別提及:Combine 複合操作

執行器幾乎總是特別對待的複合轉換是 CombinePerKey,它將結合律和交換律運算子應用於 PCollection 的元素。此複合不是基本元件。它是使用 ParDoGroupByKey 實作的,因此您的執行器將在不處理它的情況下工作 - 但它確實攜帶您可能想要用於最佳化的其他資訊:結合律-交換律運算子,稱為 CombineFn

通常,執行器會想要透過所謂的合併器提升來實作此功能,其中在 GroupByKey 之前放置一個新的操作,執行部分(bundle 內)合併,這通常也需要稍微修改 GroupByKey 之後的內容。可以在 Pythongo 實作中找到此轉換的範例。產生的 GroupByKey 前後操作通常會與 ParDo 融合在一起,並如上所述執行。

使用 Pipelines

當您從使用者那裡收到管道時,您需要對其進行轉換。關於 Beam 管道的表示方式的說明可以在這裡找到,它們補充了官方 proto 宣告

測試你的 Runner

Beam Java SDK 和 Python SDK 具有一系列執行器驗證測試。組態的發展速度可能快於本文檔,因此請檢查其他 Beam 執行器的組態。但請注意,我們有測試,您可以非常輕鬆地使用它們!若要在使用 Gradle 的 Java 基礎執行器中啟用這些測試,您需要掃描 SDK 的相依性,以尋找具有 JUnit 類別 ValidatesRunner 的測試。

task validatesRunner(type: Test) {
  group = "Verification"
  description = "Validates the runner"
  def pipelineOptions = JsonOutput.toJson(["--runner=MyRunner", ... misc test options ...])
  systemProperty "beamTestPipelineOptions", pipelineOptions
  classpath = configurations.validatesRunner
  testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
  useJUnit {
    includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
  }
}

尚未探索在其他語言中啟用這些測試的方法。

讓你的 Runner 與 SDK 良好整合

無論您的執行器是否基於與 SDK 相同的語言(例如 Java),如果您希望該 SDK(例如 Python)的使用者使用它,您都需要提供一個 shim 來從另一個 SDK 調用它。

與 Java SDK 整合

允許使用者將選項傳遞給你的 Runner

組態機制是 PipelineOptions,它是一個與一般 Java 物件完全不同的介面。忘記您所知道的內容,並遵循規則,PipelineOptions 會對您很好。

您必須為您的執行器實作一個子介面,其中包含名稱相符的 getter 和 setter,如下所示:

public interface MyRunnerOptions extends PipelineOptions {
  @Description("The Foo to use with MyRunner")
  @Required
  public Foo getMyRequiredFoo();
  public void setMyRequiredFoo(Foo newValue);

  @Description("Enable Baz; on by default")
  @Default.Boolean(true)
  public Boolean isBazEnabled();
  public void setBazEnabled(Boolean newValue);
}

您可以設定預設值等等。詳情請參閱 javadoc。當您的執行器使用 PipelineOptions 物件實例化時,您可以使用 options.as(MyRunnerOptions.class) 來存取您的介面。

若要讓這些選項在命令列上可用,您需要使用 PipelineOptionsRegistrar 註冊您的選項。如果您使用 @AutoService,這會很容易。

@AutoService(PipelineOptionsRegistrar.class)
public static class MyOptionsRegistrar implements PipelineOptionsRegistrar {
  @Override
  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
    return ImmutableList.<Class<? extends PipelineOptions>>of(MyRunnerOptions.class);
  }
}

向 SDK 註冊你的 Runner 以供命令列使用

若要讓您的執行器在命令列上可用,您需要使用 PipelineRunnerRegistrar 註冊您的選項。如果您使用 @AutoService,這會很容易。

@AutoService(PipelineRunnerRegistrar.class)
public static class MyRunnerRegistrar implements PipelineRunnerRegistrar {
  @Override
  public Iterable<Class<? extends PipelineRunner>> getPipelineRunners() {
    return ImmutableList.<Class<? extends PipelineRunner>>of(MyRunner.class);
  }
}

與 Python SDK 整合

在 Python SDK 中,程式碼的註冊不是自動的。因此,在建立新的執行器時,有幾件事需要記住。

任何新的執行器對套件的依賴關係都應該是選項,因此在 setup.py 中建立一個新的 extra_requires 目標,這是新執行器所需的。

所有執行器程式碼都應該放在 apache_beam/runners 目錄下的自有套件中。

runner.pycreate_runner 函式中註冊新的執行器,以便將部分名稱與要使用的正確類別相匹配。

Python 執行器也可以透過它們的完整限定名稱來識別(例如,當傳遞 runner 參數時),無論它們是否位於 Beam 儲存庫中。

編寫與 SDK 無關的 Runner

要使您的執行器與 SDK 無關,能夠執行用其他語言編寫的管道,有兩個方面:Fn API 和 Runner API。

Fn API

設計文件

要執行使用者的管道,您需要能夠調用他們的 UDF。Fn API 是一個 RPC 介面,用於 Beam 的標準 UDF,使用基於 gRPC 的協議緩衝區實作。

Fn API 包括

您完全可以**也**為您的語言使用 SDK 作為實用程式碼,或為相同語言的 UDF 提供 bundle 處理的優化實作。

Runner API

Runner API 是一個與 SDK 無關的管道架構,以及用於啟動管道和檢查作業狀態的 RPC 介面。僅透過 Runner API 介面檢查管道,可以消除您的執行器對其語言的 SDK 的依賴,以進行管道分析和作業轉換。

要執行這樣一個與 SDK 無關的管道,您需要支援 Fn API。UDF 作為函式的規格(通常只是一個特定語言的不透明序列化位元組)以及可以執行它的環境規格(基本上是一個特定的 SDK)嵌入在管道中。到目前為止,此規格預期是一個 Docker 容器的 URI,該容器託管 SDK 的 Fn API harness。

您完全可以**也**使用您的語言的 SDK,這可能會提供有用的實用程式碼。

管道的與語言無關的定義透過協議緩衝區架構描述,如下所述供參考。但是您的執行器**無需**直接操作 protobuf 訊息。相反,Beam 程式碼庫提供了用於處理管道的實用程式,以便您無需了解管道是否曾經被序列化或傳輸,或一開始可能是用什麼語言編寫的。

Java

如果您的執行器是基於 Java 的,則以 SDK 不可知的的方式與管道互動的工具位於 beam-runners-core-construction-java artifact 中,位於 org.apache.beam.sdk.util.construction 命名空間中。這些實用程式的命名方式一致,如下所示:

僅透過這些類別檢查轉換,您的執行器將不會依賴 Java SDK 的特定細節。

Runner API protos

Runner API 指的是 Beam 模型中概念的特定表現形式,作為協議緩衝區架構。即使您不應該直接操作這些訊息,了解構成管道的規範資料也可能會有所幫助。

大多數 API 與高階描述完全相同;您可以在不了解所有低階細節的情況下開始實作執行器。

對您來說,Runner API 最重要的重點是它是 Beam 管道的與語言無關的定義。您可能始終會透過特定 SDK 的支援程式碼來與這些定義互動,這些程式碼使用合理的慣用 API 包裝這些定義,但始終要意識到這是規格,任何其他資料不一定固有於管道,而可能是 SDK 特定的增強功能(或錯誤!)。

管道中的 UDF 可以為任何 Beam SDK 編寫,甚至在同一個管道中可以有多個。因此,我們將從這裡開始,採用由下而上的方法來理解 UDF 的協議緩衝區定義,然後再回到更高階、大部分顯而易見的記錄定義。

FunctionSpec proto

跨語言可攜性的核心是 FunctionSpec。這是函式的與語言無關的規格,以通常的程式設計意義來說,包括副作用等。

message FunctionSpec {
  string urn;
  bytes payload;
}

FunctionSpec 包括一個識別函式的 URN 以及一個任意的固定參數。例如,(假設的)「max」CombineFn 可能具有 URN beam:combinefn:max:0.1 和一個參數,該參數指示使用什麼比較來取最大值。

對於使用特定語言的 SDK 建構的管道中的大多數 UDF,URN 將指示 SDK 必須解釋它,例如 beam:dofn:javasdk:0.1beam:dofn:pythonsdk:0.1。該參數將包含序列化的程式碼,例如 Java 序列化的 DoFn 或 Python pickled DoFn

FunctionSpec 不僅適用於 UDF。它只是命名/指定任何函式的通用方法。它也用作 PTransform 的規格。但是,當在 PTransform 中使用時,它描述了從 PCollectionPCollection 的函式,並且不能特定於 SDK,因為執行器負責評估轉換並產生 PCollections

不用說,並非每個環境都能夠反序列化每個函式規格。因此,PTransform 具有一個 environment_id 參數,該參數指示至少一個能夠解釋所包含 URN 的環境。這是對管道 proto 的環境映射中環境的引用,並且通常由 docker 映像(可能還有一些額外的依賴項)定義。可能還有其他環境也能夠這樣做,並且執行器可以自由使用它們(如果它知道這一點)。

原始轉換有效負載 protos

基本轉換的 payload 只是它們規格的 proto 序列化。我不會在這裡重新產生它們的完整程式碼,我只會強調重要的部分,以顯示它們如何組合在一起。

再次強調,儘管您可能不會直接與這些 payload 互動,但它們是固有於轉換的唯一資料。

ParDoPayload proto

ParDo 轉換在其 SdkFunctionSpec 中攜帶其 DoFn,然後為其其他功能提供與語言無關的規格 - 側輸入、狀態宣告、計時器宣告等。

message ParDoPayload {
  FunctionSpec do_fn;
  map<string, SideInput> side_inputs;
  map<string, StateSpec> state_specs;
  map<string, TimerSpec> timer_specs;
  ...
}

CombinePayload proto

Combine 不是基本轉換。但是非基本轉換完全能夠攜帶額外的資訊以進行更好的優化。Combine 轉換攜帶的最重要內容是 SdkFunctionSpec 記錄中的 CombineFn。為了有效地執行所需的優化,還必須知道中間累計的編碼器,因此它還攜帶對該編碼器的引用。

message CombinePayload {
  FunctionSpec combine_fn;
  string accumulator_coder_id;
  ...
}

PTransform proto

PTransform 是從 PCollectionPCollection 的函式。這在 proto 中使用 FunctionSpec 表示。

message PTransform {
  FunctionSpec spec;
  repeated string subtransforms;

  // Maps from local string names to PCollection ids
  map<string, bytes> inputs;
  map<string, bytes> outputs;
  ...
}

如果 PTransform 是複合轉換,則它可能具有子轉換,在這種情況下,可以省略 FunctionSpec,因為子轉換定義了其行為。

輸入和輸出 PCollections 是未排序的,並以本機名稱引用。SDK 決定此名稱是什麼,因為它很可能會嵌入到序列化的 UDF 中。

了解給定 PTransform 的規格(無論是基本轉換還是複合轉換),由其 FunctionSpec 定義的執行器,可以自由地用另一個具有相同語義的 PTransform(或其集合)替換它。這通常是處理 CombinePerKey 的方式,但也可以執行許多其他替換。

PCollection proto

PCollection 僅儲存編碼器、視窗策略以及它是否受限。

message PCollection {
  string coder_id;
  IsBounded is_bounded;
  string windowing_strategy_id;
  ...
}

Coder proto

這是一個非常有趣的 proto。編碼器是一個參數化函式,只能由特定的 SDK 理解,因此是 FunctionSpec,但也可能具有完全定義它的元件編碼器。例如,ListCoder 只是一種元格式,而 ListCoder(VarIntCoder) 是一種完全指定的格式。

message Coder {
  FunctionSpec spec;
  repeated string component_coder_ids;
}

大多數(如果不是全部)SDK 都理解大量的 標準編碼器。使用這些編碼器允許跨語言轉換。

Jobs API RPC

概觀 規格

雖然您語言的 SDK 可以讓您避免直接接觸 Runner API proto,但您可能需要為您的執行器實作介面卡,以便將其公開給另一種語言。這允許 Python SDK 調用 Java 執行器,反之亦然。此類型的典型實作可以在 local_job_service.py 中找到,它直接用於前端多個 Python 實作的執行器。

RPC 本身將必然遵循 PipelineRunner 和 PipelineResult 的現有 API,但會被修改為最小的後端通道,而不是豐富且方便的 API。

其中的一個關鍵部分是 Artifacts API,它允許執行器提取和部署二進制 artifact(例如 jar、pypi 套件等),這些 artifact 在各種環境中被列為依賴項,並且可能具有不同的表示形式。這會在提交管道後、執行管道之前調用。提交管道的 SDK 充當執行器接收請求的 artifact 伺服器,而執行器反過來充當託管使用者 UDF 的工作程序(環境)的 artifact 伺服器。