一个业务操作,涉及的数据库操作在两个库里,就会涉及到分布式事务。
举那个常用的转账例子。
A要转钱给B,A账户和B账户在不同的DB存储(不同的银行系统)。
如何保证操作的原子性昵?
方案一:
容易想到的方案,把A扣钱操作,加上网络请求(给B账号增钱)放到一个事务去处理。
这个方案的带来的问题是:
(1)A扣钱的操作成功,网络请求有问题,比如返回response是失败;这时结果A操作回滚,但是B收到了请求,增加金钱成功。
(2)网络可能不稳定,一直再重试,此时由于是一个事务里,会导致一直锁A所在的表,致使服务不可用。
(先网络请求,再操作A的扣钱操作情况类似)。
方案二:
正确的方案思路就是
在任何不可靠请求传输前,先落地,再请求。
引入一个流转状态,定时任务通过状态判断,把遗漏请求补发。
在不引入消息队列的情况下,就是创建一个消息发送日志表commitlog,其中有唯一id,流转状态信息。
- 请求前,先执行A扣钱+insert commitlog事务操作
- 同步发送请求到B,根据返回结果更新commitlog表。
- 起一个定时任务,定时轮训commitlog里没有成功的记录。
此方案缺点,需要接入方,自己去实现日志记录表,和定时任务。
方案三
使用事务消息
消息状态:
unkown callback commit
- 发送prepared消息(阿里云上服务叫half消息,一个意思)
- 消息成功后执行本地事务
- 本地事务成功,发送confirm消息
以上是正常成功流程 - 本地事务失败处理:在发送prepared消息时,会在MQ Server注册监听回调,MQ Server会启定时任务,查询MQ服务器上所有的prepared状态消息,根据消息id,回查接入方producor,看本地事务是否成功,根据本地事务成功与否,确认是发送confirm消息还是callback消息。
- 最后,mq订阅方都是通过拉的方式,去消费。往MQ Server发送confirm消息就是,根据消息id查找到对应log,把消息状态置为commit,而MQ订阅方就是拉取commit的消息。
可以看出来,引入mq事务消息相比较第二种方案,基本思路就是,把定时扫描失败的任务交给了MQ去做。而MQ的落地信息(包含状态字段)充当了commitlog的作用。
贴下rocketmq的源码对证下
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//事务回查检测器,是定时任务调用,所有会有线程池设置
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setCheckThreadPoolMinSize(2);
producer.setCheckThreadPoolMaxSize(2);
producer.setCheckRequestHoldMax(2000);
producer.setTransactionCheckListener(transactionCheckListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
//本地事务执行器
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
for (int i = 0; i < 100; i++) {
try {
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
有两个角色,一个是事务回查处理类,要实现TransactionCheckListener接口。
一个是本地事务执行器,要实现LocalTransactionExecuter接口
都是返回LocalTransactionState(信息状态unkown callback commit)
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg)
throws MQClientException {
SendResult sendResult = null;
try {
//发送prepared信息
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
//发送成功,执行本地事务
case SEND_OK: {
try {
//执行本地事务
localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
}
}
break;
//处理mq server端可能的异常状态,信息置为回滚状态
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
//其余为unknow状态
try {
//本地事务执行完成后,发送确认消息或者回滚消息
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
return transactionSendResult;
}
参考文章:
https://help.aliyun.com/document_detail/29548.html
http://blog.csdn.net/chunlongyu/article/details/53844393