Apache Beam Typescript SDK
Apache Beam 的 Typescript SDK 提供了一個簡單而強大的 API,用於建構批次和串流資料處理管線。
開始使用 Typescript SDK
開始使用Beam Typescript SDK 快速入門 來設定您的開發環境、取得 Beam SDK for Typescript,並執行範例管線。然後,請閱讀Beam 程式設計指南,了解適用於 Beam 中所有 SDK 的基本概念。
概觀
我們通常嘗試以 TypeScript 慣用的方式應用 Beam API 的概念。此外,與傳統的 SDK 相比,也有一些顯著的不同之處
我們採用「關聯基礎」方法,其中具有結構的資料是與資料互動的主要方式,並且我們通常避開需要鍵值轉換,而傾向於使用更靈活的方法來命名欄位或表達式。例如,我們傾向於使用更靈活的GroupBy PTransform,而不是傳統的 GroupByKey。JavaScript 的原生 Object 被用作列型別。
作為優先考慮結構的一部分,我們也降低了 Coders 作為 SDK 中一級概念的重要性,將其降級為用於互通的高級功能。雖然我們可以從個別元素推斷結構,但是否/如何利用型別系統和/或函數內省來定期在建構時推斷結構仍有待確定。當我們沒有足夠的型別資訊時,會使用使用 BSON 編碼的回退編碼器。
我們已在 PCollection 物件中新增其他方法,特別是
map
和flatmap
,而不是只允許 apply。此外,apply
可以接受函數引數(PCollection) => ...
以及 PTransform 子類別,這會將此可呼叫物件視為 PTransform 的擴展。另一方面,我們已從 API 中移除有問題的 Pipeline 物件,而是提供一個在其上建立管線的
Root
PValue,並在 Runner 上呼叫 run()。我們提供較不易出錯的Runner.run
,它僅在管線完全完成時才結束,以及Runner.runAsync
,它會傳回執行中管線的控制代碼。我們沒有引入 PCollectionTuple、PCollectionList 等,而是讓 PValue 成為具有 PValue 值的陣列或物件,轉換可以取用或產生這些值。這些透過以
P
運算子包裝來應用,例如P([pc1, pc2, pc3]).apply(new Flatten())
。與 Python 類似,
flatMap
和ParDo.process
透過從產生器產生多個元素來傳回,而不是呼叫傳入的回呼。目前有一項作業可以根據元素的屬性將 PCollection 分割為多個 PCollection,我們可能會考慮對側輸出使用回呼。map
、flatMap
和ParDo.process
方法會採用額外的 (選擇性) 環境引數,這與 Python 中使用的關鍵字引數類似。這些是 JavaScript 物件,其成員可以是常數 (按原樣傳遞),也可以是特殊的 DoFnParam 物件,這些物件提供在執行階段取得元素特定資訊 (例如目前的時間戳記、視窗或側輸入) 的 getter。我們沒有在 map/do 作業本身中引入多個輸出的複雜性,而是透過追蹤新的
Split
原始類型來產生多個輸出,該原始類型採用PCollection<{a?: AType, b: BType, ... }>
,並產生物件{a: PCollection<AType>, b: PCollection<BType>, ...}
。JavaScript 支援 (並鼓勵) 非同步程式設計模型,許多程式庫都要求使用 async/await 範例。由於 (按設計) 沒有方法可以從非同步樣式返回同步樣式,因此在設計 API 時需要考慮到這一點。我們目前提供
PValue.apply(...)
的非同步變體 (除了同步變體之外,因為它們更容易鏈結),以及讓Runner.run
成為非同步。有待確定是否也對所有使用者回呼執行此操作。
範例管線可以在 wordcount.ts 中找到,更多文件可以在 beam 程式設計指南中找到。
管線 I/O
請參閱Beam 提供的 I/O 轉換頁面,以取得目前可用的 I/O 轉換清單。
支援的功能
Typescript SDK 仍在開發中,但已支援 Beam 模型目前支援的許多功能 (但並非全部),包括批次和串流。它也廣泛支援跨語言轉換,可用於利用 Typescript 管線中更進階的功能。
序列化
由於 Beam 的設計目的是在分散式環境中執行,因此所有函數和資料都必須是可序列化的。
依預設,資料使用 BSON 編碼進行序列化,但可以透過將 withRowCoder 或 withCoderInternal 轉換套用至 PCollection 來進行自訂。
在轉換 (例如 map
) 中使用的函數,包括閉包及其擷取的資料,會透過 ts-serialize-closures 進行序列化。雖然這可以很好地處理大多數情況,但它仍然有其限制,並且在其對引用物件的傳遞閉包的走訪中,可能會擷取最好匯入而不是序列化的物件。為了避免這些限制,可以使用 requireForSerialization 函數明確註冊參考,如下所示。
// in module my_package/module_to_be_required
import { requireForSerialization } from "apache-beam/serialization";
// define or import various objects_to_register here
requireForSerialization(
"my_package/module_to_be_required", { objects_to_register });
入門專案具有這樣的範例。