Spring+ActiveMQ 事务

一、思路

  1. 背景介绍
  2. spring 调用链路 + activemq事务链路介绍

二、背景介绍

写这篇文章背景是什么呢?或者说作者碰到了什么问题呢?是这样的,有个需求要在service层执行一些业务逻辑,如果失败了,则会抛出了一个RunTimeException(至于为什么会抛出RunTimeException就不纠结了),与此同时还需要向MQ发送一个Msg便于后续容错处理。而作者碰到的问题就出现了,jdbc的事务随着RunTimeException异常的抛出回滚了,JmsTemplate的事务也回滚了,导致事务发送失败。之前也没有研究过Spring是如何管理MQ的事务,以为它们是独立的,没想到在这里踩了一个坑。没办法,那就debug源码看看。
下面是spring-jms的配置,注意其中的sessionTransacted属性配置成了true,表示jms的事务由spring托管。

     <bean id="jmsTemplateQueue" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="pubSubDomain" value="false"/>
        <property name="deliveryPersistent" value="true"/>
        <property name="sessionTransacted" value="true"/>
    </bean>

下面是Service层的方法,由spring管理jdbc的事务,其中的ToleranceProducer使用JmsTemplate发送消息。

@Resource
private ToleranceProducer toleranceProducer;
// 某个service的方法,这个方法由spring管理jdbc的事务。
public Object xxx(){
        try {
            // .... 一大坨业务          
            transactionManager.commit(transactionForOrder);
        } catch (Exception e) {
            if (e instanceof CustomFailedException) {
                Object failedObject = e.getFailedObject();
                toleranceProducer.sendMessage(failedObject);
                throw e;
            }
            return something;
        }
}

三、spring 调用链路 + activemq事务链路介绍

我们知道spring的事务管理机制也是依赖于AOP,就先从这出发看看到底做了啥。
可以看到TransactionInteracptor也是采用了代理模式,执行真正的业务逻辑之前先开启事务。继续debug看看invokeWithinTransaction中做了什么。

//来自 org.springframework.transaction.interceptor.TransactionInterceptor
    @Nullable
    public Object invoke(MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
        Method var10001 = invocation.getMethod();
        invocation.getClass();
        // 开始失误并执行业务逻辑
        return this.invokeWithinTransaction(var10001, targetClass, invocation::proceed);
    }

ok,这里可以看到spring开启一个新的事务,并调用代码对象执行业务逻辑代码,如果捕获到异常就回滚事务。看看completeTransactionAfterThrowing中发生了什么有趣的事。

    @Nullable
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, TransactionAspectSupport.InvocationCallback invocation) throws Throwable {
        // 开启新事务或者从当前线程获取事务
        TransactionAttributeSource tas = this.getTransactionAttributeSource();
        TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null;
        PlatformTransactionManager tm = this.determineTransactionManager(txAttr);
        String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);
        Object result;
        if (txAttr != null && tm instanceof CallbackPreferringPlatformTransactionManager) {
            // 忽略,这里不关心
        } else {
            // 事务开启
            TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            result = null;

            try {
                // 执行业务逻辑
                result = invocation.proceedWithInvocation();
            } catch (Throwable var17) {
                // 如果执行业务代码过程中抛出了异常那么就就行回滚
                this.completeTransactionAfterThrowing(txInfo, var17);
                throw var17;
            } finally {
                this.cleanupTransactionInfo(txInfo);
            }

            this.commitTransactionAfterReturning(txInfo);
            return result;
        }
        // 省略其他else了,这里不关心
    }

哈哈,其实也没啥,调用TransactionManager进行事务回滚。继续debug。

    protected void completeTransactionAfterThrowing(@Nullable TransactionAspectSupport.TransactionInfo txInfo, Throwable ex) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex);
            }

            if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
                try {
                    // 调用TransactionManager回滚事务
                    txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                } catch (TransactionSystemException var6) {
                    this.logger.error("Application exception overridden by rollback exception", ex);
                    var6.initApplicationException(ex);
                    throw var6;
                } catch (Error | RuntimeException var7) {
                    this.logger.error("Application exception overridden by rollback exception", ex);
                    throw var7;
                }
            } else {
                // don't care
            }
        }
    }

调用this.processRollback继续回滚。。。说的我都烦了,贴个栈信息自行观赏。


Spring回滚调用栈

