日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

調(diào)用HTTP協(xié)議的SDK收發(fā)普通消息

在控制臺創(chuàng)建完所有資源之后,您可以調(diào)用云消息隊列 RocketMQ 版的HTTP協(xié)議的SDK收發(fā)普通消息。

前提條件

  • 創(chuàng)建資源

    說明

    本文以普通消息為例進(jìn)行說明,因此您創(chuàng)建的普通消息的Topic不能用來收發(fā)其他類型的消息,包括定時、延時、順序和事務(wù)消息。換言之,切勿混用不同消息類型的Topic。

  • 創(chuàng)建AccessKey

下載并安裝HTTP協(xié)議的SDK

云消息隊列 RocketMQ 版提供以下語言的HTTP協(xié)議SDK,請按需下載并安裝相應(yīng)語言的客戶端SDK。

調(diào)用HTTP協(xié)議的SDK發(fā)送普通消息

獲取相應(yīng)語言的客戶端SDK后,您即可運(yùn)行以下示例代碼發(fā)送普通消息。

Java

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;

import java.util.Date;

public class Producer {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                // 設(shè)置HTTP接入域名。
                "${HTTP_ENDPOINT}",
                // AccessKey ID,阿里云身份驗證標(biāo)識。獲取方式,請參見本文前提條件中的獲取AccessKey。
                "${ACCESS_KEY}",
                // AccessKey Secret,阿里云身份驗證密鑰。獲取方式,請參見本文前提條件中的獲取AccessKey。
                "${SECRET_KEY}"
        );

        // 所屬的Topic。
        final String topic = "${TOPIC}";
        // Topic所屬實例ID,默認(rèn)實例為空。
        final String instanceId = "${INSTANCE_ID}";

        // 獲取Topic的生產(chǎn)者。
        MQProducer producer;
        if (instanceId != null && instanceId != "") {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }

        try {
            // 循環(huán)發(fā)送4條消息。
            for (int i = 0; i < 4; i++) {
                TopicMessage pubMsg;
                if (i % 2 == 0) {
                    // 普通消息。
                    pubMsg = new TopicMessage(
                            // 消息內(nèi)容。
                            "hello mq!".getBytes(),
                            // 消息標(biāo)簽。
                            "A"
                    );
                    // 設(shè)置屬性。
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // 設(shè)置Key。
                    pubMsg.setMessageKey("MessageKey");
                } else {
                    pubMsg = new TopicMessage(
                            // 消息內(nèi)容。
                            "hello mq!".getBytes(),
                            // 消息標(biāo)簽。
                            "A"
                    );
                    // 設(shè)置屬性。
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // 定時消息,定時時間為10s后。
                    pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
                }
                // 同步發(fā)送消息,只要不拋異常就是成功。
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                // 同步發(fā)送消息,只要不拋異常就是成功。
                System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            // 消息發(fā)送失敗,需要進(jìn)行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進(jìn)行補(bǔ)償處理。
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }

}                          

Go

package main

import (
    "fmt"
    "time"
    "strconv"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    // 設(shè)置HTTP接入域名。
    endpoint := "${HTTP_ENDPOINT}"
    // AccessKey ID,阿里云身份驗證標(biāo)識。獲取方式,請參見本文前提條件中的獲取AccessKey。
    accessKey := "${ACCESS_KEY}"
    // AccessKey Secret,阿里云身份驗證密鑰。獲取方式,請參見本文前提條件中的獲取AccessKey。
    secretKey := "${SECRET_KEY}"
    // 所屬的Topic。
    topic := "${TOPIC}"
    // Topic所屬實例ID,默認(rèn)實例為空。
    instanceId := "${INSTANCE_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    mqProducer := client.GetProducer(instanceId, topic)
    // 循環(huán)發(fā)送4條消息。
    for i := 0; i < 4; i++ {
        var msg mq_http_sdk.PublishMessageRequest
        if i%2 == 0 {
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq!",         //消息內(nèi)容。
                MessageTag:  "",                  // 消息標(biāo)簽。
                Properties:  map[string]string{}, // 消息屬性。
            }
            // 設(shè)置Key。
            msg.MessageKey = "MessageKey"
            // 設(shè)置屬性。
            msg.Properties["a"] = strconv.Itoa(i)
        } else {
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq timer!",         //消息內(nèi)容。
                MessageTag:  "",                  // 消息標(biāo)簽。
                Properties:  map[string]string{}, // 消息屬性。
            }
            // 設(shè)置屬性。
            msg.Properties["a"] = strconv.Itoa(i)
            // 定時消息,定時時間為10s后,值為毫秒級別的Unix時間戳。
            msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000
        }
        ret, err := mqProducer.PublishMessage(msg)

        if err != nil {
            fmt.Println(err)
            return
        } else {
            fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
        }
        time.Sleep(time.Duration(100) * time.Millisecond)
    }
}                          

