PTransform 風格指南

新的可重複使用 PTransform 的撰寫者風格指南。

與程式語言無關的考量

一致性

與先前的技術保持一致

公開 PTransform 與其他

所以您想開發一個人們將在其 Beam 管線中使用的函式庫 - 到第三方系統的連接器、機器學習演算法等。您應該如何公開它?

應該

不應該

命名

應該

不應該

設定

哪些內容應放入設定,哪些應放入輸入集合

要公開哪些參數

應該

不應該

錯誤處理

轉換設定錯誤

儘早偵測錯誤。可以在以下階段偵測到錯誤

例如

執行階段錯誤和資料一致性

優先考慮資料一致性。不要掩蓋資料遺失或損毀。如果無法避免資料遺失,請失敗。

應該

不應該

效能

許多執行器會以提高效能的方式優化 ParDo 鏈,如果每個輸入元素發出的元素數量少到中等,或者每個元素的處理成本相對較低(例如 Dataflow 的「融合」),但如果違反這些假設,則會限制平行化。在這種情況下,您可能需要一個「融合中斷」(Reshuffle.of()) 來提高處理 ParDo 輸出 PCollection 的平行性。

文件

記錄如何配置轉換(提供程式碼範例),以及它對輸入的期望或對輸出的保證,並考量 Beam 模型。例如:

日誌記錄

預測轉換使用者可能遇到的異常情況。記錄他們會發現足以進行除錯的資訊,但限制記錄的量。以下是一些適用於所有程式的建議,但在資料量龐大且執行分散式時尤其重要。

應該

不應該

測試

資料處理是棘手的,充滿了邊緣情況,並且難以除錯,因為管道需要很長時間才能運行,很難檢查輸出是否正確,您無法附加除錯器,而且由於資料量很大,您通常無法像希望的那樣記錄許多內容。因此,測試特別重要。

測試轉換的執行階段行為

測試轉換的建構和驗證

建構和驗證轉換的程式碼通常很簡單,而且大多是樣板程式碼。但是,其中的小錯誤或拼寫錯誤可能會導致嚴重的後果(例如,忽略使用者設定的屬性),因此也需要對其進行測試。然而,過多的瑣碎測試可能難以維護,並且會給人一種錯誤的印象,認為轉換已經過充分測試。

應該

不應該

相容性

應該

不應該

Java 特定考量

以下大多數實踐的好例子是 JdbcIOMongoDbIO

API

選擇輸入和輸出 PCollection 的類型

盡可能使用特定於轉換性質的類型。如果需要,人們可以使用轉換 DoFn 從他們自己的類型進行包裝。例如,Datastore 連接器應使用 Datastore Entity 類型,MongoDb 連接器應使用 Mongo Document 類型,而不是 JSON 的字串表示形式。

有時候這是不可能的(例如,JDBC 沒有提供與 Beam 相容(可以使用 Coder 編碼)的「JDBC 記錄」資料類型) - 那麼就讓使用者提供一個函數,用於在轉換特定的類型和與 Beam 相容的類型之間進行轉換(例如,請參閱 JdbcIOMongoDbGridFSIO)。

當轉換在邏輯上應該回傳一個尚未存在對應 Java 類別的複合類型時,請建立一個新的 POJO 類別,並使用明確命名的欄位。請勿使用泛型元組類別或 KV(除非這些欄位確實是鍵和值)。

具有多個輸出集合的轉換

如果轉換需要回傳多個集合,它應該是一個 PTransform<..., PCollectionTuple>,並且為每個集合公開 getBlahTag() 方法。

例如,如果您想回傳一個 PCollection<Foo> 和一個 PCollection<Bar>,請公開 TupleTag<Foo> getFooTag()TupleTag<Bar> getBarTag()

例如

public class MyTransform extends PTransform<..., PCollectionTuple> {
  private final TupleTag<Moo> mooTag = new TupleTag<Moo>() {};
  private final TupleTag<Blah> blahTag = new TupleTag<Blah>() {};
  ...
  PCollectionTuple expand(... input) {
    ...
    PCollection<Moo> moo = ...;
    PCollection<Blah> blah = ...;
    return PCollectionTuple.of(mooTag, moo)
                           .and(blahTag, blah);
  }

  public TupleTag<Moo> getMooTag() {
    return mooTag;
  }

  public TupleTag<Blah> getBlahTag() {
    return blahTag;
  }
  ...
}

用於設定的 Fluent 建構器

讓轉換類別成為不可變的,並具有產生修改後的不可變物件的方法。使用 AutoValue。AutoValue 可以提供一個 Builder 輔助類別。使用 @Nullable 標記沒有預設值或預設值為 null 的類別類型參數,除了基本類型(例如 int)之外。

@AutoValue
public abstract static class MyTransform extends PTransform<...> {
  int getMoo();
  @Nullable abstract String getBlah();

