Review Board 1.7.22


ExecSource don't flush the cache if there is no input entries

Review Request #8854 - Created Jan. 7, 2013 and updated

Fengdong Yu
1.4.0
Reviewers
Flume
flume-git
ExecSource has a default batchSize: 20, exec source read data from the source, then put it into the cache, after the cache is full, push it to the channel.

but if exec source's cache is not full, and there isn't any input for a long time, then these entries always kept in the cache, there is no chance to the channel until the source's cache is full.

so, the patch added a new config line: batchTimeout for ExecSource, and default is 3 seconds, if batchTimeout exceeded, push all cached data to the channel even the cache is not full.

 
flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
Revision 495b03f New Change
[20] 32 lines
[+20]
33
import org.apache.flume.Context;
33
import org.apache.flume.Context;
34
import org.apache.flume.CounterGroup;
34
import org.apache.flume.CounterGroup;
35
import org.apache.flume.Event;
35
import org.apache.flume.Event;
36
import org.apache.flume.EventDrivenSource;
36
import org.apache.flume.EventDrivenSource;
37
import org.apache.flume.Source;
37
import org.apache.flume.Source;

    
   
38
import org.apache.flume.SystemClock;
38
import org.apache.flume.channel.ChannelProcessor;
39
import org.apache.flume.channel.ChannelProcessor;
39
import org.apache.flume.conf.Configurable;
40
import org.apache.flume.conf.Configurable;
40
import org.apache.flume.event.EventBuilder;
41
import org.apache.flume.event.EventBuilder;
41
import org.slf4j.Logger;
42
import org.slf4j.Logger;
42
import org.slf4j.LoggerFactory;
43
import org.slf4j.LoggerFactory;
[+20] [20] 101 lines
[+20] [+] public class ExecSource extends AbstractSource implements EventDrivenSource,
144
  private String command;
145
  private String command;
145
  private CounterGroup counterGroup;
146
  private CounterGroup counterGroup;
146
  private ExecutorService executor;
147
  private ExecutorService executor;
147
  private Future<?> runnerFuture;
148
  private Future<?> runnerFuture;
148
  private long restartThrottle;
149
  private long restartThrottle;

    
   
150
  private long batchTimeout;
149
  private boolean restart;
151
  private boolean restart;
150
  private boolean logStderr;
152
  private boolean logStderr;
151
  private Integer bufferCount;
153
  private Integer bufferCount;
152
  private ExecRunnable runner;
154
  private ExecRunnable runner;
153
  private Charset charset;
155
  private Charset charset;
