侧边栏壁纸
  • 累计撰写 27 篇文章
  • 累计创建 42 个标签
  • 累计收到 34 条评论

目 录CONTENT

文章目录

常见负载均衡算法

miykah
2023-08-29 / 1 评论 / 0 点赞 / 64 阅读 / 14853 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2023-12-11,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

最近做 RPC 项目的时候,会涉及到一些负载均衡算法,这里简单总结一下常见的负载均衡算法。

首先先定义了一个LoadBalance 接口,并定义了一个AbstractLoadBalance 抽象类。

public interface LoadBalance {
    /**
     * Choose one from the list of existing service addresses list
     *
     * @param serviceUrlList Service address list
     * @param rpcRequest
     * @return target service address
     */
    String selectServiceAddress(List<String> serviceUrlList, List<Integer> weights,  RpcRequest rpcRequest);
}
public abstract class AbstractLoadBalance implements LoadBalance {
    @Override
    public String selectServiceAddress(List<String> serviceAddresses, List<Integer> weights, RpcRequest rpcRequest) {
        // 如果服务地址列表为空,直接返回null
        if (CollectionUtil.isEmpty(serviceAddresses)) {
            return null;
        }
        // 如果目标服务只有一个,直接返回
        if (serviceAddresses.size() == 1) {
            return serviceAddresses.get(0);
        }
        return doSelect(serviceAddresses, weights, rpcRequest);
    }
​
    protected abstract String doSelect(List<String> serviceAddresses, List<Integer> weights, RpcRequest rpcRequest);
​
}

具体的负载均衡算法需要继承AbstractLoadBalance ,并实现doSelect 方法。

随机负载均衡

RandomLoadBalance

顾名思义,即从多个节点中随机选取一个节点进行访问。这种方式最大的优点就是简单,但是当请求数量较少时,随机性可能不强,可能会出现单实例节点负载过大的情况。当请求数量很大时,每个实例节点接受的请求数量会接近于均衡,效果较好。

public class RandomLoadBalance extends AbstractLoadBalance {
    @Override
    protected String doSelect(List<String> serviceAddresses, List<Integer> weights, RpcRequest rpcRequest) {
        Random random = new Random();
        return serviceAddresses.get(random.nextInt(serviceAddresses.size()));
    }
}

权重随机负载均衡

WeighedRandomLoadBalance

在随机负载均衡的基础上,加上了权重,每个服务对应有一个权重。

比如A服务权重为7,B服务权重为3,那么就会将其映射到 [0,10) 的长度中,生成一个随机数,如果在[0,7) 则会负载均衡到服务A,如果在[7, 10) ,则会负载均衡到服务B。

public class WeightedRandomLoadBalance extends AbstractLoadBalance {
​
    @Override
    protected String doSelect(List<String> serviceAddresses, List<Integer> weights, RpcRequest rpcRequest) {
        int[] weightsArray = new int[weights.size()];
        int totalWeight = 0;
        boolean sameWeight = true;
​
        for (int i = 0; i < weights.size(); i++) {
            int weight = weights.get(i);
            totalWeight += weight;
            weightsArray[i] = totalWeight;
            if (sameWeight && totalWeight != weight * (i + 1)) {
                sameWeight = false;
            }
        }
​
        if (totalWeight > 0 && !sameWeight) {
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            for (int i = 0; i < weights.size(); i++) {
                if (offset < weightsArray[i]) {
                    return serviceAddresses.get(i);
                }
            }
        }
​
        return serviceAddresses.get(ThreadLocalRandom.current().nextInt(serviceAddresses.size()));
    }
}

轮询负载均衡

RoundRobinLoadBalance

即轮转调度。假设当前服务有 3 个实例节点,第一次请求发送给 A 节点,第二次请求发送给 B 节点,第三次请求发送给 C 节点,那么第四次请求就会再次发送给 A 节点,实现均衡请求的效果。

public class RoundRobinLoadBalance extends AbstractLoadBalance {
​
    private static AtomicInteger index = new AtomicInteger(0);
​
    @Override
    protected String doSelect(List<String> serviceAddresses, List<Integer> weights, RpcRequest rpcRequest) {
        if (index.get() >= serviceAddresses.size()) {
            index = new AtomicInteger(0);
        }
        String address = serviceAddresses.get(index.get());
        index.incrementAndGet();
        return address;
    }
​
}

权重轮询负载均衡

WeightedRoundRobinLoadBalance

