本文以Java SDK為例介紹如何在公網(wǎng)環(huán)境下使用SDK接入云消息隊(duì)列 Kafka 版的SSL接入點(diǎn)并使用PLAIN機(jī)制收發(fā)消息。

前提條件

安裝Java依賴庫(kù)

pom.xml中添加以下依賴。
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.6</version>
</dependency>
說(shuō)明 建議您保持服務(wù)端和客戶端版本一致,即保持客戶端庫(kù)版本和云消息隊(duì)列 Kafka 版實(shí)例的大版本一致。您可以在云消息隊(duì)列 Kafka 版控制臺(tái)的實(shí)例詳情頁(yè)面獲取云消息隊(duì)列 Kafka 版實(shí)例的大版本。

準(zhǔn)備配置

  1. 創(chuàng)建Log4j配置文件log4j.properties
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    log4j.rootLogger=INFO, STDOUT
    
    log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
    log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
    log4j.appender.STDOUT.layout.ConversionPattern=[%d] %p %m (%c)%n
  2. 下載SSL根證書
  3. 創(chuàng)建JAAS配置文件kafka_client_jaas.conf
    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="xxxx"
      password="xxxx";
    };                       
    說(shuō)明
    • 如果實(shí)例未開(kāi)啟ACL,您可以在云消息隊(duì)列 Kafka 版控制臺(tái)的實(shí)例詳情頁(yè)面獲取默認(rèn)用戶的用戶名和密碼。
    • 如果實(shí)例已開(kāi)啟ACL,請(qǐng)確保要使用的SASL用戶為PLAIN類型且已授權(quán)收發(fā)消息的權(quán)限,詳情請(qǐng)參見(jiàn)SASL用戶授權(quán)
  4. 創(chuàng)建云消息隊(duì)列 Kafka 版配置文件kafka.properties
    ##SSL接入點(diǎn),通過(guò)控制臺(tái)獲取。
    bootstrap.servers=xxxx
    ##Topic,通過(guò)控制臺(tái)創(chuàng)建。
    topic=xxxx
    ##Group,通過(guò)控制臺(tái)創(chuàng)建。
    group.id=xxxx
    ##SSL根證書。
    ssl.truststore.location=/xxxx/kafka.client.truststore.jks
    ##JAAS配置文件。
    java.security.auth.login.config=/xxxx/kafka_client_jaas.conf                       
  5. 創(chuàng)建配置文件加載程序JavaKafkaConfigurer.java
    import java.util.Properties;
    
    public class JavaKafkaConfigurer {
    
        private static Properties properties;
    
        public static void configureSasl() {
            //如果用-D或者其它方式設(shè)置過(guò),這里不再設(shè)置。
            if (null == System.getProperty("java.security.auth.login.config")) {
                //請(qǐng)注意將XXX修改為自己的路徑。
                //這個(gè)路徑必須是一個(gè)文件系統(tǒng)可讀的路徑,不能被打包到JAR中。
                System.setProperty("java.security.auth.login.config", getKafkaProperties().getProperty("java.security.auth.login.config"));
            }
        }
    
        public synchronized static Properties getKafkaProperties() {
            if (null != properties) {
                return properties;
            }
            //獲取配置文件kafka.properties的內(nèi)容。
            Properties kafkaProperties = new Properties();
            try {
                kafkaProperties.load(KafkaProducerDemo.class.getClassLoader().getResourceAsStream("kafka.properties"));
            } catch (Exception e) {
                //沒(méi)加載到文件,程序要考慮退出。
                e.printStackTrace();
            }
            properties = kafkaProperties;
            return kafkaProperties;
        }
    }                    

