Review Board 1.7.22


Guozhang Wang got review request #14041!

Patch for KAFKA-1030

Review Request #14041 - Created Sept. 9, 2013 and updated

Guozhang Wang
0.8
KAFKA-1030
Reviewers
kafka
kafka
Using the approach of reading directly from ZK.
unit tests

Diff revision 5 (Latest)

1 2 3 4 5
1 2 3 4 5

  1. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala: Loading...
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Revision 81bf0bda3229e94ecb6b6aff3ffc9fde852df61b New Change
[20] 28 lines
[+20]
29
import org.apache.zookeeper.Watcher.Event.KeeperState
29
import org.apache.zookeeper.Watcher.Event.KeeperState
30
import java.util.UUID
30
import java.util.UUID
31
import kafka.serializer._
31
import kafka.serializer._
32
import kafka.utils.ZkUtils._
32
import kafka.utils.ZkUtils._
33
import kafka.common._
33
import kafka.common._
34
import kafka.client.ClientUtils

   
35
import com.yammer.metrics.core.Gauge
34
import com.yammer.metrics.core.Gauge
36
import kafka.metrics._
35
import kafka.metrics._
37
import scala.Some
36
import scala.Some
38

    
   
37

   
39

    
   
38

   
[+20] [20] 380 lines
[+20]
420
        warn("no brokers found when trying to rebalance.")
419
        warn("no brokers found when trying to rebalance.")
421
        zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
420
        zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
422
        true
421
        true
423
      }
422
      }
424
      else {
423
      else {
425
        val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
424
        val partitionsAssignmentPerTopicMap = getPartitionAssignmentForTopics(zkClient, myTopicThreadIdsMap.keySet.toSeq)
426
                                                            brokers,
425
        val partitionsPerTopicMap = partitionsAssignmentPerTopicMap.map(p => (p._1, p._2.keySet.toSeq))
427
                                                            config.clientId,

   
428
                                                            config.socketTimeoutMs,

   
429
                                                            correlationId.getAndIncrement).topicsMetadata

   
430
        val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]

   
431
        topicsMetadata.foreach(m => {

   
432
          val topic = m.topic

   
433
          val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)

   
434
          partitionsPerTopicMap.put(topic, partitions)

   
435
        })

   
436

    
   
426

   
437
        /**
427
        /**
438
         * fetchers must be stopped to avoid data duplication, since if the current
428
         * fetchers must be stopped to avoid data duplication, since if the current
439
         * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
429
         * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
440
         * But if we don't stop the fetchers first, this consumer would continue returning data for released
430
         * But if we don't stop the fetchers first, this consumer would continue returning data for released
[+20] [20] 345 lines
  1. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala: Loading...