Review Board 1.7.22


address Neha's review comments

Review Request #15201 - Created Nov. 4, 2013 and updated

Jun Rao
KAFKA-1117
Reviewers
kafka
kafka
kafka-1117; fix 5


kafka-1117; fix 4


kafka-1117; fix 3


kafka-1117; fix 2


kafka-1117; fix 1


kafka-1117

 

Diff revision 1

This is not the most recent revision of the diff. The latest diff is revision 5. See what's changed.

1 2 3 4 5
1 2 3 4 5

  1. core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala: Loading...
core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
New File

    
   
1
/**

    
   
2
 * Licensed to the Apache Software Foundation (ASF) under one or more

    
   
3
 * contributor license agreements.  See the NOTICE file distributed with

    
   
4
 * this work for additional information regarding copyright ownership.

    
   
5
 * The ASF licenses this file to You under the Apache License, Version 2.0

    
   
6
 * (the "License"); you may not use this file except in compliance with

    
   
7
 * the License.  You may obtain a copy of the License at

    
   
8
 *

    
   
9
 *    http://www.apache.org/licenses/LICENSE-2.0

    
   
10
 *

    
   
11
 * Unless required by applicable law or agreed to in writing, software

    
   
12
 * distributed under the License is distributed on an "AS IS" BASIS,

    
   
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

    
   
14
 * See the License for the specific language governing permissions and

    
   
15
 * limitations under the License.

    
   
16
 */

    
   
17

   

    
   
18
package kafka.tools

    
   
19

   

    
   
20
import joptsimple.OptionParser

    
   
21
import kafka.cluster.Broker

    
   
22
import kafka.message.{MessageSet, MessageAndOffset, ByteBufferMessageSet}

    
   
23
import java.util.concurrent.CountDownLatch

    
   
24
import java.util.concurrent.atomic.{AtomicReference, AtomicInteger}

    
   
25
import kafka.consumer.SimpleConsumer

    
   
26
import kafka.client.ClientUtils

    
   
27
import java.util.regex.{PatternSyntaxException, Pattern}

    
   
28
import kafka.utils.{SystemTime, Logging, ShutdownableThread, Pool}

    
   
29
import kafka.common.{ErrorMapping, TopicAndPartition}

    
   
30
import kafka.api._

    
   
31

   

    
   
32
/**

    
   
33
 *  For verifying the consistency among replicas.

    
   
34
 *

    
   
35
 *  1. start a fetcher on every broker.

    
   
36
 *  2. each fetcher does the following

    
   
37
 *    2.1 issues fetch request

    
   
38
 *    2.2 puts the fetched result in a shared buffer

    
   
39
 *    2.3 waits for all other fetchers to finish step 2.2

    
   
40
 *    2.4 one of the fetchers verifies the consistency of fetched results among replicas

    
   
41
 *

    
   
42
 *  The consistency verification is up to the high watermark. The tool reports the

    
   
43
 *  max lag between the verified offset and the high watermark among all partitions.

    
   
44
 *

    
   
45
 *  If a broker goes down, the verification of the partitions on that broker is delayed

    
   
46
 *  until the broker is up again.

    
   
47
 *

    
   
48
 * Caveats:

    
   
49
 * 1. The tools needs all brokers to be up at startup time.

    
   
50
 * 2. The tool doesn't handle out of range offsets.

    
   
51
 */

    
   
52

   

    
   
53
/**
 * Command-line entry point of the replica verification tool.
 *
 * Parses the options, fetches topic metadata from the supplied broker list, selects the
 * topics matching the white-list regex, groups the selected partitions' replicas by
 * broker and starts one [[ReplicaFetcher]] thread per broker. Exactly one of the
 * fetchers is flagged to perform the cross-replica consistency check (see
 * ReplicaBuffer.verifyCheckSum).
 */
object ReplicaVerificationTool extends Logging {
  // client id attached to every metadata/offset/fetch request issued by this tool
  val clientId = "replicaVerificationTool"

