javadubbo源码分析

服务集群的概述概述

为了避免单点故障,现在的应用通常至少会部署在两台服务器上,这样就组成了集群。集群就是单机的多实例,在多个服务器上部署多个服务,每个服务就是一个节点,部署N个节点,处理业务的能力就提升 N倍(大约),这些节点的集合就叫做集群。

javadubbo源码分析(dubbo源码解析-高可用集群)(1)

管理控制台

目前的管理控制台已经发布0.1版本,结构上采取了前后端分离的方式,前端使用Vue和Vuetify分别作为Javascript框架和UI框架,后端采用Spring Boot框架。既可以按照标准的Maven方式进行打包,部署,也可以采用前后端分离的部署方式,方便开发,功能上,目前具备了服务查询,服务治理(包括Dubbo2.7中新增的治理规则)以及服务测试三部分内容。

Maven方式部署

  • 安装

git clone https://github.com/apache/dubbo-admin.git cd dubbo-admin mvn clean package cd dubbo-admin-distribution/target java -jar dubbo-admin-0.1.jar

  • 访问 http://localhost:8080

前后端分离部署

  • 前端

cd dubbo-admin-ui npm install npm run dev

  • 后端

cd dubbo-admin-server mvn clean package cd target java -jar dubbo-admin-server-0.1.jar

  • 访问 http://localhost:8081
  • 前后端分离模式下,前端的修改可以实时生效
环境搭建

集群调用存在的问题

  • 负载均衡
  • 集群容错
  • 服务治理
集群的调用过程调用过程

在对集群相关代码进行分析之前,这里有必要先来介绍一下集群容错的所有组件。包含 Cluster、Cluster Invoker、Directory、Router 和 LoadBalance 等。

javadubbo源码分析(dubbo源码解析-高可用集群)(2)

集群工作过程可分为两个阶段,第一个阶段是在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。第二个阶段是在服务消费者进行远程调用时。以 FailoverClusterInvoker 为例,该类型 Cluster Invoker 首先会调用 Directory 的 list 方法列举 Invoker 列表(可将 Invoker 简单理解为服务提供者)。

Directory 的用途是保存 Invoker,可简单类比为 List<Invoker>。其实现类 RegistryDirectory 是一个动态服务目录,可感知注册中心配置的变化,它所持有的 Invoker 列表会随着注册中心内容的变化而变化。每次变化后,RegistryDirectory 会动态增删 Invoker,并调用 Router 的 route 方法进行路由,过滤掉不符合路由规则的 Invoker。当 FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表后,它会通过 LoadBalance 从 Invoker 列表中选择一个 Invoker。最后 FailoverClusterInvoker 会将参数传给 LoadBalance 选择出的 Invoker 实例的 invoke 方法,进行真正的远程调用。

组件介绍
  • Directory:它代表多个Invoker,从methodInvokerMap提取,但是他的值是动态,例如注册中心的变更。
  • Router:负责从多个Invoker中按路由规则选出子集,例如应用隔离或读写分离或灰度发布等等
  • Cluster:将Directory中的多个Invoker伪装成一个Invoker 来容错,调用失败重试。
  • LoadBalance:从多个Invoker选取一个做本次调用,具体包含很多种负载均衡算法。
  • Invoker:Provider中的一个可调用接口。例如DemoService
集群容错

在分布式系统中,集群某个某些节点出现问题是大概率事件,因此在设计分布式RPC框架的过程中,必须要把失败作为设计的一等公民来对待。一次调用失败之后,应该如何选择对失败的选择策略,这是一个见仁见智的问题,每种策略可能都有自己独特的应用场景。因此,作为框架来说,应当针对不同场景提供多种策略,供用户进行选择。

在Dubbo设计中,通过Cluster这个接口的抽象,把一组可供调用的Provider信息组合成为一个统一的Invoker供调用方进行调用。经过路由规则过滤,负载均衡选址后,选中一个具体地址进行调用,如果调用失败,则会按照集群配置的容错策略进行容错处理。

内置集群容错方式

Dubbo默认内置了若干容错策略,如果不能满足用户需求,则可以通过自定义容错策略进行配置

Dubbo主要内置了如下几种策略:

  • Failover(失败自动切换)
  • Failsafe(失败安全)
  • Failfast(快速失败)
  • Failback(失败自动恢复)
  • Forking(并行调用)
  • Broadcast(广播调用)

这些名称比较相似,概念也比较容易混淆,下面逐一进行解释。

Failover(失败自动切换)

Failover是高可用系统中的一个常用概念,服务器通常拥有主备两套机器配置,如果主服务器出现故障,则自动切换到备服务器中,从而保证了整体的高可用性。

Dubbo也借鉴了这个思想,并且把它作为Dubbo默认的容错策略。当调用出现失败的时候,根据配置的重试次数,会自动从其他可用地址中重新选择一个可用的地址进行调用,直到调用成功,或者是达到重试的上限位置。

Dubbo里默认配置的重试次数是2,也就是说,算上第一次调用,最多会调用3次。

其配置方法,容错策略既可以在服务提供方配置,也可以服务调用方进行配置。而重试次数的配置则更为灵活,既可以在服务级别进行配置,也可以在方法级别进行配置。具体优先顺序为:

服务调用方方法级配置 > 服务调用方服务级配置 > 服务提供方方法级配置 > 服务提供方服务级配置

以XML方式为例,具体配置方法如下:

服务提供方,服务级配置

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="failover" retries="2" />

服务提供方,方法级配置

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService"cluster="failover"> <dubbo:method name="sayHello" retries="2" /> </dubbo:reference>

服务调用方,服务级配置

<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="failover" retries="1"/>

服务调用方,方法级配置:

<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="failover"> <dubbo:method name="sayHello" retries="3" /> </dubbo:reference>

Failover可以自动对失败进行重试,对调用者屏蔽了失败的细节,但是Failover策略也会带来一些副作用:

  • 重试会额外增加一下开销,例如增加资源的使用,在高负载系统下,额外的重试可能让系统雪上加霜。
  • 重试会增加调用的响应时间。
  • 某些情况下,重试甚至会造成资源的浪费。考虑一个调用场景,A->B->C,如果A处设置了超时100ms,再B->C的第一次调用完成时已经超过了100ms,但很不幸B->C失败,这时候会进行重试,但其实这时候重试已经没有意义,因此在A看来这次调用已经超时,A可能已经开始执行其他逻辑。
Failsafe(失败安全)

失败安全策略的核心是即使失败了也不会影响整个调用流程。通常情况下用于旁路系统或流程中,它的失败不影响核心业务的正确性。在实现上,当出现调用失败时,会忽略此错误,并记录一条日志,同时返回一个空结果,在上游看来调用是成功的。

应用场景,可以用于写入审计日志等操作。

具体配置方法:

服务提供方,服务级配置

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="failsafe" />

服务调用方,服务级配置

<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="failsafe"/>

其中服务调用方配置优先于服务提供方配置。

Failfast(快速失败)

