內建 I/O 轉換

Google BigQuery I/O 連接器

Beam SDK 包含內建的轉換,可以從 Google BigQuery 資料表讀取資料並將資料寫入其中。

開始之前

若要使用 BigQueryIO,請將 Maven 構件相依性新增至您的 pom.xml 檔案。

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>2.60.0</version>
</dependency>

其他資源

若要使用 BigQueryIO,您必須執行 pip install apache-beam[gcp] 來安裝 Google Cloud Platform 相依性。

其他資源

BigQuery 基礎

資料表名稱

若要從 BigQuery 資料表讀取或寫入,您必須提供完整限定的 BigQuery 資料表名稱(例如,bigquery-public-data:github_repos.sample_contents)。完整限定的 BigQuery 資料表名稱由三個部分組成

如果您使用時間分割資料表,資料表名稱也可以包含資料表裝飾器

若要指定 BigQuery 資料表,您可以使用資料表的完整限定名稱作為字串,或使用 TableReference TableReference 物件。

使用字串

若要使用字串指定資料表,請使用 [project_id]:[dataset_id].[table_id] 格式指定完整限定的 BigQuery 資料表名稱。

String tableSpec = "apache-beam-testing.samples.weather_stations";
# project-id:dataset_id.table_id
table_spec = 'apache-beam-testing.samples.weather_stations'

您也可以省略 project_id 並使用 [dataset_id].[table_id] 格式。如果您省略專案 ID,Beam 會使用管道選項中的預設專案 ID。管道選項

String tableSpec = "samples.weather_stations";
# dataset_id.table_id
table_spec = 'samples.weather_stations'

使用 TableReference

若要使用 TableReference 指定資料表,請使用 BigQuery 資料表名稱的三個部分建立新的 TableReference

TableReference tableSpec =
    new TableReference()
        .setProjectId("clouddataflow-readonly")
        .setDatasetId("samples")
        .setTableId("weather_stations");
from apache_beam.io.gcp.internal.clients import bigquery

table_spec = bigquery.TableReference(
    projectId='clouddataflow-readonly',
    datasetId='samples',
    tableId='weather_stations')

適用於 Java 的 Beam SDK 也提供 parseTableSpec 協助程式方法,該方法會從包含完整限定 BigQuery 資料表名稱的字串建構 TableReference 物件。但是,BigQueryIO 轉換的靜態工廠方法接受資料表名稱作為字串,並為您建構 TableReference 物件。

資料表列

BigQueryIO 讀取和寫入轉換會產生和使用資料,作為字典的 PCollection,其中 PCollection 中的每個元素都代表資料表中的單一列。

結構描述

在寫入 BigQuery 時,您必須為要寫入的目的地資料表提供資料表結構描述,除非您指定 CREATE_NEVER建立處置建立資料表結構描述更詳細地說明結構描述。

資料類型

BigQuery 支援下列資料類型:STRING、BYTES、INTEGER、FLOAT、NUMERIC、BOOLEAN、TIMESTAMP、DATE、TIME、DATETIME 和 GEOGRAPHY。如需 Google Standard SQL 資料類型的概述,請參閱資料類型。BigQueryIO 允許您使用所有這些資料類型。以下範例顯示從 BigQuery 讀取和寫入時使用的資料類型的正確格式

import com.google.api.services.bigquery.model.TableRow;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Base64;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BigQueryTableRowCreate {
  public static TableRow createTableRow() {
    TableRow row =
        new TableRow()
            // To learn more about BigQuery data types:
            // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
            .set("string_field", "UTF-8 strings are supported! 🌱🌳🌍")
            .set("int64_field", 432)
            .set("float64_field", 3.141592653589793)
            .set("numeric_field", new BigDecimal("1234.56").toString())
            .set("bool_field", true)
            .set(
                "bytes_field",
                Base64.getEncoder()
                    .encodeToString("UTF-8 byte string 🌱🌳🌍".getBytes(StandardCharsets.UTF_8)))

            // To learn more about date formatting:
            // https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/format/DateTimeFormatter.html
            .set("date_field", LocalDate.parse("2020-03-19").toString()) // ISO_LOCAL_DATE
            .set(
                "datetime_field",
                LocalDateTime.parse("2020-03-19T20:41:25.123").toString()) // ISO_LOCAL_DATE_TIME
            .set("time_field", LocalTime.parse("20:41:25.123").toString()) // ISO_LOCAL_TIME
            .set(
                "timestamp_field",
                Instant.parse("2020-03-20T03:41:42.123Z").toString()) // ISO_INSTANT

            // To learn more about the geography Well-Known Text (WKT) format:
            // https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry
            .set("geography_field", "POINT(30 10)")

            // An array has its mode set to REPEATED.
            .set("array_field", Arrays.asList(1, 2, 3, 4))

            // Any class can be written as a STRUCT as long as all the fields in the
            // schema are present and they are encoded correctly as BigQuery types.
            .set(
                "struct_field",
                Stream.of(
                        new SimpleEntry<>("string_value", "Text 🌱🌳🌍"),
                        new SimpleEntry<>("int64_value", "42"))
                    .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)));
    return row;
  }
}
bigquery_data = [{
    'string': 'abc',
    'bytes': base64.b64encode(b'\xab\xac'),
    'integer': 5,
    'float': 0.5,
    'numeric': Decimal('5'),
    'boolean': True,
    'timestamp': '2018-12-31 12:44:31.744957 UTC',
    'date': '2018-12-31',
    'time': '12:44:31',
    'datetime': '2018-12-31T12:44:31',
    'geography': 'POINT(30 10)'
}]

