Review Board 1.7.22


FLUME-2154 Reducing duplicate events caused by reset-connection-interval

Review Request #13436 - Created Aug. 9, 2013 and updated

Juhani Connolly
Reviewers
Flume
flume-git
I removed the lock; the scheduled runnable now sets a flag instead. This flag is checked in process(), and a reconnect is performed if it is set.
Unit tests were adjusted (since a reconnect now requires process() to actually be called) and pass.

We tested with and without the patch on our servers, and everything looks good: log duplication is at the same level as with reset-connection-interval = 0.

Diff revision 3 (Latest)

1 2 3
1 2 3

  1. flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java: Loading...
flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
Revision b3208fc New Change
[20] 40 lines
[+20]
41
import java.util.Map.Entry;
41
import java.util.Map.Entry;
42
import java.util.Properties;
42
import java.util.Properties;
43
import java.util.concurrent.Executors;
43
import java.util.concurrent.Executors;
44
import java.util.concurrent.ScheduledExecutorService;
44
import java.util.concurrent.ScheduledExecutorService;
45
import java.util.concurrent.TimeUnit;
45
import java.util.concurrent.TimeUnit;

    
   
46
import java.util.concurrent.atomic.AtomicBoolean;
46
import java.util.concurrent.locks.Lock;
47
import java.util.concurrent.locks.Lock;
47
import java.util.concurrent.locks.ReentrantLock;
48
import java.util.concurrent.locks.ReentrantLock;
48

    
   
