Cdap IO
CdapIO
是一種轉換,用於從來源讀取資料或將資料寫入 CDAP 外掛程式接收器。
批次外掛程式支援
CdapIO
目前藉由參考 CDAP 外掛程式
類別名稱,支援下列 CDAP 批次外掛程式
此外,任何其他基於 Hadoop 的 InputFormat
或 OutputFormat
的 CDAP 批次外掛程式都可以使用。它們可以輕鬆地新增至依類別名稱支援的外掛程式清單,如需更多詳細資訊,請參閱 CdapIO 自述檔。
串流外掛程式支援
CdapIO
目前支援基於 Apache Spark Receiver 的 CDAP 串流外掛程式。
CDAP 串流外掛程式的需求
- CDAP 串流外掛程式應基於
Spark Receiver
(Spark 2.4)。 - CDAP 串流外掛程式應支援使用偏移量。
- 對應的 Spark Receiver 應實作 HasOffset 介面。
- 記錄應具有代表記錄偏移量的數值欄位。
使用 CdapIO 進行批次讀取
若要從 CDAP 外掛程式讀取,您需要傳遞
Key
和Value
類別。您需要檢查這些類別是否有可用的 Beam Coder。- 具有特定 CDAP 外掛程式參數的
PluginConfig
物件。
您可以透過指定下列項目,使用 ConfigWrapper
類別輕鬆建構 PluginConfig
物件
- 所需
PluginConfig
的類別。 - 對應 CDAP 外掛程式的
Map<String, Object>
參數對應。
例如
Map<String, Object> myPluginConfigParams = new HashMap<>();
// Read plugin parameters (e.g. from PipelineOptions) and put them into 'myPluginConfigParams' map.
myPluginConfigParams.put(MyPluginConstants.USERNAME_PARAMETER_NAME, pipelineOptions.getUsername());
// ...
MyPluginConfig pluginConfig =
new ConfigWrapper<>(MyPluginConfig.class).withParams(myPluginConfigParams).build();
依外掛程式類別名稱讀取資料
某些 CDAP 外掛程式已支援,而且可以直接使用外掛程式類別名稱。
例如
使用建構批次外掛程式讀取資料
如果 CDAP 外掛程式不支援外掛程式類別名稱,您可以透過傳遞下列參數輕鬆建構 Plugin
物件
- CDAP 批次外掛程式的類別。
- 用於連線至您選擇的 CDAP 外掛程式的
InputFormat
類別。 - 用於提供
InputFormat
的InputFormatProvider
類別。
然後,您就可以將此 Plugin
物件傳遞至 CdapIO
。
例如
CdapIO.Read<String, String> readTransform =
CdapIO.<String, String>read()
.withCdapPlugin(
Plugin.createBatch(
MyCdapPlugin.class,
MyInputFormat.class,
MyInputFormatProvider.class))
.withPluginConfig(pluginConfig)
.withKeyClass(String.class)
.withValueClass(String.class);
p.apply("read", readTransform);
特定 CDAP 外掛程式的範例
CDAP Hubspot 批次來源外掛程式
SourceHubspotConfig pluginConfig =
new ConfigWrapper<>(SourceHubspotConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, JsonElement> readTransform =
CdapIO.<NullWritable, JsonElement>read()
.withCdapPluginClass(HubspotBatchSource.class)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(JsonElement.class);
p.apply("readFromHubspotPlugin", readTransform);
CDAP Salesforce 批次來源外掛程式
SalesforceSourceConfig pluginConfig =
new ConfigWrapper<>(SalesforceSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<Schema, LinkedHashMap> readTransform =
CdapIO.<Schema, LinkedHashMap>read()
.withCdapPluginClass(SalesforceBatchSource.class)
.withPluginConfig(pluginConfig)
.withKeyClass(Schema.class)
.withValueClass(LinkedHashMap.class);
p.apply("readFromSalesforcePlugin", readTransform);
CDAP ServiceNow 批次來源外掛程式
ServiceNowSourceConfig pluginConfig =
new ConfigWrapper<>(ServiceNowSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, StructuredRecord> readTransform =
CdapIO.<NullWritable, StructuredRecord>read()
.withCdapPluginClass(ServiceNowSource.class)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(StructuredRecord.class);
p.apply("readFromServiceNowPlugin", readTransform);
CDAP Zendesk 批次來源外掛程式
ZendeskBatchSourceConfig pluginConfig =
new ConfigWrapper<>(ZendeskBatchSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, StructuredRecord> readTransform =
CdapIO.<NullWritable, StructuredRecord>read()
.withCdapPluginClass(ZendeskBatchSource.class)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(StructuredRecord.class);
p.apply("readFromZendeskPlugin", readTransform);
若要瞭解更多資訊,請查看 完整範例。
使用 CdapIO 進行批次寫入
若要寫入 CDAP 外掛程式,您需要傳遞
Key
和Value
類別。您需要檢查這些類別是否有可用的 Beam Coder。locksDirPath
,這是將儲存鎖定的鎖定目錄路徑。此參數是 Hadoop 外部同步處理 (用於取得與寫入作業相關鎖定的機制) 所必需的。- 具有特定 CDAP 外掛程式參數的
PluginConfig
物件。
您可以透過指定下列項目,使用 ConfigWrapper
類別輕鬆建構 PluginConfig
物件
- 所需
PluginConfig
的類別。 - 對應 CDAP 外掛程式的
Map<String, Object>
參數對應。
例如
依外掛程式類別名稱寫入資料
某些 CDAP 外掛程式已支援,而且可以直接使用外掛程式類別名稱。
例如
使用建構批次外掛程式寫入資料
如果 CDAP 外掛程式不支援外掛程式類別名稱,您可以透過傳遞下列參數輕鬆建構 Plugin
物件
- CDAP 外掛程式的類別。
- 用於連線至您選擇的 CDAP 外掛程式的
OutputFormat
類別。 - 用於提供
OutputFormat
的OutputFormatProvider
類別。
然後,您就可以將此 Plugin
物件傳遞至 CdapIO
。
例如
CdapIO.Write<String, String> writeTransform =
CdapIO.<String, String>write()
.withCdapPlugin(
Plugin.createBatch(
MyCdapPlugin.class,
MyOutputFormat.class,
MyOutputFormatProvider.class))
.withPluginConfig(pluginConfig)
.withKeyClass(String.class)
.withValueClass(String.class)
.withLocksDirPath(locksDirPath);
p.apply("write", writeTransform);
特定 CDAP 外掛程式的範例
CDAP Hubspot 批次接收器外掛程式
SinkHubspotConfig pluginConfig =
new ConfigWrapper<>(SinkHubspotConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, String> writeTransform =
CdapIO.<NullWritable, String>write()
.withCdapPluginClass(pluginClass)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(String.class)
.withLocksDirPath(locksDirPath);
p.apply("writeToHubspotPlugin", writeTransform);
CDAP Salesforce 批次接收器外掛程式
SalesforceSinkConfig pluginConfig =
new ConfigWrapper<>(SalesforceSinkConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, CSVRecord> writeTransform =
CdapIO.<NullWritable, CSVRecord>write()
.withCdapPluginClass(pluginClass)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(CSVRecord.class)
.withLocksDirPath(locksDirPath);
p.apply("writeToSalesforcePlugin", writeTransform);
若要瞭解更多資訊,請查看 完整範例。
使用 CdapIO 進行串流讀取
若要從 CDAP 外掛程式讀取,您需要傳遞
Key
和Value
類別。您需要檢查這些類別是否有可用的 Beam Coder。- 具有特定 CDAP 外掛程式參數的
PluginConfig
物件。
您可以透過指定下列項目,使用 ConfigWrapper
類別輕鬆建構 PluginConfig
物件
- 所需
PluginConfig
的類別。 - 對應 CDAP 外掛程式的
Map<String, Object>
參數對應。
例如
依外掛程式類別名稱讀取資料
某些 CDAP 外掛程式已支援,而且可以直接使用外掛程式類別名稱。
例如
使用建構串流外掛程式讀取資料
如果 CDAP 外掛程式不支援外掛程式類別名稱,您可以透過傳遞下列參數輕鬆建構 Plugin
物件
- CDAP 串流外掛程式的類別。
getOffsetFn
,它是SerializableFunction
,用於定義如何從記錄取得Long
記錄偏移量。receiverClass
,這是與 CDAP 外掛程式相關聯的 Spark (v 2.4)Receiver
類別。- (可選)
getReceiverArgsFromConfigFn
,這是一個SerializableFunction
,它定義了如何使用PluginConfig
物件來取得 SparkReceiver
的建構子參數。
然後,您就可以將此 Plugin
物件傳遞至 CdapIO
。
例如
CdapIO.Read<String, String> readTransform =
CdapIO.<String, String>read()
.withCdapPlugin(
Plugin.createStreaming(
MyStreamingPlugin.class,
myGetOffsetFn,
MyReceiver.class,
myGetReceiverArgsFromConfigFn))
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(String.class);
p.apply("read", readTransform);
使用選用參數讀取資料
您可以選擇性地傳遞以下可選參數
pullFrequencySec
,這是輪詢新記錄更新之間的時間延遲(以秒為單位)。startOffset
,這是讀取應開始的包含起始偏移量。
例如
特定 CDAP 外掛程式的範例
CDAP Hubspot 串流來源外掛程式
HubspotStreamingSourceConfig pluginConfig =
new ConfigWrapper<>(HubspotStreamingSourceConfig.class)
.withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, String> readTransform =
CdapIO.<NullWritable, String>read()
.withCdapPlugin(
Plugin.createStreaming(
HubspotStreamingSource.class,
GetOffsetUtils.getOffsetFnForHubspot(),
HubspotReceiver.class))
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(String.class);
p.apply("readFromHubspotPlugin", readTransform);
CDAP Salesforce 串流來源外掛程式
SalesforceStreamingSourceConfig pluginConfig =
new ConfigWrapper<>(SalesforceStreamingSourceConfig.class)
.withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, String> readTransform =
CdapIO.<NullWritable, String>read()
.withCdapPlugin(
Plugin.createStreaming(
SalesforceStreamingSource.class,
GetOffsetUtils.getOffsetFnForSalesforce(),
SalesforceReceiver.class,
config -> {
SalesforceStreamingSourceConfig salesforceConfig =
SalesforceStreamingSourceConfig) config;
return new Object[] {
salesforceConfig.getAuthenticatorCredentials(),
salesforceConfig.getPushTopicName()
};
}))
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(String.class);
p.apply("readFromSalesforcePlugin", readTransform);
若要瞭解更多資訊,請查看 完整範例。
最後更新於 2024/10/31
您是否找到了您要尋找的所有內容?
所有內容是否都有用且清晰?您有任何想要更改的地方嗎?請告訴我們!