日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

事務(wù)消息

更新時(shí)間:

事務(wù)消息為云消息隊(duì)列 RocketMQ 版中的高級(jí)特性消息,本文為您介紹事務(wù)消息的應(yīng)用場景、功能原理、使用限制、使用方法和使用建議。

應(yīng)用場景

分布式事務(wù)的訴求

分布式系統(tǒng)調(diào)用的特點(diǎn)為一個(gè)核心業(yè)務(wù)邏輯的執(zhí)行,同時(shí)需要調(diào)用多個(gè)下游業(yè)務(wù)進(jìn)行處理。因此,如何保證核心業(yè)務(wù)和多個(gè)下游業(yè)務(wù)的執(zhí)行結(jié)果完全一致,是分布式事務(wù)需要解決的主要問題。

事務(wù)消息訴求

以電商交易場景為例,用戶支付訂單這一核心操作的同時(shí)會(huì)涉及到下游物流發(fā)貨、積分變更、購物車狀態(tài)清空等多個(gè)子系統(tǒng)的變更。當(dāng)前業(yè)務(wù)的處理分支包括:

  • 主分支訂單系統(tǒng)狀態(tài)更新:由未支付變更為支付成功。

  • 物流系統(tǒng)狀態(tài)新增:新增待發(fā)貨物流記錄,創(chuàng)建訂單物流記錄。

  • 積分系統(tǒng)狀態(tài)變更:變更用戶積分,更新用戶積分表。

  • 購物車系統(tǒng)狀態(tài)變更:清空購物車,更新用戶購物車記錄。

傳統(tǒng)XA事務(wù)方案:性能不足

為了保證上述四個(gè)分支的執(zhí)行結(jié)果一致性,典型方案是基于XA協(xié)議的分布式事務(wù)系統(tǒng)來實(shí)現(xiàn)。將四個(gè)調(diào)用分支封裝成包含四個(gè)獨(dú)立事務(wù)分支的大事務(wù)。基于XA分布式事務(wù)的方案可以滿足業(yè)務(wù)處理結(jié)果的正確性,但最大的缺點(diǎn)是多分支環(huán)境下資源鎖定范圍大,并發(fā)度低,隨著下游分支的增加,系統(tǒng)性能會(huì)越來越差。

基于普通消息方案:一致性保障困難

將上述基于XA事務(wù)的方案進(jìn)行簡化,將訂單系統(tǒng)變更作為本地事務(wù),剩下的系統(tǒng)變更作為普通消息的下游來執(zhí)行,事務(wù)分支簡化成普通消息+訂單表事務(wù),充分利用消息異步化的能力縮短鏈路,提高并發(fā)度。

普通消息方案

該方案中消息下游分支和訂單系統(tǒng)變更的主分支很容易出現(xiàn)不一致的現(xiàn)象,例如:

  • 消息發(fā)送成功,訂單沒有執(zhí)行成功,需要回滾整個(gè)事務(wù)。

  • 訂單執(zhí)行成功,消息沒有發(fā)送成功,需要額外補(bǔ)償才能發(fā)現(xiàn)不一致。

  • 消息發(fā)送超時(shí)未知,此時(shí)無法判斷需要回滾訂單還是提交訂單變更。

基于云消息隊(duì)列 RocketMQ 版分布式事務(wù)消息:支持最終一致性

上述普通消息方案中,普通消息和訂單事務(wù)無法保證一致的原因,本質(zhì)上是由于普通消息無法像單機(jī)數(shù)據(jù)庫事務(wù)一樣,具備提交、回滾和統(tǒng)一協(xié)調(diào)的能力。

而基于云消息隊(duì)列 RocketMQ 版實(shí)現(xiàn)的分布式事務(wù)消息功能,在普通消息基礎(chǔ)上,支持二階段的提交能力。將二階段提交和本地事務(wù)綁定,實(shí)現(xiàn)全局提交結(jié)果的一致性。

事務(wù)消息

云消息隊(duì)列 RocketMQ 版事務(wù)消息的方案,具備高性能、可擴(kuò)展、業(yè)務(wù)開發(fā)簡單的優(yōu)勢。具體事務(wù)消息的原理和流程,請參見下文的功能原理

功能原理

什么是事務(wù)消息

事務(wù)消息是云消息隊(duì)列 RocketMQ 版提供的一種高級(jí)消息類型,支持在分布式場景下保障消息生產(chǎn)和本地事務(wù)的最終一致性。

事務(wù)消息處理流程

