本文介紹如何使用云原生數據倉庫 AnalyticDB MySQL 版Spark通過ENI網絡讀取Elasticsearch數據源。
前提條件
AnalyticDB for MySQL集群的產品系列為企業版、基礎版或湖倉版。
已在AnalyticDB for MySQL集群中創建Job型資源組。具體操作,請參見新建資源組。
已創建數據庫賬號。
如果是通過阿里云賬號訪問,只需創建高權限賬號。具體操作,請參見創建高權限賬號。
如果是通過RAM用戶訪問,需要創建高權限賬號和普通賬號并且將RAM用戶綁定到普通賬號上。具體操作,請參見創建數據庫賬號和綁定或解綁RAM用戶與數據庫賬號。
已創建阿里云Elasticsearch實例。具體操作,請參見創建阿里云Elasticsearch實例。
已將AnalyticDB for MySQL企業版、基礎版及湖倉版的IP地址添加至阿里云Elasticsearch實例的白名單中。具體操作,請參見配置實例公網或私網訪問白名單。
已開通OSS服務,并創建與AnalyticDB for MySQL湖倉版(3.0)集群位于相同地域的存儲空間。具體操作,請參見開通OSS服務和創建存儲空間。
準備工作
在阿里云Elasticsearch控制臺的基本信息頁面,獲取交換機ID。
在ECS管理控制臺的安全組頁面,獲取阿里云Elasticsearch實例所屬的安全組ID。如未添加安全組,請參見創建安全組。
使用Scala連接阿里云Elasticsearch
下載與阿里云Elasticsearch實例版本對應的JAR包,下載鏈接,請參見Elasticsearch Spark。本文下載的示例JAR包為Elasticsearch-spark-30_2.12-7.17.9.jar。
在pom.xml文件的dependencies中添加依賴項。
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 --> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-30_2.12</artifactId> <version>7.17.9</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.2.0</version> <scope>provided</scope> </dependency>
重要請確保pom.xml文件中Elasticsearch-spark-30_2.12的版本與阿里云Elasticsearch實例的版本一致,Spark-core_2.12的版本與AnalyticDB for MySQL Spark版本一致。
編寫如下示例程序,并進行編譯打包,本文生成的JAR包名稱為
spark-example.jar
。package org.example import org.apache.spark.sql.{SaveMode, SparkSession} object SparkEs { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate(); // 生成一個dataframe val columns = Seq("language","users_count") val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000")) val writeDF = spark.createDataFrame(data).toDF(columns:_*) // 寫入數據 writeDF.write.format("es").mode(SaveMode.Overwrite) // 阿里云Elasticsearch實例的私網地址 .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com") // 阿里云Elasticsearch實例的私網端口號 .option("es.port", "9200") // 阿里云Elasticsearch實例的用戶名,固定寫為elastic .option("es.net.http.auth.user", "elastic") // 阿里云Elasticsearch實例的密碼 .option("es.net.http.auth.pass", "password") // 連接阿里云Elasticsearch實例時,必須配置為true .option("es.nodes.wan.only", "true") // 連接阿里云Elasticsearch實例時,必須配置為false .option("es.nodes.discovery", "false") // Spark讀取的阿里云Elasticsearch實例的數據類型 .save("spark/_doc") // 讀取數據 spark.read.format("es") // 阿里云Elasticsearch實例的私網地址 .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com") // 阿里云Elasticsearch實例的私網端口號 .option("es.port", "9200") // 阿里云Elasticsearch實例的用戶名,固定寫為elastic .option("es.net.http.auth.user", "elastic") // 阿里云Elasticsearch實例的密碼 .option("es.net.http.auth.pass", "password") // 連接阿里云Elasticsearch實例時,必須配置為true .option("es.nodes.wan.only", "true") // 連接阿里云Elasticsearch實例時,必須配置為false .option("es.nodes.discovery", "false") // Spark讀取的阿里云Elasticsearch實例的數據類型,格式為<index>/<type> .load("spark/_doc").show } }
將步驟1中下載的JAR包和示例程序
spark-example.jar
上傳至OSS。具體操作,請參見上傳文件。登錄云原生數據倉庫AnalyticDB MySQL控制臺,在左上角選擇集群所在地域。在左側導航欄,單擊集群列表,在企業版或湖倉版頁簽下,單擊目標集群ID。
在左側導航欄,單擊
。在編輯器窗口上方,選擇Job型資源組和Spark應用類型。本文以Batch類型為例。
在編輯器中執行以下作業內容。
{ "name": "ES-SPARK-EXAMPLE", "className": "com.aliyun.spark.ReadES", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small", "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****" }, "file": "oss://testBucketName/spark-example.jar", "jars": "oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar" }
參數說明如下:
參數
說明
name
Spark作業名稱。
className
Java或者Scala程序入口類,Python不需要指定入口類。
conf
與開源Spark中的配置項基本一致,參數格式為
key:value
形式,多個參數之間以英文逗號(,)分隔。與開源Spark用法不一致的配置參數及AnalyticDB for MySQL特有的配置參數,請參見Spark應用配置參數說明。spark.adb.eni.enabled
是否開啟ENI訪問。使用企業版、基礎版及湖倉版Spark訪問Elasticsearch數據源時,需要開啟ENI訪問。
spark.adb.eni.vswitchId
阿里云Elasticsearch實例的交換機ID。獲取方法,請參見準備工作。
spark.adb.eni.securityGroupId
阿里云Elasticsearch實例的安全組ID。獲取方法,請參見準備工作。
file
示例程序
spark-example.jar
所在的OSS路徑。jars
Spark作業依賴的JAR包所在的OSS路徑。
單擊立即執行。
使用PySpark連接阿里云Elasticsearch
下載與阿里云Elasticsearch實例版本對應的JAR包,下載鏈接,請參見Elasticsearch Spark。本文下載的示例JAR包為Elasticsearch-spark-30_2.12-7.17.9.jar。
在pom.xml文件的dependencies中添加依賴項。
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 --> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-30_2.12</artifactId> <version>7.17.9</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.2.0</version> <scope>provided</scope> </dependency>
重要請確保pom.xml文件中Elasticsearch-spark-30_2.12的版本與阿里云Elasticsearch實例的版本一致,Spark-core_2.12的版本與AnalyticDB for MySQL Spark版本一致。
編寫如下示例程序,并將示例程序存儲為
es-spark-example.py
。from pyspark.sql import SparkSession if __name__ == '__main__': spark = SparkSession \ .builder \ .getOrCreate() # 生成DataFrame dept = [("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40) ] deptColumns = ["dept_name", "dept_id"] deptDF = spark.createDataFrame(data=dept, schema=deptColumns) deptDF.printSchema() deptDF.show(truncate=False) # 寫入數據 deptDF.write.format('es').mode("overwrite") \ #阿里云Elasticsearch實例的私網地址 .option('es.nodes', 'es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com') \ #阿里云Elasticsearch實例的私網端口號 .option('es.port', '9200') \ #阿里云Elasticsearch實例的用戶名,固定寫為elastic .option('es.net.http.auth.user', 'elastic') \ #阿里云Elasticsearch實例的密碼 .option('es.net.http.auth.pass', 'password') \ #連接阿里云Elasticsearch實例時,必須配置為true .option("es.nodes.wan.only", "true") \ #連接阿里云Elasticsearch實例時,必須配置為false .option("es.nodes.discovery", "false") \ #Spark讀取的阿里云Elasticsearch實例的數據類型,格式為<index>/<type> .save("spark/_doc") # 讀取數據 df = spark.read.format("es") \ #阿里云Elasticsearch實例的私網地址 .option('es.nodes', 'es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com') \ #阿里云Elasticsearch實例的私網端口號 .option('es.port', '9200') \ #阿里云Elasticsearch實例的用戶名,固定寫為elastic .option('es.net.http.auth.user', 'elastic') \ #阿里云Elasticsearch實例的密碼 .option('es.net.http.auth.pass', 'password') \ #連接阿里云Elasticsearch實例時,必須配置為true .option("es.nodes.wan.only", "true") \ #連接阿里云Elasticsearch實例時,必須配置為false .option("es.nodes.discovery", "false") \ #Spark讀取的阿里云Elasticsearch實例的數據類型,格式為<index>/<type> .load("spark/_doc").show
將步驟1中下載的JAR包和
es-spark-example.py
程序上傳到OSS中。具體操作,請參見上傳文件。登錄云原生數據倉庫AnalyticDB MySQL控制臺,在左上角選擇集群所在地域。在左側導航欄,單擊集群列表,在企業版或湖倉版頁簽下,單擊目標集群ID。
在左側導航欄,單擊
。在編輯器窗口上方,選擇Job型資源組和Spark應用類型。本文以Batch類型為例。
在編輯器中執行以下作業內容。
{ "name": "ES-SPARK-EXAMPLE", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small", "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****" }, "file": "oss://testBucketName/es-spark-example.py", "jars": "oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar" }
參數說明如下:
參數
說明
name
Spark作業的名稱。
conf
與開源Spark中的配置項基本一致,參數格式為
key:value
形式,多個參數之間以英文逗號(,)分隔。與開源Spark用法不一致的配置參數及AnalyticDB for MySQL特有的配置參數,請參見Spark應用配置參數說明。spark.adb.eni.enabled
是否開啟ENI訪問。使用企業版、基礎版及湖倉版Spark訪問Elasticsearch數據源時,需要開啟ENI訪問。
spark.adb.eni.vswitchId
阿里云Elasticsearch實例的交換機ID。獲取方法,請參見準備工作。
spark.adb.eni.securityGroupId
阿里云Elasticsearch實例的安全組ID。獲取方法,請參見準備工作。
file
es-spark-example.py
程序所在的OSS路徑。jars
Spark作業依賴的JAR包所在的OSS路徑。
單擊立即執行。