實例限流最佳實踐
云消息隊列 RabbitMQ 版會對單實例的TPS流量峰值進行限流,本文介紹云消息隊列 RabbitMQ 版實例的限流規則、限流后的行為以及限流最佳實踐等。
限流后行為
當云消息隊列 RabbitMQ 版實例的TPS流量峰值超過您所購買實例的TPS規格上限時,云消息隊列 RabbitMQ 版實例會被限流。
限流后的行為如下:
云消息隊列 RabbitMQ 版服務端會返回錯誤碼信息。具體請參見錯誤碼說明。
云消息隊列 RabbitMQ 版服務端關閉當前請求的Channel。代碼中可以捕獲異常重新開啟Channel。
限流Java示例代碼如下:
private static final int MAX_RETRIES = 5; // 最大重試次數
private static final long WAIT_TIME_MS = 2000; // 每次重試的等待時間(以毫秒為單位)
private void doAnythingWithReopenChannels(Connection connection, Channel channel) {
try {
// ......
// 在當前通道channel下執行的任何操作
// 例如消息發送、消費等
// ......
} catch (AlreadyClosedException e) {
String message = e.getMessage();
if (isChannelClosed(message)) {
// 如果通道已經關閉,關閉并重新創建通道
channel = createChannelWithRetry(connection);
// 在重連后可以繼續執行其它操作
// ......
} else {
throw e;
}
}
}
private Channel createChannelWithRetry(Connection connection) {
for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
try {
return connection.createChannel();
} catch (Exception e) {
System.err.println("Failed to create channel. Attempt " + attempt + " of " + MAX_RETRIES);
// 檢查錯誤, 若仍是被限流導致的關閉錯誤,則可以等待后繼續重試
// 也可移除本部分重試邏輯
if (attempt < MAX_RETRIES) {
try {
Thread.sleep(WAIT_TIME_MS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // 還原中斷狀態
}
} else {
throw new RuntimeException("Exceeded maximum retries to create channel", e);
}
}
}
throw new RuntimeException("This line should never be reached"); // 理論上不會到達這里
}
private boolean isChannelClosed(String errorMsg) {
// 判斷是否包含channel.close報錯,該報錯代表通道已關閉。
// 可能涵蓋530,541等錯誤信息。
if (errorMsg != null && errorMsg.contains("channel.close")) {
System.out.println("[ChannelClosed] Error details: " + errorMsg);
return true;
}
return false;
}
錯誤碼信息:
錯誤碼:reply-code=530
錯誤信息:reply-text=denied for too many requests
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>
(reply-code=530, reply-text=denied for too many requests, ReqId:5FB4C999314635F952FCBFF6, ErrorHelp[dstQueue=XXX_test_queue,
srcExchange=Producer.ExchangeName,bindingKey=XXX_test_bk, http://mrw.so/6rNqO8], class-id=50, method-id=20)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)
at java.lang.Thread.run(Thread.java:748)
實例秒級TPS峰值查詢
通過查詢實例實際使用的秒級TPS峰值,您可以了解業務的流量波動情況和流量峰值,判斷實例規格是否滿足業務需求。
云消息隊列 RabbitMQ 版提供以下三種方式查詢實例的秒級TPS峰值:
查詢方式 | 說明 | 查詢時間級別 | 查詢資源級別 |
優勢:
| 分鐘級TPS峰值 取值為1分鐘周期內,每秒鐘實例TPS的最大值。 | 實例級別TPS峰值 | |
(推薦)通過實例詳情查詢實例TPS峰值 |
| 秒級TPS峰值 |
|
| 秒級TPS峰值 | 實例級別TPS峰值 |
實例TPS計算規則
以下接口調用時,會被計算進TPS流量中,即調用一次接口,計算為一次TPS。
ConnectionOpen、ChannelOpen
QueueDeclare、QueueDelete、QueueBind、QueueUnbind
ExchangeDeclare、ExchangeDelete
ExchangeBind、ExchangeUnBind
SendMessage、BasicConsume、BasicGet、BasicAck、BasicReject、BasicNack、BasicRecover
延時消息是云消息隊列 RabbitMQ 版的高級特性消息,發送延時消息時,調用API接口的次數需要在普通消息的基礎上乘以5倍,消費延時消息時與普通消息次數相同。
示例:1秒內發送2條延時消息,消費3條延時消息。則此時API調用TPS為:2×5+3=13次/秒。
統計SendMessage接口的調用次數時,實際計算值為消息經過路由后要存儲到的Queue的數量。
例如,發送1條到Fanout類型Exchange的消息,最后要保存到10個Queue中,則SendMessage調用次數計算為10次。