本文介紹如何創(chuàng)建MySQL Source Connector,通過DataWorks將數(shù)據(jù)從阿里云數(shù)據(jù)庫RDS MySQL版導(dǎo)出至云消息隊(duì)列 Kafka 版實(shí)例的Topic。

前提條件

在導(dǎo)出數(shù)據(jù)前,請(qǐng)確保您已完成以下操作:
  • 云消息隊(duì)列 Kafka 版實(shí)例開啟Connector。更多信息,請(qǐng)參見開啟Connector
    重要 請(qǐng)確保您的云消息隊(duì)列 Kafka 版實(shí)例部署在華南1(深圳)、西南1(成都)、華北2(北京)、華北3(張家口)、華東1(杭州)、華東2(上海)或新加坡地域。
  • 創(chuàng)建RDS MySQL實(shí)例
  • 創(chuàng)建數(shù)據(jù)庫和賬號(hào)
  • 創(chuàng)建數(shù)據(jù)庫表。常見的SQL語句,請(qǐng)參見常用語句
  • 阿里云賬號(hào)和RAM用戶均須授予DataWorks訪問您彈性網(wǎng)卡ENI資源的權(quán)限。授予權(quán)限,請(qǐng)?jiān)L問云資源訪問授權(quán)
    重要 如果您使用的是RAM用戶,請(qǐng)確保您的賬號(hào)有以下權(quán)限:
    • AliyunDataWorksFullAccess:DataWorks所有資源的管理權(quán)限。
    • AliyunBSSOrderAccess:購(gòu)買阿里云產(chǎn)品的權(quán)限。

    如何為RAM用戶添加權(quán)限策略,請(qǐng)參見步驟二:為RAM用戶添加權(quán)限

  • 請(qǐng)確保您是阿里云數(shù)據(jù)庫RDS MySQL版實(shí)例(數(shù)據(jù)源)和云消息隊(duì)列 Kafka 版實(shí)例(數(shù)據(jù)目標(biāo))的所有者,即創(chuàng)建者。
  • 請(qǐng)確保阿里云數(shù)據(jù)庫RDS MySQL版實(shí)例(數(shù)據(jù)源)和云消息隊(duì)列 Kafka 版實(shí)例(數(shù)據(jù)目標(biāo))所在的VPC網(wǎng)段沒有重合,否則無法成功創(chuàng)建同步任務(wù)。

背景信息

您可以在云消息隊(duì)列 Kafka 版控制臺(tái)創(chuàng)建數(shù)據(jù)同步任務(wù),將您在阿里云數(shù)據(jù)庫RDS MySQL版數(shù)據(jù)庫表中的數(shù)據(jù)同步至云消息隊(duì)列 Kafka 版的Topic。該同步任務(wù)將依賴阿里云DataWorks產(chǎn)品實(shí)現(xiàn),流程圖如下所示。mysql_connector

如果您在云消息隊(duì)列 Kafka 版控制臺(tái)成功創(chuàng)建了數(shù)據(jù)同步任務(wù),那么阿里云DataWorks會(huì)自動(dòng)為您開通DataWorks產(chǎn)品基礎(chǔ)版服務(wù)(免費(fèi))、新建DataWorks項(xiàng)目(免費(fèi))、并新建數(shù)據(jù)集成獨(dú)享資源組(需付費(fèi)),資源組規(guī)格為4c8g,購(gòu)買模式為包年包月,時(shí)長(zhǎng)為1個(gè)月并自動(dòng)續(xù)費(fèi)。阿里云DataWorks的計(jì)費(fèi)詳情,請(qǐng)參見DataWorks計(jì)費(fèi)概述

此外,DataWorks會(huì)根據(jù)您數(shù)據(jù)同步任務(wù)的配置,自動(dòng)為您生成云消息隊(duì)列 Kafka 版的目標(biāo)Topic。數(shù)據(jù)庫表和Topic是一對(duì)一的關(guān)系,對(duì)于有主鍵的表,默認(rèn)6分區(qū);無主鍵的表,默認(rèn)1分區(qū)。請(qǐng)確保實(shí)例剩余Topic數(shù)和分區(qū)數(shù)充足,不然任務(wù)會(huì)因?yàn)閯?chuàng)建Topic失敗而導(dǎo)致異常。

Topic的命名格式為<配置的前綴>_<數(shù)據(jù)庫表名>,下劃線(_)為系統(tǒng)自動(dòng)添加的字符。詳情如下圖所示。

table_topic_match

例如,您將前綴配置為mysql,需同步的數(shù)據(jù)庫表名為table_1,那么DataWorks會(huì)自動(dòng)為您生成專用Topic,用來接收table_1同步過來的數(shù)據(jù),該Topic的名稱為mysql_table_1;table_2的專用Topic名稱為mysql_table_2,以此類推。

