內建 I/O 轉換

Snowflake I/O

關於使用和執行 Snowflake IO 的管線選項和一般資訊。

開始之前

若要使用 SnowflakeIO,請將 Maven artifact 相依性新增至您的 pom.xml 檔案。

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-snowflake</artifactId>
    <version>2.60.0</version>
</dependency>

其他資源

驗證

讀取和批次寫入支援下列驗證方法

串流寫入僅支援金鑰配對驗證。如需詳細資訊,請參閱:BEAM-3304

傳遞認證是透過用來實例化 SnowflakeIO.DataSourceConfiguration 類別的管線選項來完成的。每種驗證方法都有不同的方式來設定此類別。

使用者名稱和密碼

若要在 SnowflakeIO 中使用使用者名稱/密碼驗證,請使用下列管線選項來叫用您的管線

--username=<USERNAME> --password=<PASSWORD>

傳遞認證是透過用來實例化 SnowflakeIO.DataSourceConfiguration 類別的管線選項來完成的。

SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create()
        .withUsernamePasswordAuth(
                options.getUsername(),
                options.getPassword())
        .withServerName(options.getServerName())
        .withDatabase(options.getDatabase())
        .withRole(options.getRole())
        .withWarehouse(options.getWarehouse())
        .withSchema(options.getSchema());

金鑰配對

若要使用此驗證方法,您必須先產生金鑰配對,並將公鑰與將使用 IO 轉換連線的 Snowflake 使用者建立關聯。如需指示,請參閱 Snowflake 文件中的金鑰配對驗證 & 金鑰配對輪換

若要將金鑰配對驗證與 SnowflakeIO 搭配使用,請使用下列其中一組管線選項來叫用您的管線

OAuth 權杖

SnowflakeIO 也支援 OAuth 權杖。

重要:SnowflakeIO 需要有效的 OAuth 存取權杖。它既無法重新整理權杖,也無法使用網頁式流程取得權杖。如需有關設定 OAuth 整合和取得權杖的資訊,請參閱Snowflake 文件

取得權杖後,請使用下列管線選項來叫用您的管線

--oauthToken=<TOKEN>
SnowflakeIO.DataSourceConfiguration 類別的初始化可能如下所示
 SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration
            .create()
            .withUrl(options.getUrl())
            .withServerName(options.getServerName())
            .withDatabase(options.getDatabase())
            .withWarehouse(options.getWarehouse())
            .withSchema(options.getSchema());

資料來源設定

資料來源設定在讀取和寫入物件中都必須使用,以便設定 Snowflake 連線屬性以用於 IO 用途。

一般用法

建立資料來源設定

 SnowflakeIO.DataSourceConfiguration
            .create()
            .withUrl(options.getUrl())
            .withServerName(options.getServerName())
            .withDatabase(options.getDatabase())
            .withWarehouse(options.getWarehouse())
            .withSchema(options.getSchema());
其中參數可以是

注意 - 必須使用 .withUrl(...).withServerName(...) 其中一個

管線選項

使用 Beam 的 Pipeline 選項,透過命令列設定選項。

Snowflake 管線選項

Snowflake IO 函式庫支援以下選項,當 Pipeline 使用時,預設可以透過命令列傳遞。

--url Snowflake 的 JDBC 樣式 URL,包含帳戶名稱和區域,不含任何參數。

--serverName 包含帳戶、區域和網域的完整伺服器名稱。

--username 使用者名稱/密碼和私鑰驗證時為必要選項。

--oauthToken 僅限 OAuth 驗證時為必要選項。

--password 僅限使用者名稱/密碼驗證時為必要選項。

--privateKeyPath 私鑰檔案路徑。僅限私鑰驗證時為必要選項。

--rawPrivateKey 私鑰。僅限私鑰驗證時為必要選項。

--privateKeyPassphrase 私鑰的密碼。僅限私鑰驗證時為必要選項。

