Review Board 1.7.22


FLUME-1713 Netcat source to not return "OK"

Review Request #8220 - Created Nov. 26, 2012 and updated

Rahul Ravindran
FLUME-1713
Reviewers
Flume
flume-git
FLUME-1713 Netcat source should allow for *not* returning OK upon receipt of each message; Added a config boolean parameter for this source ackEveryEvent(default value:false). Add parameterized test to existing Netcat unit test
Unit test added
flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
Revision 37c09fe New Change
[20] 109 lines
[+20] [+] public class NetcatSource extends AbstractSource implements Configurable,
110
      .getLogger(NetcatSource.class);
110
      .getLogger(NetcatSource.class);
111

    
   
111

   
112
  private String hostName;
112
  private String hostName;
113
  private int port;
113
  private int port;
114
  private int maxLineLength;
114
  private int maxLineLength;

    
   
115
  private boolean ackEveryEvent;
115

    
   
116

   
116
  private CounterGroup counterGroup;
117
  private CounterGroup counterGroup;
117
  private ServerSocketChannel serverSocket;
118
  private ServerSocketChannel serverSocket;
118
  private AtomicBoolean acceptThreadShouldStop;
119
  private AtomicBoolean acceptThreadShouldStop;
119
  private Thread acceptThread;
120
  private Thread acceptThread;
[+20] [20] 9 lines
[+20] public class NetcatSource extends AbstractSource implements Configurable,
129

    
   
130

   
130
  @Override
131
  @Override
131
  public void configure(Context context) {
132
  public void configure(Context context) {
132
    String hostKey = NetcatSourceConfigurationConstants.CONFIG_HOSTNAME;
133
    String hostKey = NetcatSourceConfigurationConstants.CONFIG_HOSTNAME;
133
    String portKey = NetcatSourceConfigurationConstants.CONFIG_PORT;
134
    String portKey = NetcatSourceConfigurationConstants.CONFIG_PORT;

    
   
135
    String ackEventKey = NetcatSourceConfigurationConstants.CONFIG_ACKEVENT;
134

    
   
136

   
135
    Configurables.ensureRequiredNonNull(context, hostKey, portKey);
137
    Configurables.ensureRequiredNonNull(context, hostKey, portKey);
136

    
   
138

   
137
    hostName = context.getString(hostKey);
139
    hostName = context.getString(hostKey);
138
    port = context.getInteger(portKey);
140
    port = context.getInteger(portKey);

    
   
141
    ackEveryEvent = context.getBoolean(ackEventKey, true);
139
    maxLineLength = context.getInteger(
142
    maxLineLength = context.getInteger(
140
        NetcatSourceConfigurationConstants.CONFIG_MAX_LINE_LENGTH,
143
        NetcatSourceConfigurationConstants.CONFIG_MAX_LINE_LENGTH,
141
        NetcatSourceConfigurationConstants.DEFAULT_MAX_LINE_LENGTH);
144
        NetcatSourceConfigurationConstants.DEFAULT_MAX_LINE_LENGTH);
142
  }
145
  }
143

    
   
