您可以通過Kafka表引擎導入數據至ClickHouse集群。本文為您介紹如何將Kafka中的數據導入至ClickHouse集群。

前提條件

  • 已創建DataFlow集群,且選擇了Kafka服務,詳情請參見創建集群
  • 已創建ClickHouse集群,詳情請參見創建集群

使用限制

DataFlow集群和ClickHouse集群需要在同一VPC下。

語法

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host1:port1,host2:port2',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format';
其中,涉及參數描述如下表所示。
參數 描述
db 數據庫名。
table_name 表名。
cluster 集群標識。
name1/name2 列名。
tyep1/type2 列的類型。
kafka_broker_list Kafka Broker的地址及端口。

DataFlow集群所有節點的內網IP地址及端口,您可以在EMR控制臺集群管理頁簽中的主機列表頁面查看。

kafka_topic_list 訂閱的Topic名稱。
kafka_group_name Kafka consumer的分組名稱。
kafka_format 數據的類型。例如,CSV和JSONEachRow等,詳細信息請參見Formats for Input and Output Data

示例

  1. 在ClickHouse集群中執行以下操作。
    1. 使用SSH方式登錄ClickHouse集群,詳情請參見登錄集群
    2. 執行如下命令,進入ClickHouse客戶端。
      clickhouse-client -h core-1-1 -m
      說明 本示例登錄core-1-1節點,如果您有多個Core節點,可以登錄任意一個節點。
    3. 執行如下命令,創建數據庫kafka。
      CREATE DATABASE IF NOT EXISTS kafka ON CLUSTER cluster_emr;
      說明 數據庫名您可以自定義。本文示例中的cluster_emr是集群默認的標識,如果您修改過,請填寫正確的集群標識,您也可以在EMR控制臺ClickHouse服務的配置頁面,在搜索區域搜索clickhouse_remote_servers參數查看。
    4. 執行如下命令,創建Kafka表。
      CREATE TABLE IF NOT EXISTS kafka.consumer ON CLUSTER cluster_emr
      (
          `uid` UInt32,
          `date` DateTime,
          `skuId` UInt32,
          `order_revenue` UInt32
      )
      ENGINE = Kafka()
      SETTINGS
        kafka_broker_list = '192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092',
        kafka_topic_list = 'clickhouse_test',
        kafka_group_name = 'clickhouse_test',
        kafka_format = 'CSV';
      kafka_broker_list為DataFlow集群所有節點的內網IP地址及端口,您可以在EMR控制臺的節點管理頁面查看。其余參數含義請參見語法IP address
    5. 執行如下命令,創建數據庫product。
      CREATE DATABASE IF NOT EXISTS product ON CLUSTER cluster_emr;
    6. 執行以下命令,創建本地表。
      CREATE TABLE IF NOT EXISTS product.orders ON CLUSTER cluster_emr
      (
          `uid` UInt32,
          `date` DateTime,
          `skuId` UInt32,
          `order_revenue` UInt32
      )
      Engine = ReplicatedMergeTree('/cluster_emr/product/orders/{shard}', '{replica}')
      PARTITION BY toYYYYMMDD(date)
      ORDER BY toYYYYMMDD(date);
    7. 執行以下命令,創建分布式表。
      CREATE TABLE IF NOT EXISTS product.orders_all ON CLUSTER cluster_emr
      (
          `uid` UInt32,
          `date` DateTime,
          `skuId` UInt32,
          `order_revenue` UInt32
      )
      Engine = Distributed(cluster_emr, product, orders, rand());
    8. 執行以下命令,創建MATERIALIZED VIEW自動導數據。
      CREATE MATERIALIZED VIEW IF NOT EXISTS product.kafka_load ON CLUSTER cluster_emr TO product.orders AS
      SELECT *
      FROM kafka.consumer;
  2. 在DataFlow集群中執行以下操作。
    1. 使用SSH方式登錄DataFlow集群,詳情請參見登錄集群
    2. 在DataFlow集群的命令行窗口,執行如下命令運行Kafka的生產者。
      /usr/lib/kafka-current/bin/kafka-console-producer.sh --broker-list 192.168.**.**:9092,192.168.**.**:9092,192.168.**.**:9092 --topic clickhouse_test
    3. 執行以下命令,輸入測試數據。
      38826285,2021-08-03 10:47:29,25166907,27
      10793515,2021-07-31 02:10:31,95584454,68
      70246093,2021-08-01 00:00:08,82355887,97
      70149691,2021-08-02 12:35:45,68748652,1
      87307646,2021-08-03 19:45:23,16898681,71
      61694574,2021-08-04 23:23:32,79494853,35
      61337789,2021-08-02 07:10:42,23792355,55
      66879038,2021-08-01 16:13:19,95820038,89
  3. 在ClickHouse命令窗口中,執行以下命令,可以查看從Kafka中導入至ClickHouse集群的數據。

    您可以校驗查詢到的數據與源數據是否一致。

    SELECT * FROM product.orders_all;
    Result_click