ok,省略一大堆回滚的调用栈了,到关键的方法。

    public void afterCompletion(int status) {
        if (this.shouldUnbindAtCompletion()) {
            boolean releaseNecessary = false;
            if (this.holderActive) {
                this.holderActive = false;
                TransactionSynchronizationManager.unbindResourceIfPossible(this.resourceKey);
                this.resourceHolder.unbound();
                releaseNecessary = true;
            } else {
                releaseNecessary = this.shouldReleaseAfterCompletion(this.resourceHolder);
            }

            if (releaseNecessary) {
                // 释放事务同步器中的资源
                this.releaseResource(this.resourceHolder, this.resourceKey);
            }
        } else {
            this.cleanupResource(this.resourceHolder, this.resourceKey, status == 0);
        }

        this.resourceHolder.reset();
    }

看到resource中赫然就有我们要找的MQ连接信息。继续看在releaseResource中发生了啥?


Resource信息

一系列方法调用后,我们发现最终调用了Spring Jms包中的ConnectionFactory方法,对JMS的事务进行了回滚。

// from org.springframework.jms.connection.CachingConnectionFactory
        private void logicalClose(Session proxy) throws JMSException {
            // Preserve rollback-on-close semantics.
            if (this.transactionOpen && this.target.getTransacted()) {
                this.transactionOpen = false;
                this.target.rollback();
            }
            // Physically close durable subscribers at time of Session close call.
            for (Iterator<Map.Entry<ConsumerCacheKey, MessageConsumer>> it = this.cachedConsumers.entrySet().iterator(); it.hasNext();) {
                Map.Entry<ConsumerCacheKey, MessageConsumer> entry = it.next();
                if (entry.getKey().subscription != null) {
                    entry.getValue().close();
                    it.remove();
                }
            }
            // Allow for multiple close calls...
            boolean returned = false;
            synchronized (this.sessionList) {
                if (!this.sessionList.contains(proxy)) {
                    this.sessionList.addLast(proxy);
                    returned = true;
                }
            }
            if (returned && logger.isTraceEnabled()) {
                logger.trace("Returned cached Session: " + this.target);
            }
        }

到这里为什么JMS事务会随着JDBC的事务回滚了就一目了然了。但是还有一个问题,TransactionSynchronizationManager事务管理器中resource的MQ connection信息是哪儿来的?
又经过一番debug后,在JmsTransactionManager中终于找到了MQ是如何注册事务到TransactionSynchronizationManager中的。

    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            throw new InvalidIsolationLevelException("JMS does not support an isolation level concept");
        }

        ConnectionFactory connectionFactory = obtainConnectionFactory();
        JmsTransactionObject txObject = (JmsTransactionObject) transaction;
        Connection con = null;
        Session session = null;
        try {
            JmsResourceHolder resourceHolder;
            if (this.lazyResourceRetrieval) {
                resourceHolder = new LazyJmsResourceHolder(connectionFactory);
            }
            else {
                con = createConnection();
                session = createSession(con);
                if (logger.isDebugEnabled()) {
                    logger.debug("Created JMS transaction on Session [" + session + "] from Connection [" + con + "]");
                }
                resourceHolder = new JmsResourceHolder(connectionFactory, con, session);
            }
            resourceHolder.setSynchronizedWithTransaction(true);
            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                resourceHolder.setTimeoutInSeconds(timeout);
            }
            txObject.setResourceHolder(resourceHolder);
                        // 注册当前MQ事务到当前线程的事务管理器中。
            TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolder);
        }
        catch (Throwable ex) {
            if (session != null) {
                try {
                    session.close();
                }
                catch (Throwable ex2) {
                    // ignore
                }
            }
            if (con != null) {
                try {
                    con.close();
                }
                catch (Throwable ex2) {
                    // ignore
                }
            }
            throw new CannotCreateTransactionException("Could not create JMS transaction", ex);
        }
    }

OK,到此为止,作者之前碰到问题就有了答案了。但是debug过程中对spring事务倒是有研究的兴趣了,下次我们来说Spring事务的细节。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容

  • 原文链接:https://docs.spring.io/spring-boot/docs/1.4.x/refere...
    pseudo_niaonao阅读 4,659评论 0 9
  • Spring 事务属性分析 事务管理对于企业应用而言至关重要。它保证了用户的每一次操作都是可靠的,即便出现了异常的...
    壹点零阅读 1,279评论 0 2
  • 要加“m”说明是MB,否则就是KB了. -Xms:初始值 -Xmx:最大值 -Xmn:最小值 java -Xms8...
    dadong0505阅读 4,795评论 0 53
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,775评论 2 11
  • 父亲去世已有十多年了,每每想起他生前的事与景,总会泪盈满眶悲由心起。 父亲是个坚强的人。他建国前出生,在那个年...
    楚霸图阅读 512评论 0 4