為 Beam SQL CLI 新增資料來源

Apache Beam 新推出一項令人興奮的功能,那就是能在你的管線中使用 SQL。這是透過 Java 管線中的 Beam SqlTransform 來完成的。

Beam 也有一個很棒的全新 SQL 命令列,你可以用它來互動式地查詢你的資料,不論是批次還是串流。如果你還沒試過,請查看 https://bit.ly/ExploreBeamSQL

SQL CLI 的一個好處是,你可以使用 CREATE EXTERNAL TABLE 命令來新增要在 CLI 中存取的資料來源。目前,CLI 支援從 BigQuery、PubSub、Kafka 和文字檔案建立表格。在這篇文章中,我們將探討如何新增資料來源,讓你可以從其他 Beam 來源取用資料。

我們將在這篇文章中實作的表格提供者將會產生一個連續的無界限整數串流。它將基於 Beam SDK 中的 GenerateSequence PTransform。最後,我們將能夠像這樣在 SQL 中定義和使用序列產生器

CREATE EXTERNAL TABLE                      -- all tables in Beam are external, they are not persisted
  sequenceTable                              -- table alias that will be used in queries
  (
         sequence BIGINT,                  -- sequence number
         event_timestamp TIMESTAMP         -- timestamp of the generated event
  )
TYPE sequence                              -- type identifies the table provider
TBLPROPERTIES '{ elementsPerSecond : 12 }' -- optional rate at which events are generated

而且我們將能夠像這樣在查詢中使用它

SELECT sequence FROM sequenceTable;

讓我們開始吧!

實作 TableProvider

Beam 的 SqlTransform 透過依賴 TableProviders 來運作,當有人使用 CREATE EXTERNAL TABLE 語句時,它就會使用這些 TableProviders。如果你想要為 Beam SQL CLI 新增資料來源,那麼你會需要新增一個 TableProvider 來完成這件事。在這篇文章中,我將展示為 Java SDK 中提供的 GenerateSequence 轉換建立新表格提供者所需的步驟。

TableProvider 類別位於 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/ 下。如果你在那裡查看,你可以找到所有可用資料來源的提供者及其實作。因此,你只需要新增你想要的提供者,以及 BaseBeamTable 的實作。

GenerateSequenceTableProvider

我們的表格提供者如下所示

@AutoService(TableProvider.class)
public class GenerateSequenceTableProvider extends InMemoryMetaTableProvider {

  @Override
  public String getTableType() {
    return "sequence";
  }

  @Override
  public BeamSqlTable buildBeamSqlTable(Table table) {
    return new GenerateSequenceTable(table);
  }
}

它所做的只是為表格指定類型 - 而且它實作了 buildBeamSqlTable 方法,該方法只會傳回由我們的 GenerateSequenceTable 實作定義的 BeamSqlTable

GenerateSequenceTable

我們想要一個能正確支援串流的表格實作,因此我們會允許使用者定義每秒要發出的元素數量。我們將定義一個簡單的表格,它會以串流方式發出連續的整數。如下所示

class GenerateSequenceTable extends BaseBeamTable implements Serializable {
  public static final Schema TABLE_SCHEMA =
      Schema.of(Field.of("sequence", FieldType.INT64), Field.of("event_time", FieldType.DATETIME));

  Integer elementsPerSecond = 5;

  GenerateSequenceTable(Table table) {
    super(TABLE_SCHEMA);
    if (table.getProperties().containsKey("elementsPerSecond")) {
      elementsPerSecond = table.getProperties().getInteger("elementsPerSecond");
    }
  }

  @Override
  public PCollection.IsBounded isBounded() {
    return IsBounded.UNBOUNDED;
  }

  @Override
  public PCollection<Row> buildIOReader(PBegin begin) {
    return begin
        .apply(GenerateSequence.from(0).withRate(elementsPerSecond, Duration.standardSeconds(1)))
        .apply(
            MapElements.into(TypeDescriptor.of(Row.class))
                .via(elm -> Row.withSchema(TABLE_SCHEMA).addValues(elm, Instant.now()).build()))
        .setRowSchema(getSchema());
  }

  @Override
  public POutput buildIOWriter(PCollection<Row> input) {
    throw new UnsupportedOperationException("buildIOWriter unsupported!");
  }
}

真正有趣的來了

現在我們已經實作了兩個基本類別 (BaseBeamTableTableProvider),我們可以開始使用它們了。在建置 SQL CLI 之後,我們現在可以在表格上執行選取

0: BeamSQL> CREATE EXTERNAL TABLE input_seq (
. . . . . >   sequence BIGINT COMMENT 'this is the primary key',
. . . . . >   event_time TIMESTAMP COMMENT 'this is the element timestamp'
. . . . . > )
. . . . . > TYPE 'sequence';
No rows affected (0.005 seconds)

讓我們選取幾列

0: BeamSQL> SELECT * FROM input_seq LIMIT 5;
+---------------------+------------+
|      sequence       | event_time |
+---------------------+------------+
| 0                   | 2019-05-21 00:36:33 |
| 1                   | 2019-05-21 00:36:33 |
| 2                   | 2019-05-21 00:36:33 |
| 3                   | 2019-05-21 00:36:33 |
| 4                   | 2019-05-21 00:36:33 |
+---------------------+------------+
5 rows selected (1.138 seconds)

現在讓我們嘗試一些更有趣的事情。例如分組。這也會讓我們確保我們為每一列正確地提供時間戳記

0: BeamSQL> SELECT
. . . . . >   COUNT(sequence) as elements,
. . . . . >   TUMBLE_START(event_time, INTERVAL '2' SECOND) as window_start
. . . . . > FROM input_seq
. . . . . > GROUP BY TUMBLE(event_time, INTERVAL '2' SECOND) LIMIT 5;
+---------------------+--------------+
|      elements       | window_start |
+---------------------+--------------+
| 6                   | 2019-06-05 00:39:24 |
| 10                  | 2019-06-05 00:39:26 |
| 10                  | 2019-06-05 00:39:28 |
| 10                  | 2019-06-05 00:39:30 |
| 10                  | 2019-06-05 00:39:32 |
+---------------------+--------------+
5 rows selected (10.142 seconds)

瞧!我們可以開始使用一些有趣的串流查詢到我們的序列產生器。