I/O 標準

概觀

這份 Apache Beam I/O 標準文件闡述了 1P/3P 開發人員開發 Apache Beam I/O 連接器的規範指南。這些指南旨在以簡單扼要的方式建立涵蓋文件、開發和測試的最佳實務。

什麼是內建 I/O 連接器?

位於 Apache Beam Github 儲存庫中的 I/O 連接器 (I/O) 稱為內建 I/O 連接器。內建 I/O 的整合測試和效能測試會由 Google Cloud Dataflow 團隊使用 Dataflow Runner 定期執行,並且公開發布度量以供參考。否則,以下指南將適用於兩者,除非另有明確說明。

指南

文件

本節列出了預期 I/O 提供的所有文件的超集合。本節中引用的 Apache Beam 文件可在此處找到。一般來說,一個很好的範例是遵循內建的 I/O,Snowflake I/O

內建 I/O

為 I/O 的相關語言提供程式碼文件。這也應該包含 Apache Beam 網站或外部文件位置中任何外部資訊來源的連結。

範例

I/O 連接器指南下新增一個頁面,涵蓋特定提示和設定。以下顯示 ParquetHadoop 和其他連接器的這些內容。

範例

I/O connector guides screenshot

您的 Javadoc/Pythondoc 中章節標題的格式應該保持一致,以便未來可以啟用其他頁面的程式化資訊擷取。

範例是頁面中要依序包含的章節子集

  1. 開始之前
  2. {連接器}IO 基礎
  3. 支援的功能
    1. 關聯式
  4. 驗證
  5. 從 {連接器} 讀取
  6. 寫入 {連接器}
  7. 資源可擴展性
  8. 限制
  9. 回報問題

範例

KafkaIO JavaDoc

I/O 連接器應該在 支援的功能 子標題下包含一個表格,指出所使用的關聯式功能

關聯式功能是一些概念,可以幫助提高效率,並且可以選擇性地由 I/O 連接器實作。使用最終使用者提供的管線設定 (SchemaIO) 和使用者查詢 (FieldAccessDescriptor) 資料,應用關聯式理論來推導出改進措施,例如更快的管線執行、更低的操作成本和更少讀取/寫入的資料。

範例表格

I/O connector guides screenshot

<div class="table-container-wrapper">
<table class="table table-bordered table-io-standards-relational-features">
   <tr>
      <th>
         <p><strong>Relational Feature</strong>
      </th>
      <th>
         <p><strong>Supported</strong>
      </th>
      <th>
         <p><strong>Notes</strong>
      </th>
   </tr>
   <tr>
      <td>
         <p>Column Pruning
      </td>
      <td>
         <p>Yes/No
      </td>
      <td>
         <p>To Be Filled
      </td>
   </tr>
   <tr>
      <td>
         <p>Filter Pushdown
      </td>
      <td>
         <p>Yes/No
      </td>
      <td>
         <p>To Be Filled
      </td>
   </tr>
   <tr>
      <td>
         <p>Table Statistics
      </td>
      <td>
         <p>Yes/No
      </td>
      <td>
         <p>To Be Filled
      </td>
   </tr>
   <tr>
      <td>
         <p>Partition Metadata
      </td>
      <td>
         <p>Yes/No
      </td>
      <td>
         <p>To Be Filled
      </td>
   </tr>
   <tr>
      <td>
         <p>Metastore
      </td>
      <td>
         <p>Yes/No
      </td>
      <td>
         <p>To Be Filled
      </td>
   </tr>
</table>
</div>

範例實作

BigQueryIO 欄修剪透過 ProjectionPushdown 僅傳回最終使用者查詢中指示的必要欄。這是透過使用 BigQuery DirectRead API 來實現的。

如有必要,在 常見管線模式 下新增一個頁面,概述涉及您的 I/O 的常見使用模式。

https://beam.dev.org.tw/documentation/patterns/bigqueryio/

使用您的 I/O 資訊更新 I/O 連接器

範例

https://beam.dev.org.tw/documentation/io/connectors/#built-in-io-connectors

alt_text

開始之前 標題下,提供使用 I/O 的設定步驟。

範例

https://beam.dev.org.tw/documentation/io/built-in/parquet/#before-you-start

在每個支援語言的初始描述之後,包含標準的讀取/寫入程式碼片段。以下範例顯示 Hadoop 的 Java 範例。

範例

https://beam.dev.org.tw/documentation/io/built-in/hadoop/#reading-using-hadoopformation

指示如何為元素指派時間戳記。這包括批次來源,以允許未來的 I/O 可以提供比 current_time() 更有用的資訊。

範例

指示如何推進時間戳記;對於批次來源,在大多數情況下會標示為 n/a。

