日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Go SDK接入示例

本文介紹使用AMQP協議的Go客戶端接入阿里云物聯網平臺,接收服務端訂閱消息的示例。

前提條件

已獲取消費組ID,并訂閱Topic消息。

準備開發環境

本示例的測試環境為Go 1.12.7。

下載SDK

可使用以下命令導入Go語言AMQP SDK。

import "pack.ag/amqp"

SDK使用說明,請參見package amqp

代碼示例

package main

import (
	"os"
    "context"
    "crypto/hmac"
    "crypto/sha1"
    "encoding/base64"
    "fmt"
    "pack.ag/amqp"
    "time"
)
//參數說明,請參見AMQP客戶端接入說明文檔。
const consumerGroupId = "${YourConsumerGroupId}"
const clientId = "${YourClientId}"
//iotInstanceId:實例ID。
const iotInstanceId = "${YourIotInstanceId}"
//接入域名,請參見AMQP客戶端接入說明文檔。
const host = "${YourHost}"

func main() {
	//工程代碼泄露可能會導致 AccessKey 泄露,并威脅賬號下所有資源的安全性。以下代碼示例使用環境變量獲取 AccessKey 的方式進行調用,僅供參考
	accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
	accessSecret := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    address := "amqps://" + host + ":5671"
    timestamp := time.Now().Nanosecond() / 1000000
    //userName組裝方法,請參見AMQP客戶端接入說明文檔。
    userName := fmt.Sprintf("%s|authMode=aksign,signMethod=Hmacsha1,consumerGroupId=%s,authId=%s,iotInstanceId=%s,timestamp=%d|", 
        clientId, consumerGroupId, accessKey, iotInstanceId, timestamp)
    stringToSign := fmt.Sprintf("authId=%s&timestamp=%d", accessKey, timestamp)
    hmacKey := hmac.New(sha1.New, []byte(accessSecret))
    hmacKey.Write([]byte(stringToSign))
    //計算簽名,password組裝方法,請參見AMQP客戶端接入說明文檔。
    password := base64.StdEncoding.EncodeToString(hmacKey.Sum(nil))

    amqpManager := &AmqpManager{
        address:address,
        userName:userName,
        password:password,
    }

    //如果需要做接受消息通信或者取消操作,從Background衍生context。
    ctx := context.Background()

    amqpManager.startReceiveMessage(ctx)
}

//業務函數。用戶自定義實現,該函數被異步執行,請考慮系統資源消耗情況。
func (am *AmqpManager) processMessage(message *amqp.Message) {
    fmt.Println("data received:", string(message.GetData()), " properties:", message.ApplicationProperties)
}

type AmqpManager struct {
    address     string
    userName     string
    password     string
    client       *amqp.Client
    session     *amqp.Session
    receiver     *amqp.Receiver
}

func (am *AmqpManager) startReceiveMessage(ctx context.Context)  {

    childCtx, _ := context.WithCancel(ctx)
    err := am.generateReceiverWithRetry(childCtx)
    if nil != err {
        return
    }
    defer func() {
        am.receiver.Close(childCtx)
        am.session.Close(childCtx)
        am.client.Close()
    }()

    for {
        //阻塞接受消息,如果ctx是background則不會被打斷。
        message, err := am.receiver.Receive(ctx)

        if nil == err {
            go am.processMessage(message)
            message.Accept()
        } else {
            fmt.Println("amqp receive data error:", err)

            //如果是主動取消,則退出程序。
            select {
            case <- childCtx.Done(): return
            default:
            }

            //非主動取消,則重新建立連接。
            err := am.generateReceiverWithRetry(childCtx)
            if nil != err {
                return
            }
        }
    }
}

func (am *AmqpManager) generateReceiverWithRetry(ctx context.Context) error {
    //退避重連,從10ms依次x2,直到20s。
    duration := 10 * time.Millisecond
    maxDuration := 20000 * time.Millisecond
    times := 1

    //異常情況,退避重連。
    for {
        select {
        case <- ctx.Done(): return amqp.ErrConnClosed
        default:
        }

        err := am.generateReceiver()
        if nil != err {
            time.Sleep(duration)
            if duration < maxDuration {
                duration *= 2
            }
            fmt.Println("amqp connect retry,times:", times, ",duration:", duration)
            times ++
        } else {
            fmt.Println("amqp connect init success")
            return nil
        }
    }
}

//由于包不可見,無法判斷Connection和Session狀態,重啟連接獲取。
func (am *AmqpManager) generateReceiver() error {

    if am.session != nil {
        receiver, err := am.session.NewReceiver(
            amqp.LinkSourceAddress("/queue-name"),
            amqp.LinkCredit(20),
        )
        //如果斷網等行為發生,Connection會關閉導致Session建立失敗,未關閉連接則建立成功。
        if err == nil {
            am.receiver = receiver
            return nil
        }
    }

    //清理上一個連接。
    if am.client != nil {
        am.client.Close()
    }

    client, err := amqp.Dial(am.address, amqp.ConnSASLPlain(am.userName, am.password), )
    if err != nil {
        return err
    }
    am.client = client

    session, err := client.NewSession()
    if err != nil {
        return err
    }
    am.session = session

    receiver, err := am.session.NewReceiver(
        amqp.LinkSourceAddress("/queue-name"),
        amqp.LinkCredit(20),
    )
    if err != nil {
        return err
    }
    am.receiver = receiver

    return nil
}

您需按照如下表格中的參數說明,修改代碼中的參數值。更多參數說明,請參見AMQP客戶端接入說明

重要

請確保參數值輸入正確,否則AMQP客戶端接入會失敗。

參數

說明

accessKey

登錄物聯網平臺控制臺,將鼠標移至賬號頭像上,然后單擊AccessKey管理,獲取AccessKey ID和AccessKey Secret。

說明

如果使用RAM用戶,您需授予該RAM用戶管理物聯網平臺的權限(AliyunIOTFullAccess),否則將連接失敗。授權方法請參見RAM用戶訪問

accessSecret

consumerGroupId

當前物聯網平臺對應實例中的消費組ID。

登錄物聯網平臺控制臺,在對應實例的消息轉發 > 服務端訂閱 > 消費組列表查看您的消費組ID。

iotInstanceId

實例ID。您可在物聯網平臺控制臺實例概覽頁面,查看當前實例的ID。

  • 若有ID值,必須傳入該ID值。

  • 若無實例概覽頁面或ID值,傳入空值,即iotInstanceId = ""

clientId

表示客戶端ID,需您自定義,長度不可超過64個字符。建議使用您的AMQP客戶端所在服務器UUID、MAC地址、IP等唯一標識。

AMQP客戶端接入并啟動成功后,登錄物聯網平臺控制臺,在對應實例的消息轉發 > 服務端訂閱 > 消費組列表頁簽,單擊消費組對應的查看消費組詳情頁面將顯示該參數,方便您識別區分不同的客戶端。

host

AMQP接入域名。

${YourHost}對應的AMQP接入域名信息,請參見管理實例終端節點

運行結果示例

  • 成功:返回類似如下日志信息,表示AMQP客戶端已接入物聯網平臺并成功接收消息。成功

  • 失敗:返回類似如下日志信息,表示AMQP客戶端連接物聯網平臺失敗。

    您可根據日志提示,檢查代碼或網絡環境,然后修正問題,重新運行代碼。

    失敗

相關文檔

服務端訂閱消息相關錯誤碼,請參見消息相關錯誤碼