Review Board 1.7.22


FLUME-997: Support secure transport mechanism

Review Request #10190 - Created March 29, 2013 and submitted

Joey Echeverria
FLUME-997
Reviewers
Flume
mpercy
flume-git
The patch adds support for SSL to AvroSource and AvroSink. The implementation compliments the recent addition of compression in FLUME-1915.
There are tests for having SSL enabled on both the client and server with specific tests using a truststore to verify the server certificate. There's also a test to make sure you can enable both SSL and compression.

I probably need to add some negative tests:

1) SSL server, non-SSL client
2) SSL server, SSL client with a truststore that doesn't include the server certificate
flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
Revision 517d545 New Change
[20] 17 lines
[+20]
18
 */
18
 */
19

    
   
19

   
20
package org.apache.flume.source;
20
package org.apache.flume.source;
21

    
   
21

   
22
import com.google.common.base.Throwables;
22
import com.google.common.base.Throwables;

    
   
23
import java.io.FileInputStream;
23
import java.net.InetSocketAddress;
24
import java.net.InetSocketAddress;

    
   
25
import java.security.KeyStore;

    
   
26
import java.security.Security;
24
import java.util.ArrayList;
27
import java.util.ArrayList;
25
import java.util.HashMap;
28
import java.util.HashMap;
26
import java.util.List;
29
import java.util.List;
27
import java.util.Map;
30
import java.util.Map;
28
import java.util.concurrent.Executors;
31
import java.util.concurrent.Executors;
29
import java.util.concurrent.ScheduledExecutorService;
32
import java.util.concurrent.ScheduledExecutorService;
30
import java.util.concurrent.TimeUnit;
33
import java.util.concurrent.TimeUnit;

    
   
34
import javax.net.ssl.KeyManagerFactory;

    
   
35
import javax.net.ssl.SSLContext;

    
   
36
import javax.net.ssl.SSLEngine;
31

    
   
37

   
32
import org.apache.avro.ipc.NettyServer;
38
import org.apache.avro.ipc.NettyServer;
33
import org.apache.avro.ipc.Responder;
39
import org.apache.avro.ipc.Responder;
34
import org.apache.avro.ipc.Server;
40
import org.apache.avro.ipc.Server;
35
import org.apache.avro.ipc.specific.SpecificResponder;
41
import org.apache.avro.ipc.specific.SpecificResponder;
[+20] [20] 14 lines
[+20]
50
import org.jboss.netty.channel.ChannelPipeline;
56
import org.jboss.netty.channel.ChannelPipeline;
51
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
57
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
52
import org.jboss.netty.channel.Channels;
58
import org.jboss.netty.channel.Channels;
53
import org.jboss.netty.handler.codec.compression.ZlibDecoder;
59
import org.jboss.netty.handler.codec.compression.ZlibDecoder;
54
import org.jboss.netty.handler.codec.compression.ZlibEncoder;
60
import org.jboss.netty.handler.codec.compression.ZlibEncoder;

    
   
61
import org.jboss.netty.handler.ssl.SslHandler;
55
import org.slf4j.Logger;
62
import org.slf4j.Logger;
56
import org.slf4j.LoggerFactory;
63
import org.slf4j.LoggerFactory;
57

    
   
