Review Board 1.7.22


Moving the BlobDeserializer from Morphline Sink to flume-ng-core/.../serialization

Review Request #15110 - Created Oct. 31, 2013 and updated

Roshan Naik
flume-1.5
FLUME-2227
Reviewers
Flume
flume-git
- Moved BlobDeserializer out of morphline sink and into core/.../serialization along with tests. 
- Updated docs to reflect new FQCN
- Retained a dummy class at the old FQCN for backward compatibility
unit tests
flume-ng-core/src/main/java/org/apache/flume/serialization/BlobDeserializer.java
New File

    
   
1
/*

    
   
2
 * Licensed to the Apache Software Foundation (ASF) under one or more

    
   
3
 * contributor license agreements.  See the NOTICE file distributed with

    
   
4
 * this work for additional information regarding copyright ownership.

    
   
5
 * The ASF licenses this file to You under the Apache License, Version 2.0

    
   
6
 * (the "License"); you may not use this file except in compliance with

    
   
7
 * the License.  You may obtain a copy of the License at

    
   
8
 *

    
   
9
 *     http://www.apache.org/licenses/LICENSE-2.0

    
   
10
 *

    
   
11
 * Unless required by applicable law or agreed to in writing, software

    
   
12
 * distributed under the License is distributed on an "AS IS" BASIS,

    
   
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

    
   
14
 * See the License for the specific language governing permissions and

    
   
15
 * limitations under the License.

    
   
16
 */

    
   
17
package org.apache.flume.serialization;

    
   
18

   

    
   
19
import java.io.IOException;

    
   
20
import java.util.List;

    
   
21

   

    
   
22
import org.apache.commons.io.output.ByteArrayOutputStream;

    
   
23
import org.apache.flume.Context;

    
   
24
import org.apache.flume.Event;

    
   
25
import org.apache.flume.annotations.InterfaceAudience;

    
   
26
import org.apache.flume.annotations.InterfaceStability;

    
   
27
import org.apache.flume.conf.ConfigurationException;

    
   
28
import org.apache.flume.event.EventBuilder;

    
   
29
import org.apache.flume.serialization.EventDeserializer;

    
   
30
import org.apache.flume.serialization.ResettableInputStream;

    
   
31
import org.slf4j.Logger;

    
   
32
import org.slf4j.LoggerFactory;

    
   
33

   

    
   
34
import com.google.common.collect.Lists;

    
   
35

   

    
   
36
/**

    
   
37
 * A deserializer that reads a Binary Large Object (BLOB) per event, typically

    
   
38
 * one BLOB per file; To be used in conjunction with Flume SpoolDirectorySource.

    
   
39
 * <p>

    
   
40
 * Note that this approach is not suitable for very large objects because it

    
   
41
 * buffers up the entire BLOB.

    
   
42
 */

    
   
43
@InterfaceAudience.Private

    
   
44
@InterfaceStability.Evolving

    
   