--stagingBucketName 外部儲存桶路徑,以 / 結尾。例如 {gs,s3}://bucket/。允許子目錄。

--storageIntegrationName 儲存整合名稱

--warehouse 要使用的倉儲。選用。

--database 要連接的資料庫名稱。選用。

--schema 要使用的綱要。選用。

--table 要使用的表格。選用。

--query 要使用的查詢。選用。

--role 要使用的角色。選用。

--authenticator 要使用的驗證器。選用。

--portNumber 連接埠號碼。選用。

--loginTimeout 登入逾時。選用。

--snowPipe SnowPipe 名稱。選用。

使用管線選項執行 main 命令

若要透過命令列傳遞 Pipeline 選項,請在 gradle 命令中使用 --args,如下所示

./gradle run
    --args="
        --serverName=<SNOWFLAKE SERVER NAME>
           Example: --serverName=account.region.gcp.snowflakecomputing.com
        --username=<SNOWFLAKE USERNAME>
           Example: --username=testuser
        --password=<SNOWFLAKE PASSWORD>
           Example: --password=mypassword
        --database=<SNOWFLAKE DATABASE>
           Example: --database=TEST_DATABASE
        --schema=<SNOWFLAKE SCHEMA>
           Example: --schema=public
        --table=<SNOWFLAKE TABLE IN DATABASE>
           Example: --table=TEST_TABLE
        --query=<IF NOT TABLE THEN QUERY>
           Example: --query=‘SELECT column FROM TABLE’
        --storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>
           Example: --storageIntegrationName=my_integration
        --stagingBucketName=<GCS OR S3 BUCKET>
           Example: --stagingBucketName={gs,s3}://bucket/
        --runner=<DirectRunner/DataflowRunner>
           Example: --runner=DataflowRunner
        --project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME>
           Example: --project=my_project
        --tempLocation=<FOR DATAFLOW RUNNER: GCS TEMP LOCATION STARTING
                        WITH gs://…>
           Example: --tempLocation=gs://bucket/temp/
        --region=<FOR DATAFLOW RUNNER: GCP REGION>
           Example: --region=us-east-1
        --appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>
           Example: --appName=my_job"
然後在程式碼中,可以使用 options.getStagingBucketName() 命令存取帶有引數的參數。

使用管線選項執行 test 命令

若要透過命令列傳遞 Pipeline 選項,請在 gradle 命令中使用 -DintegrationTestPipelineOptions,如下所示

./gradlew test --tests nameOfTest
-DintegrationTestPipelineOptions='[
  "--serverName=<SNOWFLAKE SERVER NAME>",
      Example: --serverName=account.region.gcp.snowflakecomputing.com
  "--username=<SNOWFLAKE USERNAME>",
      Example: --username=testuser
  "--password=<SNOWFLAKE PASSWORD>",
      Example: --password=mypassword
  "--schema=<SNOWFLAKE SCHEMA>",
      Example: --schema=PUBLIC
  "--table=<SNOWFLAKE TABLE IN DATABASE>",
      Example: --table=TEST_TABLE
  "--database=<SNOWFLAKE DATABASE>",
      Example: --database=TEST_DATABASE
  "--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>",
      Example: --storageIntegrationName=my_integration
  "--stagingBucketName=<GCS OR S3 BUCKET>",
      Example: --stagingBucketName={gs,s3}://bucket
  "--externalLocation=<GCS BUCKET URL STARTING WITH GS://>",
      Example: --tempLocation=gs://bucket/temp/
]' --no-build-cache

所有參數都以「--」開頭,並用雙引號括住,並以逗號分隔

在 Dataflow 上執行管線

預設情況下,管道會在您本機的 Direct Runner 上執行。若要在 Google Dataflow 上執行管道,您必須提供以下 Pipeline 選項

如需 Dataflow 的更多管道選項,請參閱此處

