日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

文檔

日志服務SLS

更新時間:

本文為您介紹如何使用日志服務SLS連接器。

背景信息

日志服務是針對日志類數據的一站式服務。日志服務可以幫助您快捷地完成數據采集、消費、投遞以及查詢分析,提升運維和運營效率,建立海量日志處理能力。

SLS連接器支持的信息如下。

類別

詳情

支持類型

源表和結果表

運行模式

僅支持流模式

特有監控指標

暫不適用

數據格式

暫無

API種類

SQL

是否支持更新或刪除結果表數據

不支持更新和刪除結果表數據,只支持插入數據。

特色功能

SLS連接器源表支持直接讀取消息的屬性字段,支持的屬性字段如下。

字段名

字段類型

字段說明

__source__

STRING METADATA VIRTUAL

消息源。

__topic__

STRING METADATA VIRTUAL

消息主題。

__timestamp__

BIGINT METADATA VIRTUAL

日志時間。

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

消息TAG。

對于屬性"__tag__:__receive_time__":"1616742274"'__receive_time__'和'1616742274'會被作為KV對,記錄在Map中,在SQL中通過__tag__['__receive_time__']的方式訪問。

前提條件

已創建日志服務Project和Logstore,詳情請參見創建Project和Logstore

使用限制

  • 僅實時計算引擎VVR 2.0.0及以上版本支持日志服務SLS連接器。

  • SLS連接器僅保證At-Least-Once語義。

  • 僅實時計算引擎VVR 4.0.13及以上版本支持Shard數目變化觸發自動Failover功能。

  • 強烈建議不要設置Source并發度大于Shard個數,不僅會造成資源浪費,且在8.0.5及更低版本中,如果后續Shard數目發生變化,自動Failover功能可能會失效,造成部分Shard不被消費。

語法結構

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

