日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

基于事件流實現RabbitMQ消息路由

本文介紹如何應用事件總線EventBridge的事件流功能實現云消息隊列 RabbitMQ 版的消息路由。

前提條件

背景信息

事件流作為更輕量、實時端到端的流式事件通道,提供輕量流式數據的過濾和轉換的能力,在不同的數據倉庫之間、數據處理程序之間、數據分析和處理系統之間進行數據同步。源端云消息隊列 RabbitMQ 版生產的消息可以通過事件流這個通道被路由到目標端的云消息隊列 RabbitMQ 版,無需定義事件總線。更多信息,請參見事件流概述

步驟一:在目標端創建事件流

說明

事件總線EventBridge暫不支持跨地域創建云消息隊列 RabbitMQ 版的事件流。

  1. 登錄事件總線EventBridge控制臺
  2. 在頂部菜單欄,選擇地域。
  3. 在左側導航欄,單擊事件流
  4. 事件流頁面,單擊創建事件流

  5. 創建事件流面板,設置任務名稱描述,配置以下參數,然后單擊保存

    • 任務創建

        1. Source(源)配置向導,選擇數據提供方消息隊列 RabbitMQ 版,設置以下參數,然后單擊下一步

          參數

          說明

          示例

          地域

          選擇云消息隊列 RabbitMQ 版源實例所在的地域。

          華東1(杭州)

          RabbitMQ 實例

          選擇生產云消息隊列 RabbitMQ 版消息的源實例。

          amqp-cn-7pp2mwbc****

          Vhost

          選擇源實例中的Vhost。

          test

          Queue

          選擇存儲消息的隊列。

          test

          批量推送

          批量推送可幫您批量聚合多個事件,當批量推送條數批量推送間隔(單位:秒)兩者條件達到其一時即會觸發批量推送。

          例如:您設置的推送條數為100 條,間隔時間為15 s,在10 s內消息條數已達到100條,那么該次推送則不會等15 s后再推送。

          開啟

          批量推送條數

          調用函數發送的最大批量消息條數,當積壓的消息數量到達設定值時才會發送請求,取值范圍為 [1,10000]。

          100

          批量推送間隔(單位:秒)

          調用函數的間隔時間,系統每到間隔時間點會將消息聚合后發給函數計算,取值范圍為[0,15],單位為秒。0秒表示無等待時間,直接投遞。

          3

        2. Filtering(過濾)Transform(轉換)配置向導,設置事件過濾、轉換規則,單擊下一步。事件轉換的配置說明,請參見使用函數計算實現消息數據清洗

        3. Sink(目標)配置向導,選擇服務類型消息隊列RabbitMQ版,配置以下參數,單擊保存

        4. 參數

          說明

          示例

          實例ID

          選擇已創建的云消息隊列 RabbitMQ 版實例。

          amqp-cn-zvp2pny6****

          Vhost

          選擇已創建的Vhost。

          test

          目標類型

          • Exchange:生產者將消息發送到Exchange,由Exchange將消息路由到一個或多個Queue中。

          • Queue:每個消息都會被投入到一個或多個Queue里。

          Queue 模式

          Exchange

          目標類型Exchange時,選擇云消息隊列 RabbitMQ 版中的Exchange。

          exchange

          Queue

          目標類型Queue時,選擇云消息隊列 RabbitMQ 版中的選擇接收消息的隊列。

          queue

          消息路由規則(Routing Key)

          事件總線EventBridge通過JSONPath提取事件中的數據,將指定的事件內容路由到事件目標。當目標類型Exchange時需要配置。

          部分事件

          $.data.key

          消息體(body)

          事件總線EventBridge通過JSONPath提取事件中的數據,將指定的事件內容路由到事件目標。

          部分事件

          $.data.body

          MessageId

          事件總線EventBridge通過JSONPath提取事件中的數據,將指定的事件內容路由到事件目標。

          部分事件

          $.data.props.messageId

          自定義屬性(Properties)

          事件總線EventBridge通過JSONPath提取事件中的數據,將指定的事件內容路由到事件目標。

          部分事件

    • 任務屬性

      設置事件流的重試策略及死信隊列。更多信息,請參見重試和死信

  6. 返回事件流頁面,找到創建好的事件流,在其右側操作欄,單擊啟用

  7. 提示對話框,閱讀提示信息,然后單擊確認

    啟用事件流后,會有30秒~60秒的延遲時間,您可以在事件流頁面的狀態欄查看啟動進度。

