Review Board 1.7.22


Patch for KAFKA-930

Review Request #15711 - Created Nov. 20, 2013 and updated

Sriram Subramanian
KAFKA-930
Reviewers
kafka
kafka
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk

Conflicts:
	core/src/main/scala/kafka/controller/KafkaController.scala

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk

Conflicts:
	core/src/main/scala/kafka/controller/KafkaController.scala

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk


some more changes


use zk for auto rebalance


Address code review feedback


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk


commit missing code


some more changes


fix merge conflicts


Add auto leader rebalance support


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk

Conflicts:
	core/src/main/scala/kafka/admin/AdminUtils.scala
	core/src/main/scala/kafka/admin/TopicCommand.scala

change comments


commit the remaining changes


Move AddPartitions into TopicCommand

 

Diff revision 3

This is not the most recent revision of the diff. The latest diff is revision 10. See what's changed.

1 2 3 4 5 6 7 8 9 10
1 2 3 4 5 6 7 8 9 10

  1. core/src/main/scala/kafka/controller/KafkaController.scala: Loading...
  2. core/src/main/scala/kafka/server/KafkaConfig.scala: Loading...
core/src/main/scala/kafka/controller/KafkaController.scala
Revision 88792c2b2a360e928ab9cd00de151e5d5f94452d New Change
[20] 25 lines
[+20]
26
import kafka.cluster.Broker
26
import kafka.cluster.Broker
27
import kafka.common._
27
import kafka.common._
28
import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
28
import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
29
import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
29
import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
30
import kafka.utils.ZkUtils._
30
import kafka.utils.ZkUtils._
31
import kafka.utils.{Json, Utils, ZkUtils, Logging}
31
import kafka.utils.{Json, Utils, ZkUtils, Logging, KafkaScheduler}
32
import org.apache.zookeeper.Watcher.Event.KeeperState
32
import org.apache.zookeeper.Watcher.Event.KeeperState
33
import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
33
import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
34
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
34
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
35
import java.util.concurrent.atomic.AtomicInteger
35
import java.util.concurrent.atomic.AtomicInteger
36
import scala.Some
36
import scala.Some
[+20] [20] 73 lines
[+20]
110
  val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
110
  val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
111
  private val partitionStateMachine = new PartitionStateMachine(this)
111
  private val partitionStateMachine = new PartitionStateMachine(this)
112
  private val replicaStateMachine = new ReplicaStateMachine(this)
112
  private val replicaStateMachine = new ReplicaStateMachine(this)
113
  private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
113
  private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
114
    config.brokerId)
114
    config.brokerId)

    
   
115
  // have a separate scheduler for the controller to be able to start and stop independently of the

    
   
116
  // kafka server

    
   
117
  private val autoRebalanceScheduler = new KafkaScheduler(1)
115
  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
118
  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
116
  private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
119
  private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
117
  private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
120
  private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
118
  private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
121
  private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
119
  private val brokerRequestBatch = new ControllerBrokerRequestBatch(controllerContext, sendRequest, this.config.brokerId, this.clientId)
122
  private val brokerRequestBatch = new ControllerBrokerRequestBatch(controllerContext, sendRequest, this.config.brokerId, this.clientId)
[+20] [20] 128 lines
[+20]
248
      info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
251
      info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
249
      initializeAndMaybeTriggerPartitionReassignment()
252
      initializeAndMaybeTriggerPartitionReassignment()
250
      initializeAndMaybeTriggerPreferredReplicaElection()
253
      initializeAndMaybeTriggerPreferredReplicaElection()
251
      /* send partition leadership info to all live brokers */
254
      /* send partition leadership info to all live brokers */
252
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
255
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)

    
   
256
      if (config.autoLeaderRebalanceEnable) {

    
   
257
        info("starting the partition rebalance scheduler")

    
   
258
        autoRebalanceScheduler.startup()

    
   
259
        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,

    
   
260
          5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)

    
   
261
      }
253
    }
262
    }
254
    else
263
    else
255
      info("Controller has been shut down, aborting startup/failover")
264
      info("Controller has been shut down, aborting startup/failover")
256
  }
265
  }
257

    
   
266

   
[+20] [20] 196 lines
[+20]
454
      // remove the partition from the admin path to unblock the admin client
463
      // remove the partition from the admin path to unblock the admin client
455
      removePartitionFromReassignedPartitions(topicAndPartition)
464
      removePartitionFromReassignedPartitions(topicAndPartition)
456
    }
465
    }
457
  }
466
  }
458

    
   
467

   
459
  def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
468
  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], updateZk: Boolean = true) {
460
    info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
469
    info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
461
    try {
470
    try {
462
      controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
471
      controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
463
      partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
472
      partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
464
    } catch {
473
    } catch {
465
      case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
474
      case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
466
    } finally {
475
    } finally {
467
      removePartitionsFromPreferredReplicaElection(partitions)
476
      removePartitionsFromPreferredReplicaElection(partitions, updateZk)
468
    }
477
    }
469
  }
478
  }
470

    
   
