Cdap IO

CdapIO 是一種轉換,用於從來源讀取資料或將資料寫入 CDAP 外掛程式接收器。

批次外掛程式支援

CdapIO 目前藉由參考 CDAP 外掛程式類別名稱,支援下列 CDAP 批次外掛程式

此外,任何其他基於 Hadoop 的 InputFormatOutputFormat 的 CDAP 批次外掛程式都可以使用。它們可以輕鬆地新增至依類別名稱支援的外掛程式清單,如需更多詳細資訊,請參閱 CdapIO 自述檔

串流外掛程式支援

CdapIO 目前支援基於 Apache Spark Receiver 的 CDAP 串流外掛程式。

CDAP 串流外掛程式的需求

使用 CdapIO 進行批次讀取

若要從 CDAP 外掛程式讀取,您需要傳遞

您可以透過指定下列項目,使用 ConfigWrapper 類別輕鬆建構 PluginConfig 物件

例如

Map<String, Object> myPluginConfigParams = new HashMap<>();
// Read plugin parameters (e.g. from PipelineOptions) and put them into 'myPluginConfigParams' map.
myPluginConfigParams.put(MyPluginConstants.USERNAME_PARAMETER_NAME, pipelineOptions.getUsername());
// ...
MyPluginConfig pluginConfig =
  new ConfigWrapper<>(MyPluginConfig.class).withParams(myPluginConfigParams).build();

依外掛程式類別名稱讀取資料

某些 CDAP 外掛程式已支援,而且可以直接使用外掛程式類別名稱。

例如

