日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

訪問Kafka數(shù)據(jù)

本文介紹如何通過Lindorm計(jì)算引擎訪問Kafka數(shù)據(jù)。您可以將Kafka實(shí)例中的數(shù)據(jù)加載到Lindorm計(jì)算引擎中,結(jié)合Lindorm計(jì)算引擎的其他數(shù)據(jù)源數(shù)據(jù),完成復(fù)雜的數(shù)據(jù)生產(chǎn)作業(yè)。

前提條件

  • 已開通Lindorm實(shí)例的計(jì)算引擎服務(wù)。具體操作,請參見開通與變配。

  • 已創(chuàng)建與Lindorm實(shí)例位于同一地域的Kafka實(shí)例。具體操作,請參見公網(wǎng)和VPC接入。

  • 已在Kafka實(shí)例中創(chuàng)建Topic。具體操作,請參見步驟三:創(chuàng)建資源。

操作步驟

  1. 啟動Beeline,并訪問Lindorm計(jì)算引擎服務(wù)。具體操作,請參見使用Beeline訪問JDBC服務(wù)。

  2. 登錄云消息隊(duì)列 Kafka 版控制臺,獲取訪問Kafka數(shù)據(jù)的連接信息。

    1. 實(shí)例詳情頁面的接入點(diǎn)信息區(qū)域,獲取Kafka實(shí)例的域名接入點(diǎn)。

    2. Topic管理頁面,獲取Kafka實(shí)例的Topic名稱。

  3. 在Lindorm計(jì)算引擎中創(chuàng)建Kafka Topic對應(yīng)的Hive臨時表。

    CREATE TEMPORARY TABLE kafka_tbl USING kafka OPTIONS(table_options);

    USING kafka中Kafka為指定數(shù)據(jù)源。Kafka表無需指定Schema,Lindorm計(jì)算引擎提供默認(rèn)的Kafka Schema,不可修改,Schema信息如下:

    列名稱

    數(shù)據(jù)類型

    說明

    key

    binary

    該記錄在Kafka中的Key信息。

    value

    binary

    該記錄在Kafka中的Value信息。

    topic

    string

    該記錄所屬的Kafka Topic。

    partition

    int

    該記錄所屬的Partition。

    offset

    bigint

    該記錄在Partition的offset。

    timestamp

    timestamp

    該記錄的時間戳。

    timestampType

    int

    該記錄時間戳的類型:

    • 0:CreateTime。

    • 1:LogAppendTime。

    詳細(xì)信息,請參見timestampType。

    table_options參數(shù)說明:

    參數(shù)

    是否必選

    說明

    示例值

    kafka.bootstrap.servers

    Kafka實(shí)例的域名接入點(diǎn)。

    alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092

    subscribe

    Kafka實(shí)例的Topic名稱。

    topic_to_read

    startingTimestamp

    待訪問Topic數(shù)據(jù)片段開始時間的時間戳,單位為毫秒。

    說明

    您可以使用Spark SQL unix_timestamp函數(shù)將時間轉(zhuǎn)換為UNIX時間戳。

    1682406000000

    endingTimestamp

    待訪問Topic數(shù)據(jù)片段結(jié)束時間的時間戳,單位為毫秒。

    說明

    您可以使用Spark SQL unix_timestamp函數(shù)將時間轉(zhuǎn)換為UNIX時間戳。

    1682409600000

    startingOffsetsByTimestampStrategy

    如果Kafka實(shí)例中部分Partition中沒有數(shù)據(jù),需要添加該參數(shù)。

    latest

    創(chuàng)建Kafka表支持多種類型參數(shù),更多參數(shù)請參考:Structured Streaming + Kafka Integration Guide。

    示例:

    CREATE TEMPORARY TABLE kafka_tbl USING kafka OPTIONS(
      "kafka.bootstrap.servers"="alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092",
      "subscribe"="topic_to_read",
      "startingTimestamp"=1682406000000,
      "endingTimestamp"=1682409600000);
  4. 查詢Kafka表的數(shù)據(jù)。

    • 查詢表Kafka_tbl中Schema的數(shù)據(jù)。

      DESCRIBE kafka_tbl;
    • 查詢表Kafka_tbl中的數(shù)據(jù)。

      SELECT * FROM kafka_tbl LIMIT 10;
    • 使用Spark函數(shù)提取Kafka中的數(shù)據(jù)。例如,查詢表Kafka_tbl中Value為{"content": "kafka record"}的數(shù)據(jù)。

      SELECT get_json_object(cast(value as string), '$.content') FROM kafka_tbl LIMIT 10;

      返回結(jié)果:

      Output: kafka record

(可選)實(shí)踐:將Kafka數(shù)據(jù)導(dǎo)入Hive表

如果您有數(shù)據(jù)分析等相關(guān)需求,可以參考以下步驟將Kafka中的數(shù)據(jù)導(dǎo)入Hive表。

假設(shè)域名接入點(diǎn)為kafka_addr:9092,topic名稱為topic1的Kafka實(shí)例中有兩條寫入時間在2023-04-25 15:00:00至2023-04-25 16:00:00之間的數(shù)據(jù),具體內(nèi)容為:

{"id": 1, "name": "name1"}
{"id": 2, "name": "name2"}

現(xiàn)在需要將這兩條數(shù)據(jù)寫入Hive表中,便于后續(xù)進(jìn)行數(shù)據(jù)分析。

  1. 創(chuàng)建Kafka源表。

    CREATE TEMPORARY TABLE kafka_src_tbl USING kafka OPTIONS(
      "kafka.bootstrap.servers"="kafka_addr:9092",
      "subscribe"="topic1",
      "startingTimestamp"=1682406000000,
      "endingTimestamp"=1682409600000);

    參數(shù)的詳細(xì)說明,請參見參數(shù)說明

  2. 創(chuàng)建Hive目標(biāo)表。

    CREATE TABLE kafka_target_tbl(id LONG, name STRING) USING parquet;
  3. 將Kafka源表中所有數(shù)據(jù)解析后寫入Hive目標(biāo)表中。

    INSERT INTO kafka_target_tbl
        SELECT
          cast(get_json_object(cast(value as string), '$.id') as long),
          get_json_object(cast(value as string), '$.name')
        FROM
          kafka_src_tbl;
  4. 查詢Kafka導(dǎo)入Hive表中的數(shù)據(jù)。

    SELECT * FROM kafka_target_tbl LIMIT 10;

    返回結(jié)果:

    Output:
    +-----+--------+
    | id  |  name  |
    +-----+--------+
    | 1   | name1  |
    | 2   | name2  |
    +-----+--------+