从源码看ShardingSphere设计-归并引擎篇

归并引擎的职责定位是进行结果集的合并,支持应用以标准的JDBC接口访问正确的结果集ResultSet。因为在数据分片模式下,SQL可能会需要在多个数据节点上执行,各数据节点的结果集之间是独立不关联的,在排序、分组、聚合等操作时,就需要对结果集进行归并处理,以屏蔽后端多个数据库给应用操作带来的差异性。

代码执行分析

合并引擎对应的类为MergeEngine,但其内部真正进行处理类为MergeEntry,其实例merger在构造函数中完成创建。

org.apache.shardingsphere.underlying.pluggble.merge.MergeEngine

   
   private final Collection<BaseRule> rules;
   
   private final MergeEntry merger;
   
   public MergeEngine(final Collection<BaseRule> rules, final ConfigurationProperties properties, final DatabaseType databaseType, final SchemaMetaData metaData) {
       this.rules = rules;
       merger = new MergeEntry(databaseType, metaData, properties);
   }
   
   /**
    * Merge.
    *
    * @param queryResults query results
    * @param sqlStatementContext SQL statement context
    * @return merged result
    * @throws SQLException SQL exception
    */
   public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
       registerMergeDecorator();
       return merger.process(queryResults, sqlStatementContext);
   }
   
   private void registerMergeDecorator() {
       for (Class<? extends ResultProcessEngine> each : OrderedRegistry.getRegisteredClasses(ResultProcessEngine.class)) {
           ResultProcessEngine processEngine = createProcessEngine(each);
           Class<?> ruleClass = (Class<?>) processEngine.getType();
           // FIXME rule.getClass().getSuperclass() == ruleClass for orchestration, should decouple extend between orchestration rule and sharding rule
           rules.stream().filter(rule -> rule.getClass() == ruleClass || rule.getClass().getSuperclass() == ruleClass).collect(Collectors.toList())
                   .forEach(rule -> merger.registerProcessEngine(rule, processEngine));
       }
   }
   
   private ResultProcessEngine createProcessEngine(final Class<? extends ResultProcessEngine> processEngine) {
       try {
           return processEngine.newInstance();
       } catch (final InstantiationException | IllegalAccessException ex) {
           throw new ShardingSphereException(String.format("Can not find public default constructor for result process engine `%s`", processEngine), ex);
       }
   }
}

merge方法首先进行了对merge装饰器进行了注册,具体根据传入的BaseRule类型,将需要的ResultProcessEngine进行实例化,并添加到MergeEntry实例的engines属性中。真正的处理逻辑在MergeEngine实例的process方法中。

在4.1.1版本中,结果集合并引擎ResultMergerEngine接口实现类只有ShardingResultMergerEngine类,结果装饰器ResultDecoratorEngine接口实现类只有EncryptResultDecoratorEngine。

接下来看下MergeEntry 类的process方法

org.apache.shardingsphere.underlying.merge.MergeEntry

@RequiredArgsConstructor
public final class MergeEntry {
    
    private final DatabaseType databaseType;
    
    private final SchemaMetaData schemaMetaData;
    
    private final ConfigurationProperties properties;
    
    private final Map<BaseRule, ResultProcessEngine> engines = new LinkedHashMap<>();
    