PHP

<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

class ProducerTest
{
    private $client;
    private $producer;

    public function __construct()
    {
        $this->client = new MQClient(
            // 設(shè)置HTTP接入域名。
            "${HTTP_ENDPOINT}",
            // AccessKey ID,阿里云身份驗證標(biāo)識。獲取方式,請參見本文前提條件中的獲取AccessKey。
            "${ACCESS_KEY}",
            // AccessKey Secret,阿里云身份驗證密鑰。獲取方式,請參見本文前提條件中的獲取AccessKey。
            "${SECRET_KEY}"
        );

        // 所屬的Topic。
        $topic = "${TOPIC}";
        // Topic所屬實例ID,默認(rèn)實例為空NULL。
        $instanceId = "${INSTANCE_ID}";

        $this->producer = $this->client->getProducer($instanceId, $topic);
    }

    public function run()
    {
        try
        {
            for ($i=1; $i<=4; $i++)
            {
                $publishMessage = new TopicMessage(
                    "xxxxxxxx"http:// 消息內(nèi)容。
                );
                // 設(shè)置屬性。
                $publishMessage->putProperty("a", $i);
                // 設(shè)置消息Key。
                $publishMessage->setMessageKey("MessageKey");
                if ($i % 2 == 0) {
                    // 定時消息,定時時間為10s后。
                    $publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
                }
                $result = $this->producer->publishMessage($publishMessage);

                print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
            }
        } catch (\Exception $e) {
            print_r($e->getMessage() . "\n");
        }
    }
}


$instance = new ProducerTest();
$instance->run();

?>                         

Python

#!/usr/bin/env python
# coding=utf8
import sys

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
import time

#初始化Client。
mq_client = MQClient(
    #設(shè)置HTTP接入域名。
    "${HTTP_ENDPOINT}",
    #AccessKey ID,阿里云身份驗證標(biāo)識。獲取方式,請參見本文前提條件中的獲取AccessKey。
    "${ACCESS_KEY}",
    #AccessKey Secret,阿里云身份驗證密鑰。獲取方式,請參見本文前提條件中的獲取AccessKey。
    "${SECRET_KEY}"
    )
#所屬的Topic。
topic_name = "${TOPIC}"
#Topic所屬實例ID,默認(rèn)實例為空None。
instance_id = "${INSTANCE_ID}"

producer = mq_client.get_producer(instance_id, topic_name)

# 循環(huán)發(fā)布多條消息。
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))

try:
    for i in range(msg_count):
        if i % 2 == 0:
            msg = TopicMessage(
                    # 消息內(nèi)容。
                    "I am test message %s.你好" % i, 
                    # 消息標(biāo)簽。
                    ""
                    )
            # 設(shè)置屬性。
            msg.put_property("a", "i")
            # 設(shè)置Key。
            msg.set_message_key("MessageKey")
            re_msg = producer.publish_message(msg)
            print("Publish Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
        else:
            msg = TopicMessage(
                    # 消息內(nèi)容。
                    "I am test message %s." % i, 
                    # 消息標(biāo)簽。
                    ""
                    )
            msg.put_property("a", i)
            # 定時消息,毫秒級絕對時間。
            msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000)
            re_msg = producer.publish_message(msg)
            print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
        time.sleep(1)
except MQExceptionBase as e:
    if e.type == "TopicNotExist":
        print("Topic not exist, please create it.")
        sys.exit(1)
    print("Publish Message Fail. Exception:%s" % e)                       

Node.js

const {
  MQClient,
  MessageProperties
} = require('@aliyunmq/mq-http-sdk');

// 設(shè)置HTTP接入域名。
const endpoint = "${HTTP_ENDPOINT}";
// AccessKey ID,阿里云身份驗證標(biāo)識。獲取方式,請參見本文前提條件中的獲取AccessKey。
const accessKeyId = "${ACCESS_KEY}";
// AccessKey Secret,阿里云身份驗證密鑰。獲取方式,請參見本文前提條件中的獲取AccessKey。
const accessKeySecret = "${SECRET_KEY}";

var client = new MQClient(endpoint, accessKeyId, accessKeySecret);

