本文介紹Spark如何訪問SLS。
Spark RDD訪問SLS
代碼示例
## TestBatchLoghub.Scala
object TestBatchLoghub {
def main(args: Array[String]): Unit = {
if (args.length < 6) {
System.err.println(
"""Usage: TestBatchLoghub <sls project> <sls logstore> <sls endpoint>
| <access key id> <access key secret> <output path> <start time> <end time=now>
""".stripMargin)
System.exit(1)
}
val loghubProject = args(0)
val logStore = args(1)
val endpoint = args(2)
val accessKeyId = args(3)
val accessKeySecret = args(4)
val outputPath = args(5)
val startTime = args(6).toLong
val sc = new SparkContext(new SparkConf().setAppName("test batch loghub"))
var rdd:JavaRDD[String] = null
if (args.length > 7) {
rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime, args(7).toLong)
} else {
rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime)
}
rdd.saveAsTextFile(outputPath)
}
}
Maven pom文件可以參見aliyun-emapreduce-demo。
編譯運行
運行代碼示例前必須先配置環境變量。關于如何配置環境變量,請參見配置環境變量。
## 編譯命令
mvn clean package -DskipTests
## 編譯完后,作業JAR包位于target/shaded/下。
## 提交執行
spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g --num-executors 2 --class x.x.x.TestBatchLoghub xxx.jar <sls project> <sls logstore> <sls endpoint> $ALIBABA_CLOUD_ACCESS_KEY_ID $ALIBABA_CLOUD_ACCESS_KEY_SECRET <output path> <start time> [<end time=now>]
x.x.x.TestBatchLoghub
和xxx.jar
需要替換成真實的類路徑和包路徑。作業資源需要根據實際數據規模和實際集群規模調整,如果集群太小,直接運行以上命令可能無法執行。
spark-sql訪問SLS
訪問命令
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* \
--hiveconf accessKeyId=$ALIBABA_CLOUD_ACCESS_KEY_ID \
--hiveconf accessKeySecret=$ALIBABA_CLOUD_ACCESS_KEY_SECRET
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*
中包含LogHub DataSource類型。如果您EMR集群使用的是Spark2,則應修改上面命令中的spark3
應該換成spark2
。
如果您希望在本地電腦的開發環境中使用Spark3依賴SLS,類似于Spark2的操作方式,可以按照以下步驟操作:
下載集群
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12
目錄下的內容到本地。使用Maven將JAR包安裝到本地。
mvn install:install-file -DgroupId=com.aliyun.emr -DartifactId=emr-datasources_shaded_2.12 -Dversion=3.0.2 -Dpackaging=jar -Dfile=/Users/zhongqiang.czq/Downloads/tempory/emr-datasources_shaded_2.12-3.0.2.jar
在pom文件中添加以下依賴項。
<dependency> <groupId>com.aliyun.emr</groupId> <artifactId>emr-datasources_shaded_2.12</artifactId> <version>3.0.2</version> </dependency>
建表和讀取數據示例
create table test_sls
using loghub
options(endpoint='cn-hangzhou-intranet.log.aliyuncs.com',
access.key.id='${hiveconf:accessKeyId}',
access.key.secret='${hiveconf:accessKeySecret}',
sls.project='test_project',
sls.store='test_store',
startingoffsets='earliest'
);
select * from test_sls;
配置環境變量
配置環境變量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。
阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維,具體操作,請參見創建RAM用戶。
請不要將AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。
Linux和macOS系統配置方法:
執行以下命令配置環境變量。
其中,
<access_key_id>
需替換為您RAM用戶的AccessKey ID,<access_key_secret>
替換為您RAM用戶的AccessKey Secret。export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id> export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
Windows系統配置方法
新建環境變量文件,添加環境變量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET,并寫入已準備好的AccessKey ID和AccessKey Secret。
重啟Windows系統生效。
相關文檔
Spark訪問Kafka:Structured Streaming + Kafka Integration Guide