事務(wù)消息交互流程如下圖所示。事務(wù)消息

  1. 生產(chǎn)者將消息發(fā)送至云消息隊(duì)列 RocketMQ 版服務(wù)端。

  2. 云消息隊(duì)列 RocketMQ 版服務(wù)端將消息持久化成功之后,向生產(chǎn)者返回Ack確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息被標(biāo)記為“暫不能投遞”,這種狀態(tài)下的消息即為半事務(wù)消息。

  3. 生產(chǎn)者開始執(zhí)行本地事務(wù)邏輯。

  4. 生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)結(jié)果(Commit或是Rollback),服務(wù)端收到確認(rèn)結(jié)果后處理邏輯如下:

    • 二次確認(rèn)結(jié)果為Commit:服務(wù)端將半事務(wù)消息標(biāo)記為可投遞,并投遞給消費(fèi)者。

    • 二次確認(rèn)結(jié)果為Rollback:服務(wù)端將回滾事務(wù),不會(huì)將半事務(wù)消息投遞給消費(fèi)者。

  5. 在斷網(wǎng)或者是生產(chǎn)者應(yīng)用重啟的特殊情況下,若服務(wù)端未收到發(fā)送者提交的二次確認(rèn)結(jié)果,或服務(wù)端收到的二次確認(rèn)結(jié)果為Unknown未知狀態(tài),經(jīng)過固定時(shí)間后,服務(wù)端將對消息生產(chǎn)者即生產(chǎn)者集群中任一生產(chǎn)者實(shí)例發(fā)起消息回查。

    說明

    服務(wù)端回查的間隔時(shí)間和最大回查次數(shù),請參見參數(shù)限制

  6. 生產(chǎn)者收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。

  7. 生產(chǎn)者根據(jù)檢查到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對半事務(wù)消息進(jìn)行處理。

事務(wù)消息生命周期

事務(wù)消息

  • 初始化

    半事務(wù)消息被生產(chǎn)者構(gòu)建并完成初始化,待發(fā)送到服務(wù)端的狀態(tài)。

  • 事務(wù)待提交

    半事務(wù)消息被發(fā)送到服務(wù)端,和普通消息不同,并不會(huì)直接被服務(wù)端持久化,而是會(huì)被單獨(dú)存儲(chǔ)到事務(wù)存儲(chǔ)系統(tǒng)中,等待第二階段本地事務(wù)返回執(zhí)行結(jié)果后再提交。此時(shí)消息對下游消費(fèi)者不可見。

  • 消息回滾

    第二階段如果事務(wù)執(zhí)行結(jié)果明確為回滾,服務(wù)端會(huì)將半事務(wù)消息回滾,該事務(wù)消息流程終止。

  • 提交待消費(fèi)

    第二階段如果事務(wù)執(zhí)行結(jié)果明確為提交,服務(wù)端會(huì)將半事務(wù)消息重新存儲(chǔ)到普通存儲(chǔ)系統(tǒng)中,此時(shí)消息對下游消費(fèi)者可見,等待被消費(fèi)者獲取并消費(fèi)。

  • 消費(fèi)中

    消息被消費(fèi)者獲取,并按照消費(fèi)者本地的業(yè)務(wù)邏輯進(jìn)行處理的過程。

    此時(shí)服務(wù)端會(huì)等待消費(fèi)者完成消費(fèi)并提交消費(fèi)結(jié)果,如果一定時(shí)間后沒有收到消費(fèi)者的響應(yīng),云消息隊(duì)列 RocketMQ 版會(huì)對消息進(jìn)行重試處理。具體信息,請參見消費(fèi)重試

  • 消費(fèi)提交

    消費(fèi)者完成消費(fèi)處理,并向服務(wù)端提交消費(fèi)結(jié)果,服務(wù)端標(biāo)記當(dāng)前消息已經(jīng)被處理(包括消費(fèi)成功和失敗)。

    云消息隊(duì)列 RocketMQ 版默認(rèn)支持保留所有消息,此時(shí)消息數(shù)據(jù)并不會(huì)立即被刪除,只是邏輯標(biāo)記已消費(fèi)。消息在保存時(shí)間到期或存儲(chǔ)空間不足被刪除前,消費(fèi)者仍然可以回溯消息重新消費(fèi)。

  • 消息刪除

    云消息隊(duì)列 RocketMQ 版按照消息保存機(jī)制滾動(dòng)清理最早的消息數(shù)據(jù),將消息從物理文件中刪除。更多信息,請參見消息存儲(chǔ)和清理機(jī)制

使用限制

消息類型一致性

事務(wù)消息僅支持在MessageTypeTransaction的主題內(nèi)使用,即事務(wù)消息只能發(fā)送至類型為事務(wù)消息的主題中,發(fā)送的消息的類型必須和主題的類型一致。