  abstract Builder toBuilder();

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setMoo(int moo);
    abstract Builder setBlah(String blah);

    abstract MyTransform build();
  }
  ...
}
工廠方法

提供一個無參數的靜態工廠方法,可以放在封閉類別中(請參閱「封裝一系列轉換」),或者在轉換類別本身中。

public class Thumbs {
  public static Twiddle twiddle() {
    return new AutoValue_Thumbs_Twiddle.Builder().build();
  }

  public abstract static class Twiddle extends PTransform<...> { ... }
}

// or:
public abstract static class TwiddleThumbs extends PTransform<...> {
  public static TwiddleThumbs create() {
    return new AutoValue_Thumbs_Twiddle.Builder().build();
  }
  ...
}

例外情況:當轉換具有單一最重要參數時,請呼叫工廠方法 of,並將該參數放入工廠方法的引數中:ParDo.of(DoFn).withAllowedLateness()

用於設定參數的 Fluent Builder 方法

將它們命名為 withBlah()。所有 Builder 方法都必須回傳完全相同的類型;如果它是參數化(泛型)類型,則具有相同的類型參數值。

withBlah() 方法視為一組無序的關鍵字引數 - 結果不應取決於您呼叫 withFoo()withBar() 的順序(例如,withBar() 不得讀取 foo 的目前值)。

記錄每個 withBlah 方法的含義:何時使用此方法,允許哪些值,預設值是什麼,以及更改值的含義。

/**
 * Returns a new {@link TwiddleThumbs} transform with moo set
 * to the given value.
 *
 * <p>Valid values are 0 (inclusive) to 100 (exclusive). The default is 42.
 *
 * <p>Higher values generally improve throughput, but increase chance
 * of spontaneous combustion.
 */
public Twiddle withMoo(int moo) {
  checkArgument(moo >= 0 && moo < 100,
      "Thumbs.Twiddle.withMoo() called with an invalid moo of %s. "
      + "Valid values are 0 (inclusive) to 100 (exclusive)",
      moo);
  return toBuilder().setMoo(moo).build();
}
參數的預設值

在工廠方法中指定它們(工廠方法回傳具有預設值的物件)。

public class Thumbs {
  public static Twiddle twiddle() {
    return new AutoValue_Thumbs_Twiddle.Builder().setMoo(42).build();
  }
  ...
}
將多個參數封裝到可重複使用的物件中

如果轉換的幾個參數在邏輯上非常緊密地耦合在一起,有時將它們封裝到容器物件中是有意義的。對此容器物件使用相同的準則(使其不可變,使用具有 builder 的 AutoValue,記錄 withBlah() 方法等)。例如,請參閱 JdbcIO.DataSourceConfiguration

具有類型參數的轉換

所有類型參數都應在工廠方法上明確指定。Builder 方法(withBlah())不應更改類型。

public class Thumbs {
  public static Twiddle<T> twiddle() {
    return new AutoValue_Thumbs_Twiddle.Builder<T>().build();
  }

  @AutoValue
  public abstract static class Twiddle<T>
       extends PTransform<PCollection<Foo>, PCollection<Bar<T>>> {
    
    @Nullable abstract Bar<T> getBar();

    abstract Builder<T> toBuilder();

    @AutoValue.Builder
    abstract static class Builder<T> {
      
      abstract Builder<T> setBar(Bar<T> bar);

      abstract Twiddle<T> build();
    }
    
  }
}

// User code:
Thumbs.Twiddle<String> twiddle = Thumbs.<String>twiddle();
// Or:
PCollection<Bar<String>> bars = foos.apply(Thumbs.<String>twiddle()  );

例外情況:當轉換具有單一最重要參數,且此參數取決於類型 T 時,最好將其直接放入工廠方法中:例如 Combine.globally(SerializableFunction<Iterable<V>,V>)。這可以改善 Java 的類型推斷,並讓使用者不必明確指定類型參數。

當轉換有多個類型參數時,或者如果參數的含義不明顯,請將類型參數命名為類似 SomethingT 的名稱,例如:一個實作分類器演算法並為每個輸入元素分配標籤的 PTransform 可以使用類型 Classify<InputT, LabelT>

注入使用者指定的行為

如果轉換具有需要使用者程式碼自訂的行為方面,請按照以下方式做出決定

應該

不應該

打包轉換系列

在開發一系列高度相關的轉換(例如,以不同的方式與同一系統互動,或提供同一高階任務的不同實作)時,請使用頂層類別作為命名空間,並使用多個工廠方法回傳對應於每個個別用例的轉換。

容器類別必須具有私有建構函式,因此不能直接實例化。

FooIO 層級記錄通用內容,並單獨記錄每個工廠方法。

