背景
并发场景下,我们有很多因素需要考虑,比如:幂等,限流等。今天想讨论下单机限流,尽管市面上已经有很多成熟的限流方案,比如阿里巴巴的sentinal,今天使用guava cache来实现一套限流方案,比较简单
代码
package com.zoterap.javabasic.current;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.Maps;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.String.format;
/**
* 单机并发控制锁
*/
public class LocalCurrentLock {
private static final String SERVICE_A = "SERVICE_A";
private static final String SERVICE_B = "SERVICE_B";
/**
* 服务限流配置
*/
private static Map<String, CurrentConfig> configMap = Maps.newHashMap();
/**
* 描述缓存
* KEY: serviceName
* VALUE: 时间戳
*/
static Cache<String, Long> secondsLock =
CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build(
new CacheLoader<String, Long>() {
@Override
public Long load(String key) {
/**
* 清除计数器
*/
counter.remove(key);
return null;
}
}
);
/**
* 并发技术器
* KEY: serviceName
* VALUE: 并发数量
*/
static Map<String, AtomicInteger> counter = Maps.newConcurrentMap();
public static void main(String[] args) {
init();
for (int i = 0; i < 20; i++) {
print(SERVICE_A, i, accessCheck(SERVICE_A));
//print(SERVICE_B, i, accessCheck(SERVICE_B));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private static void init() {
/**
* 最大5QPS
*/
configMap.put(SERVICE_A, new CurrentConfig(SERVICE_A, TimeUnit.SECONDS, 5));
configMap.put(SERVICE_B, new CurrentConfig(SERVICE_B, TimeUnit.SECONDS, 3));
}
private static boolean accessCheck(String serviceName) {
if (secondsLock.getIfPresent(serviceName) == null) {
secondsLock.put(serviceName, System.currentTimeMillis());
counter.put(serviceName, new AtomicInteger(1));
return true;
} else {
Integer maxAmount = configMap.get(serviceName).getMaxAmount();
AtomicInteger count = counter.get(serviceName);
if (count.get() < maxAmount) {
counter.put(serviceName, new AtomicInteger(count.addAndGet(1)));
return true;
} else {
return false;
}
}
}
public static String getCurrentTime() {
return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss SSS"));
}
public static void print(String serviceName, int reqNum, boolean result) {
System.out.println(format("%s[%d], 访问结果[%s], 当前时间[%s]",
serviceName,
reqNum,
result ? "成功" : "===被限流===",
getCurrentTime()));
}
@Data
@AllArgsConstructor
static class CurrentConfig {
/**
* 配置项编码
*/
private String code;
/**
* 并发单位
* 最小单位为秒
*/
private TimeUnit timeUnit;
/**
* 并发数量
*/
private Integer maxAmount;
}
}
结果
SERVICE_A[0], 访问结果[成功], 当前时间[2019-02-14 16:32:04 530]
SERVICE_A[1], 访问结果[成功], 当前时间[2019-02-14 16:32:04 662]
SERVICE_A[2], 访问结果[成功], 当前时间[2019-02-14 16:32:04 763]
SERVICE_A[3], 访问结果[成功], 当前时间[2019-02-14 16:32:04 864]
SERVICE_A[4], 访问结果[成功], 当前时间[2019-02-14 16:32:04 969]
SERVICE_A[5], 访问结果[===被限流===], 当前时间[2019-02-14 16:32:05 071]
SERVICE_A[6], 访问结果[===被限流===], 当前时间[2019-02-14 16:32:05 172]
SERVICE_A[7], 访问结果[===被限流===], 当前时间[2019-02-14 16:32:05 277]
SERVICE_A[8], 访问结果[===被限流===], 当前时间[2019-02-14 16:32:05 382]
SERVICE_A[9], 访问结果[成功], 当前时间[2019-02-14 16:32:05 498]
SERVICE_A[10], 访问结果[成功], 当前时间[2019-02-14 16:32:05 599]
SERVICE_A[11], 访问结果[成功], 当前时间[2019-02-14 16:32:05 704]
SERVICE_A[12], 访问结果[成功], 当前时间[2019-02-14 16:32:05 809]
SERVICE_A[13], 访问结果[成功], 当前时间[2019-02-14 16:32:05 911]
SERVICE_A[14], 访问结果[===被限流===], 当前时间[2019-02-14 16:32:06 012]
SERVICE_A[15], 访问结果[===被限流===], 当前时间[2019-02-14 16:32:06 113]
SERVICE_A[16], 访问结果[===被限流===], 当前时间[2019-02-14 16:32:06 215]
SERVICE_A[17], 访问结果[===被限流===], 当前时间[2019-02-14 16:32:06 319]
SERVICE_A[18], 访问结果[===被限流===], 当前时间[2019-02-14 16:32:06 422]
SERVICE_A[19], 访问结果[成功], 当前时间[2019-02-14 16:32:06 526]
代码缺陷
- 缺少并发场景的考虑
- 测试用例不完善