 * Broker states are the possible state that a kafka broker can be in.
 * A broker should be only in one state at a time.
 * The expected state transition with the following defined states is:
 *                +-----------+
 *                |Not Running|
 *                +-----+-----+
 *                      |
 *                      v
 *                +-----+-----+
 *                |Starting   +--+
 *                +-----+-----+  | +----+------------+
 *                      |        +>+RecoveringFrom   |
 *                      v          |UncleanShutdown  |
 *               +-------+-------+ +-------+---------+
 *               |RunningAsBroker|            |
 *               +-------+-------+<-----------+
 *                       |
 *                       v
 *                +-----+------------+
 *                |PendingControlled |
 *                |Shutdown          |
 *                +-----+------------+
 *                      |
 *                      v
 *               +-----+----------+
 *               |BrokerShutting  |
 *               |Down            |
 *               +-----+----------+
 *                     |
 *                     v
 *               +-----+-----+
 *               |Not Running|
 *               +-----------+
 * Custom states is also allowed for cases where there are custom kafka states for different scenarios.
sealed trait BrokerStates { def state: Byte }
case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }







管理一个Kafka broker的生命周期。处理所有的功能,包括启动和关闭一个Kafka节点。

class KafkaServer() {
  // 开启一个Kafka server,初始化LogManager, SocketServer和请求处理器
  def startup(): Unit = {
    try {

      if (isShuttingDown.get)
        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

      if (startupComplete.get)

      val canStartup = isStartingUp.compareAndSet(false, true)
      if (canStartup) {
        // 1. 设置broker状态为STARTING状态

        // 2. 设置zookeeper
        /* setup zookeeper */

        // 3. 获取集群id
        /* Get or create cluster_id */
        _clusterId = getOrGenerateClusterId(zkClient)
        info(s"Cluster ID = $clusterId")

        /* load metadata */
        val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = getBrokerMetadataAndOfflineDirs

        /* check cluster id */
        if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId)
          throw new InconsistentClusterIdException(
            s"The Cluster ID ${clusterId} doesn't match stored clusterId ${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +
            s"The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.")

        /* generate brokerId */
        config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint)
        logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
        this.logIdent = logContext.logPrefix

        // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
        // applied after DynamicConfigManager starts.
        // 向zookeeper注册broker动态配置属性

        // 4. 启动调度器
        /* start scheduler */
        kafkaScheduler = new KafkaScheduler(config.backgroundThreads)

        /* create and configure metrics */
        val reporters = new util.ArrayList[MetricsReporter]
        reporters.add(new JmxReporter(jmxPrefix))
        val metricConfig = KafkaServer.metricConfig(config)
        metrics = new Metrics(metricConfig, reporters, time, true)

        /* register broker metrics */
        _brokerTopicStats = new BrokerTopicStats

        quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
        notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)

        logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

        // 5. 启动日志管理器
        /* start log manager */
        logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)

        // 6. 元数据缓存
        metadataCache = new MetadataCache(config.brokerId)
        // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
        // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
        tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
        credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

        // Create and start the socket server acceptor threads so that the bound port is known.
        // Delay starting processors until the end of the initialization sequence to ensure
        // that credentials have been loaded before processing authentications.
        // 7. 创建broker上的SocketServer并启动,NIO的服务端
        socketServer = new SocketServer(config, metrics, time, credentialProvider)
        socketServer.startup(startupProcessors = false)

        // 8. 创建副本管理器并启动
        /* start replica manager */
        replicaManager = createReplicaManager(isShuttingDown)

        val brokerInfo = createBrokerInfo
        val brokerEpoch = zkClient.registerBroker(brokerInfo)

        // Now that the broker is successfully registered, checkpoint its metadata
        checkpointBrokerMetadata(BrokerMetadata(config.brokerId, Some(clusterId)))

        /* start token manager */
        tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)

        // 9. 开启KafkaController
        /* start kafka controller */
        // broker 0,broker 1,broker 2,启动完了从里面选举出来一个controller节点,作为主节点,用来管理整个Kafka集群
        kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, threadNamePrefix)

        adminManager = new AdminManager(config, metrics, metadataCache, zkClient)

        // 10. 启动 group coordinator
        // group coordinator是负责进行 consumer 的 group 成员与 offset 管理
        /* start group coordinator */
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)

        // 协调事务的,https://blog.csdn.net/cq_pf/article/details/106266505
        /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)

        /* Get the authorizer and initialize it if one is specified.*/
        authorizer = config.authorizer
        val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
          case Some(authZ) =>
            authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.mapValues(_.toCompletableFuture).toMap
          case None =>
            brokerInfo.broker.endPoints.map { ep => ep.toJava -> CompletableFuture.completedFuture[Void](null) }.toMap

        val fetchManager = new FetchManager(Time.SYSTEM,
          new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,

        /* start processing requests */
        dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
          kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
          fetchManager, brokerTopicStats, clusterId, time, tokenManager)

        // 11. 处理队列里面的请求的 numIoThreads 默认 8
        dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
          config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)

        socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
          controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
            kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
            fetchManager, brokerTopicStats, clusterId, time, tokenManager)
          controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
            1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)


        /* Add all reconfigurables for config change notification before starting config handlers */

        // 12. 创建topic配置管理器并启动
        /* start dynamic config manager */
        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
                                                           ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
                                                           ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
                                                           ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

        // Create the config manager. start listening to notifications
        dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)

        // 13. 启动 socketServer 的 Processor
        // 14. 设置broker状态为RUNNINGASBROKER
        shutdownLatch = new CountDownLatch(1)
        // 15. 新的broker已经启动
        AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics, time.milliseconds())
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        throw e

initZkClient 设置zookeeper

initZkClient -> createZkClient -> KafkaZkClient -> ZooKeeperClient

KafkaZkClient 类主要提供kafka操作zookeeper。

ZooKeeperClient 封装了pipelined requests和zk操作。信号量用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。响应存储到响应队列。