步驟二:調用SDK發送消息

  1. 獲取接入點。您需要在云消息隊列 RabbitMQ 版控制臺獲取實例的接入點。在發送消息時,您需要為發布端配置該接入點,通過接入點接入云消息隊列 RabbitMQ 版實例。

    1. 登錄云消息隊列 RabbitMQ 版控制臺

    2. 概覽頁面的資源分布區域,選擇地域。

    3. 實例列表頁面,單擊目標實例名稱。

    4. 實例詳情頁面的接入點信息頁簽,將鼠標指針移動到目標類型的接入點,單擊該接入點右側的復制圖標,復制該接入點。

      類型

      說明

      示例值

      公網接入點

      公網環境可讀寫。按量付費實例默認支持,預付費實例需在購買時選擇才支持。

      XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com

      VPC接入點

      VPC環境可讀寫。按量付費實例和預付費實例默認都支持。

      XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

  2. 安裝Java依賴庫。在pom.xml添加以下依賴。

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.0</version> <!-- 支持開源所有版本 -->
    </dependency>
  3. 生成用戶名密碼。

    1. 登錄云消息隊列 RabbitMQ 版控制臺

    2. 概覽頁面的資源分布區域,選擇地域。

    3. 實例列表頁面,單擊目標實例名稱。

    4. 在左側導航欄,單擊靜態用戶名密碼

    5. 靜態用戶名密碼頁面,單擊創建用戶名密碼

    6. 創建用戶名密碼面板,輸入AccessKey ID,輸入AccessKey Secret,單擊確定

      靜態用戶名密碼頁面,顯示創建的靜態用戶名與密碼,密碼處于隱藏狀態。用戶名密碼

    7. 在創建的靜態用戶名密碼的密碼列,單擊顯示密碼,可查看用戶名的密碼。

  4. 生產消息。創建并編譯運行Qava

    重要

    編譯運行ProducerTest.java生產消息之前,您需要根據代碼提示信息配置參數列表中所列舉的參數。

    表 3. 參數列表

    參數

    示例值

    描述

    hostName

    1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com

    云消息隊列 RabbitMQ 版實例接入點。

    Port

    5672

    默認端口。非加密端口為5672,加密端口為5671。

    userName

    MjoxODgwNzcwODY5MD****

    云消息隊列 RabbitMQ 版控制臺將阿里云賬號或RAM用戶的AccessKey ID、AccessKey Secret和云消息隊列 RabbitMQ 版實例ID通過Base64編碼后生成的靜態用戶名。您可以在云消息隊列 RabbitMQ 版控制臺靜態用戶名密碼頁面獲取。

    passWord

    NDAxREVDQzI2MjA0OT****

    云消息隊列 RabbitMQ 版控制臺將阿里云賬號或RAM用戶的AccessKey Secret和timestamp參數(系統當前時間)通過HMAC-SHA1生成一個簽名后,再將這個簽名和timestamp參數(系統當前時間)通過Base64編碼后生成的靜態密碼。您可以在云消息隊列 RabbitMQ 版控制臺靜態用戶名密碼獲取。

    virtualHost

    Test

    云消息隊列 RabbitMQ 版實例的Vhost。您可以在云消息隊列 RabbitMQ 版控制臺Vhost 詳情頁面查看。如何查看Vhost,請參見查看Vhost連接詳情

    ExchangeName

    ExchangeTest

    云消息隊列 RabbitMQ 版的Exchange。您可以在云消息隊列 RabbitMQ 版控制臺Exchange 列表頁面,結合實例ID與Vhost模糊搜索已創建的Exchange。

    BindingKey

    BindingKeyTest

    云消息隊列 RabbitMQ 版Exchange與Queue的Binding Key。您可以在云消息隊列 RabbitMQ 版控制臺Exchange 列表頁面查看Exchange的綁定關系,獲取Binding Key。

    QueueName

    QueueTest

    云消息隊列 RabbitMQ 版的Queue。僅在訂閱消息時候需要配置,您可以在云消息隊列 RabbitMQ 版控制臺Exchange 列表頁面,查看Exchange的綁定關系,獲取Exchange綁定的Queue。

    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;
    import java.util.HashMap;
    import java.util.UUID;
    
    public class ProducerTest {
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            // 設置接入點,在消息隊列RabbitMQ版控制臺實例詳情頁面查看。
            factory.setHost("xxx.xxx.aliyuncs.com");
            // 用戶名,在消息隊列RabbitMQ版控制臺靜態用戶名密碼頁面查看。
            factory.setUsername("${UserName}");
            // 密碼,在消息隊列RabbitMQ版控制臺靜態用戶名密碼頁面查看。
            factory.setPassword("${PassWord}");
            //設置為true,開啟Connection自動恢復功能;設置為false,關閉Connection自動恢復功能。
            factory.setAutomaticRecoveryEnabled(true);
            factory.setNetworkRecoveryInterval(5000);
            // 設置Vhost名稱,請確保已在消息隊列RabbitMQ版控制臺上創建完成。
            factory.setVirtualHost("${VhostName}");
            // 默認端口,非加密端口5672,加密端口5671。
            factory.setPort(5672);
            // 基于網絡環境合理設置超時時間。
            factory.setConnectionTimeout(30 * 1000);
            factory.setHandshakeTimeout(30 * 1000);
            factory.setShutdownTimeout(0);
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();        
            channel.exchangeDeclare("${ExchangeName}", "${ExchangeType}", true, false, false, null);
            channel.queueDeclare("${QueueName}", true, false, false, new HashMap<String, Object>());
            channel.queueBind("${QueueName}", "${ExchangeName}", "${BindingKey}");
            // 開始發送消息。
            for (int i = 0; i < 100; i++  ) {
                // ${ExchangeName}必須在消息隊列RabbitMQ版控制臺上已存在,并且Exchange的類型與控制臺上的類型一致。
                // BindingKey根據業務需求填入相應的BindingKey。
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
                channel.basicPublish("${ExchangeName}", "${BindingKey}", true, props,
                        ("消息發送Body"  + i).getBytes(StandardCharsets.UTF_8));
            }
            connection.close();
        }
    }

步驟三:驗證事件流

  1. 登錄云消息隊列 RabbitMQ 版控制臺,然后在左側導航欄選擇實例列表

  2. 實例列表頁面的頂部菜單欄選擇地域,然后在實例列表中,單擊目標實例名稱。

  3. 實例列表頁面,單擊步驟一:在目標端創建事件流中配置的目標實例名稱。

  4. 實例詳情頁面的基本信息區域,單擊消息查詢

  5. 設置查詢方式為按 Message ID 查詢按 Queue 查詢,然后設置時間范圍,單擊查詢

    查詢消息
  6. 查看查詢到的消息內容是否與步驟二:調用SDK發送消息中發送的消息一致。