日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

文檔

云原生多模數據庫Lindorm

更新時間:

本文為您介紹如何使用云原生多模數據庫Lindorm連接器。

背景信息

Lindorm是面向物聯網、互聯網、車聯網等設計和優化的云原生多模超融合數據庫,是日志、監控、賬單、廣告、社交、出行、風控等場景首選數據庫,也是為阿里巴巴核心業務提供支撐的數據庫之一。詳情請參見什么是云原生多模數據庫Lindorm

具備以下特性:

  • 支持寬表、時序、文本、對象、流、空間等多種數據的統一訪問和融合處理。

  • 兼容SQL、HBase/Cassandra/S3、TSDB、HDFS、Solr、Kafka等多種標準接口和無縫集成三方生態工具。

Lindorm連接器支持的信息如下。

類別

詳情

支持類型

維表和結果表

運行模式

僅支持流模式

數據格式

暫不適用

特有監控指標

  • 維表:無

  • 結果表:

    • numBytesOut

    • numBytesOutPerSecond

    • numRecordsOut

    • numRecordsOutPerSecond

說明

指標含義詳情,請參見監控指標說明

API種類

SQL

支持的Lindorm引擎

寬表引擎

是否支持更新或刪除結果表數據

前提條件

  • 已經創建了Lindorm寬表引擎以及數據表,詳情請參見創建實例

  • Lindorm集群與Flink全托管集群處于網絡連通的環境下,例如在同一個VPC下。

使用限制

僅Flink計算引擎VVR 4.0.8及以上版本支持Lindorm。

語法結構

CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);

WITH參數

類型

參數

說明

數據類型

是否必填

默認值

備注

通用

connector

表類型。

String

固定值為lindorm。

seedserver

Lindorm服務器的連接地址。

String

實時計算Flink版使用HBase Java API的方式連接并使用Lindorm寬表引擎。Lindorm服務器的連接地址的格式為host:port。詳情請參見通過Flink訪問寬表引擎

namespace

Lindorm的命名空間。

String

無。

username

連接Lindorm所用到的用戶名。

String

無。

password

連接Lindorm所用到的密碼。

String

無。

tableName

Lindorm表名。

String

無。

columnFamily

Lindorm表的列族名。

String

如果創建Lindorm表時未指定列族名,則填寫默認列族名f。

retryIntervalMs

讀取或寫入失敗時,再次重試讀取的時間間隔。

Integer

1000

單位為毫秒。

maxRetryTimes

最大嘗試次數。

Integer

5

無。

結果表獨有

bufferSize

一次批量寫入數據的條數。

Integer

500

無。

flushIntervalMs

當數據量比較少時,多長時間寫入一次。

Integer

2000

單位為毫秒。

ignoreDelete

是否忽略Delete操作。

Boolean

false

參數取值如下:

  • true:忽略。

  • false(默認):不忽略。

dynamicColumnSink

是否開啟動態表模式。關于動態表模式的介紹,請參見動態表模式

Boolean

false

參數取值如下:

  • true:開啟動態表模式。

  • false(默認):不開啟動態表模式。

說明

實時計算引擎VVR 6.0.2及以上版本支持該參數。

excludeUpdateColumns

指定字段忽略更新,不會插入結果表。

String

使用逗號分隔要忽略的字段。例如:excludeUpdateColumns='a,b,c',代表忽略更新a,b,c三個字段。

說明

實時計算引擎VVR 8.0.9及以上版本支持該參數。

維表獨有

partitionedJoin

是否額外使用JoinKey進行分區。

Boolean

false

參數取值如下:

  • true:用JoinKey進行分區,將數據分發到Join節點,提高緩存命中率。

  • false(默認值):不使用JoinKey進行分區。

shuffleEmptyKey

遇到空Key時,是否將Key為空的記錄隨機向下游Shuffle。

Boolean

false

參數取值如下:

  • true:隨機往下游做Shuffle。

  • false(默認值):從下游中編號為0的并發開始做Shuffle,即從第一個并發開始。

cache

緩存策略。

String

None

目前Lindorm支持以下兩種緩存策略:

  • None(默認值):無緩存。

  • LRU:只保留最近使用的數據。

需要配置相關參數:緩存大小(cacheSize)和緩存失效超時時間(cacheTTLMs)。

cacheSize

緩存數據的行數。

Integer

1000

當選擇LRU緩存策略后,使用本參數可以設置緩存大小。

cacheTTLMs

緩存失效超時時間。

Integer

單位為毫秒。當選擇LRU緩存策略后,可以設置緩存失效的超時時間,默認不過期。

