使用 Beam YAML 和 Protobuf 進行高效串流資料處理

使用 Beam YAML 和 Protobuf 進行高效串流資料處理

隨著串流資料處理的成長,其維護、複雜性和成本也隨之增加。這篇文章解釋了如何透過使用 Protobuf 來有效擴展管道,這確保了管道的可重複使用和快速部署。目標是讓工程師使用 Beam YAML 輕鬆實現此流程。

使用 Beam YAML 簡化管道

在 Beam 中建立管道可能有點困難,尤其是對於新的 Apache Beam 使用者而言。設定專案、管理依賴關係等等都可能具有挑戰性。Beam YAML 消除了大部分的樣板程式碼,讓您可以專注於工作中最重要的一部分:資料轉換。

Beam YAML 的一些主要優點包括

  • 可讀性:透過使用宣告式語言 (YAML),管道組態更易於人類閱讀。
  • 可重複使用性:簡化了在不同管道之間重複使用相同元件的過程。
  • 可維護性:更容易進行管道維護和更新。

以下範本顯示從 Kafka 主題讀取事件並將它們寫入 BigQuery 的範例。

pipeline:
  transforms:
    - type: ReadFromKafka
      name: ReadProtoMovieEvents
      config:
        topic: 'TOPIC_NAME'
        format: RAW/AVRO/JSON/PROTO
        bootstrap_servers: 'BOOTSTRAP_SERVERS'
        schema: 'SCHEMA'
    - type: WriteToBigQuery
      name: WriteMovieEvents
      input: ReadProtoMovieEvents
      config:
        table: 'PROJECT_ID.DATASET.MOVIE_EVENTS_TABLE'
        useAtLeastOnceSemantics: true

options:
  streaming: true
  dataflow_service_options: [streaming_mode_at_least_once]

完整的工作流程

本節示範此管道的完整工作流程。

建立簡單的 proto 事件

以下程式碼建立一個簡單的電影事件。

// events/v1/movie_event.proto

syntax = "proto3";

package event.v1;

import "bq_field.proto";
import "bq_table.proto";
import "buf/validate/validate.proto";
import "google/protobuf/wrappers.proto";

message MovieEvent {
  option (gen_bq_schema.bigquery_opts).table_name = "movie_table";
  google.protobuf.StringValue event_id = 1 [(gen_bq_schema.bigquery).description = "Unique Event ID"];
  google.protobuf.StringValue user_id = 2 [(gen_bq_schema.bigquery).description = "Unique User ID"];
  google.protobuf.StringValue movie_id = 3 [(gen_bq_schema.bigquery).description = "Unique Movie ID"];
  google.protobuf.Int32Value rating = 4 [(buf.validate.field).int32 = {
    // validates the average rating is at least 0
    gte: 0,
    // validates the average rating is at most 100
    lte: 100
  }, (gen_bq_schema.bigquery).description = "Movie rating"];
  string event_dt = 5 [
    (gen_bq_schema.bigquery).type_override = "DATETIME",
    (gen_bq_schema.bigquery).description = "UTC Datetime representing when we received this event. Format: YYYY-MM-DDTHH:MM:SS",
    (buf.validate.field) = {
      string: {
        pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}$"
      },
      ignore_empty: false,
    }
  ];
}

由於這些事件會寫入 BigQuery,因此匯入 bq_field proto 和 bq_table proto。這些 proto 檔案有助於產生 BigQuery JSON 結構描述。此範例也示範了左移方法,該方法儘可能在開發流程的早期移動測試、品質和效能。例如,為了確保僅從來源產生有效的事件,會包含 buf.validate 元素。

events/v1 資料夾中建立 movie_event.proto proto 之後,您可以產生必要的 檔案描述符。檔案描述符是結構描述的編譯表示形式,它允許各種工具和系統動態地理解和使用 protobuf 資料。為了簡化流程,此範例使用 Buf,它需要以下組態檔。

Buf 組態

# buf.yaml

version: v2
deps:
  - buf.build/googlecloudplatform/bq-schema-api
  - buf.build/bufbuild/protovalidate
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT
# buf.gen.yaml

version: v2
managed:
  enabled: true
