Beam SQL 擴充功能:CREATE EXTERNAL TABLE

Beam SQL 的 CREATE EXTERNAL TABLE 語句會註冊一個虛擬表格,該表格會對應到一個外部儲存系統。對於某些儲存系統,CREATE EXTERNAL TABLE 在寫入發生之前不會建立實體表格。在實體表格存在之後,您可以使用 SELECTJOININSERT INTO 語句來存取該表格。

CREATE EXTERNAL TABLE 語句包含綱要和擴充子句。

語法

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE type
[LOCATION location]
[TBLPROPERTIES tblProperties]

simpleType: TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | DECIMAL | BOOLEAN | DATE | TIME | TIMESTAMP | CHAR | VARCHAR

fieldType: simpleType | MAP<simpleType, fieldType> | ARRAY<fieldType> | ROW<tableElement [, tableElement ]*>

tableElement: columnName fieldType [ NOT NULL ]

BigQuery

語法

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE bigquery
LOCATION '[PROJECT_ID]:[DATASET].[TABLE]'
TBLPROPERTIES '{"method": "DIRECT_READ"}'

讀取模式

Beam SQL 支援讀取具有簡單型別 (simpleType) 和簡單型別陣列 (ARRAY<simpleType>) 的欄位。

使用 EXPORT 方法讀取時,應設定以下管道選項

使用 DIRECT_READ 方法讀取時,最佳化工具會嘗試執行專案和述詞下推,這可能會減少從 BigQuery 讀取資料所需的時間。

如需 BigQuery Storage API 的詳細資訊,請參閱此處

寫入模式

如果表格不存在,當寫入第一個記錄時,Beam 會建立在位置中指定的表格。如果表格確實存在,指定的欄位必須與現有的表格相符。

綱要

與綱要相關的錯誤會導致管道崩潰。不支援 Map 型別。Beam SQL 型別會對應到 BigQuery Standard SQL 型別,如下所示

Beam SQL 型別BigQuery Standard SQL 型別
TINYINT、SMALLINT、INTEGER、BIGINT  INT64
FLOAT、DOUBLE、DECIMALFLOAT64
BOOLEANBOOL
DATEDATE
TIMETIME
TIMESTAMPTIMESTAMP
CHAR、VARCHARSTRING
MAP(不支援)
ARRAYARRAY
ROWSTRUCT

範例

CREATE EXTERNAL TABLE users (id INTEGER, username VARCHAR)
TYPE bigquery
LOCATION 'testing-integration:apache.users'

Cloud Bigtable

語法

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
    key VARCHAR NOT NULL,
    family ROW<qualifier cells [, qualifier cells ]* >
    [, family ROW< qualifier cells [, qualifier cells ]* > ]*
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/[PROJECT_ID]/instances/[INSTANCE_ID]/tables/[TABLE]'

使用平面綱要的替代語法

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
    key VARCHAR NOT NULL,
    qualifier SIMPLE_TYPE
    [, qualifier SIMPLE_TYPE ]*
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/[PROJECT_ID]/instances/[INSTANCE_ID]/tables/[TABLE]'
TBLPROPERTIES '{
  "columnsMapping": "family:qualifier[,family:qualifier]*"
}'

讀取模式

Beam SQL 支援讀取具有強制 key 欄位的列,以及至少一個具有至少一個 qualifierfamily。儲存格以簡單型別 (SIMPLE_TYPE) 或具有強制 val 欄位、選用 timestampMicros 和選用 labels 的 ROW 型別表示。兩者都會讀取欄位中的最新儲存格。指定為簡單型別陣列 (ARRAY<simpleType>) 的儲存格允許讀取欄位的所有值。

對於平面綱要,只允許 SIMPLE_TYPE 值。除了 key 之外的每個欄位都必須對應到 columnsMapping 中指定的金鑰值配對。

不必將所有現有的欄位族和限定詞提供給綱要。

篩選器僅允許由具有 RE2 語法 Regex 的單一 LIKE 語句來篩選 key 欄位,例如 SELECT * FROM table WHERE key LIKE '^key[012]{1}'

寫入模式

僅支援平面綱要。

範例

CREATE EXTERNAL TABLE beamTable(
  key VARCHAR NOT NULL,
  beamFamily ROW<
     boolLatest BOOLEAN NOT NULL,
     longLatestWithTs ROW<
        val BIGINT NOT NULL,
        timestampMicros BIGINT NOT NULL
      > NOT NULL,
      allStrings ARRAY<VARCHAR> NOT NULL,
      doubleLatestWithTsAndLabels ROW<
        val DOUBLE NOT NULL,
        timestampMicros BIGINT NOT NULL,
        labels ARRAY<VARCHAR> NOT NULL
      > NOT NULL,
      binaryLatestWithLabels ROW<
         val BINARY NOT NULL,
         labels ARRAY<VARCHAR> NOT NULL
      > NOT NULL
    > NOT NULL
  )
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/beam/instances/beamInstance/tables/beamTable'

平面綱要範例

