SparkReceiver IO
SparkReceiverIO 是一種轉換,用於從 Apache Spark Receiver 讀取資料作為無邊界來源。
Spark Receivers 支援
SparkReceiverIO
目前支援 Apache Spark Receiver。
Spark Receiver
的需求
- Spark 版本應為 2.4.*。
Spark Receiver
應支援使用偏移量。Spark Receiver
應實作 HasOffset 介面。- 記錄應具有代表記錄偏移量的數字欄位。
如需更多詳細資訊,請參閱 SparkReceiverIO 自述檔。
使用 SparkReceiverIO 進行串流讀取
為了從 Spark Receiver
讀取,您需要傳遞
getOffsetFn
,這是一個SerializableFunction
,定義如何從記錄中取得Long
記錄偏移量。receiverBuilder
,用於建構Spark Receiver
的實例,這些實例使用 Apache Beam 機制而非 Spark 環境。
您可以透過傳遞以下參數輕鬆建立 receiverBuilder
物件
- 您的
Spark Receiver
的類別。 - 建立
Spark Receiver
實例所需的建構子引數。
例如
//In this example, MyReceiver accepts a MyConfig object as its only constructor parameter.
MyConfig myPluginConfig = new MyConfig(authToken, apiServerUrl);
Object[] myConstructorArgs = new Object[] {myConfig};
ReceiverBuilder<String, MyReceiver<String>> myReceiverBuilder =
new ReceiverBuilder<>(MyReceiver.class)
.withConstructorArgs(myConstructorArgs);
然後您就可以將此 receiverBuilder
物件傳遞給 SparkReceiverIO
。
例如
讀取具有可選參數的資料
您可以選擇性地傳遞以下可選參數
pullFrequencySec
,這是輪詢新記錄更新之間的延遲秒數。startOffset
,這是應開始讀取的包含起始偏移量。timestampFn
,這是一個SerializableFunction
,定義如何從記錄中取得Instant timestamp
。
例如
特定 Spark Receiver 的範例
CDAP Hubspot Receiver
ReceiverBuilder<String, HubspotReceiver<String>> hubspotReceiverBuilder =
new ReceiverBuilder<>(HubspotReceiver.class)
.withConstructorArgs(hubspotConfig);
SparkReceiverIO.Read<String> readTransform =
SparkReceiverIO.<String>read()
.withGetOffsetFn(GetOffsetUtils.getOffsetFnForHubspot())
.withSparkReceiverBuilder(hubspotReceiverBuilder)
p.apply("readFromHubspotReceiver", readTransform);
上次更新於 2024/10/31
您是否找到了您要找的所有資訊?
所有資訊都有用且清楚嗎?是否有任何您想變更的內容?請告訴我們!