  def main(args: Array[String]): Unit = {
    val parser = new OptionParser
    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
                         .withRequiredArg
                         .describedAs("hostname:port,...,hostname:port")
                         .ofType(classOf[String])
    val fetchSizeOpt = parser.accepts("fetchsize", "The fetch size of each request.")
                         .withRequiredArg
                         .describedAs("fetchsize")
                         .ofType(classOf[java.lang.Integer])
                         .defaultsTo(1024 * 1024)
    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
                         .withRequiredArg
                         .describedAs("ms")
                         .ofType(classOf[java.lang.Integer])
                         .defaultsTo(1000)
    val topicWhiteListOpt = parser.accepts("topic-white-list", "White list of topics to verify replica consistency. Defaults to all topics.")
                         .withRequiredArg()
                         .describedAs("Java regex (String)")
                         .ofType(classOf[String])
                         .defaultsTo(".*")
    val initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.")
                           .withRequiredArg
                           .describedAs("timestamp/-1(latest)/-2(earliest)")
                           .ofType(classOf[java.lang.Long])
                           .defaultsTo(-1L)
    val reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.")
                         .withRequiredArg
                         .describedAs("ms")
                         .ofType(classOf[java.lang.Long])
                         .defaultsTo(30 * 1000L)

    val options = parser.parse(args : _*)
    // broker-list is the only required option; everything else has a default
    for (arg <- List(brokerListOpt)) {
      if (!options.has(arg)) {
        error("Missing required argument \"" + arg + "\"")
        parser.printHelpOn(System.err)
        System.exit(1)
      }
    }

    // Normalize the white list into a single regex: commas become alternation,
    // spaces are dropped, and surrounding quotes are stripped (property files may
    // bring quotes).
    val regex = options.valueOf(topicWhiteListOpt)
            .trim
            .replace(',', '|')
            .replace(" ", "")
            .replaceAll("""^["']+""","")
            .replaceAll("""["']+$""","")

    // Fail fast on an invalid regex; chain the parse error as the cause so the
    // offending position is not lost.
    try {
      Pattern.compile(regex)
    }
    catch {
      case e: PatternSyntaxException =>
        throw new RuntimeException(regex + " is an invalid regex.", e)
    }

    val fetchSize = options.valueOf(fetchSizeOpt).intValue
    val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
    val initialOffsetTime = options.valueOf(initialOffsetTimeOpt).longValue()
    val reportInterval = options.valueOf(reportIntervalOpt).longValue()

    // getting topic metadata
    info("Getting topic metadata...")
    val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
    val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs)
    val brokerMap = topicsMetadataResponse.extractBrokers(topicsMetadataResponse.topicsMetadata)
    // keep only the topics matching the white list
    val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter(
        topicMetadata => topicMetadata.topic.matches(regex)
    )
    // one (topic, partition, replicaBrokerId) triple per replica of every selected partition
    val topicPartitionReplicaList: Seq[(String,  Int,  Int)] = filteredTopicMetadata.flatMap(
      topicMetadataResponse =>
        topicMetadataResponse.partitionsMetadata.flatMap(
          partitionMetadata =>
            partitionMetadata.replicas.map(broker => (topicMetadataResponse.topic, partitionMetadata.partitionId, broker.id))
        )
    )
    debug("Selected topic partitions: " + topicPartitionReplicaList)
    // group by replica broker id: each broker hosts a set of partitions to fetch from it
    val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_._3)
      .map{case (brokerId, partitions) =>
             brokerId -> partitions.map{ case partition => new TopicAndPartition(partition._1, partition._2)}}
    debug("Topic partitions per broker: " + topicAndPartitionsPerBroker)
    // how many replicas each partition is expected to have, used as the verification quorum
    val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] =
          topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica._1, replica._2))
          .map{case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size}
    debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition)
    // leader broker id -> partitions it leads; used to fetch the initial offsets from the leaders
    // NOTE(review): partitionMetadata.leader.get throws if a partition currently has no
    // leader — consistent with the documented caveat that all brokers must be up at startup.
    val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap(
      topicMetadataResponse =>
        topicMetadataResponse.partitionsMetadata.map(
          partitionMetadata =>
            (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id))
    ).groupBy(_._2)
     .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map{
        case(topicAndPartition, leaderId) => topicAndPartition})
    debug("Leaders per broker: " + leadersPerBroker)

    val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition,
                                          leadersPerBroker,
                                          topicAndPartitionsPerBroker.size,
                                          brokerMap,
                                          initialOffsetTime,
                                          reportInterval)
    // only the first fetcher created performs the verification step
    var doVerification = true
    // create all replica fetcher threads
    val fetcherThreads: Iterable[ReplicaFetcher] = topicAndPartitionsPerBroker.map{
      case (brokerId, topicAndPartitions) =>
        val replicaFetcher = new ReplicaFetcher(name = "ReplicaFetcher-" + brokerId,
                                                sourceBroker = brokerMap(brokerId),
                                                topicAndPartitions = topicAndPartitions,
                                                replicaBuffer = replicaBuffer,
                                                socketTimeout = 30000,
                                                socketBufferSize = 256000,
                                                fetchSize = fetchSize,
                                                maxWait = maxWaitMs,
                                                minBytes = 1,
                                                doVerification = doVerification)
        doVerification = false
        replicaFetcher
    }

    // stop the fetchers cleanly on Ctrl-C / JVM shutdown
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run() {
        info("Stopping all fetchers")
        fetcherThreads.foreach(_.shutdown())
      }
    })
    fetcherThreads.foreach(_.start())
    println("Verification process is started.")

  }
}

    
   
