本文介紹如何將Doris數據遷移至云原生數據倉庫AnalyticDB PostgreSQL版。
準備工作
已開通IDC專線服務。具體內容,請參見通過物理專線實現本地IDC與云上VPC互通。
已開通阿里云對象存儲服務(OSS)。具體內容,請參見什么是對象存儲OSS。
已創建OSS存儲空間。具體內容,請參見創建存儲空間。
已創建云原生數據倉庫AnalyticDB PostgreSQL版實例。具體內容,請參見創建實例。
操作步驟
步驟一:創建用于裝載數據的目標表
在云原生數據倉庫AnalyticDB PostgreSQL版實例中創建用于裝載Doris數據的目標表。目標表結構需與源表結構對應。建表語法,請參見建表語句。
步驟二:將Doris的數據導入到OSS
Doris支持使用S3協議將數據導出至對象存儲,OSS同樣支持S3協議。您可以將數據直接上傳至OSS,但針對不同的Doris版本其支持的導出能力也有所不同:
Doris2.0及以上版本支持使用
EXPORT TABLE
語句導出數據,導出格式為csv、text、orc和parquet。其中以parquet格式導出的數據與主流parquet格式并不兼容,并且使用csv或text格式不支持導出數據中含有換行符等特殊字符,因此更推薦使用orc格式,該格式導出速度較快。Doris1.2版本的
EXPORT TABLE
語句僅支持導出格式為csv,導出格式較為單一,不推薦使用。推薦使用SELECT INTO OUTFILE
語句,導出格式為orc,速度相比EXPORT TABLE
語句稍慢一些,但支持使用WHERE
子句進行過濾。
使用示例
Doris2.0和Doris1.2版本的導出語句如下:
------ Doris 2.0
EXPORT TABLE s3_test TO "s3://bucket/dir/"
PROPERTIES (
"format"="orc"
)
WITH s3 (
"AWS_ENDPOINT" = "oss-cn-shanghai-internal.aliyuncs.com",
"AWS_ACCESS_KEY" = "LTA****",
"AWS_SECRET_KEY" = "lQEI1TSJIY0******",
"AWS_REGION" = "shanghai"
)
---- Doris 1.2 c列 為datetime格式
SELECT a, b, CAST(c AS string) AS c FROM s3_test INTO OUTFILE "s3://bucket/dir/"
FORMAT AS orc
PROPERTIES
(
"AWS_ENDPOINT" = "oss-cn-shanghai-internal.aliyuncs.com",
"AWS_ACCESS_KEY" = "LTA****",
"AWS_SECRET_KEY" = "lQEI1TSJIY0******",
"AWS_REGION" = "shanghai"
);
如果表中含有DATETIME類型的列,則只能使用SELECT INTO FILE語句。因為Doris的DATETIME類型導出的格式與主流產品不兼容,需要轉換為STRING類型。
步驟三:將OSS數據導入到AnalyticDB PostgreSQL版
您可以通過COPY命令或使用OSS外表將數據導入云原生數據倉庫AnalyticDB PostgreSQL版:
使用COPY命令導入OSS數據的方法,請參見使用COPY或UNLOAD命令導入或導出數據到OSS。
使用OSS外表導入OSS數據的方法,請參見使用OSS Foreign Table進行數據湖分析。
使用示例
使用COPY命令導入OSS的語句如下:
COPY test1 FROM 'oss://bucket/dir/' ACCESS_KEY_ID 'LTAI5t********' SECRET_ACCESS_KEY 'lQEI1T*******'
FORMAT AS orc ENDPOINT 'oss-*****-
internal.aliyuncs.com' FDW 'oss_fdw' ;
語法轉換
數據類型
Doris | AnalyticDB PostgreSQL版 | 備注 |
BOOLEAN | BOOLEAN | 無 |
TINYINT | SMALLINT | 云原生數據倉庫AnalyticDB PostgreSQL版沒有TINYINT。 |
SMALLINT | SMALLINT | 無 |
INT | INT | 無 |
BIGINT | BIGINT | 無 |
LARGEINT | DECIMAL | 無 |
FLOAT | FLOAT | 無 |
DOUBLE | DOUBLE | 無 |
DECIMAL | DECIMAL | 無 |
DATE | DATE | 無 |
DATETIME | TIMESTAMP/TIMSTAMPTZ | 無 |
CHAR | CHAR | 無 |
VARCHAR | VARCHAR | 無 |
STRING | TEXT | 無 |
HLL | / |
|
BITMAP | / | BITMAP類型的列可以在Aggregate表或Unique表中使用。 |
QUANTILE_STATE | / | 無 |
ARRAY | [ ] | 無 |
MAP | 自定義復合類型 | 無 |
STRUCT | 自定義復合類型 | 無 |
JSON | JSON | 無 |
AGG_STATE | / | 無 |
VARIANT | 自定義復合類型 | 無 |
建表語句
以下為幾種常見的建表語句模型。
模型一:明細模型
明細模型沒有主鍵和聚合列限制,在建表語句中指定的DUPLICATE KEY,是用來指明底層數據按照哪些列進行排序。在云原生數據倉庫AnalyticDB PostgreSQL版中對應轉換為AOCS或BEAM表,并使用ORDER BY語句指定排序鍵,啟動AUTOMERGE可以定期進行數據自動排序。
使用示例
CREATE TABLE IF NOT EXISTS example_tbl_by_default
(
`timestamp` DATETIME NOT NULL COMMENT "日志時間",
`type` INT NOT NULL COMMENT "日志類型",
`error_code` INT COMMENT "錯誤碼",
`error_msg` VARCHAR(1024) COMMENT "錯誤詳細信息",
`op_id` BIGINT COMMENT "負責人id",
`op_time` DATETIME COMMENT "處理時間"
)
DUPLICATE KEY (`timestamp`,`type`,`error_code`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
----AnalyticDB PostgreSQL
CREATE TABLE IF NOT EXISTS example_tbl_by_default
(
"timestamp" TIMESTAMP NOT NULL ,
"type" INT NOT NULL ,
error_code INT ,
error_msg VARCHAR(1024),
op_id BIGINT,
op_time TIMESTAMP
)
WITH(appendonly = true, orientation = column)
DISTRIBUTED BY("type")
ORDER BY("timestamp","type",error_code);
COMMENT ON COLUMN example_tbl_by_default.timestamp IS '日志時間';
模型二:主鍵模型
主鍵模型的主要目的是為了確保數據主鍵的唯一性,使用UNIQUE KEY來指定唯一性約束,在云原生數據倉庫AnalyticDB PostgreSQL版中對應heap表,并使用PRIMARY KEY來指定唯一鍵。
使用示例
CREATE TABLE IF NOT EXISTS example_tbl_unique
(
`user_id` LARGEINT NOT NULL COMMENT "用戶id",
`username` VARCHAR(50) NOT NULL COMMENT "用戶昵稱",
`city` VARCHAR(20) COMMENT "用戶所在城市",
`age` SMALLINT COMMENT "用戶年齡",
`sex` TINYINT COMMENT "用戶性別",
`phone` LARGEINT COMMENT "用戶電話",
`address` VARCHAR(500) COMMENT "用戶地址",
`register_time` DATETIME COMMENT "用戶注冊時間"
)
UNIQUE KEY(`user_id`, `username`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
----AnalyticDB PostgreSQL
CREATE TABLE IF NOT EXISTS example_tbl_unique
(
user_id BIGINT NOT NULL,
username VARCHAR(50) NOT NULL,
city VARCHAR(20),
age SMALLINT,
sex SMALLINT,
phone BIGINT,
address VARCHAR(500),
register_time TIMESTAMP,
PRIMARY KEY (user_id, username)
)
DISTRIBUTED BY (user_id);
COMMENT ON COLUMN example_tbl_unique.user_id IS '用戶id';
模型三:聚合模型
聚合模型導入數據時,會將Aggregate Key列相同的行聚合成一行,而將Value列按照設置的 AggregationType
進行聚合。在云原生數據倉庫AnalyticDB PostgreSQL版中對應使用heap表 ,對于Aggregate Key創建唯一索引,同時插入數據時使用UPSERT方式進行。具體內容,請參見使用INSERT ON CONFLICT覆蓋寫入數據。
使用示例
CREATE TABLE IF NOT EXISTS example_tbl_agg1
(
`user_id` LARGEINT NOT NULL COMMENT "用戶id",
`date` DATE NOT NULL COMMENT "數據灌入日期時間",
`city` VARCHAR(20) COMMENT "用戶所在城市",
`age` SMALLINT COMMENT "用戶年齡",
`sex` TINYINT COMMENT "用戶性別",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用戶最后一次訪問時間",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用戶總消費",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用戶最大停留時間",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用戶最小停留時間"
)
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
-----AnalyticDB PostgreSQL 不支持自動預聚合
CREATE TABLE IF NOT EXISTS example_tbl_agg1
(
user_id BIGINT NOT NULL,
"date" DATE NOT NULL,
city VARCHAR(20),
age SMALLINT,
sex SMALLINT,
last_visit_date TIMESTAMP DEFAULT '1970-01-01 00:00:00',
cost BIGINT DEFAULT 0,
max_dwell_time INT DEFAULT 0,
min_dwell_time INT DEFAULT 99999,
UNIQUE (user_id, "date", city, age, sex)
)
DISTRIBUTED BY(user_id);
INSERT INTO example_tbl_agg1 VALUES (10000,'2024-08-22','beijing', 18, 0, '2024-08-22 12:00:00', 20, 1000, 1000) ON CONFLICT (user_id, "date", city, age, sex) DO UPDATE SET last_visit_date = excluded.last_visit_date, cost = example_tbl_agg1.cost + excluded.cost, max_dwell_time = GREATEST(example_tbl_agg1.max_dwell_time, excluded.max_dwell_time), min_dwell_time = LEAST(example_tbl_agg1.min_dwell_time, excluded.min_dwell_time);
分區分桶
Doris通過PARTITION BY進行分區,DISTRIBUTED BY進行分桶,使用BUCKETS指定分桶數量,在云原生數據倉庫AnalyticDB PostgreSQL版中對應分區鍵PARTITION BY和分布鍵DISTRIBUTED BY。
使用示例
CREATE TABLE IF NOT EXISTS example_range_tbl
(
`user_id` LARGEINT NOT NULL COMMENT "用戶id",
`date` DATE NOT NULL COMMENT "數據灌入日期時間",
`timestamp` DATETIME NOT NULL COMMENT "數據灌入的時間戳",
`city` VARCHAR(20) COMMENT "用戶所在城市",
`age` SMALLINT COMMENT "用戶年齡",
`sex` TINYINT COMMENT "用戶性別",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用戶最后一次訪問時間",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用戶總消費",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用戶最大停留時間",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用戶最小停留時間"
)
ENGINE=OLAP
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY RANGE(`date`)
(
PARTITION `p201701` VALUES LESS THAN ("2017-02-01"),
PARTITION `p201702` VALUES LESS THAN ("2017-03-01"),
PARTITION `p201703` VALUES LESS THAN ("2017-04-01"),
PARTITION `p2018` VALUES [("2018-01-01"), ("2019-01-01"))
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
PROPERTIES
(
"replication_num" = "1"
);
----AnalyticDB PostgreSQL
CREATE TABLE IF NOT EXISTS example_range_tbl
(
user_id BIGINT NOT NULL,
"date" DATE NOT NULL,
city VARCHAR(20),
age SMALLINT,
sex SMALLINT,
visit_date TIMESTAMP DEFAULT '1970-01-01 00:00:00',
a_cost BIGINT DEFAULT 0,
dwell_time INT DEFAULT 0
)
PARTITION BY RANGE("date")
(
PARTITION p201701 VALUES START ("2017-02-01") INCLUSIVE,
PARTITION p201702 VALUES START ("2017-03-01") INCLUSIVE,
PARTITION p201703 VALUES START ("2017-04-01") INCLUSIVE,
PARTITION p2018 VALUES START ("2018-01-01") INCLUSIVE END ("2019-01-01") EXCLUSIVE
)
DISTRIBUTED BY (user_id);