部落格
2024/09/20
使用 Beam YAML 和 Protobuf 進行高效串流資料處理
使用 Beam YAML 和 Protobuf 進行高效串流資料處理
隨著串流資料處理的成長,其維護、複雜性和成本也隨之增加。這篇文章解釋了如何透過使用 Protobuf 來有效擴展管道,這確保了管道的可重複使用和快速部署。目標是讓工程師使用 Beam YAML 輕鬆實現此流程。
使用 Beam YAML 簡化管道
在 Beam 中建立管道可能有點困難,尤其是對於新的 Apache Beam 使用者而言。設定專案、管理依賴關係等等都可能具有挑戰性。Beam YAML 消除了大部分的樣板程式碼,讓您可以專注於工作中最重要的一部分:資料轉換。
Beam YAML 的一些主要優點包括
- 可讀性:透過使用宣告式語言 (YAML),管道組態更易於人類閱讀。
- 可重複使用性:簡化了在不同管道之間重複使用相同元件的過程。
- 可維護性:更容易進行管道維護和更新。
以下範本顯示從 Kafka 主題讀取事件並將它們寫入 BigQuery 的範例。
pipeline:
transforms:
- type: ReadFromKafka
name: ReadProtoMovieEvents
config:
topic: 'TOPIC_NAME'
format: RAW/AVRO/JSON/PROTO
bootstrap_servers: 'BOOTSTRAP_SERVERS'
schema: 'SCHEMA'
- type: WriteToBigQuery
name: WriteMovieEvents
input: ReadProtoMovieEvents
config:
table: 'PROJECT_ID.DATASET.MOVIE_EVENTS_TABLE'
useAtLeastOnceSemantics: true
options:
streaming: true
dataflow_service_options: [streaming_mode_at_least_once]
完整的工作流程
本節示範此管道的完整工作流程。
建立簡單的 proto 事件
以下程式碼建立一個簡單的電影事件。
// events/v1/movie_event.proto
syntax = "proto3";
package event.v1;
import "bq_field.proto";
import "bq_table.proto";
import "buf/validate/validate.proto";
import "google/protobuf/wrappers.proto";
message MovieEvent {
option (gen_bq_schema.bigquery_opts).table_name = "movie_table";
google.protobuf.StringValue event_id = 1 [(gen_bq_schema.bigquery).description = "Unique Event ID"];
google.protobuf.StringValue user_id = 2 [(gen_bq_schema.bigquery).description = "Unique User ID"];
google.protobuf.StringValue movie_id = 3 [(gen_bq_schema.bigquery).description = "Unique Movie ID"];
google.protobuf.Int32Value rating = 4 [(buf.validate.field).int32 = {
// validates the average rating is at least 0
gte: 0,
// validates the average rating is at most 100
lte: 100
}, (gen_bq_schema.bigquery).description = "Movie rating"];
string event_dt = 5 [
(gen_bq_schema.bigquery).type_override = "DATETIME",
(gen_bq_schema.bigquery).description = "UTC Datetime representing when we received this event. Format: YYYY-MM-DDTHH:MM:SS",
(buf.validate.field) = {
string: {
pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}$"
},
ignore_empty: false,
}
];
}
由於這些事件會寫入 BigQuery,因此匯入 bq_field
proto 和 bq_table
proto。這些 proto 檔案有助於產生 BigQuery JSON 結構描述。此範例也示範了左移方法,該方法儘可能在開發流程的早期移動測試、品質和效能。例如,為了確保僅從來源產生有效的事件,會包含 buf.validate
元素。
在 events/v1
資料夾中建立 movie_event.proto
proto 之後,您可以產生必要的 檔案描述符。檔案描述符是結構描述的編譯表示形式,它允許各種工具和系統動態地理解和使用 protobuf 資料。為了簡化流程,此範例使用 Buf,它需要以下組態檔。
Buf 組態
# buf.yaml
version: v2
deps:
- buf.build/googlecloudplatform/bq-schema-api
- buf.build/bufbuild/protovalidate
breaking:
use:
- FILE
lint:
use:
- DEFAULT
# buf.gen.yaml
version: v2
managed:
enabled: true
plugins:
# Python Plugins
- remote: buf.build/protocolbuffers/python
out: gen/python
- remote: buf.build/grpc/python
out: gen/python
# Java Plugins
- remote: buf.build/protocolbuffers/java:v25.2
out: gen/maven/src/main/java
- remote: buf.build/grpc/java
out: gen/maven/src/main/java
# BQ Schemas
- remote: buf.build/googlecloudplatform/bq-schema:v1.1.0
out: protoc-gen/bq_schema
執行以下兩個命令以產生必要的 Java、Python、BigQuery 結構描述和描述符檔案
// Generate the buf.lock file
buf deps update
// It generates the descriptor in descriptor.binp.
buf build . -o descriptor.binp --exclude-imports
// It generates the Java, Python and BigQuery schema as described in buf.gen.yaml
buf generate --include-imports
讓 Beam YAML 讀取 proto
對 YAML 檔案進行以下修改
# movie_events_pipeline.yml
pipeline:
transforms:
- type: ReadFromKafka
name: ReadProtoMovieEvents
config:
topic: 'movie_proto'
format: PROTO
bootstrap_servers: '<BOOTSTRAP_SERVERS>'
file_descriptor_path: 'gs://my_proto_bucket/movie/v1.0.0/descriptor.binp'
message_name: 'event.v1.MovieEvent'
- type: WriteToBigQuery
name: WriteMovieEvents
input: ReadProtoMovieEvents
config:
table: '<PROJECT_ID>.raw.movie_table'
useAtLeastOnceSemantics: true
options:
streaming: true
dataflow_service_options: [streaming_mode_at_least_once]
此步驟將格式變更為 PROTO
,並新增 file_descriptor_path
和 message_name
。
使用 Terraform 部署管道
您可以使用 Terraform 將 Beam YAML 管道部署為以 Dataflow 作為執行器。以下 Terraform 程式碼範例示範如何達成此目的
// Enable Dataflow API.
resource "google_project_service" "enable_dataflow_api" {
project = var.gcp_project_id
service = "dataflow.googleapis.com"
}
// DF Beam YAML
resource "google_dataflow_flex_template_job" "data_movie_job" {
provider = google-beta
project = var.gcp_project_id
name = "movie-proto-events"
container_spec_gcs_path = "gs://dataflow-templates-${var.gcp_region}/latest/flex/Yaml_Template"
region = var.gcp_region
on_delete = "drain"
machine_type = "n2d-standard-4"
enable_streaming_engine = true
subnetwork = var.subnetwork
skip_wait_on_job_termination = true
parameters = {
yaml_pipeline_file = "gs://${var.bucket_name}/yamls/${var.package_version}/movie_events_pipeline.yml"
max_num_workers = 40
worker_zone = var.gcp_zone
}
depends_on = [google_project_service.enable_dataflow_api]
}
假設 BigQuery 資料表存在(您可以使用 Terraform 和 Proto 來完成),此程式碼會使用 Beam YAML 程式碼來建立 Dataflow 作業,該程式碼從 Kafka 讀取 Proto 事件並將它們寫入 BigQuery。
改進和結論
以下社群貢獻可以改進此範例中的 Beam YAML 程式碼
支援結構描述登錄:與 Buf 登錄或 Apicurio 等結構描述登錄整合,以實現更好的結構描述管理。目前的工作流程使用 Buf 產生描述符,並將它們儲存在 Google Cloud Storage 中。描述符可以改為儲存在結構描述登錄中。
增強監控:實作進階監控和警示機制,以快速識別和解決資料管道中的問題。
利用 Beam YAML 和 Protobuf,我們可以簡化資料處理管道的建立和維護,大幅降低複雜性。這種方法確保工程師可以更有效地實作和擴展穩健、可重複使用的管道,而無需手動編寫 Beam 程式碼。
貢獻
歡迎想要協助建構和新增功能的開發人員開始在 Beam YAML 模組中貢獻心力。
GitHub 儲存庫上也有一個開放 錯誤清單,現在已標記為 yaml
標籤。
儘管 Beam YAML 在 Beam 2.52 中被標記為穩定版,但它仍在積極開發中,每次發行都會新增功能。強烈鼓勵希望參與設計決策並深入了解框架使用方式的人員加入 開發郵件清單,這些討論會在其中進行。