Review Board 1.7.22


HDFEventSink throws NPE if event is generated without timestamp in header

Review Request #7896 - Created Nov. 6, 2012 and updated

Alexander Alten-Lorenz
FLUME-1264
Reviewers
Flume
flume-git
HDFEventSink throws NPE if event is generated without timestamp in header. The fix uses timestamp interceptor.
Tests passed against Flume NG Node 1.4.0-SNAPSHOT

Diff revision 2 (Latest)

1 2
1 2

  1. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java: Loading...
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
Revision e369604 New Change
[20] 41 lines
[+20]
42
import org.apache.flume.Event;
42
import org.apache.flume.Event;
43
import org.apache.flume.EventDeliveryException;
43
import org.apache.flume.EventDeliveryException;
44
import org.apache.flume.Transaction;
44
import org.apache.flume.Transaction;
45
import org.apache.flume.conf.Configurable;
45
import org.apache.flume.conf.Configurable;
46
import org.apache.flume.formatter.output.BucketPath;
46
import org.apache.flume.formatter.output.BucketPath;

    
   
47
import org.apache.flume.interceptor.Interceptor;

    
   
48
import org.apache.flume.interceptor.InterceptorBuilderFactory;

    
   
49
import org.apache.flume.interceptor.InterceptorType;
47
import org.apache.flume.instrumentation.SinkCounter;
50
import org.apache.flume.instrumentation.SinkCounter;
48
import org.apache.flume.sink.AbstractSink;
51
import org.apache.flume.sink.AbstractSink;
49
import org.apache.flume.sink.FlumeFormatter;
52
import org.apache.flume.sink.FlumeFormatter;
50
import org.apache.hadoop.conf.Configuration;
53
import org.apache.hadoop.conf.Configuration;
51
import org.apache.hadoop.io.SequenceFile.CompressionType;
54
import org.apache.hadoop.io.SequenceFile.CompressionType;
[+20] [20] 34 lines
[+20] [+] public class HDFSEventSink extends AbstractSink implements Configurable {
86
   * the case that they take too long. In which
89
   * the case that they take too long. In which
87
   * case we create a new file and move on.
90
   * case we create a new file and move on.
88
   */
91
   */
89
  private static final int defaultThreadPoolSize = 10;
92
  private static final int defaultThreadPoolSize = 10;
90
  private static final int defaultRollTimerPoolSize = 1;
93
  private static final int defaultRollTimerPoolSize = 1;

    
   
94
  private static final Interceptor tsInterceptor =

    
   
95
         createTimestampInterceptor();
91

    
   
96

   
92
  /**
97
  /**
93
   * Singleton credential manager that manages static credentials for the
98
   * Singleton credential manager that manages static credentials for the
94
   * entire JVM
99
   * entire JVM
95
   */
100
   */
[+20] [20] 29 lines
[+20] public class HDFSEventSink extends AbstractSink implements Configurable {
125
  private int roundUnit = Calendar.SECOND;
130
  private int roundUnit = Calendar.SECOND;
126
  private int roundValue = 1;
131
  private int roundValue = 1;
127

    
   
132

   
128
  private long callTimeout;
133
  private long callTimeout;
129
  private Context context;
134
  private Context context;

    
   
135

   

    
   
136
  /**

    
   
137
   * Creates an instance of a timestamp interceptor

    
   
138
   * @return TimestampInterceptor

    
   
139
   */

    
   
140
  private static Interceptor createTimestampInterceptor() {

    
   
141
         Interceptor interceptor = null;

    
   
142
         try {

    
   
143
                 interceptor = InterceptorBuilderFactory.newInstance(

    
   
144
                         InterceptorType.TIMESTAMP.toString()).build();

    
   
145
         } catch (final Exception e) {

    
   
146
         } finally {

    
   
147
                 return interceptor;

    
   
148
         }

    
   
149
  }

    
   
150

   
130
  private SinkCounter sinkCounter;
151
  private SinkCounter sinkCounter;
131

    
   
152

   
132
  /*
153
  /*
133
   * Extended Java LinkedHashMap for open file handle LRU queue.
154
   * Extended Java LinkedHashMap for open file handle LRU queue.
134
   * We want to clear the oldest file handle if there are too many open ones.
155
   * We want to clear the oldest file handle if there are too many open ones.
[+20] [20] 248 lines
[+20] [+] public Status process() throws EventDeliveryException {
383
      for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
404
      for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
384
        Event event = channel.take();
405
        Event event = channel.take();
385
        if (event == null) {
406
        if (event == null) {
386
          break;
407
          break;
387
        }
408
        }
388

    
   
409
        // if timestamp header value is null, intercept an event and default to

    
   
410
        // system timestamp

    
   
411
        if (event.getHeaders().get("timestamp") == null) {

    
   
412
               event = tsInterceptor.intercept(event);

    
   
413
        }
389
        // reconstruct the path name by substituting place holders
414
        // reconstruct the path name by substituting place holders
390
        String realPath = BucketPath.escapeString(path, event.getHeaders(),
415
        String realPath = BucketPath.escapeString(path, event.getHeaders(),
391
            timeZone, needRounding, roundUnit, roundValue);
416
            timeZone, needRounding, roundUnit, roundValue);
392
        BucketWriter bucketWriter = sfWriters.get(realPath);
417
        BucketWriter bucketWriter = sfWriters.get(realPath);
393

    
   
418

   
[+20] [20] 352 lines
  1. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java: Loading...