一、一致性Hash负载均衡原理
缓存服务器集群如下:
现需将对象Object存入到缓存服务器中,现在有4台服务器,存入到哪台呢,也就是说需要定义一个规则来确定选取的服务器。
假设Object的数据结构如下:
class Object {
private String id;
private String name;
......
}
普通Hash算法
缓存服务器集群为这样的集合S = {A,B,C,D},选择服务器则也就是对这个集合进行取样。
随机取样是一种方式,但是随机取样影响查找的性能。随机获取一台服务器然后将Object存入,应用程序中通过id从缓存服务器查找Object,这种方式是无法第一时间确定其所在的服务器的,需要遍历集群中的所有服务器,然后比对查找出来的对象的id,才能获得查找的Object。
数据结构中的哈希表可以解决查找的问题。S集合使用线性列表方式存储,这样每台服务器相对应的都有个编号,对于上面的4台服务器来说,A的编号为0,其余的服务器编号依此类推。这样的话确定了编号,就可以确定选择的服务器。这个线性列表就是一张哈希表。通过下面的公式来确定对象Object存入到哪个编号的服务器中:
hash(id) % N
N为集群中服务器的数量。
这样通过id进行查找的时候可以非常快的确定Object所在的服务器。进而从这台服务器中获取Object。
不过这样做存在一个问题,从上面的公式中可以看到,服务器编号的确定跟集群中服务器的数量是有关系的,如果N变化了,那么计算出来的编号就会发生变化。
增加了一台服务器编号计算变为:
hash(id) %(N + 1)
减少了一台服务器编号计算变为:
hash(id) %(N - 1)
N的变化会导致相同的id前后计算出来的编号不一样,这样会带来什么问题呢?
问题就是:当前集群中服务器数量为N,存入Object对象,确定编号为0,也就是编号为0的服务器存放着Object对象,现在增加了一台服务器,也就是当前集群中服务器的数量为N+1,这时候通过id进行查找,重新计算编号后得到的编号为1,编号为1的这台服务器是没有Object对象数据的,然后查找结果报告的是无数据,也就是所谓的缓存失效。我们知道缓存的引入减少对数据库的请求,提升应用的性能,现在因为缓存服务器的增加,大量根据id进行查找的请求出现缓存失效的表现,势必会直接去请求数据库,导致数据库访问压力增大,这就是缓存雪崩现象。
一致性Hash算法的引入就是为了解决这种普通Hash算法存在的问题。
一致性Hash算法
在上面通过哈希函数对id进行哈希,然后对服务器数量进行求余会受到服务器数量的影响,需要寻求另外一种解决方式。
先来看看对id的哈希:
hash(id)
通过这个哈希函数计算出来的哈希码通常都是一个整型数值,一般是4个字节,也就是32位。取无符号表示,4个字节的整型的取值范围为0~2^32-1。也就是说任何的对象通过哈希函数计算后得到的哈希码的数值都会在这个区间中。
将这个区间内的点组成连接成环,如下所示:
现在有4个对象Object1~Object4, 对应的id为id1~id4,将id1 ~ id4这4个id映射到环中,先进行哈希计算:
h1 = hash(id1);
h2 = hash(id2);
h3 = hash(id3);
h4 = hash(id4);
映射后如下图所示:
接下来取服务器的某种标识,然后将3台缓存服务器也映射到这个环中,先进行哈希计算:
c1 = hash(cache1);
c2 = hash(cache2);
c3 = hash(cache3);
映射后的环如下所示:
现在id1~id4和Cache1,Cache2,Cache3都被映射到了环中。回过头来看一下我们到底要做什么,我们要做的是确定id1 ~ id4分别被分配到Cache1~Cache3的哪个中,也就是确定id和Cache的分配关系。而在环中h节点可以代表id,c节点可以代表Cache,那么确定了h和c的对应关系,那么就间接地确定了id和Cache的关系。
如何确定h和c的对应关系呢?可以这样理解,把h当做一个人,环为它查找路线,它沿着环开始走,寻找c节点,找到的c节点收入囊中,即完成了h和c的对应。如果找到的c节点代表的Cache服务器下线,那么继续从这个节点出发继续寻找下一个要对应的c节点。
在上图中h的查找我们采用逆时针行走方式,最终的对应关系如下所示:
通过上面的操作则有:
- Object1被存入Cache1
- Object2被存入如Cache3
- Object3被存入Cache3
- Object4被存入Cache2
缓存服务器下线
现在Cache2服务器下线了,根据上面的描述,最终的查找效果如下所示:
Cache2下线,那么Cache2中的数据就失效了,通过id4查找会出现缓存失效,应用程序此时会对缓存失效进行处理,重新从数据库或者其他地方获取Object4对象,然后试图重新将Object4放入到缓存服务器中,放入到哪台呢?还是根据上面描述的原理,从h4节点出发,查找c节点,找到的是c3节点,则将Object4重新放入到Cache3这台服务器中。从这里可以看到,Cache2服务器的下线,只会影响到这台服务器上的缓存数据,并不会对其他缓存服务器上的数据造成影响。这和普通Hash算法的表现是不同的,普通Hash算法会影响其他缓存服务器上的数据。
增加缓存服务器
增加Cache4,c4节点落在h2和h3之间,此时根据id2进行查找,定位到h2节点,从h2出发寻找对应的c节点,未增加之前找到的是c3节点,增加之后找到的是c4节点,c4节点代表的缓存服务器Cache4并没有Object2数据,那么应用程序从数据库或其他地方获取Object2数据然后重新放入到Cache2中,Cache3中的Object2此时就是无效的。可以看出增加Cache4服务器,只会影响到Cache2和Cache4之间的h节点代表的数据。
增加Cache4服务器后,最终的查找效果如下:
虚拟节点
先看一下下面的环:
现在只有两台缓存服务器Cache1和Cache2,根据上面描述,h和c的对应关系如下:
从图可以看到,数据大部分都被放入到了Cache2这台缓存服务器。也就是说当缓存服务器比较少的情况下,会出现某一台缓存服务器大量缓存数据的情况,也就是说缓存分配不均匀。
如何解决这种情况呢?一致性Hash算法引入了"虚拟节点"这种解决方案。
虚拟节点就是缓存服务器的副本,每一个缓存服务器都会在环中有数个相对应的虚拟节点。当增加缓存服务器的时候,相应地就会在缓存创建数个相对应的虚拟机节点;当删除缓存服务器的时候就会同时从环中移除相对应的虚拟节点。
如下图所示,现在有两台缓存服务器Cache1和Cache2,引入虚拟节点,每台缓存服务器对应有两个虚拟节点,那么环中就会有4个虚拟节点。c1.1和c1.2代表的是Cache1,c2.1和c2.2代表的就是Cache2。
引入虚拟节点后,h和c的对应关系如下所示:
从上图可以看到,此时缓存的数据是均匀分配的。
虚拟节点的引入会要求虚拟节点和缓存服务器有映射关系,找到虚拟节点后,通过映射关系就可以确定缓存服务器。
参考文章:https://www.codeproject.com/Articles/56138/Consistent-hashing
二、一致性Hash负载均衡算法实现
1. Hash函数
要将对象和服务器映射到Hash环中,需要计算出来哈希码,这就需要有Hash函数来完成,也就是关系到使用的哈希算法。使用一个好的哈希算法是很重要的,为什么这么说呢,拿我们上面提到的缓存服务来说,一个完美的解决方案是需要数据分配的平衡,假如Hash环的映射是这样的:
Hash码数值落在一个小区间内,出现Hash码聚集情况,那么从上图可以看到缓存数据全部由c3节点的服务器存储,出现数据分配不平衡。那么就需要一个好的哈希处理使得哈希码在环中的分配尽可能得分散,类似这样:
上面说到过,环中数值点的取值范围为[0,2^32-1],也就是说我们通过Hash函数计算出来的这些哈希码数值应该避免集中在某一小区间范围内。
Hash算法对于一致性Hash负载均衡的作用可见一斑,而写出好的适用于一致性Hash负载均衡的Hash算法是需要些技术能力的,这里不研究如何写,而是查阅已有的实现方式。
xmemcached:哈希函数
xmemcached是memcached的java版本的客户端。它其中包含了一致性Hash算法的实现。
网上内容摘抄:Memcached在实现分布集群部署时,Memcached服务端的之间是没有通讯的,服务端是伪分布式,实现分布式是由客户端实现的,客户端实现了分布式算法把数据保存到不同的Memcached服务端。
HashAlgorithm.java
package net.rubyeye.xmemcached;
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.zip.CRC32;
import net.rubyeye.xmemcached.exception.MemcachedClientException;
import net.rubyeye.xmemcached.utils.ByteUtils;
/**
* Known hashing algorithms for locating a server for a key. Note that all hash algorithms return
* 64-bits of hash, but only the lower 32-bits are significant. This allows a positive 32-bit number
* to be returned for all cases.
*/
public enum HashAlgorithm {
/**
* Native hash (String.hashCode()).
*/
NATIVE_HASH,
/**
* CRC32_HASH as used by the perl API. This will be more consistent both across multiple API users
* as well as java versions, but is mostly likely significantly slower.
*/
CRC32_HASH,
/**
* FNV hashes are designed to be fast while maintaining a low collision rate. The FNV speed allows
* one to quickly hash lots of data while maintaining a reasonable collision rate.
*
* @see http://www.isthe.com/chongo/tech/comp/fnv/
* @see http://en.wikipedia.org/wiki/Fowler_Noll_Vo_hash
*/
FNV1_64_HASH,
/**
* Variation of FNV.
*/
FNV1A_64_HASH,
/**
* 32-bit FNV1.
*/
FNV1_32_HASH,
/**
* 32-bit FNV1a.
*/
FNV1A_32_HASH,
/**
* MD5-based hash algorithm used by ketama.
*/
KETAMA_HASH,
/**
* From mysql source
*/
MYSQL_HASH,
ELF_HASH,
RS_HASH,
/**
* From lua source,it is used for long key
*/
LUA_HASH,
ELECTION_HASH,
/**
* The Jenkins One-at-a-time hash ,please see http://www.burtleburtle.net/bob/hash/doobs.html
*/
ONE_AT_A_TIME;
private static final long FNV_64_INIT = 0xcbf29ce484222325L;
private static final long FNV_64_PRIME = 0x100000001b3L;
private static final long FNV_32_INIT = 2166136261L;
private static final long FNV_32_PRIME = 16777619;
/**
* Compute the hash for the given key.
*
* @return a positive integer hash
*/
public long hash(final String k) {
long rv = 0;
switch (this) {
case NATIVE_HASH:
rv = k.hashCode();
break;
case CRC32_HASH:
// return (crc32(shift) >> 16) & 0x7fff;
CRC32 crc32 = new CRC32();
crc32.update(ByteUtils.getBytes(k));
rv = crc32.getValue() >> 16 & 0x7fff;
break;
case FNV1_64_HASH: {
// Thanks to pierre@demartines.com for the pointer
rv = FNV_64_INIT;
int len = k.length();
for (int i = 0; i < len; i++) {
rv *= FNV_64_PRIME;
rv ^= k.charAt(i);
}
}
break;
case FNV1A_64_HASH: {
rv = FNV_64_INIT;
int len = k.length();
for (int i = 0; i < len; i++) {
rv ^= k.charAt(i);
rv *= FNV_64_PRIME;
}
}
break;
case FNV1_32_HASH: {
rv = FNV_32_INIT;
int len = k.length();
for (int i = 0; i < len; i++) {
rv *= FNV_32_PRIME;
rv ^= k.charAt(i);
}
}
break;
case FNV1A_32_HASH: {
rv = FNV_32_INIT;
int len = k.length();
for (int i = 0; i < len; i++) {
rv ^= k.charAt(i);
rv *= FNV_32_PRIME;
}
}
break;
case ELECTION_HASH:
case KETAMA_HASH:
byte[] bKey = computeMd5(k);
rv = (long) (bKey[3] & 0xFF) << 24 | (long) (bKey[2] & 0xFF) << 16
| (long) (bKey[1] & 0xFF) << 8 | bKey[0] & 0xFF;
break;
case MYSQL_HASH:
int nr2 = 4;
for (int i = 0; i < k.length(); i++) {
rv ^= ((rv & 63) + nr2) * k.charAt(i) + (rv << 8);
nr2 += 3;
}
break;
case ELF_HASH:
long x = 0;
for (int i = 0; i < k.length(); i++) {
rv = (rv << 4) + k.charAt(i);
if ((x = rv & 0xF0000000L) != 0) {
rv ^= x >> 24;
rv &= ~x;
}
}
rv = rv & 0x7FFFFFFF;
break;
case RS_HASH:
long b = 378551;
long a = 63689;
for (int i = 0; i < k.length(); i++) {
rv = rv * a + k.charAt(i);
a *= b;
}
rv = rv & 0x7FFFFFFF;
break;
case LUA_HASH:
int step = (k.length() >> 5) + 1;
rv = k.length();
for (int len = k.length(); len >= step; len -= step) {
rv = rv ^ (rv << 5) + (rv >> 2) + k.charAt(len - 1);
}
break;
case ONE_AT_A_TIME:
try {
int hash = 0;
for (byte bt : k.getBytes("utf-8")) {
hash += (bt & 0xFF);
hash += (hash << 10);
hash ^= (hash >>> 6);
}
hash += (hash << 3);
hash ^= (hash >>> 11);
hash += (hash << 15);
rv = hash;
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException("Hash function error", e);
}
break;
default:
assert false;
}
return rv & 0xffffffffL; /* Convert to unsigned 32-bits */
}
private static ThreadLocal<MessageDigest> md5Local = new ThreadLocal<MessageDigest>();
/**
* Get the md5 of the given key.
*/
public static byte[] computeMd5(String k) {
MessageDigest md5 = md5Local.get();
if (md5 == null) {
try {
md5 = MessageDigest.getInstance("MD5");
md5Local.set(md5);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("MD5 not supported", e);
}
}
md5.reset();
md5.update(ByteUtils.getBytes(k));
return md5.digest();
}
// public static void main(String[] args) {
// HashAlgorithm alg=HashAlgorithm.CRC32_HASH;
// long h=0;
// long start=System.currentTimeMillis();
// for(int i=0;i<100000;i++)
// h=alg.hash("MYSQL_HASH");
// System.out.println(System.currentTimeMillis()-start);
// }
}
Dubbo:哈希函数
/**
* ConsistentHashLoadBalance
*/
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
// 代码省略
......
private static final class ConsistentHashSelector<T> {
// 代码省略
......
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
private byte[] md5(String value) {
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
md5.reset();
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
md5.update(bytes);
return md5.digest();
}
}
}
2. 环存储数据结构
环由[0, 2^32-1]这个区间内的整数值组成。体现在程序中就是用一种数据结构存储这些值。
比如说用列表来存储,将服务器标识通过Hash函数计算后得到的哈希码存入到列表中,类似这样:
现在这个列表就表示了哈希码环。现在假设对象标识经过Hash函数计算后得到的哈希码值87。那么现在h=87,从环中找c点。如何确定c点呢?观察一下哈希码环,我们可以发现顺时针行走,哈希码值越来越小;逆时针行走哈希码值越来越大,而上面我们说到确定了h点后,逆时针行走查找c点,既然是逆时针行走那么就是找第一个大于h点的c,也就是说从列表中查找第一个大于h的元素。
满足这个需求的实现方法当然有很多了,这里有一种方式,就是先对列表进行从小到大排序,排序后列表结构如下:
循环列表进行查找,第一个大于h的点就是88。查找涉及到时间复杂度,这种方式需要遍历列表,在查找性能上并不是最好的。
数据有序并且查找的时间复杂度小。使用Java容器类中的TreeMap比较合适。
3. 代码实现
参考Dubbo的ConsistentHashLoadBalance类
ConsistentHashLoadBalancer.java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
/**
* @Author rocky.hu
* @Date 2019-04-20 20:34
*/
public class ConsistentHashLoadBalancer implements LoadBalancerStrategy<Server> {
private CandidateSelector<Server> candidateSelector;
@Override
public Server choose(List<Server> candidates) {
return null;
}
public Server choose(List<Server> candidates, String key) {
int identityHashCode = System.identityHashCode(candidates);
if (candidateSelector == null || candidateSelector.identityHashCode != identityHashCode) {
candidateSelector = new CandidateSelector<Server>(candidates, identityHashCode);
}
return candidateSelector.select(key);
}
private static final class CandidateSelector<T> {
// 引入虚拟节点概念,此属性表示Hash环中总的虚拟节点数
private final TreeMap<Long, Server> virtualCandidates;
// 每台真实服务器节点的虚拟节点数,这个值可做成可配置化的
private final int replicaNumber = 160;
// 服务器列表的Hash码,做缓存作用,用来判断服务器列表长度的变化
private final int identityHashCode;
CandidateSelector(List<Server> candidates, int identityHashCode) {
this.virtualCandidates = new TreeMap<Long, Server>();
this.identityHashCode = identityHashCode;
// 将服务器节点映射到Hash环中
for (Server server : candidates) {
String address = server.getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualCandidates.put(m, server);
}
}
}
}
public Server select(String key) {
byte[] digest = md5(key);
return selectForKey(hash(digest, 0));
}
private Server selectForKey(long hash) {
// 使用TreeMap的ceilingEntry方法返回键值大于或等于的指定键的Entry(相当于Hash环逆时针行走查找服务器节点)
Map.Entry<Long, Server> entry = virtualCandidates.ceilingEntry(hash);
if (entry == null) {
entry = virtualCandidates.firstEntry();
}
return entry.getValue();
}
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
private byte[] md5(String value) {
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
md5.reset();
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
md5.update(bytes);
return md5.digest();
}
}
}
Server.java
/**
* @Author rocky.hu
* @Date 2019-04-21 00:47
*/
public class Server {
private String address;
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
}