Review Board 1.7.22


FLUME-1682: Improve logging message when encrypted file channel is unable to be initialized due to invalid key/keystore

Review Request #7902 - Created Nov. 6, 2012 and updated

Brock Noland
trunk
FLUME-1682
Reviewers
Flume
flume-git
The patch stores a salted SHA-512 hash of the encryption key in the log file metadata. On read that hash is verified and an error message printed out saying there will likely be errors.
Tests for verification functionality and a unit tests which easily allows you to verify the log message.
flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
Revision f768d23 New Change
[20] 27 lines
[+20]
28
import java.util.concurrent.BlockingQueue;
28
import java.util.concurrent.BlockingQueue;
29
import java.util.concurrent.LinkedBlockingDeque;
29
import java.util.concurrent.LinkedBlockingDeque;
30

    
   
30

   
31
import javax.annotation.Nullable;
31
import javax.annotation.Nullable;
32

    
   
32

   
33
import org.apache.flume.channel.file.proto.ProtosFactory;

   
34
import org.apache.flume.channel.file.encryption.CipherProvider;
33
import org.apache.flume.channel.file.encryption.CipherProvider;
35
import org.apache.flume.channel.file.encryption.CipherProviderFactory;
34
import org.apache.flume.channel.file.encryption.CipherProviderFactory;

    
   
35
import org.apache.flume.channel.file.encryption.EncryptionUtils;
36
import org.apache.flume.channel.file.encryption.KeyProvider;
36
import org.apache.flume.channel.file.encryption.KeyProvider;

    
   
37
import org.apache.flume.channel.file.proto.ProtosFactory;
37
import org.slf4j.Logger;
38
import org.slf4j.Logger;
38
import org.slf4j.LoggerFactory;
39
import org.slf4j.LoggerFactory;
39

    
   
40

   
40
import com.google.common.base.Preconditions;
41
import com.google.common.base.Preconditions;
41
import com.google.protobuf.ByteString;
42
import com.google.protobuf.ByteString;
[+20] [20] 128 lines
[+20] [+] static class Writer extends LogFile.Writer {
170
            ProtosFactory.LogFileEncryption.newBuilder();
171
            ProtosFactory.LogFileEncryption.newBuilder();
171
        logFileEncryptionBuilder.setCipherProvider(encryptionCipherProvider);
172
        logFileEncryptionBuilder.setCipherProvider(encryptionCipherProvider);
172
        logFileEncryptionBuilder.setKeyAlias(encryptionKeyAlias);
173
        logFileEncryptionBuilder.setKeyAlias(encryptionKeyAlias);
173
        logFileEncryptionBuilder.setParameters(
174
        logFileEncryptionBuilder.setParameters(
174
            ByteString.copyFrom(getEncryptor().getParameters()));
175
            ByteString.copyFrom(getEncryptor().getParameters()));

    
   
176
        if(encryptionKey.getEncoded() == null) {

    
   
177
          LOGGER.warn("Not storing key hash as key does not support encoding");

    
   
178
        } else {

    
   
179
          logFileEncryptionBuilder.

    
   
180
          setKeyHashSHA512(ByteString.copyFrom(EncryptionUtils.

    
   
181
              hashKey(encryptionKey.getEncoded())));

    
   
182
        }
175
        metaDataBuilder.setEncryption(logFileEncryptionBuilder);
183
        metaDataBuilder.setEncryption(logFileEncryptionBuilder);
176
      }
184
      }
177
      metaDataBuilder.setVersion(getVersion());
185
      metaDataBuilder.setVersion(getVersion());
178
      metaDataBuilder.setLogFileID(logFileID);
186
      metaDataBuilder.setLogFileID(logFileID);
179
      metaDataBuilder.setCheckpointPosition(0L);
187
      metaDataBuilder.setCheckpointPosition(0L);
[+20] [20] 39 lines
[+20] [+] private void initialize() throws IOException {
219
          }
227
          }
220
          ProtosFactory.LogFileEncryption encryption = metaData.getEncryption();
228
          ProtosFactory.LogFileEncryption encryption = metaData.getEncryption();
