HBASE-4241: Optimize flushing of the Memstore.

Review Request #1650 - Created Aug. 25, 2011 and submitted

Submitter: Lars Hofhansl
Branch: trunk
Bugs: HBASE-4241
Reviewers (groups): hbase
Reviewers (people): jgray, stack, tedyu
This avoids flushing row versions to disk that are known to be GC'd by the next compaction anyway.
This covers two scenarios:
1. maxVersions=N and we find at least N versions in the memstore. We can safely avoid flushing any further versions to disk.
2. Similarly, minVersions=N and we find at least N versions in the memstore. Now we can safely avoid flushing any further *expired* versions to disk.
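To make the two scenarios concrete, here is a minimal sketch of the column family settings involved (the family name and the specific values are illustrative assumptions; the HColumnDescriptor setters are the standard API):

    import org.apache.hadoop.hbase.HColumnDescriptor;

    public class FlushOptimizationSettings {
      public static void main(String[] args) {
        // Illustrative column family: keep at most 3 versions, at least 1,
        // and expire versions older than one hour.
        HColumnDescriptor family = new HColumnDescriptor("cf");
        family.setMaxVersions(3);   // scenario 1: N = 3
        family.setMinVersions(1);   // scenario 2: N = 1
        family.setTimeToLive(3600); // TTL, in seconds

        // Scenario 1: once 3 versions of a cell have been seen in the
        // memstore snapshot, any older versions would be discarded by
        // the next compaction anyway, so the flush can skip them.
        // Scenario 2: once 1 (minVersions) version has been seen, any
        // further versions already past their TTL can likewise be
        // skipped at flush time.
      }
    }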

This changes the Store flush to use the same mechanism that is used for compactions.
I borrowed some code from the tests and refactored the test code to use a new utility class that wraps a sorted collection and behaves like a KeyValueScanner. The same class is used to create a scanner over the memstore's snapshot.
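For intuition, here is a minimal, self-contained sketch of the idea behind such a utility. The simplified SimpleScanner interface below is an assumption made to keep the example standalone; the real KeyValueScanner interface has more methods (e.g. seek, reseek) and works on KeyValue:

    import java.util.Iterator;
    import java.util.SortedSet;
    import java.util.TreeSet;

    // Simplified stand-in for HBase's KeyValueScanner interface.
    interface SimpleScanner<T> {
      T peek();     // current element without advancing
      T next();     // return current element, then advance
      void close();
    }

    // Core idea of the utility: expose a sorted collection through a
    // scanner-style cursor, so in-memory data can be scanned the same
    // way a store file is.
    class CollectionBackedScannerSketch<T> implements SimpleScanner<T> {
      private final Iterator<T> iter;
      private T current;

      CollectionBackedScannerSketch(SortedSet<T> data) {
        this.iter = data.iterator();
        this.current = iter.hasNext() ? iter.next() : null;
      }

      public T peek() { return current; }

      public T next() {
        T result = current;
        current = iter.hasNext() ? iter.next() : null;
        return result;
      }

      public void close() { /* nothing to release for an in-memory set */ }

      public static void main(String[] args) {
        SortedSet<String> set = new TreeSet<String>();
        set.add("a"); set.add("b"); set.add("c");
        SimpleScanner<String> scanner = new CollectionBackedScannerSketch<String>(set);
        while (scanner.peek() != null) {
          System.out.println(scanner.next()); // prints a, b, c in order
        }
        scanner.close();
      }
    }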
Ran all tests. TestHTablePool and TestDistributedLogSplitting error out (with or without my change).
I had to change three tests that incorrectly relied on old row versions hanging around after a flush (or were otherwise incorrect).

No new tests, as this should cause no functional change.
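Although no new test is added, the following sketch illustrates the behavioral change the three updated tests had to account for. The table handle and the flush helper are hypothetical stand-ins for test infrastructure such as HBaseTestingUtility, and the family is assumed to have maxVersions = 1:

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FlushVersionDropSketch {
      // 'table' is assumed to target a family "cf" with maxVersions = 1;
      // 'flush' stands in for a test helper that forces a memstore flush.
      static void demonstrate(HTable table, Runnable flush) throws Exception {
        // Two versions of the same cell.
        Put p1 = new Put(Bytes.toBytes("row1"));
        p1.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), 1L, Bytes.toBytes("v1"));
        table.put(p1);

        Put p2 = new Put(Bytes.toBytes("row1"));
        p2.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), 2L, Bytes.toBytes("v2"));
        table.put(p2);

        flush.run(); // force a memstore flush

        // Before this patch, "v1" could still be read (e.g. at timestamp 1)
        // until the next compaction removed it. With this patch the flush
        // itself drops "v1", so tests asserting its presence after a flush
        // had to be fixed.
      }
    }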
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
Revision 1161347 New Change
(45 lines skipped)

 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 import org.apache.hadoop.util.StringUtils;

 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
(32 lines skipped)

In public class Store implements HeapSize:

   final FileSystem fs;
   final Configuration conf;
   // ttl in milliseconds.
   protected long ttl;
   protected int minVersions;
+  protected int maxVersions;
   long majorCompactionTime;
   private final int minFilesToCompact;
   private final int maxFilesToCompact;
   private final long minCompactSize;
   private final long maxCompactSize;
(78 lines skipped)

In the Store constructor:

     } else {
       // second -> ms adjust for user data
       this.ttl *= 1000;
     }
     this.minVersions = family.getMinVersions();
+    this.maxVersions = family.getMaxVersions();
     this.memstore = new MemStore(conf, this.comparator);
     this.storeNameStr = Bytes.toString(this.family.getName());

     // By default, compact if storefile.count >= minFilesToCompact
     this.minFilesToCompact = Math.max(2,
(293 lines skipped)

In private StoreFile internalFlushCache(final SortedSet<KeyValue> set, ...):

     long flushed = 0;
     // Don't flush if there are no entries.
     if (set.size() == 0) {
       return null;
     }
-    long oldestTimestamp = System.currentTimeMillis() - ttl;
+    Scan scan = new Scan();
+    scan.setMaxVersions(maxVersions);
+    // Use a store scanner to find which rows to flush.
+    // Note that we need to retain deletes, hence
+    // pass true as the StoreScanner's retainDeletesInOutput argument.
+    InternalScanner scanner = new StoreScanner(this, scan,
+        Collections.singletonList(new CollectionBackedScanner(set,
+            this.comparator)), true);
     // TODO:  We can fail in the below block before we complete adding this
     // flush to list of store files.  Add cleanup of anything put on filesystem
     // if we fail.
     synchronized (flushLock) {
       status.setStatus("Flushing " + this + ": creating writer");
       // A. Write the map out to the disk
       writer = createWriterInTmp(set.size());
       writer.setTimeRangeTracker(snapshotTimeRangeTracker);
       try {
-        for (KeyValue kv: set) {
-          // If minVersion > 0 we will wait until the next compaction to
-          // collect expired KVs. (following the logic for maxVersions).
-          // TODO: As Jonathan Gray points this can be optimized
-          // (see HBASE-4241)
-          if (minVersions > 0 || !isExpired(kv, oldestTimestamp)) {
-            writer.append(kv);
-            flushed += this.memstore.heapSizeChange(kv, true);
-          }
-        }
+        List<KeyValue> kvs = new ArrayList<KeyValue>();
+        while (scanner.next(kvs)) {
+          if (!kvs.isEmpty()) {
+            for (KeyValue kv : kvs) {
+              writer.append(kv);
+              flushed += this.memstore.heapSizeChange(kv, true);
+            }
+            kvs.clear();
+          }
+        }
+        scanner.close();
       } finally {
         // Write out the log sequence number that corresponds to this output
         // hfile.  The hfile is current up to and including logCacheFlushId.
         status.setStatus("Flushing " + this + ": appending metadata");
         writer.appendMetadata(logCacheFlushId, false);
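Design note: by draining a StoreScanner over the snapshot instead of iterating the raw set, the flush applies the same version-counting, TTL, and delete-retention rules as a compaction; the CollectionBackedScanner is what lets the in-memory snapshot stand in for a store file scanner here.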
(1223 lines skipped)

Near the end of the class, after needsCompaction():

   }

   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT + (15 * ClassSize.REFERENCE) +
       (8 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
-      (5 * Bytes.SIZEOF_INT) + (3 * Bytes.SIZEOF_BOOLEAN));
+      (6 * Bytes.SIZEOF_INT) + (3 * Bytes.SIZEOF_BOOLEAN));

   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +
       ClassSize.CONCURRENT_SKIPLISTMAP +
       ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT);

   @Override
   public long heapSize() {
     return DEEP_OVERHEAD + this.memstore.heapSize();
   }
 }
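Note: the Bytes.SIZEOF_INT multiplier in FIXED_OVERHEAD goes from 5 to 6 to account for the one int field this patch adds to Store (maxVersions).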
Other files in this diff:

http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java (new file)
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java (revision 1161347, changed)
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java (revision 1161347, changed)
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java (revision 1161347, changed)
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java (revision 1161347, changed)
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (revision 1161347, changed)