概述連接器將建立的任何暫時資源(例如,檔案)。

範例

BigQuery 批次載入首先建立一個臨時 GCS 位置

https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L455

驗證 子標題下,提供如何取得合作夥伴授權資料,以安全地存取來源/接收器。

範例

https://beam.dev.org.tw/documentation/io/built-in/snowflake/#authentication

這裡 BigQuery 將其命名為權限,但主題涵蓋了相似之處

https://beam.dev.org.tw/releases/javadoc/2.1.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html

I/O 應該在 開始之前 標題中提供來源/接收器文件的連結。

範例

https://beam.dev.org.tw/documentation/io/built-in/snowflake/

透過連結到文件,指出每種語言中是否有原生或跨語言支援。

範例

Kinesis I/O 具有 java 的原生實作,以及 python 的跨語言支援,但不支援 Golang。

限制 標題下指出已知的限制。如果限制有追蹤問題,請內嵌連結它。

範例

https://beam.dev.org.tw/documentation/io/built-in/snowflake/#limitations

I/O(非內建)

自訂 I/O 不包含在 Apache Beam Github 儲存庫中。一些範例是 SolaceIO。

使用您的資訊更新 Apache Beam 表格的其他 I/O 連接器。

上述表格

## 開發

本節概述了 API 語法、語意以及應針對新的和現有的 Apache Beam I/O 連接器採用的功能建議。

I/O 連接器開發指南的編寫遵循以下原則:

所有 SDK

管線設定/執行/串流/視窗化語意指南

主題

語義

管線選項

I/O 應極少依賴 PipelineOptions 子類別來調整內部參數。

如有必要,連接器相關的管線選項類別應:

  • 針對每個選項清楚說明其效果以及修改原因。
  • 選項名稱必須使用命名空間,以避免衝突。
  • 類別名稱:{Connector}Options
  • 方法名稱:set{Connector}{Option}get{Connector}{Option}

來源視窗化

除非使用者在 API 中明確參數化,否則來源必須以 GlobalWindow 傳回元素。

允許的非全域視窗模式

  • ReadFromIO(window_by=...)
  • ReadFromIO.IntoFixedWindows(...)
  • ReadFromIO(apply_windowing=True/False) (例如 PeriodicImpulse)
  • IO.read().withWindowing(...)
  • IO.read().windowBy(...)
  • IO.read().withFixedWindows(...)

接收器視窗化

接收器應與視窗無關,並處理以任何視窗化方法傳送的元素,除非在其 API 中明確參數化或表達。

接收器可以內部以任何方式變更 PCollection 的視窗化。但是,它作為結果物件一部分傳回的中繼資料必須

  • 必須與輸入在同一個視窗中,除非在 API 中明確聲明另有不同。
  • 必須具有準確的時間戳記
  • 可能包含有關視窗化的其他資訊(例如,BigQuery 工作可能會有時間戳記,但也可能會有與其關聯的視窗)。

允許的非全域視窗模式

  • WriteToIO(triggering_frequency=...) - 例如 WriteToBigQuery (這僅在轉換內設定視窗化 - 輸入資料仍位於全域視窗中)。
  • WriteBatchesToIO(...)
  • WriteWindowsToIO(...)

節流

串流接收器(或任何存取外部服務的轉換)可以實作其請求的節流,以防止外部服務過載。

待辦事項:Beam 應公開節流實用程式 (追蹤問題)

  • 每個金鑰固定節流
  • 使用接收器報告的回壓進行自適應節流
  • 從起點開始的加速節流

錯誤處理

待辦事項:追蹤問題

Java

一般

使用連接器時使用的主要類別應命名為 {connector}IO

範例

BigQuery I/O 是 org.apache.beam.sdk.io.bigquery.BigQueryIO

類別應放置在套件 org.apache.beam.sdk.io.{connector} 中。

範例

BigQueryIO 屬於 java 套件 org.apache.beam.sdk.io.bigquery

單元/整合/效能測試應位於套件 org.apache.beam.sdk.io.{connector}.testing 下。這將導致各種測試使用連接器的標準使用者介面。

單元測試應與連接器位於同一個套件中(即 org.apache.beam.sdk.io.{connector}),因為它們通常會測試連接器的內部。

BigQueryIO 屬於 java 套件 org.apache.beam.sdk.io.bigquery

I/O 轉換應避免接收使用者 lambda,以將元素從使用者類型對應到連接器特定的類型。相反地,它們應該與連接器特定的資料類型(在可能的情況下使用結構描述資訊)介接。

如有必要,I/O 轉換應接收一個類型參數,以指定轉換的輸入類型(對於接收器)或輸出類型(對於來源)。

只有在 **確定其輸出類型不會變更時**,I/O 轉換可能沒有類型參數(例如 FileIO.MatchAll 和其他 FileIO 轉換)。