221
          key = getKeyProvider().getKey(encryption.getKeyAlias());
229
          key = getKeyProvider().getKey(encryption.getKeyAlias());
222
          cipherProvider = encryption.getCipherProvider();
230
          cipherProvider = encryption.getCipherProvider();
223
          parameters = encryption.getParameters().toByteArray();
231
          parameters = encryption.getParameters().toByteArray();

    
   
232
          verifyKey(key, encryption);
224
          encryptionEnabled = true;
233
          encryptionEnabled = true;
225
        }
234
        }
226
      } finally {
235
      } finally {
227
        try {
236
        try {
228
          inputStream.close();
237
          inputStream.close();
[+20] [20] 70 lines
[+20] [+] static class SequentialReader extends LogFile.SequentialReader {
299
          ProtosFactory.LogFileEncryption encryption = metaData.getEncryption();
308
          ProtosFactory.LogFileEncryption encryption = metaData.getEncryption();
300
          Key key = getKeyProvider().getKey(encryption.getKeyAlias());
309
          Key key = getKeyProvider().getKey(encryption.getKeyAlias());
301
          decryptor = CipherProviderFactory.
310
          decryptor = CipherProviderFactory.
302
              getDecrypter(encryption.getCipherProvider(), key,
311
              getDecrypter(encryption.getCipherProvider(), key,
303
                  encryption.getParameters().toByteArray());
312
                  encryption.getParameters().toByteArray());

    
   
313
          verifyKey(key, encryption);
304
        }
314
        }
305
        setLogFileID(metaData.getLogFileID());
315
        setLogFileID(metaData.getLogFileID());
306
        setLastCheckpointPosition(metaData.getCheckpointPosition());
316
        setLastCheckpointPosition(metaData.getCheckpointPosition());
307
        setLastCheckpointWriteOrderID(metaData.getCheckpointWriteOrderID());
317
        setLastCheckpointWriteOrderID(metaData.getCheckpointWriteOrderID());
308
      } finally {
318
      } finally {
[+20] [20] 18 lines
[+20] [+] public int getVersion() {
327
      TransactionEventRecord event =
337
      TransactionEventRecord event =
328
          TransactionEventRecord.fromByteArray(buffer);
338
          TransactionEventRecord.fromByteArray(buffer);
329
      return new LogRecord(getLogFileID(), offset, event);
339
      return new LogRecord(getLogFileID(), offset, event);
330
    }
340
    }
331
  }
341
  }

    
   
342
  private static void verifyKey(Key key, ProtosFactory.LogFileEncryption encryption) {

    
   
343
    if(key.getEncoded() == null) {

    
   
344
      LOGGER.warn("Not verifying key hash as key does not support encoding");

    
   
345
    } else if(!encryption.hasKeyHashSHA512()) {

    
   
346
      LOGGER.info("Not verifying key hash as it does not exist in the metadata");

    
   
347
    } else {

    
   
348
      byte[] keyHash = encryption.getKeyHashSHA512().toByteArray();

    
   
349
      if(!EncryptionUtils.verifyKey(key.getEncoded(), keyHash)) {

    
   
350
        LOGGER.error("The key used to encrypt the data does not appear" +

    
   
351
            " to match the data used to encrypt the data. This is most" +

    
   
352
            " often the result of generating a new keystore or key" +

    
   
353
            " while data still exists in the channel. The channel will" +

    
   
354
            " almost certainly throw exceptions while processing this" +

    
   
355
            " data.");

    
   
356
      }

    
   
357
    }

    
   
358
  }
332
}
359
}
flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/EncryptionUtils.java
New File
 
flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
Revision e6d4957 New Change
 
flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
Revision 3a4e828 New Change
 
flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestEncryptionUtils.java
New File
 
flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java
Revision d2f5208 New Change
 
  1. flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java: Loading...
  2. flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/encryption/EncryptionUtils.java: Loading...
  3. flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java: Loading...
  4. flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto: Loading...
  5. flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestEncryptionUtils.java: Loading...
  6. flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/encryption/TestFileChannelEncryption.java: Loading...