Beam SQL 逐步解說
本頁說明如何使用 Beam SQL,並提供範例程式碼。
Beam Schemas 和 Rows
SQL 查詢只能應用於已註冊結構描述的 PCollection<T>
,或 PCollection<Row>
。請參閱 Beam 程式設計指南中的結構描述文件,以了解如何為類型 T
註冊結構描述的詳細資訊。
如果您沒有現有的類型 T
,可以透過多種方式取得 PCollection<Row>
,例如
來自記憶體中的資料(通常用於單元測試)。
注意:您必須明確指定
Row
編碼器。在此範例中,我們透過呼叫Create.of(..)
來完成。// Define the schema for the records. Schema appSchema = Schema .builder() .addInt32Field("appId") .addStringField("description") .addDateTimeField("rowtime") .build(); // Create a concrete row with that type. Row row = Row .withSchema(appSchema) .addValues(1, "Some cool app", new Date()) .build(); // Create a source PCollection containing only that row PCollection<Row> testApps = PBegin .in(p) .apply(Create .of(row) .withCoder(RowCoder.of(appSchema)));
來自其他類型的記錄
PCollection<T>
(即T
不是Row
),方法是應用ParDo
將輸入記錄轉換為Row
格式// An example POJO class. class AppPojo { Integer appId; String description; Date timestamp; } // Acquire a collection of POJOs somehow. PCollection<AppPojo> pojos = ... // Convert them to Rows with the same schema as defined above via a DoFn. PCollection<Row> apps = pojos .apply( ParDo.of(new DoFn<AppPojo, Row>() { @ProcessElement public void processElement(ProcessContext c) { // Get the current POJO instance AppPojo pojo = c.element(); // Create a Row with the appSchema schema // and values from the current POJO Row appRow = Row .withSchema(appSchema) .addValues( pojo.appId, pojo.description, pojo.timestamp) .build(); // Output the Row representing the current POJO c.output(appRow); } })).setRowSchema(appSchema);
作為另一個
SqlTransform
的輸出。詳細資訊請參閱下一節。
一旦您手上有 PCollection<Row>
,您可以使用 SqlTransform
將 SQL 查詢應用於它。
SqlTransform
SqlTransform.query(queryString)
方法是從 SQL 查詢的字串表示法建立 PTransform
的唯一 API。您可以將此 PTransform
應用於單個 PCollection
或保存多個 PCollection
的 PCollectionTuple
當應用於單個
PCollection
時,可以在查詢中透過資料表名稱PCOLLECTION
引用它當應用於
PCollectionTuple
時,tuple 中每個PCollection
的 tuple 標籤定義可用於查詢它的資料表名稱。請注意,資料表名稱會繫結至特定的PCollectionTuple
,因此僅在應用於它的查詢的內容中有效。例如,您可以聯結兩個
PCollection
// Create the schema for reviews Schema reviewSchema = Schema .builder() .addInt32Field("appId") .addInt32Field("reviewerId") .addFloatField("rating") .addDateTimeField("rowtime") .build(); // Obtain the reviews records with this schema PCollection<Row> reviewsRows = ... // Create a PCollectionTuple containing both PCollections. // TupleTags IDs will be used as table names in the SQL query PCollectionTuple namesAndFoods = PCollectionTuple .of(new TupleTag<>("Apps"), appsRows) // appsRows from the previous example .and(new TupleTag<>("Reviews"), reviewsRows); // Compute the total number of reviews // and average rating per app // by joining two PCollections PCollection<Row> output = namesAndFoods.apply( SqlTransform.query( "SELECT Apps.appId, COUNT(Reviews.rating), AVG(Reviews.rating) " + "FROM Apps INNER JOIN Reviews ON Apps.appId = Reviews.appId " + "GROUP BY Apps.appId"));
程式碼儲存庫中的 BeamSqlExample 顯示了兩個 API 的基本用法。