Review Board 1.7.22


Patch for KAFKA-1107

Review Request #15137 - Created Oct. 31, 2013 and updated

Neha Narkhede
KAFKA-1107
Reviewers
kafka
jkreps
kafka
Remove whitespace changes to LogTest


Removed an unncessary log statement and fixed a few more tests that initially had the Log public API change


Per Jay's suggestion, avoid changing the public API of Log


Avoid printing clean shutdown file log message on every log


KAFKA-1107 Broker unnecessarily recovers all logs when upgrading from 0.8 to 0.8.1; Fix includes modifying Log to avoid recovery if the clean shutdown file exists; LogManager deletes the clean shutdown file after all logs in a log directory have finished loading; If the clean shutdown file does not exist, fall back to 0.8.1 recovery logic

 

Diff revision 5 (Latest)

1 2 3 4 5
1 2 3 4 5

  1. core/src/main/scala/kafka/log/Log.scala: Loading...
  2. core/src/main/scala/kafka/log/LogManager.scala: Loading...
  3. core/src/test/scala/unit/kafka/log/LogTest.scala: Loading...
core/src/main/scala/kafka/log/Log.scala
Revision 0cc402b13e8484ae5569f1b8ff7156331a2f82d7 New Change
[20] 162 lines
[+20]
163
    }
163
    }
164
  }
164
  }
165
  
165
  
166
  private def recoverLog() {
166
  private def recoverLog() {
167
    val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L}
167
    val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L}

    
   
168
    val cleanShutdownFile = new File(dir.getParentFile, CleanShutdownFile)

    
   
169
    val needsRecovery = !cleanShutdownFile.exists()

    
   
170
    if(!needsRecovery) {

    
   
171
      this.recoveryPoint = lastOffset

    
   
172
      return

    
   
173
    }
168
    if(lastOffset <= this.recoveryPoint) {
174
    if(lastOffset <= this.recoveryPoint) {
169
      info("Log '%s' is fully intact, skipping recovery".format(name))
175
      info("Log '%s' is fully intact, skipping recovery".format(name))
170
      this.recoveryPoint = lastOffset
176
      this.recoveryPoint = lastOffset
171
      return
177
      return
172
    }
178
    }
[+20] [20] 522 lines
[+20]
695
  val CleanedFileSuffix = ".cleaned"
701
  val CleanedFileSuffix = ".cleaned"
696
    
702
    
697
  /** A temporary file used when swapping files into the log */
703
  /** A temporary file used when swapping files into the log */
698
  val SwapFileSuffix = ".swap"
704
  val SwapFileSuffix = ".swap"
699

    
   
705

   

    
   
706
  /** Clean shutdown file that indicates the broker was cleanly shutdown in 0.8. This is required to maintain backwards compatibility

    
   
707
    * with 0.8 and avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1 */

    
   
708
  /** TODO: Get rid of CleanShutdownFile in 0.8.2 */

    
   
709
  val CleanShutdownFile = ".kafka_cleanshutdown"

    
   
710

   
700
  /**
711
  /**
701
   * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
712
   * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
702
   * so that ls sorts the files numerically.
713
   * so that ls sorts the files numerically.
703
   * @param offset The offset to use in the file name
714
   * @param offset The offset to use in the file name
704
   * @return The filename
715
   * @return The filename
[+20] [20] 27 lines
core/src/main/scala/kafka/log/LogManager.scala
Revision d489e08452ab97334d504f76f381eb314ec56901 New Change
 
core/src/test/scala/unit/kafka/log/LogTest.scala
Revision 140317c6ab6741308d125e9c1f43078b672c5f95 New Change
 
  1. core/src/main/scala/kafka/log/Log.scala: Loading...
  2. core/src/main/scala/kafka/log/LogManager.scala: Loading...
  3. core/src/test/scala/unit/kafka/log/LogTest.scala: Loading...