Beam SQL Shell

概觀

從 2.6.0 版本開始,Beam SQL 包含一個互動式 shell,稱為 Beam SQL shell。 此 shell 允許您將管線撰寫為 SQL 查詢,而無需 Java SDK。 預設情況下,Beam 使用 DirectRunner 來執行查詢作為 Beam 管線。

此頁面描述如何使用 shell,但不會著重於 Beam SQL 的特定功能。 如需此頁面範例中使用之功能的更完整概觀,請參閱Beam SQL 文件中對應的章節。

快速入門

若要使用 Beam SQL shell,您必須先複製 Beam SDK 儲存庫。 然後,從儲存庫複製的根目錄,執行下列命令以執行 shell

./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' installDist

./sdks/java/extensions/sql/shell/build/install/shell/bin/shell

執行命令後,SQL shell 會啟動,您可以輸入查詢

Welcome to Beam SQL 2.6.0-SNAPSHOT (based on sqlline version 1.4.0)
0: BeamSQL>

注意:如果您在執行 Gradle 命令之前未建置專案,則該命令將需要幾分鐘的時間,因為 Gradle 必須先建置所有相依性。

shell 會將查詢轉換為 Beam 管線,使用 DirectRunner 執行它們,並在管線完成時以表格形式傳回結果

0: BeamSQL> SELECT 'foo' AS NAME, 'bar' AS TYPE, 'num' AS NUMBER;
+------+------+--------+
| NAME | TYPE | NUMBER |
+------+------+--------+
| foo  | bar  | num    |
+------+------+--------+
1 row selected (0.826 seconds)

宣告資料表

在從來源讀取資料或將資料寫入目的地之前,您必須使用 CREATE EXTERNAL TABLE 陳述式宣告虛擬資料表。 例如,如果您在目前資料夾中有一個本機 CSV 檔案 "test-file.csv",您可以使用下列陳述式建立資料表

0: BeamSQL> CREATE EXTERNAL TABLE csv_file (field1 VARCHAR, field2 INTEGER) TYPE text LOCATION 'test-file.csv';

No rows affected (0.042 seconds)

CREATE EXTERNAL TABLE 陳述式會在 Beam SQL 中將 CSV 檔案註冊為資料表,並指定資料表的綱要。 此陳述式不會直接建立持續性實體資料表;它僅描述 Beam SQL 的來源/接收器,以便您可以在讀取資料和寫入資料的查詢中使用該資料表。

如需有關 CREATE EXTERNAL TABLE 語法和支援的資料表類型的詳細資訊,請參閱CREATE EXTERNAL TABLE 參考頁面

讀取和寫入資料

若要從您在上一節中宣告的本機 CSV 檔案讀取資料,請執行下列查詢

0: BeamSQL> SELECT field1 AS field FROM csv_file;
+--------+
| field  |
+--------+
| baz    |
| foo    |
| bar    |
| bar    |
| foo    |
+--------+

如需有關 SELECT 語法的詳細資訊,請參閱查詢語法頁面

若要將資料寫入 CSV 檔案,請使用 INSERT INTO … SELECT ... 陳述式

0: BeamSQL> INSERT INTO csv_file SELECT 'foo', 'bar';

讀取和寫入行為取決於資料表的類型。 例如

使用無界限來源進行開發

當您想要在開發期間檢查來自無界限來源的資料時,您必須在 SELECT 陳述式的結尾指定 LIMIT x 子句,以將輸出限制為 x 個記錄。 否則,管線將永遠不會完成。

0: BeamSQL> SELECT field1 FROM unbounded_source LIMIT 10 ;

到目前為止顯示的範例查詢是快速查詢,會在本地執行。 當您在調查資料並以反覆方式設計管線時,這些查詢很有幫助。 理想情況下,您希望查詢快速完成並在完成時傳回輸出。

當您對 SQL 陳述式的邏輯感到滿意時,您可以透過捨棄 LIMIT x 陳述式來將陳述式提交為長時間執行的作業。 然後,如果其中一個資料表代表無界限來源,則管線可能會永遠執行。

指定執行器

預設情況下,Beam 使用 DirectRunner 在您執行命令的機器上執行管線。 如果您想要使用不同的執行器執行管線,您必須執行兩個步驟

  1. 請確保 SQL shell 包含所需的執行器。 將對應的專案 ID 新增至 Gradle 調用的 -Pbeam.sql.shell.bundled 參數 (原始碼專案 ID)。 例如,使用下列命令來包含 Flink 執行器和 KafkaIO

    ./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' installDist
    

    注意:您可以採用相同的方式將多個執行器 (使用逗號分隔清單) 或其他額外元件綑綁在一起。 例如,您可以新增對更多 I/O 的支援。

  2. 然後,使用 SET 命令指定執行器 (參考頁面)

    0: BeamSQL> SET runner='FlinkRunner';
    

Beam 會將所有未來的 INSERT 陳述式作為管線提交給指定的執行器。 在此情況下,Beam SQL shell 不會顯示查詢結果。 您必須透過對應執行器的 UI (例如,使用 Flink UI 或命令列) 管理提交的作業。

指定 PipelineOptions

若要設定執行器,您必須使用 SET 命令指定 PipelineOptions (詳細資訊)

0: BeamSQL> SET projectId='gcpProjectId';
0: BeamSQL> SET tempLocation='/tmp/tempDir';

封裝 SQL Shell

您也可以使用 distZipdistTar 工作為 SQL shell 建置自己的獨立封裝。 例如

./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' distZip

ls ./sdks/java/extensions/sql/shell/build/distributions/
beam-sdks-java-extensions-sql-shell-2.6.0-SNAPSHOT.tar beam-sdks-java-extensions-sql-shell-2.6.0-SNAPSHOT.zip