消費(fèi)事務(wù)性

云消息隊(duì)列 RocketMQ 版事務(wù)消息保證本地主分支事務(wù)和下游消息發(fā)送事務(wù)的一致性,但不保證消息消費(fèi)結(jié)果和上游事務(wù)的一致性。因此需要下游業(yè)務(wù)分支自行保證消息正確處理,建議消費(fèi)端做好消費(fèi)重試,如果有短暫失敗可以利用重試機(jī)制保證最終處理成功。

中間狀態(tài)可見性

云消息隊(duì)列 RocketMQ 版事務(wù)消息為最終一致性,即在消息提交到下游消費(fèi)端處理完成之前,下游分支和上游事務(wù)之間的狀態(tài)會(huì)不一致。因此,事務(wù)消息僅適合接受異步執(zhí)行的事務(wù)場景。

事務(wù)超時(shí)機(jī)制

云消息隊(duì)列 RocketMQ 版事務(wù)消息的生命周期存在超時(shí)機(jī)制,即半事務(wù)消息被生產(chǎn)者發(fā)送服務(wù)端后,如果在指定時(shí)間內(nèi)服務(wù)端無法確認(rèn)提交或者回滾狀態(tài),則消息默認(rèn)會(huì)被回滾。事務(wù)超時(shí)時(shí)間,請參見參數(shù)限制

不支持多個(gè)sendReceipt

事務(wù)消息在一個(gè)事務(wù)中僅允許一個(gè)sendReceipt,不支持多個(gè)sendReceipt。

使用示例

事務(wù)消息相比普通消息發(fā)送時(shí)需要修改以下幾點(diǎn):

  • 發(fā)送事務(wù)消息前,需要開啟事務(wù)并關(guān)聯(lián)本地的事務(wù)執(zhí)行。

  • 為保證事務(wù)一致性,在構(gòu)建生產(chǎn)者時(shí),必須設(shè)置事務(wù)檢查器和預(yù)綁定事務(wù)消息發(fā)送的主題列表,客戶端內(nèi)置的事務(wù)檢查器會(huì)對綁定的事務(wù)主題做異常狀態(tài)恢復(fù)。

以Java語言為例,使用事務(wù)消息示例參考如下:

完整的消息收發(fā)示例代碼請參見RocketMQ 5.x系列SDK(推薦)

示例代碼

import java.time.Duration;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.shaded.com.google.common.base.Strings;

public class ProducerTransactionMessageExample {
    /**
     * 演示demo,模擬訂單表查詢服務(wù),用來確認(rèn)訂單事務(wù)是否提交成功。
     */
    private static boolean checkOrderById(String orderId) {
        return true;
    }

    /**
     * 演示demo,模擬本地事務(wù)的執(zhí)行結(jié)果。
     */
    private static boolean doLocalTransaction() {
        return true;
    }

