rocketmq的事务消息

一个业务操作,涉及的数据库操作在两个库里,就会涉及到分布式事务。
举那个常用的转账例子。
A要转钱给B,A账户和B账户在不同的DB存储(不同的银行系统)。
如何保证操作的原子性昵?

方案一:

容易想到的方案,把A扣钱操作,加上网络请求(给B账号增钱)放到一个事务去处理。
这个方案的带来的问题是:
(1)A扣钱的操作成功,网络请求有问题,比如返回response是失败;这时结果A操作回滚,但是B收到了请求,增加金钱成功。
(2)网络可能不稳定,一直再重试,此时由于是一个事务里,会导致一直锁A所在的表,致使服务不可用。
(先网络请求,再操作A的扣钱操作情况类似)。

方案二:

正确的方案思路就是
在任何不可靠请求传输前,先落地,再请求。
引入一个流转状态,定时任务通过状态判断,把遗漏请求补发。

在不引入消息队列的情况下,就是创建一个消息发送日志表commitlog,其中有唯一id,流转状态信息。

  1. 请求前,先执行A扣钱+insert commitlog事务操作
  2. 同步发送请求到B,根据返回结果更新commitlog表。
  3. 起一个定时任务,定时轮训commitlog里没有成功的记录。

此方案缺点,需要接入方,自己去实现日志记录表,和定时任务。

方案三

使用事务消息


rocketmq事务消息流程图

消息状态:
unkown callback commit

  1. 发送prepared消息(阿里云上服务叫half消息,一个意思)
  2. 消息成功后执行本地事务
  3. 本地事务成功,发送confirm消息
    以上是正常成功流程
  4. 本地事务失败处理:在发送prepared消息时,会在MQ Server注册监听回调,MQ Server会启定时任务,查询MQ服务器上所有的prepared状态消息,根据消息id,回查接入方producor,看本地事务是否成功,根据本地事务成功与否,确认是发送confirm消息还是callback消息。
  5. 最后,mq订阅方都是通过拉的方式,去消费。往MQ Server发送confirm消息就是,根据消息id查找到对应log,把消息状态置为commit,而MQ订阅方就是拉取commit的消息。

可以看出来,引入mq事务消息相比较第二种方案,基本思路就是,把定时扫描失败的任务交给了MQ去做。而MQ的落地信息(包含状态字段)充当了commitlog的作用。

贴下rocketmq的源码对证下

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //事务回查检测器,是定时任务调用,所有会有线程池设置
        TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        producer.setCheckThreadPoolMinSize(2);
        producer.setCheckThreadPoolMaxSize(2);
        producer.setCheckRequestHoldMax(2000);
        producer.setTransactionCheckListener(transactionCheckListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        //本地事务执行器
        TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
        for (int i = 0; i < 100; i++) {
            try {
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                //发送事务消息
                SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

有两个角色,一个是事务回查处理类,要实现TransactionCheckListener接口。
一个是本地事务执行器,要实现LocalTransactionExecuter接口
都是返回LocalTransactionState(信息状态unkown callback commit

public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
        throws MQClientException {
        SendResult sendResult = null;
        try {
            //发送prepared信息
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            //发送成功,执行本地事务
            case SEND_OK: {
                try {
                    //执行本地事务
                    localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                }
            }
            break;
            //处理mq server端可能的异常状态,信息置为回滚状态
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
        //其余为unknow状态
        try {
            //本地事务执行完成后,发送确认消息或者回滚消息
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        return transactionSendResult;
    }

参考文章:
https://help.aliyun.com/document_detail/29548.html
http://blog.csdn.net/chunlongyu/article/details/53844393

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,579评论 18 139
  • 分布式开放消息系统(RocketMQ)的原理与实践 来源:http://www.jianshu.com/p/453...
    meng_philip123阅读 12,889评论 6 104
  • 消息队列设计精要 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终...
    meng_philip123阅读 1,505评论 1 25
  • 现在出门不能开车可以叫滴滴打车上班路上也能欣赏到沿路的风景,去郊外可以随手刷一辆共享单车感受大自然的清新空气,出去...
    水兔儿阅读 524评论 0 0
  • 月儿当空圆 深信广寒有佳人 迟迟不得见佳人容颜 难诉心中思与苦 只愿群星闪烁 孤单夜里有人陪 奈何群星终不如月 唯...
    seven一十一阅读 102评论 0 1