Flink Connector 配置
Flink Connector 使用
Connector 準(zhǔn)備
參考管理自定義連接器,上傳igraph Connector的jar包。
上傳成功如圖所示
Propoerties詳解
名稱 | 類型 | 是否必須 | 描述 |
endpoint | string | 是 | igraph實(shí)例對(duì)應(yīng)的endpoint |
username | string | 是 | igraph請(qǐng)求的username |
password | string | 是 | igraph請(qǐng)求的password |
graph_name | string | 是 | 要寫入的igraph圖名 |
label_name | string | 是 | 要寫入的igraph邊或者節(jié)點(diǎn)名 |
pk_field | string | 是 | igraph sink表的pk字段 |
sk_field | string | 否 | igraph sink表的sk字段 |
cmd_field | string | 是 | igraph sink表的cmd字段,取值為add、delete,用于表示這條記錄是新增或刪除請(qǐng)求 |
request_retry | int | 否,默認(rèn)0 | 請(qǐng)求igraph的重試次數(shù),默認(rèn)為0 |
dry_run | boolean | 否,默認(rèn)false | 如果dry_run為true,只做測(cè)試,不會(huì)真實(shí)寫入igraph |
async | boolean | 否,默認(rèn)為 false | 打開會(huì)使用異步模式,寫入性能會(huì)有提升 |
Flink SQL demo
節(jié)點(diǎn)(kv)
--********************************************************************--
-- Author: jilong.yjl-75179-searchd***@searchdump.onaliyun.com
-- Created Time: 2022-11-09 10:43:36
-- Description: Write your description here
--********************************************************************--
CREATE TEMPORARY TABLE bhv_source (
cmd varchar,
content varchar,
gatewayTime bigint,
instnceId varchar,
pkey varchar
)
WITH (
'connector' = 'sls',
'endPoint' = 'cn-shanghai-intranet.log.aliyuncs.com',
'accessId' = 'xxx',
'accessKey' = 'xxx',
'startTime' = '2022-11-08 23:28:00',
'project' = 'igraph-cn-shanghai',
'logStore' = 'log',
'consumerGroup' = 'xxx',
'batchGetSize' = '1'
);
CREATE TEMPORARY TABLE igraph_sink (
cmd varchar,
content varchar,
gatewayTime bigint,
instnceId varchar,
pkey varchar
) with (
'connector' = 'igraph',
'endpoint' = 'igraph-cn-xxxx.igraph.aliyuncs.com',
'username' = '{igraphUserName}',
'password' = '{igraphPassword}',
'pk_field' = 'pkey',
'cmd_field' = 'cmd',
'graph_name' = '{graphName}',
'label_name' = '{nodeName}',
'dry_run' = 'false'
);
INSERT INTO igraph_sink
select * from bhv_source;
邊(kkv)
--********************************************************************--
-- Author: jilong.yjl-75179-searchd***@searchdump.onaliyun.com
-- Created Time: 2022-11-09 10:43:36
-- Description: Write your description here
--********************************************************************--
CREATE TEMPORARY TABLE bhv_source (
cmd varchar,
content varchar,
gatewayTime bigint,
instnceId varchar,
pkey varchar,
skey VARCHAR
)
WITH (
'connector' = 'sls',
'endPoint' = 'cn-shanghai-intranet.log.aliyuncs.com',
'accessId' = 'xxx',
'accessKey' = 'xxx',
'startTime' = '2022-11-08 23:28:00',
'project' = 'igraph-cn-shanghai',
'logStore' = 'log',
'consumerGroup' = 'xxx',
'batchGetSize' = '1'
);
CREATE TEMPORARY TABLE igraph_sink (
cmd varchar,
content varchar,
gatewayTime bigint,
instnceId varchar,
pkey varchar,
skey varchar
) with (
'connector' = 'igraph',
'endpoint' = 'igraph-cn-xxxx.igraph.aliyuncs.com',
'username' = '{igraphUserName}',
'password' = '{igraphPassword}',
'pk_field' = 'pkey',
'sk_field' = 'skey',
'cmd_field' = 'add',
'graph_name' = '{graphName}',
'label_name' = '{nodeName}',
'dry_run' = 'false'
);
INSERT INTO igraph_sink
select * from bhv_source;
需要注意內(nèi)容:
pk_field為寫入igraph表的主鍵,必填字段
cmd_field取值為add、delete,表示是插入或刪除請(qǐng)求。如果是add請(qǐng)求,默認(rèn)需要傳入表的全部字段,如果是update請(qǐng)求默認(rèn)需要全部字段,如果需要支持部分字段的更新連續(xù)管理員配置
如果對(duì)邊進(jìn)行delete操作,只填寫pkey 則默認(rèn)刪除這個(gè)起點(diǎn)出發(fā)的全部邊,如果填寫pkey和skey則只刪除一條邊數(shù)據(jù)
graph_name 為寫入igraph的圖名
label_name 為寫入igraph圖的節(jié)點(diǎn)或者邊的label
request_retry,為了避免網(wǎng)絡(luò)抖動(dòng)等原因,設(shè)置寫入igraph的重試請(qǐng)求次數(shù)
dry_run,為true時(shí)用作測(cè)試,會(huì)打log,不寫igraph。