Review Board 1.7.22


FLUME-1947. ExecSource should optionally block if the channel is full.

Review Request #9914 - Created March 13, 2013 and updated

Hari Shreedharan
FLUME-1947
Reviewers
Flume
flume-git
Made multiple changes to the exec source:

1. Flush the events to the channel, based on a configurable timeout - even when a full batch was not received.
2. Block the spawned process until channel exceptions are cleared up by not reading the stdout buffer. Backoff exponentially (subject to an upper limit), till events are available.
Added unit tests for the new features. All current tests pass.
flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
Revision 1d8d267 New Change
[20] 29 lines
[+20]
30
import java.util.concurrent.ScheduledExecutorService;
30
import java.util.concurrent.ScheduledExecutorService;
31
import java.util.concurrent.ScheduledFuture;
31
import java.util.concurrent.ScheduledFuture;
32
import java.util.concurrent.TimeUnit;
32
import java.util.concurrent.TimeUnit;
33

    
   
33

   
34
import org.apache.flume.Channel;
34
import org.apache.flume.Channel;

    
   
35
import org.apache.flume.ChannelException;
35
import org.apache.flume.Context;
36
import org.apache.flume.Context;
36
import org.apache.flume.Event;
37
import org.apache.flume.Event;
37
import org.apache.flume.EventDrivenSource;
38
import org.apache.flume.EventDrivenSource;
38
import org.apache.flume.Source;
39
import org.apache.flume.Source;
39
import org.apache.flume.SystemClock;
40
import org.apache.flume.SystemClock;
[+20] [20] 115 lines
[+20] [+] public class ExecSource extends AbstractSource implements EventDrivenSource,
155
  private boolean logStderr;
156
  private boolean logStderr;
156
  private Integer bufferCount;
157
  private Integer bufferCount;
157
  private long batchTimeout;
158
  private long batchTimeout;
158
  private ExecRunnable runner;
159
  private ExecRunnable runner;
159
  private Charset charset;
160
  private Charset charset;

    
   
161
  private boolean blockOnFullChannel;
160

    
   
162

   
161
  @Override
163
  @Override