某些业务场景中,某些操作可能是非幂等的,如果重复发起调用,可能会导致出现脏数据等。例如调用某个服务,其中包含一个数据库的写操作,如果写操作完成,但是在发送结果给调用方的过程中出错了,那么在调用发看来这次调用失败了,但其实数据写入已经完成。这种情况下,重试可能并不是一个好策略,这时候就需要使用到Failfast策略,调用失败立即报错。让调用方来决定下一步的操作并保证业务的幂等性。

具体配置方法:

服务提供方,服务级配置

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="failfast" />

服务调用方,服务级配置

<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="failfast"/>

其中服务调用方配置优先于服务提供方配置。

Failback(失败自动恢复)

Failback通常和Failover两个概念联系在一起。在高可用系统中,当主机发生故障,通过Failover进行主备切换后,待故障恢复后,系统应该具备自动恢复原始配置的能力。

Dubbo中的Failback策略中,如果调用失败,则此次失败相当于Failsafe,将返回一个空结果。而与Failsafe不同的是,Failback策略会将这次调用加入内存中的失败列表中,对于这个列表中的失败调用,会在另一个线程中进行异步重试,重试如果再发生失败,则会忽略,即使重试调用成功,原来的调用方也感知不到了。因此它通常适合于,对于实时性要求不高,且不需要返回值的一些异步操作。

具体配置方法:

服务提供方,服务级配置

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="failsafe" />

服务调用方,服务级配置

<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="failsafe"/>

其中服务调用方配置优先于服务提供方配置。

按照目前的实现,Failback策略还有一些局限,例如内存中的失败调用列表没有上限,可能导致堆积,异步重试的执行间隔无法调整,默认是5秒。

Forking(并行调用)

上述几种策略中,主要都是针对调用失败发生后如何进行弥补的角度去考虑的,而Forking策略则跟上述几种策略不同,是一种典型的用成本换时间的思路。即第一次调用的时候就同时发起多个调用,只要其中一个调用成功,就认为成功。在资源充足,且对于失败的容忍度较低的场景下,可以采用此策略。

具体配置方法:

服务提供方,服务级配置

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="forking" />

服务调用方,服务级配置

<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="forking"/>

其中服务调用方配置优先于服务提供方配置。

Broadcast(广播调用)

在某些场景下,可能需要对服务的所有提供者进行操作,此时可以使用广播调用策略。此策略会逐个调用所有提供者,只要任意有一个提供者出错,则认为此次调用出错。通常用于通知所有提供者更新缓存或日志等本地资源信息。

具体配置方法:

服务提供方,服务级配置

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="broadcast" />

服务调用方,服务级配置

<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="broadcast"/>

其中服务调用方配置优先于服务提供方配置。

集群容错调优

下表对各种策略做一个简单对比,

策略名称

优点

缺点

Failover

对调用者屏蔽调用失败的信息

增加RT,额外资源开销,资源浪费

Failfast

业务快速感知失败状态进行自主决策

产生较多报错的信息

Failsafe

即使失败了也不会影响核心流程

对于失败的信息不敏感,需要额外的监控

Failback

失败自动异步重试

重试任务可能堆积

Forking

并行发起多个调用,降低失败概率

消耗额外的机器资源,需要确保操作幂等性

Broadcast

支持对所有的服务提供者进行操作

资源消耗很大

综上我们得知,不同的容错策略往往对应不同的业务处理,这里做一个总结如下:

  • Failover :通常用于对调用rt不敏感的场景,如读操作;但重试会带来更长延迟
  • Failfast :通常用于非幂等性操作,需要快速感知失败的场景;比如新增记录
  • Failsafe :通常用于旁路系统,失败不影响核心流程正确性的场景;如日志记录
  • Failback :通常用于对于实时性要求不高,且不需要返回值的一些异步操作的场景
  • Forking :通常用于资源充足,且对于失败的容忍度较低,实时性要求高的读操作,但需要浪费更多服务资源
  • Broadcast:如通知所有提供者更新缓存或日志等本地资源信息
源码分析

我们在上一章看到了两个概念,分别是集群接口 Cluster 和 Cluster Invoker,这两者是不同的。Cluster 是接口,而 Cluster Invoker 是一种 Invoker。服务提供者的选择逻辑,以及远程调用失败后的的处理逻辑均是封装在 Cluster Invoker 中。那么 Cluster 接口和相关实现类有什么用呢?用途比较简单,仅用于生成 Cluster Invoker。下面我们来看一下源码。

public class FailoverCluster implements Cluster { public final static String NAME = "failover"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 创建并返回 FailoverClusterInvoker 对象 return new FailoverClusterInvoker<T>(directory); } }

如上,FailoverCluster 总共就包含这几行代码,用于创建 FailoverClusterInvoker 对象,很简单。下面再看一个。

public class FailbackCluster implements Cluster { public final static String NAME = "failback"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 创建并返回 FailbackClusterInvoker 对象 return new FailbackClusterInvoker<T>(directory); } }

如上,FailbackCluster 的逻辑也是很简单,无需解释了。所以接下来,我们把重点放在各种 Cluster Invoker 上

Cluster Invoker

我们首先从各种 Cluster Invoker 的父类 AbstractClusterInvoker 源码开始说起。前面说过,集群工作过程可分为两个阶段,第一个阶段是在服务消费者初始化期间,这个在服务引用那篇文章中分析过,就不赘述。第二个阶段是在服务消费者进行远程调用时,此时 AbstractClusterInvoker 的 invoke 方法会被调用。列举 Invoker,负载均衡等操作均会在此阶段被执行。因此下面先来看一下 invoke 方法的逻辑。

public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance = null; // 绑定 attachments 到 invocation 中. Map<String String> contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } // 列举 Invoker List<Invoker<T>> invokers = list(invocation); if (invokers != null && !invokers.isEmpty()) { // 加载 LoadBalance loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(RpcUtils.getMethodName(invocation) Constants.LOADBALANCE_KEY Constants.DEFAULT_LOADBALANCE)); } RpcUtils.attachInvocationIdIfAsync(getUrl() invocation); // 调用 doInvoke 进行后续操作 return doInvoke(invocation invokers loadbalance); } // 抽象方法,由子类实现 protected abstract Result doInvoke(Invocation invocation List<Invoker<T>> invokers LoadBalance loadbalance) throws RpcException;

AbstractClusterInvoker 的 invoke 方法主要用于列举 Invoker,以及加载 LoadBalance。最后再调用模板方法 doInvoke 进行后续操作。下面我们来看一下 Invoker 列举方法 list(Invocation) 的逻辑,如下:

protected List<Invoker<T>> list(Invocation invocation) throws RpcException { // 调用 Directory 的 list 方法列举 Invoker List<Invoker<T>> invokers = directory.list(invocation); return invokers; }

如上,AbstractClusterInvoker 中的 list 方法做的事情很简单,只是简单的调用了 Directory 的 list 方法,没有其他更多的逻辑了。Directory 即相关实现类在前文已经分析过,这里就不多说了。接下来,我们把目光转移到 AbstractClusterInvoker 的各种实现类上,来看一下这些实现类是如何实现 doInvoke 方法逻辑的。

FailoverClusterInvoker

FailoverClusterInvoker 在调用失败时,会自动切换 Invoker 进行重试。默认配置下,Dubbo 会使用这个类作为缺省 Cluster Invoker。下面来看一下该类的逻辑。

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> { // 省略部分代码 @Override public Result doInvoke(Invocation invocation final List<Invoker<T>> invokers LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers invocation); // 获取重试次数 int len = getUrl().getMethodParameter(invocation.getMethodName() Constants.RETRIES_KEY Constants.DEFAULT_RETRIES) 1; if (len <= 0) { len = 1; } RpcException le = null; List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); Set<String> providers = new HashSet<String>(len); // 循环调用,失败重试 for (int i = 0; i < len; i ) { if (i > 0) { checkWhetherDestroyed(); // 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了, // 通过调用 list 可得到最新可用的 Invoker 列表 copyinvokers = list(invocation); // 对 copyinvokers 进行判空检查 checkInvokers(copyinvokers invocation); } // 通过负载均衡选择 Invoker Invoker<T> invoker = select(loadbalance invocation copyinvokers invoked); // 添加到 invoker 到 invoked 列表中 invoked.add(invoker); // 设置 invoked 到 RPC 上下文中 RpcContext.getContext().setInvokers((List) invoked); try { // 调用目标 Invoker 的 invoke 方法 Result result = invoker.invoke(invocation); return result; } catch (RpcException e) { if (e.isBiz()) { throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage() e); } finally { providers.add(invoker.getUrl().getAddress()); } } // 若重试失败,则抛出异常 throw new RpcException(... "Failed to invoke the method ..."); } }

如上,FailoverClusterInvoker 的 doInvoke 方法首先是获取重试次数,然后根据重试次数进行循环调用,失败后进行重试。在 for 循环内,首先是通过负载均衡组件选择一个 Invoker,然后再通过这个 Invoker 的 invoke 方法进行远程调用。如果失败了,记录下异常,并进行重试。重试时会再次调用父类的 list 方法列举 Invoker。整个流程大致如此,不是很难理解。下面我们看一下 select 方法的逻辑。

protected Invoker<T> select(LoadBalance loadbalance Invocation invocation List<Invoker<T>> invokers List<Invoker<T>> selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; // 获取调用方法名 String methodName = invocation == null ? "" : invocation.getMethodName(); // 获取 sticky 配置,sticky 表示粘滞连接。所谓粘滞连接是指让服务消费者尽可能的 // 调用同一个服务提供者,除非该提供者挂了再进行切换 boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName Constants.CLUSTER_STICKY_KEY Constants.DEFAULT_CLUSTER_STICKY); { // 检测 invokers 列表是否包含 stickyInvoker,如果不包含, // 说明 stickyInvoker 代表的服务提供者挂了,此时需要将其置空 if (stickyInvoker != null && !invokers.contains(stickyInvoker)) { stickyInvoker = null; } // 在 sticky 为 true,且 stickyInvoker != null 的情况下。如果 selected 包含 // stickyInvoker,表明 stickyInvoker 对应的服务提供者可能因网络原因未能成功提供服务。 // 但是该提供者并没挂,此时 invokers 列表中仍存在该服务提供者对应的 Invoker。 if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) { // availablecheck 表示是否开启了可用性检查,如果开启了,则调用 stickyInvoker 的 // isAvailable 方法进行检查,如果检查通过,则直接返回 stickyInvoker。 if (availablecheck && stickyInvoker.isAvailable()) { return stickyInvoker; } } } // 如果线程走到当前代码处,说明前面的 stickyInvoker 为空,或者不可用。 // 此时继续调用 doSelect 选择 Invoker Invoker<T> invoker = doSelect(loadbalance invocation invokers selected); // 如果 sticky 为 true,则将负载均衡组件选出的 Invoker 赋值给 stickyInvoker if (sticky) { stickyInvoker = invoker; } return invoker; }

如上,select 方法的主要逻辑集中在了对粘滞连接特性的支持上。首先是获取 sticky 配置,然后再检测 invokers 列表中是否包含 stickyInvoker,如果不包含,则认为该 stickyInvoker 不可用,此时将其置空。这里的 invokers 列表可以看做是存活着的服务提供者列表,如果这个列表不包含 stickyInvoker,那自然而然的认为 stickyInvoker 挂了,所以置空。

如果 stickyInvoker 存在于 invokers 列表中,此时要进行下一项检测 — 检测 selected 中是否包含 stickyInvoker。如果包含的话,说明 stickyInvoker 在此之前没有成功提供服务(但其仍然处于存活状态)。此时我们认为这个服务不可靠,不应该在重试期间内再次被调用,因此这个时候不会返回该 stickyInvoker。如果 selected 不包含 stickyInvoker,此时还需要进行可用性检测,比如检测服务提供者网络连通性等。当可用性检测通过,才可返回 stickyInvoker,否则调用 doSelect 方法选择 Invoker。如果 sticky 为 true,此时会将 doSelect 方法选出的 Invoker 赋值给 stickyInvoker。

以上就是 select 方法的逻辑,这段逻辑看起来不是很复杂,但是信息量比较大。不搞懂 invokers 和 selected 两个入参的含义,以及粘滞连接特性,这段代码是不容易看懂的。所以大家在阅读这段代码时,不要忽略了对背景知识的理解。关于 select 方法先分析这么多,继续向下分析。

private Invoker<T> doSelect(LoadBalance loadbalance Invocation invocation List<Invoker<T>> invokers List<Invoker<T>> selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; if (invokers.size() == 1) return invokers.get(0); if (loadbalance == null) { // 如果 loadbalance 为空,这里通过 SPI 加载 Loadbalance,默认为 RandomLoadBalance loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } // 通过负载均衡组件选择 Invoker Invoker<T> invoker = loadbalance.select(invokers getUrl() invocation); // 如果 selected 包含负载均衡选择出的 Invoker,或者该 Invoker 无法经过可用性检查,此时进行重选 if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) { try { // 进行重选 Invoker<T> rinvoker = reselect(loadbalance invocation invokers selected availablecheck); if (rinvoker != null) { // 如果 rinvoker 不为空,则将其赋值给 invoker invoker = rinvoker; } else { // rinvoker 为空,定位 invoker 在 invokers 中的位置 int index = invokers.indexOf(invoker); try { // 获取 index 1 位置处的 Invoker,以下代码等价于: // invoker = invokers.get((index 1) % invokers.size()); invoker = index < invokers.size() - 1 ? invokers.get(index 1) : invokers.get(0); } catch (Exception e) { logger.warn("... may because invokers list dynamic change ignore."); } } } catch (Throwable t) { logger.error("cluster reselect fail reason is : ..."); } } return invoker; }

doSelect 主要做了两件事,第一是通过负载均衡组件选择 Invoker。第二是,如果选出来的 Invoker 不稳定,或不可用,此时需要调用 reselect 方法进行重选。若 reselect 选出来的 Invoker 为空,此时定位 invoker 在 invokers 列表中的位置 index,然后获取 index 1 处的 invoker,这也可以看做是重选逻辑的一部分。下面我们来看一下 reselect 方法的逻辑。

private Invoker<T> reselect(LoadBalance loadbalance Invocation invocation List<Invoker<T>> invokers List<Invoker<T>> selected boolean availablecheck) throws RpcException { List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size()); // 下面的 if-else 分支逻辑有些冗余,pull request #2826 对这段代码进行了简化,可以参考一下 // 根据 availablecheck 进行不同的处理 if (availablecheck) { // 遍历 invokers 列表 for (Invoker<T> invoker : invokers) { // 检测可用性 if (invoker.isAvailable()) { // 如果 selected 列表不包含当前 invoker,则将其添加到 reselectInvokers 中 if (selected == null || !selected.contains(invoker)) { reselectInvokers.add(invoker); } } } // reselectInvokers 不为空,此时通过负载均衡组件进行选择 if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers getUrl() invocation); } // 不检查 Invoker 可用性 } else { for (Invoker<T> invoker : invokers) { // 如果 selected 列表不包含当前 invoker,则将其添加到 reselectInvokers 中 if (selected == null || !selected.contains(invoker)) { reselectInvokers.add(invoker); } } if (!reselectInvokers.isEmpty()) { // 通过负载均衡组件进行选择 return loadbalance.select(reselectInvokers getUrl() invocation); } } { // 若线程走到此处,说明 reselectInvokers 集合为空,此时不会调用负载均衡组件进行筛选。 // 这里从 selected 列表中查找可用的 Invoker,并将其添加到 reselectInvokers 集合中 if (selected != null) { for (Invoker<T> invoker : selected) { if ((invoker.isAvailable()) && !reselectInvokers.contains(invoker)) { reselectInvokers.add(invoker); } } } if (!reselectInvokers.isEmpty()) { // 再次进行选择,并返回选择结果 return loadbalance.select(reselectInvokers getUrl() invocation); } } return null; }

reselect 方法总结下来其实只做了两件事情,第一是查找可用的 Invoker,并将其添加到 reselectInvokers 集合中。第二,如果 reselectInvokers 不为空,则通过负载均衡组件再次进行选择。其中第一件事情又可进行细分,一开始,reselect 从 invokers 列表中查找有效可用的 Invoker,若未能找到,此时再到 selected 列表中继续查找。关于 reselect 方法就先分析到这,继续分析其他的 Cluster Invoker。

FailbackClusterInvoker

FailbackClusterInvoker 会在调用失败后,返回一个空结果给服务消费者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。下面来看一下它的实现逻辑。

public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> { private static final long RETRY_FAILED_PERIOD = 5 * 1000; private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2 new NamedInternalThreadFactory("failback-cluster-timer" true)); private final ConcurrentMap<Invocation AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation AbstractClusterInvoker<?>>(); private volatile ScheduledFuture<?> retryFuture; @Override protected Result doInvoke(Invocation invocation List<Invoker<T>> invokers LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers invocation); // 选择 Invoker Invoker<T> invoker = select(loadbalance invocation invokers null); // 进行调用 return invoker.invoke(invocation); } catch (Throwable e) { // 如果调用过程中发生异常,此时仅打印错误日志,不抛出异常 logger.error("Failback to invoke method ..."); // 记录调用信息 addFailed(invocation this); // 返回一个空结果给服务消费者 return new RpcResult(); } } private void addFailed(Invocation invocation AbstractClusterInvoker<?> router) { if (retryFuture == null) { synchronized (this) { if (retryFuture == null) { // 创建定时任务,每隔5秒执行一次 retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { // 对失败的调用进行重试 retryFailed(); } catch (Throwable t) { // 如果发生异常,仅打印异常日志,不抛出 logger.error("Unexpected error occur at collect statistic" t); } } } RETRY_FAILED_PERIOD RETRY_FAILED_PERIOD TimeUnit.MILLISECONDS); } } } // 添加 invocation 和 invoker 到 failed 中 failed.put(invocation router); } void retryFailed() { if (failed.size() == 0) { return; } // 遍历 failed,对失败的调用进行重试 for (Map.Entry<Invocation AbstractClusterInvoker<?>> entry : new HashMap<Invocation AbstractClusterInvoker<?>>(failed).entrySet()) { Invocation invocation = entry.getKey(); Invoker<?> invoker = entry.getValue(); try { // 再次进行调用 invoker.invoke(invocation); // 调用成功后,从 failed 中移除 invoker failed.remove(invocation); } catch (Throwable e) { // 仅打印异常,不抛出 logger.error("Failed retry to invoke method ..."); } } } }

这个类主要由3个方法组成,首先是 doInvoker,该方法负责初次的远程调用。若远程调用失败,则通过 addFailed 方法将调用信息存入到 failed 中,等待定时重试。addFailed 在开始阶段会根据 retryFuture 为空与否,来决定是否开启定时任务。retryFailed 方法则是包含了失败重试的逻辑,该方法会对 failed 进行遍历,然后依次对 Invoker 进行调用。调用成功则将 Invoker 从 failed 中移除,调用失败则忽略失败原因。

以上就是 FailbackClusterInvoker 的执行逻辑,不是很复杂,继续往下看。

FailfastClusterInvoker

FailfastClusterInvoker 只会进行一次调用,失败后立即抛出异常。适用于幂等操作,比如新增记录。源码如下:

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> { @Override public Result doInvoke(Invocation invocation List<Invoker<T>> invokers LoadBalance loadbalance) throws RpcException { checkInvokers(invokers invocation); // 选择 Invoker Invoker<T> invoker = select(loadbalance invocation invokers null); try { // 调用 Invoker return invoker.invoke(invocation); } catch (Throwable e) { if (e instanceof RpcException && ((RpcException) e).isBiz()) { // 抛出异常 throw (RpcException) e; } // 抛出异常 throw new RpcException(... "Failfast invoke providers ..."); } } }

如上,首先是通过 select 方法选择 Invoker,然后进行远程调用。如果调用失败,则立即抛出异常。FailfastClusterInvoker 就先分析到这,下面分析 FailsafeClusterInvoker。

FailsafeClusterInvoker

FailsafeClusterInvoker 是一种失败安全的 Cluster Invoker。所谓的失败安全是指,当调用过程中出现异常时,FailsafeClusterInvoker 仅会打印异常,而不会抛出异常。适用于写入审计日志等操作。下面分析源码。

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> { @Override public Result doInvoke(Invocation invocation List<Invoker<T>> invokers LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers invocation); // 选择 Invoker Invoker<T> invoker = select(loadbalance invocation invokers null); // 进行远程调用 return invoker.invoke(invocation); } catch (Throwable e) { // 打印错误日志,但不抛出 logger.error("Failsafe ignore exception: " e.getMessage() e); // 返回空结果忽略错误 return new RpcResult(); } } }

FailsafeClusterInvoker 的逻辑和 FailfastClusterInvoker 的逻辑一样简单,无需过多说明。继续向下分析。

ForkingClusterInvoker

ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。ForkingClusterInvoker 的应用场景是在一些对实时性要求比较高读操作(注意是读操作,并行写操作可能不安全)下使用,但这将会耗费更多的资源。下面来看该类的实现。

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> { private final ExecutorService executor = Executors.newCachedThreadPool( new NamedInternalThreadFactory("forking-cluster-timer" true)); @Override public Result doInvoke(final Invocation invocation List<Invoker<T>> invokers LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers invocation); final List<Invoker<T>> selected; // 获取 forks 配置 final int forks = getUrl().getParameter(Constants.FORKS_KEY Constants.DEFAULT_FORKS); // 获取超时配置 final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY Constants.DEFAULT_TIMEOUT); // 如果 forks 配置不合理,则直接将 invokers 赋值给 selected if (forks <= 0 || forks >= invokers.size()) { selected = invokers; } else { selected = new ArrayList<Invoker<T>>(); // 循环选出 forks 个 Invoker,并添加到 selected 中 for (int i = 0; i < forks; i ) { // 选择 Invoker Invoker<T> invoker = select(loadbalance invocation invokers selected); if (!selected.contains(invoker)) { selected.add(invoker); } } } // ----------------------✨ 分割线1 ✨---------------------- // RpcContext.getContext().setInvokers((List) selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>(); // 遍历 selected 列表 for (final Invoker<T> invoker : selected) { // 为每个 Invoker 创建一个执行线程 executor.execute(new Runnable() { @Override public void run() { try { // 进行远程调用 Result result = invoker.invoke(invocation); // 将结果存到阻塞队列中 ref.offer(result); } catch (Throwable e) { int value = count.incrementAndGet(); // 仅在 value 大于等于 selected.size() 时,才将异常对象 // 放入阻塞队列中,请大家思考一下为什么要这样做。 if (value >= selected.size()) { // 将异常对象存入到阻塞队列中 ref.offer(e); } } } }); } // ----------------------✨ 分割线2 ✨---------------------- // try { // 从阻塞队列中取出远程调用结果 Object ret = ref.poll(timeout TimeUnit.MILLISECONDS); // 如果结果类型为 Throwable,则抛出异常 if (ret instanceof Throwable) { Throwable e = (Throwable) ret; throw new RpcException(... "Failed to forking invoke provider ..."); } // 返回结果 return (Result) ret; } catch (InterruptedException e) { throw new RpcException("Failed to forking invoke provider ..."); } } finally { RpcContext.getContext().clearAttachments(); } } }

ForkingClusterInvoker 的 doInvoker 方法比较长,这里通过两个分割线将整个方法划分为三个逻辑块。从方法开始到分割线1之间的代码主要是用于选出 forks 个 Invoker,为接下来的并发调用提供输入。分割线1和分割线2之间的逻辑通过线程池并发调用多个 Invoker,并将结果存储在阻塞队列中。分割线2到方法结尾之间的逻辑主要用于从阻塞队列中获取返回结果,并对返回结果类型进行判断。如果为异常类型,则直接抛出,否则返回。

以上就是ForkingClusterInvoker 的 doInvoker 方法大致过程。我们在分割线1和分割线2之间的代码上留了一个问题,问题是这样的:为什么要在value >= selected.size()的情况下,才将异常对象添加到阻塞队列中?这里来解答一下。原因是这样的,在并行调用多个服务提供者的情况下,只要有一个服务提供者能够成功返回结果,而其他全部失败。此时 ForkingClusterInvoker 仍应该返回成功的结果,而非抛出异常。在value >= selected.size()时将异常对象放入阻塞队列中,可以保证异常对象不会出现在正常结果的前面,这样可从阻塞队列中优先取出正常的结果。

关于 ForkingClusterInvoker 就先分析到这,接下来分析最后一个 Cluster Invoker。

BroadcastClusterInvoker

本章的最后,我们再来看一下 BroadcastClusterInvoker。BroadcastClusterInvoker 会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常。该类通常用于通知所有提供者更新缓存或日志等本地资源信息。源码如下。

public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> { @Override public Result doInvoke(final Invocation invocation List<Invoker<T>> invokers LoadBalance loadbalance) throws RpcException { checkInvokers(invokers invocation); RpcContext.getContext().setInvokers((List) invokers); RpcException exception = null; Result result = null; // 遍历 Invoker 列表,逐个调用 for (Invoker<T> invoker : invokers) { try { // 进行远程调用 result = invoker.invoke(invocation); } catch (RpcException e) { exception = e; logger.warn(e.getMessage() e); } catch (Throwable e) { exception = new RpcException(e.getMessage() e); logger.warn(e.getMessage() e); } } // exception 不为空,则抛出异常 if (exception != null) { throw exception; } return result; } }

以上就是 BroadcastClusterInvoker 的代码,比较简单,就不多说了。

这里分析了集群容错的几种实现方式。集群容错对于 Dubbo 框架来说,是很重要的逻辑。集群模块处于服务提供者和消费者之间,对于服务消费者来说,集群可向其屏蔽服务提供者集群的情况,使其能够专心进行远程调用。除此之外,通过集群模块,我们还可以对服务之间的调用链路进行编排优化,治理服务。总的来说,对于 Dubbo 而言,集群容错相关逻辑是非常重要的。想要对 Dubbo 有比较深的理解,集群容错是必须要掌握的。

负载均衡

在之前章节中,介绍了服务集群的调用方式。我们发现在多服务实例时,负载均衡调用是其中极其重要的一环。在本章节中,我们一起学习Dubbo中的各种负载均衡策略

负载均衡的主要作用

javadubbo源码分析(dubbo源码解析-高可用集群)(3)

负载均衡(LoadBalance),它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况。通过负载均衡,可以让每台服务器获取到适合自己处理能力的负载。在为高负载服务器分流的同时,还可以避免资源浪费,一举两得。

在 Dubbo 中,也有负载均衡的概念和相应的实现。Dubbo 需要对服务消费者的调用请求进行分配,避免少数服务提供者负载过大。服务提供者负载过大,会导致部分请求超时。因此将负载均衡到每个服务提供者上,是非常必要的。

内置的负载均衡策略

Dubbo 提供了4种负载均衡实现,分别是基于权重随机算法的 RandomLoadBalance、基于最少活跃调用数算法的 LeastActiveLoadBalance、基于 hash 一致性的 ConsistentHashLoadBalance,以及基于加权轮询算法的 RoundRobinLoadBalance。这几个负载均衡算法代码不是很长,但是想看懂也不是很容易,需要大家对这几个算法的原理有一定了解才行。如果不是很了解,也没不用太担心。我们会在分析每个算法的源码之前,对算法原理进行简单的讲解,帮助大家建立初步的印象。

RandomLoadBalance

RandomLoadBalance 是加权随机算法的具体实现,它的算法思想很简单。假设我们有一组服务器 servers = [A B C],他们对应的权重为 weights = [5 3 2],权重总和为10。现在把这些权重值平铺在一维坐标值上,[0 5) 区间属于服务器 A,[5 8) 区间属于服务器 B,[8 10) 区间属于服务器 C。接下来通过随机数生成器生成一个范围在 [0 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。

javadubbo源码分析(dubbo源码解析-高可用集群)(4)

以上就是 RandomLoadBalance 背后的算法思想,比较简单。下面开始分析源码。

public class RandomLoadBalance extends AbstractLoadBalance { public static final String NAME = "random"; private final Random random = new Random(); @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers URL url Invocation invocation) { int length = invokers.size(); int totalWeight = 0; boolean sameWeight = true; // 下面这个循环有两个作用,第一是计算总权重 totalWeight, // 第二是检测每个服务提供者的权重是否相同 for (int i = 0; i < length; i ) { int weight = getWeight(invokers.get(i) invocation); // 累加权重 totalWeight = weight; // 检测当前服务提供者的权重与上一个服务提供者的权重是否相同, // 不相同的话,则将 sameWeight 置为 false。 if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1) invocation)) { sameWeight = false; } } // 下面的 if 分支主要用于获取随机数,并计算随机数落在哪个区间上 if (totalWeight > 0 && !sameWeight) { // 随机获取一个 [0 totalWeight) 区间内的数字 int offset = random.nextInt(totalWeight); // 循环让 offset 数减去服务提供者权重值,当 offset 小于0时,返回相应的 Invoker。 // 举例说明一下,我们有 servers = [A B C],weights = [5 3 2],offset = 7。 // 第一次循环,offset - 5 = 2 > 0,即 offset > 5, // 表明其不会落在服务器 A 对应的区间上。 // 第二次循环,offset - 3 = -1 < 0,即 5 < offset < 8, // 表明其会落在服务器 B 对应的区间上 for (int i = 0; i < length; i ) { // 让随机值 offset 减去权重值 offset -= getWeight(invokers.get(i) invocation); if (offset < 0) { // 返回相应的 Invoker return invokers.get(i); } } } // 如果所有服务提供者权重值相同,此时直接随机返回一个即可 return invokers.get(random.nextInt(length)); } }

RandomLoadBalance 的算法思想比较简单,在经过多次请求后,能够将调用请求按照权重值进行“均匀”分配。当然 RandomLoadBalance 也存在一定的缺点,当调用次数比较少时,Random 产生的随机数可能会比较集中,此时多数请求会落到同一台服务器上。这个缺点并不是很严重,多数情况下可以忽略。RandomLoadBalance 是一个简单,高效的负载均衡实现,因此 Dubbo 选择它作为缺省实现。

LeastActiveLoadBalance

LeastActiveLoadBalance 翻译过来是最小活跃数负载均衡。活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。此时应优先将请求分配给该服务提供者。在具体实现中,每个服务提供者对应一个活跃数 active。初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。关于 LeastActiveLoadBalance 的背景知识就先介绍到这里,下面开始分析源码。

public class LeastActiveLoadBalance extends AbstractLoadBalance { public static final String NAME = "leastactive"; @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers URL url Invocation invocation) { int length = invokers.size(); // 最小的活跃数 int leastActive = -1; // 具有相同“最小活跃数”的服务者提供者(以下用 Invoker 代称)数量 int leastCount = 0; // leastIndexs 用于记录具有相同“最小活跃数”的 Invoker 在 invokers 列表中的下标信息 int[] leastIndexes = new int[length]; // 记录每个Invoker的权重 int[] weights = new int[length]; int totalWeight = 0; // 第一个最小活跃数的 Invoker 权重值,用于与其他具有相同最小活跃数的 Invoker 的权重进行对比, // 以检测是否“所有具有相同最小活跃数的 Invoker 的权重”均相等 int firstWeight = 0; boolean sameWeight = true; //遍历 invokers 列表 for (int i = 0; i < length; i ) { Invoker<T> invoker = invokers.get(i); // 获取 Invoker 对应的活跃数 int active = RpcStatus.getStatus(invoker.getUrl() invocation.getMethodName()).getActive(); int afterWarmup = getWeight(invoker invocation); //获取权重 weights[i] = afterWarmup; // 发现更小的活跃数,重新开始 if (leastActive == -1 || active < leastActive) { // 使用当前活跃数 active 更新最小活跃数 leastActive leastActive = active; // 更新 leastCount 为 1 leastCount = 1; // 记录当前下标值到 leastIndexs 中 leastIndexes[0] = i; totalWeight = afterWarmup; firstWeight = afterWarmup; sameWeight = true; // 当前 Invoker 的活跃数 active 与最小活跃数 leastActive 相同 } else if (active == leastActive) { // 在 leastIndexs 中记录下当前 Invoker 在 invokers 集合中的下标 leastIndexes[leastCount ] = i; // 累加权重 totalWeight = afterWarmup; // 检测当前 Invoker 的权重与 firstWeight 是否相等, // 不相等则将 sameWeight 置为 false if (sameWeight && i > 0 && afterWarmup != firstWeight) { sameWeight = false; } } } // 当只有一个 Invoker 具有最小活跃数,此时直接返回该 Invoker 即可 if (leastCount == 1) { return invokers.get(leastIndexes[0]); } // 有多个 Invoker 具有相同的最小活跃数,但它们之间的权重不同 if (!sameWeight && totalWeight > 0) { // 随机生成一个 [0 totalWeight) 之间的数字 int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); // 循环让随机数减去具有最小活跃数的 Invoker 的权重值, // 当 offset 小于等于0时,返回相应的 Invoker for (int i = 0; i < leastCount; i ) { int leastIndex = leastIndexes[i]; //获取权重值,并让随机数减去权重值 offsetWeight -= weights[leastIndex]; if (offsetWeight < 0) { return invokers.get(leastIndex); } } } // 如果权重相同或权重为0时,随机返回一个 Invoker return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); } }

除了最小活跃数,LeastActiveLoadBalance 在实现上还引入了权重值。所以准确的来说,LeastActiveLoadBalance 是基于加权最小活跃数算法实现的。举个例子说明一下,在一个服务提供者集群中,有两个性能优异的服务提供者。某一时刻它们的活跃数相同,此时 Dubbo 会根据它们的权重去分配请求,权重越大,获取到新请求的概率就越大。如果两个服务提供者权重相同,此时随机选择一个即可。

ConsistentHashLoadBalance

一致性 hash 算法由麻省理工学院的 Karger 及其合作者于1997年提出的,算法提出之初是用于大规模缓存系统的负载均衡。它的工作过程是这样的,首先根据 ip 或者其他的信息为缓存节点生成一个 hash,并将这个 hash 投射到 [0 2^32-1] 的圆环上。当有查询或写入请求时,则为缓存项的 key 生成一个 hash 值。然后查找第一个大于或等于该 hash 值的缓存节点,并到这个节点中查询或写入缓存项。如果当前节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其 hash 值的缓存节点即可。大致效果如下图所示,每个缓存节点在圆环上占据一个位置。如果缓存项的 key 的 hash 值小于缓存节点 hash 值,则到该缓存节点中存储或读取缓存项。比如下面绿色点对应的缓存项将会被存储到 cache-2 节点中。由于 cache-3 挂了,原本应该存到该节点中的缓存项最终会存储到 cache-4 节点中。

javadubbo源码分析(dubbo源码解析-高可用集群)(5)

下面来看看一致性 hash 在 Dubbo 中的应用。我们把上图的缓存节点替换成 Dubbo 的服务提供者,于是得到了下图:

javadubbo源码分析(dubbo源码解析-高可用集群)(6)

这里相同颜色的节点均属于同一个服务提供者,比如 Invoker1-1,Invoker1-2,…… Invoker1-160。这样做的目的是通过引入虚拟节点,让 Invoker 在圆环上分散开来,避免数据倾斜问题。所谓数据倾斜是指,由于节点不够分散,导致大量请求落到了同一个节点上,而其他节点只会接收到了少量请求的情况。比如:

javadubbo源码分析(dubbo源码解析-高可用集群)(7)

如上,由于 Invoker-1 和 Invoker-2 在圆环上分布不均,导致系统中75%的请求都会落到 Invoker-1 上,只有 25% 的请求会落到 Invoker-2 上。解决这个问题办法是引入虚拟节点,通过虚拟节点均衡各个节点的请求量。

到这里背景知识就普及完了,接下来开始分析源码。我们先从 ConsistentHashLoadBalance 的 doSelect 方法开始看起,如下:

public class ConsistentHashLoadBalance extends AbstractLoadBalance { private final ConcurrentMap<String ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String ConsistentHashSelector<?>>(); @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers URL url Invocation invocation) { String methodName = RpcUtils.getMethodName(invocation); String key = invokers.get(0).getUrl().getServiceKey() "." methodName; // 获取 invokers 原始的 hashcode int identityHashCode = System.identityHashCode(invokers); ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); // 如果 invokers 是一个新的 List 对象,意味着服务提供者数量发生了变化,可能新增也可能减少了。 // 此时 selector.identityHashCode != identityHashCode 条件成立 if (selector == null || selector.identityHashCode != identityHashCode) { // 创建新的 ConsistentHashSelector selectors.put(key new ConsistentHashSelector<T>(invokers methodName identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } // 调用 ConsistentHashSelector 的 select 方法选择 Invoker return selector.select(invocation); } private static final class ConsistentHashSelector<T> {...} }

如上,doSelect 方法主要做了一些前置工作,比如检测 invokers 列表是不是变动过,以及创建 ConsistentHashSelector。这些工作做完后,接下来开始调用 ConsistentHashSelector 的 select 方法执行负载均衡逻辑。在分析 select 方法之前,我们先来看一下一致性 hash 选择器 ConsistentHashSelector 的初始化过程,如下:

private static final class ConsistentHashSelector<T> { // 使用 TreeMap 存储 Invoker 虚拟节点 private final TreeMap<Long Invoker<T>> virtualInvokers; private final int replicaNumber; private final int identityHashCode; private final int[] argumentIndex; ConsistentHashSelector(List<Invoker<T>> invokers String methodName int identityHashCode) { this.virtualInvokers = new TreeMap<Long Invoker<T>>(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); // 获取虚拟节点数,默认为160 this.replicaNumber = url.getMethodParameter(methodName "hash.nodes" 160); // 获取参与 hash 计算的参数下标值,默认对第一个参数进行 hash 运算 String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName "hash.arguments" "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i ) { argumentIndex[i] = Integer.parseInt(index[i]); } for (Invoker<T> invoker : invokers) { String address = invoker.getUrl().getAddress(); for (int i = 0; i < replicaNumber / 4; i ) { // 对 address i 进行 md5 运算,得到一个长度为16的字节数组 byte[] digest = md5(address i); // 对 digest 部分字节进行4次 hash 运算,得到四个不同的 long 型正整数 for (int h = 0; h < 4; h ) { // h = 0 时,取 digest 中下标为 0 ~ 3 的4个字节进行位运算 // h = 1 时,取 digest 中下标为 4 ~ 7 的4个字节进行位运算 // h = 2 h = 3 时过程同上 long m = hash(digest h); // 将 hash 到 invoker 的映射关系存储到 virtualInvokers 中, // virtualInvokers 需要提供高效的查询操作,因此选用 TreeMap 作为存储结构 virtualInvokers.put(m invoker); } } } } }

ConsistentHashSelector 的构造方法执行了一系列的初始化逻辑,比如从配置中获取虚拟节点数以及参与 hash 计算的参数下标,默认情况下只使用第一个参数进行 hash。需要特别说明的是,ConsistentHashLoadBalance 的负载均衡逻辑只受参数值影响,具有相同参数值的请求将会被分配给同一个服务提供者。ConsistentHashLoadBalance 不 关系权重,因此使用时需要注意一下。

在获取虚拟节点数和参数下标配置后,接下来要做的事情是计算虚拟节点 hash 值,并将虚拟节点存储到 TreeMap 中。到此,ConsistentHashSelector 初始化工作就完成了。接下来,我们来看看 select 方法的逻辑。

public Invoker<T> select(Invocation invocation) { // 将参数转为 key String key = toKey(invocation.getArguments()); // 对参数 key 进行 md5 运算 byte[] digest = md5(key); // 取 digest 数组的前四个字节进行 hash 运算,再将 hash 值传给 selectForKey 方法, // 寻找合适的 Invoker return selectForKey(hash(digest 0)); } private Invoker<T> selectForKey(long hash) { // 到 TreeMap 中查找第一个节点值大于或等于当前 hash 的 Invoker Map.Entry<Long Invoker<T>> entry = virtualInvokers.tailMap(hash true).firstEntry(); // 如果 hash 大于 Invoker 在圆环上最大的位置,此时 entry = null, // 需要将 TreeMap 的头节点赋值给 entry if (entry == null) { entry = virtualInvokers.firstEntry(); } // 返回 Invoker return entry.getValue(); }

如上,选择的过程相对比较简单了。首先是对参数进行 md5 以及 hash 运算,得到一个 hash 值。然后再拿这个值到 TreeMap 中查找目标 Invoker 即可。

RoundRobinLoadBalance

LeastActiveLoadBalance 即加权轮询负载均衡,我们先来了解一下什么是加权轮询。这里从最简单的轮询开始讲起,所谓轮询是指将请求轮流分配给每台服务器。举个例子,我们有三台服务器 A、B、C。我们将第一个请求分配给服务器 A,第二个请求分配给服务器 B,第三个请求分配给服务器 C,第四个请求再次分配给服务器 A。这个过程就叫做轮询。轮询是一种无状态负载均衡算法,实现简单,适用于每台服务器性能相近的场景下。

但现实情况下,我们并不能保证每台服务器性能均相近。如果我们将等量的请求分配给性能较差的服务器,这显然是不合理的。因此,这个时候我们需要对轮询过程进行加权,以调控每台服务器的负载。经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。比如服务器 A、B、C 权重比为 5:2:1。那么在8次请求中,服务器 A 将收到其中的5次请求,服务器 B 会收到其中的2次请求,服务器 C 则收到其中的1次请求。

public class RoundRobinLoadBalance extends AbstractLoadBalance { public static final String NAME = "roundrobin"; private static int RECYCLE_PERIOD = 60000; protected static class WeightedRoundRobin { // 服务提供者权重 private int weight; // 当前权重 private AtomicLong current = new AtomicLong(0); // 最后一次更新时间 private long lastUpdate; public void setWeight(int weight) { this.weight = weight; // 初始情况下,current = 0 current.set(0); } public long increaseCurrent() { // current = current weight; return current.addAndGet(weight); } public void sel(int total) { // current = current - total; current.addAndGet(-1 * total); } } // 嵌套 Map 结构,存储的数据结构示例如下: // { // "UserService.query": { // "url1": WeightedRoundRobin@123 // "url2": WeightedRoundRobin@456 // } // "UserService.update": { // "url1": WeightedRoundRobin@123 // "url2": WeightedRoundRobin@456 // } // } // 最外层为服务类名 方法名,第二层为 url 到 WeightedRoundRobin 的映射关系。 // 这里我们可以将 url 看成是服务提供者的 id private ConcurrentMap<String ConcurrentMap<String WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String ConcurrentMap<String WeightedRoundRobin>>(); // 原子更新锁 private AtomicBoolean updateLock = new AtomicBoolean(); @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers URL url Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() "." invocation.getMethodName(); // 获取 url 到 WeightedRoundRobin 映射表,如果为空,则创建一个新的 ConcurrentMap<String WeightedRoundRobin> map = methodWeightMap.get(key); if (map == null) { methodWeightMap.putIfAbsent(key new ConcurrentHashMap<String WeightedRoundRobin>()); map = methodWeightMap.get(key); } int totalWeight = 0; long maxCurrent = Long.MIN_VALUE; // 获取当前时间 long now = System.currentTimeMillis(); Invoker<T> selectedInvoker = null; WeightedRoundRobin selectedWRR = null; // 下面这个循环主要做了这样几件事情: // 1. 遍历 Invoker 列表,检测当前 Invoker 是否有 // 相应的 WeightedRoundRobin,没有则创建 // 2. 检测 Invoker 权重是否发生了变化,若变化了, // 则更新 WeightedRoundRobin 的 weight 字段 // 3. 让 current 字段加上自身权重,等价于 current = weight // 4. 设置 lastUpdate 字段,即 lastUpdate = now // 5. 寻找具有最大 current 的 Invoker,以及 Invoker 对应的 WeightedRoundRobin, // 暂存起来,留作后用 // 6. 计算权重总和 for (Invoker<T> invoker : invokers) { String identifyString = invoker.getUrl().toIdentityString(); WeightedRoundRobin weightedRoundRobin = map.get(identifyString); int weight = getWeight(invoker invocation); if (weight < 0) { weight = 0; } // 检测当前 Invoker 是否有对应的 WeightedRoundRobin,没有则创建 if (weightedRoundRobin == null) { weightedRoundRobin = new WeightedRoundRobin(); // 设置 Invoker 权重 weightedRoundRobin.setWeight(weight); // 存储 url 唯一标识 identifyString 到 weightedRoundRobin 的映射关系 map.putIfAbsent(identifyString weightedRoundRobin); weightedRoundRobin = map.get(identifyString); } // Invoker 权重不等于 WeightedRoundRobin 中保存的权重,说明权重变化了,此时进行更新 if (weight != weightedRoundRobin.getWeight()) { weightedRoundRobin.setWeight(weight); } // 让 current 加上自身权重,等价于 current = weight long cur = weightedRoundRobin.increaseCurrent(); // 设置 lastUpdate,表示近期更新过 weightedRoundRobin.setLastUpdate(now); // 找出最大的 current if (cur > maxCurrent) { maxCurrent = cur; // 将具有最大 current 权重的 Invoker 赋值给 selectedInvoker selectedInvoker = invoker; // 将 Invoker 对应的 weightedRoundRobin 赋值给 selectedWRR,留作后用 selectedWRR = weightedRoundRobin; } // 计算权重总和 totalWeight = weight; } // 对 <identifyString WeightedRoundRobin> 进行检查,过滤掉长时间未被更新的节点。 // 该节点可能挂了,invokers 中不包含该节点,所以该节点的 lastUpdate 长时间无法被更新。 // 若未更新时长超过阈值后,就会被移除掉,默认阈值为60秒。 if (!updateLock.get() && invokers.size() != map.size()) { if (updateLock.compareAndSet(false true)) { try { ConcurrentMap<String WeightedRoundRobin> newMap = new ConcurrentHashMap<String WeightedRoundRobin>(); // 拷贝 newMap.putAll(map); // 遍历修改,即移除过期记录 Iterator<Entry<String WeightedRoundRobin>> it = newMap.entrySet().iterator(); while (it.hasNext()) { Entry<String WeightedRoundRobin> item = it.next(); if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) { it.remove(); } } // 更新引用 methodWeightMap.put(key newMap); } finally { updateLock.set(false); } } } if (selectedInvoker != null) { // 让 current 减去权重总和,等价于 current -= totalWeight selectedWRR.sel(totalWeight); // 返回具有最大 current 的 Invoker return selectedInvoker; } // should not happen here return invokers.get(0); } }

轮询调用并不是简单的一个接着一个依次调用,它是根据权重的值进行循环的。

负载均衡总结

Dubbo 负载均衡策略提供下列四种方式:

Random LoadBalance 随机,按权重设置随机概率。 Dubbo的默认负载均衡策略

在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

RoundRobin LoadBalance 轮循,按公约后的权重设置轮循比率。

存在慢的提供者累积请求问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

LeastActive LoadBalance 最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。

使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

ConsistentHash LoadBalance 一致性Hash,相同参数的请求总是发到同一提供者。

当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

服务治理服务治理的概述

服务治理主要作用是改变运行时服务的行为和选址逻辑,达到限流,权重配置等目的,主要有:标签路由,条件路由,黑白名单,动态配置,权重调节,负载均衡等功能。

javadubbo源码分析(dubbo源码解析-高可用集群)(8)

执行过程

javadubbo源码分析(dubbo源码解析-高可用集群)(9)

1、消费者,提供者启动成功,订阅zookeeper节点

2、管理平台对服务进行治理处理,向zookeeper写入节点数据

3、写入成功,通知消费者,提供者

4、根据不同的业务处理,在invoker调用时做出响应的处理

相关案例服务禁用

服务禁用:通常用于临时踢除某台提供者机器

configVersion: v2.7 scope: application key: demo-provider enabled: true configs: - addresses: ["192.168.191.2:20883"] side: provider parameters: disabled: true服务降级屏蔽

服务降级,当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的降级,以此释放服务器资源以保证核心任务的正常运行。

  • 容错:当系统出现非业务异常(比如并发数太高导致超时,网络异常等)时,不对该接口进行处理。
  • 屏蔽:在大促,促销活动的可预知情况下,例如双11活动。采用直接屏蔽接口访问

configVersion: v2.7 scope: service key: org.apache.dubbo.samples.governance.api.DemoService enabled: true configs: - side: consumer parameters: force: return 12345

〖特别声明〗:本文内容仅供参考,不做权威认证,如若验证其真实性,请咨询相关权威专业人士。如有侵犯您的原创版权或者图片、等版权权利请告知 wzz#tom.com,我们将尽快删除相关内容。

赞 ()
打赏 微信扫一扫 微信扫一扫

相关推荐