这个和权重随机负载均衡有区别。

  • 对于权重随机负载均衡,A的权重是7,B的权重是3,但并不是说10次,就恰好有7次负载均衡到A,很肯能会大于7次,是一个概率问题。

  • 对于权重轮询负载均衡,10次就一定会有7次是A,3次是B。

但是基础的权重轮询并不平滑,很可能会导致某段时间某个服务压力比较大,比如A的权重是7,B的权重是3,那么前7次就会负载均衡到A,后三次就会负载均衡到B,而不是一个交替轮询的方式。

所以我这里实现的权重轮询并不是一个特别好的实现。Dubbo 参考了 Nginx 实现了更好的权重轮询。

public class WeightedRoundRobinLoadBalance extends AbstractLoadBalance {
​
    private static AtomicInteger index = new AtomicInteger(0);
​
    @Override
    protected String doSelect(List<String> serviceAddresses, List<Integer> weights, RpcRequest rpcRequest) {
        int[] weightsArray = new int[weights.size()];
        int totalWeight = 0;
        boolean sameWeight = true;
​
        for (int i = 0; i < weights.size(); i++) {
            int weight = weights.get(i);
            totalWeight += weight;
            weightsArray[i] = totalWeight;
            if (sameWeight && totalWeight != weight * (i + 1)) {
                sameWeight = false;
            }
        }
​
        if (totalWeight > 0 && !sameWeight) {
            // 权重不同
            if (index.get() >= totalWeight) {
                index = new AtomicInteger(0);
            }
            int weight = index.get();
            for (int i = 0; i < weights.size(); i++) {
                if (weight < weightsArray[i]) {
                    String address = serviceAddresses.get(i);
                    index.incrementAndGet();
                    return address;
                }
            }
​
        }
​
        // 权重相同,就和普通轮询一样
        if (index.get() >= serviceAddresses.size()) {
            index = new AtomicInteger(0);
        }
        String address = serviceAddresses.get(index.get());
        index.incrementAndGet();
        return address;
    }
}

最小活跃数负载均衡

参考 Dubbo

初始状态下所有服务提供者的活跃数均为 0(每个服务提供者的中特定方法都对应一个活跃数,我在后面的源码中会提到),每收到一个请求后,对应的服务提供者的活跃数 +1,当这个请求处理完之后,活跃数 -1。

因此,Dubbo 就认为谁的活跃数越少,谁的处理速度就越快,性能也越好,这样的话,我就优先把请求给活跃数少的服务提供者处理。

如果有多个服务提供者的活跃数相等怎么办?

很简单,那就再走一遍 RandomLoadBalance

public class LeastActiveLoadBalance extends AbstractLoadBalance {
​
    public static final String NAME = "leastactive";
​
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size();
        int leastActive = -1;
        int leastCount = 0;
        int[] leastIndexes = new int[length];
        int[] weights = new int[length];
        int totalWeight = 0;
        int firstWeight = 0;
        boolean sameWeight = true;
        // 这个 for 循环的主要作用是遍历 invokers 列表,找出活跃数最小的 Invoker
        // 如果有多个 Invoker 具有相同的最小活跃数,还会记录下这些 Invoker 在 invokers 集合中的下标,并累加它们的权重,比较它们的权重值是否相等
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            // 获取 invoker 对应的活跃(active)数
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
            int afterWarmup = getWeight(invoker, invocation);
            weights[i] = afterWarmup;
            if (leastActive == -1 || active < leastActive) {
                leastActive = active;
                leastCount = 1;
                leastIndexes[0] = i;
                totalWeight = afterWarmup;
                firstWeight = afterWarmup;
                sameWeight = true;
            } else if (active == leastActive) {
                leastIndexes[leastCount++] = i;
                totalWeight += afterWarmup;
                if (sameWeight && afterWarmup != firstWeight) {
                    sameWeight = false;
                }
            }
        }
       // 如果只有一个 Invoker 具有最小的活跃数,此时直接返回该 Invoker 即可
        if (leastCount == 1) {
            return invokers.get(leastIndexes[0]);
        }
        // 如果有多个 Invoker 具有相同的最小活跃数,但它们之间的权重不同
        // 这里的处理方式就和  RandomLoadBalance 一致了
        if (!sameWeight && totalWeight > 0) {
            int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexes[i];
                offsetWeight -= weights[leastIndex];
                if (offsetWeight < 0) {
                    return invokers.get(leastIndex);
                }
            }
        }
        return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
    }
}