注意:若要正確使用 Google Cloud 進行驗證,請使用 gcloud 或遵循Google Cloud 文件

重要:請了解 Google Dataflow 定價

在 Dataflow 上執行管線範本

Google Dataflow 支援範本建立,這表示在 Cloud Storage 上暫存管道,並使用只能在管道執行期間使用的執行階段參數來執行它們。

建立自己的 Dataflow 範本的流程如下

  1. 建立您自己的管道。
  2. 建立 Dataflow 範本,同時檢查 SnowflakeIO 在執行階段支援哪些選項。
  3. 使用 Cloud ConsoleREST APIgcloud 執行 Dataflow 範本。

目前,SnowflakeIO 在執行階段支援以下選項

目前,SnowflakeIO 不支援在執行階段使用以下選項

寫入 Snowflake 表格

SnowflakeIO 的其中一個功能是寫入 Snowflake 表格。此轉換可讓您使用將使用者的 PCollection 發送到您的 Snowflake 資料庫的輸出作業來完成 Beam 管道。

批次寫入 (來自有界來源)

基本的 .write() 作業用法如下

data.apply(
   SnowflakeIO.<type>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
)
將 type 取代為要寫入的 PCollection 物件的資料類型;例如,對於字串的輸入 PCollection,使用 SnowflakeIO.<String>

以下所有參數都是必要參數

注意:SnowflakeIO 在幕後使用 COPY 陳述式來寫入(使用 COPY to table)。StagingBucketName 將用於儲存最終將進入 Snowflake 的 CSV 檔案。這些 CSV 檔案將儲存在「stagingBucketName」路徑下。

批次處理的選用設定

串流寫入 (來自無界來源)

需要在 Snowflake 主控台中建立 SnowPipe。SnowPipe 應使用相同的整合和儲存桶,如同 .withStagingBucketName.withStorageIntegrationName 方法所指定。寫入作業可能如下所示

data.apply(
   SnowflakeIO.<type>write()
      .withStagingBucketName("BUCKET")
      .withStorageIntegrationName("STORAGE INTEGRATION NAME")
      .withDataSourceConfiguration(dc)
      .withUserDataMapper(mapper)
      .withSnowPipe("MY_SNOW_PIPE")
      .withFlushTimeLimit(Duration.millis(time))
      .withFlushRowLimit(rowsNumber)
      .withShardsNumber(shardsNumber)
)

參數

串流的必要設定

注意:提供綱要資料庫名稱非常重要。

注意:

如前所述,SnowflakeIO 在幕後使用 SnowPipe REST 呼叫,從無界來源寫入。StagingBucketName 將用於儲存最終將進入 Snowflake 的 CSV 檔案。SnowflakeIO 不會從「stagingBucketName」下的路徑中刪除建立的 CSV 檔案,無論是在串流期間或串流結束後都不會。

串流的選用設定

重要通知:

  1. 串流僅接受金鑰對驗證。詳細資訊請參閱:Issue 21287
  2. SnowflakeIO.DataSourceConfiguration 物件中設定的角色參數在串流寫入時會被忽略。詳細資訊請參閱:Issue 21365

刷新時間:持續時間 & 列數

持續時間:串流寫入將根據刷新時間限制中指定的時間持續定期地在暫存區寫入檔案(例如,每 1 分鐘)。

資料列數:除非達到刷新時間限制,否則準備寫入的檔案的資料列數將會依照刷新資料列限制中指定的數量。(例如,如果限制為 1000 列,且緩衝區收集了 99 列,但 1 分鐘的刷新時間已過,則這些列將會被傳送到 SnowPipe 進行插入。)

暫存檔案的大小將取決於資料列的大小和使用的壓縮方式(GZIP)。

UserDataMapper 函數

UserDataMapper 函式是必需的,用於將資料從 PCollection 對應到字串值陣列,然後 write() 操作會將資料儲存到臨時的 .csv 檔案中。例如:

