Review Board 1.7.22


FLUME-2014: Race condition when using local timestamp with BucketPath

Review Request #10699 - Created April 22, 2013 and updated

Mike Percy
FLUME-2014
Reviewers
Flume
flume-git
This patch changes the behavior of the BucketPath.escapeString() function to only ever call clock.currentTimeMillis() once during an invocation of the method. This prevents a race condition that can cause unexpected results in the interpolated paths.

Added unit test for the previous condition. All tests pass.

Here is some output from running the new unit test without the patch:

Running org.apache.flume.formatter.output.TestBucketPath
Tests run: 6, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.87 sec <<< FAILURE!

Results :

Failed tests:   testDateRace(org.apache.flume.formatter.output.TestBucketPath): Race condition detected expected:<02:[5]0> but was:<02:[0]0>

Tests run: 6, Failures: 1, Errors: 0, Skipped: 0
flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
Revision 971c75c New Change
[20] 46 lines
[+20] [+] public class BucketPath {
47

    
   
47

   
48
  /**
48
  /**
49
   * Returns true if in contains a substring matching TAG_REGEX (i.e. of the
49
   * Returns true if in contains a substring matching TAG_REGEX (i.e. of the
50
   * form %{...} or %x.
50
   * form %{...} or %x.
51
   */
51
   */

    
   
52
  @VisibleForTesting

    
   
53
  @Deprecated
52
  public static boolean containsTag(String in) {
54
  public static boolean containsTag(String in) {
53
    return tagPattern.matcher(in).find();
55
    return tagPattern.matcher(in).find();
54
  }
56
  }
55

    
   
57

   

    
   
58
  @VisibleForTesting

    
   
59
  @Deprecated
56
  public static String expandShorthand(char c) {
60
  public static String expandShorthand(char c) {
57
    // It's a date
61
    // It's a date
58
    switch (c) {
62
    switch (c) {
59
    case 'a':
63
    case 'a':
60
      return "weekday_short";
64
      return "weekday_short";
[+20] [20] 53 lines
[+20] public static String expandShorthand(char c) {
114
   *
118
   *
115
   * Returns the empty string if an escape is not recognized.
119
   * Returns the empty string if an escape is not recognized.
116
   *
120
   *
117
   * Dates follow the same format as unix date, with a few exceptions.
121
   * Dates follow the same format as unix date, with a few exceptions.
118
   *
122
   *

    
   
123
   * <p>This static method will be REMOVED in a future version of Flume</p>

    
   
124
   *
119
   */
125
   */

    
   
126
  @VisibleForTesting

    
   
127
  @Deprecated
120
  public static String replaceShorthand(char c, Map<String, String> headers) {
128
  public static String replaceShorthand(char c, Map<String, String> headers) {
121
    return replaceShorthand(c, headers, false, 0, 0);
129
    return replaceShorthand(c, headers, false, 0, 0);
122
  }
130
  }
123

    
   
131

   
124
  /**
132
  /**
125
   * A wrapper around
133
   * A wrapper around
126
   * {@link BucketPath#replaceShorthand(char, Map, TimeZone, boolean, int, int)}
134
   * {@link BucketPath#replaceShorthand(char, Map, TimeZone, boolean, int,

    
   
135
   * int, boolean)}
127
   * with the timezone set to the default.
136
   * with the timezone set to the default.

    
   
137
   *

    
   
138
   * <p>This static method will be REMOVED in a future version of Flume</p>
128
   */
139
   */

    
   
140
  @VisibleForTesting

    
   
141
  @Deprecated
129
  public static String replaceShorthand(char c, Map<String, String> headers,
142
  public static String replaceShorthand(char c, Map<String, String> headers,
130
      boolean needRounding, int unit, int roundDown) {
143
      boolean needRounding, int unit, int roundDown) {
131
    return replaceShorthand(c, headers, null, needRounding, unit, roundDown,
144
    return replaceShorthand(c, headers, null, needRounding, unit, roundDown,
132
      false);
145
      false);
133
  }
146
  }
[+20] [20] 4 lines
[+20] public static String replaceShorthand(char c, Map<String, String> headers,
138
   * All shorthands are Date format strings, currently.
151
   * All shorthands are Date format strings, currently.
139
   *
152
   *
140
   * Returns the empty string if an escape is not recognized.
153
   * Returns the empty string if an escape is not recognized.
141
   *
154
   *
142
   * Dates follow the same format as unix date, with a few exceptions.
155
   * Dates follow the same format as unix date, with a few exceptions.

    
   
156
   *

    
   
157
   * <p>This static method will be REMOVED in a future version of Flume</p>

    
   
158
   *
143
   * @param c - The character to replace.
159
   * @param c - The character to replace.
144
   * @param headers - Event headers
160
   * @param headers - Event headers
145
   * @param timeZone - The timezone to use for formatting the timestamp
161
   * @param timeZone - The timezone to use for formatting the timestamp
146
   * @param needRounding - Should the timestamp be rounded down?
162
   * @param needRounding - Should the timestamp be rounded down?
147
   * @param unit - if needRounding is true, what unit to round down to. This
163
   * @param unit - if needRounding is true, what unit to round down to. This
148
   * must be one of the units specified by {@link java.util.Calendar} -
164
   * must be one of the units specified by {@link java.util.Calendar} -
149
   * HOUR, MINUTE or SECOND. Defaults to second, if none of these are present.
165
   * HOUR, MINUTE or SECOND. Defaults to second, if none of these are present.
150
   * Ignored if needRounding is false.
166
   * Ignored if needRounding is false.
151
   * @param roundDown - if needRounding is true,
167
   * @param roundDown - if needRounding is true,
152
   * The time should be rounded to the largest multiple of this
168
   * The time should be rounded to the largest multiple of this
153
   * value, smaller than the time supplied, defaults to 1, if <= 0(rounds off
169
   * value, smaller than the time supplied, defaults to 1, if <= 0(rounds off
154
   * to the second/minute/hour immediately lower than the timestamp supplied.
170
   * to the second/minute/hour immediately lower than the timestamp supplied.
155
   * Ignored if needRounding is false.
171
   * Ignored if needRounding is false.

    
   
172
   *
156
   * @return
173
   * @return
157
   */
174
   */

    
   
175
  @VisibleForTesting

    
   
176
  @Deprecated
158
  public static String replaceShorthand(char c, Map<String, String> headers,
177
  public static String replaceShorthand(char c, Map<String, String> headers,
159
    TimeZone timeZone, boolean needRounding, int unit, int roundDown,
178
    TimeZone timeZone, boolean needRounding, int unit, int roundDown,
160
    boolean useLocalTimestamp) {
179
    boolean useLocalTimestamp) {
161
    long ts;
180
    long ts = 0;
162
    String timestampHeader;
181
    if (useLocalTimestamp) {

    
   
182
      ts = clock.currentTimeMillis();

    
   
183
    }

    
   
184
    return replaceShorthand(c, headers, timeZone, needRounding, unit,

    
   
185
        roundDown, false, ts);

    
   
186
  }

    
   
187

   

    
   
188
  /**

    
   
189
   * Not intended as a public API

    
   
190
   */

    
   
191
  @VisibleForTesting

    
   
192
  protected static String replaceShorthand(char c, Map<String, String> headers,

    
   
193
      TimeZone timeZone, boolean needRounding, int unit, int roundDown,

    
   
194
      boolean useLocalTimestamp, long ts) {

    
   
195

   

    
   
196
    String timestampHeader = null;
163
    try {
197
    try {
164
      if(!useLocalTimestamp) {
198
      if(!useLocalTimestamp) {
165
        timestampHeader = headers.get("timestamp");
199
        timestampHeader = headers.get("timestamp");
166
        Preconditions.checkNotNull(timestampHeader, "Expected timestamp in " +
200
        Preconditions.checkNotNull(timestampHeader, "Expected timestamp in " +
167
          "the Flume event headers, but it was null");
201
          "the Flume event headers, but it was null");

    
   
202
        ts = Long.valueOf(timestampHeader);
168
      } else {
203
      } else {
169
        timestampHeader = String.valueOf(clock.currentTimeMillis());
204
        timestampHeader = String.valueOf(ts);
170
      }
205
      }
171
      ts = Long.valueOf(timestampHeader);

   
172
    } catch (NumberFormatException e) {
206
    } catch (NumberFormatException e) {
173
      throw new RuntimeException("Flume wasn't able to parse timestamp header"
207
      throw new RuntimeException("Flume wasn't able to parse timestamp header"
174
        + " in the event to resolve time based bucketing. Please check that"
208
        + " in the event to resolve time based bucketing. Please check that"
175
        + " you're correctly populating timestamp header (for example using"
209
        + " you're correctly populating timestamp header (for example using"
176
        + " TimestampInterceptor source interceptor).", e);
210
        + " TimestampInterceptor source interceptor).", e);
[+20] [20] 123 lines
[+20] [+] private static long roundDown(int roundDown, int unit, long ts){
300
    return escapeString(in, headers, false, 0, 0);
334
    return escapeString(in, headers, false, 0, 0);
301
  }
335
  }
302

    
   
336

   
303
  /**
337
  /**
304
   * A wrapper around
338
   * A wrapper around
305
   * {@link BucketPath#escapeString(String, Map, TimeZone, boolean, int, int)}
339
   * {@link BucketPath#escapeString(String, Map, TimeZone, boolean, int, int,

    
   
340
   boolean)}
306
   * with the timezone set to the default.
341
   * with the timezone set to the default.
307
   */
342
   */
308
  public static String escapeString(String in, Map<String, String> headers,
343
  public static String escapeString(String in, Map<String, String> headers,
309
      boolean needRounding, int unit, int roundDown) {
344
      boolean needRounding, int unit, int roundDown) {
310
    return escapeString(in, headers, null, needRounding, unit, roundDown,
345
    return escapeString(in, headers, null, needRounding, unit, roundDown,
[+20] [20] 22 lines
[+20] public static String escapeString(String in, Map<String, String> headers,
333
   * @return Escaped string.
368
   * @return Escaped string.
334
   */
369
   */
335
  public static String escapeString(String in, Map<String, String> headers,
370
  public static String escapeString(String in, Map<String, String> headers,
336
    TimeZone timeZone, boolean needRounding, int unit, int roundDown,
371
    TimeZone timeZone, boolean needRounding, int unit, int roundDown,
337
    boolean useLocalTimeStamp) {
372
    boolean useLocalTimeStamp) {

    
   
373

   

    
   
374
    long ts = clock.currentTimeMillis();

    
   
375

   
338
    Matcher matcher = tagPattern.matcher(in);
376
    Matcher matcher = tagPattern.matcher(in);
339
    StringBuffer sb = new StringBuffer();
377
    StringBuffer sb = new StringBuffer();
340
    while (matcher.find()) {
378
    while (matcher.find()) {
341
      String replacement = "";
379
      String replacement = "";
342
      // Group 2 is the %{...} pattern
380
      // Group 2 is the %{...} pattern
[+20] [20] 11 lines
[+20] public static String escapeString(String in, Map<String, String> headers,
354
        Preconditions.checkState(matcher.group(1) != null
392
        Preconditions.checkState(matcher.group(1) != null
355
            && matcher.group(1).length() == 1,
393
            && matcher.group(1).length() == 1,
356
            "Expected to match single character tag in string " + in);
394
            "Expected to match single character tag in string " + in);
357
        char c = matcher.group(1).charAt(0);
395
        char c = matcher.group(1).charAt(0);
358
        replacement = replaceShorthand(c, headers, timeZone,
396
        replacement = replaceShorthand(c, headers, timeZone,
359
            needRounding, unit, roundDown, useLocalTimeStamp);
397
            needRounding, unit, roundDown, useLocalTimeStamp, ts);
360
      }
398
      }
361

    
   
399

   
362
      // The replacement string must have '$' and '\' chars escaped. This
400
      // The replacement string must have '$' and '\' chars escaped. This
363
      // replacement string is pretty arcane.
401
      // replacement string is pretty arcane.
364
      //
402
      //
[+20] [20] 16 lines
[+20] public static String escapeString(String in, Map<String, String> headers,
381
  /**
419
  /**
382
   * Instead of replacing escape sequences in a string, this method returns a
420
   * Instead of replacing escape sequences in a string, this method returns a
383
   * mapping of an attribute name to the value based on the escape sequence
421
   * mapping of an attribute name to the value based on the escape sequence
384
   * found in the argument string.
422
   * found in the argument string.
385
   */
423
   */

    
   
424
  @VisibleForTesting

    
   
425
  @Deprecated
386
  public static Map<String, String> getEscapeMapping(String in,
426
  public static Map<String, String> getEscapeMapping(String in,
387
      Map<String, String> headers) {
427
      Map<String, String> headers) {
388
    return getEscapeMapping(in, headers, false, 0, 0);
428
    return getEscapeMapping(in, headers, false, 0, 0);
389
  }
429
  }

    
   
430

   

    
   
431
  @VisibleForTesting

    
   
432
  @Deprecated
390
  public static Map<String, String> getEscapeMapping(String in,
433
  public static Map<String, String> getEscapeMapping(String in,
391
      Map<String, String> headers, boolean needRounding,
434
      Map<String, String> headers, boolean needRounding,
392
      int unit, int roundDown) {
435
      int unit, int roundDown) {
393
    Map<String, String> mapping = new HashMap<String, String>();
436
    Map<String, String> mapping = new HashMap<String, String>();
394
    Matcher matcher = tagPattern.matcher(in);
437
    Matcher matcher = tagPattern.matcher(in);
[+20] [20] 24 lines
[+20] public static String escapeString(String in, Map<String, String> headers,
419
    }
462
    }
420
    return mapping;
463
    return mapping;
421

    
   
464

   
422
  }
465
  }
423

    
   
466

   
424
  //Should not be called from outside unit tests.
467
  /*

    
   
468
   * May not be called from outside unit tests.

    
   
469
   */
425
  @VisibleForTesting
470
  @VisibleForTesting
426
  public static void setClock(Clock clk) {
471
  public static void setClock(Clock clk) {
427
    clock = clk;
472
    clock = clk;
428
  }
473
  }

    
   
474

   

    
   
475
  /*

    
   
476
   * May not be called from outside unit tests.

    
   
477
   */

    
   
478
  @VisibleForTesting

    
   
479
  public static Clock getClock() {

    
   
480
    return clock;

    
   
481
  }
429
}
482
}
430

    
   
483

   
flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
Revision 9cfefc0 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java: Loading...