場景示例
更新時間:
本文介紹運行任務并查看結果、重試失敗文件的完整流程。
場景一: 運行任務
以下示例用于創建通道、創建代理、創建數據地址、創建遷移任務,運行任務并查看運行結果。
說明
在不使用代理的情況下,請忽略下面代碼中的創建通道和代理的部分。使用代理前需要先部署代理,詳情請參見 代理管理。
package sample;
import com.aliyun.hcs_mgw20240626.Client;
import com.aliyun.hcs_mgw20240626.models.*;
import com.google.gson.Gson;
import java.util.LinkedList;
import java.util.List;
public class Demo {
/** 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。*/
static String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
static String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
/** 填寫主賬號ID。*/
static String userId = "11470***876***55";
private static final String AVAILABLE = "available";
private static final String JOBFINISH = "IMPORT_JOB_FINISHED";
public static void main(String[] args) {
// 填寫數據地址名稱。
String addressName = "exampleaddress";
try {
/** 步驟1:初始化 */
com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config();
// 這里以北京區域為例。
config.setEndpoint("mgw.cn-beijing.aliyuncs.com");
config.setRegionId(accessKeyId);
config.setAccessKeyId(accessKeyId);
config.setAccessKeySecret(accessKeySecret);
Client client = new Client(config);
//* 創建通道和代理,代理可以創建多個關聯一個通道。 */
CreateTunnelRequest createTunnelRequest = new CreateTunnelRequest();
CreateTunnelInfo createTunnelInfo = new CreateTunnelInfo();
TunnelQos qos = new TunnelQos();
// MaxBandwidth, MaxQps 默認為0, 表示沒有限制, MaxBandWidth的單位是bit,請按照實際需求填寫。
qos.setMaxBandwidth(1073741824L);
qos.setMaxQps(1000);
createTunnelInfo.setTunnelQos(qos);
createTunnelRequest.setImportTunnel(createTunnelInfo);
CreateTunnelResponse createTunnelResponse = client.createTunnel(userId, createTunnelRequest);
String tunnelId = createTunnelResponse.headers.get("x-oss-import-tunnel-id");
// 填寫代理名稱。
String agentName = "exampleagent";
// 使用網絡,公網請填寫public, 專線或者VPN請填寫vpc。
String agentEndpoint = "public";
// 部署方式,目前僅支持填寫default。
String deployMethod = "default";
CreateAgentRequest createAgentRequest = new CreateAgentRequest();
CreateAgentInfo createAgentInfo = new CreateAgentInfo();
createAgentInfo.setName(agentName);
createAgentInfo.setTunnelId(tunnelId);
createAgentInfo.setAgentEndpoint(agentEndpoint);
createAgentInfo.setDeployMethod(deployMethod);
createAgentRequest.setImportAgent(createAgentInfo);
client.createAgent(userId, createAgentRequest);
/** 步驟2:創建源地址和目的地址并驗證可用性,這里以創建源為oss,目的為oss的地址為例,其它源示例請參見數據地址篇創建數據地址小節。 */
String srcAddress = "examplesrcaddress";
String destAddress = "exampledestaddress";
AddressDetail addrssdetail = new AddressDetail();
addrssdetail.addressType = "oss";
// 以下參數請根據實際值填寫。
addrssdetail.bucket = "examplebucket";
addrssdetail.prefix = "***/";
addrssdetail.regionId = "oss-cn-beijing";
addrssdetail.role = "rolename_xxxxx";
CreateAddressRequest createAddressRequest = new CreateAddressRequest();
CreateAddressInfo createAddressInfo = new CreateAddressInfo();
createAddressInfo.name = addressName;
createAddressInfo.setAddressDetail(addrssdetail);
createAddressRequest.setImportAddress(createAddressInfo);
client.createAddress(userId, createAddressRequest);
// 驗證源端數據地址可用性。
VerifyAddressResponse verifyAddressResponse = client.verifyAddress(userId, srcAddress);
if (!AVAILABLE.equals(verifyAddressResponse.body.verifyAddressResponse.status)) {
System.out.printf("job status is %s", verifyAddressResponse.body.verifyAddressResponse.status);
return;
}
addrssdetail = new AddressDetail();
addrssdetail.addressType = "oss";
// 以下參數請根據實際值填寫。
addrssdetail.bucket = "examplebucket";
addrssdetail.prefix = "***/";
addrssdetail.regionId = "oss-cn-beijing";
addrssdetail.role = "rolename_xxxxx";
createAddressRequest = new CreateAddressRequest();
createAddressInfo = new CreateAddressInfo();
createAddressInfo.name = addressName;
createAddressInfo.setAddressDetail(addrssdetail);
createAddressRequest.setImportAddress(createAddressInfo);
client.createAddress(userId, createAddressRequest);
verifyAddressResponse = client.verifyAddress(userId, destAddress);
if (!AVAILABLE.equals(verifyAddressResponse.body.verifyAddressResponse.status)) {
System.out.printf("job status is %s", verifyAddressResponse.body.verifyAddressResponse.status);
return;
}
/** 步驟3:創建并且啟動任務 */
// 填寫任務名稱。
String jobName = "examplejob";
CreateJobInfo createJobInfo = new CreateJobInfo();
createJobInfo.name = jobName;
createJobInfo.srcAddress = srcAddress;
createJobInfo.destAddress = destAddress;
/*
overwriteMode和transferMode需要組合使用,具體組合含義如下
always,all 全覆蓋
always,lastmodified 根據文件最后修改時間覆蓋
never,all 不覆蓋
*/
/* 文件覆蓋方式, 可能得值為 1.never 2.always */
createJobInfo.overwriteMode = "always";
/* 文件傳輸的方式, 可能得值為 1.changed 2.all 3.lastmodified */
createJobInfo.transferMode = "lastmodified";
// 如果maxImportTaskQps值為0或者不設置,會被設置為默認值;MaxBandWidth值為0或者不設置,會被設置為默認值,maxBandWidth值單位為bits。
ImportQos importQos = new ImportQos();
// maxBandWidth,maxImportTaskQps 根據實際需求值填寫。
importQos.maxBandWidth = 1073741824L;
importQos.maxImportTaskQps = 1000L;
createJobInfo.setImportQos(importQos);
// 配置調度規則,具體參數含義請參看API文檔。
ScheduleRule scheduleRule = new ScheduleRule();
// maxScheduleCount,startCronExpression,suspendCronExpression 根據實際需求值填寫。
scheduleRule.maxScheduleCount = 2L;
scheduleRule.startCronExpression = "0 0 10 * * ?";
scheduleRule.suspendCronExpression = "0 0 14 * * ?";
createJobInfo.setScheduleRule(scheduleRule);
// 配置過濾規則,具體參數含義請參看API文檔。
FilterRule filterRule = new FilterRule();
// 文件過濾器,根據實際需求值填寫。
KeyFilters keyFilters = new KeyFilters();
KeyFilterItem includekeyFilterItem = new KeyFilterItem();
List<String> includeRegex = new LinkedList<>();
includeRegex.add(".*.jpg");
includeRegex.add(".*.jpeg");
includekeyFilterItem.setRegex(includeRegex);
KeyFilterItem excludekeyFilterItem = new KeyFilterItem();
List<String> excludeRegex = new LinkedList<>();
excludeRegex.add(".*.txt");
excludeRegex.add(".*.js");
excludekeyFilterItem.setRegex(excludeRegex);
keyFilters.setIncludes(includekeyFilterItem);
keyFilters.setExcludes(excludekeyFilterItem);
filterRule.setKeyFilters(keyFilters);
// 時間過濾器, 時間格式遵循UTC時間格式,根據實際需求值填寫。
LastModifiedFilters lastModifiedFilters = new LastModifiedFilters();
LastModifyFilterItem includeLastModifyFilterItem = new LastModifyFilterItem();
List<TimeFilter> includeRegexs = new LinkedList<>();
TimeFilter includeTimeFilter = new TimeFilter();
includeTimeFilter.startTime = "2006-01-01T00:00:00Z";
includeTimeFilter.endTime = "2006-12-31T23:59:59Z";
includeRegexs.add(includeTimeFilter);
includeLastModifyFilterItem.setTimeFilter(includeRegexs);
LastModifyFilterItem excludeLastModifyFilterItem = new LastModifyFilterItem();
List<TimeFilter> excludeRegexs = new LinkedList<>();
TimeFilter excludeTimeFilter = new TimeFilter();
excludeTimeFilter.startTime = "2008-01-01T00:00:00Z";
excludeTimeFilter.endTime = "2008-12-31T23:59:59Z";
excludeRegexs.add(excludeTimeFilter);
excludeLastModifyFilterItem.setTimeFilter(excludeRegexs);
lastModifiedFilters.setIncludes(includeLastModifyFilterItem);
lastModifiedFilters.setExcludes(excludeLastModifyFilterItem);
filterRule.setLastModifiedFilters(lastModifiedFilters);
createJobInfo.setFilterRule(filterRule);
CreateJobRequest createJobRequest = new CreateJobRequest();
createJobRequest.setImportJob(createJobInfo);
client.createJob(userId, createJobRequest);
UpdateJobRequest updateJobRequest = new UpdateJobRequest();
UpdateJobInfo updateJobInfo = new UpdateJobInfo();
updateJobInfo.setStatus("IMPORT_JOB_LAUNCHING");
updateJobRequest.setImportJob(updateJobInfo);
client.updateJob(userId, jobName, updateJobRequest);
/** 步驟4:循環查看當前任務狀態 */
for(;;) {
GetJobResponse response = client.getJob(userId, jobName, new GetJobRequest());
if ("IMPORT_JOB_INTERRUPTED".equals(response.getBody().importJob.status)) {
System.out.println("job is interrupted");
return;
}
if ("IMPORT_JOB_FINISHED".equals(response.getBody().importJob.status)) {
System.out.println("job is finished");
break;
}
Thread.sleep(6000);
}
/** 步驟5:任務結束后查看結果。 */
ListJobHistoryRequest listJobHistoryRequest = new ListJobHistoryRequest();
ListJobHistoryResponse listJobHistoryResp = client.listJobHistory(userId, jobName, listJobHistoryRequest);
System.out.println(new Gson().toJson(listJobHistoryResp.body.jobHistoryList.jobHistory))
} catch (Exception e) {
e.printStackTrace();
}
}
}
場景二:重試失敗文件
遷移任務運行完成后,可能有失敗文件,遷移服務會為這些失敗文件構造一份失敗文件列表,以下示例先獲取失敗文件列表的詳情,然后據此創建一個新的數據地址,再創建一個重試子任務。通過這種方式,您能夠重新遷移這些失敗的文件。
package sample;
import com.aliyun.hcs_mgw20240626.Client;
import com.aliyun.hcs_mgw20240626.models.*;
public class Demo {
/** 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。*/
static String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
static String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
/** 填寫主賬號ID。*/
static String userId = "11470***876***55";
private static final String AVAILABLE = "available";
private static final String JOBFINISH = "IMPORT_JOB_FINISHED";
private static final String INTERRUPT = "Interrupt";
private static final String NONEED = "NoNeed";
private static final String READY = "Ready";
private static final String SYSTEM = "SYSTEM";
private static final String LAUNCHING = "IMPORT_JOB_LAUNCHING";
public static void main(String[] args) {
try {
/** 步驟1:初始化 */
com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config();
// 這里以北京區域為例。
config.setEndpoint("mgw.cn-beijing.aliyuncs.com");
config.setRegionId(accessKeyId);
config.setAccessKeyId(accessKeyId);
config.setAccessKeySecret(accessKeySecret);
Client client = new Client(config);
// 下列參數值請根據實際需求填寫。
int runtimeId = 1;
String jobName = "examplejob";
String srcAddress = "examplesrcaddress";
String destAddress = "exampledestaddress";
/** 步驟2:循環查看任務狀態,查看任務是否結束或者中斷。 */
for(;;) {
GetJobResponse response = client.getJob(userId, jobName, new GetJobRequest());
if ("IMPORT_JOB_INTERRUPTED".equals(response.getBody().importJob.status)) {
System.out.println("job is interrupted");
return;
}
if ("IMPORT_JOB_FINISHED".equals(response.getBody().importJob.status)) {
System.out.println("job is finished");
break;
}
Thread.sleep(6000);
}
/** 步驟3:ListJobHistory,查看是否有失敗文件。 */
ListJobHistoryRequest listJobHistoryRequest = new ListJobHistoryRequest();
listJobHistoryRequest.runtimeId = runtimeId;
ListJobHistoryResponse listJobHistoryResponse = client.listJobHistory(userId, jobName, listJobHistoryRequest);
if (listJobHistoryResponse.body.jobHistoryList.jobHistory.size() < 1) {
System.out.println("list job history length less than 1");
return;
}
if (listJobHistoryResponse.body.jobHistoryList.jobHistory.get(0).failedCount <= 0) {
System.out.println("job no need retry");
return;
}
/** 步驟4:循環GetJobResult,等待Ready。 */
for (;;) {
GetJobResultRequest getJobResultRequest = new GetJobResultRequest();
getJobResultRequest.setRuntimeId(runtimeId);
GetJobResultResponse getJobResultResponse = client.getJobResult(userId, jobName, getJobResultRequest);
if ("NoNeed".equals(getJobResultResponse.body.importJobResult.readyRetry)) {
System.out.println("job no need retry");
return;
}
if ("Ready".equals(getJobResultResponse.body.importJobResult.readyRetry)) {
break;
}
Thread.sleep(6000);
}
retry(client, jobName, srcAddress, destAddress, runtimeId);
} catch (Exception e) {
e.printStackTrace();
}
}
static void retry(Client client, String jobName, String srcAddress, String destAddress, int runtimeId) throws Exception {
/** 步驟5:根據失敗文件清單信息創建新的數據地址并且驗證是否可用。 */
GetJobResultRequest getJobResultRequest = new GetJobResultRequest();
getJobResultRequest.runtimeId = runtimeId;
GetJobResultResponse getJobResultResponse = client.getJobResult(userId, jobName, getJobResultRequest);
GetJobResponse getJobResponse = client.getJob(userId, jobName, new GetJobRequest());
GetAddressResponse getAddressResponse = client.getAddress(userId, srcAddress);
CreateAddressRequest request = new CreateAddressRequest();
CreateAddressInfo info = new CreateAddressInfo();
long time = System.currentTimeMillis();
info.name = srcAddress + "_" + time + "_retry";
AddressDetail detail = new AddressDetail();
detail.addressType = getJobResultResponse.body.importJobResult.addressType;
detail.invLocation = getJobResultResponse.body.importJobResult.invLocation;
detail.invBucket = getJobResultResponse.body.importJobResult.invBucket;
detail.invAccessId = SYSTEM;
detail.invAccessSecret = SYSTEM;
detail.invPath = getJobResultResponse.body.importJobResult.invPath;
detail.invRegionId = getJobResultResponse.body.importJobResult.invRegionId;
detail.domain = getAddressResponse.body.importAddress.addressDetail.domain;
detail.regionId = getAddressResponse.body.importAddress.addressDetail.regionId;
detail.accessId = getAddressResponse.body.importAddress.addressDetail.accessId;
detail.accessSecret = getAddressResponse.body.importAddress.addressDetail.accessSecret;
detail.agentList = getAddressResponse.body.importAddress.addressDetail.agentList;
detail.role = getAddressResponse.body.importAddress.addressDetail.role;
detail.prefix = getAddressResponse.body.importAddress.getAddressDetail().prefix;
detail.invRole = getAddressResponse.body.importAddress.addressDetail.invRole;
detail.bucket = getAddressResponse.body.importAddress.addressDetail.bucket;
info.setAddressDetail(detail);
request.setImportAddress(info);
client.createAddress(userId, request);
VerifyAddressResponse verifyAddressResponse = client.verifyAddress(userId, info.name);
if (!verifyAddressResponse.body.verifyAddressResponse.status.equals(AVAILABLE)) {
throw new Exception("retry srcaddress unavailable");
}
/** 步驟6:創建重試子任務并啟動。 */
CreateJobRequest createJobRequest = new CreateJobRequest();
CreateJobInfo createJobInfo = new CreateJobInfo();
createJobInfo.name = jobName + "_" + time + "_retry";
createJobInfo.transferMode = getJobResponse.body.importJob.transferMode;
createJobInfo.overwriteMode = getJobResponse.body.importJob.overwriteMode;
createJobInfo.srcAddress = info.name;
createJobInfo.destAddress = destAddress;
createJobInfo.parentVersion = getJobResultResponse.body.importJobResult.version;
createJobInfo.filterRule = getJobResponse.body.importJob.filterRule;
createJobInfo.audit = getJobResponse.body.importJob.audit;
createJobInfo.createReport = getJobResponse.body.importJob.createReport;
createJobRequest.setImportJob(createJobInfo);
client.createJob(userId, createJobRequest);
UpdateJobRequest updateJobRequest = new UpdateJobRequest();
UpdateJobInfo updateJobInfo = new UpdateJobInfo();
updateJobInfo.status = LAUNCHING;
updateJobRequest.setImportJob(updateJobInfo);
client.updateJob(userId, createJobInfo.name, updateJobRequest);
}
}
文檔內容是否對您有幫助?