Review Board 1.7.22


Small bug fix

Review Request #14898 - Created Oct. 24, 2013 and updated

Guozhang Wang
KAFKA-1060
Reviewers
kafka
kafka
KAFKA-1060.v3.1


KAFKA-1060.v3


KAFKA-1060.v2


KAFKA-1060.v1

 

Diff revision 4 (Latest)

1 2 3 4
1 2 3 4

  1. core/src/main/scala/kafka/network/RequestChannel.scala: Loading...
  2. core/src/main/scala/kafka/server/KafkaRequestHandler.scala: Loading...
core/src/main/scala/kafka/network/RequestChannel.scala
Revision 77d7ec034d8534c0d3aef49e9516b9b853c99cb2 New Change
[20] 39 lines
[+20]
40
    byteBuffer.rewind()
40
    byteBuffer.rewind()
41
    byteBuffer
41
    byteBuffer
42
  }
42
  }
43

    
   
43

   
44
  case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) {
44
  case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) {
45
    @volatile var dequeueTimeMs = -1L
45
    @volatile var requestDequeueTimeMs = -1L
46
    @volatile var apiLocalCompleteTimeMs = -1L
46
    @volatile var apiLocalCompleteTimeMs = -1L
47
    @volatile var responseCompleteTimeMs = -1L
47
    @volatile var responseCompleteTimeMs = -1L

    
   
48
    @volatile var responseDequeueTimeMs = -1L
48
    val requestId = buffer.getShort()
49
    val requestId = buffer.getShort()
49
    val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
50
    val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
50
    buffer = null
51
    buffer = null
51
    private val requestLogger = Logger.getLogger("kafka.request.logger")
52
    private val requestLogger = Logger.getLogger("kafka.request.logger")
52
    trace("Processor %d received request : %s".format(processor, requestObj))
53
    trace("Processor %d received request : %s".format(processor, requestObj))
53

    
   
54

   
54
    def updateRequestMetrics() {
55
    def updateRequestMetrics() {
55
      val endTimeMs = SystemTime.milliseconds
56
      val endTimeMs = SystemTime.milliseconds
56
      // In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes since the remote
57
      // In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes since the remote
57
      // processing time is really small. In this case, use responseCompleteTimeMs as apiLocalCompleteTimeMs.
58
      // processing time is really small. In this case, use responseCompleteTimeMs as apiLocalCompleteTimeMs.
58
      if (apiLocalCompleteTimeMs < 0)
59
      if (apiLocalCompleteTimeMs < 0)
59
        apiLocalCompleteTimeMs = responseCompleteTimeMs
60
        apiLocalCompleteTimeMs = responseCompleteTimeMs
60
      val queueTime = (dequeueTimeMs - startTimeMs).max(0L)
61
      val requestQueueTime = (requestDequeueTimeMs - startTimeMs).max(0L)
61
      val apiLocalTime = (apiLocalCompleteTimeMs - dequeueTimeMs).max(0L)
62
      val apiLocalTime = (apiLocalCompleteTimeMs - requestDequeueTimeMs).max(0L)
62
      val apiRemoteTime = (responseCompleteTimeMs - apiLocalCompleteTimeMs).max(0L)
63
      val apiRemoteTime = (responseCompleteTimeMs - apiLocalCompleteTimeMs).max(0L)
63
      val responseSendTime = (endTimeMs - responseCompleteTimeMs).max(0L)
64
      val responseQueueTime = (responseDequeueTimeMs - responseCompleteTimeMs).max(0L)

    
   
65
      val responseSendTime = (endTimeMs - responseDequeueTimeMs).max(0L)
64
      val totalTime = endTimeMs - startTimeMs
66
      val totalTime = endTimeMs - startTimeMs
65
      var metricsList = List(RequestMetrics.metricsMap(RequestKeys.nameForKey(requestId)))
67
      var metricsList = List(RequestMetrics.metricsMap(RequestKeys.nameForKey(requestId)))
66
      if (requestId == RequestKeys.FetchKey) {
68
      if (requestId == RequestKeys.FetchKey) {
67
        val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower
69
        val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower
68
        metricsList ::= ( if (isFromFollower)
70
        metricsList ::= ( if (isFromFollower)
69
                            RequestMetrics.metricsMap(RequestMetrics.followFetchMetricName)
71
                            RequestMetrics.metricsMap(RequestMetrics.followFetchMetricName)
70
                          else
72
                          else
71
                            RequestMetrics.metricsMap(RequestMetrics.consumerFetchMetricName) )
73
                            RequestMetrics.metricsMap(RequestMetrics.consumerFetchMetricName) )
72
      }
74
      }
73
      metricsList.foreach{
75
      metricsList.foreach{
74
        m => m.requestRate.mark()
76
        m => m.requestRate.mark()
75
             m.queueTimeHist.update(queueTime)
77
             m.requestQueueTimeHist.update(requestQueueTime)
76
             m.localTimeHist.update(apiLocalTime)
78
             m.localTimeHist.update(apiLocalTime)
77
             m.remoteTimeHist.update(apiRemoteTime)
79
             m.remoteTimeHist.update(apiRemoteTime)

    
   
80
             m.responseQueueTimeHist.update(responseQueueTime)
78
             m.responseSendTimeHist.update(responseSendTime)
81
             m.responseSendTimeHist.update(responseSendTime)
79
             m.totalTimeHist.update(totalTime)
82
             m.totalTimeHist.update(totalTime)
80
      }
83
      }
81
      if(requestLogger.isTraceEnabled)
84
      if(requestLogger.isTraceEnabled)
82
        requestLogger.trace("Completed request:%s from client %s;totalTime:%d,queueTime:%d,localTime:%d,remoteTime:%d,sendTime:%d"
85
        requestLogger.trace("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
83
          .format(requestObj, remoteAddress, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime))
86
          .format(requestObj, remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
84
    }
87
    }
85
  }
88
  }
86
  
89
  
87
  case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) {
90
  case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) {
88
    request.responseCompleteTimeMs = SystemTime.milliseconds
91
    request.responseCompleteTimeMs = SystemTime.milliseconds
[+20] [20] 63 lines
[+20]
152
  /** Get the next request or block until there is one */
155
  /** Get the next request or block until there is one */
153
  def receiveRequest(): RequestChannel.Request =
156
  def receiveRequest(): RequestChannel.Request =
154
    requestQueue.take()
157
    requestQueue.take()
155

    
   
158

   
156
  /** Get a response for the given processor if there is one */
159
  /** Get a response for the given processor if there is one */
157
  def receiveResponse(processor: Int): RequestChannel.Response =
160
  def receiveResponse(processor: Int): RequestChannel.Response = {
158
    responseQueues(processor).poll()
161
    val response = responseQueues(processor).poll()

    
   
162
    if (response != null)

    
   
163
      response.request.responseDequeueTimeMs = SystemTime.milliseconds

    
   
164
    response

    
   
165
  }
159

    
   
166

   
160
  def addResponseListener(onResponse: Int => Unit) { 
167
  def addResponseListener(onResponse: Int => Unit) { 
161
    responseListeners ::= onResponse
168
    responseListeners ::= onResponse
162
  }
169
  }
163

    
   
170

   
[+20] [20] 11 lines
[+20]
175
}
182
}
176

    
   
