流計算
使用Spark的DataFrame方式訪問表格存儲,并在本地和集群上分別進行運行調(diào)試。
前提條件
了解Spark訪問表格存儲的依賴包,并在使用時通過Maven方式引入項目中。
Spark相關(guān):spark-core、spark-sql、spark-hive
Spark Tablestore connector:emr-tablestore-<version>.jar
Tablestore Java SDK:tablestore-<version>-jar-with-dependencies.jar
其中<version>表示相應(yīng)依賴包的版本號,請以實際為準(zhǔn)。
已在表格存儲側(cè)創(chuàng)建Source表和在Source表上創(chuàng)建通道。具體操作,請參見創(chuàng)建數(shù)據(jù)通道。
已獲取AccessKey(包括AccessKey ID和AccessKey Secret)。具體操作,請參見獲取AccessKey。
快速開始
通過項目樣例了解快速使用流計算的操作。
從GitHub下載項目樣例的源碼,具體下載路徑請參見TableStoreSparkDemo。
項目中包含完整的依賴和使用樣例,具體的依賴請參見項目中的pom文件。
閱讀TableStoreSparkDemo項目的README文檔,并安裝最新版的Spark Tablestore connector和Tablestore Java SDK到本地Maven庫。
修改Sample代碼。
以StructuredTableStoreAggSQLSample為例,對此示例代碼的核心代碼說明如下:
format("tablestore")表示使用ServiceLoader方式加載Spark Tablestore connector,具體配置請參見項目中的META-INF.services。
instanceName、tableName、tunnel.id、endpoint、accessKeyId、accessKeySecret分別表示表格存儲的實例名稱、數(shù)據(jù)表名稱、通道ID、實例endpoint、阿里云賬號的AccessKey ID和AccessKey Secret。
maxoffsetsperchannel表示每一個mini-batch中每一個channel(分區(qū))最多讀取的數(shù)據(jù)量,默認值為10000。
val ordersDF = sparkSession.readStream .format("tablestore") .option("instance.name", instanceName) .option("table.name", tableName) .option("tunnel.id", tunnelId) .option("endpoint", endpoint) .option("access.key.id", accessKeyId) .option("access.key.secret", accessKeySecret) .option("maxoffsetsperchannel", maxOffsetsPerChannel) //默認值為10000。 .option("catalog", dataCatalog) .load() .createTempView("order_source_stream_view") val dataCatalog: String = s""" |{"columns": { | "UserId": {"type":"string"}, | "OrderId": {"type":"string"}, | "price": {"type":"double"}, | "timestamp": {"type":"long"} | } |}""".stripMargin
運行調(diào)試
根據(jù)需求修改示例代碼后,可在本地或者通過Spark集群進行運行調(diào)試。以StructuredTableStoreAggSQLSample為例說明調(diào)試過程。
本地調(diào)試
以IntelliJ IDEA為例說明。
說明本文測試使用的環(huán)境為Spark 2.4.3、Scala 2.11.7和Java SE Development Kit 8,如果使用中遇到問題,請聯(lián)系表格存儲技術(shù)支持。
在系統(tǒng)參數(shù)中,配置實例名稱、數(shù)據(jù)表名稱、實例endpoint、阿里云賬號的AccessKey ID和AccessKey Secret等參數(shù)。
您也可以自定義參數(shù)的加載方式。
選擇include dependencies with "provided" scope,單擊OK。
運行示例代碼程序。
通過Spark集群調(diào)試
以spark-submit方式為例說明。示例代碼中的master默認為
local[*]
,在Spark集群上運行時可以去掉,使用spark-submit參數(shù)傳入。執(zhí)行mvn -U clean package命令打包,包的路徑為
target/tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar
。上傳包到Spark集群的Driver節(jié)點,并使用spark-submit提交任務(wù)。
spark-submit --class com.aliyun.tablestore.spark.demo.streaming.StructuredTableStoreAggSQLSample --master yarn tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar <ots-instanceName> <ots-tableName> <ots-tunnelId> <access-key-id> <access-key-secret> <ots-endpoint> <max-offsets-per-channel>