RocketMQ源码详解:事务消息、批量消息、延迟消息
◆ 概述
在上文中,我们讨论了消费者对于消息拉取的实现,对于
这个黑盒的心脏部分,我们顺着消息的发送流程已经将其剖析了大半部分。本章我们不妨乘胜追击,接着讨论各种不同的消息的原理与实现。
◆ 事务消息
◆ 概念
RocketMQ 中的事务消息功能,实际上是 分布式事务中的本地事务表 的实现,只不过,在这里用消息中间件来代替了数据库,同时也帮我们做好了回查的操作。
在这点上,RocketMQ 和 Kafka 是截然不同的,kafka 的事务是用来实现 Exacltly Once 语义,且该语义主要用来流计算中,即在 "从 Topic 中读 -> 计算 -> 存到 Topic" 保证不被重复计算。
◆ 事务流程
客户端发送 half 消息
吐槽一下为什么要叫半消息(half message),叫 prepare 消息不是更直观吗
Broker 将 half 消息持久化
客户端根据事务执行结果,发送 Commit / Rollback 消息
Broker 收到 Commit 时,将事务消息对消费者可见。收到 Rollback 时,将消息丢弃
◆ 补偿
Broker 过久未收到事务执行结果,询问客户端执行结果
客户端收到结果查询请求,执行回查方法,发送 Commit / Rollback 方法
Broker 根据事务执行结果做出对应处理
◆ 源码流程
◆ 步
在设置好了事务监听器后(执行事务 与 事务回查),就可以发送事务消息
在将事务消息交给发送方法后,客户端首先会为消息添加事务消息的标识
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
然后将该事务消息会像普通的同步消息一样发送(且是同步发送)
sendResult = this.send(msg);
具体发送流程见:RocketMQ源码详解 | Producer篇 · 其一:Start,然后 Send 一条消息
◆ 第二步
在 Broker 端接收到消息以后,会走与普通消息相同的底层通道(因为这个消息本身就只是个加上了 事务flag 的普通消息),然后由
TransactionalMessageService 来对这个消息进行额外处理。
首先会对该消息放入 real topic 属性和 real queue 属性,然后将消息 Topic 替换为用于处理所有事务消息的特殊的 Topic,当然该 Topic 对消费者是不可见的。
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); // 设置标记为未收到结果 msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); // 替换到特殊的 Topic (RMQ_SYS_TRANS_HALF_TOPIC) msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner;}
完成后,会送到 MessageStore 像普通消息一样处理
普通消息的具体流程见 RocketMQ源码详解 | Broker篇 · 其二:文件系统
◆ 第三步
回到 Producer 端,在事务消息发送完成后,该方法会使用专门的线程池执行事务
// 2.执行本地事务,更新事务获取状态localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
然后对本地的事务执行状态进行处理,也就是将该执行状态上报
this.endTransaction(msg, sendResult, localTransactionState, localException);
这里会发送一条 oneway 命令给 Broker 端,且使用的是
RequestCode.END_TRANSACTION 请求码
// 事务结果报告(可能是 commit 或 rollback)public static final int END_TRANSACTION = 37;
完成处理后,该方法会将事务的发送结果和本地事务的执行结构都返回给上层 API
◆ 第四步
在 Broker 端,这里会由 EndTransactionProcessor 处理器来处理该请求码
然后,根据事务的执行结果来做不同的处理
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // 事务执行成功,尝试完成事务 // 获取 half 消息 result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { if (res.getCode() == ResponseCode.SUCCESS) { // 将 half 消息取出,构造真实消息,然后投入实际上的 Topic /* pass */ RemotingCommand sendResult = sendFinalMessage(msgInner); if (sendResult.getCode() == ResponseCode.SUCCESS) { /* * 找到半消息,进行删除 * 删除并不是物理上的删除,因为物理上的删除的代价十分的高昂,而是写入一条具有相同事务id的消息到 op Topic */ this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return sendResult; } return res; }}
如果需要回滚,则对相应的半消息进行删除,且和上面一样,并不是物理上的删除,而是发送具有相同事务 id 的消息到 OP Topic,来标记这个事务已经完成了(Commit/Rollback), OP Topic 也是一个特殊的 Topic,同样对消费者不可见。
if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { // 事务执行失败,进行 half 消息的回滚 // 首先找到 half 消息 result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { // 进行删除 this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return res; }}
当这些都做完后,一次事务就完成了。
◆ 补偿
当然啦,以上是顺利的情况,我们当然不能指望事务每一次都能执行成功、网络分区和宕机事件永远不会发生。
在一段时间后,如果客户端没有对事务的状态进行上报(或者上报的状态不是 Commit 或 Rollback,而是 Unknown), Broker 端当然就要进行事务状态的回查。
在 BrokerController 启动的时候,会开启事务状态检测服务,该服务会通过循环调用
TransactionalMessageServiceImpl.check() 方法,不断的扫描未结束的事务,同时对超过指定时间还不知道状态的事务进行回查操作。
check() 方法是事务回查的核心,由于很长,我们先来看部分(删减了没人在意的 Log)
// 首先找到存储所有 half 消息的 TopicString topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);// 对其中每一个 queue 进行检查for (MessageQueue messageQueue : msgQueues) { long startTime = System.currentTimeMillis(); // 获得对应的 op 消息所在的 queue MessageQueue opQueue = getOpQueue(messageQueue); // 获取未处理的 half 消息的起始偏移量 long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue); // 获取 op 消息的 queue 的起始偏移量 long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue); // 用来记录已经被处理了的 op 消息的偏移量 List<Long> doneOpOffset = new ArrayList<>(); // 用来记录已经完成了的 half 消息的偏移量 // key: halfOffset, value: opOffset HashMap<Long, Long> removeMap = new HashMap<>(); PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
在 fillOpRemoveMap 方法中,主要是将 op 消息取出,来标记可以被移除的 half 消息(op 消息的存在代表对应事务的结束)
/** * 读取op消息,解析op消息,填充removeMap * * @param removeMap 要删除的半消息,key: halfOffset,value: opOffset * @param opQueue Op message queue. * @param pullOffsetOfOp op message queue 的起始偏移量 * @param miniOffset half message queue 的当前小偏移量 * @param doneOpOffset 存储已处理的 op 消息 * @return 获取到的 Op 消息 */private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap, MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) { // 首先通过 queue 获取 op 消息,大数量为 32 条 PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32); /* pass: pullResult 消息的意外状态的处理 */ List<MessageExt> opMsg = pullResult.getMsgFoundList(); for (MessageExt opMessageExt : opMsg) { // op 消息的 body 存储的是对应的 half 消息的偏移量, 现在将其取出 Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset)); // 感觉这里的 Tag 并没有什么意义,无论是 Commit 还是 Rollback 都会加入这个 Tag if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) { // 在 已处理偏移量 之前的话则可直接放入 已处理偏移量集合 if (queueOffset < miniOffset) { doneOpOffset.add(opMessageExt.getQueueOffset()); } else { // 否则放入需要移除的 half 的消息的集合 removeMap.put(queueOffset, opMessageExt.getQueueOffset()); } } } return pullResult;}
然后进入到 check 方法的第二部分
while (true) { if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) break; // 推进小已处理偏移量 if (removeMap.containsKey(i)) /* 如果该 half 消息存在对应的 op 消息,说明已经被处理了(commit/rollback) */ { // 取出放入到已处理偏移量队列 Long removedOpOffset = removeMap.remove(i); doneOpOffset.add(removedOpOffset); } else /* 否则说明当前 half 消息悬而未决 */ { // 取出对应的半消息 GetResult getResult = getHalfMsg(messageQueue, i); /* pass: 半消息不存在时的意外处理 */ /* * 检测是否要丢弃或跳过 * 丢弃条件: 当前事务已经超过了大回查次数(15次) * 跳过条件: 已经超过了过期文件大保留时间(72小时) */ if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { // 处理并推进偏移量 // 具体的处理方法是: 投入 TRANS_CHECK_MAX_TIME_TOPIC 这个 Topic,等待手动处理 listener.resolveDiscardMsg(msgExt); // 进入到下一个 half 消息 newOffset = i + 1; i++; continue; } if (msgExt.getStoreTimestamp() >= startTime) { break; }
上面的方法很好理解,只是对于已经被标记结束的事务的处理、和未结束事务的补足
接下来是第三部分,这里将继续对未结束事务的补足,与进行可能的回查操作
// half 消息具有小的检查时间(免疫时间), 检测时间以内可以跳过回查, 重新投入 half 消息的 Topic long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); long checkImmunityTime = transactionTimeout; String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); if (null != checkImmunityTimeStr) { checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); if (valueOfCurrentMinusBorn < checkImmunityTime) { if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) { newOffset = i + 1; i++; continue; } } } else { if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) { break; } } /* * 对于当前事务的回查操作,需要满足三个条件之一 * 1.当前 op 消息的集合为空,且已经超过了小检查时间(免疫时间) * 2.大偏移量的 op 消息的生成时间 已经超过了 小检查时间 * 3.关闭小检查时间 */ List<MessageExt> opMsg = pullResult.getMsgFoundList(); boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime) || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)) || (valueOfCurrentMinusBorn <= -1); if (isNeedCheck) { // 先将当前 half 消息放回 if (!putBackHalfMsgQueue(msgExt, i)) { continue; } // 然后向 Product 发送检测消息 listener.resolveHalfMsg(msgExt); } else { // 否则更新 op 消息集合,以确保能够断言该 half 消息的状态 pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset); continue; }} newOffset = i + 1; i++;}
上面这段代码主要围绕 "是否进行回查" 展开,且涉及到 "免疫时间"。
在一个事务消息被发送后,对应事务的执行当然需要一定的执行时间,如果我们不设置这个时间立刻进行回查,那么很有可能时候事务还没执行完,对于大多数情况下还没执行完的事务进行回查,毫无疑问带来的收益很低。所以我们需要设定一个时间,在这个时间内的事务先暂时不回查,这个时间就叫做"免疫时间"。
然后再来看下需要进行回查的三种情况:
当 op 消息的集合为空,说明当前还没有收到让当前事务结束的通知,且超过了"免疫时间",故回查
当前 op 消息大偏移量的生成时间超过了"免疫时间",说明该事务的提交消息可能丢失了,故回查
不启用 "免疫时间"
其中发送的回查消息的请求码为
RequestCode.CHECK_TRANSACTION_STATE ,发送的也是 oneway 消息
后的第四部分,同时更新 half 和 op 消息在 Queue 中的偏移量
// 对所有的 half 消息计算完成后,更新偏移量if (newOffset != halfOffset) { transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);}// 根据已经被标记为完成的 op 消息更新偏移量long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);if (newOpOffset != opOffset) { // 如果不等,说明并不是所有的 op 消息都被标记为完成了 // 所以我们只将偏移量更新到个未完成的 op 消息的位置,其后面的 op 消息会在下次重复处理 transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);}
然后在 Producer 这边,将由
ClientRemotingProcessor.checkTransactionState() 来处理回查操作
// 获取事务 IDString transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) { messageExt.setTransactionId(transactionId);}final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);if (group != null) { // 从 MQClientFactory 找到注册的对应 Producer MQProducerInner producer = this.mqClientFactory.selectProducer(group); if (producer != null) { final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); // 让 Producer 检查在对应 IP 上的事务状态 producer.checkTransactionState(addr, messageExt, requestHeader); } else { log.debug("checkTransactionState, pick producer by group[{}] failed", group); }} else { log.warn("checkTransactionState, pick producer group failed");}
再进入
producer.checkTransactionState() 看看 Producer 是怎样检查事务状态的
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();// 取出当前 Producer 的事务监听器TransactionListener transactionListener = getCheckListener();if (transactionCheckListener != null || transactionListener != null) { LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable exception = null; try { if (transactionCheckListener != null) { // 调用其的事务回查方法 localTransactionState = transactionCheckListener.checkLocalTransactionState(message); } else if (transactionListener != null) { log.debug("Used new check API in transaction message"); localTransactionState = transactionListener.checkLocalTransaction(message); } } catch (Throwable e) { log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); exception = e; } // 再将事务执行结果其发回给 Broker this.processTransactionState( localTransactionState, group, exception);} else { log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);}
后发回的方法做的事情和在一开始发送事务状态的方法,所做的事情是一样的。Broker 做的处理也是一样的。
这样,补偿流程就执行完了。
◆ 批量消息
◆ 概念
在消息队列中,批量消息也是一个重要的部分,将消息压缩在一起发送不仅可以减少带宽的消耗,还能节省头部占用的空间。
有点失望的是,RocketMQ 对于批量消息的实现有点"粗糙"了
◆ 源码流程
首先,在调用 send() 的 batch 版本后,会先对批量消息进行校验
批量消息不允许延时、不允许发送到重试 Topic,且要求发送到的 Topic 必须是同一个 Topic
List<Message> messageList = new ArrayList<Message>(messages.size());Message first = null;for (Message message : messages) { if (message.getDelayTimeLevel() > ) { throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching"); } if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { throw new UnsupportedOperationException("Retry Group is not supported for batching"); } if (first == null) { first = message; } else { if (!first.getTopic().equals(message.getTopic())) { throw new UnsupportedOperationException("The topic of the messages in one batch should be the same"); } if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) { throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same"); } } messageList.add(message);}MessageBatch messageBatch = new MessageBatch(messageList);
在校验完成,且都放到一个 List 之后,接下来的步骤和普通的消息发送都差不多,只是在编码上理所当然的存在着不同
public static byte[] encodeMessages(List<Message> messages) { //TO DO refactor, accumulate in one buffer, avoid copies List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size()); int allSize = ; for (Message message : messages) { // 编码每一个消息 byte[] tmp = encodeMessage(message); encodedMessages.add(tmp); allSize += tmp.length; } // 放到后的大集合中 byte[] allBytes = new byte[allSize]; int pos = ; for (byte[] bytes : encodedMessages) { System.arraycopy(bytes, , allBytes, pos, bytes.length); pos += bytes.length; } return allBytes;}
然后使用
RequestCode.SEND_BATCH_MESSAGE 这个状态码发送出去。
在 Broker 端,其投入的过程大体上和普通消息类似,但是其后的持久化到硬盘时,这块批量消息被拆分为了普通的单条消息。
即 RocketMQ 使用批量消息只减少了发送时的宽带传输,对于存储与交给消费者的部分并没有获得优化
// 拆分批量消息为每一个普通消息while (messagesByteBuff.hasRemaining()) { // 1 TOTALSIZE final int msgPos = messagesByteBuff.position(); final int msgLen = messagesByteBuff.getInt(); final int bodyLen = msgLen - 40; //only for log, just estimate it /* pass: 当作普通消息存储 */ queueOffset++; msgNum++; messagesByteBuff.position(msgPos + msgLen);}
◆ 延时消息
◆ 概念
在业务中,有时候有一些延时提交任务的需求,这时候就可以使用延时消息,即在投递一部分时间后才对消费者可见。
不过,在 RocketMQ 中,延迟级别并不支持自定义,而是具有固定的延迟级别。
不过商业版的 阿里云MQ 可以支持秒精度的自定义延迟时间,果然是为了阉割社区版来赚钱吗
◆ 源码流程
RocketMQ 对于延时消息的处理主要在于 Broker 端,所以我们只需要看在 Broker 对延时级别的处理。
首先,在 CommitLog 的 put 中,会对延迟级别进行判断,如果存在,会在这进行进行 Topic 的替换,将其存储到对应的延迟级别的 Queue
if (msg.getDelayTimeLevel() > ) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId);}
然后会被在 DefaultMessageStore 中初始化的 ScheduleMessageService 处理
首先,该服务在启动时会进行初始化
public void start() { // 保证只被执行一次 if (started.compareAndSet(false, true)) { // 加载本地快照 super.load(); this.timer = new Timer("ScheduleMessageTimerThread", true); for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { // 取出每一个级别 Integer level = entry.getKey(); // 当前延迟级别对应的延迟时间 Long timeDelay = entry.getValue(); // 该延迟级别之前消费到的自己的队列的偏移量 Long offset = this.offsetTable.get(level); if (null == offset) { offset = L; } // 每一个延迟级别设置一个定时任务 if (timeDelay != null) { this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } // 定时持久化各个延迟级别的偏移量 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { if (started.get()) ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); }}
每一个延迟级别的 Queue 都有对应的定时任务,且都会执行以下方法
public void executeOnTimeup() { // 找到自己延迟级别的消费队列 ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); long failScheduleOffset = offset; if (cq != null) { // 根据消费偏移量将指定的 MappedFile 文件加载进来 SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ != null) { try { long nextOffset = offset; int i = ; // 遍历每一个消息的索引 ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferCQ.getByteBuffer().getLong(); int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); /* pass */ long now = System.currentTimeMillis(); long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long countdown = deliverTimestamp - now; if (countdown <= ) /* 目标时间小于当起时间,可以执行 */ { // 根据偏移量取出消息 MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy); if (msgExt != null) { try { // 将延迟消息恢复成原本消息的样子 MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); /* pass */ // 投入真实的 Topic PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore .putMessage(msgInner); /* pass: 更新度量信息 */ } catch (Exception e) { /* pass */ } } } else /* 否则,这个消息需要被消费的时间到了再通知我 */ { ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown); // 更新消费偏移量 ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } // end of for // 走到这里,说明暂时没有需要消费的延时消息 nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); // 小睡一会 ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } finally { bufferCQ.release(); } } // end of if (bufferCQ != null) /* pass */ } // end of if (cq != null) ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);}
可以看出,延迟消息的实现还是十分简单的,由于先投入的延时消息必先快于后投入的消息的到期,所以只需要不断的拉取各个延迟级别对应的队列 的头部的延迟消息即可。这也是只支持固定级别的延迟消息带来的好处。
来源:
https://www.cnblogs.com/enoc/p/rocketmq-so-no-nana.html
相关文章