日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

跨云賬號授權(quán)場景

更新時間:

本文以調(diào)用Java SDK為例,介紹在RAM角色跨賬號授權(quán)場景,通過開源SDK實現(xiàn)消息收發(fā)的操作過程,幫助您更好地理解消息收發(fā)的完整過程,其他語言或框架的SDK消息收發(fā)過程相似。

前提條件

背景信息

當(dāng)您需要通過RAM STS角色授權(quán)的方式訪問云消息隊列 RabbitMQ 版服務(wù)時,需要通過阿里云提供的權(quán)限認(rèn)證類(AliyunCredentialsProvider)設(shè)置 AccessKeyIDAccessKeySecretSecurityToken進(jìn)行權(quán)限認(rèn)證才能訪問。

借助訪問控制RAM的RAM角色,您可以跨云賬號授權(quán),使某個企業(yè)訪問另一個企業(yè)的云消息隊列 RabbitMQ 版

  • 企業(yè)A希望能專注于業(yè)務(wù)系統(tǒng),僅作為云消息隊列 RabbitMQ 版所有者。企業(yè)A希望可以授權(quán)企業(yè)B來操作部分業(yè)務(wù),例如:云消息隊列 RabbitMQ 版的運維、監(jiān)控以及管理等。

  • 企業(yè)A希望當(dāng)企業(yè)B的員工加入或離職時,無需做任何權(quán)限變更。企業(yè)B可以進(jìn)一步將企業(yè)A的資源訪問權(quán)限分配給企業(yè)B的RAM用戶(員工或應(yīng)用),并可以精細(xì)控制其員工或應(yīng)用對資源的訪問和操作權(quán)限。

  • 企業(yè)A希望如果雙方合同終止,企業(yè)A隨時可以撤銷企業(yè)B的授權(quán)。

更多信息,請參見RAM跨云賬號授權(quán)

收發(fā)消息流程(以Java語言為例)

開源SDK消息收發(fā)流程

說明

云消息隊列 RabbitMQ 版與開源RabbitMQ完全兼容。更多語言SDK,請參見開源RabbitMQ AMQP協(xié)議支持的多語言或框架SDK

安裝Java依賴庫

pom.xml中添加以下依賴。

<dependency>            
    <groupId>com.rabbitmq</groupId>            
    <artifactId>amqp-client</artifactId>            
    <version>5.5.0</version> <!-- 支持開源所有版本 -->       
</dependency>        
<dependency>          
    <groupId>com.alibaba.mq-amqp</groupId>          
    <artifactId>mq-amqp-client</artifactId>         
    <version>1.0.5</version>       
</dependency>       
<dependency>          
    <groupId>com.aliyun</groupId>          
    <artifactId>alibabacloud-sts20150401</artifactId>          
    <version>1.0.4</version>       
</dependency>

