本文介紹如何創(chuàng)建MySQL Source Connector,通過DataWorks將數(shù)據(jù)從阿里云數(shù)據(jù)庫RDS MySQL版導(dǎo)出至云消息隊(duì)列 Kafka 版實(shí)例的Topic。
前提條件
- 為云消息隊(duì)列 Kafka 版實(shí)例開啟Connector。更多信息,請(qǐng)參見開啟Connector。重要 請(qǐng)確保您的云消息隊(duì)列 Kafka 版實(shí)例部署在華南1(深圳)、西南1(成都)、華北2(北京)、華北3(張家口)、華東1(杭州)、華東2(上海)或新加坡地域。
- 創(chuàng)建RDS MySQL實(shí)例。
- 創(chuàng)建數(shù)據(jù)庫和賬號(hào)。
- 創(chuàng)建數(shù)據(jù)庫表。常見的SQL語句,請(qǐng)參見常用語句。
- 阿里云賬號(hào)和RAM用戶均須授予DataWorks訪問您彈性網(wǎng)卡ENI資源的權(quán)限。授予權(quán)限,請(qǐng)?jiān)L問云資源訪問授權(quán)。重要 如果您使用的是RAM用戶,請(qǐng)確保您的賬號(hào)有以下權(quán)限:
- AliyunDataWorksFullAccess:DataWorks所有資源的管理權(quán)限。
- AliyunBSSOrderAccess:購(gòu)買阿里云產(chǎn)品的權(quán)限。
如何為RAM用戶添加權(quán)限策略,請(qǐng)參見步驟二:為RAM用戶添加權(quán)限。
- 請(qǐng)確保您是阿里云數(shù)據(jù)庫RDS MySQL版實(shí)例(數(shù)據(jù)源)和云消息隊(duì)列 Kafka 版實(shí)例(數(shù)據(jù)目標(biāo))的所有者,即創(chuàng)建者。
- 請(qǐng)確保阿里云數(shù)據(jù)庫RDS MySQL版實(shí)例(數(shù)據(jù)源)和云消息隊(duì)列 Kafka 版實(shí)例(數(shù)據(jù)目標(biāo))所在的VPC網(wǎng)段沒有重合,否則無法成功創(chuàng)建同步任務(wù)。
背景信息
您可以在云消息隊(duì)列 Kafka 版控制臺(tái)創(chuàng)建數(shù)據(jù)同步任務(wù),將您在阿里云數(shù)據(jù)庫RDS MySQL版數(shù)據(jù)庫表中的數(shù)據(jù)同步至云消息隊(duì)列 Kafka 版的Topic。該同步任務(wù)將依賴阿里云DataWorks產(chǎn)品實(shí)現(xiàn),流程圖如下所示。如果您在云消息隊(duì)列 Kafka 版控制臺(tái)成功創(chuàng)建了數(shù)據(jù)同步任務(wù),那么阿里云DataWorks會(huì)自動(dòng)為您開通DataWorks產(chǎn)品基礎(chǔ)版服務(wù)(免費(fèi))、新建DataWorks項(xiàng)目(免費(fèi))、并新建數(shù)據(jù)集成獨(dú)享資源組(需付費(fèi)),資源組規(guī)格為4c8g,購(gòu)買模式為包年包月,時(shí)長(zhǎng)為1個(gè)月并自動(dòng)續(xù)費(fèi)。阿里云DataWorks的計(jì)費(fèi)詳情,請(qǐng)參見DataWorks計(jì)費(fèi)概述。
此外,DataWorks會(huì)根據(jù)您數(shù)據(jù)同步任務(wù)的配置,自動(dòng)為您生成云消息隊(duì)列 Kafka 版的目標(biāo)Topic。數(shù)據(jù)庫表和Topic是一對(duì)一的關(guān)系,對(duì)于有主鍵的表,默認(rèn)6分區(qū);無主鍵的表,默認(rèn)1分區(qū)。請(qǐng)確保實(shí)例剩余Topic數(shù)和分區(qū)數(shù)充足,不然任務(wù)會(huì)因?yàn)閯?chuàng)建Topic失敗而導(dǎo)致異常。
Topic的命名格式為<配置的前綴>_<數(shù)據(jù)庫表名>
,下劃線(_)為系統(tǒng)自動(dòng)添加的字符。詳情如下圖所示。
例如,您將前綴配置為mysql,需同步的數(shù)據(jù)庫表名為table_1,那么DataWorks會(huì)自動(dòng)為您生成專用Topic,用來接收table_1同步過來的數(shù)據(jù),該Topic的名稱為mysql_table_1;table_2的專用Topic名稱為mysql_table_2,以此類推。
注意事項(xiàng)
- 地域說明
- 如果數(shù)據(jù)源和目標(biāo)實(shí)例位于不同地域,請(qǐng)確保您使用的賬號(hào)擁有云企業(yè)網(wǎng)實(shí)例,且云企業(yè)網(wǎng)實(shí)例已掛載數(shù)據(jù)源和目標(biāo)實(shí)例所在的VPC,并配置好流量帶寬完成網(wǎng)絡(luò)打通。
否則,可能會(huì)新建云企業(yè)網(wǎng)實(shí)例,并將目標(biāo)實(shí)例和獨(dú)享資源組ECS全部掛載到云企業(yè)網(wǎng)實(shí)例來打通網(wǎng)絡(luò)。這樣的云企業(yè)網(wǎng)實(shí)例沒有配置帶寬,所以帶寬流量很小,可能導(dǎo)致創(chuàng)建同步任務(wù)過程中的網(wǎng)絡(luò)訪問出錯(cuò),或者同步任務(wù)創(chuàng)建成功后,在運(yùn)行過程中出錯(cuò)。
- 如果數(shù)據(jù)源和目標(biāo)實(shí)例位于同一地域,創(chuàng)建數(shù)據(jù)同步任務(wù)會(huì)自動(dòng)在其中一個(gè)實(shí)例所在VPC創(chuàng)建ENI,并綁定到獨(dú)享資源組ECS上,以打通網(wǎng)絡(luò)。
- 如果數(shù)據(jù)源和目標(biāo)實(shí)例位于不同地域,請(qǐng)確保您使用的賬號(hào)擁有云企業(yè)網(wǎng)實(shí)例,且云企業(yè)網(wǎng)實(shí)例已掛載數(shù)據(jù)源和目標(biāo)實(shí)例所在的VPC,并配置好流量帶寬完成網(wǎng)絡(luò)打通。
- DataWorks獨(dú)享資源組說明
- DataWorks的每個(gè)獨(dú)享資源組可以運(yùn)行最多3個(gè)同步任務(wù)。創(chuàng)建數(shù)據(jù)同步任務(wù)時(shí),如果DataWorks發(fā)現(xiàn)您的賬號(hào)名下有資源組的歷史購(gòu)買記錄,并且運(yùn)行的同步任務(wù)少于3個(gè),將使用已有資源組運(yùn)行新建的同步任務(wù)。
- DataWorks的每個(gè)獨(dú)享資源組最多綁定兩個(gè)VPC的ENI。如果DataWorks發(fā)現(xiàn)已購(gòu)買的資源組綁定的ENI與需要新綁定的ENI有網(wǎng)段沖突,或者其他技術(shù)限制,導(dǎo)致使用已購(gòu)買的資源組無法創(chuàng)建出同步任務(wù),此時(shí),即使已有的資源組運(yùn)行的同步任務(wù)少于3個(gè),也將新建資源組確保同步任務(wù)能夠順利創(chuàng)建。
創(chuàng)建并部署MySQL Source Connector
- 登錄云消息隊(duì)列 Kafka 版控制臺(tái)。
- 在概覽頁面的資源分布區(qū)域,選擇地域。
- 在左側(cè)導(dǎo)航欄,單擊Connector 任務(wù)列表。
- 在Connector 任務(wù)列表頁面,從選擇實(shí)例的下拉列表選擇Connector所屬的實(shí)例,然后單擊創(chuàng)建 Connector。
- 在創(chuàng)建 Connector配置向?qū)е校瓿梢韵虏僮鳌?/span>
- 創(chuàng)建完成后,在Connector 任務(wù)列表頁面,找到創(chuàng)建的Connector ,單擊其操作列的部署。在Connector 任務(wù)列表頁面,您可以看到創(chuàng)建的任務(wù)狀態(tài)為運(yùn)行中,則說明任務(wù)創(chuàng)建成功。說明 如果創(chuàng)建失敗,請(qǐng)?jiān)俅螜z查本文前提條件中的操作是否已全部完成。
如需配置同步任務(wù),單擊其操作列的任務(wù)配置,跳轉(zhuǎn)至DataWorks控制臺(tái)完成操作。
驗(yàn)證結(jié)果
- 向阿里云數(shù)據(jù)庫RDS MySQL版數(shù)據(jù)庫表插入數(shù)據(jù)。示例如下。
更多SQL語句,請(qǐng)參見常用語句。INSERT INTO mysql_tbl (mysql_title, mysql_author, submission_date) VALUES ("mysql2kafka", "tester", NOW())
- 使用云消息隊(duì)列 Kafka 版提供的消息查詢功能,驗(yàn)證數(shù)據(jù)能否被導(dǎo)出至云消息隊(duì)列 Kafka 版目標(biāo)Topic。查詢的具體步驟,請(qǐng)參見查詢消息。云數(shù)據(jù)庫RDS MySQL版數(shù)據(jù)庫表導(dǎo)出至云消息隊(duì)列 Kafka 版Topic的數(shù)據(jù)示例如下。消息結(jié)構(gòu)及各字段含義,請(qǐng)參見附錄:消息格式。
{ "schema":{ "dataColumn":[ { "name":"mysql_id", "type":"LONG" }, { "name":"mysql_title", "type":"STRING" }, { "name":"mysql_author", "type":"STRING" }, { "name":"submission_date", "type":"DATE" } ], "primaryKey":[ "mysql_id" ], "source":{ "dbType":"MySQL", "dbName":"mysql_to_kafka", "tableName":"mysql_tbl" } }, "payload":{ "before":null, "after":{ "dataColumn":{ "mysql_title":"mysql2kafka", "mysql_author":"tester", "submission_date":1614700800000 } }, "sequenceId":"1614748790461000000", "timestamp":{ "eventTime":1614748870000, "systemTime":1614748870925, "checkpointTime":1614748870000 }, "op":"INSERT", "ddl":null }, "version":"0.0.1" }