日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

客戶端配置自動重連

更新時間:

由于服務端升級、服務端重啟、網絡抖動等原因,服務端和客戶端的網絡連接可能會斷開。本文介紹使用Java、Python、PHP語言在客戶端設置自動恢復的方法和示例代碼,避免斷連對您的業務造成影響。

觸發原因

  • Connection的I/O拋出異常。

  • Socket讀取操作超時。

  • 檢測到服務端心跳丟失。

恢復方法

Java

重要

4.0.0及以上版本Java客戶端默認開啟Connection和Topology自動恢復,您無需在代碼中設置。

在客戶端開啟Connection和Topology(Queue、Exchange、Binding、Consumer)自動恢復的方法如下:

  • factory.setAutomaticRecoveryEnabled(boolean):用于開啟或關閉Connection自動恢復。

  • factory.setNetworkRecoveryInterval(long):用于設置重試時間間隔。如果Connection自動恢復異常,設置了Connection自動恢復的客戶端將在一段固定時間間隔(默認為5秒)后重試。

  • factory.setTopologyRecoveryEnabled(boolean):用于開啟Topology自動恢復。Topology包括Queue、Exchange、Binding、Consumer。

Python

Pika是開源RabbitMQ官方推薦的Python客戶端庫。與Java客戶端提供的連接自動恢復功能不同,Pika庫本身不直接支持通過配置來實現自動的連接恢復。因此,要在Python中實現這一功能,您需要通過編寫回調函數來手動處理連接的恢復過程。

PHP

php-amqplib是一個PHP庫,用于與AMQP協議兼容的消息隊列(如RabbitMQ)進行高效的消息發布和消費。php-amqplib庫本身不直接支持通過配置來實現自動的連接恢復。因此,要在PHP中實現這一功能,您需要手動處理連接的恢復過程。

說明
  • 設置自動重連時,使用的php-amqplib庫版本應為3.6.1及以上版本。

  • 當客戶端連接AMQProxy且連接因空閑斷開時,下文連接自動重連的代碼不生效。因此,在消息收發頻率低的場景下(容易發生連接空閑斷開),建議直接連接RabbitMQ服務的接入點。

示例代碼

Java

開啟Connection和Topology自動恢復的客戶端示例代碼如下:

ConnectionFactory factory = new ConnectionFactory();
// 設置接入點,在云消息隊列 RabbitMQ 版控制臺實例詳情頁面獲取。
factory.setHost("xxx.xxx.aliyuncs.com");
// ${instanceId}為實例ID,從云消息隊列 RabbitMQ 版控制臺實例詳情頁面獲取。
factory.setCredentialsProvider(new AliyunCredentialsProvider("${instanceId}"));
// 設置Vhost名稱,請確保已在消息隊列AMQP版控制臺上創建。
factory.setVirtualHost("${VhostName}");
// 默認端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
// 基于網絡環境設置合理的超時時間。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
// 開啟Connection自動恢復。
factory.setAutomaticRecoveryEnabled(true);
// 設置Connection重試時間間隔為10秒。
factory.setNetworkRecoveryInterval(10000);
// 開啟Topology自動恢復。
factory.setTopologyRecoveryEnabled(true);
Connection connection = factory.newConnection();                      

Python

Python Pika連接自動恢復的消費者客戶端示例代碼如下:

# -*- coding: utf-8 -*-

import logging
import time
import pika

LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

class Consumer(object):

    def __init__(self, amqp_url, queue):
        self.should_reconnect = False

        self._connection = None
        self._channel = None
        self._closing = False
        self._url = amqp_url

        self._queue = queue

    def connect(self):
        '''
        創建connection,并設置以下回調:
        on_open_callback: 連接成功回調
        on_open_error_callback: 創建連接失敗回調
        on_close_callback: 連接關閉回調
        '''
        return pika.SelectConnection(
            parameters=pika.URLParameters(self._url),
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_open_error,
            on_close_callback=self.on_connection_closed)

    def on_connection_open(self, _unused_connection):
        '''
        連接成功回調
        創建channel,并設置回調:
        on_channel_open: channel創建成功回調
        '''
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_open_error(self, _unused_connection, err):
        """
        創建連接失敗回調
        打印錯誤信息,并嘗試重新連接
        """
        LOGGER.error('Connection open failed: %s', err)
        self.reconnect()

    def on_connection_closed(self, _unused_connection, reason):
        """
        連接關閉回調
        分以下兩種情況:
        1. 正常關閉,直接退出
        2. 連接異常斷開,嘗試重新連接
        """
        self._channel = None
        if self._closing:
            self._connection.ioloop.stop()
        else:
            LOGGER.warning('Connection closed, reconnect necessary: %s', reason)
            self.reconnect()

    def close_connection(self):
        """
        關閉連接
        """
        if self._connection.is_closing or self._connection.is_closed:
            LOGGER.info('Connection is closing or already closed')
        else:
            LOGGER.info('Closing connection')
            self._connection.close()

    def reconnect(self):
        """
        修改should_reconnect為True,并停止io_loop
        """
        self.should_reconnect = True
        self.stop()

    def on_channel_open(self, channel):
        """
        channel創建成功回調
        設置回調:
        on_channel_closed: channel關閉回調
        開始隊列消費
        """
        self._channel = channel
        self._channel.add_on_close_callback(self.on_channel_closed)
        self.start_consuming()

    def on_channel_closed(self, channel, reason):
        """
        channel關閉回調
        打印channel關閉信息,并關閉連接
        """
        LOGGER.warning('Channel %i was closed: %s', channel, reason)
        self.close_connection()

    def start_consuming(self):
        """
        開始隊列消費
        """
        LOGGER.info('start consuming...')
        self._channel.basic_consume(
            self._queue, self.on_message)

    def on_message(self, _unused_channel, basic_deliver, properties, body):
        """
        消費消息并上傳ack
        """
        LOGGER.info('Received message: %s', body.decode())
        # 處理業務邏輯
        self._channel.basic_ack(basic_deliver.delivery_tag)

    def run(self):
        """
        創建connection,并啟動io_loop
        """
        self._connection = self.connect()
        self._connection.ioloop.start()

    def stop(self):
        """
        停止io_loop
        """
        if not self._closing:
            self._closing = True
            self._connection.ioloop.stop()
            LOGGER.info('Stopped')