強烈建議不要在 I/O 連接器的公開 API 部分直接公開第三方程式庫,原因如下:

  • 它會降低 Apache Beam 的相容性保證 - 第三方程式庫的變更可能/將直接中斷現有使用者的管線。
  • 它使得程式碼可維護性困難 - 如果程式庫在 API 層級直接公開,則相依性變更將需要在整個 I/O 實作程式碼中進行多次變更
  • 它會將第三方相依性強加給最終使用者

相反地,我們強烈建議公開 Beam 原生介面和保存對應邏輯的配接器。

如果您認為相關的程式庫本質上是極度靜態的。請在 I/O 本身中註明。

來源和接收器應該使用 PTransform 包裝器抽象化,並且內部類別應宣告為受保護或私有。透過這樣做,可以在不中斷相依性實作的情況下新增/變更/修改實作細節。

類別/方法/屬性

Java 語法

語義

class IO.Read

提供對表示 I/O 內部讀取之類別的存取權限。Read 類別應實作類似於 fluentbuilder 模式的流暢介面(例如 withX(...).withY(...))。搭配預設值,它提供比建立器模式稍少詳細資訊的快速失敗(在每次 .withX() 後立即驗證回饋)。

使用者 **不應** 直接建立此類別。它應該由 頂層實用程式方法 建立。

class IO.ReadAll

一些不同的來源實作執行階段組態,以便從資料來源讀取。這是一個很有價值的模式,因為它使純批次來源能夠成為更複雜的串流來源。

盡可能,此類型的轉換應具有建構時設定的轉換的類型豐富度

  • 支援在建構時已知結構描述的 Beam Row 輸出。
  • 在這種情況下,可能需要(且可接受)額外的組態(例如,SchemaProvider 參數、Schema 參數、Schema 目錄或類似的實用程式)。
  • 輸入 PCollection 應該具有帶有結構描述的固定類型,因此使用者可以輕鬆地操作它。

範例

JdbcIO.ReadAll, ParquetIO.ReadFiles

class IO.Write

提供對表示 I/O 內部寫入之類別的存取權限。Write 類別應實作流暢介面模式(例如 withX(...).withY(...)),如上針對 IO.Read 進一步所述。

使用者不應直接建立此類別。它應該由 頂層實用程式方法 建立。

其他轉換類別

一些資料儲存和外部系統實作不易調整為讀取或寫入語義的 API (例如,FhirIO 實作多種不同的轉換,以擷取資料或將資料傳送至 Fhir)。

這些類別應在**無法或極難以將其功能封裝為讀取、寫入和 ReadAll 轉換的額外組態一部分時才新增**,以避免增加使用者的認知負荷。

使用者不應直接建立這些類別。它們應該由 頂層靜態方法 建立。

實用程式類別

一些連接器依賴其他使用者介面類別來設定組態參數。

(例如 JdbcIO.DataSourceConfiguration)。這些類別應 **巢狀於 {Connector}IO 類別中**。

此格式使其在主 Javadoc 中可見,並且易於使用者發現。

方法 IO<T>.write()

頂層 I/O 類別將提供一個 **靜態方法**,以開始建構 I/O.Write 轉換。這會傳回具有單一輸入 PCollection 和 Write.Result 輸出的 PTransform。

此方法不應在其名稱中指定下列任何項目

  • 內部資料格式
  • 用於寫入資料的策略
  • 輸入或輸出資料類型

如果可能,上述項目應透過組態參數指定。**如果不可能**,則可以引入 **新的靜態方法**,但這 **必須是例外**。

方法 IO<T>.read()

開始建構 I/O.Read 轉換的方法。這會傳回具有單一輸出 PCollection 的 PTransform。

此方法不應在其名稱中指定下列任何項目

  • 內部資料格式
  • 用於讀取資料的策略
  • 輸出資料類型

如果可能,上述項目應透過組態參數指定。**如果不可能**,則可以引入 **新的靜態方法**,但這 **必須是例外,並在 I/O 標頭中記錄為 API 的一部分**。

如果參數很少且一般,或者如果它們是設定轉換所必需的,則初始靜態建構函式方法可能會接收參數(例如 FhirIO.exportResourcesToGcs, JdbcIO.ReadWithPartitions 需要用於初始組態的 TypeDescriptor)。

IO.Read.from(source)

Read 轉換必須提供一個 from 方法,使用者可以在其中指定從何處讀取。如果轉換可以從不同種類的來源(例如資料表、查詢、主題、分割區)讀取,則可以提供此 from 方法的多種實作來容納此要求

  • IO.Read from(Query query)
  • IO.Read from(Table table) / from(String table)
  • IO.Read from (Topic topic)
  • IO.Read from(Partition partition)