186

   

    
   
187
// Pairs a replica (broker) id with an iterator over the messages fetched from it.
// NOTE(review): appears unused in this file — presumably kept for a later revision; verify before removing.
private case class ReplicaAndMessageIterator(replicaId: Int, iterator: Iterator[MessageAndOffset])

    
   
188

   

    
   
189
// Snapshot of a single message seen on a replica during verification: the replica it came
// from, its offset, the offset that follows it, and the message checksum to compare across replicas.
private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long)

    
   
190

   

    
   
191
/**
 * State shared by all fetcher threads.
 *
 * Holds the per-partition fetch offsets and the fetched data from every replica, and
 * coordinates the fetch/verify cycle with two swappable barriers:
 *  - fetcherBarrier: counted down by each fetcher once it has stored its fetch result;
 *    all fetchers then wait on it before verification starts.
 *  - verificationBarrier: counted down by the single verifying fetcher once
 *    verifyCheckSum() has run; all fetchers wait on it before issuing the next fetch.
 * The verifying fetcher replaces both latches (createNew*Barrier) before releasing the
 * others, which is what makes the cycle repeatable.
 *
 * @param expectedReplicasPerTopicAndPartition expected replica count per partition
 * @param leadersPerBroker leader broker id -> partitions it leads (for initial offsets)
 * @param expectedNumFetchers number of fetcher threads participating in the barrier
 * @param brokerMap broker id -> Broker endpoint
 * @param initialOffsetTime timestamp passed to the offset request (-1 latest, -2 earliest)
 * @param reportInterval minimum ms between max-lag progress reports
 */