    … 
    /**
     * Process query results.
     *
     * @param queryResults query results
     * @param sqlStatementContext SQL statement context
     * @return merged result
     * @throws SQLException SQL exception
     */
    public MergedResult process(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
        Optional<MergedResult> mergedResult = merge(queryResults, sqlStatementContext);
        Optional<MergedResult> result = mergedResult.isPresent() ? Optional.of(decorate(mergedResult.get(), sqlStatementContext)) : decorate(queryResults.get(0), sqlStatementContext);
        return result.orElseGet(() -> new TransparentMergedResult(queryResults.get(0)));
}
private Optional<MergedResult> merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
        for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
            if (entry.getValue() instanceof ResultMergerEngine) {
                ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(databaseType, entry.getKey(), properties, sqlStatementContext);
                return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, schemaMetaData));
            }
        }
        return Optional.empty();
    }
    
    @SuppressWarnings("unchecked")
    private MergedResult decorate(final MergedResult mergedResult, final SQLStatementContext sqlStatementContext) throws SQLException {
        MergedResult result = null;
        for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
            if (entry.getValue() instanceof ResultDecoratorEngine) {
                ResultDecorator resultDecorator = ((ResultDecoratorEngine) entry.getValue()).newInstance(databaseType, schemaMetaData, entry.getKey(), properties, sqlStatementContext);
                result = null == result ? resultDecorator.decorate(mergedResult, sqlStatementContext, schemaMetaData) : resultDecorator.decorate(result, sqlStatementContext, schemaMetaData);
            }
        }
        return null == result ? mergedResult : result;
    }
    
    @SuppressWarnings("unchecked")
    private Optional<MergedResult> decorate(final QueryResult queryResult, final SQLStatementContext sqlStatementContext) throws SQLException {
        MergedResult result = null;
        for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
            if (entry.getValue() instanceof ResultDecoratorEngine) {
                ResultDecorator resultDecorator = ((ResultDecoratorEngine) entry.getValue()).newInstance(databaseType, schemaMetaData, entry.getKey(), properties, sqlStatementContext);
                result = null == result ? resultDecorator.decorate(queryResult, sqlStatementContext, schemaMetaData) : resultDecorator.decorate(result, sqlStatementContext, schemaMetaData);
            }
        }
        return Optional.ofNullable(result);
    }

process方法主要逻辑主要有两部分:
一、 遍历拿到注册的ResultMergerEngine实例(只能有一个),调用其newInstance方法返回对应的ResultMerger对象,然后执行ResultMerger实例的merge方法生成合并结果集MergedResult实例。

转入ShardingResultMergerEngine的newInstance方法可以看到,根据SQLStatementContext类型的种类创建不同的ResultMerger实例。如果是SQL是Select,则创建ShardingDQLResultMerger对象;如果是DAL语句(show database、show table 等),则创建ShardingDALResultMerger对象。

org.apache.shardingsphere.sharding.merge.ShardingResultMergerEngine

/**
 * Result merger engine for sharding.
 */
public final class ShardingResultMergerEngine implements ResultMergerEngine<ShardingRule> {
    
    @Override
    public ResultMerger newInstance(final DatabaseType databaseType, final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLStatementContext sqlStatementContext) {
        if (sqlStatementContext instanceof SelectStatementContext) {
            return new ShardingDQLResultMerger(databaseType);
        } 
        if (sqlStatementContext.getSqlStatement() instanceof DALStatement) {
            return new ShardingDALResultMerger(shardingRule);
        }
        return new TransparentResultMerger();
    }
…
}

接下来我们看下最常见的Select类的结果合并器
org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger

public final class ShardingDQLResultMerger implements ResultMerger {
    
    private final DatabaseType databaseType;
    
    @Override
    public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
        if (1 == queryResults.size()) {
            return new IteratorStreamMergedResult(queryResults);
        }
        Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0));
        SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext;
        selectStatementContext.setIndexes(columnLabelIndexMap);
        MergedResult mergedResult = build(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);//生成合并结果集
        return decorate(queryResults, selectStatementContext, mergedResult);//对合并结果集进行装饰处理
    }
    …    
    private MergedResult build(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,
                               final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData) throws SQLException {
        if (isNeedProcessGroupBy(selectStatementContext)) {
            return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
        }
        if (isNeedProcessDistinctRow(selectStatementContext)) {
            setGroupByForDistinctRow(selectStatementContext);
            return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
        }
        if (isNeedProcessOrderBy(selectStatementContext)) {
            return new OrderByStreamMergedResult(queryResults, selectStatementContext, schemaMetaData);
        }
        return new IteratorStreamMergedResult(queryResults);
}
…private MergedResult getGroupByMergedResult(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,
                                                final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData) throws SQLException {
        return selectStatementContext.isSameGroupByAndOrderByItems()
                ? new GroupByStreamMergedResult(columnLabelIndexMap, queryResults, selectStatementContext, schemaMetaData)
                : new GroupByMemoryMergedResult(queryResults, selectStatementContext, schemaMetaData);
    }
    
    private boolean isNeedProcessOrderBy(final SelectStatementContext selectStatementContext) {
        return !selectStatementContext.getOrderByContext().getItems().isEmpty();
    }
    
    private MergedResult decorate(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final MergedResult mergedResult) throws SQLException {
        PaginationContext paginationContext = selectStatementContext.getPaginationContext();
        if (!paginationContext.isHasPagination() || 1 == queryResults.size()) {
            return mergedResult;
        }
        String trunkDatabaseName = DatabaseTypes.getTrunkDatabaseType(databaseType.getName()).getName();
        if ("MySQL".equals(trunkDatabaseName) || "PostgreSQL".equals(trunkDatabaseName)) {
            return new LimitDecoratorMergedResult(mergedResult, paginationContext);
        }
        if ("Oracle".equals(trunkDatabaseName)) {
            return new RowNumberDecoratorMergedResult(mergedResult, paginationContext);
        }
        if ("SQLServer".equals(trunkDatabaseName)) {
            return new TopAndRowNumberDecoratorMergedResult(mergedResult, paginationContext);
        }
        return mergedResult;
    }

}

