Review Board 1.7.22


ACCUMULO-1307 - adds flag to prompt fate threads to end

Review Request #15002 - Created Oct. 28, 2013 and submitted

John Vines
master
ACCUMULO-1307
Reviewers
accumulo
accumulo
First pass at shutting down FATE. Maybe makes sense to daemonize the thread and add a daemonized monitor thread that 'manages' them somehow in case operation is stuck?

 

Diff revision 4 (Latest)

1 2 3 4
1 2 3 4

  1. fate/src/main/java/org/apache/accumulo/fate/Fate.java: Loading...
  2. server/src/main/java/org/apache/accumulo/server/master/Master.java: Loading...
fate/src/main/java/org/apache/accumulo/fate/Fate.java
Revision bd36edb New Change
[20] 14 lines
[+20]
15
 * limitations under the License.
15
 * limitations under the License.
16
 */
16
 */
17
package org.apache.accumulo.fate;
17
package org.apache.accumulo.fate;
18

    
   
18

   
19
import java.util.EnumSet;
19
import java.util.EnumSet;

    
   
20
import java.util.concurrent.atomic.AtomicBoolean;
20

    
   
21

   
21
import org.apache.accumulo.fate.TStore.TStatus;
22
import org.apache.accumulo.fate.TStore.TStatus;
22
import org.apache.accumulo.fate.util.Daemon;
23
import org.apache.accumulo.fate.util.Daemon;
23
import org.apache.accumulo.fate.util.LoggingRunnable;
24
import org.apache.accumulo.fate.util.LoggingRunnable;
24
import org.apache.log4j.Logger;
25
import org.apache.log4j.Logger;
25

    
   
26

   
26
/**
27
/**
27
 * Fault tolerant executor
28
 * Fault tolerant executor
28
 * 
29
 * 
29
 * 
30
 * 
30
 */
31
 */
31

    
   