這些方法的輸入類型可以反映外部來源的 API(例如,Kafka TopicPartition 應使用 **Beam 實作**的 TopicPartition 物件)。

有時,可能會有多個使用相同輸入類型的 from 位置,這表示我們無法利用方法多載。考慮到這一點,請使用新的方法來處理這種情況。

  • IO.Read from(String table)
  • IO.Read fromQuery(String query)

IO.Read.fromABC(String abc)

如果可以使用方法多載,則不建議使用此模式,請遵循 Read.from(source) 中的指引。

IO.Write.to(destination)

Write 轉換必須提供一個 to 方法,讓使用者可以指定要將資料寫入的位置。如果轉換可以寫入不同種類的來源,同時仍然使用相同的輸入元素類型(例如表格、查詢、主題、分割區),則可以提供此 from 方法的多個實作來滿足此需求。

  • IO.Write to(Query query)
  • IO.Write to(Table table) / from(String table)
  • IO.Write to(Topic topic)
  • IO.Write to(Partition partition)

這些方法的輸入類型可以反映外部接收器的 API(例如,Kafka TopicPartition 應該使用Beam 實作的 TopicPartition 物件)。

如果不同種類的目的地需要不同類型的輸入物件類型,則應在個別的 I/O 連接器中完成。

有時,可能會有多個使用相同輸入類型的 from 位置,這表示我們無法利用方法多載。考慮到這一點,請使用新的方法來處理這種情況。

  • IO.Write to(String table)
  • IO.Write toTable(String table)

IO.Write.to(DynamicDestination destination)

寫入轉換可能允許寫入多個目的地。這可能是一個複雜的模式,應謹慎實作(對於可能在單一管線中有多個目的地的連接器來說,這是首選模式)。

此模式的首選方式是定義一個 DynamicDestinations 介面(例如 BigQueryIO.DynamicDestinations),讓使用者可以定義目的地配置的所有必要參數。

DynamicDestinations 介面還允許維護人員隨著時間的推移新增方法(使用預設實作以避免中斷現有使用者),在必要時定義額外的配置參數。

IO.Write.toABC(destination)

如果可以使用方法多載,則不建議使用此模式,請遵循 Write.to(destination) 中的指引。

class IO.Read.withX

IO.Write.withX

withX 提供一個方法,將配置傳遞給 Read 方法,其中 X 代表要建立的配置。除了通用的 with 語句(定義如下),I/O 應嘗試將配置選項的名稱與來源中的選項名稱相符。

這些方法應傳回 I/O 的新實例,而不是修改現有的實例。

範例

TextIO.Read.withCompression

IO.Read.withConfigObject

IO.Write.withConfigObject

Java 中的某些連接器會收到配置物件作為其配置的一部分。僅在特定情況下才建議使用此模式。在大多數情況下,連接器可以在頂層保存所有必要的配置參數。

若要判斷多參數配置物件是否適合作為高階轉換的參數,該配置物件必須

  • 僅保存與外部資料儲存的連線/驗證參數相關的屬性(例如 JdbcIO.DataSourceConfiguration)。
    • 一般來說,不應將機密作為參數傳遞,除非沒有其他替代方案。對於機密管理,建議使用機密管理服務或 KMS。
  • 反映外部資料來源的 API 特性(例如 KafkaIO.Read.withConsumerConfigUpdates),而不在 Beam API 中公開該外部 API。
    • 該方法應反映 API 物件的名稱(例如,如果有一個 SubscriptionStatConfig 物件,則方法將會是 withSubscriptionStatConfig)。
  • 當連接器可以支援不同的配置「路徑」時,其中特定屬性需要指定其他屬性(例如 BigQueryIO 的方法將包含各種不同的屬性)。(請參閱最後的範例)。

範例

JdbcIO.DataSourceConfigurationSpannerConfigKafkaIO.Read.withConsumerConfigUpdates

BigQueryIO.write()
  .withWriteConfig(FileLoadsConfig.withAvro()
                                 .withTriggeringFrequency()...)

BigQueryIO.write()
  .withWriteConfig(StreamingInsertsConfig.withDetailedError()
                                  .withExactlyOnce().etc..)

class IO.Write.withFormatFunction

不建議 - 動態目的地除外

對於可以接收 Beam Row 類型的 PCollection 的來源,不需要格式化函式,因為 Beam 應該能夠根據其架構格式化輸入資料。

對於提供動態目的地功能的接收器,元素可能攜帶有助於判斷其目的地的資料。在寫入最終目的地之前,可能需要移除這些資料。

若要包含此方法,連接器應

  • 顯示無法自動執行資料比對。
  • 支援動態目的地,並且因此需要變更輸入資料。

