Review Board 1.7.22


Moving BlobHandler out of morphline sink and into HTTP source

Review Request #15107 - Created Oct. 31, 2013 and updated

Roshan Naik
flume-1.5
FLUME-2226
Reviewers
Flume
flume-git
- Moved BlobHandler, along with its tests, out of the morphline sink and into the HTTP source.
- Updated the docs to reflect the new fully qualified class name (FQCN).
- Retained a dummy class at the old FQCN for backward compatibility.
Ran the unit tests and performed some manual testing.
flume-ng-core/src/main/java/org/apache/flume/source/http/BlobHandler.java
New File

    
   
1
/*

    
   
2
 * Licensed to the Apache Software Foundation (ASF) under one or more

    
   
3
 * contributor license agreements.  See the NOTICE file distributed with

    
   
4
 * this work for additional information regarding copyright ownership.

    
   
5
 * The ASF licenses this file to You under the Apache License, Version 2.0

    
   
6
 * (the "License"); you may not use this file except in compliance with

    
   
7
 * the License.  You may obtain a copy of the License at

    
   
8
 *

    
   
9
 *     http://www.apache.org/licenses/LICENSE-2.0

    
   
10
 *

    
   
11
 * Unless required by applicable law or agreed to in writing, software

    
   
12
 * distributed under the License is distributed on an "AS IS" BASIS,

    
   
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

    
   
14
 * See the License for the specific language governing permissions and

    
   
15
 * limitations under the License.

    
   
16
 */

    
   
17
package org.apache.flume.source.http;

    
   
18

   

    
   
19
import java.io.InputStream;

    
   
20
import java.util.Collections;

    
   
21
import java.util.Enumeration;

    
   
22
import java.util.HashMap;

    
   
23
import java.util.List;

    
   
24
import java.util.Map;

    
   
25

   

    
   
26
import javax.servlet.http.HttpServletRequest;

    
   
27

   

    
   
28
import org.apache.commons.io.output.ByteArrayOutputStream;

    
   
29
import org.apache.flume.Context;

    
   
30
import org.apache.flume.Event;

    
   
31
import org.apache.flume.conf.ConfigurationException;

    
   
32
import org.apache.flume.event.EventBuilder;

    
   
33
import org.slf4j.Logger;

    
   
34
import org.slf4j.LoggerFactory;

    
   
35

   

    
   
36
/**

    
   
37
 * BlobHandler for HTTPSource that returns event that contains the request

    
   
38
 * parameters as well as the Binary Large Object (BLOB) uploaded with this

    
   
39
 * request.

    
   
40
 * <p>

    
   
41
 * Note that this approach is not suitable for very large objects because it

    
   
42
 * buffers up the entire BLOB.

    
   
43
 * <p>

    
   
44
 * Example client usage:

    
   
45
 * <pre>

    
   
46
 * curl --data-binary @sample-statuses-20120906-141433-medium.avro 'http://127.0.0.1:5140?resourceName=sample-statuses-20120906-141433-medium.avro' --header 'Content-Type:application/octet-stream' --verbose

    
   
47
 * </pre>

    
   
48
 */

    
   
