Review Board 1.7.22


Fix for Flume-2109, Take 3. Incorporated Review comments and implementation in sysc with Avro source

Review Request #12803 - Created July 22, 2013 and updated

Ashish Paliwal
FLUME-2109
Reviewers
Flume
flume-git
Fix for Flume-2109, to support HTTPS.
Incorporates Review comments from Alex
Implementation refined to be as close to Avro Source, like SSL port is not to be defined, the port value shall be used
Documentation updated to reflect the changes
Unit Test case written to test SSL functionality
flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
Revision c90f067 New Change
[20] 26 lines
[+20]
27
import org.apache.flume.instrumentation.SourceCounter;
27
import org.apache.flume.instrumentation.SourceCounter;
28
import org.apache.flume.source.AbstractSource;
28
import org.apache.flume.source.AbstractSource;
29
import org.mortbay.jetty.Connector;
29
import org.mortbay.jetty.Connector;
30
import org.mortbay.jetty.Server;
30
import org.mortbay.jetty.Server;
31
import org.mortbay.jetty.bio.SocketConnector;
31
import org.mortbay.jetty.bio.SocketConnector;

    
   
32
import org.mortbay.jetty.security.SslSocketConnector;
32
import org.mortbay.jetty.servlet.ServletHolder;
33
import org.mortbay.jetty.servlet.ServletHolder;
33
import org.slf4j.Logger;
34
import org.slf4j.Logger;
34
import org.slf4j.LoggerFactory;
35
import org.slf4j.LoggerFactory;
35

    
   
36

   
36
import javax.servlet.http.HttpServlet;
37
import javax.servlet.http.HttpServlet;
37
import javax.servlet.http.HttpServletRequest;
38
import javax.servlet.http.HttpServletRequest;
38
import javax.servlet.http.HttpServletResponse;
39
import javax.servlet.http.HttpServletResponse;
39
import java.io.IOException;
40
import java.io.IOException;
40
import java.util.ArrayList;

   
41
import java.util.Collections;
41
import java.util.Collections;
42
import java.util.List;
42
import java.util.List;
43
import java.util.Map;
43
import java.util.Map;
44

    
   
