檔案處理模式

此頁面說明常見的檔案處理任務。如需檔案型 I/O 的詳細資訊,請參閱管道 I/O檔案型輸入和輸出資料

處理檔案在抵達時

本節說明如何處理檔案,因為它們會抵達您的檔案系統或物件儲存區(例如 Google Cloud Storage)。您可以連續讀取檔案,或在檔案抵達時觸發串流和處理管道。

連續讀取模式

您可以使用 FileIOTextIO 來連續讀取來源以取得新檔案。

使用 FileIO 類別來持續監看單一檔案模式。下列範例每 30 秒重複比對一次檔案模式,持續將新比對的檔案以無邊界的 PCollection<Metadata> 傳回,如果一小時內沒有出現新檔案則停止

// This produces PCollection<MatchResult.Metadata>
p.apply(
    FileIO.match()
        .filepattern("...")
        .continuously(
            Duration.standardSeconds(30),
            Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));

TextIO 類別的 watchForNewFiles 屬性會串流新的檔案比對。

// This produces PCollection<String>
p.apply(
    TextIO.read()
        .from("<path-to-files>/*")
        .watchForNewFiles(
            // Check for new files every minute.
            Duration.standardMinutes(1),
            // Stop watching the file pattern if no new files appear for an hour.
            Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));

某些執行器可能會在更新期間保留檔案清單,但當您重新啟動管道時,檔案清單不會保留。您可以藉由下列方式儲存檔案清單

Python 無法使用連續讀取選項。

從外部來源觸發的串流處理

串流管道可以處理來自無邊界來源的資料。例如,若要使用 Google Cloud Pub/Sub 觸發串流處理

  1. 使用外部程序來偵測新檔案何時抵達。
  2. 傳送包含檔案 URI 的 Google Cloud Pub/Sub 訊息。
  3. 從接續在 Google Cloud Pub/Sub 來源之後的 DoFn 存取 URI。
  4. 處理檔案。

從外部來源觸發的批次處理

若要在檔案抵達時啟動或排程批次管道作業,請在來源檔案本身中寫入觸發事件。這具有最高的延遲,因為管道必須先初始化才能處理。它最適合用於低頻率、大型的檔案大小更新。

存取檔案名稱

使用 FileIO 類別在管道作業中讀取檔案名稱。FileIO 會傳回 PCollection<ReadableFile> 物件,而 ReadableFile 執行個體包含檔案名稱。

若要存取檔案名稱

  1. 使用 FileIO 建立 ReadableFile 執行個體。FileIO 會傳回 PCollection<ReadableFile> 物件。ReadableFile 類別包含檔案名稱。
  2. 呼叫 readFullyAsUTF8String() 方法將檔案讀入記憶體,並將檔案名稱傳回為 String 物件。如果記憶體有限,您可以使用公用程式類別,例如 FileSystems 來直接處理檔案。

若要在管道作業中讀取檔案名稱

  1. 收集檔案 URI 的清單。您可以使用 FileSystems 模組來取得符合 glob 模式的檔案清單。
  2. 將檔案 URI 傳遞給 PCollection

p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
    // The withCompression method is optional. By default, the Beam SDK detects compression from
    // the filename.
    .apply(FileIO.readMatches().withCompression(Compression.GZIP))
    .apply(
        ParDo.of(
            new DoFn<FileIO.ReadableFile, String>() {
              @ProcessElement
              public void process(@Element FileIO.ReadableFile file) {
                // We can now access the file and its metadata.
                LOG.info("File Metadata resourceId is {} ", file.getMetadata().resourceId());
              }
            }));
with beam.Pipeline() as pipeline:
  readable_files = (
      pipeline
      | fileio.MatchFiles('hdfs://path/to/*.txt')
      | fileio.ReadMatches()
      | beam.Reshuffle())
  files_and_contents = (
      readable_files
      | beam.Map(lambda x: (x.metadata.path, x.read_utf8())))