本文介紹如何使用Ruby SDK通過接入點接入云消息隊列 Kafka 版并收發消息。

環境準備

您已安裝Ruby。更多信息,請參見安裝Ruby

安裝Ruby依賴庫

執行以下命令安裝Ruby依賴庫。
gem install ruby-kafka -v 0.6.8

準備配置

  1. 可選:下載SSL根證書。如果是SSL接入點,需下載該證書。
  2. 訪問Aliware-kafka-demos,單擊download,下載Demo工程到本地并解壓。
  3. 在解壓的Demo工程找到kafka-ruby-demo文件夾,根據接入點類型打開對應的文件夾,配置producer.ruby文件和consumer.ruby文件。
    表 1. 配置項說明
    參數描述
    brokersSSL接入點。您可在云消息隊列 Kafka 版控制臺實例詳情頁面的接入點信息區域獲取。
    topicTopic名稱。您可在云消息隊列 Kafka 版控制臺Topic 管理頁面獲取。
    usernameSASL用戶名。如果是默認接入點,則無此配置項。
    說明
    • 如果實例未開啟ACL,您可以在云消息隊列 Kafka 版控制臺實例詳情頁面的配置信息區域獲取默認的用戶名密碼
    • 如果實例已開啟ACL,請確保要使用的SASL用戶已被授予向云消息隊列 Kafka 版實例收發消息的權限。具體操作,請參見SASL用戶授權
    passwordSASL用戶名密碼。如果是默認接入點,則無此配置項。
    consumerGroupGroup名稱。您可在云消息隊列 Kafka 版控制臺Group 管理頁面獲取。
  4. 配置完成后,將配置文件所在文件夾下的全部文件(如果是SSL接入點實例,包含證書SSL根證書文件),上傳至服務器Ruby安裝目錄下。

發送消息

執行以下命令發送消息。

ruby producer.ruby

關于代碼中配置項說明,請參見配置項說明

消息程序producer.ruby代碼示例如下:
說明 示例代碼為SSL接入點的代碼。您需要根據實際接入點類型,刪除或者修改配置項,其余代碼請根據加粗代碼注釋修改。
# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"

logger = Logger.new($stdout)
#logger.level = Logger::DEBUG
logger.level = Logger::INFO

brokers = "xxx:xx,xxx:xx"
topic = "xxx"
username = "xxx"
password = "xxx"

kafka = Kafka.new(
    seed_brokers: brokers,
    client_id: "sasl-producer",     #如果是默認接入點,取值需修改為“simple-producer”。
    logger: logger,
    # put "./cert.pem" to anywhere this can read
    #如果是默認接入點,刪除以下三行代碼。
    ssl_ca_cert: File.read('./cert.pem'),    
    sasl_plain_username: username,
    sasl_plain_password: password,
    )

producer = kafka.producer

begin
    $stdin.each_with_index do |line, index|

    producer.produce(line, topic: topic)

    producer.deliver_messages
end

ensure

    producer.deliver_messages

    producer.shutdown
end

訂閱消息

執行以下命令消費消息。

ruby consumer.ruby

消息程序consumer.ruby示例代碼如下:

關于代碼中配置項說明,請參見配置項說明
說明 示例代碼為SSL接入點的代碼。您需要根據實際接入點類型,刪除或者修改配置項,其余代碼請根據加粗代碼注釋修改。
# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"

logger = Logger.new(STDOUT)
#logger.level = Logger::DEBUG
logger.level = Logger::INFO

brokers = "xxx:xx,xxx:xx"
topic = "xxx"
username = "xxx"
password = "xxx"
consumerGroup = "xxx"

kafka = Kafka.new(
        seed_brokers: brokers,
        client_id: "sasl-consumer",    #如果是默認接入點,取值需修改為“test”
        socket_timeout: 20,
        logger: logger,
        # put "./cert.pem" to anywhere this can read
        #如果是默認接入點,刪除以下三行代碼。
        ssl_ca_cert: File.read('./cert.pem'),
        sasl_plain_username: username,
        sasl_plain_password: password,
        )

consumer = kafka.consumer(group_id: consumerGroup)
consumer.subscribe(topic, start_from_beginning: false)

trap("TERM") { consumer.stop }
trap("INT") { consumer.stop }

begin
    consumer.each_message(max_bytes: 64 * 1024) do |message|
    logger.info("Get message: #{message.value}")
    end
rescue Kafka::ProcessingError => e
    warn "Got error: #{e.cause}"
    consumer.pause(e.topic, e.partition, timeout: 20)

    retry
end