HBASE-4241: Optimize flushing of the Memstore.

Review Request #1650 - Created Aug. 25, 2011 and submitted

Submitter: Lars Hofhansl
Branch: trunk
Bugs: HBASE-4241
Reviewers (groups): hbase
Reviewers (people): jgray, stack, tedyu
Repository: hbase
This avoids flushing row versions to disk that are known to be GC'd by the next compaction anyway.
This covers two scenarios (sketched below):
1. maxVersions=N and we find at least N versions of a column in the memstore. We can safely avoid flushing any further versions to disk.
2. Similarly, minVersions=N and we find at least N versions in the memstore. Now we can safely avoid flushing any further *expired* versions to disk.
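
To make the two scenarios concrete, here is a minimal sketch of the per-column retention rule, assuming versions are visited newest-first (as a scan delivers them). This is an illustration, not code from the patch; all names below are made up:

final class FlushRetentionRule {
  private FlushRetentionRule() {}

  /**
   * @param versionsKept versions of this column already written by this flush
   * @param expired      whether the current version is past its TTL
   */
  static boolean shouldFlush(int versionsKept, boolean expired,
      int maxVersions, int minVersions) {
    if (versionsKept >= maxVersions) {
      return false; // scenario 1: the next compaction would GC this version anyway
    }
    if (expired && versionsKept >= minVersions) {
      return false; // scenario 2: expired, and the minimum to retain is already kept
    }
    return true;
  }
}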

This changes the Store flush to use the same mechanism that is used for compactions.
I borrowed some code from the tests and refactored the test code to use a new utility class that wraps a sorted collection and then behaves like a KeyValueScanner. The same class is used to create a scanner over the memstore's snapshot, as sketched below.
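
The idea behind the utility class (CollectionBackedScanner in the patch), reduced to a sketch. Only the peek/next/seek surface is shown, and the class name here is made up; the real class implements the KeyValueScanner interface:

import java.util.Iterator;
import java.util.SortedSet;

import org.apache.hadoop.hbase.KeyValue;

class SortedSnapshotScanner {
  private final SortedSet<KeyValue> snapshot;
  private Iterator<KeyValue> iter;
  private KeyValue current;

  SortedSnapshotScanner(SortedSet<KeyValue> snapshot) {
    this.snapshot = snapshot;
    this.iter = snapshot.iterator();
    this.current = iter.hasNext() ? iter.next() : null;
  }

  /** Next KeyValue, without advancing. */
  KeyValue peek() {
    return current;
  }

  /** Return the current KeyValue and advance past it. */
  KeyValue next() {
    KeyValue ret = current;
    current = iter.hasNext() ? iter.next() : null;
    return ret;
  }

  /** Reposition at the first KeyValue sorting at or after the given key. */
  boolean seek(KeyValue kv) {
    iter = snapshot.tailSet(kv).iterator();
    current = iter.hasNext() ? iter.next() : null;
    return current != null;
  }
}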
Ran all tests. TestHTablePool and TestDistributedLogSplitting error out (with or without my change).
I had to change three tests that incorrectly relied on old rows hanging around after a flush (or were otherwise incorrect).
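
For illustration, a hypothetical sequence of the kind those tests relied on, against a table "t" whose family "f" has maxVersions=1 (table, family, and class names are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class FlushAssumptionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "t");
    Put p = new Put(Bytes.toBytes("row"));
    p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), 1L, Bytes.toBytes("v1"));
    table.put(p);
    p = new Put(Bytes.toBytes("row"));
    p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), 2L, Bytes.toBytes("v2"));
    table.put(p);
    new HBaseAdmin(conf).flush("t");
    // Before this change the flushed file still contained "v1" until the
    // next compaction; now only "v2" is written, so a test expecting the
    // old version to linger after the flush fails.
  }
}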

No new tests, as this should cause no functional change.
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
Diff against revision 1161347 (lines added by the change are prefixed with "+", removed lines with "-"; unchanged context is shown once):
1
/**
1
/**
2
 * Copyright 2010 The Apache Software Foundation
2
 * Copyright 2010 The Apache Software Foundation
3
 *
3
 *
4
 * Licensed to the Apache Software Foundation (ASF) under one
4
 * Licensed to the Apache Software Foundation (ASF) under one
5
 * or more contributor license agreements.  See the NOTICE file
5
 * or more contributor license agreements.  See the NOTICE file
6
 * distributed with this work for additional information
6
 * distributed with this work for additional information
7
 * regarding copyright ownership.  The ASF licenses this file
7
 * regarding copyright ownership.  The ASF licenses this file
8
 * to you under the Apache License, Version 2.0 (the
8
 * to you under the Apache License, Version 2.0 (the
9
 * "License"); you may not use this file except in compliance
9
 * "License"); you may not use this file except in compliance
10
 * with the License.  You may obtain a copy of the License at
10
 * with the License.  You may obtain a copy of the License at
11
 *
11
 *
12
 *     http://www.apache.org/licenses/LICENSE-2.0
12
 *     http://www.apache.org/licenses/LICENSE-2.0
13
 *
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
18
 * limitations under the License.
19
 */
19
 */
20
package org.apache.hadoop.hbase.regionserver;
20
package org.apache.hadoop.hbase.regionserver;
21

    
   
21

   
22
import java.io.IOException;
22
import java.io.IOException;
23
import java.io.InterruptedIOException;
23
import java.io.InterruptedIOException;
24
import java.util.ArrayList;
24
import java.util.ArrayList;
25
import java.util.Collection;
25
import java.util.Collection;
26
import java.util.Collections;
26
import java.util.Collections;
27
import java.util.List;
27
import java.util.List;
28
import java.util.NavigableSet;
28
import java.util.NavigableSet;
29
import java.util.SortedSet;
29
import java.util.SortedSet;
30
import java.util.concurrent.CopyOnWriteArraySet;
30
import java.util.concurrent.CopyOnWriteArraySet;
31
import java.util.concurrent.locks.ReentrantReadWriteLock;
31
import java.util.concurrent.locks.ReentrantReadWriteLock;
32

    
   
32

   
33
import org.apache.commons.logging.Log;
33
import org.apache.commons.logging.Log;
34
import org.apache.commons.logging.LogFactory;
34
import org.apache.commons.logging.LogFactory;
35
import org.apache.hadoop.conf.Configuration;
35
import org.apache.hadoop.conf.Configuration;
36
import org.apache.hadoop.fs.FileStatus;
36
import org.apache.hadoop.fs.FileStatus;
37
import org.apache.hadoop.fs.FileSystem;
37
import org.apache.hadoop.fs.FileSystem;
38
import org.apache.hadoop.fs.FileUtil;
38
import org.apache.hadoop.fs.FileUtil;
39
import org.apache.hadoop.fs.Path;
39
import org.apache.hadoop.fs.Path;
40
import org.apache.hadoop.hbase.*;
40
import org.apache.hadoop.hbase.*;
41
import org.apache.hadoop.hbase.client.Scan;
41
import org.apache.hadoop.hbase.client.Scan;
42
import org.apache.hadoop.hbase.io.HeapSize;
42
import org.apache.hadoop.hbase.io.HeapSize;
43
import org.apache.hadoop.hbase.io.hfile.Compression;
43
import org.apache.hadoop.hbase.io.hfile.Compression;
44
import org.apache.hadoop.hbase.io.hfile.HFile;
44
import org.apache.hadoop.hbase.io.hfile.HFile;
45
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
45
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
46
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
46
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
47
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
47
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
48
import org.apache.hadoop.hbase.util.Bytes;
48
import org.apache.hadoop.hbase.util.Bytes;
49
import org.apache.hadoop.hbase.util.ClassSize;
49
import org.apache.hadoop.hbase.util.ClassSize;
50
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

    
   
51
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
51
import org.apache.hadoop.util.StringUtils;
52
import org.apache.hadoop.util.StringUtils;
52

    
   
53

   
53
import com.google.common.base.Preconditions;
54
import com.google.common.base.Preconditions;
54
import com.google.common.collect.ImmutableList;
55
import com.google.common.collect.ImmutableList;
55
import com.google.common.collect.Iterables;
56
import com.google.common.collect.Iterables;
56
import com.google.common.collect.Lists;
57
import com.google.common.collect.Lists;
57

    
   
58

   
58
/**
59
/**
59
 * A Store holds a column family in a Region.  Its a memstore and a set of zero
60
 * A Store holds a column family in a Region.  Its a memstore and a set of zero
60
 * or more StoreFiles, which stretch backwards over time.
61
 * or more StoreFiles, which stretch backwards over time.
61
 *
62
 *
62
 * <p>There's no reason to consider append-logging at this level; all logging
63
 * <p>There's no reason to consider append-logging at this level; all logging
63
 * and locking is handled at the HRegion level.  Store just provides
64
 * and locking is handled at the HRegion level.  Store just provides
64
 * services to manage sets of StoreFiles.  One of the most important of those
65
 * services to manage sets of StoreFiles.  One of the most important of those
65
 * services is compaction services where files are aggregated once they pass
66
 * services is compaction services where files are aggregated once they pass
66
 * a configurable threshold.
67
 * a configurable threshold.
67
 *
68
 *
68
 * <p>The only thing having to do with logs that Store needs to deal with is
69
 * <p>The only thing having to do with logs that Store needs to deal with is
69
 * the reconstructionLog.  This is a segment of an HRegion's log that might
70
 * the reconstructionLog.  This is a segment of an HRegion's log that might
70
 * NOT be present upon startup.  If the param is NULL, there's nothing to do.
71
 * NOT be present upon startup.  If the param is NULL, there's nothing to do.
71
 * If the param is non-NULL, we need to process the log to reconstruct
72
 * If the param is non-NULL, we need to process the log to reconstruct
72
 * a TreeMap that might not have been written to disk before the process
73
 * a TreeMap that might not have been written to disk before the process
73
 * died.
74
 * died.
74
 *
75
 *
75
 * <p>It's assumed that after this constructor returns, the reconstructionLog
76
 * <p>It's assumed that after this constructor returns, the reconstructionLog
76
 * file will be deleted (by whoever has instantiated the Store).
77
 * file will be deleted (by whoever has instantiated the Store).
77
 *
78
 *
78
 * <p>Locking and transactions are handled at a higher level.  This API should
79
 * <p>Locking and transactions are handled at a higher level.  This API should
79
 * not be called directly but by an HRegion manager.
80
 * not be called directly but by an HRegion manager.
80
 */
81
 */
81
public class Store implements HeapSize {
82
public class Store implements HeapSize {
82
  static final Log LOG = LogFactory.getLog(Store.class);
83
  static final Log LOG = LogFactory.getLog(Store.class);
83
  protected final MemStore memstore;
84
  protected final MemStore memstore;
84
  // This stores directory in the filesystem.
85
  // This stores directory in the filesystem.
85
  private final Path homedir;
86
  private final Path homedir;
86
  private final HRegion region;
87
  private final HRegion region;
87
  private final HColumnDescriptor family;
88
  private final HColumnDescriptor family;
88
  final FileSystem fs;
89
  final FileSystem fs;
89
  final Configuration conf;
90
  final Configuration conf;
90
  // ttl in milliseconds.
91
  // ttl in milliseconds.
91
  protected long ttl;
92
  protected long ttl;
92
  protected int minVersions;
93
  protected int minVersions;

    
   
94
  protected int maxVersions;
93
  long majorCompactionTime;
95
  long majorCompactionTime;
94
  private final int minFilesToCompact;
96
  private final int minFilesToCompact;
95
  private final int maxFilesToCompact;
97
  private final int maxFilesToCompact;
96
  private final long minCompactSize;
98
  private final long minCompactSize;
97
  private final long maxCompactSize;
99
  private final long maxCompactSize;
98
  // compactRatio: double on purpose!  Float.MAX < Long.MAX < Double.MAX
100
  // compactRatio: double on purpose!  Float.MAX < Long.MAX < Double.MAX
99
  // With float, java will downcast your long to float for comparisons (bad)
101
  // With float, java will downcast your long to float for comparisons (bad)
100
  private double compactRatio;
102
  private double compactRatio;
101
  private long lastCompactSize = 0;
103
  private long lastCompactSize = 0;
102
  volatile boolean forceMajor = false;
104
  volatile boolean forceMajor = false;
103
  /* how many bytes to write between status checks */
105
  /* how many bytes to write between status checks */
104
  static int closeCheckInterval = 0;
106
  static int closeCheckInterval = 0;
105
  private final long desiredMaxFileSize;
107
  private final long desiredMaxFileSize;
106
  private final int blockingStoreFileCount;
108
  private final int blockingStoreFileCount;
107
  private volatile long storeSize = 0L;
109
  private volatile long storeSize = 0L;
108
  private volatile long totalUncompressedBytes = 0L;
110
  private volatile long totalUncompressedBytes = 0L;
109
  private final Object flushLock = new Object();
111
  private final Object flushLock = new Object();
110
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
112
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
111
  private final String storeNameStr;
113
  private final String storeNameStr;
112
  private final boolean inMemory;
114
  private final boolean inMemory;
113

    
   
115

   
114
  /*
116
  /*
115
   * List of store files inside this store. This is an immutable list that
117
   * List of store files inside this store. This is an immutable list that
116
   * is atomically replaced when its contents change.
118
   * is atomically replaced when its contents change.
117
   */
119
   */
118
  private ImmutableList<StoreFile> storefiles = null;
120
  private ImmutableList<StoreFile> storefiles = null;
119

    
   
121

   
120
  List<StoreFile> filesCompacting = Lists.newArrayList();
122
  List<StoreFile> filesCompacting = Lists.newArrayList();
121

    
   
123

   
122
  // All access must be synchronized.
124
  // All access must be synchronized.
123
  private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers =
125
  private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers =
124
    new CopyOnWriteArraySet<ChangedReadersObserver>();
126
    new CopyOnWriteArraySet<ChangedReadersObserver>();
125

    
   
127

   
126
  private final int blocksize;
128
  private final int blocksize;
127
  private final boolean blockcache;
129
  private final boolean blockcache;
128
  /** Compression algorithm for flush files and minor compaction */
130
  /** Compression algorithm for flush files and minor compaction */
129
  private final Compression.Algorithm compression;
131
  private final Compression.Algorithm compression;
130
  /** Compression algorithm for major compaction */
132
  /** Compression algorithm for major compaction */
131
  private final Compression.Algorithm compactionCompression;
133
  private final Compression.Algorithm compactionCompression;
132

    
   
134

   
133
  // Comparing KeyValues
135
  // Comparing KeyValues
134
  final KeyValue.KVComparator comparator;
136
  final KeyValue.KVComparator comparator;
135

    
   
137

   
136
  /**
138
  /**
137
   * Constructor
139
   * Constructor
138
   * @param basedir qualified path under which the region directory lives;
140
   * @param basedir qualified path under which the region directory lives;
139
   * generally the table subdirectory
141
   * generally the table subdirectory
140
   * @param region
142
   * @param region
141
   * @param family HColumnDescriptor for this column
143
   * @param family HColumnDescriptor for this column
142
   * @param fs file system object
144
   * @param fs file system object
143
   * @param conf configuration object
145
   * @param conf configuration object
144
   * failed.  Can be null.
146
   * failed.  Can be null.
145
   * @throws IOException
147
   * @throws IOException
146
   */
148
   */
147
  protected Store(Path basedir, HRegion region, HColumnDescriptor family,
149
  protected Store(Path basedir, HRegion region, HColumnDescriptor family,
148
    FileSystem fs, Configuration conf)
150
    FileSystem fs, Configuration conf)
149
  throws IOException {
151
  throws IOException {
150
    HRegionInfo info = region.regionInfo;
152
    HRegionInfo info = region.regionInfo;
151
    this.fs = fs;
153
    this.fs = fs;
152
    this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
154
    this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
153
    if (!this.fs.exists(this.homedir)) {
155
    if (!this.fs.exists(this.homedir)) {
154
      if (!this.fs.mkdirs(this.homedir))
156
      if (!this.fs.mkdirs(this.homedir))
155
        throw new IOException("Failed create of: " + this.homedir.toString());
157
        throw new IOException("Failed create of: " + this.homedir.toString());
156
    }
158
    }
157
    this.region = region;
159
    this.region = region;
158
    this.family = family;
160
    this.family = family;
159
    this.conf = conf;
161
    this.conf = conf;
160
    this.blockcache = family.isBlockCacheEnabled();
162
    this.blockcache = family.isBlockCacheEnabled();
161
    this.blocksize = family.getBlocksize();
163
    this.blocksize = family.getBlocksize();
162
    this.compression = family.getCompression();
164
    this.compression = family.getCompression();
163
    // avoid overriding compression setting for major compactions if the user
165
    // avoid overriding compression setting for major compactions if the user
164
    // has not specified it separately
166
    // has not specified it separately
165
    this.compactionCompression =
167
    this.compactionCompression =
166
      (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
168
      (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
167
        family.getCompactionCompression() : this.compression;
169
        family.getCompactionCompression() : this.compression;
168
    this.comparator = info.getComparator();
170
    this.comparator = info.getComparator();
169
    // getTimeToLive returns ttl in seconds.  Convert to milliseconds.
171
    // getTimeToLive returns ttl in seconds.  Convert to milliseconds.
170
    this.ttl = family.getTimeToLive();
172
    this.ttl = family.getTimeToLive();
171
    if (ttl == HConstants.FOREVER) {
173
    if (ttl == HConstants.FOREVER) {
172
      // default is unlimited ttl.
174
      // default is unlimited ttl.
173
      ttl = Long.MAX_VALUE;
175
      ttl = Long.MAX_VALUE;
174
    } else if (ttl == -1) {
176
    } else if (ttl == -1) {
175
      ttl = Long.MAX_VALUE;
177
      ttl = Long.MAX_VALUE;
176
    } else {
178
    } else {
177
      // second -> ms adjust for user data
179
      // second -> ms adjust for user data
178
      this.ttl *= 1000;
180
      this.ttl *= 1000;
179
    }
181
    }
180
    this.minVersions = family.getMinVersions();
182
    this.minVersions = family.getMinVersions();

    
   
183
    this.maxVersions = family.getMaxVersions();
181
    this.memstore = new MemStore(conf, this.comparator);
184
    this.memstore = new MemStore(conf, this.comparator);
182
    this.storeNameStr = Bytes.toString(this.family.getName());
185
    this.storeNameStr = Bytes.toString(this.family.getName());
183

    
   
186

   
184
    // By default, compact if storefile.count >= minFilesToCompact
187
    // By default, compact if storefile.count >= minFilesToCompact
185
    this.minFilesToCompact = Math.max(2,
188
    this.minFilesToCompact = Math.max(2,
186
      conf.getInt("hbase.hstore.compaction.min",
189
      conf.getInt("hbase.hstore.compaction.min",
187
        /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
190
        /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
188

    
   
191

   
189
    // Check if this is in-memory store
192
    // Check if this is in-memory store
190
    this.inMemory = family.isInMemory();
193
    this.inMemory = family.isInMemory();
191
    long maxFileSize = 0L;
194
    long maxFileSize = 0L;
192
    HTableDescriptor hTableDescriptor = region.getTableDesc();
195
    HTableDescriptor hTableDescriptor = region.getTableDesc();
193
    if (hTableDescriptor != null) {
196
    if (hTableDescriptor != null) {
194
      maxFileSize = hTableDescriptor.getMaxFileSize();
197
      maxFileSize = hTableDescriptor.getMaxFileSize();
195
    } else {
198
    } else {
196
      maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE;
199
      maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE;
197
    }
200
    }
198

    
   
201

   
199
    // By default we split region if a file > HConstants.DEFAULT_MAX_FILE_SIZE.
202
    // By default we split region if a file > HConstants.DEFAULT_MAX_FILE_SIZE.
200
    if (maxFileSize == HConstants.DEFAULT_MAX_FILE_SIZE) {
203
    if (maxFileSize == HConstants.DEFAULT_MAX_FILE_SIZE) {
201
      maxFileSize = conf.getLong("hbase.hregion.max.filesize",
204
      maxFileSize = conf.getLong("hbase.hregion.max.filesize",
202
        HConstants.DEFAULT_MAX_FILE_SIZE);
205
        HConstants.DEFAULT_MAX_FILE_SIZE);
203
    }
206
    }
204
    this.desiredMaxFileSize = maxFileSize;
207
    this.desiredMaxFileSize = maxFileSize;
205
    this.blockingStoreFileCount =
208
    this.blockingStoreFileCount =
206
      conf.getInt("hbase.hstore.blockingStoreFiles", 7);
209
      conf.getInt("hbase.hstore.blockingStoreFiles", 7);
207

    
   
210

   
208
    this.majorCompactionTime = getNextMajorCompactTime();
211
    this.majorCompactionTime = getNextMajorCompactTime();
209

    
   
212

   
210
    this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
213
    this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
211
    this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
214
    this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
212
      this.region.memstoreFlushSize);
215
      this.region.memstoreFlushSize);
213
    this.maxCompactSize
216
    this.maxCompactSize
214
      = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
217
      = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
215
    this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
218
    this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
216

    
   
219

   
217
    if (Store.closeCheckInterval == 0) {
220
    if (Store.closeCheckInterval == 0) {
218
      Store.closeCheckInterval = conf.getInt(
221
      Store.closeCheckInterval = conf.getInt(
219
          "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
222
          "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
220
    }
223
    }
221
    this.storefiles = sortAndClone(loadStoreFiles());
224
    this.storefiles = sortAndClone(loadStoreFiles());
222
  }
225
  }
223

    
   
226

   
224
  public HColumnDescriptor getFamily() {
227
  public HColumnDescriptor getFamily() {
225
    return this.family;
228
    return this.family;
226
  }
229
  }
227

    
   
230

   
228
  /**
231
  /**
229
   * @return The maximum sequence id in all store files.
232
   * @return The maximum sequence id in all store files.
230
   */
233
   */
231
  long getMaxSequenceId() {
234
  long getMaxSequenceId() {
232
    return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
235
    return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
233
  }
236
  }
234

    
   
237

   
235
  /**
238
  /**
236
   * @param tabledir
239
   * @param tabledir
237
   * @param encodedName Encoded region name.
240
   * @param encodedName Encoded region name.
238
   * @param family
241
   * @param family
239
   * @return Path to family/Store home directory.
242
   * @return Path to family/Store home directory.
240
   */
243
   */
241
  public static Path getStoreHomedir(final Path tabledir,
244
  public static Path getStoreHomedir(final Path tabledir,
242
      final String encodedName, final byte [] family) {
245
      final String encodedName, final byte [] family) {
243
    return new Path(tabledir, new Path(encodedName,
246
    return new Path(tabledir, new Path(encodedName,
244
      new Path(Bytes.toString(family))));
247
      new Path(Bytes.toString(family))));
245
  }
248
  }
246

    
   
249

   
247
  /**
250
  /**
248
   * Return the directory in which this store stores its
251
   * Return the directory in which this store stores its
249
   * StoreFiles
252
   * StoreFiles
250
   */
253
   */
251
  public Path getHomedir() {
254
  public Path getHomedir() {
252
    return homedir;
255
    return homedir;
253
  }
256
  }
254

    
   
257

   
255
  /*
258
  /*
256
   * Creates an unsorted list of StoreFile loaded from the given directory.
259
   * Creates an unsorted list of StoreFile loaded from the given directory.
257
   * @throws IOException
260
   * @throws IOException
258
   */
261
   */
259
  private List<StoreFile> loadStoreFiles()
262
  private List<StoreFile> loadStoreFiles()
260
  throws IOException {
263
  throws IOException {
261
    ArrayList<StoreFile> results = new ArrayList<StoreFile>();
264
    ArrayList<StoreFile> results = new ArrayList<StoreFile>();
262
    FileStatus files[] = this.fs.listStatus(this.homedir);
265
    FileStatus files[] = this.fs.listStatus(this.homedir);
263
    for (int i = 0; files != null && i < files.length; i++) {
266
    for (int i = 0; files != null && i < files.length; i++) {
264
      // Skip directories.
267
      // Skip directories.
265
      if (files[i].isDir()) {
268
      if (files[i].isDir()) {
266
        continue;
269
        continue;
267
      }
270
      }
268
      Path p = files[i].getPath();
271
      Path p = files[i].getPath();
269
      // Check for empty file.  Should never be the case but can happen
272
      // Check for empty file.  Should never be the case but can happen
270
      // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
273
      // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
271
      if (this.fs.getFileStatus(p).getLen() <= 0) {
274
      if (this.fs.getFileStatus(p).getLen() <= 0) {
272
        LOG.warn("Skipping " + p + " because its empty. HBASE-646 DATA LOSS?");
275
        LOG.warn("Skipping " + p + " because its empty. HBASE-646 DATA LOSS?");
273
        continue;
276
        continue;
274
      }
277
      }
275
      StoreFile curfile = null;
278
      StoreFile curfile = null;
276
      try {
279
      try {
277
        curfile = new StoreFile(fs, p, blockcache, this.conf,
280
        curfile = new StoreFile(fs, p, blockcache, this.conf,
278
            this.family.getBloomFilterType(), this.inMemory);
281
            this.family.getBloomFilterType(), this.inMemory);
279
        curfile.createReader();
282
        curfile.createReader();
280
      } catch (IOException ioe) {
283
      } catch (IOException ioe) {
281
        LOG.warn("Failed open of " + p + "; presumption is that file was " +
284
        LOG.warn("Failed open of " + p + "; presumption is that file was " +
282
          "corrupted at flush and lost edits picked up by commit log replay. " +
285
          "corrupted at flush and lost edits picked up by commit log replay. " +
283
          "Verify!", ioe);
286
          "Verify!", ioe);
284
        continue;
287
        continue;
285
      }
288
      }
286
      long length = curfile.getReader().length();
289
      long length = curfile.getReader().length();
287
      this.storeSize += length;
290
      this.storeSize += length;
288
      this.totalUncompressedBytes += curfile.getReader().getTotalUncompressedBytes();
291
      this.totalUncompressedBytes += curfile.getReader().getTotalUncompressedBytes();
289
      if (LOG.isDebugEnabled()) {
292
      if (LOG.isDebugEnabled()) {
290
        LOG.debug("loaded " + curfile.toStringDetailed());
293
        LOG.debug("loaded " + curfile.toStringDetailed());
291
      }
294
      }
292
      results.add(curfile);
295
      results.add(curfile);
293
    }
296
    }
294
    return results;
297
    return results;
295
  }
298
  }
296

    
   
299

   
297
  /**
300
  /**
298
   * Adds a value to the memstore
301
   * Adds a value to the memstore
299
   *
302
   *
300
   * @param kv
303
   * @param kv
301
   * @return memstore size delta
304
   * @return memstore size delta
302
   */
305
   */
303
  protected long add(final KeyValue kv) {
306
  protected long add(final KeyValue kv) {
304
    lock.readLock().lock();
307
    lock.readLock().lock();
305
    try {
308
    try {
306
      return this.memstore.add(kv);
309
      return this.memstore.add(kv);
307
    } finally {
310
    } finally {
308
      lock.readLock().unlock();
311
      lock.readLock().unlock();
309
    }
312
    }
310
  }
313
  }
311

    
   
314

   
312
  /**
315
  /**
313
   * Adds a value to the memstore
316
   * Adds a value to the memstore
314
   *
317
   *
315
   * @param kv
318
   * @param kv
316
   * @return memstore size delta
319
   * @return memstore size delta
317
   */
320
   */
318
  protected long delete(final KeyValue kv) {
321
  protected long delete(final KeyValue kv) {
319
    lock.readLock().lock();
322
    lock.readLock().lock();
320
    try {
323
    try {
321
      return this.memstore.delete(kv);
324
      return this.memstore.delete(kv);
322
    } finally {
325
    } finally {
323
      lock.readLock().unlock();
326
      lock.readLock().unlock();
324
    }
327
    }
325
  }
328
  }
326

    
   
329

   
327
  /**
330
  /**
328
   * @return All store files.
331
   * @return All store files.
329
   */
332
   */
330
  List<StoreFile> getStorefiles() {
333
  List<StoreFile> getStorefiles() {
331
    return this.storefiles;
334
    return this.storefiles;
332
  }
335
  }
333

    
   
336

   
334
  public void bulkLoadHFile(String srcPathStr) throws IOException {
337
  public void bulkLoadHFile(String srcPathStr) throws IOException {
335
    Path srcPath = new Path(srcPathStr);
338
    Path srcPath = new Path(srcPathStr);
336

    
   
339

   
337
    HFile.Reader reader  = null;
340
    HFile.Reader reader  = null;
338
    try {
341
    try {
339
      LOG.info("Validating hfile at " + srcPath + " for inclusion in "
342
      LOG.info("Validating hfile at " + srcPath + " for inclusion in "
340
          + "store " + this + " region " + this.region);
343
          + "store " + this + " region " + this.region);
341
      reader = HFile.createReader(srcPath.getFileSystem(conf),
344
      reader = HFile.createReader(srcPath.getFileSystem(conf),
342
          srcPath, null, false, false);
345
          srcPath, null, false, false);
343
      reader.loadFileInfo();
346
      reader.loadFileInfo();
344

    
   
347

   
345
      byte[] firstKey = reader.getFirstRowKey();
348
      byte[] firstKey = reader.getFirstRowKey();
346
      byte[] lk = reader.getLastKey();
349
      byte[] lk = reader.getLastKey();
347
      byte[] lastKey =
350
      byte[] lastKey =
348
          (lk == null) ? null :
351
          (lk == null) ? null :
349
              KeyValue.createKeyValueFromKey(lk).getRow();
352
              KeyValue.createKeyValueFromKey(lk).getRow();
350

    
   
353

   
351
      LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
354
      LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
352
          " last=" + Bytes.toStringBinary(lastKey));
355
          " last=" + Bytes.toStringBinary(lastKey));
353
      LOG.debug("Region bounds: first=" +
356
      LOG.debug("Region bounds: first=" +
354
          Bytes.toStringBinary(region.getStartKey()) +
357
          Bytes.toStringBinary(region.getStartKey()) +
355
          " last=" + Bytes.toStringBinary(region.getEndKey()));
358
          " last=" + Bytes.toStringBinary(region.getEndKey()));
356

    
   
359

   
357
      HRegionInfo hri = region.getRegionInfo();
360
      HRegionInfo hri = region.getRegionInfo();
358
      if (!hri.containsRange(firstKey, lastKey)) {
361
      if (!hri.containsRange(firstKey, lastKey)) {
359
        throw new WrongRegionException(
362
        throw new WrongRegionException(
360
            "Bulk load file " + srcPathStr + " does not fit inside region "
363
            "Bulk load file " + srcPathStr + " does not fit inside region "
361
            + this.region);
364
            + this.region);
362
      }
365
      }
363
    } finally {
366
    } finally {
364
      if (reader != null) reader.close();
367
      if (reader != null) reader.close();
365
    }
368
    }
366

    
   
369

   
367
    // Move the file if it's on another filesystem
370
    // Move the file if it's on another filesystem
368
    FileSystem srcFs = srcPath.getFileSystem(conf);
371
    FileSystem srcFs = srcPath.getFileSystem(conf);
369
    if (!srcFs.equals(fs)) {
372
    if (!srcFs.equals(fs)) {
370
      LOG.info("File " + srcPath + " on different filesystem than " +
373
      LOG.info("File " + srcPath + " on different filesystem than " +
371
          "destination store - moving to this filesystem.");
374
          "destination store - moving to this filesystem.");
372
      Path tmpPath = getTmpPath();
375
      Path tmpPath = getTmpPath();
373
      FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
376
      FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
374
      LOG.info("Copied to temporary path on dst filesystem: " + tmpPath);
377
      LOG.info("Copied to temporary path on dst filesystem: " + tmpPath);
375
      srcPath = tmpPath;
378
      srcPath = tmpPath;
376
    }
379
    }
377

    
   
380

   
378
    Path dstPath = StoreFile.getRandomFilename(fs, homedir);
381
    Path dstPath = StoreFile.getRandomFilename(fs, homedir);
379
    LOG.info("Renaming bulk load file " + srcPath + " to " + dstPath);
382
    LOG.info("Renaming bulk load file " + srcPath + " to " + dstPath);
380
    StoreFile.rename(fs, srcPath, dstPath);
383
    StoreFile.rename(fs, srcPath, dstPath);
381

    
   
384

   
382
    StoreFile sf = new StoreFile(fs, dstPath, blockcache,
385
    StoreFile sf = new StoreFile(fs, dstPath, blockcache,
383
        this.conf, this.family.getBloomFilterType(), this.inMemory);
386
        this.conf, this.family.getBloomFilterType(), this.inMemory);
384
    sf.createReader();
387
    sf.createReader();
385

    
   
388

   
386
    LOG.info("Moved hfile " + srcPath + " into store directory " +
389
    LOG.info("Moved hfile " + srcPath + " into store directory " +
387
        homedir + " - updating store file list.");
390
        homedir + " - updating store file list.");
388

    
   
391

   
389
    // Append the new storefile into the list
392
    // Append the new storefile into the list
390
    this.lock.writeLock().lock();
393
    this.lock.writeLock().lock();
391
    try {
394
    try {
392
      ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
395
      ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
393
      newFiles.add(sf);
396
      newFiles.add(sf);
394
      this.storefiles = sortAndClone(newFiles);
397
      this.storefiles = sortAndClone(newFiles);
395
      notifyChangedReadersObservers();
398
      notifyChangedReadersObservers();
396
    } finally {
399
    } finally {
397
      this.lock.writeLock().unlock();
400
      this.lock.writeLock().unlock();
398
    }
401
    }
399
    LOG.info("Successfully loaded store file " + srcPath
402
    LOG.info("Successfully loaded store file " + srcPath
400
        + " into store " + this + " (new location: " + dstPath + ")");
403
        + " into store " + this + " (new location: " + dstPath + ")");
401
  }
404
  }
402

    
   
405

   
403
  /**
406
  /**
404
   * Get a temporary path in this region. These temporary files
407
   * Get a temporary path in this region. These temporary files
405
   * will get cleaned up when the region is re-opened if they are
408
   * will get cleaned up when the region is re-opened if they are
406
   * still around.
409
   * still around.
407
   */
410
   */
408
  private Path getTmpPath() throws IOException {
411
  private Path getTmpPath() throws IOException {
409
    return StoreFile.getRandomFilename(
412
    return StoreFile.getRandomFilename(
410
        fs, region.getTmpDir());
413
        fs, region.getTmpDir());
411
  }
414
  }
412

    
   
415

   
413
  /**
416
  /**
414
   * Close all the readers
417
   * Close all the readers
415
   *
418
   *
416
   * We don't need to worry about subsequent requests because the HRegion holds
419
   * We don't need to worry about subsequent requests because the HRegion holds
417
   * a write lock that will prevent any more reads or writes.
420
   * a write lock that will prevent any more reads or writes.
418
   *
421
   *
419
   * @throws IOException
422
   * @throws IOException
420
   */
423
   */
421
  ImmutableList<StoreFile> close() throws IOException {
424
  ImmutableList<StoreFile> close() throws IOException {
422
    this.lock.writeLock().lock();
425
    this.lock.writeLock().lock();
423
    try {
426
    try {
424
      ImmutableList<StoreFile> result = storefiles;
427
      ImmutableList<StoreFile> result = storefiles;
425

    
   
428

   
426
      // Clear so metrics doesn't find them.
429
      // Clear so metrics doesn't find them.
427
      storefiles = ImmutableList.of();
430
      storefiles = ImmutableList.of();
428

    
   
431

   
429
      for (StoreFile f: result) {
432
      for (StoreFile f: result) {
430
        f.closeReader();
433
        f.closeReader();
431
      }
434
      }
432
      LOG.debug("closed " + this.storeNameStr);
435
      LOG.debug("closed " + this.storeNameStr);
433
      return result;
436
      return result;
434
    } finally {
437
    } finally {
435
      this.lock.writeLock().unlock();
438
      this.lock.writeLock().unlock();
436
    }
439
    }
437
  }
440
  }
438

    
   
441

   
439
  /**
442
  /**
440
   * Snapshot this stores memstore.  Call before running
443
   * Snapshot this stores memstore.  Call before running
441
   * {@link #flushCache(long, SortedSet<KeyValue>)} so it has some work to do.
444
   * {@link #flushCache(long, SortedSet<KeyValue>)} so it has some work to do.
442
   */
445
   */
443
  void snapshot() {
446
  void snapshot() {
444
    this.memstore.snapshot();
447
    this.memstore.snapshot();
445
  }
448
  }
446

    
   
449

   
447
  /**
450
  /**
448
   * Write out current snapshot.  Presumes {@link #snapshot()} has been called
451
   * Write out current snapshot.  Presumes {@link #snapshot()} has been called
449
   * previously.
452
   * previously.
450
   * @param logCacheFlushId flush sequence number
453
   * @param logCacheFlushId flush sequence number
451
   * @param snapshot
454
   * @param snapshot
452
   * @param snapshotTimeRangeTracker
455
   * @param snapshotTimeRangeTracker
453
   * @return true if a compaction is needed
456
   * @return true if a compaction is needed
454
   * @throws IOException
457
   * @throws IOException
455
   */
458
   */
456
  private StoreFile flushCache(final long logCacheFlushId,
459
  private StoreFile flushCache(final long logCacheFlushId,
457
      SortedSet<KeyValue> snapshot,
460
      SortedSet<KeyValue> snapshot,
458
      TimeRangeTracker snapshotTimeRangeTracker,
461
      TimeRangeTracker snapshotTimeRangeTracker,
459
      MonitoredTask status) throws IOException {
462
      MonitoredTask status) throws IOException {
460
    // If an exception happens flushing, we let it out without clearing
463
    // If an exception happens flushing, we let it out without clearing
461
    // the memstore snapshot.  The old snapshot will be returned when we say
464
    // the memstore snapshot.  The old snapshot will be returned when we say
462
    // 'snapshot', the next time flush comes around.
465
    // 'snapshot', the next time flush comes around.
463
    return internalFlushCache(
466
    return internalFlushCache(
464
        snapshot, logCacheFlushId, snapshotTimeRangeTracker, status);
467
        snapshot, logCacheFlushId, snapshotTimeRangeTracker, status);
465
  }
468
  }
466

    
   
469

   
467
  /*
470
  /*
468
   * @param cache
471
   * @param cache
469
   * @param logCacheFlushId
472
   * @param logCacheFlushId
470
   * @return StoreFile created.
473
   * @return StoreFile created.
471
   * @throws IOException
474
   * @throws IOException
472
   */
475
   */
473
  private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
476
  private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
474
      final long logCacheFlushId,
477
      final long logCacheFlushId,
475
      TimeRangeTracker snapshotTimeRangeTracker,
478
      TimeRangeTracker snapshotTimeRangeTracker,
476
      MonitoredTask status)
479
      MonitoredTask status)
477
      throws IOException {
480
      throws IOException {
478
    StoreFile.Writer writer = null;
481
    StoreFile.Writer writer = null;
479
    long flushed = 0;
482
    long flushed = 0;
480
    // Don't flush if there are no entries.
483
    // Don't flush if there are no entries.
481
    if (set.size() == 0) {
484
    if (set.size() == 0) {
482
      return null;
485
      return null;
483
    }
486
    }
484
    long oldestTimestamp = System.currentTimeMillis() - ttl;
487
    Scan scan = new Scan();

    
   
488
    scan.setMaxVersions(maxVersions);

    
   
489
    // Use a store scanner to find which rows to flush.

    
   
490
    // Note that we need to retain deletes, hence

    
   
491
    // pass true as the StoreScanner's retainDeletesInOutput argument.

    
   
492
    InternalScanner scanner = new StoreScanner(this, scan,

    
   
493
        Collections.singletonList(new CollectionBackedScanner(set,

    
   
494
            this.comparator)), true);
485
    // TODO:  We can fail in the below block before we complete adding this
495
    // TODO:  We can fail in the below block before we complete adding this
486
    // flush to list of store files.  Add cleanup of anything put on filesystem
496
    // flush to list of store files.  Add cleanup of anything put on filesystem
487
    // if we fail.
497
    // if we fail.
488
    synchronized (flushLock) {
498
    synchronized (flushLock) {
489
      status.setStatus("Flushing " + this + ": creating writer");
499
      status.setStatus("Flushing " + this + ": creating writer");
490
      // A. Write the map out to the disk
500
      // A. Write the map out to the disk
491
      writer = createWriterInTmp(set.size());
501
      writer = createWriterInTmp(set.size());
492
      writer.setTimeRangeTracker(snapshotTimeRangeTracker);
502
      writer.setTimeRangeTracker(snapshotTimeRangeTracker);
493
      try {
503
      try {
494
        for (KeyValue kv: set) {
504
        List<KeyValue> kvs = new ArrayList<KeyValue>();
495
          // If minVersion > 0 we will wait until the next compaction to
505
        while (scanner.next(kvs)) {
496
          // collect expired KVs. (following the logic for maxVersions).
506
          if (!kvs.isEmpty()) {
497
          // TODO: As Jonathan Gray points this can be optimized
507
            for (KeyValue kv : kvs) {
498
          // (see HBASE-4241)

   
499
          if (minVersions > 0 || !isExpired(kv, oldestTimestamp)) {

   
500
            writer.append(kv);
508
              writer.append(kv);
501
            flushed += this.memstore.heapSizeChange(kv, true);
509
              flushed += this.memstore.heapSizeChange(kv, true);
502
          }
510
            }

    
   
511
            kvs.clear();

    
   
512
          }
503
        }
513
        }

    
   
514
        scanner.close();
504
      } finally {
515
      } finally {
505
        // Write out the log sequence number that corresponds to this output
516
        // Write out the log sequence number that corresponds to this output
506
        // hfile.  The hfile is current up to and including logCacheFlushId.
517
        // hfile.  The hfile is current up to and including logCacheFlushId.
507
        status.setStatus("Flushing " + this + ": appending metadata");
518
        status.setStatus("Flushing " + this + ": appending metadata");
508
        writer.appendMetadata(logCacheFlushId, false);
519
        writer.appendMetadata(logCacheFlushId, false);
509
        status.setStatus("Flushing " + this + ": closing flushed file");
520
        status.setStatus("Flushing " + this + ": closing flushed file");
510
        writer.close();
521
        writer.close();
511
      }
522
      }
512
    }
523
    }
513

    
   
524

   
514
    // Write-out finished successfully, move into the right spot
525
    // Write-out finished successfully, move into the right spot
515
    Path dstPath = StoreFile.getUniqueFile(fs, homedir);
526
    Path dstPath = StoreFile.getUniqueFile(fs, homedir);
516
    String msg = "Renaming flushed file at " + writer.getPath() + " to " + dstPath;
527
    String msg = "Renaming flushed file at " + writer.getPath() + " to " + dstPath;
517
    LOG.info(msg);
528
    LOG.info(msg);
518
    status.setStatus("Flushing " + this + ": " + msg);
529
    status.setStatus("Flushing " + this + ": " + msg);
519
    if (!fs.rename(writer.getPath(), dstPath)) {
530
    if (!fs.rename(writer.getPath(), dstPath)) {
520
      LOG.warn("Unable to rename " + writer.getPath() + " to " + dstPath);
531
      LOG.warn("Unable to rename " + writer.getPath() + " to " + dstPath);
521
    }
532
    }
522

    
   
533

   
523
    status.setStatus("Flushing " + this + ": reopening flushed file");
534
    status.setStatus("Flushing " + this + ": reopening flushed file");
524
    StoreFile sf = new StoreFile(this.fs, dstPath, blockcache,
535
    StoreFile sf = new StoreFile(this.fs, dstPath, blockcache,
525
      this.conf, this.family.getBloomFilterType(), this.inMemory);
536
      this.conf, this.family.getBloomFilterType(), this.inMemory);
526
    StoreFile.Reader r = sf.createReader();
537
    StoreFile.Reader r = sf.createReader();
527
    this.storeSize += r.length();
538
    this.storeSize += r.length();
528
    this.totalUncompressedBytes += r.getTotalUncompressedBytes();
539
    this.totalUncompressedBytes += r.getTotalUncompressedBytes();
529
    if(LOG.isInfoEnabled()) {
540
    if(LOG.isInfoEnabled()) {
530
      LOG.info("Added " + sf + ", entries=" + r.getEntries() +
541
      LOG.info("Added " + sf + ", entries=" + r.getEntries() +
531
        ", sequenceid=" + logCacheFlushId +
542
        ", sequenceid=" + logCacheFlushId +
532
        ", memsize=" + StringUtils.humanReadableInt(flushed) +
543
        ", memsize=" + StringUtils.humanReadableInt(flushed) +
533
        ", filesize=" + StringUtils.humanReadableInt(r.length()));
544
        ", filesize=" + StringUtils.humanReadableInt(r.length()));
534
    }
545
    }
535
    return sf;
546
    return sf;
536
  }
547
  }
537

    
   
548

   
538
  /*
549
  /*
539
   * @param maxKeyCount
550
   * @param maxKeyCount
540
   * @return Writer for a new StoreFile in the tmp dir.
551
   * @return Writer for a new StoreFile in the tmp dir.
541
   */
552
   */
542
  private StoreFile.Writer createWriterInTmp(int maxKeyCount)
553
  private StoreFile.Writer createWriterInTmp(int maxKeyCount)
543
  throws IOException {
554
  throws IOException {
544
    return createWriterInTmp(maxKeyCount, this.compression);
555
    return createWriterInTmp(maxKeyCount, this.compression);
545
  }
556
  }
546

    
   
557

   
547
  /*
558
  /*
548
   * @param maxKeyCount
559
   * @param maxKeyCount
549
   * @param compression Compression algorithm to use
560
   * @param compression Compression algorithm to use
550
   * @return Writer for a new StoreFile in the tmp dir.
561
   * @return Writer for a new StoreFile in the tmp dir.
551
   */
562
   */
552
  private StoreFile.Writer createWriterInTmp(int maxKeyCount,
563
  private StoreFile.Writer createWriterInTmp(int maxKeyCount,
553
    Compression.Algorithm compression)
564
    Compression.Algorithm compression)
554
  throws IOException {
565
  throws IOException {
555
    return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize,
566
    return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize,
556
        compression, this.comparator, this.conf,
567
        compression, this.comparator, this.conf,
557
        this.family.getBloomFilterType(), maxKeyCount);
568
        this.family.getBloomFilterType(), maxKeyCount);
558
  }
569
  }
559

    
   
570

   
560
  /*
571
  /*
561
   * Change storefiles adding into place the Reader produced by this new flush.
572
   * Change storefiles adding into place the Reader produced by this new flush.
562
   * @param sf
573
   * @param sf
563
   * @param set That was used to make the passed file <code>p</code>.
574
   * @param set That was used to make the passed file <code>p</code>.
564
   * @throws IOException
575
   * @throws IOException
565
   * @return Whether compaction is required.
576
   * @return Whether compaction is required.
566
   */
577
   */
567
  private boolean updateStorefiles(final StoreFile sf,
578
  private boolean updateStorefiles(final StoreFile sf,
568
                                   final SortedSet<KeyValue> set)
579
                                   final SortedSet<KeyValue> set)
569
  throws IOException {
580
  throws IOException {
570
    this.lock.writeLock().lock();
581
    this.lock.writeLock().lock();
571
    try {
582
    try {
572
      ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles);
583
      ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles);
573
      newList.add(sf);
584
      newList.add(sf);
574
      storefiles = sortAndClone(newList);
585
      storefiles = sortAndClone(newList);
575
      this.memstore.clearSnapshot(set);
586
      this.memstore.clearSnapshot(set);
576

    
   
587

   
577
      // Tell listeners of the change in readers.
588
      // Tell listeners of the change in readers.
578
      notifyChangedReadersObservers();
589
      notifyChangedReadersObservers();
579

    
   
590

   
580
      return needsCompaction();
591
      return needsCompaction();
581
    } finally {
592
    } finally {
582
      this.lock.writeLock().unlock();
593
      this.lock.writeLock().unlock();
583
    }
594
    }
584
  }
595
  }
585

    
   
596

   
586
  /*
597
  /*
587
   * Notify all observers that set of Readers has changed.
598
   * Notify all observers that set of Readers has changed.
588
   * @throws IOException
599
   * @throws IOException
589
   */
600
   */
590
  private void notifyChangedReadersObservers() throws IOException {
601
  private void notifyChangedReadersObservers() throws IOException {
591
    for (ChangedReadersObserver o: this.changedReaderObservers) {
602
    for (ChangedReadersObserver o: this.changedReaderObservers) {
592
      o.updateReaders();
603
      o.updateReaders();
593
    }
604
    }
594
  }
605
  }
595

    
   
606

   
596
  /*
607
  /*
597
   * @param o Observer who wants to know about changes in set of Readers
608
   * @param o Observer who wants to know about changes in set of Readers
598
   */
609
   */
599
  void addChangedReaderObserver(ChangedReadersObserver o) {
610
  void addChangedReaderObserver(ChangedReadersObserver o) {
600
    this.changedReaderObservers.add(o);
611
    this.changedReaderObservers.add(o);
601
  }
612
  }
602

    
   
613

   
603
  /*
614
  /*
604
   * @param o Observer no longer interested in changes in set of Readers.
615
   * @param o Observer no longer interested in changes in set of Readers.
605
   */
616
   */
606
  void deleteChangedReaderObserver(ChangedReadersObserver o) {
617
  void deleteChangedReaderObserver(ChangedReadersObserver o) {
607
    // We don't check if observer present; it may not be (legitimately)
618
    // We don't check if observer present; it may not be (legitimately)
608
    this.changedReaderObservers.remove(o);
619
    this.changedReaderObservers.remove(o);
609
  }
620
  }
610

    
   
621

   
611
  //////////////////////////////////////////////////////////////////////////////
622
  //////////////////////////////////////////////////////////////////////////////
612
  // Compaction
623
  // Compaction
613
  //////////////////////////////////////////////////////////////////////////////
624
  //////////////////////////////////////////////////////////////////////////////
614

    
   
625

   
615
  /**
626
  /**
616
   * Compact the StoreFiles.  This method may take some time, so the calling
627
   * Compact the StoreFiles.  This method may take some time, so the calling
617
   * thread must be able to block for long periods.
628
   * thread must be able to block for long periods.
618
   *
629
   *
619
   * <p>During this time, the Store can work as usual, getting values from
630
   * <p>During this time, the Store can work as usual, getting values from
620
   * StoreFiles and writing new StoreFiles from the memstore.
631
   * StoreFiles and writing new StoreFiles from the memstore.
621
   *
632
   *
622
   * Existing StoreFiles are not destroyed until the new compacted StoreFile is
633
   * Existing StoreFiles are not destroyed until the new compacted StoreFile is
623
   * completely written-out to disk.
634
   * completely written-out to disk.
624
   *
635
   *
625
   * <p>The compactLock prevents multiple simultaneous compactions.
636
   * <p>The compactLock prevents multiple simultaneous compactions.
626
   * The structureLock prevents us from interfering with other write operations.
637
   * The structureLock prevents us from interfering with other write operations.
627
   *
638
   *
628
   * <p>We don't want to hold the structureLock for the whole time, as a compact()
639
   * <p>We don't want to hold the structureLock for the whole time, as a compact()
629
   * can be lengthy and we want to allow cache-flushes during this period.
640
   * can be lengthy and we want to allow cache-flushes during this period.
630
   *
641
   *
631
   * @param CompactionRequest
642
   * @param CompactionRequest
632
   *          compaction details obtained from requestCompaction()
643
   *          compaction details obtained from requestCompaction()
633
   * @throws IOException
644
   * @throws IOException
634
   */
645
   */
635
  void compact(CompactionRequest cr) throws IOException {
646
  void compact(CompactionRequest cr) throws IOException {
636
    if (cr == null || cr.getFiles().isEmpty()) {
647
    if (cr == null || cr.getFiles().isEmpty()) {
637
      return;
648
      return;
638
    }
649
    }
639
    Preconditions.checkArgument(cr.getStore().toString()
650
    Preconditions.checkArgument(cr.getStore().toString()
640
        .equals(this.toString()));
651
        .equals(this.toString()));
641

    
   
652

   
642
    List<StoreFile> filesToCompact = cr.getFiles();
653
    List<StoreFile> filesToCompact = cr.getFiles();

    synchronized (filesCompacting) {
      // sanity check: we're compacting files that this store knows about
      // TODO: change this to LOG.error() after more debugging
      Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
    }

    // Max-sequenceID is the last key in the files we're compacting
    long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);

    // Ready to go. Have list of files to compact.
    LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
        + this.storeNameStr + " of "
        + this.region.getRegionInfo().getRegionNameAsString()
        + " into " + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
        + StringUtils.humanReadableInt(cr.getSize()));

    StoreFile sf = null;
    try {
      StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(),
          maxId);
      // Move the compaction into place.
      sf = completeCompaction(filesToCompact, writer);
    } finally {
      synchronized (filesCompacting) {
        filesCompacting.removeAll(filesToCompact);
      }
    }

    LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
        + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
        + this.region.getRegionInfo().getRegionNameAsString()
        + "; new storefile name=" + (sf == null ? "none" : sf.toString())
        + ", size=" + (sf == null ? "none" :
          StringUtils.humanReadableInt(sf.getReader().length()))
        + "; total size for store is "
        + StringUtils.humanReadableInt(storeSize));
  }

  /*
   * Compact the most recent N files. Essentially a hook for testing.
   */
  protected void compactRecent(int N) throws IOException {
    List<StoreFile> filesToCompact;
    long maxId;
    boolean isMajor;

    this.lock.readLock().lock();
    try {
      synchronized (filesCompacting) {
        filesToCompact = Lists.newArrayList(storefiles);
        if (!filesCompacting.isEmpty()) {
          // exclude all files older than the newest file we're currently
          // compacting. this allows us to preserve contiguity (HBASE-2856)
          StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
          int idx = filesToCompact.indexOf(last);
          Preconditions.checkArgument(idx != -1);
          filesToCompact.subList(0, idx + 1).clear();
        }
        int count = filesToCompact.size();
        if (N > count) {
          throw new RuntimeException("Not enough files");
        }

        filesToCompact = filesToCompact.subList(count - N, count);
        maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
        isMajor = (filesToCompact.size() == storefiles.size());
        filesCompacting.addAll(filesToCompact);
        Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
      }
    } finally {
      this.lock.readLock().unlock();
    }

    try {
      // Ready to go. Have list of files to compact.
      StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
      // Move the compaction into place.
      completeCompaction(filesToCompact, writer);
    } finally {
      synchronized (filesCompacting) {
        filesCompacting.removeAll(filesToCompact);
      }
    }
  }

  boolean hasReferences() {
    return hasReferences(this.storefiles);
  }

  /*
   * @param files
   * @return True if any of the files in <code>files</code> are References.
   */
  private boolean hasReferences(Collection<StoreFile> files) {
    if (files != null && files.size() > 0) {
      for (StoreFile hsf: files) {
        if (hsf.isReference()) {
          return true;
        }
      }
    }
    return false;
  }
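
  // Reviewer note (not part of the patch): Reference files are the half-file
  // pointers that a region split leaves behind in each daughter region. They
  // must be rewritten into real storefiles by a compaction before the daughter
  // can split again, which is why compaction selection always keeps them
  // (see compactSelection below).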

  /*
   * Gets lowest timestamp from candidate StoreFiles
   *
   * @param candidates candidate StoreFiles to examine
   * @throws IOException
   */
  public static long getLowestTimestamp(final List<StoreFile> candidates)
      throws IOException {
    long minTs = Long.MAX_VALUE;
    for (StoreFile storeFile : candidates) {
      minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
    }
    return minTs;
  }

  /*
   * @return True if we should run a major compaction.
   */
  boolean isMajorCompaction() throws IOException {
    for (StoreFile sf : this.storefiles) {
      if (sf.getReader() == null) {
        LOG.debug("StoreFile " + sf + " has null Reader");
        return false;
      }
    }

    List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles);

    // exclude files above the max compaction threshold
    // except: save all references. we MUST compact them
    int pos = 0;
    while (pos < candidates.size() &&
           candidates.get(pos).getReader().length() > this.maxCompactSize &&
           !candidates.get(pos).isReference()) ++pos;
    candidates.subList(0, pos).clear();

    return isMajorCompaction(candidates);
  }

  /*
   * @param filesToCompact Files to compact. Can be null.
   * @return True if we should run a major compaction.
   */
  private boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
    boolean result = false;
    if (filesToCompact == null || filesToCompact.isEmpty() ||
        majorCompactionTime == 0) {
      return result;
    }
    // TODO: Use better method for determining stamp of last major (HBASE-2990)
    long lowTimestamp = getLowestTimestamp(filesToCompact);
    long now = System.currentTimeMillis();
    if (lowTimestamp > 0L && lowTimestamp < (now - this.majorCompactionTime)) {
      // Major compaction time has elapsed.
      if (filesToCompact.size() == 1) {
        // Single file
        StoreFile sf = filesToCompact.get(0);
        long oldest =
            (sf.getReader().timeRangeTracker == null) ?
                Long.MIN_VALUE :
                now - sf.getReader().timeRangeTracker.minimumTimestamp;
        if (sf.isMajorCompaction() &&
            (this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Skipping major compaction of " + this.storeNameStr +
                " because one (major) compacted file only and oldestTime " +
                oldest + "ms is < ttl=" + this.ttl);
          }
        } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
          LOG.debug("Major compaction triggered on store " + this.storeNameStr +
            ", because keyvalues outdated; time since last major compaction " +
            (now - lowTimestamp) + "ms");
          result = true;
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Major compaction triggered on store " + this.storeNameStr +
              "; time since last major compaction " + (now - lowTimestamp) + "ms");
        }
        result = true;
      }
    }
    return result;
  }
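
  // Illustrative walk-through of the age check above (numbers are examples):
  // with majorCompactionTime = 24h, a store whose oldest file was flushed 30h
  // ago satisfies lowTimestamp < now - 24h, so a major compaction is due --
  // unless the store holds a single, already major-compacted file none of
  // whose cells have outlived the TTL, in which case it is skipped.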

  long getNextMajorCompactTime() {
    // default = 24hrs
    long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
    if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
      String strCompactionTime =
        family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
      ret = (new Long(strCompactionTime)).longValue();
    }

    if (ret > 0) {
      // default = 20% = +/- 4.8 hrs
      double jitterPct = conf.getFloat("hbase.hregion.majorcompaction.jitter",
          0.20F);
      if (jitterPct > 0) {
        long jitter = Math.round(ret * jitterPct);
        ret += jitter - Math.round(2L * jitter * Math.random());
      }
    }
    return ret;
  }
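
  // Illustrative arithmetic for the jitter above: with the default period
  // ret = 24h = 86,400,000 ms and jitterPct = 0.20, jitter = 17,280,000 ms
  // (4.8h), so the returned period lands uniformly in roughly [19.2h, 28.8h],
  // spreading major compactions out instead of firing them in lockstep.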

  public CompactionRequest requestCompaction() {
    // don't even select for compaction if writes are disabled
    if (!this.region.areWritesEnabled()) {
      return null;
    }

    CompactionRequest ret = null;
    this.lock.readLock().lock();
    try {
      synchronized (filesCompacting) {
        // candidates = all storefiles not already in compaction queue
        List<StoreFile> candidates = Lists.newArrayList(storefiles);
        if (!filesCompacting.isEmpty()) {
          // exclude all files older than the newest file we're currently
          // compacting. this allows us to preserve contiguity (HBASE-2856)
          StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
          int idx = candidates.indexOf(last);
          Preconditions.checkArgument(idx != -1);
          candidates.subList(0, idx + 1).clear();
        }
        List<StoreFile> filesToCompact = compactSelection(candidates);

        // no files to compact
        if (filesToCompact.isEmpty()) {
          return null;
        }

        // basic sanity check: do not try to compact the same StoreFile twice.
        if (!Collections.disjoint(filesCompacting, filesToCompact)) {
          // TODO: change this from an IAE to LOG.error after sufficient testing
          Preconditions.checkArgument(false, "%s overlaps with %s",
              filesToCompact, filesCompacting);
        }
        filesCompacting.addAll(filesToCompact);
        Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);

        // major compaction iff all StoreFiles are included
        boolean isMajor = (filesToCompact.size() == this.storefiles.size());
        if (isMajor) {
          // since we're enqueuing a major, update the compaction wait interval
          this.forceMajor = false;
          this.majorCompactionTime = getNextMajorCompactTime();
        }

        // everything went better than expected. create a compaction request
        int pri = getCompactPriority();
        ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
      }
    } catch (IOException ex) {
      LOG.error("Compaction Request failed for region " + region + ", store "
          + this, RemoteExceptionHandler.checkIOException(ex));
    } finally {
      this.lock.readLock().unlock();
    }
    return ret;
  }
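
  // Example of the contiguity rule above (hypothetical files): with storefiles
  // F1..F5 ordered oldest to newest and F2,F3 already in filesCompacting, only
  // F4,F5 remain candidates; selecting F1 by itself would produce a compacted
  // file whose sequence-id range interleaves with the in-flight compaction's
  // output (the HBASE-2856 problem).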

  public void finishRequest(CompactionRequest cr) {
    synchronized (filesCompacting) {
      filesCompacting.removeAll(cr.getFiles());
    }
  }

  /**
   * Algorithm to choose which files to compact
   *
   * Configuration knobs:
   *  "hbase.hstore.compaction.ratio"
   *    normal case: minor compact when file <= sum(smaller_files) * ratio
   *  "hbase.hstore.compaction.min.size"
   *    unconditionally compact individual files below this size
   *  "hbase.hstore.compaction.max.size"
   *    never compact individual files above this size (unless splitting)
   *  "hbase.hstore.compaction.min"
   *    min files needed to minor compact
   *  "hbase.hstore.compaction.max"
   *    max files to compact at once (avoids OOM)
   *
   * @param candidates candidate files, ordered from oldest to newest
   * @return subset copy of candidate list that meets compaction criteria
   * @throws IOException
   */
  List<StoreFile> compactSelection(List<StoreFile> candidates)
      throws IOException {
    // ASSUMPTION!!! filesCompacting is locked when calling this function

    /* normal skew:
     *
     *         older ----> newer
     *     _
     *    | |   _
     *    | |  | |   _
     *  --|-|- |-|- |-|---_-------_-------  minCompactSize
     *    | |  | |  | |  | |  _  | |
     *    | |  | |  | |  | | | | | |
     *    | |  | |  | |  | | | | | |
     */
    List<StoreFile> filesToCompact = new ArrayList<StoreFile>(candidates);

    boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
    if (!forcemajor) {
      // do not compact old files above a configurable threshold
      // save all references. we MUST compact them
      int pos = 0;
      while (pos < filesToCompact.size() &&
             filesToCompact.get(pos).getReader().length() > maxCompactSize &&
             !filesToCompact.get(pos).isReference()) ++pos;
      filesToCompact.subList(0, pos).clear();
    }

    if (filesToCompact.isEmpty()) {
      LOG.debug(this.storeNameStr + ": no store files to compact");
      return filesToCompact;
    }

    // major compact on user action or age (caveat: we have too many files)
    boolean majorcompaction = filesToCompact.size() < this.maxFilesToCompact
      && (forcemajor || isMajorCompaction(filesToCompact));

    if (!majorcompaction && !hasReferences(filesToCompact)) {
      // we're doing a minor compaction, let's see what files are applicable
      int start = 0;
      double r = this.compactRatio;

      // skip selection algorithm if we don't have enough files
      if (filesToCompact.size() < this.minFilesToCompact) {
        return Collections.emptyList();
      }

      /* TODO: add sorting + unit test back in when HBASE-2856 is fixed
      // Sort files by size to correct when normal skew is altered by bulk load.
      Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
       */

      // get store file sizes for incremental compacting selection.
      int countOfFiles = filesToCompact.size();
      long [] fileSizes = new long[countOfFiles];
      long [] sumSize = new long[countOfFiles];
      for (int i = countOfFiles-1; i >= 0; --i) {
        StoreFile file = filesToCompact.get(i);
        fileSizes[i] = file.getReader().length();
        // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
        int tooFar = i + this.maxFilesToCompact - 1;
        sumSize[i] = fileSizes[i]
                   + ((i+1    < countOfFiles) ? sumSize[i+1]      : 0)
                   - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
      }
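
      // Note (reviewer illustration): sumSize[i] is a sliding-window suffix
      // sum -- the combined size of files i .. min(i + maxFilesToCompact - 1,
      // countOfFiles - 1). Filling it from the newest file backwards reuses
      // sumSize[i+1] and keeps this pass O(countOfFiles).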

      /* Start at the oldest file and stop when you find the first file that
       * meets compaction criteria:
       *   (1) a recently-flushed, small file (i.e. <= minCompactSize)
       *      OR
       *   (2) within the compactRatio of sum(newer_files)
       * Given normal skew, any newer files will also meet this criteria
       *
       * Additional Note:
       * If fileSizes.size() >> maxFilesToCompact, we will recurse on
       * compact().  Consider the oldest files first to avoid a
       * situation where we always compact [end-threshold,end).  Then, the
       * last file becomes an aggregate of the previous compactions.
       */
      while (countOfFiles - start >= this.minFilesToCompact &&
            fileSizes[start] >
              Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
        ++start;
      }
      int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
      long totalSize = fileSizes[start]
                     + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
      filesToCompact = filesToCompact.subList(start, end);

      // if we don't have enough files to compact, just wait
      if (filesToCompact.size() < this.minFilesToCompact) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Skipped compaction of " + this.storeNameStr
            + ".  Only " + (end - start) + " file(s) of size "
            + StringUtils.humanReadableInt(totalSize)
            + " have met compaction criteria.");
        }
        return Collections.emptyList();
      }
    } else {
      // all files included in this compaction, up to max
      if (filesToCompact.size() > this.maxFilesToCompact) {
        int pastMax = filesToCompact.size() - this.maxFilesToCompact;
        filesToCompact.subList(0, pastMax).clear();
      }
    }
    return filesToCompact;
  }
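
  // Worked example of the ratio criterion above (illustrative sizes only):
  // files oldest to newest of {100, 8, 4, 2, 1} MB with ratio r = 1.2 and a
  // small minCompactSize: 100 > (8+4+2+1) * 1.2 = 18.0, so the 100 MB file is
  // skipped; 8 <= (4+2+1) * 1.2 = 8.4, so the scan stops and {8, 4, 2, 1} MB
  // are selected (assuming maxFilesToCompact >= 4).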

  /**
   * Do a minor/major compaction on an explicit set of storefiles in a Store.
   * Uses the scan infrastructure to make it easy.
   *
   * @param filesToCompact which files to compact
   * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
   * @param maxId Readers maximum sequence id.
   * @return Product of compaction or null if all cells expired or deleted and
   * nothing made it through the compaction.
   * @throws IOException
   */
  private StoreFile.Writer compactStore(final Collection<StoreFile> filesToCompact,
                               final boolean majorCompaction, final long maxId)
      throws IOException {
    // calculate maximum key count after compaction (for blooms)
    int maxKeyCount = 0;
    for (StoreFile file : filesToCompact) {
      StoreFile.Reader r = file.getReader();
      if (r != null) {
        // NOTE: getFilterEntries could cause under-sized blooms if the user
        //       switches bloom type (e.g. from ROW to ROWCOL)
        long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
          ? r.getFilterEntries() : r.getEntries();
        maxKeyCount += keyCount;
        if (LOG.isDebugEnabled()) {
          LOG.debug("Compacting " + file +
            ", keycount=" + keyCount +
            ", bloomtype=" + r.getBloomFilterType().toString() +
            ", size=" + StringUtils.humanReadableInt(r.length()));
        }
      }
    }

    // For each file, obtain a scanner:
    List<StoreFileScanner> scanners = StoreFileScanner
      .getScannersForStoreFiles(filesToCompact, false, false);

    // Make the instantiation lazy in case compaction produces no product; i.e.
    // where all source cells are expired or deleted.
    StoreFile.Writer writer = null;
    try {
      InternalScanner scanner = null;
      try {
        Scan scan = new Scan();
        scan.setMaxVersions(family.getMaxVersions());
        /* include deletes, unless we are doing a major compaction */
        scanner = new StoreScanner(this, scan, scanners, !majorCompaction);
        int bytesWritten = 0;
        // since scanner.next() can return 'false' but still be delivering data,
        // we have to use a do/while loop.
        ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
        while (scanner.next(kvs)) {
          if (writer == null && !kvs.isEmpty()) {
            writer = createWriterInTmp(maxKeyCount,
              this.compactionCompression);
          }
          if (writer != null) {
            // output to writer:
            for (KeyValue kv : kvs) {
              writer.append(kv);

              // check periodically to see if a system stop is requested
              if (Store.closeCheckInterval > 0) {
                bytesWritten += kv.getLength();
                if (bytesWritten > Store.closeCheckInterval) {
                  bytesWritten = 0;
                  if (!this.region.areWritesEnabled()) {
                    writer.close();
                    fs.delete(writer.getPath(), false);
                    throw new InterruptedIOException(
                        "Aborting compaction of store " + this +
                        " in region " + this.region +
                        " because user requested stop.");
                  }
                }
              }
            }
          }
          kvs.clear();
        }
      } finally {
        if (scanner != null) {
          scanner.close();
        }
      }
    } finally {
      if (writer != null) {
        writer.appendMetadata(maxId, majorCompaction);
        writer.close();
      }
    }
    return writer;
  }
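
  // Reviewer note: appendMetadata(maxId, majorCompaction) records the inputs'
  // maximum sequence id and a major-compaction flag in the new file's info
  // block, so the result sorts correctly among the other storefiles and a
  // single already-majored file can be skipped by isMajorCompaction() later.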

  /*
   * <p>It works by processing a compaction that's been written to disk.
   *
   * <p>It is usually invoked at the end of a compaction, but might also be
   * invoked at HStore startup, if the prior execution died midway through.
   *
   * <p>Moving the compacted TreeMap into place means:
   * <pre>
   * 1) Moving the new compacted StoreFile into place
   * 2) Unload all replaced StoreFile, close and collect list to delete.
   * 3) Loading the new TreeMap.
   * 4) Compute new store size
   * </pre>
   *
   * @param compactedFiles list of files that were compacted
   * @param compactedFile StoreFile that is the result of the compaction
   * @return StoreFile created. May be null.
   * @throws IOException
   */
  private StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
                                       final StoreFile.Writer compactedFile)
      throws IOException {
    // 1. Moving the new files into place -- if there is a new file (may not
    // be if all cells were expired or deleted).
    StoreFile result = null;
    if (compactedFile != null) {
      Path p = null;
      try {
        p = StoreFile.rename(this.fs, compactedFile.getPath(),
          StoreFile.getRandomFilename(fs, this.homedir));
      } catch (IOException e) {
        LOG.error("Failed move of compacted file " + compactedFile.getPath(), e);
        return null;
      }
      result = new StoreFile(this.fs, p, blockcache, this.conf,
          this.family.getBloomFilterType(), this.inMemory);
      result.createReader();
    }
    this.lock.writeLock().lock();
    try {
      try {
        // Change this.storefiles so it reflects new state but do not
        // delete old store files until we have sent out notification of
        // change in case old files are still being accessed by outstanding
        // scanners.
        ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
        newStoreFiles.removeAll(compactedFiles);
        filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock()

        // If a StoreFile result, move it into place.  May be null.
        if (result != null) {
          newStoreFiles.add(result);
        }

        this.storefiles = sortAndClone(newStoreFiles);

        // Tell observers that list of StoreFiles has changed.
        notifyChangedReadersObservers();
        // Finally, delete old store files.
        for (StoreFile hsf: compactedFiles) {
          hsf.deleteReader();
        }
      } catch (IOException e) {
        e = RemoteExceptionHandler.checkIOException(e);
        LOG.error("Failed replacing compacted files in " + this.storeNameStr +
          ". Compacted file is " + (result == null ? "none" : result.toString()) +
          ".  Files replaced " + compactedFiles.toString() +
          " some of which may have been already removed", e);
      }
      // 4. Compute new store size
      this.storeSize = 0L;
      this.totalUncompressedBytes = 0L;
      for (StoreFile hsf : this.storefiles) {
        StoreFile.Reader r = hsf.getReader();
        if (r == null) {
          LOG.warn("StoreFile " + hsf + " has a null Reader");
          continue;
        }
        this.storeSize += r.length();
        this.totalUncompressedBytes += r.getTotalUncompressedBytes();
      }
    } finally {
      this.lock.writeLock().unlock();
    }
    return result;
  }

  public ImmutableList<StoreFile> sortAndClone(List<StoreFile> storeFiles) {
    Collections.sort(storeFiles, StoreFile.Comparators.FLUSH_TIME);
    ImmutableList<StoreFile> newList = ImmutableList.copyOf(storeFiles);
    return newList;
  }

  // ////////////////////////////////////////////////////////////////////////////
  // Accessors.
  // (This is the only section that is directly useful!)
  //////////////////////////////////////////////////////////////////////////////
  /**
   * @return the number of files in this store
   */
  public int getNumberOfstorefiles() {
    return this.storefiles.size();
  }

  /*
   * @param wantedVersions How many versions were asked for.
   * @return wantedVersions or this family's {@link HConstants#VERSIONS}.
   */
  int versionsToReturn(final int wantedVersions) {
    if (wantedVersions <= 0) {
      throw new IllegalArgumentException("Number of versions must be > 0");
    }
    // Make sure we do not return more than maximum versions for this store.
    int maxVersions = this.family.getMaxVersions();
    return wantedVersions > maxVersions ? maxVersions : wantedVersions;
  }

  static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
    return key.getTimestamp() < oldestTimestamp;
  }

  /**
   * Find the key that matches <i>row</i> exactly, or the one that immediately
   * precedes it. WARNING: Only use this method on a table where writes occur
   * with strictly increasing timestamps. This method assumes this pattern of
   * writes in order to make it reasonably performant.  Also our search is
   * dependent on the axiom that deletes are for cells that are in the container
   * that follows whether a memstore snapshot or a storefile, not for the
   * current container: i.e. we'll see deletes before we come across cells we
   * are to delete. Presumption is that the memstore#kvset is processed before
   * memstore#snapshot and so on.
   * @param row The row key of the targeted row.
   * @return Found keyvalue or null if none found.
   * @throws IOException
   */
  KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException {
    // If minVersions is set, we will not ignore expired KVs.
    // As we're only looking for the latest matches, that should be OK.
    // With minVersions > 0 we guarantee that any KV that has any version
    // at all (expired or not) has at least one version that will not expire.
    // Note that this method used to take a KeyValue as arguments. KeyValue
    // can be back-dated, a row key cannot.
    long ttlToUse = this.minVersions > 0 ? Long.MAX_VALUE : this.ttl;

    KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);

    GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
      this.comparator, kv, ttlToUse, this.region.getRegionInfo().isMetaRegion());
    this.lock.readLock().lock();
    try {
      // First go to the memstore.  Pick up deletes and candidates.
      this.memstore.getRowKeyAtOrBefore(state);
      // Check if match, if we got a candidate on the asked for 'kv' row.
      // Process each store file. Run through from newest to oldest.
      for (StoreFile sf : Iterables.reverse(storefiles)) {
        // Update the candidate keys from the current map file
        rowAtOrBeforeFromStoreFile(sf, state);
      }
      return state.getCandidate();
    } finally {
      this.lock.readLock().unlock();
    }
  }
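
  // Example of the at-or-before semantics (hypothetical rows): with rows
  // {"aaa", "abc", "abf"} in the store, getRowKeyAtOrBefore("abd") returns the
  // best candidate from row "abc"; a query for "aa" returns null since no row
  // sorts at or before it.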

  /*
   * Check an individual MapFile for the row at or before a given row.
   * @param f
   * @param state
   * @throws IOException
   */
  private void rowAtOrBeforeFromStoreFile(final StoreFile f,
                                          final GetClosestRowBeforeTracker state)
      throws IOException {
    StoreFile.Reader r = f.getReader();
    if (r == null) {
      LOG.warn("StoreFile " + f + " has a null Reader");
      return;
    }
    // TODO: Cache these keys rather than make each time?
    byte [] fk = r.getFirstKey();
    KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
    byte [] lk = r.getLastKey();
    KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
    KeyValue firstOnRow = state.getTargetKey();
    if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
      // If last key in file is not of the target table, no candidates in this
      // file.  Return.
      if (!state.isTargetTable(lastKV)) return;
      // If the row we're looking for is past the end of file, set search key to
      // last key. TODO: Cache last and first key rather than make each time.
      firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
    }
    // Get a scanner that caches blocks and that uses pread.
    HFileScanner scanner = r.getScanner(true, true);
    // Seek scanner.  If can't seek it, return.
    if (!seekToScanner(scanner, firstOnRow, firstKV)) return;
    // If we found candidate on firstOnRow, just return. THIS WILL NEVER HAPPEN!
    // Unlikely that there'll be an instance of actual first row in table.
    if (walkForwardInSingleRow(scanner, firstOnRow, state)) return;
    // If here, need to start backing up.
    while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
       firstOnRow.getKeyLength())) {
      KeyValue kv = scanner.getKeyValue();
      if (!state.isTargetTable(kv)) break;
      if (!state.isBetterCandidate(kv)) break;
      // Make new first on row.
      firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
      // Seek scanner.  If can't seek it, break.
      if (!seekToScanner(scanner, firstOnRow, firstKV)) break;
      // If we find something, break;
      if (walkForwardInSingleRow(scanner, firstOnRow, state)) break;
    }
  }

  /*
   * Seek the file scanner to firstOnRow or first entry in file.
   * @param scanner
   * @param firstOnRow
   * @param firstKV
   * @return True if we successfully seeked the scanner.
   * @throws IOException
   */
  private boolean seekToScanner(final HFileScanner scanner,
                                final KeyValue firstOnRow,
                                final KeyValue firstKV)
      throws IOException {
    KeyValue kv = firstOnRow;
    // If firstOnRow < firstKV, set to firstKV
    if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
    int result = scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(),
      kv.getKeyLength());
    return result >= 0;
  }

  /*
   * When we come in here, we are probably at the kv just before we break into
   * the row that firstOnRow is on.  Usually need to increment one time to get
   * on to the row we are interested in.
   * @param scanner
   * @param firstOnRow
   * @param state
   * @return True if we found a candidate.
   * @throws IOException
   */
  private boolean walkForwardInSingleRow(final HFileScanner scanner,
                                         final KeyValue firstOnRow,
                                         final GetClosestRowBeforeTracker state)
      throws IOException {
    boolean foundCandidate = false;
    do {
      KeyValue kv = scanner.getKeyValue();
      // If we are not in the row, skip.
      if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
      // Did we go beyond the target row? If so break.
      if (state.isTooFar(kv, firstOnRow)) break;
      if (state.isExpired(kv)) {
        continue;
      }
      // If we added something, this row is a contender. break.
      if (state.handle(kv)) {
        foundCandidate = true;
        break;
      }
    } while(scanner.next());
    return foundCandidate;
  }
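
For intuition, the seek-before/walk-forward loop above implements a floor lookup over rows: find the greatest row less than or equal to the target, skipping expired cells. A toy, self-contained sketch of that contract using plain JDK collections (the class name and sample rows are invented for illustration; this is not the HFile API):

import java.util.TreeSet;

public class ClosestRowBeforeDemo {
  public static void main(String[] args) {
    TreeSet<String> rows = new TreeSet<String>();
    rows.add("row10");
    rows.add("row20");
    rows.add("row30");
    // TreeSet.floor() returns the greatest element <= the target, which is
    // the same answer getRowKeyAtOrBefore() computes across store files.
    System.out.println(rows.floor("row25"));  // row20
    System.out.println(rows.floor("row20"));  // row20 (exact match counts)
    System.out.println(rows.floor("row05"));  // null -- no candidate at all
  }
}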

  /**
   * Determines if Store should be split
   * @return byte[] if store should be split, null otherwise.
   */
  public byte[] checkSplit() {
    this.lock.readLock().lock();
    try {
      boolean force = this.region.shouldForceSplit();
      // sanity checks
      if (this.storefiles.isEmpty()) {
        return null;
      }
      if (!force && storeSize < this.desiredMaxFileSize) {
        return null;
      }

      if (this.region.getRegionInfo().isMetaRegion()) {
        if (force) {
          LOG.warn("Cannot split meta regions in HBase 0.20");
        }
        return null;
      }

      // Not splittable if we find a reference store file present in the store.
      boolean splitable = true;
      long maxSize = 0L;
      StoreFile largestSf = null;
      for (StoreFile sf : storefiles) {
        if (splitable) {
          splitable = !sf.isReference();
          if (!splitable) {
            // RETURN IN MIDDLE OF FUNCTION!!! If not splittable, just return.
            if (LOG.isDebugEnabled()) {
              LOG.debug(sf +  " is not splittable");
            }
            return null;
          }
        }
        StoreFile.Reader r = sf.getReader();
        if (r == null) {
          LOG.warn("Storefile " + sf + " Reader is null");
          continue;
        }
        long size = r.length();
        if (size > maxSize) {
          // This is the largest one so far
          maxSize = size;
          largestSf = sf;
        }
      }
      // If the user explicitly set a split point, use that.
      if (this.region.getSplitPoint() != null) {
        return this.region.getSplitPoint();
      }
      StoreFile.Reader r = largestSf.getReader();
      if (r == null) {
        LOG.warn("Storefile " + largestSf + " Reader is null");
        return null;
      }
      // Get first, last, and mid keys.  Midkey is the key that starts block
      // in middle of hfile.  Has column and timestamp.  Need to return just
      // the row we want to split on as midkey.
      byte [] midkey = r.midkey();
      if (midkey != null) {
        KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
        byte [] fk = r.getFirstKey();
        KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
        byte [] lk = r.getLastKey();
        KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
        // If the midkey is the same as the first and last keys, then we cannot
        // (ever) split this region.
        if (this.comparator.compareRows(mk, firstKey) == 0 &&
            this.comparator.compareRows(mk, lastKey) == 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("cannot split because midkey is the same as first or " +
              "last row");
          }
          return null;
        }
        return mk.getRow();
      }
    } catch(IOException e) {
      LOG.warn("Failed getting store size for " + this.storeNameStr, e);
    } finally {
      this.lock.readLock().unlock();
    }
    return null;
  }
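
The midkey guard at the end matters for regions dominated by a single row: if the middle key of the largest file shares its row with both the first and last keys, there is no row boundary to split on. A toy sketch of that comparison, with plain strings standing in for row keys (all values invented):

public class SplitCheckDemo {
  public static void main(String[] args) {
    // Hypothetical: one huge row fills the largest store file, so the
    // midkey's row equals both the first and last rows of the file.
    String firstRow = "user999", midRow = "user999", lastRow = "user999";
    boolean splittable = !(midRow.equals(firstRow) && midRow.equals(lastRow));
    System.out.println(splittable);  // false -- checkSplit() returns null
  }
}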

  /** @return aggregate size of all HStores used in the last compaction */
  public long getLastCompactSize() {
    return this.lastCompactSize;
  }

  /** @return aggregate size of HStore */
  public long getSize() {
    return storeSize;
  }

  void triggerMajorCompaction() {
    this.forceMajor = true;
  }

  boolean getForceMajorCompaction() {
    return this.forceMajor;
  }

  //////////////////////////////////////////////////////////////////////////////
  // File administration
  //////////////////////////////////////////////////////////////////////////////

  /**
   * Return a scanner for both the memstore and the HStore files
   * @throws IOException
   */
  public KeyValueScanner getScanner(Scan scan,
      final NavigableSet<byte []> targetCols) throws IOException {
    lock.readLock().lock();
    try {
      return new StoreScanner(this, scan, targetCols);
    } finally {
      lock.readLock().unlock();
    }
  }
  }
1531

    
   
1542

   
1532
  @Override
1543
  @Override
1533
  public String toString() {
1544
  public String toString() {
1534
    return this.storeNameStr;
1545
    return this.storeNameStr;
1535
  }
1546
  }
1536

    
   
1547

   
1537
  /**
1548
  /**
1538
   * @return Count of store files
1549
   * @return Count of store files
1539
   */
1550
   */
1540
  int getStorefilesCount() {
1551
  int getStorefilesCount() {
1541
    return this.storefiles.size();
1552
    return this.storefiles.size();
1542
  }
1553
  }
1543

    
   
1554

   
1544
  /**
1555
  /**
1545
   * @return The size of the store files, in bytes, uncompressed.
1556
   * @return The size of the store files, in bytes, uncompressed.
1546
   */
1557
   */
1547
  long getStoreSizeUncompressed() {
1558
  long getStoreSizeUncompressed() {
1548
    return this.totalUncompressedBytes;
1559
    return this.totalUncompressedBytes;
1549
  }
1560
  }
1550

    
   
1561

   
1551
  /**
1562
  /**
1552
   * @return The size of the store files, in bytes.
1563
   * @return The size of the store files, in bytes.
1553
   */
1564
   */
1554
  long getStorefilesSize() {
1565
  long getStorefilesSize() {
1555
    long size = 0;
1566
    long size = 0;
1556
    for (StoreFile s: storefiles) {
1567
    for (StoreFile s: storefiles) {
1557
      StoreFile.Reader r = s.getReader();
1568
      StoreFile.Reader r = s.getReader();
1558
      if (r == null) {
1569
      if (r == null) {
1559
        LOG.warn("StoreFile " + s + " has a null Reader");
1570
        LOG.warn("StoreFile " + s + " has a null Reader");
1560
        continue;
1571
        continue;
1561
      }
1572
      }
1562
      size += r.length();
1573
      size += r.length();
1563
    }
1574
    }
1564
    return size;
1575
    return size;
1565
  }
1576
  }
1566

    
   
1577

   
1567
  /**
1578
  /**
1568
   * @return The size of the store file indexes, in bytes.
1579
   * @return The size of the store file indexes, in bytes.
1569
   */
1580
   */
1570
  long getStorefilesIndexSize() {
1581
  long getStorefilesIndexSize() {
1571
    long size = 0;
1582
    long size = 0;
1572
    for (StoreFile s: storefiles) {
1583
    for (StoreFile s: storefiles) {
1573
      StoreFile.Reader r = s.getReader();
1584
      StoreFile.Reader r = s.getReader();
1574
      if (r == null) {
1585
      if (r == null) {
1575
        LOG.warn("StoreFile " + s + " has a null Reader");
1586
        LOG.warn("StoreFile " + s + " has a null Reader");
1576
        continue;
1587
        continue;
1577
      }
1588
      }
1578
      size += r.indexSize();
1589
      size += r.indexSize();
1579
    }
1590
    }
1580
    return size;
1591
    return size;
1581
  }
1592
  }
1582

    
   
1593

   
1583
  /**
1594
  /**
1584
   * Returns the total size of all index blocks in the data block indexes,
1595
   * Returns the total size of all index blocks in the data block indexes,
1585
   * including the root level, intermediate levels, and the leaf level for
1596
   * including the root level, intermediate levels, and the leaf level for
1586
   * multi-level indexes, or just the root level for single-level indexes.
1597
   * multi-level indexes, or just the root level for single-level indexes.
1587
   *
1598
   *
1588
   * @return the total size of block indexes in the store
1599
   * @return the total size of block indexes in the store
1589
   */
1600
   */
1590
  long getTotalStaticIndexSize() {
1601
  long getTotalStaticIndexSize() {
1591
    long size = 0;
1602
    long size = 0;
1592
    for (StoreFile s : storefiles) {
1603
    for (StoreFile s : storefiles) {
1593
      size += s.getReader().getUncompressedDataIndexSize();
1604
      size += s.getReader().getUncompressedDataIndexSize();
1594
    }
1605
    }
1595
    return size;
1606
    return size;
1596
  }
1607
  }
1597

    
   
1608

   
1598
  /**
1609
  /**
1599
   * Returns the total byte size of all Bloom filter bit arrays. For compound
1610
   * Returns the total byte size of all Bloom filter bit arrays. For compound
1600
   * Bloom filters even the Bloom blocks currently not loaded into the block
1611
   * Bloom filters even the Bloom blocks currently not loaded into the block
1601
   * cache are counted.
1612
   * cache are counted.
1602
   *
1613
   *
1603
   * @return the total size of all Bloom filters in the store
1614
   * @return the total size of all Bloom filters in the store
1604
   */
1615
   */
1605
  long getTotalStaticBloomSize() {
1616
  long getTotalStaticBloomSize() {
1606
    long size = 0;
1617
    long size = 0;
1607
    for (StoreFile s : storefiles) {
1618
    for (StoreFile s : storefiles) {
1608
      StoreFile.Reader r = s.getReader();
1619
      StoreFile.Reader r = s.getReader();
1609
      size += r.getTotalBloomSize();
1620
      size += r.getTotalBloomSize();
1610
    }
1621
    }
1611
    return size;
1622
    return size;
1612
  }
1623
  }
1613

    
   
1624

   
1614
  /**
1625
  /**
1615
   * @return The priority that this store should have in the compaction queue
1626
   * @return The priority that this store should have in the compaction queue
1616
   */
1627
   */
1617
  public int getCompactPriority() {
1628
  public int getCompactPriority() {
1618
    return this.blockingStoreFileCount - this.storefiles.size();
1629
    return this.blockingStoreFileCount - this.storefiles.size();
1619
  }
1630
  }
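
The priority is simply the headroom left before the store hits its blocking file count, so lower (and especially negative) values mean more urgent compaction. A worked example with assumed configuration (hbase.hstore.blockingStoreFiles = 7, the usual default):

public class CompactPriorityDemo {
  public static void main(String[] args) {
    int blockingStoreFileCount = 7;  // assumed config value
    System.out.println(blockingStoreFileCount - 4);   // 3: plenty of headroom
    System.out.println(blockingStoreFileCount - 7);   // 0: at the limit
    System.out.println(blockingStoreFileCount - 10);  // -3: flushes now block
  }
}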

  HRegion getHRegion() {
    return this.region;
  }

  HRegionInfo getHRegionInfo() {
    return this.region.regionInfo;
  }

  /**
   * Increments the value for the given row/family/qualifier.
   *
   * This function will always be seen as atomic by other readers
   * because it only puts a single KV to memstore. Thus no
   * read/write control is necessary.
   *
   * @param row
   * @param f
   * @param qualifier
   * @param newValue the new value to set into memstore
   * @return memstore size delta
   * @throws IOException
   */
  public long updateColumnValue(byte [] row, byte [] f,
                                byte [] qualifier, long newValue)
      throws IOException {

    this.lock.readLock().lock();
    try {
      long now = EnvironmentEdgeManager.currentTimeMillis();

      return this.memstore.updateColumnValue(row,
          f,
          qualifier,
          newValue,
          now);

    } finally {
      this.lock.readLock().unlock();
    }
  }
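
A sketch of a caller in the spirit of HRegion's increment path, assuming a Store instance in scope (row, family, and qualifier names invented); the returned delta is what the region adds to its memstore accounting:

// Hypothetical: set a counter to its new value and track heap growth.
long sizeDelta = store.updateColumnValue(
    Bytes.toBytes("row1"),   // row (invented)
    Bytes.toBytes("cf"),     // column family (invented)
    Bytes.toBytes("hits"),   // qualifier (invented)
    42L);                    // the already-incremented value to store
memstoreSize.addAndGet(sizeDelta);  // region-level accounting, roughly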

  /**
   * Adds or replaces the specified KeyValues.
   * <p>
   * For each KeyValue specified, if a cell with the same row, family, and
   * qualifier exists in MemStore, it will be replaced.  Otherwise, it will just
   * be inserted into MemStore.
   * <p>
   * This operation is atomic on each KeyValue (row/family/qualifier) but not
   * necessarily atomic across all of them.
   * @param kvs
   * @return memstore size delta
   * @throws IOException
   */
  public long upsert(List<KeyValue> kvs)
      throws IOException {
    this.lock.readLock().lock();
    try {
      // TODO: Make this operation atomic w/ RWCC
      return this.memstore.upsert(kvs);
    } finally {
      this.lock.readLock().unlock();
    }
  }
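
Per the javadoc, callers get per-cell but not cross-cell atomicity, so a concurrent reader may observe a mix of old and new values across qualifiers. A usage sketch, assuming a Store in scope (row/qualifier names invented):

// Hypothetical: replace-or-insert two counters in one call. Each KV lands
// atomically, but a reader could still see c1 updated while c2 is not.
List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
    Bytes.toBytes("c1"), Bytes.toBytes(10L)));
kvs.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
    Bytes.toBytes("c2"), Bytes.toBytes(20L)));
long delta = store.upsert(kvs);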

  public StoreFlusher getStoreFlusher(long cacheFlushId) {
    return new StoreFlusherImpl(cacheFlushId);
  }

  private class StoreFlusherImpl implements StoreFlusher {

    private long cacheFlushId;
    private SortedSet<KeyValue> snapshot;
    private StoreFile storeFile;
    private TimeRangeTracker snapshotTimeRangeTracker;

    private StoreFlusherImpl(long cacheFlushId) {
      this.cacheFlushId = cacheFlushId;
    }

    @Override
    public void prepare() {
      memstore.snapshot();
      this.snapshot = memstore.getSnapshot();
      this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
    }

    @Override
    public void flushCache(MonitoredTask status) throws IOException {
      storeFile = Store.this.flushCache(
          cacheFlushId, snapshot, snapshotTimeRangeTracker, status);
    }

    @Override
    public boolean commit() throws IOException {
      if (storeFile == null) {
        return false;
      }
      // Add new file to store files.  Clear snapshot too while we have
      // the Store write lock.
      return Store.this.updateStorefiles(storeFile, snapshot);
    }
  }
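
The three-phase contract (a fast prepare() to snapshot the memstore, a slow flushCache() that writes the snapshot to disk, then a quick commit() to swap the new file in) is what lets the region keep taking writes during the disk write. A sketch of the driving sequence, assuming a Store instance, a flush sequence id, and a MonitoredTask already in scope:

// Hypothetical driver, in the spirit of the region's flush path.
StoreFlusher flusher = store.getStoreFlusher(myFlushSequenceId);
flusher.prepare();                 // snapshot the memstore (fast, under lock)
// ... the update lock can be released here; writes hit a fresh memstore ...
flusher.flushCache(status);        // write the snapshot to a new HFile (slow)
boolean added = flusher.commit();  // swap the file in, drop the snapshot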

  /**
   * See if there are too many store files in this store
   * @return true if number of store files is greater than
   *  the number defined in minFilesToCompact
   */
  public boolean needsCompaction() {
    return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
  }

  public static final long FIXED_OVERHEAD = ClassSize.align(
      ClassSize.OBJECT + (15 * ClassSize.REFERENCE) +
      (8 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
-     (5 * Bytes.SIZEOF_INT) + (3 * Bytes.SIZEOF_BOOLEAN));
+     (6 * Bytes.SIZEOF_INT) + (3 * Bytes.SIZEOF_BOOLEAN));

  public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
      ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +
      ClassSize.CONCURRENT_SKIPLISTMAP +
      ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT);

  @Override
  public long heapSize() {
    return DEEP_OVERHEAD + this.memstore.heapSize();
  }
}
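
The int count in FIXED_OVERHEAD goes from 5 to 6 here, presumably tracking a new int field introduced by this patch; hand-maintained constants like this are typically kept honest by a TestHeapSize-style check. A hedged sketch of that check (ClassSize.estimateBase is the real utility; the assertion wiring is illustrative):

// Compare the hand-maintained constant against what ClassSize derives by
// reflection over Store's fields; a mismatch means the constant is stale.
long expected = ClassSize.estimateBase(Store.class, false);
if (expected != Store.FIXED_OVERHEAD) {
  ClassSize.estimateBase(Store.class, true);  // re-run with debug logging
  throw new AssertionError("expected=" + expected
      + " FIXED_OVERHEAD=" + Store.FIXED_OVERHEAD);
}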
http://svn.apache.org/repos/asf/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/CollectionBackedScanner.java
New File
 
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java
Revision 1161347 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
Revision 1161347 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java
Revision 1161347 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java
Revision 1161347 New Change
 
http://svn.apache.org/repos/asf/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
Revision 1161347 New Change
 