Review Board 1.7.22


Added new config 'shell' for Exec source

Review Request #7748 - Created Oct. 26, 2012 and submitted

Roshan Naik
https://issues.apache.org/jira/browse/FLUME-1661
Reviewers
Flume
flume-git
Added new optional config directive 'shell' for Exec Source. One can specify how to invoke a shell to run the command (e.g. /bin/sh -c) . This is only needed for commands that use features like wildcards, backticks, pipes, etc that are supported by the shell. 
Have added a few test cases for both simple and more complex commands.
flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
Revision 46f672f New Change
[20] 137 lines
[+20] [+] public class ExecSource extends AbstractSource implements EventDrivenSource,
138

    
   
138

   
139
  private static final Logger logger = LoggerFactory
139
  private static final Logger logger = LoggerFactory
140
      .getLogger(ExecSource.class);
140
      .getLogger(ExecSource.class);
141

    
   
141

   
142

    
   
142

   

    
   
143
  private String shell;
143
  private String command;
144
  private String command;
144
  private CounterGroup counterGroup;
145
  private CounterGroup counterGroup;
145
  private ExecutorService executor;
146
  private ExecutorService executor;
146
  private Future<?> runnerFuture;
147
  private Future<?> runnerFuture;
147
  private long restartThrottle;
148
  private long restartThrottle;
[+20] [20] 7 lines
[+20] public class ExecSource extends AbstractSource implements EventDrivenSource,
155
    logger.info("Exec source starting with command:{}", command);
156
    logger.info("Exec source starting with command:{}", command);
156

    
   
157

   
157
    executor = Executors.newSingleThreadExecutor();
158
    executor = Executors.newSingleThreadExecutor();
158
    counterGroup = new CounterGroup();
159
    counterGroup = new CounterGroup();
159

    
   
160

   
160
    runner = new ExecRunnable(command, getChannelProcessor(), counterGroup,
161
    runner = new ExecRunnable(shell, command, getChannelProcessor(), counterGroup,
161
        restart, restartThrottle, logStderr, bufferCount);
162
        restart, restartThrottle, logStderr, bufferCount);
162

    
   
163

   
163
    // FIXME: Use a callback-like executor / future to signal us upon failure.
164
    // FIXME: Use a callback-like executor / future to signal us upon failure.
164
    runnerFuture = executor.submit(runner);
165
    runnerFuture = executor.submit(runner);
165

    
   
166

   
[+20] [20] 56 lines
[+20] [+] public void configure(Context context) {
222
    logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR,
223
    logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR,
223
        ExecSourceConfigurationConstants.DEFAULT_LOG_STDERR);
224
        ExecSourceConfigurationConstants.DEFAULT_LOG_STDERR);
224

    
   
225

   
225
    bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE,
226
    bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE,
226
        ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
227
        ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE);

    
   
228

   

    
   
229
    shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null);

    
   
230

   
227
  }
231
  }
228

    
   
232

   
229
  private static class ExecRunnable implements Runnable {
233
  private static class ExecRunnable implements Runnable {
230

    
   
234

   
231
    public ExecRunnable(String command, ChannelProcessor channelProcessor,
235
    public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor,
232
        CounterGroup counterGroup, boolean restart, long restartThrottle,
236
        CounterGroup counterGroup, boolean restart, long restartThrottle,
233
        boolean logStderr, int bufferCount) {
237
        boolean logStderr, int bufferCount) {

    
   
238
      this.shell = shell;
234
      this.command = command;
239
      this.command = command;
235
      this.channelProcessor = channelProcessor;
240
      this.channelProcessor = channelProcessor;
236
      this.counterGroup = counterGroup;
241
      this.counterGroup = counterGroup;
237
      this.restartThrottle = restartThrottle;
242
      this.restartThrottle = restartThrottle;
238
      this.bufferCount = bufferCount;
243
      this.bufferCount = bufferCount;
239
      this.restart = restart;
244
      this.restart = restart;
240
      this.logStderr = logStderr;
245
      this.logStderr = logStderr;
241
    }
246
    }
242

    
   
247

   

    
   
248
    private String shell;
243
    private String command;
249
    private String command;
244
    private ChannelProcessor channelProcessor;
250
    private ChannelProcessor channelProcessor;
245
    private CounterGroup counterGroup;
251
    private CounterGroup counterGroup;
246
    private volatile boolean restart;
252
    private volatile boolean restart;
247
    private long restartThrottle;
253
    private long restartThrottle;
[+20] [20] 5 lines
[+20] private static class ExecRunnable implements Runnable {
253
    public void run() {
259
    public void run() {
254
      do {
260
      do {
255
        String exitCode = "unknown";
261
        String exitCode = "unknown";
256
        BufferedReader reader = null;
262
        BufferedReader reader = null;
257
        try {
263
        try {

    
   
264
          if(shell != null) {

    
   
265
            String[] commandArgs = formulateShellCommand(shell, command);

    
   
266
            process = Runtime.getRuntime().exec(commandArgs);

    
   
267
          }  else {
258
          String[] commandArgs = command.split("\\s+");
268
            String[] commandArgs = command.split("\\s+");
259
          process = new ProcessBuilder(commandArgs).start();
269
            process = new ProcessBuilder(commandArgs).start();

    
   
270
          }
260
          reader = new BufferedReader(
271
          reader = new BufferedReader(
261
              new InputStreamReader(process.getInputStream()));
272
              new InputStreamReader(process.getInputStream()));
262

    
   
273

   
263
          // StderrLogger dies as soon as the input stream is invalid
274
          // StderrLogger dies as soon as the input stream is invalid
264
          StderrReader stderrReader = new StderrReader(new BufferedReader(
275
          StderrReader stderrReader = new StderrReader(new BufferedReader(
[+20] [20] 58 lines
[+20] [+] public int kill() {
323
      return Integer.MIN_VALUE / 2;
334
      return Integer.MIN_VALUE / 2;
324
    }
335
    }
325
    public void setRestart(boolean restart) {
336
    public void setRestart(boolean restart) {
326
      this.restart = restart;
337
      this.restart = restart;
327
    }
338
    }

    
   
339

   

    
   
340
    private static String[] formulateShellCommand(String shell, String command) {

    
   
341
       String[] shellArgs = shell.split("\\s+");

    
   
342
       String[] result = new String[shellArgs.length + 1];

    
   
343
       System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);

    
   
344
       result[shellArgs.length] = command;

    
   
345
       return result;

    
   
346
    }
328
  }
347
  }

    
   
348

   
329
  private static class StderrReader extends Thread {
349
  private static class StderrReader extends Thread {
330
    private BufferedReader input;
350
    private BufferedReader input;
331
    private boolean logStderr;
351
    private boolean logStderr;
332
    protected StderrReader(BufferedReader input, boolean logStderr) {
352
    protected StderrReader(BufferedReader input, boolean logStderr) {
333
      this.input = input;
353
      this.input = input;
[+20] [20] 27 lines
flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java
Revision 0ba0508 New Change
 
flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
Revision 8bcf320 New Change
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision 29ead84 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java: Loading...
  2. flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java: Loading...
  3. flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java: Loading...
  4. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...