479

   
471
  /**
480
  /**
472
   * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
481
   * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
[+20] [20] 18 lines
[+20]
491
  def shutdown() = {
500
  def shutdown() = {
492
    controllerContext.controllerLock synchronized {
501
    controllerContext.controllerLock synchronized {
493
      isRunning = false
502
      isRunning = false
494
      partitionStateMachine.shutdown()
503
      partitionStateMachine.shutdown()
495
      replicaStateMachine.shutdown()
504
      replicaStateMachine.shutdown()

    
   
505
      if (config.autoLeaderRebalanceEnable)

    
   
506
        autoRebalanceScheduler.shutdown()
496
      if(controllerContext.controllerChannelManager != null) {
507
      if(controllerContext.controllerChannelManager != null) {
497
        controllerContext.controllerChannelManager.shutdown()
508
        controllerContext.controllerChannelManager.shutdown()
498
        controllerContext.controllerChannelManager = null
509
        controllerContext.controllerChannelManager = null
499
        info("Controller shutdown complete")
510
        info("Controller shutdown complete")
500
      }
511
      }
[+20] [20] 228 lines
[+20]
729
      case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
740
      case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
730
      case e2: Throwable => throw new KafkaException(e2.toString)
741
      case e2: Throwable => throw new KafkaException(e2.toString)
731
    }
742
    }
732
  }
743
  }
733

    
   
744

   
734
  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
745
  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition], updateZK : Boolean) {
735
    for(partition <- partitionsToBeRemoved) {
746
    for(partition <- partitionsToBeRemoved) {
736
      // check the status
747
      // check the status
737
      val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
748
      val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
738
      val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
749
      val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
739
      if(currentLeader == preferredReplica) {
750
      if(currentLeader == preferredReplica) {
740
        info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
751
        info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
741
      } else {
752
      } else {
742
        warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
753
        warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
743
      }
754
      }
744
    }
755
    }

    
   
756
    if (updateZK)
745
    ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
757
      ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
746
    controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
758
    controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
747
  }
759
  }
748

    
   
760

   
749
  private def getAllReplicasForPartition(partitions: Set[TopicAndPartition]): Set[PartitionAndReplica] = {
761
  private def getAllReplicasForPartition(partitions: Set[TopicAndPartition]): Set[PartitionAndReplica] = {
[+20] [20] 146 lines
[+20]
896
        }
908
        }
897
        controllerElector.elect
909
        controllerElector.elect
898
      }
910
      }
899
    }
911
    }
900
  }
912
  }

    
   
913

   

    
   
914
  private def checkAndTriggerPartitionRebalance(): Unit = {

    
   
915
    if (isActive()) {

    
   
916
      info("checking need to trigger partition rebalance")

    
   
917
      // get all the active brokers

    
   
918
      var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null;

    
   
919
      controllerContext.controllerLock synchronized {

    
   
920
        preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.groupBy(_._2.head)

    
   
921
      }

    
   
922
      debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)

    
   
923
      // for each broker, check if a preferred replica election needs to be triggered

    
   
924
      preferredReplicasForTopicsByBrokers.foreach( brokerInfo => {

    
   
925
        var imbalanceRatio: Double = 0

    
   
926
        var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null

    
   
927
        controllerContext.controllerLock synchronized {

    
   
928
          val brokerIds = controllerContext.liveBrokerIds

    
   
929
          if (brokerIds.contains(brokerInfo._1) &&

    
   
930
              controllerContext.partitionsBeingReassigned.size == 0) {

    
   
931
            // do this check only if the broker is live and there are no partitions being reassigned currently

    
   
932
            topicsNotInPreferredReplica =

    
   
933
              brokerInfo._2.filter(item => controllerContext.partitionLeadershipInfo(item._1).leaderAndIsr.leader != brokerInfo._1);

    
   
934
            debug("topics not in preferred replica " + topicsNotInPreferredReplica)

    
   
935
            val totalTopicPartitionsForBroker = brokerInfo._2.size

    
   
936
            val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size

    
   
937
            imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker

    
   
938
            info("leader imbalance ratio for broker %d is %f".format(brokerInfo._1, imbalanceRatio))

    
   
939
          }

    
   
940
        }

    
   
941
        // check ratio and if greater than desired ratio, trigger a rebalance for the topics

    
   
942
        // that need to be on this broker

    
   
943
        if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {

    
   
944
          topicsNotInPreferredReplica.foreach(topicPartition => {

    
   
945
            controllerContext.controllerLock synchronized {

    
   
946
              onPreferredReplicaElection(Set(topicPartition._1), false)

    
   
947
            }

    
   
948
          })

    
   
949
        }

    
   
950
      }

    
   
951
      )

    
   
952
    }

    
   
953
  }
901
}
954
}
902

    
   
955

   
903
/**
956
/**
904
 * Starts the partition reassignment process unless -
957
 * Starts the partition reassignment process unless -
905
 * 1. Partition previously existed
958
 * 1. Partition previously existed
[+20] [20] 188 lines
core/src/main/scala/kafka/server/KafkaConfig.scala
Revision b324344d0a383398db8bfe2cbeec2c1378fe13c9 New Change
 
  1. core/src/main/scala/kafka/controller/KafkaController.scala: Loading...
  2. core/src/main/scala/kafka/server/KafkaConfig.scala: Loading...