本文為您介紹如何使用日志服務SLS連接器。
背景信息
日志服務是針對日志類數據的一站式服務。日志服務可以幫助您快捷地完成數據采集、消費、投遞以及查詢分析,提升運維和運營效率,建立海量日志處理能力。
SLS連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 源表和結果表 |
運行模式 | 僅支持流模式 |
特有監控指標 | 暫不適用 |
數據格式 | 暫無 |
API種類 | SQL |
是否支持更新或刪除結果表數據 | 不支持更新和刪除結果表數據,只支持插入數據。 |
特色功能
SLS連接器源表支持直接讀取消息的屬性字段,支持的屬性字段如下。
字段名 | 字段類型 | 字段說明 |
__source__ | STRING METADATA VIRTUAL | 消息源。 |
__topic__ | STRING METADATA VIRTUAL | 消息主題。 |
__timestamp__ | BIGINT METADATA VIRTUAL | 日志時間。 |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | 消息TAG。 對于屬性 |
前提條件
已創建日志服務Project和Logstore,詳情請參見創建Project和Logstore。
使用限制
僅實時計算引擎VVR 2.0.0及以上版本支持日志服務SLS連接器。
SLS連接器僅保證At-Least-Once語義。
僅實時計算引擎VVR 4.0.13及以上版本支持Shard數目變化觸發自動Failover功能。
強烈建議不要設置Source并發度大于Shard個數,不僅會造成資源浪費,且在8.0.5及更低版本中,如果后續Shard數目發生變化,自動Failover功能可能會失效,造成部分Shard不被消費。
語法結構
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);
WITH參數
通用
參數
說明
數據類型
是否必填
默認值
備注
connector
表類型。
String
是
無
固定值sls。
endPoint
EndPoint地址。
String
是
無
請填寫SLS的私網服務地址,詳情請參見服務接入點。
project
SLS項目名稱。
String
是
無
無。
logStore
SLS LogStore或metricstore名稱。
String
是
無
logStore和metricstore是相同的消費方式。
accessId
阿里云賬號的AccessKey ID。
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量管理。
accessKey
阿里云賬號的AccessKey Secret。
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量管理。
源表獨有
參數
說明
數據類型
是否必填
默認值
備注
enableNewSource
是否啟用實現了FLIP-27接口的新數據源。
Boolean
否
false
新數據源可以自動適應shard變化,同時盡可能保證shard在所有的source并發上分布均勻。
說明僅實時計算引擎VVR 8.0.9及以上版本支持該參數。
重要作業在該配置項發生變化后無法從狀態恢復。
可通過先設置配置項consumerGroup啟動作業,將消費進度記錄到SLS消費組中,再將配置項consumeFromCheckpoint設為true后無狀態啟動作業,從而實現從歷史進度繼續消費。
shardDiscoveryIntervalMs
動態檢測shard變化時間間隔,單位為毫秒。
Long
否
60000
設置為負值時可以關閉動態檢測。
說明僅當配置項enableNewSource為true時生效。
僅實時計算引擎VVR 8.0.9及以上版本支持該參數。
startupMode
源表啟動模式。
String
否
timestamp
參數取值如下:
timestamp(默認):從指定的起始時間開始消費日志。
latest:從最新位點開始消費日志。
earliest:從最早位點開始消費日志。
說明若將配置項consumeFromCheckpoint設為true,則會從指定的消費組中保存的Checkpoint開始消費日志,此處的啟動模式將不會生效。
startTime
消費日志的開始時間。
String
否
當前時間
格式為yyyy-MM-dd hh:mm:ss。
僅當startupMode設為timestamp時生效。
說明startTime和stopTime基于SLS中的__receive_time__屬性,而非__timestamp__屬性。
stopTime
消費日志的結束時間。
String
否
無
格式為yyyy-MM-dd hh:mm:ss。
說明如期望日志消費到結尾時退出Flink程序,需要同時設置exitAfterFinish=true.
consumerGroup
消費組名稱。
String
否
無
消費組用于記錄消費進度。您可以自定義消費組名,無固定格式。
說明不支持通過相同的消費組進行多作業的協同消費。不同的Flink作業應該設置不同的消費組。如果不同的Flink作業使用相同的消費組,它們將會消費全部數據。這是因為在Flink消費SLS的數據時,并不會經過SLS消費組進行分區分配,因此導致各個消費者獨立消費各自的消息,即使消費組是相同的。
consumeFromCheckpoint
是否從指定的消費組中保存的Checkpoint開始消費日志。
String
否
false
參數取值如下:
true:必須同時指定消費組,Flink程序會從消費組中保存的Checkpoint開始消費日志,如果該消費組沒有對應的Checkpoint,則從startTime配置值開始消費。
false(默認值):不從指定的消費組中保存的Checkpoint開始消費日志。
說明僅實時計算引擎VVR 6.0.5及以上版本支持該參數。
maxRetries
讀取SLS失敗后重試次數。
String
否
3
無。
batchGetSize
單次請求讀取logGroup的個數。
String
否
100
batchGetSize設置不能超過1000,否則會報錯。
exitAfterFinish
在數據消費完成后,Flink程序是否退出。
String
否
false
參數取值如下:
true:數據消費完后,Flink程序退出。
false(默認):數據消費完后,Flink程序不退出。
說明僅實時計算引擎VVR 4.0.13及以上版本支持該參數。
query
SLS消費預處理語句。
String
否
無
通過使用query參數,您可以在消費SLS數據之前對其進行過濾,以避免將所有數據都消費到Flink中,從而實現節約成本和提高處理速度的目的。
例如
'query' = '*| where request_method = ''GET'''
表示在Flink讀取SLS數據前,先匹配出request_method字段值等于get的數據。說明query需使用日志服務SPL語言,請參見SPL概述。
結果表獨有
參數
說明
數據類型
是否必填
默認值
備注
topicField
指定字段名,該字段的值會覆蓋__topic__屬性字段的值,表示日志的主題。
String
否
無
該參數值是表中已存在的字段之一。
timeField
指定字段名,該字段的值會覆蓋__timestamp__屬性字段的值,表示日志寫入時間。
String
否
當前時間
該參數值是表中已存在的字段之一,且字段類型必須為INT。如果未指定,則默認填充當前時間。
sourceField
指定字段名,該字段的值會覆蓋__source__屬性字段的值,表示日志的來源地,例如產生該日志機器的IP地址。
String
否
無
該參數值是表中已存在的字段之一。
partitionField
指定字段名,數據寫入時會根據該列值計算Hash值,Hash值相同的數據會寫入同一個shard。
String
否
無
如果未指定,則每條數據會隨機寫入當前可用的Shard中。
buckets
當指定partitionField時,根據Hash值重新分組的個數。
String
否
64
該參數的取值范圍是[1, 256],且必須是2的整數次冪。同時,buckets個數應當大于等于Shard個數,否則會出現部分Shard沒有數據寫入的情況。
說明僅實時計算引擎VVR 6.0.5及以上版本支持該參數。
flushIntervalMs
觸發數據寫入的時間周期。
String
否
2000
單位為毫秒。
writeNullProperties
是否將null值作為空字符串寫入SLS。
Boolean
否
true
參數取值如下:
true(默認值):將null值作為空字符串寫入日志。
false:計算結果為null的字段不會寫入到日志中。
說明僅實時計算引擎VVR 8.0.6及以上版本支持該參數。
類型映射
Flink字段類型 | SLS字段類型 |
BOOLEAN | STRING |
VARBINARY | |
VARCHAR | |
TINYINT | |
INTEGER | |
BIGINT | |
FLOAT | |
DOUBLE | |
DECIMAL |
代碼示例
CREATE TEMPORARY TABLE sls_input(
`time` BIGINT,
url STRING,
dt STRING,
float_field FLOAT,
double_field DOUBLE,
boolean_field BOOLEAN,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` STRING METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
CREATE TEMPORARY TABLE sls_sink(
`time` BIGINT,
url STRING,
dt STRING,
float_field FLOAT,
double_field DOUBLE,
boolean_field BOOLEAN,
`__topic__` STRING,
`__source__` STRING,
`__timestamp__` BIGINT ,
receive_time BIGINT
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'project' ='sls-test',
'logstore' ='sls-output'
);
INSERT INTO sls_sink
SELECT
`time`,
url,
dt,
float_field,
double_field,
boolean_field,
`__topic__` ,
`__source__` ,
`__timestamp__` ,
cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input;
DataStream API
通過DataStream的方式讀寫數據時,則需要使用對應的DataStream連接器連接Flink,DataStream連接器設置方法請參見DataStream連接器使用方法。
讀取SLS
實時計算引擎VVR提供SourceFunction的實現類SlsSourceFunction,用于讀取SLS,讀取SLS的示例如下。
public class SlsDataStreamSource {
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Creates and adds SLS source and sink.
env.addSource(createSlsSource())
.map(SlsDataStreamSource::convertMessages)
.print();
env.execute("SLS Stream Source");
}
private static SlsSourceFunction createSlsSource() {
SLSAccessInfo accessInfo = new SLSAccessInfo();
accessInfo.setEndpoint("yourEndpoint");
accessInfo.setProjectName("yourProject");
accessInfo.setLogstore("yourLogStore");
accessInfo.setAccessId("yourAccessId");
accessInfo.setAccessKey("yourAccessKey");
// The batch get size must be given.
accessInfo.setBatchGetSize(10);
// Optional parameters
accessInfo.setConsumerGroup("yourConsumerGroup");
accessInfo.setMaxRetries(3);
// time to start consuming, set to current time.
int startInSec = (int) (new Date().getTime() / 1000);
// time to stop consuming, -1 means never stop.
int stopInSec = -1;
return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
}
private static List<String> convertMessages(SourceRecord input) {
List<String> res = new ArrayList<>();
for (FastLogGroup logGroup : input.getLogGroups()) {
int logsCount = logGroup.getLogsCount();
for (int i = 0; i < logsCount; i++) {
FastLog log = logGroup.getLogs(i);
int fieldCount = log.getContentsCount();
for (int idx = 0; idx < fieldCount; idx++) {
FastLogContent f = log.getContents(idx);
res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
}
}
}
return res;
}
}
寫入SLS
提供OutputFormat的實現類SLSOutputFormat,用于寫入SLS。寫入SLS的示例如下。
public class SlsDataStreamSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSequence(0, 100)
.map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
.addSink(createSlsSink())
.name(SlsDataStreamSink.class.getSimpleName());
env.execute("SLS Stream Sink");
}
private static OutputFormatSinkFunction createSlsSink() {
Configuration conf = new Configuration();
conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
conf.setString(SLSOptions.PROJECT, "yourProject");
conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
return new OutputFormatSinkFunction<>(outputFormat);
}
private static SinkRecord getSinkRecord(Long seed) {
SinkRecord record = new SinkRecord();
LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
logItem.PushBack("level", "info");
logItem.PushBack("name", String.valueOf(seed));
logItem.PushBack("message", "it's a test message for " + seed.toString());
record.setContent(logItem);
return record;
}
}
XML
Maven中央庫中已經放置了SLS DataStream連接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-sls</artifactId>
<version>${vvr-version}</version>
</dependency>
常見問題
恢復失敗的Flink程序時,TaskManager發生OOM,源表報錯java.lang.OutOfMemoryError: Java heap space