日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

作為Output接入

云消息隊列 Kafka 版可以作為Output接入Logstash。本文說明如何在公網環境下通過Logstash向云消息隊列 Kafka 版發送消息。

前提條件

在開始本教程前,請確保您已完成以下操作:

步驟一:獲取接入點

Logstash通過云消息隊列 Kafka 版的接入點與云消息隊列 Kafka 版建立連接。

  1. 登錄云消息隊列 Kafka 版控制臺

  2. 概覽頁面的資源分布區域,選擇地域。

  3. 實例列表頁面,單擊作為Output接入Logstash的實例的名稱。
  4. 實例詳情頁面的接入點信息區域,獲取實例的接入點。在配置信息區域,獲取用戶名密碼

    endpoint
    說明

    不同接入點的差異,請參見接入點對比

步驟二:創建Topic

創建用于存儲消息的Topic。

  1. 登錄云消息隊列 Kafka 版控制臺

  2. 概覽頁面的資源分布區域,選擇地域。

    重要

    Topic需要在應用程序所在的地域(即所部署的ECS的所在地域)進行創建。Topic不能跨地域使用。例如Topic創建在華北2(北京)這個地域,那么消息生產端和消費端也必須運行在華北2(北京)的ECS。

  3. 實例列表頁面,單擊目標實例名稱。

  4. 在左側導航欄,單擊Topic 管理

  5. Topic 管理頁面,單擊創建 Topic

  6. 創建 Topic面板,設置Topic屬性,然后單擊確定

    創建Topic

    參數

    說明

    示例

    名稱

    Topic名稱。

    demo

    描述

    Topic的簡單描述。

    demo test

    分區數

    Topic的分區數量。

    12

    存儲引擎

    說明

    當前僅專業版實例支持選擇存儲引擎類型,標準版暫不支持,默認選擇為云存儲類型。

    Topic消息的存儲引擎。

    云消息隊列 Kafka 版支持以下兩種存儲引擎。

    • 云存儲:底層接入阿里云云盤,具有低時延、高性能、持久性、高可靠等特點,采用分布式3副本機制。實例的規格類型標準版(高寫版)時,存儲引擎只能為云存儲

    • Local 存儲:使用原生Kafka的ISR復制算法,采用分布式3副本機制。

    云存儲

    消息類型

    Topic消息的類型。

    • 普通消息:默認情況下,保證相同Key的消息分布在同一個分區中,且分區內消息按照發送順序存儲。集群中出現機器宕機時,可能會造成消息亂序。當存儲引擎選擇云存儲時,默認選擇普通消息

    • 分區順序消息:默認情況下,保證相同Key的消息分布在同一個分區中,且分區內消息按照發送順序存儲。集群中出現機器宕機時,仍然保證分區內按照發送順序存儲。但是會出現部分分區發送消息失敗,等到分區恢復后即可恢復正常。當存儲引擎選擇Local 存儲時,默認選擇分區順序消息

    普通消息

    日志清理策略

    Topic日志的清理策略。

    存儲引擎選擇Local 存儲(當前僅專業版實例支持選擇存儲引擎類型為Local存儲,標準版暫不支持)時,需要配置日志清理策略

    云消息隊列 Kafka 版支持以下兩種日志清理策略。

    • Delete:默認的消息清理策略。在磁盤容量充足的情況下,保留在最長保留時間范圍內的消息;在磁盤容量不足時(一般磁盤使用率超過85%視為不足),將提前刪除舊消息,以保證服務可用性。

    • Compact:使用Kafka Log Compaction日志清理策略。Log Compaction清理策略保證相同Key的消息,最新的value值一定會被保留。主要適用于系統宕機后恢復狀態,系統重啟后重新加載緩存等場景。例如,在使用Kafka Connect或Confluent Schema Registry時,需要使用Kafka Compact Topic存儲系統狀態信息或配置信息。

      重要

      Compact Topic一般只用在某些生態組件中,例如Kafka Connect或Confluent Schema Registry,其他情況的消息收發請勿為Topic設置該屬性。具體信息,請參見云消息隊列 Kafka 版Demo庫

    Compact

    標簽

    Topic的標簽。

    demo

    創建完成后,在Topic 管理頁面的列表中顯示已創建的Topic。

步驟三:Logstash發送消息

