日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用RabbitMQ Connector同步RabbitMQ數(shù)據(jù)

本文介紹如何創(chuàng)建RabbitMQ Connector,將云消息隊(duì)列 RabbitMQ 版的數(shù)據(jù)同步至云消息隊(duì)列 Kafka 版

前提條件

步驟一:創(chuàng)建RabbitMQ資源

  1. 登錄云消息隊(duì)列 RabbitMQ 版控制臺(tái),創(chuàng)建RabbitMQ實(shí)例。操作步驟,請(qǐng)參見創(chuàng)建實(shí)例

  2. 單擊已創(chuàng)建的實(shí)例,在實(shí)例詳情頁面創(chuàng)建以下資源。

    1. 在左側(cè)導(dǎo)航欄,單擊靜態(tài)用戶名密碼,然后單擊創(chuàng)建用戶名密碼。更多信息,請(qǐng)參見創(chuàng)建用戶名密碼

      創(chuàng)建完成后保存用戶名和密碼。image..png

    2. 在左側(cè)導(dǎo)航欄,單擊Vhost列表,然后單擊創(chuàng)建Vhost。更多信息,請(qǐng)參見創(chuàng)建Exchange

    3. 在左側(cè)導(dǎo)航欄,單擊Queue列表,在當(dāng)前 Vhost右側(cè)的切換下拉列表中,選擇已創(chuàng)建的Vhost,然后單擊創(chuàng)建Queue。更多信息,請(qǐng)參見創(chuàng)建Queue