IO.Read.withCoder

IO.Write.withCoder

強烈不建議

設定編碼器來編碼/解碼此連接器的輸出/輸入 PCollection 的元素類型。一般來說,建議來源會

  1. 傳回具有自動推斷架構的 Row 物件。
  2. 藉由具有固定的輸出/輸入類型,或推斷其輸出/輸入類型,自動設定必要的編碼器。

如果無法執行 #1 和 #2,則可以新增 withCoder(...) 方法。

IO.ABC.withEndpoint / with{IO}Client / withClient

連接器轉換應提供一個方法來覆寫它們與它們所通訊的外部系統之間的介面。這可以啟用各種用途

設定編碼器來編碼/解碼此連接器的輸出/輸入 PCollection 的元素類型。一般來說,建議來源會

  • 藉由模擬目的地服務進行本機測試
  • 使用者在用戶端中啟用的指標、監控和安全性處理。
  • 基於模擬器的整合測試

範例

BigQueryIO.Write.withTestServices(BigQueryServices)

類型

Java 語法

語義

方法 IO.Read.expand

Read 轉換的 expand 方法必須傳回具有類型的 PCollection 物件。類型可以是參數化的,也可以固定為類別。

使用者 **不應** 直接建立此類別。它應該由 頂層實用程式方法 建立。

方法 IO.Read.expand 的 PCollection 類型

