結構描述模式
本頁的範例描述了使用結構描述的常見模式。結構描述為 Beam 記錄提供了一種類型系統,該系統獨立於任何特定的程式語言類型。可能有多個具有相同結構描述的 Java 類別(例如,Protocol-Buffer 類別或 POJO 類別),而 Beam 將允許我們在這些類型之間無縫轉換。結構描述還提供了一種簡單的方法來推理不同程式語言 API 中的類型。有關更多資訊,請參閱程式設計指南中關於結構描述的部分。
- Java SDK
使用聯結
Beam 支援在結構描述的 PCollections
上進行等值聯結,其中聯結條件取決於欄位子集的相等性。
如果您有多個提供關於相關事物的資訊的集合,且其結構已知,請考慮使用 Join
。
例如,假設我們有兩個不同的使用者資料集合:一個集合包含姓名和電子郵件地址;另一個集合包含姓名和電話號碼。我們可以將這兩個集合使用姓名作為共同鍵聯結,並將其他資料作為相關的值。聯結後,我們有一個包含與每個姓名關聯的所有資訊(電子郵件地址和電話號碼)的集合。
以下概念範例使用兩個輸入集合來顯示 Join
的機制。
首先,我們定義結構描述和使用者資料。
// Define Schemas
Schema emailSchema =
Schema.of(
Schema.Field.of("name", Schema.FieldType.STRING),
Schema.Field.of("email", Schema.FieldType.STRING));
Schema phoneSchema =
Schema.of(
Schema.Field.of("name", Schema.FieldType.STRING),
Schema.Field.of("phone", Schema.FieldType.STRING));
// Create User Data Collections
final List<Row> emailUsers =
Arrays.asList(
Row.withSchema(emailSchema).addValue("person1").addValue("person1@example.com").build(),
Row.withSchema(emailSchema).addValue("person2").addValue("person2@example.com").build(),
Row.withSchema(emailSchema).addValue("person3").addValue("person3@example.com").build(),
Row.withSchema(emailSchema).addValue("person4").addValue("person4@example.com").build(),
Row.withSchema(emailSchema)
.addValue("person6")
.addValue("person6@example.com")
.build());
final List<Row> phoneUsers =
Arrays.asList(
Row.withSchema(phoneSchema).addValue("person1").addValue("111-222-3333").build(),
Row.withSchema(phoneSchema).addValue("person2").addValue("222-333-4444").build(),
Row.withSchema(phoneSchema).addValue("person3").addValue("444-333-4444").build(),
Row.withSchema(phoneSchema).addValue("person4").addValue("555-333-4444").build(),
Row.withSchema(phoneSchema).addValue("person5").addValue("777-333-4444").build());
然後,我們為使用者資料建立 Pcollections
,並使用 Join
在兩個 PCollections
上執行聯結。
// Create/Read Schema PCollections
PCollection<Row> emailList =
p.apply("CreateEmails", Create.of(emailUsers).withRowSchema(emailSchema));
PCollection<Row> phoneList =
p.apply("CreatePhones", Create.of(phoneUsers).withRowSchema(phoneSchema));
// Perform Join
PCollection<Row> resultRow =
emailList.apply("Apply Join", Join.<Row, Row>innerJoin(phoneList).using("name"));
// Preview Result
resultRow.apply(
"Preview Result",
MapElements.into(TypeDescriptors.strings())
.via(
x -> {
System.out.println(x);
return "";
}));
/* Sample Output From the pipeline:
Row:[Row:[person1, person1@example.com], Row:[person1, 111-222-3333]]
Row:[Row:[person2, person2@example.com], Row:[person2, 222-333-4444]]
Row:[Row:[person4, person4@example.com], Row:[person4, 555-333-4444]]
Row:[Row:[person3, person3@example.com], Row:[person3, 444-333-4444]]
*/
結果 Row
的類型為 Row: [Row(emailSchema), Row(phoneSchema)]
,並且可以轉換為所需的格式,如下面的程式碼片段所示。
PCollection<String> result =
resultRow.apply(
"Format Output",
MapElements.into(TypeDescriptors.strings())
.via(
x -> {
String userInfo =
"Name: "
+ x.getRow(0).getValue("name")
+ " Email: "
+ x.getRow(0).getValue("email")
+ " Phone: "
+ x.getRow(1).getValue("phone");
System.out.println(userInfo);
return userInfo;
}));
/* Sample output From the pipeline
Name: person4 Email: person4@example.com Phone: 555-333-4444
Name: person2 Email: person2@example.com Phone: 222-333-4444
Name: person3 Email: person3@example.com Phone: 444-333-4444
Name: person1 Email: person1@example.com Phone: 111-222-3333
*/
上次更新日期:2024/10/31
您是否找到了想要的所有內容?
這些內容是否都實用且清晰?您有任何想要變更的地方嗎?請告訴我們!