Review Board 1.7.22


FLUME-1126: Support RFC 3164 and 5424 syslog format timestamp parsing

Review Request #4809 - Created April 19, 2012 and updated

Prasad Mujumdar
trunk
FLUME-1126
Reviewers
Flume
aprabhakar, mpercy
flume-git
Support for timestamp and hostname parsing.
Updated syslog tests to cover timestamp and hostname handling.
Manually tested using syslog4j
flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
Revision b0485b1 New Change
[20] 16 lines
[+20]
17
 * under the License.
17
 * under the License.
18
 */
18
 */
19
package org.apache.flume.source;
19
package org.apache.flume.source;
20

    
   
20

   
21
import java.net.InetSocketAddress;
21
import java.net.InetSocketAddress;

    
   
22
import java.util.Map;
22
import java.util.concurrent.Executors;
23
import java.util.concurrent.Executors;
23
import java.util.concurrent.TimeUnit;
24
import java.util.concurrent.TimeUnit;
24

    
   
25

   
25
import org.apache.flume.ChannelException;
26
import org.apache.flume.ChannelException;
26
import org.apache.flume.Context;
27
import org.apache.flume.Context;
[+20] [20] 24 lines
[+20] [+] public class SyslogTcpSource extends AbstractSource
51
      .getLogger(SyslogTcpSource.class);
52
      .getLogger(SyslogTcpSource.class);
52
  private int port;
53
  private int port;
53
  private String host = null;
54
  private String host = null;
54
  private Channel nettyChannel;
55
  private Channel nettyChannel;
55
  private Integer eventSize;
56
  private Integer eventSize;

    
   
57
  private Map<String, String> formaterProp;
56
  private CounterGroup counterGroup = new CounterGroup();
58
  private CounterGroup counterGroup = new CounterGroup();
57

    
   
59

   
58
  public class syslogTcpHandler extends SimpleChannelHandler {
60
  public class syslogTcpHandler extends SimpleChannelHandler {
59

    
   
61

   
60
    private SyslogUtils syslogUtils = new SyslogUtils();
62
    private SyslogUtils syslogUtils = new SyslogUtils();
61

    
   
63

   
62
    public void setEventSize(int eventSize){
64
    public void setEventSize(int eventSize){
63
      syslogUtils.setEventSize(eventSize);
65
      syslogUtils.setEventSize(eventSize);
64
    }
66
    }
65

    
   
67

   

    
   
68
    public void setFormater(Map<String, String> prop) {

    
   
69
      syslogUtils.addFormats(prop);

    
   
70
    }

    
   
71

   
66
    @Override
72
    @Override
67
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
73
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
68
      ChannelBuffer buff = (ChannelBuffer) mEvent.getMessage();
74
      ChannelBuffer buff = (ChannelBuffer) mEvent.getMessage();
69
      while (buff.readable()) {
75
      while (buff.readable()) {
70
        Event e = syslogUtils.extractEvent(buff);
76
        Event e = syslogUtils.extractEvent(buff);
[+20] [20] 22 lines
[+20] [+] public void start() {
93
    ServerBootstrap serverBootstrap = new ServerBootstrap(factory);
99
    ServerBootstrap serverBootstrap = new ServerBootstrap(factory);
94
    serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
100
    serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
95
      public ChannelPipeline getPipeline() {
101
      public ChannelPipeline getPipeline() {
96
        syslogTcpHandler handler = new syslogTcpHandler();
102
        syslogTcpHandler handler = new syslogTcpHandler();
97
        handler.setEventSize(eventSize);
103
        handler.setEventSize(eventSize);

    
   
104
        handler.setFormater(formaterProp);
98
        return Channels.pipeline(handler);
105
        return Channels.pipeline(handler);
99
      }
106
      }
100
    });
107
    });
101

    
   
108

   
102
    logger.info("Syslog TCP Source starting...");
109
    logger.info("Syslog TCP Source starting...");
[+20] [20] 30 lines
[+20] [+] public void stop() {
133
  public void configure(Context context) {
140
  public void configure(Context context) {
134
    Configurables.ensureRequiredNonNull(context, "port");
141
    Configurables.ensureRequiredNonNull(context, "port");
135
    port = context.getInteger("port");
142
    port = context.getInteger("port");
136
    host = context.getString("host");
143
    host = context.getString("host");
137
    eventSize = context.getInteger("eventSize", SyslogUtils.DEFAULT_SIZE);
144
    eventSize = context.getInteger("eventSize", SyslogUtils.DEFAULT_SIZE);

    
   
145
    formaterProp = context.getSubProperties("format");
138
  }
146
  }
139

    
   
147

   
140
}
148
}
flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
Revision 732cce5 New Change
 
flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
Revision 653f5eb New Change
 
flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
Revision 8b1f7c5 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java: Loading...
  2. flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java: Loading...
  3. flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java: Loading...
  4. flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java: Loading...