自 Beam 2.7.0 起,支援 NUMERIC 資料類型。此資料類型支援高精確度的十進位數字(精確度為 38 位數,小數位數為 9 位數)。GEOGRAPHY 資料類型可與 Well-Known Text 搭配使用(請參閱 https://en.wikipedia.org/wiki/Well-known_text 格式,以在 BigQuery 中讀取和寫入)。當寫入 BigQuery 時,BigQuery IO 要求 BYTES 資料類型的值使用 base64 編碼。當從 BigQuery 讀取位元組時,它們會以 base64 編碼的字串形式傳回。

自 Beam 2.7.0 起,支援 NUMERIC 資料類型。此資料類型支援高精確度的十進位數字(精確度為 38 位數,小數位數為 9 位數)。GEOGRAPHY 資料類型可與 Well-Known Text 搭配使用(請參閱 https://en.wikipedia.org/wiki/Well-known_text 格式,以在 BigQuery 中讀取和寫入)。當寫入 BigQuery 時,BigQuery IO 要求 BYTES 資料類型的值使用 base64 編碼。當從 BigQuery 讀取位元組時,它們會以 base64 編碼的位元組形式傳回。

從 BigQuery 讀取

BigQueryIO 允許您從 BigQuery 表格讀取資料,或執行 SQL 查詢並讀取結果。預設情況下,當您套用 BigQueryIO 讀取轉換時,Beam 會調用 BigQuery 匯出請求。但是,適用於 Java 的 Beam SDK 也支援使用 BigQuery Storage Read API 直接從 BigQuery 儲存空間讀取。有關詳細資訊,請參閱 使用 Storage Read API

Beam 對 BigQuery API 的使用受 BigQuery 的 配額定價 政策約束。

適用於 Java 的 Beam SDK 有兩種 BigQueryIO 讀取方法。這兩種方法都允許您從表格讀取資料,或使用查詢字串讀取欄位。

  1. read(SerializableFunction) 讀取 Avro 格式的記錄,並使用指定的剖析函式將它們剖析為自訂類型物件的 PCollectionPCollection 中的每個元素代表表格中的單一列。使用查詢字串讀取的範例程式碼顯示如何使用 read(SerializableFunction)

  2. readTableRows 傳回 BigQuery TableRow 物件的 PCollectionPCollection 中的每個元素代表表格中的單一列。TableRow 物件中的整數值會編碼為字串,以符合 BigQuery 的匯出 JSON 格式。此方法很方便,但與 read(SerializableFunction) 相比,效能可能會慢 2-3 倍。從表格讀取的範例程式碼顯示如何使用 readTableRows

注意:自 Beam SDK 2.2.0 起,BigQueryIO.read() 已被棄用。請改用 read(SerializableFunction<SchemaAndRecord, T>) 將 Avro GenericRecord 中的 BigQuery 列剖析為您的自訂類型,或使用 readTableRows() 將它們剖析為 JSON TableRow 物件。

若要使用適用於 Python 的 Beam SDK 從 BigQuery 表格讀取資料,請套用 ReadFromBigQuery 轉換。ReadFromBigQuery 傳回字典的 PCollection,其中 PCollection 中的每個元素代表表格中的單一列。TableRow 物件中的整數值會編碼為字串,以符合 BigQuery 的匯出 JSON 格式。

注意:自 Beam SDK 2.25.0 起,BigQuerySource() 已被棄用。在 2.25.0 之前,若要使用 Beam SDK 從 BigQuery 表格讀取資料,請在 BigQuerySource 上套用 Read 轉換。例如,beam.io.Read(beam.io.BigQuerySource(table_spec))

從資料表讀取

若要讀取整個 BigQuery 表格,請搭配 BigQuery 表格名稱使用 from 方法。此範例使用 readTableRows

若要讀取整個 BigQuery 表格,請搭配 BigQuery 表格名稱使用 table 參數。

以下程式碼讀取包含氣象站資料的整個表格,然後擷取 max_temperature 欄。

import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromTable {
  public static PCollection<MyData> readFromTable(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery query",
                BigQueryIO.readTableRows().from(String.format("%s:%s.%s", project, dataset, table)))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
max_temperatures = (
    pipeline
    | 'ReadTable' >> beam.io.ReadFromBigQuery(table=table_spec)
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))

使用查詢字串讀取

如果您不想讀取整個表格,可以使用 fromQuery 方法提供查詢字串。

如果您不想讀取整個表格,可以透過指定 query 參數,將查詢字串提供給 ReadFromBigQuery

以下程式碼使用 SQL 查詢僅讀取 max_temperature 欄。

import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromQuery {
  public static PCollection<MyData> readFromQuery(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery query",
                BigQueryIO.readTableRows()
                    .fromQuery(String.format("SELECT * FROM `%s.%s.%s`", project, dataset, table))
                    .usingStandardSql())
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
max_temperatures = (
    pipeline
    | 'QueryTable' >> beam.io.ReadFromBigQuery(
        query='SELECT max_temperature FROM '\
              '[apache-beam-testing.samples.weather_stations]')
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))

您也可以使用 BigQuery 的標準 SQL 方言搭配查詢字串,如下列範例所示

PCollection<Double> maxTemperatures =
    p.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
            .fromQuery(
                "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
            .usingStandardSql()
            .withCoder(DoubleCoder.of()));
max_temperatures = (
    pipeline
    | 'QueryTableStdSQL' >> beam.io.ReadFromBigQuery(
        query='SELECT max_temperature FROM '\
              '`clouddataflow-readonly.samples.weather_stations`',
        use_standard_sql=True)
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))

查詢執行專案

預設情況下,管道會在與管道相關聯的 Google Cloud 專案中執行查詢(如果是 Dataflow 執行器,則為管道執行的專案)。在某些情況下,查詢執行專案應與管道專案不同。如果您使用 Java SDK,則可以透過將管道選項「bigQueryProject」設定為所需的 Google Cloud 專案 ID 來定義查詢執行專案。

使用 Storage Read API

BigQuery Storage API 允許您直接存取 BigQuery 儲存空間中的表格,並支援諸如欄選取和述詞篩選下推等功能,這些功能可以實現更有效率的管道執行。

適用於 Java 的 Beam SDK 支援在從 BigQuery 讀取資料時使用 BigQuery Storage API。2.25.0 之前的 SDK 版本支援 BigQuery Storage API 作為實驗性功能,並使用 pre-GA BigQuery Storage API 介面。呼叫者應將使用 BigQuery Storage API 的管道遷移至使用 SDK 2.25.0 或更新版本。

適用於 Python 的 Beam SDK 支援 BigQuery Storage API。透過將 method=DIRECT_READ 作為參數傳遞給 ReadFromBigQuery 來啟用它。

更新程式碼

當您從表格讀取資料時,請使用以下方法

以下程式碼片段從表格讀取資料。此範例來自 BigQueryTornadoes 範例。當範例的讀取方法選項設定為 DIRECT_READ 時,管道會使用 BigQuery Storage API 和欄投影從 BigQuery 表格讀取氣象資料的公用範例。您可以查看 GitHub 上的完整原始程式碼

import java.util.Arrays;
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromTableWithBigQueryStorageAPI {
  public static PCollection<MyData> readFromTableWithBigQueryStorageAPI(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery table",
                BigQueryIO.readTableRows()
                    .from(String.format("%s:%s.%s", project, dataset, table))
                    .withMethod(Method.DIRECT_READ)
                    .withSelectedFields(
                        Arrays.asList(
                            "string_field",
                            "int64_field",
                            "float64_field",
                            "numeric_field",
                            "bool_field",
                            "bytes_field",
                            "date_field",
                            "datetime_field",
                            "time_field",
                            "timestamp_field",
                            "geography_field",
                            "array_field",
                            "struct_field")))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
max_temperatures = (
    pipeline
    | 'ReadTableWithStorageAPI' >> beam.io.ReadFromBigQuery(
        table=table_spec, method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
    | beam.Map(lambda elem: elem['max_temperature']))

以下程式碼片段使用查詢字串讀取資料。

import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromQueryWithBigQueryStorageAPI {
  public static PCollection<MyData> readFromQueryWithBigQueryStorageAPI(
      String project, String dataset, String table, String query, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    /*
    String query = String.format("SELECT\n" +
        "  string_field,\n" +
        "  int64_field,\n" +
        "  float64_field,\n" +
        "  numeric_field,\n" +
        "  bool_field,\n" +
        "  bytes_field,\n" +
        "  date_field,\n" +
        "  datetime_field,\n" +
        "  time_field,\n" +
        "  timestamp_field,\n" +
        "  geography_field,\n" +
        "  array_field,\n" +
        "  struct_field\n" +
        "FROM\n" +
        "  `%s:%s.%s`", project, dataset, table)
    */

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery table",
                BigQueryIO.readTableRows()
                    .fromQuery(query)
                    .usingStandardSql()
                    .withMethod(Method.DIRECT_READ))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
# The SDK for Python does not support the BigQuery Storage API.

寫入 BigQuery

BigQueryIO 允許您寫入 BigQuery 表格。如果您使用適用於 Java 的 Beam SDK,則可以將不同的列寫入不同的表格。適用於 Java 的 Beam SDK 也支援使用 BigQuery Storage Write API 直接寫入 BigQuery 儲存空間。有關詳細資訊,請參閱 使用 Storage Write API

BigQueryIO 寫入轉換會使用受 BigQuery 的 配額定價 政策約束的 API。

當您套用寫入轉換時,必須為目標表格提供以下資訊

此外,如果您的寫入操作建立新的 BigQuery 表格,您還必須提供目標表格的表格結構描述。

建立處置

建立處置控制您的 BigQuery 寫入操作是否應該在目標表格不存在時建立表格。

使用 .withCreateDisposition 指定建立處置。有效的列舉值為

  • Write.CreateDisposition.CREATE_IF_NEEDED:指定如果表格不存在,則寫入操作應建立新表格。如果您使用此值,則必須使用 withSchema 方法提供表格結構描述。CREATE_IF_NEEDED 是預設行為。

  • Write.CreateDisposition.CREATE_NEVER:指定永遠不應建立表格。如果目標表格不存在,則寫入操作會失敗。

使用 create_disposition 參數指定建立處置。有效的列舉值為

  • BigQueryDisposition.CREATE_IF_NEEDED:指定如果表格不存在,則寫入操作應建立新表格。如果您使用此值,則必須提供表格結構描述。CREATE_IF_NEEDED 是預設行為。

  • BigQueryDisposition.CREATE_NEVER:指定永遠不應建立表格。如果目標表格不存在,則寫入操作會失敗。

如果您將 CREATE_IF_NEEDED 指定為建立處置,並且沒有提供表格結構描述,則如果目標表格不存在,轉換可能會在執行階段失敗。

寫入處置

寫入處置控制您的 BigQuery 寫入操作如何套用至現有表格。

使用 .withWriteDisposition 指定寫入處置。有效的列舉值為

  • Write.WriteDisposition.WRITE_EMPTY:指定如果目標表格不為空,則寫入操作應在執行階段失敗。WRITE_EMPTY 是預設行為。

  • Write.WriteDisposition.WRITE_TRUNCATE:指定寫入操作應取代現有表格。目標表格中的任何現有列都會被移除,且新的列會新增至表格。

  • Write.WriteDisposition.WRITE_APPEND:指定寫入操作應將列附加到現有表格的末尾。

使用 write_disposition 參數指定寫入處置。有效的列舉值為

  • BigQueryDisposition.WRITE_EMPTY:指定如果目標表格不為空,則寫入操作應在執行階段失敗。WRITE_EMPTY 是預設行為。

  • BigQueryDisposition.WRITE_TRUNCATE:指定寫入操作應取代現有表格。目標表格中的任何現有列都會被移除,且新的列會新增至表格。

  • BigQueryDisposition.WRITE_APPEND:指定寫入操作應將列附加到現有表格的末尾。

當您使用 WRITE_EMPTY 時,對目標表格是否為空的檢查可以在實際寫入操作之前進行。此檢查不保證您的管道將具有對表格的獨佔存取權。兩個同時寫入具有 WRITE_EMPTY 寫入處置的相同輸出表格的管道可能會成功啟動,但兩個管道都可能會在稍後發生寫入嘗試時失敗。

建立資料表結構描述

如果您的 BigQuery 寫入操作建立新表格,您必須提供結構描述資訊。結構描述包含有關表格中每個欄位的資訊。當使用新結構描述更新管道時,現有結構描述欄位必須保持相同的順序,否則管道將會中斷,且無法寫入 BigQuery。

若要在 Java 中建立表格結構描述,您可以使用 TableSchema 物件,也可以使用包含 JSON 序列化 TableSchema 物件的字串。

若要在 Python 中建立表格結構描述,您可以使用 TableSchema 物件,也可以使用定義欄位清單的字串。基於單一字串的結構描述不支援巢狀欄位、重複欄位,或為欄位指定 BigQuery 模式(該模式始終設定為 NULLABLE)。

使用 TableSchema

若要建立並將表格結構描述作為 TableSchema 物件使用,請依照下列步驟操作。

  1. 建立一個 TableFieldSchema 物件的清單。每個 TableFieldSchema 物件代表表格中的一個欄位。

  2. 建立一個 TableSchema 物件,並使用 setFields 方法指定您的欄位清單。

  3. 當您套用寫入轉換時,使用 withSchema 方法提供您的表格結構描述。

  1. 建立一個 TableSchema 物件。

  2. 為您表格中的每個欄位建立並附加一個 TableFieldSchema 物件。

  3. 當您套用寫入轉換時,使用 schema 參數提供您的表格結構描述。將參數的值設定為 TableSchema 物件。

以下範例程式碼示範如何為具有兩個字串類型欄位(來源和引述)的表格建立 TableSchema

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;

class BigQuerySchemaCreate {
  public static TableSchema createSchema() {
    // To learn more about BigQuery schemas:
    // https://cloud.google.com/bigquery/docs/schemas
    TableSchema schema =
        new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema()
                        .setName("string_field")
                        .setType("STRING")
                        .setMode("REQUIRED"),
                    new TableFieldSchema()
                        .setName("int64_field")
                        .setType("INT64")
                        .setMode("NULLABLE"),
                    new TableFieldSchema()
                        .setName("float64_field")
                        .setType("FLOAT64"), // default mode is "NULLABLE"
                    new TableFieldSchema().setName("numeric_field").setType("NUMERIC"),
                    new TableFieldSchema().setName("bool_field").setType("BOOL"),
                    new TableFieldSchema().setName("bytes_field").setType("BYTES"),
                    new TableFieldSchema().setName("date_field").setType("DATE"),
                    new TableFieldSchema().setName("datetime_field").setType("DATETIME"),
                    new TableFieldSchema().setName("time_field").setType("TIME"),
                    new TableFieldSchema().setName("timestamp_field").setType("TIMESTAMP"),
                    new TableFieldSchema().setName("geography_field").setType("GEOGRAPHY"),
                    new TableFieldSchema()
                        .setName("array_field")
                        .setType("INT64")
                        .setMode("REPEATED")
                        .setDescription("Setting the mode to REPEATED makes this an ARRAY<INT64>."),
                    new TableFieldSchema()
                        .setName("struct_field")
                        .setType("STRUCT")
                        .setDescription(
                            "A STRUCT accepts a custom data class, the fields must match the custom class fields.")
                        .setFields(
                            Arrays.asList(
                                new TableFieldSchema().setName("string_value").setType("STRING"),
                                new TableFieldSchema().setName("int64_value").setType("INT64")))));
    return schema;
  }
}
table_schema = {
    'fields': [{
        'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'
    }, {
        'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'
    }]
}

使用字串

若要建立並使用包含 JSON 序列化 TableSchema 物件的字串作為表格結構描述,請按照下列步驟操作。

  1. 建立一個包含 JSON 序列化 TableSchema 物件的字串。

  2. 當您套用寫入轉換時,使用 withJsonSchema 方法提供您的表格結構描述。

若要建立並使用字串作為表格結構描述,請按照下列步驟操作。

  1. 建立一個單一逗號分隔的字串,格式為「field1:type1,field2:type2,field3:type3」,定義欄位清單。類型應指定欄位的 BigQuery 類型。

  2. 當您套用寫入轉換時,使用 schema 參數提供您的表格結構描述。將參數的值設定為字串。

以下範例示範如何使用字串指定與先前範例相同的表格結構描述。

String tableSchemaJson =
    ""
        + "{"
        + "  \"fields\": ["
        + "    {"
        + "      \"name\": \"source\","
        + "      \"type\": \"STRING\","
        + "      \"mode\": \"NULLABLE\""
        + "    },"
        + "    {"
        + "      \"name\": \"quote\","
        + "      \"type\": \"STRING\","
        + "      \"mode\": \"REQUIRED\""
        + "    }"
        + "  ]"
        + "}";
# column_name:BIGQUERY_TYPE, ...
table_schema = 'source:STRING, quote:STRING'

設定插入方法

BigQueryIO 支援兩種將資料插入 BigQuery 的方法:載入工作和串流插入。每種插入方法在成本、配額和資料一致性方面都有不同的權衡。有關這些權衡的更多資訊,請參閱 BigQuery 文件中的不同資料擷取選項(特別是載入工作串流插入)。

BigQueryIO 會根據輸入的 PCollection 選擇預設的插入方法。您可以使用 withMethod 指定所需的插入方法。請參閱Write.Method,以取得可用方法及其限制的清單。

BigQueryIO 會根據輸入的 PCollection 選擇預設的插入方法。您可以使用 method 指定所需的插入方法。請參閱WriteToBigQuery,以取得可用方法及其限制的清單。

在下列情況下,BigQueryIO 會使用載入工作

  • 當您將 BigQueryIO 寫入轉換套用到有界的 PCollection 時。
  • 當您使用 BigQueryIO.write().withMethod(FILE_LOADS) 指定載入工作作為插入方法時。
  • 當您將 BigQueryIO 寫入轉換套用到有界的 PCollection 時。
  • 當您使用 WriteToBigQuery(method='FILE_LOADS') 指定載入工作作為插入方法時。

注意: 如果您在串流管線中使用批次載入

您必須使用 withTriggeringFrequency 指定啟動載入工作的觸發頻率。請小心設定頻率,確保您的管線不會超過 BigQuery 載入工作的配額限制

您可以使用 withNumFileShards 明確設定寫入的檔案分片數量,或使用 withAutoSharding 啟用動態分片(從 2.29.0 版本開始),並且可以在執行時判斷和變更分片數量。分片行為取決於執行器。

您必須使用 triggering_frequency 指定啟動載入工作的觸發頻率。請小心設定頻率,確保您的管線不會超過 BigQuery 載入工作的配額限制

您可以設定 with_auto_sharding=True 以啟用動態分片(從 2.29.0 版本開始)。可以在執行時判斷和變更分片數量。分片行為取決於執行器。

在下列情況下,BigQueryIO 會使用串流插入

  • 當您將 BigQueryIO 寫入轉換套用到無界的 PCollection 時。
  • 當您使用 BigQueryIO.write().withMethod(STREAMING_INSERTS) 指定串流插入作為插入方法時。
  • 當您將 BigQueryIO 寫入轉換套用到無界的 PCollection 時。
  • 當您使用 WriteToBigQuery(method='STREAMING_INSERTS') 指定串流插入作為插入方法時。

注意: 依預設,串流插入會啟用 BigQuery 盡力重複資料刪除機制。您可以設定 ignoreInsertIds 來停用該機制。啟用與停用重複資料刪除時,配額限制有所不同。

串流插入會為每個表格目的地套用預設分片。您可以使用 withAutoSharding(從 2.28.0 版本開始)啟用動態分片,並且可以在執行時判斷和變更分片數量。分片行為取決於執行器。

注意: 依預設,串流插入會啟用 BigQuery 盡力重複資料刪除機制。您可以設定 ignore_insert_ids=True 來停用該機制。啟用與停用重複資料刪除時,配額限制有所不同。

串流插入會為每個表格目的地套用預設分片。您可以設定 with_auto_sharding=True(從 2.29.0 版本開始)啟用動態分片。可以在執行時判斷和變更分片數量。分片行為取決於執行器。

寫入資料表

若要寫入 BigQuery 表格,請套用 writeTableRowswrite 轉換。

若要寫入 BigQuery 表格,請套用 WriteToBigQuery 轉換。WriteToBigQuery 支援批次模式和串流模式。您必須將轉換套用到字典的 PCollection。一般來說,您需要使用另一個轉換(例如 ParDo)將您的輸出資料格式化為集合。

以下範例使用這個包含引述的 PCollection

writeTableRows 方法會將 BigQuery TableRow 物件的 PCollection 寫入 BigQuery 表格。PCollection 中的每個元素代表表格中的單一資料列。此範例使用 writeTableRows 將元素寫入 PCollection<TableRow>。寫入作業會建立表格(如果需要)。如果表格已存在,則會被取代。

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.values.PCollection;

class BigQueryWriteToTable {
  public static void writeToTable(
      String project,
      String dataset,
      String table,
      TableSchema schema,
      PCollection<TableRow> rows) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // TableSchema schema = new TableSchema().setFields(Arrays.asList(...));

    // Pipeline pipeline = Pipeline.create();
    // PCollection<TableRow> rows = ...

    rows.apply(
        "Write to BigQuery",
        BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s", project, dataset, table))
            .withSchema(schema)
            // For CreateDisposition:
            // - CREATE_IF_NEEDED (default): creates the table if it doesn't exist, a schema is
            // required
            // - CREATE_NEVER: raises an error if the table doesn't exist, a schema is not needed
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            // For WriteDisposition:
            // - WRITE_EMPTY (default): raises an error if the table is not empty
            // - WRITE_APPEND: appends new rows to existing rows
            // - WRITE_TRUNCATE: deletes the existing rows before writing
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

    // pipeline.run().waitUntilFinish();
  }
}
quotes = pipeline | beam.Create([
    {
        'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'
    },
    {
        'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."
    },
])

以下範例程式碼示範如何套用 WriteToBigQuery 轉換,將字典的 PCollection 寫入 BigQuery 表格。寫入作業會建立表格(如果需要)。如果表格已存在,則會被取代。

quotes | beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

write 轉換會將自訂類型物件的 PCollection 寫入 BigQuery 表格。使用 .withFormatFunction(SerializableFunction) 提供格式化函式,將 PCollection 中的每個輸入元素轉換為 TableRow。此範例使用 write 寫入 PCollection<String>。寫入作業會建立表格(如果需要)。如果表格已存在,則會被取代。

quotes.apply(
    BigQueryIO.<Quote>write()
        .to(tableSpec)
        .withSchema(tableSchema)
        .withFormatFunction(
            (Quote elem) ->
                new TableRow().set("source", elem.source).set("quote", elem.quote))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

當您使用串流插入時,您可以決定如何處理失敗的記錄。您可以選擇持續重試,或使用 WriteResult.getFailedInserts() 方法將失敗的記錄傳回至個別的 PCollection

使用 Storage Write API

從 Beam SDK for Java 的 2.36.0 版開始,您可以使用 BigQueryIO 連接器的 BigQuery Storage Write API

此外,在 Beam SDK for Python 的 2.47.0 版之後,SDK 支援 BigQuery Storage Write API。

BigQuery Storage Write API for Python SDK 目前在支援的資料類型方面有一些限制。由於此方法使用跨語言轉換,因此我們僅限於跨語言界限支援的類型。例如,需要 apache_beam.utils.timestamp.Timestamp 來寫入 TIMESTAMP BigQuery 類型。此外,某些類型(例如 DATETIME)尚未支援。如需更多詳細資訊,請參閱完整類型對應

注意:如果您想從原始碼使用 Storage Write API 執行 WriteToBigQuery,您需要執行 ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build 來建置 expansion-service jar。如果您是從已發行的 Beam SDK 執行,則 jar 已包含在其中。

完全一次語義

若要使用 Storage Write API 寫入 BigQuery,請將 withMethod 設定為 Method.STORAGE_WRITE_API。以下範例轉換使用 Storage Write API 和完全一次語意寫入 BigQuery

WriteResult writeResult = rows.apply("Save Rows to BigQuery",
BigQueryIO.writeTableRows()
        .to(options.getFullyQualifiedTableName())
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withMethod(Method.STORAGE_WRITE_API)
);
quotes | "WriteTableWithStorageAPI" >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API)

如果您想要變更 BigQueryIO 的行為,讓您管線的所有 BigQuery 接收器預設都使用 Storage Write API,請設定 UseStorageWriteApi 選項

如果您的管線需要建立表格(在表格不存在且您將建立處置指定為 CREATE_IF_NEEDED 的情況下),您必須提供表格結構描述。API 會使用結構描述驗證資料並將其轉換為二進位協定。

TableSchema schema = new TableSchema().setFields(
        List.of(
            new TableFieldSchema()
                .setName("request_ts")
                .setType("TIMESTAMP")
                .setMode("REQUIRED"),
            new TableFieldSchema()
                .setName("user_name")
                .setType("STRING")
                .setMode("REQUIRED")));
table_schema = {
    'fields': [{
        'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'
    }, {
        'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'
    }]
}

對於串流管線,您需要設定兩個額外參數:串流數量和觸發頻率。

BigQueryIO.writeTableRows()
        // ...
        .withTriggeringFrequency(Duration.standardSeconds(5))
        .withNumStorageWriteApiStreams(3)
);
# The Python SDK doesn't currently support setting the number of write streams
quotes | "StorageWriteAPIWithFrequency" >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
    triggering_frequency=5)