可以看到ShardingDQLResultMerger的merge方法:

  1. 首先生成columnLabel位置下标的Map,然后传入SelectStatementContext.setIndexes方法,进行聚合projecttion、group by、order by设置其对应的columnLabel所在位置下标,这些下标会在GroupByStreamMergedResult的等类中访问。
  2. build方法中判断是否需要处理group by、distinct以及order by,然后返回对应的
    MergedResult实现类。这些实现类是合并结果集的真正逻辑所在。getGroupByMergedResult方法中判断group by与order by的列是否一样,如果是则创建GroupByStreamMergedResult实例,否则创建GroupByMemoryMergedResult实例,前者为基于流方式,后者为基于内存方式。

看下GroupByStreamMergedResult类的实现

org.apache.shardingsphere.sharding.merge.dql.groupby.GroupByStreamMergedResult

/**
 * Stream merged result for group by.
 */
public final class GroupByStreamMergedResult extends OrderByStreamMergedResult {
    
    private final SelectStatementContext selectStatementContext;
    
    private final List<Object> currentRow;
    
    private List<?> currentGroupByValues;
    
    public GroupByStreamMergedResult(final Map<String, Integer> labelAndIndexMap, final List<QueryResult> queryResults,
                                     final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
        super(queryResults, selectStatementContext, schemaMetaData);
        this.selectStatementContext = selectStatementContext;
        currentRow = new ArrayList<>(labelAndIndexMap.size());
        currentGroupByValues = getOrderByValuesQueue().isEmpty()
                ? Collections.emptyList() : new GroupByValue(getCurrentQueryResult(), selectStatementContext.getGroupByContext().getItems()).getGroupValues();
    }
    
    @Override
    public boolean next() throws SQLException {
        currentRow.clear();
        if (getOrderByValuesQueue().isEmpty()) {
            return false;
        }
        if (isFirstNext()) {
            super.next();
        }
        if (aggregateCurrentGroupByRowAndNext()) {
            currentGroupByValues = new GroupByValue(getCurrentQueryResult(), selectStatementContext.getGroupByContext().getItems()).getGroupValues();
        }
        return true;
    }
    
    private boolean aggregateCurrentGroupByRowAndNext() throws SQLException {
        boolean result = false;
//生成各AggregationProjection对应的聚合单元AggregationUnit,在进行聚合计算时会根据该map进行运算。
        Map<AggregationProjection, AggregationUnit> aggregationUnitMap = Maps.toMap(
                selectStatementContext.getProjectionsContext().getAggregationProjections(), input -> AggregationUnitFactory.create(input.getType(), input instanceof AggregationDistinctProjection));
//如果从优先队列中取到的group by值与当前group by值一样,则进行聚合运算
        while (currentGroupByValues.equals(new GroupByValue(getCurrentQueryResult(), selectStatementContext.getGroupByContext().getItems()).getGroupValues())) {
            aggregate(aggregationUnitMap);
            cacheCurrentRow();
            result = super.next();
            if (!result) {
                break;
            }
        }
//将聚合计算后的结果设置到currentRow中,这样外围在调用getValue()时,就可以拿到聚合运算后的准确结果(在sharding-jdbc中,应用调用ShardingResultSet的getXXX方法,其内部再调用MergeResult的getValue方法)
        setAggregationValueToCurrentRow(aggregationUnitMap);
        return result;
    }
    //聚合计算
    private void aggregate(final Map<AggregationProjection, AggregationUnit> aggregationUnitMap) throws SQLException {
        for (Entry<AggregationProjection, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
            List<Comparable<?>> values = new ArrayList<>(2);
            if (entry.getKey().getDerivedAggregationProjections().isEmpty()) {
                values.add(getAggregationValue(entry.getKey()));
            } else {///当为avg聚合操作时会有DerivedAggregationProjections,主要是rewrite时添加的sum和count
                for (AggregationProjection each : entry.getKey().getDerivedAggregationProjections()) {
                    values.add(getAggregationValue(each));
                }
            }
            entry.getValue().merge(values);
        }
    }
    
