確保 Python 的類型安全

Python 是一種動態類型語言,沒有靜態類型檢查。由於 Python 的類型檢查方式,以及執行器執行的延遲性質,開發人員的生產力很容易因為花時間調查與類型相關的錯誤而受到阻礙。

適用於 Python 的 Apache Beam SDK 在管線建構和執行階段使用類型提示,以嘗試模擬真正的靜態類型所達到的正確性保證。此外,使用類型提示為後端服務執行有效的類型推斷和Coder物件的註冊奠定了一些基礎。

Python 3.5 版引入了一個名為 typing 的模組,以提供語言中類型驗證器的提示。適用於 Python 的 Beam SDK 實作了 PEP 484 的子集,並致力於在其自己的 typehints 模組中盡可能地遵循它。

這些旗標控制 Beam 的類型安全

類型提示的好處

當您使用類型提示時,Beam 會在管線建構期間而非執行階段引發例外。例如,如果 Beam 偵測到您的管線套用了不符的 PTransforms (其中一個轉換的預期輸出與後續轉換的預期輸入不符),則 Beam 會產生例外。這些例外會在管線建構時引發,無論您的管線將在哪裡執行。為您定義的 PTransforms 引入類型提示可讓您在本地執行器中提前捕獲潛在錯誤,而不是在深入且複雜的管線中執行數分鐘後才捕獲。

請考慮以下範例,其中 numbersstr 值的 PCollection

p = TestPipeline()

numbers = p | beam.Create(['1', '2', '3'])

然後,此程式碼會將 Filter 轉換套用至 numbers 集合,其中有一個可呼叫的物件會擷取偶數。

evens = numbers | beam.Filter(lambda x: x % 2 == 0)

當您呼叫 p.run() 時,此程式碼會在嘗試執行此轉換時產生錯誤,因為 Filter 預期是整數的 PCollection,但卻給定字串的 PCollection。使用類型提示,此錯誤可以在管線建構時(甚至在管線開始執行之前)捕獲。

適用於 Python 的 Beam SDK 包含一些自動類型提示:例如,某些 PTransforms,例如 Create 和簡單的 ParDo 轉換,會嘗試根據其輸入推斷其輸出類型。但是,Beam 無法在所有情況下推斷類型。因此,建議您宣告類型提示以協助您執行自己的類型檢查。

宣告類型提示

您可以在可呼叫物件、DoFns 或整個 PTransforms 上宣告類型提示。有三種方法可以宣告類型提示:在管線建構期間內聯、作為 DoFnPTransform 的屬性使用裝飾器,或者作為某些函式上的 Python 3 類型註釋。

您隨時都可以內聯宣告類型提示,但是如果您的程式碼需要重複使用它們,請將它們宣告為註釋或裝飾器。例如,如果您的 DoFn 需要 int 輸入,則將輸入的類型提示宣告為 process 引數的註釋(或 DoFn 的屬性)比內聯宣告更有意義。

使用註釋還有一個額外的好處,就是允許使用靜態類型檢查器(例如 mypy)來額外類型檢查您的程式碼。如果您已在使用類型檢查器,則使用註釋而不是裝飾器可以減少程式碼重複。但是,註釋並未涵蓋裝飾器和內聯宣告的所有使用案例。例如,它們不適用於 lambda 函式。

使用類型註釋宣告類型提示

2.21.0 版的新功能。

若要將類型提示指定為某些函式的註釋,請像平常一樣使用它們,並省略任何裝飾器提示或內聯提示。

目前在以下項目上支援註釋

以下程式碼在 to_id 轉換上宣告 int 輸入和 str 輸出類型提示,方法是使用 my_fn 上的註釋。

def my_fn(element: int) -> str:
  return 'id_' + str(element)

ids = numbers | 'to_id' >> beam.Map(my_fn)

以下程式碼示範如何在 PTransform 子類別上使用註釋。有效的註釋是包裝內部(巢狀)類型、PBeginPDoneNonePCollection。以下程式碼在自訂 PTransform 上宣告類型提示,該 PTransform 會使用註釋取得 PCollection[int] 輸入並輸出 PCollection[str]

from apache_beam.pvalue import PCollection

class IntToStr(beam.PTransform):
  def expand(self, pcoll: PCollection[int]) -> PCollection[str]:
    return pcoll | beam.Map(lambda elem: str(elem))

ids = numbers | 'convert to str' >> IntToStr()

以下程式碼在 filter_evens 上宣告 int 輸入和輸出類型提示,方法是使用 FilterEvensDoFn.process 上的註釋。由於 process 會傳回產生器,因此產生 PCollection[int] 的 DoFn 的輸出類型會註釋為 Iterable[int]Generator[int, None, None] 也適用於此處)。Beam 會移除 DoFn.process 方法上傳回類型的外部可反覆運算項目,以及傳遞至 FlatMap 的函式,以推斷產生的 PCollection 的元素類型。對於這些函式,具有不可反覆運算的傳回類型註釋會產生錯誤。其他支援的可反覆運算類型包括:IteratorGeneratorTupleList

