Review Board 1.7.22


FLUME-1814: Problem with the default Locale in RegexExtractorInterceptorMillisSerializer

Review Request #8783 - Created Dec. 31, 2012 and updated

St├ęphane Moreau
1.3.0
FLUME-1814
Reviewers
Flume
flume-git
It is not possible in the version 1.3.0 of Flume to parse UK or US date from a French computer using the interceptor RegexExtractorInterceptorMillisSerializer.

Indeed, the DateTimeFormatter created in the interceptor is currently using the default Locale which is FR on my computer. When I try to parse some files I got from US, I got the following exception:
2012-12-31 17:09:13,370 (pool-5-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:148)] Uncaught exception in Runnable
java.lang.IllegalArgumentException: Invalid format: "29/Dec/2012:05:09:34 -0700" is malformed at "Dec/2012:05:09:34 -0700"
        at org.joda.time.format.DateTimeFormatter.parseDateTime(DateTimeFormatter.java:866)
        at org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer.serialize(RegexExtractorInterceptorMillisSerializer.java:48)
        at org.apache.flume.interceptor.RegexExtractorInterceptor.intercept(RegexExtractorInterceptor.java:147)
        at org.apache.flume.interceptor.RegexExtractorInterceptor.intercept(RegexExtractorInterceptor.java:158)
        at org.apache.flume.interceptor.InterceptorChain.intercept(InterceptorChain.java:62)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:146)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:143)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)

The solution I propose is to add a new property called "language" to the interceptor which will allow us to override the default Locale.
I added two JUnit tests and I fully tested the new property.
flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorMillisSerializer.java
Revision 83bf0c9 New Change
[20] 23 lines
[+20]
24
import org.joda.time.format.DateTimeFormat;
24
import org.joda.time.format.DateTimeFormat;
25
import org.joda.time.format.DateTimeFormatter;
25
import org.joda.time.format.DateTimeFormatter;
26

    
   
26

   
27
import com.google.common.base.Preconditions;
27
import com.google.common.base.Preconditions;
28

    
   
28

   

    
   
29
import java.util.Locale;

    
   
30

   
29
/**
31
/**
30
 * Serializer that converts the passed in value into milliseconds using the
32
 * Serializer that converts the passed in value into milliseconds using the
31
 * specified formatting pattern
33
 * specified formatting pattern
32
 */
34
 */
33
public class RegexExtractorInterceptorMillisSerializer implements
35
public class RegexExtractorInterceptorMillisSerializer implements
[+20] [20] 5 lines
[+20]
39
  public void configure(Context context) {
41
  public void configure(Context context) {
40
    String pattern = context.getString("pattern");
42
    String pattern = context.getString("pattern");
41
    Preconditions.checkArgument(!StringUtils.isEmpty(pattern),
43
    Preconditions.checkArgument(!StringUtils.isEmpty(pattern),
42
        "Must configure with a valid pattern");
44
        "Must configure with a valid pattern");
43
    formatter = DateTimeFormat.forPattern(pattern);
45
    formatter = DateTimeFormat.forPattern(pattern);

    
   
46

   

    
   
47
    String language = context.getString("language");

    
   
48
    if (!StringUtils.isEmpty(language)) {

    
   
49
      formatter = formatter.withLocale(new Locale(language));

    
   
50
    }
44
  }
51
  }
45

    
   
52

   
46
  @Override
53
  @Override
47
  public String serialize(String value) {
54
  public String serialize(String value) {
48
    DateTime dateTime = formatter.parseDateTime(value);
55
    return Long.toString(formatter.parseMillis(value));
49
    return Long.toString(dateTime.getMillis());

   
50
  }
56
  }
51

    
   
57

   
52
  @Override
58
  @Override
53
  public void configure(ComponentConfiguration conf) {
59
  public void configure(ComponentConfiguration conf) {
54
    // NO-OP...
60
    // NO-OP...
55
  }
61
  }
56
}
62
}
flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java
Revision ac46131 New Change
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision 54caf33 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorMillisSerializer.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java: Loading...
  3. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...