Review Board 1.7.22


FLUME-1296: Lifecycle supervisor should check if the monitor service is still running before supervising and also make sure shutdown of monitor service happens before component is stopped.

Review Request #5467 - Created June 21, 2012 and submitted

Hari Shreedharan
FLUME-1296
Reviewers
Flume
flume-git
* Checks to make sure the supervisor is still running, before supervising more components
* Make sure shutdown of monitor threads happen before the component's stop() is called.
All current unit tests pass. Ran an agent with a dummy sink that never starts but checks thread interrupt every 5 seconds - works fine.

Diff revision 3 (Latest)

1 2 3
1 2 3

  1. flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java: Loading...
flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
Revision 78eda05 New Change
[20] 24 lines
[+20]
25
import java.util.concurrent.ScheduledExecutorService;
25
import java.util.concurrent.ScheduledExecutorService;
26
import java.util.concurrent.ScheduledFuture;
26
import java.util.concurrent.ScheduledFuture;
27
import java.util.concurrent.ScheduledThreadPoolExecutor;
27
import java.util.concurrent.ScheduledThreadPoolExecutor;
28
import java.util.concurrent.TimeUnit;
28
import java.util.concurrent.TimeUnit;
29

    
   
29

   

    
   
30
import org.apache.flume.FlumeException;
30
import org.slf4j.Logger;
31
import org.slf4j.Logger;
31
import org.slf4j.LoggerFactory;
32
import org.slf4j.LoggerFactory;
32

    
   
33

   
33
import com.google.common.base.Preconditions;
34
import com.google.common.base.Preconditions;
34
import com.google.common.util.concurrent.ThreadFactoryBuilder;
35
import com.google.common.util.concurrent.ThreadFactoryBuilder;
[+20] [20] 43 lines
[+20] [+] public synchronized void stop() {
78
    logger.info("Stopping lifecycle supervisor {}", Thread.currentThread()
79
    logger.info("Stopping lifecycle supervisor {}", Thread.currentThread()
79
        .getId());
80
        .getId());
80

    
   
81

   
81
    if (monitorService != null) {
82
    if (monitorService != null) {
82
      monitorService.shutdown();
83
      monitorService.shutdown();
83

    
   
84
      try{
84
      while (!monitorService.isTerminated()) {
85
        monitorService.awaitTermination(10, TimeUnit.SECONDS);
85
        try {

   
86
          monitorService.awaitTermination(500, TimeUnit.MILLISECONDS);

   
87
        } catch (InterruptedException e) {
86
      } catch (InterruptedException e) {
88
          logger.debug("Interrupted while waiting for monitor service to stop");
87
        logger.error("Interrupted while waiting for monitor service to stop");

    
   
88
      }

    
   
89
      if(!monitorService.isTerminated()) {
89
          monitorService.shutdownNow();
90
        monitorService.shutdownNow();

    
   
91
        try {

    
   
92
          while(!monitorService.isTerminated()) {

    
   
93
            monitorService.awaitTermination(10, TimeUnit.SECONDS);

    
   
94
          }

    
   
95
        } catch (InterruptedException e) {

    
   
96
          logger.error("Interrupted while waiting for monitor service to stop");
90
        }
97
        }
91
      }
98
      }
92
    }
99
    }
93

    
   
100

   
94
    for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
101
    for (final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
95
        .entrySet()) {
102
        .entrySet()) {
96

    
   
103

   
97
      if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {
104
      if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {

    
   
105
        entry.getValue().status.desiredState = LifecycleState.STOP;
98
        entry.getKey().stop();
106
        entry.getKey().stop();
99
      }
107
      }
100
    }
108
    }
101

    
   
109

   
102
    /* If we've failed, preserve the error state. */
110
    /* If we've failed, preserve the error state. */
[+20] [20] 9 lines
[+20] public synchronized void stop() {
112
    lifecycleState = LifecycleState.ERROR;
120
    lifecycleState = LifecycleState.ERROR;
113
  }
121
  }
114

    
   
122

   
115
  public synchronized void supervise(LifecycleAware lifecycleAware,
123
  public synchronized void supervise(LifecycleAware lifecycleAware,
116
      SupervisorPolicy policy, LifecycleState desiredState) {
124
      SupervisorPolicy policy, LifecycleState desiredState) {

    
   
125
    if(this.monitorService.isShutdown()

    
   
126
        || this.monitorService.isTerminated()

    
   
127
        || this.monitorService.isTerminating()){

    
   
128
      throw new FlumeException("Supervise called on " + lifecycleAware + " " +

    
   
129
          "after shutdown has been initiated. " + lifecycleAware + " will not" +

    
   
130
          " be started");

    
   
131
    }
117

    
   
132

   
118
    Preconditions.checkState(!supervisedProcesses.containsKey(lifecycleAware),
133
    Preconditions.checkState(!supervisedProcesses.containsKey(lifecycleAware),
119
        "Refusing to supervise " + lifecycleAware + " more than once");
134
        "Refusing to supervise " + lifecycleAware + " more than once");
120

    
   
135

   
121
    if (logger.isDebugEnabled()) {
136
    if (logger.isDebugEnabled()) {
[+20] [20] 240 lines
  1. flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java: Loading...