    private void cacheCurrentRow() throws SQLException {
        for (int i = 0; i < getCurrentQueryResult().getColumnCount(); i++) {
            currentRow.add(getCurrentQueryResult().getValue(i + 1, Object.class));
        }
    }
    
...
    private void setAggregationValueToCurrentRow(final Map<AggregationProjection, AggregationUnit> aggregationUnitMap) {
        for (Entry<AggregationProjection, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
            currentRow.set(entry.getKey().getIndex() - 1, entry.getValue().getResult());
        }
    }
    
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) {
        Object result = currentRow.get(columnIndex - 1);
        setWasNull(null == result);
        return result;
    }
    …
    }

GroupByStreamMergedResult继承自OrderByStreamMergedResult类,该类实现的即为基于流模式的排序合并结果集。

org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByStreamMergedResult

/**
 * Stream merged result for order by.
 */
public class OrderByStreamMergedResult extends StreamMergedResult {
    
    private final Collection<OrderByItem> orderByItems;
    
    @Getter(AccessLevel.PROTECTED)
    private final Queue<OrderByValue> orderByValuesQueue;
    
    @Getter(AccessLevel.PROTECTED)
    private boolean isFirstNext;
    
    public OrderByStreamMergedResult(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
        this.orderByItems = selectStatementContext.getOrderByContext().getItems();
        this.orderByValuesQueue = new PriorityQueue<>(queryResults.size());
        orderResultSetsToQueue(queryResults, selectStatementContext, schemaMetaData);
        isFirstNext = true;
    }
    //将每个数据节点的查询结果各取一个放入优先队列中,设置当前查询结果为队列头元素,由于优先队列能保证其中元素的顺序性,因此 每次取出的队头元素即为排序最小的。
    private void orderResultSetsToQueue(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
        for (QueryResult each : queryResults) {
            OrderByValue orderByValue = new OrderByValue(each, orderByItems, selectStatementContext, schemaMetaData);
            if (orderByValue.next()) {
                orderByValuesQueue.offer(orderByValue);
            }
        }
        setCurrentQueryResult(orderByValuesQueue.isEmpty() ? queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
    }
    
    @Override
    public boolean next() throws SQLException {
        if (orderByValuesQueue.isEmpty()) {
            return false;
        }
        if (isFirstNext) {
            isFirstNext = false;
            return true;
        }
//获取队列头元素,然后取该元素所在数据组的下一个元素,如果存在则继续压入有限队列
        OrderByValue firstOrderByValue = orderByValuesQueue.poll();
        if (firstOrderByValue.next()) {
            orderByValuesQueue.offer(firstOrderByValue);
        }
        if (orderByValuesQueue.isEmpty()) {
            return false;
        }
        setCurrentQueryResult(orderByValuesQueue.peek().getQueryResult());
        return true;
    }
}

OrderByStreamMergedResult类巧妙的使用优先队列实现了基于流模式的排序,由于每个数据集已经有序,所以在next()操作时弹出队列头部元素,然后再取该数据集下一个压入队列,当进行读取数据时直接读取队头元素对应值即可。

这里需要注意的是该优先队列采用的是小顶堆,但实际排序的时候可能是升序也可能是降序,那这里如何保证在不同排序方向时都能正确返回结果呢。玄机在排序元素OrderByValue类,其实现了Comparable接口,其会根据排序方向保证元素值的正确顺序。
org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByValue

public final class OrderByValue implements Comparable<OrderByValue> {
…
    public int compareTo(final OrderByValue o) {
        int i = 0;
        for (OrderByItem each : orderByItems) {
            int result = CompareUtil.compareTo(orderValues.get(i), o.orderValues.get(i), each.getSegment().getOrderDirection(),
                each.getSegment().getNullOrderDirection(), orderValuesCaseSensitive.get(i));
            if (0 != result) {
                return result;
            }
            i++;
        }
        return 0;
}
}

在CompareUtil类的compareTo方法中,根据排序方向进行返回比较结果,当降序时直接返回值得比较结果,如果是升序则对值比较结果取反。

public final class CompareUtil {
    