// 所屬的Topic。
const topic = "${TOPIC}";
// Topic所屬實例ID,默認(rèn)實例為空。
const instanceId = "${INSTANCE_ID}";

const producer = client.getProducer(instanceId, topic);

(async function(){
  try {
    // 循環(huán)發(fā)送4條消息。
    for(var i = 0; i < 4; i++) {
      let res;
      if (i % 2 == 0) {
        msgProps = new MessageProperties();
        // 設(shè)置屬性。
        msgProps.putProperty("a", i);
        // 設(shè)置Key。
        msgProps.messageKey("MessageKey");
        res = await producer.publishMessage("hello mq.", "", msgProps);
      } else {
        msgProps = new MessageProperties();
        // 設(shè)置屬性。
        msgProps.putProperty("a", i);
        // 定時消息,定時時間為10s后。
        msgProps.startDeliverTime(Date.now() + 10 * 1000);
        res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);
      }
      console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
    }

  } catch(e) {
    // 消息發(fā)送失敗,需要進(jìn)行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進(jìn)行補(bǔ)償處理。
    console.log(e)
  }
})();                         

C++

//#include <iostream>
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"

using namespace std;
using namespace mq::http::sdk;


int main() {

    MQClient mqClient(
            // 設(shè)置HTTP接入域名。
            "${HTTP_ENDPOINT}",
            // AccessKey ID,阿里云身份驗證標(biāo)識。獲取方式,請參見本文前提條件中的獲取AccessKey。
            "${ACCESS_KEY}",
            // AccessKey Secret,阿里云身份驗證密鑰。獲取方式,請參見本文前提條件中的獲取AccessKey。
            "${SECRET_KEY}"
            );

    // 所屬的Topic。
    string topic = "${TOPIC}";
    // Topic所屬實例ID,默認(rèn)實例為空。
    string instanceId = "${INSTANCE_ID}";

    MQProducerPtr producer;
    if (instanceId == "") {
        producer = mqClient.getProducerRef(topic);
    } else {
        producer = mqClient.getProducerRef(instanceId, topic);
    }

    try {
        for (int i = 0; i < 4; i++)
        {
            PublishMessageResponse pmResp;
            if (i % 4 == 0) {
                // publish message, only have body. 
                producer->publishMessage("Hello, mq!", pmResp);
            } else if (i % 4 == 1) {
                // publish message, only have body and tag.
                producer->publishMessage("Hello, mq!have tag!", "tag", pmResp);
            } else if (i % 4 == 2) {
                // publish message, have body,tag,properties and key.
                TopicMessage pubMsg("Hello, mq!have key!");
                pubMsg.putProperty("a",std::to_string(i));
                pubMsg.setMessageKey("MessageKey" + std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            } else {
                // publish timer message, message will be consumed after StartDeliverTime
                TopicMessage pubMsg("Hello, mq!timer msg!", "tag");
                // StartDeliverTime is an absolute time in millisecond.
                pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000);
                pubMsg.putProperty("b",std::to_string(i));
                pubMsg.putProperty("c",std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            }
            cout << "Publish mq message success. Topic is: " << topic 
                << ", msgId is:" << pmResp.getMessageId() 
                << ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
        }
    } catch (MQServerException& me) {
        cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
        return -1;
    } catch (MQExceptionBase& mb) {
        cout << "Request Failed: " + mb.ToString() << endl;
        return -2;
    }

    return 0;
}                        

C#

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;

namespace Aliyun.MQ.Sample
{
    public class ProducerSample
    {
        // 設(shè)置HTTP接入域名。
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // AccessKey ID,阿里云身份驗證標(biāo)識。獲取方式,請參見本文前提條件中的獲取AccessKey。
        private const string _accessKeyId = "${ACCESS_KEY}";
        // AccessKey Secret,阿里云身份驗證密鑰。獲取方式,請參見本文前提條件中的獲取AccessKey。
        private const string _secretAccessKey = "${SECRET_KEY}";
        // 所屬的Topic。
        private const string _topicName = "${TOPIC}";
        // Topic所屬實例ID,默認(rèn)實例為空。
        private const string _instanceId = "${INSTANCE_ID}";

        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);

        static MQProducer producer = _client.GetProducer(_instanceId, _topicName);

        static void Main(string[] args)
        {
            try
            {
                // 循環(huán)發(fā)送4條消息。
                for (int i = 0; i < 4; i++)
                {
                    TopicMessage sendMsg;
                    if (i % 2 == 0)
                    {
                        sendMsg = new TopicMessage("dfadfadfadf");
                        // 設(shè)置屬性。
                        sendMsg.PutProperty("a", i.ToString());
                        // 設(shè)置Key。
                        sendMsg.MessageKey = "MessageKey";
                    }
                    else
                    {
                        sendMsg = new TopicMessage("dfadfadfadf", "tag");
                        // 設(shè)置屬性。
                        sendMsg.PutProperty("a", i.ToString());
                        // 定時消息,定時時間為10s后。
                        sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp() + 10 * 1000;
                    }
                    TopicMessage result = producer.PublishMessage(sendMsg);
                    Console.WriteLine("publis message success:" + result);
                }
            }
            catch (Exception ex)
            {
                Console.Write(ex);
            }
        }
    }
}                       

