日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Redis

阿里云數據庫Redis版是兼容開源Redis協議標準、提供內存加硬盤混合存儲的數據庫服務,基于高可靠雙機熱備架構及可平滑擴展的集群架構,可充分滿足高吞吐、低延遲及彈性變配的業務需求。本文主要介紹如何通過DLA Serverless Spark訪問云數據庫Redis。

重要

云原生數據湖分析(DLA)產品已退市,云原生數據倉庫 AnalyticDB MySQL 版湖倉版支持DLA已有功能,并提供更多的功能和更好的性能。AnalyticDB for MySQL相關使用文檔,請參見訪問Redis數據源。

前提條件

  • 已經開通對象存儲OSS(Object Storage Service)服務。具體操作請參考開通OSS服務

  • 已經創建云數據庫Redis實例。具體請參考步驟1:創建實例。

  • 準備DLA Spark訪問Redis實例所需的安全組ID和交換機ID。具體操作請參見配置數據源網絡。

  • DLA Spark訪問Redis實例所需的安全組ID或交換機ID,已添加到Redis實例的白名單中。具體操作請參見步驟2:設置白名單。

操作步驟

  1. 準備以下測試代碼和依賴包來訪問Redis,并將此測試代碼和依賴包分別編譯打包生成jar包上傳至您的OSS。

    測試代碼示例:

    package com.aliyun.spark
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._
    
    object SparkOnRedis {
    
      def main(args: Array[String]): Unit = {
        //獲取Redis的, redisHost:內網連接地址(host),redisPort:端口號(port),redisPassword:連接密碼。
        val redisHost = args(0)
        val redisPort = args(1)
        val redisPassword = args(2)
        //redis側的表名。
        var redisTableName = args(3)
    
        //spark conf中配置的redis信息。
        val sparkConf = new SparkConf()
          .set("spark.redis.host", redisHost)
          .set("spark.redis.port", redisPort)
          .set("spark.redis.auth", redisPassword)
    
        val sparkSession = SparkSession
          .builder()
          .config(sparkConf)
          .getOrCreate()
    
        //樣例數據。
        val data =
          Seq(
            Person("John", 30, "60 Wall Street", 150.5),
            Person("Peter", 35, "110 Wall Street", 200.3)
          )
    
        //通過dataset API寫入數據。
        val dfw = sparkSession.createDataFrame(data)
        dfw.write.format("org.apache.spark.sql.redis")
          .option("model", "hash")
          .option("table", redisTableName)
          .save()
    
        //默認方式讀取redis的hash值。
        var loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("table", redisTableName)
          .load()
          .cache()
        loadedDf.show(10)
    
        //設置infer.schema=true,spark會檢索redis的Schema。
        loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          //        .option("table", redisTableName)
          .option("keys.pattern", redisTableName + ":*")
          .option("infer.schema", "true")
          .load()
        loadedDf.show(10)
    
        //指定Schema的方式。
        loadedDf = sparkSession.read.format("org.apache.spark.sql.redis")
          .option("keys.pattern", redisTableName + ":*")
          .schema(StructType(Array(
            StructField("name", StringType),
            StructField("age", IntegerType),
            StructField("address", StringType),
            StructField("salary", DoubleType)
          )))
          .load()
        loadedDf.show(10)
    
        sparkSession.stop()
      }
    
    }
    
    case class Person(name: String, age: Int, address: String, salary: Double)

    Redis依賴的pom文件:

            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>2.0</version>
            </dependency>
            <dependency>
                <groupId>com.redislabs</groupId>
                <artifactId>jedis</artifactId>
                <version>3.0.0-m1</version>
            </dependency>
            <dependency>
                <groupId>com.redislabs</groupId>
                <artifactId>spark-redis</artifactId>
                <version>2.3.1-m3</version>
            </dependency>
  2. 登錄Data Lake Analytics管理控制臺。

  3. 在頁面左上角,選擇Redis實例所在地域。

  4. 單擊左側導航欄中的Serverless Spark > 作業管理。

  5. 作業編輯頁面,單擊創建作業

  6. 創建作業模板頁面,按照頁面提示進行參數配置后,單擊確定創建Spark作業。

    3

  7. 單擊Spark作業名,在Spark作業編輯框中輸入以下作業內容,并按照以下參數說明進行參數值替換。保存并提交Spark作業。

    {
        "args": [
            "r-xxx1.redis.rds.aliyuncs.com",  #Redis數據庫“連接信息”的“內網連接地址(host)。
            "6379",  #Redis數據庫“連接信息”的“端口號(port)”。
            "xxx2",  #Redis數據庫登錄密碼。
            "spark-test"  #Redis數據庫的表名。
        ],
        "file": "oss://spark_test/jars/redis/spark-examples-0.0.1-SNAPSHOT.jar",  #存放測試軟件包的OSS路徑。
        "name": "redis-test",
        "jars": [
            "oss://spark_test/jars/redis/spark-redis-2.3.1-m3.jar",  #存放測試軟件依賴包的OSS路徑。
            "oss://spark_test/jars/redis/commons-pool2-2.0.jar",  #存放測試軟件依賴包的OSS路徑。
            "oss://spark_test/jars/redis/jedis-3.0.0-m1.jar"  #存放測試軟件依賴包的OSS路徑。
        ],
        "className": "com.aliyun.spark.SparkOnRedis",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "small",
            "spark.dla.eni.enable": "true",
            "spark.dla.eni.vswitch.id": "vsw-xxx",  #可訪問Redis的交換機id。
            "spark.dla.eni.security.group.id": "sg-xxx"  #可訪問Redis的安全組id。
        }
    }

執行結果

作業運行成功后,在任務列表中單擊操作 > 日志,查看作業日志。出現如下日志說明作業運行成功:日志詳情