49

   
49
/**
50
/**
50
 * This sink provides the basic RPC functionality for Flume. This sink takes
51
 * This sink provides the basic RPC functionality for Flume. This sink takes
[+20] [20] 96 lines
[+20]
147
  private Integer port;
148
  private Integer port;
148
  private RpcClient client;
149
  private RpcClient client;
149
  private Properties clientProps;
150
  private Properties clientProps;
150
  private SinkCounter sinkCounter;
151
  private SinkCounter sinkCounter;
151
  private int cxnResetInterval;
152
  private int cxnResetInterval;

    
   
153
  private AtomicBoolean resetConnectionFlag;
152
  private final int DEFAULT_CXN_RESET_INTERVAL = 0;
154
  private final int DEFAULT_CXN_RESET_INTERVAL = 0;
153
  private final ScheduledExecutorService cxnResetExecutor = Executors
155
  private final ScheduledExecutorService cxnResetExecutor = Executors
154
    .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
156
    .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
155
      .setNameFormat("Rpc Sink Reset Thread").build());
157
      .setNameFormat("Rpc Sink Reset Thread").build());
156
  private final Lock resetLock = new ReentrantLock();

   
157

    
   
158

   
158
  @Override
159
  @Override
159
  public void configure(Context context) {
160
  public void configure(Context context) {
160
    clientProps = new Properties();
161
    clientProps = new Properties();
161

    
   
162

   
[+20] [20] 42 lines
[+20] [+] private void createConnection() throws FlumeException {
204
    if (client == null) {
205
    if (client == null) {
205
      logger.info("Rpc sink {}: Building RpcClient with hostname: {}, " +
206
      logger.info("Rpc sink {}: Building RpcClient with hostname: {}, " +
206
          "port: {}",
207
          "port: {}",
207
          new Object[] { getName(), hostname, port });
208
          new Object[] { getName(), hostname, port });
208
      try {
209
      try {

    
   
210
        resetConnectionFlag = new AtomicBoolean(false);
209
        client = initializeRpcClient(clientProps);
211
        client = initializeRpcClient(clientProps);
210
        Preconditions.checkNotNull(client, "Rpc Client could not be " +
212
        Preconditions.checkNotNull(client, "Rpc Client could not be " +
211
          "initialized. " + getName() + " could not be started");
213
          "initialized. " + getName() + " could not be started");
212
        sinkCounter.incrementConnectionCreatedCount();
214
        sinkCounter.incrementConnectionCreatedCount();
213
        if (cxnResetInterval > 0) {
215
        if (cxnResetInterval > 0) {
214
          cxnResetExecutor.schedule(new Runnable() {
216
          cxnResetExecutor.schedule(new Runnable() {
215
            @Override
217
            @Override
216
            public void run() {
218
            public void run() {
217
              resetLock.lock();
219
              resetConnectionFlag.set(true);
218
              try {
Moved to 237

   
219
                destroyConnection();
Moved to 238

   
220
                createConnection();
Moved to 239

   
221
              } catch (Throwable throwable) {
Moved to 240

   
222
                //Don't rethrow, else this runnable won't get scheduled again.
Moved to 241

   
223
                logger.error("Error while trying to expire connection",
Moved to 242

   
224
                  throwable);
Moved to 243

   
225
              } finally {

   
226
                resetLock.unlock();

   
227
              }

   
228
            }
220
            }
229
          }, cxnResetInterval, TimeUnit.SECONDS);
221
          }, cxnResetInterval, TimeUnit.SECONDS);
230
        }
222
        }
231
      } catch (Exception ex) {
223
      } catch (Exception ex) {
232
        sinkCounter.incrementConnectionFailedCount();
224
        sinkCounter.incrementConnectionFailedCount();
[+20] [20] 6 lines
[+20] private void createConnection() throws FlumeException {
239
       logger.debug("Rpc sink {}: Created RpcClient: {}", getName(), client);
231
       logger.debug("Rpc sink {}: Created RpcClient: {}", getName(), client);
240
    }
232
    }
241

    
   
233

   
242
  }
234
  }
243

    
   
235

   

    
   
236
  private void resetConnection() {
Moved from 218

    
   
237
      try {
Moved from 219

    
   
238
        destroyConnection();
Moved from 220

    
   
239
        createConnection();
Moved from 221

    
   
240
      } catch (Throwable throwable) {
Moved from 222

    
   
241
        //Don't rethrow, else this runnable won't get scheduled again.
Moved from 223

    
   
242
        logger.error("Error while trying to expire connection",
Moved from 224

    
   
243
          throwable);

    
   
244
      }

    
   
245
  }

    
   
246

   
244
  private void destroyConnection() {
247
  private void destroyConnection() {
245
    if (client != null) {
248
    if (client != null) {
246
      logger.debug("Rpc sink {} closing Rpc client: {}", getName(), client);
249
      logger.debug("Rpc sink {} closing Rpc client: {}", getName(), client);
247
      try {
250
      try {
248
        client.close();
251
        client.close();
[+20] [20] 81 lines
[+20] [+] public String toString() {
330
  public Status process() throws EventDeliveryException {
333
  public Status process() throws EventDeliveryException {
331
    Status status = Status.READY;
334
    Status status = Status.READY;
332
    Channel channel = getChannel();
335
    Channel channel = getChannel();
333
    Transaction transaction = channel.getTransaction();
336
    Transaction transaction = channel.getTransaction();
334

    
   
337

   
335
    resetLock.lock();
338
    if(resetConnectionFlag.get()) {

    
   
339
      resetConnection();

    
   
340
      // if the time to reset is long and the timeout is short

    
   
341
      // this may cancel the next reset request

    
   
342
      // this should however not be an issue

    
   
343
      resetConnectionFlag.set(false);

    
   
344
    }

    
   
345

   
336
    try {
346
    try {
337
      transaction.begin();
347
      transaction.begin();
338

    
   
348

   
339
      verifyConnection();
349
      verifyConnection();
340

    
   
350

   
[+20] [20] 39 lines
[+20] public Status process() throws EventDeliveryException {
380
      } else {
390
      } else {
381
        destroyConnection();
391
        destroyConnection();
382
        throw new EventDeliveryException("Failed to send events", t);
392
        throw new EventDeliveryException("Failed to send events", t);
383
      }
393
      }
384
    } finally {
394
    } finally {
385
      resetLock.unlock();

   
386
      transaction.close();
395
      transaction.close();
387
    }
396
    }
388

    
   
397

   
389
    return status;
398
    return status;
390
  }
399
  }
391

    
   
400

   
392
  @VisibleForTesting
401
  @VisibleForTesting
393
  RpcClient getUnderlyingClient() {
402
  RpcClient getUnderlyingClient() {
394
    return client;
403
    return client;
395
  }
404
  }
396
}
405
}
flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
Revision 8760c25 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java: Loading...