Review Board 1.7.22


FLUME-1571. Make sure capacity and transaction capacity of channels are positive.

Review Request #7062 - Created Sept. 12, 2012 and submitted

Hari Shreedharan
FLUME-1571
Reviewers
Flume
flume-git
Add checks to make sure they are positive, if not, reset to default.
Added unit tests.
flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
Revision 950ea8c New Change
[20] 71 lines
[+20]
72
public class FileChannel extends BasicChannelSemantics {
72
public class FileChannel extends BasicChannelSemantics {
73

    
   
73

   
74
  private static final Logger LOG = LoggerFactory
74
  private static final Logger LOG = LoggerFactory
75
      .getLogger(FileChannel.class);
75
      .getLogger(FileChannel.class);
76

    
   
76

   
77
  private int capacity;
77
  private Integer capacity = 0;
78
  private int keepAlive;
78
  private int keepAlive;
79
  private int transactionCapacity;
79
  private Integer transactionCapacity = 0;
80
  private long checkpointInterval;
80
  private Long checkpointInterval = 0L;
81
  private long maxFileSize;
81
  private long maxFileSize;
82
  private long minimumRequiredSpace;
82
  private long minimumRequiredSpace;
83
  private File checkpointDir;
83
  private File checkpointDir;
84
  private File[] dataDirs;
84
  private File[] dataDirs;
85
  private Log log;
85
  private Log log;
[+20] [20] 61 lines
[+20] [+] public void configure(Context context) {
147
      }
147
      }
148
    }
148
    }
149

    
   
149

   
150
    int newCapacity = context.getInteger(FileChannelConfiguration.CAPACITY,
150
    int newCapacity = context.getInteger(FileChannelConfiguration.CAPACITY,
151
        FileChannelConfiguration.DEFAULT_CAPACITY);
151
        FileChannelConfiguration.DEFAULT_CAPACITY);

    
   
152
    if(newCapacity <= 0 && capacity == 0) {

    
   
153
      newCapacity = FileChannelConfiguration.DEFAULT_CAPACITY;

    
   
154
      LOG.warn("Invalid capacity specified, initializing channel to "

    
   
155
              + "default capacity of {}", newCapacity);

    
   
156
    }
152
    if(capacity > 0 && newCapacity != capacity) {
157
    if(capacity > 0 && newCapacity != capacity) {
153
      LOG.warn("Capacity of this channel cannot be sized on the fly due " +
158
      LOG.warn("Capacity of this channel cannot be sized on the fly due " +
154
          "the requirement we have enough DirectMemory for the queue and " +
159
          "the requirement we have enough DirectMemory for the queue and " +
155
          "downsizing of the queue cannot be guranteed due to the " +
160
          "downsizing of the queue cannot be guranteed due to the " +
156
          "fact there maybe more items on the queue than the new capacity.");
161
          "fact there maybe more items on the queue than the new capacity.");
[+20] [20] 6 lines
[+20] public void configure(Context context) {
163
            FileChannelConfiguration.DEFAULT_KEEP_ALIVE);
168
            FileChannelConfiguration.DEFAULT_KEEP_ALIVE);
164
    transactionCapacity =
169
    transactionCapacity =
165
        context.getInteger(FileChannelConfiguration.TRANSACTION_CAPACITY,
170
        context.getInteger(FileChannelConfiguration.TRANSACTION_CAPACITY,
166
            FileChannelConfiguration.DEFAULT_TRANSACTION_CAPACITY);
171
            FileChannelConfiguration.DEFAULT_TRANSACTION_CAPACITY);
167

    
   
172

   

    
   
173
    if(transactionCapacity <= 0) {

    
   
174
      transactionCapacity =

    
   
175
              FileChannelConfiguration.DEFAULT_TRANSACTION_CAPACITY;

    
   
176
      LOG.warn("Invalid transaction capacity specified, " +

    
   
177
          "initializing channel to default " +

    
   
178
          "capacity of {}", transactionCapacity);

    
   
179
    }

    
   
180

   

    
   
181
    Preconditions.checkState(transactionCapacity <= capacity,

    
   
182
        "File Channel transaction capacity cannot be greater than the " +

    
   
183
            "capacity of the channel.");

    
   
184

   
168
    checkpointInterval =
185
    checkpointInterval =
169
        context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL,
186
            context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL,
170
            FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL);
187
            FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL);

    
   
188
    if (checkpointInterval <= 0) {

    
   
189
      LOG.warn("Checkpoint interval is invalid: " + checkpointInterval

    
   
190
              + ", using default: "

    
   
191
              + FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL);

    
   
192

   

    
   
193
      checkpointInterval =

    
   
194
              FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL;

    
   
195
    }
171

    
   
196

   
172
    // cannot be over FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE
197
    // cannot be over FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE
173
    maxFileSize = Math.min(
198
    maxFileSize = Math.min(
174
        context.getLong(FileChannelConfiguration.MAX_FILE_SIZE,
199
        context.getLong(FileChannelConfiguration.MAX_FILE_SIZE,
175
            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE),
200
            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE),
[+20] [20] 406 lines
flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
Revision ffc4623 New Change
 
flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
Revision 87a0a3f New Change
 
flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java
Revision 184f956 New Change
 
flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
Revision f548f31 New Change
 
flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
Revision 9978f86 New Change
 
flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
Revision d2f5208 New Change
 
flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
Revision 87a1305 New Change
 
flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
Revision e1a61c2 New Change
 
  1. flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java: Loading...
  2. flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java: Loading...
  3. flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java: Loading...
  4. flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java: Loading...
  5. flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java: Loading...
  6. flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java: Loading...
  7. flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java: Loading...
  8. flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java: Loading...
  9. flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java: Loading...