檔案處理模式
此頁面說明常見的檔案處理任務。如需檔案型 I/O 的詳細資訊,請參閱管道 I/O 和檔案型輸入和輸出資料。
- Java SDK
- Python SDK
處理檔案在抵達時
本節說明如何處理檔案,因為它們會抵達您的檔案系統或物件儲存區(例如 Google Cloud Storage)。您可以連續讀取檔案,或在檔案抵達時觸發串流和處理管道。
連續讀取模式
您可以使用 FileIO
或 TextIO
來連續讀取來源以取得新檔案。
使用 FileIO
類別來持續監看單一檔案模式。下列範例每 30 秒重複比對一次檔案模式,持續將新比對的檔案以無邊界的 PCollection<Metadata>
傳回,如果一小時內沒有出現新檔案則停止
TextIO
類別的 watchForNewFiles
屬性會串流新的檔案比對。
// This produces PCollection<String>
p.apply(
TextIO.read()
.from("<path-to-files>/*")
.watchForNewFiles(
// Check for new files every minute.
Duration.standardMinutes(1),
// Stop watching the file pattern if no new files appear for an hour.
Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
某些執行器可能會在更新期間保留檔案清單,但當您重新啟動管道時,檔案清單不會保留。您可以藉由下列方式儲存檔案清單
- 將處理過的檔案名稱儲存在外部檔案中,並在下一個轉換時移除清單的重複項目
- 將時間戳記新增至檔案名稱,寫入 glob 模式以僅提取新檔案,並在管道重新啟動時比對模式
Python 無法使用連續讀取選項。
從外部來源觸發的串流處理
串流管道可以處理來自無邊界來源的資料。例如,若要使用 Google Cloud Pub/Sub 觸發串流處理
- 使用外部程序來偵測新檔案何時抵達。
- 傳送包含檔案 URI 的 Google Cloud Pub/Sub 訊息。
- 從接續在 Google Cloud Pub/Sub 來源之後的
DoFn
存取 URI。 - 處理檔案。
從外部來源觸發的批次處理
若要在檔案抵達時啟動或排程批次管道作業,請在來源檔案本身中寫入觸發事件。這具有最高的延遲,因為管道必須先初始化才能處理。它最適合用於低頻率、大型的檔案大小更新。
存取檔案名稱
使用 FileIO
類別在管道作業中讀取檔案名稱。FileIO
會傳回 PCollection<ReadableFile>
物件,而 ReadableFile
執行個體包含檔案名稱。
若要存取檔案名稱
- 使用
FileIO
建立ReadableFile
執行個體。FileIO
會傳回PCollection<ReadableFile>
物件。ReadableFile
類別包含檔案名稱。 - 呼叫
readFullyAsUTF8String()
方法將檔案讀入記憶體,並將檔案名稱傳回為String
物件。如果記憶體有限,您可以使用公用程式類別,例如FileSystems
來直接處理檔案。
若要在管道作業中讀取檔案名稱
- 收集檔案 URI 的清單。您可以使用
FileSystems
模組來取得符合 glob 模式的檔案清單。 - 將檔案 URI 傳遞給
PCollection
。
p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
// The withCompression method is optional. By default, the Beam SDK detects compression from
// the filename.
.apply(FileIO.readMatches().withCompression(Compression.GZIP))
.apply(
ParDo.of(
new DoFn<FileIO.ReadableFile, String>() {
@ProcessElement
public void process(@Element FileIO.ReadableFile file) {
// We can now access the file and its metadata.
LOG.info("File Metadata resourceId is {} ", file.getMetadata().resourceId());
}
}));
上次更新時間:2024/10/31
您是否找到了您要尋找的所有內容?
是否一切都有用且清楚?您想要變更任何內容嗎?請告訴我們!