Review Board 1.7.22


Guozhang Wang got review request #14041!

Patch for KAFKA-1030

Review Request #14041 - Created Sept. 9, 2013 and updated

Guozhang Wang
0.8
KAFKA-1030
Reviewers
kafka
kafka
Using the approach of reading directly from ZK.
unit tests

Diff revision 2

This is not the most recent revision of the diff. The latest diff is revision 5. See what's changed.

1 2 3 4 5
1 2 3 4 5

  1. core/src/main/scala/kafka/client/ClientUtils.scala: Loading...
  2. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala: Loading...
core/src/main/scala/kafka/client/ClientUtils.scala
Revision cc526ec933052b239f0e7ce43e76cd9d011d5bd9 New Change
[20] 18 lines
[+20]
19
import scala.collection._
19
import scala.collection._
20
import kafka.cluster._
20
import kafka.cluster._
21
import kafka.api._
21
import kafka.api._
22
import kafka.producer._
22
import kafka.producer._
23
import kafka.common.KafkaException
23
import kafka.common.KafkaException
24
import kafka.utils.{Utils, Logging}
24
import kafka.utils.{ZkUtils, Json, Utils, Logging}
25
import java.util.Properties
25
import java.util.Properties
26
import util.Random
26
import util.Random

    
   
27
 import org.I0Itec.zkclient.ZkClient
27

    
   
28

   
28
/**
29
 /**
29
 * Helper functions common to clients (producer, consumer, or admin)
30
 * Helper functions common to clients (producer, consumer, or admin)
30
 */
31
 */
31
object ClientUtils extends Logging{
32
object ClientUtils extends Logging{
[+20] [20] 55 lines
[+20]
87
    val producerConfig = new ProducerConfig(props)
88
    val producerConfig = new ProducerConfig(props)
88
    fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
89
    fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
89
  }
90
  }
90

    
   
91

   
91
  /**
92
  /**

    
   
93
   * Read the topic partitions info directly from ZK

    
   
94
   * @param topics The topics for which the metadata needs to be read

    
   
95
   * @param zkClient The client used to communicate with ZK

    
   
96
   */

    
   
97
  def fetchTopicPartitionsFromZK(topics: Set[String], zkClient: ZkClient): mutable.Map[String, Seq[Int]] = {

    
   
98
    val ret = new mutable.HashMap[String, Seq[Int]]

    
   
99
    topics.foreach { topic =>

    
   
100
      val jsonPartitionMapOpt = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPath(topic))._1

    
   
101
      jsonPartitionMapOpt match {

    
   
102
        case Some(jsonPartitionMap) =>

    
   
103
          Json.parseFull(jsonPartitionMap) match {

    
   
104
            case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match {

    
   
105
              case Some(repl)  =>

    
   
106
                val replicaMap = repl.asInstanceOf[Map[String, Seq[Int]]]

    
   
107
                val partitions = replicaMap.map(_._1.toInt)

    
   
108
                ret.put(topic, partitions.toSeq)

    
   
109
              case None => ret.put(topic, Seq.empty[Int])

    
   
110
            }

    
   
111
            case None => ret.put(topic, Seq.empty[Int])

    
   
112
          }

    
   
113
        case None => ret.put(topic, Seq.empty[Int])

    
   
114
      }

    
   
115
    }

    
   
116
    ret

    
   
117
  }

    
   
118

   

    
   
119
  /**
92
   * Parse a list of broker urls in the form host1:port1, host2:port2, ... 
120
   * Parse a list of broker urls in the form host1:port1, host2:port2, ... 
93
   */
121
   */
94
  def parseBrokerList(brokerListStr: String): Seq[Broker] = {
122
  def parseBrokerList(brokerListStr: String): Seq[Broker] = {
95
    val brokersStr = Utils.parseCsvList(brokerListStr)
123
    val brokersStr = Utils.parseCsvList(brokerListStr)
96

    
   
124

   
[+20] [20] 11 lines
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Revision e7a692a1d23ca5a9ecf86e3cb34be418b9c0c943 New Change
 
  1. core/src/main/scala/kafka/client/ClientUtils.scala: Loading...
  2. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala: Loading...