执行器
simpleExecutor
每次执行SQL需要预编译SQL语句,即每次都需要重新创建一个PreparedStatement对象。
查询日志会发现mybatis执行了两次预编译操作:
ReuseExecutor
与simpleExecutor用法没有不同,但对同一SQL语句执行只需要预编译一次SQL语句。(注意与下文的一级缓存、二级缓存命中的区别,这个只需要SQL语句一样,就会重用预编译PreparedStatement对象)
BatchExecutor
批处理执行器,只针对修改操作的SQL语句预编译一次,并且需要手动刷新SQL执行才生效。
BaseExecutor
执行器抽象类,执行上面3个执行器的重复操作,比如一级缓存、doQuery、doUpdate方法
将
SimpleExecutor
、ReuseExecutor
、BatchExecutor
赋值给接口Executor
,如上图所示,多次调用他们父类方法BaseExecutor#query
,查看缓存会发现明明执行了两次查询,但是仅仅执行了一次sql
public abstract class BaseExecutor implements Executor {
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler,
CacheKey key, BoundSql boundSql) throws SQLException {
// ...
List<E> list;
try {
queryStack++;
// 执行query方法的时候,会先去localCache一级缓存查询
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) {
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
// 缓存没有查询到才会交由具体的SimpleExecutor\ReuseExecutor\BatchExecutor执行器执行具体数据库操作
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
//...
}
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds,
ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
List<E> list;
localCache.putObject(key, EXECUTION_PLACEHOLDER);
try {
list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
} finally {
localCache.removeObject(key);
}
// 从数据库查询出来之后保存到一级缓存localCache中
localCache.putObject(key, list);
if (ms.getStatementType() == StatementType.CALLABLE) {
localOutputParameterCache.putObject(key, parameter);
}
return list;
}
}
CachingExecutor
不同于SimpleExecutor
、ReuseExecutor
、BatchExecutor
三者通过继承BaseExecutor实现Executor接口,Mybatis将CachingExecutor
独立处理直接实现Executor接口,内部通过属性delegate封装其他三个Executor实现,如下图代码所示:
public class CachingExecutor implements Executor {
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds,
ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
Cache cache = ms.getCache();
if (cache != null) {
flushCacheIfRequired(ms);
// 使用二级缓存的条件是开启二级缓存,并且本次查询并未设置结果处理器(设置了结果处理器可能返回的结果就不一样了)
if (ms.isUseCache() && resultHandler == null) {
ensureNoOutParams(ms, boundSql);
@SuppressWarnings("unchecked")
// 查询缓存
List<E> list = (List<E>) tcm.getObject(cache, key);
if (list == null) {
// 将查询操作交给SimpleExecutor三个Executor实现之一执行
list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
tcm.putObject(cache, key, list); // issue #578 and #116
}
return list;
}
}
// 不存在二级缓存,直接将查询操作交给SimpleExecutor三个Executor实现之一执行
return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
}
二级缓存,与一级缓存的区别:
一级缓存查询数据库操作后会直接缓存,二级缓存需要当次数据库操作提交事务后才能进行缓存(二级缓存跨线程处理,一级缓存不用)。
可以通过如下方式指定SqlSession采用哪种Executor:
一级缓存
命中条件
① 查询之后就会生效
② SQL 和 参数必须相同
③ 相同的statementId
④ 必须是同一个SqlSession(一级缓存与线程绑定,关闭会话会清空)
⑤ RowBounds相同
一级缓存其实就是一个Map,因为一级缓存与线程绑定,所以也不存在线程安全问题,不需要使用ConcurrentHashMap,类结构如下:
public class PerpetualCache implements Cache {
private final String id;
private Map<Object, Object> cache = new HashMap<>();
// ...
}
HashMap中,判断key是否是同一个对象,首先要通过hash方法定位散列桶,然后通过equals方法在桶上的链表上查询相等的对象,CacheKey大概源码如下:
public class CacheKey implements Cloneable, Serializable {
private int hashcode;
private List<Object> updateList;
public void update(Object object) {
int baseHashCode = object == null ? 1 : ArrayUtil.hashCode(object);
count++;
checksum += baseHashCode;
baseHashCode *= count;
hashcode = multiplier * hashcode + baseHashCode;
updateList.add(object);
}
public int hashCode() {
return hashcode;
}
public boolean equals(Object object) {
if (this == object) {
return true;
}
if (!(object instanceof CacheKey)) {
return false;
}
final CacheKey cacheKey = (CacheKey) object;
if (hashcode != cacheKey.hashcode) {
return false;
}
if (checksum != cacheKey.checksum) {
return false;
}
if (count != cacheKey.count) {
return false;
}
for (int i = 0; i < updateList.size(); i++) {
Object thisObject = updateList.get(i);
Object thatObject = cacheKey.updateList.get(i);
if (!ArrayUtil.equals(thisObject, thatObject)) {
return false;
}
}
return true;
}
}
缓存何时会被清空
public abstract class BaseExecutor implements Executor {
// 以下为一级缓存部分删除场景
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds,
ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
// ...
// 在查询栈第一次的时候会清空(嵌套查询的子查询不能清空缓存,它需要依赖父查询的结果)
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
// ...
if (queryStack == 0) {
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
deferredLoads.clear();
// 在查询栈第一次且一级缓存仅应用于会话级别会清空(嵌套查询的子查询不能清空缓存,它需要依赖父查询的结果)
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
clearLocalCache();
}
}
return list;
}
// 执行update操作将会删除一级缓存
@Override
public int update(MappedStatement ms, Object parameter) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
// 删除一级缓存
clearLocalCache();
return doUpdate(ms, parameter);
}
// 执行commit操作将会删除一级缓存
@Override
public void commit(boolean required) throws SQLException {
if (closed) {
throw new ExecutorException("Cannot commit, transaction is already closed");
}
// 删除一级缓存
clearLocalCache();
flushStatements();
if (required) {
transaction.commit();
}
}
// 执行rollback操作将会删除一级缓存
@Override
public void rollback(boolean required) throws SQLException {
if (!closed) {
try {
// 删除一级缓存
clearLocalCache();
flushStatements(true);
} finally {
if (required) {
transaction.rollback();
}
}
}
}
// 一级缓存其实就是一个Map,执行clear()操作就能删除所有数据
@Override
public void clearLocalCache() {
if (!closed) {
localCache.clear();
localOutputParameterCache.clear();
}
}
}
集成Spring一级缓存失效问题
由于Mybatis的一级缓存和会话SqlSession绑定,但是Mybatis提供的Mybatis-Spring集成包中,存在SqlSessionTemplate
,该类负责通过JDK代理生成Mapper的代理对象,每一次通过mybatis操作数据库都会生成一个全新的SqlSession,执行完毕将其关闭,所以Mybatis集成Spring之后一级缓存就会失效。
public class SqlSessionTemplate implements SqlSession, DisposableBean {
public SqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator) {
this.sqlSessionFactory = sqlSessionFactory;
this.executorType = executorType;
this.exceptionTranslator = exceptionTranslator;
// 通过JDK代理生成SqlSession代理对象
this.sqlSessionProxy = (SqlSession) newProxyInstance(
SqlSessionFactory.class.getClassLoader(),
new Class[] { SqlSession.class },
new SqlSessionInterceptor());
}
private class SqlSessionInterceptor implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// Spring每一次执行Mybatis操作,都会生成一个全新的SqlSession对象
SqlSession sqlSession = getSqlSession(
SqlSessionTemplate.this.sqlSessionFactory,
SqlSessionTemplate.this.executorType,
SqlSessionTemplate.this.exceptionTranslator);
try {
// 调用被代理对象目标方法
Object result = method.invoke(sqlSession, args);
if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
// force commit even on non-dirty sessions because some databases require
// a commit/rollback before calling close()
sqlSession.commit(true);
}
return result;
} catch (Throwable t) {
Throwable unwrapped = unwrapThrowable(t);
if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
// release the connection to avoid a deadlock if the translator is no loaded. See issue #22
// 关闭这次生成的SqlSession
closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
sqlSession = null;
Throwable translated = SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException) unwrapped);
if (translated != null) {
unwrapped = translated;
}
}
throw unwrapped;
} finally {
if (sqlSession != null) {
// 关闭这次生成的SqlSession
closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
}
}
}
}
}
这嵌套了两层动态打理,Spring通过动态代理生成了Mapper的代理作为单例对象注册到Spring容器,执行Mapper操作的时候,再次通过动态代理生成SqlSession接口的实现类。
那么是不是Mybatis集成Spring之后,一级缓存就一定生效不了呢,不是的,只要两个操作之间共享同一个事务,那么它们就可以共享同一个SqlSession。
原理如下:
public abstract class TransactionSynchronizationManager {
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
// Spring创建事务的时候,无论是手动还是通过@Transaction注解,都会调用这个方法
public static void initSynchronization() throws IllegalStateException {
if (isSynchronizationActive()) {
throw new IllegalStateException("Cannot activate transaction synchronization - already active");
}
synchronizations.set(new LinkedHashSet<>());
}
// 判断当前事务是否有事务同步器
public static boolean isSynchronizationActive() {
return (synchronizations.get() != null);
}
// 将数据缓存到线程安全的ThreadLocal
public static void bindResource(Object key, Object value) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Map<Object, Object> map = resources.get();
if (map == null) {
map = new HashMap<>();
resources.set(map);
}
Object oldValue = map.put(actualKey, value);
if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
oldValue = null;
}
// ...
}
}
// 回到SqlSessionInterceptor中获取SqlSession的方法,定位到如下类
public final class SqlSessionUtils {
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory,
ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) {
// 可以看到首先尝试从TransactionSynchronizationManager的ThreadLocal中获取SqlSession
SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
SqlSession session = sessionHolder(executorType, holder);
if (session != null) {
return session;
}
// 从线程缓存中未获取到SqlSession,则创建一个并添加到线程缓存中
session = sessionFactory.openSession(executorType);
registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
return session;
}
private static void registerSessionHolder(SqlSessionFactory sessionFactory,
ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator, SqlSession session) {
SqlSessionHolder holder;
// 存在事务则该条件满足
if (TransactionSynchronizationManager.isSynchronizationActive()) {
Environment environment = sessionFactory.getConfiguration().getEnvironment();
if (environment.getTransactionFactory() instanceof SpringManagedTransactionFactory) {
holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
// 将SqlSession缓存到TreadLocal中
TransactionSynchronizationManager.bindResource(sessionFactory, holder);
TransactionSynchronizationManager.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
holder.setSynchronizedWithTransaction(true);
holder.requested();
} else {
// ...
} else {
// ...
}
}
二级缓存
命中条件:
① 提交之后缓存才生效(自动提交事务无效,必须手动提交)
② SQL 和 参数必须相同
③ 相同的statementId
④ RowBounds相同
设计模式:装饰器模式+责任链模式
结合上图,Mybatis对对缓存提供了一个Cache接口抽象,除了PerpetualCache
真的提供了缓存数据的功能,其他的SynchronizedCache
,LoggingCache
,LRUCache
,ScheduledCache
,BlockingCache
都是对PerpetualCache
的装饰器,他们内部通过持有Cache类型的delegate对象,调用另一个Cache实现。
具体代码如下:
public interface Cache {
String getId();
void putObject(Object key, Object value);
Object getObject(Object key);
Object removeObject(Object key);
void clear();
int getSize();
default ReadWriteLock getReadWriteLock() {
return null;
}
}
public class SynchronizedCache implements Cache {
private final Cache delegate;
public SynchronizedCache(Cache delegate) {
this.delegate = delegate;
}
@Override
public String getId() {
return delegate.getId();
}
// 省略对其他Cache接口方法的实现...
}
// 不展示缓存链条中的其他缓存装饰器
public class PerpetualCache implements Cache {
private final String id;
private Map<Object, Object> cache = new HashMap<>();
public PerpetualCache(String id) {
this.id = id;
}
// 省略其他Cache接口实现...
}
为什么需要提交才能命中二级缓存
如下图所示,由于二级缓存不同于一级缓存,是跨线程共享的。因此,如果一个线程执行完操作之后,直接存入二级缓存,但若在处理数据库提交的时失败,再去二级缓存删除数据的话,可能造成在这个空档期间其他线程从耳机缓存读取了脏数据。
二级缓存结构
CachingExecutor通过TransactionalCacheManager
来管理二级缓存,二级缓存Cache
作为全局变量transactionalCaches
的key,TransactionalCache
类保存尚未提交的数据,同时作为transactionalCaches
的value,它同时也封装了二级缓存Cache
。
public class TransactionalCacheManager {
private final Map<Cache, TransactionalCache> transactionalCaches = new HashMap<>();
public void clear(Cache cache) {
getTransactionalCache(cache).clear();
}
public Object getObject(Cache cache, CacheKey key) {
return getTransactionalCache(cache).getObject(key);
}
public void putObject(Cache cache, CacheKey key, Object value) {
getTransactionalCache(cache).putObject(key, value);
}
public void commit() {
for (TransactionalCache txCache : transactionalCaches.values()) {
txCache.commit();
}
}
public void rollback() {
for (TransactionalCache txCache : transactionalCaches.values()) {
txCache.rollback();
}
}
private TransactionalCache getTransactionalCache(Cache cache) {
return transactionalCaches.computeIfAbsent(cache, TransactionalCache::new);
}
}
TransactionalCache
通过属性entriesToAddOnCommit
保存修改的数据,此时数据尚未保存到二级缓存中。
public class TransactionalCache implements Cache {
private final Cache delegate; // 持有的二级缓存
private boolean clearOnCommit; // 是否提交的时候需要先清空二级缓存
private final Map<Object, Object> entriesToAddOnCommit; // 缓存数据暂存区
private final Set<Object> entriesMissedInCache; // 空数据缓存,避免缓存穿透
public TransactionalCache(Cache delegate) {
this.delegate = delegate;
this.clearOnCommit = false;
this.entriesToAddOnCommit = new HashMap<>();
this.entriesMissedInCache = new HashSet<>();
}
// ...
@Override
public Object getObject(Object key) {
Object object = delegate.getObject(key);
if (object == null) {
entriesMissedInCache.add(key);
}
// 如果发现clearOnCommit变为true,表示二级缓存获得的数据属于脏数据,不能返回
if (clearOnCommit) {
return null;
} else {
return object;
}
}
@Override
public void putObject(Object key, Object object) {
entriesToAddOnCommit.put(key, object);
}
@Override
public Object removeObject(Object key) {
return null;
}
@Override
public void clear() {
clearOnCommit = true;
entriesToAddOnCommit.clear();
}
public void commit() {
if (clearOnCommit) {
delegate.clear();
}
flushPendingEntries();
reset();
}
public void rollback() {
unlockMissedEntries();
reset();
}
private void reset() {
clearOnCommit = false;
entriesToAddOnCommit.clear();
entriesMissedInCache.clear();
}
private void flushPendingEntries() {
// 在执行commit的时候,将数据暂存区的数据保存到二级缓存
for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) {
delegate.putObject(entry.getKey(), entry.getValue());
}
// 如果缓存的空数据不在数据暂存区,也保存到二级缓存避免穿透到数据库
for (Object entry : entriesMissedInCache) {
if (!entriesToAddOnCommit.containsKey(entry)) {
delegate.putObject(entry, null);
}
}
}
private void unlockMissedEntries() {
for (Object entry : entriesMissedInCache) {
try {
delegate.removeObject(entry);
} catch (Exception e) {
// ...
}
}
}
}
通过CachingExecutor
执行查询,会先去二级缓存查找,未命中缓存则执行数据库查询,并将查询结果保存到数据暂存区,等待执行commit才会将暂存区的数据刷新到二级缓存。
public class CachingExecutor implements Executor {
// 省略与本例不相关方法
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds,
ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
// 从MappedStatement中获取二级缓存
Cache cache = ms.getCache();
if (cache != null) {
// 是否需要清空二级缓存
flushCacheIfRequired(ms);
if (ms.isUseCache() && resultHandler == null) {
ensureNoOutParams(ms, boundSql);
@SuppressWarnings("unchecked")
// 首先尝试在二级缓存中查询
List<E> list = (List<E>) tcm.getObject(cache, key);
if (list == null) {
// 二级缓存未查询到相关结果,执行数据库查询
list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
// 将数据库查询结果保存到二级缓存暂存区
tcm.putObject(cache, key, list); // issue #578 and #116
}
return list;
}
}
return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
}
二级缓存结构图:
附录:二级缓存执行流程图