配置權(quán)限認(rèn)證類(AliyunCredentialsProvider)

  1. 創(chuàng)建權(quán)限認(rèn)證類AliyunCredentialsProvider.java,根據(jù)代碼提示信息,設(shè)置相關(guān)參數(shù)。具體信息,請參見參數(shù)列表

    import com.alibaba.mq.amqp.utils.UserUtils;
    import com.aliyun.auth.credentials.Credential;
    import com.aliyun.auth.credentials.provider.StaticCredentialProvider;
    import com.aliyun.sdk.service.sts20150401.AsyncClient;
    import com.aliyun.sdk.service.sts20150401.models.AssumeRoleRequest;
    import com.aliyun.sdk.service.sts20150401.models.AssumeRoleResponse;
    import com.rabbitmq.client.impl.CredentialsProvider;
    import darabonba.core.client.ClientOverrideConfiguration;
    import org.apache.commons.lang3.StringUtils;
    
    import java.security.InvalidKeyException;
    import java.security.NoSuchAlgorithmException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class AliyunCredentialsProvider implements CredentialsProvider {
        /**
         * 默認(rèn)過期時間,單位毫秒。可以根據(jù)業(yè)務(wù)實際情況設(shè)置。
         */
        private final long STS_TIMEOUT_DEFAULT = 1800 * 1000;
        /**
         * 實例ID,從云消息隊列 RabbitMQ 版控制臺獲取。
         */
        private final String instanceId;
        /**
         * Access Key ID。
         */
        private String accessKeyId;
        /**
         * Access Key Secret。
         */
        private String accessKeySecret;
        /**
         *  (可選)security temp token。
         */
        private String securityToken;
        /**
         * STS過期時間, 記錄后可提前更新STS token。
         */
        private Long timeStampLimit;
    
        // 阿里云賬號AccessKey擁有所有API的訪問權(quán)限,建議您使用RAM用戶進(jìn)行API訪問或日常運維。
        // 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號下所有資源的安全。
        public AliyunCredentialsProvider(final String instanceId) {
            this.instanceId = instanceId;
        }
        public void updateProperties(String alibabaAccessKeyId, String alibabaAccessKeySecret, String region, String roleARN) throws ExecutionException, InterruptedException {
            this.timeStampLimit = System.currentTimeMillis() + STS_TIMEOUT_DEFAULT;
            // 自行調(diào)用AssumeRole接口實現(xiàn),進(jìn)行身份信息獲取。
            StaticCredentialProvider provider = StaticCredentialProvider.create(Credential.builder()
                    .accessKeyId(alibabaAccessKeyId)
                    .accessKeySecret(alibabaAccessKeySecret)
                    .build());
            AsyncClient client = AsyncClient.builder()
                    .region(region) // 請設(shè)置Region ID, 例如cn-hangzhou。
                    .credentialsProvider(provider)
                    .overrideConfiguration(
                            ClientOverrideConfiguration.create()
                                    // Endpoint請參考https://api.aliyun.com/product/Sts。
                                    .setEndpointOverride("sts." + region + ".aliyuncs.com")
                            //.setConnectTimeout(Duration.ofSeconds(30))
                    )
                    .build();
            AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder()
                    .roleArn(roleARN) // 從控制臺獲取得到的角色ARN。
                    .roleSessionName("testRoleName") // 當(dāng)前角色Session的名稱,可自定義。
                    .durationSeconds(STS_TIMEOUT_DEFAULT / 1000)
                    .build();
            CompletableFuture<AssumeRoleResponse> response = client.assumeRole(assumeRoleRequest);
            // Synchronously get the return value of the API request
            AssumeRoleResponse resp = response.get();
            if (resp.getBody().getCredentials() != null) {
                System.out.println("[INFO] Update AK, SK, Token successfully.");
                this.accessKeyId = resp.getBody().getCredentials().getAccessKeyId();
                this.securityToken = resp.getBody().getCredentials().getSecurityToken();
                this.accessKeySecret = resp.getBody().getCredentials().getAccessKeySecret();
            }
            client.close();
        }
    
        // 檢測當(dāng)前該token是否快要過期。
        public boolean isNearlyExpired() {
            // 提前30秒判斷。
            return System.currentTimeMillis() > timeStampLimit - 30 * 1000L;
        }
    
        @Override
        public String getUsername() {
            if(StringUtils.isNotEmpty(securityToken)) {
                return UserUtils.getUserName(accessKeyId, instanceId, securityToken);
            } else {
                return UserUtils.getUserName(accessKeyId, instanceId);
            }
        }
    
        @Override
        public String getPassword() {
            try {
                return UserUtils.getPassord(accessKeySecret);
            } catch (InvalidKeyException e) {
                //todo
            } catch (NoSuchAlgorithmException e) {
                //todo
            }
            return null;
        }
    }

表 1. 參數(shù)列表

參數(shù)

示例值

描述

hostName

1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com

云消息隊列 RabbitMQ 版實例接入點。

Port

5672

默認(rèn)端口。非加密端口5672,加密端口5671。

AccessKeyID

LTAI5tJQKnB9zVvQ****

阿里云賬號或RAM用戶的AccessKey ID。您可以登錄RAM訪問控制臺,創(chuàng)建RAM角色,并賦予角色AliyunAMQPFullAccess權(quán)限,獲取角色的ARN,調(diào)用AssumeRole接口獲取一個扮演該角色的臨時身份。AssumeRole執(zhí)行成功會返回RAM角色的 AccessKeyIDAccessKeySecret以及SecurityToken。角色ARN的概念,請參見RAM角色概覽

AccessKeySecret

jw6MawfKOVBveRr84u****

阿里云賬號或RAM用戶的AccessKey Secret。

region

cn-hangzhou

調(diào)用對應(yīng)地域的AssumeRole接口,詳情請參見AssumeRole

roleARN

acs:ram::125xxxxxxx223:role/xxx

RAM角色的ARN。格式為acs:ram::<account_id>:role/<role_name>。詳情請參見AssumeRole

instanceId

amqp-cn-v0h1kb9nu***

云消息隊列 RabbitMQ 版的實例ID。您可以在云消息隊列 RabbitMQ 版控制臺實例詳情頁面查看。如何查看實例ID,請參見查看實例詳情

virtualHost

Test

云消息隊列 RabbitMQ 版實例的Vhost。您可以在云消息隊列 RabbitMQ 版控制臺Vhost 列表頁面查看。如何查看Vhost,請參見查看Vhost連接詳情

ExchangeName