同時,您也可以在控制臺頁面,找到您創(chuàng)建的Topic,在其操作列,單擊更多,在下拉列表中,選擇快速體驗,按需通過控制臺或Docker快速體驗。

調(diào)用HTTP協(xié)議的SDK消費普通消息

消息發(fā)送成功后,需要啟動消費者來消費普通消息。請按需運(yùn)行對應(yīng)語言的示例代碼來啟動消費者,并測試消費消息的功能。請按照說明正確設(shè)置相關(guān)參數(shù)。

Java

 import com.aliyun.mq.http.MQClient;
 import com.aliyun.mq.http.MQConsumer;
 import com.aliyun.mq.http.common.AckMessageException;
 import com.aliyun.mq.http.model.Message;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class Consumer {
 
     public static void main(String[] args) {
         MQClient mqClient = new MQClient(
                 // 設(shè)置HTTP接入域名。
                 "${HTTP_ENDPOINT}",
                 // AccessKey ID,阿里云身份驗證標(biāo)識。獲取方式,請參見本文前提條件中的獲取AccessKey。
                 "${ACCESS_KEY}",
                 // AccessKey Secret,阿里云身份驗證密鑰。獲取方式,請參見本文前提條件中的獲取AccessKey。
                 "${SECRET_KEY}"
         );
 
         // 所屬的Topic。
         final String topic = "${TOPIC}";
         // 您在控制臺創(chuàng)建的Group ID(Consumer ID)。
         final String groupId = "${GROUP_ID}";
         // Topic所屬實例ID,默認(rèn)實例為空。
         final String instanceId = "${INSTANCE_ID}";
 
         final MQConsumer consumer;
         if (instanceId != null && instanceId != "") {
             consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
         } else {
             consumer = mqClient.getConsumer(topic, groupId);
         }
 
         // 在當(dāng)前線程循環(huán)消費消息,建議是多開個幾個線程并發(fā)消費消息。
         do {
             List<Message> messages = null;
 
             try {
                 // 長輪詢消費消息。
                 // 長輪詢表示如果Topic沒有消息則請求會在服務(wù)端掛住3s,3s內(nèi)如果有消息可以消費則立即返回。
                 messages = consumer.consumeMessage(
                         3,//  一次最多消費3條(最多可設(shè)置為16條)。
                         3// 長輪詢時間3秒(最多可設(shè)置為30秒)。
                 );
             } catch (Throwable e) {
                 e.printStackTrace();
                 try {
                     Thread.sleep(2000);
                 } catch (InterruptedException e1) {
                     e1.printStackTrace();
                 }
             }
             // 沒有消息。
             if (messages == null || messages.isEmpty()) {
                 System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
                 continue;
             }
 
             // 處理業(yè)務(wù)邏輯。
             for (Message message : messages) {
                 System.out.println("Receive message: " + message);
             }
 
             // Message.nextConsumeTime前若不確認(rèn)消息消費成功,則消息會重復(fù)消費。
             // 消息句柄有時間戳,同一條消息每次消費拿到的都不一樣。
             {
                 List<String> handles = new ArrayList<String>();
                 for (Message message : messages) {
                     handles.add(message.getReceiptHandle());
                 }
 
                 try {
                     consumer.ackMessage(handles);
                 } catch (Throwable e) {
                     // 某些消息的句柄可能超時了會導(dǎo)致確認(rèn)不成功。
                     if (e instanceof AckMessageException) {
                         AckMessageException errors = (AckMessageException) e;
                         System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
                         if (errors.getErrorMessages() != null) {
                             for (String errorHandle :errors.getErrorMessages().keySet()) {
                                 System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                                         + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                             }
                         }
                         continue;
                     }
                     e.printStackTrace();
                 }
             }
         } while (true);
     }
 }        