44

   
45
/**
45
/**
[+20] [20] 40 lines
[+20] [+] public class HTTPSource extends AbstractSource implements
86
  private volatile Server srv;
86
  private volatile Server srv;
87
  private volatile String host;
87
  private volatile String host;
88
  private HTTPSourceHandler handler;
88
  private HTTPSourceHandler handler;
89
  private SourceCounter sourceCounter;
89
  private SourceCounter sourceCounter;
90

    
   
90

   

    
   
91
  // SSL configuration variable

    
   
92
  private volatile String keyStorePath;

    
   
93
  private volatile String keyStorePassword;

    
   
94
  private volatile Boolean sslEnabled = false;

    
   
95
  private volatile String keystoreType;

    
   
96

   

    
   
97

   
91
  @Override
98
  @Override
92
  public void configure(Context context) {
99
  public void configure(Context context) {
93
    try {
100
    try {

    
   
101
      // SSL related config

    
   
102
      sslEnabled = context.getBoolean(HTTPSourceConfigurationConstants.SSL_ENABLED, false);

    
   
103

   
94
      port = context.getInteger(HTTPSourceConfigurationConstants.CONFIG_PORT);
104
      port = context.getInteger(HTTPSourceConfigurationConstants.CONFIG_PORT);
95
      host = context.getString(HTTPSourceConfigurationConstants.CONFIG_BIND,
105
      host = context.getString(HTTPSourceConfigurationConstants.CONFIG_BIND,
96
        HTTPSourceConfigurationConstants.DEFAULT_BIND);
106
        HTTPSourceConfigurationConstants.DEFAULT_BIND);

    
   
107

   
97
      checkHostAndPort();
108
      checkHostAndPort();

    
   
109

   
98
      String handlerClassName = context.getString(
110
      String handlerClassName = context.getString(
99
              HTTPSourceConfigurationConstants.CONFIG_HANDLER,
111
              HTTPSourceConfigurationConstants.CONFIG_HANDLER,
100
              HTTPSourceConfigurationConstants.DEFAULT_HANDLER).trim();
112
              HTTPSourceConfigurationConstants.DEFAULT_HANDLER).trim();

    
   
113

   

    
   
114
      if(sslEnabled) {

    
   
115
        LOG.debug("SSL configuration enabled");

    
   
116
        keyStorePath = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE);

    
   
117
        Preconditions.checkArgument(keyStorePath != null && !keyStorePath.isEmpty(),

    
   
118
                                        "A Keystore is required for SSL Conifguration" );

    
   
119
        keyStorePassword = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD);

    
   
120
        Preconditions.checkArgument(keyStorePassword != null, "A Keystore password is required for SSL Configuration");

    
   
121

   

    
   
122
        keystoreType = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE_TYPE, "JKS");

    
   
123
      }

    
   
124

   
101
      @SuppressWarnings("unchecked")
125
      @SuppressWarnings("unchecked")
102
      Class<? extends HTTPSourceHandler> clazz =
126
      Class<? extends HTTPSourceHandler> clazz =
103
              (Class<? extends HTTPSourceHandler>)
127
              (Class<? extends HTTPSourceHandler>)
104
              Class.forName(handlerClassName);
128
              Class.forName(handlerClassName);
105
      handler = clazz.getDeclaredConstructor().newInstance();
129
      handler = clazz.getDeclaredConstructor().newInstance();
[+20] [20] 31 lines
[+20] [+] private void checkHostAndPort() {
137
    Preconditions.checkState(srv == null,
161
    Preconditions.checkState(srv == null,
138
            "Running HTTP Server found in source: " + getName()
162
            "Running HTTP Server found in source: " + getName()
139
            + " before I started one."
163
            + " before I started one."
140
            + "Will not attempt to start.");
164
            + "Will not attempt to start.");
141
    srv = new Server();
165
    srv = new Server();

    
   
166

   

    
   
167
    // Connector Array

    
   
168
    Connector[] connectors = new Connector[1];

    
   
169

   

    
   
170

   

    
   
171
    if(sslEnabled) {

    
   
172
      SslSocketConnector sslSocketConnector = new SslSocketConnector();

    
   
173
      sslSocketConnector.setKeystore(keyStorePath);

    
   
174
      sslSocketConnector.setKeyPassword(keyStorePassword);

    
   
175
      sslSocketConnector.setPort(port);

    
   
176
      sslSocketConnector.setKeystoreType(keystoreType);

    
   
177
      connectors[0] = sslSocketConnector;

    
   
178
    } else {
142
    SocketConnector connector = new SocketConnector();
179
        SocketConnector connector = new SocketConnector();
143
    connector.setPort(port);
180
        connector.setPort(port);
144
    connector.setHost(host);
181
        connector.setHost(host);
145
    srv.setConnectors(new Connector[] { connector });
182
        connectors[0] = connector;

    
   
183
    }

    
   
184

   

    
   
185
    srv.setConnectors(connectors);
146
    try {
186
    try {
147
      org.mortbay.jetty.servlet.Context root =
187
      org.mortbay.jetty.servlet.Context root =
148
              new org.mortbay.jetty.servlet.Context(
188
              new org.mortbay.jetty.servlet.Context(
149
              srv, "/", org.mortbay.jetty.servlet.Context.SESSIONS);
189
              srv, "/", org.mortbay.jetty.servlet.Context.SESSIONS);
150
      root.addServlet(new ServletHolder(new FlumeHTTPServlet()), "/");
190
      root.addServlet(new ServletHolder(new FlumeHTTPServlet()), "/");
[+20] [20] 80 lines
flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
Revision f547e0f New Change
 
flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
Revision 8952db3 New Change
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision 63cad21 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java: Loading...
  2. flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java: Loading...
  3. flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java: Loading...
  4. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...