在安裝了Logstash的機器上啟動Logstash,向創建的Topic發送消息。

  1. 執行cd命令切換到logstash的bin目錄。
  2. 執行以下命令下載kafka.client.truststore.jks證書文件。
    wget -O kafka.client.truststore.jks https://github.com/AliwareMQ/aliware-kafka-demos/blob/master/kafka-log-stash-demo/vpc-ssl/mix.4096.client.truststore.jks
  3. 創建jaas.conf配置文件。
    1. 執行命令vim jaas.conf創建空的配置文件。
    2. i鍵進入插入模式。

    3. 輸入以下內容。
      KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="XXX"
        password="XXX";
      };
      參數描述示例值
      username公網/VPC實例的用戶名。 alikafka_pre-cn-v0h1***
      password公網/VPC實例的密碼。GQiSmqbQVe3b9hdKLDcIlkrBK6***
    4. Esc鍵回到命令行模式。

    5. 鍵進入底行模式,輸入wq,然后按回車鍵保存文件并退出。

  4. 創建output.conf配置文件。
    1. 執行命令vim output.conf創建空的配置文件。
    2. i鍵進入插入模式。

    3. 輸入以下內容。
      input {
          stdin{}
      }
      
      output {
         kafka {
              bootstrap_servers => "alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093"
              topic_id => "logstash_test"
              security_protocol => "SASL_SSL"
              sasl_mechanism => "PLAIN"
              jaas_path => "/home/logstash-7.6.2/bin/jaas.conf"
              ssl_truststore_password => "KafkaOnsClient"
              ssl_truststore_location => "/home/logstash-7.6.2/bin/kafka.client.truststore.jks"
              ssl_endpoint_identification_algorithm => ""
          }
      }
      參數描述示例值
      bootstrap_servers云消息隊列 Kafka 版提供的公網接入點為SSL接入點。 alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
      topic_idTopic的名稱。logstash_test
      security_protocol安全協議。默認為SASL_SSL,無需修改。SASL_SSL
      sasl_mechanism安全認證機制。默認為PLAIN,無需修改。PLAIN
      jaas_pathjaas.conf配置文件位置。/home/logstash-7.6.2/bin/jaas.conf
      ssl_truststore_passwordkafka.client.truststore.jks證書密碼。默認值為KafkaOnsClient,無需修改。KafkaOnsClient
      ssl_truststore_locationkafka.client.truststore.jks證書位置。/home/logstash-7.6.2/bin/kafka.client.truststore.jks
      ssl_endpoint_identification_algorithmSSL接入點辨識算法。6.x及以上版本Logstash需要加上該參數。空值
    4. Esc鍵回到命令行模式。

    5. 鍵進入底行模式,輸入wq,然后按回車鍵保存文件并退出。

  5. 向創建的Topic發送消息。
    1. 執行./logstash -f output.conf
    2. 輸入test,然后按回車鍵。
      output_result

步驟四:查看Topic分區

查看消息發送到Topic的情況。

  1. 登錄云消息隊列 Kafka 版控制臺

  2. 概覽頁面的資源分布區域,選擇地域。

  3. 實例列表頁面,單擊目標實例名稱。

  4. 在左側導航欄,單擊Topic 管理

  5. Topic 管理頁面,找到目標Topic,在其操作列中,選擇更多 > 分區狀態

    表 1. 分區狀態信息

    參數

    說明

    分區ID

    該Topic分區的ID號。

    最小位點

    該Topic在當前分區下的最小消費位點。

    最大位點

    該Topic在當前分區下的最大消費位點。

    分區消息量

    該Topic在當前分區下的消息總量。

    最近更新時間

    本分區中最近一條消息的存儲時間。

    分區狀態信息

步驟五:按位點查詢消息

您可以根據發送的消息的分區ID和位點信息查詢該消息。

  1. 登錄云消息隊列 Kafka 版控制臺

  2. 概覽頁面的資源分布區域,選擇地域。

  3. 實例列表頁面,單擊目標實例名稱。

  4. 在左側導航欄,單擊消息查詢

  5. 消息查詢頁面的查詢方式列表中,選擇按位點查詢

  6. Topic列表中,選擇消息所屬Topic名稱;在分區列表中,選擇消息所屬的分區;在起始位點文本框,輸入消息所在分區的位點,然后單擊查詢

    展示該查詢位點及以后連續的消息。例如,指定的分區和位點都為“5”,那么返回的結果從位點“5”開始。

    表 2. 查詢結果參數解釋

    參數

    描述

    分區

    消息的Topic分區。

    位點

    消息的所在的位點。

    Key

    消息的鍵(已強制轉化為String類型)。

    Value

    消息的值(已強制轉化為String類型),即消息的具體內容。

    消息創建時間

    發送消息時,客戶端自帶的或是您指定的ProducerRecord中的消息創建時間。

    說明
    • 如果配置了該字段,則按配置值顯示。

    • 如果未配置該字段,則默認取消息發送時的系統時間。

    • 如果顯示值為1970/x/x x:x:x,則說明發送時間配置為0或其他有誤的值。

    • 0.9及以前版本的云消息隊列 Kafka 版客戶端不支持配置該時間。

    操作

    • 單擊下載 Key:下載消息的鍵值。

    • 單擊下載 Value:下載消息的具體內容。

    重要
    • 查詢到的每條消息在控制臺上最多顯示1 KB的內容,超過1 KB的部分將自動截斷。如需查看完整的消息內容,請下載相應的消息。

    • 下載的消息最大為10 MB。如果消息超過10 MB,則只下載10 MB的內容。

更多信息

更多參數設置,請參見Kafka output plugin