文章

过滤器(十)之CacheFilter

过滤器(十)之CacheFilter

本文基于 Dubbo 2.6.1 版本,望知悉。

1. 概述

本文分享 dubbo-filter-cache 项目的 CacheFilter 过滤器,用于服务消费者提供者中,提供 结果缓存 的功能。在 《Dubbo 用户指南 —— 结果缓存》 定义如下:

结果缓存 ,用于加速热门数据的访问速度,Dubbo 提供声明式缓存,以减少用户加缓存的工作量。

Dubbo 提供了三种实现:

  • lru:基于最近最少使用原则删除多余缓存,保持最热的数据被缓存。
  • threadlocal:当前线程缓存,比如一个页面渲染,用到很多 portal,每个 portal 都要去查用户信息,通过线程缓存,可以减少这种多余访问。
  • jcache:与 JSR107 集成,可以桥接各种缓存实现。
  • 具体的配置方式,在 《Dubbo 用户指南 —— 结果缓存》 文档中,已经详细分享。

本文涉及的类,如下图所示:

类图

2. CacheFilter

com.alibaba.dubbo.cache.filter.CacheFilter ,实现 Filter 接口,缓存过滤器实现类。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY)
public class CacheFilter implements Filter {

    /**
     * CacheFactory$Adaptive 对象。
     *
     * 通过 Dubbo SPI 机制,调用 {@link #setCacheFactory(CacheFactory)} 方法,进行注入
     */
    private CacheFactory cacheFactory;

    public void setCacheFactory(CacheFactory cacheFactory) {
        this.cacheFactory = cacheFactory;
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // 方法开启 Cache 功能
        if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.CACHE_KEY))) {
            // 基于 URL + Method 为维度,获得 Cache 对象。
            Cache cache = cacheFactory.getCache(invoker.getUrl().addParameter(Constants.METHOD_KEY, invocation.getMethodName())); // 添加 `method` 属性,是因为 JCache 需要。
            if (cache != null) {
                // 获得 Cache Key
                String key = StringUtils.toArgumentString(invocation.getArguments());
                // 从缓存中获得结果。若存在,创建 RpcResult 对象。
                Object value = cache.get(key);
                if (value != null) {
                    return new RpcResult(value);
                }
                // 服务调用
                Result result = invoker.invoke(invocation);
                // 若非异常结果,缓存结果
                if (!result.hasException()) {
                    cache.put(key, result.getValue());
                }
                return result;
            }
        }
        // 服务调用
        return invoker.invoke(invocation);
    }

}
  • 第 18 行:判断方法开启部分 Cache 功能。因为,一个服务里,可能只有方法开启了 Cache 功能。
  • 第 20 行:调用 URL + Method CacheFactory$Adaptive#getCache(url) 方法,基于为维度,获得 Cache 对象。
  • 第 23 行:调用 StringUtils#toArgumentString(Object[] args) 方法,获得 Cache Key 。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * 将参数数组,拼接成字符串。
 *
 * 1. 使用逗号分隔
 * 2. 使用 JSON 格式化对象
 *
 * @param args 参数数组
 * @return 字符串
 */
public static String toArgumentString(Object[] args) {
    StringBuilder buf = new StringBuilder();
    for (Object arg : args) {
        if (buf.length() > 0) {
            buf.append(Constants.COMMA_SEPARATOR); // 分隔
        }
        // 拼接参数
        if (arg == null || ReflectUtils.isPrimitives(arg.getClass())) {
            buf.append(arg);
        } else {
            try {
                buf.append(JSON.toJSONString(arg)); // 使用 JSON 格式化对象
            } catch (Exception e) {
                logger.warn(e.getMessage(), e);
                buf.append(arg);
            }
        }
    }
    return buf.toString();
}
  • 第 24 至 28 行:调用 存在 Cache#get(key) 方法,从缓存中获得结果。若存在,创建 RpcResult 对象并返回。
  • 第 30 行:调用 Invoker#invoke(invocation) 方法,服务调用。
  • 第 31 至 34 行:若非异常正常的,调用 Cache#put(key, value) 方法,缓存结果。
  • 第 35 行:返回调用结果。
  • 第 39 行:若不使用直接 Cache 功能,调用 Invoker#invoke(invocation) 方法,服务调用。

3. API 定义

3.1 Cache

