Review Board 1.7.22


FLUME-1828: ResettableInputStream should support seek()

Review Request #8926 - Created Jan. 11, 2013 and submitted

Mike Percy
FLUME-1828
Reviewers
Flume
flume-git
ResettableInputStream should support seek().

This patch makes ResettableInputStream an abstract class instead of an interface, and adds a Seekable interface which is implemented by ResettableInputStream and its subclasses.
Added unit test for seek(). Unit tests pass.
flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
Revision f9e4ec9 New Change
[20] 41 lines
[+20]
42
 * <p/>The ability to {@link #reset()} is dependent on the underlying {@link
42
 * <p/>The ability to {@link #reset()} is dependent on the underlying {@link
43
 * PositionTracker} instance's durability semantics.
43
 * PositionTracker} instance's durability semantics.
44
 */
44
 */
45
@InterfaceAudience.Private
45
@InterfaceAudience.Private
46
@InterfaceStability.Evolving
46
@InterfaceStability.Evolving
47
public class ResettableFileInputStream implements ResettableInputStream {
47
public class ResettableFileInputStream extends ResettableInputStream {
48

    
   
48

   
49
  public static final int DEFAULT_BUF_SIZE = 16384;
49
  public static final int DEFAULT_BUF_SIZE = 16384;
50

    
   
50

   
51
  private final File file;
51
  private final File file;
52
  private final PositionTracker tracker;
52
  private final PositionTracker tracker;
[+20] [20] 128 lines
[+20] [+] public synchronized int readChar() throws IOException {
181

    
   
181

   
182
  }
182
  }
183

    
   
183

   
184
  private void refillBuf() throws IOException {
184
  private void refillBuf() throws IOException {
185
    buf.compact();
185
    buf.compact();

    
   
186
    chan.position(position); // ensure we read from the proper offset
186
    chan.read(buf);
187
    chan.read(buf);
187
    buf.flip();
188
    buf.flip();
188
  }
189
  }
189

    
   
190

   
190
  @Override
191
  @Override
[+20] [20] 4 lines
[+20] [+] public void mark() throws IOException {
195
  @Override
196
  @Override
196
  public void reset() throws IOException {
197
  public void reset() throws IOException {
197
    seek(tracker.getPosition());
198
    seek(tracker.getPosition());
198
  }
199
  }
199

    
   
200

   
200
  private long tell() throws IOException {
201
  @Override

    
   
202
  public long tell() throws IOException {
201
    return syncPosition;
203
    return syncPosition;
202
  }
204
  }
203

    
   
205

   
204
  private synchronized void seek(long position) throws IOException {
206
  @Override
205
    // perform underlying file seek
207
  public synchronized void seek(long newPos) throws IOException {
206
    chan.position(position);

   
207

    
   
208

   
208
    // invalidate cache
209
    // check to see if we can seek within our existing buffer

    
   
210
    long relativeChange = newPos - position;

    
   
211
    if (relativeChange == 0) return; // seek to current pos => no-op

    
   
212

   

    
   
213
    long newBufPos = buf.position() + relativeChange;

    
   
214
    if (newBufPos >= 0 && newBufPos < buf.limit()) {

    
   
215
      // we can reuse the read buffer

    
   
216
      buf.position((int)newBufPos);

    
   
217
    } else {

    
   
218
      // otherwise, we have to invalidate the read buffer
209
    buf.clear();
219
      buf.clear();
210
    buf.flip();
220
      buf.flip();

    
   
221
    }
211

    
   
222

   
212
    // clear decoder state
223
    // clear decoder state
213
    decoder.reset();
224
    decoder.reset();
214

    
   
225

   

    
   
226
    // perform underlying file seek

    
   
227
    chan.position(newPos);

    
   
228

   
215
    // reset position pointers
229
    // reset position pointers
216
    this.position = this.syncPosition = position;
230
    position = syncPosition = newPos;
217
  }
231
  }
218

    
   
232

   
219
  private void incrPosition(int incr, boolean updateSyncPosition) {
233
  private void incrPosition(int incr, boolean updateSyncPosition) {
220
    this.position += incr;
234
    position += incr;
221
    if (updateSyncPosition) {
235
    if (updateSyncPosition) {
222
      syncPosition = position;
236
      syncPosition = position;
223
    }
237
    }
224
  }
238
  }
225

    
   
239

   
226
  @Override
240
  @Override
227
  public void close() throws IOException {
241
  public void close() throws IOException {
228
    tracker.close();
242
    tracker.close();
229
    in.close();
243
    in.close();
230
  }
244
  }
231

    
   
245

   
232
}
246
}
flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableInputStream.java
Revision ae989a6 New Change
 
flume-ng-core/src/main/java/org/apache/flume/serialization/Seekable.java
New File
 
flume-ng-core/src/test/java/org/apache/flume/serialization/ResettableTestStringInputStream.java
Revision ef8b7ba New Change
 
flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
Revision 73e2baa New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java: Loading...
  2. flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableInputStream.java: Loading...
  3. flume-ng-core/src/main/java/org/apache/flume/serialization/Seekable.java: Loading...
  4. flume-ng-core/src/test/java/org/apache/flume/serialization/ResettableTestStringInputStream.java: Loading...
  5. flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java: Loading...