發(fā)送消息

  1. 創(chuàng)建發(fā)送消息程序KafkaProducerDemo.java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.config.SslConfigs;
    
    public class KafkaProducerDemo {
    
        public static void main(String args[]) {
            //設(shè)置JAAS配置文件的路徑。
            JavaKafkaConfigurer.configureSasl();
    
            //加載kafka.properties。
            Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();
    
            Properties props = new Properties();
            //設(shè)置接入點(diǎn),請(qǐng)通過(guò)控制臺(tái)獲取對(duì)應(yīng)Topic的接入點(diǎn)。
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
            //設(shè)置SSL根證書的路徑,請(qǐng)記得將XXX修改為自己的路徑。
            //與sasl路徑類似,該文件也不能被打包到j(luò)ar中。
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
            //根證書store的密碼,保持不變。
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
            //接入?yún)f(xié)議,目前支持使用SASL_SSL協(xié)議接入。
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            //SASL鑒權(quán)方式,保持不變。
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            //云消息隊(duì)列 Kafka 版消息的序列化方式。
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            //請(qǐng)求的最長(zhǎng)等待時(shí)間。
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
            //設(shè)置客戶端內(nèi)部重試次數(shù)。
            props.put(ProducerConfig.RETRIES_CONFIG, 5);
            //設(shè)置客戶端內(nèi)部重試間隔。
            props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
    
            //Hostname校驗(yàn)改成空。
            props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
    
            //構(gòu)造Producer對(duì)象,注意,該對(duì)象是線程安全的,一般來(lái)說(shuō),一個(gè)進(jìn)程內(nèi)一個(gè)Producer對(duì)象即可。
            //如果想提高性能,可以多構(gòu)造幾個(gè)對(duì)象,但不要太多,最好不要超過(guò)5個(gè)。
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    
            //構(gòu)造一個(gè)云消息隊(duì)列 Kafka 版消息。
            String topic = kafkaProperties.getProperty("topic"); //消息所屬的Topic,請(qǐng)?jiān)诳刂婆_(tái)申請(qǐng)之后,填寫在這里。
            String value = "this is the message's value"; //消息的內(nèi)容。
    
            try {
                //批量獲取 futures 可以加快速度, 但注意,批量不要太大。
                List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
                for (int i =0; i < 100; i++) {
                    //發(fā)送消息,并獲得一個(gè)Future對(duì)象。
                    ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<String, String>(topic, value + ": " + i);
                    Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
                    futures.add(metadataFuture);
    
                }
                producer.flush();
                for (Future<RecordMetadata> future: futures) {
                    //同步獲得Future對(duì)象的結(jié)果。
                    try {
                        RecordMetadata recordMetadata = future.get();
                        System.out.println("Produce ok:" + recordMetadata.toString());
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                }
            } catch (Exception e) {
                //客戶端內(nèi)部重試之后,仍然發(fā)送失敗,業(yè)務(wù)要應(yīng)對(duì)此類錯(cuò)誤。
                System.out.println("error occurred");
                e.printStackTrace();
            }
        }
    }  
  2. 編譯并運(yùn)行KafkaProducerDemo.java發(fā)送消息。

訂閱消息

選擇以下任意一種方式訂閱消息。
  • 單Consumer訂閱消息。
    1. 創(chuàng)建單Consumer訂閱消息程序KafkaConsumerDemo.java
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Properties;
      import org.apache.kafka.clients.CommonClientConfigs;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.common.config.SaslConfigs;
      import org.apache.kafka.common.config.SslConfigs;
      
      public class KafkaConsumerDemo {
      
          public static void main(String args[]) {
              //設(shè)置JAAS配置文件的路徑。
              JavaKafkaConfigurer.configureSasl();
      
              //加載kafka.properties。
              Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();
      
              Properties props = new Properties();
              //設(shè)置接入點(diǎn),請(qǐng)通過(guò)控制臺(tái)獲取對(duì)應(yīng)Topic的接入點(diǎn)。
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
              //設(shè)置SSL根證書的路徑,請(qǐng)記得將XXX修改為自己的路徑。
              //與SASL路徑類似,該文件也不能被打包到j(luò)ar中。
              props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
              //根證書存儲(chǔ)的密碼,保持不變。
              props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
              //接入?yún)f(xié)議,目前支持使用SASL_SSL協(xié)議接入。
              props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
              //SASL鑒權(quán)方式,保持不變。
              props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
              //兩次Poll之間的最大允許間隔。
              //消費(fèi)者超過(guò)該值沒(méi)有返回心跳,服務(wù)端判斷消費(fèi)者處于非存活狀態(tài),服務(wù)端將消費(fèi)者從Group移除并觸發(fā)Rebalance,默認(rèn)30s。
              props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
              //設(shè)置單次拉取的量,走公網(wǎng)訪問(wèn)時(shí),該參數(shù)會(huì)有較大影響。
              props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
              props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
              //每次Poll的最大數(shù)量。
              //注意該值不要改得太大,如果Poll太多數(shù)據(jù),而不能在下次Poll之前消費(fèi)完,則會(huì)觸發(fā)一次負(fù)載均衡,產(chǎn)生卡頓。
              props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
              //消息的反序列化方式。
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              //當(dāng)前消費(fèi)實(shí)例所屬的消費(fèi)組,請(qǐng)?jiān)诳刂婆_(tái)申請(qǐng)之后填寫。
              //屬于同一個(gè)組的消費(fèi)實(shí)例,會(huì)負(fù)載消費(fèi)消息。
              props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
              //Hostname校驗(yàn)改成空。
              props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
      
              //構(gòu)造消費(fèi)對(duì)象,也即生成一個(gè)消費(fèi)實(shí)例。
              KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
              //設(shè)置消費(fèi)組訂閱的Topic,可以訂閱多個(gè)。
              //如果GROUP_ID_CONFIG是一樣,則訂閱的Topic也建議設(shè)置成一樣。
              List<String> subscribedTopics =  new ArrayList<String>();
              //如果需要訂閱多個(gè)Topic,則在這里加進(jìn)去即可。
              //每個(gè)Topic需要先在控制臺(tái)進(jìn)行創(chuàng)建。
              subscribedTopics.add(kafkaProperties.getProperty("topic"));
              consumer.subscribe(subscribedTopics);
      
              //循環(huán)消費(fèi)消息。
              while (true){
                  try {
                      ConsumerRecords<String, String> records = consumer.poll(1000);
                      //必須在下次Poll之前消費(fèi)完這些數(shù)據(jù), 且總耗時(shí)不得超過(guò)SESSION_TIMEOUT_MS_CONFIG。
                      //建議開(kāi)一個(gè)單獨(dú)的線程池來(lái)消費(fèi)消息,然后異步返回結(jié)果。
                      for (ConsumerRecord<String, String> record : records) {
                          System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                      }
                  } catch (Exception e) {
                      try {
                          Thread.sleep(1000);
                      } catch (Throwable ignore) {
      
                      }
                      e.printStackTrace();
                  }
              }
          }
      }
    2. 編譯并運(yùn)行KafkaConsumerDemo.java消費(fèi)消息。
  • 多Consumer訂閱消息。
    1. 創(chuàng)建多Consumer訂閱消息程序KafkaMultiConsumerDemo.java
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Properties;
      import java.util.concurrent.atomic.AtomicBoolean;
      import org.apache.kafka.clients.CommonClientConfigs;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.common.config.SaslConfigs;
      import org.apache.kafka.common.config.SslConfigs;
      import org.apache.kafka.common.errors.WakeupException;
      
      /**
       * 本教程演示如何在一個(gè)進(jìn)程內(nèi)開(kāi)啟多個(gè)Consumer同時(shí)消費(fèi)Topic。
       * 注意全局Consumer數(shù)量不要超過(guò)訂閱的Topic總分區(qū)數(shù)。
       */
      public class KafkaMultiConsumerDemo {
      
          public static void main(String args[]) throws InterruptedException {
              //設(shè)置JAAS配置文件的路徑。
              JavaKafkaConfigurer.configureSasl();
      
              //加載kafka.properties。
              Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
      
              Properties props = new Properties();
              //設(shè)置接入點(diǎn),請(qǐng)通過(guò)控制臺(tái)獲取對(duì)應(yīng)Topic的接入點(diǎn)。
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
              //設(shè)置SSL根證書的路徑,請(qǐng)記得將XXX修改為自己的路徑。
              //與SASL路徑類似,該文件也不能被打包到JAR中。
              props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
              //根證書存儲(chǔ)的密碼,保持不變。
              props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
              //接入?yún)f(xié)議,目前支持使用SASL_SSL協(xié)議接入。
              props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
              //SASL鑒權(quán)方式,保持不變。
              props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
              //兩次Poll之間的最大允許間隔。
              //消費(fèi)者超過(guò)該值沒(méi)有返回心跳,服務(wù)端判斷消費(fèi)者處于非存活狀態(tài),服務(wù)端將消費(fèi)者從Group移除并觸發(fā)Rebalance,默認(rèn)30s。
              props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
              //每次Poll的最大數(shù)量。
              //注意該值不要改得太大,如果Poll太多數(shù)據(jù),而不能在下次Poll之前消費(fèi)完,則會(huì)觸發(fā)一次負(fù)載均衡,產(chǎn)生卡頓。
              props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
              //消息的反序列化方式。
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              //當(dāng)前消費(fèi)實(shí)例所屬的消費(fèi)組,請(qǐng)?jiān)诳刂婆_(tái)申請(qǐng)之后填寫。
              //屬于同一個(gè)組的消費(fèi)實(shí)例,會(huì)負(fù)載消費(fèi)消息。
              props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
              //構(gòu)造消費(fèi)對(duì)象,也即生成一個(gè)消費(fèi)實(shí)例。
      
              //Hostname校驗(yàn)改成空。
              props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
      
              int consumerNum = 2;
              Thread[] consumerThreads = new Thread[consumerNum];
              for (int i = 0; i < consumerNum; i++) {
                  KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
                  List<String> subscribedTopics = new ArrayList<String>();
                  subscribedTopics.add(kafkaProperties.getProperty("topic"));
                  consumer.subscribe(subscribedTopics);
      
                  KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner(consumer);
                  consumerThreads[i] = new Thread(kafkaConsumerRunner);
              }
      
              for (int i = 0; i < consumerNum; i++) {
                  consumerThreads[i].start();
              }
      
              for (int i = 0; i < consumerNum; i++) {
                  consumerThreads[i].join();
              }
          }
      
          static class KafkaConsumerRunner implements Runnable {
              private final AtomicBoolean closed = new AtomicBoolean(false);
              private final KafkaConsumer consumer;
      
              KafkaConsumerRunner(KafkaConsumer consumer) {
                  this.consumer = consumer;
              }
      
              @Override
              public void run() {
                  try {
                      while (!closed.get()) {
                          try {
                              ConsumerRecords<String, String> records = consumer.poll(1000);
                              //必須在下次Poll之前消費(fèi)完這些數(shù)據(jù), 且總耗時(shí)不得超過(guò)SESSION_TIMEOUT_MS_CONFIG。
                              for (ConsumerRecord<String, String> record : records) {
                                  System.out.println(String.format("Thread:%s Consume partition:%d offset:%d", Thread.currentThread().getName(), record.partition(), record.offset()));
                              }
                          } catch (Exception e) {
                              try {
                                  Thread.sleep(1000);
                              } catch (Throwable ignore) {
      
                              }
                              e.printStackTrace();
                          }
                      }
                  } catch (WakeupException e) {
                      //如果關(guān)閉則忽略異常。
                      if (!closed.get()) {
                          throw e;
                      }
                  } finally {
                      consumer.close();
                  }
              }
      
              //可以被另一個(gè)線程調(diào)用的關(guān)閉Hook。
              public void shutdown() {
                  closed.set(true);
                  consumer.wakeup();
              }
          }
      }
    2. 編譯并運(yùn)行KafkaMultiConsumerDemo.java消費(fèi)消息。