PCollection 的類型通常會是以下四個選項之一。對於這些選項中的每一個,建議的編碼/資料如下

  • 預先定義的基本 Java 類型(例如 String)
    • 此編碼應很簡單,並使用簡單的 Beam 編碼器(例如 Utf8StringCoder)
  • 具有架構的預設 POJO 類型(例如 Metadata
  • 具有特定架構的 Beam Row
  • 在建構時不知道架構的類型

在所有情況下,不建議要求使用者傳遞編碼器(例如 withCoder(...))。

方法 IO.Write.expand

任何寫入轉換的 expand 方法都必須傳回一個延伸 PCollectionTuple 的 IO.Write.Result 類型物件。此物件允許轉換傳回有關其寫入結果的 metadata,並允許此寫入之後接著其他的 PTransform。

如果 Write 轉換不需要傳回任何 metadata,仍然偏好 Write.Result 物件,因為它將允許轉換隨著時間的推移演進其 metadata。

metadata 的範例

  • 失敗的元素和錯誤
  • 成功寫入的元素
  • 轉換發出的呼叫中的 API 權杖

範例

BigQueryIO 的 WriteResult

演進

隨著時間的推移,I/O 需要演進以解決新的使用案例,或在底層使用新的 API。I/O 必要演進的一些範例

Java 語法

語意

頂層靜態方法

一般來說,應避免為可以在現有方法中作為配置捕獲的功能新增全新的靜態方法。

太多頂層方法可以透過配置支援的一個範例是 PubsubIO

僅應在以下情況下新增新的頂層靜態方法

Python

一般

如果 I/O 位於 Apache Beam 中,則應將其放置在 apache_beam.io.{connector}apache_beam.io.{namespace}.{connector} 套件中

範例

apache_beam.io.fileio 和 apache_beam.io.gcp.bigquery

將有一個名為 {connector}.py 的模組,它是用於在管線中使用連接器的主要進入點 apache_beam.io.{connector}apache_beam.io.{namespace}.{connector}

範例

apache_beam.io.gcp.bigquery / apache_beam/io/gcp/bigquery.py

另一種可能的佈局:apache_beam/io/gcp/bigquery/bigquery.py (自動匯入 bigquery/__init__.py 中的 public 類別)

連接器必須在其主要檔案中定義一個 __all__ 屬性,並且僅匯出使用者需要存取的類別和方法。

如果 I/O 實作存在於單一模組(單一檔案)中,則檔案 {connector}.py 可以保存它。

否則,連接器程式碼應定義在具有 __init__.py 檔案的目錄(連接器套件)中,該檔案會記錄公用 API。

如果連接器定義了其他包含其實作公用程式的檔案,則這些檔案必須清楚地記錄它們不應作為公用介面的事實。

類別/方法/屬性

Python 語法

語意

可呼叫的 ReadFrom{Connector}

這會提供 PTransform 的存取權,以從給定的資料來源讀取資料。它允許您透過接收的引數來配置它。對於冗長的選用參數清單,可以將其定義為具有預設值的參數。

問:Java 使用建構器模式。為什麼我們不能在 Python 中執行此操作?選用參數可以在 Python 中扮演相同的角色

範例

apache_beam.io.gcp.bigquery.ReadFromBigQuery

可呼叫的 ReadAllFrom{Connector}

一些不同的來源實作執行階段組態,以便從資料來源讀取。這是一個很有價值的模式,因為它使純批次來源能夠成為更複雜的串流來源。

盡可能地,這種轉換類型應該具有建構時設定的轉換的類型豐富度和安全性

  • 支援具有在建構時已知的架構的輸出
    • 在這種情況下,可能需要(且可接受)額外的組態(例如,SchemaProvider 參數、Schema 參數、Schema 目錄或類似的實用程式)。
  • 輸入 PCollection 應該具有帶有結構描述的固定類型,因此使用者可以輕鬆地操作它。

範例

ReadAllFromBigQuery

可呼叫的 WriteTo{Connector}

這提供了存取 PTransform 以寫入指定資料接收器的功能。它允許您透過接收的引數來設定。對於可選參數的長列表,它們可以定義為具有預設值的參數。

問:Java 使用建構器模式。為什麼我們不能在 Python 中這樣做?可選參數可以起到相同的作用

作用在 Python 中。

範例

apache_beam.io.gcp.bigquery.WriteToBigQuery

可呼叫的讀取/寫入

頂層轉換初始化器 (ReadFromIO/ReadAllFromIO/WriteToIO) 的目標必須是盡可能減少所需的參數,以簡化其使用方式,並允許使用者快速使用它們。

參數 ReadFrom{Connector}({source})

參數 WriteTo{Connector}({sink})

讀取或寫入 I/O 連接器的第一個參數必須指定讀取器的來源或寫入器的目的地。

如果轉換可以從不同種類的來源讀取(例如表格、查詢、主題、分割區),則建議的方法依偏好順序為:

  1. 保留單一引數,但自動推斷來源/接收器類型(例如 pandas.read_sql(...) 支援表格和查詢)
  2. 為每個可能的來源/接收器類型新增一個新引數(例如 ReadFromBigQuery 具有 query/table 參數)

參數 WriteToIO(destination={multiple_destinations})

寫入轉換可能允許寫入多個目的地。這可能是一個複雜的模式,應謹慎實作(對於可能在單一管線中有多個目的地的連接器來說,這是首選模式)。

在 Python 中,首選的 API 模式是傳遞可呼叫的物件(例如 WriteToBigQuery)以設定所有需要組態的參數。一般來說,可呼叫參數的範例可以是:

  • 目的地可呼叫物件 → 應該接收一個元素,並返回該元素的目標
  • 其他範例
    • Schema 可呼叫物件 → 應接收目的地並返回該目的地的 Schema
    • 格式化函式 → 應該接收一筆記錄(可能還有一個目的地),並格式化要插入的記錄。

使用這些可呼叫物件也允許維護人員隨著時間的推移新增新的可參數化可呼叫物件(具有預設值,以避免破壞現有使用者),如果需要,這些可呼叫物件將定義額外的組態參數。

特殊情況:通常需要將 side input 傳遞給其中一些可呼叫物件。建議的模式是在建構函數中加入一個額外參數來包含這些 side input(例如 WriteToBigQuery 的 table_side_inputs 參數

參數 ReadFromIO(param={param_val})

參數 WriteToIO(param={param_val})

任何額外的組態都可以作為 I/O 建構函數中的可選參數新增。應盡可能避免強制性的額外參數。可選參數應具有合理的預設值,以便選取新的連接器將盡可能容易。

參數 ReadFromIO(config={config_object})

不建議使用

Python 中的某些連接器可能會收到複雜的組態物件作為其組態的一部分。不建議使用此模式,因為連接器可以在頂層保留所有必要的組態參數。

若要判斷多參數配置物件是否適合作為高階轉換的參數,該配置物件必須

類型

Python 語法

語義

方法 ReadFromIO.expand 的輸出

讀取轉換的 expand 方法必須傳回具有類型的 PCollection 物件,並使用類型進行註解。Python 中首選的 PCollection 類型是(依偏好順序):

如果為(位元組、字串、數字),則為簡單的 Python 類型

對於複雜的類型

  1. 使用 RowCoder 編碼的具有設定的 Schema 的 NamedTuple 或 DataClass
  2. Python 字典
    1. 如果可能,字典應透過 RowCoder 編碼。
  3. 如果無法使用 Schema,則為預設的 Python 類別

方法 WriteToIO.expand 的輸出

任何寫入轉換的 expand 方法都必須傳回具有固定類別類型的 Python 物件。建議的類別名稱是 WriteTo{IO}Result。此物件允許轉換傳回有關其寫入結果的中繼資料。

如果寫入轉換不需要傳回任何中繼資料,則仍然首選具有類別類型的 Python 物件,因為它允許轉換隨著時間的推移發展其的中繼資料。

metadata 的範例

  • 失敗的元素和錯誤
  • 成功寫入的元素
  • 轉換發出的呼叫中的 API 權杖

範例

BigQueryIO 的 WriteResult

激勵範例(不良模式):WriteToBigQuery 的不一致字典結果 [1][2]

方法 WriteToIO.expand 的輸入

寫入轉換的 expand 方法必須傳回具有類型的 PCollection 物件,並使用類型進行註解。Python 中首選的 PCollection 類型與 T1 中引用的 ReadFromIO 的輸出類型相同。

GoLang

一般

如果 I/O 存在於 Apache Beam 中,則應將其放置在套件中

{connector}io

範例

avroiobigqueryio

整合和效能測試應與 I/O 本身放在同一套件下

{connector}io

Typescript

類別/方法/屬性

Typescript 語法

語義

function readFromXXX

開始建構 I/O.Read 轉換的方法。

function writeToXXX

開始建構 I/O.Write 轉換的方法。

測試

I/O 應具有單元測試、整合測試和效能測試。在以下指南中,我們將說明每種類型測試的目標,並提供測試涵蓋範圍的基準標準。請注意,實際的測試案例和實際測試的業務邏輯會因每個來源/接收器的具體情況而異,但我們已包含一些建議的測試案例作為基準。

本指南透過新增特定的測試案例和場景,來補充 Apache Beam I/O 轉換測試指南。如需有關測試 Beam I/O 連接器的一般資訊,請參閱該指南。

整合和效能測試應置於 org.apache.beam.sdk.io.{connector}.testing 套件下。這將導致各種測試使用連接器的標準面向使用者的介面。

單元測試應與連接器放在同一套件中(即 org.apache.beam.sdk.io.{connector}),因為它們可能經常測試連接器的內部結構。

單元測試

I/O 單元測試需要有效率地測試程式碼的功能。鑑於預期單元測試將在多個測試套件中執行多次(例如,針對每個 Python 版本),這些測試應執行相對快速,並且不應有副作用。我們建議嘗試透過單元測試達成 100% 的程式碼涵蓋率。

在可能的情況下,由於執行時間更快且資源使用率低,單元測試優於整合測試。此外,單元測試可以輕鬆包含在預先提交的測試套件中(例如,Jenkins beam_PreCommit_* 測試套件),因此更有機會及早發現迴歸。單元測試也是錯誤情況的首選。

單元測試類別應與 IO 位於同一套件中,並命名為 {connector}IOTest。

範例

sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java

建議的測試案例

要測試的功能

描述

範例

使用預設選項進行讀取

最好使用 DirectRunner 和資料儲存的假資料在本地執行管道。但可以是使用模擬的來源轉換單元測試。

BigtableIOTest.testReading

pubsub_test.TestReadFromPubSub.test_read_messages_success

CassandraIOTest.testRead

使用預設選項進行寫入

最好使用 DirectRunner 和資料儲存的假資料在本地執行管道。但可以是使用模擬的接收器轉換單元測試。

BigtableIOTest.testWriting

pubsub_test.TestWriteToPubSub.test_write_messages_success

使用其他選項進行讀取

針對使用者可用的每個選項。

BigtableIOTest.testReadingWithFilter

使用其他選項進行寫入

針對使用者可用的每個選項。例如,寫入動態目的地。

BigTableIOTest.testReadWithBigTableOptionsSetsRetryOptions

BigQueryIOWriteTest.testWriteDynamicDestinations

讀取其他元素類型

如果資料儲存讀取 Schema 支援不同的資料類型。

BigQueryIOReadTest.testReadTableWithSchema

寫入其他元素類型

如果資料儲存寫入 Schema 支援不同的資料類型。

顯示資料

測試來源/接收器是否正確填入顯示資料。

AvroIOTest.testReadDisplayData

DatastoreV1Test.testReadDisplayData

bigquery_test.TestBigQuerySourcetest_table_reference_display_data

初始分割

這些測試可能有很多變化。請參閱範例以了解詳細資訊。

BigqueryIOReadTest.estBigQueryQuerySourceInitSplit

avroio_test.AvroBase.test_read_with_splitting

動態工作重新平衡

這些測試可能有很多變化。請參閱範例以了解詳細資訊。

BigTableIOTest.testReadingSplitAtFractionExhaustive

avroio_test.AvroBase.test_dynamic_work_rebalancing_exhaustive

Schema 支援

讀取 PCollection<Row> 或寫入 PCollection<Row>

應驗證從來源擷取 Schema,並針對接收器推送/驗證 Schema。

BigQueryIOReadTest.testReadTableWithSchema

BigQueryIOWriteTest.testSchemaWriteLoads

驗證測試

測試來源/接收器轉換是否已正確驗證,亦即,錯誤/不相容的組態會因可執行的錯誤而被拒絕。

BigQueryIOWriteTest.testWriteValidatesDataset

PubsubIOTest.testTopicValidationSuccess

度量

確認設定了各種讀取/寫入計量

SpannerIOReadTest.testReadMetrics

bigtableio_test.TestWriteBigTable.test_write_metrics

全部讀取

測試測試的全部讀取 (PCollection<Read Config>) 版本是否正常運作

SpannerIOReadTest.readAllPipeline

CassandraIOTest.readAllQuery

接收器批次處理測試

如果接收器因效能原因執行批次處理,請確保接收器在寫入之前批次處理資料。

SpannerIOWriteTest.testBatchFn_cells

錯誤處理

確保正確處理來自資料儲存的各種錯誤(例如,HTTP 錯誤代碼)

BigQueryIOWriteTest.testExtendedErrorRetrieval

重試原則

確認來源/接收器按預期重試要求

BigQueryIOWriteTest.testRetryPolicy

接收器的輸出 PCollection

接收器應產生後續步驟可以依賴的 PCollection。

BigQueryIOWriteTest.testWriteTables

回溯位元組報告

測試以確認無界限來源轉換是否正確報告回溯位元組。

KinesisReaderTest.getSplitBacklogBytesShouldReturnBacklogUnknown

水位線報告

測試以確認無界限來源轉換是否正確報告水位線。

WatermarkPolicyTest.shouldAdvanceWatermarkWithTheArrivalTimeFromKinesisRecords

整合測試

整合測試測試 Beam 執行器與給定 I/O 連接到的資料儲存之間的端對端互動。由於這些通常涉及遠端 RPC 呼叫,因此整合測試需要更長的執行時間。此外,Beam 執行器在執行整合測試時可能會使用多個工作程式。由於這些成本,只有在單元測試無法涵蓋給定案例時才應實作整合測試。

提交時,需要實作至少一個涉及 Beam 和外部儲存系統之間互動的整合測試。

對於同時涉及來源和接收器的 I/O 連接器,Beam 指南建議以寫入然後讀取的形式實作測試,以便同一個測試管道可以涵蓋讀取和寫入。

整合測試類別應與 I/O 位於同一套件中,並命名為 {connector}IOIT

例如

sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java

建議的測試案例

測試類型

描述

範例

使用 Dataflow 的「寫入然後讀取」測試

將產生的資料寫入資料儲存,並使用 Dataflow 從資料儲存中讀回相同的資料。

JdbcIOIT.testWriteThenRead

使用 Dataflow 的「寫入然後全部讀取」測試

與「寫入然後讀取」相同,但適用於支援讀取來源組態 PCollection 的來源。預期所有未來的 (SDF) 來源都支援此功能。

如果相同的轉換用於「讀取」和「全部讀取」形式,或者兩個轉換本質上相同(例如,讀取轉換是全部讀取的簡單包裝函式,反之亦然),則只需新增單一的「全部讀取」測試就足夠了。

SpannerReadIT.testReadAllRecordsInDb

使用 Dataflow 的無界限寫入然後讀取

一個持續寫入和讀取資料的管道。這樣的管道應該被取消,以驗證結果。這僅適用於支援無界讀取的連接器。

KafkaIOIT.testKafkaIOReadsAndWritesCorrectlyInStreaming

效能測試

由於效能測試框架仍在變動中,效能測試可以在實際 I/O 程式碼之後提交。

效能測試框架目前尚不支援 GoLang 或 Typescript。

效能基準測試是 I/O 最佳實踐的重要環節,因為它們有效地解決了幾個方面

儀表板

Google 會定期對內建 I/O 執行效能測試,並將測試結果發佈到外部可檢視的儀表板,適用於 JavaPython

Dataflow performance test dashboard

指南

盡可能將相同的測試用於整合測試和效能測試。效能測試通常與整合測試相同,但涉及較大的資料量。測試框架(內部和外部)提供追蹤與這些測試相關的效能基準,以及提供儀表板/工具來偵測異常的功能。

內建 I/O 連接器指南文件中,在您的頁面中加入一個資源可擴展性部分,其中將指出 IO 進行整合測試的上限。

例如

指出 KafkaIO 已使用 xxxx 個主題進行整合測試。文件可以說明連接器作者是否認為連接器可以擴展到整合測試的數量之上,然而這將向使用者清楚說明已測試路徑的限制。

文件應清楚指出限制所遵循的配置。例如,使用 runner x 和配置選項 a。

記錄您的 I/O 收集的效能/內部指標,包括它們的含義以及如何使用它們(某些連接器會收集和發布效能指標,例如延遲/綑綁大小/等等)

根據連接器已實施的效能測試,納入 I/O 的預期效能特性。