PTransform 風格指南
新的可重複使用 PTransform 的撰寫者風格指南。
與程式語言無關的考量
一致性
與先前的技術保持一致
- 請閱讀貢獻指南。
- 如果某些 SDK 中已存在類似的轉換,請讓您的轉換的 API 類似,以便使用者對其中一個的體驗能夠轉移到另一個。這適用於同語言 SDK 和不同語言 SDK 中的轉換。例外: 僅僅因為它們是在本指南批准之前開發的而明顯違反當前風格指南的預先存在的轉換。在這種情況下,風格指南優先於與現有轉換的一致性。
- 當沒有現有的類似轉換時,請保持在您選擇的語言(例如 Java 或 Python)中慣用的範圍內。
- 如果可以,在提出新的轉換時,請使用此設計文件範本。
公開 PTransform 與其他
所以您想開發一個人們將在其 Beam 管線中使用的函式庫 - 到第三方系統的連接器、機器學習演算法等。您應該如何公開它?
應該
- 將您的函式庫完成的每個主要資料平行任務公開為複合
PTransform
。這允許轉換的結構對使用它的程式碼透明地演變:例如,一開始是ParDo
的東西可以隨著時間的推移變成更複雜的轉換。 - 公開轉換程式碼中大型、非簡單、可重複使用的循序位元,其他人可能希望以您尚未預料到的方式重複使用,作為常規函式或類別函式庫。轉換應該只是將此邏輯連接在一起。作為額外的好處,您可以獨立單元測試這些函式和類別。範例:在開發解析自訂資料格式檔案的轉換時,將格式解析器公開為函式庫;對於實作複雜機器學習演算法的轉換也是如此,等等。
- 在某些情況下,這可能包含 Beam 特定的類別,例如
CombineFn
或非簡單的DoFn
(那些不僅僅是一個@ProcessElement
方法的類別)。作為經驗法則:如果您預期完整封裝的PTransform
可能不足以滿足使用者的需求,並且使用者可能希望重複使用較低層級的原始元件,則公開這些類別。
不應該
- 不要公開轉換在內部結構上的確切方式。例如:您的函式庫的公共 API 介面通常(除了上面的最後一點之外)不應公開
DoFn
、具體Source
或Sink
類別等,以避免讓使用者在應用PTransform
或使用DoFn
/Source
/Sink
之間產生混淆的選擇。
命名
應該
- 尊重特定於程式語言的命名慣例,例如在 Java 和 Python 中以
PascalCase
命名類別,在 Java 中以camelCase
命名函式,但在 Python 中以snake_case
命名函式等。 - 命名工廠函式,使函式名稱是動詞,或者參考轉換讀起來像動詞:例如
MongoDbIO.read()
、Flatten.iterables()
。 - 在類型化語言中,也像動詞一樣命名
PTransform
類別(例如:MongoDbIO.Read
、Flatten.Iterables
)。 - 使用「IO」一詞命名與儲存系統互動的轉換系列:
MongoDbIO
、JdbcIO
。
不應該
- 不要在
PTransform
類別名稱中使用單字transform
、source
、sink
、reader
、writer
、bound
、unbound
(注意:當指PCollection
是否為有界或無界時,bounded
和unbounded
是可以的):這些單字是多餘的、令人困惑的、過時的,或命名 SDK 中現有的不同概念。
設定
哪些內容應放入設定,哪些應放入輸入集合
- 輸入
PCollection
:任何可能存在大量實例的東西(如果可能存在超過 1000 個實例,則應位於PCollection
中),或可能在管線建構時未知。例如:要處理或寫入第三方系統的記錄;要讀取的檔案名稱。例外:有時 Beam API 需要在管線建構時知道某些內容 - 例如Bounded
/UnboundedSource
API。如果您絕對必須使用此類 API,則其輸入當然只能進入轉換設定。 - 進入轉換設定:在整個轉換過程中恆定的內容(包括
ValueProvider
),且不依賴於轉換的輸入PCollection
的內容。例如:資料庫查詢或連線字串;憑證;使用者指定的回呼;調整參數。將參數放入轉換設定的一個優點是,它可以在管線建構時進行驗證。
要公開哪些參數
應該
- 公開 計算輸出所需的參數。
不應該
- 不要公開 調整旋鈕,例如批次大小、連線集區大小,除非不可能自動提供或計算足夠好的值(也就是說,除非您可以想像一個合理的人報告缺少此旋鈕的錯誤)。
- 當開發到具有許多參數的函式庫的連接器時,不要鏡像底層函式庫的每個參數 - 如有必要,重複使用底層函式庫的設定類別,並讓使用者提供整個實例。範例:
JdbcIO
。例外 1:如果底層函式庫的某些參數與 Beam 語義非簡單地互動,則公開它們。例如,當開發到具有發佈者「交付保證」參數的發佈/訂閱系統的連接器時,公開該參數,但禁止使用與 Beam 模型不相容的值(最多一次和恰好一次)。例外 2:如果底層函式庫的設定類別使用起來很麻煩 - 例如,未宣告穩定的 API、公開有問題的傳遞相依性,或不遵守語義版本控制 - 在這種情況下,最好將其包裝起來,並向轉換的使用者公開更乾淨、更穩定的 API。
錯誤處理
轉換設定錯誤
儘早偵測錯誤。可以在以下階段偵測到錯誤
- (在編譯語言中)編譯使用者管線的原始程式碼
- 建構或設定轉換
- 在管線中套用轉換
- 執行管線
例如
- 在類型化語言中,利用編譯時間錯誤檢查,使轉換的 API 強型別化
- 強型別設定:例如,在 Java 中,身為 URL 的參數應使用
URL
類別,而不是String
類別。 - 強型別輸入和輸出:例如,寫入 Mongo DB 的轉換應採用
PCollection<Document>
而不是PCollection<String>
(假設可以為Document
提供Coder
)。
- 強型別設定:例如,在 Java 中,身為 URL 的參數應使用
- 在設定方法中偵測個別參數的無效值。
- 在轉換的驗證方法中偵測參數的無效組合。
執行階段錯誤和資料一致性
優先考慮資料一致性。不要掩蓋資料遺失或損毀。如果無法避免資料遺失,請失敗。
應該
- 在
DoFn
中,如果操作可能在重試後成功,則重試暫時性失敗。以盡可能窄的範圍執行此類重試,以便最大程度地減少重試工作的量(即,最好是在 RPC 函式庫本身層級,或直接將失敗的 RPC 發送到第三方系統的層級)。否則,讓執行器以適合您的適當粒度層級重試工作(不同的執行器可能有不同的重試行為,但它們大多數都會進行某些重試)。 - 如果轉換有副作用,請努力使其具有冪等性(即可以安全地多次套用)。由於重試,副作用可能會多次執行,並且可能會並行執行。
- 如果轉換可能具有無法處理的(永久失敗的)記錄,並且您希望管線儘管如此仍繼續執行
- 如果可以安全地忽略錯誤的記錄,請在指標中計算錯誤的記錄。請確保轉換的文件提及此匯總器。請注意,在執行期間,無法從管線內部以程式設計方式存取以讀取匯總器值。
- 如果錯誤的記錄可能需要使用者手動檢查,請將它們發射到僅包含這些記錄的輸出中。
- 或者,取一個(預設為零)臨界值,超出該臨界值時,元素失敗會變成組合失敗(結構化轉換以計算元素總數和失敗元素的數量,比較它們,如果失敗次數超過臨界值則失敗)。
- 如果使用者要求的資料一致性保證高於您所能提供的,則應失敗。例如:如果使用者從 MQTT 連接器要求 QoS 2 (精確一次傳遞),則連接器應失敗,因為 Beam 執行器可能會重試寫入連接器,因此無法實現精確一次傳遞。
不應該
- 如果您無法處理失敗,甚至不要捕捉它。*例外:* 如果您能夠提供原始錯誤沒有的寶貴上下文,則捕捉錯誤、記錄訊息並重新拋出它可能很有價值。
- 絕對、絕對、絕對不要這樣做:
catch(...) { log an error; return null or false or otherwise ignore; }
經驗法則:如果一個 bundle 沒有失敗,其輸出必須是正確且完整的。 對於使用者而言,記錄錯誤但成功的轉換會是無聲的資料遺失。
效能
許多執行器會以提高效能的方式優化 ParDo
鏈,如果每個輸入元素發出的元素數量少到中等,或者每個元素的處理成本相對較低(例如 Dataflow 的「融合」),但如果違反這些假設,則會限制平行化。在這種情況下,您可能需要一個「融合中斷」(Reshuffle.of()
) 來提高處理 ParDo
輸出 PCollection
的平行性。
- 如果轉換包含一個
ParDo
,它每個輸入元素輸出潛在的大量元素,請在此ParDo
之後應用融合中斷,以確保下游轉換可以平行處理其輸出。 - 如果轉換包含一個
ParDo
,它需要很長時間才能處理一個元素,請在此ParDo
之前插入融合中斷,以確保所有或大多數元素都可以平行處理,無論其輸入PCollection
是如何產生的。
文件
記錄如何配置轉換(提供程式碼範例),以及它對輸入的期望或對輸出的保證,並考量 Beam 模型。例如:
- 此轉換的輸入和輸出集合是有界的還是無界的?還是兩者皆可?
- 如果轉換將資料寫入第三方系統,它是否保證資料至少寫入一次?至多一次?精確一次? (如果執行器由於重試或推測性執行(又稱備份)而多次執行一個 bundle,它是如何實現精確一次的?)
- 如果轉換從第三方系統讀取資料,則讀取的最大潛在平行度是多少?例如,如果轉換依序讀取資料(例如執行單一 SQL 查詢),則文件應提及這一點。
- 如果轉換在處理過程中查詢外部系統(例如,將
PCollection
與來自外部鍵值儲存的資訊聯接),則查詢資料的新鮮度保證是什麼:例如,是否在轉換開始時全部載入,還是按元素查詢(在這種情況下,如果單一元素的資料在轉換運行時發生變化,會發生什麼情況)? - 如果輸入
PCollection
中項目的到達與將輸出發送到輸出PCollection
之間存在非平凡的關係,這種關係是什麼?(例如,如果轉換在內部執行視窗化、觸發、分組或使用狀態或計時器 API)
日誌記錄
預測轉換使用者可能遇到的異常情況。記錄他們會發現足以進行除錯的資訊,但限制記錄的量。以下是一些適用於所有程式的建議,但在資料量龐大且執行分散式時尤其重要。
應該
- 當處理來自第三方系統的錯誤時,請記錄完整錯誤以及第三方系統提供的任何錯誤詳細資訊,並包含轉換知道的任何其他上下文。這使使用者能夠根據訊息中提供的資訊採取行動。當處理異常並重新拋出您自己的異常時,請將原始異常包裝在其中(某些語言提供更進階的功能,例如 Java 的「抑制異常」)。永遠不要靜默丟棄有關錯誤的可用資訊。
- 當執行罕見(非每個元素)且緩慢的操作(例如,展開大型檔案模式或啟動匯入/匯出作業)時,請記錄操作的開始和結束時間。如果操作具有識別符號,請記錄識別符號,以便使用者可以查找該操作以供日後除錯。
- 當計算對進一步處理的正確性或效能至關重要的少量內容時,請記錄輸入和輸出,以便正在除錯過程中的使用者可以健全性檢查它們或手動重現異常結果。例如,當將檔案模式展開為檔案時,請記錄檔案模式是什麼以及它被分割成多少個部分;當執行查詢時,請記錄查詢並記錄產生了多少結果。
不應該
- 不要在每個元素或每個 bundle 以
INFO
記錄。DEBUG
/TRACE
可能可以,因為這些層級預設為停用。 - 避免記錄可能包含敏感資訊的資料 payload,或在記錄之前對其進行清理(例如使用者資料、憑證等)。
測試
資料處理是棘手的,充滿了邊緣情況,並且難以除錯,因為管道需要很長時間才能運行,很難檢查輸出是否正確,您無法附加除錯器,而且由於資料量很大,您通常無法像希望的那樣記錄許多內容。因此,測試特別重要。
測試轉換的執行階段行為
- 使用
TestPipeline
和PAssert
對轉換的整體語義進行單元測試。首先針對直接執行器進行測試。對PCollection
內容的斷言應嚴格:例如,當預期從資料庫讀取 1 到 10 的數字時,不僅要斷言輸出PCollection
中有 10 個元素,或每個元素都在 [1, 10] 範圍內 - 而且要斷言每個數字 1 到 10 都恰好出現一次。 - 識別轉換中容易出現邊緣情況的非平凡順序邏輯,這些邊緣情況難以使用
TestPipeline
可靠地模擬,將此邏輯提取到可單元測試的函式中,並對其進行單元測試。常見的邊緣情況是DoFn
處理空 bundleDoFn
處理極大的 bundle(內容不適合記憶體,包括具有大量值的「熱鍵」)- 第三方 API 失敗
- 第三方 API 提供極不準確的資訊
- 在失敗情況下洩漏
Closeable
/AutoCloseable
資源 - 開發來源時的常見邊緣情況:
BoundedSource.split
中的複雜算術(例如分割索引鍵或偏移範圍)、迭代空的資料來源或具有一些空元件的複合資料來源。
- 模擬與第三方系統的互動,或者更好的是,在可用時使用「假的」實作。確保模擬的互動代表這些系統實際行為的所有有趣案例。
- 要對
DoFn
、CombineFn
和BoundedSource
進行單元測試,請考慮分別使用DoFnTester
、CombineFnTester
和SourceTestUtils
,它們可以以非平凡的方式執行程式碼,以找出潛在的錯誤。 - 對於在無界集合上運作的轉換,請使用
TestStream
測試它們在存在延遲或亂序資料時的行為。 - 測試必須 100% 通過,包括在惡劣的、CPU 或網路受限的環境中(持續整合伺服器)。永遠不要將依賴時間的程式碼(例如睡眠)放入測試中。經驗表明,任何合理的睡眠時間都不夠 - 程式碼可能會暫停超過幾秒鐘。
- 有關測試程式碼組織的詳細說明,請參閱 Beam 測試指南。
測試轉換的建構和驗證
建構和驗證轉換的程式碼通常很簡單,而且大多是樣板程式碼。但是,其中的小錯誤或拼寫錯誤可能會導致嚴重的後果(例如,忽略使用者設定的屬性),因此也需要對其進行測試。然而,過多的瑣碎測試可能難以維護,並且會給人一種錯誤的印象,認為轉換已經過充分測試。
應該
- 測試非平凡的驗證程式碼,其中遺失/不正確/不提供資訊的驗證可能會導致嚴重的問題:資料遺失、違反直覺的行為、屬性的值被靜默忽略或其他難以除錯的錯誤。為每個非平凡的驗證錯誤類別建立 1 個測試。一些應該測試的驗證範例
- 如果無法同時指定屬性
withFoo()
和withBar()
,請測試指定這兩個屬性的轉換是否被拒絕,而不是其中一個屬性在執行時被靜默忽略。 - 如果已知轉換對於特定配置的行為不正確或違反直覺,請測試是否拒絕此配置,而不是在執行時產生錯誤的結果。例如,轉換可能僅適用於有界集合,或僅適用於全域視窗化集合。或者,假設串流系統支援多個「服務品質」層級,其中一個是「精確一次傳遞」。但是,由於失敗時的重試,寫入此系統的轉換可能無法提供精確一次。在這種情況下,請測試轉換是否不允許指定精確一次 QoS,而不是在執行時無法提供預期的端對端語義。
- 如果無法同時指定屬性
- 使用
TestPipeline
和PAssert
建立測試,其中預期的測試結果取決於withFoo()
的值,測試每個withFoo()
方法(包括每個重載)都有效(不被忽略)。
不應該
- 不要測試成功的驗證(例如,「當正確配置轉換時,驗證不會失敗」)
- 不要測試瑣碎的驗證錯誤(例如,「當屬性未設定/為 null/為空/為負時,驗證失敗/...」)
相容性
應該
- 一般來說,請遵循 語義化版本控制的規則。
- 如果轉換的 API 尚未穩定,請將其註解為
@Experimental
(Java) 或@experimental
(Python)。 - 如果 API 已過時,請將其註解為
@Deprecated
(Java) 或@deprecated
(Python)。 - 注意轉換的 API 公開的第三方類別的穩定性和版本控制:如果它們不穩定或版本控制不當(不遵守 語義化版本控制),最好將它們包裝在您自己的類別中。
不應該
- 不要靜默更改轉換的行為,這種方式會使程式碼繼續編譯,但會執行與先前記錄的行為不同的操作(例如,產生不同的輸出或期望不同的輸入,當然除非先前的輸出不正確)。努力使這種不相容的行為變更導致編譯錯誤(例如,為新行為引入新的轉換,並棄用然後刪除舊的轉換(在新主版本中)比靜默更改現有轉換的行為更好),或至少導致執行時錯誤。
- 如果轉換的行為保持不變,並且您只是在變更實作或 API - 不要以會使使用者程式碼停止編譯的方式變更轉換的 API。
Java 特定考量
以下大多數實踐的好例子是 JdbcIO
和 MongoDbIO
。
API
選擇輸入和輸出 PCollection 的類型
盡可能使用特定於轉換性質的類型。如果需要,人們可以使用轉換 DoFn
從他們自己的類型進行包裝。例如,Datastore 連接器應使用 Datastore Entity
類型,MongoDb 連接器應使用 Mongo Document
類型,而不是 JSON 的字串表示形式。
有時候這是不可能的(例如,JDBC 沒有提供與 Beam 相容(可以使用 Coder 編碼)的「JDBC 記錄」資料類型) - 那麼就讓使用者提供一個函數,用於在轉換特定的類型和與 Beam 相容的類型之間進行轉換(例如,請參閱 JdbcIO
和 MongoDbGridFSIO
)。
當轉換在邏輯上應該回傳一個尚未存在對應 Java 類別的複合類型時,請建立一個新的 POJO 類別,並使用明確命名的欄位。請勿使用泛型元組類別或 KV
(除非這些欄位確實是鍵和值)。
具有多個輸出集合的轉換
如果轉換需要回傳多個集合,它應該是一個 PTransform<..., PCollectionTuple>
,並且為每個集合公開 getBlahTag()
方法。
例如,如果您想回傳一個 PCollection<Foo>
和一個 PCollection<Bar>
,請公開 TupleTag<Foo> getFooTag()
和 TupleTag<Bar> getBarTag()
。
例如
public class MyTransform extends PTransform<..., PCollectionTuple> {
private final TupleTag<Moo> mooTag = new TupleTag<Moo>() {};
private final TupleTag<Blah> blahTag = new TupleTag<Blah>() {};
...
PCollectionTuple expand(... input) {
...
PCollection<Moo> moo = ...;
PCollection<Blah> blah = ...;
return PCollectionTuple.of(mooTag, moo)
.and(blahTag, blah);
}
public TupleTag<Moo> getMooTag() {
return mooTag;
}
public TupleTag<Blah> getBlahTag() {
return blahTag;
}
...
}
用於設定的 Fluent 建構器
讓轉換類別成為不可變的,並具有產生修改後的不可變物件的方法。使用 AutoValue。AutoValue 可以提供一個 Builder 輔助類別。使用 @Nullable
標記沒有預設值或預設值為 null 的類別類型參數,除了基本類型(例如 int)之外。
@AutoValue
public abstract static class MyTransform extends PTransform<...> {
int getMoo();
@Nullable abstract String getBlah();
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setMoo(int moo);
abstract Builder setBlah(String blah);
abstract MyTransform build();
}
...
}
工廠方法
提供一個無參數的靜態工廠方法,可以放在封閉類別中(請參閱「封裝一系列轉換」),或者在轉換類別本身中。
public class Thumbs {
public static Twiddle twiddle() {
return new AutoValue_Thumbs_Twiddle.Builder().build();
}
public abstract static class Twiddle extends PTransform<...> { ... }
}
// or:
public abstract static class TwiddleThumbs extends PTransform<...> {
public static TwiddleThumbs create() {
return new AutoValue_Thumbs_Twiddle.Builder().build();
}
...
}
例外情況:當轉換具有單一最重要參數時,請呼叫工廠方法 of
,並將該參數放入工廠方法的引數中:ParDo.of(DoFn).withAllowedLateness()
。
用於設定參數的 Fluent Builder 方法
將它們命名為 withBlah()
。所有 Builder 方法都必須回傳完全相同的類型;如果它是參數化(泛型)類型,則具有相同的類型參數值。
將 withBlah()
方法視為一組無序的關鍵字引數 - 結果不應取決於您呼叫 withFoo()
和 withBar()
的順序(例如,withBar()
不得讀取 foo 的目前值)。
記錄每個 withBlah
方法的含義:何時使用此方法,允許哪些值,預設值是什麼,以及更改值的含義。
/**
* Returns a new {@link TwiddleThumbs} transform with moo set
* to the given value.
*
* <p>Valid values are 0 (inclusive) to 100 (exclusive). The default is 42.
*
* <p>Higher values generally improve throughput, but increase chance
* of spontaneous combustion.
*/
public Twiddle withMoo(int moo) {
checkArgument(moo >= 0 && moo < 100,
"Thumbs.Twiddle.withMoo() called with an invalid moo of %s. "
+ "Valid values are 0 (inclusive) to 100 (exclusive)",
moo);
return toBuilder().setMoo(moo).build();
}
參數的預設值
在工廠方法中指定它們(工廠方法回傳具有預設值的物件)。
將多個參數封裝到可重複使用的物件中
如果轉換的幾個參數在邏輯上非常緊密地耦合在一起,有時將它們封裝到容器物件中是有意義的。對此容器物件使用相同的準則(使其不可變,使用具有 builder 的 AutoValue,記錄 withBlah()
方法等)。例如,請參閱 JdbcIO.DataSourceConfiguration。
具有類型參數的轉換
所有類型參數都應在工廠方法上明確指定。Builder 方法(withBlah()
)不應更改類型。
public class Thumbs {
public static Twiddle<T> twiddle() {
return new AutoValue_Thumbs_Twiddle.Builder<T>().build();
}
@AutoValue
public abstract static class Twiddle<T>
extends PTransform<PCollection<Foo>, PCollection<Bar<T>>> {
…
@Nullable abstract Bar<T> getBar();
abstract Builder<T> toBuilder();
@AutoValue.Builder
abstract static class Builder<T> {
…
abstract Builder<T> setBar(Bar<T> bar);
abstract Twiddle<T> build();
}
…
}
}
// User code:
Thumbs.Twiddle<String> twiddle = Thumbs.<String>twiddle();
// Or:
PCollection<Bar<String>> bars = foos.apply(Thumbs.<String>twiddle() … );
例外情況:當轉換具有單一最重要參數,且此參數取決於類型 T 時,最好將其直接放入工廠方法中:例如 Combine.globally(SerializableFunction<Iterable<V>,V>
)。這可以改善 Java 的類型推斷,並讓使用者不必明確指定類型參數。
當轉換有多個類型參數時,或者如果參數的含義不明顯,請將類型參數命名為類似 SomethingT
的名稱,例如:一個實作分類器演算法並為每個輸入元素分配標籤的 PTransform 可以使用類型 Classify<InputT, LabelT>
。
注入使用者指定的行為
如果轉換具有需要使用者程式碼自訂的行為方面,請按照以下方式做出決定
應該
- 如果可能,僅使用 PTransform 組合作為可擴展性裝置 - 也就是說,如果使用者可以在其管線中套用轉換並將其與另一個
PTransform
組合,就可以達到相同的效果,那麼轉換本身不應該是可擴展的。例如,一個將 JSON 物件寫入第三方系統的轉換應該採用PCollection<JsonObject>
(假設可以為JsonObject
提供Coder
),而不是採用泛型PCollection<T>
和ProcessFunction<T, JsonObject>
(應該修正的反例:TextIO
)。 - 如果轉換內部需要使用者程式碼進行擴充,請將使用者程式碼作為
ProcessFunction
傳遞,或定義您自己的可序列化類似函數的類型(最好是單一方法,以便與 Java 8 lambdas 互通)。由於 Java 會抹除 lambda 的類型,即使使用者提供了原始類型的ProcessFunction
,您也應該確保具有足夠的類型資訊。有關如何在同時使用ProcessFunction
和InferableFunction
來為 lambdas 和具有類型資訊的具體子類提供良好支援的範例,請參閱MapElements
和FlatMapElements
。
不應該
- 請勿使用繼承進行擴充:使用者不應建立
PTransform
類別的子類別。
打包轉換系列
在開發一系列高度相關的轉換(例如,以不同的方式與同一系統互動,或提供同一高階任務的不同實作)時,請使用頂層類別作為命名空間,並使用多個工廠方法回傳對應於每個個別用例的轉換。
容器類別必須具有私有建構函式,因此不能直接實例化。
在 FooIO
層級記錄通用內容,並單獨記錄每個工廠方法。
/** Transforms for clustering data. */
public class Cluster {
// Force use of static factory methods.
private Cluster() {}
/** Returns a new {@link UsingKMeans} transform. */
public static UsingKMeans usingKMeans() { ... }
public static Hierarchically hierarchically() { ... }
/** Clusters data using the K-Means algorithm. */
public static class UsingKMeans extends PTransform<...> { ... }
public static class Hierarchically extends PTransform<...> { ... }
}
public class FooIO {
// Force use of static factory methods.
private FooIO() {}
public static Read read() { ... }
...
public static class Read extends PTransform<...> { ... }
public static class Write extends PTransform<...> { ... }
public static class Delete extends PTransform<...> { ... }
public static class Mutate extends PTransform<...> { ... }
}
當支援具有不相容 API 的多個版本時,也請將版本作為類似命名空間的類別,並將不同 API 版本的實作放在不同的檔案中。
// FooIO.java
public class FooIO {
// Force use of static factory methods.
private FooIO() {}
public static FooV1 v1() { return new FooV1(); }
public static FooV2 v2() { return new FooV2(); }
}
// FooV1.java
public class FooV1 {
// Force use of static factory methods outside the package.
FooV1() {}
public static Read read() { ... }
public static class Read extends PTransform<...> { ... }
}
// FooV2.java
public static class FooV2 {
// Force use of static factory methods outside the package.
FooV2() {}
public static Read read() { ... }
public static class Read extends PTransform<...> { ... }
}
行為
不可變性
- 轉換類別必須是不可變的:所有變數都必須是 private final 且本身是不可變的(例如,如果它是列表,則必須是
ImmutableList
)。 - 所有
PCollection
的元素都必須是不可變的。
序列化
DoFn
、PTransform
、CombineFn
和其他實例將被序列化。將序列化的資料量保持在最低限度:將您不想序列化的欄位標記為 transient
。盡可能使類別成為 static
(以便實例不會捕獲並序列化封閉類別實例)。注意:在某些情況下,這意味著您不能使用匿名類別。
驗證
- 在
.withBlah()
方法中使用checkArgument()
驗證個別參數。錯誤訊息應提及參數名稱、實際值以及有效值的範圍。 - 在
PTransform
的.expand()
方法中驗證參數組合和遺失的必要參數。 - 在
PTransform
的.validate(PipelineOptions)
方法中驗證PTransform
從PipelineOptions
取得的參數。這些驗證將在管線已經完全建構/擴展並將要使用特定的PipelineOptions
執行時執行。大多數PTransform
不使用PipelineOptions
,因此不需要validate()
方法 - 相反,它們應該透過上述其他兩種方法執行驗證。
@AutoValue
public abstract class TwiddleThumbs
extends PTransform<PCollection<Foo>, PCollection<Bar>> {
abstract int getMoo();
abstract String getBoo();
...
// Validating individual parameters
public TwiddleThumbs withMoo(int moo) {
checkArgument(
moo >= 0 && moo < 100,
"Moo must be between 0 (inclusive) and 100 (exclusive), but was: %s",
moo);
return toBuilder().setMoo(moo).build();
}
public TwiddleThumbs withBoo(String boo) {
checkArgument(boo != null, "Boo can not be null");
checkArgument(!boo.isEmpty(), "Boo can not be empty");
return toBuilder().setBoo(boo).build();
}
@Override
public void validate(PipelineOptions options) {
int woo = options.as(TwiddleThumbsOptions.class).getWoo();
checkArgument(
woo > getMoo(),
"Woo (%s) must be smaller than moo (%s)",
woo, getMoo());
}
@Override
public PCollection<Bar> expand(PCollection<Foo> input) {
// Validating that a required parameter is present
checkArgument(getBoo() != null, "Must specify boo");
// Validating a combination of parameters
checkArgument(
getMoo() == 0 || getBoo() == null,
"Must specify at most one of moo or boo, but was: moo = %s, boo = %s",
getMoo(), getBoo());
...
}
}
編碼器
Coder
是 Beam 執行器在必要時具現化中繼資料或在工作者之間傳輸資料的一種方式。Coder
不應作為解析或寫入二進位格式的通用 API 使用,因為 Coder
的特定二進位編碼旨在成為其私有實作細節。
為類型提供預設編碼器
為所有新的資料類型提供預設 Coder
。使用 @DefaultCoder
註釋或使用 @AutoService
註釋的 CoderProviderRegistrar
類別:有關範例,請參閱 SDK 中這些類別的用法。如果效能不重要,您可以使用 SerializableCoder
或 AvroCoder
。否則,請開發有效率的自訂編碼器(具體類型使用子類別 AtomicCoder
,泛型類型使用子類別 StructuredCoder
)。
在輸出集合上設定編碼器
您的 PTransform
建立的所有 PCollection
(包括輸出和中繼集合)都必須在其上設定 Coder
:使用者永遠不需要呼叫 .setCoder()
來「修復」您的 PTransform
產生的 PCollection
上的編碼器(事實上,Beam 打算最終棄用 setCoder
)。在某些情況下,編碼器推斷足以實現此目的;在其他情況下,您的轉換需要明確地在其集合上呼叫 setCoder
。
如果集合是具體類型,則該類型通常具有對應的編碼器。使用特定的最有效編碼器(例如,字串使用 StringUtf8Coder.of()
,位元組陣列使用 ByteArrayCoder.of()
等),而不是通用的編碼器,如 SerializableCoder
。
如果集合的類型涉及泛型類型變數,情況會更複雜
- 如果它與轉換的輸入類型一致或只是簡單的包裝,您可以重複使用輸入
PCollection
的編碼器,該編碼器可透過input.getCoder()
取得。 - 嘗試透過
input.getPipeline().getCoderRegistry().getCoder(TypeDescriptor)
推斷編碼器。使用TypeDescriptors
中的公用程式來取得泛型類型的TypeDescriptor
。有關此方法的範例,請參閱AvroIO.parseGenericRecords()
的實作。但是,泛型類型的編碼器推斷是盡力而為的,在某些情況下,由於 Java 類型抹除,它可能會失敗。 - 始終讓使用者可以透過將相關類型變數的
Coder
明確指定為您的PTransform
的組態參數。(例如AvroIO.<T>parseGenericRecords().withCoder(Coder<T>)
)。如果未明確指定編碼器,則回退到推斷。
上次更新於 2024/10/31
您是否找到了您要尋找的所有內容?
所有內容都有用且清楚嗎?您想要更改任何內容嗎?請告訴我們!