Review Board 1.7.22


FLUME-2217 Preserve priority, timestamp and hostname fields in MultiportSyslogTcp, SyslogTcp and Udp sources

Review Request #15163 - Created Nov. 1, 2013 and updated

Jeff jlord
Reviewers
Flume
hshreedharan, mpercy
flume-git
Here is a first pass at this.
Please not that the functionality of all 3 sources was modified slightly such that with this patch we will now preserve the syslog priority as well as the timestamp and header. e.g.
<10>2013-10-31T17:36:27.381-07:00 localhost.localdomain test UDP syslog data
Added a brand new TestSyslogUdp Class to verify functionality.
Updated Syslog parser test to verify as well.
flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
Revision 884fd62 New Change
[20] 64 lines
[+20] [+] public class MultiportSyslogTCPSource extends AbstractSource implements
65
  private int readBufferSize;
65
  private int readBufferSize;
66
  private String portHeader;
66
  private String portHeader;
67
  private SourceCounter sourceCounter = null;
67
  private SourceCounter sourceCounter = null;
68
  private Charset defaultCharset;
68
  private Charset defaultCharset;
69
  private ThreadSafeDecoder defaultDecoder;
69
  private ThreadSafeDecoder defaultDecoder;

    
   
70
  private boolean keepFields;
70

    
   
71

   
71
  public MultiportSyslogTCPSource() {
72
  public MultiportSyslogTCPSource() {
72
    portCharsets = new ConcurrentHashMap<Integer, ThreadSafeDecoder>();
73
    portCharsets = new ConcurrentHashMap<Integer, ThreadSafeDecoder>();
73
  }
74
  }
74

    
   
75

   
[+20] [20] 61 lines
[+20] [+] public void configure(Context context) {
136

    
   
137

   
137
    readBufferSize = context.getInteger(
138
    readBufferSize = context.getInteger(
138
        SyslogSourceConfigurationConstants.CONFIG_READBUF_SIZE,
139
        SyslogSourceConfigurationConstants.CONFIG_READBUF_SIZE,
139
        SyslogSourceConfigurationConstants.DEFAULT_READBUF_SIZE);
140
        SyslogSourceConfigurationConstants.DEFAULT_READBUF_SIZE);
140

    
   
141

   

    
   
142
    keepFields = context.getBoolean(

    
   
143
        SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,

    
   
144
        SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS);

    
   
145

   
141
    if (sourceCounter == null) {
146
    if (sourceCounter == null) {
142
      sourceCounter = new SourceCounter(getName());
147
      sourceCounter = new SourceCounter(getName());
143
    }
148
    }
144
  }
149
  }
145

    
   
