基于Redis实现延时队列服务
一、背景
订单下单之后超过30分钟用户未支付,需要取消订单
订单一些评论,如果48h用户未对商家评论,系统会自动产生一条默认评论
点我达订单下单后,超过一定时间订单未派出,需要超时取消订单等。。
处理这类需求,比较直接简单的方式就是定时任务轮训扫表。这种处理方式在数据量不大的场景下是完全没问题,但是当数据量大的时候高频的轮训数据库就会比较的耗资源,导致数据库的慢查或者查询超时。所以在处理这类需求时候,采用了延时队列来完成。
二、几种延时队列
1.Java中java.util.concurrent.DelayQueue
优点:JDK自身实现,使用方便,量小适用
缺点:队列消息处于jvm内存,不支持分布式运行和消息持久化
2.Rocketmq延时队列
优点:消息持久化,分布式
缺点:不支持任意时间精度,只支持特定level的延时消息
3.Rabbitmq延时队列(TTL+DLX实现)
优点:消息持久化,分布式
缺点:延时相同的消息必须扔在同一个队列
* 过期延时消息实时获取
* 高可用性
三、 基于Redis实现
1.0版本
功能特性
* 实时性:存在一定的时间误差(定时任务间隔)
* 支持指定消息remove
* 高可用性
整体结构
- Delayed Queue是16个有序队列(队列支持水平扩展),结构为ZSET,value为messages pool中消息ID,score为过期时间(分为多个队列是为了提高扫描的速度)
- Timed Task定时任务,负责扫描处理每个队列过期消息
消息结构
* keys:消息过期之后发送mq的keys
* body:消息过期之后发送mq的body,提供给消费这做具体的消息处理
* delayTime:延时发送时间(默认,delayTime、expectDate有一个即可)
* expectDate:期望发送时间
流程
注:上图1、2、3或者2、3是一个事务操作
取出过期消息过程是通过一个外部定时任务每隔1min分钟去查询队列中过期的消息,然后发送mq && remove
2.0版本
2.0版本在1.0上做了一个优化,废弃掉了1min定时任务触发过期消息发送,采用了java Lock await/singlal方式实现过期消息的实时发送低延时
多节点部署结构:
- worker:pull job拉取到的过期消息会交给一个worker thread去处理,这样的好处是处理过期的消息实时性更高(pull job不必等去除过期消息全部处理完成在继续去拉取新的过期数据)
- zookeeper coordinate:通过zk的操作来完成对队列的重新分配工作,daemon thread监听zk节点的创建和删除
- 主要流程:
为每个分配queue创建一个pull job 。
pull job首先会去queue中查询是否有过期消息:
Y:将取出消息交给worker处理
N:查询queue中后一个成员(zset结构默认按score递增排序),如果为空,则await;不为空则await(成员score-System.currentTimeMillis())
当部署服务有新增,延时队列服务会重新计算得到当前处理队列,并将之前创建pull job cancel,为新处理队列重新创建pull job。删除同理。
相关文章