步驟二:創(chuàng)建Connector

  1. 下載RabbitMQ Connector文件,上傳至提前創(chuàng)建好的OSS Bucket。更多信息,請(qǐng)參見控制臺(tái)上傳文件

  2. 登錄云消息隊(duì)列 Kafka 版控制臺(tái),在概覽頁面的資源分布區(qū)域,選擇地域。

  3. 在左側(cè)導(dǎo)航欄,選擇Connector生態(tài)集成 > 任務(wù)列表

  4. 任務(wù)列表頁面,單擊創(chuàng)建任務(wù)列表

  5. 創(chuàng)建任務(wù)面板。設(shè)置任務(wù)名稱,配置以下配置項(xiàng)。

    • 任務(wù)創(chuàng)建

      1. Source(源)配置向?qū)Вx擇數(shù)據(jù)提供方Apache Kafka Connect,單擊下一步

      2. 連接器配置配置向?qū)ВO(shè)置以下配置項(xiàng),然后單擊下一步

        配置項(xiàng)

        參數(shù)

        說明

        Kafka Connect插件

        Bucket存儲(chǔ)桶

        選擇OSS Bucket。

        文件

        選擇上傳的.ZIP文件。

        Kafka資源信息

        Kafka 參數(shù)配置

        選擇Source Connect。

        Kafka實(shí)例

        選擇前提條件中創(chuàng)建的實(shí)例。

        專有網(wǎng)絡(luò)VPC

        默認(rèn)選擇部署Kafka實(shí)例時(shí)選擇的VPC ID且不可更改。

        交換機(jī)

        默認(rèn)選擇部署Kafka實(shí)例時(shí)選擇的vSwitch ID且不可更改。

        安全組

        選擇安全組。

        Kafka Connect配置信息

        解析當(dāng)前ZIP包下的properties文件

        選擇新建properties文件。選擇.ZIP文件中包含的SourceConnector對(duì)應(yīng)的.properties文件。路徑為/etc/source-xxx.properties。在輸入框中更新下列字段的取值。

        • connector.class:運(yùn)行的Connector的包名稱,無需修改。

        • tasks.max:Task的最大數(shù)量。

        • rabbitmq.host:填寫RabbitMQ實(shí)例VPC接入點(diǎn)地址。可在RabbitMQ實(shí)例詳情頁面的接入點(diǎn)信息區(qū)域查看。

        • rabbitmq.username:填寫步驟一:創(chuàng)建RabbitMQ資源中創(chuàng)建的RabbitMQ實(shí)例靜態(tài)用戶名。

        • rabbitmq.password:填寫步驟一:創(chuàng)建RabbitMQ資源中創(chuàng)建的RabbitMQ實(shí)例靜態(tài)用戶名密碼。

        • rabbitmq.virtual.host:填寫步驟一:創(chuàng)建RabbitMQ資源中創(chuàng)建的Vhost。

        • kafka.topic:目標(biāo)Kafka Topic,請(qǐng)?jiān)谕哆f數(shù)據(jù)前,提前創(chuàng)建好目標(biāo)Topic。

        • rabbitmq.queue:填寫步驟一:創(chuàng)建RabbitMQ資源中創(chuàng)建的Queue。

        示例代碼如下:

        connector.class=com.ibm.eventstreams.connect.rabbitmqsource.RabbitMQSourceConnector
        name=rabbitmq-source-connector
        # RabbitMQ實(shí)例VPC接入點(diǎn)信息。
        rabbitmq.host=xxx
        # RabbitMQ實(shí)例靜態(tài)用戶名密碼。
        rabbitmq.password=xxx
        # RabbitMQ實(shí)例靜態(tài)用戶名。
        rabbitmq.username=xxx
        # RabbitMQ實(shí)例Vhost。
        rabbitmq.virtual.host=xxx
        # 目標(biāo)Kafka Topic。
        kafka.topic=xxx
        # RabbitMQ實(shí)例隊(duì)列。
        rabbitmq.queue=xxx
        tasks.max=4
      3. 實(shí)例配置配置向?qū)ВO(shè)置以下參數(shù),然后單擊下一步

        配置項(xiàng)

        參數(shù)

        說明

        Worker規(guī)格

        Worker規(guī)格

        選擇合適的Worker規(guī)格。

        最小Worker數(shù)

        設(shè)置最小Worker數(shù)量。

        最大Worker數(shù)

        設(shè)置最大Worker數(shù)量。此數(shù)值不得超過Task的最大數(shù)量。

        橫向擴(kuò)縮容閾值 %

        當(dāng)利用率大于或小于設(shè)置的CPU和Memory數(shù)值時(shí),觸發(fā)自動(dòng)擴(kuò)容或縮容。僅當(dāng)最小Worker數(shù)最大Worker數(shù)值不相等時(shí),需要配置此參數(shù)。

        Kafka Connect Worker 配置

        自動(dòng)創(chuàng)建Kafka Connect Worker依賴資源

        建議勾選此項(xiàng),此時(shí)會(huì)在選擇的Kafka實(shí)例中自動(dòng)創(chuàng)建Kafka Connect運(yùn)行所需的一些Internal Topic以及ConsumerGroup,并將這些必填配置自動(dòng)填入配置框中,包括以下配置項(xiàng):

        • Offset Topic:用于存儲(chǔ)源數(shù)據(jù)偏移量,命名規(guī)則為connect-eb-offset-<任務(wù)名稱>

        • Config Topic:用于存儲(chǔ)Connectors以及Tasks的配置信息,命名規(guī)則為connect-eb-config-<任務(wù)名稱>

        • Status Topic:用于存儲(chǔ)Connectors以及Tasks狀態(tài)信息,命名規(guī)則為connect-eb-status-<任務(wù)名稱>

        • Kafka Connect Consumer Group:Kafka Connect Worker用于消費(fèi)Internal Topics的消費(fèi)組,命名規(guī)則為connect-eb-cluster-<任務(wù)名稱>

        • Kafka Source Connector Consumer Group:只針對(duì)Sink Connector有效,用于消費(fèi)源Topic中的數(shù)據(jù),命名規(guī)則為connector-eb-cluster-<任務(wù)名稱>-<connector名稱>

      4. 運(yùn)行配置區(qū)域,將日志投遞方式設(shè)置為投遞至SLS或者投遞至Kafka,在角色授權(quán)卡片設(shè)置Connect依賴的角色配置,然后單擊保存

        重要

        建議配置的角色包含AliyunSAEFullAccess權(quán)限,否則可能會(huì)導(dǎo)致任務(wù)運(yùn)行失敗。

    • 任務(wù)屬性

      設(shè)置此任務(wù)的重試策略及死信隊(duì)列。更多信息,請(qǐng)參見重試和死信

    等待任務(wù)狀態(tài)變?yōu)?b data-tag="uicontrol" id="88548e1005asr" class="uicontrol">運(yùn)行中,此時(shí)Connector已經(jīng)在正常工作中。