ExchangeTest

云消息隊列 RabbitMQ 版的Exchange。您可以在云消息隊列 RabbitMQ 版控制臺Exchange 列表頁面獲取。

RoutingKey

RoutingKeyTest

云消息隊列 RabbitMQ 版Exchange與Queue的Routing Key。您可以在云消息隊列 RabbitMQ 版控制臺Exchange 列表頁面查看Exchange的綁定關(guān)系,獲取Routing Key。

QueueName

QueueTest

云消息隊列 RabbitMQ 版的Queue。僅在訂閱消息時候需要配置,您可以在云消息隊列 RabbitMQ 版控制臺Exchange 列表頁面,查看Exchange的綁定關(guān)系,獲取Exchange綁定的Queue。

生產(chǎn)消息

創(chuàng)建并編譯運行ProducerTest.java

重要

編譯運行ProducerTest.java生產(chǎn)消息之前,您需要根據(jù)代碼提示信息配置參數(shù)列表中所列舉的參數(shù)。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    // 推薦將AK/SK/ARN等信息在環(huán)境變量中配置,若將其明文保存在工程代碼中,將帶來不必要的數(shù)據(jù)泄露風(fēng)險。
    // 阿里云賬號的AccessKey ID。
    private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    // 阿里云賬號的AccessKey Secret。
    private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // 阿里云服務(wù)所在的Region。
    private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
    // 阿里云角色ARN,從控制臺獲取。
    private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
    //設(shè)置實例的接入點。
    private static final String hostName = "xxx.xxx.aliyuncs.com";
    private static final String instanceId = "${InstanceId}";
    //設(shè)置實例的Vhost。
    private static final String virtualHost = "${VirtualHost}";
    //設(shè)置Exchange、Queue和綁定關(guān)系。
    private static final String exchangeName = "${ExchangeName}";
    private static final String queueName = "${QueueName}";
    private static final String routingKey = "${RoutingKey}";
    //設(shè)置Exchange類型。
    private static final String exchangeType = "${ExchangeType}";

    public static void main(String[] args) throws InterruptedException, IOException, TimeoutException, ExecutionException {
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置接入點,在云消息隊列 RabbitMQ 版控制臺實例詳情頁面查看。
        factory.setHost(hostName);
        // ${instanceId}為實例ID,在云消息隊列 RabbitMQ 版控制臺概覽頁面查看。
        AliyunCredentialsProvider aliyunCredentialsProvider =
                new AliyunCredentialsProvider(instanceId);
        updateSTSProperties(aliyunCredentialsProvider);
        // ${instanceId}為實例ID,在云消息隊列 RabbitMQ 版控制臺實例詳情頁面查看。
        factory.setCredentialsProvider(aliyunCredentialsProvider);
        //設(shè)置為true,開啟Connection自動恢復(fù)功能;設(shè)置為false,關(guān)閉Connection自動恢復(fù)功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 設(shè)置Vhost名稱,請確保已在云消息隊列 RabbitMQ 版控制臺上創(chuàng)建完成。
        factory.setVirtualHost(virtualHost);
        // 默認(rèn)端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        // 基于網(wǎng)絡(luò)環(huán)境合理設(shè)置超時時間。
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, routingKey);
        // 開始發(fā)送消息,3600條消息,每條發(fā)送后暫停1秒,將持續(xù)1小時。
        for (int i = 0; i < 3600; i++) {
            try {
                if (aliyunCredentialsProvider.isNearlyExpired()) {
                    // 認(rèn)證可能過期,重新認(rèn)證
                    System.out.println("[WARN] Token maybe expired, so try to update it.");
                    updateSTSProperties(aliyunCredentialsProvider);
                    factory.setCredentialsProvider(aliyunCredentialsProvider);
                    // 當(dāng)配置更新后,需要重新建立連接。
                    connection = factory.newConnection();
                    channel = connection.createChannel();
                }
                // ${ExchangeName}必須在云消息隊列 RabbitMQ 版控制臺上已存在,并且Exchange的類型與控制臺上的類型一致。
                // ${RoutingKey}根據(jù)業(yè)務(wù)需求填入相應(yīng)的RoutingKey。
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
                channel.basicPublish(exchangeName, routingKey, true, props,
                        ("消息發(fā)送Body-"  + i).getBytes(StandardCharsets.UTF_8));
                System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId());
                Thread.sleep(1000L);
            } catch (Exception e) {
                System.out.println("[ERROR] Send fail, error: " + e.getMessage());
                Thread.sleep(5000L);
            }
        }
        connection.close();
    }

    public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
        System.out.println("Try to update STS properties");
        // 推薦將AK/SK在環(huán)境變量中配置,若將其明文保存在工程代碼中,將帶來不必要的數(shù)據(jù)泄露風(fēng)險。
        aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
    }
}
說明