162
  public void start() {
164
  public void start() {
163
    logger.info("Exec source starting with command:{}", command);
165
    logger.info("Exec source starting with command:{}", command);
164

    
   
166

   
165
    executor = Executors.newSingleThreadExecutor();
167
    executor = Executors.newSingleThreadExecutor();
166

    
   
168

   
167
    runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
169
    runner = new ExecRunnable(shell, command, getChannelProcessor(),
168
        restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);
170
      sourceCounter, restart, restartThrottle, blockOnFullChannel, logStderr,

    
   
171
      bufferCount, batchTimeout, charset);
169

    
   
172

   
170
    // FIXME: Use a callback-like executor / future to signal us upon failure.
173
    // FIXME: Use a callback-like executor / future to signal us upon failure.
171
    runnerFuture = executor.submit(runner);
174
    runnerFuture = executor.submit(runner);
172

    
   
175

   
173
    /*
176
    /*
[+20] [20] 65 lines
[+20] [+] public void configure(Context context) {
239
    charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET,
242
    charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET,
240
        ExecSourceConfigurationConstants.DEFAULT_CHARSET));
243
        ExecSourceConfigurationConstants.DEFAULT_CHARSET));
241

    
   
244

   
242
    shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null);
245
    shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null);
243

    
   
246

   

    
   
247

   

    
   
248
    blockOnFullChannel = context.getBoolean(

    
   
249
      ExecSourceConfigurationConstants.BLOCK_ON_FULL_CHANNEL,

    
   
250
      ExecSourceConfigurationConstants.DEFAULT_BLOCK_ON_FULL_CHANNEL);

    
   
251

   
244
    if (sourceCounter == null) {
252
    if (sourceCounter == null) {
245
      sourceCounter = new SourceCounter(getName());
253
      sourceCounter = new SourceCounter(getName());
246
    }
254
    }
247
  }
255
  }
248

    
   
256

   
249
  private static class ExecRunnable implements Runnable {
257
  private static class ExecRunnable implements Runnable {
250

    
   
258

   
251
    public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor,
259
    public ExecRunnable(String shell, String command,
252
        SourceCounter sourceCounter, boolean restart, long restartThrottle,
260
      ChannelProcessor channelProcessor, SourceCounter sourceCounter,

    
   
261
      boolean restart, long restartThrottle, boolean blockOnFullChannel,
253
        boolean logStderr, int bufferCount, long batchTimeout, Charset charset) {
262
      boolean logStderr, int bufferCount, long batchTimeout, Charset charset) {
254
      this.command = command;
263
      this.command = command;
255
      this.channelProcessor = channelProcessor;
264
      this.channelProcessor = channelProcessor;
256
      this.sourceCounter = sourceCounter;
265
      this.sourceCounter = sourceCounter;
257
      this.restartThrottle = restartThrottle;
266
      this.restartThrottle = restartThrottle;
258
      this.bufferCount = bufferCount;
267
      this.bufferCount = bufferCount;
259
      this.batchTimeout = batchTimeout;
268
      this.batchTimeout = batchTimeout;
260
      this.restart = restart;
269
      this.restart = restart;
261
      this.logStderr = logStderr;
270
      this.logStderr = logStderr;
262
      this.charset = charset;
271
      this.charset = charset;
263
      this.shell = shell;
272
      this.shell = shell;

    
   
273
      this.blockOnFullChannel = blockOnFullChannel;
264
    }
274
    }
265

    
   
275

   
266
    private final String shell;
276
    private final String shell;
267
    private final String command;
277
    private final String command;
268
    private final ChannelProcessor channelProcessor;
278
    private final ChannelProcessor channelProcessor;
[+20] [20] 7 lines
[+20] private static class ExecRunnable implements Runnable {
276
    private Process process = null;
286
    private Process process = null;
277
    private SystemClock systemClock = new SystemClock();
287
    private SystemClock systemClock = new SystemClock();
278
    private Long lastPushToChannel = systemClock.currentTimeMillis();
288
    private Long lastPushToChannel = systemClock.currentTimeMillis();
279
    ScheduledExecutorService timedFlushService;
289
    ScheduledExecutorService timedFlushService;
280
    ScheduledFuture<?> future;
290
    ScheduledFuture<?> future;

    
   
291
    private final boolean blockOnFullChannel;

    
   
292
    private static final int INITIAL_BACKOFF = 500;

    
   
293
    private static final int MAX_BACKOFF = 5000;

    
   
294
    private int backOff = INITIAL_BACKOFF;
281

    
   
295

   
282
    @Override
296
    @Override
283
    public void run() {
297
    public void run() {
284
      do {
298
      do {
285
        String exitCode = "unknown";
299
        String exitCode = "unknown";
[+20] [20] 84 lines
[+20] [+] public void run() {
370
          logger.info("Command [" + command + "] exited with " + exitCode);
384
          logger.info("Command [" + command + "] exited with " + exitCode);
371
        }
385
        }
372
      } while(restart);
386
      } while(restart);
373
    }
387
    }
374

    
   
388

   
375
    private void flushEventBatch(List<Event> eventList){
389
    private void flushEventBatch(List<Event> eventList)

    
   
390
      throws ChannelException, InterruptedException {

    
   
391
      boolean flushed = false;

    
   
392
      do {

    
   
393
        try {
376
      channelProcessor.processEventBatch(eventList);
394
          channelProcessor.processEventBatch(eventList);

    
   
395
          flushed = true;

    
   
396
          backOff = INITIAL_BACKOFF;

    
   
397
        } catch (ChannelException ex) {

    
   
398
          if (blockOnFullChannel) {

    
   
399
            logger.warn("Channel is full. Exec Source will backoff for " +

    
   
400
              String.valueOf(backOff) + " seconds");

    
   
401
            TimeUnit.MILLISECONDS.sleep(backOff);

    
   
402
            int nextBackOff = backOff << 1;

    
   
403
            backOff = nextBackOff > MAX_BACKOFF ? MAX_BACKOFF : nextBackOff;

    
   
404
            continue;

    
   
405
          } else {

    
   
406
            throw ex;

    
   
407
          }

    
   
408
        }
377
      sourceCounter.addToEventAcceptedCount(eventList.size());
409
        sourceCounter.addToEventAcceptedCount(eventList.size());
378
      eventList.clear();
410
        eventList.clear();
379
      lastPushToChannel = systemClock.currentTimeMillis();
411
        lastPushToChannel = systemClock.currentTimeMillis();

    
   
412
      } while (!flushed);

    
   
413

   
380
    }
414
    }
381

    
   
415

   
382
    private boolean timeout(){
416
    private boolean timeout(){
383
      return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
417
      return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
384
    }
418
    }
[+20] [20] 83 lines
flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
Revision 957ec7f New Change
 
flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
Revision 54f71a1 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java: Loading...
  2. flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java: Loading...
  3. flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java: Loading...