Snowflake I/O
關於使用和執行 Snowflake IO 的管線選項和一般資訊。
開始之前
若要使用 SnowflakeIO,請將 Maven artifact 相依性新增至您的 pom.xml
檔案。
其他資源
驗證
讀取和批次寫入支援下列驗證方法
- 使用者名稱和密碼
- 金鑰配對
- OAuth 權杖
串流寫入僅支援金鑰配對驗證。如需詳細資訊,請參閱:BEAM-3304。
傳遞認證是透過用來實例化 SnowflakeIO.DataSourceConfiguration
類別的管線選項來完成的。每種驗證方法都有不同的方式來設定此類別。
使用者名稱和密碼
若要在 SnowflakeIO 中使用使用者名稱/密碼驗證,請使用下列管線選項來叫用您的管線
傳遞認證是透過用來實例化 SnowflakeIO.DataSourceConfiguration
類別的管線選項來完成的。
SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create()
.withUsernamePasswordAuth(
options.getUsername(),
options.getPassword())
.withServerName(options.getServerName())
.withDatabase(options.getDatabase())
.withRole(options.getRole())
.withWarehouse(options.getWarehouse())
.withSchema(options.getSchema());
金鑰配對
若要使用此驗證方法,您必須先產生金鑰配對,並將公鑰與將使用 IO 轉換連線的 Snowflake 使用者建立關聯。如需指示,請參閱 Snowflake 文件中的金鑰配對驗證 & 金鑰配對輪換。
若要將金鑰配對驗證與 SnowflakeIO 搭配使用,請使用下列其中一組管線選項來叫用您的管線
- 以路徑方式傳遞金鑰
SnowflakeIO.DataSourceConfiguration
類別的初始化可能如下所示SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create() .withKeyPairPathAuth( options.getUsername(), options.getPrivateKeyPath(), options.getPrivateKeyPassphrase()) .withServerName(options.getServerName()) .withDatabase(options.getDatabase()) .withRole(options.getRole()) .withWarehouse(options.getWarehouse()) .withSchema(options.getSchema());
- 以值方式傳遞金鑰
SnowflakeIO.DataSourceConfiguration
類別的初始化可能如下所示SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create() .withKeyPairRawAuth( options.getUsername(), options.getRawPrivateKey(), options.getPrivateKeyPassphrase()) .withServerName(options.getServerName()) .withDatabase(options.getDatabase()) .withRole(options.getRole()) .withWarehouse(options.getWarehouse()) .withSchema(options.getSchema());
OAuth 權杖
SnowflakeIO 也支援 OAuth 權杖。
重要:SnowflakeIO 需要有效的 OAuth 存取權杖。它既無法重新整理權杖,也無法使用網頁式流程取得權杖。如需有關設定 OAuth 整合和取得權杖的資訊,請參閱Snowflake 文件。
取得權杖後,請使用下列管線選項來叫用您的管線
SnowflakeIO.DataSourceConfiguration
類別的初始化可能如下所示資料來源設定
資料來源設定在讀取和寫入物件中都必須使用,以便設定 Snowflake 連線屬性以用於 IO 用途。
一般用法
建立資料來源設定
.withUrl(...)
- 您的 Snowflake 帳戶的類似 JDBC 的 URL,包括帳戶名稱和區域,不含任何參數。
- 範例:
.withUrl("jdbc:snowflake://account.snowflakecomputing.com")
.withServerName(...)
- 伺服器名稱 - 具有帳戶、區域和網域的完整伺服器名稱。
- 範例:
.withServerName("account.snowflakecomputing.com")
.withDatabase(...)
- 要使用的 Snowflake 資料庫名稱。
- 範例:
.withDatabase("MY_DATABASE")
.withWarehouse(...)
- 要使用的 Snowflake 資料倉儲名稱。此參數為選用。如果未指定資料倉儲名稱,則會使用使用者的預設資料倉儲。
- 範例:
.withWarehouse("MY_WAREHOUSE")
.withSchema(...)
- 要使用的資料庫中的結構描述名稱。此參數為選用。
- 範例:
.withSchema("PUBLIC")
.withUsernamePasswordAuth(username, password)
- 設定使用者名稱/密碼驗證。
- 範例:
.withUsernamePasswordAuth("USERNAME", "PASSWORD")
.withOAuth(token)
- 設定 OAuth 驗證。
- 範例:
.withOAuth("TOKEN")
.withKeyPairAuth(username, privateKey)
- 使用使用者名稱和PrivateKey設定金鑰配對驗證
- 範例:
.withKeyPairAuth("USERNAME",
PrivateKey)
.withKeyPairPathAuth(username, privateKeyPath, privateKeyPassphrase)
- 使用使用者名稱、私密金鑰檔案路徑和密碼片語設定金鑰配對驗證。
- 範例:
.withKeyPairPathAuth("USERNAME", "PATH/TO/KEY.P8", "PASSPHRASE")
.withKeyPairRawAuth(username, rawPrivateKey, privateKeyPassphrase)
- 設定使用使用者名稱、私鑰和密碼的金鑰對驗證。
- 範例:
.withKeyPairRawAuth("USERNAME", "PRIVATE_KEY", "PASSPHRASE")
注意 - 必須使用 .withUrl(...)
或 .withServerName(...)
其中一個。
管線選項
使用 Beam 的 Pipeline 選項,透過命令列設定選項。
Snowflake 管線選項
Snowflake IO 函式庫支援以下選項,當 Pipeline 使用時,預設可以透過命令列傳遞。
--url
Snowflake 的 JDBC 樣式 URL,包含帳戶名稱和區域,不含任何參數。
--serverName
包含帳戶、區域和網域的完整伺服器名稱。
--username
使用者名稱/密碼和私鑰驗證時為必要選項。
--oauthToken
僅限 OAuth 驗證時為必要選項。
--password
僅限使用者名稱/密碼驗證時為必要選項。
--privateKeyPath
私鑰檔案路徑。僅限私鑰驗證時為必要選項。
--rawPrivateKey
私鑰。僅限私鑰驗證時為必要選項。
--privateKeyPassphrase
私鑰的密碼。僅限私鑰驗證時為必要選項。
--stagingBucketName
外部儲存桶路徑,以 /
結尾。例如 {gs,s3}://bucket/
。允許子目錄。
--storageIntegrationName
儲存整合名稱
--warehouse
要使用的倉儲。選用。
--database
要連接的資料庫名稱。選用。
--schema
要使用的綱要。選用。
--table
要使用的表格。選用。
--query
要使用的查詢。選用。
--role
要使用的角色。選用。
--authenticator
要使用的驗證器。選用。
--portNumber
連接埠號碼。選用。
--loginTimeout
登入逾時。選用。
--snowPipe
SnowPipe 名稱。選用。
使用管線選項執行 main 命令
若要透過命令列傳遞 Pipeline 選項,請在 gradle 命令中使用 --args
,如下所示
./gradle run
--args="
--serverName=<SNOWFLAKE SERVER NAME>
Example: --serverName=account.region.gcp.snowflakecomputing.com
--username=<SNOWFLAKE USERNAME>
Example: --username=testuser
--password=<SNOWFLAKE PASSWORD>
Example: --password=mypassword
--database=<SNOWFLAKE DATABASE>
Example: --database=TEST_DATABASE
--schema=<SNOWFLAKE SCHEMA>
Example: --schema=public
--table=<SNOWFLAKE TABLE IN DATABASE>
Example: --table=TEST_TABLE
--query=<IF NOT TABLE THEN QUERY>
Example: --query=‘SELECT column FROM TABLE’
--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>
Example: --storageIntegrationName=my_integration
--stagingBucketName=<GCS OR S3 BUCKET>
Example: --stagingBucketName={gs,s3}://bucket/
--runner=<DirectRunner/DataflowRunner>
Example: --runner=DataflowRunner
--project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME>
Example: --project=my_project
--tempLocation=<FOR DATAFLOW RUNNER: GCS TEMP LOCATION STARTING
WITH gs://…>
Example: --tempLocation=gs://bucket/temp/
--region=<FOR DATAFLOW RUNNER: GCP REGION>
Example: --region=us-east-1
--appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>
Example: --appName=my_job"
options.getStagingBucketName()
命令存取帶有引數的參數。使用管線選項執行 test 命令
若要透過命令列傳遞 Pipeline 選項,請在 gradle 命令中使用 -DintegrationTestPipelineOptions
,如下所示
./gradlew test --tests nameOfTest
-DintegrationTestPipelineOptions='[
"--serverName=<SNOWFLAKE SERVER NAME>",
Example: --serverName=account.region.gcp.snowflakecomputing.com
"--username=<SNOWFLAKE USERNAME>",
Example: --username=testuser
"--password=<SNOWFLAKE PASSWORD>",
Example: --password=mypassword
"--schema=<SNOWFLAKE SCHEMA>",
Example: --schema=PUBLIC
"--table=<SNOWFLAKE TABLE IN DATABASE>",
Example: --table=TEST_TABLE
"--database=<SNOWFLAKE DATABASE>",
Example: --database=TEST_DATABASE
"--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>",
Example: --storageIntegrationName=my_integration
"--stagingBucketName=<GCS OR S3 BUCKET>",
Example: --stagingBucketName={gs,s3}://bucket
"--externalLocation=<GCS BUCKET URL STARTING WITH GS://>",
Example: --tempLocation=gs://bucket/temp/
]' --no-build-cache
所有參數都以「--」開頭,並用雙引號括住,並以逗號分隔
--serverName=<SNOWFLAKE 伺服器名稱>
- 指定您的帳戶完整名稱(由 Snowflake 提供)。請注意,您的完整帳戶名稱可能包含額外的區段,用於識別您帳戶所在的區域和雲端平台。
- 範例:
--serverName=xy12345.eu-west-1.gcp..snowflakecomputing.com
--username=<SNOWFLAKE 使用者名稱>
- 指定使用者的登入名稱。
- 範例:
--username=my_username
--password=<SNOWFLAKE 密碼>
- 指定指定使用者的密碼。
- 範例:
--password=my_secret
--schema=<SNOWFLAKE 綱要>
- 指定連線後,用於指定資料庫的綱要。指定的綱要應該是現有綱要,且指定使用者的角色具有該綱要的權限。
- 範例:
--schema=PUBLIC
--table=<SNOWFLAKE 資料庫中的表格>
- 範例:
--table=MY_TABLE
- 範例:
--database=<SNOWFLAKE 資料庫>
- 指定連線後要使用的資料庫。指定的資料庫應該是現有資料庫,且指定使用者的角色具有該資料庫的權限。
- 範例:
--database=MY_DATABASE
--storageIntegrationName=<SNOWFLAKE 儲存整合名稱>
- 在 Snowflake 中為選擇的雲端儲存建立的儲存整合名稱。
- 範例:
--storageIntegrationName=my_google_integration
在 Dataflow 上執行管線
預設情況下,管道會在您本機的 Direct Runner 上執行。若要在 Google Dataflow 上執行管道,您必須提供以下 Pipeline 選項
--runner=DataflowRunner
- Dataflow 的特定執行器。
--project=<GCS 專案>
- Google Cloud Platform 專案的名稱。
--stagingBucketName=<GCS 或 S3 儲存桶>
- Google Cloud Services 儲存桶或 AWS S3 儲存桶,Beam 檔案將在此處暫存。
--maxNumWorkers=5
- (選用)最大工作節點數。
--appName=<JOB NAME>
- (選用)Dataflow 儀表板中作業名稱的前綴。
如需 Dataflow 的更多管道選項,請參閱此處。
注意:若要正確使用 Google Cloud 進行驗證,請使用 gcloud 或遵循Google Cloud 文件。
重要:請了解 Google Dataflow 定價
在 Dataflow 上執行管線範本
Google Dataflow 支援範本建立,這表示在 Cloud Storage 上暫存管道,並使用只能在管道執行期間使用的執行階段參數來執行它們。
建立自己的 Dataflow 範本的流程如下
- 建立您自己的管道。
- 建立 Dataflow 範本,同時檢查 SnowflakeIO 在執行階段支援哪些選項。
- 使用 Cloud Console、REST API 或 gcloud 執行 Dataflow 範本。
目前,SnowflakeIO 在執行階段支援以下選項
--serverName
包含帳戶、區域和網域的完整伺服器名稱。--username
使用者名稱/密碼和私鑰驗證時為必要選項。--password
僅限使用者名稱/密碼驗證時為必要選項。--rawPrivateKey
私鑰檔案。僅限私鑰驗證時為必要選項。--privateKeyPassphrase
私鑰的密碼。僅限私鑰驗證時為必要選項。--stagingBucketName
外部儲存桶路徑,以/
結尾。例如{gs,s3}://bucket/
。允許子目錄。--storageIntegrationName
儲存整合名稱。--warehouse
要使用的倉儲。選用。--database
要連接的資料庫名稱。選用。--schema
要使用的綱要。選用。--table
要使用的表格。選用。注意:表格不在預設的管道選項中。--query
要使用的查詢。選用。注意:查詢不在預設的管道選項中。--role
要使用的角色。選用。--snowPipe
SnowPipe 名稱。選用。
目前,SnowflakeIO 不支援在執行階段使用以下選項
--url
Snowflake 的 JDBC 樣式 URL,包含帳戶名稱和區域,不含任何參數。--oauthToken
僅限 OAuth 驗證時為必要選項。--privateKeyPath
私鑰檔案路徑。僅限私鑰驗證時為必要選項。--authenticator
要使用的驗證器。選用。--portNumber
連接埠號碼。選用。--loginTimeout
登入逾時。選用。
寫入 Snowflake 表格
SnowflakeIO 的其中一個功能是寫入 Snowflake 表格。此轉換可讓您使用將使用者的 PCollection 發送到您的 Snowflake 資料庫的輸出作業來完成 Beam 管道。
批次寫入 (來自有界來源)
基本的 .write()
作業用法如下
PCollection
物件的資料類型;例如,對於字串的輸入 PCollection
,使用 SnowflakeIO.<String>
。以下所有參數都是必要參數
.withDataSourceConfiguration()
接受 DatasourceConfiguration 物件。.to()
接受目標 Snowflake 表格名稱。.withStagingBucketName()
接受以斜線結尾的雲端儲存桶路徑。 - 範例:.withStagingBucketName("{gs,s3}://bucket/my/dir/")
.withStorageIntegrationName()
接受根據 Snowflake 文件建立的 Snowflake 儲存整合物件的名稱。範例然後.withUserDataMapper()
接受 UserDataMapper 函數,該函數會將使用者的 PCollection 對應到字串值陣列(String[])
。
注意:SnowflakeIO 在幕後使用 COPY
陳述式來寫入(使用 COPY to table)。StagingBucketName 將用於儲存最終將進入 Snowflake 的 CSV 檔案。這些 CSV 檔案將儲存在「stagingBucketName」路徑下。
批次處理的選用設定
.withQuotationMark()
- 預設值:
‘
(單引號)。 - 接受帶有一個字元的字串。它會將所有儲存到 CSV 的文字 (String) 欄位包圍。它應該是 Snowflake 的 FIELD_OPTIONALLY_ENCLOSED_BY 參數 (雙引號、單引號或無) 接受的字元之一。
- 範例:
.withQuotationMark("'")
- 預設值:
串流寫入 (來自無界來源)
需要在 Snowflake 主控台中建立 SnowPipe。SnowPipe 應使用相同的整合和儲存桶,如同 .withStagingBucketName
和 .withStorageIntegrationName
方法所指定。寫入作業可能如下所示
data.apply(
SnowflakeIO.<type>write()
.withStagingBucketName("BUCKET")
.withStorageIntegrationName("STORAGE INTEGRATION NAME")
.withDataSourceConfiguration(dc)
.withUserDataMapper(mapper)
.withSnowPipe("MY_SNOW_PIPE")
.withFlushTimeLimit(Duration.millis(time))
.withFlushRowLimit(rowsNumber)
.withShardsNumber(shardsNumber)
)
參數
串流的必要設定
.withDataSourceConfiguration()
- 接受 DatasourceConfiguration 物件。
.to()
- 接受目標 Snowflake 表格名稱。
- 範例:
.to("MY_TABLE")
.withStagingBucketName()
- 接受以斜線結尾的雲端儲存桶路徑。
- 範例:
.withStagingBucketName("{gs,s3}://bucket/my/dir/")
.withStorageIntegrationName()
- 接受根據 Snowflake 文件建立的 Snowflake 儲存整合物件的名稱。
- 範例然後
.withSnowPipe()
接受目標 SnowPipe 名稱。
.withSnowPipe()
接受 snowpipe 的確切名稱。範例然後
注意:提供綱要和資料庫名稱非常重要。
.withUserDataMapper()
- 接受 UserDataMapper 函數,該函數會將使用者的 PCollection 對應到字串值陣列
(String[])
。
- 接受 UserDataMapper 函數,該函數會將使用者的 PCollection 對應到字串值陣列
注意:
如前所述,SnowflakeIO 在幕後使用 SnowPipe REST 呼叫,從無界來源寫入。StagingBucketName 將用於儲存最終將進入 Snowflake 的 CSV 檔案。SnowflakeIO 不會從「stagingBucketName」下的路徑中刪除建立的 CSV 檔案,無論是在串流期間或串流結束後都不會。
串流的選用設定
.withFlushTimeLimit()
- 預設值:30 秒
- 接受 Duration 物件,其中指定每次重複串流寫入的時間。
- 範例:
.withFlushTimeLimit(Duration.millis(180000))
.withFlushRowLimit()
- 預設值:10,000 行
- 寫入每個暫存檔案的行數限制
- 範例:
.withFlushRowLimit(500000)
.withShardNumber()
- 預設值:1 個分片
- 每次刷新時將儲存的檔案數(用於平行寫入)。
- 範例:
.withShardNumber(5)
.withQuotationMark()
- 預設值:
‘
(單引號)。 - 接受帶有一個字元的字串。它會將所有儲存到 CSV 的文字 (String) 欄位包圍。它應該是 Snowflake 的 FIELD_OPTIONALLY_ENCLOSED_BY 參數 (雙引號、單引號或無) 接受的字元之一。範例:.withQuotationMark("")(無引號)
- 預設值:
.withDebugMode()
- 接受
SnowflakeIO.StreamingLogLevel.INFO
- 顯示有關載入檔案的完整資訊SnowflakeIO.StreamingLogLevel.ERROR
- 僅顯示錯誤。
- 顯示有關串流檔案到 Snowflake 的日誌,類似於 insertReport。啟用偵錯模式可能會影響效能。
- 範例:
.withDebugMode(SnowflakeIO.StreamingLogLevel.INFO)
- 接受
重要通知:
- 串流僅接受金鑰對驗證。詳細資訊請參閱:Issue 21287。
- 在
SnowflakeIO.DataSourceConfiguration
物件中設定的角色參數在串流寫入時會被忽略。詳細資訊請參閱:Issue 21365
刷新時間:持續時間 & 列數
持續時間:串流寫入將根據刷新時間限制中指定的時間持續定期地在暫存區寫入檔案(例如,每 1 分鐘)。
資料列數:除非達到刷新時間限制,否則準備寫入的檔案的資料列數將會依照刷新資料列限制中指定的數量。(例如,如果限制為 1000 列,且緩衝區收集了 99 列,但 1 分鐘的刷新時間已過,則這些列將會被傳送到 SnowPipe 進行插入。)
暫存檔案的大小將取決於資料列的大小和使用的壓縮方式(GZIP)。
UserDataMapper 函數
UserDataMapper
函式是必需的,用於將資料從 PCollection
對應到字串值陣列,然後 write()
操作會將資料儲存到臨時的 .csv
檔案中。例如:
其他寫入選項
轉換查詢
write()
操作的 .withQueryTransformation()
選項接受一個 SQL 查詢作為字串值,該查詢將在將 CSV 檔案中暫存的資料直接傳輸到目標 Snowflake 資料表時執行。有關轉換 SQL 語法的資訊,請參閱Snowflake 文件。
用法
寫入處置
透過為 write()
操作指定 .withWriteDisposition(...)
選項,根據資料將寫入的資料表來定義寫入行為。支援以下值:
APPEND
- 預設行為。寫入的資料會新增至資料表中現有的資料列。EMPTY
- 目標資料表必須是空的;否則,寫入操作會失敗。TRUNCATE
- 寫入操作會在寫入資料之前,刪除目標資料表中的所有資料列。
用法範例
建立處置
如果目標資料表不存在,.withCreateDisposition()
選項會定義寫入操作的行為。支援以下值:
CREATE_IF_NEEDED
- 預設行為。寫入操作會檢查指定目標資料表是否存在;如果不存在,寫入操作會嘗試建立資料表。請使用.withTableSchema()
選項來指定目標資料表的結構描述。CREATE_NEVER
- 如果目標資料表不存在,寫入操作會失敗。
用法
表格結構描述處置
當 .withCreateDisposition()
選項設定為 CREATE_IF_NEEDED
時,.withTableSchema()
選項可讓您指定所建立目標資料表的結構描述。資料表結構描述是 SnowflakeColumn
物件的清單,其中包含與資料表中每個資料行的資料行類型對應的名稱和類型。
用法
SnowflakeTableSchema tableSchema =
new SnowflakeTableSchema(
SnowflakeColumn.of("my_date", new SnowflakeDate(), true),
new SnowflakeColumn("id", new SnowflakeNumber()),
SnowflakeColumn.of("name", new SnowflakeText(), true));
data.apply(
SnowflakeIO.<~>write()
.withDataSourceConfiguration(dc)
.to("MY_TABLE")
.withStagingBucketName("BUCKET")
.withStorageIntegrationName("STORAGE INTEGRATION NAME")
.withUserDataMapper(mapper)
.withTableSchema(tableSchema)
)
從 Snowflake 讀取
SnowflakeIO 的其中一個功能是讀取 Snowflake 資料表 - 可以透過資料表名稱讀取完整資料表,也可以透過查詢讀取自訂資料。讀取轉換的輸出是一個使用者定義資料類型的 PCollection。
一般用法
基本的 .read()
操作用法
PCollection<USER_DATA_TYPE> items = pipeline.apply(
SnowflakeIO.<USER_DATA_TYPE>read()
.withDataSourceConfiguration(dc)
.fromTable("MY_TABLE") // or .fromQuery("QUERY")
.withStagingBucketName("BUCKET")
.withStorageIntegrationName("STORAGE INTEGRATION NAME")
.withCsvMapper(mapper)
.withCoder(coder));
)
.withDataSourceConfiguration(...)
- 接受一個 DataSourceConfiguration 物件。
.fromTable(...)
或.fromQuery(...)
- 指定 Snowflake 資料表名稱或自訂 SQL 查詢。
.withStagingBucketName()
- 接受一個雲端儲存貯體名稱。
.withStorageIntegrationName()
接受一個根據 Snowflake 文件建立的 Snowflake 儲存整合物件的名稱。範例:
然後.withCsvMapper(mapper)
- 接受一個 CSVMapper 執行個體,用於將 String[] 對應到 USER_DATA_TYPE。
.withCoder(coder)
- 接受 USER_DATA_TYPE 的 Coder。
注意:SnowflakeIO 在幕後使用 COPY
陳述式來讀取(使用 COPY to location)雲端儲存中暫存的檔案。StagingBucketName 將被用作儲存 CSV 檔案的臨時位置。這些臨時目錄將被命名為 sf_copy_csv_DATE_TIME_RANDOMSUFFIX
,並且在讀取操作完成後將會自動移除。
CSVMapper
SnowflakeIO 使用 COPY INTO
CSVMapper 的工作是讓使用者能夠將字串陣列轉換為使用者定義的類型,例如 Avro 或 Parquet 檔案的 GenericRecord,或自訂 POJO。
GenericRecord 的 CsvMapper 實作範例
將 SnowflakeIO 與 AWS S3 搭配使用
若要能夠使用 AWS S3 儲存貯體作為 stagingBucketName
,則需要:
- 建立一個
PipelineOptions
介面,該介面會擴充SnowflakePipelineOptions
和 S3Options,並包含AwsAccessKey
和AwsSecretKey
選項。範例:
public interface AwsPipelineOptions extends SnowflakePipelineOptions, S3Options {
@Description("AWS Access Key")
@Default.String("access_key")
String getAwsAccessKey();
void setAwsAccessKey(String awsAccessKey);
@Description("AWS secret key")
@Default.String("secret_key")
String getAwsSecretKey();
void setAwsSecretKey(String awsSecretKey);
}
AwsAccessKey
和 AwsSecretKey
選項來設定 AwsCredentialsProvider
選項。注意:請記得從 S3Options 設定 awsRegion
。
在 Python SDK 中使用 SnowflakeIO
簡介
Snowflake 跨語言實作支援 Python 程式語言的讀取和寫入操作,這要歸功於跨語言,這是 可移植性架構藍圖 的一部分,旨在提供整個 Beam 生態系統的完整互通性。從開發人員的角度來看,這意味著可以結合以不同語言(Java/Python/Go)撰寫的轉換。
有關跨語言的更多資訊,請參閱多 SDK 工作和跨語言轉換 API 和擴充服務文章。
其他資源
從 Snowflake 讀取
SnowflakeIO 的其中一個功能是讀取 Snowflake 資料表 - 可以透過資料表名稱讀取完整資料表,也可以透過查詢讀取自訂資料。讀取轉換的輸出是一個使用者定義資料類型的 PCollection。
一般用法
必要參數
server_name
完整的 Snowflake 伺服器名稱,包含帳戶、區域和網域。schema
要使用的資料庫中 Snowflake 結構描述的名稱。database
要使用的 Snowflake 資料庫的名稱。staging_bucket_name
Google Cloud Storage 儲存貯體或 AWS S3 儲存貯體的名稱。該儲存貯體將被用作儲存 CSV 檔案的臨時位置。這些臨時目錄將被命名為sf_copy_csv_DATE_TIME_RANDOMSUFFIX
,並且在讀取操作完成後將會自動移除。storage_integration_name
根據 Snowflake 文件建立的 Snowflake 儲存整合物件的名稱。csv_mapper
指定一個函式,該函式必須將使用者定義的物件轉換為字串陣列。SnowflakeIO 使用 COPY INTO陳述式將資料從 Snowflake 資料表移動到 GCS/S3 作為 CSV 檔案。然後,這些檔案會透過 FileIO 下載,並逐行處理。每一行都會使用 OpenCSV 程式庫分割成字串陣列。 csv_mapper
函式的工作是讓使用者能夠將字串陣列轉換為使用者定義的類型,例如 Avro 或 Parquet 檔案的 GenericRecord,或自訂物件。範例:table
或query
指定 Snowflake 資料表名稱或自訂 SQL 查詢。
驗證參數
需要傳遞以下其中一個有效參數組合以進行驗證:
username
和password
指定使用者名稱/密碼驗證方法的使用者名稱和密碼。private_key_path
和private_key_passphrase
指定私密金鑰的路徑和金鑰組驗證方法的密碼。raw_private_key
和private_key_passphrase
指定金鑰組驗證方法的私密金鑰和密碼。o_auth_token
指定 OAuth 驗證方法的存取權杖。
其他參數
role
指定 Snowflake 角色。如果未指定,則將使用使用者的預設角色。warehouse
指定 Snowflake 資料倉儲名稱。如果未指定,則將使用使用者的預設資料倉儲。expansion_service
指定擴充服務的 URL。
寫入 Snowflake
SnowflakeIO 的其中一個功能是寫入 Snowflake 資料表。此轉換可讓您使用將使用者的 PCollection 發送到 Snowflake 資料庫的輸出操作來完成 Beam 管道。
一般用法
OPTIONS = ["--runner=FlinkRunner"]
with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
(p
| <SOURCE OF DATA>
| WriteToSnowflake(
server_name=<SNOWFLAKE SERVER NAME>,
username=<SNOWFLAKE USERNAME>,
password=<SNOWFLAKE PASSWORD>,
o_auth_token=<OAUTH TOKEN>,
private_key_path=<PATH TO P8 FILE>,
raw_private_key=<PRIVATE_KEY>
private_key_passphrase=<PASSWORD FOR KEY>,
schema=<SNOWFLAKE SCHEMA>,
database=<SNOWFLAKE DATABASE>,
staging_bucket_name=<GCS OR S3 BUCKET>,
storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
create_disposition=<CREATE DISPOSITION>,
write_disposition=<WRITE DISPOSITION>,
table_schema=<SNOWFLAKE TABLE SCHEMA>,
user_data_mapper=<USER DATA MAPPER FUNCTION>,
table=<SNOWFLAKE TABLE>,
query=<IF NOT TABLE THEN QUERY>,
role=<SNOWFLAKE ROLE>,
warehouse=<SNOWFLAKE WAREHOUSE>,
expansion_service=<EXPANSION SERVICE ADDRESS>))
必要參數
server_name
完整的 Snowflake 伺服器名稱,包含帳戶、區域和網域。schema
要使用的資料庫中 Snowflake 結構描述的名稱。database
要使用的 Snowflake 資料庫的名稱。staging_bucket_name
Google Cloud Storage 儲存貯體或 AWS S3 儲存貯體的路徑,並以斜線結尾。此儲存貯體將用於儲存 CSV 檔案,這些檔案最終將會進入 Snowflake。這些 CSV 檔案將儲存在「staging_bucket_name」路徑下。storage_integration_name
根據 Snowflake 文件建立的 Snowflake 儲存整合物件的名稱。user_data_mapper
指定一個函式,該函式會將資料從 PCollection 對應到字串值陣列,然後寫入操作會將資料儲存到臨時的 .csv 檔案中。範例:table
或query
指定 Snowflake 資料表名稱或自訂 SQL 查詢。
驗證參數
需要傳遞以下其中一個有效參數組合以進行驗證:
username
和password
指定使用者名稱/密碼驗證方法。private_key_path
和private_key_passphrase
指定私密金鑰的路徑和金鑰組驗證方法的密碼。raw_private_key
和private_key_passphrase
指定金鑰組驗證方法的私密金鑰和密碼。o_auth_token
指定 OAuth 驗證方法的存取權杖。
其他參數
role
指定 Snowflake 角色。如果未指定,則將使用使用者的預設角色。warehouse
指定 Snowflake 資料倉儲名稱。如果未指定,則將使用使用者的預設資料倉儲。create_disposition
定義如果目標資料表不存在,寫入操作的行為。支援以下值:CREATE_IF_NEEDED
- 預設行為。寫入操作會檢查指定目標資料表是否存在;如果不存在,寫入操作會嘗試建立資料表。請使用table_schema
參數來指定目標資料表的結構描述。CREATE_NEVER
- 如果目標資料表不存在,寫入操作會失敗。
write_disposition
根據資料將寫入的資料表來定義寫入行為。支援以下值:APPEND
- 預設行為。寫入的資料會新增至資料表中現有的資料列。EMPTY
- 目標資料表必須是空的;否則,寫入操作會失敗。TRUNCATE
- 寫入操作會在寫入資料之前,刪除目標資料表中的所有資料列。
table_schema
當create_disposition
參數設定為 CREATE_IF_NEEDED 時,table_schema
參數可讓您指定所建立目標資料表的結構描述。資料表結構描述是一個 JSON 陣列,其結構如下:所有支援的資料類型您可以在 Snowflake 資料類型 中閱讀有關 Snowflake 資料類型的資訊。{"type":"date"}, {"type":"datetime"}, {"type":"time"}, {"type":"timestamp"}, {"type":"timestamp_ltz"}, {"type":"timestamp_ntz"}, {"type":"timestamp_tz"}, {"type":"boolean"}, {"type":"decimal","precision":38,"scale":1}, {"type":"double"}, {"type":"float"}, {"type":"integer","precision":38,"scale":0}, {"type":"number","precision":38,"scale":1}, {"type":"numeric","precision":38,"scale":2}, {"type":"real"}, {"type":"array"}, {"type":"object"}, {"type":"variant"}, {"type":"binary","size":null}, {"type":"char","length":1}, {"type":"string","length":null}, {"type":"text","length":null}, {"type":"varbinary","size":null}, {"type":"varchar","length":100}]
expansion_service
指定擴充服務的 URL。
限制
SnowflakeIO 目前有以下限制。
串流寫入僅支援金鑰組驗證。如需詳細資訊,請參閱:Issue 21287。
在
SnowflakeIO.DataSourceConfiguration
物件中設定的角色參數在串流寫入時會被忽略。詳細資訊請參閱:Issue 21365
上次更新日期:2024/10/31
您是否找到了您要找的所有資訊?
這些資訊是否對您有幫助且清楚易懂?您有任何想要變更的地方嗎?請告訴我們!