val ZkMaxInFlightRequests = 10
private def initZkClient(time: Time): Unit = {
    // 创建client
    _zkClient = createZkClient(config.zkConnect, secureAclsEnabled)
    // 初始化路径信息

val PersistentZkPaths = Seq(
    ConsumerPathZNode.path, // old consumer path /consumers
    BrokerIdsZNode.path, // /brokers/ids
    TopicsZNode.path, // /brokers/topics
    ConfigEntityChangeNotificationZNode.path, // /config/changes
    DeleteTopicsZNode.path, // admin/delete_topics
    BrokerSequenceIdZNode.path, // /brokers/seqid
    IsrChangeNotificationZNode.path, // /isr_change_notification
    ProducerIdBlockZNode.path, // /latest_producer_id_block
    LogDirEventNotificationZNode.path // log_dir_event_notification
  ) ++ ConfigType.all.map(ConfigEntityTypeZNode.path)


A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor

val BackgroundThreads = 10


模块 定时任务 参数 默认 执行逻辑 备注
LogManager cleanupLogs log.retention.check.interval.ms 300s 过期日志文件清理  
LogManager flushDirtyLogs log.flush.scheduler.interval.ms Integer.Max 刷日志到日盘  
LogManager checkpointLogRecoveryOffsets log.flush.offset.checkpoint.interval.ms 60s 将topicandpartition的checkpoint写入磁盘 写入文件recovery-point-offset-checkpoint
LogManager checkpointLogStartOffsets log.flush.start.offset.checkpoint.interval.ms 60s 将offset的checkpoint写入磁盘  
ReplicaManager checkpointHighWatermarks replica.high.watermark.checkpoint.interval.ms 5s 对hw进行checkpoint 写入文件 replication-offset-checkpoint
ReplicaManager maybeShrinkIsr replica.lag.time.max.ms 10s 检查是否需要减少isr列表中的replica  
ReplicaManager maybePropagateIsrChanges NA 2.5s 检查是否生成/广播ISr列表/需要写入zk zk目录为/isr_change_notification/isr_change_
GroupMetadataManager deleteExpiredOffsets offsets.retention.check.interval.ms 600s 删除过期的offset 一天之后失效  
ZookeeperConsumerConnector autoCommit auto.commit.interval.ms 60s 打开自动commit的场景下有效,默认值AutoCommitInterval,默认存储在zk,可以设置存储在kafka,以及zk  
KafkaController checkAndTriggerPartitionRebalance leader.imbalance.check.interval.seconds 300s 执行分区均衡 打开自动分区均衡的场景下有效,当分区leader不在perferred节点比例大于leader.imbalance.per.broker.percentage/100时(默认10%),进行


模块 定时任务 参数 默认 执行逻辑 备注
LogManager deleteLogs NA NA 该任务执行的任务主要是删除分区目录,同时删除底下的segment数据文件。  
Log flush(newOffset) NA NA flush日志到磁盘 roll方法中调用
Log deleteSeg NA NA 删除segment 删除前会先修改log和index文件后缀.deleted  
GroupMetadataManager loadGroupsAndOffsets NA NA 加载partition的group和offset信息 在_offset_consumer的partition变为leader时执行
GroupMetadataManager removeGroupsAndOffsets NA NA 去除partition的group和offset信息 在_offset_consumer的partition变为follower时执行


 * An append-only log for storing messages.
 * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
class Log() {
    // LogSegment 集合,SkipList 结构,以 baseOffset 作为 key,以 LogSegment 对象作为 value
    private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
  1. segments是线程安全的,这样 Kafka 源码不需要自行确保日志段操作过程中的线程安全
  2. segments是键值(Key)可排序的 Map。Kafka 将每个日志段的起始位移值作为 Key,能够很方便地根据所有日志段的起始位移值对它们进行排序和比较,同时还能快速地找到与给定位移值相近的前后两个日志段。




loadLogs执行加载每个 log 目录下的日志文件,并为每个 topic 分区对应的日志目录创建一个 Log 对象,对于标记为需要删除的 topic 分区目录(对应-delete后缀的目录),则将其 Log 对象添加到 LogManager.logsToBeDeleted 字段中,等待后面的周期性任务(kafka-delete-logs)对其进行删除。

Start the background threads to flush logs and do log cleanup
class LogManager() {
   * Recover and load all logs in the given data directories
  private def loadLogs(): Unit = {
    info("Loading logs.")
    val startMs = time.milliseconds
    val threadPools = ArrayBuffer.empty[ExecutorService]
    val offlineDirs = mutable.Set.empty[(String, IOException)]
    val jobs = mutable.Map.empty[File, Seq[Future[_]]]

    // 遍历处理每个 log 目录
    for (dir <- liveLogDirs) {
      try {
        // 为每个 log 目录创建一个 ioThreads 大小的线程池
        val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)

        // 尝试获取 .kafka_cleanshutdown 文件,如果该文件存在则说明 broker 节点是正常关闭的
        val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)

        if (cleanShutdownFile.exists) {
          debug(s"Found clean shutdown file. Skipping recovery for all logs in data directory: ${dir.getAbsolutePath}")
        } else {
          // log recovery itself is being performed by `Log` class during initialization
          // 当前 broker 不是正常关闭,设置 broker 状态为 RecoveringFromUncleanShutdown,表示正在从上次异常关闭中恢复

        // 读取每个 log 目录下的 recovery-point-offset-checkpoint 文件,返回 topic 分区对象与 HW 之间的映射关系
        var recoveryPoints = Map[TopicPartition, Long]()
        try {
          recoveryPoints = this.recoveryPointCheckpoints(dir).read
        } catch {
          case e: Exception =>
            warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory $dir", e)
            warn("Resetting the recovery checkpoint to 0")

        var logStartOffsets = Map[TopicPartition, Long]()
        try {
          logStartOffsets = this.logStartOffsetCheckpoints(dir).read
        } catch {
          case e: Exception =>
            warn(s"Error occurred while reading log-start-offset-checkpoint file of directory $dir", e)

        // 遍历当前 log 目录的子目录,仅处理目录,忽略文件
        val jobsForDir = for {
          dirContent <- Option(dir.listFiles).toList
          logDir <- dirContent if logDir.isDirectory
        } yield {
          // 为每个 Log 目录创建一个 Runnable 任务
          CoreUtils.runnable {
            try {
              // 调用loadLog 创建log对象
              loadLog(logDir, recoveryPoints, logStartOffsets)
            } catch {
              case e: IOException =>
                offlineDirs.add((dir.getAbsolutePath, e))
                error(s"Error while loading log dir ${dir.getAbsolutePath}", e)
        // 提交上面创建的任务,并将提交结果封装到 jobs 集合中,jobsForDir 是 List[Runnable] 类型
        jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)
      } catch {
        case e: IOException =>
          offlineDirs.add((dir.getAbsolutePath, e))
          error(s"Error while loading log dir ${dir.getAbsolutePath}", e)

    // 阻塞等待上面提交的任务执行完成,即等待所有 log 目录下 topic 分区对应的目录文件加载完成
    try {
      for ((cleanShutdownFile, dirJobs) <- jobs) {
        try {
          // 删除对应的 .kafka_cleanshutdown 文件
        } catch {
          case e: IOException =>
            offlineDirs.add((cleanShutdownFile.getParent, e))
            error(s"Error while deleting the clean shutdown file $cleanShutdownFile", e)

      offlineDirs.foreach { case (dir, e) =>
        logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while deleting the clean shutdown file in dir $dir", e)
    } catch {
      case e: ExecutionException =>
        error(s"There was an error in one of the threads during logs loading: ${e.getCause}")
        throw e.getCause
    } finally {
      // 遍历关闭线程池

    info(s"Logs loading complete in ${time.milliseconds - startMs} ms.")

  private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = {
    debug(s"Loading log '${logDir.getName}'")
    // 依据目录名解析得到对应的 topic 分区对象
    val topicPartition = Log.parseTopicPartitionName(logDir)
    // 获取当前 topic 分区对应的配置
    val config = topicConfigs.getOrElse(topicPartition.topic, currentDefaultConfig)
    // 获取 topic 分区对应的 HW 值
    val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
    val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)

    // 创建对应的 Log 对象,每个 topic 分区目录对应一个 Log 对象
    val log = Log(
      dir = logDir,
      config = config,
      logStartOffset = logStartOffset,
      recoveryPoint = logRecoveryPoint,
      maxProducerIdExpirationMs = maxPidExpirationMs,
      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
      scheduler = scheduler,
      time = time,
      brokerTopicStats = brokerTopicStats,
      logDirFailureChannel = logDirFailureChannel)

    // 如果当前 log 是需要被删除的文件,topic被删除,则记录到 logsToBeDeleted 队列中,会有周期性任务对其执行删除操作
    if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
    } else {
      // 建立 topic 分区对象与其 Log 对象之间的映射关系,不允许一个 topic 分区对象对应多个目录
      val previous = {
        if (log.isFuture)
          this.futureLogs.put(topicPartition, log)
          this.currentLogs.put(topicPartition, log)
      if (previous != null) {
        if (log.isFuture)
          throw new IllegalStateException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
          throw new IllegalStateException(s"Duplicate log directories for $topicPartition are found in both ${log.dir.getAbsolutePath} " +
            s"and ${previous.dir.getAbsolutePath}. It is likely because log directory failure happened while broker was " +
            s"replacing current replica with future replica. Recover broker from this failure by manually deleting one of the two directories " +
            s"for this partition. It is recommended to delete the partition in the log directory that is known to have failed recently.")
   *  Start the background threads to flush logs and do log cleanup
  def startup(): Unit = {
    /* Schedule the cleanup task to delete old logs */
    if (scheduler != null) {
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
                         cleanupLogs _,
                         delay = InitialTaskDelayMs,
                         period = retentionCheckMs,
      info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
                         flushDirtyLogs _,
                         delay = InitialTaskDelayMs,
                         period = flushCheckMs,
                         checkpointLogRecoveryOffsets _,
                         delay = InitialTaskDelayMs,
                         period = flushRecoveryOffsetCheckpointMs,
                         checkpointLogStartOffsets _,
                         delay = InitialTaskDelayMs,
                         period = flushStartOffsetCheckpointMs,
      scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                         deleteLogs _,
                         delay = InitialTaskDelayMs,
                         unit = TimeUnit.MILLISECONDS)
    if (cleanerConfig.enableCleaner)



class LogManager() {
   * Delete any eligible logs. Return the number of segments deleted.
   * Only consider logs that are not compacted.
  def cleanupLogs(): Unit = {
    debug("Beginning log cleanup...")
    var total = 0
    val startMs = time.milliseconds

    // clean current logs.
    val deletableLogs = {
      if (cleaner != null) {
        // prevent cleaner from working on same partitions when changing cleanup policy
      } else {
        currentLogs.filter {
          case (_, log) => !log.config.compact

    try {
      deletableLogs.foreach {
        case (topicPartition, log) =>
          debug(s"Garbage collecting '${log.name}'")
          total += log.deleteOldSegments()

          val futureLog = futureLogs.get(topicPartition)
          if (futureLog != null) {
            // clean future logs
            debug(s"Garbage collecting future log '${futureLog.name}'")
            total += futureLog.deleteOldSegments()
    } finally {
      if (cleaner != null) {

    debug(s"Log cleanup completed. $total files deleted in " +
                  (time.milliseconds - startMs) / 1000 + " seconds")

   * If topic deletion is enabled, delete any log segments that have either expired due to time based retention
   * or because the log size is > retentionSize.
   * Whether or not deletion is enabled, delete any log segments that are before the log start offset
  def deleteOldSegments(): Int = {
    if (config.delete) {
      deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
    } else {

  // 根据时间策略删除相关日志
  private def deleteRetentionMsBreachedSegments(): Int = {
    if (config.retentionMs < 0) return 0
    val startMs = time.milliseconds
    deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
      reason = s"retention time ${config.retentionMs}ms breach")

  // 根据日志大小删除相关日志
  private def deleteRetentionSizeBreachedSegments(): Int = {
    if (config.retentionSize < 0 || size < config.retentionSize) return 0
    var diff = size - config.retentionSize
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
      if (diff - segment.size >= 0) {
        diff -= segment.size
      } else {

    deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")

  private def deleteLogStartOffsetBreachedSegments(): Int = {
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
      nextSegmentOpt.exists(_.baseOffset <= logStartOffset)

    deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")


  • 删除:超过时间或大小阈值的旧 segment,直接进行删除;
  • 压缩:不是直接删除日志分段,而是采用合并压缩的方式进行


根据时间策略删除相关日志 retention.ms


  • 对于那些大小为0并且是正在使用中的日志段不会被删除
  • 如果扫描完发现全部的日志段都过期了,就要马上新生成一个新的日志段来处理后面的消息
  • 日志段的删除时异步的,此处只会标记一下,往日志段文件后面加上.delete后缀,然后开启一个定时任务删除文件。定时任务的延迟时间和file.delete.delay.ms有关系。

根据日志大小删除相关日志 retention.bytes






class LogManager() {
   * Flush any log which has exceeded its flush interval and has unwritten messages.
  private def flushDirtyLogs(): Unit = {
    debug("Checking for dirty logs to flush...")

    for ((topicPartition, log) <- currentLogs.toList ++ futureLogs.toList) {
      try {
        val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
        debug(s"Checking if flush is needed on ${topicPartition.topic} flush interval ${log.config.flushMs}" +
              s" last flushed ${log.lastFlushTime} time since last flush: $timeSinceLastFlush")
        if(timeSinceLastFlush >= log.config.flushMs)
      } catch {
        case e: Throwable =>
          error(s"Error flushing topic ${topicPartition.topic}", e)




class LogManager() {
   * Write out the current recovery point for all logs to a text file in the log directory
   * to avoid recovering the whole log on startup.
  def checkpointLogRecoveryOffsets(): Unit = {
    logsByDir.foreach { case (dir, partitionToLogMap) =>
      liveLogDirs.find(_.getAbsolutePath.equals(dir)).foreach { f =>
        checkpointRecoveryOffsetsAndCleanSnapshot(f, partitionToLogMap.values.toSeq)

   * Write out the current log start offset for all logs to a text file in the log directory
   * to avoid exposing data that have been deleted by DeleteRecordsRequest
  def checkpointLogStartOffsets(): Unit = {

遍历所有的LogDir,然后将内存中维护的recovery-checkpoint写到log 目录下文件recovery-point-offset-checkpoint 上,避免启动时恢复所有的日志。

recovery-point-offset-checkpoint 表示已经刷写到磁盘的记录

0  // 当前版本
374 // LogDir目录下有多少个partition目录
__consumer_offsets 22 3 // topic partition编号 recovery-checkpoint
topic1 0 45
topic2 0 2

遍历所有的LogDir,然后将start offset 写到log目录下文件replication-offset-checkpoint 上,避免被DeleteRecordsRequest删除的数据暴露

replication-offset-checkpoint 用来存储每个replica的HighWatermark的(high watermark (HW),表示已经被commited的message,HW以下的数据都是各个replicas间同步的,一致的。

0  // 当前版本
__consumer_offsets 22 3 // topic partition编号 recovery-checkpoint
topic1 0 45
topic2 0 2





class LogManager() {
   *  Delete logs marked for deletion. Delete all logs for which `currentDefaultConfig.fileDeleteDelayMs`
   *  has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed will be
   *  considered for deletion in the next iteration of `deleteLogs`. The next iteration will be executed
   *  after the remaining time for the first log that is not deleted. If there are no more `logsToBeDeleted`,
   *  `deleteLogs` will be executed after `currentDefaultConfig.fileDeleteDelayMs`.
  private def deleteLogs(): Unit = {
    var nextDelayMs = 0L
    try {
      def nextDeleteDelayMs: Long = {
        if (!logsToBeDeleted.isEmpty) {
          val (_, scheduleTimeMs) = logsToBeDeleted.peek()
          scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
        } else

      while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
        val (removedLog, _) = logsToBeDeleted.take()
        if (removedLog != null) {
          try {
            info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
          } catch {
            case e: KafkaStorageException =>
              error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
    } catch {
      case e: Throwable =>
        error(s"Exception in kafka-delete-logs thread.", e)
    } finally {
      try {
          deleteLogs _,
          delay = nextDelayMs,
          unit = TimeUnit.MILLISECONDS)
      } catch {
        case e: Throwable =>
          if (scheduler.isStarted) {
            // No errors should occur unless scheduler has been shutdown
            error(s"Failed to schedule next delete in kafka-delete-logs thread", e)

    * 异步删除
    * Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and
    * add it in the queue for deletion.
    * @param topicPartition TopicPartition that needs to be deleted
    * @param isFuture True iff the future log of the specified partition should be deleted
    * @return the removed log
  def asyncDelete(topicPartition: TopicPartition, isFuture: Boolean = false): Log = {
    val removedLog: Log = logCreationOrDeletionLock synchronized {
      if (isFuture)
    if (removedLog != null) {
      //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
      if (cleaner != null && !isFuture) {
      checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.dir.getParentFile, ArrayBuffer.empty)
      info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
    } else if (offlineLogDirs.nonEmpty) {
      throw new KafkaStorageException(s"Failed to delete log for ${if (isFuture) "future" else ""} $topicPartition because it may be in one of the offline directories ${offlineLogDirs.mkString(",")}")




  • 需要先把分区目录标记一下,在后缀加上-delete表示该分区准备删除了。这样做可以防止如果删除时间没到就宕机,下次重启时可以扫描-delete结尾的分区再删除
  • 把分区目录添加到logsToBeDeleted中待删除


KafkaServer 通过 SocketServer 接受消息并且进行处理。并采用了 Reactor 模式。


Reactor 网络通信模型


1 + N + M

  • 1:表示 1 个 Acceptor 线程,负责监听新的连接,然后将新连接交给 Processor 线程处理
  • N:表示 N 个 Processor 线程,每个 Processor 都有自己的 selector,负责从 socket 中读写数据。
  • M:表示 M 个 KafkaRequestHandler 业务处理线程,它通过调用 KafkaApis 进行业务处理,然后生成 response,再交由给 Processor 线程。
 * Handles new connections, requests and responses to and from broker.
 * Kafka supports two types of request planes :
 *  - data-plane :
 *    - Handles requests from clients and other brokers in the cluster.
 *    - The threading model is
 *      1 Acceptor thread per listener, that handles new connections.
 *      It is possible to configure multiple data-planes by specifying multiple "," separated endpoints for "listeners" in KafkaConfig.
 *      Acceptor has N Processor threads that each have their own selector and read requests from sockets
 *      M Handler threads that handle requests and produce responses back to the processor threads for writing.
 *  - control-plane :
 *    - Handles requests from controller. This is optional and can be configured by specifying "control.plane.listener.name".
 *      If not configured, the controller requests are handled by the data-plane.
 *    - The threading model is
 *      1 Acceptor thread that handles new connections
 *      Acceptor has 1 Processor thread that has its own selector and read requests from the socket.
 *      1 Handler thread that handles requests and produce responses back to the processor thread for writing.
class SocketServer() {
  private val maxQueuedRequests = config.queuedMaxRequests
  // data-plane
  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
  // RequestChannel负责消息从网络层转接到业务层,以及将业务层的处理结果交付给网络层进而返回给客户端。
  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix)
  // control-plane
  private var controlPlaneProcessorOpt : Option[Processor] = None
  private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
  val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => new RequestChannel(20, ControlPlaneMetricPrefix))

  def startup(startupProcessors: Boolean = true): Unit = {
    this.synchronized {
      // 控制每个 ip 上最大的连接数的
      connectionQuotas = new ConnectionQuotas(config, time)
      // 创建 Acceptor 和 Processor
      createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
      // 开启 processor 线程
      if (startupProcessors) {
    info(s"Started ${dataPlaneAcceptors.size} acceptor threads for data-plane")
    if (controlPlaneAcceptorOpt.isDefined)
      info("Started control-plane acceptor thread")

  private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap

  private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                    endpoints: Seq[EndPoint]): Unit = synchronized {
    // 每一个endPoint创建一个Acceptor,创建多个Processor放入processor线程数组                                                      
    endpoints.foreach { endpoint =>
      connectionQuotas.addListener(config, endpoint.listenerName)
      val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
      addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
      KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
      dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
      info(s"Created data-plane acceptor and processors for endpoint : $endpoint")

  private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = synchronized {
    endpointOpt.foreach { endpoint =>
      connectionQuotas.addListener(config, endpoint.listenerName)
      // Acceptor
      val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
      val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
      controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
      controlPlaneProcessorOpt = Some(controlPlaneProcessor)
      val listenerProcessors = new ArrayBuffer[Processor]()
      listenerProcessors += controlPlaneProcessor
      nextProcessorId += 1
      // Processors
      controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlaneThreadPrefix)
      // 创建非守护进程
      KafkaThread.nonDaemon(s"${ControlPlaneThreadPrefix}-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", controlPlaneAcceptor).start()
      info(s"Created control-plane acceptor and processor for endpoint : $endpoint")

  private def createAcceptor(endPoint: EndPoint, metricPrefix: String) : Acceptor = synchronized {
    val sendBufferSize = config.socketSendBufferBytes
    val recvBufferSize = config.socketReceiveBufferBytes
    val brokerId = config.brokerId
    new Acceptor(endPoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas, metricPrefix)

// openServerSocket()打开对应的EndPoint的Socket,即打开端口并启动监听。
// run()方法处理op_accept 事件创建 socketChannel 并调用 accept方法,accept 方法则是把该事件交给 Processor.accept 处理
private[kafka] class Acceptor() {
  private val nioSelector = NSelector.open()
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
  // Acceptor会负责构造并管理的一个Processor的ArrayBuffer,每一个Processor都是一个独立线程
  private val processors = new ArrayBuffer[Processor]()
  private val processorsStarted = new AtomicBoolean

  private[network] def addProcessors(newProcessors: Buffer[Processor], processorThreadPrefix: String): Unit = synchronized {
    processors ++= newProcessors
    if (processorsStarted.get)
      startProcessors(newProcessors, processorThreadPrefix)

  private[network] def startProcessors(processorThreadPrefix: String): Unit = synchronized {
    if (!processorsStarted.getAndSet(true)) {
      startProcessors(processors, processorThreadPrefix)

  private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized {
    processors.foreach { processor =>

  // 不断监听对应ServerChannel上的连接请求(ACCEPT),如果有新的连接请求,使用的轮询方式将通道分配给Processor
  // 调用assignNewConnection把新连接交付给Processor
  def run(): Unit = {
    // 注册nio的OP_ACCEPT
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    // 标记当前线程已经启动,唤醒阻塞线程
    try {
      var currentProcessorIndex = 0
      // 线程存活,执行
      while (isRunning) {
        try {
          // nioSelector
          // 等待nio关注事件,等待时间500ms
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next

                if (key.isAcceptable) {
                  accept(key).foreach { socketChannel =>
                    // 轮询方式选择 processor
                    // Assign the channel to the next processor (using round-robin) to which the
                    // channel can be added without blocking. If newConnections queue is full on
                    // all processors, block until the last one is able to accept a connection.
                    var retriesLeft = synchronized(processors.length)
                    var processor: Processor = null
                    do {
                      retriesLeft -= 1
                      processor = synchronized {
                        // adjust the index (if necessary) and retrieve the processor atomically for
                        // correct behaviour in case the number of processors is reduced dynamically
                        currentProcessorIndex = currentProcessorIndex % processors.length
                      currentProcessorIndex += 1
                    } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
        catch {
    } finally {
      debug("Closing server socket and selector.")
      CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
      CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)

   * Create a server socket to listen for connections on.
  private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
    val socketAddress =
      if (host == null || host.trim.isEmpty)
        new InetSocketAddress(port)
        new InetSocketAddress(host, port)
    val serverChannel = ServerSocketChannel.open()
    if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)

    try {
      info(s"Awaiting socket connections on ${socketAddress.getHostString}:${serverChannel.socket.getLocalPort}.")
    } catch {
      case e: SocketException =>
        throw new KafkaException(s"Socket server failed to bind to ${socketAddress.getHostString}:$port: ${e.getMessage}.", e)

  // assignNewConnection中通过processor.accept的调用,将SocketChannel放入每个processor维护的新连接的队列,后面processor会从队列取出做后续处理
  private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
    if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {
    } else

Processor:核心方法是 run 方法,调用 poll 方法发送成功后会加入 completedReceives,conpletedSends,disConnected 中等待处理。



private[kafka] object Processor {
  val IdlePercentMetricName = "IdlePercent"
  val NetworkProcessorMetricTag = "networkProcessor"
  val ListenerMetricTag = "listener"

  val ConnectionQueueSize = 20

 * Thread that processes all requests from a single connection. There are N of these running in parallel
 * each of which has its own selector
private[kafka] class Processor(val id: Int,
                               time: Time,
                               maxRequestSize: Int,
                               requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               connectionsMaxIdleMs: Long,
                               failedAuthenticationDelayMs: Int,
                               listenerName: ListenerName,
                               securityProtocol: SecurityProtocol,
                               config: KafkaConfig,
                               metrics: Metrics,
                               credentialProvider: CredentialProvider,
                               memoryPool: MemoryPool,
                               logContext: LogContext,
                               connectionQueueSize: Int = ConnectionQueueSize) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

  // 维护一个新连接队列,在run方法里会取出处理
  private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
  private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
  // 每一个processor维护一个responseQueue
  private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

  // processor都维护了一个单独的Selector
  private val selector = createSelector(
      listenerName == config.interBrokerListenerName,
  // Visible to override for testing
  protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
    channelBuilder match {
      case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable)
      case _ =>
    new KSelector(

  // Connection ids have the format `localAddr:localPort-remoteAddr:remotePort-index`. The index is a
  // non-negative incrementing value that ensures that even if remotePort is reused after a connection is
  // closed, connection ids are not reused while requests from the closed connection are being processed.
  private var nextConnectionIndex = 0

  override def run(): Unit = {
    try {
      while (isRunning) {
        try {
          // setup any new connections that have been queued up
          // processor从newConnections队列取出新连接,并将其注册到selector并监听OR_READ事件
          // register any new responses for writing
          // 处理响应队列,这个响应队列是Handler线程处理以后的结果,会交付给RequestChannel.responseQueue.同时调用unmute,开始接受请求
          // 调用KSelector.poll(),进行真正的数据读写
          // 调用Selector.mute,不再接受Read请求,发送响应之前,不可以再接收任何请求
        } catch {
    } finally {

  private def processNewResponses(): Unit = {
    var currentResponse: RequestChannel.Response = null
    while ({currentResponse = dequeueResponse(); currentResponse != null}) {
      val channelId = currentResponse.request.context.connectionId
      try {
        currentResponse match {
          case response: NoOpResponse =>
            // There is no response to send to the client, we need to read more pipelined requests
            // that are sitting in the server's socket buffer
            trace(s"Socket server received empty response to send, registering for read: $response")
            // Try unmuting the channel. If there was no quota violation and the channel has not been throttled,
            // it will be unmuted immediately. If the channel has been throttled, it will be unmuted only if the
            // throttling delay has already passed by now.
            handleChannelMuteEvent(channelId, ChannelMuteEvent.RESPONSE_SENT)

          case response: SendResponse =>
            sendResponse(response, response.responseSend)
          case response: CloseConnectionResponse =>
            trace("Closing socket connection actively according to the response code.")
          case _: StartThrottlingResponse =>
            handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_STARTED)
          case _: EndThrottlingResponse =>
            // Try unmuting the channel. The channel will be unmuted only if the response has already been sent out to
            // the client.
            handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_ENDED)
          case _ =>
            throw new IllegalArgumentException(s"Unknown response type: ${currentResponse.getClass}")
      } catch {
        case e: Throwable =>
          processChannelException(channelId, s"Exception while processing response for $channelId", e)

  private def poll(): Unit = {
    val pollTimeout = if (newConnections.isEmpty) 300 else 0
    try selector.poll(pollTimeout)
    catch {
      case e @ (_: IllegalStateException | _: IOException) =>
        // The exception is not re-thrown and any completed sends/receives/connections/disconnections
        // from this poll will be processed.
        error(s"Processor $id poll failed", e)

  private def processCompletedReceives(): Unit = {
    selector.completedReceives.asScala.foreach { receive =>
      try {
        openOrClosingChannel(receive.source) match {
          case Some(channel) =>
            val header = RequestHeader.parse(receive.payload)
            if (header.apiKey() == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, nowNanosSupplier))
              trace(s"Begin re-authentication: $channel")
            else {
              val nowNanos = time.nanoseconds()
              if (channel.serverAuthenticationSessionExpired(nowNanos)) {
                // be sure to decrease connection count and drop any in-flight responses
                debug(s"Disconnecting expired channel: $channel : $header")
                expiredConnectionsKilledCount.record(null, 1, 0)
              } else {
                val connectionId = receive.source
                val context = new RequestContext(header, connectionId, channel.socketAddress,
                  channel.principal, listenerName, securityProtocol)
                val req = new RequestChannel.Request(processor = id, context = context,
                  startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
                // 将请求通过RequestChannel.requestQueue交付给Handler
                // 不再接受Read请求发送响应之前,不可以再接收任何请求
                handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
          case None =>
            // This should never happen since completed receives are processed immediately after `poll()`
            throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
      } catch {

  private def processCompletedSends(): Unit = {
    selector.completedSends.asScala.foreach { send =>
      try {
        val response = inflightResponses.remove(send.destination).getOrElse {
          throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")

        // Invoke send completion callback
        response.onComplete.foreach(onComplete => onComplete(send))

        // Try unmuting the channel. If there was no quota violation and the channel has not been throttled,
        // it will be unmuted immediately. If the channel has been throttled, it will unmuted only if the throttling
        // delay has already passed by now.
        handleChannelMuteEvent(send.destination, ChannelMuteEvent.RESPONSE_SENT)
      } catch {
        case e: Throwable => processChannelException(send.destination,
          s"Exception while processing completed send to ${send.destination}", e)

   * 放入队列 newConnections
   * Queue up a new connection for reading
  def accept(socketChannel: SocketChannel,
             mayBlock: Boolean,
             acceptorIdlePercentMeter: com.yammer.metrics.core.Meter): Boolean = {
    val accepted = {
      if (newConnections.offer(socketChannel))
      else if (mayBlock) {
        val startNs = time.nanoseconds
        acceptorIdlePercentMeter.mark(time.nanoseconds() - startNs)
      } else
    if (accepted)

  private def configureNewConnections(): Unit = {
    var connectionsProcessed = 0
    while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {
      // 取出新连接SocketChannel
      val channel = newConnections.poll()
      try {
        // 将SocketChannel注册到selector OP_READ
        selector.register(connectionId(channel.socket), channel)
        connectionsProcessed += 1
      } catch {

   * Wakeup the thread for selection.
  override def wakeup() = selector.wakeup()
public class Selector {
  public void register(String id, SocketChannel socketChannel) throws IOException {
    registerChannel(id, socketChannel, SelectionKey.OP_READ);


KafkaApis 是 kafka 服务器处理请求的入口,它负责将请求分发到不同的 handle*() 进行处理


class KafkaApis() {
   * Top-level method that handles all requests and multiplexes to the right api
  def handle(request: RequestChannel.Request): Unit = {
    try {
      trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
      request.header.apiKey match {
        case ApiKeys.PRODUCE => handleProduceRequest(request)
        case ApiKeys.FETCH => handleFetchRequest(request)
        case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
        case ApiKeys.METADATA => handleTopicMetadataRequest(request)
        case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
        case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
        case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
        case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
        case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
        case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
        case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
        case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
        case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
        case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
        case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
        case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
        case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
        case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
        case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
        case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
        case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
        case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
        case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
        case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
        case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
        case ApiKeys.END_TXN => handleEndTxnRequest(request)
        case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
        case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
        case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
        case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
        case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
        case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
        case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
        case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
        case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
        case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
        case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
        case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
        case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
        case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
        case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
        case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
        case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
        case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
        case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignmentsRequest(request)
        case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
        case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request)
    } catch {
      case e: FatalExitError => throw e
      case e: Throwable => handleError(request, e)
    } finally {
      request.apiLocalCompleteTimeNanos = time.nanoseconds

   * Handle a produce request
  def handleProduceRequest(request: RequestChannel.Request): Unit = {
    val produceRequest = request.body[ProduceRequest]
    val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes

    if (produceRequest.hasTransactionalRecords) {
      val isAuthorizedTransactional = produceRequest.transactionalId != null &&
        authorize(request, WRITE, TRANSACTIONAL_ID, produceRequest.transactionalId)
      if (!isAuthorizedTransactional) {
        sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
      // Note that authorization to a transactionalId implies ProducerId authorization

    } else if (produceRequest.hasIdempotentRecords && !authorize(request, IDEMPOTENT_WRITE, CLUSTER, CLUSTER_NAME)) {
      sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)

    val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
    val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
    val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()
    val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
    val authorizedTopics = filterAuthorized(request, WRITE, TOPIC,
    // 遍历这个ProduceRequest 本次发送过来的所有消息记录。对ProduceRequest 中所有Record 的处理是逐条处理的,并不是某一条的异常影响所有Record 都失败
    for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
      // 如果判断topic未授权,生成未授权topic 的response
      if (!authorizedTopics.contains(topicPartition.topic))
        unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
      else if (!metadataCache.contains(topicPartition))
      // 如果判断broker的元数据缓存不包含该topicPartition,生成不存在topic的response
        nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
      // 如果brocker的元数据缓存包含该topicPartition,将该二元组添加到authorizedRequestInfo的map集合
        try {
          ProduceRequest.validateRecords(request.header.apiVersion(), memoryRecords)
          authorizedRequestInfo += (topicPartition -> memoryRecords)
        } catch {
          case e: ApiException =>
            invalidRequestResponses += topicPartition -> new PartitionResponse(Errors.forException(e))

    // 处理完所有请求后,回调该函数返回应答
    // the callback for sending a produce response
    def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
      val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses
      var errorInResponse = false

      // 逐个处理应答
      mergedResponseStatus.foreach { case (topicPartition, status) =>
        if (status.error != Errors.NONE) {
          errorInResponse = true
          debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(

      // When this callback is triggered, the remote API call has completed
      request.apiRemoteCompleteTimeNanos = time.nanoseconds

      // Record both bandwidth and request quota-specific values and throttle by muting the channel if any of the quotas
      // have been violated. If both quotas have been violated, use the max throttle time between the two quotas. Note
      // that the request quota is not enforced if acks == 0.
      val bandwidthThrottleTimeMs = quotas.produce.maybeRecordAndGetThrottleTimeMs(request, numBytesAppended, time.milliseconds())
      val requestThrottleTimeMs = if (produceRequest.acks == 0) 0 else quotas.request.maybeRecordAndGetThrottleTimeMs(request)
      val maxThrottleTimeMs = Math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
      if (maxThrottleTimeMs > 0) {
        if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
          quotas.produce.throttle(request, bandwidthThrottleTimeMs, sendResponse)
        } else {
          quotas.request.throttle(request, requestThrottleTimeMs, sendResponse)

      // Send the response immediately. In case of throttling, the channel has already been muted.
      if (produceRequest.acks == 0) {
        // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
        // the request, since no response is expected by the producer, the server will close socket server so that
        // the producer client will know that some error has happened and will refresh its metadata
        if (errorInResponse) {
          val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
            topicPartition -> status.error.exceptionName
          }.mkString(", ")
            s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
              s"from client id ${request.header.clientId} with ack=0\n" +
              s"Topic and partition to exceptions: $exceptionsSummary"
          closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
        } else {
          // Note that although request throttling is exempt for acks == 0, the channel may be throttled due to
          // bandwidth quota violation.
      } else {
        sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)), None)

    def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
      processingStats.foreach { case (tp, info) =>
        updateRecordConversionStats(request, tp, info)

    if (authorizedRequestInfo.isEmpty)
    else {
      val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId

      // 调用副本管理的方法,将消息添加到对应所有的副本分区中。将record添加到副本broker
      // call the replica manager to append messages to the replicas
        timeout = produceRequest.timeout.toLong,
        requiredAcks = produceRequest.acks,
        internalTopicsAllowed = internalTopicsAllowed,
        origin = AppendOrigin.Client,
        entriesPerPartition = authorizedRequestInfo,
        responseCallback = sendResponseCallback,
        recordConversionStatsCallback = processingStatsCallback)

      // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
      // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log


KafkaRequestHandlerPool 处理request的线程池,请求处理池。KafkaRequestHandlerPool构造方法中初始化并启动了多个KafkaRequestHandler线程对象,线程池大小通过Kafka配置文件配置项num.io.threads进行配置。KafkaRequestHandlerPool线程池中的所有KafkaRequestHandler,通过竞争方式从RequestChannel.requestQueue中获取请求进行处理。由于requestQueue的类型是ArrayBlockingQueue,通过调用ArrayBlockingQueue.poll()方法取出请求.

KafkaRequestHandler 请求处理线程

KafkaRequestHandler run方法的具体逻辑是从RequestChannel取出processor之前put请求,调用KafkaApi针对不同请求类型分别处理


class KafkaRequestHandlerPool() {
  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
  val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {

  def createHandler(id: Int): Unit = synchronized {
    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()

class KafkaRequestHandler() {
  def run(): Unit = {
    while (!stopped) {
      // We use a single meter for aggregate idle percentage for the thread pool.
      // Since meter is calculated as total_recorded_value / time_window and
      // time_window is independent of the number of threads, each recorded idle
      // time should be discounted by # threads.
      val startSelectTime = time.nanoseconds
      // 从RequestChannel.requestQueue队列中取出请求
      val req = requestChannel.receiveRequest(300)
      val endTime = time.nanoseconds
      // 统计线程空闲时间
      val idleTime = endTime - startSelectTime
      aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

      req match {
        // 关闭线程请求,说明该 Broker 发起了关闭操作
        case RequestChannel.ShutdownRequest =>
          debug(s"Kafka request handler $id on broker $brokerId received shut down command")
        // 普通请求
        case request: RequestChannel.Request =>
          try {
            request.requestDequeueTimeNanos = endTime
            trace(s"Kafka request handler $id on broker $brokerId handling request $request")
            // 调用KafkaApi.handle(),将请求交付给业务
          } catch {
            case e: FatalExitError =>
            case e: Throwable => error("Exception when handling request", e)
          } finally {
            // 释放请求对象占用的内存缓冲区资源

        case null => // continue



object KafkaController extends Logging {
  def startup() = {
    zkClient.registerStateChangeHandler(new StateChangeHandler {
      override val name: String = StateChangeHandlers.ControllerHandler
      override def afterInitializingSession(): Unit = {
      override def beforeInitializingSession(): Unit = {
        val queuedEvent = eventManager.clearAndPut(Expire)

        // Block initialization of the new session until the expiration event is being handled,
        // which ensures that all pending events have been processed before creating the new session


Kafka 中需要副本写入的场景主要有

  • 生产者向 Leader 副本写入消息
  • Follower 副本拉取消息后写入副本
  • 消费者组写入组信息
  • 事务管理器写入事务信息(包括事务标记、事务元数据等)


/* start replica manager */
replicaManager = createReplicaManager(isShuttingDown)
  • -1: Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了 如果说Partition Leader刚接收到了消息,但是结果Follower没有收到消息,此时Leader宕机了,那么客户端会感知到这个消息没发送成功,他会重试再次发送消息过去。 此时可能Partition 2的Follower变成Leader了,此时ISR列表里只有最新的这个Follower转变成的Leader了,那么只要这个新的Leader接收消息就算成功了 acks=all就代表数据一定不会丢失了吗? 当然不是,如果你的Partition只有一个副本,也就是一个Leader,任何Follower都没有,你认为acks=all有用吗? 当然没用了,因为ISR里就一个Leader,他接收完消息后宕机,也会导致数据丢失。 所以说,这个acks=all,必须跟ISR列表里至少有2个以上的副本配合使用,起码是有一个Leader和一个Follower才可以。 这样才能保证说写一条数据过去,一定是2个以上的副本都收到了才算是成功,此时任何一个副本宕机,不会导致数据丢失
  • 1: 只要Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管其他的Follower有没有同步过去这条消息了 这种设置其实是kafka默认的设置方式 也就是说默认情况下,要是不设置这个参数,只要Partition Leader写成功就算成功。 但是这里有一个问题,万一Partition Leader刚刚接收到消息,Follower还没来得及同步过去,结果Leader所在的broker宕机了,此时也会导致这条消息丢失,因为人家客户端已经认为发送成功了。
  • 0: kafkaProducer在客户端,只要把消息发送出去,不管那条数据有没有在哪怕Partition Leader上落到磁盘,就不管他了,直接认为这个消息发送成功
class ReplicaManager() {
   * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
   * the callback function will be triggered either when timeout or the required acks are satisfied;
   * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
  def appendRecords(timeout: Long, // 请求处理超时时间
                    requiredAcks: Short, // 请求acks 设置
                    internalTopicsAllowed: Boolean, // 是否允许写入内部Topic
                    origin: AppendOrigin, // 是否来自客户端
                    entriesPerPartition: Map[TopicPartition, MemoryRecords], // 待写入消息
                    responseCallback: Map[TopicPartition, PartitionResponse] => Unit, // 回调逻辑
                    delayedProduceLock: Option[Lock] = None,
                    recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit = {
    // 合法取值是-1,0,1,否则视为非法
    if (isValidRequiredAcks(requiredAcks)) {
      val sTime = time.milliseconds
      // 将消息追加到本地副本日志中
      val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
        origin, entriesPerPartition, requiredAcks)
      debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

      val produceStatus = localProduceResults.map { case (topicPartition, result) =>
        topicPartition ->
                  // 设置下一条待写入消息的位移值
                  result.info.lastOffset + 1, // required offset
                  // 构建PartitionResponse封装写入结果
                  new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime,
                    result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage)) // response status
      // 尝试更新消息格式转换的指标数据
      recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })

      // 用于判断消息集合被写入到日志之后,是否需要等待其他副本也写入成功
      if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
        // create delayed produce operation
        val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
        // 创建DelayedProduce延时请求对象,等待其他副本写入成功
        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

        // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
        val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq

        // try to complete the request immediately, otherwise put it into the purgatory
        // this is because while the delayed produce operation is being created, new
        // requests may arrive and hence make this operation completable.
        // 再一次尝试完成该延时请求
        // 如果暂时无法完成,则将对象放入到相应的Purgatory中等待后续处理
        delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

      } else {
        // we can respond immediately
        // 无需等待其他副本写入完成,可以立即发送Response
        val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
        // 调用回调逻辑然后返回即可
    } else {
      // If required.acks is outside accepted range, something is wrong with the client
      // Just return an error and don't handle the request at all
      val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
        topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
          LogAppendInfo.UnknownLogAppendInfo.firstOffset.getOrElse(-1), RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
      // 构造INVALID_REQUIRED_ACKS 异常并封装进回调函数调用中

   * Append the messages to the local replica logs
  private def appendToLocalLog(internalTopicsAllowed: Boolean,
                               origin: AppendOrigin,
                               entriesPerPartition: Map[TopicPartition, MemoryRecords],
                               requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {

    def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
      val logStartOffset = getPartition(topicPartition) match {
        case HostedPartition.Online(partition) => partition.logStartOffset
        case HostedPartition.None | HostedPartition.Offline => -1L
      error(s"Error processing append operation on partition $topicPartition", t)


    // 遍历处理每个 topic 分区及其待追加的消息数据
    trace(s"Append [$entriesPerPartition] to local log")
    entriesPerPartition.map { case (topicPartition, records) =>

      // reject appending to internal topics if it is not allowed
      if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
        (topicPartition, LogAppendResult(
          Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
      } else {
        try {
          // 获取分区对象
          val partition = getPartitionOrException(topicPartition, expectLeader = true)
          // 向该分区对象写入消息集合
          val info = partition.appendRecordsToLeader(records, origin, requiredAcks)
          val numAppendedMessages = info.numMessages

          // update stats for successfully appended bytes and messages as bytesInRate and messageInRate

          trace(s"${records.sizeInBytes} written to log $topicPartition beginning at offset " +
            s"${info.firstOffset.getOrElse(-1)} and ending at offset ${info.lastOffset}")
          (topicPartition, LogAppendResult(info))
        } catch {
          // NOTE: Failed produce requests metric is not incremented for known exceptions
          // it is supposed to indicate un-expected failures of a broker in handling a produce request
          case e@ (_: UnknownTopicOrPartitionException |
                   _: NotLeaderForPartitionException |
                   _: RecordTooLargeException |
                   _: RecordBatchTooLargeException |
                   _: CorruptRecordException |
                   _: KafkaStorageException) =>
            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
          case rve: RecordValidationException =>
            val logStartOffset = processFailedRecord(topicPartition, rve.invalidException)
            val recordErrors = rve.recordErrors
            (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(
              logStartOffset, recordErrors, rve.invalidException.getMessage), Some(rve.invalidException)))
          case t: Throwable =>
            val logStartOffset = processFailedRecord(topicPartition, t)
            (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t)))


class Partition() {
  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
    // inReadLock是一个柯里化函数,第二个参数是一个函数,返回值是LogAppendInfo和HW是否增加的bool值
    // 相当于给方法加了读锁
    val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
      // leaderReplicaIfLocal表示本地broker中的leader副本
      leaderLogIfLocal match {
        case Some(leaderLog) =>
          val minIsr = leaderLog.config.minInSyncReplicas
          val inSyncSize = inSyncReplicaIds.size

          // Avoid writing to leader if there are not enough insync replicas to make it safe
          // 如果没有足够isr副本就返回异常,min.insync.replicas是和ack=-1一起使用的
          if (inSyncSize < minIsr && requiredAcks == -1) {
            throw new NotEnoughReplicasException(s"The size of the current ISR $inSyncReplicaIds " +
              s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
          // 只写入leaderlog
          val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,

          // 写入完消息,尝试触发Fetch请求,比如满足消费者的fetch.max.bytes
          // we may need to increment high watermark since ISR could be down to 1
          (info, maybeIncrementLeaderHW(leaderLog))

        case None =>
          throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
            .format(topicPartition, localBrokerId))

    // some delayed operations may be unblocked after HW changed
    if (leaderHWIncremented)
    else {
      // probably unblock some follower fetch requests since log end offset has been updated



class Log()
  def appendAsLeader(records: MemoryRecords,
                     leaderEpoch: Int,
                     origin: AppendOrigin = AppendOrigin.Client,
                     interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)

  private def append(records: MemoryRecords,
                     origin: AppendOrigin,
                     interBrokerProtocolVersion: ApiVersion,
                     assignOffsets: Boolean,
                     leaderEpoch: Int): LogAppendInfo = {
    maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
      val appendInfo = analyzeAndValidateRecords(records, origin)

      // return if we have no valid messages or if this is a duplicate of the last appended entry
      if (appendInfo.shallowCount == 0)
        return appendInfo

      // trim any invalid bytes or partial messages before appending it to the on-disk log
      var validRecords = trimInvalidBytes(records, appendInfo)

      // they are valid, insert them in the log
      lock synchronized {
        if (assignOffsets) {
          // assign offsets to the message set
          val offset = new LongRef(nextOffsetMetadata.messageOffset)
          appendInfo.firstOffset = Some(offset.value)
          val now = time.milliseconds
          // 各种验证
          val validateAndOffsetAssignResult = try {
          } catch {
            case e: IOException =>
              throw new KafkaException(s"Error validating messages while appending to log $name", e)
          // 验证通过后的消息
          validRecords = validateAndOffsetAssignResult.validatedRecords
          // 根据校验结果完善appendInfo对象
          appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
          appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
          appendInfo.lastOffset = offset.value - 1
          appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
            appendInfo.logAppendTime = now

          // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
          // format conversion)
          if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
            for (batch <- validRecords.batches.asScala) {
              if (batch.sizeInBytes > config.maxMessageSize) {
                // we record the original message set size instead of the trimmed size
                // to be consistent with pre-compression bytesRejectedRate recording
                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
        } else {
          // we are taking the offsets we are given
          if (!appendInfo.offsetsMonotonic)
            throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +

          if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
            // we may still be able to recover if the log is empty
            // one example: fetching from log start offset on the leader which is not batch aligned,
            // which may happen as a result of AdminClient#deleteRecords()
            val firstOffset = appendInfo.firstOffset match {
              case Some(offset) => offset
              case None => records.batches.asScala.head.baseOffset()

            val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
            throw new UnexpectedAppendOffsetException(
              s"Unexpected offset in append to $topicPartition. $firstOrLast " +
              s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
              s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
              s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
              firstOffset, appendInfo.lastOffset)

        // update the epoch cache with the epoch stamped onto the message by the leader
        validRecords.batches.asScala.foreach { batch =>
          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
            maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
          } else {
            // In partial upgrade scenarios, we may get a temporary regression to the message format. In
            // order to ensure the safety of leader election, we clear the epoch cache so that we revert
            // to truncation by high watermark after the next leader election.
            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")

        // Records总消息不能比segment.bytes大
        // check messages set size may be exceed config.segmentSize
        if (validRecords.sizeInBytes > config.segmentSize) {
          throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
            s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")

        // 是否需要生成一个新的segment,具体判断条件见下文
        // maybe roll the log if this segment is full
        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)

        // 保存位移的VO
        val logOffsetMetadata = LogOffsetMetadata(
          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
          segmentBaseOffset = segment.baseOffset,
          relativePositionInSegment = segment.size)

        // now that we have valid records, offsets assigned, and timestamps updated, we need to
        // validate the idempotent/transactional state of the producers and collect some metadata
        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
          logOffsetMetadata, validRecords, origin)

        maybeDuplicate.foreach { duplicate =>
          appendInfo.firstOffset = Some(duplicate.firstOffset)
          appendInfo.lastOffset = duplicate.lastOffset
          appendInfo.logAppendTime = duplicate.timestamp
          appendInfo.logStartOffset = logStartOffset
          return appendInfo

        // 真正append日志的是LogSegment对象
        segment.append(largestOffset = appendInfo.lastOffset,
          largestTimestamp = appendInfo.maxTimestamp,
          shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
          records = validRecords)

        // Increment the log end offset. We do this immediately after the append because a
        // write to the transaction index below may fail and we want to ensure that the offsets
        // of future appends still grow monotonically. The resulting transaction index inconsistency
        // will be cleaned up after the log directory is recovered. Note that the end offset of the
        // ProducerStateManager will not be updated and the last stable offset will not advance
        // if the append to the transaction index fails.
        // 更新LEO,lastOffset + 1
        updateLogEndOffset(appendInfo.lastOffset + 1)

        // update the producer state
        for (producerAppendInfo <- updatedProducers.values) {

        // update the transaction index with the true last stable offset. The last offset visible
        // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
        for (completedTxn <- completedTxns) {
          val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
          segment.updateTxnIndex(completedTxn, lastStableOffset)

        // always update the last producer id map offset so that the snapshot reflects the current offset
        // even if there isn't any idempotent data being written
        producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

        // update the first unstable offset (which is used to compute LSO)

        trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
          s"first offset: ${appendInfo.firstOffset}, " +
          s"next offset: ${nextOffsetMetadata.messageOffset}, " +
          s"and messages: $validRecords")

        if (unflushedMessages >= config.flushInterval)




groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)

__consumer_offsets topic,它是 Kafka 内部使用的一个 topic,专门用来存储 group 消费的情况,默认情况下有50个 partition,每个 partition 默认有三个副本,而具体的一个 group 的消费情况要存储到哪一个 partition 上,是根据 abs(GroupId.hashCode()) % NumPartitions 来计算的(其中,NumPartitions 是 __consumer_offsets 的 partition 数,默认是50个)






kafka server源代码分析


Kafka 源码解析之Broker请求处理流程


Kafka 请求模块



