Review Board 1.7.22


Patch to support custom FlumeFormatter implementations for writing HDFS SequenceFiles

Review Request #6918 - Created Sept. 5, 2012 and updated

Chris Birchall
FLUME-1100
Reviewers
Flume
flume-git
This patch allows users to customise the format of HDFS SequenceFiles by providing a custom FlumeFormatter implementation.

Currently, the user can set hdfs.writeFormat to either "Text" or "Writable", corresponding to HDFSTextFormatter and HDFSWritableFormatter respectively.

With this patch, hdfs.writeFormat can also be set to the fully qualified class name of a Builder implementation, e.g.:
    agent_foo.sinks.hdfs-sink.writeFormat=com.mycompany.flume.MyCustomFormatter$Builder
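A value such as `com.mycompany.flume.MyCustomFormatter$Builder` is a binary class name: the `$` separates the nested `Builder` from its enclosing class. That means it can be passed straight to `Class.forName`. The sketch below shows one way a sink could turn that string into a Builder. It is an illustration, not the patch's code. `Formatter`, `Builder`, `loadBuilder` and `UpperCaseFormatter` are hypothetical stand-ins for the real Flume types. The sketch also assumes that custom Builders have a public no-argument constructor.

```java
import java.util.Locale;
import java.util.Map;

public class BuilderLoaderSketch {

  // Hypothetical stand-in for the formatter interface.
  public interface Formatter {
    String format(String body);
  }

  // Hypothetical stand-in for the Builder a custom formatter provides.
  public interface Builder {
    Formatter build(Map<String, String> context);
  }

  // Instantiates a Builder from its binary class name, e.g. "com.example.Outer$Builder".
  static Builder loadBuilder(String className) {
    try {
      Class<? extends Builder> clazz =
          Class.forName(className).asSubclass(Builder.class);
      // Assumes an accessible no-argument constructor.
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Class not found: " + className, e);
    } catch (ClassCastException e) {
      throw new IllegalArgumentException(className + " does not implement Builder", e);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate " + className, e);
    }
  }

  // Example formatter whose nested Builder follows the MyCustomFormatter$Builder pattern.
  public static class UpperCaseFormatter implements Formatter {
    @Override
    public String format(String body) {
      return body.toUpperCase(Locale.ROOT);
    }

    public static class Builder implements BuilderLoaderSketch.Builder {
      @Override
      public Formatter build(Map<String, String> context) {
        return new UpperCaseFormatter();
      }
    }
  }

  public static void main(String[] args) {
    // getName() yields the binary name, with '$' before the nested class name.
    String name = UpperCaseFormatter.Builder.class.getName();
    Formatter formatter = loadBuilder(name).build(Map.of());
    System.out.println(name + " -> " + formatter.format("hello"));
  }
}
```

Using `asSubclass` makes a misconfigured class name fail fast with a clear error, instead of failing later with a `ClassCastException` somewhere inside the sink.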

Users can also pass custom configuration parameters to the Builder, e.g.:
    agent_foo.sinks.hdfs-sink.writeFormat.ignoreHeaders=foo,bar

These params will be passed to the Builder's build() method as a Context object.
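Below is a minimal sketch of how the `writeFormat.*` parameters could be gathered for the Builder. It assumes that the Builder's Context contains only the keys under the `writeFormat.` prefix, with that prefix stripped, so that `ignoreHeaders` is looked up by its short name. In Flume itself, `Context.getSubProperties(prefix)` does this kind of extraction. The sketch uses a plain `Map` so that it stays self-contained. The helper names `subProperties` and `parseList`, and the comma-separated interpretation of `ignoreHeaders`, are hypothetical. How a formatter reads its own parameters is up to that formatter.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class SubPropertiesSketch {

  // Returns the entries whose keys start with prefix, with the prefix removed.
  static Map<String, String> subProperties(Map<String, String> props, String prefix) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> e : props.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        result.put(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    return result;
  }

  // Parses a comma-separated list such as "foo,bar", trimming blanks; null yields an empty set.
  static Set<String> parseList(String value) {
    Set<String> out = new LinkedHashSet<>();
    if (value == null) {
      return out;
    }
    for (String s : value.split(",")) {
      String t = s.trim();
      if (!t.isEmpty()) {
        out.add(t);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Sink-level properties, with the "agent_foo.sinks.hdfs-sink." prefix already stripped.
    Map<String, String> sinkProps = new HashMap<>();
    sinkProps.put("writeFormat", "com.mycompany.flume.MyCustomFormatter$Builder");
    sinkProps.put("writeFormat.ignoreHeaders", "foo,bar");

    // The trailing '.' keeps the bare "writeFormat" key out of the Builder's parameters.
    Map<String, String> builderContext = subProperties(sinkProps, "writeFormat.");
    System.out.println(builderContext);
    System.out.println(parseList(builderContext.get("ignoreHeaders")));
  }
}
```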

I've tried to be as consistent as possible with the design of EventSerializerFactory:
* Use an enum for the different formatter types, rather than static strings.
* Use a Builder, rather than constructing a FlumeFormatter directly.
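The two design points above can be sketched together. In the sketch, an enum constant carries the Builder class for each built-in format, and an `Other` constant stands for "the value is a class name". This is a self-contained illustration that follows the general enum-plus-Builder pattern described above. `FormatterType`, `resolve`, `create` and the stub formatters are hypothetical names, not the patch's API. Case-insensitive matching of "Text"/"Writable" is also an assumption.

```java
public class FormatterTypeSketch {

  // Hypothetical stand-ins for the formatter and its Builder.
  public interface Formatter {
    String describe();
  }

  public interface Builder {
    Formatter build();
  }

  public static class TextFormatter implements Formatter {
    @Override
    public String describe() {
      return "Text";
    }

    public static class Builder implements FormatterTypeSketch.Builder {
      @Override
      public Formatter build() {
        return new TextFormatter();
      }
    }
  }

  public static class WritableFormatter implements Formatter {
    @Override
    public String describe() {
      return "Writable";
    }

    public static class Builder implements FormatterTypeSketch.Builder {
      @Override
      public Formatter build() {
        return new WritableFormatter();
      }
    }
  }

  // Each built-in type carries its Builder class; Other means "treat the value as a class name".
  public enum FormatterType {
    Text(TextFormatter.Builder.class),
    Writable(WritableFormatter.Builder.class),
    Other(null);

    private final Class<? extends Builder> builderClass;

    FormatterType(Class<? extends Builder> builderClass) {
      this.builderClass = builderClass;
    }

    public Class<? extends Builder> getBuilderClass() {
      return builderClass;
    }
  }

  // Case-insensitive match on the built-in names; anything else resolves to Other.
  static FormatterType resolve(String writeFormat) {
    for (FormatterType type : FormatterType.values()) {
      if (type != FormatterType.Other && type.name().equalsIgnoreCase(writeFormat)) {
        return type;
      }
    }
    return FormatterType.Other;
  }

  // Builds a formatter via the enum's Builder class or, for Other, by loading the named class.
  static Formatter create(String writeFormat) {
    FormatterType type = resolve(writeFormat);
    try {
      Class<? extends Builder> clazz = type == FormatterType.Other
          ? Class.forName(writeFormat).asSubclass(Builder.class)
          : type.getBuilderClass();
      return clazz.getDeclaredConstructor().newInstance().build();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalArgumentException("Unusable writeFormat: " + writeFormat, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(create("Text").describe());
    System.out.println(create(WritableFormatter.Builder.class.getName()).describe());
  }
}
```

Routing the built-in names through the same Builder path as custom classes means there is a single construction code path, and adding a new built-in format is a one-line enum change.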
Unit tests included in patch.

Using a patched build of Flume in an internal project (not in production).
Review request changed
Updated (Sept. 14, 2012, 5:27 a.m.)
Mike,

Sorry for taking a while to get around to this. Here is my second attempt, based on your comments.

Major changes:

* Made FlumeFormatter specific to HDFSSequenceFile. HDFSDataStream, HDFSCompressedDataStream, BucketWriter and HDFSEventSink no longer reference FlumeFormatter. In terms of configuration, HDFSDataStream and HDFSCompressedDataStream now ignore "hdfs.writeFormat" and only use "hdfs.serializer".

* Altered FlumeFormatter's interface to allow an event to be serialized into zero, one or more SequenceFile records, rather than a 1-to-1 event->record mapping.
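Returning an `Iterable` of records, instead of a single record, is what allows one event to produce zero, one or several SequenceFile entries. The sketch below illustrates that shape with plain-Java stand-ins. `Event`, `Record`, the `format` method signature and `LineSplittingFormatter` are hypothetical, and the patch's actual SeqFileFormatter signatures may differ. The example formatter writes one record per non-empty line, using a `timestamp` header (assumed here) as the key. An empty body yields no records.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class OneToManySketch {

  // Hypothetical minimal event: headers plus a raw body.
  public static final class Event {
    final Map<String, String> headers;
    final byte[] body;

    public Event(Map<String, String> headers, byte[] body) {
      this.headers = headers;
      this.body = body;
    }
  }

  // One SequenceFile key/value pair.
  public static final class Record {
    final Object key;
    final Object value;

    public Record(Object key, Object value) {
      this.key = key;
      this.value = value;
    }
  }

  // Each event may map to zero, one or many records.
  public interface SeqFileFormatter {
    Iterable<Record> format(Event event);
  }

  // One record per non-empty line of the body, keyed by the "timestamp" header (default 0).
  public static final class LineSplittingFormatter implements SeqFileFormatter {
    @Override
    public Iterable<Record> format(Event event) {
      long ts = Long.parseLong(event.headers.getOrDefault("timestamp", "0"));
      List<Record> records = new ArrayList<>();
      for (String line : new String(event.body, StandardCharsets.UTF_8).split("\n")) {
        if (!line.isEmpty()) { // blank lines, and therefore an empty body, produce no records
          records.add(new Record(ts, line));
        }
      }
      return records;
    }
  }

  public static void main(String[] args) {
    SeqFileFormatter formatter = new LineSplittingFormatter();
    Event event = new Event(Map.of("timestamp", "1347600000000"),
        "first line\nsecond line".getBytes(StandardCharsets.UTF_8));
    // A writer would append every record returned for the event.
    for (Record r : formatter.format(event)) {
      System.out.println(r.key + "\t" + r.value);
    }
  }
}
```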

* Renamed FlumeFormatter to SeqFileFormatter and moved it from flume-ng-core to flume-hdfs-sink.

* Updated to the latest trunk.

Notes:

* My IDE seems to have altered some formatting and moved a few imports around. I can fix those if you like.

* You're absolutely right that HDFSSequenceFile should be calling syncFs(). I always thought having both sync() and syncFs() in the Hadoop API was a recipe for disaster :) I will open a new issue for that and fix it in a separate patch.

* I'd like you to look most closely at HDFSCompressedDataStream and check that I haven't broken anything there. FWIW, the unit test still passes.
Posted (Dec. 11, 2012, 9:06 p.m.)
Chris, this looks great! Unfortunately, it's been lying around for a little while and the patch no longer applies cleanly. Can you please provide an updated patch?

At this point, I'd recommend updating this patch with just the required parts (I believe these all still apply cleanly):

flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatter.java
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterFactory.java
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterType.java
flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSeqFileFormatterFactory.java

We can leave the complete removal of FlumeFormatter for a second phase.
Ship it!
Posted (Dec. 19, 2012, 12:26 a.m.)
+1 for rebased patch