183

   
177
class RequestMetrics(name: String) extends KafkaMetricsGroup {
184
class RequestMetrics(name: String) extends KafkaMetricsGroup {
178
  val requestRate = newMeter(name + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
185
  val requestRate = newMeter(name + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
179
  // time a request spent in a request queue
186
  // time a request spent in a request queue
180
  val queueTimeHist = newHistogram(name + "-QueueTimeMs")
187
  val requestQueueTimeHist = newHistogram(name + "-RequestQueueTimeMs")
181
  // time a request takes to be processed at the local broker
188
  // time a request takes to be processed at the local broker
182
  val localTimeHist = newHistogram(name + "-LocalTimeMs")
189
  val localTimeHist = newHistogram(name + "-LocalTimeMs")
183
  // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
190
  // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
184
  val remoteTimeHist = newHistogram(name + "-RemoteTimeMs")
191
  val remoteTimeHist = newHistogram(name + "-RemoteTimeMs")

    
   
192
  // time a response spent in a response queue

    
   
193
  val responseQueueTimeHist = newHistogram(name + "-ResponseQueueTimeMs")
185
  // time to send the response to the requester
194
  // time to send the response to the requester
186
  val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeMs")
195
  val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeMs")
187
  val totalTimeHist = newHistogram(name + "-TotalTimeMs")
196
  val totalTimeHist = newHistogram(name + "-TotalTimeMs")
188
}
197
}
189

    
   
198

   
core/src/main/scala/kafka/server/KafkaRequestHandler.scala
Revision 6d562c213b0f55d70341f9d657a7d0c8b212338e New Change
 
  1. core/src/main/scala/kafka/network/RequestChannel.scala: Loading...
  2. core/src/main/scala/kafka/server/KafkaRequestHandler.scala: Loading...