private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int],
                            leadersPerBroker: Map[Int, Seq[TopicAndPartition]],
                            expectedNumFetchers: Int,
                            brokerMap: Map[Int, Broker],
                            initialOffsetTime: Long,
                            reportInterval: Long) extends Logging {
  // next offset to fetch (and verify from) per partition
  val fetchOffsetMap = new Pool[TopicAndPartition, Long]
  // partition -> (replica id -> that replica's latest fetch result)
  val messageSetCache = new Pool[TopicAndPartition, Pool[Int, FetchResponsePartitionData]]
  private val fetcherBarrier = new AtomicReference(new CountDownLatch(expectedNumFetchers))
  private val verificationBarrier = new AtomicReference(new CountDownLatch(1))
  @volatile private var lastReportTime = SystemTime.milliseconds
  // max (high watermark - verified offset) across partitions; only touched by the verifying thread
  private var maxLag: Long = -1L
  private var maxLagTopicAndPartition: TopicAndPartition = null
  initialize()

  // install a fresh fetch barrier for the next round; called by the verifying fetcher only
  def createNewFetcherBarrier() {
    fetcherBarrier.set(new CountDownLatch(expectedNumFetchers))
  }

  def getFetcherBarrier() = fetcherBarrier.get()

  // install a fresh verification barrier for the next round; called by the verifying fetcher only
  def createNewVerificationBarrier() {
    verificationBarrier.set(new CountDownLatch(1))
  }

  def getVerificationBarrier() = verificationBarrier.get()

  private def initialize() {
    for (topicAndPartition <- expectedReplicasPerTopicAndPartition.keySet)
      messageSetCache.put(topicAndPartition, new Pool[Int, FetchResponsePartitionData])
    setInitialOffsets()
  }

  // Ask each leader for the starting offset of its partitions at initialOffsetTime.
  // NOTE(review): the SimpleConsumer created per broker is never closed — TODO confirm
  // whether it should be closed after the offset request.
  private def setInitialOffsets() {
    for ((brokerId, topicAndPartitions) <- leadersPerBroker) {
      val broker = brokerMap(brokerId)
      val consumer = new SimpleConsumer(broker.host, broker.port, 10000, 100000, ReplicaVerificationTool.clientId)
      val offsetMap: Map[TopicAndPartition, PartitionOffsetRequestInfo] =
        topicAndPartitions.map(topicAndPartition => topicAndPartition -> PartitionOffsetRequestInfo(initialOffsetTime, 1)).toMap
      val offsetRequest = OffsetRequest(offsetMap)
      val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
      assert(!offsetResponse.hasError)
      offsetResponse.partitionErrorAndOffsets.foreach{
        case (topicAndPartition, partitionOffsetResponse) =>
          fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head)
      }
    }
  }

  /**
   * Compare the fetched data of all replicas of every partition, message by message,
   * up to the minimum of the replicas' high watermarks. Offset or checksum mismatches
   * are reported on stdout (an offset mismatch terminates the tool). Advances
   * fetchOffsetMap past each fully matched message and periodically prints the max lag.
   * Must only be called by the single fetcher flagged with doVerification, after all
   * fetchers have passed the fetch barrier.
   */
  def verifyCheckSum() {
    debug("Begin verification")
    maxLag = -1L
    for ((topicAndPartition, fetchResponsePerReplica) <- messageSetCache) {
      debug("Verifying " + topicAndPartition)
      // every replica must have responded for this partition
      assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition))
      val messageIteratorMap = fetchResponsePerReplica.map{
        case(replicaId, fetchResponse) =>
          assert(fetchResponse.error == ErrorMapping.NoError)
          replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator}
      val maxHw = fetchResponsePerReplica.values.map(_.hw).max

      // walk the replicas in lock step, one message per replica per iteration
      var isDone = false
      while (!isDone) {
        var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None
        for ( (replicaId, messageIterator) <- messageIteratorMap) {
          if (messageIterator.hasNext) {
            val messageAndOffset = messageIterator.next()

            // only verify up to the high watermark
            if (messageAndOffset.offset >= fetchResponsePerReplica.get(replicaId).hw)
              isDone = true
            else {
              messageInfoFromFirstReplicaOpt match {
                case None =>
                  // first replica seen this iteration becomes the reference
                  messageInfoFromFirstReplicaOpt = Some(
                    MessageInfo(replicaId, messageAndOffset.offset,messageAndOffset.nextOffset, messageAndOffset.message.checksum))
                case Some(messageSetFromFirstReplica) =>
                  // offsets must line up exactly across replicas; a mismatch is fatal
                  if (messageSetFromFirstReplica.offset != messageAndOffset.offset) {
                    println("Partition " + topicAndPartition + ": replica " + messageSetFromFirstReplica.replicaId
                      + "'s offset " + messageSetFromFirstReplica.offset + " doesn't match replica "
                      + replicaId + "'s offset " + messageAndOffset.offset)
                    System.exit(1)
                  }
                  // checksum mismatches are reported but verification continues
                  if (messageSetFromFirstReplica.checksum != messageAndOffset.message.checksum)
                    println("Partition " + topicAndPartition + " has unmatched checksum at offset " + messageAndOffset.offset
                      + "; replica " + messageSetFromFirstReplica.replicaId + "'s checksum " + messageSetFromFirstReplica.checksum
                      + "; replica " + replicaId + "'s checksum " + messageAndOffset.message.checksum)
              }
            }
          } else
            // a replica ran out of messages: stop at the shortest replica
            isDone = true
        }
        if (!isDone) {
          // all replicas matched at this offset; advance the fetch offset past it
          val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset
          fetchOffsetMap.put(topicAndPartition, nextOffset)
          debug(expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas match at offset " +
                nextOffset + " for " + topicAndPartition)
        }
      }
      if (maxHw - fetchOffsetMap.get(topicAndPartition) > maxLag) {
        maxLag = maxHw - fetchOffsetMap.get(topicAndPartition)
        maxLagTopicAndPartition = topicAndPartition
      }
      // drop this round's data so the next round starts from a clean cache
      fetchResponsePerReplica.clear()
    }
    // throttled progress report
    val currentTimeMs = SystemTime.milliseconds
    if (currentTimeMs - lastReportTime > reportInterval) {
      println("Time: " + currentTimeMs + ": max lag is " + maxLag + " for partition " + maxLagTopicAndPartition)
      lastReportTime = currentTimeMs
    }
  }
}

    
   