CREATE EXTERNAL TABLE flatTable(
  key VARCHAR NOT NULL,
  boolColumn BOOLEAN NOT NULL,
  longColumn BIGINT NOT NULL,
  stringColumn VARCHAR NOT NULL,
  doubleColumn DOUBLE NOT NULL,
  binaryColumn BINARY NOT NULL
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/beam/instances/beamInstance/tables/flatTable'
TBLPROPERTIES '{
  "columnsMapping": "f:boolColumn,f:longColumn,f:stringColumn,f2:doubleColumn,f2:binaryColumn"
}'

寫入範例

INSERT INTO writeTable(key, boolColumn, longColumn, stringColumn, doubleColumn)
  VALUES ('key', TRUE, 10, 'stringValue', 5.5)

Pub/Sub

語法

巢狀模式

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
    event_timestamp TIMESTAMP,
    attributes [MAP<VARCHAR, VARCHAR>, ARRAY<ROW<VARCHAR key, VARCHAR value>>],
    payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE pubsub
LOCATION 'projects/[PROJECT]/topics/[TOPIC]'

扁平模式

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(tableElement [, tableElement ]*)
TYPE pubsub
LOCATION 'projects/[PROJECT]/topics/[TOPIC]'

在巢狀模式中,以下欄位會保留主題中繼資料。attributes 欄位的存在會觸發巢狀模式的使用。

讀取模式

PubsubIO 支援透過建立新的訂閱從主題讀取資料。

寫入模式

PubsubIO 支援寫入資料至主題。

綱要

Pub/Sub 訊息具有與其關聯的中繼資料,您可以在查詢中參考此中繼資料。對於每個訊息,Pub/Sub 除了酬載(一般情況下為非結構化)之外,還會公開其發佈時間和使用者提供的屬性對應。此資訊必須保留,並且可以從 SQL 陳述式存取。目前,這表示 PubsubIO 資料表需要您宣告一組特殊的欄,如下所示。

支援的酬載

範例

CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes MAP<VARCHAR, VARCHAR>, payload ROW<id INTEGER, location VARCHAR>)
TYPE pubsub
LOCATION 'projects/testing-integration/topics/user-location'

Pub/Sub Lite

語法

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
    publish_timestamp DATETIME,
    event_timestamp DATETIME,
    message_key BYTES,
    attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
    payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE pubsublite
// For writing
LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/topics/[TOPIC]'
// For reading
LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/subscriptions/[SUBSCRIPTION]'

讀取模式

PubsubLiteIO 支援從訂閱讀取資料。

寫入模式

PubsubLiteIO 支援寫入資料至主題。

支援的酬載

範例

CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload ROW<id INTEGER, location VARCHAR>)
TYPE pubsublite
LOCATION 'projects/testing-integration/locations/us-central1-a/topics/user-location'

Kafka

KafkaIO 在 Beam SQL 中為實驗性質。

語法

扁平模式

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE kafka
LOCATION 'my.company.url.com:2181/topic1'
TBLPROPERTIES '{
    "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
    "topics": ["topic2", "topic3"],
    "format": "json"
}'

巢狀模式

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
  event_timestamp DATETIME,
  message_key BYTES,
  headers ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
  payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE kafka
LOCATION 'my.company.url.com:2181/topic1'
TBLPROPERTIES '{
    "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
    "topics": ["topic2", "topic3"],
    "format": "json"
}'

headers 欄位的存在會觸發巢狀模式的使用。

讀取模式

讀取模式支援從主題讀取資料。

寫入模式

寫入模式支援寫入資料至主題。

支援的格式

綱要

對於 CSV,僅支援簡單類型。

MongoDB

語法

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE mongodb
LOCATION 'mongodb://[HOST]:[PORT]/[DATABASE]/[COLLECTION]'

讀取模式

讀取模式支援從集合讀取資料。

寫入模式

寫入模式支援寫入資料至集合。

綱要

僅支援簡單類型。MongoDB 文件透過 JsonToRow 轉換對應到 Beam SQL 類型。

範例

CREATE EXTERNAL TABLE users (id INTEGER, username VARCHAR)
TYPE mongodb
LOCATION 'mongodb://localhost:27017/apache/users'

文字

TextIO 在 Beam SQL 中為實驗性質。讀取模式和寫入模式目前無法存取相同的基礎資料。

語法

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE text
LOCATION '/home/admin/orders'
TBLPROPERTIES '{"format: "Excel"}'
format 的值欄位分隔符號引號記錄分隔符號忽略空白行?允許遺失欄名稱?
預設,"\r\n
rfc4180,"\r\n
excel,"\r\n
tdf\t"\r\n
mysql\t\n

讀取模式

讀取模式支援從檔案讀取資料。

寫入模式

寫入模式支援寫入資料至一組檔案。TextIO 會在寫入時建立檔案。

支援的酬載

綱要

僅支援簡單類型。

範例

CREATE EXTERNAL TABLE orders (id INTEGER, price INTEGER)
TYPE text
LOCATION '/home/admin/orders'

通用酬載處理

某些資料來源和接收器支援通用酬載處理。此處理會將位元組陣列酬載欄位剖析為資料表結構描述。此處理支援以下結構描述。所有結構描述都至少需要設定 "format": "<type>",並且可能需要其他屬性。

通用 DLQ 處理

支援通用 DLQ 處理的來源和接收器會指定格式為 "<dlqParamName>": "[DLQ_KIND]:[DLQ_ID]" 的參數。支援以下類型的 DLQ 處理