32

   
32
public class Fate<T> {
33
public class Fate<T> {
33
  
34

   
34
  private static final String DEBUG_PROP = "debug";
35
  private static final String DEBUG_PROP = "debug";
35
  private static final String AUTO_CLEAN_PROP = "autoClean";
36
  private static final String AUTO_CLEAN_PROP = "autoClean";
36
  private static final String EXCEPTION_PROP = "exception";
37
  private static final String EXCEPTION_PROP = "exception";
37
  private static final String RETURN_PROP = "return";
38
  private static final String RETURN_PROP = "return";
38
  
39

   
39
  final private static Logger log = Logger.getLogger(Fate.class);
40
  final private static Logger log = Logger.getLogger(Fate.class);
40
  
41

   
41
  private TStore<T> store;
42
  private TStore<T> store;
42
  private T environment;
43
  private T environment;
43
  
44

   
44
  private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(TStatus.FAILED, TStatus.SUCCESSFUL, TStatus.UNKNOWN);
45
  private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(TStatus.FAILED, TStatus.SUCCESSFUL, TStatus.UNKNOWN);
45
  
46

   

    
   
47
  private AtomicBoolean keepRunning = new AtomicBoolean(true);

    
   
48

   
46
  private class TransactionRunner implements Runnable {
49
  private class TransactionRunner implements Runnable {
47
    
50

   
48
    @Override
51
    @Override
49
    public void run() {
52
    public void run() {
50
      while (true) {
53
      while (keepRunning.get()) {
51
        long deferTime = 0;
54
        long deferTime = 0;
52
        long tid = store.reserve();
55
        long tid = store.reserve();
53
        try {
56
        try {
54
          TStatus status = store.getStatus(tid);
57
          TStatus status = store.getStatus(tid);
55
          Repo<T> op = store.top(tid);
58
          Repo<T> op = store.top(tid);
[+20] [20] 6 lines
[+20] private class TransactionRunner implements Runnable {
62
              if (deferTime == 0) {
65
              if (deferTime == 0) {
63
                prevOp = op;
66
                prevOp = op;
64
                op = op.call(tid, environment);
67
                op = op.call(tid, environment);
65
              } else
68
              } else
66
                continue;
69
                continue;
67
              
70

   
68
            } catch (Exception e) {
71
            } catch (Exception e) {
69
              transitionToFailed(tid, op, e);
72
              transitionToFailed(tid, op, e);
70
              continue;
73
              continue;
71
            }
74
            }
72
            
75

   
73
            if (op == null) {
76
            if (op == null) {
74
              // transaction is finished
77
              // transaction is finished
75
              String ret = prevOp.getReturn();
78
              String ret = prevOp.getReturn();
76
              if (ret != null)
79
              if (ret != null)
77
                store.setProperty(tid, RETURN_PROP, ret);
80
                store.setProperty(tid, RETURN_PROP, ret);
[+20] [20] 11 lines
[+20] private class TransactionRunner implements Runnable {
89
            }
92
            }
90
          }
93
          }
91
        } finally {
94
        } finally {
92
          store.unreserve(tid, deferTime);
95
          store.unreserve(tid, deferTime);
93
        }
96
        }
94
        
97

   
95
      }
98
      }
96
    }
99
    }
97
    
100

   
98
    private void transitionToFailed(long tid, Repo<T> op, Exception e) {
101
    private void transitionToFailed(long tid, Repo<T> op, Exception e) {
99
      store.setProperty(tid, EXCEPTION_PROP, e);
102
      store.setProperty(tid, EXCEPTION_PROP, e);
100
      store.setStatus(tid, TStatus.FAILED_IN_PROGRESS);
103
      store.setStatus(tid, TStatus.FAILED_IN_PROGRESS);
101
      log.warn("Failed to execute Repo, tid=" + String.format("%016x", tid), e);
104
      log.warn("Failed to execute Repo, tid=" + String.format("%016x", tid), e);
102
    }
105
    }
103
    
106

   
104
    private void processFailed(long tid, Repo<T> op) {
107
    private void processFailed(long tid, Repo<T> op) {
105
      while (op != null) {
108
      while (op != null) {
106
        undo(tid, op);
109
        undo(tid, op);
107
        
110

   
108
        store.pop(tid);
111
        store.pop(tid);
109
        op = store.top(tid);
112
        op = store.top(tid);
110
      }
113
      }
111
      
114

   
112
      store.setStatus(tid, TStatus.FAILED);
115
      store.setStatus(tid, TStatus.FAILED);
113
      doCleanUp(tid);
116
      doCleanUp(tid);
114
    }
117
    }
115
    
118

   
116
    private void doCleanUp(long tid) {
119
    private void doCleanUp(long tid) {
117
      Boolean autoClean = (Boolean) store.getProperty(tid, AUTO_CLEAN_PROP);
120
      Boolean autoClean = (Boolean) store.getProperty(tid, AUTO_CLEAN_PROP);
118
      if (autoClean != null && autoClean) {
121
      if (autoClean != null && autoClean) {
119
        store.delete(tid);
122
        store.delete(tid);
120
      } else {
123
      } else {
121
        // no longer need persisted operations, so delete them to save space in case
124
        // no longer need persisted operations, so delete them to save space in case
122
        // TX is never cleaned up...
125
        // TX is never cleaned up...
123
        while (store.top(tid) != null)
126
        while (store.top(tid) != null)
124
          store.pop(tid);
127
          store.pop(tid);
125
      }
128
      }
126
    }
129
    }
127
    
130

   
128
    private void undo(long tid, Repo<T> op) {
131
    private void undo(long tid, Repo<T> op) {
129
      try {
132
      try {
130
        op.undo(tid, environment);
133
        op.undo(tid, environment);
131
      } catch (Exception e) {
134
      } catch (Exception e) {
132
        log.warn("Failed to undo Repo, tid=" + String.format("%016x", tid), e);
135
        log.warn("Failed to undo Repo, tid=" + String.format("%016x", tid), e);
133
      }
136
      }
134
    }
137
    }
135
    
138

   
136
  }
139
  }
137
  
140

   
138
  public Fate(T environment, TStore<T> store, int numTreads) {
141
  public Fate(T environment, TStore<T> store, int numThreads) {
139
    this.store = store;
142
    this.store = store;
140
    this.environment = environment;
143
    this.environment = environment;
141
    
144

   
142
    for (int i = 0; i < numTreads; i++) {
145
    for (int i = 0; i < numThreads; i++) {
143
      // TODO: use an ExecutorService, maybe a utility to do these steps throughout the server packages - ACCUMULO-1311
146
      // TODO: use an ExecutorService, maybe a utility to do these steps throughout the server packages - ACCUMULO-1311
144
      Thread thread = new Daemon(new LoggingRunnable(log, new TransactionRunner()), "Repo runner " + i);
147
      Thread thread = new Daemon(new LoggingRunnable(log, new TransactionRunner()), "Repo runner " + i);
145
      thread.start();
148
      thread.start();
146
    }
149
    }
147
  }
150
  }
148
  
151

   
149
  // get a transaction id back to the requester before doing any work
152
  // get a transaction id back to the requester before doing any work
150
  public long startTransaction() {
153
  public long startTransaction() {
151
    long dir = store.create();
154
    long dir = store.create();
152
    return dir;
155
    return dir;
153
  }
156
  }
154
  
157

   
155
  // start work in the transaction.. it is safe to call this
158
  // start work in the transaction.. it is safe to call this
156
  // multiple times for a transaction... but it will only seed once
159
  // multiple times for a transaction... but it will only seed once