    /**
     * Compare two object with order type.
     *
     * @param thisValue this value
     * @param otherValue other value
     * @param orderDirection order direction 
     * @param nullOrderDirection order direction for null value
     * @param caseSensitive case sensitive
     * @return compare result
     */
    @SuppressWarnings("unchecked")
    public static int compareTo(final Comparable thisValue, final Comparable otherValue, final OrderDirection orderDirection, final OrderDirection nullOrderDirection, final boolean caseSensitive) {
        if (null == thisValue && null == otherValue) {
            return 0;
        }
        if (null == thisValue) {
            return orderDirection == nullOrderDirection ? -1 : 1;
        }
        if (null == otherValue) {
            return orderDirection == nullOrderDirection ? 1 : -1;
        }
        if (!caseSensitive && thisValue instanceof String && otherValue instanceof String) {
            return compareToCaseInsensitiveString((String) thisValue, (String) otherValue, orderDirection);
        }
        return OrderDirection.ASC == orderDirection ? thisValue.compareTo(otherValue) : -thisValue.compareTo(otherValue);//当升序时,将比较结果值取反
    }
    

当group by和order by的列不一样时,则采用基于内存模式归并,对应合并结果集类为org.apache.shardingsphere.sharding.merge.dql.groupby.GroupByMemoryMergedResult

/**
 * Memory merged result for group by.
 */
public final class GroupByMemoryMergedResult extends MemoryMergedResult<ShardingRule> {
    
    public GroupByMemoryMergedResult(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
        super(null, schemaMetaData, selectStatementContext, queryResults);
    }
    
    @Override
    protected List<MemoryQueryResultRow> init(final ShardingRule shardingRule,
                                              final SchemaMetaData schemaMetaData, final SQLStatementContext sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
        SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext;
        Map<GroupByValue, MemoryQueryResultRow> dataMap = new HashMap<>(1024);
        Map<GroupByValue, Map<AggregationProjection, AggregationUnit>> aggregationMap = new HashMap<>(1024);
        for (QueryResult each : queryResults) {//遍历所有结果集元素,然后进行聚合运算
            while (each.next()) {
                GroupByValue groupByValue = new GroupByValue(each, selectStatementContext.getGroupByContext().getItems());
                initForFirstGroupByValue(selectStatementContext, each, groupByValue, dataMap, aggregationMap);//初始化创建GroupByValue对应的AggregationUnit
                aggregate(selectStatementContext, each, groupByValue, aggregationMap);//对各分组进行聚合计算
            }
        }
        setAggregationValueToMemoryRow(selectStatementContext, dataMap, aggregationMap);//将计算的值设置到aggregationMap
        List<Boolean> valueCaseSensitive = queryResults.isEmpty() ? Collections.emptyList() : getValueCaseSensitive(queryResults.iterator().next(), selectStatementContext, schemaMetaData);
        return getMemoryResultSetRows(selectStatementContext, dataMap, valueCaseSensitive);//
}
…
//将aggregationMap中值排序后生成List<MemoryQueryResultRow>集合,此及时作为内存合并结果的内部数据类
    private List<MemoryQueryResultRow> getMemoryResultSetRows(final SelectStatementContext selectStatementContext, 
                                                              final Map<GroupByValue, MemoryQueryResultRow> dataMap, final List<Boolean> valueCaseSensitive) {
        List<MemoryQueryResultRow> result = new ArrayList<>(dataMap.values());
        result.sort(new GroupByRowComparator(selectStatementContext, valueCaseSensitive));
        return result;
    }
}

可以看到与基于流模式归并的区别在于需要遍历读取所有结果集中元素,然后根据GroupByValue分组进行聚合运算。

在上面我们提到会根据创建聚合类型,创建AggregationUnit实例,其对应的工厂类为org.apache.shardingsphere.sharding.merge.dql.groupby.aggregation.AggregationUnitFactory

public final class AggregationUnitFactory {
    
