日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

訪問Elasticsearch數據源

本文介紹如何使用云原生數據倉庫 AnalyticDB MySQL 版Spark通過ENI網絡讀取Elasticsearch數據源。

前提條件

準備工作

  1. 阿里云Elasticsearch控制臺基本信息頁面,獲取交換機ID。

  2. ECS管理控制臺安全組頁面,獲取阿里云Elasticsearch實例所屬的安全組ID。如未添加安全組,請參見創建安全組

使用Scala連接阿里云Elasticsearch

  1. 下載與阿里云Elasticsearch實例版本對應的JAR包,下載鏈接,請參見Elasticsearch Spark。本文下載的示例JAR包為Elasticsearch-spark-30_2.12-7.17.9.jar。

  2. 在pom.xml文件的dependencies中添加依賴項。

    <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-30_2.12</artifactId>
        <version>7.17.9</version>
        <scope>provided</scope>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.2.0</version>
        <scope>provided</scope>
    </dependency>
    重要

    請確保pom.xml文件中Elasticsearch-spark-30_2.12的版本與阿里云Elasticsearch實例的版本一致,Spark-core_2.12的版本與AnalyticDB for MySQL Spark版本一致。

  3. 編寫如下示例程序,并進行編譯打包,本文生成的JAR包名稱為spark-example.jar

    package org.example
    
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    object SparkEs {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().getOrCreate();
    
        // 生成一個dataframe
        val columns = Seq("language","users_count")
        val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
        val writeDF = spark.createDataFrame(data).toDF(columns:_*)
    
        // 寫入數據
        writeDF.write.format("es").mode(SaveMode.Overwrite)
        // 阿里云Elasticsearch實例的私網地址
        .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com")
        // 阿里云Elasticsearch實例的私網端口號
        .option("es.port", "9200")
        // 阿里云Elasticsearch實例的用戶名,固定寫為elastic
        .option("es.net.http.auth.user", "elastic")
        // 阿里云Elasticsearch實例的密碼
        .option("es.net.http.auth.pass", "password")
        // 連接阿里云Elasticsearch實例時,必須配置為true 
        .option("es.nodes.wan.only", "true")
        // 連接阿里云Elasticsearch實例時,必須配置為false
        .option("es.nodes.discovery", "false")
        // Spark讀取的阿里云Elasticsearch實例的數據類型
        .save("spark/_doc")
    
        // 讀取數據
        spark.read.format("es")
        // 阿里云Elasticsearch實例的私網地址
        .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com")
        // 阿里云Elasticsearch實例的私網端口號
        .option("es.port", "9200")
        // 阿里云Elasticsearch實例的用戶名,固定寫為elastic
        .option("es.net.http.auth.user", "elastic")
        // 阿里云Elasticsearch實例的密碼
        .option("es.net.http.auth.pass", "password")
        // 連接阿里云Elasticsearch實例時,必須配置為true 
        .option("es.nodes.wan.only", "true")
        // 連接阿里云Elasticsearch實例時,必須配置為false
        .option("es.nodes.discovery", "false")
        // Spark讀取的阿里云Elasticsearch實例的數據類型,格式為<index>/<type>
        .load("spark/_doc").show
      }
    }
  4. 將步驟1中下載的JAR包和示例程序spark-example.jar上傳至OSS。具體操作,請參見上傳文件

  5. 登錄云原生數據倉庫AnalyticDB MySQL控制臺,在左上角選擇集群所在地域。在左側導航欄,單擊集群列表,在企業版湖倉版頁簽下,單擊目標集群ID。

  6. 在左側導航欄,單擊作業開發 > Spark Jar 開發

  7. 在編輯器窗口上方,選擇Job型資源組和Spark應用類型。本文以Batch類型為例。

  8. 在編輯器中執行以下作業內容。

    {
    
        "name": "ES-SPARK-EXAMPLE",
        "className": "com.aliyun.spark.ReadES",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****"
        },
        "file": "oss://testBucketName/spark-example.jar",
        "jars": "oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar"
    }

    參數說明如下:

    參數

    說明

    name

    Spark作業名稱。

    className

    Java或者Scala程序入口類,Python不需要指定入口類。

    conf

    與開源Spark中的配置項基本一致,參數格式為key:value形式,多個參數之間以英文逗號(,)分隔。與開源Spark用法不一致的配置參數及AnalyticDB for MySQL特有的配置參數,請參見Spark應用配置參數說明

    spark.adb.eni.enabled

    是否開啟ENI訪問。使用企業版、基礎版及湖倉版Spark訪問Elasticsearch數據源時,需要開啟ENI訪問。

    spark.adb.eni.vswitchId

    阿里云Elasticsearch實例的交換機ID。獲取方法,請參見準備工作

    spark.adb.eni.securityGroupId

    阿里云Elasticsearch實例的安全組ID。獲取方法,請參見準備工作

    file

    示例程序spark-example.jar所在的OSS路徑。

    jars

    Spark作業依賴的JAR包所在的OSS路徑。

  9. 單擊立即執行