CdapIO.Read<NullWritable, JsonElement> readTransform =
  CdapIO.<NullWritable, JsonElement>read()
    .withCdapPluginClass(HubspotBatchSource.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(JsonElement.class);
p.apply("read", readTransform);

使用建構批次外掛程式讀取資料

如果 CDAP 外掛程式不支援外掛程式類別名稱,您可以透過傳遞下列參數輕鬆建構 Plugin 物件

然後,您就可以將此 Plugin 物件傳遞至 CdapIO

例如

CdapIO.Read<String, String> readTransform =
  CdapIO.<String, String>read()
    .withCdapPlugin(
      Plugin.createBatch(
        MyCdapPlugin.class,
        MyInputFormat.class,
        MyInputFormatProvider.class))
    .withPluginConfig(pluginConfig)
    .withKeyClass(String.class)
    .withValueClass(String.class);
p.apply("read", readTransform);

特定 CDAP 外掛程式的範例

CDAP Hubspot 批次來源外掛程式

SourceHubspotConfig pluginConfig =
  new ConfigWrapper<>(SourceHubspotConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, JsonElement> readTransform =
  CdapIO.<NullWritable, JsonElement>read()
    .withCdapPluginClass(HubspotBatchSource.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(JsonElement.class);
p.apply("readFromHubspotPlugin", readTransform);

CDAP Salesforce 批次來源外掛程式

SalesforceSourceConfig pluginConfig =
  new ConfigWrapper<>(SalesforceSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<Schema, LinkedHashMap> readTransform =
  CdapIO.<Schema, LinkedHashMap>read()
    .withCdapPluginClass(SalesforceBatchSource.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(Schema.class)
    .withValueClass(LinkedHashMap.class);
p.apply("readFromSalesforcePlugin", readTransform);

CDAP ServiceNow 批次來源外掛程式

ServiceNowSourceConfig pluginConfig =
  new ConfigWrapper<>(ServiceNowSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, StructuredRecord> readTransform =
  CdapIO.<NullWritable, StructuredRecord>read()
    .withCdapPluginClass(ServiceNowSource.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(StructuredRecord.class);
p.apply("readFromServiceNowPlugin", readTransform);

CDAP Zendesk 批次來源外掛程式

ZendeskBatchSourceConfig pluginConfig =
  new ConfigWrapper<>(ZendeskBatchSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, StructuredRecord> readTransform =
  CdapIO.<NullWritable, StructuredRecord>read()
    .withCdapPluginClass(ZendeskBatchSource.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(StructuredRecord.class);
p.apply("readFromZendeskPlugin", readTransform);

若要瞭解更多資訊,請查看 完整範例

使用 CdapIO 進行批次寫入

若要寫入 CDAP 外掛程式,您需要傳遞

您可以透過指定下列項目,使用 ConfigWrapper 類別輕鬆建構 PluginConfig 物件

例如

MyPluginConfig pluginConfig =
  new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();

依外掛程式類別名稱寫入資料

某些 CDAP 外掛程式已支援,而且可以直接使用外掛程式類別名稱。

例如

CdapIO.Write<NullWritable, String> readTransform =
  CdapIO.<NullWritable, String>write()
    .withCdapPluginClass(HubspotBatchSink.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class)
    .withLocksDirPath(locksDirPath);
p.apply("write", writeTransform);

使用建構批次外掛程式寫入資料

如果 CDAP 外掛程式不支援外掛程式類別名稱,您可以透過傳遞下列參數輕鬆建構 Plugin 物件

然後,您就可以將此 Plugin 物件傳遞至 CdapIO

例如

CdapIO.Write<String, String> writeTransform =
  CdapIO.<String, String>write()
    .withCdapPlugin(
      Plugin.createBatch(
        MyCdapPlugin.class,
        MyOutputFormat.class,
        MyOutputFormatProvider.class))
    .withPluginConfig(pluginConfig)
    .withKeyClass(String.class)
    .withValueClass(String.class)
    .withLocksDirPath(locksDirPath);
p.apply("write", writeTransform);

特定 CDAP 外掛程式的範例

CDAP Hubspot 批次接收器外掛程式

SinkHubspotConfig pluginConfig =
  new ConfigWrapper<>(SinkHubspotConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, String> writeTransform =
  CdapIO.<NullWritable, String>write()
    .withCdapPluginClass(pluginClass)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class)
    .withLocksDirPath(locksDirPath);
p.apply("writeToHubspotPlugin", writeTransform);

CDAP Salesforce 批次接收器外掛程式

SalesforceSinkConfig pluginConfig =
  new ConfigWrapper<>(SalesforceSinkConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, CSVRecord> writeTransform =
  CdapIO.<NullWritable, CSVRecord>write()
    .withCdapPluginClass(pluginClass)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(CSVRecord.class)
    .withLocksDirPath(locksDirPath);
p.apply("writeToSalesforcePlugin", writeTransform);

若要瞭解更多資訊,請查看 完整範例

使用 CdapIO 進行串流讀取

若要從 CDAP 外掛程式讀取,您需要傳遞

您可以透過指定下列項目,使用 ConfigWrapper 類別輕鬆建構 PluginConfig 物件

例如

MyPluginConfig pluginConfig =
  new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();

依外掛程式類別名稱讀取資料

某些 CDAP 外掛程式已支援,而且可以直接使用外掛程式類別名稱。

例如

CdapIO.Read<String, String> readTransform =
  CdapIO.<String, String>read()
    .withCdapPluginClass(MyStreamingPlugin.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class);
p.apply("read", readTransform);

使用建構串流外掛程式讀取資料

如果 CDAP 外掛程式不支援外掛程式類別名稱,您可以透過傳遞下列參數輕鬆建構 Plugin 物件

然後,您就可以將此 Plugin 物件傳遞至 CdapIO

例如

CdapIO.Read<String, String> readTransform =
  CdapIO.<String, String>read()
    .withCdapPlugin(
      Plugin.createStreaming(
        MyStreamingPlugin.class,
        myGetOffsetFn,
        MyReceiver.class,
        myGetReceiverArgsFromConfigFn))
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class);
p.apply("read", readTransform);

使用選用參數讀取資料

您可以選擇性地傳遞以下可選參數

例如

CdapIO.Read<String, String> readTransform =
  CdapIO.<String, String>read()
    .withCdapPluginClass(MyStreamingPlugin.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class)
    .withPullFrequencySec(1L)
    .withStartOffset(1L);
p.apply("read", readTransform);

特定 CDAP 外掛程式的範例

CDAP Hubspot 串流來源外掛程式

HubspotStreamingSourceConfig pluginConfig =
  new ConfigWrapper<>(HubspotStreamingSourceConfig.class)
    .withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, String> readTransform =
  CdapIO.<NullWritable, String>read()
    .withCdapPlugin(
      Plugin.createStreaming(
        HubspotStreamingSource.class,
        GetOffsetUtils.getOffsetFnForHubspot(),
        HubspotReceiver.class))
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class);
p.apply("readFromHubspotPlugin", readTransform);

CDAP Salesforce 串流來源外掛程式

SalesforceStreamingSourceConfig pluginConfig =
  new ConfigWrapper<>(SalesforceStreamingSourceConfig.class)
    .withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, String> readTransform =
  CdapIO.<NullWritable, String>read()
    .withCdapPlugin(
      Plugin.createStreaming(
        SalesforceStreamingSource.class,
        GetOffsetUtils.getOffsetFnForSalesforce(),
        SalesforceReceiver.class,
        config -> {
          SalesforceStreamingSourceConfig salesforceConfig =
            SalesforceStreamingSourceConfig) config;
          return new Object[] {
            salesforceConfig.getAuthenticatorCredentials(),
            salesforceConfig.getPushTopicName()
          };
        }))
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class);
p.apply("readFromSalesforcePlugin", readTransform);

若要瞭解更多資訊,請查看 完整範例