使用RabbitMQ Connector同步RabbitMQ數(shù)據(jù)
本文介紹如何創(chuàng)建RabbitMQ Connector,將云消息隊(duì)列 RabbitMQ 版的數(shù)據(jù)同步至云消息隊(duì)列 Kafka 版。
前提條件
開通對(duì)象存儲(chǔ)OSS服務(wù)并創(chuàng)建存儲(chǔ)空間(Bucket)。更多信息,請(qǐng)參見控制臺(tái)創(chuàng)建存儲(chǔ)空間。
開通Serverless應(yīng)用引擎服務(wù)。更多信息,請(qǐng)參見準(zhǔn)備工作。
購買并部署云消息隊(duì)列 Kafka 版實(shí)例。更多信息,請(qǐng)參見購買和部署實(shí)例。
步驟一:創(chuàng)建RabbitMQ資源
登錄云消息隊(duì)列 RabbitMQ 版控制臺(tái),創(chuàng)建RabbitMQ實(shí)例。操作步驟,請(qǐng)參見創(chuàng)建實(shí)例。
單擊已創(chuàng)建的實(shí)例,在實(shí)例詳情頁面創(chuàng)建以下資源。
在左側(cè)導(dǎo)航欄,單擊靜態(tài)用戶名密碼,然后單擊創(chuàng)建用戶名密碼。更多信息,請(qǐng)參見創(chuàng)建用戶名密碼。
創(chuàng)建完成后保存用戶名和密碼。
在左側(cè)導(dǎo)航欄,單擊Vhost列表,然后單擊創(chuàng)建Vhost。更多信息,請(qǐng)參見創(chuàng)建Exchange。
在左側(cè)導(dǎo)航欄,單擊Queue列表,在當(dāng)前 Vhost右側(cè)的切換下拉列表中,選擇已創(chuàng)建的Vhost,然后單擊創(chuàng)建Queue。更多信息,請(qǐng)參見創(chuàng)建Queue。
步驟二:創(chuàng)建Connector
下載RabbitMQ Connector文件,上傳至提前創(chuàng)建好的OSS Bucket。更多信息,請(qǐng)參見控制臺(tái)上傳文件。
登錄云消息隊(duì)列 Kafka 版控制臺(tái),在概覽頁面的資源分布區(qū)域,選擇地域。
在左側(cè)導(dǎo)航欄,選擇
。在任務(wù)列表頁面,單擊創(chuàng)建任務(wù)列表。
在創(chuàng)建任務(wù)面板。設(shè)置任務(wù)名稱,配置以下配置項(xiàng)。
任務(wù)創(chuàng)建
在Source(源)配置向?qū)Вx擇數(shù)據(jù)提供方為Apache Kafka Connect,單擊下一步。
在連接器配置配置向?qū)ВO(shè)置以下配置項(xiàng),然后單擊下一步。
配置項(xiàng)
參數(shù)
說明
Kafka Connect插件
Bucket存儲(chǔ)桶
選擇OSS Bucket。
文件
選擇上傳的.ZIP文件。
Kafka資源信息
Kafka 參數(shù)配置
選擇Source Connect。
Kafka實(shí)例
選擇前提條件中創(chuàng)建的實(shí)例。
專有網(wǎng)絡(luò)VPC
默認(rèn)選擇部署Kafka實(shí)例時(shí)選擇的VPC ID且不可更改。
交換機(jī)
默認(rèn)選擇部署Kafka實(shí)例時(shí)選擇的vSwitch ID且不可更改。
安全組
選擇安全組。
Kafka Connect配置信息
解析當(dāng)前ZIP包下的properties文件
選擇新建properties文件。選擇.ZIP文件中包含的SourceConnector對(duì)應(yīng)的.properties文件。路徑為/etc/source-xxx.properties。在輸入框中更新下列字段的取值。
connector.class:運(yùn)行的Connector的包名稱,無需修改。
tasks.max:Task的最大數(shù)量。
rabbitmq.host:填寫RabbitMQ實(shí)例VPC接入點(diǎn)地址。可在RabbitMQ實(shí)例詳情頁面的接入點(diǎn)信息區(qū)域查看。
rabbitmq.username:填寫步驟一:創(chuàng)建RabbitMQ資源中創(chuàng)建的RabbitMQ實(shí)例靜態(tài)用戶名。
rabbitmq.password:填寫步驟一:創(chuàng)建RabbitMQ資源中創(chuàng)建的RabbitMQ實(shí)例靜態(tài)用戶名密碼。
rabbitmq.virtual.host:填寫步驟一:創(chuàng)建RabbitMQ資源中創(chuàng)建的Vhost。
kafka.topic:目標(biāo)Kafka Topic,請(qǐng)?jiān)谕哆f數(shù)據(jù)前,提前創(chuàng)建好目標(biāo)Topic。
rabbitmq.queue:填寫步驟一:創(chuàng)建RabbitMQ資源中創(chuàng)建的Queue。
示例代碼如下:
connector.class=com.ibm.eventstreams.connect.rabbitmqsource.RabbitMQSourceConnector name=rabbitmq-source-connector # RabbitMQ實(shí)例VPC接入點(diǎn)信息。 rabbitmq.host=xxx # RabbitMQ實(shí)例靜態(tài)用戶名密碼。 rabbitmq.password=xxx # RabbitMQ實(shí)例靜態(tài)用戶名。 rabbitmq.username=xxx # RabbitMQ實(shí)例Vhost。 rabbitmq.virtual.host=xxx # 目標(biāo)Kafka Topic。 kafka.topic=xxx # RabbitMQ實(shí)例隊(duì)列。 rabbitmq.queue=xxx tasks.max=4
在實(shí)例配置配置向?qū)ВO(shè)置以下參數(shù),然后單擊下一步。
配置項(xiàng)
參數(shù)
說明
Worker規(guī)格
Worker規(guī)格
選擇合適的Worker規(guī)格。
最小Worker數(shù)
設(shè)置最小Worker數(shù)量。
最大Worker數(shù)
設(shè)置最大Worker數(shù)量。此數(shù)值不得超過Task的最大數(shù)量。
橫向擴(kuò)縮容閾值 %
當(dāng)利用率大于或小于設(shè)置的CPU和Memory數(shù)值時(shí),觸發(fā)自動(dòng)擴(kuò)容或縮容。僅當(dāng)最小Worker數(shù)和最大Worker數(shù)值不相等時(shí),需要配置此參數(shù)。
Kafka Connect Worker 配置
自動(dòng)創(chuàng)建Kafka Connect Worker依賴資源
建議勾選此項(xiàng),此時(shí)會(huì)在選擇的Kafka實(shí)例中自動(dòng)創(chuàng)建Kafka Connect運(yùn)行所需的一些Internal Topic以及ConsumerGroup,并將這些必填配置自動(dòng)填入配置框中,包括以下配置項(xiàng):
Offset Topic:用于存儲(chǔ)源數(shù)據(jù)偏移量,命名規(guī)則為
connect-eb-offset-<任務(wù)名稱>
。Config Topic:用于存儲(chǔ)Connectors以及Tasks的配置信息,命名規(guī)則為
connect-eb-config-<任務(wù)名稱>
。Status Topic:用于存儲(chǔ)Connectors以及Tasks狀態(tài)信息,命名規(guī)則為
connect-eb-status-<任務(wù)名稱>
。Kafka Connect Consumer Group:Kafka Connect Worker用于消費(fèi)Internal Topics的消費(fèi)組,命名規(guī)則為
connect-eb-cluster-<任務(wù)名稱>
。Kafka Source Connector Consumer Group:只針對(duì)Sink Connector有效,用于消費(fèi)源Topic中的數(shù)據(jù),命名規(guī)則為
connector-eb-cluster-<任務(wù)名稱>-<connector名稱>
。
在運(yùn)行配置區(qū)域,將日志投遞方式設(shè)置為投遞至SLS或者投遞至Kafka,在角色授權(quán)卡片設(shè)置Connect依賴的角色配置,然后單擊保存。
重要建議配置的角色包含AliyunSAEFullAccess權(quán)限,否則可能會(huì)導(dǎo)致任務(wù)運(yùn)行失敗。
任務(wù)屬性
設(shè)置此任務(wù)的重試策略及死信隊(duì)列。更多信息,請(qǐng)參見重試和死信。
等待任務(wù)狀態(tài)變?yōu)?b data-tag="uicontrol" id="88548e1005asr" class="uicontrol">運(yùn)行中,此時(shí)Connector已經(jīng)在正常工作中。
步驟三:測試Connector
登錄云消息隊(duì)列 RabbitMQ 版控制臺(tái),然后在左側(cè)導(dǎo)航欄選擇實(shí)例列表。
在實(shí)例列表頁面的頂部菜單欄選擇地域,然后在實(shí)例列表中,單擊目標(biāo)實(shí)例名稱。
在左側(cè)導(dǎo)航欄,單擊Queue列表,然后單擊目標(biāo)Queue右側(cè)操作列的詳情。
在Queue詳情頁面,單擊被綁定信息頁簽的添加被綁定。
在添加被綁定面板,選擇源Exchange為amq.direct,單擊確定。
在被綁定信息頁簽,單擊amq.direct Exchange右側(cè)操作列的發(fā)送消息,向Kafka的目標(biāo)Topic發(fā)送消息。更多信息,請(qǐng)參見發(fā)送消息。
登錄云消息隊(duì)列 Kafka 版控制臺(tái),在實(shí)例列表頁面,單擊目標(biāo)實(shí)例。
在目標(biāo)實(shí)例頁面,單擊目標(biāo)Topic,然后單擊消息查詢,查看插入的消息數(shù)據(jù)。
常見報(bào)錯(cuò)
場景一:所有Tasks運(yùn)行失敗
錯(cuò)誤信息:
All tasks under connector mongo-source failed, please check the error trace of the task.
解決方法:在消息流入任務(wù)詳情頁面,單擊基礎(chǔ)信息區(qū)域的診斷鏈接,即可跳轉(zhuǎn)到Connector監(jiān)控頁面,可以看到Tasks運(yùn)行失敗的詳細(xì)錯(cuò)誤信息。
場景二:Kafka Connect退出
錯(cuò)誤信息:
Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.
解決方法:由于狀態(tài)獲取可能會(huì)有延遲,建議您先嘗試刷新頁面。若刷新后仍然是失敗狀態(tài),您可以按照以下步驟查看錯(cuò)誤信息。
在消息流入任務(wù)詳情頁面的Worker信息區(qū)域,單擊SAE應(yīng)用后的實(shí)例名稱,跳轉(zhuǎn)到SAE應(yīng)用詳情頁面。
在基本信息頁面,單擊實(shí)例部署信息頁簽。
在實(shí)例右側(cè)操作列,單擊Webshell登錄Kafka Connect運(yùn)行環(huán)境。
執(zhí)行
vi /home/admin/connector-bootstrap.log
命令,查看Connector啟動(dòng)日志,查找其中是否包含錯(cuò)誤信息。執(zhí)行
vi /opt/kafka/logs/connect.log
命令,查看Connector運(yùn)行日志,在其中查找ERROR或者WARN字段來查看是否有錯(cuò)誤信息。
基于錯(cuò)誤信息提示進(jìn)行修復(fù)操作后,可以重新啟動(dòng)對(duì)應(yīng)任務(wù)。
場景三:Connector參數(shù)校驗(yàn)失敗
錯(cuò)誤信息:
Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
解決方法:此時(shí)需要根據(jù)錯(cuò)誤信息,找出具體哪個(gè)參數(shù)出錯(cuò),更新對(duì)應(yīng)參數(shù)即可。若基于上述錯(cuò)誤信息無法定位具體的出錯(cuò)參數(shù),可以參考上文場景二中的步驟登錄Kafka Connect運(yùn)行環(huán)境,執(zhí)行以下命令,查詢參數(shù)是否校驗(yàn)通過。
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate
該指令會(huì)返回Connector參數(shù)中每個(gè)參數(shù)是否校驗(yàn)通過,若不通過,則errors屬性非空,如下所示。
"value":{
"name":"snapshot.mode",
"value":null,
"recommended_values":[
"never",
"initial_only",
"when_needed",
"initial",
"schema_only",
"schema_only_recovery"
],
"errors":[
"Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
],
"visible":true
}