Review Board 1.7.22


FLUME-1926. Optionally timeout Avro Sink Rpc Clients to avoid stickiness

Review Request #9813 - Created March 8, 2013 and submitted

Hari Shreedharan
FLUME-1926
Reviewers
Flume
flume-git
Reset connection after a fixed number of seconds in Avro Sink so that a load balancer can add hosts without having to restart Flume
Added unit tests testing the enabled and disabled conditions.
flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
Revision f5699e4 New Change
[20] 15 lines
[+20]
16
 * specific language governing permissions and limitations
16
 * specific language governing permissions and limitations
17
 * under the License.
17
 * under the License.
18
 */
18
 */
19
package org.apache.flume.sink;
19
package org.apache.flume.sink;
20

    
   
20

   

    
   
21
import com.google.common.annotations.VisibleForTesting;
21
import com.google.common.base.Preconditions;
22
import com.google.common.base.Preconditions;

    
   
23
import com.google.common.base.Throwables;
22
import com.google.common.collect.Lists;
24
import com.google.common.collect.Lists;

    
   
25
import com.google.common.util.concurrent.ThreadFactoryBuilder;
23
import org.apache.flume.Channel;
26
import org.apache.flume.Channel;
24
import org.apache.flume.ChannelException;
27
import org.apache.flume.ChannelException;
25
import org.apache.flume.Context;
28
import org.apache.flume.Context;
26
import org.apache.flume.Event;
29
import org.apache.flume.Event;
27
import org.apache.flume.EventDeliveryException;
30
import org.apache.flume.EventDeliveryException;
[+20] [20] 7 lines
[+20]
35
import org.slf4j.LoggerFactory;
38
import org.slf4j.LoggerFactory;
36

    
   
39

   
37
import java.util.List;
40
import java.util.List;
38
import java.util.Map.Entry;
41
import java.util.Map.Entry;
39
import java.util.Properties;
42
import java.util.Properties;

    
   
43
import java.util.concurrent.Executors;

    
   
44
import java.util.concurrent.ScheduledExecutorService;

    
   
45
import java.util.concurrent.TimeUnit;
40

    
   
