一致性hash算法
1.hash算法
先说一下hash算法,hash算法是将任意长度的二进制值映射为固定长度的二进制值。
在分布式系统中, 可以通过该算法计算哈希值
h = Hash(key)%n
Hash是一个字符串到正整数的hash映射函数, key是键值(例如服务器ip地址/唯一主机名), n是键的个数。每当改变服务器数量时, 都会使hash值改变,容错性和扩展性会极差。
2.一致性hash算法
hash环
一致性hash算法将2的32次方的hash空间组成一个首尾相连的圆环,然后把服务器ip地址/唯一主机名作为键进行hash得到一个唯一的hash值,该值就是该服务器在圆环上的位置。数据也通过hash得到一个唯一的hash值,然后把数据放进最近的服务器中(顺时针),如下图。
假如服务器C宕机了, 数据B就会被放在服务器A,其他服务器和数据都不会受到影响。
假如新增服务器D, 数据C会放在服务器D中,其他的都不变。
虚拟节点
在服务器节点太少时, 会有数据倾斜问题,即大部分数据在一个节点上。
为了解决这个问题,引入了虚拟节点。可以在ip地址/唯一主机名后面加上编号,使一台服务器算出多个hash值,在hash环上增加同一服务器节点,该节点就是虚拟节点;在服务器节点较少时也能实现数据均匀分布。
特点(百度百科)
- 平衡性:hash的结果应该平均分配到各个节点,这样从算法上解决了负载均衡问题
- 单调性:在新增或者删减节点时,不影响系统正常运行
- 分散性:数据应该分散地存放在分布式集群中的各个节点(节点自己可以有备份),不必每个节点都存储所有的数据
3.算法实现
package main
import (
"sync"
"strconv"
"hash/crc32"
"sort"
"fmt"
"errors"
)
// 声明切片类型
type units []uint32
// 返回切片长度
func (u units) Len() int {
return len(u)
}
// 对比两个数大小
func (u units)Less(i, j int) bool {
return u[i] < u[j]
}
// 切片中两个值的交换
func (u units)Swap(i, j int) {
u[i], u[j] = u[j], u[i]
}
// 创建结构体 保存一致性hash信息
type Consistent struct {
// hash环, key为hash值, value存放节点信息
circle map[uint32]string
// 排序的节点hash切片
sortHashes units
// 虚拟节点个数, 增加hash环数据平衡性
VirtualNode int
// map 读写锁
sync.RWMutex
}
// 创建一致性hash算法结构体, 设置默认节点数量
func NewConsistent() *Consistent {
return &Consistent{
//初始化变量
circle: make(map[uint32]string),
//设置虚拟节点个数
VirtualNode: 50,
}
}
// 自动生成key值
func (c *Consistent) generateKey(element string, index int) string {
// 副本key生成逻辑
return element + strconv.Itoa(index)
}
// 获取hash位置(hash值)
func (c *Consistent) hashKey(key string) uint32 {
if len(key) < 64 {
// 声明一个数组长度为64
var array [64]byte
// 拷贝数据到数组中
copy(array[:], key)
// (MurMurHash算法)使用IEEE多项式返回数据的CRC-32校验和
return crc32.ChecksumIEEE(array[:len(key)])
}
return crc32.ChecksumIEEE([]byte(key))
}
// 更新排序, 方便查找
func (c *Consistent)updateSortedHashes() {
hashes := c.sortHashes[:0]
// 判断切片容量, 是否过大, 如果过大则重置
if cap(c.sortHashes)/(c.VirtualNode*4) > len(c.circle) {
hashes = nil
}
// 添加hashes
for K := range c.circle {
hashes = append(hashes, K)
}
// 对所有节点hash值进行排序, 方便之后进行二分查找
sort.Sort(hashes)
// 重新赋值
c.sortHashes = hashes
}
// 向hash环中添加节点
func (c *Consistent) Add(element string) {
// 加锁
c.Lock()
// 解锁
defer c.Unlock()
// 循环虚拟节点, 设置副本
for i := 0; i < c.VirtualNode; i++ {
// 根据生成的节点添加到hash
c.circle[c.hashKey(c.generateKey(element, i))] = element
}
// 更新排序
c.updateSortedHashes()
}
// 删除一个节点
func (c *Consistent)Remove(element string) {
c.Lock()
defer c.Unlock()
for i := 0; i < c.VirtualNode; i++ {
delete(c.circle, c.hashKey(c.generateKey(element, i)))
}
c.updateSortedHashes()
}
// 顺时针查找最近的节点
func (c *Consistent) search(key uint32) int {
// 查找算法
f := func(x int) bool {
return c.sortHashes[x] > key
}
// 使用 二分查找 算法来搜索指定切片满足条件最小值
i := sort.Search(len(c.sortHashes), f)
// 如果超出范围则设置i=0
if i >= len(c.sortHashes) {
i = 0
}
return i
}
// 根据数据提示获取最近服务器节点信息
func (c *Consistent)Get(name string) (string, error) {
// 添加锁
c.RLock()
// 解锁
defer c.RUnlock()
// 如果为零则返回error
if len(c.circle) == 0 {
return "", errors.New("Hash环没有数据")
}
// 计算hash值
key := c.hashKey(name)
// 查找最近节点
i := c.search(key)
return c.circle[c.sortHashes[i]], nil
}
// 测试
func main() {
// 添加服务器节点
hashConsistent := NewConsistent()
for i := 0; i < 5; i++ {
hashConsistent.Add("192.168.20." + strconv.Itoa(i))
}
//ip为key, value是访问次数
ipArray := make(map[string]int, 0)
// 数据测试
for i:=0; i<1000; i++ {
k, _ := hashConsistent.Get("hszz:hszz:hszz" + strconv.Itoa(i))
if _, ok := ipArray[k]; ok {
ipArray[k] += 1
} else {
ipArray[k] = 1
}
}
for k, v := range ipArray {
fmt.Println("ip:", k, "count:", v)
}
}