Review Board 1.7.22


Patch for KAFKA-930

Review Request #15711 - Created Nov. 20, 2013 and updated

Sriram Subramanian
KAFKA-930
Reviewers
kafka
kafka
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk

Conflicts:
	core/src/main/scala/kafka/controller/KafkaController.scala

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk

Conflicts:
	core/src/main/scala/kafka/controller/KafkaController.scala

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk


some more changes


use zk for auto rebalance


Address code review feedback


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk


commit missing code


some more changes


fix merge conflicts


Add auto leader rebalance support


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk

Conflicts:
	core/src/main/scala/kafka/admin/AdminUtils.scala
	core/src/main/scala/kafka/admin/TopicCommand.scala

change comments


commit the remaining changes


Move AddPartitions into TopicCommand

 

Diff revision 10 (Latest)

1 2 3 4 5 6 7 8 9 10
1 2 3 4 5 6 7 8 9 10

  1. core/src/main/scala/kafka/controller/KafkaController.scala: Loading...
core/src/main/scala/kafka/controller/KafkaController.scala
Revision 00a1f9802474a688fe725db8d6bff493ede48684 New Change
[20] 600 lines
[+20]
601
      // remove the partition from the admin path to unblock the admin client
601
      // remove the partition from the admin path to unblock the admin client
602
      removePartitionFromReassignedPartitions(topicAndPartition)
602
      removePartitionFromReassignedPartitions(topicAndPartition)
603
    }
603
    }
604
  }
604
  }
605

    
   
605

   
606
  def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
606
  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = true) {
607
    info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
607
    info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
608
    try {
608
    try {
609
      controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
609
      controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
610
      deleteTopicManager.markTopicIneligibleForDeletion(partitions.map(_.topic))
610
      deleteTopicManager.markTopicIneligibleForDeletion(partitions.map(_.topic))
611
      partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
611
      partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
612
    } catch {
612
    } catch {
613
      case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
613
      case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
614
    } finally {
614
    } finally {
615
      removePartitionsFromPreferredReplicaElection(partitions)
615
      removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
616
      deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
616
      deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
617
    }
617
    }
618
  }
618
  }
619

    
   
619

   
620
  /**
620
  /**
[+20] [20] 291 lines
[+20]
912
      case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
912
      case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
913
      case e2: Throwable => throw new KafkaException(e2.toString)
913
      case e2: Throwable => throw new KafkaException(e2.toString)
914
    }
914
    }
915
  }
915
  }
916

    
   
916

   
917
  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
917
  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition],

    
   
918
                                                   isTriggeredByAutoRebalance : Boolean) {
918
    for(partition <- partitionsToBeRemoved) {
919
    for(partition <- partitionsToBeRemoved) {
919
      // check the status
920
      // check the status
920
      val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
921
      val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
921
      val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
922
      val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
922
      if(currentLeader == preferredReplica) {
923
      if(currentLeader == preferredReplica) {
923
        info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
924
        info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
924
      } else {
925
      } else {
925
        warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
926
        warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
926
      }
927
      }
927
    }
928
    }

    
   
929
    if (!isTriggeredByAutoRebalance)
928
    ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
930
      ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
929
    controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
931
    controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
930
  }
932
  }
931

    
   
933

   
932
  /**
934
  /**
[+20] [20] 155 lines
[+20]
1088
          var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
1090
          var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
1089
          inLock(controllerContext.controllerLock) {
1091
          inLock(controllerContext.controllerLock) {
1090
            topicsNotInPreferredReplica =
1092
            topicsNotInPreferredReplica =
1091
              topicAndPartitionsForBroker.filter {
1093
              topicAndPartitionsForBroker.filter {
1092
                case(topicPartition, replicas) => {
1094
                case(topicPartition, replicas) => {

    
   
1095
                  controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
1093
                  controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
1096
                  controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
1094
                }
1097
                }
1095
              }
1098
              }
1096
            debug("topics not in preferred replica " + topicsNotInPreferredReplica)
1099
            debug("topics not in preferred replica " + topicsNotInPreferredReplica)
1097
            val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
1100
            val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
1098
            val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
1101
            val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
1099
            imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
1102
            imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
1100
            trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
1103
            trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
1101
          }
1104
          }
1102
          // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
1105
          // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
1103
          // that need to be on this broker
1106
          // that need to be on this broker
1104
          if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
1107
          if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {

    
   
1108
            topicsNotInPreferredReplica.foreach {

    
   
1109
              case(topicPartition, replicas) => {
1105
            inLock(controllerContext.controllerLock) {
1110
                inLock(controllerContext.controllerLock) {
1106
              // do this check only if the broker is live and there are no partitions being reassigned currently
1111
                  // do this check only if the broker is live and there are no partitions being reassigned currently
1107
              // and preferred replica election is not in progress
1112
                  // and preferred replica election is not in progress
1108
              if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
1113
                  if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
1109
                  controllerContext.partitionsBeingReassigned.size == 0 &&
1114
                      controllerContext.partitionsBeingReassigned.size == 0 &&
1110
                  controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
1115
                      controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
1111
                val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
1116
                      !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
1112
                val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
1117
                      !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
1113
                val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
1118
                      controllerContext.allTopics.contains(topicPartition.topic)) {
1114
                try {
1119
                    onPreferredReplicaElection(Set(topicPartition), false)
1115
                  ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
1120
                  }
1116
                  info("Created preferred replica election path with %s".format(jsonData))

   
1117
                } catch {

   
1118
                  case e2: ZkNodeExistsException =>

   
1119
                    val partitionsUndergoingPreferredReplicaElection =

   
1120
                      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)

   
1121
                    error("Preferred replica leader election currently in progress for " +

   
1122
                          "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));

   
1123
                  case e3: Throwable =>

   
1124
                    error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys))

   
1125
                }
1121
                }
1126
              }
1122
              }
1127
            }
1123
            }
1128
          }
1124
          }
1129
        }
1125
        }
[+20] [20] 214 lines
  1. core/src/main/scala/kafka/controller/KafkaController.scala: Loading...