系统层级
Sharding-JDBC本质是JDBC的增强,使服务能够实现数据的分布式存储效果。可查看如何理解ShardingSphere?。深入ShardingSphere之前需要了解其定义的相关基本概念如分片、逻辑表、物理表、广播表、分片算法分类等,具体可以查看官网。如下图所示Sharding-JDBC整体还是属于数据访问层的,在数据访问层中处于ORM框架之下和ORM是完全解耦的,所以他是可以完全兼容各种类型的ORM框架。Sharding-JDBC对jdbc-connector进行了封装,对其核心的四大对象重新进行了实现,在实现中加入了相关的内核逻辑,包括:SQL解析、SQL路由、 SQL改写、SQL执行、SQL归并等核心逻辑。本文所有分析和文档基于版本4.1.1
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
<version>4.1.1</version>
</dependency>
调用过程
以ShardingStatement
执行一次查询的过程为例分析具体的调用过程。
执行的入口都是在ShardingStatement
中,StatementExecutor
封装了SQL解析、重写的核心过程。MergeEngine
负责统筹结果的合并,最后返回合并结果在ShardingStatement
中封装成ShardingResultSet
返回。
JDBC增强
-
java.sql.Wrapper
提供了判断当前类是否是目标包装类和反包装为目标类对象的API,目的是为了方便调用,将工具类提供的功能放到对象维度去使用,跟设计模式中的包装器模式没什么关系。 -
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.WrapperAdapter
实现了java.sql.Wrapper
的两个工具方法,同时增加了两个记录调用方法和回放调用方法的API和容器来存储回放方法列表。该类重点是增加支持记录方法和调用方法,而非适配。
为什么需要调用方法的记录和回放?
针对JDBC四大对象Sharding-JDBC是重新做了封装,而对实际的四大对象的一些方法调用往往发生在SQL路由操作完成之后,所以需要提前记录之后回放。
- 所有
AbstractUnsupportedXXX
对象代表了对不支持的操作的默认实现(抛异常SQLFeatureNotSupportedException
)。
配置
sharding-JDBC的自动化配置类是:
org.apache.shardingsphere.shardingjdbc.spring.boot.SpringBootConfiguration
主要是针对配置参数和不同的场景的数据源进行了配置。DataSource在应用的实例只能存在一份,不同的场景通过注解@Conditional
的配置判断不同的配置参数。
启动类注解@AutoConfigureBefore(DataSourceAutoConfiguration.class)
表明它启动在Spring管理的数据源自动化配置DataSourceAutoConfiguration
之前能很好兼容历史数据源以及配置。
DataSource
抽象数据源适配器对象:
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractDataSourceAdapter
做了两点适配:
- AutoCloseable的实现,在close方法中关闭资源。
- 增加了
org.apache.shardingsphere.shardingjdbc.jdbc.core.context.RuntimeContext
实现类Getter方法,用于支撑相关的JDBC-Sharding操作。
针对不同场景的数据源都作出了对应的实现,不同的实现关键区别点主要是在RuntimeContext
的实现不同、不同Connection对象的实现和每个实现静态代码中初始化注册的装饰对象的不同。例如:
public class ShardingDataSource extends AbstractDataSourceAdapter {
private final ShardingRuntimeContext runtimeContext;
static {
NewInstanceServiceLoader.register(RouteDecorator.class);
NewInstanceServiceLoader.register(SQLRewriteContextDecorator.class);
NewInstanceServiceLoader.register(ResultProcessEngine.class);
}
//...ignore...
}
public class MasterSlaveDataSource extends AbstractDataSourceAdapter {
private final MasterSlaveRuntimeContext runtimeContext;
static {
NewInstanceServiceLoader.register(RouteDecorator.class);
}
//...ignore...
}
...
这种在静态代码块中的通过注册不同的Java SPI实现,可以完成对不同的场景的特殊处理。可以把这种处理逻辑理解成 SPI装饰层:针对不同业务场景(主从、加密、正常分片)的核心逻辑(SQL路由、SQL重新,结果集归并)进行的独立装饰处理的一层,实现方式是Java SPI。
Connection
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractConnectionAdapter
:增加了获取实际数据库连接对象的方法,同时将连接对象的创建留给子类实现。获取连接列表时针对不同链接模式MEMORY_STRICTLY, CONNECTION_STRICTLY
会有不同操作。内存限制模式时链接没有限制可以并发的请求数据然后在内存中做归并,如果一个链接一个链接的获取可能存在饥饿等待导致死锁所以需要加锁并一次性获取所有连接适合OLAP业务。连接限制模式将结果集装载在内存之后直接释放资源不需要加锁,保证了资源的使用率适合OLTP业务。关于链接模式的判断逻辑为每个连接执行SQL的数量,执行1个为内存限制模式,执行1个以上为连接限制模式,代码逻辑如下:
org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate#getSQLExecuteGroups
private List<InputGroup<StatementExecuteUnit>> getSQLExecuteGroups(final String dataSourceName,
final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
List<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size());
int count = 0;
for (List<SQLUnit> each : sqlUnitPartitions) {
result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));
}
return result;
}
org.apache.shardingsphere.underlying.common.hook.SPIRootInvokeHook
:通过SPI实现在链接创建和关闭处埋点用于可能的逻辑扩展。
Statement
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractStatementAdapter
增加了如下两个适配方法,其直接子类均是普通Statement对象。
protected abstract boolean isAccumulate();//用于判断是否返回累加结果作为更新影响数。
protected abstract Collection<? extends Statement> getRoutedStatements();//获取路由后的语句对象列表。
预定制的Statement均继承了org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractShardingPreparedStatementAdapter
:实现了PreparedStatement
相关的参数设置外,单独维护了参数设置方法调用的列表并提供了setter和回放调用,主要用于实际SQL执行之前、逻辑SQL路由之后SQL参数的设置。sharding-jdbc中相关Statement
实现类是相关内核逻辑执行的入口。
ResultSet
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractResultSetAdapter
:仅仅做了ResultSet
的一些抽象实现未做其他适配。ResultSet
的相关实现类针对不同场景结果集做了不同的封装和实现。
内核逻辑
SQL解析
SQL解析是Sharding-JDBC进行路由的开始,主要分为几个步骤:
- 通过SQL解析器将SQL解析成
AST(抽象语法树)
- 通过抽取器将语法树抽取成SQL片段
- 通过填充器将片段拼接成解析结果
- 通过优化器输出最后的结果
相关概念可参考官网
代码层面解析逻辑由org.apache.shardingsphere.sql.parser.SQLParserEngine#parse
作为入口,本方法中加入的SQL解析的SPI埋点:org.apache.shardingsphere.sql.parser.hook.SPIParsingHook
org.apache.shardingsphere.sql.parser.SQLParserEngine#parse0
调用对象进行解析并加入了缓存的逻辑。
public final class SQLParserEngine {
private final String databaseTypeName;
private final SQLParseResultCache cache = new SQLParseResultCache();
/** * Parse SQL. * * @param sql SQL * @param useCache use cache or not * @return SQL statement */
public SQLStatement parse(final String sql, final boolean useCache) {
ParsingHook parsingHook = new SPIParsingHook();
parsingHook.start(sql);
try {
SQLStatement result = parse0(sql, useCache);
parsingHook.finishSuccess(result);
return result;
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
parsingHook.finishFailure(ex);
throw ex;
}
}
private SQLStatement parse0(final String sql, final boolean useCache) {
if (useCache) {
Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql);
if (cachedSQLStatement.isPresent()) {
return cachedSQLStatement.get();
}
}
ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode();
SQLStatement result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree);
if (useCache) {
cache.put(sql, result);
}
return result;
}
}
SQL路由
根据不同的场景对SQL的路由也分为不同的方式,参考下图:
SQL路由
具体参考官网文档
SQL的解析和路由发生在Statement
对应SQL执行方法exeXXXX
的准备阶段。
- 准备阶段的逻辑继续分层和下沉首先会到准备引擎
BasePrepareEngine(SimpleQueryPrepareEngine/PreparedQueryPrepareEngine)
,两个实现类的区别在于是否使用SQL解析的缓存(PreparedQueryPrepareEngine用缓存,推荐)。同時在这层加载SPI装饰层对象。 - 接着进入下层
org.apache.shardingsphere.underlying.route.DataNodeRouter
,这层主要是增加了SPI埋点org.apache.shardingsphere.underlying.route.hook.SPIRoutingHook
:分为开始,成功,失败三阶段。接着传递到下层。 - 本层解析引擎
org.apache.shardingsphere.sql.parser.SQLParserEngine
负责解析,并将解析的SQLStatement
包装在上下文SQLStatementContext
中作为整体构成RouteContext
返回。 - 此时上下文中已经持有SQL的解析结果的上下文传递到SPI路由装饰层进行实际的路由。本层中会更加Statement的不同获取不同策略的路由引擎(
org.apache.shardingsphere.sharding.route.engine.type.ShardingRouteEngine
)实现类进行路由,然后得到路由结果,至此准备阶段结束。
SQL改写
SQL改写主要做什么?直接查看官方说明。改表名索引之类的标识符、补列、分页修正、优化的范畴。
同样是在准备阶段,SQL路由完成返回了路由上下文(RouteContext
)之后org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#executeRewrite
方法中:
private Collection<ExecutionUnit> executeRewrite(final String sql, final List<Object> parameters, final RouteContext routeContext) {
registerRewriteDecorator();
SQLRewriteContext sqlRewriteContext = rewriter.createSQLRewriteContext(sql, parameters, routeContext.getSqlStatementContext(), routeContext);
return routeContext.getRouteResult().getRouteUnits().isEmpty() ? rewrite(sqlRewriteContext) : rewrite(routeContext, sqlRewriteContext);
}
- SPI装饰层对象加载
org.apache.shardingsphere.underlying.rewrite.context.SQLRewriteContextDecorator
- 进入
org.apache.shardingsphere.underlying.rewrite.SQLRewriteEntry#createSQLRewriteContext
完成上下文的创建并执行装饰层逻辑。 - 接着调用
org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine
重写引擎执行重写逻辑。
SQL执行
SQL在解析、路由,初始化后进入了执行环节。
- 首先由执行器
org.apache.shardingsphere.shardingjdbc.executor.AbstractStatementExecutor#exeXXX
进入执行,维护关键的执行逻辑并以匿名内部类的方式将逻辑下传,如下例:org.apache.shardingsphere.shardingjdbc.executor.StatementExecutor#executeQuery
public List<QueryResult> executeQuery() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
@Override
protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
return getQueryResult(sql, statement, connectionMode);
}
};
return executeCallback(executeCallback);
}
private QueryResult getQueryResult(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
ResultSet resultSet = statement.executeQuery(sql);
getResultSets().add(resultSet);
return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);
}
- 然后进入
org.apache.shardingsphere.sharding.execute.sql.execute.SQLExecuteTemplate
执行模板,由模板调度执行引擎:org.apache.shardingsphere.underlying.executor.engine.ExecutorEngine
- 执行引擎
org.apache.shardingsphere.underlying.executor.engine.ExecutorEngine
负责发起执行,然后同步或者异步执行。 - 执行的逻辑单元封装咋对象
org.apache.shardingsphere.sharding.execute.sql.execute.SQLExecuteCallback
中,如下的execute0方法,封装了异常处理、SQL执行SPI埋点``、SQL执行。
private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread, final Map<String, Object> dataMap) throws SQLException {
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
DataSourceMetaData dataSourceMetaData = getDataSourceMetaData(statementExecuteUnit.getStatement().getConnection().getMetaData());
SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
try {
ExecutionUnit executionUnit = statementExecuteUnit.getExecutionUnit();
sqlExecutionHook.start(executionUnit.getDataSourceName(), executionUnit.getSqlUnit().getSql(), executionUnit.getSqlUnit().getParameters(), dataSourceMetaData, isTrunkThread, dataMap);
T result = executeSQL(executionUnit.getSqlUnit().getSql(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode());
sqlExecutionHook.finishSuccess();
return result;
} catch (final SQLException ex) {
sqlExecutionHook.finishFailure(ex);
ExecutorExceptionHandler.handleException(ex);
return null;
}
}
SQL归并
SQL的归并主要针对于多节点返回到的数据进行处理的过程,相关的介绍参考官网
- SQL的归并依赖于当前的连接模式
ConnectionMode
,参考上文Connection部分,连接模式决定了SQL执行的返回结果org.apache.shardingsphere.sharding.execute.sql.execute.result.StreamQueryResult
ororg.apache.shardingsphere.sharding.execute.sql.execute.result.MemoryQueryResult
- 进入归并引擎
org.apache.shardingsphere.underlying.pluggble.merge.MergeEngine#merge
进行归并操作。MergeEngine
会加载SPI装饰层的处理引擎org.apache.shardingsphere.underlying.merge.engine.ResultProcessEngine
并注册到org.apache.shardingsphere.underlying.merge.MergeEntry
中。 - 然后进入
org.apache.shardingsphere.underlying.merge.MergeEntry#process
进行处理。