內建 I/O 轉換

SingleStoreDB I/O

管道選項和有關使用和執行 SingleStoreDB I/O 的一般資訊。

開始之前

若要使用 SingleStoreDB I/O,請將 Maven 成品相依性新增至您的 pom.xml 檔案。

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-singlestore</artifactId>
    <version>2.60.0</version>
</dependency>

其他資源

驗證

設定 SingleStoreIO 連線屬性需要 DataSource 設定。

建立 DataSource 設定

SingleStoreIO.DataSourceConfiguration
    .create("myHost:3306")
    .withDatabase("db")
    .withConnectionProperties("connectTimeout=30000;useServerPrepStmts=FALSE")
    .withPassword("password")
    .withUsername("admin");

參數可以是

注意 - .readWithPartitions() 需要 .withDatabase(...)

從 SingleStoreDB 讀取

SingleStoreIO 的其中一個功能是從 SingleStoreDB 資料表讀取。SingleStoreIO 支援兩種讀取類型

在許多情況下,由於效能原因,平行資料讀取優於循序資料讀取。

循序資料讀取

基本的 .read() 操作使用方式如下

PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>read()
        .withDataSourceConfiguration(dc)
        .withTable("MY_TABLE") // or .withQuery("QUERY")
        .withStatementPreparator(statementPreparator)
        .withOutputParallelization(true)
        .withRowMapper(mapper)
);

參數可以是

注意 - 必須使用 .withTable(...).withQuery(...) 其中之一。

平行資料讀取

基本的 .readWithPartitions() 操作使用方式如下

PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>readWithPartitions()
        .withDataSourceConfiguration(dc)
        .withTable("MY_TABLE") // or .withQuery("QUERY")
        .withRowMapper(mapper)
);

參數可以是

注意 - 必須使用 .withTable(...).withQuery(...) 其中之一。

StatementPreparator

StatementPreparatorread() 用來設定 PreparedStatement 的參數。例如

public static class MyStatmentPreparator implements SingleStoreIO.StatementPreparator {
    @Override
    public void setParameters(PreparedStatement preparedStatement) throws Exception {
        preparedStatement.setInt(1, 10);
    }
}

RowMapper

RowMapperread()readWithPartitions() 用於將 ResultSet 的每一列轉換為產生的 PCollection 的元素。例如

public static class MyRowMapper implements SingleStoreIO.RowMapper<MyRow> {
    @Override
    public MyRow mapRow(ResultSet resultSet) throws Exception {
        return MyRow.create(resultSet.getInt(1), resultSet.getString(2));
    }
}

寫入 SingleStoreDB 資料表

SingleStoreIO 的其中一個功能是寫入 SingleStoreDB 資料表。此轉換使您能夠將使用者的 PCollection 傳送到您的 SingleStoreDB 資料庫。它會傳回每一批元素寫入的列數。

基本的 .write() 操作使用方式如下

data.apply(
    SingleStoreIO.<USER_DATA_TYPE>write()
        .withDataSourceConfiguration(dc)
        .withTable("MY_TABLE")
        .withUserDataMapper(mapper)
        .withBatchSize(100000)
);

參數可以是

UserDataMapper

write() 操作儲存資料之前,需要 UserDataMapper 將資料從 PCollection 對應到 String 值的陣列。例如

public static class MyRowDataMapper implements SingleStoreIO.UserDataMapper<MyRow> {
    @Override
    public List<String> mapRow(MyRow element) {
        List<String> res = new ArrayList<>();
        res.add(element.id().toString());
        res.add(element.name());
        return res;
    }
}