64

   
58
/**
65
/**
59
 * <p>
66
 * <p>
[+20] [20] 59 lines
[+20] [+] public class AvroSource extends AbstractSource implements EventDrivenSource,
119
      .getLogger(AvroSource.class);
126
      .getLogger(AvroSource.class);
120

    
   
127

   
121
  private static final String PORT_KEY = "port";
128
  private static final String PORT_KEY = "port";
122
  private static final String BIND_KEY = "bind";
129
  private static final String BIND_KEY = "bind";
123
  private static final String COMPRESSION_TYPE = "compression-type";
130
  private static final String COMPRESSION_TYPE = "compression-type";

    
   
131
  private static final String KEYSTORE_KEY = "keystore";

    
   
132
  private static final String KEYSTORE_PASSWORD_KEY = "keystore-password";

    
   
133
  private static final String KEYSTORE_TYPE_KEY = "keystore-type";
124
  private int port;
134
  private int port;
125
  private String bindAddress;
135
  private String bindAddress;
126
  private String compressionType;
136
  private String compressionType;

    
   
137
  private String keystore;

    
   
138
  private String keystorePassword;

    
   
139
  private String keystoreType;

    
   
140
  private boolean enableSsl = false;
127

    
   
141

   
128
  private Server server;
142
  private Server server;
129
  private SourceCounter sourceCounter;
143
  private SourceCounter sourceCounter;
130

    
   
144

   
131
  private int maxThreads;
145
  private int maxThreads;
[+20] [20] 12 lines
[+20] [+] public void configure(Context context) {
144
    } catch (NumberFormatException e) {
158
    } catch (NumberFormatException e) {
145
      logger.warn("AVRO source\'s \"threads\" property must specify an integer value.",
159
      logger.warn("AVRO source\'s \"threads\" property must specify an integer value.",
146
              context.getString(THREADS));
160
              context.getString(THREADS));
147
    }
161
    }
148

    
   
162

   

    
   
163
    keystore = context.getString(KEYSTORE_KEY);

    
   
164
    keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY);

    
   
165
    keystoreType = context.getString(KEYSTORE_TYPE_KEY, "JKS");

    
   
166
    if (keystore != null && keystorePassword != null) {

    
   
167
      try {

    
   
168
        KeyStore ks = KeyStore.getInstance(keystoreType);

    
   
169
        ks.load(new FileInputStream(keystore), keystorePassword.toCharArray());

    
   
170
        enableSsl = true;

    
   
171
      } catch (Exception ex) {

    
   
172
        logger.warn("AVRO source configured with invalid keystore " + keystore, ex);

    
   
173
      }

    
   
174
    }

    
   
175

   
149
    if (sourceCounter == null) {
176
    if (sourceCounter == null) {
150
      sourceCounter = new SourceCounter(getName());
177
      sourceCounter = new SourceCounter(getName());
151
    }
178
    }
152
  }
179
  }
153

    
   
180

   
[+20] [20] 40 lines
[+20] [+] private NioServerSocketChannelFactory initSocketChannelFactory() {
194
    return socketChannelFactory;
221
    return socketChannelFactory;
195
  }
222
  }
196

    
   
223

   
197
  private ChannelPipelineFactory initChannelPipelineFactory() {
224
  private ChannelPipelineFactory initChannelPipelineFactory() {
198
    ChannelPipelineFactory pipelineFactory;
225
    ChannelPipelineFactory pipelineFactory;
199
    if (compressionType.equalsIgnoreCase("deflate")) {
226
    boolean enableCompression = compressionType.equalsIgnoreCase("deflate");
200
      pipelineFactory = new CompressionChannelPipelineFactory();
227
    if (enableCompression || enableSsl) {

    
   
228
      pipelineFactory = new SSLCompressionChannelPipelineFactory(

    
   
229
          enableCompression, enableSsl, keystore,

    
   
230
          keystorePassword, keystoreType);
201
    } else {
231
    } else {
202
      pipelineFactory = new ChannelPipelineFactory() {
232
      pipelineFactory = new ChannelPipelineFactory() {
203
        @Override
233
        @Override
204
        public ChannelPipeline getPipeline() throws Exception {
234
        public ChannelPipeline getPipeline() throws Exception {
205
          return Channels.pipeline();
235
          return Channels.pipeline();
[+20] [20] 105 lines
[+20] [+] public Status appendBatch(List<AvroFlumeEvent> events) {
311
    sourceCounter.addToEventAcceptedCount(events.size());
341
    sourceCounter.addToEventAcceptedCount(events.size());
312

    
   
342

   
313
    return Status.OK;
343
    return Status.OK;
314
  }
344
  }
315

    
   
345

   
316
  private static class CompressionChannelPipelineFactory implements
346
  /**
317
  ChannelPipelineFactory {
347
   * Factory of SSL-enabled server worker channel pipelines

    
   
348
   * Copied from Avro's org.apache.avro.ipc.TestNettyServerWithSSL test

    
   
349
   */

    
   