Go

package main

import (
	"fmt"
	"github.com/gogap/errors"
	"strings"
	"time"

	"github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
	// 設(shè)置HTTP接入域名。
	endpoint := "${HTTP_ENDPOINT}"
	// AccessKey ID,阿里云身份驗證標(biāo)識。獲取方式,請參見本文前提條件中的獲取AccessKey。
	accessKey := "${ACCESS_KEY}"
	// AccessKey Secret,阿里云身份驗證密鑰。獲取方式,請參見本文前提條件中的獲取AccessKey。
	secretKey := "${SECRET_KEY}"
	// 所屬的Topic。
	topic := "${TOPIC}"
	// Topic所屬實例ID,默認(rèn)實例為空。
	instanceId := "${INSTANCE_ID}"
	// 您在控制臺創(chuàng)建的Group ID(Consumer ID)。
	groupId := "${GROUP_ID}"

	client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

	mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")

	for {
		endChan := make(chan int)
		respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
		errChan := make(chan error)
		go func() {
			select {
			case resp := <-respChan:
				{
					// 處理業(yè)務(wù)邏輯。
					var handles []string
					fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
					for _, v := range resp.Messages {
						handles = append(handles, v.ReceiptHandle)
						fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
							"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
                            "\tBody: %s\n"+
                            "\tProps: %s\n",
							v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
							v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
					}

                    // NextConsumeTime前若不確認(rèn)消息消費成功,則消息會重復(fù)消費。
                    // 消息句柄有時間戳,同一條消息每次消費拿到的都不一樣。
                    ackerr := mqConsumer.AckMessage(handles)
					if ackerr != nil {
						// 某些消息的句柄可能超時了會導(dǎo)致確認(rèn)不成功。
						fmt.Println(ackerr)
						for _, errAckItem := range ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
							fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
								errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
						}
						time.Sleep(time.Duration(3) * time.Second)
					} else {
						fmt.Printf("Ack ---->\n\t%s\n", handles)
					}

					endChan <- 1
				}
			case err := <-errChan:
				{
					// 沒有消息。
					if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
						fmt.Println("\nNo new message, continue!")
					} else {
						fmt.Println(err)
						time.Sleep(time.Duration(3) * time.Second)
					}
					endChan <- 1
				}
			case <-time.After(35 * time.Second):
				{
					fmt.Println("Timeout of consumer message ??")
					endChan <- 1
				}
			}
		}()

		// 長輪詢消費消息。
		// 長輪詢表示如果Topic沒有消息則請求會在服務(wù)端掛住3s,3s內(nèi)如果有消息可以消費則立即返回。
		mqConsumer.ConsumeMessage(respChan, errChan,
			3, //  一次最多消費3條(最多可設(shè)置為16條)。
			3, // 長輪詢時間3秒(最多可設(shè)置為30秒)。
		)
		<-endChan
	}
}                                                                   

PHP

<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

class ConsumerTest
{
    private $client;
    private $consumer;

    public function __construct()
    {
        $this->client = new MQClient(
            // 設(shè)置HTTP接入域名。
            "${HTTP_ENDPOINT}",
            // AccessKey ID,阿里云身份驗證標(biāo)識。獲取方式,請參見本文前提條件中的獲取AccessKey。
            "${ACCESS_KEY}",
            // AccessKey Secret,阿里云身份驗證密鑰。獲取方式,請參見本文前提條件中的獲取AccessKey。
            "${SECRET_KEY}"
        );

        // 所屬的Topic。
        $topic = "${TOPIC}";
        // 您在控制臺創(chuàng)建的Group ID(Consumer ID)。
        $groupId = "${GROUP_ID}";
        // Topic所屬實例ID,默認(rèn)實例為空NULL。
        $instanceId = "${INSTANCE_ID}";

        $this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
    }