串流數量定義 BigQueryIO 寫入轉換的平行處理度,並且大致對應於管線使用的 Storage Write API 串流數量。您可以使用 withNumStorageWriteApiStreams 在轉換上明確設定,或將 numStorageWriteApiStreams 選項提供給管線(如 BigQueryOptions 中所定義)。請注意,這僅支援串流管線。

觸發頻率決定資料在 BigQuery 中可供查詢的時間。您可以透過 withTriggeringFrequency 明確設定,或透過設定 storageWriteApiTriggeringFrequencySec 選項來指定秒數。

這兩個參數的組合會影響 BigQueryIO 在呼叫 Storage Write API 之前建立的資料列批次大小。將頻率設定得太高可能會導致批次較小,這會影響效能。一般而言,單一串流應能處理至少每秒 1Mb 的輸送量。建立獨佔串流對 BigQuery 服務而言是一項昂貴的操作,因此您應僅使用您用例所需的串流數量。對於大多數管線而言,觸發頻率設定在個位數秒數是一個不錯的選擇。

與串流插入類似,STORAGE_WRITE_API 支援動態決定寫入 BigQuery 的平行串流數量(從 2.42.0 開始)。您可以使用 withAutoSharding 來明確啟用此功能。

numStorageWriteApiStreams 設定為 0 或未指定時,STORAGE_WRITE_API 預設為動態分片。

