使用消息模型時,您需要完成Factory、MetaStore和TimelineStore的初始化。本文介紹如何完成初始化的配置和操作。
前提條件
已配置訪問密鑰。具體操作,請參考配置訪問密鑰。
已獲取表格存儲實例的Endpoint。具體操作,請參考獲取實例Endpoint。
初始化Factory
將SyncClient作為參數,初始化StoreFactory。通過Store工廠創建Meta數據和Timeline數據的管理Store。
實現錯誤重試需要依賴SyncClient的重試策略,您可以通過配置SyncClient實現重試。如果有特殊需求,可自定義策略(只需實現RetryStrategy接口)。
/**
* 重試策略配置。
* Code: configuration.setRetryStrategy(new DefaultRetryStrategy());
**/
ClientConfiguration configuration = new ClientConfiguration();
SyncClient client = new SyncClient(
"http://instanceName.cn-shanghai.ots.aliyuncs.com",
System.getenv("OTS_AK_ENV"),
System.getenv("OTS_SK_ENV"),
"instanceName", configuration);
TimelineStoreFactory serviceFactory = new TimelineStoreFactoryImpl(client);
初始化MetaStore
構建Meta表的Schema,包含Identifier、MetaIndex等參數,通過Store工廠創建并獲取Meta的管理Store,配置參數包含Meta表名、索引名、主鍵字段、索引類型等。
TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
.addStringField("timeline_id").build();
IndexSchema metaIndex = new IndexSchema();
metaIndex.setFieldSchemas(Arrays.asList(//配置索引字段以及字段類型。
new FieldSchema("group_name", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord),
new FieldSchema("create_time", FieldType.LONG).setIndex(true)
));
TimelineMetaSchema metaSchema = new TimelineMetaSchema("groupMeta", idSchema)
.withIndex("metaIndex", metaIndex); //配置索引。
TimelineMetaStore timelineMetaStore = serviceFactory.createMetaStore(metaSchema);
建表
根據metaSchema的參數創建表。如果metaSchema中配置了索引,建表成功后會創建索引。
timelineMetaStore.prepareTables();
刪表
當表存在索引時,在刪除表前,系統會先刪除索引,再刪除Store相應表。
timelineMetaStore.dropAllTables();
初始化TimelineStore
構建timeline表的Schema配置,包含Identifier、TimelineIndex等參數,通過Store工廠創建并獲取Timeline的管理Store;配置參數包含Timeline表名、索引名、主鍵字段、索引類型等。
消息的批量寫入,基于Tablestore的DefaultTableStoreWriter提升并發,用戶可以根據自己需求設置線程池數目。
TimelineIdentifierSchema idSchema = new TimelineIdentifierSchema.Builder()
.addStringField("timeline_id").build();
IndexSchema timelineIndex = new IndexSchema();
timelineIndex.setFieldSchemas(Arrays.asList(//配置索引的字段以及字段類型。
new FieldSchema("text", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord),
new FieldSchema("receivers", FieldType.KEYWORD).setIndex(true).setIsArray(true)
));
TimelineSchema timelineSchema = new TimelineSchema("timeline", idSchema)
.autoGenerateSeqId() //SequenceId 設置為自增列方式。
.setCallbackExecuteThreads(5) //設置Writer初始線程數為5。
.withIndex("metaIndex", timelineIndex); //設置索引。
TimelineStore timelineStore = serviceFactory.createTimelineStore(timelineSchema);
建表
根據TimelineSchema的參數創建表。如果TimelineSchema中配置了索引,建表成功后會創建索引。
timelineStore.prepareTables();
刪表
當表存在索引時,在刪除表前,系統會先刪除索引,再刪除Store相應的表。
timelineStore.dropAllTables();