WITH參數

  • 通用

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    connector

    表類型。

    String

    固定值sls。

    endPoint

    EndPoint地址。

    String

    請填寫SLS的私網服務地址,詳情請參見服務接入點

    說明
    • 實時計算Flink版默認不具備訪問公網的能力,但阿里云提供的NAT網關可以實現VPC網絡與公網網絡互通,詳情請參見控制臺操作

    • 不建議跨公網訪問SLS。如確有需要,請使用HTTPS網絡傳輸協議并且開啟SLS全球加速服務,詳情請參見管理傳輸加速

    project

    SLS項目名稱。

    String

    無。

    logStore

    SLS LogStore或metricstore名稱。

    String

    logStore和metricstore是相同的消費方式。

    accessId

    阿里云賬號的AccessKey ID。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret信息?

    重要

    為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量管理

    accessKey

    阿里云賬號的AccessKey Secret。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret信息?

    重要

    為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量管理

  • 源表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    enableNewSource

    是否啟用實現了FLIP-27接口的新數據源。

    Boolean

    false

    新數據源可以自動適應shard變化,同時盡可能保證shard在所有的source并發上分布均勻。

    說明

    僅實時計算引擎VVR 8.0.9及以上版本支持該參數。

    重要

    作業在該配置項發生變化后無法從狀態恢復。

    可通過先設置配置項consumerGroup啟動作業,將消費進度記錄到SLS消費組中,再將配置項consumeFromCheckpoint設為true后無狀態啟動作業,從而實現從歷史進度繼續消費。

    shardDiscoveryIntervalMs

    動態檢測shard變化時間間隔,單位為毫秒。

    Long

    60000

    設置為負值時可以關閉動態檢測。

    說明
    • 僅當配置項enableNewSource為true時生效。

    • 僅實時計算引擎VVR 8.0.9及以上版本支持該參數。

    startupMode

    源表啟動模式。

    String

    timestamp

    參數取值如下:

    • timestamp(默認):從指定的起始時間開始消費日志。

    • latest:從最新位點開始消費日志。

    • earliest:從最早位點開始消費日志。

    說明

    若將配置項consumeFromCheckpoint設為true,則會從指定的消費組中保存的Checkpoint開始消費日志,此處的啟動模式將不會生效。

    startTime

    消費日志的開始時間。

    String

    當前時間

    格式為yyyy-MM-dd hh:mm:ss。

    僅當startupMode設為timestamp時生效。

    說明

    startTime和stopTime基于SLS中的__receive_time__屬性,而非__timestamp__屬性。

    stopTime

    消費日志的結束時間。

    String

    格式為yyyy-MM-dd hh:mm:ss。

    說明

    如期望日志消費到結尾時退出Flink程序,需要同時設置exitAfterFinish=true.

    consumerGroup

    消費組名稱。

    String

    消費組用于記錄消費進度。您可以自定義消費組名,無固定格式。

    說明

    不支持通過相同的消費組進行多作業的協同消費。不同的Flink作業應該設置不同的消費組。如果不同的Flink作業使用相同的消費組,它們將會消費全部數據。這是因為在Flink消費SLS的數據時,并不會經過SLS消費組進行分區分配,因此導致各個消費者獨立消費各自的消息,即使消費組是相同的。

    consumeFromCheckpoint

    是否從指定的消費組中保存的Checkpoint開始消費日志。

    String

    false

    參數取值如下:

    • true:必須同時指定消費組,Flink程序會從消費組中保存的Checkpoint開始消費日志,如果該消費組沒有對應的Checkpoint,則從startTime配置值開始消費。

    • false(默認值):不從指定的消費組中保存的Checkpoint開始消費日志。

    說明

    僅實時計算引擎VVR 6.0.5及以上版本支持該參數。

    maxRetries

    讀取SLS失敗后重試次數。

    String

    3

    無。

    batchGetSize

    單次請求讀取logGroup的個數。

    String

    100

    batchGetSize設置不能超過1000,否則會報錯。

    exitAfterFinish

    在數據消費完成后,Flink程序是否退出。

    String

    false

    參數取值如下:

    • true:數據消費完后,Flink程序退出。

    • false(默認):數據消費完后,Flink程序不退出。

    說明

    僅實時計算引擎VVR 4.0.13及以上版本支持該參數。

    query

    SLS消費預處理語句。

    String

    通過使用query參數,您可以在消費SLS數據之前對其進行過濾,以避免將所有數據都消費到Flink中,從而實現節約成本和提高處理速度的目的。

    例如 'query' = '*| where request_method = ''GET'''表示在Flink讀取SLS數據前,先匹配出request_method字段值等于get的數據。

    說明

    query需使用日志服務SPL語言,請參見SPL概述

    重要
    • 僅實時計算引擎VVR 8.0.1及以上版本支持該參數。

    • 日志服務SLS支持該功能的地域請參見基于規則消費日志

    • 公測階段免費,后續可能會在產生日志服務SLS費用,詳情請參見費用說明

  • 結果表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    topicField

    指定字段名,該字段的值會覆蓋__topic__屬性字段的值,表示日志的主題。

    String

    該參數值是表中已存在的字段之一。

    timeField

    指定字段名,該字段的值會覆蓋__timestamp__屬性字段的值,表示日志寫入時間。

    String

    當前時間

    該參數值是表中已存在的字段之一,且字段類型必須為INT。如果未指定,則默認填充當前時間。

    sourceField

    指定字段名,該字段的值會覆蓋__source__屬性字段的值,表示日志的來源地,例如產生該日志機器的IP地址。

    String

    該參數值是表中已存在的字段之一。

    partitionField

    指定字段名,數據寫入時會根據該列值計算Hash值,Hash值相同的數據會寫入同一個shard。

    String

    如果未指定,則每條數據會隨機寫入當前可用的Shard中。

    buckets

    當指定partitionField時,根據Hash值重新分組的個數。

    String

    64

    該參數的取值范圍是[1, 256],且必須是2的整數次冪。同時,buckets個數應當大于等于Shard個數,否則會出現部分Shard沒有數據寫入的情況。

    說明

    僅實時計算引擎VVR 6.0.5及以上版本支持該參數。

    flushIntervalMs

    觸發數據寫入的時間周期。

    String

    2000

    單位為毫秒。

    writeNullProperties

    是否將null值作為空字符串寫入SLS。

    Boolean

    true

    參數取值如下:

    • true(默認值):將null值作為空字符串寫入日志。

    • false:計算結果為null的字段不會寫入到日志中。

    說明

    僅實時計算引擎VVR 8.0.6及以上版本支持該參數。

類型映射

Flink字段類型

SLS字段類型

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

INTEGER

BIGINT

FLOAT

DOUBLE

DECIMAL

代碼示例

CREATE TEMPORARY TABLE sls_input(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` STRING METADATA VIRTUAL,
   __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

CREATE TEMPORARY TABLE sls_sink(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT ,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'project' ='sls-test',
  'logstore' ='sls-output'
);

INSERT INTO sls_sink
SELECT 
 `time`,
  url,
  dt,
  float_field,
  double_field,
  boolean_field,
  `__topic__` ,
  `__source__` ,
  `__timestamp__` ,
  cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input; 

DataStream API

重要

通過DataStream的方式讀寫數據時,則需要使用對應的DataStream連接器連接Flink,DataStream連接器設置方法請參見DataStream連接器使用方法

讀取SLS

實時計算引擎VVR提供SourceFunction的實現類SlsSourceFunction,用于讀取SLS,讀取SLS的示例如下。

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Creates and adds SLS source and sink.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // The batch get size must be given.
        accessInfo.setBatchGetSize(10);

        // Optional parameters
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // time to start consuming, set to current time.
        int startInSec = (int) (new Date().getTime() / 1000);

        // time to stop consuming, -1 means never stop.
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

寫入SLS

提供OutputFormat的實現類SLSOutputFormat,用于寫入SLS。寫入SLS的示例如下。

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

Maven中央庫中已經放置了SLS DataStream連接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
</dependency>

常見問題

恢復失敗的Flink程序時,TaskManager發生OOM,源表報錯java.lang.OutOfMemoryError: Java heap space