本教程介紹如何使用Kafka Connect的Source Connector將SQL Server的數(shù)據(jù)同步至云消息隊(duì)列 Kafka 版。

前提條件

在開(kāi)始本教程前,請(qǐng)確保您已完成以下操作:

  • 已下載SQL Server Source Connector。具體信息,請(qǐng)參見(jiàn)SQL Server Source Connector
  • 已下載Kafka Connect。具體信息,請(qǐng)參見(jiàn)Kafka Connect
    說(shuō)明 SQL Server Source Connector目前只支持2.1.0及以上版本的Kafka Connect。
  • 已下載Docker。具體信息,請(qǐng)參見(jiàn)Docker。

步驟一:配置Kafka Connect

  1. 將下載完成的SQL Server Connector解壓到指定目錄。
  2. 在Kafka Connect的配置文件connect-distributed.properties中配置插件安裝位置。
    ## 指定插件解壓后的路徑。
    plugin.path=/kafka/connect/plugins
    重要

    Kafka Connect的早期版本不支持配置plugin.path,您需要在CLASSPATH中指定插件位置。

    export CLASSPATH=/kafka/connect/plugins/sqlserver-connector/*

步驟二:?jiǎn)?dòng)Kafka Connect

配置好connect-distributed.properties后,執(zhí)行以下命令啟動(dòng)Kafka Connect。

  1. 如果是公網(wǎng)接入,需先設(shè)置java.security.auth.login.config,如果是VPC接入,可以跳過(guò)這一步。
    export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"
  2. 啟動(dòng)Kafka Connect。
    bin/connect-distributed.sh config/connect-distributed.properties

步驟三:安裝SQL Server

重要 SQL Server 2016 SP1以上版本支持CDC,因此您的SQL Server版本必須高于該版本。
  1. 下載docker-compose-sqlserver.yaml。
  2. 執(zhí)行以下命令安裝SQL Server。
    docker-compose -f docker-compose-sqlserver.yaml up

步驟四:配置SQL Server

  1. 下載inventory.sql。
  2. 執(zhí)行以下命令初始化SQL Server中的測(cè)試數(shù)據(jù)。
    cat inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
  3. 可選:如果您需要監(jiān)聽(tīng)SQL Server中已有的數(shù)據(jù)表,請(qǐng)完成以下配置:
    1. 執(zhí)行以下命令開(kāi)啟CDC配置。
      ## 開(kāi)啟CDC模板數(shù)據(jù)庫(kù)。
      USE testDB
      GO
      EXEC sys.sp_cdc_enable_db
      GO
    2. 執(zhí)行以下命令開(kāi)啟指定Table的CDC配置。
      ## 開(kāi)啟指定Table的CDC配置。
      USE testDB
      GO
      
      EXEC sys.sp_cdc_enable_table
      @source_schema = N'dbo',
      @source_name   = N'MyTable',
      @role_name     = N'MyRole',
      @filegroup_name = N'MyDB_CT',
      @supports_net_changes = 1
      GO
    3. 執(zhí)行以下命令確認(rèn)是否有權(quán)限訪問(wèn)CDC Table。
      EXEC sys.sp_cdc_help_change_data_capture
      GO
      說(shuō)明 如果返回結(jié)果為空,您需要確認(rèn)是否有權(quán)限訪問(wèn)該表。
    4. 執(zhí)行以下命令確認(rèn)SQL Server Agent已開(kāi)啟。
      EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'
      說(shuō)明 如果返回結(jié)果為Running,則說(shuō)明SQL Server Agent已開(kāi)啟。

步驟五:?jiǎn)?dòng)SQL Server Connector

  1. 下載register-sqlserver.json。
  2. 編輯register-sqlserver.json
    • VPC接入
      ## 云消息隊(duì)列 Kafka 版實(shí)例的默認(rèn)接入點(diǎn),您可以在云消息隊(duì)列 Kafka 版控制臺(tái)獲取。
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## 您需要提前在云消息隊(duì)列 Kafka 版控制臺(tái)創(chuàng)建同名Topic,在本例中創(chuàng)建topic:server1。
      ## 所有table的變更數(shù)據(jù),會(huì)記錄在server1.$DATABASE.$TABLE的topic中,例如server1.testDB.products。
      ## 因此您需要提前在云消息隊(duì)列 Kafka 版控制臺(tái)中創(chuàng)建所有相關(guān)Topic。
      "database.server.name": "server1",
      ## 記錄schema變化信息將記錄在該Topic中。
      ## 您需要提前在云消息隊(duì)列 Kafka 版控制臺(tái)創(chuàng)建該Topic。
      "database.history.kafka.topic": "schema-changes-inventory"
    • 公網(wǎng)接入
      ## 云消息隊(duì)列 Kafka 版實(shí)例的SSL接入點(diǎn),您可以在云消息隊(duì)列 Kafka 版控制臺(tái)獲取。
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## 您需要提前在云消息隊(duì)列 Kafka 版控制臺(tái)創(chuàng)建同名Topic,在本例中創(chuàng)建topic:server1。
      ## 所有table的變更數(shù)據(jù),會(huì)記錄在server1.$DATABASE.$TABLE的Topic中,例如server1.testDB.products。
      ## 因此您需要提前在云消息隊(duì)列 Kafka 版控制臺(tái)中創(chuàng)建所有相關(guān)Topic。
      "database.server.name": "server1",
      ## 記錄schema變化信息將記錄在該Topic中。
      ## 您需要提前在云消息隊(duì)列 Kafka 版控制臺(tái)創(chuàng)建該Topic。
      "database.history.kafka.topic": "schema-changes-inventory",
      ## 通過(guò)SSL接入點(diǎn)訪問(wèn),還需要修改以下配置。
      "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.producer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.producer.security.protocol": "SASL_SSL",
      "database.history.producer.sasl.mechanism": "PLAIN",
      "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.consumer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.consumer.security.protocol": "SASL_SSL",
      "database.history.consumer.sasl.mechanism": "PLAIN",
  3. 完成register-sqlserver.json配置后,您需要根據(jù)配置在控制臺(tái)創(chuàng)建相應(yīng)的Topic,相關(guān)操作步驟請(qǐng)參見(jiàn)步驟一:創(chuàng)建Topic
    按照本教程中的方式安裝的SQL Server,您可以看到SQL Server中已經(jīng)提前創(chuàng)建db name:testDB。其中有四張表:
    • customers
    • orders
    • products
    • products_on_hand
    根據(jù)以上register-sqlserver.json的配置,您需要使用OpenAPI創(chuàng)建Topic:
    • server1
    • server1.testDB.customers
    • server1.testDB.orders
    • server1.testDB.products
    • server1.testDB.products_on_hand

    register-sqlserver.json中,配置了將schema變化信息記錄在schema-changes-testDB,因此您還需要使用OpenAPI創(chuàng)建Topic:schema-changes-inventory,相關(guān)操作請(qǐng)參見(jiàn)CreateTopic。

  4. 執(zhí)行以下命令啟動(dòng)SQL Server。
    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json

結(jié)果驗(yàn)證

確認(rèn)云消息隊(duì)列 Kafka 版能否接收到SQL Server的變更數(shù)據(jù):

  1. 變更監(jiān)聽(tīng)SQL Server中的數(shù)據(jù)。
  2. 在控制臺(tái)的消息查詢(xún)頁(yè)面,查詢(xún)變更消息。具體操作步驟,請(qǐng)參見(jiàn)查詢(xún)消息。