class AutoRecoveryConsumer(object):

    def __init__(self, amqp_url, queue):
        self._amqp_url = amqp_url
        self._queue = queue
        self._consumer = Consumer(self._amqp_url, queue)

    def run(self):
        """
        while True 循環,直到KeyboardInterrupt異常
        在run方法中,會啟動io_loop監聽隊列并處理消息,通過循環確保消費者持續運行并且能夠自動重連
        """
        while True:
            try:
                self._consumer.run()
            except KeyboardInterrupt:
                self._consumer.stop()
                break
            self._maybe_reconnect()

    def _maybe_reconnect(self):
        """
        判斷是否需要重連,每次重連間隔1s
        """
        if self._consumer.should_reconnect:
            self._consumer.stop()
            time.sleep(1)
            self._consumer = Consumer(self._amqp_url, self._queue)


def main():
    username = 'MjoxODgwNzcwODY5MD****'
    password = 'NDAxREVDQzI2MjA0OT****'
    host = '1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com'
    port = 5672
    vhost = 'vhost_test'

    # amqp_url: amqp://<username>:<password>@<host>:<port>/<vhost>
    amqp_url = 'amqp://%s:%s@%s:%i/%s' % (username, password, host, port, vhost)
    consumer = AutoRecoveryConsumer(amqp_url, 'QueueTest')
    consumer.run()


if __name__ == '__main__':
    main()

PHP

PHP php-amqplib連接自動恢復的消費者客戶端示例代碼如下:

<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;

const ONE_SECOND = 1;


/**
 * 創建連接
 */
function connect() {
    $host = '1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com';
    $username = 'NDAxREVDQzI2MjA0OT****';
    $password = 'NDAxREVDQzI2MjA0OT****';
    $port = 5672;
    $vhost = 'vhost_test';
    return new AMQPStreamConnection($host, $port, $username, $password, $vhost);
}

/**
 * 清理連接
 */
function cleanup_connection($connection) {
    try {
        if($connection !== null) {
            $connection->close();
        }
    } catch (\ErrorException $e) {
    }
}

$connection = null;

while(true){
    try {
        $connection = connect();
        start_consuming($connection);
    } catch (AMQPConnectionClosedException $e) {
        echo $e->getMessage() . PHP_EOL;
        cleanup_connection($connection);
        sleep(ONE_SECOND);
    } catch(AMQPRuntimeException $e) {
        echo $e->getMessage() . PHP_EOL;
        cleanup_connection($connection);
        sleep(ONE_SECOND);
    } catch(\RuntimeException $e) {
        echo 'Runtime exception ' . PHP_EOL;
        cleanup_connection($connection);
        sleep(ONE_SECOND);
    } catch(\ErrorException $e) {
        echo 'Error exception ' . PHP_EOL;
        cleanup_connection($connection);
        sleep(ONE_SECOND);
    }
}

/**
 * 啟動消費
 * @param AMQPStreamConnection $connection
 */
function start_consuming($connection) {
    $queue = 'queueTest';
    $consumerTag = 'consumer';
    $channel = $connection->channel();
    $channel->queue_declare($queue, false, true, false, false);
    $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
    while ($channel->is_consuming()) {
        $channel->wait();
    }
}


/**
 * 處理消息
 * @param \PhpAmqpLib\Message\AMQPMessage $message
 */
function process_message($message)
{
    // 處理業務邏輯
    echo "\n--------\n";
    echo $message->body;
    echo "\n--------\n";

    $message->ack();
}

恢復限制

  • Connection斷開需要一定的時間檢測。要確保這段時間內發送的消息不丟失,需使用Publisher Confirms實現可靠發送。

  • Channel異常導致Connection斷開時,不會觸發Connection自動恢復。Channel異常通常為應用級別的問題,需要使用方自行處理。

  • Connection自動恢復不會使Channel也自動恢復。