Review Board 1.7.22


Follow up to handle ack=0 in handleError

Review Request #14140 - Created Sept. 13, 2013 and updated

Guozhang Wang
KAFKA-955
Reviewers
kafka
kafka

 

 

Diff revision 1 (Latest)

  1. core/src/main/scala/kafka/api/ProducerRequest.scala: Loading...
core/src/main/scala/kafka/api/ProducerRequest.scala
Revision fda3e39184307798f52083a42cf8468b3fb134d7 New Change
[20] 132 lines
[+20]
133
    producerRequest.append("; TopicAndPartition: " + topicPartitionMessageSizeMap.mkString(","))
133
    producerRequest.append("; TopicAndPartition: " + topicPartitionMessageSizeMap.mkString(","))
134
    producerRequest.toString()
134
    producerRequest.toString()
135
  }
135
  }
136

    
   
136

   
137
  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
137
  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {

    
   
138
    if(request.requestObj.asInstanceOf[ProducerRequest].requiredAcks == 0) {

    
   
139
        requestChannel.closeConnection(request.processor, request)

    
   
140
    }

    
   
141
    else {
138
    val producerResponseStatus = data.map {
142
      val producerResponseStatus = data.map {
139
      case (topicAndPartition, data) =>
143
        case (topicAndPartition, data) =>
140
        (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
144
          (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
141
    }
145
      }
142
    val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
146
      val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
143
    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
147
      requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
144
  }
148
    }

    
   
149
  }
145

    
   
150

   
146
  def emptyData(){
151
  def emptyData(){
147
    data.clear()
152
    data.clear()
148
  }
153
  }
149
}
154
}
150

    
   
155

   
  1. core/src/main/scala/kafka/api/ProducerRequest.scala: Loading...