    public static void main(String[] args) throws ClientException {
        /**
         * 實(shí)例接入點(diǎn),從控制臺(tái)實(shí)例詳情頁的接入點(diǎn)頁簽中獲取。
         * 如果是在阿里云ECS內(nèi)網(wǎng)訪問,建議填寫VPC接入點(diǎn)。
         * 如果是在本地公網(wǎng)訪問,或者是線下IDC環(huán)境訪問,可以使用公網(wǎng)接入點(diǎn)。使用公網(wǎng)接入點(diǎn)訪問,必須開啟實(shí)例的公網(wǎng)訪問功能。
         */
        String endpoints = "xxx-hangzhou.rmq.aliyuncs.com:8080";
        //消息發(fā)送的目標(biāo)Topic名稱,需要提前在控制臺(tái)創(chuàng)建,如果不創(chuàng)建直接使用會(huì)返回報(bào)錯(cuò)。
        String topic = "topic1";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * 如果是使用公網(wǎng)接入點(diǎn)訪問,configuration還需要設(shè)置實(shí)例的用戶名和密碼。實(shí)例用戶名和密碼在控制臺(tái)訪問控制的智能身份識(shí)別頁簽中獲取。
         * 如果是在阿里云ECS內(nèi)網(wǎng)訪問,無需填寫該配置,服務(wù)端會(huì)根據(jù)內(nèi)網(wǎng)VPC信息智能獲取。
         * 如果實(shí)例類型為Serverlesss實(shí)例,則不管是公網(wǎng)訪問還是內(nèi)網(wǎng)訪問都必須設(shè)置實(shí)例的用戶名密碼。
         */
        builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        builder.setRequestTimeout(Duration.ofMillis(5000));
        ClientConfiguration configuration = builder.build();
        
        //構(gòu)造事務(wù)生產(chǎn)者:事務(wù)消息需要生產(chǎn)者構(gòu)建一個(gè)事務(wù)檢查器,用于檢查確認(rèn)異常半事務(wù)的中間狀態(tài)。
        Producer producer = provider.newProducerBuilder()
            .setTransactionChecker(messageView -> {
                /**
                 * 事務(wù)檢查器一般是根據(jù)業(yè)務(wù)的ID去檢查本地事務(wù)是否正確提交還是回滾,此處以訂單ID屬性為例。
                 * 在訂單表找到了這個(gè)訂單,說明本地事務(wù)插入訂單的操作已經(jīng)正確提交;如果訂單表沒有訂單,說明本地事務(wù)已經(jīng)回滾。
                 */
                final String orderId = messageView.getProperties().get("OrderId");
                if (Strings.isNullOrEmpty(orderId)) {
                    // 錯(cuò)誤的消息,直接返回Rollback。
                    return TransactionResolution.ROLLBACK;
                }
                return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
            }).setTopics(topic)
            .setClientConfiguration(configuration)
            .build();
        //開啟事務(wù)分支。
        final Transaction transaction;
        try {
            transaction = producer.beginTransaction();
        } catch (ClientException e) {
            e.printStackTrace();
            //事務(wù)分支開啟失敗,直接退出。
            return;
        }
        Message message = provider.newMessageBuilder()
            .setTopic(topic)
            //設(shè)置消息索引鍵,可根據(jù)關(guān)鍵字精確查找某條消息。
            .setKeys("messageKey1")
            //設(shè)置消息Tag,用于消費(fèi)端根據(jù)指定Tag過濾消息。
            .setTag("messageTag")
            //一般事務(wù)消息都會(huì)設(shè)置一個(gè)本地事務(wù)關(guān)聯(lián)的唯一ID,用來做本地事務(wù)回查的校驗(yàn)。
            .addProperty("OrderId", "xxx")
            //消息體。
            .setBody("messageBody".getBytes())
            .build();
        //發(fā)送半事務(wù)消息
        final SendReceipt sendReceipt;
        try {
            sendReceipt = producer.send(message, transaction);
        } catch (ClientException e) {
            //半事務(wù)消息發(fā)送失敗,事務(wù)可以直接退出并回滾。
            return;
        }
        /**
         * 執(zhí)行本地事務(wù),并確定本地事務(wù)結(jié)果。
         * 1. 如果本地事務(wù)提交成功,則提交消息事務(wù)。
         * 2. 如果本地事務(wù)提交失敗,則回滾消息事務(wù)。
         * 3. 如果本地事務(wù)未知異常,則不處理,等待事務(wù)消息回查。
         *
         */
        boolean localTransactionOk = doLocalTransaction();
        if (localTransactionOk) {
            try {
                transaction.commit();
            } catch (ClientException e) {
                // 業(yè)務(wù)可以自身對實(shí)時(shí)性的要求選擇是否重試,如果放棄重試,可以依賴事務(wù)消息回查機(jī)制進(jìn)行事務(wù)狀態(tài)的提交。
                e.printStackTrace();
            }
        } else {
            try {
                transaction.rollback();
            } catch (ClientException e) {
                // 建議記錄異常信息,回滾異常時(shí)可以無需重試,依賴事務(wù)消息回查機(jī)制進(jìn)行事務(wù)狀態(tài)的提交。
                e.printStackTrace();
            }
        }
    }
}

使用建議

避免大量未決事務(wù)導(dǎo)致超時(shí)

云消息隊(duì)列 RocketMQ 版支持在事務(wù)提交階段異常的情況下發(fā)起事務(wù)回查,保證事務(wù)一致性。但生產(chǎn)者應(yīng)該盡量避免本地事務(wù)返回未知結(jié)果。大量的事務(wù)檢查會(huì)導(dǎo)致系統(tǒng)性能受損,容易導(dǎo)致事務(wù)處理延遲。

正確處理“進(jìn)行中”的事務(wù)

消息回查時(shí),對于正在進(jìn)行中的事務(wù)不要返回Rollback或Commit結(jié)果,應(yīng)繼續(xù)保持Unknown的狀態(tài)。

一般出現(xiàn)消息回查時(shí)事務(wù)正在處理的原因?yàn)椋菏聞?wù)執(zhí)行較慢,消息回查太快。解決方案如下:

  • 將第一次事務(wù)回查時(shí)間設(shè)置較大一些,但可能導(dǎo)致依賴回查的事務(wù)提交延遲較大。

  • 程序能正確識(shí)別正在進(jìn)行中的事務(wù)。