157
  public void seedTransaction(long tid, Repo<T> repo, boolean autoCleanUp) {
160
  public void seedTransaction(long tid, Repo<T> repo, boolean autoCleanUp) {
158
    store.reserve(tid);
161
    store.reserve(tid);
159
    try {
162
    try {
[+20] [20] 4 lines
[+20] public void seedTransaction(long tid, Repo<T> repo, boolean autoCleanUp) {
164
          } catch (StackOverflowException e) {
167
          } catch (StackOverflowException e) {
165
            // this should not happen
168
            // this should not happen
166
            throw new RuntimeException(e);
169
            throw new RuntimeException(e);
167
          }
170
          }
168
        }
171
        }
169
        
172

   
170
        if (autoCleanUp)
173
        if (autoCleanUp)
171
          store.setProperty(tid, AUTO_CLEAN_PROP, new Boolean(autoCleanUp));
174
          store.setProperty(tid, AUTO_CLEAN_PROP, new Boolean(autoCleanUp));
172
        
175

   
173
        store.setProperty(tid, DEBUG_PROP, repo.getDescription());
176
        store.setProperty(tid, DEBUG_PROP, repo.getDescription());
174
        
177

   
175
        store.setStatus(tid, TStatus.IN_PROGRESS);
178
        store.setStatus(tid, TStatus.IN_PROGRESS);
176
      }
179
      }
177
    } finally {
180
    } finally {
178
      store.unreserve(tid, 0);
181
      store.unreserve(tid, 0);
179
    }
182
    }
180
    
183

   
181
  }
184
  }
182
  
185

   
183
  // check on the transaction
186
  // check on the transaction
184
  public TStatus waitForCompletion(long tid) {
187
  public TStatus waitForCompletion(long tid) {
185
    return store.waitForStatusChange(tid, FINISHED_STATES);
188
    return store.waitForStatusChange(tid, FINISHED_STATES);
186
  }
189
  }
187
  
190

   
188
  // resource cleanup
191
  // resource cleanup
189
  public void delete(long tid) {
192
  public void delete(long tid) {
190
    store.reserve(tid);
193
    store.reserve(tid);
191
    try {
194
    try {
192
      switch (store.getStatus(tid)) {
195
      switch (store.getStatus(tid)) {
[+20] [20] 11 lines
[+20] public void delete(long tid) {
204
      }
207
      }
205
    } finally {
208
    } finally {
206
      store.unreserve(tid, 0);
209
      store.unreserve(tid, 0);
207
    }
210
    }
208
  }
211
  }
209
  
212

   
210
  public String getReturn(long tid) {
213
  public String getReturn(long tid) {
211
    store.reserve(tid);
214
    store.reserve(tid);
212
    try {
215
    try {
213
      if (store.getStatus(tid) != TStatus.SUCCESSFUL)
216
      if (store.getStatus(tid) != TStatus.SUCCESSFUL)
214
        throw new IllegalStateException("Tried to get exception when transaction " + String.format("%016x", tid) + " not in successful state");
217
        throw new IllegalStateException("Tried to get exception when transaction " + String.format("%016x", tid) + " not in successful state");
215
      return (String) store.getProperty(tid, RETURN_PROP);
218
      return (String) store.getProperty(tid, RETURN_PROP);
216
    } finally {
219
    } finally {
217
      store.unreserve(tid, 0);
220
      store.unreserve(tid, 0);
218
    }
221
    }
219
  }
222
  }
220
  
223

   
221
  // get reportable failures
224
  // get reportable failures
222
  public Exception getException(long tid) {
225
  public Exception getException(long tid) {
223
    store.reserve(tid);
226
    store.reserve(tid);
224
    try {
227
    try {
225
      if (store.getStatus(tid) != TStatus.FAILED)
228
      if (store.getStatus(tid) != TStatus.FAILED)
226
        throw new IllegalStateException("Tried to get exception when transaction " + String.format("%016x", tid) + " not in failed state");
229
        throw new IllegalStateException("Tried to get exception when transaction " + String.format("%016x", tid) + " not in failed state");
227
      return (Exception) store.getProperty(tid, EXCEPTION_PROP);
230
      return (Exception) store.getProperty(tid, EXCEPTION_PROP);
228
    } finally {
231
    } finally {
229
      store.unreserve(tid, 0);
232
      store.unreserve(tid, 0);
230
    }
233
    }
231
  }
234
  }
232
  
235

   

    
   
236
  /**

    
   
237
   * Flags that FATE threadpool to clear out and end. Does not actively stop running FATE processes.

    
   
238
   */

    
   
239
  public void shutdown() {

    
   
240
    keepRunning.set(false);

    
   
241
  }

    
   
242

   
233
}
243
}
server/src/main/java/org/apache/accumulo/server/master/Master.java
Revision c029ae5 New Change
 
  1. fate/src/main/java/org/apache/accumulo/fate/Fate.java: Loading...
  2. server/src/main/java/org/apache/accumulo/server/master/Master.java: Loading...