150

   
[+20] [20] 11 lines
[+20] [+] public void start() {
157
    acceptor.getSessionConfig().setReadBufferSize(readBufferSize);
162
    acceptor.getSessionConfig().setReadBufferSize(readBufferSize);
158
    acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
163
    acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
159

    
   
164

   
160
    acceptor.setHandler(new MultiportSyslogHandler(maxEventSize, batchSize,
165
    acceptor.setHandler(new MultiportSyslogHandler(maxEventSize, batchSize,
161
        getChannelProcessor(), sourceCounter, portHeader, defaultDecoder,
166
        getChannelProcessor(), sourceCounter, portHeader, defaultDecoder,
162
        portCharsets));
167
        portCharsets, keepFields));
163

    
   
168

   
164
    for (int port : ports) {
169
    for (int port : ports) {
165
      InetSocketAddress addr;
170
      InetSocketAddress addr;
166
      if (host != null) {
171
      if (host != null) {
167
        addr = new InetSocketAddress(host, port);
172
        addr = new InetSocketAddress(host, port);
[+20] [20] 43 lines
[+20] [+] static class MultiportSyslogHandler extends IoHandlerAdapter {
211
    private final String portHeader;
216
    private final String portHeader;
212
    private final SyslogParser syslogParser;
217
    private final SyslogParser syslogParser;
213
    private final LineSplitter lineSplitter;
218
    private final LineSplitter lineSplitter;
214
    private final ThreadSafeDecoder defaultDecoder;
219
    private final ThreadSafeDecoder defaultDecoder;
215
    private final ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets;
220
    private final ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets;

    
   
221
    private final boolean keepFields;
216

    
   
222

   
217
    public MultiportSyslogHandler(int maxEventSize, int batchSize,
223
    public MultiportSyslogHandler(int maxEventSize, int batchSize,
218
        ChannelProcessor cp, SourceCounter ctr, String portHeader,
224
        ChannelProcessor cp, SourceCounter ctr, String portHeader,
219
        ThreadSafeDecoder defaultDecoder,
225
        ThreadSafeDecoder defaultDecoder,
220
        ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets) {
226
        ConcurrentMap<Integer, ThreadSafeDecoder> portCharsets, boolean keepFields) {
221
      channelProcessor = cp;
227
      channelProcessor = cp;
222
      sourceCounter = ctr;
228
      sourceCounter = ctr;
223
      this.maxEventSize = maxEventSize;
229
      this.maxEventSize = maxEventSize;
224
      this.batchSize = batchSize;
230
      this.batchSize = batchSize;
225
      this.portHeader = portHeader;
231
      this.portHeader = portHeader;
226
      this.defaultDecoder = defaultDecoder;
232
      this.defaultDecoder = defaultDecoder;
227
      this.portCharsets = portCharsets;
233
      this.portCharsets = portCharsets;

    
   
234
      this.keepFields = keepFields;
228
      syslogParser = new SyslogParser();
235
      syslogParser = new SyslogParser();
229
      lineSplitter = new LineSplitter(maxEventSize);
236
      lineSplitter = new LineSplitter(maxEventSize);
230
    }
237
    }
231

    
   
238

   
232
    @Override
239
    @Override
[+20] [20] 86 lines
[+20] [+] public void messageReceived(IoSession session, Object message) {
319
    }
326
    }
320

    
   
327

   
321
    /**
328
    /**
322
     * Decodes a syslog-formatted ParsedLine into a Flume Event.
329
     * Decodes a syslog-formatted ParsedLine into a Flume Event.
323
     * @param parsedBuf Buffer containing characters to be parsed
330
     * @param parsedBuf Buffer containing characters to be parsed
324
     * @param port Incoming port
331
     * @param decoder Character set is configurable on a per-port basis.
325
     * @return
332
     * @return
326
     */
333
     */
327
    Event parseEvent(ParsedBuffer parsedBuf, CharsetDecoder decoder) {
334
    Event parseEvent(ParsedBuffer parsedBuf, CharsetDecoder decoder) {
328
      String msg = null;
335
      String msg = null;
329
      try {
336
      try {
[+20] [20] 19 lines
[+20] public void messageReceived(IoSession session, Object message) {
349

    
   
356

   
350
      logger.trace("Seen raw event: {}", msg);
357
      logger.trace("Seen raw event: {}", msg);
351

    
   
358

   
352
      Event event;
359
      Event event;
353
      try {
360
      try {
354
        event = syslogParser.parseMessage(msg, decoder.charset());
361
        event = syslogParser.parseMessage(msg, decoder.charset(), keepFields);
355
        if (parsedBuf.incomplete) {
362
        if (parsedBuf.incomplete) {
356
          event.getHeaders().put(SyslogUtils.EVENT_STATUS,
363
          event.getHeaders().put(SyslogUtils.EVENT_STATUS,
357
              SyslogUtils.SyslogStatus.INCOMPLETE.getSyslogStatus());
364
              SyslogUtils.SyslogStatus.INCOMPLETE.getSyslogStatus());
358
        }
365
        }
359
      } catch (IllegalArgumentException ex) {
366
      } catch (IllegalArgumentException ex) {
[+20] [20] 180 lines
flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
Revision bf3305c New Change
 
flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
Revision 7a12d27 New Change
 
flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
Revision 96a9e85 New Change
 
flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java
Revision f2ea932 New Change
 
flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
Revision 680e592 New Change
 
flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java
Revision 258c2f1 New Change
 
flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
Revision a6a1d5b New Change
 
flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
Revision eae26ed New Change
 
flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java
Revision 898096b New Change
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision 8687cb7 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java: Loading...
  2. flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java: Loading...
  3. flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java: Loading...
  4. flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java: Loading...
  5. flume-ng-core/src/main/java/org/apache/flume/source/SyslogUtils.java: Loading...
  6. flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java: Loading...
  7. flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogParser.java: Loading...
  8. flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java: Loading...
  9. flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java: Loading...
  10. flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUtils.java: Loading...
  11. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...