集群容错(五)之Merger实现
本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
本文接 《精尽 Dubbo 源码解析 —— 集群容错(四)之 LoadBalance 实现》 一文,分享 dubbo-cluster 模块, merger 包,各种 Merger 实现类。
Merger 相关类,如下图:
Merger 相关类
我们可以看到,目前一共有两部分:
- Merger 以及其实现类。
- MergerCluster 以及其 MergerClusterInvoker
老艿艿:本文对应 《Dubbo 用户指南 —— 分组聚合》 文档。
2. Merger
com.alibaba.dubbo.rpc.cluster.Merger ,Merger 接口,提供接口方法,将对象数组合并成一个对象。代码如下:
```plain text plain @SPI public interface Merger
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
---
- @SPI**拓展点**
注解,Dubbo SPI
,无默认值。
## 2.1 Merger 实现类
Merger 内置**十二**个实现类,从代码上看基本类似。我们以 MapMerger 和 ShortArrayMerger 作为例子。
### 2.1.1 MapMerger
com.alibaba.dubbo.rpc.cluster.merger.MapMerger ,实现 Merger 接口,Map Merger 实现类。代码如下:
```plain text
plain public class MapMerger implements Merger<Map<?, ?>> { @Override public Map<?, ?> merge(Map<?, ?>... items) { if (items.length == 0) { return null; } // 创建结果 Map Map<Object, Object> result = new HashMap<Object, Object>(); // 合并多个 Map for (Map<?, ?> item : items) { if (item != null) { result.putAll(item); } } return result; } }
2.1.2 ShortArrayMerger
com.alibaba.dubbo.rpc.cluster.merger.ShortArrayMerger ,实现 Merger 接口,Short 数组 Merger 实现类。代码如下:
```plain text plain public class ShortArrayMerger implements Merger<short[]> { @Override public short[] merge(short[]… items) { // 计算合并后的数组大小 int total = 0; for (short[] array : items) { total += array.length; } // 创建结果数组 short[] result = new short[total]; // 合并多个数组 int index = 0; for (short[] array : items) { for (short item : array) { result[index++] = item; } } return result; } }
1
2
3
4
5
6
7
8
9
---
## 2.2 MergerFactory
com.alibaba.dubbo.rpc.cluster.merger.MergerFactory ,Merger 工厂类,提供 #getMerger(Class returnType) 方法,获得**指定类**对应的 Merger 对象。代码如下:
```plain text
plain public class MergerFactory { /** * Merger 对象缓存 */ private static final ConcurrentMap<Class<?>, Merger<?>> mergerCache = new ConcurrentHashMap<Class<?>, Merger<?>>(); public static <T> Merger<T> getMerger(Class<T> returnType) { Merger result; // 数组类型 if (returnType.isArray()) { Class type = returnType.getComponentType(); // 从缓存中获得 Merger 对象 result = mergerCache.get(type); if (result == null) { loadMergers(); result = mergerCache.get(type); } // 获取不到,使用 ArrayMerger if (result == null && !type.isPrimitive()) { result = ArrayMerger.INSTANCE; } // 普通类型 } else { // 从缓存中获得 Merger 对象 result = mergerCache.get(returnType); if (result == null) { loadMergers(); result = mergerCache.get(returnType); } } return result; } /** * 初始化所有的 Merger 拓展对象,到 mergerCache 缓存中。 */ static void loadMergers() { Set<String> names = ExtensionLoader.getExtensionLoader(Merger.class).getSupportedExtensions(); for (String name : names) { Merger m = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(name); mergerCache.putIfAbsent(ReflectUtils.getGenericClass(m.getClass()), m); } } }
3. MergeableCluster
com.alibaba.dubbo.rpc.cluster.support.MergeableCluster ,实现 Cluster 接口,分组聚合 Cluster 实现类。代码如下:
```plain text plain public class MergeableCluster implements Cluster { public static final String NAME = “mergeable”; @Override public
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
---
- 对应 Invoker 实现类为 MergeableClusterInvoker 。
Merger 的使用,**需要设置 Cluster 的实现类为 MergeableCluster** 。但是呢,它的配置方式,和其他 Cluster 实现类不同。
- 使用方式,参见 [《Dubbo 用户指南 —— 分组聚合》](http://dubbo.apache.org/zh-cn/docs/user/demos/group-merger.html)
文档。
- 原因,参见 [《精尽 Dubbo 源码解析 —— 集群容错(三)之 Directory 实现》](http://svip.iocoder.cn/Dubbo/cluster-3-impl-directory?self=)[「4.3.3.3 toMergeMethodInvokerMap」](http://svip.iocoder.cn/Dubbo/cluster-5-impl-merger/#)
的
。
## 3.1 MergeableClusterInvoker
com.alibaba.dubbo.rpc.cluster.support.MergeableClusterInvoker ,实现 Invoker 接口,MergeableCluster Invoker 实现类。代码如下:
```plain text
plain /** * Directory$Adaptive 对象 */ private final Directory<T> directory; /** * ExecutorService 对象,并且为 CachedThreadPool 。 */ private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true)); 1: @Override 2: public Result invoke(final Invocation invocation) throws RpcException { 3: // 获得 Invoker 集合 4: List<Invoker<T>> invokers = directory.list(invocation); 5: // 获得 Merger 拓展名 6: String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY); 7: // 若果未配置拓展,直接调用首个可用的 Invoker 对象 8: if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group 9: for (final Invoker<T> invoker : invokers) { 10: if (invoker.isAvailable()) { 11: return invoker.invoke(invocation); 12: } 13: } 14: return invokers.iterator().next().invoke(invocation); 15: } 16: 17: // 通过反射,获得返回类型 18: Class<?> returnType; 19: try { 20: returnType = getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()).getReturnType(); 21: } catch (NoSuchMethodException e) { 22: returnType = null; 23: } 24: 25: // 提交线程池,并行执行,发起 RPC 调用,并添加到 results 中 26: Map<String, Future<Result>> results = new HashMap<String, Future<Result>>(); 27: for (final Invoker<T> invoker : invokers) { 28: Future<Result> future = executor.submit(new Callable<Result>() { 29: public Result call() { 30: // RPC 调用 31: return invoker.invoke(new RpcInvocation(invocation, invoker)); 32: } 33: }); 34: results.put(invoker.getUrl().getServiceKey(), future); 35: } 36: 37: // 阻塞等待执行执行结果,并添加到 resultList 中 38: List<Result> resultList = new ArrayList<Result>(results.size()); 39: int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 40: for (Map.Entry<String, Future<Result>> entry : results.entrySet()) { 41: Future<Result> future = entry.getValue(); 42: try { 43: Result r = future.get(timeout, TimeUnit.MILLISECONDS); 44: if (r.hasException()) { // 异常 Result ,打印错误日志,忽略 45: log.error(new StringBuilder(32).append("Invoke ").append(getGroupDescFromServiceKey(entry.getKey())).append(" failed: ").append(r.getException().getMessage()).toString(), r.getException()); 46: } else { // 正常 Result ,添加到 resultList 中 47: resultList.add(r); 48: } 49: } catch (Exception e) { // 异常,抛出 RpcException 异常 50: throw new RpcException(new StringBuilder(32).append("Failed to invoke service ").append(entry.getKey()).append(": ").append(e.getMessage()).toString(), e); 51: } 52: } 53: 54: // 结果大小为空,返回空的 RpcResult 55: if (resultList.isEmpty()) { 56: return new RpcResult((Object) null); 57: // 结果大小为 1 ,返回首个 RpcResult 58: } else if (resultList.size() == 1) { 59: return resultList.iterator().next(); 60: } 61: // 返回类型为 void ,返回空的 RpcResult 62: if (returnType == void.class) { 63: return new RpcResult((Object) null); 64: } 65: 66: Object result; 67: // 【第 1 种】基于合并方法 68: if (merger.startsWith(".")) { 69: // 获得合并方法 Method 70: merger = merger.substring(1); 71: Method method; 72: try { 73: method = returnType.getMethod(merger, returnType); 74: } catch (NoSuchMethodException e) { 75: throw new RpcException(new StringBuilder(32).append("Can not merge result because missing method [ ").append(merger).append(" ] in class [ ").append(returnType.getClass().getName()).append(" ]").toString()); 76: } 77: // 有 Method ,进行合并 78: if (method != null) { 79: if (!Modifier.isPublic(method.getModifiers())) { 80: method.setAccessible(true); 81: } 82: result = resultList.remove(0).getValue(); 83: try { 84: // 方法返回类型匹配,合并时,修改 result 85: if (method.getReturnType() != void.class && method.getReturnType().isAssignableFrom(result.getClass())) { 86: for (Result r : resultList) { 87: result = method.invoke(result, r.getValue()); 88: } 89: // 方法返回类型不匹配,合并时,不修改 result 90: } else { 91: for (Result r : resultList) { 92: method.invoke(result, r.getValue()); 93: } 94: } 95: } catch (Exception e) { 96: throw new RpcException(new StringBuilder(32).append("Can not merge result: ").append(e.getMessage()).toString(), e); 97: } 98: // 无 Method ,抛出 RpcException 异常 99: } else { 100: throw new RpcException(new StringBuilder(32).append("Can not merge result because missing method [ ").append(merger).append(" ] in class [ ").append(returnType.getClass().getName()).append(" ]").toString()); 101: } 102: // 【第 2 种】基于 Merger 103: } else { 104: Merger resultMerger; 105: // 【第 2.1 种】根据返回值类型自动匹配 Merger 106: if (ConfigUtils.isDefault(merger)) { 107: resultMerger = MergerFactory.getMerger(returnType); 108: // 【第 2.2 种】指定 Merger 109: } else { 110: resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger); 111: } 112: // 有 Merger ,进行合并 113: if (resultMerger != null) { 114: List<Object> rets = new ArrayList<Object>(resultList.size()); 115: for (Result r : resultList) { 116: rets.add(r.getValue()); 117: } 118: result = resultMerger.merge(rets.toArray((Object[]) Array.newInstance(returnType, 0))); 119: // 无 Merger ,抛出 RpcException 异常 120: } else { 121: throw new RpcException("There is no merger to merge result."); 122: } 123: } 124: // 返回 RpcResult 结果 125: return new RpcResult(result); 126: }
- 看似比较长,实际很易懂。
- 第 4 行:调用 集合 Directory#list(invocation) 方法,获得服务 Invoker 。
- 第 6 行:调用 方法级 URL#getMethodParameter(methodName, “merger”) 方法,获得 Merger 拓展名, 。
- 第 7 至 15 行:若未配置可用 Merger 拓展名,优先调用首个 的 Invoker 对象,其次调用首个 Invoker 对象。
- 第 17 至 23 行:通过反射,获得调用方法的返回类型 。
- 第 25 至 35 行:提交线程池,并行 执行,发起 RPC 调用,并添加 Future 到 results 中。
- 第 37 至 52 行:阻塞注意忽略 等待执行结果,并添加到 resultList 中。 ,分成正常 Result、异常 Result( )、Exception 三种情况。
- 第 54 至 56 行:结果大小为空空 ,返回 的 RpcResult 。
- 第 57 至 60 行:结果大小为 1首个 ,返回 RpcResult 。
- 第 61 至 64 行:返回类型为 void空 ,返回 的 RpcResult 。
- ========== 【第 1 种 】基于 Method 合并==========
- 第 68 行:若 本身 merger 为 “.” 开头,指定合并方法,将调用返回结果的指定方法进行合并,合并方法的参数类型必须是返回结果类型 。
- 第 69 至 76 行:调用 合并方法 Method本身《dubbo源码-集群容错之MergeableCluster》 Class#getMethod(String name, Class<?>… parameterTypes) 方法,获得 。这个方法,意味着“合并方法的参数类型必须是返回结果类型 ”!!!具体原因,见 ,搜索 “在条件分支if ( merger.startsWith(”.”) ) {}“ 。
- 第 77 至 97 行:有循环 Method , 调用 Method#invoke(Object obj, Object… args) 方法,进行合并。
- 第 98 至 101 行:无 Method ,抛出 RpcException 异常。
- ========== 【第 2 种 】基于 Merger 合并 ==========
- 【第 2.1返回值类型 种】第 105 至 107 行:当 merger 为 “default” 或 “true” 时,调用 MergerFactory#getMerger(Class returnType) 方法,根据 自动匹配 Merger 。
- 【第 2.2指定 种】第 108 至 111 行:调用 ExtensionLoader#getExtension(merger) 方法啊,获得 Merger 。
- 第 112 至 118 行:有循环 Merger , 调用 Merger#merge(T… items) 方法,进行合并。
- 第 119 至 122 行:无 Method ,抛出 RpcException 异常。
