日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

AMQP客戶端接入

配置服務端訂閱消費組消息后,您需開發AMQP客戶端,實現企業服務器接入并接收冷鏈運輸追蹤器上報的數據。本文介紹使用AMQP協議的JMS客戶端接入阿里云物聯網平臺,接收服務端訂閱的消息。

準備開發環境

本文AMQP客戶端示例使用Java開發語言,推薦使用Apache Qpid JMS客戶端。您可訪問Qpid JMS 0.57.0,查看Qpid JMS客戶端的使用說明。

示例使用的開發環境如下:

AMQP客戶端接入示例

  1. 打開IntelliJ IDEA,創建一個Mave工程。例如Amqp

  2. 在工程中的pom.xml文件中,添加Maven依賴,然后單擊Load Maven Changes圖標,完成依賴包下載。

    <!-- amqp 1.0 qpid client -->
            <dependency>
                <groupId>org.apache.qpid</groupId>
                <artifactId>qpid-jms-client</artifactId>
                <version>0.57.0</version>
            </dependency>
            <!-- util for base64-->
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.25</version>
                <scope>compile</scope>
            </dependency>
  3. 在路徑Amqp\src\main\java下,創建Java類(例如AmqpJavaClientDemo),將AmqpJavaClientDemo.java文件內容替換為以下代碼。

    以下代碼示例中涉及的參數說明,請參見AMQP客戶端接入說明

    import java.net.URI;
    import java.util.Hashtable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import javax.crypto.Mac;
    import javax.crypto.spec.SecretKeySpec;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import org.apache.commons.codec.binary.Base64;
    import org.apache.qpid.jms.JmsConnection;
    import org.apache.qpid.jms.JmsConnectionListener;
    import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class AmqpJavaClientDemo {
    
        private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);
    
        //業務處理異步線程池,線程池參數可以根據您的業務特點調整,或者您也可以用其他異步方式處理接收到的消息。
        private final static ExecutorService executorService = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(50000));
    
        public static void main(String[] args) throws Exception {
            //參數說明,請參見AMQP客戶端接入說明文檔。
            String accessKey = "${YourAccessKey}";
            String accessSecret = "${YourAccessSecret}";
            String consumerGroupId = "${YourConsumerGroupId}";
            //iotInstanceId:實例ID。
            String iotInstanceId = "${YourIotInstanceId}"; 
            long timeStamp = System.currentTimeMillis();
            //簽名方法:支持hmacmd5、hmacsha1和hmacsha256。
            String signMethod = "hmacsha1";
            //控制臺服務端訂閱中消費組狀態頁客戶端ID一欄將顯示clientId參數。
            //建議使用機器UUID、MAC地址、IP等唯一標識等作為clientId。便于您區分識別不同的客戶端。
            String clientId = "${YourClientId}";
    
            //userName組裝方法,請參見AMQP客戶端接入說明文檔。
            String userName = clientId + "|authMode=aksign"
                + ",signMethod=" + signMethod
                + ",timestamp=" + timeStamp
                + ",authId=" + accessKey
                + ",iotInstanceId=" + iotInstanceId
                + ",consumerGroupId=" + consumerGroupId
                + "|";
            //計算簽名,password組裝方法,請參見AMQP客戶端接入說明文檔。
            String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
            String password = doSign(signContent,accessSecret, signMethod);
            //接入域名,請參見AMQP客戶端接入說明文檔。
            String connectionUrl = "failover:(amqps://${YourHost}:5671?amqp.idleTimeout=80000)"
                + "?failover.reconnectDelay=30";
    
            Hashtable<String, String> hashtable = new Hashtable<>();
            hashtable.put("connectionfactory.SBCF",connectionUrl);
            hashtable.put("queue.QUEUE", "default");
            hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
            Context context = new InitialContext(hashtable);
            ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
            Destination queue = (Destination)context.lookup("QUEUE");
            // 創建連接。
            Connection connection = cf.createConnection(userName, password);
            ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
            // 創建會話。
            // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手動調用message.acknowledge()。
            // Session.AUTO_ACKNOWLEDGE: SDK自動ACK(推薦)。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            connection.start();
            // 創建Receiver連接。
            MessageConsumer consumer = session.createConsumer(queue);
            consumer.setMessageListener(messageListener);
        }
    
        private static MessageListener messageListener = new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    //1.收到消息之后一定要ACK。
                    // 推薦做法:創建Session選擇Session.AUTO_ACKNOWLEDGE,這里會自動ACK。
                    // 其他做法:創建Session選擇Session.CLIENT_ACKNOWLEDGE,這里一定要調message.acknowledge()來ACK。
                    // message.acknowledge();
                    //2.建議異步處理收到的消息,確保onMessage函數里沒有耗時邏輯。
                    // 如果業務處理耗時過程過長阻塞住線程,可能會影響SDK收到消息后的正常回調。
                    executorService.submit(() -> processMessage(message));
                } catch (Exception e) {
                    logger.error("submit task occurs exception ", e);
                }
            }
        };
    
        /**
         * 在這里處理您收到消息后的具體業務邏輯。
         */
        private static void processMessage(Message message) {
            try {
                byte[] body = message.getBody(byte[].class);
                String content = new String(body);
                String topic = message.getStringProperty("topic");
                String messageId = message.getStringProperty("messageId");
                String tag = message.getStringProperty("tag");
                logger.info("receive message"
                    + ",\n topic = " + topic
                    + ",\n messageId = " + messageId
                    + ",\n tag = " + tag
                    + ",\n content = " + content
                    +"\n");
                System.out.println();
                //如果創建Session選擇的是Session.CLIENT_ACKNOWLEDGE,這里需要手動ACK。
                message.acknowledge();
                //如果要對收到的消息做耗時的處理,請異步處理,確保這里不要有耗時邏輯。
            } catch (Exception e) {
                logger.error("processMessage occurs error ", e);
            }
        }
    
        private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
            /**
             * 連接成功建立。
             */
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
            }
    
            /**
             * 嘗試過最大重試次數之后,最終連接失敗。
             */
            @Override
            public void onConnectionFailure(Throwable error) {
                logger.error("onConnectionFailure, {}", error.getMessage());
            }
    
            /**
             * 連接中斷。
             */
            @Override
            public void onConnectionInterrupted(URI remoteURI) {
                logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
            }
    
            /**
             * 連接中斷后又自動重連上。
             */
            @Override
            public void onConnectionRestored(URI remoteURI) {
                logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
            }
    
            @Override
            public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
    
            @Override
            public void onSessionClosed(Session session, Throwable cause) {}
    
            @Override
            public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
    
            @Override
            public void onProducerClosed(MessageProducer producer, Throwable cause) {}
        };
    
        /**
         * 計算簽名,password組裝方法,請參見AMQP客戶端接入說明文檔。
         */
        private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
            SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
            Mac mac = Mac.getInstance(signMethod);
            mac.init(signingKey);
            byte[] rawHmac = mac.doFinal(toSignString.getBytes());
            return Base64.encodeBase64String(rawHmac);
        }
    }
  4. 在以上代碼中配置相關參數。

    參數

    示例

    說明

    accessKey

    LTAI4GFGQvKuqHJhFa******

    您的阿里云賬號的AccessKey ID和AccessKey Secret。

    登錄物聯網平臺控制臺,將鼠標移至賬號頭像上,然后單擊AccessKey管理,獲取AccessKey ID和AccessKey Secret。

    說明

    如果使用RAM用戶,您需授予該RAM用戶管理物聯網平臺的權限(AliyunIOTFullAccess),否則將連接失敗。授權方法請參見授權RAM用戶訪問物聯網平臺

    accessSecret

    iMS8ZhCDdfJbCMeA005sieKe******

    consumerGroupId

    VWhGZ2QnP7kxWpeSSjt******

    當前物聯網平臺對應實例中的消費組ID。

    登錄物聯網平臺控制臺,在對應實例的消息轉發 > 服務端訂閱 > 消費組列表查看您的消費組ID。

    iotInstanceId

    iot-***j

    實例ID。您可在物聯網平臺控制臺實例概覽頁面,查看當前實例的ID。

    • 若有ID值,必須傳入該ID值。

    • 若無實例概覽頁面或ID值,傳入空值,即iotInstanceId = ""

    clientId

    12345

    表示客戶端ID,需您自定義,長度不可超過64個字符。建議使用您的AMQP客戶端所在服務器UUID、MAC地址、IP等唯一標識。

    AMQP客戶端接入并啟動成功后,登錄物聯網平臺控制臺,在對應實例的消息轉發 > 服務端訂閱 > 消費組列表頁簽,單擊消費組對應的查看消費組詳情頁面將顯示該參數,方便您識別區分不同的客戶端。

    connectionUrl

    failover:(amqps://iothub-unit-g****.amqp.iothub.aliyuncs.com:5671?amqp.idleTimeout=80000)" + "?failover.reconnectDelay=30

    AMQP客戶端接入物聯網平臺的連接地址。${YourHost}對應的AMQP接入域名信息,請參見查看實例終端節點

  5. 運行程序文件AmqpJavaClientDemo.java。例如接收到自定義Topic數據,執行結果如下圖。

    運行結果

  6. 代碼運行成功后,您可以在服務端訂閱中,查看消費組狀態的基本信息。

    具體操作,請參見管理AMQP消費組

    消費狀態

查看日志

所有配置完成后,登錄物聯網平臺控制臺,在企業版實例下的監控運維 > 日志服務中,從產品列表選擇冷鏈運輸追蹤器,可在云端運行日志頁簽下,查看完整的日志信息。

日志信息

序號

描述

設備上報數據到阿里云。

數據轉發到AMQP服務端訂閱消費組。

上報到平臺上的數據通過AMQP通道流轉至您的服務器。

后續步驟

云端下發指令