/** Transforms for clustering data. */
public class Cluster {
  // Force use of static factory methods.
  private Cluster() {}

  /** Returns a new {@link UsingKMeans} transform. */
  public static UsingKMeans usingKMeans() { ... }
  public static Hierarchically hierarchically() { ... }

  /** Clusters data using the K-Means algorithm. */
  public static class UsingKMeans extends PTransform<...> { ... }
  public static class Hierarchically extends PTransform<...> { ... }
}

public class FooIO {
  // Force use of static factory methods.
  private FooIO() {}

  public static Read read() { ... }
  ...

  public static class Read extends PTransform<...> { ... }
  public static class Write extends PTransform<...> { ... }
  public static class Delete extends PTransform<...> { ... }
  public static class Mutate extends PTransform<...> { ... }
}

當支援具有不相容 API 的多個版本時,也請將版本作為類似命名空間的類別,並將不同 API 版本的實作放在不同的檔案中。

// FooIO.java
public class FooIO {
  // Force use of static factory methods.
  private FooIO() {}

  public static FooV1 v1() { return new FooV1(); }
  public static FooV2 v2() { return new FooV2(); }
}

// FooV1.java
public class FooV1 {
  // Force use of static factory methods outside the package.
  FooV1() {}
  public static Read read() { ... }
  public static class Read extends PTransform<...> { ... }
}

// FooV2.java
public static class FooV2 {
  // Force use of static factory methods outside the package.
  FooV2() {}
  public static Read read() { ... }

  public static class Read extends PTransform<...> { ... }
}

行為

不可變性

序列化

DoFnPTransformCombineFn 和其他實例將被序列化。將序列化的資料量保持在最低限度:將您不想序列化的欄位標記為 transient。盡可能使類別成為 static(以便實例不會捕獲並序列化封閉類別實例)。注意:在某些情況下,這意味著您不能使用匿名類別。

驗證

@AutoValue
public abstract class TwiddleThumbs
    extends PTransform<PCollection<Foo>, PCollection<Bar>> {
  abstract int getMoo();
  abstract String getBoo();

  ...
  // Validating individual parameters
  public TwiddleThumbs withMoo(int moo) {
    checkArgument(
        moo >= 0 && moo < 100,
        "Moo must be between 0 (inclusive) and 100 (exclusive), but was: %s",
        moo);
    return toBuilder().setMoo(moo).build();
  }

  public TwiddleThumbs withBoo(String boo) {
    checkArgument(boo != null, "Boo can not be null");
    checkArgument(!boo.isEmpty(), "Boo can not be empty");
    return toBuilder().setBoo(boo).build();
  }

  @Override
  public void validate(PipelineOptions options) {
    int woo = options.as(TwiddleThumbsOptions.class).getWoo();
    checkArgument(
       woo > getMoo(),
      "Woo (%s) must be smaller than moo (%s)",
      woo, getMoo());
  }

  @Override
  public PCollection<Bar> expand(PCollection<Foo> input) {
    // Validating that a required parameter is present
    checkArgument(getBoo() != null, "Must specify boo");

    // Validating a combination of parameters
    checkArgument(
        getMoo() == 0 || getBoo() == null,
        "Must specify at most one of moo or boo, but was: moo = %s, boo = %s",
        getMoo(), getBoo());

    ...
  }
}

編碼器

Coder 是 Beam 執行器在必要時具現化中繼資料或在工作者之間傳輸資料的一種方式。Coder 不應作為解析或寫入二進位格式的通用 API 使用,因為 Coder 的特定二進位編碼旨在成為其私有實作細節。

為類型提供預設編碼器

為所有新的資料類型提供預設 Coder。使用 @DefaultCoder 註釋或使用 @AutoService 註釋的 CoderProviderRegistrar 類別:有關範例,請參閱 SDK 中這些類別的用法。如果效能不重要,您可以使用 SerializableCoderAvroCoder。否則,請開發有效率的自訂編碼器(具體類型使用子類別 AtomicCoder,泛型類型使用子類別 StructuredCoder)。

在輸出集合上設定編碼器

您的 PTransform 建立的所有 PCollection(包括輸出和中繼集合)都必須在其上設定 Coder:使用者永遠不需要呼叫 .setCoder() 來「修復」您的 PTransform 產生的 PCollection 上的編碼器(事實上,Beam 打算最終棄用 setCoder)。在某些情況下,編碼器推斷足以實現此目的;在其他情況下,您的轉換需要明確地在其集合上呼叫 setCoder

如果集合是具體類型,則該類型通常具有對應的編碼器。使用特定的最有效編碼器(例如,字串使用 StringUtf8Coder.of(),位元組陣列使用 ByteArrayCoder.of() 等),而不是通用的編碼器,如 SerializableCoder

如果集合的類型涉及泛型類型變數,情況會更複雜