日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

消息發(fā)布與訂閱

云數(shù)據(jù)庫 Tair(兼容 Redis)也提供了與Redis相同的消息發(fā)布(publish)與訂閱(subscribe)功能。即一個客戶端發(fā)布消息,其他多個客戶端訂閱消息。

場景介紹

云數(shù)據(jù)庫 Tair(兼容 Redis)發(fā)布的消息是“非持久”的,即消息發(fā)布者只負(fù)責(zé)發(fā)送消息,而不管消息是否有接收方,也不會保存之前發(fā)送的消息,即發(fā)布的消息“即發(fā)即失”;消息訂閱者也只能得到訂閱之后的消息,頻道(channel)中此前的消息將無從獲得。

此外,消息發(fā)布者(即publish客戶端)無需獨(dú)占與服務(wù)器端的連接,您可以在發(fā)布消息的同時,使用同一個客戶端連接進(jìn)行其他操作(例如List操作等)。但是,消息訂閱者(即subscribe客戶端)需要獨(dú)占與服務(wù)器端的連接,即進(jìn)行 subscribe 期間,該客戶端無法執(zhí)行其他操作,而是以阻塞的方式等待頻道(channel)中的消息;因此消息訂閱者需要使用單獨(dú)的服務(wù)器連接,或者需要在單獨(dú)的線程中使用(參見如下示例)。

代碼示例

消息發(fā)布者 (即publish client)

package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
public class KVStorePubClient {
    private Jedis jedis;
    public KVStorePubClient(String host,int port, String password){
        jedis = new Jedis(host,port);
        //KVStore的實(shí)例密碼
        String authString = jedis.auth(password);
        if (!authString.equals("OK"))
        {
            System.err.println("AUTH Failed: " + authString);
            return;
        }
    }
    public void pub(String channel,String message){
        System.out.println("  >>> 發(fā)布(PUBLISH) > Channel:"+channel+" > 發(fā)送出的Message:"+message);
        jedis.publish(channel, message);
    }
    public void close(String channel){
        System.out.println("  >>> 發(fā)布(PUBLISH)結(jié)束 > Channel:"+channel+" > Message:quit");
        //消息發(fā)布者結(jié)束發(fā)送,即發(fā)送一個“quit”消息;
        jedis.publish(channel, "quit");
    }
}

消息訂閱者 (即subscribe client)

