文章

集群容错(五)之Merger实现

集群容错(五)之Merger实现

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

本文接 《精尽 Dubbo 源码解析 —— 集群容错(四)之 LoadBalance 实现》 一文,分享 dubbo-cluster 模块, merger 包,各种 Merger 实现类

Merger 相关类,如下图:

Merger 相关类

我们可以看到,目前一共有两部分

  • Merger 以及其实现类。
  • MergerCluster 以及其 MergerClusterInvoker

老艿艿:本文对应 《Dubbo 用户指南 —— 分组聚合》 文档。

2. Merger

com.alibaba.dubbo.rpc.cluster.Merger ,Merger 接口,提供接口方法,将对象数组合并成一个对象。代码如下:

```plain text plain @SPI public interface Merger { /** * 合并 T 数组,返回合并后的 T 对象 * * @param items T 数组 * @return T 对象 */ T merge(T... items); }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
---

- @SPI**拓展点**
注解,Dubbo SPI
,无默认值。

## 2.1 Merger 实现类

Merger 内置**十二**个实现类,从代码上看基本类似。我们以 MapMerger 和 ShortArrayMerger 作为例子。

### 2.1.1 MapMerger

com.alibaba.dubbo.rpc.cluster.merger.MapMerger ,实现 Merger 接口,Map Merger 实现类。代码如下:

```plain text
plain public class MapMerger implements Merger<Map<?, ?>> {      @Override     public Map<?, ?> merge(Map<?, ?>... items) {         if (items.length == 0) {             return null;         }         // 创建结果 Map         Map<Object, Object> result = new HashMap<Object, Object>();         // 合并多个 Map         for (Map<?, ?> item : items) {             if (item != null) {                 result.putAll(item);             }         }         return result;     }  }

2.1.2 ShortArrayMerger

com.alibaba.dubbo.rpc.cluster.merger.ShortArrayMerger ,实现 Merger 接口,Short 数组 Merger 实现类。代码如下:

```plain text plain public class ShortArrayMerger implements Merger<short[]> { @Override public short[] merge(short[]… items) { // 计算合并后的数组大小 int total = 0; for (short[] array : items) { total += array.length; } // 创建结果数组 short[] result = new short[total]; // 合并多个数组 int index = 0; for (short[] array : items) { for (short item : array) { result[index++] = item; } } return result; } }

1
2
3
4
5
6
7
8
9
---

## 2.2 MergerFactory

com.alibaba.dubbo.rpc.cluster.merger.MergerFactory ,Merger 工厂类,提供 #getMerger(Class returnType) 方法,获得**指定类**对应的 Merger 对象。代码如下:

```plain text
plain public class MergerFactory {      /**      * Merger 对象缓存      */     private static final ConcurrentMap<Class<?>, Merger<?>> mergerCache = new ConcurrentHashMap<Class<?>, Merger<?>>();      public static <T> Merger<T> getMerger(Class<T> returnType) {         Merger result;         // 数组类型         if (returnType.isArray()) {             Class type = returnType.getComponentType();             // 从缓存中获得 Merger 对象             result = mergerCache.get(type);             if (result == null) {                 loadMergers();                 result = mergerCache.get(type);             }             // 获取不到,使用 ArrayMerger             if (result == null && !type.isPrimitive()) {                 result = ArrayMerger.INSTANCE;             }         // 普通类型         } else {             // 从缓存中获得 Merger 对象             result = mergerCache.get(returnType);             if (result == null) {                 loadMergers();                 result = mergerCache.get(returnType);             }         }         return result;     }      /**      * 初始化所有的 Merger 拓展对象,到 mergerCache 缓存中。       */     static void loadMergers() {         Set<String> names = ExtensionLoader.getExtensionLoader(Merger.class).getSupportedExtensions();         for (String name : names) {             Merger m = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(name);             mergerCache.putIfAbsent(ReflectUtils.getGenericClass(m.getClass()), m);         }     }  }

3. MergeableCluster

com.alibaba.dubbo.rpc.cluster.support.MergeableCluster ,实现 Cluster 接口,分组聚合 Cluster 实现类。代码如下:

```plain text plain public class MergeableCluster implements Cluster { public static final String NAME = “mergeable”; @Override public Invoker join(Directory directory) throws RpcException { return new MergeableClusterInvoker(directory); } }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
---

- 对应 Invoker 实现类为 MergeableClusterInvoker 。

Merger 的使用,**需要设置 Cluster 的实现类为 MergeableCluster** 。但是呢,它的配置方式,和其他 Cluster 实现类不同。

- 使用方式,参见 [《Dubbo 用户指南 —— 分组聚合》](http://dubbo.apache.org/zh-cn/docs/user/demos/group-merger.html)
文档。
- 原因,参见 [《精尽 Dubbo 源码解析 —— 集群容错(三)之 Directory 实现》](http://svip.iocoder.cn/Dubbo/cluster-3-impl-directory?self=)[「4.3.3.3 toMergeMethodInvokerMap」](http://svip.iocoder.cn/Dubbo/cluster-5-impl-merger/#)
的
。

## 3.1 MergeableClusterInvoker

com.alibaba.dubbo.rpc.cluster.support.MergeableClusterInvoker ,实现 Invoker 接口,MergeableCluster Invoker 实现类。代码如下:

```plain text
plain /**  * Directory$Adaptive 对象  */ private final Directory<T> directory; /**  * ExecutorService 对象,并且为 CachedThreadPool 。  */ private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true));    1: @Override   2: public Result invoke(final Invocation invocation) throws RpcException {   3:     // 获得 Invoker 集合   4:     List<Invoker<T>> invokers = directory.list(invocation);   5:     // 获得 Merger 拓展名   6:     String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);   7:     // 若果未配置拓展,直接调用首个可用的 Invoker 对象   8:     if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group   9:         for (final Invoker<T> invoker : invokers) {  10:             if (invoker.isAvailable()) {  11:                 return invoker.invoke(invocation);  12:             }  13:         }  14:         return invokers.iterator().next().invoke(invocation);  15:     }  16:   17:     // 通过反射,获得返回类型  18:     Class<?> returnType;  19:     try {  20:         returnType = getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();  21:     } catch (NoSuchMethodException e) {  22:         returnType = null;  23:     }  24:   25:     // 提交线程池,并行执行,发起 RPC 调用,并添加到 results 中  26:     Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();  27:     for (final Invoker<T> invoker : invokers) {  28:         Future<Result> future = executor.submit(new Callable<Result>() {  29:             public Result call() {  30:                 // RPC 调用  31:                 return invoker.invoke(new RpcInvocation(invocation, invoker));  32:             }  33:         });  34:         results.put(invoker.getUrl().getServiceKey(), future);  35:     }  36:   37:     // 阻塞等待执行执行结果,并添加到 resultList 中  38:     List<Result> resultList = new ArrayList<Result>(results.size());  39:     int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);  40:     for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {  41:         Future<Result> future = entry.getValue();  42:         try {  43:             Result r = future.get(timeout, TimeUnit.MILLISECONDS);  44:             if (r.hasException()) { // 异常 Result ,打印错误日志,忽略  45:                 log.error(new StringBuilder(32).append("Invoke ").append(getGroupDescFromServiceKey(entry.getKey())).append(" failed: ").append(r.getException().getMessage()).toString(), r.getException());  46:             } else { // 正常 Result ,添加到 resultList 中  47:                 resultList.add(r);  48:             }  49:         } catch (Exception e) { // 异常,抛出 RpcException 异常  50:             throw new RpcException(new StringBuilder(32).append("Failed to invoke service ").append(entry.getKey()).append(": ").append(e.getMessage()).toString(), e);  51:         }  52:     }  53:   54:     // 结果大小为空,返回空的 RpcResult  55:     if (resultList.isEmpty()) {  56:         return new RpcResult((Object) null);  57:     // 结果大小为 1 ,返回首个 RpcResult  58:     } else if (resultList.size() == 1) {  59:         return resultList.iterator().next();  60:     }  61:     // 返回类型为 void ,返回空的 RpcResult  62:     if (returnType == void.class) {  63:         return new RpcResult((Object) null);  64:     }  65:   66:     Object result;  67:     // 【第 1 种】基于合并方法  68:     if (merger.startsWith(".")) {  69:         // 获得合并方法 Method  70:         merger = merger.substring(1);  71:         Method method;  72:         try {  73:             method = returnType.getMethod(merger, returnType);  74:         } catch (NoSuchMethodException e) {  75:             throw new RpcException(new StringBuilder(32).append("Can not merge result because missing method [ ").append(merger).append(" ] in class [ ").append(returnType.getClass().getName()).append(" ]").toString());  76:         }  77:         // 有 Method ,进行合并  78:         if (method != null) {  79:             if (!Modifier.isPublic(method.getModifiers())) {  80:                 method.setAccessible(true);  81:             }  82:             result = resultList.remove(0).getValue();  83:             try {  84:                 // 方法返回类型匹配,合并时,修改 result  85:                 if (method.getReturnType() != void.class && method.getReturnType().isAssignableFrom(result.getClass())) {  86:                     for (Result r : resultList) {  87:                         result = method.invoke(result, r.getValue());  88:                     }  89:                 // 方法返回类型不匹配,合并时,不修改 result  90:                 } else {  91:                     for (Result r : resultList) {  92:                         method.invoke(result, r.getValue());  93:                     }  94:                 }  95:             } catch (Exception e) {  96:                 throw new RpcException(new StringBuilder(32).append("Can not merge result: ").append(e.getMessage()).toString(), e);  97:             }  98:         // 无 Method ,抛出 RpcException 异常  99:         } else { 100:             throw new RpcException(new StringBuilder(32).append("Can not merge result because missing method [ ").append(merger).append(" ] in class [ ").append(returnType.getClass().getName()).append(" ]").toString()); 101:         } 102:     // 【第 2 种】基于 Merger 103:     } else { 104:         Merger resultMerger; 105:         // 【第 2.1 种】根据返回值类型自动匹配 Merger 106:         if (ConfigUtils.isDefault(merger)) { 107:             resultMerger = MergerFactory.getMerger(returnType); 108:         // 【第 2.2 种】指定 Merger 109:         } else { 110:             resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger); 111:         } 112:         // 有 Merger ,进行合并 113:         if (resultMerger != null) { 114:             List<Object> rets = new ArrayList<Object>(resultList.size()); 115:             for (Result r : resultList) { 116:                 rets.add(r.getValue()); 117:             } 118:             result = resultMerger.merge(rets.toArray((Object[]) Array.newInstance(returnType, 0))); 119:         // 无 Merger ,抛出 RpcException 异常 120:         } else { 121:             throw new RpcException("There is no merger to merge result."); 122:         } 123:     } 124:     // 返回 RpcResult 结果 125:     return new RpcResult(result); 126: }

  • 看似比较长,实际很易懂。
  • 第 4 行:调用 集合 Directory#list(invocation) 方法,获得服务 Invoker 。
  • 第 6 行:调用 方法级 URL#getMethodParameter(methodName, “merger”) 方法,获得 Merger 拓展名, 。
  • 第 7 至 15 行:若未配置可用 Merger 拓展名,优先调用首个 的 Invoker 对象,其次调用首个 Invoker 对象。
  • 第 17 至 23 行:通过反射,获得调用方法的返回类型
  • 第 25 至 35 行:提交线程池,并行 执行,发起 RPC 调用,并添加 Future 到 results 中。
  • 第 37 至 52 行:阻塞注意忽略 等待执行结果,并添加到 resultList 中。 ,分成正常 Result、异常 Result( )、Exception 三种情况。
  • 第 54 至 56 行:结果大小为空空 ,返回 的 RpcResult 。
  • 第 57 至 60 行:结果大小为 1首个 ,返回 RpcResult 。
  • 第 61 至 64 行:返回类型为 void空 ,返回 的 RpcResult 。
  • ========== 【第 1 种 】基于 Method 合并==========
  • 第 68 行:若 本身 merger 为 “.” 开头,指定合并方法,将调用返回结果的指定方法进行合并,合并方法的参数类型必须是返回结果类型 。
  • 第 69 至 76 行:调用 合并方法 Method本身《dubbo源码-集群容错之MergeableCluster》 Class#getMethod(String name, Class<?>… parameterTypes) 方法,获得 。这个方法,意味着“合并方法的参数类型必须是返回结果类型 ”!!!具体原因,见 ,搜索 “在条件分支if ( merger.startsWith(”.”) ) {}“ 。
  • 第 77 至 97 行:有循环 Method , 调用 Method#invoke(Object obj, Object… args) 方法,进行合并。
  • 第 98 至 101 行: Method ,抛出 RpcException 异常。
  • ========== 【第 2 种 】基于 Merger 合并 ==========
  • 【第 2.1返回值类型 种】第 105 至 107 行:当 merger 为 “default” 或 “true” 时,调用 MergerFactory#getMerger(Class returnType) 方法,根据 自动匹配 Merger 。
  • 【第 2.2指定 种】第 108 至 111 行:调用 ExtensionLoader#getExtension(merger) 方法啊,获得 Merger 。
  • 第 112 至 118 行:有循环 Merger , 调用 Merger#merge(T… items) 方法,进行合并。
  • 第 119 至 122 行: Method ,抛出 RpcException 异常。
本文由作者按照 CC BY 4.0 进行授权