Beam SQL Shell
概觀
從 2.6.0 版本開始,Beam SQL 包含一個互動式 shell,稱為 Beam SQL shell。 此 shell 允許您將管線撰寫為 SQL 查詢,而無需 Java SDK。 預設情況下,Beam 使用 DirectRunner
來執行查詢作為 Beam 管線。
此頁面描述如何使用 shell,但不會著重於 Beam SQL 的特定功能。 如需此頁面範例中使用之功能的更完整概觀,請參閱Beam SQL 文件中對應的章節。
快速入門
若要使用 Beam SQL shell,您必須先複製 Beam SDK 儲存庫。 然後,從儲存庫複製的根目錄,執行下列命令以執行 shell
./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' installDist
./sdks/java/extensions/sql/shell/build/install/shell/bin/shell
執行命令後,SQL shell 會啟動,您可以輸入查詢
Welcome to Beam SQL 2.6.0-SNAPSHOT (based on sqlline version 1.4.0)
0: BeamSQL>
注意:如果您在執行 Gradle 命令之前未建置專案,則該命令將需要幾分鐘的時間,因為 Gradle 必須先建置所有相依性。
shell 會將查詢轉換為 Beam 管線,使用 DirectRunner
執行它們,並在管線完成時以表格形式傳回結果
0: BeamSQL> SELECT 'foo' AS NAME, 'bar' AS TYPE, 'num' AS NUMBER;
+------+------+--------+
| NAME | TYPE | NUMBER |
+------+------+--------+
| foo | bar | num |
+------+------+--------+
1 row selected (0.826 seconds)
宣告資料表
在從來源讀取資料或將資料寫入目的地之前,您必須使用 CREATE EXTERNAL TABLE
陳述式宣告虛擬資料表。 例如,如果您在目前資料夾中有一個本機 CSV 檔案 "test-file.csv"
,您可以使用下列陳述式建立資料表
0: BeamSQL> CREATE EXTERNAL TABLE csv_file (field1 VARCHAR, field2 INTEGER) TYPE text LOCATION 'test-file.csv';
No rows affected (0.042 seconds)
CREATE EXTERNAL TABLE
陳述式會在 Beam SQL 中將 CSV 檔案註冊為資料表,並指定資料表的綱要。 此陳述式不會直接建立持續性實體資料表;它僅描述 Beam SQL 的來源/接收器,以便您可以在讀取資料和寫入資料的查詢中使用該資料表。
如需有關 CREATE EXTERNAL TABLE
語法和支援的資料表類型的詳細資訊,請參閱CREATE EXTERNAL TABLE 參考頁面。
讀取和寫入資料
若要從您在上一節中宣告的本機 CSV 檔案讀取資料,請執行下列查詢
0: BeamSQL> SELECT field1 AS field FROM csv_file;
+--------+
| field |
+--------+
| baz |
| foo |
| bar |
| bar |
| foo |
+--------+
如需有關 SELECT
語法的詳細資訊,請參閱查詢語法頁面。
若要將資料寫入 CSV 檔案,請使用 INSERT INTO … SELECT ...
陳述式
0: BeamSQL> INSERT INTO csv_file SELECT 'foo', 'bar';
讀取和寫入行為取決於資料表的類型。 例如
- 資料表類型
text
是使用TextIO
實作的,因此寫入text
資料表可能會產生多個編號的檔案。 - 資料表類型
pubsub
是無界限來源,因此從pubsub
資料表讀取永遠不會完成。
使用無界限來源進行開發
當您想要在開發期間檢查來自無界限來源的資料時,您必須在 SELECT
陳述式的結尾指定 LIMIT x
子句,以將輸出限制為 x
個記錄。 否則,管線將永遠不會完成。
0: BeamSQL> SELECT field1 FROM unbounded_source LIMIT 10 ;
到目前為止顯示的範例查詢是快速查詢,會在本地執行。 當您在調查資料並以反覆方式設計管線時,這些查詢很有幫助。 理想情況下,您希望查詢快速完成並在完成時傳回輸出。
當您對 SQL 陳述式的邏輯感到滿意時,您可以透過捨棄 LIMIT x
陳述式來將陳述式提交為長時間執行的作業。 然後,如果其中一個資料表代表無界限來源,則管線可能會永遠執行。
指定執行器
預設情況下,Beam 使用 DirectRunner
在您執行命令的機器上執行管線。 如果您想要使用不同的執行器執行管線,您必須執行兩個步驟
請確保 SQL shell 包含所需的執行器。 將對應的專案 ID 新增至 Gradle 調用的
-Pbeam.sql.shell.bundled
參數 (原始碼、專案 ID)。 例如,使用下列命令來包含 Flink 執行器和 KafkaIO./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' installDist
注意:您可以採用相同的方式將多個執行器 (使用逗號分隔清單) 或其他額外元件綑綁在一起。 例如,您可以新增對更多 I/O 的支援。
然後,使用
SET
命令指定執行器 (參考頁面)0: BeamSQL> SET runner='FlinkRunner';
Beam 會將所有未來的 INSERT
陳述式作為管線提交給指定的執行器。 在此情況下,Beam SQL shell 不會顯示查詢結果。 您必須透過對應執行器的 UI (例如,使用 Flink UI 或命令列) 管理提交的作業。
指定 PipelineOptions
若要設定執行器,您必須使用 SET
命令指定 PipelineOptions
(詳細資訊)
0: BeamSQL> SET projectId='gcpProjectId';
0: BeamSQL> SET tempLocation='/tmp/tempDir';
封裝 SQL Shell
您也可以使用 distZip
或 distTar
工作為 SQL shell 建置自己的獨立封裝。 例如
./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' distZip
ls ./sdks/java/extensions/sql/shell/build/distributions/
beam-sdks-java-extensions-sql-shell-2.6.0-SNAPSHOT.tar beam-sdks-java-extensions-sql-shell-2.6.0-SNAPSHOT.zip