sql改写源码解析
找到这个方法
com.dangdang.ddframe.rdb.sharding.routing.PreparedStatementRoutingEngine#route
/**
* SQL路由.
* 当第一次路由时进行SQL解析,之后的路由复用第一次的解析结果.
*
* @param parameters SQL中的参数
* @return 路由结果
*/
public SQLRouteResultroute(final List parameters) {//sql路由业务方法
if (null ==sqlStatement) {
sqlStatement =sqlRouter.parse(logicSQL, parameters.size());
}
return sqlRouter.route(logicSQL, parameters, sqlStatement);
}
return sqlRouter.route(logicSQL, parameters, sqlStatement);
进入到这个方法
com.dangdang.ddframe.rdb.sharding.routing.router.ParsingSQLRouter#route(java.lang.String, java.util.List, com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.SQLStatement)
@Override
public SQLRouteResultroute(final String logicSQL, final List parameters, final SQLStatement sqlStatement) {
final Context context = MetricsContext.start("Route SQL");
SQLRouteResult result =new SQLRouteResult(sqlStatement);
// 如果是insert语句去生成分布式逐渐的逻辑
if (sqlStatementinstanceof InsertStatement &&null != ((InsertStatement) sqlStatement).getGeneratedKey()) {
processGeneratedKey(parameters, (InsertStatement) sqlStatement, result);
}
// 进行sql路由返回路由结果
RoutingResult routingResult = route(parameters, sqlStatement);
SQLRewriteEngine rewriteEngine =new SQLRewriteEngine(shardingRule, logicSQL, sqlStatement);
boolean isSingleRouting = routingResult.isSingleRouting();
if (sqlStatementinstanceof SelectStatement &&null != ((SelectStatement) sqlStatement).getLimit()) {
processLimit(parameters, (SelectStatement) sqlStatement, isSingleRouting);
}
SQLBuilder sqlBuilder = rewriteEngine.rewrite(!isSingleRouting);
if (routingResultinstanceof CartesianRoutingResult) {
for (CartesianDataSource cartesianDataSource : ((CartesianRoutingResult) routingResult).getRoutingDataSources()) {
for (CartesianTableReference cartesianTableReference : cartesianDataSource.getRoutingTableReferences()) {
result.getExecutionUnits().add(new SQLExecutionUnit(cartesianDataSource.getDataSource(), rewriteEngine.generateSQL(cartesianTableReference, sqlBuilder)));
}
}
}else {
for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
result.getExecutionUnits().add(new SQLExecutionUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder)));
}
}
MetricsContext.stop(context);
if (showSQL) {
SQLLogger.logSQL(logicSQL, sqlStatement, result.getExecutionUnits(), parameters);
}
return result;
}
// sql改写
SQLRewriteEngine rewriteEngine =new SQLRewriteEngine(shardingRule, logicSQL, sqlStatement);
sql路由完毕后会进行sql改写。sql改写主要是框架内部为了结果集归并所做的一些处理(涉及性能问题),以及分页的实现。接下来我们跟踪一下sql改写的源码实现。
创建sql改写引擎
SQLRewriteEngine rewriteEngine =new SQLRewriteEngine(shardingRule, logicSQL, sqlStatement);
**
* SQL重写引擎.
*
* @author zhangliang
*/
public final class SQLRewriteEngine {
// 分库分表配置对象
private final ShardingRuleshardingRule;
// sql改写之前的sql
private final StringoriginalSQL;
// sql标记对象集合
private final ListsqlTokens =new LinkedList<>();
// sql语句对象
private final SQLStatementsqlStatement;
// Whether the routing result is a single route (single database/table).
boolean isSingleRouting = routingResult.isSingleRouting();
if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit()) {
    // Handle pagination.
    processLimit(parameters, (SelectStatement) sqlStatement, isSingleRouting);
}
/**
 * Processes the pagination (LIMIT) parameters of a SELECT statement.
 *
 * @param parameters parameters of the SQL
 * @param selectStatement SELECT statement whose limit is processed
 * @param isSingleRouting whether the routing result is a single route
 */
private void processLimit(final List parameters, final SelectStatement selectStatement, final boolean isSingleRouting) {
// Fetch-all is needed when the statement has group-by items or aggregation select items,
// AND its group-by items and order-by items are not the same.
boolean isNeedFetchAll = (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) && !selectStatement.isSameGroupByAndOrderByItems();
// Fill in / rewrite the pagination parameters. Note that !isSingleRouting is passed here, so the
// pagination is rewritten whenever routing is not single. The original note gives the examples
// "0,10 -> 0,10" and "10,20 -> 0,20".
// NOTE(review): if "10,20" means offset 10 / row count 20, the rewritten form is presumably
// "0,30" (offset + row count) - confirm against Limit.processParameters.
selectStatement.getLimit().processParameters(parameters, !isSingleRouting, isNeedFetchAll);
}
调用sql改写引擎,返回一个sql构造器(参数!isSingleRouting表示:不是单库表路由时才改写分页)
SQLBuilder sqlBuilder = rewriteEngine.rewrite(!isSingleRouting);
public SQLBuilderrewrite(final boolean isRewriteLimit) {
SQLBuilder result =new SQLBuilder();
if (sqlTokens.isEmpty()) {
result.appendLiterals(originalSQL);
return result;
}
int count =0;
sortByBeginPosition();
for (SQLToken each :sqlTokens) {
if (0 == count) {
// 拼接字面量
result.appendLiterals(originalSQL.substring(0, each.getBeginPosition()));
}
if (eachinstanceof TableToken) {
// 拼装table
appendTableToken(result, (TableToken) each, count, sqlTokens);
}else if (eachinstanceof ItemsToken) {
// 拼装选择项
appendItemsToken(result, (ItemsToken) each, count, sqlTokens);
}else if (eachinstanceof RowCountToken) {
// 拼装分页长度项
appendLimitRowCount(result, (RowCountToken) each, count, sqlTokens, isRewriteLimit);
}else if (eachinstanceof OffsetToken) {
// 拼装分页偏移量项
appendLimitOffsetToken(result, (OffsetToken) each, count, sqlTokens, isRewriteLimit);
}else if (eachinstanceof OrderByToken) {
// 拼装排序项
appendOrderByToken(result, count, sqlTokens);
}
count++;
}
return result;
}
// If the routing result is a cartesian routing result
if (routingResult instanceof CartesianRoutingResult) {
    // Iterate over the data sources.
    for (CartesianDataSource cartesianDataSource : ((CartesianRoutingResult) routingResult).getRoutingDataSources()) {
        // Iterate over the cartesian table references.
        for (CartesianTableReference cartesianTableReference : cartesianDataSource.getRoutingTableReferences()) {
            // Build the minimal execution unit and add it to the route result.
            result.getExecutionUnits().add(new SQLExecutionUnit(cartesianDataSource.getDataSource(), rewriteEngine.generateSQL(cartesianTableReference, sqlBuilder)));
        }
    }
} else {
    // Simple routing: build the minimal execution units.
    for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
        result.getExecutionUnits().add(new SQLExecutionUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder)));
    }
}
以上是sql改写的源码解析
说到最后
以上内容,仅供参考。
关注微信公众号
加入技术微信群