45
public class BlobDeserializer implements EventDeserializer {

    
   
46

   

    
   
47
  private ResettableInputStream in;

    
   
48
  private final int maxBlobLength;

    
   
49
  private volatile boolean isOpen;

    
   
50

   

    
   
51
  public static final String MAX_BLOB_LENGTH_KEY = "maxBlobLength";

    
   
52
  public static final int MAX_BLOB_LENGTH_DEFAULT = 100 * 1000 * 1000;

    
   
53

   

    
   
54
  private static final int DEFAULT_BUFFER_SIZE = 1024 * 8;

    
   
55
  private static final Logger LOGGER = LoggerFactory.getLogger(BlobDeserializer.class);

    
   
56

   

    
   
57
  protected BlobDeserializer(Context context, ResettableInputStream in) {

    
   
58
    this.in = in;

    
   
59
    this.maxBlobLength = context.getInteger(MAX_BLOB_LENGTH_KEY, MAX_BLOB_LENGTH_DEFAULT);

    
   
60
    if (this.maxBlobLength <= 0) {

    
   
61
      throw new ConfigurationException("Configuration parameter " + MAX_BLOB_LENGTH_KEY

    
   
62
          + " must be greater than zero: " + maxBlobLength);

    
   
63
    }

    
   
64
    this.isOpen = true;

    
   
65
  }

    
   
66

   

    
   
67
  /**

    
   
68
   * Reads a BLOB from a file and returns an event

    
   
69
   * @return Event containing a BLOB

    
   
70
   * @throws IOException

    
   
71
   */

    
   
72
  @SuppressWarnings("resource")

    
   
73
  @Override

    
   
74
  public Event readEvent() throws IOException {

    
   
75
    ensureOpen();

    
   
76
    ByteArrayOutputStream blob = null;

    
   
77
    byte[] buf = new byte[Math.min(maxBlobLength, DEFAULT_BUFFER_SIZE)];

    
   
78
    int blobLength = 0;

    
   
79
    int n = 0;

    
   
80
    while ((n = in.read(buf, 0, Math.min(buf.length, maxBlobLength - blobLength))) != -1) {

    
   
81
      if (blob == null) {

    
   
82
        blob = new ByteArrayOutputStream(n);

    
   
83
      }

    
   
84
      blob.write(buf, 0, n);

    
   
85
      blobLength += n;

    
   
86
      if (blobLength >= maxBlobLength) {

    
   
87
        LOGGER.warn("File length exceeds maxBlobLength ({}), truncating BLOB event!", maxBlobLength);

    
   
88
        break;

    
   
89
      }

    
   
90
    }

    
   
91

   

    
   
92
    if (blob == null) {

    
   
93
      return null;

    
   
94
    } else {

    
   
95
      return EventBuilder.withBody(blob.toByteArray());

    
   
96
    }

    
   
97
  }

    
   
98

   

    
   
99
  /**

    
   
100
   * Batch BLOB read

    
   
101
   * @param numEvents Maximum number of events to return.

    
   
102
   * @return List of events containing read BLOBs

    
   
103
   * @throws IOException

    
   
104
   */

    
   
105
  @Override

    
   
106
  public List<Event> readEvents(int numEvents) throws IOException {

    
   
107
    ensureOpen();

    
   
108
    List<Event> events = Lists.newLinkedList();

    
   
109
    for (int i = 0; i < numEvents; i++) {

    
   
110
      Event event = readEvent();

    
   
111
      if (event != null) {

    
   
112
        events.add(event);

    
   
113
      } else {

    
   
114
        break;

    
   
115
      }

    
   
116
    }

    
   
117
    return events;

    
   
118
  }

    
   
119

   

    
   
120
  @Override

    
   
121
  public void mark() throws IOException {

    
   
122
    ensureOpen();

    
   
123
    in.mark();

    
   
124
  }

    
   
125

   

    
   
126
  @Override

    
   
127
  public void reset() throws IOException {

    
   
128
    ensureOpen();

    
   
129
    in.reset();

    
   
130
  }

    
   
131

   

    
   
132
  @Override

    
   
133
  public void close() throws IOException {

    
   
134
    if (isOpen) {

    
   
135
      reset();

    
   
136
      in.close();

    
   
137
      isOpen = false;

    
   
138
    }

    
   
139
  }

    
   
140

   

    
   
141
  private void ensureOpen() {

    
   
142
    if (!isOpen) {

    
   
143
      throw new IllegalStateException("Serializer has been closed");

    
   
144
    }

    
   
145
  }

    
   
146

   

    
   
147

   

    
   
148
  ///////////////////////////////////////////////////////////////////////////////

    
   
149
  // Nested classes:

    
   
150
  ///////////////////////////////////////////////////////////////////////////////

    
   
151
  /** Builder implementations MUST have a public no-arg constructor */

    
   
152
  public static class Builder implements EventDeserializer.Builder {

    
   
153

   

    
   
154
    @Override

    
   
155
    public BlobDeserializer build(Context context, ResettableInputStream in) {

    
   
156
      return new BlobDeserializer(context, in);

    
   
157
    }

    
   
158

   

    
   
159
  }

    
   
160

   

    
   
161
}
flume-ng-core/src/test/java/org/apache/flume/serialization/ResettableTestByteInputStream.java
New File
 
flume-ng-core/src/test/java/org/apache/flume/serialization/TestBlobDeserializer.java
New File
 
flume-ng-doc/sphinx/FlumeUserGuide.rst
Revision 3a3038c New Change
 
flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java
Revision 12bdc40 New Change
 
flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/ResettableTestStringInputStream.java
Revision e6ee9b9 New Change
 
flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobDeserializer.java
Revision 6172c68 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/serialization/BlobDeserializer.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/serialization/ResettableTestByteInputStream.java: Loading...
  3. flume-ng-core/src/test/java/org/apache/flume/serialization/TestBlobDeserializer.java: Loading...
  4. flume-ng-doc/sphinx/FlumeUserGuide.rst: Loading...
  5. flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/BlobDeserializer.java: Loading...
  6. flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/ResettableTestStringInputStream.java: Loading...
  7. flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobDeserializer.java: Loading...