部落格
2019/06/04
為 Beam SQL CLI 新增資料來源
Apache Beam 新推出一項令人興奮的功能,那就是能在你的管線中使用 SQL。這是透過 Java 管線中的 Beam SqlTransform
來完成的。
Beam 也有一個很棒的全新 SQL 命令列,你可以用它來互動式地查詢你的資料,不論是批次還是串流。如果你還沒試過,請查看 https://bit.ly/ExploreBeamSQL。
SQL CLI 的一個好處是,你可以使用 CREATE EXTERNAL TABLE
命令來新增要在 CLI 中存取的資料來源。目前,CLI 支援從 BigQuery、PubSub、Kafka 和文字檔案建立表格。在這篇文章中,我們將探討如何新增資料來源,讓你可以從其他 Beam 來源取用資料。
我們將在這篇文章中實作的表格提供者將會產生一個連續的無界限整數串流。它將基於 Beam SDK 中的 GenerateSequence
PTransform。最後,我們將能夠像這樣在 SQL 中定義和使用序列產生器
CREATE EXTERNAL TABLE -- all tables in Beam are external, they are not persisted
sequenceTable -- table alias that will be used in queries
(
sequence BIGINT, -- sequence number
event_timestamp TIMESTAMP -- timestamp of the generated event
)
TYPE sequence -- type identifies the table provider
TBLPROPERTIES '{ elementsPerSecond : 12 }' -- optional rate at which events are generated
而且我們將能夠像這樣在查詢中使用它
SELECT sequence FROM sequenceTable;
讓我們開始吧!
實作 TableProvider
Beam 的 SqlTransform
透過依賴 TableProvider
s 來運作,當有人使用 CREATE EXTERNAL TABLE
語句時,它就會使用這些 TableProvider
s。如果你想要為 Beam SQL CLI 新增資料來源,那麼你會需要新增一個 TableProvider
來完成這件事。在這篇文章中,我將展示為 Java SDK 中提供的 GenerateSequence
轉換建立新表格提供者所需的步驟。
TableProvider
類別位於 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/
下。如果你在那裡查看,你可以找到所有可用資料來源的提供者及其實作。因此,你只需要新增你想要的提供者,以及 BaseBeamTable
的實作。
GenerateSequenceTableProvider
我們的表格提供者如下所示
它所做的只是為表格指定類型 - 而且它實作了 buildBeamSqlTable
方法,該方法只會傳回由我們的 GenerateSequenceTable
實作定義的 BeamSqlTable
。
GenerateSequenceTable
我們想要一個能正確支援串流的表格實作,因此我們會允許使用者定義每秒要發出的元素數量。我們將定義一個簡單的表格,它會以串流方式發出連續的整數。如下所示
class GenerateSequenceTable extends BaseBeamTable implements Serializable {
public static final Schema TABLE_SCHEMA =
Schema.of(Field.of("sequence", FieldType.INT64), Field.of("event_time", FieldType.DATETIME));
Integer elementsPerSecond = 5;
GenerateSequenceTable(Table table) {
super(TABLE_SCHEMA);
if (table.getProperties().containsKey("elementsPerSecond")) {
elementsPerSecond = table.getProperties().getInteger("elementsPerSecond");
}
}
@Override
public PCollection.IsBounded isBounded() {
return IsBounded.UNBOUNDED;
}
@Override
public PCollection<Row> buildIOReader(PBegin begin) {
return begin
.apply(GenerateSequence.from(0).withRate(elementsPerSecond, Duration.standardSeconds(1)))
.apply(
MapElements.into(TypeDescriptor.of(Row.class))
.via(elm -> Row.withSchema(TABLE_SCHEMA).addValues(elm, Instant.now()).build()))
.setRowSchema(getSchema());
}
@Override
public POutput buildIOWriter(PCollection<Row> input) {
throw new UnsupportedOperationException("buildIOWriter unsupported!");
}
}
真正有趣的來了
現在我們已經實作了兩個基本類別 (BaseBeamTable
和 TableProvider
),我們可以開始使用它們了。在建置 SQL CLI 之後,我們現在可以在表格上執行選取
0: BeamSQL> CREATE EXTERNAL TABLE input_seq (
. . . . . > sequence BIGINT COMMENT 'this is the primary key',
. . . . . > event_time TIMESTAMP COMMENT 'this is the element timestamp'
. . . . . > )
. . . . . > TYPE 'sequence';
No rows affected (0.005 seconds)
讓我們選取幾列
0: BeamSQL> SELECT * FROM input_seq LIMIT 5;
+---------------------+------------+
| sequence | event_time |
+---------------------+------------+
| 0 | 2019-05-21 00:36:33 |
| 1 | 2019-05-21 00:36:33 |
| 2 | 2019-05-21 00:36:33 |
| 3 | 2019-05-21 00:36:33 |
| 4 | 2019-05-21 00:36:33 |
+---------------------+------------+
5 rows selected (1.138 seconds)
現在讓我們嘗試一些更有趣的事情。例如分組。這也會讓我們確保我們為每一列正確地提供時間戳記
0: BeamSQL> SELECT
. . . . . > COUNT(sequence) as elements,
. . . . . > TUMBLE_START(event_time, INTERVAL '2' SECOND) as window_start
. . . . . > FROM input_seq
. . . . . > GROUP BY TUMBLE(event_time, INTERVAL '2' SECOND) LIMIT 5;
+---------------------+--------------+
| elements | window_start |
+---------------------+--------------+
| 6 | 2019-06-05 00:39:24 |
| 10 | 2019-06-05 00:39:26 |
| 10 | 2019-06-05 00:39:28 |
| 10 | 2019-06-05 00:39:30 |
| 10 | 2019-06-05 00:39:32 |
+---------------------+--------------+
5 rows selected (10.142 seconds)
瞧!我們可以開始使用一些有趣的串流查詢到我們的序列產生器。