350
  private static class SSLCompressionChannelPipelineFactory

    
   
351
      implements ChannelPipelineFactory {

    
   
352

   

    
   
353
    private boolean enableCompression;

    
   
354
    private boolean enableSsl;

    
   
355
    private String keystore;

    
   
356
    private String keystorePassword;

    
   
357
    private String keystoreType;

    
   
358

   

    
   
359
    public SSLCompressionChannelPipelineFactory(boolean enableCompression, boolean enableSsl, String keystore, String keystorePassword, String keystoreType) {

    
   
360
      this.enableCompression = enableCompression;

    
   
361
      this.enableSsl = enableSsl;

    
   
362
      this.keystore = keystore;

    
   
363
      this.keystorePassword = keystorePassword;

    
   
364
      this.keystoreType = keystoreType;

    
   
365
    }

    
   
366

   

    
   
367
    private SSLContext createServerSSLContext() {

    
   
368
      try {

    
   
369
        KeyStore ks = KeyStore.getInstance(keystoreType);

    
   
370
        ks.load(new FileInputStream(keystore), keystorePassword.toCharArray());

    
   
371

   

    
   
372
        // Set up key manager factory to use our key store

    
   
373
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(getAlgorithm());

    
   
374
        kmf.init(ks, keystorePassword.toCharArray());

    
   
375

   

    
   
376
        SSLContext serverContext = SSLContext.getInstance("TLS");

    
   
377
        serverContext.init(kmf.getKeyManagers(), null, null);

    
   
378
        return serverContext;

    
   
379
      } catch (Exception e) {

    
   
380
        throw new Error("Failed to initialize the server-side SSLContext", e);

    
   
381
      }

    
   
382
    }

    
   
383

   

    
   
384
    private String getAlgorithm() {

    
   
385
      String algorithm = Security.getProperty(

    
   
386
          "ssl.KeyManagerFactory.algorithm");

    
   
387
      if (algorithm == null) {

    
   
388
        algorithm = "SunX509";

    
   
389
      }

    
   
390
      return algorithm;

    
   
391
    }
318

    
   
392

   
319
    @Override
393
    @Override
320
    public ChannelPipeline getPipeline() throws Exception {
394
    public ChannelPipeline getPipeline() throws Exception {
321
      ChannelPipeline pipeline = Channels.pipeline();
395
      ChannelPipeline pipeline = Channels.pipeline();

    
   
396
      if (enableCompression) {
322
      ZlibEncoder encoder = new ZlibEncoder(6);
397
        ZlibEncoder encoder = new ZlibEncoder(6);
323
      pipeline.addFirst("deflater", encoder);
398
        pipeline.addFirst("deflater", encoder);
324
      pipeline.addFirst("inflater", new ZlibDecoder());
399
        pipeline.addFirst("inflater", new ZlibDecoder());

    
   
400
      }

    
   
401
      if (enableSsl) {

    
   
402
        SSLEngine sslEngine = createServerSSLContext().createSSLEngine();

    
   
403
        sslEngine.setUseClientMode(false);

    
   
404
        // addFirst() will make SSL handling the first stage of decoding

    
   
405
        // and the last stage of encoding this must be added after

    
   
406
        // adding compression handling above

    
   
407
        pipeline.addFirst("ssl", new SslHandler(sslEngine));

    
   
408
      }
325
      return pipeline;
409
      return pipeline;
326
    }
410
    }
327
  }
411
  }
328
}
412
}
flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
Revision ac47ee9 New Change
 
flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
Revision c699241 New Change
 
flume-ng-core/src/test/resources/server.p12
New File
 
flume-ng-core/src/test/resources/truststore.jks
New File
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision 38f2205 New Change
 
flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
Revision 8285129 New Change
 
flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
Revision 34d73a3 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java: Loading...
  3. flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java: Loading...
  4. flume-ng-core/src/test/resources/server.p12: Loading...
  5. flume-ng-core/src/test/resources/truststore.jks: Loading...
  6. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...
  7. flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java: Loading...
  8. flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java: Loading...