Review Board 1.7.22


FLUME-1709 - HDFS CompressedDataStream doesn't support serializer parameter

Review Request #8227 - Created Nov. 26, 2012 and updated

Cameron Gandevia
trunk
Reviewers
Flume
flume-git
When configuring the HDFS sink you can specify a serializer parameter which is hooked
into the append method of the sink. 

The HDFSCompressedDataStream does not respect this parameter and the specified serializer
is not used.

 
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
Revision 80341ef New Change
[20] 18 lines
[+20]
19
package org.apache.flume.sink.hdfs;
19
package org.apache.flume.sink.hdfs;
20

    
   
20

   
21
import java.io.IOException;
21
import java.io.IOException;
22
import org.apache.flume.Context;
22
import org.apache.flume.Context;
23
import org.apache.flume.Event;
23
import org.apache.flume.Event;

    
   
24
import org.apache.flume.serialization.EventSerializer;

    
   
25
import org.apache.flume.serialization.EventSerializerFactory;
24
import org.apache.flume.sink.FlumeFormatter;
26
import org.apache.flume.sink.FlumeFormatter;
25
import org.apache.hadoop.conf.Configuration;
27
import org.apache.hadoop.conf.Configuration;
26
import org.apache.hadoop.fs.FSDataOutputStream;
28
import org.apache.hadoop.fs.FSDataOutputStream;
27
import org.apache.hadoop.fs.FileSystem;
29
import org.apache.hadoop.fs.FileSystem;
28
import org.apache.hadoop.fs.Path;
30
import org.apache.hadoop.fs.Path;
[+20] [20] 11 lines
[+20] [+] public class HDFSCompressedDataStream implements HDFSWriter {
40

    
   
42

   
41
  private FSDataOutputStream fsOut;
43
  private FSDataOutputStream fsOut;
42
  private CompressionOutputStream cmpOut;
44
  private CompressionOutputStream cmpOut;
43
  private boolean isFinished = false;
45
  private boolean isFinished = false;
44

    
   
46

   

    
   
47
  private String serializerType;

    
   
48
  private Context serializerContext;

    
   
49
  private EventSerializer serializer;

    
   
50

   
45
  @Override
51
  @Override
46
  public void configure(Context context) {
52
  public void configure(Context context) {
47
    // no-op
53
    serializerType = context.getString("serializer", "TEXT");

    
   
54
    serializerContext = new Context(

    
   
55
        context.getSubProperties(EventSerializer.CTX_PREFIX));
48
  }
56
  }
49

    
   
57

   
50
  @Override
58
  @Override
51
  public void open(String filePath, FlumeFormatter fmt) throws IOException {
59
  public void open(String filePath, FlumeFormatter fmt) throws IOException {
52
    DefaultCodec defCodec = new DefaultCodec();
60
    DefaultCodec defCodec = new DefaultCodec();
[+20] [20] 6 lines
[+20] public void open(String filePath, FlumeFormatter fmt) throws IOException {
59
      CompressionType cType, FlumeFormatter fmt) throws IOException {
67
      CompressionType cType, FlumeFormatter fmt) throws IOException {
60
    Configuration conf = new Configuration();
68
    Configuration conf = new Configuration();
61
    Path dstPath = new Path(filePath);
69
    Path dstPath = new Path(filePath);
62
    FileSystem hdfs = dstPath.getFileSystem(conf);
70
    FileSystem hdfs = dstPath.getFileSystem(conf);
63

    
   
71

   

    
   
72
    boolean appending = false;
64
    if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
73
    if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
65
    (dstPath)) {
74
    (dstPath)) {
66
      fsOut = hdfs.append(dstPath);
75
      fsOut = hdfs.append(dstPath);

    
   
76
      appending = true;
67
    } else {
77
    } else {
68
      fsOut = hdfs.create(dstPath);
78
      fsOut = hdfs.create(dstPath);
69
    }
79
    }
70
    cmpOut = codec.createOutputStream(fsOut);
80
    cmpOut = codec.createOutputStream(fsOut);

    
   
81
    serializer = EventSerializerFactory.getInstance(serializerType,

    
   
82
        serializerContext, cmpOut);

    
   
83
    if (appending && !serializer.supportsReopen()) {

    
   
84
      cmpOut.close();

    
   
85
      serializer = null;

    
   
86
      throw new IOException("serializer (" + serializerType

    
   
87
          + ") does not support append");

    
   
88
    }

    
   
89
    if (appending) {

    
   
90
      serializer.afterReopen();

    
   
91
    } else {

    
   
92
      serializer.afterCreate();

    
   
93
    }
71
    isFinished = false;
94
    isFinished = false;
72
  }
95
  }
73

    
   
96

   
74
  @Override
97
  @Override
75
  public void append(Event e, FlumeFormatter fmt) throws IOException {
98
  public void append(Event e, FlumeFormatter fmt) throws IOException {
76
    if (isFinished) {
99
    if (isFinished) {
77
      cmpOut.resetState();
100
      cmpOut.resetState();
78
      isFinished = false;
101
      isFinished = false;
79
    }
102
    }
80
    byte[] bValue = fmt.getBytes(e);
103
    serializer.write(e);
81
    cmpOut.write(bValue);

   
82
  }
104
  }
83

    
   
105

   
84
  @Override
106
  @Override
85
  public void sync() throws IOException {
107
  public void sync() throws IOException {
86
    // We must use finish() and resetState() here -- flush() is apparently not
108
    // We must use finish() and resetState() here -- flush() is apparently not
87
    // supported by the compressed output streams (it's a no-op).
109
    // supported by the compressed output streams (it's a no-op).
88
    // Also, since resetState() writes headers, avoid calling it without an
110
    // Also, since resetState() writes headers, avoid calling it without an
89
    // additional write/append operation.
111
    // additional write/append operation.
90
    // Note: There are bugs in Hadoop & JDK w/ pure-java gzip; see HADOOP-8522.
112
    // Note: There are bugs in Hadoop & JDK w/ pure-java gzip; see HADOOP-8522.

    
   
113
    serializer.flush();
91
    if (!isFinished) {
114
    if (!isFinished) {
92
      cmpOut.finish();
115
      cmpOut.finish();
93
      isFinished = true;
116
      isFinished = true;
94
    }
117
    }
95
    fsOut.flush();
118
    fsOut.flush();
96
    fsOut.sync();
119
    fsOut.sync();
97
  }
120
  }
98

    
   
121

   
99
  @Override
122
  @Override
100
  public void close() throws IOException {
123
  public void close() throws IOException {
101
    sync();
124
    serializer.flush();

    
   
125
    serializer.beforeClose();

    
   
126
    if (!isFinished) {

    
   
127
      cmpOut.finish();

    
   
128
      isFinished = true;

    
   
129
    }

    
   
130
    fsOut.flush();

    
   
131
    fsOut.sync();
102
    cmpOut.close();
132
    cmpOut.close();
103
  }
133
  }
104

    
   
134

   
105
}
135
}
flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java
Revision f537732 New Change
 
  1. flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java: Loading...
  2. flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java: Loading...