Review Board 1.7.22


FLUME-1849: Embedded Agent doesn't shutdown supervisor

Review Request #8973 - Created Jan. 16, 2013 and submitted

Brock Noland
FLUME-1849
Reviewers
Flume
flume-git
The EmbeddedAgent does not shutdown the supervisor, this was done initially so that exceptions could be caught during a components stop method. However, this causes non-daemon threads to continue to run causing the JVM to stay alive.
Unit tests pass. Three unit tests are removed which cannot be supported given the fact that we need to use the supervisor to do the shutdown.

I manually verified the JVM can shutdown with this patch.
flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
Revision 4adbea7 New Change
[20] 39 lines
[+20]
40
import org.slf4j.Logger;
40
import org.slf4j.Logger;
41
import org.slf4j.LoggerFactory;
41
import org.slf4j.LoggerFactory;
42

    
   
42

   
43
import com.google.common.annotations.VisibleForTesting;
43
import com.google.common.annotations.VisibleForTesting;
44
import com.google.common.base.Preconditions;
44
import com.google.common.base.Preconditions;
45
import com.google.common.collect.Lists;

   
46

    
   
45

   
47
/**
46
/**
48
 * EmbeddedAgent gives Flume users the ability to embed simple agents in
47
 * EmbeddedAgent gives Flume users the ability to embed simple agents in
49
 * applications. This Agent is mean to be much simpler than a traditional
48
 * applications. This Agent is mean to be much simpler than a traditional
50
 * agent and as such it's more restrictive than what can be configured
49
 * agent and as such it's more restrictive than what can be configured
[+20] [20] 83 lines
[+20] [+] public void start()
134
  public void stop()
133
  public void stop()
135
      throws FlumeException {
134
      throws FlumeException {
136
    if(state != State.STARTED) {
135
    if(state != State.STARTED) {
137
      throw new IllegalStateException("Cannot be stopped unless started");
136
      throw new IllegalStateException("Cannot be stopped unless started");
138
    }
137
    }
139
    doStop();
138
    supervisor.stop();
140
    embeddedSource = null;
139
    embeddedSource = null;
141
    state = State.STOPPED;
140
    state = State.STOPPED;
142
  }
141
  }
143

    
   
142

   
144
  private void doConfigure(Map<String, String> properties) {
143
  private void doConfigure(Map<String, String> properties) {
[+20] [20] 65 lines
[+20] [+] public void putAll(List<Event> events) throws EventDeliveryException {
210
    }
209
    }
211
  }
210
  }
212

    
   
211

   
213
  private void doStart() {
212
  private void doStart() {
214
    boolean error = true;
213
    boolean error = true;
215
    List<LifecycleAware> supervised = Lists.newArrayList();

   
216
    try {
214
    try {
217
      channel.start();
215
      channel.start();
218
      sinkRunner.start();
216
      sinkRunner.start();
219
      sourceRunner.start();
217
      sourceRunner.start();
220

    
   
218

   
221
      supervisor.supervise(channel,
219
      supervisor.supervise(channel,
222
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
220
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
223
      supervised.add(channel);

   
224
      supervisor.supervise(sinkRunner,
221
      supervisor.supervise(sinkRunner,
225
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
222
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
226
      supervised.add(sinkRunner);

   
227
      supervisor.supervise(sourceRunner,
223
      supervisor.supervise(sourceRunner,
228
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
224
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
229
      supervised.add(sourceRunner);

   
230

    
   

   
231
      error = false;
225
      error = false;
232
    } finally {
226
    } finally {
233
      if(error) {
227
      if(error) {
234
        for(LifecycleAware lifeCycleAware : supervised) {

   
235
          try {

   
236
            supervisor.unsupervise(lifeCycleAware);

   
237
          } catch (Exception e) {

   
238
            LOGGER.warn("Exception while stopping " + lifeCycleAware +

   
239
                " due to error on startup", e);

   
240
          }

   
241
        }

   
242
        stopLogError(sourceRunner);
228
        stopLogError(sourceRunner);
243
        stopLogError(channel);
229
        stopLogError(channel);
244
        stopLogError(sinkRunner);
230
        stopLogError(sinkRunner);

    
   
231
        supervisor.stop();
245
      }
232
      }
246
    }
233
    }
247
  }
234
  }
248
  private void stopLogError(LifecycleAware lifeCycleAware) {
235
  private void stopLogError(LifecycleAware lifeCycleAware) {
249
    try {
236
    try {
250
      if(LifecycleState.START.equals(lifeCycleAware.getLifecycleState())) {
237
      if(LifecycleState.START.equals(lifeCycleAware.getLifecycleState())) {
251
        lifeCycleAware.stop();
238
        lifeCycleAware.stop();
252
      }
239
      }
253
    } catch (Exception e) {
240
    } catch (Exception e) {
254
      LOGGER.warn("Exception while stopping " + lifeCycleAware, e);
241
      LOGGER.warn("Exception while stopping " + lifeCycleAware, e);
255
    }
242
    }
256
  }
243
  }
257
  private void doStop() {

   
258
    Exception exception = null;

   
259
    // source

   
260
    try {

   
261
      if(LifecycleState.START.equals(sourceRunner.getLifecycleState())) {

   
262
        sourceRunner.stop();

   
263
      }

   
264
    } catch (Exception e) {

   
265
      exception = e;

   
266
      LOGGER.error("Caught exception stopping source " + sourceRunner, e);

   
267
    }

   
268
    // sink

   
269
    try {

   
270
      if(LifecycleState.START.equals(sinkRunner.getLifecycleState())) {

   
271
        sinkRunner.stop();

   
272
      }

   
273
    } catch (Exception e) {

   
274
      exception = e;

   
275
      LOGGER.error("Caught exception stopping sink " + sinkRunner, e);

   
276
    }

   
277
    // channel

   
278
    try {

   
279
      if(LifecycleState.START.equals(channel.getLifecycleState())) {

   
280
        channel.stop();

   
281
      }

   
282
    } catch (Exception e) {

   
283
      exception = e;

   
284
      LOGGER.error("Caught exception stopping channel " + channel, e);

   
285
    }

   
286
    if(exception != null) {

   
287
      throw new FlumeException("Error stopping one or more components " +

   
288
          "check the logs for an exhaustive list of errors", exception);

   
289
    }

   
290
  }

   
291

    
   
244

   
292
  private static enum State {
245
  private static enum State {
293
    NEW(),
246
    NEW(),
294
    STOPPED(),
247
    STOPPED(),
295
    STARTED();
248
    STARTED();
296
  }
249
  }
297
}
250
}
flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
Revision b315770 New Change
 
  1. flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java: Loading...
  2. flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java: Loading...