阿里云數據庫Redis版是兼容開源Redis協議標準、提供內存加硬盤混合存儲的數據庫服務,基于高可靠雙機熱備架構及可平滑擴展的集群架構,可充分滿足高吞吐、低延遲及彈性變配的業務需求。本文主要介紹如何通過DLA Serverless Spark訪問云數據庫Redis。
重要
云原生數據湖分析(DLA)產品已退市,云原生數據倉庫 AnalyticDB MySQL 版湖倉版支持DLA已有功能,并提供更多的功能和更好的性能。AnalyticDB for MySQL相關使用文檔,請參見訪問Redis數據源。
前提條件
操作步驟
準備以下測試代碼和依賴包來訪問Redis,并將此測試代碼和依賴包分別編譯打包生成jar包上傳至您的OSS。
測試代碼示例:
package com.aliyun.spark import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ object SparkOnRedis { def main(args: Array[String]): Unit = { //獲取Redis的, redisHost:內網連接地址(host),redisPort:端口號(port),redisPassword:連接密碼。 val redisHost = args(0) val redisPort = args(1) val redisPassword = args(2) //redis側的表名。 var redisTableName = args(3) //spark conf中配置的redis信息。 val sparkConf = new SparkConf() .set("spark.redis.host", redisHost) .set("spark.redis.port", redisPort) .set("spark.redis.auth", redisPassword) val sparkSession = SparkSession .builder() .config(sparkConf) .getOrCreate() //樣例數據。 val data = Seq( Person("John", 30, "60 Wall Street", 150.5), Person("Peter", 35, "110 Wall Street", 200.3) ) //通過dataset API寫入數據。 val dfw = sparkSession.createDataFrame(data) dfw.write.format("org.apache.spark.sql.redis") .option("model", "hash") .option("table", redisTableName) .save() //默認方式讀取redis的hash值。 var loadedDf = sparkSession.read.format("org.apache.spark.sql.redis") .option("table", redisTableName) .load() .cache() loadedDf.show(10) //設置infer.schema=true,spark會檢索redis的Schema。 loadedDf = sparkSession.read.format("org.apache.spark.sql.redis") // .option("table", redisTableName) .option("keys.pattern", redisTableName + ":*") .option("infer.schema", "true") .load() loadedDf.show(10) //指定Schema的方式。 loadedDf = sparkSession.read.format("org.apache.spark.sql.redis") .option("keys.pattern", redisTableName + ":*") .schema(StructType(Array( StructField("name", StringType), StructField("age", IntegerType), StructField("address", StringType), StructField("salary", DoubleType) ))) .load() loadedDf.show(10) sparkSession.stop() } } case class Person(name: String, age: Int, address: String, salary: Double)
Redis依賴的pom文件:
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.0</version> </dependency> <dependency> <groupId>com.redislabs</groupId> <artifactId>jedis</artifactId> <version>3.0.0-m1</version> </dependency> <dependency> <groupId>com.redislabs</groupId> <artifactId>spark-redis</artifactId> <version>2.3.1-m3</version> </dependency>
在頁面左上角,選擇Redis實例所在地域。
單擊左側導航欄中的 。
在作業編輯頁面,單擊創建作業。
在創建作業模板頁面,按照頁面提示進行參數配置后,單擊確定創建Spark作業。
單擊Spark作業名,在Spark作業編輯框中輸入以下作業內容,并按照以下參數說明進行參數值替換。保存并提交Spark作業。
{ "args": [ "r-xxx1.redis.rds.aliyuncs.com", #Redis數據庫“連接信息”的“內網連接地址(host)。 "6379", #Redis數據庫“連接信息”的“端口號(port)”。 "xxx2", #Redis數據庫登錄密碼。 "spark-test" #Redis數據庫的表名。 ], "file": "oss://spark_test/jars/redis/spark-examples-0.0.1-SNAPSHOT.jar", #存放測試軟件包的OSS路徑。 "name": "redis-test", "jars": [ "oss://spark_test/jars/redis/spark-redis-2.3.1-m3.jar", #存放測試軟件依賴包的OSS路徑。 "oss://spark_test/jars/redis/commons-pool2-2.0.jar", #存放測試軟件依賴包的OSS路徑。 "oss://spark_test/jars/redis/jedis-3.0.0-m1.jar" #存放測試軟件依賴包的OSS路徑。 ], "className": "com.aliyun.spark.SparkOnRedis", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 2, "spark.executor.resourceSpec": "small", "spark.dla.eni.enable": "true", "spark.dla.eni.vswitch.id": "vsw-xxx", #可訪問Redis的交換機id。 "spark.dla.eni.security.group.id": "sg-xxx" #可訪問Redis的安全組id。 } }
執行結果
作業運行成功后,在任務列表中單擊
,查看作業日志。出現如下日志說明作業運行成功:文檔內容是否對您有幫助?