Kafka idempotence: a must-ask topic in big-tech interviews

2020-05-27

Source: WeChat official account WeCoding

01 Why idempotence matters

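As a distributed message queue, Kafka is used heavily in distributed systems such as message push systems and business platforms (a settlement platform, for instance). Take settlement: the upstream business side sends its data to the settlement platform, and if one piece of data is calculated and processed more than once, the consequences are severe.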

02 What threatens idempotence

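When using Kafka we need exactly-once semantics. In a distributed system, network partitions are unavoidable: if the broker's ack is lost to a network fault, or a full GC makes the ack time out, the producer will resend the message. How do we make sure producer retries cause neither duplicates nor reordering? And if the producer crashes, the new producer has none of the old producer's state, so how is idempotence preserved then? Even when sending is idempotent, the consumer, after pulling a message, hands it to a pool of worker threads whose processing may include asynchronous steps, which leads to the following cases: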

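  • Commit first, then run the business logic: the commit succeeds but processing fails. The message is lost.
  • Run the business logic first, then commit: processing succeeds but the commit fails. The message is processed again.
  • Run the business logic first, then commit, with asynchronous logic: the commit succeeds but the asynchronous work fails afterwards. The message is lost.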
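The three orderings above can be reproduced with a toy, single-threaded simulation. This is a sketch under stated assumptions, not the Kafka consumer API: ConsumerSim and its methods are hypothetical names, failures are injected through boolean flags, and, following Kafka's convention, the committed offset is the next offset to read, so a restarted consumer resumes from it.

```scala
import scala.collection.mutable

// Hypothetical single-message simulation; not the Kafka consumer API.
final class ConsumerSim {
  var committed: Long = 0L                            // next offset to read after a restart
  val sideEffects = mutable.ArrayBuffer.empty[String] // results of the business logic

  private def process(msg: String, ok: Boolean): Unit = {
    if (!ok) throw new RuntimeException("processing failed")
    sideEffects += msg
  }

  private def commit(offset: Long, ok: Boolean): Unit = {
    if (!ok) throw new RuntimeException("commit failed")
    committed = offset + 1 // commit the NEXT offset to read
  }

  // Case 1: commit first, then run the business logic.
  def commitThenProcess(offset: Long, msg: String, processOk: Boolean): Unit =
    try {
      commit(offset, ok = true)
      process(msg, processOk)
    } catch { case _: RuntimeException => () } // failure swallowed after the commit

  // Case 2: run the business logic first, then commit.
  def processThenCommit(offset: Long, msg: String, commitOk: Boolean): Unit =
    try {
      process(msg, ok = true)
      commit(offset, commitOk)
    } catch { case _: RuntimeException => () } // side effect already happened

  // Case 3: hand the message to an async worker, commit, and the worker fails later.
  def asyncProcessThenCommit(offset: Long, msg: String, asyncOk: Boolean): Unit = {
    val task: () => Unit = () => process(msg, asyncOk)  // submitted, not yet run
    commit(offset, ok = true)                           // commit does not wait for the task
    try task() catch { case _: RuntimeException => () } // the worker runs and fails afterwards
  }
}
```

In cases 1 and 3 the committed offset has already moved past the message, so a restarted consumer never reads it again; in case 2 the offset was never committed, the message is redelivered, and its side effect happens twice.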

This article discusses each of these problems.

03 How Kafka makes sending idempotent

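To address these problems, Kafka 0.11 introduced the idempotent producer and the transactional producer. The former provides idempotence within a single producer session; the latter extends it across sessions.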
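A minimal sketch of how the two producer modes are switched on through configuration. The keys are real Kafka producer config names; the object name ProducerConfigs, the broker address and the transactional id value are hypothetical placeholders. Only java.util.Properties is used, so the snippet runs without the Kafka client on the classpath.

```scala
import java.util.Properties

// Hypothetical helper; keys are Kafka producer config names, values are examples.
object ProducerConfigs {
  // Single-session idempotence: the broker deduplicates retries by (pid, seq) per partition.
  def idempotent(bootstrap: String): Properties = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", bootstrap)
    p.setProperty("enable.idempotence", "true")
    p.setProperty("acks", "all") // idempotence requires acks=all
    // Recent client versions allow at most 5 in-flight requests with idempotence.
    p.setProperty("max.in.flight.requests.per.connection", "5")
    p
  }

  // Cross-session idempotence: a stable transactional.id lets a restarted producer be
  // recognized; initTransactions() bumps the epoch and fences the old instance.
  def transactional(bootstrap: String, txId: String): Properties = {
    val p = idempotent(bootstrap)
    p.setProperty("transactional.id", txId)
    p
  }
}
```

With kafka-clients available, the resulting Properties would be passed to the KafkaProducer constructor together with key and value serializers; a transactional producer then calls initTransactions() once before using beginTransaction() and commitTransaction().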

Single-session idempotence

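To prevent the reordering and duplication caused by producer retries, Kafka added a producer id (pid) and a sequence number (seq). For each partition, every RecordBatch the producer sends carries a monotonically increasing seq; on the broker, each topic-partition keeps a pid-to-seq mapping and updates lastSeq on every commit. When a RecordBatch arrives, the broker checks it before storing any data: if the batch's baseSeq (the seq of its first message) is exactly one greater than the lastSeq the broker holds, the data is saved; otherwise it is not (this is the inSequence method).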

ProducerStateManager.scala

private def maybeValidateAppend(producerEpoch: Short, firstSeq: Int, offset: Long): Unit = {
    validationType match {
      case ValidationType.None =>

      case ValidationType.EpochOnly =>
        checkProducerEpoch(producerEpoch, offset)

      case ValidationType.Full =>
        checkProducerEpoch(producerEpoch, offset)
        checkSequence(producerEpoch, firstSeq, offset)
    }
}

private def checkSequence(producerEpoch: Short, appendFirstSeq: Int, offset: Long): Unit = {
  if (producerEpoch != updatedEntry.producerEpoch) {
    // a batch from a new producer epoch must restart its sequence at 0
    if (appendFirstSeq != 0) {
      if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
        throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch at offset $offset in " +
          s"partition $topicPartition: $producerEpoch (request epoch), $appendFirstSeq (seq. number)")
      } else {
        throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker at offset $offset" +
          s"in partition $topicPartition. It is possible that the last message with the producerId=$producerId has " +
          "been removed due to hitting the retention limit.")
      }
    }
  } else {
    val currentLastSeq = if (!updatedEntry.isEmpty)
      updatedEntry.lastSeq
    else if (producerEpoch == currentEntry.producerEpoch)
      currentEntry.lastSeq
    else
      RecordBatch.NO_SEQUENCE

    if (currentLastSeq == RecordBatch.NO_SEQUENCE && appendFirstSeq != 0) {
      // We have a matching epoch, but we do not know the next sequence number. This case can happen if
      // only a transaction marker is left in the log for this producer. We treat this as an unknown
      // producer id error, so that the producer can check the log start offset for truncation and reset
      // the sequence number. Note that this check follows the fencing check, so the marker still fences
      // old producers even if it cannot determine our next expected sequence number.
      throw new UnknownProducerIdException(s"Local producer state matches expected epoch $producerEpoch " +
        s"for producerId=$producerId at offset $offset in partition $topicPartition, but the next expected " +
        "sequence number is not known.")
    } else if (!inSequence(currentLastSeq, appendFirstSeq)) {
      throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId at " +
        s"offset $offset in partition $topicPartition: $appendFirstSeq (incoming seq. number), " +
        s"$currentLastSeq (current end sequence number)")
    }
  }
}

private def inSequence(lastSeq: Int, nextSeq: Int): Boolean = {
  // the next batch must start right after lastSeq; sequences wrap to 0 after Int.MaxValue
  nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Int.MaxValue)
}
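The checks in the Kafka excerpt can be reduced to a small in-memory model. This is a minimal sketch under stated assumptions: a single topic-partition, no retention or transaction markers, and hypothetical names (Batch, PartitionLog, AppendResult) that are not Kafka APIs. The real broker handles more cases, including how a detected duplicate is reported back to the producer; this model only classifies each incoming batch.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the broker-side check for ONE topic-partition.
sealed trait AppendResult
case object Appended extends AppendResult
case object Duplicate extends AppendResult
case object OutOfOrder extends AppendResult
case object FencedEpoch extends AppendResult

final case class Batch(pid: Long, epoch: Int, firstSeq: Int, records: Vector[String]) {
  // Sequence number of the last record in this batch (wraparound inside a batch is ignored).
  def lastSeq: Int = firstSeq + records.size - 1
}

final class PartitionLog {
  // pid -> (epoch, lastSeq): the per-partition producer state
  private val state = mutable.Map.empty[Long, (Int, Int)]
  val log = mutable.ArrayBuffer.empty[String]

  // Same rule as inSequence in the excerpt: start right after lastSeq, or wrap to 0.
  private def inSequence(lastSeq: Int, nextSeq: Int): Boolean =
    nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Int.MaxValue)

  def append(b: Batch): AppendResult = state.get(b.pid) match {
    case Some((epoch, _)) if b.epoch < epoch => FencedEpoch // an older producer instance
    case Some((epoch, last)) if b.epoch == epoch =>
      if (inSequence(last, b.firstSeq)) write(b)
      else if (b.firstSeq <= last) Duplicate // a retry of a batch already written
      else OutOfOrder                        // a gap: an earlier batch is missing
    case _ =>
      // Unknown pid or a newer epoch: the sequence must restart at 0.
      if (b.firstSeq == 0) write(b) else OutOfOrder
  }

  private def write(b: Batch): AppendResult = {
    log ++= b.records
    state(b.pid) = (b.epoch, b.lastSeq)
    Appended
  }
}
```

Sending the same batch twice models the retry after a lost ack from section 02: the second copy is classified as a duplicate and written only once, and a batch from an older epoch is rejected once a newer epoch has written.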
