通過(guò)SDK投遞數(shù)據(jù)到OSS
本文中含有需要您注意的重要提示信息,忽略該信息可能對(duì)您的業(yè)務(wù)造成影響,請(qǐng)務(wù)必仔細(xì)閱讀。
使用SDK進(jìn)行數(shù)據(jù)投遞前,您需要了解使用數(shù)據(jù)湖投遞功能的注意事項(xiàng)、接口等信息。創(chuàng)建投遞任務(wù)后,表格存儲(chǔ)數(shù)據(jù)表中的數(shù)據(jù)會(huì)自動(dòng)投遞到OSS Bucket中存儲(chǔ)。
注意事項(xiàng)
目前支持使用數(shù)據(jù)湖投遞功能的地域有華東1(杭州)、華東2(上海)、華北2(北京)和華北3(張家口)。
數(shù)據(jù)湖投遞不支持同步刪除操作,表格存儲(chǔ)中的刪除操作在數(shù)據(jù)投遞時(shí)會(huì)被忽略,已投遞到OSS中的數(shù)據(jù)不會(huì)被刪除。
新建數(shù)據(jù)投遞任務(wù)時(shí)存在最多1分鐘的初始化時(shí)間。
數(shù)據(jù)同步存在延遲,寫(xiě)入速率穩(wěn)定時(shí),延遲在3分鐘內(nèi)。數(shù)據(jù)同步的P99延遲在10分鐘內(nèi)。
說(shuō)明P99延遲表示過(guò)去10秒內(nèi)最慢的1%的請(qǐng)求的平均延遲。
前提條件
在對(duì)象存儲(chǔ)服務(wù)側(cè)已完成如下操作:
已開(kāi)通OSS服務(wù)且在表格存儲(chǔ)實(shí)例所在地域創(chuàng)建Bucket,詳情請(qǐng)參見(jiàn)開(kāi)通OSS服務(wù)。
說(shuō)明數(shù)據(jù)湖投遞支持投遞到和表格存儲(chǔ)相同地域的任意OSS Bucket中。如需投遞到其他數(shù)倉(cāng)存儲(chǔ)(例如MaxCompute),請(qǐng)提交工單申請(qǐng)。
在表格存儲(chǔ)服務(wù)側(cè)已完成如下操作:
已在實(shí)例詳情頁(yè)面獲取實(shí)例的服務(wù)地址(Endpoint)。更多信息,請(qǐng)參考服務(wù)地址。
已創(chuàng)建表格存儲(chǔ)數(shù)據(jù)表。具體操作,請(qǐng)參見(jiàn)創(chuàng)建數(shù)據(jù)表。
在訪問(wèn)控制RAM服務(wù)側(cè)完成如下操作:
已創(chuàng)建RAM用戶并為RAM用戶授予管理表格存儲(chǔ)權(quán)限(AliyunOTSFullAccess)。具體操作,請(qǐng)參見(jiàn)創(chuàng)建RAM用戶和為RAM用戶授權(quán)。
警告阿里云賬號(hào)AccessKey泄露會(huì)威脅您所有資源的安全。建議您使用RAM用戶AccessKey進(jìn)行操作,可以有效降低AccessKey泄露的風(fēng)險(xiǎn)。
已為RAM用戶創(chuàng)建AccessKey。具體操作,請(qǐng)參見(jiàn)創(chuàng)建AccessKey。
已配置訪問(wèn)憑證。具體操作,請(qǐng)參見(jiàn)配置訪問(wèn)憑證。
接口
接口 | 說(shuō)明 |
CreateDeliveryTask | 創(chuàng)建一個(gè)投遞任務(wù)。 |
ListDeliveryTask | 列出一個(gè)數(shù)據(jù)表所有的投遞任務(wù)信息。 |
DescribeDeliveryTask | 查詢投遞任務(wù)描述信息。 |
DeleteDeliveryTask | 刪除一個(gè)投遞任務(wù)。 |
參數(shù)
參數(shù) | 說(shuō)明 |
tableName | 數(shù)據(jù)表名稱(chēng)。 |
taskName | 投遞任務(wù)名稱(chēng)。 名稱(chēng)只能包含英文小寫(xiě)字母(a~z)、數(shù)字和短橫線(-),開(kāi)頭和結(jié)尾必須為英文小寫(xiě)字母或數(shù)字,且長(zhǎng)度為3~16字符。 |
taskConfig | 投遞任務(wù)配置,包括如下選項(xiàng):
|
taskType | 投遞任務(wù)的類(lèi)型,包括如下選項(xiàng):
|
使用
您可以通過(guò)Java SDK、Go SDK實(shí)現(xiàn)數(shù)據(jù)湖投遞功能。此處以Java SDK為例介紹數(shù)據(jù)投遞湖的操作。
以下示例用于為數(shù)據(jù)表創(chuàng)建投遞任務(wù)。
import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.model.delivery.*;
public class DeliveryTask {
public static void main(String[] args) {
final String endPoint = "https://yourinstancename.cn-hangzhou.ots.aliyuncs.com";
final String accessKeyId = System.getenv("OTS_AK_ENV");
final String accessKeySecret = System.getenv("OTS_SK_ENV");
final String instanceName = "yourinstancename";
SyncClient client = new SyncClient(endPoint, accessKeyId, accessKeySecret, instanceName);
try {
createDeliveryTask(client);
System.out.println("end");
} catch (TableStoreException e) {
System.err.println("操作失敗,詳情:" + e.getMessage() + e.getErrorCode() + e.toString());
System.err.println("Request ID:" + e.getRequestId());
} catch (ClientException e) {
System.err.println("請(qǐng)求失敗,詳情:" + e.getMessage());
} finally {
client.shutdown();
}
}
private static void createDeliveryTask(SyncClient client){
String tableName = "sampleTable";
String taskName = "sampledeliverytask";
OSSTaskConfig taskConfig = new OSSTaskConfig();
taskConfig.setOssPrefix("sampledeliverytask/year=$yyyy/month=$MM");
taskConfig.setOssBucket("datadeliverytest");
taskConfig.setOssEndpoint("oss-cn-hangzhou.aliyuncs.com");
taskConfig.setOssStsRole("acs:ram::17************45:role/aliyunserviceroleforotsdatadelivery");
//eventColumn為可選配置,指定按某一列數(shù)據(jù)的時(shí)間進(jìn)行分區(qū)。如果不設(shè)置此參數(shù),則按數(shù)據(jù)寫(xiě)入表格存儲(chǔ)的時(shí)間進(jìn)行分區(qū)。
EventColumn eventColumn = new EventColumn("Col1", EventTimeFormat.RFC1123);
taskConfig.setEventTimeColumn(eventColumn);
taskConfig.addParquetSchema(new ParquetSchema("PK1", "PK1", DataType.UTF8));
taskConfig.addParquetSchema(new ParquetSchema("PK2", "PK2", DataType.BOOL));
taskConfig.addParquetSchema(new ParquetSchema("Col1", "Col1", DataType.UTF8));
CreateDeliveryTaskRequest request = new CreateDeliveryTaskRequest();
request.setTableName(tableName);
request.setTaskName(taskName);
request.setTaskConfig(taskConfig);
request.setTaskType(DeliveryTaskType.BASE_INC);
CreateDeliveryTaskResponse response = client.createDeliveryTask(request);
System.out.println("resquestID: "+ response.getRequestId());
System.out.println("traceID: " + response.getTraceId());
System.out.println("create delivery task success");
}
}