Beam SQL 擴充功能:CREATE EXTERNAL TABLE
Beam SQL 的 CREATE EXTERNAL TABLE
語句會註冊一個虛擬表格,該表格會對應到一個外部儲存系統。對於某些儲存系統,CREATE EXTERNAL TABLE
在寫入發生之前不會建立實體表格。在實體表格存在之後,您可以使用 SELECT
、JOIN
和 INSERT INTO
語句來存取該表格。
CREATE EXTERNAL TABLE
語句包含綱要和擴充子句。
語法
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE type
[LOCATION location]
[TBLPROPERTIES tblProperties]
simpleType: TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | DECIMAL | BOOLEAN | DATE | TIME | TIMESTAMP | CHAR | VARCHAR
fieldType: simpleType | MAP<simpleType, fieldType> | ARRAY<fieldType> | ROW<tableElement [, tableElement ]*>
tableElement: columnName fieldType [ NOT NULL ]
IF NOT EXISTS
:選用。如果表格已經註冊,Beam SQL 會忽略該語句,而不是傳回錯誤。tableName
:要建立和註冊的表格的區分大小寫名稱,指定為 識別碼。表格名稱不需要與底層資料儲存系統中的名稱相符。tableElement
:columnName
fieldType
[ NOT NULL ]
columnName
:欄位的區分大小寫名稱,指定為反引號括住的運算式。fieldType
:欄位的型別,指定為下列其中一種型別simpleType
:TINYINT
、SMALLINT
、INTEGER
、BIGINT
、FLOAT
、DOUBLE
、DECIMAL
、BOOLEAN
、DATE
、TIME
、TIMESTAMP
、CHAR
、VARCHAR
MAP<simpleType, fieldType>
ARRAY<fieldType>
ROW<tableElement [, tableElement ]*>
NOT NULL
:選用。表示該欄位不可為 Null。
type
:支援虛擬表格的 I/O 轉換,指定為具有下列其中一個值的識別碼bigquery
bigtable
pubsub
kafka
text
location
:底層表格的 I/O 特定位置,指定為字串文字。請參閱 I/O 特定章節,了解location
格式的需求。tblProperties
:I/O 特定引號括住的金鑰值 JSON 物件,其中包含額外的組態,指定為字串文字。請參閱 I/O 特定章節,了解tblProperties
格式的需求。
BigQuery
語法
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE bigquery
LOCATION '[PROJECT_ID]:[DATASET].[TABLE]'
TBLPROPERTIES '{"method": "DIRECT_READ"}'
LOCATION
:表格在 BigQuery CLI 格式中的位置。PROJECT_ID
:Google Cloud Project 的 ID。DATASET
:BigQuery 資料集 ID。TABLE
:資料集中的 BigQuery 表格 ID。
TBLPROPERTIES
:method
:選用。要使用的讀取方法。以下選項可用DIRECT_READ
:使用 BigQuery Storage API。EXPORT
:以 Avro 格式將資料匯出到 Google Cloud Storage,並從該位置讀取資料檔案。- 對於 Beam 2.21+,預設為
DIRECT_READ
(舊版本使用EXPORT
)。
讀取模式
Beam SQL 支援讀取具有簡單型別 (simpleType
) 和簡單型別陣列 (ARRAY<simpleType>
) 的欄位。
使用 EXPORT
方法讀取時,應設定以下管道選項
project
:Google Cloud Project 的 ID。tempLocation
:用於儲存中繼資料的儲存區。例如:gs://temp-storage/temp
。
使用 DIRECT_READ
方法讀取時,最佳化工具會嘗試執行專案和述詞下推,這可能會減少從 BigQuery 讀取資料所需的時間。
如需 BigQuery Storage API 的詳細資訊,請參閱此處。
寫入模式
如果表格不存在,當寫入第一個記錄時,Beam 會建立在位置中指定的表格。如果表格確實存在,指定的欄位必須與現有的表格相符。
綱要
與綱要相關的錯誤會導致管道崩潰。不支援 Map 型別。Beam SQL 型別會對應到 BigQuery Standard SQL 型別,如下所示
Beam SQL 型別 | BigQuery Standard SQL 型別 |
TINYINT、SMALLINT、INTEGER、BIGINT | INT64 |
FLOAT、DOUBLE、DECIMAL | FLOAT64 |
BOOLEAN | BOOL |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
CHAR、VARCHAR | STRING |
MAP | (不支援) |
ARRAY | ARRAY |
ROW | STRUCT |
範例
CREATE EXTERNAL TABLE users (id INTEGER, username VARCHAR)
TYPE bigquery
LOCATION 'testing-integration:apache.users'
Cloud Bigtable
語法
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
key VARCHAR NOT NULL,
family ROW<qualifier cells [, qualifier cells ]* >
[, family ROW< qualifier cells [, qualifier cells ]* > ]*
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/[PROJECT_ID]/instances/[INSTANCE_ID]/tables/[TABLE]'
key
:Bigtable 列的金鑰family
:欄位族的名稱qualifier
:欄位限定詞cells
:每個值的其中之一TYPE
ARRAY<SIMPLE_TYPE>
LOCATION
:PROJECT_ID
:Google Cloud Project 的 ID。INSTANCE_ID
:Bigtable 執行個體 ID。TABLE
:Bigtable 表格 ID。
TYPE
:SIMPLE_TYPE
或CELL_ROW
CELL_ROW
:ROW<val SIMPLE_TYPE [, timestampMicros BIGINT [NOT NULL]] [, labels ARRAY<VARCHAR> [NOT NULL]]
SIMPLE_TYPE
:下列其中之一BINARY
VARCHAR
BIGINT
INTEGER
SMALLINT
TINYINT
DOUBLE
FLOAT
BOOLEAN
TIMESTAMP
使用平面綱要的替代語法
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
key VARCHAR NOT NULL,
qualifier SIMPLE_TYPE
[, qualifier SIMPLE_TYPE ]*
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/[PROJECT_ID]/instances/[INSTANCE_ID]/tables/[TABLE]'
TBLPROPERTIES '{
"columnsMapping": "family:qualifier[,family:qualifier]*"
}'
key
:Bigtable 列的金鑰family
:欄位族的名稱qualifier
:欄位限定詞LOCATION
:PROJECT_ID
:Google Cloud Project 的 ID。INSTANCE_ID
:Bigtable 執行個體 ID。TABLE
:Bigtable 表格 ID。
TBLPROPERTIES
:包含columnsMapping
金鑰的 JSON 物件,其中金鑰值配對以逗號分隔,並以冒號分隔SIMPLE_TYPE
:與先前語法中相同
讀取模式
Beam SQL 支援讀取具有強制 key
欄位的列,以及至少一個具有至少一個 qualifier
的 family
。儲存格以簡單型別 (SIMPLE_TYPE
) 或具有強制 val
欄位、選用 timestampMicros
和選用 labels
的 ROW 型別表示。兩者都會讀取欄位中的最新儲存格。指定為簡單型別陣列 (ARRAY<simpleType>
) 的儲存格允許讀取欄位的所有值。
對於平面綱要,只允許 SIMPLE_TYPE
值。除了 key
之外的每個欄位都必須對應到 columnsMapping
中指定的金鑰值配對。
不必將所有現有的欄位族和限定詞提供給綱要。
篩選器僅允許由具有 RE2 語法 Regex 的單一 LIKE
語句來篩選 key
欄位,例如 SELECT * FROM table WHERE key LIKE '^key[012]{1}'
寫入模式
僅支援平面綱要。
範例
CREATE EXTERNAL TABLE beamTable(
key VARCHAR NOT NULL,
beamFamily ROW<
boolLatest BOOLEAN NOT NULL,
longLatestWithTs ROW<
val BIGINT NOT NULL,
timestampMicros BIGINT NOT NULL
> NOT NULL,
allStrings ARRAY<VARCHAR> NOT NULL,
doubleLatestWithTsAndLabels ROW<
val DOUBLE NOT NULL,
timestampMicros BIGINT NOT NULL,
labels ARRAY<VARCHAR> NOT NULL
> NOT NULL,
binaryLatestWithLabels ROW<
val BINARY NOT NULL,
labels ARRAY<VARCHAR> NOT NULL
> NOT NULL
> NOT NULL
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/beam/instances/beamInstance/tables/beamTable'
平面綱要範例
CREATE EXTERNAL TABLE flatTable(
key VARCHAR NOT NULL,
boolColumn BOOLEAN NOT NULL,
longColumn BIGINT NOT NULL,
stringColumn VARCHAR NOT NULL,
doubleColumn DOUBLE NOT NULL,
binaryColumn BINARY NOT NULL
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/beam/instances/beamInstance/tables/flatTable'
TBLPROPERTIES '{
"columnsMapping": "f:boolColumn,f:longColumn,f:stringColumn,f2:doubleColumn,f2:binaryColumn"
}'
寫入範例
INSERT INTO writeTable(key, boolColumn, longColumn, stringColumn, doubleColumn)
VALUES ('key', TRUE, 10, 'stringValue', 5.5)
Pub/Sub
語法
巢狀模式
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
event_timestamp TIMESTAMP,
attributes [MAP<VARCHAR, VARCHAR>, ARRAY<ROW<VARCHAR key, VARCHAR value>>],
payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE pubsub
LOCATION 'projects/[PROJECT]/topics/[TOPIC]'
扁平模式
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(tableElement [, tableElement ]*)
TYPE pubsub
LOCATION 'projects/[PROJECT]/topics/[TOPIC]'
在巢狀模式中,以下欄位會保留主題中繼資料。attributes
欄位的存在會觸發巢狀模式的使用。
event_timestamp
:Pub/Sub 訊息透過 PubsubIO 關聯的事件時間戳記。它可以是下列其中之一- 訊息發佈時間,由 Pub/Sub 提供。如果未提供額外組態,這會是預設值。
- 在使用者提供的訊息屬性之一中指定的時間戳記。屬性金鑰由
tblProperties
blob 的timestampAttributeKey
欄位設定。屬性的值應符合 PubsubIO 的需求,即自 Unix Epoch 以來的毫秒數或 RFC 339 日期字串。
attributes
:Pub/Sub 訊息的使用者提供屬性對應;payload
:Pub/Sub 訊息的酬載綱要。如果無法還原序列化記錄,則該記錄會寫入tblProperties
blob 的deadLeaderQueue
欄位中指定的主題。如果在此情況下未指定死信佇列,則會擲回例外狀況,且管道會崩潰。LOCATION
:PROJECT
:Google Cloud 專案的 ID。TOPIC
:Pub/Sub 主題名稱。將會自動建立訂閱,但該訂閱不會自動清除。不支援指定現有的訂閱。
TBLPROPERTIES
:timestampAttributeKey
:選填。包含與 Pub/Sub 訊息關聯的事件時間戳記的索引鍵。如果未指定,則訊息發佈時間戳記會用作視窗化/浮水印的事件時間戳記。deadLetterQueue
:如果無法剖析酬載,則訊息會寫入此主題。如果未指定,則會針對剖析失敗擲回例外狀況。format
:選填。可讓您指定 Pub/Sub 酬載格式。
讀取模式
PubsubIO 支援透過建立新的訂閱從主題讀取資料。
寫入模式
PubsubIO 支援寫入資料至主題。
綱要
Pub/Sub 訊息具有與其關聯的中繼資料,您可以在查詢中參考此中繼資料。對於每個訊息,Pub/Sub 除了酬載(一般情況下為非結構化)之外,還會公開其發佈時間和使用者提供的屬性對應。此資訊必須保留,並且可以從 SQL 陳述式存取。目前,這表示 PubsubIO 資料表需要您宣告一組特殊的欄,如下所示。
支援的酬載
- Pub/Sub 支援通用酬載處理。
範例
CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes MAP<VARCHAR, VARCHAR>, payload ROW<id INTEGER, location VARCHAR>)
TYPE pubsub
LOCATION 'projects/testing-integration/topics/user-location'
Pub/Sub Lite
語法
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
publish_timestamp DATETIME,
event_timestamp DATETIME,
message_key BYTES,
attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE pubsublite
// For writing
LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/topics/[TOPIC]'
// For reading
LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/subscriptions/[SUBSCRIPTION]'
LOCATION
:PROJECT
:Google Cloud 專案的 ID。TOPIC
:Pub/Sub Lite 主題名稱。SUBSCRIPTION
:Pub/Sub Lite 訂閱名稱。GCP-LOCATION
:此 Pub/Sub Lite 主題或訂閱的位置。
TBLPROPERTIES
:timestampAttributeKey
:選填。包含與 Pub/Sub 訊息關聯的事件時間戳記的索引鍵。如果未指定,則訊息發佈時間戳記會用作視窗化/浮水印的事件時間戳記。deadLetterQueue
:選填,支援通用 DLQ 處理format
:選填。可讓您指定酬載格式。
讀取模式
PubsubLiteIO 支援從訂閱讀取資料。
寫入模式
PubsubLiteIO 支援寫入資料至主題。
支援的酬載
- Pub/Sub Lite 支援通用酬載處理。
範例
CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload ROW<id INTEGER, location VARCHAR>)
TYPE pubsublite
LOCATION 'projects/testing-integration/locations/us-central1-a/topics/user-location'
Kafka
KafkaIO 在 Beam SQL 中為實驗性質。
語法
扁平模式
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE kafka
LOCATION 'my.company.url.com:2181/topic1'
TBLPROPERTIES '{
"bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
"topics": ["topic2", "topic3"],
"format": "json"
}'
巢狀模式
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
event_timestamp DATETIME,
message_key BYTES,
headers ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE kafka
LOCATION 'my.company.url.com:2181/topic1'
TBLPROPERTIES '{
"bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
"topics": ["topic2", "topic3"],
"format": "json"
}'
headers
欄位的存在會觸發巢狀模式的使用。
LOCATION
:一個 URL,其中包含要使用的初始啟動代理程式和初始主題名稱(以路徑形式提供)。TBLPROPERTIES
:bootstrap_servers
:選填。可讓您指定其他啟動伺服器,這些伺服器會與LOCATION
中的伺服器一起使用。topics
:選填。可讓您指定其他主題,這些主題會與LOCATION
中的主題一起使用。format
:選填。可讓您指定 Kafka 值的格式。可能的值為 {csv
、avro
、json
、proto
、thrift
}。在扁平模式下預設為csv
,在巢狀模式下預設為json
。csv
不支援巢狀模式。
讀取模式
讀取模式支援從主題讀取資料。
寫入模式
寫入模式支援寫入資料至主題。
支援的格式
- CSV(預設)
- Beam 會剖析訊息,嘗試根據結構描述中指定的類型來剖析欄位。
- Kafka 支援所有通用酬載處理格式。
綱要
對於 CSV,僅支援簡單類型。
MongoDB
語法
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE mongodb
LOCATION 'mongodb://[HOST]:[PORT]/[DATABASE]/[COLLECTION]'
LOCATION
:集合的位置。HOST
:MongoDB 伺服器的位置。可以是 localhost 或 IP 位址。當需要驗證時,可以使用以下格式指定使用者名稱和密碼:username:password@localhost
。PORT
:MongoDB 伺服器正在監聽的連接埠。DATABASE
:要連線的資料庫。COLLECTION
:資料庫中的集合。
讀取模式
讀取模式支援從集合讀取資料。
寫入模式
寫入模式支援寫入資料至集合。
綱要
僅支援簡單類型。MongoDB 文件透過 JsonToRow
轉換對應到 Beam SQL 類型。
範例
CREATE EXTERNAL TABLE users (id INTEGER, username VARCHAR)
TYPE mongodb
LOCATION 'mongodb://localhost:27017/apache/users'
文字
TextIO 在 Beam SQL 中為實驗性質。讀取模式和寫入模式目前無法存取相同的基礎資料。
語法
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE text
LOCATION '/home/admin/orders'
TBLPROPERTIES '{"format: "Excel"}'
LOCATION
:讀取模式的檔案路徑。寫入模式的前置詞。TBLPROPERTIES
:format
:選填。可讓您指定 CSV 格式,以控制欄位分隔符號、引號字元、記錄分隔符號和其他屬性。請參閱下表
format 的值 | 欄位分隔符號 | 引號 | 記錄分隔符號 | 忽略空白行? | 允許遺失欄名稱? |
---|---|---|---|---|---|
預設 | , | " | \r\n | 是 | 否 |
rfc4180 | , | " | \r\n | 否 | 否 |
excel | , | " | \r\n | 否 | 是 |
tdf | \t | " | \r\n | 是 | 否 |
mysql | \t | 無 | \n | 否 | 否 |
讀取模式
讀取模式支援從檔案讀取資料。
寫入模式
寫入模式支援寫入資料至一組檔案。TextIO 會在寫入時建立檔案。
支援的酬載
- CSV
- Beam 會剖析訊息,嘗試使用 org.apache.commons.csv 根據結構描述中指定的類型來剖析欄位。
綱要
僅支援簡單類型。
範例
CREATE EXTERNAL TABLE orders (id INTEGER, price INTEGER)
TYPE text
LOCATION '/home/admin/orders'
通用酬載處理
某些資料來源和接收器支援通用酬載處理。此處理會將位元組陣列酬載欄位剖析為資料表結構描述。此處理支援以下結構描述。所有結構描述都至少需要設定 "format": "<type>"
,並且可能需要其他屬性。
avro
:Avro- Avro 結構描述會從指定的欄位類型自動產生。它用於剖析傳入訊息和格式化傳出訊息。
json
:JSON 物件- Beam 會嘗試將位元組陣列剖析為 UTF-8 JSON,以符合結構描述。
proto
:Protocol Buffers- Beam 會尋找對等的 Protocol Buffer 類別,並使用它來剖析酬載
protoClass
:必要。要使用的 proto 類別名稱。必須建置到部署的 JAR 中。- 結構描述中的欄位必須符合給定
protoClass
的欄位。
thrift
:Thrift- 結構描述中的欄位必須符合給定
thriftClass
的欄位。 thriftClass
:必要。可讓您指定完整的 Thrift Java 類別名稱。必須建置到部署的 JAR 中。thriftProtocolFactoryClass
:必要。可讓您指定要用於 Thrift 序列化的TProtocolFactory
的完整類別名稱。必須建置到部署的 JAR 中。- 用於 Thrift 序列化的
TProtocolFactory
必須符合提供的thriftProtocolFactoryClass
。
- 結構描述中的欄位必須符合給定
通用 DLQ 處理
支援通用 DLQ 處理的來源和接收器會指定格式為 "<dlqParamName>": "[DLQ_KIND]:[DLQ_ID]"
的參數。支援以下類型的 DLQ 處理
bigquery
:BigQuery- DLQ_ID 是具有「error」字串欄位和「payload」位元組陣列欄位的輸出資料表的資料表規格。
pubsub
:Pub/Sub 主題- DLQ_ID 是 Pub/Sub 主題的完整路徑。
pubsublite
:Pub/Sub Lite 主題- DLQ_ID 是 Pub/Sub Lite 主題的完整路徑。