    /**
     * Create aggregation unit instance.
     * 
     * @param type aggregation function type
     * @param isDistinct is distinct
     * @return aggregation unit instance
     */
    public static AggregationUnit create(final AggregationType type, final boolean isDistinct) {
        switch (type) {
            case MAX:
                return new ComparableAggregationUnit(false);
            case MIN:
                return new ComparableAggregationUnit(true);
            case SUM:
                return isDistinct ? new DistinctSumAggregationUnit() : new AccumulationAggregationUnit();
            case COUNT:
                return isDistinct ? new DistinctCountAggregationUnit() : new AccumulationAggregationUnit();
            case AVG:
                return isDistinct ? new DistinctAverageAggregationUnit() : new AverageAggregationUnit();
            default:
                throw new UnsupportedOperationException(type.name());
        }
    }

这些AggregationUnit实现类逻辑都比较简单,主要就是累积计算传入集合元素的和,如果带有distinct的,则其内部通过一个HashSet进行判重,当为avg聚合运算时,传入的值有两个元素,分别为count和sum值。

org.apache.shardingsphere.sharding.merge.dql.groupby.aggregation.AccumulationAggregationUnit

/**
 * Accumulation aggregation unit.
 */
@RequiredArgsConstructor
public final class AccumulationAggregationUnit implements AggregationUnit {
    
    private BigDecimal result;
    
    @Override
    public void merge(final List<Comparable<?>> values) {
        if (null == values || null == values.get(0)) {
            return;
        }
        if (null == result) {
            result = new BigDecimal("0");
        }
        result = result.add(new BigDecimal(values.get(0).toString()));
    }
    
    @Override
    public Comparable<?> getResult() {
        return result;
    }
}

org.apache.shardingsphere.sharding.merge.dql.groupby.aggregation.DistinctAverageAggregationUnit

**
 * Distinct average aggregation unit.
 */
@RequiredArgsConstructor
public final class DistinctAverageAggregationUnit implements AggregationUnit {
    
    private BigDecimal count;
    
    private BigDecimal sum;
    
    private Collection<Comparable<?>> countValues = new LinkedHashSet<>();
    
    private Collection<Comparable<?>> sumValues = new LinkedHashSet<>();
    
    @Override
    public void merge(final List<Comparable<?>> values) {
        if (null == values || null == values.get(0) || null == values.get(1)) {
            return;
        }
        if (this.countValues.add(values.get(0)) && this.sumValues.add(values.get(0))) {
            if (null == count) {
                count = new BigDecimal("0");
            }
            if (null == sum) {
                sum = new BigDecimal("0");
            }
            count = count.add(new BigDecimal(values.get(0).toString()));
            sum = sum.add(new BigDecimal(values.get(1).toString()));
        }
    } 

由于这些类逻辑比较简单,这里就不详细介绍其中代码。

在基于流或者内存生成合并结果集后,最后还有一个分页的装饰操作,其核心操作是根据offset,先对结果集取next()操作offset次,之所以需要这样处理,是因为如果是多个数据节点的分页操作,在rewrite环节会将offset设置成0,所以在返回给应用前需要跳过前offset记录。
org.apache.shardingsphere.sharding.merge.dql.pagination.LimitDecoratorMergedResult

/**
 * Decorator merged result for limit pagination.
 */
public final class LimitDecoratorMergedResult extends DecoratorMergedResult {
    
    private final PaginationContext pagination;
    
    private final boolean skipAll;
    
    private int rowNumber;
    
    public LimitDecoratorMergedResult(final MergedResult mergedResult, final PaginationContext pagination) throws SQLException {
        super(mergedResult);
        this.pagination = pagination;
        skipAll = skipOffset();
    }

    private boolean skipOffset() throws SQLException {
        for (int i = 0; i < pagination.getActualOffset(); i++) {
            if (!getMergedResult().next()) { //如果结果集总概述小于offset值,设置skipAll为true,表示跳过所有结果集,后续next()直接返回false
                return true;
            }
        }
        rowNumber = 0;
        return false;
    }
    