com.alibaba.dubbo.cache.Cache ,缓存容器接口。方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface Cache {

    /**
     * 添加键值
     *
     * @param key 键
     * @param value 值
     */
    void put(Object key, Object value);

    /**
     * 获得值
     *
     * @param key 键
     * @return 值
     */
    Object get(Object key);

}
  • Cache 是个缓存容器管理,内部可以缓存的键值。

3.2 CacheFactory

com.alibaba.dubbo.cache.CacheFactory ,Cache 工厂接口。方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@SPI("lru")
public interface CacheFactory {

    /**
     * 获得缓存对象
     *
     * @param url URL 对象
     * @return 缓存对象
     */
    @Adaptive("cache")
    Cache getCache(URL url);

}
  • @SPI("lru") 拓展点注解,Dubbo SPI,默认为 "lru"
  • @Adaptive("cache") 注解,基于 Dubbo SPI Adaptive 机制,加载对应的 Cache 实现,使用 URL.cache 属性。

3.2.1 AbstractCacheFactory

com.alibaba.dubbo.cache.support.AbstractCacheFactory ,Cache 工厂抽象类。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public abstract class AbstractCacheFactory implements CacheFactory {

    /**
     * Cache 集合
     *
     * key:URL
     */
    private final ConcurrentMap<String, Cache> caches = new ConcurrentHashMap<String, Cache>();

    @Override
    public Cache getCache(URL url) {
        // 获得 Cache 对象
        String key = url.toFullString();
        Cache cache = caches.get(key);
        // 不存在,创建 Cache 对象,并缓存
        if (cache == null) {
            caches.put(key, createCache(url));
            cache = caches.get(key);
        }
        return cache;
    }

    /**
     * 创建 Cache 对象
     *
     * @param url URL
     * @return Cache 对象
     */
    protected abstract Cache createCache(URL url);

}

4. LRU 实现

lru ,基于最近最少使用原则删除多余缓存,保持最热的数据被缓存。

4.1 LruCache

com.alibaba.dubbo.cache.support.lru.LruCache ,实现 Cache 接口,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class LruCache implements Cache {

    /**
     * 缓存集合
     */
    private final Map<Object, Object> store;

    public LruCache(URL url) {
        // `"cache.size"` 配置项,设置缓存大小
        final int max = url.getParameter("cache.size", 1000);
        // 创建 LRUCache 对象
        this.store = new LRUCache<Object, Object>(max);
    }

    @Override
    public void put(Object key, Object value) {
        store.put(key, value);
    }

    @Override
    public Object get(Object key) {
        return store.get(key);
    }

}
  • "cache.size" 大小配置项,设置缓存。
  • 基于 com.alibaba.dubbo.common.utils.LRUCache 实现。

4.1.1 LRUCache

com.alibaba.dubbo.common.utils.LRUCache ,实现 LinkedHashMap 类,LRU 缓存实现类。代码比较多,胖友点击链接查看。笔者说几个关键点:

  • 构造方法,设置 LRUCache 为按访问顺序(调用get方法)的链表。代码如下:
1
2
3
4
public LRUCache(int maxCapacity) {
    super(16, DEFAULT_LOAD_FACTOR, true); // 最后一个参数,按访问顺序(调用get方法)的链表
    this.maxCapacity = maxCapacity;
}
  • 重写 removeEldestEntry 方法返回 true 值,指定插入元素时移除最老的元素。代码如下:
1
2
3
4
@Override
protected boolean removeEldestEntry(java.util.Map.Entry<K, V> eldest) {
    return size() > maxCapacity;
}
  • 根据链表中元素的顺序可以分为:按插入顺序的链表,和按访问顺序(调用get方法)的链表。默认是按插入顺序排序,如果指定按访问顺序排序,那么调用get方法后,会将这次访问的元素移至链表尾部,不断访问可以形成按访问顺序排序的链表。

  • lock 属性,锁。避免并发读写,导致死锁。参见 《疫苗:JAVA HashMap 的死循环》。涉及该属性的方法示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public V put(K key, V value) {
    try {
        lock.lock();
        return super.put(key, value);
    } finally {
        lock.unlock();
    }
}

@Override
public V remove(Object key) {
    try {
        lock.lock();
        return super.remove(key);
    } finally {
        lock.unlock();
    }
}

4.2 LruCacheFactory

com.alibaba.dubbo.cache.support.lru.LruCacheFactory ,实现 AbstractCacheFactory 抽象类,代码如下:

1
2
3
4
5
6
7
8
public class LruCacheFactory extends AbstractCacheFactory {

    @Override
    protected Cache createCache(URL url) {
        return new LruCache(url);
    }

}

5. ThreadLocal 实现

基于 ThreadLocal ,当前线程缓存,比如一个页面渲染,用到很多 portal,每个 portal 都要去查用户信息,通过线程缓存,可以减少这种多余访问。

5.1 ThreadLocalCache

com.alibaba.dubbo.cache.support.threadlocal.ThreadLocalCache ,实现 Cache 接口,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ThreadLocalCache implements Cache {

    private final ThreadLocal<Map<Object, Object>> store; // 线程变量

    public ThreadLocalCache(URL url) {
        this.store = new ThreadLocal<Map<Object, Object>>() {

            @Override
            protected Map<Object, Object> initialValue() {
                return new HashMap<Object, Object>();
            }

        };
    }

    @Override
    public void put(Object key, Object value) {
        store.get().put(key, value);
    }

    @Override
    public Object get(Object key) {
        return store.get().get(key);
    }

}
  • 基于 ThreadLocal 实现,相当于一个线程,一个 ThreadLocalCache 对象。
  • ThreadLocalCache 目前没有过期或清理机制,所以需要注意

5.2 ThreadLocalCacheFactory

com.alibaba.dubbo.cache.support.threadlocal.ThreadLocalCacheFactory ,实现 AbstractCacheFactory 抽象类,代码如下:

1
2
3
4
5
6
7
8
public class ThreadLocalCacheFactory extends AbstractCacheFactory {

    @Override
    protected Cache createCache(URL url) {
        return new ThreadLocalCache(url);
    }

}

6. JCache 实现

JSR107 集成,可以桥接各种缓存实现。

6.1 JCache

com.alibaba.dubbo.cache.support.jcache.JCache ,实现 Cache 接口,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class JCache implements com.alibaba.dubbo.cache.Cache {

    private final Cache<Object, Object> store;

    public JCache(URL url) {
        // 获得 Cache Key
        String method = url.getParameter(Constants.METHOD_KEY, "");
        String key = url.getAddress() + "." + url.getServiceKey() + "." + method;
        // `"jcache"` 配置项,为 Java SPI 实现的全限定类名
        // jcache parameter is the full-qualified class name of SPI implementation
        String type = url.getParameter("jcache");

        // 基于类型,获得 javax.cache.CachingProvider 对象,
        CachingProvider provider = type == null || type.length() == 0 ? Caching.getCachingProvider() : Caching.getCachingProvider(type);
        // 获得 javax.cache.CacheManager 对象
        CacheManager cacheManager = provider.getCacheManager();
        // 获得 javax.cache.Cache 对象
        Cache<Object, Object> cache = cacheManager.getCache(key);
        // 不存在,则进行创建
        if (cache == null) {
            try {
                // 设置 Cache 配置项
                // configure the cache
                MutableConfiguration config =
                        new MutableConfiguration<Object, Object>()
                                // 类型
                                .setTypes(Object.class, Object.class)
                                // 过期策略,按照写入时间过期。通过 `"cache.write.expire"` 配置项设置过期时间,默认为 1 分钟。
                                .setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, url.getMethodParameter(method, "cache.write.expire", 60 * 1000))))
                                .setStoreByValue(false)
                                // 设置 MBean
                                .setManagementEnabled(true)
                                .setStatisticsEnabled(true);
                // 创建 javax.cache.Cache 对象
                cache = cacheManager.createCache(key, config);
            } catch (CacheException e) {
                // 初始化 cache 的并发情况
                // concurrent cache initialization
                cache = cacheManager.getCache(key);
            }
        }

        this.store = cache;
    }

    @Override
    public void put(Object key, Object value) {
        store.put(key, value);
    }

    @Override
    public Object get(Object key) {
        return store.get(key);
    }

}

6.2 JCacheFactory

com.alibaba.dubbo.cache.support.jcache.JCacheFactory ,实现 AbstractCacheFactory 抽象类,代码如下:

1
2
3
4
5
6
7
8
public class JCacheFactory extends AbstractCacheFactory {

    @Override
    protected Cache createCache(URL url) {
        return new JCache(url);
    }

}
本文由作者按照 CC BY 4.0 进行授权