cacheEmpty

是否緩存join結果為空的數據。

Boolean

true

無。

async

是否異步返回數據。

Boolean

false

參數取值如下:

  • true:表示異步返回數據。

  • false(默認值):表示不進行異步返回數據。

asyncLindormRpcTimeoutMs

在異步請求數據時的超時時間。

Integer

300000

單位毫秒。

動態表模式

動態表模式適用于在表定義中并未指定列名的情況,根據作業運行情況動態創建數據列并插入的場景。例如統計每天每小時的交易量,以天作為主鍵,小時作為列,每個小時的數據都是動態生成的,示例如下。

主鍵

列名:0點

列名:1點

2025-06-01

45

32

2025-06-02

76

34

動態表需要遵循特殊的DDL定義。其主鍵需要定義為前若干列,最后兩列中前一列的值作為列名變量,最后一列的值作為該列對應的值,且要求最后兩列的類型均為varchar。代碼示例如下。

CREATE TABLE lindorm_dynamic_output(
pk1varchar,
pk2varchar,
pk3varchar,
c1varchar,
c2varchar,
PRIMARYKEY(pk1,pk2,pk3)notenforced
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);

上述定義中,pk1、pk2、pk3為主鍵,c1、c2為動態表模式所必須的兩列且一定為最后兩列,不可存在其他的非主鍵的列。每次寫入數據時,會在主鍵<pk1, pk2, pk3>對應的條目中添加或更改一列,列名為c1的值,該列的值為c2的值。每次一條數據來臨時,只會添加或更改一列對應的值,其他列的值不會改變。

類型映射

Lindorm中數據均為二進制形式,通過Flink某個字段類型來轉換或解析二進制的Bytes方法如下。

Flink SQL類型

轉換為寫入的Bytes使用的方法

從Lindorm讀取Bytes之后的解析

CHAR

org.apache.flink.table.data.StringData::toBytes

org.apache.flink.table.data.StringData::fromBytes

VARCHAR

BOOLEAN

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(boolean)

com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal

BINARY

直接為bytes的形式。

直接返回bytes。

VARBINARY

DECIMAL

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(BigDecimal)

com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal

TINYINT

直接將數據封裝成byte[]的第一個byte。

直接返回bytes[0]。

SMALLINT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(short)

com.alibaba.lindorm.client.core.utils.Bytes::toShort

INT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)

com.alibaba.lindorm.client.core.utils.Bytes::toInt

BIGINT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)

com.alibaba.lindorm.client.core.utils.Bytes::toLong

FLOAT

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(float)

com.alibaba.lindorm.client.core.utils.Bytes::toFloat

DOUBLE

com.alibaba.lindorm.client.core.utils.Bytes::toBytes(double)

com.alibaba.lindorm.client.core.utils.Bytes::toDouble

DATE

獲取自1970.01.01以來的天數后,調用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)。

com.alibaba.lindorm.client.core.utils.Bytes::toInt得到自1970.01.01以來的天數。

TIME

獲取自當天00:00:00以來的毫秒數后,調用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)。

com.alibaba.lindorm.client.core.utils.Bytes::toInt得到自當天00:00:00以來的毫秒數。

TIMESTAMP

獲取自1970.01.01 00:00:00以來的毫秒數后,調用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)。

com.alibaba.lindorm.client.core.utils.Bytes::toLong得到自1970.01.01 00:00:00以來的毫秒數。

代碼示例

CREATE TEMPORARY TABLE example_source(
 id INT,
 proc_time AS PROCTIME()
) WITH (
 'connector' = 'datagen',
 'number-of-rows' = '10',
 'fields.id.kind' = 'sequence',
 'fields.id.start' = '0',
 'fields.id.end' = '9'
);

CREATE TEMPORARY TABLE lindorm_hbase_dim(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tablename'='${lindorm_dim_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'username'='${lindorm_username}',
 'password'='${lindorm_username}'
);

CREATE TEMPORARY TABLE lindorm_hbase_sink(
 `id` INT,
 `name` VARCHAR,
 `birth` VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='lindorm',
 'tablename'='${lindorm_sink_table}',
 'seedserver'='${lindorm_seed_server}',
 'namespace'='default',
 'username'='${lindorm_username}',
 'password'='${lindorm_username}'
);

INSERT INTO lindorm_hbase_sink
SELECT example_source.id as id, lindorm_hbase_dim.name as name, lindorm_hbase_dim.birth as birth
FROM example_source JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF PROCTIME() ON example_source.id = lindorm_hbase_dim.id;