EMR Serverless Spark與其他VPC間網(wǎng)絡(luò)互通
通過網(wǎng)絡(luò)連接功能,您可以訪問自有VPC(Virtual Private Cloud)內(nèi)的數(shù)據(jù)源。本文將以SparkSQL和Application JAR類型任務(wù)連接至您的自有VPC的HMS(Hive Metastore)服務(wù)為例,為您介紹如何配置并訪問自有VPC內(nèi)的數(shù)據(jù)源。
前提條件
已準(zhǔn)備好數(shù)據(jù)源。本文以在EMR on ECS頁面創(chuàng)建包含Hive服務(wù),元數(shù)據(jù)為內(nèi)置MySQL的DataLake集群為例,詳情請參見創(chuàng)建集群。
使用限制
當(dāng)前僅支持創(chuàng)建一個網(wǎng)絡(luò)連接。
當(dāng)前僅支持使用下列可用區(qū)的交換機。
地域名稱
地域ID
可用區(qū)名稱
華東1(杭州)
cn-hangzhou
杭州 可用區(qū)H
杭州 可用區(qū)I
杭州 可用區(qū)J
華東2(上海)
cn-shanghai
上海 可用區(qū)F
上海 可用區(qū)G
華北2(北京)
cn-beijing
北京 可用區(qū)F
北京 可用區(qū)G
北京 可用區(qū)H
北京 可用區(qū)K
華南1(深圳)
cn-shenzhen
深圳 可用區(qū)E
地域名稱
地域ID
可用區(qū)名稱
新加坡
ap-southeast-1
新加坡 可用區(qū)B
新加坡 可用區(qū)C
美國(弗吉尼亞)
us-east-1
弗吉尼亞 可用區(qū)A
弗吉尼亞 可用區(qū)B
德國(法蘭克福)
eu-central-1
法蘭克福 可用區(qū)A
法蘭克福 可用區(qū)B
步驟一:新增網(wǎng)絡(luò)連接
進入網(wǎng)絡(luò)連接頁面。
在左側(cè)導(dǎo)航欄,選擇
。在Spark頁面,單擊目標(biāo)工作空間名稱。
在EMR Serverless Spark頁面,單擊左側(cè)導(dǎo)航欄中的網(wǎng)絡(luò)連接。
在網(wǎng)絡(luò)連接頁面,單擊新增網(wǎng)絡(luò)連接。
在新增網(wǎng)絡(luò)連接對話框中,配置以下信息,單擊確定。
參數(shù)
說明
連接名稱
輸入新增連接的名稱。
專有網(wǎng)絡(luò)
選擇與EMR集群相同的專有網(wǎng)絡(luò)。
如果當(dāng)前沒有可選擇的專有網(wǎng)絡(luò),請單擊創(chuàng)建專有網(wǎng)絡(luò),前往專有網(wǎng)絡(luò)控制臺創(chuàng)建,詳情請參見創(chuàng)建和管理專有網(wǎng)絡(luò)。
交換機
選擇與EMR集群部署在同一專有網(wǎng)絡(luò)下的相同交換機。
如果當(dāng)前可用區(qū)沒有交換機,請單擊虛擬交換機,前往專有網(wǎng)絡(luò)控制臺創(chuàng)建,詳情請參見創(chuàng)建和管理交換機。
重要僅支持選擇特定可用區(qū)下的交換機,詳情請參見使用限制。
當(dāng)狀態(tài)顯示為已成功時,表示新增網(wǎng)絡(luò)連接成功。
步驟二:為EMR集群添加安全組規(guī)則
獲取已創(chuàng)建網(wǎng)絡(luò)連接中指定交換機的網(wǎng)段。
您可以登錄專有網(wǎng)絡(luò)管理控制臺,在交換機頁面獲取交換機的網(wǎng)段。
添加安全組規(guī)則。
在集群管理頁面,單擊目標(biāo)集群的集群ID。
在基礎(chǔ)信息頁面,單擊集群安全組后面的鏈接。
在安全組規(guī)則頁面,單擊手動添加,填寫端口范圍和授權(quán)對象,然后單擊保存。
參數(shù)
說明
端口范圍
填寫9083端口。
授權(quán)對象
填寫前一步驟中獲取的指定交換機的網(wǎng)段。
重要為防止被外部的用戶攻擊導(dǎo)致安全問題,授權(quán)對象禁止填寫為0.0.0.0/0。
(可選)步驟三:連接Hive服務(wù)并查詢表數(shù)據(jù)
如果您已有創(chuàng)建并配置好的Hive表,則可以跳過該步驟。
使用SSH方式登錄集群的Master節(jié)點,詳情請參見登錄集群。
執(zhí)行以下命令,進入Hive命令行。
hive
執(zhí)行以下命令,創(chuàng)建表。
CREATE TABLE my_table (id INT,name STRING);
執(zhí)行以下命令,向表中插入數(shù)據(jù)。
INSERT INTO my_table VALUES (1, 'John'); INSERT INTO my_table VALUES (2, 'Jane');
執(zhí)行以下命令,查詢數(shù)據(jù)。
SELECT * FROM my_table;
(可選)步驟四:準(zhǔn)備并上傳資源文件
如果您后續(xù)使用JAR任務(wù)類型,則需提前準(zhǔn)備好資源文件。如果使用本文的SparkSQL任務(wù)類型,則可以跳過該步驟。
在本地新建一個Maven工程。
package com.example; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public class DataFrameExample { public static void main(String[] args) { // 創(chuàng)建SparkSession。 SparkSession spark = SparkSession.builder() .appName("HMSQueryExample") .enableHiveSupport() .getOrCreate(); // 執(zhí)行查詢。 Dataset<Row> result = spark.sql("SELECT * FROM default.my_table"); // 打印查詢結(jié)果。 result.show(); // 關(guān)閉SparkSession。 spark.stop(); } }
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>sparkDataFrame</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <spark.version>3.3.1</spark.version> <scala.binary.version>2.12</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> </dependencies> </project>
使用
mvn package
命令打包,編譯打包后生成sparkDataFrame-1.0-SNAPSHOT.jar文件。在EMR Serverless Spark頁面的目標(biāo)工作空間下,單擊左側(cè)的文件管理。
在文件管理頁面,單擊上傳文件。
上傳本地打包好的
sparkDataFrame-1.0-SNAPSHOT.jar
文件。
步驟五:新建并運行任務(wù)
JAR任務(wù)
在EMR Serverless Spark頁面,單擊左側(cè)的數(shù)據(jù)開發(fā)。
單擊新建。
輸入名稱,類型選擇
,單擊確定。在新建的任務(wù)開發(fā)中,配置以下信息,其余參數(shù)無需配置,然后單擊運行。
參數(shù)
說明
主jar資源
選擇前一步驟中上傳的資源文件。例如,sparkDataFrame-1.0-SNAPSHOT.jar。
Main Class
提交Spark任務(wù)時所指定的主類。本文示例填寫為com.example.DataFrameExample。
Spark配置
配置以下信息。
spark.hadoop.hive.metastore.uris thrift://*.*.*.*:9083 spark.hadoop.hive.imetastoreclient.factory.class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClientFactory spark.emr.serverless.network.service.name <yourConnectionName>
其中,以下信息請您根據(jù)實際情況替換:
*.*.*.*
為HMS服務(wù)的內(nèi)網(wǎng)IP地址。本示例為EMR集群Master節(jié)點的內(nèi)網(wǎng)IP,您可以在EMR集群的節(jié)點管理頁面中查看。<yourConnectionName>
為您在步驟一中新增的網(wǎng)絡(luò)連接的名稱。
運行任務(wù)后,在下方的運行記錄區(qū)域,單擊任務(wù)操作列的詳情。
在任務(wù)歷史的開發(fā)任務(wù)頁面的日志探查頁簽,您可以查看相關(guān)的日志信息。
SparkSQL任務(wù)
創(chuàng)建并啟動SQL會話,詳情請參見管理SQL會話。
Spark配置參數(shù)需要配置以下信息。
spark.hadoop.hive.metastore.uris thrift://*.*.*.*:9083 spark.hadoop.hive.imetastoreclient.factory.class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClientFactory spark.emr.serverless.network.service.name <yourConnectionName>
其中,以下信息請您根據(jù)實際情況替換:
*.*.*.*
為HSM服務(wù)的內(nèi)網(wǎng)IP地址。本示例為EMR集群Master節(jié)點的內(nèi)網(wǎng)IP,您可以在EMR集群的節(jié)點管理頁面中查看。<yourConnectionName>
為您在步驟一中新增的網(wǎng)絡(luò)連接的名稱。
在EMR Serverless Spark頁面,單擊左側(cè)的數(shù)據(jù)開發(fā)。
單擊新建。
輸入名稱,類型選擇
,單擊確定。在新建的任務(wù)開發(fā)中,選擇Catalog、數(shù)據(jù)庫和已啟動的SQL會話實例,輸入以下命令,并單擊運行。
SELECT * FROM default.my_table;
說明當(dāng)您計劃將基于外部Metastore的SQL代碼部署到工作流時,請確保您的SQL語句以
db.table_name
的形式指定表名,并且務(wù)必在界面右上方“Catalog”選項中選取一個默認(rèn)庫,其格式應(yīng)為catalog_id.default
。下方的運行結(jié)果區(qū)域會向您展示返回信息。