    @Override
    public boolean next() throws SQLException {
        if (skipAll) {
            return false;
        }
        if (!pagination.getActualRowCount().isPresent()) {
            return getMergedResult().next();
        }
//由于改写了offset,多个数据节点返回的结果集总数大于SQL中指定的RowCount,因此在next()操作时要记录当前已返回的记录总数rowNumber,同时要判断该值不能大于SQL指定的RowCount
        return ++rowNumber <= pagination.getActualRowCount().get() && getMergedResult().next();
    }
}

二、 回到MergeEntry的decorate方法,该方法遍历拿到注册的ResultDecoratorEngine,依次调用其newInstance创建ResultDecorator实例,然后执行其decorate方法,对第一步生成的MergedResult实例进行二次处理。ResultDecoratorEngine接口目前实现类只有org.apache.shardingsphere.encrypt.merge.EncryptResultDecoratorEngine

/**
 * Result decorator engine for encrypt.
 */
public final class EncryptResultDecoratorEngine implements ResultDecoratorEngine<EncryptRule> {
    
    @Override
    public ResultDecorator newInstance(final DatabaseType databaseType, final SchemaMetaData schemaMetaData, 
                                       final EncryptRule encryptRule, final ConfigurationProperties properties, final SQLStatementContext sqlStatementContext) {
        if (sqlStatementContext instanceof SelectStatementContext) {
            return new EncryptDQLResultDecorator(
                    new EncryptorMetaData(schemaMetaData, encryptRule, (SelectStatementContext) sqlStatementContext), properties.<Boolean>getValue(ConfigurationPropertyKey.QUERY_WITH_CIPHER_COLUMN));
        } 
        if (sqlStatementContext.getSqlStatement() instanceof DALStatement) {
            return new EncryptDALResultDecorator();
        }
        return new TransparentResultDecorator();
}
..
}

可以看到其创建了org.apache.shardingsphere.encrypt.merge.dql.EncryptDQLResultDecorator实例


/**
 * DQL result decorator for encrypt.
 */
@RequiredArgsConstructor
public final class EncryptDQLResultDecorator implements ResultDecorator {
    
    private final EncryptorMetaData encryptorMetaData;
    
    private final boolean queryWithCipherColumn;
    
    @Override
    public MergedResult decorate(final QueryResult queryResult, final SQLStatementContext sqlStatementContext, final SchemaMetaData schemaMetaData) {
        return new EncryptMergedResult(encryptorMetaData, new TransparentMergedResult(queryResult), queryWithCipherColumn);
    }
    
    @Override
    public MergedResult decorate(final MergedResult mergedResult, final SQLStatementContext sqlStatementContext, final SchemaMetaData schemaMetaData) {
        return new EncryptMergedResult(encryptorMetaData, mergedResult, queryWithCipherColumn);
    }
}

decorate方法则直接创建了EncryptMergedResult实例返回,该类逻辑也很清晰,实现了MergedResult接口,在getValue方法中,先判断是否需要进行解密,如果需要则查询对应的加密器对象,然后执行解密,然后返回。

org.apache.shardingsphere.encrypt.merge.dql.EncryptMergedResult

/**
 * Merged result for encrypt.
 */
@RequiredArgsConstructor
public final class EncryptMergedResult implements MergedResult {
    
    private final EncryptorMetaData metaData;
    
    private final MergedResult mergedResult;
    
    private final boolean queryWithCipherColumn;
    
    @Override
    public boolean next() throws SQLException {
        return mergedResult.next();
    }
    
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        if (!queryWithCipherColumn) {
            return mergedResult.getValue(columnIndex, type);
        }
        Optional<Encryptor> encryptor = metaData.findEncryptor(columnIndex);
        if (!encryptor.isPresent()) {
            return mergedResult.getValue(columnIndex, type);
        }
        String ciphertext = (String) mergedResult.getValue(columnIndex, String.class);
        return null == ciphertext ? null : encryptor.get().decrypt(ciphertext);
    }
    

关于加密的源码限于篇幅,这里不展开分析。

归并引擎执行流程

官网文档对归并引擎画了一张流程图:

其它关于归并引擎原理的更多介绍可参见官网介绍https://shardingsphere.apache.org/document/current/cn/features/sharding/principle/merge/

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342