46

   
41
/**
47
/**
42
 * This sink provides the basic RPC functionality for Flume. This sink takes
48
 * This sink provides the basic RPC functionality for Flume. This sink takes
43
 * several arguments which are used in RPC.
49
 * several arguments which are used in RPC.
44
 * This sink forms one half of Flume's tiered collection support. Events sent to
50
 * This sink forms one half of Flume's tiered collection support. Events sent to
[+20] [20] 93 lines
[+20]
138
  private String hostname;
144
  private String hostname;
139
  private Integer port;
145
  private Integer port;
140
  private RpcClient client;
146
  private RpcClient client;
141
  private Properties clientProps;
147
  private Properties clientProps;
142
  private SinkCounter sinkCounter;
148
  private SinkCounter sinkCounter;

    
   
149
  private int cxnResetInterval;

    
   
150
  private final int DEFAULT_CXN_RESET_INTERVAL = 0;

    
   
151
  private final ScheduledExecutorService cxnResetExecutor = Executors

    
   
152
    .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()

    
   
153
      .setNameFormat("Rpc Sink Reset Thread").build());
143

    
   
154

   
144
  @Override
155
  @Override
145
  public void configure(Context context) {
156
  public void configure(Context context) {
146
    clientProps = new Properties();
157
    clientProps = new Properties();
147

    
   
158

   
[+20] [20] 12 lines
[+20] public void configure(Context context) {
160
    }
171
    }
161

    
   
172

   
162
    if (sinkCounter == null) {
173
    if (sinkCounter == null) {
163
      sinkCounter = new SinkCounter(getName());
174
      sinkCounter = new SinkCounter(getName());
164
    }
175
    }

    
   
176
    cxnResetInterval = context.getInteger("reset-connection-interval",

    
   
177
      DEFAULT_CXN_RESET_INTERVAL);

    
   
178
    if(cxnResetInterval == DEFAULT_CXN_RESET_INTERVAL) {

    
   
179
      logger.info("Connection reset is set to " + String.valueOf

    
   
180
        (DEFAULT_CXN_RESET_INTERVAL) +". Will not reset connection to next " +

    
   
181
        "hop");

    
   
182
    }
165
  }
183
  }
166

    
   
184

   
167
  /**
185
  /**
168
   * Returns a new {@linkplain RpcClient} instance configured using the given
186
   * Returns a new {@linkplain RpcClient} instance configured using the given
169
   * {@linkplain Properties} object. This method is called whenever a new
187
   * {@linkplain Properties} object. This method is called whenever a new
[+20] [20] 17 lines
[+20] [+] private void createConnection() throws FlumeException {
187
      try {
205
      try {
188
        client = initializeRpcClient(clientProps);
206
        client = initializeRpcClient(clientProps);
189
        Preconditions.checkNotNull(client, "Rpc Client could not be " +
207
        Preconditions.checkNotNull(client, "Rpc Client could not be " +
190
          "initialized. " + getName() + " could not be started");
208
          "initialized. " + getName() + " could not be started");
191
        sinkCounter.incrementConnectionCreatedCount();
209
        sinkCounter.incrementConnectionCreatedCount();

    
   
210
        if (cxnResetInterval > 0) {

    
   
211
          cxnResetExecutor.schedule(new Runnable() {

    
   
212
            @Override

    
   
213
            public void run() {

    
   
214
              destroyConnection();

    
   
215
            }

    
   
216
          }, cxnResetInterval, TimeUnit.SECONDS);

    
   
217
        }
192
      } catch (Exception ex) {
218
      } catch (Exception ex) {
193
        sinkCounter.incrementConnectionFailedCount();
219
        sinkCounter.incrementConnectionFailedCount();
194
        if (ex instanceof FlumeException) {
220
        if (ex instanceof FlumeException) {
195
          throw (FlumeException) ex;
221
          throw (FlumeException) ex;
196
        } else {
222
        } else {
[+20] [20] 67 lines
[+20] [+] public void start() {
264
  @Override
290
  @Override
265
  public void stop() {
291
  public void stop() {
266
    logger.info("Rpc sink {} stopping...", getName());
292
    logger.info("Rpc sink {} stopping...", getName());
267

    
   
293

   
268
    destroyConnection();
294
    destroyConnection();

    
   
295
    cxnResetExecutor.shutdown();

    
   
296
    try {

    
   
297
      if (cxnResetExecutor.awaitTermination(5, TimeUnit.SECONDS)) {

    
   
298
        cxnResetExecutor.shutdownNow();

    
   
299
      }

    
   
300
    } catch (Exception ex) {

    
   
301
      logger.error("Interrupted while waiting for connection reset executor " +

    
   
302
        "to shut down");

    
   
303
    }
269
    sinkCounter.stop();
304
    sinkCounter.stop();
270
    super.stop();
305
    super.stop();
271

    
   
306

   
272
    logger.info("Rpc sink {} stopped. Metrics: {}", getName(), sinkCounter);
307
    logger.info("Rpc sink {} stopped. Metrics: {}", getName(), sinkCounter);
273
  }
308
  }
[+20] [20] 62 lines
[+20] [+] public Status process() throws EventDeliveryException {
336
      transaction.close();
371
      transaction.close();
337
    }
372
    }
338

    
   
373

   
339
    return status;
374
    return status;
340
  }
375
  }

    
   
376

   

    
   
377
  @VisibleForTesting

    
   
378
  RpcClient getUnderlyingClient() {

    
   
379
    return client;

    
   
380
  }
341
}
381
}
flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
Revision 3b1c8db New Change
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision f9088f9 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java: Loading...
  3. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...