from typing import Iterable

class TypedFilterEvensDoFn(beam.DoFn):
  def process(self, element: int) -> Iterable[int]:
    if element % 2 == 0:
      yield element

evens = numbers | 'filter_evens' >> beam.ParDo(TypedFilterEvensDoFn())

以下程式碼在 double_evens 上宣告 int 輸入和輸出類型提示,方法是使用 FilterEvensDoubleDoFn.process 上的註釋。由於 process 會傳回 listNone,因此輸出類型會註釋為 Optional[List[int]]。Beam 也會移除外部 Optional,以及(如上所述)傳回類型的外部可反覆運算項目,僅在 DoFn.process 方法和傳遞至 FlatMap 的函式上。

from typing import List, Optional

class FilterEvensDoubleDoFn(beam.DoFn):
  def process(self, element: int) -> Optional[List[int]]:
    if element % 2 == 0:
      return [element, element]
    return None

evens = numbers | 'double_evens' >> beam.ParDo(FilterEvensDoubleDoFn())

內聯宣告類型提示

若要內聯指定類型提示,請使用 with_input_typeswith_output_types 方法。以下範例程式碼會內聯宣告輸入類型提示

evens = numbers | beam.Filter(lambda x: x % 2 == 0).with_input_types(int)

當您將 Filter 轉換套用至上述範例中的 numbers 集合時,您可以在管線建構期間捕獲錯誤。

使用裝飾器宣告類型提示

若要將類型提示指定為 DoFnPTransform 的屬性,請使用裝飾器 @with_input_types()@with_output_types()

以下程式碼在 FilterEvensDoFn 上宣告 int 類型提示,方法是使用裝飾器 @with_input_types()

@beam.typehints.with_input_types(int)
class FilterEvensDoFn(beam.DoFn):
  def process(self, element):
    if element % 2 == 0:
      yield element

evens = numbers | beam.ParDo(FilterEvensDoFn())

裝飾器會接收任意數量的位置和/或關鍵字引數,通常在它們包裝的函式的內容中解譯。一般而言,第一個引數是主要輸入的類型提示,而其他引數是輔助輸入的類型提示。

停用註釋使用

由於預設會啟用此樣式的類型提示宣告,因此以下是一些停用它的方法。

  1. 在您希望 Beam 忽略註釋的特定函式上使用 @beam.typehints.no_annotations 裝飾器。
  2. 使用上述裝飾器或內聯方法宣告類型提示。這些優先於註釋。
  3. 在建立管線之前呼叫 beam.typehints.disable_type_annotations()。這會阻止 Beam 檢視所有函式上的註釋。

定義泛型類型

您可以使用類型提示註釋來定義泛型類型。以下程式碼指定一個輸入類型提示,該提示會判斷泛型類型 T,以及一個輸出類型提示,該提示會判斷類型 Tuple[int, T]。如果 MyTransform 的輸入類型為 str,則 Beam 會推斷輸出類型為 Tuple[int, str]

from typing import Tuple, TypeVar

T = TypeVar('T')

@beam.typehints.with_input_types(T)
@beam.typehints.with_output_types(Tuple[int, T])
class MyTransform(beam.PTransform):
  def expand(self, pcoll):
    return pcoll | beam.Map(lambda x: (len(x), x))

words_with_lens = words | MyTransform()

類型提示的種類

你可以對任何類別使用型別提示,包括 Python 的原始型別、容器類別和使用者定義的類別。所有類別,例如 intfloat 和使用者定義的類別,都可以用來定義型別提示,稱為簡單型別提示。容器型別,例如列表、元組和可迭代物件,也可以用來定義型別提示,稱為參數化型別提示。最後,還有一些不對應任何具體 Python 類別的特殊型別,例如 AnyOptionalUnion,也允許作為型別提示。

Beam 定義了自己的內部型別提示型別,這些型別仍然可用於向後相容。它也支援 Python 的 typing 模組型別,這些型別在內部會轉換為 Beam 內部型別。

對於新的程式碼,建議使用 typing 模組型別。

簡單類型提示

型別提示可以是任何類別,從 intstr 到使用者定義的類別。如果你的型別提示是一個類別,你可能需要為它定義一個編碼器 (coder)。

參數化類型提示

參數化型別提示對於提示類似容器的 Python 物件的型別很有用,例如 list。這些型別提示進一步細化了容器物件中的元素。

參數化型別提示的參數可以是簡單型別、參數化型別或型別變數。作為型別變數的元素型別,例如 T,會強制操作的輸入和輸出之間的關係(例如,List[T] -> T)。型別提示可以巢狀使用,讓你為複雜型別定義型別提示。例如,List[Tuple[int, int, str]]

