HBASE-4797 [availability] skip recovered edits files older than the region has

Review Request #2906 - Created Nov. 21, 2011 and submitted

Submitter: Jimmy Xiang
Repository: hbase-git (git://git.apache.org/hbase.git)
Bugs: HBASE-4797
Groups: hbase
Reviewers: stack, tlipcon
Description:
If there are multiple recovered edits files, I use the file name to find each file's initial sequence id. Once the files are sorted, a file's maximum possible sequence id is bounded by the next file's initial sequence id. If that maximum does not exceed the sequence id the region already has, the whole recovered edits file is old and can be skipped.
Testing done:
Added a test case to TestHRegion; all tests in that class pass.
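The skipping logic in the HRegion.java hunk below hinges on one property: each recovered edits file is named with the first sequence id it contains (HLog.EDITFILES_NAME_PATTERN, "-?[0-9]+"), so in a sorted set the next file's name bounds the current file's highest possible sequence id. As a minimal, self-contained sketch of that idea (plain zero-padded strings in a TreeSet stand in for the NavigableSet<Path> returned by HLog.getSplitEditFilesSorted; class and values are illustrative only):

    import java.util.TreeSet;

    public class SkipOldEditsSketch {
      public static void main(String[] args) {
        // Zero-padded names sort lexicographically == numerically,
        // mirroring the sorted set of recovered-edits paths.
        TreeSet<String> files = new TreeSet<String>();
        files.add("0000000000000001000");
        files.add("0000000000000002000");
        files.add("0000000000000003000");
        long minSeqId = 2500;  // the region already contains edits up to here

        boolean checkSafeToSkip = true;
        for (String edits : files) {
          if (checkSafeToSkip) {
            String higher = files.higher(edits);  // next file, if any
            long maxSeqId = (higher == null)
                ? Long.MAX_VALUE : Math.abs(Long.parseLong(higher));
            if (maxSeqId <= minSeqId) {
              // Every edit in this file predates what the region has: skip it.
              System.out.println("skip " + edits + " (max seqid " + maxSeqId + ")");
              continue;
            }
            checkSafeToSkip = false;  // later files can only be newer
          }
          System.out.println("replay " + edits);
        }
      }
    }

Run as written, the sketch skips the first file (its edits cannot exceed sequence id 2000, which is at or below 2500) and replays the other two; the checkSafeToSkip flag stops checking after the first file that must be replayed, matching the patch.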

Diff revision 4 (Latest)


Files changed:
  1. src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
  2. src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Base revision 8b89661
[... 59 unchanged lines (license header and earlier imports) ...]
Lines 60-91 (two imports moved to their sorted positions):
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowLock;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Exec;
 import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
[... 73 unchanged lines; context: public class HRegion implements HeapSize { // , Writable{ ...]
Lines 165-175, whitespace-only (trailing whitespace trimmed):

  //////////////////////////////////////////////////////////////////////////////
  // Members
  //////////////////////////////////////////////////////////////////////////////

  private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
    new ConcurrentHashMap<HashedBytes, CountDownLatch>();
  private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
    new ConcurrentHashMap<Integer, HashedBytes>();
  private final AtomicInteger lockIdGenerator = new AtomicInteger(1);
  static private Random rand = new Random();
[... 304 unchanged lines; context: public long initialize() throws IOException { ...]
Lines 480-493, whitespace-only (trailing whitespace trimmed):
   * @return What the next sequence (edit) id should be.
   * @throws IOException e
   */
  public long initialize(final CancelableProgressable reporter)
  throws IOException {

    MonitoredTask status = TaskMonitor.get().createStatus(
        "Initializing region " + this);

    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-open hook");
      coprocessorHost.preOpen();
    }
    // A region can be reopened if failed a split; reset flags
[+20] public long initialize(final CancelableProgressable reporter)
551
    FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
551
    FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
552

    
   
552

   
553
    this.writestate.setReadOnly(this.htableDescriptor.isReadOnly());
553
    this.writestate.setReadOnly(this.htableDescriptor.isReadOnly());
554

    
   
554

   
555
    this.writestate.compacting = 0;
555
    this.writestate.compacting = 0;
556
    
556

   
557
    // Initialize split policy
557
    // Initialize split policy
558
    this.splitPolicy = RegionSplitPolicy.create(this, conf);
558
    this.splitPolicy = RegionSplitPolicy.create(this, conf);
559
    
559

   
560
    this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
560
    this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
561
    // Use maximum of log sequenceid or that which was found in stores
561
    // Use maximum of log sequenceid or that which was found in stores
562
    // (particularly if no recovered edits, seqid will be -1).
562
    // (particularly if no recovered edits, seqid will be -1).
563
    long nextSeqid = maxSeqId + 1;
563
    long nextSeqid = maxSeqId + 1;
564
    LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
564
    LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
565

    
   
565

   
566
    
566

   
567
    if (coprocessorHost != null) {
567
    if (coprocessorHost != null) {
568
      status.setStatus("Running coprocessor post-open hooks");
568
      status.setStatus("Running coprocessor post-open hooks");
569
      coprocessorHost.postOpen();
569
      coprocessorHost.postOpen();
570
    }
570
    }
571
    status.markComplete("Region opened successfully");
571
    status.markComplete("Region opened successfully");
[+20] [20] 26 lines
[+20] [+] public boolean hasReferences() {
598
        if (sf.isReference()) return true;
598
        if (sf.isReference()) return true;
599
      }
599
      }
600
    }
600
    }
601
    return false;
601
    return false;
602
  }
602
  }
603
  
603

   
604
  /**
604
  /**
605
   * This function will return the HDFS blocks distribution based on the data
605
   * This function will return the HDFS blocks distribution based on the data
606
   * captured when HFile is created
606
   * captured when HFile is created
607
   * @return The HDFS blocks distribution for the region.
607
   * @return The HDFS blocks distribution for the region.
608
   */
608
   */
[+20] [20] 26 lines
[+20] [+] static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
635
    HDFSBlocksDistribution hdfsBlocksDistribution =
635
    HDFSBlocksDistribution hdfsBlocksDistribution =
636
      new HDFSBlocksDistribution();
636
      new HDFSBlocksDistribution();
637
    Path tablePath = FSUtils.getTablePath(FSUtils.getRootDir(conf),
637
    Path tablePath = FSUtils.getTablePath(FSUtils.getRootDir(conf),
638
      tableDescriptor.getName());
638
      tableDescriptor.getName());
639
    FileSystem fs = tablePath.getFileSystem(conf);
639
    FileSystem fs = tablePath.getFileSystem(conf);
640
         
640

   
641
    for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
641
    for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
642
      Path storeHomeDir = Store.getStoreHomedir(tablePath, regionEncodedName,
642
      Path storeHomeDir = Store.getStoreHomedir(tablePath, regionEncodedName,
643
      family.getName());
643
      family.getName());
644
      if (!fs.exists(storeHomeDir))continue;
644
      if (!fs.exists(storeHomeDir))continue;
645

    
   
645

   
[+20] [20] 7 lines
[+20] static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
653
        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
653
        hdfsBlocksDistribution.add(storeFileBlocksDistribution);
654
      }
654
      }
655
    }
655
    }
656
    return hdfsBlocksDistribution;
656
    return hdfsBlocksDistribution;
657
  }
657
  }
658
  
658

   
659
  public AtomicLong getMemstoreSize() {
659
  public AtomicLong getMemstoreSize() {
660
    return memstoreSize;
660
    return memstoreSize;
661
  }
661
  }
662
  
662

   
663
  /**
663
  /**
664
   * Increase the size of mem store in this region and the size of global mem 
664
   * Increase the size of mem store in this region and the size of global mem
665
   * store
665
   * store
666
   * @param memStoreSize
666
   * @param memStoreSize
667
   * @return the size of memstore in this region
667
   * @return the size of memstore in this region
668
   */
668
   */
669
  public long addAndGetGlobalMemstoreSize(long memStoreSize) {
669
  public long addAndGetGlobalMemstoreSize(long memStoreSize) {
670
    if (this.rsServices != null) {
670
    if (this.rsServices != null) {
671
      RegionServerAccounting rsAccounting = 
671
      RegionServerAccounting rsAccounting =
672
        this.rsServices.getRegionServerAccounting();
672
        this.rsServices.getRegionServerAccounting();
673
      
673

   
674
      if (rsAccounting != null) {
674
      if (rsAccounting != null) {
675
        rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
675
        rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
676
      }
676
      }
677
    }
677
    }
678
    return this.memstoreSize.getAndAdd(memStoreSize);  
678
    return this.memstoreSize.getAndAdd(memStoreSize);
679
  }
679
  }
680

    
   
680

   
681
  /*
681
  /*
682
   * Write out an info file under the region directory.  Useful recovering
682
   * Write out an info file under the region directory.  Useful recovering
683
   * mangled regions.
683
   * mangled regions.
[+20] [20] 104 lines
[+20] [+] public MultiVersionConsistencyControl getMVCC() {
788
    // Only allow one thread to close at a time. Serialize them so dual
788
    // Only allow one thread to close at a time. Serialize them so dual
789
    // threads attempting to close will run up against each other.
789
    // threads attempting to close will run up against each other.
790
    MonitoredTask status = TaskMonitor.get().createStatus(
790
    MonitoredTask status = TaskMonitor.get().createStatus(
791
        "Closing region " + this +
791
        "Closing region " + this +
792
        (abort ? " due to abort" : ""));
792
        (abort ? " due to abort" : ""));
793
    
793

   
794
    status.setStatus("Waiting for close lock");
794
    status.setStatus("Waiting for close lock");
795
    try {
795
    try {
796
      synchronized (closeLock) {
796
      synchronized (closeLock) {
797
        return doClose(abort, status);
797
        return doClose(abort, status);
798
      }
798
      }
[+20] [20] 374 lines
[+20] [+] public boolean flushcache() throws IOException {
1173
                (writestate.flushing ? "already flushing" : "writes not enabled"));
1173
                (writestate.flushing ? "already flushing" : "writes not enabled"));
1174
            return false;
1174
            return false;
1175
          }
1175
          }
1176
        }
1176
        }
1177
        boolean result = internalFlushcache(status);
1177
        boolean result = internalFlushcache(status);
1178
        
1178

   
1179
        if (coprocessorHost != null) {
1179
        if (coprocessorHost != null) {
1180
          status.setStatus("Running post-flush coprocessor hooks");
1180
          status.setStatus("Running post-flush coprocessor hooks");
1181
          coprocessorHost.postFlush();
1181
          coprocessorHost.postFlush();
1182
        }
1182
        }
1183

    
   
1183

   
[+20] [20] 37 lines
[+20] public boolean flushcache() throws IOException {
1221
   * </ul>
1221
   * </ul>
1222
   * <p>This method is protected, but can be accessed via several public
1222
   * <p>This method is protected, but can be accessed via several public
1223
   * routes.
1223
   * routes.
1224
   *
1224
   *
1225
   * <p> This method may block for some time.
1225
   * <p> This method may block for some time.
1226
   * @param status 
1226
   * @param status
1227
   *
1227
   *
1228
   * @return true if the region needs compacting
1228
   * @return true if the region needs compacting
1229
   *
1229
   *
1230
   * @throws IOException general io exceptions
1230
   * @throws IOException general io exceptions
1231
   * @throws DroppedSnapshotException Thrown when replay of hlog is required
1231
   * @throws DroppedSnapshotException Thrown when replay of hlog is required
[+20] [20] 5 lines
[+20] [+] protected boolean internalFlushcache(MonitoredTask status) throws IOException {
1237

    
   
1237

   
1238
  /**
1238
  /**
1239
   * @param wal Null if we're NOT to go via hlog/wal.
1239
   * @param wal Null if we're NOT to go via hlog/wal.
1240
   * @param myseqid The seqid to use if <code>wal</code> is null writing out
1240
   * @param myseqid The seqid to use if <code>wal</code> is null writing out
1241
   * flush file.
1241
   * flush file.
1242
   * @param status 
1242
   * @param status
1243
   * @return true if the region needs compacting
1243
   * @return true if the region needs compacting
1244
   * @throws IOException
1244
   * @throws IOException
1245
   * @see #internalFlushcache(MonitoredTask)
1245
   * @see #internalFlushcache(MonitoredTask)
1246
   */
1246
   */
1247
  protected boolean internalFlushcache(
1247
  protected boolean internalFlushcache(
[+20] [20] 526 lines
[+20] [+] public OperationStatus[] put(Put[] puts) throws IOException {
1774
    return put(putsAndLocks);
1774
    return put(putsAndLocks);
1775
  }
1775
  }
1776

    
   
1776

   
1777
  /**
1777
  /**
1778
   * Perform a batch of puts.
1778
   * Perform a batch of puts.
1779
   * 
1779
   *
1780
   * @param putsAndLocks
1780
   * @param putsAndLocks
1781
   *          the list of puts paired with their requested lock IDs.
1781
   *          the list of puts paired with their requested lock IDs.
1782
   * @return an array of OperationStatus which internally contains the
1782
   * @return an array of OperationStatus which internally contains the
1783
   *         OperationStatusCode and the exceptionMessage if any.
1783
   *         OperationStatusCode and the exceptionMessage if any.
1784
   * @throws IOException
1784
   * @throws IOException
[+20] [20] 192 lines
[+20] [+] private long doMiniBatchPut(
1977
      }
1977
      }
1978
      // -------------------------
1978
      // -------------------------
1979
      // STEP 7. Sync wal.
1979
      // STEP 7. Sync wal.
1980
      // -------------------------
1980
      // -------------------------
1981
      if (walEdit.size() > 0 &&
1981
      if (walEdit.size() > 0 &&
1982
          (this.regionInfo.isMetaRegion() || 
1982
          (this.regionInfo.isMetaRegion() ||
1983
           !this.htableDescriptor.isDeferredLogFlush())) {
1983
           !this.htableDescriptor.isDeferredLogFlush())) {
1984
        this.log.sync(txid);
1984
        this.log.sync(txid);
1985
      }
1985
      }
1986
      walSyncSuccessful = true;
1986
      walSyncSuccessful = true;
1987
      // ------------------------------------------------------------------
1987
      // ------------------------------------------------------------------
[+20] [20] 335 lines
[+20] [+] private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap,
2323
     return size;
2323
     return size;
2324
   }
2324
   }
2325

    
   
2325

   
2326
  /**
2326
  /**
2327
   * Remove all the keys listed in the map from the memstore. This method is
2327
   * Remove all the keys listed in the map from the memstore. This method is
2328
   * called when a Put has updated memstore but subequently fails to update 
2328
   * called when a Put has updated memstore but subequently fails to update
2329
   * the wal. This method is then invoked to rollback the memstore.
2329
   * the wal. This method is then invoked to rollback the memstore.
2330
   */
2330
   */
2331
  private void rollbackMemstore(BatchOperationInProgress<Pair<Put, Integer>> batchOp,
2331
  private void rollbackMemstore(BatchOperationInProgress<Pair<Put, Integer>> batchOp,
2332
                                Map<byte[], List<KeyValue>>[] familyMaps,
2332
                                Map<byte[], List<KeyValue>>[] familyMaps,
2333
                                int start, int end) {
2333
                                int start, int end) {
2334
    int kvsRolledback = 0;
2334
    int kvsRolledback = 0;
2335
    for (int i = start; i < end; i++) {
2335
    for (int i = start; i < end; i++) {
2336
      // skip over request that never succeeded in the first place.
2336
      // skip over request that never succeeded in the first place.
2337
      if (batchOp.retCodeDetails[i].getOperationStatusCode()
2337
      if (batchOp.retCodeDetails[i].getOperationStatusCode()
2338
            != OperationStatusCode.SUCCESS) {
2338
            != OperationStatusCode.SUCCESS) {
2339
        continue;
2339
        continue;
2340
      }
2340
      }
2341

    
   
2341

   
2342
      // Rollback all the kvs for this row. 
2342
      // Rollback all the kvs for this row.
2343
      Map<byte[], List<KeyValue>> familyMap  = familyMaps[i]; 
2343
      Map<byte[], List<KeyValue>> familyMap  = familyMaps[i];
2344
      for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
2344
      for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
2345
        byte[] family = e.getKey();
2345
        byte[] family = e.getKey();
2346
        List<KeyValue> edits = e.getValue();
2346
        List<KeyValue> edits = e.getValue();
2347

    
   
2347

   
2348
        // Remove those keys from the memstore that matches our 
2348
        // Remove those keys from the memstore that matches our
2349
        // key's (row, cf, cq, timestamp, memstoreTS). The interesting part is
2349
        // key's (row, cf, cq, timestamp, memstoreTS). The interesting part is
2350
        // that even the memstoreTS has to match for keys that will be rolleded-back.
2350
        // that even the memstoreTS has to match for keys that will be rolleded-back.
2351
        Store store = getStore(family);
2351
        Store store = getStore(family);
2352
        for (KeyValue kv: edits) {
2352
        for (KeyValue kv: edits) {
2353
          store.rollback(kv);
2353
          store.rollback(kv);
[+20] [20] 97 lines
[+20] [+] protected long replayRecoveredEditsIfAny(final Path regiondir,
2451
      final MonitoredTask status)
2451
      final MonitoredTask status)
2452
  throws UnsupportedEncodingException, IOException {
2452
      throws UnsupportedEncodingException, IOException {
2453
    long seqid = minSeqId;
2453
    long seqid = minSeqId;
2454
    NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
2454
    NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
2455
    if (files == null || files.isEmpty()) return seqid;
2455
    if (files == null || files.isEmpty()) return seqid;

    
   
2456
    boolean checkSafeToSkip = true;
2456
    for (Path edits: files) {
2457
    for (Path edits: files) {
2457
      if (edits == null || !this.fs.exists(edits)) {
2458
      if (edits == null || !this.fs.exists(edits)) {
2458
        LOG.warn("Null or non-existent edits file: " + edits);
2459
        LOG.warn("Null or non-existent edits file: " + edits);
2459
        continue;
2460
        continue;
2460
      }
2461
      }
2461
      if (isZeroLengthThenDelete(this.fs, edits)) continue;
2462
      if (isZeroLengthThenDelete(this.fs, edits)) continue;

    
   
2463

   

    
   
2464
      if (checkSafeToSkip) {

    
   
2465
        Path higher = files.higher(edits);

    
   
2466
        long maxSeqId = Long.MAX_VALUE;

    
   
2467
        if (higher != null) {

    
   
2468
          // Edit file name pattern, HLog.EDITFILES_NAME_PATTERN: "-?[0-9]+"

    
   
2469
          String fileName = higher.getName();

    
   
2470
          maxSeqId = Math.abs(Long.parseLong(fileName));

    
   
2471
        }

    
   
2472
        if (maxSeqId <= minSeqId) {

    
   
2473
          String msg = "Maximum possible sequenceid for this log is " + maxSeqId

    
   
2474
              + ", skipped the whole file, path=" + edits;

    
   
2475
          LOG.debug(msg);

    
   
2476
          continue;

    
   
2477
        } else {

    
   
2478
          checkSafeToSkip = false;

    
   
2479
        }

    
   
2480
      }

    
   
2481

   
2462
      try {
2482
      try {
2463
        seqid = replayRecoveredEdits(edits, seqid, reporter);
2483
        seqid = replayRecoveredEdits(edits, seqid, reporter);
2464
      } catch (IOException e) {
2484
      } catch (IOException e) {
2465
        boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
2485
        boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
2466
        if (skipErrors) {
2486
        if (skipErrors) {
[+20] [20] 34 lines
[+20] [+] private long replayRecoveredEdits(final Path edits,
2501
    throws IOException {
2521
    throws IOException {
2502
    String msg = "Replaying edits from " + edits + "; minSequenceid=" +
2522
    String msg = "Replaying edits from " + edits + "; minSequenceid=" +
2503
      minSeqId + "; path=" + edits;
2523
      minSeqId + "; path=" + edits;
2504
    LOG.info(msg);
2524
    LOG.info(msg);
2505
    MonitoredTask status = TaskMonitor.get().createStatus(msg);
2525
    MonitoredTask status = TaskMonitor.get().createStatus(msg);
2506
    
2526

   
2507
    status.setStatus("Opening logs");
2527
    status.setStatus("Opening logs");
2508
    HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
2528
    HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
2509
    try {
2529
    try {
2510
    long currentEditSeqId = minSeqId;
2530
      long currentEditSeqId = minSeqId;
2511
    long firstSeqIdInLog = -1;
2531
      long firstSeqIdInLog = -1;
[+20] [20] 234 lines
[+20] [+] private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
2746
    checkRow(row, "row lock");
2766
    checkRow(row, "row lock");
2747
    startRegionOperation();
2767
    startRegionOperation();
2748
    try {
2768
    try {
2749
      HashedBytes rowKey = new HashedBytes(row);
2769
      HashedBytes rowKey = new HashedBytes(row);
2750
      CountDownLatch rowLatch = new CountDownLatch(1);
2770
      CountDownLatch rowLatch = new CountDownLatch(1);
2751
      
2771

   
2752
      // loop until we acquire the row lock (unless !waitForLock)
2772
      // loop until we acquire the row lock (unless !waitForLock)
2753
      while (true) {
2773
      while (true) {
2754
        CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
2774
        CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
2755
        if (existingLatch == null) {
2775
        if (existingLatch == null) {
2756
          break;
2776
          break;
[+20] [20] 10 lines
[+20] private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
2767
          } catch (InterruptedException ie) {
2787
          } catch (InterruptedException ie) {
2768
            // Empty
2788
            // Empty
2769
          }
2789
          }
2770
        }
2790
        }
2771
      }
2791
      }
2772
       
2792

   
2773
      // loop until we generate an unused lock id
2793
      // loop until we generate an unused lock id
2774
      while (true) {
2794
      while (true) {
2775
        Integer lockId = lockIdGenerator.incrementAndGet();
2795
        Integer lockId = lockIdGenerator.incrementAndGet();
2776
        HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
2796
        HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
2777
        if (existingRowKey == null) {
2797
        if (existingRowKey == null) {
[+20] [20] 15 lines
[+20] private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
2793
   */
2813
   */
2794
  byte[] getRowFromLock(final Integer lockid) {
2814
  byte[] getRowFromLock(final Integer lockid) {
2795
    HashedBytes rowKey = lockIds.get(lockid);
2815
    HashedBytes rowKey = lockIds.get(lockid);
2796
    return rowKey == null ? null : rowKey.getBytes();
2816
    return rowKey == null ? null : rowKey.getBytes();
2797
  }
2817
  }
2798
  
2818

   
2799
  /**
2819
  /**
2800
   * Release the row lock!
2820
   * Release the row lock!
2801
   * @param lockId  The lock ID to release.
2821
   * @param lockId  The lock ID to release.
2802
   */
2822
   */
2803
  public void releaseRowLock(final Integer lockId) {
2823
  public void releaseRowLock(final Integer lockId) {
[+20] [20] 40 lines
[+20] [+] private Integer getLock(Integer lockid, byte [] row, boolean waitForLock)
2844
      }
2864
      }
2845
      lid = lockid;
2865
      lid = lockid;
2846
    }
2866
    }
2847
    return lid;
2867
    return lid;
2848
  }
2868
  }
2849
    
2869

   
2850
  /**
2870
  /**
2851
   * Determines whether multiple column families are present
2871
   * Determines whether multiple column families are present
2852
   * Precondition: familyPaths is not null
2872
   * Precondition: familyPaths is not null
2853
   *
2873
   *
2854
   * @param familyPaths List of Pair<byte[] column family, String hfilePath>
2874
   * @param familyPaths List of Pair<byte[] column family, String hfilePath>
[+20] [20] 84 lines
[+20] [+] public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths)
2939
        String path = p.getSecond();
2959
        String path = p.getSecond();
2940
        Store store = getStore(familyName);
2960
        Store store = getStore(familyName);
2941
        try {
2961
        try {
2942
          store.bulkLoadHFile(path);
2962
          store.bulkLoadHFile(path);
2943
        } catch (IOException ioe) {
2963
        } catch (IOException ioe) {
2944
          // a failure here causes an atomicity violation that we currently 
2964
          // a failure here causes an atomicity violation that we currently
2945
          // cannot recover from since it is likely a failed hdfs operation.
2965
          // cannot recover from since it is likely a failed hdfs operation.
2946

    
   
2966

   
2947
          // TODO Need a better story for reverting partial failures due to HDFS.
2967
          // TODO Need a better story for reverting partial failures due to HDFS.
2948
          LOG.error("There was a partial failure due to IO when attempting to" +
2968
          LOG.error("There was a partial failure due to IO when attempting to" +
2949
              " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond());
2969
              " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond());
[+20] [20] 298 lines
[+20] [+] public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
3248
    return createHRegion(info, rootDir, conf, hTableDescriptor, null);
3268
    return createHRegion(info, rootDir, conf, hTableDescriptor, null);
3249
  }
3269
  }
3250

    
   
3270

   
3251
  /**
3271
  /**
3252
   * Convenience method creating new HRegions. Used by createTable.
3272
   * Convenience method creating new HRegions. Used by createTable.
3253
   * The {@link HLog} for the created region needs to be closed explicitly. 
3273
   * The {@link HLog} for the created region needs to be closed explicitly.
3254
   * Use {@link HRegion#getLog()} to get access.
3274
   * Use {@link HRegion#getLog()} to get access.
3255
   * 
3275
   *
3256
   * @param info Info for region to create.
3276
   * @param info Info for region to create.
3257
   * @param rootDir Root directory for HBase instance
3277
   * @param rootDir Root directory for HBase instance
3258
   * @param conf
3278
   * @param conf
3259
   * @param hTableDescriptor
3279
   * @param hTableDescriptor
3260
   * @param hlog shared HLog
3280
   * @param hlog shared HLog
[+20] [20] 16 lines
[+20] [+] public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
3277
    FileSystem fs = FileSystem.get(conf);
3297
    FileSystem fs = FileSystem.get(conf);
3278
    fs.mkdirs(regionDir);
3298
    fs.mkdirs(regionDir);
3279
    HLog effectiveHLog = hlog;
3299
    HLog effectiveHLog = hlog;
3280
    if (hlog == null) {
3300
    if (hlog == null) {
3281
      effectiveHLog = new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
3301
      effectiveHLog = new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
3282
          new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf);     
3302
          new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf);
3283
    }
3303
    }
3284
    HRegion region = HRegion.newHRegion(tableDir,
3304
    HRegion region = HRegion.newHRegion(tableDir,
3285
        effectiveHLog, fs, conf, info, hTableDescriptor, null);
3305
        effectiveHLog, fs, conf, info, hTableDescriptor, null);
3286
    region.initialize();
3306
    region.initialize();
3287
    return region;
3307
    return region;
3288
  }
3308
  }
3289
  
3309

   
3290
  /**
3310
  /**
3291
   * Open a Region.
3311
   * Open a Region.
3292
   * @param info Info for region to be opened.
3312
   * @param info Info for region to be opened.
3293
   * @param wal HLog for region to use. This method will call
3313
   * @param wal HLog for region to use. This method will call
3294
   * HLog#setSequenceNumber(long) passing the result of the call to
3314
   * HLog#setSequenceNumber(long) passing the result of the call to
[+20] [20] 573 lines
[+20] [+] public Result get(final Get get, final Integer lockid) throws IOException {
3868
  }
3888
  }
3869

    
   
3889

   
3870
  // TODO: There's a lot of boiler plate code identical
3890
  // TODO: There's a lot of boiler plate code identical
3871
  // to increment... See how to better unify that.
3891
  // to increment... See how to better unify that.
3872
  /**
3892
  /**
3873
   * 
3893
   *
3874
   * Perform one or more append operations on a row.
3894
   * Perform one or more append operations on a row.
3875
   * <p>
3895
   * <p>
3876
   * Appends performed are done under row lock but reads do not take locks out
3896
   * Appends performed are done under row lock but reads do not take locks out
3877
   * so this can be seen partially complete by gets and scans.
3897
   * so this can be seen partially complete by gets and scans.
3878
   * 
3898
   *
3879
   * @param append
3899
   * @param append
3880
   * @param lockid
3900
   * @param lockid
3881
   * @param writeToWAL
3901
   * @param writeToWAL
3882
   * @return new keyvalues after increment
3902
   * @return new keyvalues after increment
3883
   * @throws IOException
3903
   * @throws IOException
[+20] [20] 351 lines
[+20] [+] private void checkFamily(final byte [] family)
4235
      Bytes.SIZEOF_BOOLEAN);
4255
      Bytes.SIZEOF_BOOLEAN);
4236

    
   
4256

   
4237
  public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
4257
  public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
4238
      ClassSize.OBJECT + // closeLock
4258
      ClassSize.OBJECT + // closeLock
4239
      (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
4259
      (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
4240
      ClassSize.ATOMIC_LONG + // memStoreSize 
4260
      ClassSize.ATOMIC_LONG + // memStoreSize
4241
      ClassSize.ATOMIC_INTEGER + // lockIdGenerator
4261
      ClassSize.ATOMIC_INTEGER + // lockIdGenerator
4242
      (3 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, lockIds, scannerReadPoints
4262
      (3 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, lockIds, scannerReadPoints
4243
      WriteState.HEAP_SIZE + // writestate
4263
      WriteState.HEAP_SIZE + // writestate
4244
      ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
4264
      ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
4245
      (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
4265
      (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
[+20] [20] 214 lines
[+20] [+] public byte[] checkSplit() {
4460
    }
4480
    }
4461

    
   
4481

   
4462
    if (this.explicitSplitPoint != null) {
4482
    if (this.explicitSplitPoint != null) {
4463
      return this.explicitSplitPoint;
4483
      return this.explicitSplitPoint;
4464
    }
4484
    }
4465
    
4485

   
4466
    if (!splitPolicy.shouldSplit()) {
4486
    if (!splitPolicy.shouldSplit()) {
4467
      return null;
4487
      return null;
4468
    }
4488
    }
4469
    
4489

   
4470
    byte[] ret = splitPolicy.getSplitPoint();
4490
    byte[] ret = splitPolicy.getSplitPoint();
4471
    
4491

   
4472
    if (ret != null) {
4492
    if (ret != null) {
4473
      try {
4493
      try {
4474
        checkRow(ret, "calculated split");
4494
        checkRow(ret, "calculated split");
4475
      } catch (IOException e) {
4495
      } catch (IOException e) {
4476
        LOG.error("Ignoring invalid split", e);
4496
        LOG.error("Ignoring invalid split", e);
4477
        return null;
4497
        return null;
4478
      }
4498
      }
4479
    }        
4499
    }
4480
    return ret;
4500
    return ret;
4481
  }
4501
  }
4482

    
   
4502

   
4483
  /**
4503
  /**
4484
   * @return The priority that this region should have in the compaction queue
4504
   * @return The priority that this region should have in the compaction queue
[+20] [20] 160 lines
src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Base revision 5daa02b
 
(diff content for this file not captured in this export)