    public function run()
    {
        // 在當(dāng)前線程循環(huán)消費消息,建議是多開個幾個線程并發(fā)消費消息。
        while (True) {
            try {
                // 長輪詢消費消息。
                // 長輪詢表示如果Topic沒有消息則請求會在服務(wù)端掛住3s,3s內(nèi)如果有消息可以消費則立即返回。
                $messages = $this->consumer->consumeMessage(
                    3, //  一次最多消費3條(最多可設(shè)置為16條)。
                    3 // 長輪詢時間3秒(最多可設(shè)置為30秒)。
                );
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\MessageNotExistException) {
                    // 沒有消息可以消費,接著輪詢。
                    printf("No message, contine long polling!RequestId:%s\n", $e->getRequestId());
                    continue;
                }

                print_r($e->getMessage() . "\n");

                sleep(3);
                continue;
            }

            print "consume finish, messages:\n";

            // 處理業(yè)務(wù)邏輯。
            $receiptHandles = array();
            foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();
                printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n",
                    $message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
                    $message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
                    $message->getMessageKey());
                print_r($message->getProperties());
            }

            // $message->getNextConsumeTime() 前若不確認(rèn)消息消費成功,則消息會重復(fù)消費。
            // 消息句柄有時間戳,同一條消息每次消費拿到的都不一樣。
            print_r($receiptHandles);
            try {
                $this->consumer->ackMessage($receiptHandles);
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\AckMessageException) {
                    // 某些消息的句柄可能超時了會導(dǎo)致確認(rèn)不成功。
                    printf("Ack Error, RequestId:%s\n", $e->getRequestId());
                    foreach ($e->getAckMessageErrorItems() as $errorItem) {
                        printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode());
                    }
                }
            }
            print "ack finish\n";


        }

    }
}


$instance = new ConsumerTest();
$instance->run();

?>                                               

Python

#!/usr/bin/env python
# coding=utf8

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *

#初始化client
mq_client = MQClient(
    #設(shè)置HTTP接入域名。
    "",
    #AccessKey ID,阿里云身份驗證標(biāo)識。獲取方式,請參見本文前提條件中的獲取AccessKey。
    "${ACCESS_KEY}",
    #AccessKey Secret,阿里云身份驗證密鑰。獲取方式,請參見本文前提條件中的獲取AccessKey。
    "${SECRET_KEY}"
  )
#所屬的Topic。
topic_name = "${TOPIC}"
#您在控制臺創(chuàng)建的Group ID。
group_id = "GID_test"
#Topic所屬實例ID,默認(rèn)實例為空None。
instance_id = "MQ_INST_1380156306793859_BbXbx0Y4"

consumer = mq_client.get_consumer(instance_id, topic_name, group_id)

#長輪詢表示如果Topic沒有消息則請求會在服務(wù)端掛住3s,3s內(nèi)如果有消息可以消費則立即返回。
#長輪詢時間3秒(最多可設(shè)置為30秒)。
wait_seconds = 3
#一次最多消費3條(最多可設(shè)置為16條)。
batch = 3
print "%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)
while True:
    try:
        #長輪詢消費消息。
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print "Receive, MessageId: %s\nMessageBodyMD5: %s \
                              \nMessageTag: %s\nConsumedTimes: %s \
                              \nPublishTime: %s\nBody: %s \
                              \nNextConsumeTime: %s \
                              \nReceiptHandle: %s" % \
                             (msg.message_id, msg.message_body_md5,
                              msg.message_tag, msg.consumed_times,
                              msg.publish_time, msg.message_body,
                              msg.next_consume_time, msg.receipt_handle)
    except MQExceptionBase, e:
        if e.type == "MessageNotExist":
            print "No new message! RequestId: %s" % e.req_id
            continue

        print "Consume Message Fail! Exception:%s\n" % e
        time.sleep(2)
        continue

    #msg.next_consume_time前若不確認(rèn)消息消費成功,則消息會重復(fù)消費。
    #消息句柄有時間戳,同一條消息每次消費拿到的都不一樣。
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print "Ak %s Message Succeed.\n\n" % len(receipt_handle_list)
    except MQExceptionBase, e:
        print "\nAk Message Fail! Exception:%s" % e
        #某些消息的句柄可能超時了會導(dǎo)致確認(rèn)不成功。
        if e.sub_errors:
          for sub_error in e.sub_errors:
            print "\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])

Node.js

const {
  MQClient
} = require('@aliyunmq/mq-http-sdk');

// 設(shè)置HTTP接入域名。
const endpoint = "${HTTP_ENDPOINT}";
// AccessKey ID,阿里云身份驗證標(biāo)識。獲取方式,請參見本文前提條件中的獲取AccessKey。
const accessKeyId = "${ACCESS_KEY}";
// AccessKey Secret,阿里云身份驗證密鑰。獲取方式,請參見本文前提條件中的獲取AccessKey。
const accessKeySecret = "${SECRET_KEY}";

var client = new MQClient(endpoint, accessKeyId, accessKeySecret);