304

   

    
   
305
/**
 * Fetcher thread bound to one source broker.
 *
 * Each doWork() iteration: fetch the assigned partitions at the offsets recorded in the
 * shared ReplicaBuffer, store the result in the shared cache keyed by this broker's id,
 * count down the fetch barrier, wait for the other fetchers, then (for the single
 * fetcher with doVerification = true) run the verification and reset both barriers
 * before releasing everyone for the next round.
 *
 * @param doVerification true for exactly one fetcher, which performs the verification step
 */
private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartitions: Iterable[TopicAndPartition],
                             replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int,
                             fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean)
  extends ShutdownableThread(name) {
  val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, ReplicaVerificationTool.clientId)
  // DebuggingConsumerId lets the broker serve data beyond what an ordinary consumer would see
  val fetchRequestBuilder = new FetchRequestBuilder().
          clientId(ReplicaVerificationTool.clientId).
          replicaId(Request.DebuggingConsumerId).
          maxWait(maxWait).
          minBytes(minBytes)
  val offsetMap = replicaBuffer.fetchOffsetMap
  val messageSetCache = replicaBuffer.messageSetCache

  override def doWork() {

    // grab the current barriers before fetching; the verifying fetcher swaps them each round
    val fetcherBarrier = replicaBuffer.getFetcherBarrier()
    val verificationBarrier = replicaBuffer.getVerificationBarrier()

    // NOTE(review): the builder is a field re-used across iterations; presumably addFetch
    // overwrites the previous entry per partition — verify against FetchRequestBuilder.
    for (topicAndPartition <- topicAndPartitions)
      fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
        offsetMap.get(topicAndPartition), fetchSize)

    val fetchRequest = fetchRequestBuilder.build()
    debug("Issuing fetch request " + fetchRequest)

    var response: FetchResponse = null
    try {
      response = simpleConsumer.fetch(fetchRequest)
    } catch {
      // NOTE(review): deliberately swallows fetch errors while running so the thread
      // still reaches the barrier below (e.g. when a broker is down); rethrows only
      // during shutdown so ShutdownableThread can exit.
      case t: Throwable =>
        if (!isRunning.get)
          throw t
    }

    if (response != null) {
      response.data.foreach {
        case(topicAndPartition, partitionData) =>
          messageSetCache.get(topicAndPartition).put(sourceBroker.id, partitionData)
      }
    } else {
      // fetch failed: record empty data so verification still sees a response from this replica
      for (topicAndPartition <- topicAndPartitions)
        messageSetCache.get(topicAndPartition).put(sourceBroker.id, new FetchResponsePartitionData(messages = MessageSet.Empty))
    }

    fetcherBarrier.countDown()
    debug("Done fetching")

    // wait for all fetchers to finish
    fetcherBarrier.await()
    debug("Ready for verification")

    // one of the fetchers will do the verification
    if (doVerification) {
      debug("Do verification")
      replicaBuffer.verifyCheckSum()
      // reset both barriers BEFORE releasing the waiters, so the next round is safe
      replicaBuffer.createNewFetcherBarrier()
      replicaBuffer.createNewVerificationBarrier()
      debug("Created new barrier")
      verificationBarrier.countDown()
    }

    verificationBarrier.await()
    debug("Done verification")
  }
}
  1. core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala: Loading...