數(shù)據(jù)接入
數(shù)據(jù)接入服務,是一種接入領域服務,但是無須改造原始服務代碼的接入模式,這種模式依賴于云測運行的函數(shù)腳本和需要連接的服務端進行連接,并在函數(shù)中進行業(yè)務邏輯轉(zhuǎn)換,并將轉(zhuǎn)換結(jié)果暴露到兩條總線上的接入方式。
1.接入原理
2.腳本編寫描述
2.1 依賴的二方包
函數(shù)腳本通過阿里云函數(shù)計算實現(xiàn),所以需要引入對函數(shù)計算SDK的依賴。
<dependency>
<groupId>com.aliyun.fc.runtime</groupId>
<artifactId>fc-java-core</artifactId>
<version>1.2.1</version>
</dependency>
2.2 函數(shù)請求參數(shù)
函數(shù)入口傳入?yún)?shù)需轉(zhuǎn)換為基本請求參數(shù),基本請求參數(shù)中各屬性定義如下:
屬性名稱 | 作用 | 默認參數(shù) |
---|---|---|
id | 請求中全局唯一ID | 無 |
version | 請求協(xié)議版本 | 無 |
request | 系統(tǒng)參數(shù) | 服務模型:{ “type”:”SERVICE”, “apiPath”:”調(diào)用的服務接口apiPath}“數(shù)據(jù)模型:{ “type”:”DATA”, “dataModelId”:”數(shù)據(jù)模型ID”} |
params | 業(yè)務參數(shù) | 1. 服務模型:服務模型中對應接口的入?yún)?. 數(shù)據(jù)模型:{“startTime”: “unix時間戳,秒級”,“endTime”:”unix時間戳,秒級”} |
2.3 函數(shù)請求BaseRequest
package yourPackage;
import java.io.Serializable;
import java.util.Map;
public class BaseRequest implements Serializable {
private static final long serialVersionUID = 1L;
/**
* request里的全局唯一id透傳
*/
private String id;
/**
* 請求協(xié)議版本
*/
private String version;
/**
* 系統(tǒng)參數(shù)
*/
private Map<String, Object> request;
/**
* 業(yè)務參數(shù)
*/
private Map<String,Object> params;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public Map<String, Object> getRequest() {
return request;
}
public void setRequest(Map<String, Object> request) {
this.request = request;
}
public Map<String,Object> getParams() {
return params;
}
public void setParams(Map<String,Object> params) {
this.params = params;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("{");
sb.append("\"id\":\"")
.append(id == null ? "" : id).append('\"');
sb.append(",\"version\":\"")
.append(version == null ? "" : version).append('\"');
sb.append(",\"request\":")
.append(request);
sb.append(",\"params\":")
.append(params);
sb.append('}');
return sb.toString();
}
}
2.4 函數(shù)響應BaseResponse
package yourPackage;
import java.io.Serializable;
public class BaseResponse<T> implements Serializable {
private static final long serialVersionUID = 1L;
/**
* request里的全局唯一id透傳
*/
private String id;
/**
* code
*/
private int code = 200;
/**
* 失敗時必填,錯誤調(diào)試信息;成功時不填
*/
private String message;
/**
* 失敗時必填,用戶可理解語言描述的錯誤信息;成功時不填
*/
private String localizedMsg;
/**
* 成功時必填,失敗時選填
*/
private T data;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public String getLocalizedMsg() {
return localizedMsg;
}
public void setLocalizedMsg(String localizedMsg) {
this.localizedMsg = localizedMsg;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("{");
sb.append("\"id\":\"")
.append(id == null ? "" : id).append('\"');
sb.append(",\"code\":")
.append(code);
sb.append(",\"message\":\"")
.append(message == null ? "" : message).append('\"');
sb.append(",\"localizedMsg\":\"")
.append(localizedMsg == null ? "" : localizedMsg).append('\"');
sb.append(",\"data\":")
.append(data);
sb.append('}');
return sb.toString();
}
}
2.5 Demo的具體實現(xiàn)
package yourPackage;
import com.alibaba.fastjson.JSON;
import com.aliyun.fc.runtime.Context;
import com.aliyun.fc.runtime.FunctionInitializer;
import com.aliyun.fc.runtime.StreamRequestHandler;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public class ApplicationAccessDemo implements StreamRequestHandler, FunctionInitializer {
@Override
public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
if (inputStream != null) {
String request = IOUtils.toString(inputStream, "UTF-8");
BaseRequest baseRequest = JSON.parseObject(request, BaseRequest.class);
if (StringUtils.isNotEmpty(request) && baseRequest != null && MapUtils.isEmpty(baseRequest.getRequest())) {
//定時任務啟動時解析傳入請求
baseRequest = JSON.parseObject((String) JSON.parseObject(request).get("payload"), BaseRequest.class);
}
String type = MapUtils.getString(baseRequest.getRequest(), "type");
BaseResponse baseResponse = null;
if ("SERVICE".equals(type)) {
String apiPath = MapUtils.getString(baseRequest.getRequest(), "apiPath");
//TODO 服務模型具體的處理邏輯,返回baseResponse
} else if ("DATA".equals(type)) {
String dataModelId = MapUtils.getString(baseRequest.getRequest(), "dataModelId");
//TODO 數(shù)據(jù)模型具體的處理邏輯,返回baseResponse
}
//輸出baseResponse
try {
outputStream.write(JSON.toJSONString(baseResponse).getBytes("UTF-8"));
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void initialize(Context context) throws IOException {
//TODO 按需編寫初始化操作,如agent的啟動等等
}
}
文件開發(fā)完成,打包后在控制臺上傳即可(mvn clean package -Dmaven.skip.test=true)。注意在上傳時函數(shù)入口與初始化函數(shù)入口的填寫,以demo為例,函數(shù)入口yourPackage.ApplicationAccessDemo::handleRequest ,初始化入口:yourPackage.ApplicationAccessDemo::initialize。
2.6 網(wǎng)絡代理的使用
見網(wǎng)絡代理使用文檔。
3. 業(yè)務操作流程
3.1 創(chuàng)建轉(zhuǎn)換器
在應用服務平臺中>數(shù)據(jù)接入
>新增轉(zhuǎn)換器
,選擇腳本所對應的模型
3.2 轉(zhuǎn)換器-上傳函數(shù)
創(chuàng)建完的轉(zhuǎn)換器中將會展示所有領域模型關聯(lián)的服務模型和數(shù)據(jù)模型,每個模型都可以上傳一個函數(shù)腳本,腳本格式為jar包,并且填寫函數(shù)的主入口和初始化入口
3.3 轉(zhuǎn)換器-數(shù)據(jù)模型配置
數(shù)據(jù)模型的對接是通過定期器周期性調(diào)用函數(shù),讓函數(shù)將服務端側(cè)的數(shù)據(jù)寫入數(shù)據(jù)模型,所以需要配置一個調(diào)用時間周期
3.4 轉(zhuǎn)換器-手工觸發(fā)
腳本上傳后,可以對腳本進行測試或者手工的觸發(fā)臨時發(fā)起
手工觸發(fā)服務模型,需要在
請求參數(shù)
參數(shù)中輸入對應的入?yún)son手工觸發(fā)數(shù)據(jù)模型,選擇觸發(fā)的時間段
3.5 轉(zhuǎn)換器-綁定至項目
進入具體項目,在項目功能中選擇數(shù)據(jù)接入
,并添加轉(zhuǎn)換器
3.6 轉(zhuǎn)換器-啟動轉(zhuǎn)換器
轉(zhuǎn)換器進入正式運行狀態(tài),需要人工啟動任務,任務啟動后數(shù)據(jù)模型的定時任務將打開,服務模型可以被項目中其他應用調(diào)用