前言
平时在研究ES的分布式Doc(文档)写入操作时,我们已经知道对将要写入的Doc,ES首先会计算其应该写入到索引的哪个分片,然后在根据集群metaData中的路由信息判断此分片所在的ES节点,最后将写入请求发送到这个节点并完成最终的写入操作。写入流程说明如下:
源码分析
当前ES版本为5.6.16,确定待写入Doc的Shard编号的主要代码部分如下:
1. # TransportBulkAction.java
protected void doRun() throws Exception {
final ClusterState clusterState = observer.setAndGetObservedState();
if (handleBlockExceptions(clusterState)) {
return;
}
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
MetaData metaData = clusterState.metaData();
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest request = bulkRequest.requests.get(i);
if (request == null) {
continue;
}
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
// 根据路由,找出doc写入的目标shard id
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, request));
}
}
1. # OperationRouting.java
public ShardIterator indexShards(ClusterState clusterState, String index, String id, @Nullable String routing) {
return shards(clusterState, index, id, routing).shardsIt();
}
protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String id, String routing) {
int shardId = generateShardId(indexMetaData(clusterState, index), id, routing);
return clusterState.getRoutingTable().shardRoutingTable(index, shardId);
}
static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
final String effectiveRouting;
final int partitionOffset;
if (routing == null) {
assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
effectiveRouting = id;
} else {
effectiveRouting = routing;
}
if (indexMetaData.isRoutingPartitionedIndex()) {
partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
} else {
// we would have still got 0 above but this check just saves us an unnecessary hash calculation
partitionOffset = 0;
}
return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}
private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;
// we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
// of original index to hash documents
return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}
TransportBulkAction类的doRun()方法中,ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId(); 这行代码获取最终Doc的ShardID信息。clusterService.operationRouting()方法返回OperationRouting对象,然后紧接着调用其indexShards(...)方法,接着进入shards(...)方法,最后可看到int shardId = generateShardId(indexMetaData(clusterState, index), id, routing); 这行代码。这里最终得到分片编号shardId,所以我们重点关注的逻辑就在generateShardId(...)方法中。generateShardId(...)方法接受indexMetaData(索引元数据)、id(文档Doc的id号,即为此次写入请求的id号)、routing(写入时自定义的routing信息)。下面我们重点看下generateShardId(...)方法内部的逻辑。
static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
final String effectiveRouting;
final int partitionOffset;
if (routing == null) {
assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
effectiveRouting = id;
} else {
effectiveRouting = routing;
}
if (indexMetaData.isRoutingPartitionedIndex()) {
partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
} else {
// we would have still got 0 above but this check just saves us an unnecessary hash calculation
partitionOffset = 0;
}
return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}
首先方法内声明了String类型的effectiveRouting与int类型的partitionOffset。ES在Doc写入操作时通常有两种方式:一种是指定routing,另一种是不指定routing。
指定routing的写入方式类似为:
POST my_index/doc?routing=tony
{
"name": "tony",
"age": 10
}
routing的设置可以使得写入的数据分布到当前索引下的具体的某些分片中,引入routing机制也是为了更好的搜索性能,使得遍历的分片范围可以进一步的缩小;当然同时要面临着数据分布倾斜的风险。在routing机制下ES提供了一个有意义的设置项index.routing_partition_size,此参数在索引创建时结合着routing一起使用。其意义是使得写入的数据能够集中的落入到routing_partition_size个分片集合中。比如索引my_index包含3个分片,若此时routing_partition_size的值设为2,那经过routing写入到my_index的数据只会落入其中的两个分片,而另一个会处于闲置状态。ES官网指出routing_partition_size的值通常设置为大于1且小于number_of_shards。
当写入时不带有routing机制(对应到代码routing==null, effectiveRouting=id),此时数据会经过hash(doc_id) % number_primary_shards的方式均匀的写入到各个主分片中;通过routing机制写入,想要达到数据分布均匀,则上一种计算公式就不能满足条件了,需要结合doc_id以及routing值重新计算。只是平时大部分的时候我们在写入ES时并没有指定routing,在ES内部处理上默认会把doc_id当做_routing,因此我们对hash(doc_id) % number_primary_shards这个公式比较熟悉。带有routing的写入,effectiveRouting被赋予routing值。接下来代码中会判断当前索引是否设置了routing_partition_size选项,若存在则partitionOffset = hash(doc_id) % routing_partition_size值,否则partitionOffset=0。接着到了calculateScaledShardId(...)方法,方法如下:
private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;
// we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
// of original index to hash documents
return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}
方法中两行代码,本质上对应着ES官网上的公式shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards;hash(_routing) + hash(_id) % routing_partition_size等价于Murmur3HashFunction.hash(effectiveRouting) + partitionOffset,然后再对num_primary_shards做取模运算。整个公式的意思即_routing字段用于计算索引内的分片子集,然后_id用于选择该分片子集内的一个分片。这样就完整的结合了routing与doc_id信息计算出具体的分片编号。认真分析代码我们会发现以下两点可疑的地方:
- 取模计算使用的是indexMetaData.getRoutingNumShards这个值,而不是number_of_primary对应的值
- 取模计算后接着又做了除以indexMetaData.getRoutingFactor的除法运算
为啥此处要这样做呢?经过代码注释与实践发现,这个其实是包含索引shrink功能的计算方法。索引shrink允许我们将一个索引由比如原来的8个分片,shrink成为4、2、1三种数量的分片索引,是一个比较有用的功能。关于factor,这里做个简单的说明,比如数字6,存在6、3、2、1四个因子(Factor)。从索引shrink的角度看factor,比如8个shards同时存在4、2、1三个factor(8意义不大),所以indexMetaData.getRoutingFactor的值获取的就是这个因子数。另外这里一个重要的知识点是假定一个包含m(偶数)个分片的索引A,经过shrink之后(假定shrink为m/2个分片,自然factor=2)变为索引B,但此时索引B的getRoutingNumShards值依然为m,而非m/2。有了这个知识点作铺垫之后,我们就理解了为啥整个公式的计算结果后面要除以indexMetaData.getRoutingFactor的值了。因为公式中除数getRoutingNumShards没有做同步的减小,因此中间的计算结果需要同步除以getRoutingFactor的值。通常索引的getRoutingFactor的值默认为1,这个能够理解,因为通常索引都是没有做shrink操作的。到此,我们就分析完了shardID的整个计算过程了,计算的本质没有变化,因为要考虑routing以及shrink的功能,所以计算公式稍微变得复杂了些。
小结
到此结合着代码,我们分析完了ES内部计算一个将要写入的Doc对应的分片编号的整个过程。计算的本质当然是为了使得数据能够均匀的分布在满足条件的每个分片上。为了友好的支持其他的功能,计算会综合考虑到其他的一些影响因素,比如shrink,routing。但计算的本质没有发生变化。对于routing与shrink功能,文章中没有贴出具体的详细的实践步骤,这块希望大家后面动手实践起来,同时也结合着代码一起研究起来,一起学习ES,一起进步。