日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過Kafka寫入數據

Lindorm流引擎提供了100%兼容Flink SQL的能力。您可以將原始數據存儲在Kafka Topic,并通過Flink SQL在流引擎中創建實時計算任務,對原始數據進行高效計算和處理。本文介紹如何使用Flink SQL提交流引擎計算任務將Kafka Topic中的數據導入至Lindorm寬表。

前提條件

  • 已開通Lindorm流引擎。如何開通,請參見開通流引擎。

  • 已將客戶端IP地址添加至Lindorm實例的白名單中。具體操作,請參見設置白名單。

注意事項

如果您的應用部署在ECS實例,且想要通過專有網絡訪問Lindorm實例,則需要確保Lindorm實例和ECS實例滿足以下條件,以保證網絡的連通性。

  • 所在地域相同,并建議所在可用區相同(以減少網絡延時)。

  • ECS實例與Lindorm實例屬于同一專有網絡。

操作步驟

步驟一:數據準備

  1. 通過Kafka API將源數據寫入Kafka Topic。共支持以下兩種寫入方式:

    以通過開源Kafka腳本工具寫入數據為例。

    #創建Topic
    ./kafka-topics.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic log_topic --create
    
    #寫入數據
    ./kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic log_topic
    {"loglevel": "INFO", "thread":"thread-1", "class": "com.alibaba.stream.test", "detail":"thread-1 info detail", "timestamp": "1675840911549"}
    {"loglevel": "ERROR", "thread":"thread-2", "class": "com.alibaba.stream.test", "detail":"thread-2 error detail", "timestamp": "1675840911549"}
    {"loglevel": "WARN", "thread":"thread-3", "class": "com.alibaba.stream.test", "detail":"thread-3 warn detail", "timestamp": "1675840911549"}
    {"loglevel": "ERROR", "thread":"thread-4", "class": "com.alibaba.stream.test", "detail":"thread-4 error detail", "timestamp": "1675840911549"}

    Lindorm Stream Kafka地址的獲取方式請參見查看連接地址

  2. 在寬表引擎中創建結果表,用于存儲計算任務的處理結果。

    1. 通過Lindorm-cli連接寬表引擎。如何連接,請參見通過Lindorm-cli連接并使用寬表引擎。

    2. 創建結果表log。

      CREATE TABLE IF NOT EXISTS log (
        loglevel VARCHAR,
        thread VARCHAR,
        class VARCHAR,
        detail VARCHAR,
        timestamp BIGINT,
      primary key (loglevel, thread) );

步驟二:安裝流引擎客戶端

  1. 在ECS上執行以下命令,下載流引擎客戶端壓縮包。

    wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/lindorm-sqlline-2.0.2.tar.gz
  2. 執行以下命令,解壓壓縮包。

    tar zxvf lindorm-sqlline-2.0.2.tar.gz
  3. 進入lindorm-sqlline-2.0.2/bin目錄,執行以下命令連接至Lindorm流引擎。

    ./lindorm-sqlline -url <Lindorm Stream SQL地址>

    Lindorm Stream SQL地址的獲取方式,請參見查看連接地址。

步驟三:在流引擎中提交計算任務

示例計算任務的具體實現如下:

  1. 創建名為log_to_lindorm的Flink Job,并在Flink Job中創建兩張表:originalData(Source表)和lindorm_log_table(Sink表),分別關聯已創建的Kafka Topic和結果表log。

  2. 創建流任務,過濾掉loglevel為ERROR的日志,將過濾結果寫入結果表log中。

具體代碼如下:

CREATE FJOB log_to_lindorm(
    --Kafka Source表
    CREATE TABLE originalData(
        `loglevel` VARCHAR,
        `thread` VARCHAR,
        `class` VARCHAR,
        `detail` VARCHAR,
        `timestamp` BIGINT
    )WITH(
        'connector'='kafka',
        'topic'='log_topic',
        'scan.startup.mode'='earliest-offset',
        'properties.bootstrap.servers'='Lindorm Stream Kafka地址',
        'format'='json'
    );
    --Lindorm寬表 Sink表
    CREATE TABLE lindorm_log_table(
        `loglevel` VARCHAR,
        `thread` VARCHAR,
        `class` VARCHAR,
        `detail` VARCHAR,
        `timestamp` BIGINT,
        PRIMARY KEY (`loglevel`, `thread`) NOT ENFORCED
    )WITH(
        'connector'='lindorm',
        'seedServer'='Lindorm寬表引擎的HBase兼容地址',
        'userName'='root',
        'password'='test',
        'tableName'='log',
        'namespace'='default'
    );
    --過濾Kafka中的ERROR日志,將結果寫入Lindorm寬表
    INSERT INTO lindorm_log_table SELECT * FROM originalData WHERE loglevel = 'ERROR';
);
說明

步驟四:查詢流引擎處理結果

支持以下兩種查詢方式:

  • 通過Lindorm-cli連接寬表引擎并執行以下命令查詢處理結果。

    SELECT * FROM log LIMIT 5;

    返回結果:

    +----------+----------+-------------------------+-----------------------+---------------+
    | loglevel |  thread  |          class          |        detail         |   timestamp   |
    +----------+----------+-------------------------+-----------------------+---------------+
    | ERROR    | thread-2 | com.alibaba.stream.test | thread-2 error detail | 1675840911549 |
    | ERROR    | thread-4 | com.alibaba.stream.test | thread-4 error detail | 1675840911549 |
    +----------+----------+-------------------------+-----------------------+---------------+                                     
  • 通過寬表引擎的集群管理系統查詢處理結果,具體操作請參見數據查詢。