使用 STORAGE_WRITE_API 時,WriteResult.getFailedStorageApiInserts 傳回的 PCollection 會包含無法寫入 Storage Write API sink 的資料列。

至少一次語義

如果您的用例允許目標表格中存在潛在的重複記錄,您可以使用 STORAGE_API_AT_LEAST_ONCE 方法。此方法不會將要寫入 BigQuery 的記錄保存到其 shuffle 儲存中,而 shuffle 儲存是提供 STORAGE_WRITE_API 方法的精確一次語意所必需的。因此,對於大多數管線而言,使用此方法通常成本較低且延遲較短。如果您使用 STORAGE_API_AT_LEAST_ONCE,您不需要指定串流數量,也無法指定觸發頻率。

自動分片不適用於 STORAGE_API_AT_LEAST_ONCE

使用 STORAGE_API_AT_LEAST_ONCE 時,WriteResult.getFailedStorageApiInserts 傳回的 PCollection 會包含無法寫入 Storage Write API sink 的資料列。

配額

在使用 Storage Write API 之前,請注意 BigQuery Storage Write API 配額

使用動態目的地

您可以使用動態目的地功能,將 PCollection 中的元素寫入不同的 BigQuery 表格,這些表格可能具有不同的結構描述。

動態目的地功能會根據使用者定義的目的地鍵來分組您的使用者類型,使用該鍵來計算目的地表格和/或結構描述,並將每個群組的元素寫入計算出的目的地。

