Review Board 1.7.22


Low throughput of FileChannel

Review Request #6329 - Created Aug. 3, 2012 and updated

Denny Ye
FLUME-1423
Reviewers
Flume
hshreedharan, pwendell
Flume
Here is the description in code changes
1. Remove the 'FileChannel.force(false)'. Each commit from Source will invoke this 'force' method. This method is too heavy for amounts of data comes. Each 'force' action will be consume 50-500ms that it confirms data stored into disk. Normally, OS will flush data from kernal buffer to disk asynchronously with ms level latency. It may useless in each commit operation. Certainly, data loss may occurs in server crash not process crash. Server crash is infrequent.
2. Do not pre-allocate disk space. Disk doesn't need the pre-allocation.
3. Use 'RandomAccessFile.write()' to replace 'FileChannel.write()'. Both in my test result and low-level instruction, the former is better than the latter

Here I posted three changes, and I would like to use thread-level cached DirectByteBuffer to replace inner-heap ByteBuffer.allocate() (reuse outer-heap memory to reduce time that copying from heap to kernal). I will test this changes in next phase.

After tuning, throughput increasing from 5MB to 30MB

 
trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
Revision 1363210 New Change
[20] 183 lines
[+20] [+] static class Writer {
184
    int getFileID() {
184
    int getFileID() {
185
      return fileID;
185
      return fileID;
186
    }
186
    }
187
    private void sync() throws IOException {
187
    private void sync() throws IOException {
188
      Preconditions.checkState(open, "File closed");
188
      Preconditions.checkState(open, "File closed");
189
      writeFileChannel.force(false);

   
190
    }
189
    }

    
   
190

   
191
    private Pair<Integer, Integer> write(ByteBuffer buffer) throws IOException {
191
    private Pair<Integer, Integer> write(ByteBuffer buffer) throws IOException {
192
      Preconditions.checkState(open, "File closed");
192
      Preconditions.checkState(open, "File closed");
193
      long length = length();
193
      long length = length();
194
      long expectedLength = length + (long) buffer.capacity();
194
      long expectedLength = length + (long) buffer.capacity();
195
      Preconditions.checkArgument(expectedLength < (long) Integer.MAX_VALUE);
195
      Preconditions.checkArgument(expectedLength < (long) Integer.MAX_VALUE);
196
      int offset = (int)length;
196
      int offset = (int)length;
197
      Preconditions.checkState(offset > 0);
197
      Preconditions.checkState(offset > 0);
198
      int recordLength = 1 + buffer.capacity();
198
      ByteBuffer toWrite = ByteBuffer.allocate(buffer.capacity() + 1);
199
      preallocate(recordLength);

   
200
      ByteBuffer toWrite = ByteBuffer.allocate(recordLength);

   
201
      toWrite.put(OP_RECORD);
199
      toWrite.put(OP_RECORD);
202
      toWrite.put(buffer);
200
      toWrite.put(buffer);
203
      toWrite.position(0);
201
      toWrite.flip();
204
      int wrote = writeFileChannel.write(toWrite);
202

   
205
      Preconditions.checkState(wrote == toWrite.limit());
203
      writeFileHandle.write(toWrite.array(), 0, toWrite.limit());
206
      return Pair.of(fileID, offset);
204
      return Pair.of(fileID, offset);
207
    }
205
    }
208
    private void preallocate(int size) throws IOException {

   
209
      long position = writeFileChannel.position();

   
210
      if(position + size > writeFileChannel.size()) {

   
211
        LOG.debug("Preallocating at position " + position);

   
212
        synchronized (FILL) {

   
213
          FILL.position(0);

   
214
          writeFileChannel.write(FILL, position);

   
215
        }

   
216
      }

   
217
    }

   
218

    
   
206

   
219
  }
207
  }
220

    
   
208

   
221
  static class RandomReader {
209
  static class RandomReader {
222
    private final File file;
210
    private final File file;
[+20] [20] 176 lines
  1. trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java: Loading...