注意事項(xiàng)

  • 地域說明
    • 如果數(shù)據(jù)源和目標(biāo)實(shí)例位于不同地域,請(qǐng)確保您使用的賬號(hào)擁有云企業(yè)網(wǎng)實(shí)例,且云企業(yè)網(wǎng)實(shí)例已掛載數(shù)據(jù)源和目標(biāo)實(shí)例所在的VPC,并配置好流量帶寬完成網(wǎng)絡(luò)打通。

      否則,可能會(huì)新建云企業(yè)網(wǎng)實(shí)例,并將目標(biāo)實(shí)例和獨(dú)享資源組ECS全部掛載到云企業(yè)網(wǎng)實(shí)例來打通網(wǎng)絡(luò)。這樣的云企業(yè)網(wǎng)實(shí)例沒有配置帶寬,所以帶寬流量很小,可能導(dǎo)致創(chuàng)建同步任務(wù)過程中的網(wǎng)絡(luò)訪問出錯(cuò),或者同步任務(wù)創(chuàng)建成功后,在運(yùn)行過程中出錯(cuò)。

    • 如果數(shù)據(jù)源和目標(biāo)實(shí)例位于同一地域,創(chuàng)建數(shù)據(jù)同步任務(wù)會(huì)自動(dòng)在其中一個(gè)實(shí)例所在VPC創(chuàng)建ENI,并綁定到獨(dú)享資源組ECS上,以打通網(wǎng)絡(luò)。
  • DataWorks獨(dú)享資源組說明
    • DataWorks的每個(gè)獨(dú)享資源組可以運(yùn)行最多3個(gè)同步任務(wù)。創(chuàng)建數(shù)據(jù)同步任務(wù)時(shí),如果DataWorks發(fā)現(xiàn)您的賬號(hào)名下有資源組的歷史購(gòu)買記錄,并且運(yùn)行的同步任務(wù)少于3個(gè),將使用已有資源組運(yùn)行新建的同步任務(wù)。
    • DataWorks的每個(gè)獨(dú)享資源組最多綁定兩個(gè)VPC的ENI。如果DataWorks發(fā)現(xiàn)已購(gòu)買的資源組綁定的ENI與需要新綁定的ENI有網(wǎng)段沖突,或者其他技術(shù)限制,導(dǎo)致使用已購(gòu)買的資源組無法創(chuàng)建出同步任務(wù),此時(shí),即使已有的資源組運(yùn)行的同步任務(wù)少于3個(gè),也將新建資源組確保同步任務(wù)能夠順利創(chuàng)建。

