Google BigQuery I/O 連接器
- Java SDK
- Python SDK
Beam SDK 包含內建的轉換,可以從 Google BigQuery 資料表讀取資料並將資料寫入其中。
開始之前
若要使用 BigQueryIO,請將 Maven 構件相依性新增至您的 pom.xml
檔案。
其他資源
若要使用 BigQueryIO,您必須執行 pip install apache-beam[gcp]
來安裝 Google Cloud Platform 相依性。
其他資源
BigQuery 基礎
資料表名稱
若要從 BigQuery 資料表讀取或寫入,您必須提供完整限定的 BigQuery 資料表名稱(例如,bigquery-public-data:github_repos.sample_contents
)。完整限定的 BigQuery 資料表名稱由三個部分組成
- 專案 ID:Google Cloud 專案的 ID。預設值來自您的管道選項物件。
- 資料集 ID:BigQuery 資料集 ID,在給定的 Cloud 專案中是唯一的。
- 資料表 ID:BigQuery 資料表 ID,在給定的資料集中是唯一的。
如果您使用時間分割資料表,資料表名稱也可以包含資料表裝飾器。
若要指定 BigQuery 資料表,您可以使用資料表的完整限定名稱作為字串,或使用 TableReference TableReference 物件。
使用字串
若要使用字串指定資料表,請使用 [project_id]:[dataset_id].[table_id]
格式指定完整限定的 BigQuery 資料表名稱。
您也可以省略 project_id
並使用 [dataset_id].[table_id]
格式。如果您省略專案 ID,Beam 會使用管道選項中的預設專案 ID。管道選項。
使用 TableReference
若要使用 TableReference
指定資料表,請使用 BigQuery 資料表名稱的三個部分建立新的 TableReference
。
適用於 Java 的 Beam SDK 也提供 parseTableSpec
協助程式方法,該方法會從包含完整限定 BigQuery 資料表名稱的字串建構 TableReference
物件。但是,BigQueryIO 轉換的靜態工廠方法接受資料表名稱作為字串,並為您建構 TableReference
物件。
資料表列
BigQueryIO 讀取和寫入轉換會產生和使用資料,作為字典的 PCollection
,其中 PCollection
中的每個元素都代表資料表中的單一列。
結構描述
在寫入 BigQuery 時,您必須為要寫入的目的地資料表提供資料表結構描述,除非您指定 CREATE_NEVER
的建立處置。 建立資料表結構描述更詳細地說明結構描述。
資料類型
BigQuery 支援下列資料類型:STRING、BYTES、INTEGER、FLOAT、NUMERIC、BOOLEAN、TIMESTAMP、DATE、TIME、DATETIME 和 GEOGRAPHY。如需 Google Standard SQL 資料類型的概述,請參閱資料類型。BigQueryIO 允許您使用所有這些資料類型。以下範例顯示從 BigQuery 讀取和寫入時使用的資料類型的正確格式
import com.google.api.services.bigquery.model.TableRow;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Base64;
import java.util.stream.Collectors;
import java.util.stream.Stream;
class BigQueryTableRowCreate {
public static TableRow createTableRow() {
TableRow row =
new TableRow()
// To learn more about BigQuery data types:
// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
.set("string_field", "UTF-8 strings are supported! 🌱🌳🌍")
.set("int64_field", 432)
.set("float64_field", 3.141592653589793)
.set("numeric_field", new BigDecimal("1234.56").toString())
.set("bool_field", true)
.set(
"bytes_field",
Base64.getEncoder()
.encodeToString("UTF-8 byte string 🌱🌳🌍".getBytes(StandardCharsets.UTF_8)))
// To learn more about date formatting:
// https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/format/DateTimeFormatter.html
.set("date_field", LocalDate.parse("2020-03-19").toString()) // ISO_LOCAL_DATE
.set(
"datetime_field",
LocalDateTime.parse("2020-03-19T20:41:25.123").toString()) // ISO_LOCAL_DATE_TIME
.set("time_field", LocalTime.parse("20:41:25.123").toString()) // ISO_LOCAL_TIME
.set(
"timestamp_field",
Instant.parse("2020-03-20T03:41:42.123Z").toString()) // ISO_INSTANT
// To learn more about the geography Well-Known Text (WKT) format:
// https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry
.set("geography_field", "POINT(30 10)")
// An array has its mode set to REPEATED.
.set("array_field", Arrays.asList(1, 2, 3, 4))
// Any class can be written as a STRUCT as long as all the fields in the
// schema are present and they are encoded correctly as BigQuery types.
.set(
"struct_field",
Stream.of(
new SimpleEntry<>("string_value", "Text 🌱🌳🌍"),
new SimpleEntry<>("int64_value", "42"))
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)));
return row;
}
}
bigquery_data = [{
'string': 'abc',
'bytes': base64.b64encode(b'\xab\xac'),
'integer': 5,
'float': 0.5,
'numeric': Decimal('5'),
'boolean': True,
'timestamp': '2018-12-31 12:44:31.744957 UTC',
'date': '2018-12-31',
'time': '12:44:31',
'datetime': '2018-12-31T12:44:31',
'geography': 'POINT(30 10)'
}]
自 Beam 2.7.0 起,支援 NUMERIC 資料類型。此資料類型支援高精確度的十進位數字(精確度為 38 位數,小數位數為 9 位數)。GEOGRAPHY 資料類型可與 Well-Known Text 搭配使用(請參閱 https://en.wikipedia.org/wiki/Well-known_text 格式,以在 BigQuery 中讀取和寫入)。當寫入 BigQuery 時,BigQuery IO 要求 BYTES 資料類型的值使用 base64 編碼。當從 BigQuery 讀取位元組時,它們會以 base64 編碼的字串形式傳回。
自 Beam 2.7.0 起,支援 NUMERIC 資料類型。此資料類型支援高精確度的十進位數字(精確度為 38 位數,小數位數為 9 位數)。GEOGRAPHY 資料類型可與 Well-Known Text 搭配使用(請參閱 https://en.wikipedia.org/wiki/Well-known_text 格式,以在 BigQuery 中讀取和寫入)。當寫入 BigQuery 時,BigQuery IO 要求 BYTES 資料類型的值使用 base64 編碼。當從 BigQuery 讀取位元組時,它們會以 base64 編碼的位元組形式傳回。
從 BigQuery 讀取
BigQueryIO 允許您從 BigQuery 表格讀取資料,或執行 SQL 查詢並讀取結果。預設情況下,當您套用 BigQueryIO 讀取轉換時,Beam 會調用 BigQuery 匯出請求。但是,適用於 Java 的 Beam SDK 也支援使用 BigQuery Storage Read API 直接從 BigQuery 儲存空間讀取。有關詳細資訊,請參閱 使用 Storage Read API。
適用於 Java 的 Beam SDK 有兩種 BigQueryIO 讀取方法。這兩種方法都允許您從表格讀取資料,或使用查詢字串讀取欄位。
read(SerializableFunction)
讀取 Avro 格式的記錄,並使用指定的剖析函式將它們剖析為自訂類型物件的PCollection
。PCollection
中的每個元素代表表格中的單一列。使用查詢字串讀取的範例程式碼顯示如何使用read(SerializableFunction)
。readTableRows
傳回 BigQueryTableRow
物件的PCollection
。PCollection
中的每個元素代表表格中的單一列。TableRow
物件中的整數值會編碼為字串,以符合 BigQuery 的匯出 JSON 格式。此方法很方便,但與read(SerializableFunction)
相比,效能可能會慢 2-3 倍。從表格讀取的範例程式碼顯示如何使用readTableRows
。
注意:自 Beam SDK 2.2.0 起,BigQueryIO.read()
已被棄用。請改用 read(SerializableFunction<SchemaAndRecord, T>)
將 Avro GenericRecord
中的 BigQuery 列剖析為您的自訂類型,或使用 readTableRows()
將它們剖析為 JSON TableRow
物件。
若要使用適用於 Python 的 Beam SDK 從 BigQuery 表格讀取資料,請套用 ReadFromBigQuery
轉換。ReadFromBigQuery
傳回字典的 PCollection
,其中 PCollection
中的每個元素代表表格中的單一列。TableRow
物件中的整數值會編碼為字串,以符合 BigQuery 的匯出 JSON 格式。
注意:自 Beam SDK 2.25.0 起,BigQuerySource()
已被棄用。在 2.25.0 之前,若要使用 Beam SDK 從 BigQuery 表格讀取資料,請在 BigQuerySource
上套用 Read
轉換。例如,beam.io.Read(beam.io.BigQuerySource(table_spec))
。
從資料表讀取
若要讀取整個 BigQuery 表格,請搭配 BigQuery 表格名稱使用 from
方法。此範例使用 readTableRows
。
若要讀取整個 BigQuery 表格,請搭配 BigQuery 表格名稱使用 table
參數。
以下程式碼讀取包含氣象站資料的整個表格,然後擷取 max_temperature
欄。
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
class BigQueryReadFromTable {
public static PCollection<MyData> readFromTable(
String project, String dataset, String table, Pipeline pipeline) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// Pipeline pipeline = Pipeline.create();
PCollection<MyData> rows =
pipeline
.apply(
"Read from BigQuery query",
BigQueryIO.readTableRows().from(String.format("%s:%s.%s", project, dataset, table)))
.apply(
"TableRows to MyData",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
return rows;
}
}
使用查詢字串讀取
如果您不想讀取整個表格,可以使用 fromQuery
方法提供查詢字串。
如果您不想讀取整個表格,可以透過指定 query
參數,將查詢字串提供給 ReadFromBigQuery
。
以下程式碼使用 SQL 查詢僅讀取 max_temperature
欄。
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
class BigQueryReadFromQuery {
public static PCollection<MyData> readFromQuery(
String project, String dataset, String table, Pipeline pipeline) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// Pipeline pipeline = Pipeline.create();
PCollection<MyData> rows =
pipeline
.apply(
"Read from BigQuery query",
BigQueryIO.readTableRows()
.fromQuery(String.format("SELECT * FROM `%s.%s.%s`", project, dataset, table))
.usingStandardSql())
.apply(
"TableRows to MyData",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
return rows;
}
}
您也可以使用 BigQuery 的標準 SQL 方言搭配查詢字串,如下列範例所示
max_temperatures = (
pipeline
| 'QueryTableStdSQL' >> beam.io.ReadFromBigQuery(
query='SELECT max_temperature FROM '\
'`clouddataflow-readonly.samples.weather_stations`',
use_standard_sql=True)
# Each row is a dictionary where the keys are the BigQuery columns
| beam.Map(lambda elem: elem['max_temperature']))
查詢執行專案
預設情況下,管道會在與管道相關聯的 Google Cloud 專案中執行查詢(如果是 Dataflow 執行器,則為管道執行的專案)。在某些情況下,查詢執行專案應與管道專案不同。如果您使用 Java SDK,則可以透過將管道選項「bigQueryProject」設定為所需的 Google Cloud 專案 ID 來定義查詢執行專案。
使用 Storage Read API
BigQuery Storage API 允許您直接存取 BigQuery 儲存空間中的表格,並支援諸如欄選取和述詞篩選下推等功能,這些功能可以實現更有效率的管道執行。
適用於 Java 的 Beam SDK 支援在從 BigQuery 讀取資料時使用 BigQuery Storage API。2.25.0 之前的 SDK 版本支援 BigQuery Storage API 作為實驗性功能,並使用 pre-GA BigQuery Storage API 介面。呼叫者應將使用 BigQuery Storage API 的管道遷移至使用 SDK 2.25.0 或更新版本。
適用於 Python 的 Beam SDK 支援 BigQuery Storage API。透過將 method=DIRECT_READ
作為參數傳遞給 ReadFromBigQuery
來啟用它。
更新程式碼
當您從表格讀取資料時,請使用以下方法
- 必要:指定 withMethod(Method.DIRECT_READ) 以使用 BigQuery Storage API 進行讀取操作。
- 選用:若要使用諸如 欄投影和欄篩選等功能,您必須分別指定 withSelectedFields 和 withRowRestriction。
以下程式碼片段從表格讀取資料。此範例來自 BigQueryTornadoes 範例。當範例的讀取方法選項設定為 DIRECT_READ
時,管道會使用 BigQuery Storage API 和欄投影從 BigQuery 表格讀取氣象資料的公用範例。您可以查看 GitHub 上的完整原始程式碼。
import java.util.Arrays;
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
class BigQueryReadFromTableWithBigQueryStorageAPI {
public static PCollection<MyData> readFromTableWithBigQueryStorageAPI(
String project, String dataset, String table, Pipeline pipeline) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// Pipeline pipeline = Pipeline.create();
PCollection<MyData> rows =
pipeline
.apply(
"Read from BigQuery table",
BigQueryIO.readTableRows()
.from(String.format("%s:%s.%s", project, dataset, table))
.withMethod(Method.DIRECT_READ)
.withSelectedFields(
Arrays.asList(
"string_field",
"int64_field",
"float64_field",
"numeric_field",
"bool_field",
"bytes_field",
"date_field",
"datetime_field",
"time_field",
"timestamp_field",
"geography_field",
"array_field",
"struct_field")))
.apply(
"TableRows to MyData",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
return rows;
}
}
以下程式碼片段使用查詢字串讀取資料。
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
class BigQueryReadFromQueryWithBigQueryStorageAPI {
public static PCollection<MyData> readFromQueryWithBigQueryStorageAPI(
String project, String dataset, String table, String query, Pipeline pipeline) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// Pipeline pipeline = Pipeline.create();
/*
String query = String.format("SELECT\n" +
" string_field,\n" +
" int64_field,\n" +
" float64_field,\n" +
" numeric_field,\n" +
" bool_field,\n" +
" bytes_field,\n" +
" date_field,\n" +
" datetime_field,\n" +
" time_field,\n" +
" timestamp_field,\n" +
" geography_field,\n" +
" array_field,\n" +
" struct_field\n" +
"FROM\n" +
" `%s:%s.%s`", project, dataset, table)
*/
PCollection<MyData> rows =
pipeline
.apply(
"Read from BigQuery table",
BigQueryIO.readTableRows()
.fromQuery(query)
.usingStandardSql()
.withMethod(Method.DIRECT_READ))
.apply(
"TableRows to MyData",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
return rows;
}
}
寫入 BigQuery
BigQueryIO 允許您寫入 BigQuery 表格。如果您使用適用於 Java 的 Beam SDK,則可以將不同的列寫入不同的表格。適用於 Java 的 Beam SDK 也支援使用 BigQuery Storage Write API 直接寫入 BigQuery 儲存空間。有關詳細資訊,請參閱 使用 Storage Write API。
當您套用寫入轉換時,必須為目標表格提供以下資訊
- 表格名稱。
- 目標表格的建立處置。建立處置指定目標表格必須存在或可以由寫入操作建立。
- 目標表格的寫入處置。寫入處置指定您寫入的資料是否取代現有表格、將列附加到現有表格,或僅寫入空白表格。
此外,如果您的寫入操作建立新的 BigQuery 表格,您還必須提供目標表格的表格結構描述。
建立處置
建立處置控制您的 BigQuery 寫入操作是否應該在目標表格不存在時建立表格。
使用 .withCreateDisposition
指定建立處置。有效的列舉值為
Write.CreateDisposition.CREATE_IF_NEEDED
:指定如果表格不存在,則寫入操作應建立新表格。如果您使用此值,則必須使用withSchema
方法提供表格結構描述。CREATE_IF_NEEDED
是預設行為。Write.CreateDisposition.CREATE_NEVER
:指定永遠不應建立表格。如果目標表格不存在,則寫入操作會失敗。
使用 create_disposition
參數指定建立處置。有效的列舉值為
BigQueryDisposition.CREATE_IF_NEEDED
:指定如果表格不存在,則寫入操作應建立新表格。如果您使用此值,則必須提供表格結構描述。CREATE_IF_NEEDED
是預設行為。BigQueryDisposition.CREATE_NEVER
:指定永遠不應建立表格。如果目標表格不存在,則寫入操作會失敗。
如果您將 CREATE_IF_NEEDED
指定為建立處置,並且沒有提供表格結構描述,則如果目標表格不存在,轉換可能會在執行階段失敗。
寫入處置
寫入處置控制您的 BigQuery 寫入操作如何套用至現有表格。
使用 .withWriteDisposition
指定寫入處置。有效的列舉值為
Write.WriteDisposition.WRITE_EMPTY
:指定如果目標表格不為空,則寫入操作應在執行階段失敗。WRITE_EMPTY
是預設行為。Write.WriteDisposition.WRITE_TRUNCATE
:指定寫入操作應取代現有表格。目標表格中的任何現有列都會被移除,且新的列會新增至表格。Write.WriteDisposition.WRITE_APPEND
:指定寫入操作應將列附加到現有表格的末尾。
使用 write_disposition
參數指定寫入處置。有效的列舉值為
BigQueryDisposition.WRITE_EMPTY
:指定如果目標表格不為空,則寫入操作應在執行階段失敗。WRITE_EMPTY
是預設行為。BigQueryDisposition.WRITE_TRUNCATE
:指定寫入操作應取代現有表格。目標表格中的任何現有列都會被移除,且新的列會新增至表格。BigQueryDisposition.WRITE_APPEND
:指定寫入操作應將列附加到現有表格的末尾。
當您使用 WRITE_EMPTY
時,對目標表格是否為空的檢查可以在實際寫入操作之前進行。此檢查不保證您的管道將具有對表格的獨佔存取權。兩個同時寫入具有 WRITE_EMPTY
寫入處置的相同輸出表格的管道可能會成功啟動,但兩個管道都可能會在稍後發生寫入嘗試時失敗。
建立資料表結構描述
如果您的 BigQuery 寫入操作建立新表格,您必須提供結構描述資訊。結構描述包含有關表格中每個欄位的資訊。當使用新結構描述更新管道時,現有結構描述欄位必須保持相同的順序,否則管道將會中斷,且無法寫入 BigQuery。
若要在 Java 中建立表格結構描述,您可以使用 TableSchema
物件,也可以使用包含 JSON 序列化 TableSchema
物件的字串。
若要在 Python 中建立表格結構描述,您可以使用 TableSchema
物件,也可以使用定義欄位清單的字串。基於單一字串的結構描述不支援巢狀欄位、重複欄位,或為欄位指定 BigQuery 模式(該模式始終設定為 NULLABLE
)。
使用 TableSchema
若要建立並將表格結構描述作為 TableSchema
物件使用,請依照下列步驟操作。
建立一個
TableFieldSchema
物件的清單。每個TableFieldSchema
物件代表表格中的一個欄位。建立一個
TableSchema
物件,並使用setFields
方法指定您的欄位清單。當您套用寫入轉換時,使用
withSchema
方法提供您的表格結構描述。
建立一個
TableSchema
物件。為您表格中的每個欄位建立並附加一個
TableFieldSchema
物件。當您套用寫入轉換時,使用
schema
參數提供您的表格結構描述。將參數的值設定為TableSchema
物件。
以下範例程式碼示範如何為具有兩個字串類型欄位(來源和引述)的表格建立 TableSchema
。
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
class BigQuerySchemaCreate {
public static TableSchema createSchema() {
// To learn more about BigQuery schemas:
// https://cloud.google.com/bigquery/docs/schemas
TableSchema schema =
new TableSchema()
.setFields(
Arrays.asList(
new TableFieldSchema()
.setName("string_field")
.setType("STRING")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("int64_field")
.setType("INT64")
.setMode("NULLABLE"),
new TableFieldSchema()
.setName("float64_field")
.setType("FLOAT64"), // default mode is "NULLABLE"
new TableFieldSchema().setName("numeric_field").setType("NUMERIC"),
new TableFieldSchema().setName("bool_field").setType("BOOL"),
new TableFieldSchema().setName("bytes_field").setType("BYTES"),
new TableFieldSchema().setName("date_field").setType("DATE"),
new TableFieldSchema().setName("datetime_field").setType("DATETIME"),
new TableFieldSchema().setName("time_field").setType("TIME"),
new TableFieldSchema().setName("timestamp_field").setType("TIMESTAMP"),
new TableFieldSchema().setName("geography_field").setType("GEOGRAPHY"),
new TableFieldSchema()
.setName("array_field")
.setType("INT64")
.setMode("REPEATED")
.setDescription("Setting the mode to REPEATED makes this an ARRAY<INT64>."),
new TableFieldSchema()
.setName("struct_field")
.setType("STRUCT")
.setDescription(
"A STRUCT accepts a custom data class, the fields must match the custom class fields.")
.setFields(
Arrays.asList(
new TableFieldSchema().setName("string_value").setType("STRING"),
new TableFieldSchema().setName("int64_value").setType("INT64")))));
return schema;
}
}
使用字串
若要建立並使用包含 JSON 序列化 TableSchema
物件的字串作為表格結構描述,請按照下列步驟操作。
建立一個包含 JSON 序列化
TableSchema
物件的字串。當您套用寫入轉換時,使用
withJsonSchema
方法提供您的表格結構描述。
若要建立並使用字串作為表格結構描述,請按照下列步驟操作。
建立一個單一逗號分隔的字串,格式為「field1:type1,field2:type2,field3:type3」,定義欄位清單。類型應指定欄位的 BigQuery 類型。
當您套用寫入轉換時,使用
schema
參數提供您的表格結構描述。將參數的值設定為字串。
以下範例示範如何使用字串指定與先前範例相同的表格結構描述。
設定插入方法
BigQueryIO 支援兩種將資料插入 BigQuery 的方法:載入工作和串流插入。每種插入方法在成本、配額和資料一致性方面都有不同的權衡。有關這些權衡的更多資訊,請參閱 BigQuery 文件中的不同資料擷取選項(特別是載入工作和串流插入)。
BigQueryIO 會根據輸入的 PCollection
選擇預設的插入方法。您可以使用 withMethod
指定所需的插入方法。請參閱Write.Method
,以取得可用方法及其限制的清單。
BigQueryIO 會根據輸入的 PCollection
選擇預設的插入方法。您可以使用 method
指定所需的插入方法。請參閱WriteToBigQuery
,以取得可用方法及其限制的清單。
在下列情況下,BigQueryIO 會使用載入工作
- 當您將 BigQueryIO 寫入轉換套用到有界的
PCollection
時。 - 當您使用
BigQueryIO.write().withMethod(FILE_LOADS)
指定載入工作作為插入方法時。
- 當您將 BigQueryIO 寫入轉換套用到有界的
PCollection
時。 - 當您使用
WriteToBigQuery(method='FILE_LOADS')
指定載入工作作為插入方法時。
注意: 如果您在串流管線中使用批次載入
您必須使用 withTriggeringFrequency
指定啟動載入工作的觸發頻率。請小心設定頻率,確保您的管線不會超過 BigQuery 載入工作的配額限制。
您可以使用 withNumFileShards
明確設定寫入的檔案分片數量,或使用 withAutoSharding
啟用動態分片(從 2.29.0 版本開始),並且可以在執行時判斷和變更分片數量。分片行為取決於執行器。
您必須使用 triggering_frequency
指定啟動載入工作的觸發頻率。請小心設定頻率,確保您的管線不會超過 BigQuery 載入工作的配額限制。
您可以設定 with_auto_sharding=True
以啟用動態分片(從 2.29.0 版本開始)。可以在執行時判斷和變更分片數量。分片行為取決於執行器。
在下列情況下,BigQueryIO 會使用串流插入
- 當您將 BigQueryIO 寫入轉換套用到無界的
PCollection
時。 - 當您使用
BigQueryIO.write().withMethod(STREAMING_INSERTS)
指定串流插入作為插入方法時。
- 當您將 BigQueryIO 寫入轉換套用到無界的
PCollection
時。 - 當您使用
WriteToBigQuery(method='STREAMING_INSERTS')
指定串流插入作為插入方法時。
注意: 依預設,串流插入會啟用 BigQuery 盡力重複資料刪除機制。您可以設定 ignoreInsertIds
來停用該機制。啟用與停用重複資料刪除時,配額限制有所不同。
串流插入會為每個表格目的地套用預設分片。您可以使用 withAutoSharding
(從 2.28.0 版本開始)啟用動態分片,並且可以在執行時判斷和變更分片數量。分片行為取決於執行器。
注意: 依預設,串流插入會啟用 BigQuery 盡力重複資料刪除機制。您可以設定 ignore_insert_ids=True
來停用該機制。啟用與停用重複資料刪除時,配額限制有所不同。
串流插入會為每個表格目的地套用預設分片。您可以設定 with_auto_sharding=True
(從 2.29.0 版本開始)啟用動態分片。可以在執行時判斷和變更分片數量。分片行為取決於執行器。
寫入資料表
若要寫入 BigQuery 表格,請套用 writeTableRows
或 write
轉換。
若要寫入 BigQuery 表格,請套用 WriteToBigQuery
轉換。WriteToBigQuery
支援批次模式和串流模式。您必須將轉換套用到字典的 PCollection
。一般來說,您需要使用另一個轉換(例如 ParDo
)將您的輸出資料格式化為集合。
以下範例使用這個包含引述的 PCollection
。
writeTableRows
方法會將 BigQuery TableRow
物件的 PCollection
寫入 BigQuery 表格。PCollection
中的每個元素代表表格中的單一資料列。此範例使用 writeTableRows
將元素寫入 PCollection<TableRow>
。寫入作業會建立表格(如果需要)。如果表格已存在,則會被取代。
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.values.PCollection;
class BigQueryWriteToTable {
public static void writeToTable(
String project,
String dataset,
String table,
TableSchema schema,
PCollection<TableRow> rows) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// TableSchema schema = new TableSchema().setFields(Arrays.asList(...));
// Pipeline pipeline = Pipeline.create();
// PCollection<TableRow> rows = ...
rows.apply(
"Write to BigQuery",
BigQueryIO.writeTableRows()
.to(String.format("%s:%s.%s", project, dataset, table))
.withSchema(schema)
// For CreateDisposition:
// - CREATE_IF_NEEDED (default): creates the table if it doesn't exist, a schema is
// required
// - CREATE_NEVER: raises an error if the table doesn't exist, a schema is not needed
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
// For WriteDisposition:
// - WRITE_EMPTY (default): raises an error if the table is not empty
// - WRITE_APPEND: appends new rows to existing rows
// - WRITE_TRUNCATE: deletes the existing rows before writing
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
// pipeline.run().waitUntilFinish();
}
}
以下範例程式碼示範如何套用 WriteToBigQuery
轉換,將字典的 PCollection
寫入 BigQuery 表格。寫入作業會建立表格(如果需要)。如果表格已存在,則會被取代。
write
轉換會將自訂類型物件的 PCollection
寫入 BigQuery 表格。使用 .withFormatFunction(SerializableFunction)
提供格式化函式,將 PCollection
中的每個輸入元素轉換為 TableRow
。此範例使用 write
寫入 PCollection<String>
。寫入作業會建立表格(如果需要)。如果表格已存在,則會被取代。
quotes.apply(
BigQueryIO.<Quote>write()
.to(tableSpec)
.withSchema(tableSchema)
.withFormatFunction(
(Quote elem) ->
new TableRow().set("source", elem.source).set("quote", elem.quote))
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
當您使用串流插入時,您可以決定如何處理失敗的記錄。您可以選擇持續重試,或使用 WriteResult.getFailedInserts()
方法將失敗的記錄傳回至個別的 PCollection
。
使用 Storage Write API
從 Beam SDK for Java 的 2.36.0 版開始,您可以使用 BigQueryIO 連接器的 BigQuery Storage Write API。
此外,在 Beam SDK for Python 的 2.47.0 版之後,SDK 支援 BigQuery Storage Write API。
BigQuery Storage Write API for Python SDK 目前在支援的資料類型方面有一些限制。由於此方法使用跨語言轉換,因此我們僅限於跨語言界限支援的類型。例如,需要 apache_beam.utils.timestamp.Timestamp
來寫入 TIMESTAMP
BigQuery 類型。此外,某些類型(例如 DATETIME
)尚未支援。如需更多詳細資訊,請參閱完整類型對應。
注意:如果您想從原始碼使用 Storage Write API 執行 WriteToBigQuery,您需要執行 ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build
來建置 expansion-service jar。如果您是從已發行的 Beam SDK 執行,則 jar 已包含在其中。
完全一次語義
若要使用 Storage Write API 寫入 BigQuery,請將 withMethod
設定為 Method.STORAGE_WRITE_API
。以下範例轉換使用 Storage Write API 和完全一次語意寫入 BigQuery
如果您想要變更 BigQueryIO 的行為,讓您管線的所有 BigQuery 接收器預設都使用 Storage Write API,請設定 UseStorageWriteApi
選項。
如果您的管線需要建立表格(在表格不存在且您將建立處置指定為 CREATE_IF_NEEDED
的情況下),您必須提供表格結構描述。API 會使用結構描述驗證資料並將其轉換為二進位協定。
對於串流管線,您需要設定兩個額外參數:串流數量和觸發頻率。
串流數量定義 BigQueryIO 寫入轉換的平行處理度,並且大致對應於管線使用的 Storage Write API 串流數量。您可以使用 withNumStorageWriteApiStreams
在轉換上明確設定,或將 numStorageWriteApiStreams
選項提供給管線(如 BigQueryOptions
中所定義)。請注意,這僅支援串流管線。
觸發頻率決定資料在 BigQuery 中可供查詢的時間。您可以透過 withTriggeringFrequency
明確設定,或透過設定 storageWriteApiTriggeringFrequencySec
選項來指定秒數。
這兩個參數的組合會影響 BigQueryIO 在呼叫 Storage Write API 之前建立的資料列批次大小。將頻率設定得太高可能會導致批次較小,這會影響效能。一般而言,單一串流應能處理至少每秒 1Mb 的輸送量。建立獨佔串流對 BigQuery 服務而言是一項昂貴的操作,因此您應僅使用您用例所需的串流數量。對於大多數管線而言,觸發頻率設定在個位數秒數是一個不錯的選擇。
與串流插入類似,STORAGE_WRITE_API
支援動態決定寫入 BigQuery 的平行串流數量(從 2.42.0 開始)。您可以使用 withAutoSharding
來明確啟用此功能。
當 numStorageWriteApiStreams
設定為 0 或未指定時,STORAGE_WRITE_API
預設為動態分片。
使用 STORAGE_WRITE_API
時,WriteResult.getFailedStorageApiInserts
傳回的 PCollection
會包含無法寫入 Storage Write API sink 的資料列。
至少一次語義
如果您的用例允許目標表格中存在潛在的重複記錄,您可以使用 STORAGE_API_AT_LEAST_ONCE
方法。此方法不會將要寫入 BigQuery 的記錄保存到其 shuffle 儲存中,而 shuffle 儲存是提供 STORAGE_WRITE_API
方法的精確一次語意所必需的。因此,對於大多數管線而言,使用此方法通常成本較低且延遲較短。如果您使用 STORAGE_API_AT_LEAST_ONCE
,您不需要指定串流數量,也無法指定觸發頻率。
自動分片不適用於 STORAGE_API_AT_LEAST_ONCE
。
使用 STORAGE_API_AT_LEAST_ONCE
時,WriteResult.getFailedStorageApiInserts
傳回的 PCollection
會包含無法寫入 Storage Write API sink 的資料列。
配額
在使用 Storage Write API 之前,請注意 BigQuery Storage Write API 配額。
使用動態目的地
您可以使用動態目的地功能,將 PCollection
中的元素寫入不同的 BigQuery 表格,這些表格可能具有不同的結構描述。
動態目的地功能會根據使用者定義的目的地鍵來分組您的使用者類型,使用該鍵來計算目的地表格和/或結構描述,並將每個群組的元素寫入計算出的目的地。
此外,您也可以寫入您自己的具有 TableRow
對應函式的類型,並且您可以在所有 DynamicDestinations
方法中使用側輸入。
若要使用動態目的地,您必須建立 DynamicDestinations
物件並實作下列方法:
getDestination
:傳回一個物件,getTable
和getSchema
可以使用該物件作為目的地鍵,以計算目的地表格和/或結構描述。getTable
:傳回目的地鍵的表格(作為TableDestination
物件)。此方法必須為每個唯一的目的地傳回唯一的表格。getSchema
:傳回目的地鍵的表格結構描述(作為TableSchema
物件)。
然後,使用您的 DynamicDestinations
物件呼叫 write().to
。此範例使用包含天氣資料的 PCollection
,並將資料寫入每年不同的表格中。
/*
@DefaultCoder(AvroCoder.class)
static class WeatherData {
final long year;
final long month;
final long day;
final double maxTemp;
public WeatherData() {
this.year = 0;
this.month = 0;
this.day = 0;
this.maxTemp = 0.0f;
}
public WeatherData(long year, long month, long day, double maxTemp) {
this.year = year;
this.month = month;
this.day = day;
this.maxTemp = maxTemp;
}
}
*/
PCollection<WeatherData> weatherData =
p.apply(
BigQueryIO.read(
(SchemaAndRecord elem) -> {
GenericRecord record = elem.getRecord();
return new WeatherData(
(Long) record.get("year"),
(Long) record.get("month"),
(Long) record.get("day"),
(Double) record.get("max_temperature"));
})
.fromQuery(
"SELECT year, month, day, max_temperature "
+ "FROM [apache-beam-testing.samples.weather_stations] "
+ "WHERE year BETWEEN 2007 AND 2009")
.withCoder(AvroCoder.of(WeatherData.class)));
// We will send the weather data into different tables for every year.
weatherData.apply(
BigQueryIO.<WeatherData>write()
.to(
new DynamicDestinations<WeatherData, Long>() {
@Override
public Long getDestination(ValueInSingleWindow<WeatherData> elem) {
return elem.getValue().year;
}
@Override
public TableDestination getTable(Long destination) {
return new TableDestination(
new TableReference()
.setProjectId(writeProject)
.setDatasetId(writeDataset)
.setTableId(writeTable + "_" + destination),
"Table for year " + destination);
}
@Override
public TableSchema getSchema(Long destination) {
return new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema()
.setName("year")
.setType("INTEGER")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("month")
.setType("INTEGER")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("day")
.setType("INTEGER")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("maxTemp")
.setType("FLOAT")
.setMode("NULLABLE")));
}
})
.withFormatFunction(
(WeatherData elem) ->
new TableRow()
.set("year", elem.year)
.set("month", elem.month)
.set("day", elem.day)
.set("maxTemp", elem.maxTemp))
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
fictional_characters_view = beam.pvalue.AsDict(
pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),
('Obi Wan Kenobi', True)]))
def table_fn(element, fictional_characters):
if element in fictional_characters:
return 'my_dataset.fictional_quotes'
else:
return 'my_dataset.real_quotes'
quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery(
table_fn,
schema=table_schema,
table_side_inputs=(fictional_characters_view, ),
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
使用時間分割
BigQuery 時間分割將您的表格分割成較小的分割區,這稱為 分割表格。分割表格使您可以更輕鬆地管理和查詢您的資料。
若要使用 BigQuery 時間分割,請使用以下兩種方法之一:
withTimePartitioning
:此方法會取得TimePartitioning
類別,並且僅在您寫入單一表格時才可使用。withJsonTimePartitioning
:此方法與withTimePartitioning
相同,但會取得 JSON 序列化的字串物件。
此範例會每天產生一個分割區。
weatherData.apply(
BigQueryIO.<WeatherData>write()
.to(tableSpec + "_partitioning")
.withSchema(tableSchema)
.withFormatFunction(
(WeatherData elem) ->
new TableRow()
.set("year", elem.year)
.set("month", elem.month)
.set("day", elem.day)
.set("maxTemp", elem.maxTemp))
// NOTE: an existing table without time partitioning set up will not work
.withTimePartitioning(new TimePartitioning().setType("DAY"))
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
限制
BigQueryIO 目前有以下限制:
您無法將 BigQuery 寫入的完成順序與管線的其他步驟排序。
如果您使用的是 Beam SDK for Python,當您寫入非常大的資料集時,您可能會遇到匯入大小配額問題。作為一種解決方法,您可以分割資料集(例如,使用 Beam 的
Partition
轉換)並寫入多個 BigQuery 表格。Beam SDK for Java 沒有此限制,因為它會為您分割資料集。當您將資料載入到 BigQuery 時,會套用這些限制。依預設,BigQuery 會使用共用的插槽集區來載入資料。這表示無法保證可用容量,您的載入可能會排隊,直到有可用的插槽。如果插槽在 6 小時內沒有變為可用,則載入會因 BigQuery 設定的限制而失敗。為了避免這種情況,強烈建議您使用BigQuery 預留,以確保您的載入不會因為容量問題而排隊和失敗。
其他範例
您可以在 Beam 的範例目錄中找到使用 BigQuery 的其他範例。
Java 食譜範例
這些範例來自 Java cookbook examples 目錄。
BigQueryTornadoes 從 BigQuery 讀取天氣資料的公開範例,計算每個月份發生的龍捲風數量,並將結果寫入 BigQuery 表格。
CombinePerKeyExamples 從 BigQuery 讀取公開的莎士比亞資料,對於資料集中超過給定長度的每個單字,產生一個包含該單字出現的劇名清單的字串。然後,管線會將結果寫入 BigQuery 表格。
FilterExamples 從 BigQuery 讀取天氣資料的公開範例,對資料執行投影,找出溫度讀數的整體平均值,篩選單一給定月份的讀數,並且僅輸出平均溫度小於導出整體平均值(針對該月份)的資料。
JoinExamples 從 BigQuery 讀取 GDELT「世界事件」的範例,並將事件
action
國家/地區代碼與將國家/地區代碼對應到國家/地區名稱的表格聯結。MaxPerKeyExamples 從 BigQuery 讀取天氣資料的公開範例,找出每個月的最高溫度,並將結果寫入 BigQuery 表格。
TriggerExample 對聖地牙哥高速公路的交通資料執行串流分析。管線會查看從文字檔案傳入的資料,並將結果寫入 BigQuery 表格。
Java 完整範例
這些範例來自 Java complete examples 目錄。
AutoComplete 計算每個字首最熱門的主題標籤,這些主題標籤可用於自動完成。管線可以選擇性地將結果寫入 BigQuery 表格。
StreamingWordExtract 讀取文字行,將每一行分割成個別的單字,將這些單字大寫,並將輸出寫入 BigQuery 表格。
TrafficMaxLaneFlow 讀取交通感測器資料,找出記錄流量最高的車道,並將結果寫入 BigQuery 表格。
TrafficRoutes 讀取交通感測器資料,計算每個視窗的平均速度,並尋找路線中的減速,並將結果寫入 BigQuery 表格。
Python 食譜範例
這些範例來自 Python cookbook examples 目錄。
BigQuery schema 使用巢狀和重複欄位建立
TableSchema
,產生具有巢狀和重複欄位的資料,並將資料寫入 BigQuery 表格。BigQuery side inputs 使用 BigQuery 來源作為側輸入。它說明如何以三種不同的形式將側輸入插入轉換中:作為單例、作為迭代器和作為列表。
BigQuery tornadoes 從一個 BigQuery 表格讀取,該表格的表格結構描述中包含「month」和「tornado」欄位,計算每個月的龍捲風數量,並將結果輸出到 BigQuery 表格。
BigQuery filters 從 BigQuery 表格讀取氣象站資料,在記憶體中操作 BigQuery 資料列,並將結果寫入 BigQuery 表格。
上次更新日期:2024/10/31
您是否已找到您要尋找的所有內容?
這一切是否有用且清楚?是否有任何您想要變更的地方?請告訴我們!