Review Board 1.7.22


FLUME-2002: Flume RPC Client creates 2 threads per each log attempt if the remote flume agent goes down

Review Request #10880 - Created May 1, 2013 and updated

Mike Percy
FLUME-2002
Reviewers
Flume
flume-git
Shut down thread pools on failure to connect.
Added unit test.
flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
Revision 8285129 New Change
[20] 107 lines
[+20] [+] private void connect() throws FlumeException {
108
   * @throws FlumeException
108
   * @throws FlumeException
109
   */
109
   */
110
  private void connect(long timeout, TimeUnit tu) throws FlumeException {
110
  private void connect(long timeout, TimeUnit tu) throws FlumeException {
111
    callTimeoutPool = Executors.newCachedThreadPool(
111
    callTimeoutPool = Executors.newCachedThreadPool(
112
        new TransceiverThreadFactory("Flume Avro RPC Client Call Invoker"));
112
        new TransceiverThreadFactory("Flume Avro RPC Client Call Invoker"));
113
    try {
113
    NioClientSocketChannelFactory socketChannelFactory = null;
114

    
   
114

   
115
      NioClientSocketChannelFactory socketChannelFactory;
115
    try {
116

    
   
116

   
117
      if (enableDeflateCompression) {
117
      if (enableDeflateCompression) {
118
        socketChannelFactory = new CompressionChannelFactory(
118
        socketChannelFactory = new CompressionChannelFactory(
119
            Executors.newCachedThreadPool(new TransceiverThreadFactory(
119
            Executors.newCachedThreadPool(new TransceiverThreadFactory(
120
                "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
120
                "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
[+20] [20] 11 lines
[+20] private void connect(long timeout, TimeUnit tu) throws FlumeException {
132
          socketChannelFactory,
132
          socketChannelFactory,
133
          tu.toMillis(timeout));
133
          tu.toMillis(timeout));
134
      avroClient =
134
      avroClient =
135
          SpecificRequestor.getClient(AvroSourceProtocol.Callback.class,
135
          SpecificRequestor.getClient(AvroSourceProtocol.Callback.class,
136
          transceiver);
136
          transceiver);
137
    } catch (IOException ex) {
137
    } catch (Throwable t) {
138
      throw new FlumeException(this + ": RPC connection error", ex);
138
      if (callTimeoutPool != null) {

    
   
139
        callTimeoutPool.shutdownNow();

    
   
140
      }

    
   
141
      if (socketChannelFactory != null) {

    
   
142
        socketChannelFactory.releaseExternalResources();

    
   
143
      }

    
   
144
      if (t instanceof IOException) {

    
   
145
        throw new FlumeException(this + ": RPC connection error", t);

    
   
146
      } else if (t instanceof FlumeException) {

    
   
147
        throw (FlumeException) t;

    
   
148
      } else if (t instanceof Error) {

    
   
149
        throw (Error) t;

    
   
150
      } else {

    
   
151
        throw new FlumeException(this + ": Unexpected exception", t);

    
   
152
      }
139
    }
153
    }
140

    
   
154

   
141
    setState(ConnState.READY);
155
    setState(ConnState.READY);
142
  }
156
  }
143

    
   
157

   
[+20] [20] 465 lines
flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
Revision 1e6d2b2 New Change
 
  1. flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java: Loading...
  2. flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java: Loading...