創(chuàng)建并部署MySQL Source Connector

  1. 登錄云消息隊(duì)列 Kafka 版控制臺(tái)
  2. 概覽頁面的資源分布區(qū)域,選擇地域。
  3. 在左側(cè)導(dǎo)航欄,單擊Connector 任務(wù)列表
  4. Connector 任務(wù)列表頁面,從選擇實(shí)例的下拉列表選擇Connector所屬的實(shí)例,然后單擊創(chuàng)建 Connector
  5. 創(chuàng)建 Connector配置向?qū)е校瓿梢韵虏僮鳌?/span>
    1. 配置基本信息頁簽的名稱文本框,輸入Connector名稱,然后單擊下一步
      參數(shù)描述示例值
      名稱Connector的名稱。命名規(guī)則:
      • 可以包含數(shù)字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長(zhǎng)度限制為48個(gè)字符。
      • 同一個(gè)云消息隊(duì)列 Kafka 版實(shí)例內(nèi)保持唯一。

      Connector的數(shù)據(jù)同步任務(wù)必須使用名稱為connect-任務(wù)名稱Group。如果您未手動(dòng)創(chuàng)建該Group,系統(tǒng)將為您自動(dòng)創(chuàng)建。

      kafka-source-mysql
      實(shí)例默認(rèn)配置為實(shí)例的名稱與實(shí)例ID。demo alikafka_post-cn-st21p8vj****
    2. 配置源服務(wù)頁簽,選擇數(shù)據(jù)源云數(shù)據(jù)庫RDS MySQL版,配置以下參數(shù),然后單擊下一步
      參數(shù)描述示例值
      RDS 實(shí)例所在地域從下拉列表中,選擇阿里云數(shù)據(jù)庫RDS MySQL版實(shí)例所在的地域。華南1(深圳)
      云數(shù)據(jù)庫 RDS 實(shí)例 ID需要同步數(shù)據(jù)的阿里云數(shù)據(jù)庫RDS MySQL版的實(shí)例ID。rm-wz91w3vk6owmz****
      數(shù)據(jù)庫名稱需要同步的阿里云數(shù)據(jù)庫RDS MySQL版實(shí)例數(shù)據(jù)庫的名稱。mysql-to-kafka
      數(shù)據(jù)庫賬號(hào)需要同步的阿里云數(shù)據(jù)庫RDS MySQL版實(shí)例數(shù)據(jù)庫賬號(hào)。mysql_to_kafka
      數(shù)據(jù)庫賬號(hào)密碼需要同步的阿里云數(shù)據(jù)庫RDS MySQL版實(shí)例數(shù)據(jù)庫賬號(hào)的密碼。
      數(shù)據(jù)庫表需要同步的阿里云數(shù)據(jù)庫RDS MySQL版實(shí)例數(shù)據(jù)庫表的名稱,多個(gè)表名以英文逗號(hào)(,)分隔。

      數(shù)據(jù)庫表和目標(biāo)Topic是一對(duì)一的關(guān)系。

      mysql_tbl
      自動(dòng)添加數(shù)據(jù)表批量添加數(shù)據(jù)庫中的其他表。當(dāng)創(chuàng)建的新表匹配成功時(shí),也可被識(shí)別并同步數(shù)據(jù)。

      格式為正則表達(dá)式。例如,輸入.*,表示匹配數(shù)據(jù)庫中的所有表。

      .*
      Topic 前綴阿里云數(shù)據(jù)庫RDS MySQL版數(shù)據(jù)庫表同步到云消息隊(duì)列 Kafka 版的Topic的命名前綴,請(qǐng)確保前綴全局唯一。mysql
      重要
      請(qǐng)確保阿里云數(shù)據(jù)庫RDS MySQL版數(shù)據(jù)庫賬號(hào)有以下最小權(quán)限:
      • SELECT
      • REPLICATION SLAVE
      • REPLICATION CLIENT
      授權(quán)命令示例:
      GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '同步賬號(hào)'@'%'; //授予數(shù)據(jù)庫賬號(hào)的SELECT、REPLICATION SLAVE和REPLICATION CLIENT權(quán)限。
    3. 配置目標(biāo)服務(wù)頁簽,顯示數(shù)據(jù)將同步到目標(biāo)云消息隊(duì)列 Kafka 版實(shí)例,確認(rèn)信息無誤后,單擊創(chuàng)建
  6. 創(chuàng)建完成后,在Connector 任務(wù)列表頁面,找到創(chuàng)建的Connector ,單擊其操作列的部署
    Connector 任務(wù)列表頁面,您可以看到創(chuàng)建的任務(wù)狀態(tài)運(yùn)行中,則說明任務(wù)創(chuàng)建成功。
    說明 如果創(chuàng)建失敗,請(qǐng)?jiān)俅螜z查本文前提條件中的操作是否已全部完成。

    如需配置同步任務(wù),單擊其操作列的任務(wù)配置,跳轉(zhuǎn)至DataWorks控制臺(tái)完成操作。

驗(yàn)證結(jié)果

  1. 向阿里云數(shù)據(jù)庫RDS MySQL版數(shù)據(jù)庫表插入數(shù)據(jù)。
    示例如下。
    INSERT INTO mysql_tbl
        (mysql_title, mysql_author, submission_date)
        VALUES
        ("mysql2kafka", "tester", NOW())
    更多SQL語句,請(qǐng)參見常用語句
  2. 使用云消息隊(duì)列 Kafka 版提供的消息查詢功能,驗(yàn)證數(shù)據(jù)能否被導(dǎo)出至云消息隊(duì)列 Kafka 版目標(biāo)Topic。
    查詢的具體步驟,請(qǐng)參見查詢消息
    云數(shù)據(jù)庫RDS MySQL版數(shù)據(jù)庫表導(dǎo)出至云消息隊(duì)列 Kafka 版Topic的數(shù)據(jù)示例如下。消息結(jié)構(gòu)及各字段含義,請(qǐng)參見附錄:消息格式
    {
        "schema":{
            "dataColumn":[
                {
                    "name":"mysql_id",
                    "type":"LONG"
                },
                {
                    "name":"mysql_title",
                    "type":"STRING"
                },
                {
                    "name":"mysql_author",
                    "type":"STRING"
                },
                {
                    "name":"submission_date",
                    "type":"DATE"
                }
            ],
            "primaryKey":[
                "mysql_id"
            ],
            "source":{
                "dbType":"MySQL",
                "dbName":"mysql_to_kafka",
                "tableName":"mysql_tbl"
            }
        },
        "payload":{
            "before":null,
            "after":{
                "dataColumn":{
                    "mysql_title":"mysql2kafka",
                    "mysql_author":"tester",
                    "submission_date":1614700800000
                }
            },
            "sequenceId":"1614748790461000000",
            "timestamp":{
                "eventTime":1614748870000,
                "systemTime":1614748870925,
                "checkpointTime":1614748870000
            },
            "op":"INSERT",
            "ddl":null
        },
        "version":"0.0.1"
    }