146

   
[+20] [20] 24 lines
[+20] [+] public void start() {
168
    AcceptHandler acceptRunnable = new AcceptHandler(maxLineLength);
171
    AcceptHandler acceptRunnable = new AcceptHandler(maxLineLength);
169
    acceptThreadShouldStop.set(false);
172
    acceptThreadShouldStop.set(false);
170
    acceptRunnable.counterGroup = counterGroup;
173
    acceptRunnable.counterGroup = counterGroup;
171
    acceptRunnable.handlerService = handlerService;
174
    acceptRunnable.handlerService = handlerService;
172
    acceptRunnable.shouldStop = acceptThreadShouldStop;
175
    acceptRunnable.shouldStop = acceptThreadShouldStop;

    
   
176
    acceptRunnable.ackEveryEvent = ackEveryEvent;
173
    acceptRunnable.source = this;
177
    acceptRunnable.source = this;
174
    acceptRunnable.serverSocket = serverSocket;
178
    acceptRunnable.serverSocket = serverSocket;
175

    
   
179

   
176
    acceptThread = new Thread(acceptRunnable);
180
    acceptThread = new Thread(acceptRunnable);
177

    
   
181

   
[+20] [20] 66 lines
[+20] [+] private static class AcceptHandler implements Runnable {
244
    private ServerSocketChannel serverSocket;
248
    private ServerSocketChannel serverSocket;
245
    private CounterGroup counterGroup;
249
    private CounterGroup counterGroup;
246
    private ExecutorService handlerService;
250
    private ExecutorService handlerService;
247
    private EventDrivenSource source;
251
    private EventDrivenSource source;
248
    private AtomicBoolean shouldStop;
252
    private AtomicBoolean shouldStop;

    
   
253
    private boolean ackEveryEvent;
249

    
   
254

   
250
    private final int maxLineLength;
255
    private final int maxLineLength;
251

    
   
256

   
252
    public AcceptHandler(int maxLineLength) {
257
    public AcceptHandler(int maxLineLength) {
253
      this.maxLineLength = maxLineLength;
258
      this.maxLineLength = maxLineLength;
[+20] [20] 10 lines
[+20] [+] public void run() {
264
          NetcatSocketHandler request = new NetcatSocketHandler(maxLineLength);
269
          NetcatSocketHandler request = new NetcatSocketHandler(maxLineLength);
265

    
   
270

   
266
          request.socketChannel = socketChannel;
271
          request.socketChannel = socketChannel;
267
          request.counterGroup = counterGroup;
272
          request.counterGroup = counterGroup;
268
          request.source = source;
273
          request.source = source;

    
   
274
          request.ackEveryEvent = ackEveryEvent;
269

    
   
275

   
270
          handlerService.submit(request);
276
          handlerService.submit(request);
271

    
   
277

   
272
          counterGroup.incrementAndGet("accept.succeeded");
278
          counterGroup.incrementAndGet("accept.succeeded");
273
        } catch (ClosedByInterruptException e) {
279
        } catch (ClosedByInterruptException e) {
[+20] [20] 11 lines
[+20] public void run() {
285
  private static class NetcatSocketHandler implements Runnable {
291
  private static class NetcatSocketHandler implements Runnable {
286

    
   
292

   
287
    private Source source;
293
    private Source source;
288
    private CounterGroup counterGroup;
294
    private CounterGroup counterGroup;
289
    private SocketChannel socketChannel;
295
    private SocketChannel socketChannel;

    
   
296
    private boolean ackEveryEvent;
290

    
   
297

   
291
    private final int maxLineLength;
298
    private final int maxLineLength;
292

    
   
299

   
293
    public NetcatSocketHandler(int maxLineLength) {
300
    public NetcatSocketHandler(int maxLineLength) {
294
      this.maxLineLength = maxLineLength;
301
      this.maxLineLength = maxLineLength;
[+20] [20] 95 lines
[+20] [+] private int processEvents(CharBuffer buffer, Writer writer)
390
            }
397
            }
391

    
   
398

   
392
            if (ex == null) {
399
            if (ex == null) {
393
              counterGroup.incrementAndGet("events.processed");
400
              counterGroup.incrementAndGet("events.processed");
394
              numProcessed++;
401
              numProcessed++;

    
   
402
              if (true == ackEveryEvent) {
395
              writer.write("OK\n");
403
                writer.write("OK\n");

    
   
404
              }
396
            } else {
405
            } else {
397
              counterGroup.incrementAndGet("events.failed");
406
              counterGroup.incrementAndGet("events.failed");
398
              logger.warn("Error processing event. Exception follows.", ex);
407
              logger.warn("Error processing event. Exception follows.", ex);
399
              writer.write("FAILED: " + ex.getMessage() + "\n");
408
              writer.write("FAILED: " + ex.getMessage() + "\n");
400
            }
409
            }
[+20] [20] 51 lines
flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java
Revision 1d8b5e4 New Change
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision b4a8868 New Change
 
flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
Revision 3c17d3d New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java: Loading...
  2. flume-ng-core/src/main/java/org/apache/flume/source/NetcatSourceConfigurationConstants.java: Loading...
  3. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...
  4. flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java: Loading...