Review Board 1.7.22


FLUME-1986. doTestInflightCorrupts should not commit transactions

Review Request #10415 - Created April 11, 2013 and submitted

Hari Shreedharan
FLUME-1986
Reviewers
Flume
flume-git
Increase sleeps so forceCheckpoint does not fail because the copy is not completed.
Ran full tests.
flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
Revision fb0e208 New Change
[20] 20 lines
[+20]
21
import com.google.common.collect.Lists;
21
import com.google.common.collect.Lists;
22
import com.google.common.collect.Maps;
22
import com.google.common.collect.Maps;
23
import com.google.common.collect.Sets;
23
import com.google.common.collect.Sets;
24
import com.google.common.io.Files;
24
import com.google.common.io.Files;
25
import org.apache.commons.io.FileUtils;
25
import org.apache.commons.io.FileUtils;

    
   
26
import org.apache.flume.Transaction;
26
import org.apache.flume.channel.file.proto.ProtosFactory;
27
import org.apache.flume.channel.file.proto.ProtosFactory;
27
import org.fest.reflect.exception.ReflectionError;
28
import org.fest.reflect.exception.ReflectionError;
28
import org.junit.After;
29
import org.junit.After;
29
import org.junit.Assert;
30
import org.junit.Assert;
30
import org.junit.Before;
31
import org.junit.Before;
[+20] [20] 8 lines
[+20]
39
import java.io.IOException;
40
import java.io.IOException;
40
import java.io.RandomAccessFile;
41
import java.io.RandomAccessFile;
41
import java.util.Map;
42
import java.util.Map;
42
import java.util.Random;
43
import java.util.Random;
43
import java.util.Set;
44
import java.util.Set;

    
   
45
import java.util.concurrent.Executors;
44

    
   
46

   
45
import static org.apache.flume.channel.file.TestUtils.compareInputAndOut;
47
import static org.apache.flume.channel.file.TestUtils.compareInputAndOut;
46
import static org.apache.flume.channel.file.TestUtils.consumeChannel;
48
import static org.apache.flume.channel.file.TestUtils.consumeChannel;
47
import static org.apache.flume.channel.file.TestUtils.fillChannel;
49
import static org.apache.flume.channel.file.TestUtils.fillChannel;
48
import static org.apache.flume.channel.file.TestUtils.forceCheckpoint;
50
import static org.apache.flume.channel.file.TestUtils.forceCheckpoint;
49
import static org.apache.flume.channel.file.TestUtils.putEvents;
51
import static org.apache.flume.channel.file.TestUtils.putEvents;

    
   
52
import static org.apache.flume.channel.file.TestUtils.putWithoutCommit;
50
import static org.apache.flume.channel.file.TestUtils.takeEvents;
53
import static org.apache.flume.channel.file.TestUtils.takeEvents;

    
   
54
import static org.apache.flume.channel.file.TestUtils.takeWithoutCommit;
51
import static org.fest.reflect.core.Reflection.*;
55
import static org.fest.reflect.core.Reflection.*;
52

    
   