49
public class BlobHandler implements HTTPSourceHandler {

    
   
50

   

    
   
51
  private int maxBlobLength = MAX_BLOB_LENGTH_DEFAULT;

    
   
52

   

    
   
53
  public static final String MAX_BLOB_LENGTH_KEY = "maxBlobLength";

    
   
54
  public static final int MAX_BLOB_LENGTH_DEFAULT = 100 * 1000 * 1000;

    
   
55

   

    
   
56
  private static final int DEFAULT_BUFFER_SIZE = 1024 * 8;

    
   
57
  private static final Logger LOGGER = LoggerFactory.getLogger(BlobHandler.class);

    
   
58
  private static final String HTTP_CONTENT_TYPE = "Content-Type";

    
   
59

   

    
   
60
  public BlobHandler() {

    
   
61
  }

    
   
62

   

    
   
63
  @Override

    
   
64
  public void configure(Context context) {

    
   
65
    this.maxBlobLength = context.getInteger(MAX_BLOB_LENGTH_KEY, MAX_BLOB_LENGTH_DEFAULT);

    
   
66
    if (this.maxBlobLength <= 0) {

    
   
67
      throw new ConfigurationException("Configuration parameter " + MAX_BLOB_LENGTH_KEY

    
   
68
          + " must be greater than zero: " + maxBlobLength);

    
   
69
    }

    
   
70
  }

    
   
71

   

    
   
72
  @SuppressWarnings("resource")

    
   
73
  @Override

    
   
74
  public List<Event> getEvents(HttpServletRequest request) throws Exception {

    
   
75
    Map<String, String> headers = getHeaders(request);

    
   
76
    InputStream in = request.getInputStream();

    
   
77
    try {

    
   
78
      ByteArrayOutputStream blob = null;

    
   
79
      byte[] buf = new byte[Math.min(maxBlobLength, DEFAULT_BUFFER_SIZE)];

    
   
80
      int blobLength = 0;

    
   
81
      int n = 0;

    
   
82
      while ((n = in.read(buf, 0, Math.min(buf.length, maxBlobLength - blobLength))) != -1) {

    
   
83
        if (blob == null) {

    
   
84
          blob = new ByteArrayOutputStream(n);

    
   
85
        }

    
   
86
        blob.write(buf, 0, n);

    
   
87
        blobLength += n;

    
   
88
        if (blobLength >= maxBlobLength) {

    
   
89
          LOGGER.warn("Request length exceeds maxBlobLength ({}), truncating BLOB event!", maxBlobLength);

    
   
90
          break;

    
   
91
        }

    
   
92
      }

    
   
93

   

    
   
94
      byte[] array = blob != null ? blob.toByteArray() : new byte[0];

    
   
95
      Event event = EventBuilder.withBody(array, headers);

    
   
96
      LOGGER.debug("blobEvent: {}", event);

    
   
97
      return Collections.singletonList(event);

    
   
98
    } finally {

    
   
99
      in.close();

    
   
100
    }

    
   
101
  }

    
   
102

   

    
   
103
  private Map<String, String> getHeaders(HttpServletRequest request) {

    
   
104
    if (LOGGER.isDebugEnabled()) {

    
   
105
      Map requestHeaders = new HashMap();

    
   
106
      Enumeration iter = request.getHeaderNames();

    
   
107
      while (iter.hasMoreElements()) {

    
   
108
        String name = (String) iter.nextElement();

    
   
109
        requestHeaders.put(name, request.getHeader(name));

    
   
110
      }

    
   
111
      LOGGER.debug("requestHeaders: {}", requestHeaders);

    
   
112
    }

    
   
113
    Map<String, String> headers = new HashMap();

    
   
114
    if (request.getContentType() != null) {

    
   
115
      headers.put(HTTP_CONTENT_TYPE, request.getContentType());

    
   
116
    }

    
   
117
    Enumeration iter = request.getParameterNames();

    
   
118
    while (iter.hasMoreElements()) {

    
   
119
      String name = (String) iter.nextElement();

    
   
120
      headers.put(name, request.getParameter(name));

    
   
121
    }

    
   
122
    return headers;

    
   
123
  }

    
   
124

   

    
   
125
}
flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletByteRequestWrapper.java
New File
 
flume-ng-core/src/test/java/org/apache/flume/source/http/TestBlobHandler.java
New File
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision 3a3038c New Change
 
flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java
Revision e84dec1 New Change
 
flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/FlumeHttpServletRequestWrapper.java
Revision 9711a3a New Change
 
flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobHandler.java
Revision 3e7de99 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/source/http/BlobHandler.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/source/http/FlumeHttpServletByteRequestWrapper.java: Loading...
  3. flume-ng-core/src/test/java/org/apache/flume/source/http/TestBlobHandler.java: Loading...
  4. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...
  5. flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobHandler.java: Loading...
  6. flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/FlumeHttpServletRequestWrapper.java: Loading...
  7. flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobHandler.java: Loading...