確保 Python 的類型安全
Python 是一種動態類型語言,沒有靜態類型檢查。由於 Python 的類型檢查方式,以及執行器執行的延遲性質,開發人員的生產力很容易因為花時間調查與類型相關的錯誤而受到阻礙。
適用於 Python 的 Apache Beam SDK 在管線建構和執行階段使用類型提示,以嘗試模擬真正的靜態類型所達到的正確性保證。此外,使用類型提示為後端服務執行有效的類型推斷和Coder
物件的註冊奠定了一些基礎。
Python 3.5 版引入了一個名為 typing 的模組,以提供語言中類型驗證器的提示。適用於 Python 的 Beam SDK 實作了 PEP 484 的子集,並致力於在其自己的 typehints 模組中盡可能地遵循它。
這些旗標控制 Beam 的類型安全
--no_pipeline_type_check
在管線建構期間停用類型檢查。預設是執行這些檢查。
--runtime_type_check
啟用每個元素的執行階段類型檢查。這可能會影響管線效能,因此預設是略過這些檢查。
--type_check_additional
啟用額外的類型檢查。為了保持向後相容性,預設不會啟用這些檢查。此旗標接受以逗號分隔的選項清單
all
:啟用所有額外檢查。ptransform_fn
:在使用@ptransform_fn
裝飾器時啟用類型提示裝飾器。
類型提示的好處
當您使用類型提示時,Beam 會在管線建構期間而非執行階段引發例外。例如,如果 Beam 偵測到您的管線套用了不符的 PTransforms
(其中一個轉換的預期輸出與後續轉換的預期輸入不符),則 Beam 會產生例外。這些例外會在管線建構時引發,無論您的管線將在哪裡執行。為您定義的 PTransforms
引入類型提示可讓您在本地執行器中提前捕獲潛在錯誤,而不是在深入且複雜的管線中執行數分鐘後才捕獲。
請考慮以下範例,其中 numbers
是 str
值的 PCollection
然後,此程式碼會將 Filter
轉換套用至 numbers
集合,其中有一個可呼叫的物件會擷取偶數。
當您呼叫 p.run()
時,此程式碼會在嘗試執行此轉換時產生錯誤,因為 Filter
預期是整數的 PCollection
,但卻給定字串的 PCollection
。使用類型提示,此錯誤可以在管線建構時(甚至在管線開始執行之前)捕獲。
適用於 Python 的 Beam SDK 包含一些自動類型提示:例如,某些 PTransforms
,例如 Create
和簡單的 ParDo
轉換,會嘗試根據其輸入推斷其輸出類型。但是,Beam 無法在所有情況下推斷類型。因此,建議您宣告類型提示以協助您執行自己的類型檢查。
宣告類型提示
您可以在可呼叫物件、DoFns
或整個 PTransforms
上宣告類型提示。有三種方法可以宣告類型提示:在管線建構期間內聯、作為 DoFn
或 PTransform
的屬性使用裝飾器,或者作為某些函式上的 Python 3 類型註釋。
您隨時都可以內聯宣告類型提示,但是如果您的程式碼需要重複使用它們,請將它們宣告為註釋或裝飾器。例如,如果您的 DoFn
需要 int
輸入,則將輸入的類型提示宣告為 process
引數的註釋(或 DoFn
的屬性)比內聯宣告更有意義。
使用註釋還有一個額外的好處,就是允許使用靜態類型檢查器(例如 mypy)來額外類型檢查您的程式碼。如果您已在使用類型檢查器,則使用註釋而不是裝飾器可以減少程式碼重複。但是,註釋並未涵蓋裝飾器和內聯宣告的所有使用案例。例如,它們不適用於 lambda 函式。
使用類型註釋宣告類型提示
2.21.0 版的新功能。
若要將類型提示指定為某些函式的註釋,請像平常一樣使用它們,並省略任何裝飾器提示或內聯提示。
目前在以下項目上支援註釋
DoFn
子類別上的process()
方法。PTransform
子類別上的expand()
方法。- 傳遞至以下項目的函式:
ParDo
、Map
、FlatMap
、Filter
。
以下程式碼在 to_id
轉換上宣告 int
輸入和 str
輸出類型提示,方法是使用 my_fn
上的註釋。
以下程式碼示範如何在 PTransform
子類別上使用註釋。有效的註釋是包裝內部(巢狀)類型、PBegin
、PDone
或 None
的 PCollection
。以下程式碼在自訂 PTransform 上宣告類型提示,該 PTransform 會使用註釋取得 PCollection[int]
輸入並輸出 PCollection[str]
。
以下程式碼在 filter_evens
上宣告 int
輸入和輸出類型提示,方法是使用 FilterEvensDoFn.process
上的註釋。由於 process
會傳回產生器,因此產生 PCollection[int]
的 DoFn 的輸出類型會註釋為 Iterable[int]
(Generator[int, None, None]
也適用於此處)。Beam 會移除 DoFn.process
方法上傳回類型的外部可反覆運算項目,以及傳遞至 FlatMap
的函式,以推斷產生的 PCollection 的元素類型。對於這些函式,具有不可反覆運算的傳回類型註釋會產生錯誤。其他支援的可反覆運算類型包括:Iterator
、Generator
、Tuple
、List
。
以下程式碼在 double_evens
上宣告 int
輸入和輸出類型提示,方法是使用 FilterEvensDoubleDoFn.process
上的註釋。由於 process
會傳回 list
或 None
,因此輸出類型會註釋為 Optional[List[int]]
。Beam 也會移除外部 Optional
,以及(如上所述)傳回類型的外部可反覆運算項目,僅在 DoFn.process
方法和傳遞至 FlatMap
的函式上。
內聯宣告類型提示
若要內聯指定類型提示,請使用 with_input_types
和 with_output_types
方法。以下範例程式碼會內聯宣告輸入類型提示
當您將 Filter 轉換套用至上述範例中的 numbers 集合時,您可以在管線建構期間捕獲錯誤。
使用裝飾器宣告類型提示
若要將類型提示指定為 DoFn
或 PTransform
的屬性,請使用裝飾器 @with_input_types()
和 @with_output_types()
。
以下程式碼在 FilterEvensDoFn
上宣告 int
類型提示,方法是使用裝飾器 @with_input_types()
。
裝飾器會接收任意數量的位置和/或關鍵字引數,通常在它們包裝的函式的內容中解譯。一般而言,第一個引數是主要輸入的類型提示,而其他引數是輔助輸入的類型提示。
停用註釋使用
由於預設會啟用此樣式的類型提示宣告,因此以下是一些停用它的方法。
- 在您希望 Beam 忽略註釋的特定函式上使用
@beam.typehints.no_annotations
裝飾器。 - 使用上述裝飾器或內聯方法宣告類型提示。這些優先於註釋。
- 在建立管線之前呼叫
beam.typehints.disable_type_annotations()
。這會阻止 Beam 檢視所有函式上的註釋。
定義泛型類型
您可以使用類型提示註釋來定義泛型類型。以下程式碼指定一個輸入類型提示,該提示會判斷泛型類型 T
,以及一個輸出類型提示,該提示會判斷類型 Tuple[int, T]
。如果 MyTransform
的輸入類型為 str
,則 Beam 會推斷輸出類型為 Tuple[int, str]
。
類型提示的種類
你可以對任何類別使用型別提示,包括 Python 的原始型別、容器類別和使用者定義的類別。所有類別,例如 int
、float
和使用者定義的類別,都可以用來定義型別提示,稱為簡單型別提示。容器型別,例如列表、元組和可迭代物件,也可以用來定義型別提示,稱為參數化型別提示。最後,還有一些不對應任何具體 Python 類別的特殊型別,例如 Any
、Optional
和 Union
,也允許作為型別提示。
Beam 定義了自己的內部型別提示型別,這些型別仍然可用於向後相容。它也支援 Python 的 typing 模組型別,這些型別在內部會轉換為 Beam 內部型別。
對於新的程式碼,建議使用 typing 模組型別。
簡單類型提示
型別提示可以是任何類別,從 int
和 str
到使用者定義的類別。如果你的型別提示是一個類別,你可能需要為它定義一個編碼器 (coder)。
參數化類型提示
參數化型別提示對於提示類似容器的 Python 物件的型別很有用,例如 list
。這些型別提示進一步細化了容器物件中的元素。
參數化型別提示的參數可以是簡單型別、參數化型別或型別變數。作為型別變數的元素型別,例如 T
,會強制操作的輸入和輸出之間的關係(例如,List[T]
-> T
)。型別提示可以巢狀使用,讓你為複雜型別定義型別提示。例如,List[Tuple[int, int, str]]
。
為了避免與內建容器型別的命名空間衝突,第一個字母會大寫。
允許使用以下參數化型別提示
Tuple[T, U]
Tuple[T, ...]
List[T]
KV[T, U]
Dict[T, U]
Set[T]
FrozenSet[T]
Iterable[T]
Iterator[T]
Generator[T]
PCollection[T]
注意: Tuple[T, U]
型別提示是具有固定數量異質型別元素的元組,而 Tuple[T, ...]
型別提示是具有可變數量同質型別元素的元組。
特殊類型提示
以下是不對應於類別的特殊型別提示,而是對應於 PEP 484 中引入的特殊型別。
Any
Union[T, U, V]
Optional[T]
執行階段類型檢查
除了在管道建構時使用型別提示進行型別檢查之外,你還可以啟用執行階段型別檢查,以檢查實際元素在管道執行期間是否滿足宣告的型別約束。
例如,以下管道會發出錯誤型別的元素。根據執行器 (runner) 的實作方式,其執行可能會或可能不會在執行階段失敗。
但是,如果你啟用執行階段型別檢查,則保證程式碼會在執行階段失敗。要啟用執行階段型別檢查,請將管道選項 runtime_type_check
設定為 True
。
請注意,由於執行階段型別檢查是對每個 PCollection
元素進行的,因此啟用此功能可能會導致顯著的效能損失。因此,建議在生產管道中停用執行階段型別檢查。有關更快、適用於生產環境的替代方案,請參閱下一節。
更快的執行階段類型檢查
你可以透過將管道選項 performance_runtime_type_check
設定為 True
來啟用更快的、基於取樣的執行階段型別檢查。
這是一個僅限於 Python 3 的功能,它透過使用最佳化的 Cython 程式碼對一小部分值(稱為樣本)進行執行階段型別檢查來運作。
目前,此功能不支援對側邊輸入或組合操作進行執行階段型別檢查。這些計劃在 Beam 的未來版本中支援。
編碼器中類型提示的使用
當你的管道讀取、寫入或以其他方式實例化其資料時,你的 PCollection
中的元素需要被編碼和解碼為位元組字串。位元組字串用於中間儲存、比較 GroupByKey
操作中的鍵,以及從來源讀取和寫入到接收器。
Python 的 Beam SDK 使用 Python 原生支援序列化未知型別的物件,這個過程稱為封裝 (pickling)。但是,使用 PickleCoder
有幾個缺點:它在時間和空間上效率較低,並且使用的編碼不確定,這會阻礙分散式分割、分組和狀態查找。
為了避免這些缺點,你可以定義 Coder
類別,以便以更有效率的方式編碼和解碼型別。你可以指定一個 Coder
來描述如何編碼和解碼給定 PCollection
的元素。
為了正確且有效率,一個 Coder
需要型別資訊,並且 PCollection
要與特定型別相關聯。型別提示使這些型別資訊可用。Python 的 Beam SDK 為標準 Python 型別(例如 int
、float
、str
、bytes
和 unicode
)提供內建編碼器。
確定性編碼器
如果你不定義 Coder
,則預設是一個編碼器,它會針對未知型別回退到封裝。在某些情況下,你必須指定一個確定性的 Coder
,否則你會收到執行階段錯誤。
例如,假設你有一個鍵值對的 PCollection
,其鍵是 Player
物件。如果你對這樣的集合應用 GroupByKey
轉換,當使用非確定性編碼器(例如預設的封裝編碼器)時,其鍵物件在不同的機器上可能會以不同的方式序列化。由於 GroupByKey
使用此序列化表示來比較鍵,這可能會導致不正確的行為。為了確保元素始終以相同的方式編碼和解碼,你需要為 Player
類別定義一個確定性的 Coder
。
以下程式碼顯示了 Player
類別的範例以及如何為其定義 Coder
。當你使用型別提示時,Beam 會使用 beam.coders.registry
推斷要使用的 Coders
。以下程式碼將 PlayerCoder
註冊為 Player
類別的編碼器。在此範例中,為 CombinePerKey
宣告的輸入型別為 Tuple[Player, int]
。在這種情況下,Beam 推斷要使用的 Coder
物件是 TupleCoder
、PlayerCoder
和 IntCoder
。
from typing import Tuple
class Player(object):
def __init__(self, team, name):
self.team = team
self.name = name
class PlayerCoder(beam.coders.Coder):
def encode(self, player):
return ('%s:%s' % (player.team, player.name)).encode('utf-8')
def decode(self, s):
return Player(*s.decode('utf-8').split(':'))
def is_deterministic(self):
return True
beam.coders.registry.register_coder(Player, PlayerCoder)
def parse_player_and_score(csv):
name, team, score = csv.split(',')
return Player(team, name), int(score)
totals = (
lines
| beam.Map(parse_player_and_score)
| beam.CombinePerKey(sum).with_input_types(Tuple[Player, int]))