// 所屬的Topic。
const topic = "${TOPIC}";
// 您在控制臺創(chuàng)建的Group ID(Consumer ID)。
const groupId = "${GROUP_ID}";
// Topic所屬實例ID,默認(rèn)實例為空。
const instanceId = "${INSTANCE_ID}";

const consumer = client.getConsumer(instanceId, topic, groupId);

(async function(){
  // 循環(huán)消費消息。
  while(true) {
    try {
      // 長輪詢消費消息。
      // 長輪詢表示如果Topic沒有消息則請求會在服務(wù)端掛住3s,3s內(nèi)如果有消息可以消費則立即返回。
      res = await consumer.consumeMessage(
          3, //  一次最多消費3條(最多可設(shè)置為16條)。
          3 // 長輪詢時間3秒(最多可設(shè)置為30秒)。
          );

      if (res.code == 200) {
        // 消費消息,處理業(yè)務(wù)邏輯。
        console.log("Consume Messages, requestId:%s", res.requestId);
        const handles = res.body.map((message) => {
          console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" + 
            ",Props:%j,MessageKey:%s,Prop-A:%s",
              message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
              message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);
          return message.ReceiptHandle;
        });

        // message.NextConsumeTime前若不確認(rèn)消息消費成功,則消息會重復(fù)消費。
        // 消息句柄有時間戳,同一條消息每次消費拿到的都不一樣。
        res = await consumer.ackMessage(handles);
        if (res.code != 204) {
          // 某些消息的句柄可能超時了會導(dǎo)致確認(rèn)不成功。
          console.log("Ack Message Fail:");
          const failHandles = res.body.map((error)=>{
            console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
            return error.ReceiptHandle;
          });
          handles.forEach((handle)=>{
            if (failHandles.indexOf(handle) < 0) {
              console.log("\tSucHandle:%s\n", handle);
            }
          });
        } else {
          // 消息確認(rèn)消費成功。
          console.log("Ack Message suc, RequestId:%s\n\t", res.requestId, handles.join(','));
        }
      }
    } catch(e) {
      if (e.Code.indexOf("MessageNotExist") > -1) {
        // 沒有消息,則繼續(xù)長輪詢服務(wù)器。
        console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
      } else {
        console.log(e);
      }
    }
  }
})();                                   

C++

#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"

#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif

using namespace std;
using namespace mq::http::sdk;


int main() {

    MQClient mqClient(
            // 設(shè)置HTTP接入域名。
            "${HTTP_ENDPOINT}",
            // AccessKey ID,阿里云身份驗證標(biāo)識。獲取方式,請參見本文前提條件中的獲取AccessKey。
            "${ACCESS_KEY}",
            // AccessKey Secret,阿里云身份驗證密鑰。獲取方式,請參見本文前提條件中的獲取AccessKey。
            "${SECRET_KEY}"
            );

    // 所屬的Topic。
    string topic = "${TOPIC}";
    // 您在控制臺創(chuàng)建的Group ID(Consumer ID)。
    string groupId = "${GROUP_ID}";
    // Topic所屬實例ID,默認(rèn)實例為空。
    string instanceId = "${INSTANCE_ID}";

    MQConsumerPtr consumer;
    if (instanceId == "") {
        consumer = mqClient.getConsumerRef(topic, groupId);
    } else {
        consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
    }

    do {
        try {
            std::vector<Message> messages;
            // 長輪詢消費消息。
            // 長輪詢表示如果Topic沒有消息則請求會在服務(wù)端掛住3s,3s內(nèi)如果有消息可以消費則立即返回。
            consumer->consumeMessage(
                    3,// 一次最多消費3條(最多可設(shè)置為16條)。
                    3,//長輪詢時間3秒(最多可設(shè)置為30秒) 。
                    messages
            );
            cout << "Consume: " << messages.size() << " Messages!" << endl;

            // 處理消息。
            std::vector<std::string> receiptHandles;
            for (std::vector<Message>::iterator iter = messages.begin();
                    iter != messages.end(); ++iter)
            {
                cout << "MessageId: " << iter->getMessageId()
                    << " PublishTime: " << iter->getPublishTime()
                    << " Tag: " << iter->getMessageTag()
                    << " Body: " << iter->getMessageBody()
                    << " FirstConsumeTime: " << iter->getFirstConsumeTime()
                    << " NextConsumeTime: " << iter->getNextConsumeTime()
                    << " ConsumedTimes: " << iter->getConsumedTimes() 
                    << " Properties: " << iter->getPropertiesAsString() 
                    << " Key: " << iter->getMessageKey() << endl;
                receiptHandles.push_back(iter->getReceiptHandle());
            }

            // 確認(rèn)消息消費成功。
            // Message.NextConsumeTime前若不確認(rèn)消息消費成功,則消息會重復(fù)消費。
            // 消息句柄有時間戳,同一條消息每次消費拿到的都不一樣。
            AckMessageResponse bdmResp;
            consumer->ackMessage(receiptHandles, bdmResp);
            if (!bdmResp.isSuccess()) {
                // 某些消息的句柄可能超時了會導(dǎo)致確認(rèn)不成功。
                const std::vector<AckMessageFailedItem>& failedItems =
                    bdmResp.getAckMessageFailedItem();
                for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
                        iter != failedItems.end(); ++iter)
                {
                    cout << "AckFailedItem: " << iter->errorCode
                        << "  " << iter->receiptHandle << endl;
                }
            } else {
                cout << "Ack: " << messages.size() << " messages suc!" << endl;
            }
        } catch (MQServerException& me) {
            if (me.GetErrorCode() == "MessageNotExist") {
                cout << "No message to consume! RequestId: " + me.GetRequestId() << endl;
                continue;
            }
            cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        } catch (MQExceptionBase& mb) {
            cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        }

    } while(true);
}                                          