[+20] [20] 4 lines
[+20] [+] public void start() {
158

    
   
160

   
159
    executor = Executors.newSingleThreadExecutor();
161
    executor = Executors.newSingleThreadExecutor();
160
    counterGroup = new CounterGroup();
162
    counterGroup = new CounterGroup();
161

    
   
163

   
162
    runner = new ExecRunnable(command, getChannelProcessor(), counterGroup,
164
    runner = new ExecRunnable(command, getChannelProcessor(), counterGroup,
163
        restart, restartThrottle, logStderr, bufferCount, charset);
165
        restart, restartThrottle, logStderr, bufferCount, charset, batchTimeout);
164

    
   
166

   
165
    // FIXME: Use a callback-like executor / future to signal us upon failure.
167
    // FIXME: Use a callback-like executor / future to signal us upon failure.
166
    runnerFuture = executor.submit(runner);
168
    runnerFuture = executor.submit(runner);
167

    
   
169

   
168
    /*
170
    /*
[+20] [20] 58 lines
[+20] [+] public void configure(Context context) {
227
    bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE,
229
    bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE,
228
        ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
230
        ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
229

    
   
231

   
230
    charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET,
232
    charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET,
231
        ExecSourceConfigurationConstants.DEFAULT_CHARSET));
233
        ExecSourceConfigurationConstants.DEFAULT_CHARSET));

    
   
234

   

    
   
235
    batchTimeout = context.getLong(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT,

    
   
236
        ExecSourceConfigurationConstants.DEFAULT_BATCH_TIME_OUT);
232
  }
237
  }
233

    
   
238

   
234
  private static class ExecRunnable implements Runnable {
239
  private static class ExecRunnable implements Runnable {
235

    
   
240

   
236
    public ExecRunnable(String command, ChannelProcessor channelProcessor,
241
    public ExecRunnable(String command, ChannelProcessor channelProcessor,
237
        CounterGroup counterGroup, boolean restart, long restartThrottle,
242
        CounterGroup counterGroup, boolean restart, long restartThrottle,
238
        boolean logStderr, int bufferCount, Charset charset) {
243
        boolean logStderr, int bufferCount, Charset charset, long batchTimeout) {
239
      this.command = command;
244
      this.command = command;
240
      this.channelProcessor = channelProcessor;
245
      this.channelProcessor = channelProcessor;
241
      this.counterGroup = counterGroup;
246
      this.counterGroup = counterGroup;
242
      this.restartThrottle = restartThrottle;
247
      this.restartThrottle = restartThrottle;
243
      this.bufferCount = bufferCount;
248
      this.bufferCount = bufferCount;

    
   
249
      this.batchTimeout = batchTimeout;
244
      this.restart = restart;
250
      this.restart = restart;
245
      this.logStderr = logStderr;
251
      this.logStderr = logStderr;
246
      this.charset = charset;
252
      this.charset = charset;
247
    }
253
    }
248

    
   
254

   

    
   
255
    private SystemClock systemClock = new SystemClock();
249
    private String command;
256
    private String command;
250
    private ChannelProcessor channelProcessor;
257
    private ChannelProcessor channelProcessor;
251
    private CounterGroup counterGroup;
258
    private CounterGroup counterGroup;
252
    private volatile boolean restart;
259
    private volatile boolean restart;
253
    private long restartThrottle;
260
    private long restartThrottle;
254
    private int bufferCount;
261
    private int bufferCount;

    
   
262
    private long batchTimeout;
255
    private boolean logStderr;
263
    private boolean logStderr;
256
    private Charset charset;
264
    private Charset charset;
257
    private Process process = null;
265
    private Process process = null;

    
   
266
    private long lastPushToChannel = systemClock.currentTimeMillis();
258

    
   
267

   
259
    @Override
268
    @Override
260
    public void run() {
269
    public void run() {
261
      do {
270
      do {
262
        String exitCode = "unknown";
271
        String exitCode = "unknown";
[+20] [20] 14 lines
[+20] public void run() {
277
          String line = null;
286
          String line = null;
278
          List<Event> eventList = new ArrayList<Event>();
287
          List<Event> eventList = new ArrayList<Event>();
279
          while ((line = reader.readLine()) != null) {
288
          while ((line = reader.readLine()) != null) {
280
            counterGroup.incrementAndGet("exec.lines.read");
289
            counterGroup.incrementAndGet("exec.lines.read");
281
            eventList.add(EventBuilder.withBody(line.getBytes(charset)));
290
            eventList.add(EventBuilder.withBody(line.getBytes(charset)));
282
            if(eventList.size() >= bufferCount) {
291
            if(eventList.size() >= bufferCount || timeout()) {
283
              channelProcessor.processEventBatch(eventList);
292
              channelProcessor.processEventBatch(eventList);
284
              eventList.clear();
293
              eventList.clear();

    
   
294
              lastPushToChannel = systemClock.currentTimeMillis();
285
            }
295
            }
286
          }
296
          }
287
          if(!eventList.isEmpty()) {
297
          if(!eventList.isEmpty()) {
288
            channelProcessor.processEventBatch(eventList);
298
            channelProcessor.processEventBatch(eventList);
289
          }
299
          }
[+20] [20] 14 lines
[+20] public void run() {
304
        }
314
        }
305
        if(restart) {
315
        if(restart) {
306
          logger.info("Restarting in {}ms, exit code {}", restartThrottle,
316
          logger.info("Restarting in {}ms, exit code {}", restartThrottle,
307
              exitCode);
317
              exitCode);
308
          try {
318
          try {

    
   
319
            if (restartThrottle > 0){
309
            Thread.sleep(restartThrottle);
320
              Thread.sleep(restartThrottle);

    
   
321
            }
310
          } catch (InterruptedException e) {
322
          } catch (InterruptedException e) {
311
            Thread.currentThread().interrupt();
323
            Thread.currentThread().interrupt();
312
          }
324
          }
313
        } else {
325
        } else {
314
          logger.info("Command [" + command + "] exited with " + exitCode);
326
          logger.info("Command [" + command + "] exited with " + exitCode);
[+20] [20] 15 lines
[+20] [+] public int kill() {
330
      return Integer.MIN_VALUE / 2;
342
      return Integer.MIN_VALUE / 2;
331
    }
343
    }
332
    public void setRestart(boolean restart) {
344
    public void setRestart(boolean restart) {
333
      this.restart = restart;
345
      this.restart = restart;
334
    }
346
    }

    
   
347

   

    
   
348
    public boolean timeout(){

    
   
349
      return (systemClock.currentTimeMillis() - lastPushToChannel) >= 

    
   
350
        batchTimeout; 

    
   
351
    }
335
  }
352
  }
336
  private static class StderrReader extends Thread {
353
  private static class StderrReader extends Thread {
337
    private BufferedReader input;
354
    private BufferedReader input;
338
    private boolean logStderr;
355
    private boolean logStderr;
339
    protected StderrReader(BufferedReader input, boolean logStderr) {
356
    protected StderrReader(BufferedReader input, boolean logStderr) {
[+20] [20] 31 lines
flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
Revision 1b35b01 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java: Loading...
  2. flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java: Loading...