日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用Kafka Connect將MySQL數據同步至云消息隊列 Kafka 版

本教程介紹如何使用Kafka Connect的Source Connector將MySQL的數據同步至云消息隊列 Kafka 版。

背景信息

Kafka Connect主要用于將數據流輸入和輸出云消息隊列 Kafka 版。Kafka Connect主要通過各種Source Connector的實現,將數據從第三方系統輸入到Kafka Broker,通過各種Sink Connector實現,將數據從Kafka Broker中導入到第三方系統。system

前提條件

在開始本教程前,請確保您已完成以下操作:

  • 下載MySQL Source Connector。

    說明

    本教程以0.5.2版本的MySQL Source Connector為例。

  • 下載Kafka Connect。

    說明

    本教程以0.10.2.2版本的Kafka Connect為例。

  • 安裝Docker。

步驟一:配置Kafka Connect

  1. 將下載完成的MySQL Connector解壓到指定目錄。

  2. 在Kafka Connect的配置文件connect-distributed.properties中配置插件安裝位置。

    plugin.path=/kafka/connect/plugins
    重要

    Kafka Connect的早期版本不支持配置plugin.path,您需要在CLASSPATH中指定插件位置。

    export CLASSPATH=/kafka/connect/plugins/mysql-connector/*

步驟二:啟動Kafka Connect

在配置好connect-distributed.properties后,執行以下命令啟動Kafka Connect。

  • 公網接入

    1. 執行命令export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"設置java.security.auth.login.config。

    2. 執行命令bin/connect-distributed.sh config/connect-distributed.properties啟動Kafka Connect。

  • VPC接入

    執行命令bin/connect-distributed.sh config/connect-distributed.properties啟動Kafka Connect。

步驟三:安裝MySQL

  1. 下載docker-compose-mysql.yaml

  2. 執行以下命令安裝MySQL。

    export DEBEZIUM_VERSION=0.5
    docker-compose -f docker-compose-mysql.yaml up

步驟四:配置MySQL

  1. 在配置文件中配置以下內容,開啟MySQL的binlog寫入功能,并配置binlog模式為row。

    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1 
  2. 執行以下命令設置MySQL的User權限。

    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
    說明

    示例中MySQL的User為debezium,密碼為dbz。

步驟五:啟動MySQL Connector

  1. 下載register-mysql.json

  2. 編輯register-mysql.json

    • VPC接入

      ## 云消息隊列 Kafka 版接入點,通過控制臺獲取。
      ## 您在控制臺獲取的默認接入點。
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## 需要提前在控制臺創建同名Topic,在本例中創建Topic:server1。
      ## 所有Table的變更數據,會記錄在server1.$DATABASE.$TABLE的Topic中,如 server1.inventory.products。
      ## 因此用戶需要提前在控制臺中創建所有相關Topic。
      "database.server.name": "server1",
      ## 記錄schema變化信息將記錄在這個Topic中。
      ## 需要提前在控制臺創建。
      "database.history.kafka.topic": "schema-changes-inventory"
    • 公網接入

      ## 云消息隊列 Kafka 版接入點,通過控制臺獲取。存儲db中schema變化信息。
      ## 您在控制臺獲取的SSL接入點。
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## 需要提前在控制臺創建同名Topic,在本例中創建Topic:server1。
      ## 所有Table的變更數據,會記錄在server1.$DATABASE.$TABLE的Topic中,如 server1.testDB.products。
      ## 因此用戶需要提前在控制臺中創建所有相關Topic。
      "database.server.name": "server1",
      ## schema變化信息將記錄在這個Topic中。
      ## 需要提前在控制臺創建。
      "database.history.kafka.topic": "schema-changes-inventory",
      ## SSL公網方式訪問配置。
      "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.producer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.producer.security.protocol": "SASL_SSL",
      "database.history.producer.sasl.mechanism": "PLAIN",
      "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.consumer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.consumer.security.protocol": "SASL_SSL",
      "database.history.consumer.sasl.mechanism": "PLAIN",
  3. 配置好register-mysql.json后,您需要根據配置在控制臺創建相應的Topic,相關操作步驟,請參見步驟一:創建Topic。

    按照本教程中的方式安裝的MySQL,您可以看到MySQL中已經提前創建好了database:inventory。其中有四張表:

    • customers

    • orders

    • products

    • products_on_hand

    根據以上配置,您需要使用OpenAPI創建Topic:

    • server1

    • server1.inventory.customers

    • server1.inventory.orders

    • server1.inventory.products

    • server1.inventory.products_on_hand

    register-mysql.json中,配置了將schema變化信息記錄在schema-changes-testDB,因此您還需要使用OpenAPI創建Topic:schema-changes-inventory。 使用OpenAPI創建Topic,請參見CreateTopic。

  4. 執行以下命令啟動MySQL Connector。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

結果驗證

按照以下步驟操作確認云消息隊列 Kafka 版能否接收到MySQL的變更數據。

  1. 變更MySQL Table中的數據。

  2. 在控制臺的消息查詢頁面,查詢變更數據。