此外,您也可以寫入您自己的具有 TableRow 對應函式的類型,並且您可以在所有 DynamicDestinations 方法中使用側輸入。

若要使用動態目的地,您必須建立 DynamicDestinations 物件並實作下列方法:

  • getDestination:傳回一個物件,getTablegetSchema 可以使用該物件作為目的地鍵,以計算目的地表格和/或結構描述。

  • getTable:傳回目的地鍵的表格(作為 TableDestination 物件)。此方法必須為每個唯一的目的地傳回唯一的表格。

  • getSchema:傳回目的地鍵的表格結構描述(作為 TableSchema 物件)。

然後,使用您的 DynamicDestinations 物件呼叫 write().to。此範例使用包含天氣資料的 PCollection,並將資料寫入每年不同的表格中。

/*
@DefaultCoder(AvroCoder.class)
static class WeatherData {
  final long year;
  final long month;
  final long day;
  final double maxTemp;

  public WeatherData() {
    this.year = 0;
    this.month = 0;
    this.day = 0;
    this.maxTemp = 0.0f;
  }
  public WeatherData(long year, long month, long day, double maxTemp) {
    this.year = year;
    this.month = month;
    this.day = day;
    this.maxTemp = maxTemp;
  }
}
*/

PCollection<WeatherData> weatherData =
    p.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> {
                  GenericRecord record = elem.getRecord();
                  return new WeatherData(
                      (Long) record.get("year"),
                      (Long) record.get("month"),
                      (Long) record.get("day"),
                      (Double) record.get("max_temperature"));
                })
            .fromQuery(
                "SELECT year, month, day, max_temperature "
                    + "FROM [apache-beam-testing.samples.weather_stations] "
                    + "WHERE year BETWEEN 2007 AND 2009")
            .withCoder(AvroCoder.of(WeatherData.class)));