活跃数是通过 RpcStatus 中的一个 ConcurrentMap 保存的,根据 URL 以及服务提供者被调用的方法的名称,我们便可以获取到对应的活跃数。也就是说服务提供者中的每一个方法的活跃数都是互相独立的。

public class RpcStatus {
​
    private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS =
            new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
​
   public static RpcStatus getStatus(URL url, String methodName) {
        String uri = url.toIdentityString();
        ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.computeIfAbsent(uri, k -> new ConcurrentHashMap<>());
        return map.computeIfAbsent(methodName, k -> new RpcStatus());
    }
    public int getActive() {
        return active.get();
    }
​
}

一致性Hash负载均衡

ConsistentHashLoadBalance

image-20230829120326367

我们可以从一致性 Hash 环的原理讲起!

我们可以看到如图1「圆环」中存在有 3 个节点,分别为 NodeA / Node B / Node C,4 个请求,分别为 Req 1 / Req 2 / Req 3 / Req 4。

为什么叫这个「圆环」为「一致性 Hash 环」呢?这是因为我们要对每一个节点根据 Hash 算法计算得到一个 Hash 值,并将其映射到圆环中的某一个位置。对于请求也是如此,根据参数来计算具体的 Hash 值,也映射到圆环中对应的位置。

接下来的事情就很简单啦,每一个请求沿着当前圆环 顺时针 寻找,找到的第一个节点就是对应的处理请求节点。

如图对应关系为,Node A 处理 Req 3 和 Req 4,Node B 处理 Req 1,Node C 处理 Req 2。

但是一致性 Hash 环容易造成一个问题,如图2。

当服务节点过少时,节点 Hash 值映射到圆环的位置可能聚集于某一处,容易因为节点分布不均匀而造成请求均衡问题,即「数据倾斜」

在图中,Node A 承担了大部分请求,Node C 只承担了一个请求,Node B 一个都没有。

为此,我们需要引入「虚拟节点」解决这一问题。

图中黄色节点对应的就是「虚拟节点」,起到均衡请求、避免数据倾斜的作用。

public class ConsistentHashLoadBalance extends AbstractLoadBalance {
    private final ConcurrentHashMap<String, ConsistentHashSelector> selectors = new ConcurrentHashMap<>();
​
    @Override
    protected String doSelect(List<String> serviceAddresses, List<Integer> weights, RpcRequest rpcRequest) {
        int identityHashCode = System.identityHashCode(serviceAddresses);
        // build rpc service name by rpcRequest
        String rpcServiceName = rpcRequest.getRpcServiceName();
        ConsistentHashSelector selector = selectors.get(rpcServiceName);
        // check for updates
        if (selector == null || selector.identityHashCode != identityHashCode) {
            selectors.put(rpcServiceName, new ConsistentHashSelector(serviceAddresses, 160, identityHashCode));
            selector = selectors.get(rpcServiceName);
        }
        return selector.select(rpcServiceName + Arrays.stream(rpcRequest.getParameters()));
    }
​
    static class ConsistentHashSelector {
        private final TreeMap<Long, String> virtualInvokers;
​
        private final int identityHashCode;
​
        ConsistentHashSelector(List<String> invokers, int replicaNumber, int identityHashCode) {
            this.virtualInvokers = new TreeMap<>();
            this.identityHashCode = identityHashCode;
​
            for (String invoker : invokers) {
                for (int i = 0; i < replicaNumber / 4; i++) {
                    byte[] digest = md5(invoker + i);
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }
​
        static byte[] md5(String key) {
            MessageDigest md;
            try {
                md = MessageDigest.getInstance("MD5");
                byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
                md.update(bytes);
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
​
            return md.digest();
        }
​
        static long hash(byte[] digest, int idx) {
            return ((long) (digest[3 + idx * 4] & 255) << 24 | (long) (digest[2 + idx * 4] & 255) << 16 | (long) (digest[1 + idx * 4] & 255) << 8 | (long) (digest[idx * 4] & 255)) & 4294967295L;
        }
​
        public String select(String rpcServiceKey) {
            byte[] digest = md5(rpcServiceKey);
            return selectForKey(hash(digest, 0));
        }
​
        public String selectForKey(long hashCode) {
            Map.Entry<Long, String> entry = virtualInvokers.tailMap(hashCode, true).firstEntry();
​
            if (entry == null) {
                entry = virtualInvokers.firstEntry();
            }
​
            return entry.getValue();
        }
    }
}

0

评论区