请求合并的三种方式,大大提高接口性能!
前言
Hystrix Collapser
hystrix
<aop:aspectj-autoproxy/>
<bean id="hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect" />
collapser
我们在需要合并的方法上添加 @HystrixCollapser 注解,在定义好的合并方法上添加 @HystrixCommand 注解; single 方法只能传入一个参数,多参数情况下需要自己包装一个参数类,而 batch 方法需要 java.util.List<SingleParam>
;single 方法返回 java.util.concurrent.Future<SingleReturn>
, batch 方法返回java.util.List<SingleReturn>
,且要保证返回的结果数量和传入的参数数量一致。
public class HystrixCollapserSample {
@HystrixCollapser(batchMethod = "batch")
public Future<Boolean> single(String input) {
return null; // single方法不会被执行到
}
public List<Boolean> batch(List<String> inputs) {
return inputs.stream().map(it -> Boolean.TRUE).collect(Collectors.toList());
}
}
源码实现
在 spring-boot 内注册切面类的 bean,里面包含 @HystrixCollapser 注解切面; 在方法执行时检测到方法被 HystrixCollapser 注解后,spring 调用 methodsAnnotatedWithHystrixCommand
方法来执行 hystrix 代理;hystrix 获取一个 collapser 实例(在当前 scope 内检测不到即创建); hystrix 将当前请求的参数提交给 collapser, 由 collapser 存储在一个 concurrentHashMap (RequestArgumentType -> CollapsedRequest)
内,此方法会创建一个 Observable 对象,并返回一个 观察此对象的 Future 给业务线程;collpser 在创建时会创建一个 timer 线程,定时消费存储的请求,timer 会将多个请求构造成一个合并后的请求,调用 batch 执行后将结果顺序映射到输出参数,并通知 Future 任务已完成。
需要注意,由于需要等待 timer 执行真正的请求操作,collapser 会导致所有的请求的 cost 都会增加约 timerInterval/2 ms;
配置
collapserKey,这个可以不用配置,hystrix 会默认使用当前方法名; batchMethod,配置 batch 方法名,我们一般会将 single 方法和 batch 方法定义在同一个类内,直接填方法名即可; scope,坑的配置项,也是逼我读源码的元凶, com.netflix.hystrix.HystrixCollapser.Scope
枚举类,有 REQUEST, GLOBAL 两种选项,在 scope 为 REQUEST 时,hystrix 会为每个请求都创建一个 collapser, 此时你会发现 batch 方法执行时,传入的请求数总为1。而且 REQUEST 项还是默认项,不明白这样请求合并还有什么意义;collapserProperties
, 在此选项内我们可以配置 hystrixCommand 的通用配置;
maxRequestsInBatch, 构造批量请求时,使用的单个请求的大数量; timerDelayInMilliseconds, 此选项配置 collapser 的 timer 线程多久会合并一次请求; requestCache.enabled, 配置提交请求时是否缓存;
@HystrixCollapser(
batchMethod = "batch",
collapserKey = "single",
scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
collapserProperties = {
@HystrixProperty(name = "maxRequestsInBatch", value = "100"),
@HystrixProperty(name = "timerDelayInMilliseconds", value = "1000"),
@HystrixProperty(name = "requestCache.enabled", value = "true")
})
BatchCollapser
设计
是一种 Collection,类似于 ArrayList 或 Queue,可以存重复元素且有顺序; 在多线程环境中能安全地将里面的数据全取出来进行消费,而不用自己实现锁。
java.util.concurrent
包内的 LinkedBlockingDeque 刚好符合要求,首先它实现了 BlockingDeque 接口,多线程环境下的存取操作是安全的;此外,它还提供 drainTo(Collection<? super E> c, int maxElements)
方法,可以将容器内 maxElements 个元素安全地取出来,放到 Collection c 中。实现
public class BatchCollapser<E> implements InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(BatchCollapser.class);
private static volatile Map<Class, BatchCollapser> instance = Maps.newConcurrentMap();
private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);
private volatile LinkedBlockingDeque<E> batchContainer = new LinkedBlockingDeque<>();
private Handler<List<E>, Boolean> cleaner;
private long interval;
private int threshHold;
private BatchCollapser(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {
this.cleaner = cleaner;
this.threshHold = threshHold;
this.interval = interval;
}
@Override
public void afterPropertiesSet() throws Exception {
SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
try {
this.clean();
} catch (Exception e) {
logger.error("clean container exception", e);
}
}, , interval, TimeUnit.MILLISECONDS);
}
public void submit(E event) {
batchContainer.add(event);
if (batchContainer.size() >= threshHold) {
clean();
}
}
private void clean() {
List<E> transferList = Lists.newArrayListWithExpectedSize(threshHold);
batchContainer.drainTo(transferList, 100);
if (CollectionUtils.isEmpty(transferList)) {
return;
}
try {
cleaner.handle(transferList);
} catch (Exception e) {
logger.error("batch execute error, transferList:{}", transferList, e);
}
}
public static <E> BatchCollapser getInstance(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {
Class jobClass = cleaner.getClass();
if (instance.get(jobClass) == null) {
synchronized (BatchCollapser.class) {
if (instance.get(jobClass) == null) {
instance.put(jobClass, new BatchCollapser<>(cleaner, threshHold, interval));
}
}
}
return instance.get(jobClass);
}
}
由于合并器的全局性需求,需要将合并器实现为一个单例,另外为了提升它的通用性,内部使用使用 concurrentHashMap 和 double check 实现了一个简单的单例工厂。 为了区分不同用途的合并器,工厂需要传入一个实现了 Handler 的实例,通过实例的 class 来对请求进行分组存储。 由于 java.util.Timer
的阻塞特性,一个 Timer 线程在阻塞时不会启动另一个同样的 Timer 线程,所以使用ScheduledExecutorService
定时启动 Timer 线程。
ConcurrentHashMultiset
设计
ConcurrentHashMultiset
,它不同于普通的 set 结构存储相同元素时直接覆盖原有元素,而是给每个元素保持一个计数 count, 插入重复时元素的 count 值加1。而且它在添加和删除时并不加锁也能保证线程安全,具体实现是通过一个 while(true)
循环尝试操作,直到操作够所需要的数量。ConcurrentHashMultiset
这种排重计数的特性,非常适合数据统计这种元素在短时间内重复率很高的场景,经过排重后的数量计算,可以大大降低下游服务器的压力,即使重复率不高,能用少量的内存空间换取系统可用性的提高,也是很划算的。实现
ConcurrentHashMultiset
进行请求合并与使用普通容器在整体结构上并无太大差异,具体类似于:if (ConcurrentHashMultiset.isEmpty()) {
return;
}
List<Request> transferList = Lists.newArrayList();
ConcurrentHashMultiset.elementSet().forEach(request -> {
int count = ConcurrentHashMultiset.count(request);
if (count <= ) {
return;
}
transferList.add(count == 1 ? request : new Request(request.getIncrement() * count));
ConcurrentHashMultiset.remove(request, count);
});
小结
hystrix collapser
: 需要每个请求的结果,并且不在意每个请求的 cost 会增加;BatchCollapser
: 不在意请求的结果,需要请求合并能在时间和数量两个维度上触发;ConcurrentHashMultiset
:请求重复率很高的统计类场景;
BatchCollapser
和 ConcurrentHashMultiset
结合一下,在BatchCollapser 里使用 ConcurrentHashMultiset
作为容器,这样就可以结合两者的优势了。相关文章