本文介紹如何通過(guò)創(chuàng)建Tablestore Sink Connector,將數(shù)據(jù)從云消息隊(duì)列 Kafka 版實(shí)例的數(shù)據(jù)源Topic導(dǎo)出至表格存儲(chǔ)(Tablestore)。
前提條件
- 云消息隊(duì)列 Kafka 版
- 已為實(shí)例開(kāi)啟Connector。具體操作,請(qǐng)參見(jiàn)開(kāi)啟Connector。
- 已為實(shí)例創(chuàng)建數(shù)據(jù)源Topic。更多信息,請(qǐng)參見(jiàn)步驟一:創(chuàng)建Topic。
- 表格存儲(chǔ)
- 已開(kāi)通表格存儲(chǔ)服務(wù)。具體操作,請(qǐng)參見(jiàn)步驟一:開(kāi)通表格存儲(chǔ)服務(wù)。
- 已創(chuàng)建表格存儲(chǔ)實(shí)例。具體操作,請(qǐng)參見(jiàn)步驟二:創(chuàng)建實(shí)例。
注意事項(xiàng)
- 僅支持在同地域內(nèi),將數(shù)據(jù)從云消息隊(duì)列 Kafka 版實(shí)例的數(shù)據(jù)源Topic導(dǎo)出至表格存儲(chǔ)。Connector的限制說(shuō)明,請(qǐng)參見(jiàn)使用限制。
- 創(chuàng)建Connector時(shí),云消息隊(duì)列 Kafka 版會(huì)為您自動(dòng)創(chuàng)建服務(wù)關(guān)聯(lián)角色。
- 如果未創(chuàng)建服務(wù)關(guān)聯(lián)角色,云消息隊(duì)列 Kafka 版會(huì)為您自動(dòng)創(chuàng)建一個(gè)服務(wù)關(guān)聯(lián)角色,以便您使用云消息隊(duì)列 Kafka 版導(dǎo)出數(shù)據(jù)至表格存儲(chǔ)的功能。
- 如果已創(chuàng)建服務(wù)關(guān)聯(lián)角色,云消息隊(duì)列 Kafka 版不會(huì)重復(fù)創(chuàng)建。
操作流程
使用Tablestore Sink Connector將數(shù)據(jù)從云消息隊(duì)列 Kafka 版實(shí)例的數(shù)據(jù)源Topic導(dǎo)出至表格存儲(chǔ)操作流程如下:
- 可選:創(chuàng)建Tablestore Sink Connector依賴的Topic和Group
如果您不需要自定義Topic和Group,您可以直接跳過(guò)該步驟,在下一步驟選擇自動(dòng)創(chuàng)建。
重要 部分Tablestore Sink Connector依賴的Topic的存儲(chǔ)引擎必須為L(zhǎng)ocal存儲(chǔ),大版本為0.10.2的云消息隊(duì)列 Kafka 版實(shí)例不支持手動(dòng)創(chuàng)建Local存儲(chǔ)的Topic,只支持自動(dòng)創(chuàng)建。 - 創(chuàng)建并部署Tablestore Sink Connector
- 結(jié)果驗(yàn)證
創(chuàng)建Tablestore Sink Connector依賴的Topic
您可以在云消息隊(duì)列 Kafka 版控制臺(tái)手動(dòng)創(chuàng)建Tablestore Sink Connector依賴的5個(gè)Topic,包括:任務(wù)位點(diǎn)Topic、任務(wù)配置Topic、任務(wù)狀態(tài)Topic、死信隊(duì)列Topic以及異常數(shù)據(jù)Topic。每個(gè)Topic所需要滿足的分區(qū)數(shù)與存儲(chǔ)引擎會(huì)有差異,具體信息,請(qǐng)參見(jiàn)配置源服務(wù)參數(shù)列表。
- 登錄云消息隊(duì)列 Kafka 版控制臺(tái)。
- 在概覽頁(yè)面的資源分布區(qū)域,選擇地域。重要 Topic需要在應(yīng)用程序所在的地域(即所部署的ECS的所在地域)進(jìn)行創(chuàng)建。Topic不能跨地域使用。例如Topic創(chuàng)建在華北2(北京)這個(gè)地域,那么消息生產(chǎn)端和消費(fèi)端也必須運(yùn)行在華北2(北京)的ECS。
- 在實(shí)例列表頁(yè)面,單擊目標(biāo)實(shí)例名稱。
- 在左側(cè)導(dǎo)航欄,單擊Topic 管理。
- 在Topic 管理頁(yè)面,單擊創(chuàng)建 Topic。
- 在創(chuàng)建 Topic面板,設(shè)置Topic屬性,然后單擊確定。
參數(shù) 說(shuō)明 示例 名稱 Topic名稱。 demo 描述 Topic的簡(jiǎn)單描述。 demo test 分區(qū)數(shù) Topic的分區(qū)數(shù)量。 12 存儲(chǔ)引擎 說(shuō)明 當(dāng)前僅專業(yè)版實(shí)例支持選擇存儲(chǔ)引擎類型,標(biāo)準(zhǔn)版暫不支持,默認(rèn)選擇為云存儲(chǔ)類型。Topic消息的存儲(chǔ)引擎。 云消息隊(duì)列 Kafka 版支持以下兩種存儲(chǔ)引擎。
- 云存儲(chǔ):底層接入阿里云云盤,具有低時(shí)延、高性能、持久性、高可靠等特點(diǎn),采用分布式3副本機(jī)制。實(shí)例的規(guī)格類型為標(biāo)準(zhǔn)版(高寫版)時(shí),存儲(chǔ)引擎只能為云存儲(chǔ)。
- Local 存儲(chǔ):使用原生Kafka的ISR復(fù)制算法,采用分布式3副本機(jī)制。
云存儲(chǔ) 消息類型 Topic消息的類型。 - 普通消息:默認(rèn)情況下,保證相同Key的消息分布在同一個(gè)分區(qū)中,且分區(qū)內(nèi)消息按照發(fā)送順序存儲(chǔ)。集群中出現(xiàn)機(jī)器宕機(jī)時(shí),可能會(huì)造成消息亂序。當(dāng)存儲(chǔ)引擎選擇云存儲(chǔ)時(shí),默認(rèn)選擇普通消息。
- 分區(qū)順序消息:默認(rèn)情況下,保證相同Key的消息分布在同一個(gè)分區(qū)中,且分區(qū)內(nèi)消息按照發(fā)送順序存儲(chǔ)。集群中出現(xiàn)機(jī)器宕機(jī)時(shí),仍然保證分區(qū)內(nèi)按照發(fā)送順序存儲(chǔ)。但是會(huì)出現(xiàn)部分分區(qū)發(fā)送消息失敗,等到分區(qū)恢復(fù)后即可恢復(fù)正常。當(dāng)存儲(chǔ)引擎選擇Local 存儲(chǔ)時(shí),默認(rèn)選擇分區(qū)順序消息。
普通消息 日志清理策略 Topic日志的清理策略。 當(dāng)存儲(chǔ)引擎選擇Local 存儲(chǔ)(當(dāng)前僅專業(yè)版實(shí)例支持選擇存儲(chǔ)引擎類型為L(zhǎng)ocal存儲(chǔ),標(biāo)準(zhǔn)版暫不支持)時(shí),需要配置日志清理策略。
云消息隊(duì)列 Kafka 版支持以下兩種日志清理策略。
- Delete:默認(rèn)的消息清理策略。在磁盤容量充足的情況下,保留在最長(zhǎng)保留時(shí)間范圍內(nèi)的消息;在磁盤容量不足時(shí)(一般磁盤使用率超過(guò)85%視為不足),將提前刪除舊消息,以保證服務(wù)可用性。
- Compact:使用Kafka Log Compaction日志清理策略。Log Compaction清理策略保證相同Key的消息,最新的value值一定會(huì)被保留。主要適用于系統(tǒng)宕機(jī)后恢復(fù)狀態(tài),系統(tǒng)重啟后重新加載緩存等場(chǎng)景。例如,在使用Kafka Connect或Confluent Schema Registry時(shí),需要使用Kafka Compact Topic存儲(chǔ)系統(tǒng)狀態(tài)信息或配置信息。重要 Compact Topic一般只用在某些生態(tài)組件中,例如Kafka Connect或Confluent Schema Registry,其他情況的消息收發(fā)請(qǐng)勿為Topic設(shè)置該屬性。具體信息,請(qǐng)參見(jiàn)云消息隊(duì)列 Kafka 版Demo庫(kù)。
Compact 標(biāo)簽 Topic的標(biāo)簽。 demo 創(chuàng)建完成后,在Topic 管理頁(yè)面的列表中顯示已創(chuàng)建的Topic。
創(chuàng)建Tablestore Sink Connector依賴的Group
您可以在云消息隊(duì)列 Kafka 版控制臺(tái)手動(dòng)創(chuàng)建Tablestore Sink Connector數(shù)據(jù)同步任務(wù)使用的Group。該Group的名稱必須為connect-任務(wù)名稱,具體信息,請(qǐng)參見(jiàn)配置源服務(wù)參數(shù)列表。
- 登錄云消息隊(duì)列 Kafka 版控制臺(tái)。
- 在概覽頁(yè)面的資源分布區(qū)域,選擇地域。
- 在實(shí)例列表頁(yè)面,單擊目標(biāo)實(shí)例名稱。
- 在左側(cè)導(dǎo)航欄,單擊Group 管理。
- 在Group 管理頁(yè)面,單擊創(chuàng)建 Group。
- 在創(chuàng)建 Group面板的Group ID文本框輸入Group的名稱,在描述文本框簡(jiǎn)要描述Group,并給Group添加標(biāo)簽,單擊確定。創(chuàng)建完成后,在Group 管理頁(yè)面的列表中顯示已創(chuàng)建的Group。
創(chuàng)建并部署Tablestore Sink Connector
創(chuàng)建并部署將數(shù)據(jù)從云消息隊(duì)列 Kafka 版同步至表格存儲(chǔ)的Tablestore Sink Connector。
- 登錄云消息隊(duì)列 Kafka 版控制臺(tái)。
- 在概覽頁(yè)面的資源分布區(qū)域,選擇地域。
- 在左側(cè)導(dǎo)航欄,單擊Connector 任務(wù)列表。
- 在Connector 任務(wù)列表頁(yè)面,從選擇實(shí)例的下拉列表選擇Connector所屬的實(shí)例,然后單擊創(chuàng)建 Connector。
- 在創(chuàng)建 Connector配置向?qū)ы?yè)面,完成以下操作。
- 創(chuàng)建完成后,在Connector 任務(wù)列表頁(yè)面,找到創(chuàng)建的Connector ,單擊其操作列的部署。
- 單擊確認(rèn)。
發(fā)送測(cè)試消息
部署Tablestore Sink Connector后,您可以向云消息隊(duì)列 Kafka 版的數(shù)據(jù)源Topic發(fā)送消息,測(cè)試數(shù)據(jù)能否被同步至表格存儲(chǔ)。
- 在Connector 任務(wù)列表頁(yè)面,找到目標(biāo)Connector,在其右側(cè)操作列,單擊測(cè)試。
- 在發(fā)送消息面板,發(fā)送測(cè)試消息。
- 發(fā)送方式選擇控制臺(tái)。
- 在消息 Key文本框中輸入消息的Key值,例如demo。
- 在消息內(nèi)容文本框輸入測(cè)試的消息內(nèi)容,例如 {"key": "test"}。
- 設(shè)置發(fā)送到指定分區(qū),選擇是否指定分區(qū)。
- 單擊是,在分區(qū) ID文本框中輸入分區(qū)的ID,例如0。如果您需查詢分區(qū)的ID,請(qǐng)參見(jiàn)查看分區(qū)狀態(tài)。
- 單擊否,不指定分區(qū)。
- 發(fā)送方式選擇Docker,執(zhí)行運(yùn)行 Docker 容器生產(chǎn)示例消息區(qū)域的Docker命令,發(fā)送消息。
- 發(fā)送方式選擇SDK,根據(jù)您的業(yè)務(wù)需求,選擇需要的語(yǔ)言或者框架的SDK以及接入方式,通過(guò)SDK發(fā)送消息。
- 發(fā)送方式選擇控制臺(tái)。
查看表數(shù)據(jù)
- 登錄表格存儲(chǔ)控制臺(tái)。
- 在概覽頁(yè)面,單擊實(shí)例名稱或在操作列單擊實(shí)例管理。
- 在實(shí)例詳情頁(yè)簽,數(shù)據(jù)表列表區(qū)域,查看對(duì)應(yīng)的數(shù)據(jù)表。
- 單擊數(shù)據(jù)表名稱,在表管理頁(yè)面的數(shù)據(jù)管理頁(yè)簽,查看表數(shù)據(jù)。