步驟三:測試Connector

  1. 登錄云消息隊(duì)列 RabbitMQ 版控制臺(tái),然后在左側(cè)導(dǎo)航欄選擇實(shí)例列表

  2. 實(shí)例列表頁面的頂部菜單欄選擇地域,然后在實(shí)例列表中,單擊目標(biāo)實(shí)例名稱。

  3. 在左側(cè)導(dǎo)航欄,單擊Queue列表,然后單擊目標(biāo)Queue右側(cè)操作列的詳情

  4. Queue詳情頁面,單擊被綁定信息頁簽的添加被綁定

  5. 添加被綁定面板,選擇源Exchangeamq.direct,單擊確定

  6. 被綁定信息頁簽,單擊amq.direct Exchange右側(cè)操作列的發(fā)送消息,向Kafka的目標(biāo)Topic發(fā)送消息。更多信息,請(qǐng)參見發(fā)送消息image..png

  7. 登錄云消息隊(duì)列 Kafka 版控制臺(tái),在實(shí)例列表頁面,單擊目標(biāo)實(shí)例。

  8. 在目標(biāo)實(shí)例頁面,單擊目標(biāo)Topic,然后單擊消息查詢,查看插入的消息數(shù)據(jù)。image..png

常見報(bào)錯(cuò)

場景一:所有Tasks運(yùn)行失敗

錯(cuò)誤信息:

All tasks under connector mongo-source failed, please check the error trace of the task.

解決方法:在消息流入任務(wù)詳情頁面,單擊基礎(chǔ)信息區(qū)域的診斷鏈接,即可跳轉(zhuǎn)到Connector監(jiān)控頁面,可以看到Tasks運(yùn)行失敗的詳細(xì)錯(cuò)誤信息。

場景二:Kafka Connect退出

錯(cuò)誤信息:

Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.

解決方法:由于狀態(tài)獲取可能會(huì)有延遲,建議您先嘗試刷新頁面。若刷新后仍然是失敗狀態(tài),您可以按照以下步驟查看錯(cuò)誤信息。

  1. 在消息流入任務(wù)詳情頁面的Worker信息區(qū)域,單擊SAE應(yīng)用后的實(shí)例名稱,跳轉(zhuǎn)到SAE應(yīng)用詳情頁面。

  2. 基本信息頁面,單擊實(shí)例部署信息頁簽。

  3. 在實(shí)例右側(cè)操作列,單擊Webshell登錄Kafka Connect運(yùn)行環(huán)境。實(shí)例部署信息

    • 執(zhí)行vi /home/admin/connector-bootstrap.log命令,查看Connector啟動(dòng)日志,查找其中是否包含錯(cuò)誤信息。

    • 執(zhí)行vi /opt/kafka/logs/connect.log命令,查看Connector運(yùn)行日志,在其中查找ERROR或者WARN字段來查看是否有錯(cuò)誤信息。

基于錯(cuò)誤信息提示進(jìn)行修復(fù)操作后,可以重新啟動(dòng)對(duì)應(yīng)任務(wù)。

場景三:Connector參數(shù)校驗(yàn)失敗

錯(cuò)誤信息:

Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`

解決方法:此時(shí)需要根據(jù)錯(cuò)誤信息,找出具體哪個(gè)參數(shù)出錯(cuò),更新對(duì)應(yīng)參數(shù)即可。若基于上述錯(cuò)誤信息無法定位具體的出錯(cuò)參數(shù),可以參考上文場景二中的步驟登錄Kafka Connect運(yùn)行環(huán)境,執(zhí)行以下命令,查詢參數(shù)是否校驗(yàn)通過。

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate

該指令會(huì)返回Connector參數(shù)中每個(gè)參數(shù)是否校驗(yàn)通過,若不通過,則errors屬性非空,如下所示。

"value":{
    "name":"snapshot.mode",
    "value":null,
    "recommended_values":[
        "never",
        "initial_only",
        "when_needed",
        "initial",
        "schema_only",
        "schema_only_recovery"
    ],
    "errors":[
        "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
    ],
    "visible":true
}