您可以通過HDFS表引擎或表函數讀寫數據。本文為您介紹如何將HDFS中的數據導入至ClickHouse集群以及如何從ClickHouse集群導出數據到HDFS。
前提條件
已創建Hadoop集群,詳情請參見創建集群。
已創建ClickHouse集群,詳情請參見創建ClickHouse集群。
注意事項
本文代碼示例中HDFS URL中的9000為非HA模式下NameNode的端口,如果使用的是HA模式下的NameNode,則端口通常為8020。
HDFS集群數據導入至ClickHouse集群
步驟一:創建業務表
使用SSH方式登錄ClickHouse集群,詳情請參見登錄集群。
執行以下命令,進入ClickHouse客戶端。
clickhouse-client -h core-1-1 -m
說明本示例登錄core-1-1節點,如果您有多個Core節點,可以登錄任意一個節點。
執行以下命令,創建數據庫product,并在product數據庫中創建業務表orders。
CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr; CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}') PARTITION BY toYYYYMMDD(date) ORDER BY toYYYYMMDD(date); CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) Engine = Distributed(cluster_emr, product, orders, rand());
說明示例中的{shard}和{replica}是阿里云EMR為ClickHouse集群自動生成的宏定義,可以直接使用。
步驟二:導入數據
通過HDFS表引擎導入數據
ClickHouse的HDFS表引擎能夠從指定HDFS地址讀取特定格式的文件數據,語法如下:
CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
name1 [type1],
name2 [type2],
...
)
Engine = HDFS(uri, format);
參數 | 描述 |
| 數據庫名。 |
| 表名。 |
| 列名。 |
| 列的類型。 |
| HDFS上文件的地址。 |
| 文件的類型。 |
其中uri不能為目錄地址,且文件所屬的目錄需要存在,否則寫數據時會報錯。
創建HDFS引擎表并準備數據。
下載并上傳示例數據orders.csv至HDFS集群的目錄下,本文將文件上傳到了HDFS集群的根目錄下。
執行以下命令創建數據庫hdfs和HDFS表。
CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr; CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');
說明本文示例是將示例數據上傳到了HDFS集群的根目錄下。代碼中的
192.168.**.**
為HDFS集群的core-1-1節點的內網IP地址,您可以在EMR控制臺的節點管理頁簽查看。
執行以下命令將數據導入product.orders_all表中。
INSERT INTO product.orders_all SELECT uid, date, skuId, order_revenue FROM hdfs.orders;
執行以下命令檢查數據一致性。
SELECT a.* FROM hdfs.orders a LEFT ANTI JOIN product.orders_all USING uid;
通過HDFS表函數導入數據
ClickHouse的hdfs函數能夠從指定HDFS地址讀取文件數據,返回指定結構的表,語法如下:
hdfs(uri, format, structure);
參數 | 描述 |
| HDFS上文件的地址。 |
| 文件的類型。 |
| 表中字段的類型。例如,column1 UInt32,column2 String。 |
其中uri不能為目錄地址,且文件所屬的目錄需要存在,否則寫數據時會報錯。
下載并上傳示例數據orders.csv至HDFS集群的目錄下,本文將文件上傳到了HDFS集群的根目錄下。
執行以下命令將數據導入product.orders_all表中。
INSERT INTO product.orders_all SELECT uid, date, skuId, order_revenue FROM hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32');
說明本文示例是將示例數據上傳到了HDFS集群的根目錄下。代碼中的
192.168.**.**
為HDFS集群的core-1-1節點的內網IP地址,您可以在EMR控制臺的節點管理頁簽查看。執行以下命令檢查數據一致性。
SELECT a.* FROM hdfs.orders a LEFT ANTI JOIN product.orders_all USING uid;
ClickHouse集群數據導出至HDFS
步驟一:創建業務表
本文中導出操作使用的業務表結構與導入操作的業務表結構相同,具體創建操作可查看步驟一:創建業務表。
步驟二:數據準備
執行以下命令向product.orders_all業務表中插入數據,為后續導出操作準備數據。
INSERT INTO product.orders_all VALUES (60333391,'2021-08-04 11:26:01',49358700,89) (38826285,'2021-08-03 10:47:29',25166907,27) (10793515,'2021-07-31 02:10:31',95584454,68) (70246093,'2021-08-01 00:00:08',82355887,97) (70149691,'2021-08-02 12:35:45',68748652,1) (87307646,'2021-08-03 19:45:23',16898681,71) (61694574,'2021-08-04 23:23:32',79494853,35) (61337789,'2021-08-02 07:10:42',23792355,55) (66879038,'2021-08-01 16:13:19',95820038,89);
(可選)設置導出方式,EMR-5.8.0及之后、EMR-3.45.0及之后版本可通過設置寫入方式來避免路徑上文件已存在的問題。
增量導出
設置后若文件已存在會在對應目錄下新建文件并存放數據。
set hdfs_create_new_file_on_insert=1
覆蓋導出
設置后若文件已存在會覆蓋原有數據,請謹慎設置。
set hdfs_truncate_on_insert=1
步驟三:導出數據
通過HDFS表引擎導出數據
執行以下命令創建HDFS表。
CREATE DATABASE IF NOT EXISTS hdfs ON CLUSTER cluster_emr; CREATE TABLE IF NOT EXISTS hdfs.orders ON CLUSTER cluster_emr ( `uid` UInt32, `date` DateTime, `skuId` UInt32, `order_revenue` UInt32 ) ENGINE = HDFS('hdfs://192.168.**.**:9000/orders.csv', 'CSV');
說明本文示例將數據導出至HDFS集群的根目錄下。代碼中的
192.168.**.**
為HDFS集群的core-1-1節點的內網IP地址,您可以在EMR控制臺的節點管理頁簽查看。執行以下命令導出數據,數據通過HDFS表引擎存放到相應地址。
INSERT INTO hdfs.orders SELECT uid, date, skuId, order_revenue FROM product.orders_all;
說明ClickHouse在數據導出時會在相應地址上創建文件并寫入數據,默認方式在文件已存在情況下導出失敗。EMR-5.8.0、EMR-3.45.0之后的版本可通過配置參數來避免此問題。
執行以下命令,可以檢查數據一致性。
SELECT a.* FROM hdfs.orders RIGHT ANTI JOIN product.orders_all a USING uid;
通過HDFS表函數導出數據
執行以下命令導出數據。
INSERT INTO FUNCTION hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32') SELECT uid, date, skuId, order_revenue FROM product.orders_all;
說明ClickHouse在數據導出時會在相應地址上創建文件并寫入數據,默認方式在文件已存在情況下導出失敗。EMR-5.8.0、EMR-3.45.0之后的版本可通過配置參數來避免此問題。
執行以下命令可以檢查數據一致性。
SELECT
a.*
FROM
hdfs('hdfs://192.168.**.**:9000/orders.csv', 'CSV', 'uid UInt32, date DateTime, skuId UInt32, order_revenue UInt32')
RIGHT ANTI JOIN
product.orders_all a
USING uid;
配置
EMR ClickHouse允許使用對HDFS進行配置:
全局生效的HDFS配置。
<hdfs> <dfs_default_replica>3</dfs_default_replica> </hdfs>
HDFS參數的詳細信息,請參見官網文檔HDFS Configuration Reference。
說明查詢參數時將下劃線(_)替換為半角句號(.)即可。例如,您要查詢EMR中的參數
dfs_default_replica
,則可以在官網文檔中搜索dfs.default.replica
。僅對${user}用戶生效的HDFS配置,用戶配置與全局配置相同的鍵不同值時,會覆蓋全局配置。
<hdfs_${user}> <dfs_default_replica>3</dfs_default_replica> </hdfs_${user}>