// We will send the weather data into different tables for every year.
weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(
            new DynamicDestinations<WeatherData, Long>() {
              @Override
              public Long getDestination(ValueInSingleWindow<WeatherData> elem) {
                return elem.getValue().year;
              }

              @Override
              public TableDestination getTable(Long destination) {
                return new TableDestination(
                    new TableReference()
                        .setProjectId(writeProject)
                        .setDatasetId(writeDataset)
                        .setTableId(writeTable + "_" + destination),
                    "Table for year " + destination);
              }

              @Override
              public TableSchema getSchema(Long destination) {
                return new TableSchema()
                    .setFields(
                        ImmutableList.of(
                            new TableFieldSchema()
                                .setName("year")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("month")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("day")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("maxTemp")
                                .setType("FLOAT")
                                .setMode("NULLABLE")));
              }
            })
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
fictional_characters_view = beam.pvalue.AsDict(
    pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),
                                                  ('Obi Wan Kenobi', True)]))

def table_fn(element, fictional_characters):
  if element in fictional_characters:
    return 'my_dataset.fictional_quotes'
  else:
    return 'my_dataset.real_quotes'

quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery(
    table_fn,
    schema=table_schema,
    table_side_inputs=(fictional_characters_view, ),
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

使用時間分割

BigQuery 時間分割將您的表格分割成較小的分割區,這稱為 分割表格。分割表格使您可以更輕鬆地管理和查詢您的資料。

若要使用 BigQuery 時間分割,請使用以下兩種方法之一:

  • withTimePartitioning:此方法會取得 TimePartitioning 類別,並且僅在您寫入單一表格時才可使用。

  • withJsonTimePartitioning:此方法與 withTimePartitioning 相同,但會取得 JSON 序列化的字串物件。

此範例會每天產生一個分割區。

weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(tableSpec + "_partitioning")
        .withSchema(tableSchema)
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        // NOTE: an existing table without time partitioning set up will not work
        .withTimePartitioning(new TimePartitioning().setType("DAY"))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
quotes | 'WriteWithTimePartitioning' >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    additional_bq_parameters={'timePartitioning': {
        'type': 'HOUR'
    }})

限制

BigQueryIO 目前有以下限制:

  1. 您無法將 BigQuery 寫入的完成順序與管線的其他步驟排序。

  2. 如果您使用的是 Beam SDK for Python,當您寫入非常大的資料集時,您可能會遇到匯入大小配額問題。作為一種解決方法,您可以分割資料集(例如,使用 Beam 的 Partition 轉換)並寫入多個 BigQuery 表格。Beam SDK for Java 沒有此限制,因為它會為您分割資料集。

  3. 當您將資料載入到 BigQuery 時,會套用這些限制。依預設,BigQuery 會使用共用的插槽集區來載入資料。這表示無法保證可用容量,您的載入可能會排隊,直到有可用的插槽。如果插槽在 6 小時內沒有變為可用,則載入會因 BigQuery 設定的限制而失敗。為了避免這種情況,強烈建議您使用BigQuery 預留,以確保您的載入不會因為容量問題而排隊和失敗。

其他範例

您可以在 Beam 的範例目錄中找到使用 BigQuery 的其他範例。

Java 食譜範例

這些範例來自 Java cookbook examples 目錄。

Java 完整範例

這些範例來自 Java complete examples 目錄。

Python 食譜範例

這些範例來自 Python cookbook examples 目錄。