56

   
53
public class TestFileChannelRestart extends TestFileChannelBase {
57
public class TestFileChannelRestart extends TestFileChannelBase {
54
  protected static final Logger LOG = LoggerFactory
58
  protected static final Logger LOG = LoggerFactory
55
      .getLogger(TestFileChannelRestart.class);
59
      .getLogger(TestFileChannelRestart.class);
[+20] [20] 353 lines
[+20] [+] private void doTestIncompleteCheckpoint(boolean backup) throws Exception {
409
    compareInputAndOut(in, out);
413
    compareInputAndOut(in, out);
410
  }
414
  }
411

    
   
415

   
412
  @Test
416
  @Test
413
  public void testCorruptInflightPuts() throws Exception {
417
  public void testCorruptInflightPuts() throws Exception {
414
    doTestCorruptInflights("inflightPuts", false);
418
    doTestCorruptInflights("inflightputs", false);
415
  }
419
  }
416

    
   
420

   
417
  @Test
421
  @Test
418
  public void testCorruptInflightPutsWithBackup() throws Exception {
422
  public void testCorruptInflightPutsWithBackup() throws Exception {
419
    doTestCorruptInflights("inflightPuts", true);
423
    doTestCorruptInflights("inflightputs", true);
420
  }
424
  }
421

    
   
425

   
422
  @Test
426
  @Test
423
  public void testCorruptInflightTakes() throws Exception {
427
  public void testCorruptInflightTakes() throws Exception {
424
    doTestCorruptInflights("inflightTakes", false);
428
    doTestCorruptInflights("inflighttakes", false);
425
  }
429
  }
426

    
   
430

   
427
  @Test
431
  @Test
428
  public void testCorruptInflightTakesWithBackup() throws Exception {
432
  public void testCorruptInflightTakesWithBackup() throws Exception {
429
    doTestCorruptInflights("inflightTakes", true);
433
    doTestCorruptInflights("inflighttakes", true);
430
  }
434
  }
431

    
   
435

   
432
  @Test
436
  @Test
433
  public void testFastReplayWithCheckpoint() throws Exception{
437
  public void testFastReplayWithCheckpoint() throws Exception{
434
    testFastReplay(false, true);
438
    testFastReplay(false, true);
[+20] [20] 52 lines
[+20] [+] private void doTestCorruptInflights(String name,
487
    Map<String, String> overrides = Maps.newHashMap();
491
    Map<String, String> overrides = Maps.newHashMap();
488
    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
492
    overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
489
    channel = createFileChannel(overrides);
493
    channel = createFileChannel(overrides);
490
    channel.start();
494
    channel.start();
491
    Assert.assertTrue(channel.isOpen());
495
    Assert.assertTrue(channel.isOpen());
492
    Set<String> in = putEvents(channel, "restart", 10, 100);
496
    final Set<String> in1 = putEvents(channel, "restart-",10, 100);
493
    Assert.assertEquals(100, in.size());
497
    Assert.assertEquals(100, in1.size());

    
   
498
    Executors.newSingleThreadScheduledExecutor().submit(new Runnable() {

    
   
499
      @Override

    
   
500
      public void run() {

    
   
501
        Transaction tx = channel.getTransaction();

    
   
502
        Set<String> out1 = takeWithoutCommit(channel, tx, 100);

    
   
503
        Assert.assertEquals(100, out1.size());

    
   
504
      }

    
   
505
    });

    
   
506
    Transaction tx = channel.getTransaction();

    
   
507
    Set<String> in2 = putWithoutCommit(channel, tx, "restart", 100);

    
   
508
    Assert.assertEquals(100, in2.size());
494
    forceCheckpoint(channel);
509
    forceCheckpoint(channel);
495
    if(backup) {
510
    if(backup) {
496
      Thread.sleep(2000);
511
      Thread.sleep(2000);
497
    }
512
    }

    
   
513
    tx.commit();

    
   
514
    tx.close();
498
    channel.stop();
515
    channel.stop();
499
    File inflight = new File(checkpointDir, name);
516
    File inflight = new File(checkpointDir, name);
500
    RandomAccessFile writer = new RandomAccessFile(inflight, "rw");
517
    RandomAccessFile writer = new RandomAccessFile(inflight, "rw");
501
    writer.write(new Random().nextInt());
518
    writer.write(new Random().nextInt());
502
    writer.close();
519
    writer.close();
503
    channel = createFileChannel(overrides);
520
    channel = createFileChannel(overrides);
504
    channel.start();
521
    channel.start();
505
    Assert.assertTrue(channel.isOpen());
522
    Assert.assertTrue(channel.isOpen());
506
    Assert.assertTrue(!backup || channel.checkpointBackupRestored());
523
    Assert.assertTrue(!backup || channel.checkpointBackupRestored());
507
    Set<String> out = consumeChannel(channel);
524
    Set<String> out = consumeChannel(channel);
508
    compareInputAndOut(in, out);
525
    in1.addAll(in2);

    
   
526
    compareInputAndOut(in1, out);
509
  }
527
  }
510

    
   
528

   
511
  @Test
529
  @Test
512
  public void testTruncatedCheckpointMeta() throws Exception {
530
  public void testTruncatedCheckpointMeta() throws Exception {
513
    doTestTruncatedCheckpointMeta(false);
531
    doTestTruncatedCheckpointMeta(false);
[+20] [20] 117 lines
[+20] [+] public void testBackupUsedEnsureNoFullReplay() throws Exception {
631
    channel = createFileChannel(overrides);
649
    channel = createFileChannel(overrides);
632
    channel.start();
650
    channel.start();
633
    Assert.assertTrue(channel.isOpen());
651
    Assert.assertTrue(channel.isOpen());
634
    Set<String> in = putEvents(channel, "restart", 10, 100);
652
    Set<String> in = putEvents(channel, "restart", 10, 100);
635
    Assert.assertEquals(100, in.size());
653
    Assert.assertEquals(100, in.size());

    
   
654
    Thread.sleep(5000);
636
    forceCheckpoint(channel);
655
    forceCheckpoint(channel);
637
    Thread.sleep(2000);
656
    Thread.sleep(5000);
638
    in = putEvents(channel, "restart", 10, 100);
657
    in = putEvents(channel, "restart", 10, 100);
639
    takeEvents(channel, 10, 100);
658
    takeEvents(channel, 10, 100);
640
    Assert.assertEquals(100, in.size());
659
    Assert.assertEquals(100, in.size());
641
    for(File file : backupDir.listFiles()) {
660
    for(File file : backupDir.listFiles()) {
642
      if(file.getName().equals(Log.FILE_LOCK)) {
661
      if(file.getName().equals(Log.FILE_LOCK)) {
[+20] [20] 132 lines
flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
Revision 7c490b5 New Change
 
  1. flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java: Loading...
  2. flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java: Loading...