Review Board 1.7.22


Neha Narkhede got review request #14641!

Patch for KAFKA-1086

Review Request #14641 - Created Oct. 15, 2013 and updated

Neha Narkhede
KAFKA-1086
Reviewers
kafka
kafka
Improved GetOffsetShell to query metadata automatically, add batch offset request for all partitions of a topic and improve usability

 

Diff revision 1 (Latest)

  1. core/src/main/scala/kafka/tools/GetOffsetShell.scala: Loading...
core/src/main/scala/kafka/tools/GetOffsetShell.scala
Revision 2b9438ae3a4da2c7f97e05fb26bc179fe48f137d New Change
[20] 20 lines
[+20]
21
import kafka.consumer._
21
import kafka.consumer._
22
import joptsimple._
22
import joptsimple._
23
import java.net.URI
23
import java.net.URI
24
import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
24
import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
25
import kafka.common.TopicAndPartition
25
import kafka.common.TopicAndPartition

    
   
26
import kafka.client.ClientUtils
26

    
   
27

   
27

    
   
28

   
28
object GetOffsetShell {
29
object GetOffsetShell {
29

    
   
30

   
30
  def main(args: Array[String]): Unit = {
31
  def main(args: Array[String]): Unit = {
31
    val parser = new OptionParser
32
    val parser = new OptionParser
32
    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
33
    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
33
                           .withRequiredArg
34
                           .withRequiredArg
34
                           .describedAs("kafka://hostname:port")
35
                           .describedAs("hostname:port,...,hostname:port")
35
                           .ofType(classOf[String])
36
                           .ofType(classOf[String])
36
    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
37
    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
37
                           .withRequiredArg
38
                           .withRequiredArg
38
                           .describedAs("topic")
39
                           .describedAs("topic")
39
                           .ofType(classOf[String])
40
                           .ofType(classOf[String])
40
    val partitionOpt = parser.accepts("partition", "partition id")
41
    val partitionOpt = parser.accepts("partition", "comma separated list of partition ids. If not specified, it will find offsets for all partitions")
41
                           .withRequiredArg
42
                           .withRequiredArg
42
                           .describedAs("partition id")
43
                           .describedAs("partition ids")
43
                           .ofType(classOf[java.lang.Integer])
44
                           .ofType(classOf[String])
44
                           .defaultsTo(0)
45
                           .defaultsTo("")
45
    val timeOpt = parser.accepts("time", "timestamp of the offsets before that")
46
    val timeOpt = parser.accepts("time", "timestamp of the offsets before that")
46
                           .withRequiredArg
47
                           .withRequiredArg
47
                           .describedAs("timestamp/-1(latest)/-2(earliest)")
48
                           .describedAs("timestamp/-1(latest)/-2(earliest)")
48
                           .ofType(classOf[java.lang.Long])
49
                           .ofType(classOf[java.lang.Long])
49
    val nOffsetsOpt = parser.accepts("offsets", "number of offsets returned")
50
    val nOffsetsOpt = parser.accepts("offsets", "number of offsets returned")
50
                           .withRequiredArg
51
                           .withRequiredArg
51
                           .describedAs("count")
52
                           .describedAs("count")
52
                           .ofType(classOf[java.lang.Integer])
53
                           .ofType(classOf[java.lang.Integer])
53
                           .defaultsTo(1)
54
                           .defaultsTo(1)

    
   
55
    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")

    
   
56
                           .withRequiredArg

    
   
57
                           .describedAs("ms")

    
   
58
                           .ofType(classOf[java.lang.Integer])

    
   
59
                           .defaultsTo(1000)
54

    
   
60

   
55
    val options = parser.parse(args : _*)
61
    val options = parser.parse(args : _*)
56

    
   
62

   
57
    for(arg <- List(urlOpt, topicOpt, timeOpt)) {
63
    for(arg <- List(brokerListOpt, topicOpt, timeOpt)) {
58
      if(!options.has(arg)) {
64
      if(!options.has(arg)) {
59
        System.err.println("Missing required argument \"" + arg + "\"")
65
        System.err.println("Missing required argument \"" + arg + "\"")
60
        parser.printHelpOn(System.err)
66
        parser.printHelpOn(System.err)
61
        System.exit(1)
67
        System.exit(1)
62
      }
68
      }
63
    }
69
    }
64

    
   
70

   
65
    val url = new URI(options.valueOf(urlOpt))
71
    val clientId = "GetOffsetShell"

    
   
72
    val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
66
    val topic = options.valueOf(topicOpt)
73
    val topic = options.valueOf(topicOpt)
67
    val partition = options.valueOf(partitionOpt).intValue
74
    var partitionList = options.valueOf(partitionOpt)
68
    var time = options.valueOf(timeOpt).longValue
75
    var time = options.valueOf(timeOpt).longValue
69
    val nOffsets = options.valueOf(nOffsetsOpt).intValue
76
    val nOffsets = options.valueOf(nOffsetsOpt).intValue
70
    val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000, "GetOffsetShell")
77
    val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
71
    val topicAndPartition = TopicAndPartition(topic, partition)
78

   

    
   
79
    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata

    
   
80
    if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {

    
   
81
      System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +

    
   
82
        "kafka-list-topic.sh to verify")

    
   
83
      System.exit(1)

    
   
84
    }

    
   
85
    val partitions =

    
   
86
      if(partitionList == "") {

    
   
87
        topicsMetadata.head.partitionsMetadata.map(_.partitionId)

    
   
88
      } else {

    
   
89
        partitionList.mkString(",").map(_.toInt)

    
   
90
      }

    
   
91
    partitions.foreach { partitionId =>

    
   
92
      val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partitionId)

    
   
93
      partitionMetadataOpt match {

    
   
94
        case Some(metadata) =>

    
   
95
          metadata.leader match {

    
   
96
            case Some(leader) =>

    
   
97
              val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)

    
   
98
              val topicAndPartition = TopicAndPartition(topic, partitionId)
72
    val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
99
              val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
73
    val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
100
              val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
74
    println("get " + offsets.length + " results")
101

   
75
    for (offset <- offsets)
102
              println("%s:%d:%s".format(topic, partitionId, offsets.mkString(",")))
76
      println(offset)
103
            case None => System.err.println("Error: partition %d does not have a leader. Skip getting offsets".format(partitionId))

    
   
104
          }

    
   
105
        case None => System.err.println("Error: partition %d does not exist".format(partitionId))

    
   
106
      }

    
   
107
    }
77
  }
108
  }
78
}
109
}
  1. core/src/main/scala/kafka/tools/GetOffsetShell.scala: Loading...