plugins:
  # Python Plugins
  - remote: buf.build/protocolbuffers/python
    out: gen/python
  - remote: buf.build/grpc/python
    out: gen/python

  # Java Plugins
  - remote: buf.build/protocolbuffers/java:v25.2
    out: gen/maven/src/main/java
  - remote: buf.build/grpc/java
    out: gen/maven/src/main/java

  # BQ Schemas
  - remote: buf.build/googlecloudplatform/bq-schema:v1.1.0
    out: protoc-gen/bq_schema

執行以下兩個命令以產生必要的 Java、Python、BigQuery 結構描述和描述符檔案

// Generate the buf.lock file
buf deps update

// It generates the descriptor in descriptor.binp.
buf build . -o descriptor.binp --exclude-imports

// It generates the Java, Python and BigQuery schema as described in buf.gen.yaml
buf generate --include-imports

讓 Beam YAML 讀取 proto

對 YAML 檔案進行以下修改

# movie_events_pipeline.yml

pipeline:
  transforms:
    - type: ReadFromKafka
      name: ReadProtoMovieEvents
      config:
        topic: 'movie_proto'
        format: PROTO
        bootstrap_servers: '<BOOTSTRAP_SERVERS>'
        file_descriptor_path: 'gs://my_proto_bucket/movie/v1.0.0/descriptor.binp'
        message_name: 'event.v1.MovieEvent'
    - type: WriteToBigQuery
      name: WriteMovieEvents
      input: ReadProtoMovieEvents
      config:
        table: '<PROJECT_ID>.raw.movie_table'
        useAtLeastOnceSemantics: true
options:
  streaming: true
  dataflow_service_options: [streaming_mode_at_least_once]

此步驟將格式變更為 PROTO,並新增 file_descriptor_pathmessage_name

使用 Terraform 部署管道

您可以使用 Terraform 將 Beam YAML 管道部署為以 Dataflow 作為執行器。以下 Terraform 程式碼範例示範如何達成此目的

// Enable Dataflow API.
resource "google_project_service" "enable_dataflow_api" {
  project = var.gcp_project_id
  service = "dataflow.googleapis.com"
}

// DF Beam YAML
resource "google_dataflow_flex_template_job" "data_movie_job" {
  provider                     = google-beta
  project                      = var.gcp_project_id
  name                         = "movie-proto-events"
  container_spec_gcs_path      = "gs://dataflow-templates-${var.gcp_region}/latest/flex/Yaml_Template"
  region                       = var.gcp_region
  on_delete                    = "drain"
  machine_type                 = "n2d-standard-4"
  enable_streaming_engine      = true
  subnetwork                   = var.subnetwork
  skip_wait_on_job_termination = true
  parameters = {
    yaml_pipeline_file = "gs://${var.bucket_name}/yamls/${var.package_version}/movie_events_pipeline.yml"
    max_num_workers    = 40
    worker_zone        = var.gcp_zone
  }
  depends_on = [google_project_service.enable_dataflow_api]
}

假設 BigQuery 資料表存在(您可以使用 Terraform 和 Proto 來完成),此程式碼會使用 Beam YAML 程式碼來建立 Dataflow 作業,該程式碼從 Kafka 讀取 Proto 事件並將它們寫入 BigQuery。

改進和結論

以下社群貢獻可以改進此範例中的 Beam YAML 程式碼

  • 支援結構描述登錄:與 Buf 登錄或 Apicurio 等結構描述登錄整合,以實現更好的結構描述管理。目前的工作流程使用 Buf 產生描述符,並將它們儲存在 Google Cloud Storage 中。描述符可以改為儲存在結構描述登錄中。

  • 增強監控:實作進階監控和警示機制,以快速識別和解決資料管道中的問題。

利用 Beam YAML 和 Protobuf,我們可以簡化資料處理管道的建立和維護,大幅降低複雜性。這種方法確保工程師可以更有效地實作和擴展穩健、可重複使用的管道,而無需手動編寫 Beam 程式碼。

貢獻

歡迎想要協助建構和新增功能的開發人員開始在 Beam YAML 模組中貢獻心力。

GitHub 儲存庫上也有一個開放 錯誤清單,現在已標記為 yaml 標籤。

儘管 Beam YAML 在 Beam 2.52 中被標記為穩定版,但它仍在積極開發中,每次發行都會新增功能。強烈鼓勵希望參與設計決策並深入了解框架使用方式的人員加入 開發郵件清單,這些討論會在其中進行。