Review Board 1.7.22


Patch for FLUME-2120

Review Request #14569 - Created Oct. 10, 2013 and updated

Venkatesh Sivasubramanian
v1.4.0
Flume-2120
Reviewers
Flume
flume-git
Pls. find the patch attached for the FLUME-2120. 

I have made changes to SyslogUDPSource and SyslogTCPSource. Tested it locally and also have added a JUnit for SyslogUDPSource.
I see the MultiportSyslogTCPSource is pretty much a better version/likely replacement for SyslogTCPSource. But still went ahead and added the metrics counters to SyslogTCPSource, as the change was straight forward and its very much a part of the code base.
JUnit added for SyslogUDPSource and SyslogTcpSource
flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
Revision 7a12d27 New Change
[20] 21 lines
[+20]
22
import java.net.SocketAddress;
22
import java.net.SocketAddress;
23
import java.util.Map;
23
import java.util.Map;
24
import java.util.concurrent.Executors;
24
import java.util.concurrent.Executors;
25
import java.util.concurrent.TimeUnit;
25
import java.util.concurrent.TimeUnit;
26

    
   
26

   
27
import com.google.common.annotations.VisibleForTesting;

   
28
import org.apache.flume.ChannelException;
27
import org.apache.flume.ChannelException;
29
import org.apache.flume.Context;
28
import org.apache.flume.Context;
30
import org.apache.flume.CounterGroup;

   
31
import org.apache.flume.Event;
29
import org.apache.flume.Event;
32
import org.apache.flume.EventDrivenSource;
30
import org.apache.flume.EventDrivenSource;
33
import org.apache.flume.conf.Configurable;
31
import org.apache.flume.conf.Configurable;
34
import org.apache.flume.conf.Configurables;
32
import org.apache.flume.conf.Configurables;

    
   
33
import org.apache.flume.instrumentation.SourceCounter;
35
import org.jboss.netty.bootstrap.ServerBootstrap;
34
import org.jboss.netty.bootstrap.ServerBootstrap;
36
import org.jboss.netty.buffer.ChannelBuffer;
35
import org.jboss.netty.buffer.ChannelBuffer;
37
import org.jboss.netty.channel.Channel;
36
import org.jboss.netty.channel.Channel;
38
import org.jboss.netty.channel.ChannelFactory;
37
import org.jboss.netty.channel.ChannelFactory;
39
import org.jboss.netty.channel.ChannelHandlerContext;
38
import org.jboss.netty.channel.ChannelHandlerContext;
[+20] [20] 4 lines
[+20]
44
import org.jboss.netty.channel.SimpleChannelHandler;
43
import org.jboss.netty.channel.SimpleChannelHandler;
45
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
44
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
46
import org.slf4j.Logger;
45
import org.slf4j.Logger;
47
import org.slf4j.LoggerFactory;
46
import org.slf4j.LoggerFactory;
48

    
   
47

   

    
   
48
import com.google.common.annotations.VisibleForTesting;

    
   
