说在前面
sql路由这里的内容比较多,包含单表路由或者绑定表路由、多库多表路由、笛卡尔积路由,分三部分来介绍,今天先介绍单表或绑定表路由。
sql路由源码解析
com.dangdang.ddframe.rdb.sharding.routing.PreparedStatementRoutingEngine、com.dangdang.ddframe.rdb.sharding.routing.StatementRoutingEngine两个sql路由引擎类,预编译的用的比较多,我们以预编译的Statement的引擎类来跟踪下sharding-jdbc是对sql怎么进行路由的。
上层sql执行器接收到逻辑sql后再进行sql路由的时候会创建预编译statement对象的路由器,因此会调用其构造器
/**
* 预解析的SQL路由器.
*
* @author zhangliang
*/
public final class PreparedStatementRoutingEngine {
// 逻辑sql
private final StringlogicSQL;
//sql路由器
private final SQLRoutersqlRouter;
// sql语句对象
private SQLStatementsqlStatement;
public PreparedStatementRoutingEngine(final String logicSQL, final ShardingContext shardingContext) {
this.logicSQL = logicSQL;
sqlRouter = SQLRouterFactory.createSQLRouter(shardingContext);
}
/**
* 路由引擎工厂.
*
* @author zhangiang
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class SQLRouterFactory {
/**
* 创建SQL路由器.
*
* @param shardingContext 数据源运行期上下文
* @return SQL路由器
*/
// 这里是静态工厂方法实现
public static SQLRoutercreateSQLRouter(final ShardingContext shardingContext) {
return HintManagerHolder.isDatabaseShardingOnly() ?new DatabaseHintSQLRouter(shardingContext) :new ParsingSQLRouter(shardingContext);
}
}
接下来会创建ParsingSQLRouter对象
**
* 需要解析的SQL路由器.
*
* @author zhangiang
*/
public final class ParsingSQLRouterimplements SQLRouter {
// 分库分表配置对象
private final ShardingRuleshardingRule;
// 支持的数据库类型
private final DatabaseTypedatabaseType;
// 是否要展示sql
private final boolean showSQL;
private final ListgeneratedKeys;
// 上面这些属性值都是存储在分片上下文中
public ParsingSQLRouter(final ShardingContext shardingContext) {
shardingRule = shardingContext.getShardingRule();
databaseType = shardingContext.getDatabaseType();
showSQL = shardingContext.isShowSQL();
generatedKeys =new LinkedList<>();
}
这个方法是sql路由的入口方法
/**
* SQL路由.
* 当第一次路由时进行SQL解析,之后的路由复用第一次的解析结果.
*
* @param parameters SQL中的参数
* @return 路由结果
*/
public SQLRouteResultroute(final List parameters) {//sql路由业务方法
if (null ==sqlStatement) {
sqlStatement =sqlRouter.parse(logicSQL, parameters.size());
}
return sqlRouter.route(logicSQL, parameters, sqlStatement);
}
进入到这个parse方法
sqlStatement =sqlRouter.parse(logicSQL, parameters.size());
@Override
public SQLStatementparse(final String logicSQL, final int parametersSize) {
// 创建sql解析引擎
SQLParsingEngine parsingEngine =new SQLParsingEngine(databaseType, logicSQL, shardingRule);
// 开启度量上下文
Context context = MetricsContext.start("Parse SQL");
// sql解析器解析获得sql语句对象
SQLStatement result = parsingEngine.parse();
if (resultinstanceof InsertStatement) {
((InsertStatement) result).appendGenerateKeyToken(shardingRule, parametersSize);
}
MetricsContext.stop(context);
return result;
}
进入下面的sql路由方法,返回路由结果
return sqlRouter.route(logicSQL, parameters, sqlStatement);
private RoutingResultroute(final List parameters, final SQLStatement sqlStatement) {
Collection tableNames = sqlStatement.getTables().getTableNames();
RoutingEngine routingEngine;
// 如果表集合是1,或者是绑定表路由就走简单路由规则
if (1 == tableNames.size() ||shardingRule.isAllBindingTables(tableNames)) {
routingEngine =new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);//单表路由
}else {
// TODO 可配置是否执行笛卡尔积
routingEngine =new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
}
return routingEngine.route();//tianhe TODO 笛卡尔积
}
创建简单路由引擎
routingEngine =new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);//单表路由
/**
* 简单路由引擎.
*
* @author zhangliang
*/
@RequiredArgsConstructor
public final class SimpleRoutingEngineimplements RoutingEngine {
// 分库分表配置对象
private final ShardingRuleshardingRule;
// sql参数
private final Listparameters;
// 逻辑表名
private final StringlogicTableName;
// sql语句对象
private final SQLStatementsqlStatement;
不是单表路由,就走多库多表路由引擎,创建多库多表路由对象
routingEngine =new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
/**
* 混合多库表路由引擎.
*
* @author gaohongtao
* @author zhangliang
*/
@RequiredArgsConstructor
@Slf4j
public final class ComplexRoutingEngineimplements RoutingEngine {
private final ShardingRuleshardingRule;
private final Listparameters;
private final CollectionlogicTables;
private final SQLStatementsqlStatement;
return routingEngine.route();//tianhe TODO 笛卡尔积
这里是路由逻辑,这里有三种实现,一种是单表或者绑定表路由,一种是多库多表路由,一种是笛卡尔积路由
单表或者绑定表路由
@Override
public RoutingResultroute() {
// 根据逻辑表名获得表规则配置对象
TableRule tableRule =shardingRule.getTableRule(logicTableName);
// 根据表规则配置对象获得数据源集合
Collection routedDataSources = routeDataSources(tableRule);
Map> routedMap =new LinkedHashMap<>(routedDataSources.size());
for (String each : routedDataSources) {
routedMap.put(each, routeTables(tableRule, each));
}
return generateRoutingResult(tableRule, routedMap);
}
根据逻辑表名获得表规则配置对象
TableRule tableRule =shardingRule.getTableRule(logicTableName);
/**
* 根据逻辑表名称查找分片规则.
*
* @param logicTableName 逻辑表名称
* @return 该逻辑表的分片规则
*/
public TableRulegetTableRule(final String logicTableName) {
// 根据逻辑表返回表规则配置对象
Optional tableRule = tryFindTableRule(logicTableName);
if (tableRule.isPresent()) {
return tableRule.get();
}
// 如果默认数据源不为空就根据默认数据源创建表配置规则对象
if (dataSourceRule.getDefaultDataSource().isPresent()) {
return createTableRuleWithDefaultDataSource(logicTableName, dataSourceRule);
}
throw new ShardingJdbcException("Cannot find table rule and default data source with logic table: '%s'", logicTableName);
}
// 如果默认数据源不为空就根据默认数据源创建表配置规则对象
if (dataSourceRule.getDefaultDataSource().isPresent()) {
return createTableRuleWithDefaultDataSource(logicTableName, dataSourceRule);
}
// 根据默认数据源创建部分库数据分片策略,数据表不分表分片策略对象,并创建表配置规则对象进行装载
private TableRulecreateTableRuleWithDefaultDataSource(final String logicTableName, final DataSourceRule defaultDataSourceRule) {
Map defaultDataSourceMap =new HashMap<>(1);
defaultDataSourceMap.put(defaultDataSourceRule.getDefaultDataSourceName(), defaultDataSourceRule.getDefaultDataSource().get());
return TableRule.builder(logicTableName)
.dataSourceRule(new DataSourceRule(defaultDataSourceMap))
.databaseShardingStrategy(new DatabaseShardingStrategy("", new NoneDatabaseShardingAlgorithm()))
.tableShardingStrategy(new TableShardingStrategy("", new NoneTableShardingAlgorithm())).build();
}
返回到这里
@Override
public RoutingResultroute() {
// 根据逻辑表名获得表规则配置对象
TableRule tableRule =shardingRule.getTableRule(logicTableName);
// 根据表规则配置对象获得数据源集合
Collection routedDataSources = routeDataSources(tableRule);
Map> routedMap =new LinkedHashMap<>(routedDataSources.size());
for (String each : routedDataSources) {
routedMap.put(each, routeTables(tableRule, each));
}
return generateRoutingResult(tableRule, routedMap);
}
// 根据表规则配置对象获得数据源集合
Collection routedDataSources = routeDataSources(tableRule);
根据分片列获取分片值
getShardingValues(strategy.getShardingColumns());
// 根据真实的数据源名称和分片值计算静态分片
Collection result = strategy.doStaticSharding(tableRule.getActualDatasourceNames(), shardingValues);
/**
* 计算静态分片.
*
* @param availableTargetNames 所有的可用分片资源集合
* @param shardingValues 分片值集合
* @return 分库后指向的数据源名称集合
*/
public CollectiondoStaticSharding(final Collection availableTargetNames, final Collection> shardingValues) {
Collection result =new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
// 如果没有解析到传入的数据源分片值,要走全库路由
if (shardingValues.isEmpty()) {
result.addAll(availableTargetNames);
}else {
// 如果传入分片值,根据分片值去获取具体的数据源
result.addAll(doSharding(shardingValues, availableTargetNames));
}
return result;
}
注意上面的数据库路由的默认实现,如果不传入数据库分片值会走全库路由的,数据量大的话是会影响性能的,所以建议必须要传入分片值,阿里的TDDL这里的实现是直接报错的。
// 如果传入分片值,根据分片值去获取具体的数据源
result.addAll(doSharding(shardingValues, availableTargetNames));
private CollectiondoSharding(final Collection> shardingValues, final Collection availableTargetNames) {
// 如果没分片
if (shardingAlgorithm instanceof NoneKeyShardingAlgorithm) {
return Collections.singletonList(((NoneKeyShardingAlgorithm)shardingAlgorithm).doSharding(availableTargetNames, shardingValues.iterator().next()));
}
// 如果按一个分片值分片
if (shardingAlgorithm instanceof SingleKeyShardingAlgorithm) {
SingleKeyShardingAlgorithm singleKeyShardingAlgorithm = (SingleKeyShardingAlgorithm)shardingAlgorithm;
ShardingValue shardingValue = shardingValues.iterator().next();
switch (shardingValue.getType()) {
case SINGLE:
// = 元算符分片
return Collections.singletonList(singleKeyShardingAlgorithm.doEqualSharding(availableTargetNames, shardingValue));
case LIST:
// in运算符分片
return singleKeyShardingAlgorithm.doInSharding(availableTargetNames, shardingValue);
case RANGE:
// between运算符分片
return singleKeyShardingAlgorithm.doBetweenSharding(availableTargetNames, shardingValue);
default:
// 现在只支持这三种运算符分片
throw new UnsupportedOperationException(shardingValue.getType().getClass().getName());
}
}
// 如果是多个分片值
if (shardingAlgorithm instanceof MultipleKeysShardingAlgorithm) {
return ((MultipleKeysShardingAlgorithm)shardingAlgorithm).doSharding(availableTargetNames, shardingValues);
}
// 其他方式的分片不支持
throw new UnsupportedOperationException(shardingAlgorithm.getClass().getName());
}
返回到这里
@Override
public RoutingResultroute() {
// 根据逻辑表名获得表规则配置对象
TableRule tableRule =shardingRule.getTableRule(logicTableName);
// 根据表规则配置对象获得数据源集合
Collection routedDataSources = routeDataSources(tableRule);
Map> routedMap =new LinkedHashMap<>(routedDataSources.size());
for (String each : routedDataSources) {
routedMap.put(each, routeTables(tableRule, each));
}
return generateRoutingResult(tableRule, routedMap);
}
根据数据源和表配置规则组装路由map
routedMap.put(each, routeTables(tableRule, each));
下面这个方法是获取路由的表的集合
private CollectionrouteTables(final TableRule tableRule, final String routedDataSource) {
// 获取表分片策略
TableShardingStrategy strategy =shardingRule.getTableShardingStrategy(tableRule);
// 获取分片值
List> shardingValues = HintManagerHolder.isUseShardingHint() ? getTableShardingValuesFromHint(strategy.getShardingColumns())
: getShardingValues(strategy.getShardingColumns());//doDynamicSharding
// 如果是动态分片走动态分片,如果是静态分片走静态分片
Collection result = tableRule.isDynamic() ? strategy.doDynamicSharding(shardingValues) : strategy.doStaticSharding(tableRule.getActualTableNames(routedDataSource), shardingValues);
Preconditions.checkState(!result.isEmpty(), "no table route info");
return result;
}
/**
* 计算动态分片.
*
* @param shardingValues 分片值集合
* @return 分库后指向的分片资源集合
*/
public CollectiondoDynamicSharding(final Collection> shardingValues) {//doDynamicSharding
Preconditions.checkState(!shardingValues.isEmpty(), "Dynamic table should contain sharding value.");
Collection availableTargetNames = Collections.emptyList();
Collection result =new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
result.addAll(doSharding(shardingValues, availableTargetNames));
return result;
}
返回到这里
@Override
public RoutingResultroute() {
// 根据逻辑表名获得表规则配置对象
TableRule tableRule =shardingRule.getTableRule(logicTableName);
// 根据表规则配置对象获得数据源集合
Collection routedDataSources = routeDataSources(tableRule);
Map> routedMap =new LinkedHashMap<>(routedDataSources.size());
for (String each : routedDataSources) {
routedMap.put(each, routeTables(tableRule, each));
}
return generateRoutingResult(tableRule, routedMap);
}
// 生成路由结果
return generateRoutingResult(tableRule, routedMap);
private RoutingResultgenerateRoutingResult(final TableRule tableRule, final Map> routedMap) {
RoutingResult result =new RoutingResult();
// 遍历roadMap,roadMap里面key值存储的是数据源名称,value值是物理数据表集合
for (Entry> entry : routedMap.entrySet()) {
// 获取最下数据单元,每个数据单元是一个DataNode
Collection dataNodes = tableRule.getActualDataNodes(entry.getKey(), entry.getValue());
for (DataNode each : dataNodes) {
// 组装数据表单元装载到路由结果中
result.getTableUnits().getTableUnits().add(new TableUnit(each.getDataSourceName(), logicTableName, each.getTableName()));
}
}
return result;
}
数据模型
/**
* SQL路由结果.
*
* @author gaohongtao
* @author zhangliang
*/
@RequiredArgsConstructor
@Getter
public final class SQLRouteResult {
// sql语句对象
private final SQLStatementsqlStatement;
// 最小sql执行单元集合
private final SetexecutionUnits =new LinkedHashSet<>();
private final ListgeneratedKeys =new LinkedList<>();
}
/**
* SQL最小执行单元.
*
* @author gaohongtao
*/
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class SQLExecutionUnit {
// 具体的数据源
private final StringdataSource;
// 具体要执行的物理sql语句
private final Stringsql;
}
/**
* 路由表单元.
*
* @author zhangliang
*/
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class TableUnit {
// 数据源名
private final StringdataSourceName;
// 逻辑表名
private final StringlogicTableName;
// 物理表名
private final StringactualTableName;
}
/**
* 路由表单元集合.
*
* @author zhangliang
*/
@Getter
@ToString
public final class TableUnits {
// 路由表单元集合
private final ListtableUnits =new LinkedList<>();
/**
* 路由结果.
*
* @author zhangliang
*/
@Getter
public class RoutingResult {
// 表路由单元集合
private final TableUnitstableUnits =new TableUnits();
/**
* 路由表单元.
*
* @author zhangliang
*/
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class TableUnit {
// 数据源名
private final StringdataSourceName;
// 逻辑表名
private final StringlogicTableName;
// 物理表名
private final StringactualTableName;
}
/**
* 分库分表数据单元.
*
* @author zhangliang
*/
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public class DataNode {
private static final StringDELIMITER =".";
private final StringdataSourceName;//数据库名
private final StringtableName;//表名
说到最后
以上介绍,仅供参考。
关注微信公众号
加入技术微信群