使用時間戳記

將時間戳記指派給集合的所有元素。

範例

在以下範例中,我們建立一個具有 PCollection 的管線,並將時間戳記值附加到其每個元素。當視窗化和延遲資料在串流管線中扮演重要角色時,時間戳記特別有用。

範例 1:依事件時間戳記

元素本身通常已經包含時間戳記欄位。beam.window.TimestampedValue 接受一個值和以秒為單位的 Unix 時間戳記

若要從 time.struct_time 轉換為 unix_time,您可以使用 time.mktime。如需時間格式選項的詳細資訊,請參閱 time.strftime

import time

time_tuple = time.strptime('2020-03-19 20:50:00', '%Y-%m-%d %H:%M:%S')
unix_time = time.mktime(time_tuple)

若要從 datetime.datetime 轉換為 unix_time,您可以先使用 datetime.timetuple 將其轉換為 time.struct_time

import time
import datetime

now = datetime.datetime.now()
time_tuple = now.timetuple()
unix_time = time.mktime(time_tuple)

範例 2:依邏輯時鐘戳記

如果每個元素都有一個時間順序編號,則這些編號可以用作邏輯時鐘。這些數字必須轉換為「秒」等效值,這對於您的視窗化和延遲資料規則來說尤其重要。

範例 3:依處理時間戳記

如果元素沒有任何可用的時間資料,您也可以使用每個元素的目前處理時間。請注意,這會取得正在處理每個元素的工作人員的本機時間。工作人員可能有時間差異,因此使用這種方法不是進行精確排序的可靠方法。

透過使用處理時間,無法知道資料是否延遲到達,因為時間戳記是在元素進入管線時附加的。