49

   
49
public class SyslogTcpSource extends AbstractSource
50
public class SyslogTcpSource extends AbstractSource
50
implements EventDrivenSource, Configurable {
51
implements EventDrivenSource, Configurable {
51

    
   
52

   
52

    
   
53

   
53
  private static final Logger logger = LoggerFactory
54
  private static final Logger logger = LoggerFactory
54
      .getLogger(SyslogTcpSource.class);
55
      .getLogger(SyslogTcpSource.class);
55
  private int port;
56
  private int port;
56
  private String host = null;
57
  private String host = null;
57
  private Channel nettyChannel;
58
  private Channel nettyChannel;
58
  private Integer eventSize;
59
  private Integer eventSize;
59
  private Map<String, String> formaterProp;
60
  private Map<String, String> formaterProp;
60
  private CounterGroup counterGroup = new CounterGroup();
61
  private SourceCounter sourceCounter;
61
  private Boolean keepFields;
62
  private Boolean keepFields;
62

    
   
63

   
63
  public class syslogTcpHandler extends SimpleChannelHandler {
64
  public class syslogTcpHandler extends SimpleChannelHandler {
64

    
   
65

   
65
    private SyslogUtils syslogUtils = new SyslogUtils();
66
    private SyslogUtils syslogUtils = new SyslogUtils();
[+20] [20] 10 lines
[+20] [+] public void setKeepFields(boolean removeFields){
76
      syslogUtils.addFormats(prop);
77
      syslogUtils.addFormats(prop);
77
    }
78
    }
78

    
   
79

   
79
    @Override
80
    @Override
80
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
81
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {

    
   
82
      sourceCounter.incrementEventReceivedCount();
81
      ChannelBuffer buff = (ChannelBuffer) mEvent.getMessage();
83
      ChannelBuffer buff = (ChannelBuffer) mEvent.getMessage();
82
      while (buff.readable()) {
84
      while (buff.readable()) {
83
        Event e = syslogUtils.extractEvent(buff);
85
        Event e = syslogUtils.extractEvent(buff);
84
        if (e == null) {
86
        if (e == null) {
85
          logger.debug("Parsed partial event, event will be generated when " +
87
          logger.debug("Parsed partial event, event will be generated when " +
86
              "rest of the event is received.");
88
              "rest of the event is received.");
87
          continue;
89
          continue;
88
        }
90
        }
89
        try {
91
        try {
90
          getChannelProcessor().processEvent(e);
92
          getChannelProcessor().processEvent(e);
91
          counterGroup.incrementAndGet("events.success");
93
          sourceCounter.incrementEventAcceptedCount();
92
        } catch (ChannelException ex) {
94
        } catch (ChannelException ex) {
93
          counterGroup.incrementAndGet("events.dropped");

   
94
          logger.error("Error writting to channel, event dropped", ex);
95
          logger.error("Error writting to channel, event dropped", ex);
95
        }
96
        }
96
      }
97
      }
97

    
   
98

   
98
    }
99
    }
[+20] [20] 22 lines
[+20] [+] public ChannelPipeline getPipeline() {
121
      nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
122
      nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
122
    } else {
123
    } else {
123
      nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
124
      nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
124
    }
125
    }
125

    
   
126

   

    
   
127
    sourceCounter.start();
126
    super.start();
128
    super.start();
127
  }
129
  }
128

    
   
130

   
129
  @Override
131
  @Override
130
  public void stop() {
132
  public void stop() {
131
    logger.info("Syslog TCP Source stopping...");
133
    logger.info("Syslog TCP Source stopping...");
132
    logger.info("Metrics:{}", counterGroup);

   
133

    
   
134

   
134
    if (nettyChannel != null) {
135
    if (nettyChannel != null) {
135
      nettyChannel.close();
136
      nettyChannel.close();
136
      try {
137
      try {
137
        nettyChannel.getCloseFuture().await(60, TimeUnit.SECONDS);
138
        nettyChannel.getCloseFuture().await(60, TimeUnit.SECONDS);
138
      } catch (InterruptedException e) {
139
      } catch (InterruptedException e) {
139
        logger.warn("netty server stop interrupted", e);
140
        logger.warn("netty server stop interrupted", e);
140
      } finally {
141
      } finally {
141
        nettyChannel = null;
142
        nettyChannel = null;
142
      }
143
      }
143
    }
144
    }
144

    
   
145

   

    
   
146
    sourceCounter.stop();
145
    super.stop();
147
    super.stop();

    
   
148

   

    
   
149
    logger.info("{} stopped. Metrics: {}", this, sourceCounter);
146
  }
150
  }
147

    
   
151

   
148
  @Override
152
  @Override
149
  public void configure(Context context) {
153
  public void configure(Context context) {
150
    Configurables.ensureRequiredNonNull(context,
154
    Configurables.ensureRequiredNonNull(context,
151
        SyslogSourceConfigurationConstants.CONFIG_PORT);
155
        SyslogSourceConfigurationConstants.CONFIG_PORT);
152
    port = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_PORT);
156
    port = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_PORT);
153
    host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);
157
    host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);
154
    eventSize = context.getInteger("eventSize", SyslogUtils.DEFAULT_SIZE);
158
    eventSize = context.getInteger("eventSize", SyslogUtils.DEFAULT_SIZE);
155
    formaterProp = context.getSubProperties(
159
    formaterProp = context.getSubProperties(
156
        SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
160
        SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);

    
   
161

   

    
   
162
    if (sourceCounter == null) {

    
   
163
        sourceCounter = new SourceCounter(getName());

    
   
164
    }
157
    keepFields = context.getBoolean
165
    keepFields = context.getBoolean
158
      (SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS, false);
166
      (SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS, false);
159
  }
167
  }
160

    
   
168

   
161
  @VisibleForTesting
169
  @VisibleForTesting
[+20] [20] 10 lines
flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
Revision 96a9e85 New Change
 
flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
Revision a6a1d5b New Change
 
flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
Revision eae26ed New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java: Loading...
  2. flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java: Loading...
  3. flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java: Loading...
  4. flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java: Loading...