為了避免與內建容器型別的命名空間衝突,第一個字母會大寫。

允許使用以下參數化型別提示

注意: Tuple[T, U] 型別提示是具有固定數量異質型別元素的元組,而 Tuple[T, ...] 型別提示是具有可變數量同質型別元素的元組。

特殊類型提示

以下是不對應於類別的特殊型別提示,而是對應於 PEP 484 中引入的特殊型別。

執行階段類型檢查

除了在管道建構時使用型別提示進行型別檢查之外,你還可以啟用執行階段型別檢查,以檢查實際元素在管道執行期間是否滿足宣告的型別約束。

例如,以下管道會發出錯誤型別的元素。根據執行器 (runner) 的實作方式,其執行可能會或可能不會在執行階段失敗。

p = TestPipeline()
p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)

但是,如果你啟用執行階段型別檢查,則保證程式碼會在執行階段失敗。要啟用執行階段型別檢查,請將管道選項 runtime_type_check 設定為 True

p = TestPipeline(options=PipelineOptions(runtime_type_check=True))
p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
p.run()

請注意,由於執行階段型別檢查是對每個 PCollection 元素進行的,因此啟用此功能可能會導致顯著的效能損失。因此,建議在生產管道中停用執行階段型別檢查。有關更快、適用於生產環境的替代方案,請參閱下一節。

更快的執行階段類型檢查

你可以透過將管道選項 performance_runtime_type_check 設定為 True 來啟用更快的、基於取樣的執行階段型別檢查。

這是一個僅限於 Python 3 的功能,它透過使用最佳化的 Cython 程式碼對一小部分值(稱為樣本)進行執行階段型別檢查來運作。

目前,此功能不支援對側邊輸入或組合操作進行執行階段型別檢查。這些計劃在 Beam 的未來版本中支援。

編碼器中類型提示的使用

當你的管道讀取、寫入或以其他方式實例化其資料時,你的 PCollection 中的元素需要被編碼和解碼為位元組字串。位元組字串用於中間儲存、比較 GroupByKey 操作中的鍵,以及從來源讀取和寫入到接收器。

Python 的 Beam SDK 使用 Python 原生支援序列化未知型別的物件,這個過程稱為封裝 (pickling)。但是,使用 PickleCoder 有幾個缺點:它在時間和空間上效率較低,並且使用的編碼不確定,這會阻礙分散式分割、分組和狀態查找。

為了避免這些缺點,你可以定義 Coder 類別,以便以更有效率的方式編碼和解碼型別。你可以指定一個 Coder 來描述如何編碼和解碼給定 PCollection 的元素。

為了正確且有效率,一個 Coder 需要型別資訊,並且 PCollection 要與特定型別相關聯。型別提示使這些型別資訊可用。Python 的 Beam SDK 為標準 Python 型別(例如 intfloatstrbytesunicode)提供內建編碼器。

確定性編碼器

如果你不定義 Coder,則預設是一個編碼器,它會針對未知型別回退到封裝。在某些情況下,你必須指定一個確定性的 Coder,否則你會收到執行階段錯誤。

例如,假設你有一個鍵值對的 PCollection,其鍵是 Player 物件。如果你對這樣的集合應用 GroupByKey 轉換,當使用非確定性編碼器(例如預設的封裝編碼器)時,其鍵物件在不同的機器上可能會以不同的方式序列化。由於 GroupByKey 使用此序列化表示來比較鍵,這可能會導致不正確的行為。為了確保元素始終以相同的方式編碼和解碼,你需要為 Player 類別定義一個確定性的 Coder

以下程式碼顯示了 Player 類別的範例以及如何為其定義 Coder。當你使用型別提示時,Beam 會使用 beam.coders.registry 推斷要使用的 Coders。以下程式碼將 PlayerCoder 註冊為 Player 類別的編碼器。在此範例中,為 CombinePerKey 宣告的輸入型別為 Tuple[Player, int]。在這種情況下,Beam 推斷要使用的 Coder 物件是 TupleCoderPlayerCoderIntCoder

from typing import Tuple

class Player(object):
  def __init__(self, team, name):
    self.team = team
    self.name = name

class PlayerCoder(beam.coders.Coder):
  def encode(self, player):
    return ('%s:%s' % (player.team, player.name)).encode('utf-8')

  def decode(self, s):
    return Player(*s.decode('utf-8').split(':'))

  def is_deterministic(self):
    return True

beam.coders.registry.register_coder(Player, PlayerCoder)

def parse_player_and_score(csv):
  name, team, score = csv.split(',')
  return Player(team, name), int(score)

totals = (
    lines
    | beam.Map(parse_player_and_score)
    | beam.CombinePerKey(sum).with_input_types(Tuple[Player, int]))