Review Board 1.7.22


FLUME-2089 ElasticsearchSink blocks and raises exceptions when event body has unexpected encoding

Review Request #13509 - Created Aug. 12, 2013 and updated

Edward Sargisson
Reviewers
Flume
flume-git
The ElasticsearchSink is quite aggressive in assuming that the incoming event is JSON. However, if it is not JSON or YAML then an exception is thrown which would block the queue from processing further messages. 
This patch catches Exception and writes the data as a simple field.
Mvn install passes.
A custom build with this patch is currently running in many of our environments with no problems and the event which caused the issue was safely stored.
flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java
Revision bf7c57c New Change
[20] 19 lines
[+20]
20
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
20
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
21

    
   
21

   
22
import java.io.IOException;
22
import java.io.IOException;
23
import java.nio.charset.Charset;
23
import java.nio.charset.Charset;
24

    
   
24

   
25
import org.elasticsearch.common.jackson.core.JsonParseException;

   
26
import org.elasticsearch.common.xcontent.XContentBuilder;
25
import org.elasticsearch.common.xcontent.XContentBuilder;
27
import org.elasticsearch.common.xcontent.XContentFactory;
26
import org.elasticsearch.common.xcontent.XContentFactory;
28
import org.elasticsearch.common.xcontent.XContentParser;
27
import org.elasticsearch.common.xcontent.XContentParser;
29
import org.elasticsearch.common.xcontent.XContentType;
28
import org.elasticsearch.common.xcontent.XContentType;
30

    
   
29

   
[+20] [20] 29 lines
[+20] [+] public static void addComplexField(XContentBuilder builder, String fieldName,
60
      XContentBuilder tmp = jsonBuilder();
59
      XContentBuilder tmp = jsonBuilder();
61
      parser = XContentFactory.xContent(contentType).createParser(data);
60
      parser = XContentFactory.xContent(contentType).createParser(data);
62
      parser.nextToken();
61
      parser.nextToken();
63
      tmp.copyCurrentStructure(parser);
62
      tmp.copyCurrentStructure(parser);
64
      builder.field(fieldName, tmp);
63
      builder.field(fieldName, tmp);
65
    } catch (JsonParseException ex) {
64
    } catch (Exception ex) {
66
      // If we get an exception here the most likely cause is nested JSON that
65
      // Any sort of exception here (JSON, CharConversion, etc) indicates
67
      // can't be figured out in the body. At this point just push it through
66
      // that we made an assumption on content type (JSON), and that may
68
      // as is, we have already added the field so don't do it again
67
      // or may not be accurate.  At this point just push it through as

    
   
68
      // is, we have already added the field so don't do it again.
69
      addSimpleField(builder, fieldName, data);
69
      addSimpleField(builder, fieldName, data);
70
    } finally {
70
    } finally {
71
      if (parser != null) {
71
      if (parser != null) {
72
        parser.close();
72
        parser.close();
73
      }
73
      }
74
    }
74
    }
75
  }
75
  }
76
}
76
}
flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
Revision 9dff4b0 New Change
 
  1. flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java: Loading...
  2. flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java: Loading...