最近做 RPC 项目的时候,会涉及到一些负载均衡算法,这里简单总结一下常见的负载均衡算法。
首先先定义了一个LoadBalance
接口,并定义了一个AbstractLoadBalance
抽象类。
public interface LoadBalance {
/**
* Choose one from the list of existing service addresses list
*
* @param serviceUrlList Service address list
* @param rpcRequest
* @return target service address
*/
String selectServiceAddress(List<String> serviceUrlList, List<Integer> weights, RpcRequest rpcRequest);
}
public abstract class AbstractLoadBalance implements LoadBalance {
@Override
public String selectServiceAddress(List<String> serviceAddresses, List<Integer> weights, RpcRequest rpcRequest) {
// 如果服务地址列表为空,直接返回null
if (CollectionUtil.isEmpty(serviceAddresses)) {
return null;
}
// 如果目标服务只有一个,直接返回
if (serviceAddresses.size() == 1) {
return serviceAddresses.get(0);
}
return doSelect(serviceAddresses, weights, rpcRequest);
}
protected abstract String doSelect(List<String> serviceAddresses, List<Integer> weights, RpcRequest rpcRequest);
}
具体的负载均衡算法需要继承AbstractLoadBalance
,并实现doSelect
方法。
随机负载均衡
RandomLoadBalance
顾名思义,即从多个节点中随机选取一个节点进行访问。这种方式最大的优点就是简单,但是当请求数量较少时,随机性可能不强,可能会出现单实例节点负载过大的情况。当请求数量很大时,每个实例节点接受的请求数量会接近于均衡,效果较好。
public class RandomLoadBalance extends AbstractLoadBalance {
@Override
protected String doSelect(List<String> serviceAddresses, List<Integer> weights, RpcRequest rpcRequest) {
Random random = new Random();
return serviceAddresses.get(random.nextInt(serviceAddresses.size()));
}
}
权重随机负载均衡
WeighedRandomLoadBalance
在随机负载均衡的基础上,加上了权重,每个服务对应有一个权重。
比如A服务权重为7,B服务权重为3,那么就会将其映射到 [0,10)
的长度中,生成一个随机数,如果在[0,7)
则会负载均衡到服务A,如果在[7, 10)
,则会负载均衡到服务B。
public class WeightedRandomLoadBalance extends AbstractLoadBalance {
@Override
protected String doSelect(List<String> serviceAddresses, List<Integer> weights, RpcRequest rpcRequest) {
int[] weightsArray = new int[weights.size()];
int totalWeight = 0;
boolean sameWeight = true;
for (int i = 0; i < weights.size(); i++) {
int weight = weights.get(i);
totalWeight += weight;
weightsArray[i] = totalWeight;
if (sameWeight && totalWeight != weight * (i + 1)) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < weights.size(); i++) {
if (offset < weightsArray[i]) {
return serviceAddresses.get(i);
}
}
}
return serviceAddresses.get(ThreadLocalRandom.current().nextInt(serviceAddresses.size()));
}
}
轮询负载均衡
RoundRobinLoadBalance
即轮转调度。假设当前服务有 3 个实例节点,第一次请求发送给 A 节点,第二次请求发送给 B 节点,第三次请求发送给 C 节点,那么第四次请求就会再次发送给 A 节点,实现均衡请求的效果。
public class RoundRobinLoadBalance extends AbstractLoadBalance {
private static AtomicInteger index = new AtomicInteger(0);
@Override
protected String doSelect(List<String> serviceAddresses, List<Integer> weights, RpcRequest rpcRequest) {
if (index.get() >= serviceAddresses.size()) {
index = new AtomicInteger(0);
}
String address = serviceAddresses.get(index.get());
index.incrementAndGet();
return address;
}
}
权重轮询负载均衡
WeightedRoundRobinLoadBalance
这个和权重随机负载均衡有区别。
对于权重随机负载均衡,A的权重是7,B的权重是3,但并不是说10次,就恰好有7次负载均衡到A,很肯能会大于7次,是一个概率问题。
对于权重轮询负载均衡,10次就一定会有7次是A,3次是B。
但是基础的权重轮询并不平滑,很可能会导致某段时间某个服务压力比较大,比如A的权重是7,B的权重是3,那么前7次就会负载均衡到A,后三次就会负载均衡到B,而不是一个交替轮询的方式。
所以我这里实现的权重轮询并不是一个特别好的实现。Dubbo 参考了 Nginx 实现了更好的权重轮询。
public class WeightedRoundRobinLoadBalance extends AbstractLoadBalance {
private static AtomicInteger index = new AtomicInteger(0);
@Override
protected String doSelect(List<String> serviceAddresses, List<Integer> weights, RpcRequest rpcRequest) {
int[] weightsArray = new int[weights.size()];
int totalWeight = 0;
boolean sameWeight = true;
for (int i = 0; i < weights.size(); i++) {
int weight = weights.get(i);
totalWeight += weight;
weightsArray[i] = totalWeight;
if (sameWeight && totalWeight != weight * (i + 1)) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
// 权重不同
if (index.get() >= totalWeight) {
index = new AtomicInteger(0);
}
int weight = index.get();
for (int i = 0; i < weights.size(); i++) {
if (weight < weightsArray[i]) {
String address = serviceAddresses.get(i);
index.incrementAndGet();
return address;
}
}
}
// 权重相同,就和普通轮询一样
if (index.get() >= serviceAddresses.size()) {
index = new AtomicInteger(0);
}
String address = serviceAddresses.get(index.get());
index.incrementAndGet();
return address;
}
}
最小活跃数负载均衡
参考 Dubbo
初始状态下所有服务提供者的活跃数均为 0(每个服务提供者的中特定方法都对应一个活跃数,我在后面的源码中会提到),每收到一个请求后,对应的服务提供者的活跃数 +1,当这个请求处理完之后,活跃数 -1。
因此,Dubbo 就认为谁的活跃数越少,谁的处理速度就越快,性能也越好,这样的话,我就优先把请求给活跃数少的服务提供者处理。
如果有多个服务提供者的活跃数相等怎么办?
很简单,那就再走一遍 RandomLoadBalance
。
public class LeastActiveLoadBalance extends AbstractLoadBalance {
public static final String NAME = "leastactive";
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size();
int leastActive = -1;
int leastCount = 0;
int[] leastIndexes = new int[length];
int[] weights = new int[length];
int totalWeight = 0;
int firstWeight = 0;
boolean sameWeight = true;
// 这个 for 循环的主要作用是遍历 invokers 列表,找出活跃数最小的 Invoker
// 如果有多个 Invoker 具有相同的最小活跃数,还会记录下这些 Invoker 在 invokers 集合中的下标,并累加它们的权重,比较它们的权重值是否相等
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// 获取 invoker 对应的活跃(active)数
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
int afterWarmup = getWeight(invoker, invocation);
weights[i] = afterWarmup;
if (leastActive == -1 || active < leastActive) {
leastActive = active;
leastCount = 1;
leastIndexes[0] = i;
totalWeight = afterWarmup;
firstWeight = afterWarmup;
sameWeight = true;
} else if (active == leastActive) {
leastIndexes[leastCount++] = i;
totalWeight += afterWarmup;
if (sameWeight && afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// 如果只有一个 Invoker 具有最小的活跃数,此时直接返回该 Invoker 即可
if (leastCount == 1) {
return invokers.get(leastIndexes[0]);
}
// 如果有多个 Invoker 具有相同的最小活跃数,但它们之间的权重不同
// 这里的处理方式就和 RandomLoadBalance 一致了
if (!sameWeight && totalWeight > 0) {
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
}
活跃数是通过 RpcStatus
中的一个 ConcurrentMap
保存的,根据 URL 以及服务提供者被调用的方法的名称,我们便可以获取到对应的活跃数。也就是说服务提供者中的每一个方法的活跃数都是互相独立的。
public class RpcStatus {
private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS =
new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
public static RpcStatus getStatus(URL url, String methodName) {
String uri = url.toIdentityString();
ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.computeIfAbsent(uri, k -> new ConcurrentHashMap<>());
return map.computeIfAbsent(methodName, k -> new RpcStatus());
}
public int getActive() {
return active.get();
}
}
一致性Hash负载均衡
ConsistentHashLoadBalance
我们可以从一致性 Hash 环的原理讲起!
我们可以看到如图1「圆环」中存在有 3 个节点,分别为 NodeA / Node B / Node C,4 个请求,分别为 Req 1 / Req 2 / Req 3 / Req 4。
为什么叫这个「圆环」为「一致性 Hash 环」呢?这是因为我们要对每一个节点根据 Hash 算法计算得到一个 Hash 值,并将其映射到圆环中的某一个位置。对于请求也是如此,根据参数来计算具体的 Hash 值,也映射到圆环中对应的位置。
接下来的事情就很简单啦,每一个请求沿着当前圆环 顺时针 寻找,找到的第一个节点就是对应的处理请求节点。
如图对应关系为,Node A 处理 Req 3 和 Req 4,Node B 处理 Req 1,Node C 处理 Req 2。
但是一致性 Hash 环容易造成一个问题,如图2。
当服务节点过少时,节点 Hash 值映射到圆环的位置可能聚集于某一处,容易因为节点分布不均匀而造成请求均衡问题,即「数据倾斜」。
在图中,Node A 承担了大部分请求,Node C 只承担了一个请求,Node B 一个都没有。
为此,我们需要引入「虚拟节点」解决这一问题。
图中黄色节点对应的就是「虚拟节点」,起到均衡请求、避免数据倾斜的作用。
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
private final ConcurrentHashMap<String, ConsistentHashSelector> selectors = new ConcurrentHashMap<>();
@Override
protected String doSelect(List<String> serviceAddresses, List<Integer> weights, RpcRequest rpcRequest) {
int identityHashCode = System.identityHashCode(serviceAddresses);
// build rpc service name by rpcRequest
String rpcServiceName = rpcRequest.getRpcServiceName();
ConsistentHashSelector selector = selectors.get(rpcServiceName);
// check for updates
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(rpcServiceName, new ConsistentHashSelector(serviceAddresses, 160, identityHashCode));
selector = selectors.get(rpcServiceName);
}
return selector.select(rpcServiceName + Arrays.stream(rpcRequest.getParameters()));
}
static class ConsistentHashSelector {
private final TreeMap<Long, String> virtualInvokers;
private final int identityHashCode;
ConsistentHashSelector(List<String> invokers, int replicaNumber, int identityHashCode) {
this.virtualInvokers = new TreeMap<>();
this.identityHashCode = identityHashCode;
for (String invoker : invokers) {
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(invoker + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
static byte[] md5(String key) {
MessageDigest md;
try {
md = MessageDigest.getInstance("MD5");
byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
md.update(bytes);
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
return md.digest();
}
static long hash(byte[] digest, int idx) {
return ((long) (digest[3 + idx * 4] & 255) << 24 | (long) (digest[2 + idx * 4] & 255) << 16 | (long) (digest[1 + idx * 4] & 255) << 8 | (long) (digest[idx * 4] & 255)) & 4294967295L;
}
public String select(String rpcServiceKey) {
byte[] digest = md5(rpcServiceKey);
return selectForKey(hash(digest, 0));
}
public String selectForKey(long hashCode) {
Map.Entry<Long, String> entry = virtualInvokers.tailMap(hashCode, true).firstEntry();
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
}
}
评论区