Review Board 1.7.22


Sravya Tirukkovalur got review request #11811!

JMX support for HTTP Source.

Review Request #11811 - Created June 11, 2013 and submitted

Sravya Tirukkovalur
FLUME-2076
Reviewers
Flume
brocknoland, hshreedharan, mpercy
flume-git
Http Source supports following metrics:
START
STOP
OPEN_CONNECTION_COUNT
EVENTS_RECEIVED
EVENTS_ACCEPTED

Some points for discussion:
1. Should we support APPEND_BATCH_RECEIVED and APPEND_BATCH_ACCEPTED ?
Http doesnt have a batch setting, but multiple events can be sent in a single POST request. 
2. Is EVENTS_RECEIVED always same as APPEND_RECEIVED? 
Went over all the sources which support jmx metrics and it looks like they are redundant and never actually used separately.
Ran existing tests

Diff revision 3 (Latest)

1 2 3
1 2 3

  1. flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java: Loading...
flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
Revision a4c3eb3 New Change
[20] 21 lines
[+20]
22
import org.apache.flume.ChannelException;
22
import org.apache.flume.ChannelException;
23
import org.apache.flume.Context;
23
import org.apache.flume.Context;
24
import org.apache.flume.Event;
24
import org.apache.flume.Event;
25
import org.apache.flume.EventDrivenSource;
25
import org.apache.flume.EventDrivenSource;
26
import org.apache.flume.conf.Configurable;
26
import org.apache.flume.conf.Configurable;

    
   
27
import org.apache.flume.instrumentation.SourceCounter;
27
import org.apache.flume.source.AbstractSource;
28
import org.apache.flume.source.AbstractSource;
28
import org.mortbay.jetty.Connector;
29
import org.mortbay.jetty.Connector;
29
import org.mortbay.jetty.Server;
30
import org.mortbay.jetty.Server;
30
import org.mortbay.jetty.bio.SocketConnector;
31
import org.mortbay.jetty.bio.SocketConnector;
31
import org.mortbay.jetty.servlet.ServletHolder;
32
import org.mortbay.jetty.servlet.ServletHolder;
[+20] [20] 18 lines
[+20]
50
 *
51
 *
51
 * The source accepts the following parameters: <p> <tt>port</tt>: port to which
52
 * The source accepts the following parameters: <p> <tt>port</tt>: port to which
52
 * the server should bind. Mandatory <p> <tt>handler</tt>: the class that
53
 * the server should bind. Mandatory <p> <tt>handler</tt>: the class that
53
 * deserializes a HttpServletRequest into a list of flume events. This class
54
 * deserializes a HttpServletRequest into a list of flume events. This class
54
 * must implement HTTPSourceHandler. Default:
55
 * must implement HTTPSourceHandler. Default:
55
 * {@linkplain JSONDeserializer}. <p> <tt>handler.*</tt> Any configuration
56
 * {@linkplain JSONHandler}. <p> <tt>handler.*</tt> Any configuration
56
 * to be passed to the handler. <p>
57
 * to be passed to the handler. <p>
57
 *
58
 *
58
 * All events deserialized from one Http request are committed to the channel in
59
 * All events deserialized from one Http request are committed to the channel in
59
 * one transaction, thus allowing for increased efficiency on channels like the
60
 * one transaction, thus allowing for increased efficiency on channels like the
60
 * file channel. If the handler throws an exception this source will return
61
 * file channel. If the handler throws an exception this source will return
[+20] [20] 22 lines
[+20] [+] public class HTTPSource extends AbstractSource implements
83
  private static final Logger LOG = LoggerFactory.getLogger(HTTPSource.class);
84
  private static final Logger LOG = LoggerFactory.getLogger(HTTPSource.class);
84
  private volatile Integer port;
85
  private volatile Integer port;
85
  private volatile Server srv;
86
  private volatile Server srv;
86
  private volatile String host;
87
  private volatile String host;
87
  private HTTPSourceHandler handler;
88
  private HTTPSourceHandler handler;

    
   
89
  private SourceCounter sourceCounter;
88

    
   
90

   
89
  @Override
91
  @Override
