Review Board 1.7.22


FLUME-1820 - Should not be possible for RPC client to block indefinitely on close()

Review Request #8814 - Created Jan. 4, 2013 and submitted

Mike Percy
FLUME-1820
Reviewers
Flume
flume-git
Two-phase shutdown for RPC client thread pool instead of indefinite looping awaitShutdown calls.
Unit tests pass.

Diff revision 2 (Latest)

1 2
1 2

  1. flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java: Loading...
flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
Revision bd116ff New Change
[20] 121 lines
[+20] [+] private void connect(long timeout, TimeUnit tu) throws FlumeException {
122

    
   
122

   
123
  @Override
123
  @Override
124
  public void close() throws FlumeException {
124
  public void close() throws FlumeException {
125
    if (callTimeoutPool != null) {
125
    if (callTimeoutPool != null) {
126
      callTimeoutPool.shutdown();
126
      callTimeoutPool.shutdown();
127
      while (!callTimeoutPool.isTerminated()) {

   
128
        try {
127
      try {
129
          callTimeoutPool.awaitTermination(requestTimeout,
128
        if (!callTimeoutPool.awaitTermination(requestTimeout,
130
              TimeUnit.MILLISECONDS);
129
            TimeUnit.MILLISECONDS)) {

    
   
130
          callTimeoutPool.shutdownNow();

    
   
131
          if (!callTimeoutPool.awaitTermination(requestTimeout,

    
   
132
              TimeUnit.MILLISECONDS)) {

    
   
133
            logger.warn(this + ": Unable to cleanly shut down call timeout " +

    
   
134
                "pool");

    
   
135
          }

    
   
136
        }
131
        } catch (InterruptedException ex) {
137
      } catch (InterruptedException ex) {
132
          logger.warn(this + ": Interrupted during close", ex);
138
        logger.warn(this + ": Interrupted during close", ex);

    
   
139
        // re-cancel if current thread also interrupted
133
          callTimeoutPool.shutdownNow();
140
        callTimeoutPool.shutdownNow();

    
   
141
        // preserve interrupt status
134
          Thread.currentThread().interrupt();
142
        Thread.currentThread().interrupt();
135
          break;

   
136
        }

   
137
      }
143
      }

    
   
144

   
138
      callTimeoutPool = null;
145
      callTimeoutPool = null;
139
    }
146
    }
140
    try {
147
    try {
141
      transceiver.close();
148
      transceiver.close();
142
    } catch (IOException ex) {
149
    } catch (IOException ex) {
[+20] [20] 399 lines
  1. flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java: Loading...