public static SnowflakeIO.UserDataMapper<Long> getCsvMapper() {
    return (SnowflakeIO.UserDataMapper<Long>) recordLine -> new String[] {recordLine.toString()};
}

其他寫入選項

轉換查詢

write() 操作的 .withQueryTransformation() 選項接受一個 SQL 查詢作為字串值,該查詢將在將 CSV 檔案中暫存的資料直接傳輸到目標 Snowflake 資料表時執行。有關轉換 SQL 語法的資訊,請參閱Snowflake 文件

用法

String query = "SELECT t.$1 from YOUR_TABLE;";
data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withQueryTransformation(query)
)

寫入處置

透過為 write() 操作指定 .withWriteDisposition(...) 選項,根據資料將寫入的資料表來定義寫入行為。支援以下值:

用法範例

data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withWriteDisposition(TRUNCATE)
)

建立處置

如果目標資料表不存在,.withCreateDisposition() 選項會定義寫入操作的行為。支援以下值:

用法

data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withCreateDisposition(CREATE_NEVER)
)

表格結構描述處置

.withCreateDisposition() 選項設定為 CREATE_IF_NEEDED 時,.withTableSchema() 選項可讓您指定所建立目標資料表的結構描述。資料表結構描述是 SnowflakeColumn 物件的清單,其中包含與資料表中每個資料行的資料行類型對應的名稱和類型。

用法

SnowflakeTableSchema tableSchema =
    new SnowflakeTableSchema(
        SnowflakeColumn.of("my_date", new SnowflakeDate(), true),
        new SnowflakeColumn("id", new SnowflakeNumber()),
        SnowflakeColumn.of("name", new SnowflakeText(), true));

data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withTableSchema(tableSchema)
)

從 Snowflake 讀取

SnowflakeIO 的其中一個功能是讀取 Snowflake 資料表 - 可以透過資料表名稱讀取完整資料表,也可以透過查詢讀取自訂資料。讀取轉換的輸出是一個使用者定義資料類型的 PCollection

一般用法

基本的 .read() 操作用法

PCollection<USER_DATA_TYPE> items = pipeline.apply(
   SnowflakeIO.<USER_DATA_TYPE>read()
       .withDataSourceConfiguration(dc)
       .fromTable("MY_TABLE") // or .fromQuery("QUERY")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withCsvMapper(mapper)
       .withCoder(coder));
)
以下所有參數都是必需的:

注意:SnowflakeIO 在幕後使用 COPY 陳述式來讀取(使用 COPY to location)雲端儲存中暫存的檔案。StagingBucketName 將被用作儲存 CSV 檔案的臨時位置。這些臨時目錄將被命名為 sf_copy_csv_DATE_TIME_RANDOMSUFFIX,並且在讀取操作完成後將會自動移除。

CSVMapper

SnowflakeIO 使用 COPY INTO 陳述式將資料從 Snowflake 資料表移動到 GCS/S3 作為 CSV 檔案。然後,這些檔案會透過 FileIO 下載,並逐行處理。每一行都會使用 OpenCSV 程式庫分割成字串陣列。

CSVMapper 的工作是讓使用者能夠將字串陣列轉換為使用者定義的類型,例如 Avro 或 Parquet 檔案的 GenericRecord,或自訂 POJO。

GenericRecord 的 CsvMapper 實作範例

static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
   return (SnowflakeIO.CsvMapper<GenericRecord>)
           parts -> {
               return new GenericRecordBuilder(PARQUET_SCHEMA)
                       .set("ID", Long.valueOf(parts[0]))
                       .set("NAME", parts[1])
                       [...]
                       .build();
           };
}

將 SnowflakeIO 與 AWS S3 搭配使用

若要能夠使用 AWS S3 儲存貯體作為 stagingBucketName,則需要:

  1. 建立一個 PipelineOptions 介面,該介面會擴充 SnowflakePipelineOptionsS3Options,並包含 AwsAccessKeyAwsSecretKey 選項。範例:

public interface AwsPipelineOptions extends SnowflakePipelineOptions, S3Options {

    @Description("AWS Access Key")
    @Default.String("access_key")
    String getAwsAccessKey();

    void setAwsAccessKey(String awsAccessKey);

    @Description("AWS secret key")
    @Default.String("secret_key")
    String getAwsSecretKey();

    void setAwsSecretKey(String awsSecretKey);
}
2. 使用 AwsAccessKeyAwsSecretKey 選項來設定 AwsCredentialsProvider 選項。

options.setAwsCredentialsProvider(
    new AWSStaticCredentialsProvider(
        new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey())
    )
);
3. 建立管道

Pipeline p = Pipeline.create(options);

注意:請記得從 S3Options 設定 awsRegion

在 Python SDK 中使用 SnowflakeIO

簡介

Snowflake 跨語言實作支援 Python 程式語言的讀取和寫入操作,這要歸功於跨語言,這是 可移植性架構藍圖 的一部分,旨在提供整個 Beam 生態系統的完整互通性。從開發人員的角度來看,這意味著可以結合以不同語言(Java/Python/Go)撰寫的轉換。

有關跨語言的更多資訊,請參閱多 SDK 工作跨語言轉換 API 和擴充服務文章。

其他資源

從 Snowflake 讀取

SnowflakeIO 的其中一個功能是讀取 Snowflake 資料表 - 可以透過資料表名稱讀取完整資料表,也可以透過查詢讀取自訂資料。讀取轉換的輸出是一個使用者定義資料類型的 PCollection

一般用法

OPTIONS = ["--runner=FlinkRunner"]

with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
   (p
       | ReadFromSnowflake(...)
       | <FURTHER TRANSFORMS>)

必要參數

驗證參數

需要傳遞以下其中一個有效參數組合以進行驗證:

其他參數

寫入 Snowflake

SnowflakeIO 的其中一個功能是寫入 Snowflake 資料表。此轉換可讓您使用將使用者的 PCollection 發送到 Snowflake 資料庫的輸出操作來完成 Beam 管道。

一般用法

OPTIONS = ["--runner=FlinkRunner"]

with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
   (p
       | <SOURCE OF DATA>
       | WriteToSnowflake(
           server_name=<SNOWFLAKE SERVER NAME>,
           username=<SNOWFLAKE USERNAME>,
           password=<SNOWFLAKE PASSWORD>,
           o_auth_token=<OAUTH TOKEN>,
           private_key_path=<PATH TO P8 FILE>,
           raw_private_key=<PRIVATE_KEY>
           private_key_passphrase=<PASSWORD FOR KEY>,
           schema=<SNOWFLAKE SCHEMA>,
           database=<SNOWFLAKE DATABASE>,
           staging_bucket_name=<GCS OR S3 BUCKET>,
           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
           create_disposition=<CREATE DISPOSITION>,
           write_disposition=<WRITE DISPOSITION>,
           table_schema=<SNOWFLAKE TABLE SCHEMA>,
           user_data_mapper=<USER DATA MAPPER FUNCTION>,
           table=<SNOWFLAKE TABLE>,
           query=<IF NOT TABLE THEN QUERY>,
           role=<SNOWFLAKE ROLE>,
           warehouse=<SNOWFLAKE WAREHOUSE>,
           expansion_service=<EXPANSION SERVICE ADDRESS>))

必要參數

驗證參數

需要傳遞以下其中一個有效參數組合以進行驗證:

其他參數

限制

SnowflakeIO 目前有以下限制。

  1. 串流寫入僅支援金鑰組驗證。如需詳細資訊,請參閱:Issue 21287

  2. SnowflakeIO.DataSourceConfiguration 物件中設定的角色參數在串流寫入時會被忽略。詳細資訊請參閱:Issue 21365