使用PySpark連接阿里云Elasticsearch

  1. 下載與阿里云Elasticsearch實例版本對應的JAR包,下載鏈接,請參見Elasticsearch Spark。本文下載的示例JAR包為Elasticsearch-spark-30_2.12-7.17.9.jar。

  2. 在pom.xml文件的dependencies中添加依賴項。

    <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-30_2.12</artifactId>
        <version>7.17.9</version>
        <scope>provided</scope>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.2.0</version>
        <scope>provided</scope>
    </dependency>
    重要

    請確保pom.xml文件中Elasticsearch-spark-30_2.12的版本與阿里云Elasticsearch實例的版本一致,Spark-core_2.12的版本與AnalyticDB for MySQL Spark版本一致。

  3. 編寫如下示例程序,并將示例程序存儲為es-spark-example.py

    from pyspark.sql import SparkSession
    
    if __name__ == '__main__':
        spark = SparkSession \
            .builder \
            .getOrCreate()
    
        # 生成DataFrame
        dept = [("Finance", 10),
                ("Marketing", 20),
                ("Sales", 30),
                ("IT", 40)
                ]
        deptColumns = ["dept_name", "dept_id"]
        deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
        deptDF.printSchema()
        deptDF.show(truncate=False)
    
        # 寫入數據
        deptDF.write.format('es').mode("overwrite") \
            #阿里云Elasticsearch實例的私網地址
            .option('es.nodes', 'es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com') \
            #阿里云Elasticsearch實例的私網端口號
            .option('es.port', '9200') \
            #阿里云Elasticsearch實例的用戶名,固定寫為elastic
            .option('es.net.http.auth.user', 'elastic') \
            #阿里云Elasticsearch實例的密碼
            .option('es.net.http.auth.pass', 'password') \
            #連接阿里云Elasticsearch實例時,必須配置為true
            .option("es.nodes.wan.only", "true") \
            #連接阿里云Elasticsearch實例時,必須配置為false
            .option("es.nodes.discovery", "false") \
            #Spark讀取的阿里云Elasticsearch實例的數據類型,格式為<index>/<type>
            .save("spark/_doc")
    
        # 讀取數據
        df = spark.read.format("es") \
            #阿里云Elasticsearch實例的私網地址
            .option('es.nodes', 'es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com') \
            #阿里云Elasticsearch實例的私網端口號
            .option('es.port', '9200') \
            #阿里云Elasticsearch實例的用戶名,固定寫為elastic
            .option('es.net.http.auth.user', 'elastic') \
            #阿里云Elasticsearch實例的密碼
            .option('es.net.http.auth.pass', 'password') \
            #連接阿里云Elasticsearch實例時,必須配置為true 
            .option("es.nodes.wan.only", "true") \
            #連接阿里云Elasticsearch實例時,必須配置為false
            .option("es.nodes.discovery", "false") \
            #Spark讀取的阿里云Elasticsearch實例的數據類型,格式為<index>/<type>
            .load("spark/_doc").show
                            
  4. 將步驟1中下載的JAR包和es-spark-example.py程序上傳到OSS中。具體操作,請參見上傳文件

  5. 登錄云原生數據倉庫AnalyticDB MySQL控制臺,在左上角選擇集群所在地域。在左側導航欄,單擊集群列表,在企業版湖倉版頁簽下,單擊目標集群ID。

  6. 在左側導航欄,單擊作業開發 > Spark Jar 開發

  7. 在編輯器窗口上方,選擇Job型資源組和Spark應用類型。本文以Batch類型為例。

  8. 在編輯器中執行以下作業內容。

    {
        "name": "ES-SPARK-EXAMPLE",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****"
        },
        "file": "oss://testBucketName/es-spark-example.py",
        "jars": "oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar"
    }

    參數說明如下:

    參數

    說明

    name

    Spark作業的名稱。

    conf

    與開源Spark中的配置項基本一致,參數格式為key:value形式,多個參數之間以英文逗號(,)分隔。與開源Spark用法不一致的配置參數及AnalyticDB for MySQL特有的配置參數,請參見Spark應用配置參數說明

    spark.adb.eni.enabled

    是否開啟ENI訪問。使用企業版、基礎版及湖倉版Spark訪問Elasticsearch數據源時,需要開啟ENI訪問。

    spark.adb.eni.vswitchId

    阿里云Elasticsearch實例的交換機ID。獲取方法,請參見準備工作

    spark.adb.eni.securityGroupId

    阿里云Elasticsearch實例的安全組ID。獲取方法,請參見準備工作

    file

    es-spark-example.py程序所在的OSS路徑。

    jars

    Spark作業依賴的JAR包所在的OSS路徑。

  9. 單擊立即執行