云消息隊列 RabbitMQ 版會對單實例的TPS流量峰值進(jìn)行限流,更多信息,請參見實例限流最佳實踐

訂閱消息

創(chuàng)建并編譯運行ConsumerTest.java訂閱消息。

重要

編譯運行ConsumerTest.java訂閱消息之前,您需要根據(jù)代碼提示信息配置參數(shù)列表中所列舉的參數(shù)。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    // 推薦將AK/SK/ARN等信息在環(huán)境變量中配置,若將其明文保存在工程代碼中,將帶來不必要的數(shù)據(jù)泄露風(fēng)險。
    // 阿里云賬號的AccessKey ID。
    private static final String alibabaAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    // 阿里云賬號的AccessKey Secret。
    private static final String alibabaAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // 阿里云服務(wù)所在的Region。
    private static final String region = System.getenv("ALIBABA_CLOUD_REGION");
    // 阿里云角色ARN,從控制臺獲取。
    private static final String roleARN = System.getenv("ALIBABA_CLOUD_ROLE_ARN");
    //設(shè)置實例的接入點。
    private static final String hostName = "xxx.xxx.aliyuncs.com";
    private static final String instanceId = "${InstanceId}";
    //設(shè)置實例的Vhost。
    private static final String virtualHost = "${VirtualHost}";
    //設(shè)置Queue。
    private static final String queueName = "${QueueName}";

    public static void main(String[] args) throws IOException, TimeoutException, ExecutionException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置接入點,在云消息隊列 RabbitMQ 版控制臺實例詳情頁面查看。
        factory.setHost(hostName);
        // ${instanceId}為實例ID,在云消息隊列 RabbitMQ 版控制臺概覽頁面查看。
        AliyunCredentialsProvider aliyunCredentialsProvider =
                new AliyunCredentialsProvider(instanceId);
        updateSTSProperties(aliyunCredentialsProvider);
        factory.setCredentialsProvider(aliyunCredentialsProvider);
        //設(shè)置為true,開啟Connection自動恢復(fù)功能;設(shè)置為false,關(guān)閉Connection自動恢復(fù)功能。
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // 設(shè)置Vhost名稱,請確保已在云消息隊列 RabbitMQ 版控制臺上創(chuàng)建完成。
        factory.setVirtualHost(virtualHost);
        // 默認(rèn)端口,非加密端口5672,加密端口5671。
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 創(chuàng)建${QueueName} ,該Queue必須在云消息隊列 RabbitMQ 版控制臺上已存在。
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        consume(channel, queueName);
        System.out.println("Consumer started.");

        // 循環(huán)檢測sts是否即將過期,若過期則更新connection,重新消費。
        // 這里為了方便理解,使用while循環(huán)檢測認(rèn)證是否接近過期。
        // 可以使用定時任務(wù),更優(yōu)雅地實現(xiàn)定時檢查、更新操作。
        while (true) {
            // 每次處理完消息后,可以判斷是否接近過期。
            // 如果接近過期,則更新一次認(rèn)證類,
            // 該過程需要重新創(chuàng)建連接,以確保業(yè)務(wù)持續(xù)運行。
            if (aliyunCredentialsProvider.isNearlyExpired()) {
                System.out.println("token maybe expired, so try to update it.");
                updateSTSProperties(aliyunCredentialsProvider);
                factory.setCredentialsProvider(aliyunCredentialsProvider);
                connection.close();
                connection = factory.newConnection();
                channel = connection.createChannel();
                // 重新開始消費消息。
                consume(channel, queueName);
                System.out.println("Consumer started.");
            } else {
                // 每秒檢測一次。
                Thread.sleep(1000);
            }
        }
    }

    public static void consume(Channel channel, String queueName) throws IOException {

        channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) {
                try {
                    //接收到的消息,進(jìn)行業(yè)務(wù)邏輯處理。
                    System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (Exception e) {
                    System.out.println("Exception, cause:" + e.getMessage());
                }
            }
        });
    }

    public static void updateSTSProperties(AliyunCredentialsProvider aliyunCredentialsProvider) throws ExecutionException, InterruptedException {
        System.out.println("Try to update STS properties");
        aliyunCredentialsProvider.updateProperties(alibabaAccessKeyId, alibabaAccessKeySecret, region, roleARN);
    }
}

查詢消息

如果您想確認(rèn)消息是否成功發(fā)送至云消息隊列 RabbitMQ 版,可以在云消息隊列 RabbitMQ 版控制臺查詢消息。具體操作,請參見查詢消息