訪問Kafka數(shù)據(jù)
本文介紹如何通過Lindorm計(jì)算引擎訪問Kafka數(shù)據(jù)。您可以將Kafka實(shí)例中的數(shù)據(jù)加載到Lindorm計(jì)算引擎中,結(jié)合Lindorm計(jì)算引擎的其他數(shù)據(jù)源數(shù)據(jù),完成復(fù)雜的數(shù)據(jù)生產(chǎn)作業(yè)。
前提條件
已開通Lindorm實(shí)例的計(jì)算引擎服務(wù)。具體操作,請參見開通與變配。
已創(chuàng)建與Lindorm實(shí)例位于同一地域的Kafka實(shí)例。具體操作,請參見公網(wǎng)和VPC接入。
已在Kafka實(shí)例中創(chuàng)建Topic。具體操作,請參見步驟三:創(chuàng)建資源。
操作步驟
啟動Beeline,并訪問Lindorm計(jì)算引擎服務(wù)。具體操作,請參見使用Beeline訪問JDBC服務(wù)。
登錄云消息隊(duì)列 Kafka 版控制臺,獲取訪問Kafka數(shù)據(jù)的連接信息。
在實(shí)例詳情頁面的接入點(diǎn)信息區(qū)域,獲取Kafka實(shí)例的域名接入點(diǎn)。
在Topic管理頁面,獲取Kafka實(shí)例的Topic名稱。
在Lindorm計(jì)算引擎中創(chuàng)建Kafka Topic對應(yīng)的Hive臨時表。
CREATE TEMPORARY TABLE kafka_tbl USING kafka OPTIONS(table_options);
USING kafka
中Kafka為指定數(shù)據(jù)源。Kafka表無需指定Schema,Lindorm計(jì)算引擎提供默認(rèn)的Kafka Schema,不可修改,Schema信息如下:列名稱
數(shù)據(jù)類型
說明
key
binary
該記錄在Kafka中的Key信息。
value
binary
該記錄在Kafka中的Value信息。
topic
string
該記錄所屬的Kafka Topic。
partition
int
該記錄所屬的Partition。
offset
bigint
該記錄在Partition的offset。
timestamp
timestamp
該記錄的時間戳。
timestampType
int
該記錄時間戳的類型:
0:CreateTime。
1:LogAppendTime。
詳細(xì)信息,請參見timestampType。
table_options
參數(shù)說明:參數(shù)
是否必選
說明
示例值
kafka.bootstrap.servers
是
Kafka實(shí)例的域名接入點(diǎn)。
alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092
subscribe
是
Kafka實(shí)例的Topic名稱。
topic_to_read
startingTimestamp
是
待訪問Topic數(shù)據(jù)片段開始時間的時間戳,單位為毫秒。
說明您可以使用Spark SQL unix_timestamp函數(shù)將時間轉(zhuǎn)換為UNIX時間戳。
1682406000000
endingTimestamp
是
待訪問Topic數(shù)據(jù)片段結(jié)束時間的時間戳,單位為毫秒。
說明您可以使用Spark SQL unix_timestamp函數(shù)將時間轉(zhuǎn)換為UNIX時間戳。
1682409600000
startingOffsetsByTimestampStrategy
否
如果Kafka實(shí)例中部分Partition中沒有數(shù)據(jù),需要添加該參數(shù)。
latest
創(chuàng)建Kafka表支持多種類型參數(shù),更多參數(shù)請參考:Structured Streaming + Kafka Integration Guide。
示例:
CREATE TEMPORARY TABLE kafka_tbl USING kafka OPTIONS( "kafka.bootstrap.servers"="alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092", "subscribe"="topic_to_read", "startingTimestamp"=1682406000000, "endingTimestamp"=1682409600000);
查詢Kafka表的數(shù)據(jù)。
查詢表Kafka_tbl中Schema的數(shù)據(jù)。
DESCRIBE kafka_tbl;
查詢表Kafka_tbl中的數(shù)據(jù)。
SELECT * FROM kafka_tbl LIMIT 10;
使用Spark函數(shù)提取Kafka中的數(shù)據(jù)。例如,查詢表Kafka_tbl中Value為
{"content": "kafka record"}
的數(shù)據(jù)。SELECT get_json_object(cast(value as string), '$.content') FROM kafka_tbl LIMIT 10;
返回結(jié)果:
Output: kafka record
(可選)實(shí)踐:將Kafka數(shù)據(jù)導(dǎo)入Hive表
如果您有數(shù)據(jù)分析等相關(guān)需求,可以參考以下步驟將Kafka中的數(shù)據(jù)導(dǎo)入Hive表。
假設(shè)域名接入點(diǎn)為kafka_addr:9092,topic名稱為topic1的Kafka實(shí)例中有兩條寫入時間在2023-04-25 15:00:00至2023-04-25 16:00:00之間的數(shù)據(jù),具體內(nèi)容為:
{"id": 1, "name": "name1"}
{"id": 2, "name": "name2"}
現(xiàn)在需要將這兩條數(shù)據(jù)寫入Hive表中,便于后續(xù)進(jìn)行數(shù)據(jù)分析。
創(chuàng)建Kafka源表。
CREATE TEMPORARY TABLE kafka_src_tbl USING kafka OPTIONS( "kafka.bootstrap.servers"="kafka_addr:9092", "subscribe"="topic1", "startingTimestamp"=1682406000000, "endingTimestamp"=1682409600000);
參數(shù)的詳細(xì)說明,請參見參數(shù)說明。
創(chuàng)建Hive目標(biāo)表。
CREATE TABLE kafka_target_tbl(id LONG, name STRING) USING parquet;
將Kafka源表中所有數(shù)據(jù)解析后寫入Hive目標(biāo)表中。
INSERT INTO kafka_target_tbl SELECT cast(get_json_object(cast(value as string), '$.id') as long), get_json_object(cast(value as string), '$.name') FROM kafka_src_tbl;
查詢Kafka導(dǎo)入Hive表中的數(shù)據(jù)。
SELECT * FROM kafka_target_tbl LIMIT 10;
返回結(jié)果:
Output: +-----+--------+ | id | name | +-----+--------+ | 1 | name1 | | 2 | name2 | +-----+--------+