90
  public void configure(Context context) {
92
  public void configure(Context context) {
91
    try {
93
    try {
92
      port = context.getInteger(HTTPSourceConfigurationConstants.CONFIG_PORT);
94
      port = context.getInteger(HTTPSourceConfigurationConstants.CONFIG_PORT);
[+20] [20] 23 lines
[+20] public void configure(Context context) {
116
      Throwables.propagate(ex);
118
      Throwables.propagate(ex);
117
    } catch (Exception ex) {
119
    } catch (Exception ex) {
118
      LOG.error("Error configuring HTTPSource!", ex);
120
      LOG.error("Error configuring HTTPSource!", ex);
119
      Throwables.propagate(ex);
121
      Throwables.propagate(ex);
120
    }
122
    }

    
   
123
    if (sourceCounter == null) {

    
   
124
      sourceCounter = new SourceCounter(getName());

    
   
125
    }
121
  }
126
  }
122

    
   
127

   
123
  private void checkHostAndPort() {
128
  private void checkHostAndPort() {
124
    Preconditions.checkState(host != null && !host.isEmpty(),
129
    Preconditions.checkState(host != null && !host.isEmpty(),
125
      "HTTPSource hostname specified is empty");
130
      "HTTPSource hostname specified is empty");
[+20] [20] 22 lines
[+20] [+] public void start() {
148
    } catch (Exception ex) {
153
    } catch (Exception ex) {
149
      LOG.error("Error while starting HTTPSource. Exception follows.", ex);
154
      LOG.error("Error while starting HTTPSource. Exception follows.", ex);
150
      Throwables.propagate(ex);
155
      Throwables.propagate(ex);
151
    }
156
    }
152
    Preconditions.checkArgument(srv.isRunning());
157
    Preconditions.checkArgument(srv.isRunning());

    
   
158
    sourceCounter.start();
153
    super.start();
159
    super.start();
154
  }
160
  }
155

    
   
161

   
156
  @Override
162
  @Override
157
  public void stop() {
163
  public void stop() {
158
    try {
164
    try {
159
      srv.stop();
165
      srv.stop();
160
      srv.join();
166
      srv.join();
161
      srv = null;
167
      srv = null;
162
    } catch (Exception ex) {
168
    } catch (Exception ex) {
163
      LOG.error("Error while stopping HTTPSource. Exception follows.", ex);
169
      LOG.error("Error while stopping HTTPSource. Exception follows.", ex);
164
    }
170
    }

    
   
171
    sourceCounter.stop();

    
   
172
    LOG.info("Http source {} stopped. Metrics: {}", getName(), sourceCounter);
165
  }
173
  }
166

    
   
174

   
167
  private class FlumeHTTPServlet extends HttpServlet {
175
  private class FlumeHTTPServlet extends HttpServlet {
168

    
   
176

   
169
    private static final long serialVersionUID = 4891924863218790344L;
177
    private static final long serialVersionUID = 4891924863218790344L;
[+20] [20] 15 lines
[+20] [+] public void doPost(HttpServletRequest request, HttpServletResponse response)
185
        response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
193
        response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
186
                "Deserializer threw unexpected exception. "
194
                "Deserializer threw unexpected exception. "
187
                + ex.getMessage());
195
                + ex.getMessage());
188
        return;
196
        return;
189
      }
197
      }

    
   
198
      sourceCounter.incrementAppendBatchReceivedCount();

    
   
199
      sourceCounter.addToEventReceivedCount(events.size());
190
      try {
200
      try {
191
        getChannelProcessor().processEventBatch(events);
201
        getChannelProcessor().processEventBatch(events);
192
      } catch (ChannelException ex) {
202
      } catch (ChannelException ex) {
193
        LOG.warn("Error appending event to channel. "
203
        LOG.warn("Error appending event to channel. "
194
                + "Channel might be full. Consider increasing the channel "
204
                + "Channel might be full. Consider increasing the channel "
[+20] [20] 10 lines
[+20] public void doPost(HttpServletRequest request, HttpServletResponse response)
205
        return;
215
        return;
206
      }
216
      }
207
      response.setCharacterEncoding(request.getCharacterEncoding());
217
      response.setCharacterEncoding(request.getCharacterEncoding());
208
      response.setStatus(HttpServletResponse.SC_OK);
218
      response.setStatus(HttpServletResponse.SC_OK);
209
      response.flushBuffer();
219
      response.flushBuffer();

    
   
220
      sourceCounter.incrementAppendBatchAcceptedCount();

    
   
221
      sourceCounter.addToEventAcceptedCount(events.size());
210
    }
222
    }
211

    
   
223

   
212
    @Override
224
    @Override
213
    public void doGet(HttpServletRequest request, HttpServletResponse response)
225
    public void doGet(HttpServletRequest request, HttpServletResponse response)
214
            throws IOException {
226
            throws IOException {
215
      doPost(request, response);
227
      doPost(request, response);
216
    }
228
    }
217
  }
229
  }
218
}
230
}
  1. flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java: Loading...