// This file is a complete example program: it submits a Pi-computation job to
// Data Lake Analytics (DLA) Serverless Spark, tracks its status, and lists
// historical job runs.
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.openanalytics_open.model.v20180619.*;
import com.aliyuncs.profile.DefaultProfile;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
/**
* 演示如何使用Java SDK操作數(shù)據(jù)湖分析的spark作業(yè)
*
* @author aliyun
*/
public class Demo {
/**
* 提交一個SQL作業(yè)到數(shù)據(jù)湖分析Serverless Spark
*
* @param virtualClusterName 數(shù)據(jù)湖分析虛擬集群名稱
* @param sql sql內(nèi)容
* @return Spark JobId, 提交作業(yè)成功, 返回作業(yè)的ID, 用于后續(xù)的狀態(tài)跟蹤
* @throws ClientException 提交作業(yè)可能因為網(wǎng)絡原因等拋出錯誤
*/
public static String submitSparkSQL(IAcsClient client,
String virtualClusterName,
String sql) throws ClientException {
// 初始化Request, 填入集群名稱和作業(yè)內(nèi)容
SubmitSparkSQLRequest request = new SubmitSparkSQLRequest();
request.setVcName(virtualClusterName);
request.setSql(sql);
// 提交作業(yè), 返回Spark作業(yè)的JobId
SubmitSparkSQLResponse response = client.getAcsResponse(request);
return response.getJobId();
}
/**
* 提交一個作業(yè)到數(shù)據(jù)湖分析Serverless Spark
*
* @param virtualClusterName 數(shù)據(jù)湖分析虛擬集群名稱
* @param jobConfig 提交Spark作業(yè)的描述文件,需要是JSON格式
* @return Spark JobId, 提交作業(yè)成功, 返回作業(yè)的ID, 用于后續(xù)的狀態(tài)跟蹤
* @throws ClientException 提交作業(yè)可能因為網(wǎng)絡原因等拋出錯誤
*/
public static String submitSparkJob(IAcsClient client,
String virtualClusterName,
String jobConfig) throws ClientException {
// 初始化Request, 填入集群名稱和作業(yè)內(nèi)容
SubmitSparkJobRequest request = new SubmitSparkJobRequest();
request.setVcName(virtualClusterName);
request.setConfigJson(jobConfig);
// 提交作業(yè), 返回Spark作業(yè)的JobId
SubmitSparkJobResponse response = client.getAcsResponse(request);
return response.getJobId();
}
/**
* 返回一個Spark Job當前的狀態(tài)
*
* @param sparkJobId 用戶Spark作業(yè)的ID
* @return 返回Spark作業(yè)的狀態(tài), 類型為String
* @throws ClientException 提交作業(yè)可能因為網(wǎng)絡原因等拋出錯
*/
public static String getSparkJobStatus(IAcsClient client,
String virtualClusterName,
String sparkJobId) throws ClientException {
// 初始化Request, 填入spark job id
GetJobStatusRequest request = new GetJobStatusRequest();
request.setJobId(sparkJobId);
request.setVcName(virtualClusterName);
// 提交作業(yè), 返回Spark作業(yè)的狀態(tài)碼
GetJobStatusResponse response = client.getAcsResponse(request);
return response.getStatus();
}
/**
* 停止一個Spark Job
*
* @param sparkJobId 用戶Spark作業(yè)的ID
* @param virtualClusterName 數(shù)據(jù)湖分析虛擬集群名稱
* @return 無返回值
* @throws ClientException 提交作業(yè)可能因為網(wǎng)絡原因等拋出錯
*/
public static void killSparkJob(IAcsClient client,
String virtualClusterName,
String sparkJobId) throws ClientException {
// 初始化Request, 填入spark job id
KillSparkJobRequest request = new KillSparkJobRequest();
request.setVcName(virtualClusterName);
request.setJobId(sparkJobId);
// 提交作業(yè), 返回Spark作業(yè)的狀態(tài)碼
KillSparkJobResponse response = client.getAcsResponse(request);
}
/**
* 返回一個Spark Job的日志
*
* @param client 客戶端
* @param virtualClusterName 數(shù)據(jù)湖分析虛擬集群名稱
* @param sparkJobId 用戶Spark作業(yè)的ID
* @return 返回Spark作業(yè)的狀態(tài), 類型為String
* @throws ClientException 提交作業(yè)可能因為網(wǎng)絡原因等拋出錯
*/
public static String getSparkJobLog(IAcsClient client,
String virtualClusterName,
String sparkJobId) throws ClientException {
// 初始化Request, 填入spark job id
GetJobLogRequest request = new GetJobLogRequest();
request.setJobId(sparkJobId);
request.setVcName(virtualClusterName);
// 提交作業(yè), 返回Spark作業(yè)的日志
GetJobLogResponse response = client.getAcsResponse(request);
return response.getData();
}
/**
* 查詢某個虛擬集群上提交的Spark作業(yè), 通過翻頁可以遍歷所有的歷史作業(yè)信息
*
* @param client 客戶端
* @param pageNumber 查詢的頁碼, 從1開始
* @param pageSize 每頁返回數(shù)量
* @throws ClientException 提交作業(yè)可能因為網(wǎng)絡原因等拋出錯
*/
public static void listSparkJob(IAcsClient client,
String virtualClusterName,
int pageNumber,
int pageSize) throws ClientException {
// 初始化Request, 填入spark job id
ListSparkJobRequest request = new ListSparkJobRequest();
request.setVcName(virtualClusterName);
request.setPageNumber(pageNumber); // pageNumber 從1開始
request.setPageSize(pageSize);
// 提交作業(yè), 返回Spark作業(yè)的狀態(tài)碼
ListSparkJobResponse response = client.getAcsResponse(request);
// 獲取任務列表
List<ListSparkJobResponse.DataResult.Data> sparkJobList = response.getDataResult().getJobList();
for (ListSparkJobResponse.DataResult.Data job : sparkJobList) {
System.out.println(String.format("JobName: %s, JobUI: %s, JobStatus: %s, JobConfig: %s",
job.getJobName(),
job.getStatus(),
job.getSparkUI(),
job.getDetail()));
}
}
public static void main(String[] args) throws IOException, ClientException, InterruptedException {
// 提交任務必須的參數(shù)
String region = "cn-hangzhou";
// 獲取一個合法的AK SK
String accessKeyId = System.getEnv("RAM_AK");
String accessKeySecret = "System.getEnv("RAM_SK");
String virtualClusterName = "MyCluster";
// 需要是一個合法的JSON格式的字符串
String jobConfig=
"{\n" +
" \"name\": \"SparkPi\",\n" +
" \"file\": \"local:///tmp/spark-examples.jar\",\n" +
" \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
" \"args\": [\n" +
" \"100\"\n" +
" ],\n" +
" \"conf\": {\n" +
" \"spark.driver.resourceSpec\": \"medium\",\n" +
" \"spark.executor.instances\": 5,\n" +
" \"spark.executor.resourceSpec\": \"medium\"\n" +
" }\n" +
"}";
String sql = "-- here is the spark conf\n"
+ "set spark.driver.resourceSpec=medium;\n"
+ "set spark.executor.instances=5;\n"
+ "set spark.executor.resourceSpec=medium;\n"
+ "set spark.app.name=sparksqltest;\n"
+ "set spark.sql.hive.metastore.version=dla;\n"
+ "-- here is your sql statement\n"
+ "-- add your jar\n"
+ "-- add jar oss://path/to/your/jar\n"
+ "show databases;";
// 初始化阿里云平臺開發(fā)Client
DefaultProfile profile = DefaultProfile.getProfile(region, accessKeyId, accessKeySecret);
IAcsClient client = new DefaultAcsClient(profile);
// 提交任務
// 您也可以選擇提交SQL作業(yè)
//String sparkJobId = submitSparkSQL(client, virtualClusterName, sql);
String sparkJobId = submitSparkJob(client, virtualClusterName, jobConfig);
// 輪詢?nèi)蝿諣顟B(tài), 超時未完成則殺死任務
long startTime = System.currentTimeMillis();
List<String> finalStatusList = Arrays.asList("error", "success", "dead", "killed");
while (true) {
String status = getSparkJobStatus(client, virtualClusterName, sparkJobId);
if (finalStatusList.contains(status)) {
System.out.println("Job went to final status");
break;
} else if ((System.currentTimeMillis() - startTime) > 100000) {
// 如果超時則殺死job
System.out.println("Kill expire time job");
killSparkJob(client, virtualClusterName, sparkJobId);
break;
}
// 打印狀態(tài), 等待5秒, 進入下一輪查詢
System.out.println(String.format("Job %s status is %s", sparkJobId, status));
Thread.sleep(5000);
}
// 打印作業(yè)的日志
String logDetail = getSparkJobLog(client, virtualClusterName, sparkJobId);
System.out.println(logDetail);
// 打印最近10條作業(yè)的明細
listSparkJob(client, virtualClusterName, 1, 10);
}
}