Queue管理
本文介紹使用消息模型時(shí)如何進(jìn)行Queue管理。
獲取Queue實(shí)例
Queue是單個(gè)消息隊(duì)列的抽象概念,對應(yīng)TimelineStore下單個(gè)Identifier的所有消息。獲取Queue實(shí)例時(shí)通過TimelineStore的接口創(chuàng)建。
TimelineIdentifier identifier = new TimelineIdentifier.Builder()
.addField("timeline_id", "group_1")
.build();
//單TimelineStore下單identifier對應(yīng)的消息隊(duì)列(Queue)。
TimelineQueue timelineQueue = timelineStore.createTimelineQueue(identifier);
Queue是單存儲庫下單Identifier對應(yīng)的消息隊(duì)列的管理實(shí)例,主要有同步寫、異步寫、批量寫、刪、同步改、異步改、單行讀、范圍讀等接口。
Store
同步存儲消息,兩個(gè)接口分別支持SequenceId的兩種實(shí)現(xiàn)方式自增列和手動設(shè)置,相關(guān)配置在TimelineSchema中。
timelineQueue.store(message); //自增列實(shí)現(xiàn)的SequenceId。
timelineQueue.store(sequenceId, message); //手動設(shè)置SequenceId。
StoreAsync
異步存儲消息,您可以自定義回調(diào),對成功或者失敗做自定義處理。接口返回Future<TimelineEntry>。
TimelineCallback callback = new TimelineCallback() {
@Override
public void onCompleted(TimelineIdentifier i, TimelineMessage m, TimelineEntry t) {
// do something when succeed.
}
@Override
public void onFailed(TimelineIdentifier i, TimelineMessage m, Exception e) {
// do something when failed.
}
};
timelineQueue.storeAsync(message, callback); //自增列實(shí)現(xiàn)的SequenceId。
timelineQueue.storeAsync(sequenceId, message, callback); //手動設(shè)置SequenceId。
BatchStore
批量存儲消息,支持無回調(diào)和有回調(diào)兩種方式。您可以自定義回調(diào),對成功或者失敗做自定義處理。
timelineQueue.batchStore(message); //自增列實(shí)現(xiàn)的SequenceId。
timelineQueue.batchStore(sequenceId, message); //手動設(shè)置SequenceId。
timelineQueue.batchStore(message, callback); //自增列實(shí)現(xiàn)的SequenceId。
timelineQueue.batchStore(sequenceId, message, callback); //手動設(shè)置SequenceId。
Get
通過SequenceId讀取單行消息。當(dāng)消息不存在時(shí)不拋錯,返回null。
timelineQueue.get(sequenceId);
GetLatestTimelineEntry
讀取最新一條消息。當(dāng)消息不存在時(shí)不拋錯,返回null。
timelineQueue.getLatestTimelineEntry();
GetLatestSequenceId
獲取最新一條消息的SequenceId。當(dāng)消息不存在時(shí)不拋錯,返回0。
timelineQueue.getLatestSequenceId();
Update
通過SequenceId同步更新消息內(nèi)容。
TimelineMessage message = new TimelineMessage().setField("text", "Timeline is fine.");
//update message with new field
message.setField("text", "new value");
timelineQueue.update(sequenceId, message);
UpdateAsync
通過SequenceId異步更新消息。您可以自定義回調(diào),對成功或者失敗做自定義處理。接口返回Future<TimelineEntry>。
TimelineMessage oldMessage = new TimelineMessage().setField("text", "Timeline is fine.");
TimelineCallback callback = new TimelineCallback() {
@Override
public void onCompleted(TimelineIdentifier i, TimelineMessage m, TimelineEntry t) {
// do something when succeed.
}
@Override
public void onFailed(TimelineIdentifier i, TimelineMessage m, Exception e) {
// do something when failed.
}
};
TimelineMessage newMessage = oldMessage;
newMessage.setField("text", "new value");
timelineQueue.updateAsync(sequenceId, newMessage, callback);
Delete
根據(jù)SequenceId刪除單行消息。
timelineQueue.delete(sequenceId);
Scan
根據(jù)Scan參數(shù)正序(或逆序)范圍讀取單個(gè)Queue下的消息,返回Iterator<TimelineEntry>,通過迭代器遍歷。
ScanParameter scanParameter = new ScanParameter().scanBackward(Long.MAX_VALUE, 0);
timelineQueue.scan(scanParameter);