C#

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;

namespace Aliyun.MQ.Sample
{
	public class ConsumerSample
    {
        // 設(shè)置HTTP接入域名。
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // AccessKey ID,阿里云身份驗證標(biāo)識。獲取方式,請參見本文前提條件中的獲取AccessKey。
        private const string _accessKeyId = "${ACCESS_KEY}";
        // AccessKey Secret,阿里云身份驗證密鑰。獲取方式,請參見本文前提條件中的獲取AccessKey。
        private const string _secretAccessKey = "${SECRET_KEY}";
        // 所屬的Topic。
        private const string _topicName = "${TOPIC}";
        // Topic所屬實例ID,默認(rèn)實例為空。
        private const string _instanceId = "${INSTANCE_ID}";
        // 您在控制臺創(chuàng)建的Group ID(Consumer ID)。
        private const string _groupId = "${GROUP_ID}";

        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
        static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);

        static void Main(string[] args)
        {
            // 在當(dāng)前線程循環(huán)消費消息,建議是多開個幾個線程并發(fā)消費消息。
            while (true)
            {
                try
                {
                    // 長輪詢消費消息。
                    // 長輪詢表示如果Topic沒有消息則請求會在服務(wù)端掛住3s,3s內(nèi)如果有消息可以消費則立即返回。
                    List<Message> messages = null;

                    try
                    {
                        messages = consumer.ConsumeMessage(
                            3, //  一次最多消費3條(最多可設(shè)置為16條)。
                            3 // 長輪詢時間3秒(最多可設(shè)置為30秒)。
                        );
                    }
                    catch (Exception exp1)
                    {
                        if (exp1 is MessageNotExistException)
                        {
                            Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId);
                            continue;
                        }
                        Console.WriteLine(exp1);
                        Thread.Sleep(2000);
                    }

                    if (messages == null)
                    {
                        continue;
                    }

                    List<string> handlers = new List<string>();
                    Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");
                    // 處理業(yè)務(wù)邏輯。
                    foreach (Message message in messages)
                    {
                        Console.WriteLine(message);
                        Console.WriteLine("Property a is:" + message.GetProperty("a"));
                        handlers.Add(message.ReceiptHandle);
                    }
                    // Message.nextConsumeTime前若不確認(rèn)消息消費成功,則消息會重復(fù)消費。
                    // 消息句柄有時間戳,同一條消息每次消費拿到的都不一樣。
                    try
                    {
                        consumer.AckMessage(handlers);
                        Console.WriteLine("Ack message success:");
                        foreach (string handle in handlers)
                        {
                            Console.Write("\t" + handle);
                        }
                        Console.WriteLine();
                    }
                    catch (Exception exp2)
                    {
                        // 某些消息的句柄可能超時了會導(dǎo)致確認(rèn)不成功。
                        if (exp2 is AckMessageException)
                        {
                            AckMessageException ackExp = (AckMessageException)exp2;
                            Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
                            foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
                            {
                                Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    Thread.Sleep(2000);
                }
            }
        }
    }
}                                     

后續(xù)步驟

您可通過查詢消息及其軌跡的方式驗證消息是否消費成功。詳細(xì)信息,請參見消息查詢查詢消息軌跡。