package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class KVStoreSubClient extends Thread{
    private Jedis jedis;
    private String channel;
    private JedisPubSub listener;
    public KVStoreSubClient(String host,int port, String password){
        jedis = new Jedis(host,port);
                //ApsaraDB for Redis的實(shí)例密碼
                String authString = jedis.auth(password);//password
                if (!authString.equals("OK"))
                {
                    System.err.println("AUTH Failed: " + authString);
                    return;
                }
    }
    public void setChannelAndListener(JedisPubSub listener,String channel){
        this.listener=listener;
        this.channel=channel;
    }
    private void subscribe(){
        if(listener==null || channel==null){
            System.err.println("Error:SubClient> listener or channel is null");
        }
        System.out.println("  >>> 訂閱(SUBSCRIBE) > Channel:"+channel);
        System.out.println();
        //接收者在偵聽訂閱的消息時,將會阻塞進(jìn)程,直至接收到quit消息(被動方式),或主動取消訂閱
        jedis.subscribe(listener, channel);
    }
    public void unsubscribe(String channel){
        System.out.println("  >>> 取消訂閱(UNSUBSCRIBE) > Channel:"+channel);
        System.out.println();
        listener.unsubscribe(channel);
    }
    @Override
    public void run() {
        try{
            System.out.println();
            System.out.println("----------訂閱消息SUBSCRIBE 開始-------");
            subscribe();
            System.out.println("----------訂閱消息SUBSCRIBE 結(jié)束-------");
            System.out.println();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

消息監(jiān)聽者

package message.kvstore.aliyun.com;
import redis.clients.jedis.JedisPubSub;
public class KVStoreMessageListener extends JedisPubSub{
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("  <<< 訂閱(SUBSCRIBE)< Channel:" + channel + " >接收到的Message:" + message );
        System.out.println();
        //當(dāng)接收到的message為quit時,取消訂閱(被動方式)
        if(message.equalsIgnoreCase("quit")){
            this.unsubscribe(channel);
        }
    }
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
}

示例主程序

package message.kvstore.aliyun.com;
import java.util.UUID;
import redis.clients.jedis.JedisPubSub;
public class KVStorePubSubTest {
    //ApsaraDB for Redis的連接信息,從控制臺可以獲得
    static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
    static final int port = 6379;
    static final String password="password";//password
    public static void main(String[] args) throws Exception{
            KVStorePubClient pubClient = new KVStorePubClient(host, port,password);
            final String channel = "KVStore頻道-A";
            //消息發(fā)送者開始發(fā)消息,此時還無人訂閱,所以此消息不會被接收
            pubClient.pub(channel, "Aliyun消息1:(此時還無人訂閱,所以此消息不會被接收)");
            //消息接收者
            KVStoreSubClient subClient = new KVStoreSubClient(host, port,password);
            JedisPubSub listener = new KVStoreMessageListener();
            subClient.setChannelAndListener(listener, channel);
            //消息接收者開始訂閱
            subClient.start();
            //消息發(fā)送者繼續(xù)發(fā)消息
            for (int i = 0; i < 5; i++) {
                String message=UUID.randomUUID().toString();
                pubClient.pub(channel, message);
                Thread.sleep(1000);
            }
            //消息接收者主動取消訂閱
            subClient.unsubscribe(channel);
            Thread.sleep(1000);
            pubClient.pub(channel, "Aliyun消息2:(此時訂閱取消,所以此消息不會被接收)");
            //消息發(fā)布者結(jié)束發(fā)送,即發(fā)送一個“quit”消息;
            //此時如果有其他的消息接收者,那么在listener.onMessage()中接收到“quit”時,將執(zhí)行“unsubscribe”操作。
            pubClient.close(channel);
        }
    }

運(yùn)行結(jié)果

在輸入了正確的云數(shù)據(jù)庫 Tair(兼容 Redis)實(shí)例訪問地址和密碼之后,運(yùn)行以上Java程序,輸出結(jié)果如下。

  >>> 發(fā)布(PUBLISH) > Channel:KVStore頻道-A > 發(fā)送出的Message:Aliyun消息1:(此時還無人訂閱,所以此消息不會被接收)
----------訂閱消息SUBSCRIBE 開始-------
  >>> 訂閱(SUBSCRIBE) > Channel:KVStore頻道-A
  >>> 發(fā)布(PUBLISH) > Channel:KVStore頻道-A > 發(fā)送出的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
  <<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
  >>> 發(fā)布(PUBLISH) > Channel:KVStore頻道-A > 發(fā)送出的Message:ed5924a9-016b-469b-8203-7db63d06f812
  <<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:ed5924a9-016b-469b-8203-7db63d06f812
  >>> 發(fā)布(PUBLISH) > Channel:KVStore頻道-A > 發(fā)送出的Message:f1f84e0f-8f35-4362-9567-25716b1531cd
  <<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:f1f84e0f-8f35-4362-9567-25716b1531cd
  >>> 發(fā)布(PUBLISH) > Channel:KVStore頻道-A > 發(fā)送出的Message:746bde54-af8f-44d7-8a49-37d1a245d21b
  <<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:746bde54-af8f-44d7-8a49-37d1a245d21b
  >>> 發(fā)布(PUBLISH) > Channel:KVStore頻道-A > 發(fā)送出的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
  <<< 訂閱(SUBSCRIBE)< Channel:KVStore頻道-A >接收到的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
  >>> 取消訂閱(UNSUBSCRIBE) > Channel:KVStore頻道-A
----------訂閱消息SUBSCRIBE 結(jié)束-------
  >>> 發(fā)布(PUBLISH) > Channel:KVStore頻道-A > 發(fā)送出的Message:Aliyun消息2:(此時訂閱取消,所以此消息不會被接收)
  >>> 發(fā)布(PUBLISH)結(jié)束 > Channel:KVStore頻道-A > Message:quit

以上示例中僅演示了一個發(fā)布者與一個訂閱者的情況,實(shí)際上發(fā)布者與訂閱者都可以為多個,發(fā)送消息的頻道(channel)也可以是多個,對以上代碼稍作修改即可。

視頻介紹

您可以觀看以下視頻了解Redis發(fā)布訂閱(Pub/Sub)功能的實(shí)現(xiàn)、相關(guān)接口、以及應(yīng)用場景等信息,視頻時長約14分鐘。