Review Board 1.7.22


FLUME-1037: Netcat source should create one event per newline

Review Request #4497 - Created March 27, 2012 and submitted

Mike Percy
FLUME-1037
Reviewers
Flume
flume-git
The NetcatSource does not create one event per newline. Instead, it creates one event per TCP session.

I have addressed multiple issues with NetcatSource in this patch:

1. Properly process a new event per newline.
2. Enforce a maximum length per line, to ensure clients cannot run the server out of memory. (This is now configurable; the default is 512 characters.)
3. Properly flush responses to the client.
4. Increment counters for successful processing and failure.
5. Use shutdownNow() when shutting down the server, otherwise an open client connection will cause the server to hang on shutdown.
6. Made the inner classes of NetcatSource private; I believe making them public was unintentional.
7. Corrected unit test so it now sends a newline with each request. Also now checks for "OK" response from server.
8. Attempting to sneak in a minor fix for the EventHelper.dumpEvent() method into this patch, since I'm testing with LoggerSink and it's bugging me when it throws an exception on a zero-length body.
Unit tests pass. Did a bunch of manual testing using the nc tool with a Netcat source and a Logger sink.
flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java
Revision a326a70 New Change
[20] 38 lines
[+20] [+] public class EventHelper {
39
    return dumpEvent(event, DEFAULT_MAX_BYTES);
39
    return dumpEvent(event, DEFAULT_MAX_BYTES);
40
  }
40
  }
41

    
   
41

   
42
  public static String dumpEvent(Event event, int maxBytes) {
42
  public static String dumpEvent(Event event, int maxBytes) {
43
    StringBuilder buffer = new StringBuilder();
43
    StringBuilder buffer = new StringBuilder();
44
    if(event == null) {
44
    if (event == null || event.getBody() == null) {
45
      buffer.append("null");
45
      buffer.append("null");

    
   
46
    } else if (event.getBody().length == 0) {

    
   
47
      // do nothing... in this case, HexDump.dump() will throw an exception
46
    } else {
48
    } else {
47
      byte[] body = event.getBody();
49
      byte[] body = event.getBody();
48
      byte[] data = Arrays.copyOf(body, Math.min(body.length, maxBytes));
50
      byte[] data = Arrays.copyOf(body, Math.min(body.length, maxBytes));
49
      ByteArrayOutputStream out = new ByteArrayOutputStream();
51
      ByteArrayOutputStream out = new ByteArrayOutputStream();
50
      try {
52
      try {
[+20] [20] 21 lines
flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
Revision a841b0e New Change
 
flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
New File
 
flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
Revision fb2a960 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/event/EventHelper.java: Loading...
  2. flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java: Loading...
  3. flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java: Loading...
  4. flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java: Loading...