Review Board 1.7.22


Jonathan Natkins got review request #6886!

FLUME-1509. Flume HDFS sink should allow for the use of different timezones when resolving sink paths

Review Request #6886 - Created Sept. 1, 2012 and updated

Jonathan Natkins
FLUME-1509
Reviewers
Flume
flume-git
This patch adds a timeZone parameter to the HDFS sink. It uses standard timezone strings to specify what timezone the timestamp header should be resolved in.
Automated tests added
flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
Revision cf105c7 New Change
[20] 20 lines
[+20]
21
import java.text.SimpleDateFormat;
21
import java.text.SimpleDateFormat;
22
import java.util.Calendar;
22
import java.util.Calendar;
23
import java.util.Date;
23
import java.util.Date;
24
import java.util.HashMap;
24
import java.util.HashMap;
25
import java.util.Map;
25
import java.util.Map;

    
   
26
import java.util.TimeZone;
26
import java.util.regex.Matcher;
27
import java.util.regex.Matcher;
27
import java.util.regex.Pattern;
28
import java.util.regex.Pattern;
28

    
   
29

   
29
import org.apache.flume.tools.TimestampRoundDownUtil;
30
import org.apache.flume.tools.TimestampRoundDownUtil;
30

    
   
31

   
[+20] [20] 83 lines
[+20] [+] public static String expandShorthand(char c) {
114
  public static String replaceShorthand(char c, Map<String, String> headers) {
115
  public static String replaceShorthand(char c, Map<String, String> headers) {
115
    return replaceShorthand(c, headers, false, 0, 0);
116
    return replaceShorthand(c, headers, false, 0, 0);
116
  }
117
  }
117

    
   
118

   
118
  /**
119
  /**

    
   
120
   * A wrapper around

    
   
121
   * {@link BucketPath#replaceShorthand(char, Map, TimeZone, boolean, int, int)}

    
   
122
   * with the timezone set to the default.

    
   
123
   */

    
   
124
  public static String replaceShorthand(char c, Map<String, String> headers,

    
   
125
      boolean needRounding, int unit, int roundDown) {

    
   
126
    return replaceShorthand(c, headers, null, needRounding, unit, roundDown);

    
   
127
  }

    
   
128

   

    
   
129
  /**
119
   * Hardcoded lookups for %x style escape replacement. Add your own!
130
   * Hardcoded lookups for %x style escape replacement. Add your own!
120
   *
131
   *
121
   * All shorthands are Date format strings, currently.
132
   * All shorthands are Date format strings, currently.
122
   *
133
   *
123
   * Returns the empty string if an escape is not recognized.
134
   * Returns the empty string if an escape is not recognized.
124
   *
135
   *
125
   * Dates follow the same format as unix date, with a few exceptions.
136
   * Dates follow the same format as unix date, with a few exceptions.
126
   * @param c - The character to replace.
137
   * @param c - The character to replace.
127
   * @param headers - Event headers
138
   * @param headers - Event headers

    
   
139
   * @param timeZone - The timezone to use for formatting the timestamp
128
   * @param needRounding - Should the timestamp be rounded down?
140
   * @param needRounding - Should the timestamp be rounded down?
129
   * @param unit - if needRounding is true, what unit to round down to. This
141
   * @param unit - if needRounding is true, what unit to round down to. This
130
   * must be one of the units specified by {@link java.util.Calendar} -
142
   * must be one of the units specified by {@link java.util.Calendar} -
131
   * HOUR, MINUTE or SECOND. Defaults to second, if none of these are present.
143
   * HOUR, MINUTE or SECOND. Defaults to second, if none of these are present.
132
   * Ignored if needRounding is false.
144
   * Ignored if needRounding is false.
133
   * @param roundDown - if needRounding is true,
145
   * @param roundDown - if needRounding is true,
134
   * The time should be rounded to the largest multiple of this
146
   * The time should be rounded to the largest multiple of this
135
   * value, smaller than the time supplied, defaults to 1, if <= 0(rounds off
147
   * value, smaller than the time supplied, defaults to 1, if <= 0(rounds off
136
   * to the second/minute/hour immediately lower than the timestamp supplied.
148
   * to the second/minute/hour immediately lower than the timestamp supplied.
137
   * Ignored if needRounding is false.
149
   * Ignored if needRounding is false.
138
   * @return
150
   * @return
139
   */
151
   */
140
  public static String replaceShorthand(char c, Map<String, String> headers,
152
  public static String replaceShorthand(char c, Map<String, String> headers,
141
      boolean needRounding, int unit, int roundDown) {
153
      TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
142

    
   
154

   
143
    String timestampHeader = headers.get("timestamp");
155
    String timestampHeader = headers.get("timestamp");
144
    long ts;
156
    long ts;
145
    try {
157
    try {
146
      ts = Long.valueOf(timestampHeader);
158
      ts = Long.valueOf(timestampHeader);
[+20] [20] 80 lines
[+20] public static String replaceShorthand(char c, Map<String, String> headers) { public static String replaceShorthand(char c, Map<String, String> headers,
227
//      LOG.warn("Unrecognized escape in event format string: %" + c);
239
//      LOG.warn("Unrecognized escape in event format string: %" + c);
228
      return "";
240
      return "";
229
    }
241
    }
230

    
   
242

   
231
    SimpleDateFormat format = new SimpleDateFormat(formatString);
243
    SimpleDateFormat format = new SimpleDateFormat(formatString);

    
   
244
    if (timeZone != null) {

    
   
245
      format.setTimeZone(timeZone);

    
   
246
    }

    
   
247

   
232
    Date date = new Date(ts);
248
    Date date = new Date(ts);
233
    return format.format(date);
249
    return format.format(date);
234
  }
250
  }
235

    
   
251

   
236
  private static long roundDown(int roundDown, int unit, long ts){
252
  private static long roundDown(int roundDown, int unit, long ts){
[+20] [20] 33 lines
[+20] public static String replaceShorthand(char c, Map<String, String> headers) { public static String replaceShorthand(char c, Map<String, String> headers,
270
  public static String escapeString(String in, Map<String, String> headers){
286
  public static String escapeString(String in, Map<String, String> headers){
271
    return escapeString(in, headers, false, 0, 0);
287
    return escapeString(in, headers, false, 0, 0);
272
  }
288
  }
273

    
   
289

   
274
  /**
290
  /**

    
   
291
   * A wrapper around

    
   
292
   * {@link BucketPath#escapeString(String, Map, TimeZone, boolean, int, int)}

    
   
293
   * with the timezone set to the default.

    
   
294
   */

    
   
295
  public static String escapeString(String in, Map<String, String> headers,

    
   
296
      boolean needRounding, int unit, int roundDown) {

    
   
297
    return escapeString(in, headers, null, needRounding, unit, roundDown);

    
   
298
  }

    
   
299

   

    
   
300
  /**
275
   * Replace all substrings of form %{tagname} with get(tagname).toString() and
301
   * Replace all substrings of form %{tagname} with get(tagname).toString() and
276
   * all shorthand substrings of form %x with a special value.
302
   * all shorthand substrings of form %x with a special value.
277
   *
303
   *
278
   * Any unrecognized / not found tags will be replaced with the empty string.
304
   * Any unrecognized / not found tags will be replaced with the empty string.
279
   *
305
   *
[+20] [20] 11 lines
[+20] public static String escapeString(String in, Map<String, String> headers){ [+] public static String escapeString(String in, Map<String, String> headers,
291
   * to the second/minute/hour immediately lower than the timestamp supplied.
317
   * to the second/minute/hour immediately lower than the timestamp supplied.
292
   * Ignored if needRounding is false.
318
   * Ignored if needRounding is false.
293
   * @return Escaped string.
319
   * @return Escaped string.
294
   */
320
   */
295
  public static String escapeString(String in, Map<String, String> headers,
321
  public static String escapeString(String in, Map<String, String> headers,
296
      boolean needRounding, int unit, int roundDown) {
322
      TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
297
    Matcher matcher = tagPattern.matcher(in);
323
    Matcher matcher = tagPattern.matcher(in);
298
    StringBuffer sb = new StringBuffer();
324
    StringBuffer sb = new StringBuffer();
299
    while (matcher.find()) {
325
    while (matcher.find()) {
300
      String replacement = "";
326
      String replacement = "";
301
      // Group 2 is the %{...} pattern
327
      // Group 2 is the %{...} pattern
[+20] [20] 10 lines
[+20] public static String escapeString(String in, Map<String, String> headers){ public static String escapeString(String in, Map<String, String> headers,
312
        // switch on that rather than the string.
338
        // switch on that rather than the string.
313
        Preconditions.checkState(matcher.group(1) != null
339
        Preconditions.checkState(matcher.group(1) != null
314
            && matcher.group(1).length() == 1,
340
            && matcher.group(1).length() == 1,
315
            "Expected to match single character tag in string " + in);
341
            "Expected to match single character tag in string " + in);
316
        char c = matcher.group(1).charAt(0);
342
        char c = matcher.group(1).charAt(0);
317
        replacement = replaceShorthand(c, headers,
343
        replacement = replaceShorthand(c, headers, timeZone,
318
            needRounding, unit, roundDown);
344
            needRounding, unit, roundDown);
319
      }
345
      }
320

    
   
346

   
321
      // The replacement string must have '$' and '\' chars escaped. This
347
      // The replacement string must have '$' and '\' chars escaped. This
322
      // replacement string is pretty arcane.
348
      // replacement string is pretty arcane.
[+20] [20] 61 lines
flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
Revision 86f3293 New Change
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision 45dd7cc New Change
 
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
Revision fcb9642 New Change
 
flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
Revision b5f8c88 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java: Loading...
  3. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...
  4. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java: Loading...
  5. flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java: Loading...