Review Board 1.7.22


PIG-3141 [piggybank] Giving CSVExcelStorage an option to handle header rows

Review Request #9697 - Created March 1, 2013 and updated

Jonathan Packer
trunk
Reviewers
pig
pig-git
Reviewboard for https://issues.apache.org/jira/browse/PIG-3141

Adds a "header treatment" option to CSVExcelStorage allowing header rows (first row with column names) in files to be skipped when loading, or for a header row with column names to be written when storing. Should be backwards compatible--all unit-tests from the old CSVExcelStorage pass.
cd contrib/piggybank/java
ant -Dtestcase=TestCSVExcelStorage test
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java
Revision 568b3f3 New Change
[20] 28 lines
[+20]
29

    
   
29

   
30
import org.apache.hadoop.io.Text;
30
import org.apache.hadoop.io.Text;
31
import org.apache.hadoop.mapreduce.InputFormat;
31
import org.apache.hadoop.mapreduce.InputFormat;
32
import org.apache.hadoop.mapreduce.Job;
32
import org.apache.hadoop.mapreduce.Job;
33
import org.apache.hadoop.mapreduce.RecordReader;
33
import org.apache.hadoop.mapreduce.RecordReader;

    
   
34
import org.apache.hadoop.mapreduce.RecordWriter;
34
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
35
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    
   
36

   
35
import org.apache.log4j.Logger;
37
import org.apache.log4j.Logger;

    
   
38

   
36
import org.apache.pig.LoadPushDown;
39
import org.apache.pig.LoadPushDown;
37
import org.apache.pig.PigException;
40
import org.apache.pig.PigException;

    
   
41
import org.apache.pig.ResourceSchema;

    
   
42
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
38
import org.apache.pig.StoreFuncInterface;
43
import org.apache.pig.StoreFuncInterface;

    
   
44

   
39
import org.apache.pig.backend.executionengine.ExecException;
45
import org.apache.pig.backend.executionengine.ExecException;
40
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
46
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
41
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
47
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;

    
   
48

   
42
import org.apache.pig.builtin.PigStorage;
49
import org.apache.pig.builtin.PigStorage;
43
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
50
import org.apache.pig.bzip2r.Bzip2TextInputFormat;

    
   
51

   
44
import org.apache.pig.data.DataByteArray;
52
import org.apache.pig.data.DataByteArray;
45
import org.apache.pig.data.Tuple;
53
import org.apache.pig.data.Tuple;
46
import org.apache.pig.data.TupleFactory;
54
import org.apache.pig.data.TupleFactory;

    
   
55

   
47
import org.apache.pig.impl.logicalLayer.FrontendException;
56
import org.apache.pig.impl.logicalLayer.FrontendException;
48
import org.apache.pig.impl.util.ObjectSerializer;
57
import org.apache.pig.impl.util.ObjectSerializer;
49
import org.apache.pig.impl.util.StorageUtil;
58
import org.apache.pig.impl.util.StorageUtil;
50
import org.apache.pig.impl.util.UDFContext;
59
import org.apache.pig.impl.util.UDFContext;

    
   
60
import org.apache.pig.impl.util.Utils;

    
   
61

   

    
   
62
import org.apache.pig.parser.ParserException;
51

    
   
63

   
52
/**
64
/**
53
 * CSV loading and storing with support for multi-line fields, 
65
 * CSV loading and storing with support for multi-line fields, 
54
 * and escaping of delimiters and double quotes within fields; 
66
 * and escaping of delimiters and double quotes within fields; 
55
 * uses CSV conventions of Excel 2007.
67
 * uses CSV conventions of Excel 2007.
56
 * 
68
 * 
57
 * Arguments allow for control over:
69
 * Arguments allow for control over:
58
 * <ul>
70
 *
59
 * <li>   Which delimiter is used (default is ',')
71
 * Which field delimiter to use (default = ',')
60
 * <li>   Whether line breaks are allowed inside of fields
72
 * Whether line breaks are allowed inside of fields (YES_MULTILINE = yes, NO_MULTILINE = no, default = no)
61
 * <li>   Whether line breaks are to be written Unix style, or Windows style
73
 * How line breaks are to be written when storing (UNIX = LF, WINDOWS = CRLF, NOCHANGE = system default, default = system default)
62
 * </ul>
74
 * What to do with header rows (first line of each file):
63
 * <b>Usage:</b><br> 
75
 *     On load: READ_INPUT_HEADER = read header rows, SKIP_INPUT_HEADER = do not read header rows, default = read header rows
64
 * 		{@code STORE x INTO '<destFileName>'}<br> 
76
 *     On store: WRITE_OUTPUT_HEADER = write a header row, SKIP_OUTPUT_HEADER = do not write a header row, default = do not write a header row
65
 *      {@code USING CSVExcelStorage(['<delimiter>' [,{'YES_MULTILINE' | 'NO_MULTILINE'} [,{'UNIX' | 'WINDOWS' | 'NOCHANGE'}]]]);}
77
 *
66
 * <p>
78
 * Usage:
67
 *        Defaults are comma, 'NO_MULTILINE', 'NOCHANGE'
79
 *
68
 *        The linebreak parameter is only used during store. During load
80
 * STORE x INTO '<destFileName>'
69
 *        no conversion is performed.                
81
 *         USING org.apache.pig.piggybank.storage.CSVExcelStorage(
70
 *  <p>               
82
 *              [DELIMITER[, 
71
 * <b>Example:</b><br>
83
 *                  {YES_MULTILINE | NO_MULTILINE}[, 
72
 * 	    {@code STORE res INTO '/tmp/result.csv'}<br> 
84
 *                      {UNIX | WINDOWS | NOCHANGE}[, 
73
 *		{@code USING CSVExcelStorage(',', 'NO_MULTILINE', 'WINDOWS');}
85
 *                          {READ_INPUT_HEADER, SKIP_INPUT_HEADER, WRITE_OUTPUT_HEADER, SKIP_OUTPUT_HEADER}]]]]
74
 *<p>
86
 *         );
75
 *			would expect to see comma separated files for load, would

   
76
 *			use comma as field separator during store, would treat

   
77
 *			every newline as a record terminator, and would use CRLF

   
78
 *          as line break characters (0x0d 0x0a: \r\n).

   
79
 * <p>          

   
80
 * <b>Example:</b><br>

   
81
 *      {@code STORE res INTO '/tmp/result.csv'}<br> 

   
82
 *		{@code USING CSVExcelStorage(',', 'YES_MULTILINE');}

   
83
 * <p>

   
84
 *	        would allow newlines inside of fields. During load

   
85
 *	 	    such fields are expected to conform to the Excel

   
86
 *			requirement that the field is enclosed in double quotes.

   
87
 *			On store, the <code>chararray</code> containing the field will accordingly be

   
88
 *			enclosed in double quotes.

   
89
 * <p>

   
90
 * <b>Note:</b><br>

   
91
 *       A danger with enabling multiline fields during load is that unbalanced

   
92
 * 		 double quotes will cause slurping up of input until a balancing double

   
93
 * 		 quote is found, or until something breaks. If you are not expecting

   
94
 * 		 newlines within fields it is therefore more robust to use NO_MULTILINE,

   
95
 * 	     which is the default for that reason.

   
96
 * <p>

   
97
 * Excel expects double quotes within fields to be escaped with a second

   
98
 * double quote. When such an embedding of double quotes is used, Excel

   
99
 * additionally expects the entire fields to be surrounded by double quotes.

   
100
 * This package follows that escape mechanism, rather than the use of

   
101
 * backslash.

   
102
 * <p>

   
103
 * <b>Tested with:</b> Pig 0.8.0, Windows Vista, Excel 2007 SP2 MSO(12.0.6545.5004).

   
104
 *  <p>

   
105
 * <b>Note:</b><br>

   
106
 * 		 When a file with newlines embedded in a field is loaded into Excel,

   
107
 * 		 the application does not automatically vertically enlarge the respective

   
108
 * 		 rows. It is therefore easy to miss when fields consist of multiple lines.

   
109
 * 	     To make the multiline rows clear:<br>

   
110
 * 		 <ul>

   
111
 * 		   <li>Select the column. 

   
112
 * 		   <li>On the home ribbon, activate the Format pull-down menu

   
113
 *         <li>Select Autofit Row Height.

   
114
 *       </ul>

   
115
 * <p> 

   
116
 * <b>Examples:</b>

   
117
 * <br>

   
118
 * 		 With multiline turned on:<br>

   
119
 *           {@code "Conrad\n}<br>

   
120
 *    	     {@code   Emil",Dinger,40}<br>

   
121
 *    	      Is read as {@code (Conrad\nEmil,Dinger,40)}

   
122
 * <p>   	      

   
123
 *         With multiline turned off:<br>   	       

   
124
 *           {@code "Conrad\n}<br>

   
125
 *  	     {@code  Emil",Dinger,40}<br>

   
126
 *    	      is read as<br>

   
127
 *    					 {@code (Conrad)}<br>

   
128
 *    	      			 {@code (Emil,Dinger,40)}

   
129
 * <p>

   
130
 *      Always:

   
131
 *          <ul>

   
132
 *          <li> {@code "Mac ""the knife""",Cohen,30}

   
133
 *            is read as {@code (Mac "the knife",Cohen,30)}

   
134
 *            

   
135
 *     		<li> {@code Jane, "nee, Smith",20}

   
136
 *            Is read as {@code (Jane,nee, Smith,20)}

   
137
 *          </ul>

   
138
 * <p>            

   
139
 *     That is, the escape character is the double quote,

   
140
 *     not backslash.

   
141
 * <p>

   
142
 * <b>Known Issues:</b> 

   
143
 * <ul>

   
144
 * 		<li> When using {@code TAB} ({@code '\t'}) as the field delimiter, Excel does not 

   
145
 * 	         properly handle newlines embedded in fields. Maybe there is a trick...

   
146
 *      <li> Excel will only deal properly with embedded newlines if the file

   
147
 *           name does not have a .csv extension. 

   
148
 * </ul>

   
149
 * 
87
 * 
150
 * @author "Andreas Paepcke" <paepcke@cs.stanford.edu>. 
88
 * Linebreak settings are only used during store; during load, no conversion is performed.
151
 * 					The load portion is based on Dmitriy V. Ryaboy's CSVLoader,

   
152
 * 					which in turn is loosely based on a version by James Kebinger.

   
153
 *
89
 *

    
   
90
 * WARNING: A danger with enabling multiline fields during load is that unbalanced

    
   
91
 *          double quotes will cause slurping up of input until a balancing double

    
   
92
 *          quote is found, or until something breaks. If you are not expecting

    
   
93
 *          newlines within fields it is therefore more robust to use NO_MULTILINE,

    
   
94
 *          which is the default for that reason.

    
   
95
 * 

    
   
96
 * This is Andreas Paepcke's <paepcke@cs.stanford.edu> CSVExcelStorage with a few modifications.
154
 */
97
 */
155

    
   
98

   
156
public class CSVExcelStorage extends PigStorage implements StoreFuncInterface, LoadPushDown {
99
public class CSVExcelStorage extends PigStorage implements StoreFuncInterface, LoadPushDown {
157

    
   
100

   
158
	public static enum Linebreaks {UNIX, WINDOWS, NOCHANGE};
101
    public static enum Linebreaks { UNIX, WINDOWS, NOCHANGE };
159
	public static enum Multiline {YES, NO};
102
    public static enum Multiline { YES, NO };
160
	
103
    public static enum Headers { DEFAULT, READ_INPUT_HEADER, SKIP_INPUT_HEADER, WRITE_OUTPUT_HEADER, SKIP_OUTPUT_HEADER }

    
   
104

   
161
	protected final static byte LINEFEED = '\n';
105
    protected final static byte LINEFEED = '\n';
162
	protected final static byte NEWLINE = '\r';

   
163
    protected final static byte DOUBLE_QUOTE = '"';
106
    protected final static byte DOUBLE_QUOTE = '"';
164
	protected final static byte RECORD_DEL = LINEFEED;
107
    protected final static byte RECORD_DEL = LINEFEED;
165
	
108

   
166
	private static final byte FIELD_DEL_DEFAULT = ',';
109
    private static final String FIELD_DELIMITER_DEFAULT_STR = ",";
167
	private static final String MULTILINE_DEFAULT_STR = "NOMULTILINE";
110
    private static final String MULTILINE_DEFAULT_STR = "NO_MULTILINE";
168
	private static final String LINEBREAKS_DEFAULT_STR = "NOCHANGE";
111
    private static final String EOL_DEFAULT_STR = "NOCHANGE";
169
	private static final Multiline MULTILINE_DEFAULT = Multiline.NO;
112
    private static final String HEADER_DEFAULT_STR = "DEFAULT";
170
	private static final Linebreaks LINEBREAKS_DEFAULT = Linebreaks.NOCHANGE;

   
171
    
113
    
172
	long end = Long.MAX_VALUE;
114
    long end = Long.MAX_VALUE;
173

    
   
115

   
174
	private byte fieldDelimiter = FIELD_DEL_DEFAULT;
116
    private byte fieldDelimiter = ',';
175
	private Linebreaks eolTreatment = LINEBREAKS_DEFAULT;
117
    private Multiline multilineTreatment = Multiline.NO;
176
	private Multiline multilineTreatment = MULTILINE_DEFAULT;
118
    private Linebreaks eolTreatment = Linebreaks.NOCHANGE;
177
	
119
    private Headers headerTreatment = Headers.DEFAULT;

    
   
120

   
178
    private ArrayList<Object> mProtoTuple = null;
121
    private ArrayList<Object> mProtoTuple = null;
179
    private TupleFactory mTupleFactory = TupleFactory.getInstance();
122
    private TupleFactory mTupleFactory = TupleFactory.getInstance();
180
    private String signature;
123
    private String udfContextSignature;
181
    private String loadLocation;
124
    private String loadLocation;
182
    private boolean[] mRequiredColumns = null;
125
    private boolean[] mRequiredColumns = null;
183
    private boolean mRequiredColumnsInitialized = false;
126
    private boolean mRequiredColumnsInitialized = false;
184
    
127
    
185
	final Logger logger = Logger.getLogger(getClass().getName());
128
    final Logger logger = Logger.getLogger(getClass().getName());
186
	

   
187
	// Counters for in an out records:

   
188
	/*   CODE FOR COUNTERS. Also, look for CSVRecords.READ, and CSVRecords.WRITTEN for the other pieces. 

   
189
	protected static enum CSVRecords {

   
190
		READ,

   
191
		WRITTEN

   
192
	};

   
193
	PigStatusReporter reporter = PigStatusReporter.getInstance();

   
194

    
   
129

   
195
	*/

   
196
	

   
197
	@SuppressWarnings("rawtypes")
130
    @SuppressWarnings("rawtypes")
198
    protected RecordReader in = null;    
131
    protected RecordReader in = null;    
199

    
   
132

   
200
	// For replacing LF with CRLF (Unix --> Windows end-of-line convention):
133
    // For replacing LF with CRLF (Unix --> Windows end-of-line convention):
201
	Pattern loneLFDetectorPattern = Pattern.compile("([^\r])\n", Pattern.DOTALL | Pattern.MULTILINE);
134
    Pattern loneLFDetectorPattern = Pattern.compile("([^\r])\n", Pattern.DOTALL | Pattern.MULTILINE);
202
	Matcher loneLFDetector = loneLFDetectorPattern.matcher("");
135
    Matcher loneLFDetector = loneLFDetectorPattern.matcher("");
203
	
136

   
204
	// For removing CR (Windows --> Unix):
137
    // For removing CR (Windows --> Unix):
205
	Pattern CRLFDetectorPattern = Pattern.compile("\r\n", Pattern.DOTALL | Pattern.MULTILINE);
138
    Pattern CRLFDetectorPattern = Pattern.compile("\r\n", Pattern.DOTALL | Pattern.MULTILINE);
206
	Matcher CRLFDetector = CRLFDetectorPattern.matcher("");
139
    Matcher CRLFDetector = CRLFDetectorPattern.matcher("");
207

    
   
140

   
208
	
141

   
209
	// Pig Storage with COMMA as delimiter:
142
    // Pig Storage with COMMA as delimiter:
210
		TupleFactory tupleMaker = TupleFactory.getInstance();
143
    TupleFactory tupleMaker = TupleFactory.getInstance();
211
	private boolean getNextInQuotedField;
144
    private boolean getNextInQuotedField;
212
	private int getNextFieldID;
145
    private int getNextFieldID;
213
	private boolean nextTupleSkipChar;
146
    private boolean nextTupleSkipChar;
214
	
147

   

    
   
148
    // For handling headers

    
   
149
    private boolean loadingFirstRecord = true;

    
   
150
    private boolean storingFirstRecord = true;

    
   
151
    private String header = null;

    
   
152
    private int splitIndex;

    
   
153

   

    
   
154
    private static final String SCHEMA_SIGNATURE = "pig.csvexcelstorage.schema";

    
   
155
    protected ResourceSchema schema = null;

    
   
156

   
215
	/*-----------------------------------------------------
157
    /*-----------------------------------------------------
216
	| Constructors 
158
    | Constructors 
217
	------------------------*/
159
    ------------------------*/
218
		
160

   
219
		

   
220
    /**

   
221
     * Constructs a CSVExcel load/store that uses {@code comma} as the

   
222
     * field delimiter, terminates records on reading a newline

   
223
     * within a field (even if the field is enclosed in double quotes),

   
224
     * and uses {@code LF} as line terminator. 

   
225
     * 

   
226
     */

   
227
    public CSVExcelStorage() {
161
    public CSVExcelStorage() {
228
    	super(new String(new byte[] {FIELD_DEL_DEFAULT}));
162
        super(FIELD_DELIMITER_DEFAULT_STR);

    
   
163
        initializeInstance(FIELD_DELIMITER_DEFAULT_STR, MULTILINE_DEFAULT_STR, EOL_DEFAULT_STR, HEADER_DEFAULT_STR);
229
    }
164
    }
230
    
165
    
231
    /**

   
232
     * Constructs a CSVExcel load/store that uses specified string as a field delimiter.

   
233
     * 

   
234
     * @param delimiter

   
235
     *            the single byte character that is used to separate fields.

   
236
     *            ("," is the default.)

   
237
     */

   
238
    public CSVExcelStorage(String delimiter) {
166
    public CSVExcelStorage(String delimiter) {
239
    	super(delimiter);
167
        super(delimiter);
240
        initializeInstance(delimiter, MULTILINE_DEFAULT_STR, LINEBREAKS_DEFAULT_STR);
168
        initializeInstance(delimiter, MULTILINE_DEFAULT_STR, EOL_DEFAULT_STR, HEADER_DEFAULT_STR);
241
    }
169
    }
242
		
170

   
243
    /**
171
    public CSVExcelStorage(String delimiter, String multilineTreatmentStr) {
244
     * Constructs a CSVExcel load/store that uses specified string 

   
245
     * as a field delimiter, and allows specification whether to handle

   
246
     * line breaks within fields. 

   
247
     * <ul>

   
248
     * 		<li> For NO_MULTILINE, every line break 

   
249
     * 			 will be considered an end-of-record. 

   
250
     * 		<li> For YES_MULTILINE, fields may include newlines, 

   
251
     * 			 as long as the fields are enclosed in 

   
252
     * 			 double quotes.

   
253
     * </ul>  

   
254
     * <b>Pig example:</b><br>

   
255
     * {@code STORE a INTO '/tmp/foo.csv'}<br>

   
256
     * {@code USING org.apache.pig.piggybank.storage.CSVExcelStorage(",", "YES_MULTILINE");} 

   
257
     * 

   
258
     * @param delimiter

   
259
     *            the single byte character that is used to separate fields.

   
260
     *            ("," is the default.)

   
261
     * @param multilineTreatment

   
262
     * 			  "YES_MULTILINE" or "NO_MULTILINE"

   
263
     *            ("NO_MULTILINE" is the default.)

   
264
     */

   
265
    public CSVExcelStorage(String delimiter, String multilineTreatment) {

   
266
    	super(delimiter);
172
        super(delimiter);
267
        initializeInstance(delimiter, multilineTreatment, LINEBREAKS_DEFAULT_STR);
173
        initializeInstance(delimiter, multilineTreatmentStr, EOL_DEFAULT_STR, HEADER_DEFAULT_STR);
268
    }
174
    }
269

    
   
175

   
270
    /**
176
    public CSVExcelStorage(String delimiter, String multilineTreatmentStr, String eolTreatmentStr) {
271
     * Constructs a CSVExcel load/store that uses specified string 

   
272
     * as a field delimiter, provides choice whether to manage multiline 

   
273
     * fields, and specifies chars used for end of line.

   
274
     * <p> 

   
275
     * The eolTreatment parameter is only relevant for STORE():

   
276
     * <ul>

   
277
     * <li>      For "UNIX", newlines will be stored as LF chars

   
278
     * <li>      For "WINDOWS", newlines will be stored as CRLF

   
279
     * </ul>

   
280
     * <b>Pig example:</b><br>

   
281
     * {@code STORE a INTO '/tmp/foo.csv'}<br>

   
282
     * {@code USING org.apache.pig.piggybank.storage.CSVExcelStorage(",", "NO_MULTILINE", "WINDOWS");} 

   
283
     * 

   
284
     * @param delimiter

   
285
     *            the single byte character that is used to separate fields.

   
286
     *            ("," is the default.)

   
287
     * @param multilineTreatment

   
288
     * 			  "YES_MULTILINE" or "NO_MULTILINE"

   
289
     *            ("NO_MULTILINE" is the default.)

   
290
     * @param eolTreatment

   
291
     * 			  "UNIX", "WINDOWS", or "NOCHANGE" 

   
292
     *            ("NOCHANGE" is the default.)

   
293
     */

   
294
    public CSVExcelStorage(String delimiter, String multilineTreatment,  String eolTreatment) {

   
295
    	super(delimiter);
177
        super(delimiter);
296
    	initializeInstance(delimiter, multilineTreatment, eolTreatment);
178
        initializeInstance(delimiter, multilineTreatmentStr, eolTreatmentStr, HEADER_DEFAULT_STR);
297
    }
179
    }
298

    
   
180

   

    
   
181
    public CSVExcelStorage(String delimiter, String multilineTreatmentStr, String eolTreatmentStr, String headerTreatmentStr) {

    
   
182
        super(delimiter);

    
   
183
        initializeInstance(delimiter, multilineTreatmentStr, eolTreatmentStr, headerTreatmentStr);

    
   
184
    }
299
    
185
    
300
    private void initializeInstance(String delimiter, String multilineStr, String theEofTreatment) {
186
    private void initializeInstance(String delimiter, String multilineTreatmentStr, String eolTreatmentStr, String headerTreatmentStr) {
301
        fieldDelimiter = StorageUtil.parseFieldDel(delimiter);
187
        fieldDelimiter = StorageUtil.parseFieldDel(delimiter);
302
        multilineTreatment = canonicalizeMultilineTreatmentRequest(multilineStr);
188

   
303
        eolTreatment = canonicalizeEOLTreatmentRequest(theEofTreatment);
189
        multilineTreatment = canonicalizeMultilineTreatmentRequest(multilineTreatmentStr);

    
   
190
        eolTreatment = canonicalizeEOLTreatmentRequest(eolTreatmentStr);

    
   
191
        headerTreatment = canonicalizeHeaderTreatmentRequest(headerTreatmentStr);
304
    }
192
    }
305
    
193
    
306
    private Multiline canonicalizeMultilineTreatmentRequest(String theMultilineStr) {
194
    private Multiline canonicalizeMultilineTreatmentRequest(String multilineTreatmentStr) {
307
    	if (theMultilineStr.equalsIgnoreCase("YES_MULTILINE")) {
195
        if (multilineTreatmentStr.equalsIgnoreCase("YES_MULTILINE"))
308
    		return Multiline.YES;
196
            return Multiline.YES;
309
    	}
197
        else if (multilineTreatmentStr.equalsIgnoreCase("NO_MULTILINE"))
310
    	if (theMultilineStr.equalsIgnoreCase("NO_MULTILINE")) {

   
311
    		return Multiline.NO;
198
            return Multiline.NO;

    
   
199

   

    
   
200
        throw new IllegalArgumentException(

    
   
201
                "Unrecognized multiline treatment argument " + multilineTreatmentStr + ". " +

    
   
202
                "Should be either 'YES_MULTILINE' or 'NO_MULTILINE'");
312
    	}
203
    }
313
    	return MULTILINE_DEFAULT;

   
314
    }

   
315
    
204
    
316
    private Linebreaks canonicalizeEOLTreatmentRequest (String theEolTreatmentStr) {
205
    private Linebreaks canonicalizeEOLTreatmentRequest(String eolTreatmentStr) {
317
    	if (theEolTreatmentStr.equalsIgnoreCase("Unix"))
206
        if (eolTreatmentStr.equalsIgnoreCase("UNIX"))
318
    		return Linebreaks.UNIX;
207
            return Linebreaks.UNIX;
319
    	if (theEolTreatmentStr.equalsIgnoreCase("Windows"))
208
        else if (eolTreatmentStr.equalsIgnoreCase("WINDOWS"))
320
    		return Linebreaks.WINDOWS;
209
            return Linebreaks.WINDOWS;
321
    	return LINEBREAKS_DEFAULT;
210
        else if (eolTreatmentStr.equalsIgnoreCase("NOCHANGE"))

    
   
211
            return Linebreaks.NOCHANGE;

    
   
212

   

    
   
213
        throw new IllegalArgumentException(

    
   
214
                "Unrecognized end-of-line treatment argument " + eolTreatmentStr + ". " +

    
   
215
                "Should be one of 'UNIX', 'WINDOWS', or 'NOCHANGE'");

    
   
216
    }

    
   
217

   

    
   
218
    private Headers canonicalizeHeaderTreatmentRequest(String headerTreatmentStr) {

    
   
219
        if (headerTreatmentStr.equalsIgnoreCase("DEFAULT"))

    
   
220
            return Headers.DEFAULT;

    
   
221
        else if (headerTreatmentStr.equalsIgnoreCase("READ_INPUT_HEADER"))

    
   
222
            return Headers.READ_INPUT_HEADER;

    
   
223
        else if (headerTreatmentStr.equalsIgnoreCase("SKIP_INPUT_HEADER"))

    
   
224
            return Headers.SKIP_INPUT_HEADER;

    
   
225
        else if (headerTreatmentStr.equalsIgnoreCase("WRITE_OUTPUT_HEADER"))

    
   
226
            return Headers.WRITE_OUTPUT_HEADER;

    
   
227
        else if (headerTreatmentStr.equalsIgnoreCase("SKIP_OUTPUT_HEADER"))

    
   
228
            return Headers.SKIP_OUTPUT_HEADER;

    
   
229

   

    
   
230
        throw new IllegalArgumentException(

    
   
231
            "Unrecognized header treatment argument " + headerTreatmentStr + ". " +

    
   
232
            "Should be one of 'READ_INPUT_HEADER', 'SKIP_INPUT_HEADER', 'WRITE_OUTPUT_HEADER', 'SKIP_OUTPUT_HEADER'");
322
    }
233
    }
323
    
234
    
324
    // ---------------------------------------- STORAGE -----------------------------
235
    // ---------------------------------------- STORAGE -----------------------------
325
    
236
    
326
	/*-----------------------------------------------------
237
    public void checkSchema(ResourceSchema s) throws IOException {
327
	| putNext()
238
        // Not actually checking schema
328
	------------------------*/
239
        // Actually, just storing it to use in the backend
329
				
240
        

    
   
241
        UDFContext udfc = UDFContext.getUDFContext();

    
   
242
        Properties p =

    
   
243
            udfc.getUDFProperties(this.getClass(), new String[]{ udfContextSignature });

    
   
244
        p.setProperty(SCHEMA_SIGNATURE, s.toString());

    
   
245
    }

    
   
246

   

    
   
247
    public void prepareToWrite(RecordWriter writer) {

    
   
248
        // Get the schema string from the UDFContext object.

    
   
249
        UDFContext udfc = UDFContext.getUDFContext();

    
   
250
        Properties p =

    
   
251
            udfc.getUDFProperties(this.getClass(), new String[]{ udfContextSignature });

    
   
252

   

    
   
253
        String strSchema = p.getProperty(SCHEMA_SIGNATURE);

    
   
254
        if (strSchema != null) {

    
   
255
            // Parse the schema from the string stored in the properties object.

    
   
256
            try {

    
   
257
                schema = new ResourceSchema(Utils.getSchemaFromString(strSchema));

    
   
258
            } catch (ParserException pex) {

    
   
259
                logger.warn("Could not parse schema for storing.");

    
   
260
            }

    
   
261
        }

    
   
262

   

    
   
263
        if (headerTreatment == Headers.DEFAULT) {

    
   
264
            headerTreatment = Headers.SKIP_OUTPUT_HEADER;

    
   
265
        }

    
   
266

   

    
   
267
        // PigStorage's prepareToWrite()

    
   
268
        super.prepareToWrite(writer);

    
   
269
    }

    
   
270

   
330
    /* (non-Javadoc)
271
    /* (non-Javadoc)
331
     * @see org.apache.pig.builtin.PigStorage#putNext(org.apache.pig.data.Tuple)
272
     * @see org.apache.pig.builtin.PigStorage#putNext(org.apache.pig.data.Tuple)
332
     * 
273
     * 
333
     * Given a tuple that corresponds to one record, write
274
     * Given a tuple that corresponds to one record, write
334
     * it out as CSV, converting among Unix/Windows line
275
     * it out as CSV, converting among Unix/Windows line
335
     * breaks as requested in the instantiation. Also take
276
     * breaks as requested in the instantiation. Also take
336
     * care of escaping field delimiters, double quotes,
277
     * care of escaping field delimiters, double quotes,
337
     * and linebreaks embedded within fields,
278
     * and linebreaks embedded within fields,
338
     * 
279
     * 
339
     */
280
     */
340
    @Override
281
    @Override
341
    public void putNext(Tuple tupleToWrite) throws IOException {
282
    public void putNext(Tuple tupleToWrite) throws IOException {
342
    	
283
        // If WRITE_OUTPUT_HEADER, store a header record with the names of each field

    
   
284
        if (storingFirstRecord && headerTreatment == Headers.WRITE_OUTPUT_HEADER && schema != null) {

    
   
285
            ArrayList<Object> headerProtoTuple = new ArrayList<Object>();

    
   
286
            ResourceFieldSchema[] fields = schema.getFields();

    
   
287
            for (ResourceFieldSchema field : fields) {

    
   
288
                headerProtoTuple.add(field.getName());

    
   
289
            }

    
   
290
            super.putNext(tupleMaker.newTuple(headerProtoTuple));

    
   
291
        }

    
   
292
        storingFirstRecord = false;

    
   
293

   
343
    	ArrayList<Object> mProtoTuple = new ArrayList<Object>();
294
        ArrayList<Object> mProtoTuple = new ArrayList<Object>();
344
    	int embeddedNewlineIndex = -1;
295
        int embeddedNewlineIndex = -1;
345
    	String fieldStr = null;
296
        String fieldStr = null;
346
    	// For good debug messages:
297
        // For good debug messages:
347
    	int fieldCounter = -1;
298
        int fieldCounter = -1;
348
    	
299
        
349
    	// Do the escaping:
300
        // Do the escaping:
350
    	for (Object field : tupleToWrite.getAll()) {
301
        for (Object field : tupleToWrite.getAll()) {
351
    		fieldCounter++;
302
            fieldCounter++;

    
   
303

   

    
   
304
            // Substitute a null value with an empty string. See PIG-2470.
352
    		if (field == null) {
305
            if (field == null) {
353
    			logger.warn("Field " + fieldCounter + " within tuple '" + tupleToWrite + "' is null.");
306
                mProtoTuple.add("");
354
    			return;
307
                continue;
355
    		}
308
            }
356
    		
309
            
357
    		fieldStr = field.toString();
310
            fieldStr = field.toString();
358
    		
311
            
359
    		// Embedded double quotes are replaced by two double quotes:
312
            // Embedded double quotes are replaced by two double quotes:
360
    		fieldStr = fieldStr.replaceAll("[\"]", "\"\"");
313
            fieldStr = fieldStr.replaceAll("[\"]", "\"\"");
361
    		
314
            
362
    		// If any field delimiters are in the field, or if we did replace
315
            // If any field delimiters are in the field, or if we did replace
363
    		// any double quotes with a pair of double quotes above,
316
            // any double quotes with a pair of double quotes above,
364
    		// or if the string includes a newline character (LF:\n:0x0A)
317
            // or if the string includes a newline character (LF:\n:0x0A)
365
    		// and we are to allow newlines in fields,
318
            // and we are to allow newlines in fields,
366
    		// then the entire field must be enclosed in double quotes:
319
            // then the entire field must be enclosed in double quotes:
367
    		embeddedNewlineIndex =  fieldStr.indexOf(LINEFEED);
320
            embeddedNewlineIndex =  fieldStr.indexOf(LINEFEED);
368
    		
321
            
369
    		if ((fieldStr.indexOf(fieldDelimiter) != -1) || 
322
            if ((fieldStr.indexOf(fieldDelimiter) != -1) || 
370
    			(fieldStr.indexOf(DOUBLE_QUOTE) != -1) ||
323
                (fieldStr.indexOf(DOUBLE_QUOTE) != -1) ||
371
    			(multilineTreatment == Multiline.YES) && (embeddedNewlineIndex != -1))  {
324
                (multilineTreatment == Multiline.YES) && (embeddedNewlineIndex != -1))  {
372
    			fieldStr = "\"" + fieldStr + "\"";
325
                fieldStr = "\"" + fieldStr + "\"";
373
    		}
326
            }
374
    		
327
            
375
    		// If requested: replace any Linefeed-only (^J), with LF-Newline (^M^J),
328
            // If requested: replace any Linefeed-only (^J), with LF-Newline (^M^J),
376
    		// This is needed for Excel to recognize a field-internal 
329
            // This is needed for Excel to recognize a field-internal 
377
    		// new line:
330
            // new line:
378

    
   
331

   
379
    		if ((eolTreatment != Linebreaks.NOCHANGE) && (embeddedNewlineIndex != -1)) {
332
            if ((eolTreatment != Linebreaks.NOCHANGE) && (embeddedNewlineIndex != -1)) {
[+20] [20] 5 lines
[+20] private Linebreaks canonicalizeEOLTreatmentRequest (String theEolTreatmentStr) { public void prepareToWrite(RecordWriter writer) {
385
    				CRLFDetector.reset(fieldStr);
338
                    CRLFDetector.reset(fieldStr);
386
    				fieldStr = CRLFDetector.replaceAll("\n");
339
                    fieldStr = CRLFDetector.replaceAll("\n");
387
    			}
340
                }
388
    		}
341
            }
389

    
   
342

   
390
    		mProtoTuple.add(fieldStr);    		
343
            mProtoTuple.add(fieldStr);          
391
    	}
344
        }
392
    	// If Windows line breaks are requested, append 
345
        // If Windows line breaks are requested, append 
393
    	// a newline (0x0D a.k.a. ^M) to the last field
346
        // a newline (0x0D a.k.a. ^M) to the last field
394
    	// so that the row termination will end up being
347
        // so that the row termination will end up being
395
    	// \r\n, once the superclass' putNext() method
348
        // \r\n, once the superclass' putNext() method
396
    	// is done below:
349
        // is done below:
397

    
   
350

   
398
    	if ((eolTreatment == Linebreaks.WINDOWS) && (fieldStr != null))
351
        if ((eolTreatment == Linebreaks.WINDOWS) && (fieldStr != null))
399
    		mProtoTuple.set(mProtoTuple.size() - 1, fieldStr + "\r"); 
352
            mProtoTuple.set(mProtoTuple.size() - 1, fieldStr + "\r"); 
400

    
   
353

   
401
    	Tuple resTuple = tupleMaker.newTuple(mProtoTuple);
354
        Tuple resTuple = tupleMaker.newTuple(mProtoTuple);
402
    	// Checking first for debug enabled is faster:

   
403
    	if (logger.isDebugEnabled())

   
404
    		logger.debug("Row: " + resTuple);

   
405
    	super.putNext(resTuple);
355
        super.putNext(resTuple);
406
        // Increment number of records written (Couldn't get counters to work. So commented out.
356
    }
407
    	//if (reporter != null)
357

   
408
    	//	reporter.getCounter(CSVRecords.WRITTEN).increment(1L);
358
    // ---------------------------------------- LOADING  -----------------------------  
409
    }
359

   
410
	

   
411
    // ---------------------------------------- LOADING  -----------------------------	

   
412
	

   
413
    /* (non-Javadoc)
360
    /* (non-Javadoc)
414
     * @see org.apache.pig.builtin.PigStorage#getNext()
361
     * @see org.apache.pig.builtin.PigStorage#getNext()
415
     */
362
     */
416
    @Override
363
    @Override
417
    public Tuple getNext() throws IOException {
364
    public Tuple getNext() throws IOException {

    
   
365
        // If SKIP_INPUT_HEADER and this is the first input split, skip header record

    
   
366
        // We store its value as a string though, so we can compare

    
   
367
        // further records to it. If they are the same (this would 

    
   
368
        // happen if multiple small files each with a header were combined

    
   
369
        // into one split), we know to skip the duplicate header record as well.

    
   
370
        if (loadingFirstRecord && headerTreatment == Headers.SKIP_INPUT_HEADER && splitIndex == 0) {

    
   
371
            try {

    
   
372
                if (!in.nextKeyValue())

    
   
373
                    return null;

    
   
374
                header = ((Text) in.getCurrentValue()).toString();

    
   
375
            } catch (InterruptedException e) {

    
   
376
                int errCode = 6018;

    
   
377
                String errMsg = "Error while reading input";

    
   
378
                throw new ExecException(errMsg, errCode, 

    
   
379
                        PigException.REMOTE_ENVIRONMENT, e);

    
   
380
            }

    
   
381
        }

    
   
382
        loadingFirstRecord = false;

    
   
383

   
418
        mProtoTuple = new ArrayList<Object>();
384
        mProtoTuple = new ArrayList<Object>();
419

    
   
385

   
420
        getNextInQuotedField = false;
386
        getNextInQuotedField = false;
421
        boolean evenQuotesSeen = true;
387
        boolean evenQuotesSeen = true;
422
        boolean sawEmbeddedRecordDelimiter = false;
388
        boolean sawEmbeddedRecordDelimiter = false;
423
        byte[] buf = null;
389
        byte[] buf = null;
424
        
390
        
425
        if (!mRequiredColumnsInitialized) {
391
        if (!mRequiredColumnsInitialized) {
426
            if (signature != null) {
392
            if (udfContextSignature != null) {
427
                Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
393
                Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
428
                mRequiredColumns = (boolean[])ObjectSerializer.deserialize(p.getProperty(signature));
394
                mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(p.getProperty(udfContextSignature));
429
            }
395
            }
430
            mRequiredColumnsInitialized = true;
396
            mRequiredColumnsInitialized = true;
431
        }
397
        }
432
        // Note: we cannot factor out the check for nextKeyValue() being null,
398
        // Note: we cannot factor out the check for nextKeyValue() being null,
433
        // because that call overwrites buf with the new line, which is
399
        // because that call overwrites buf with the new line, which is
434
        // bad if we have a field with a newline.
400
        // bad if we have a field with a newline.
435

    
   
401

   
436
        try {
402
        try {
437
        	int recordLen = 0;
403
            int recordLen = 0;
438
        	getNextFieldID = 0;
404
            getNextFieldID = 0;
439
        	
405
            
440
        	while (sawEmbeddedRecordDelimiter || getNextFieldID == 0) {
406
            while (sawEmbeddedRecordDelimiter || getNextFieldID == 0) {
441
        		Text value = null;
407
                Text value = null;
442
        		if (sawEmbeddedRecordDelimiter) {
408
                if (sawEmbeddedRecordDelimiter) {
443
        			
409
                    
444
        			// Deal with pulling more records from the input, because
410
                    // Deal with pulling more records from the input, because
445
        			// a double quoted embedded newline was encountered in a field.
411
                    // a double quoted embedded newline was encountered in a field.
446
        			// Save the length of the record so far, plus one byte for the 
412
                    // Save the length of the record so far, plus one byte for the 
447
        			// record delimiter (usually newline) that's embedded in the field 
413
                    // record delimiter (usually newline) that's embedded in the field 
448
        			// we were working on before falling into this branch:
414
                    // we were working on before falling into this branch:
449
        			int prevLineLen = recordLen + 1;
415
                    int prevLineLen = recordLen + 1;
450
        			
416
                    
451
        			// Save previous line (the one with the field that has the newline) in a new array.
417
                    // Save previous line (the one with the field that has the newline) in a new array.
452
        			// The last byte will be random; we'll fill in the embedded
418
                    // The last byte will be random; we'll fill in the embedded
453
        			// record delimiter (usually newline) below:
419
                    // record delimiter (usually newline) below:
454
        			byte[] prevLineSaved = Arrays.copyOf(buf, prevLineLen);
420
                    byte[] prevLineSaved = Arrays.copyOf(buf, prevLineLen);
455
        			prevLineSaved[prevLineLen - 1] = RECORD_DEL;
421
                    prevLineSaved[prevLineLen - 1] = RECORD_DEL;
456
        			
422
                    
457
        			// Read the continuation of the record, unless EOF:
423
                    // Read the continuation of the record, unless EOF:
458
        			if (!in.nextKeyValue()) {
424
                    if (!in.nextKeyValue()) {
459
        				return null;
425
                        return null;
460
        			}                                                                                           
426
                    }                                                                                           
461
        			value = (Text) in.getCurrentValue();
427
                    value = (Text) in.getCurrentValue();
462
        			recordLen = value.getLength();
428
                    recordLen = value.getLength();
463
        			// Grab the continuation's bytes:
429
                    // Grab the continuation's bytes:
464
        			buf = value.getBytes();
430
                    buf = value.getBytes();
465
        			
431
                    
466
        			// Combine the previous line and the continuation into a new array.
432
                    // Combine the previous line and the continuation into a new array.
467
        			// The following copyOf() does half the job: it allocates all the
433
                    // The following copyOf() does half the job: it allocates all the
468
        			// space, and also copies the previous line into that space:
434
                    // space, and also copies the previous line into that space:
469
        			byte[] prevLineAndContinuation = Arrays.copyOf(prevLineSaved, prevLineLen + recordLen);
435
                    byte[] prevLineAndContinuation = Arrays.copyOf(prevLineSaved, prevLineLen + recordLen);
470
        			
436
                    
471
        			

   
472
        			// Now append the continuation. Parms: fromBuf, fromStartPos, toBuf, toStartPos, lengthToCopy:
437
                    // Now append the continuation. Parms: fromBuf, fromStartPos, toBuf, toStartPos, lengthToCopy:
473
        			System.arraycopy(buf, 0, prevLineAndContinuation, prevLineLen, recordLen);
438
                    System.arraycopy(buf, 0, prevLineAndContinuation, prevLineLen, recordLen);
474
        			
439
                    
475
        			// We'll work with the combination now:
440
                    // We'll work with the combination now:
476
        			buf = prevLineAndContinuation;
441
                    buf = prevLineAndContinuation;
477
        			
442
                    
478
        			// Do the whole record over from the start:
443
                    // Do the whole record over from the start:
479
        			mProtoTuple.clear();
444
                    mProtoTuple.clear();
480
        			getNextInQuotedField = false;
445
                    getNextInQuotedField = false;
481
        			evenQuotesSeen = true;
446
                    evenQuotesSeen = true;
482
        			getNextFieldID = 0;
447
                    getNextFieldID = 0;
483
        			recordLen = prevLineAndContinuation.length;
448
                    recordLen = prevLineAndContinuation.length;
484
        			
449
                    
485
        		} else {
450
                } else {
486
        			// Previous record finished cleanly: start with the next record,
451
                    // Previous record finished cleanly: start with the next record,
487
        			// unless EOF:
452
                    // unless EOF:
488
        			if (!in.nextKeyValue()) {
453
                    if (!in.nextKeyValue()) {
489
        				return null;
454
                        return null;
490
        			}                                                                                           
455
                    }                                                                                           
491
        			value = (Text) in.getCurrentValue();
456
                    value = (Text) in.getCurrentValue();

    
   
457

   

    
   
458
                    // if the line is a duplicate header and 'SKIP_INPUT_HEADER' is set, ignore it

    
   
459
                    // (this might happen if multiple files each with a header are combined into a single split)

    
   
460
                    if (headerTreatment == Headers.SKIP_INPUT_HEADER && value.toString().equals(header)) {

    
   
461
                        if (!in.nextKeyValue())

    
   
462
                            return null;

    
   
463
                        value = (Text) in.getCurrentValue();

    
   
464
                    }

    
   
465

   
492
        			buf = value.getBytes();
466
                    buf = value.getBytes();
493
        			getNextFieldID = 0;
467
                    getNextFieldID = 0;
494
        			recordLen = value.getLength();
468
                    recordLen = value.getLength();
495
        		}
469
                }
496
        		
470
                
497
        		nextTupleSkipChar = false;
471
                nextTupleSkipChar = false;
498

    
   
472

   
499
        		ByteBuffer fieldBuffer = ByteBuffer.allocate(recordLen);
473
                ByteBuffer fieldBuffer = ByteBuffer.allocate(recordLen);
500

    
   
474

   
501
                        sawEmbeddedRecordDelimiter = processOneInRecord(evenQuotesSeen,
475
                sawEmbeddedRecordDelimiter = processOneInRecord(evenQuotesSeen,
[+20] [20] 14 lines
[+20] private Linebreaks canonicalizeEOLTreatmentRequest (String theEolTreatmentStr) { public void prepareToWrite(RecordWriter writer) {
516
        	throw new ExecException(errMsg, errCode, 
490
            throw new ExecException(errMsg, errCode, 
517
        			PigException.REMOTE_ENVIRONMENT, e);
491
                    PigException.REMOTE_ENVIRONMENT, e);
518
        }
492
        }
519

    
   
493

   
520
        Tuple t =  mTupleFactory.newTupleNoCopy(mProtoTuple);
494
        Tuple t =  mTupleFactory.newTupleNoCopy(mProtoTuple);
521
        // Increment number of records read (Couldn't get counters to work. So commented out.

   
522
        //if (reporter != null)

   
523
        //	reporter.getCounter(CSVRecords.READ).increment(1L);

   
524
        return t;
495
        return t;
525
    }
496
    }
526

    
   
497

   
527
	/*
498
    /*
528
	 * Service method for getNext().
499
     * Service method for getNext().
[+20] [20] 52 lines
[+20] [+] private boolean processOneInRecord(boolean evenQuotesSeen,
581
						fieldBuffer.put(b);
552
                        fieldBuffer.put(b);
582
						nextTupleSkipChar = true;
553
                        nextTupleSkipChar = true;
583
						continue;
554
                        continue;
584
					}
555
                    }
585
					evenQuotesSeen = !evenQuotesSeen;
556
                    evenQuotesSeen = !evenQuotesSeen;

    
   
557

   

    
   
558
                    // If the quote is ending the last field in a record,

    
   
559
                    // set the genNextInQuotedField flag to false,

    
   
560
                    // so the return statement conditional (see below)

    
   
561
                    // is false, indicating that we're ready for the next record

    
   
562
                    if (!evenQuotesSeen && i == recordLen - 1) {

    
   
563
                        getNextInQuotedField = false;

    
   
564
                    }

    
   
565

   
586
					if (evenQuotesSeen) {
566
                    if (evenQuotesSeen) {
587
						fieldBuffer.put(DOUBLE_QUOTE);
567
                        fieldBuffer.put(DOUBLE_QUOTE);
588
					}
568
                    }
589
				} else
569
                } else
590
					if (!evenQuotesSeen &&
570
                    if (!evenQuotesSeen &&
591
							(b == fieldDelimiter || b == RECORD_DEL)) {
571
                            (b == fieldDelimiter || b == RECORD_DEL)) {
592
						getNextInQuotedField = false;
572
                        getNextInQuotedField = false;
593
						readField(fieldBuffer, getNextFieldID++);
573
                        readField(fieldBuffer, getNextFieldID++);
594
					} else {
574
                    } else {
595
						fieldBuffer.put(b);
575
                        fieldBuffer.put(b);
596
					}
576
                    }
597
			} else if (b == DOUBLE_QUOTE) {
577
            } else if (b == DOUBLE_QUOTE) {
598
				// Does a double quote immediately follow?                	
578
                // Does a double quote immediately follow?                  
599
				if ((i < recordLen-1) && (buf[i+1] == DOUBLE_QUOTE)) {
579
                if ((i < recordLen-1) && (buf[i+1] == DOUBLE_QUOTE)) {
600
					fieldBuffer.put(b);
580
                    fieldBuffer.put(b);
601
					nextTupleSkipChar = true;
581
                    nextTupleSkipChar = true;
602
					continue;
582
                    continue;
603
				}
583
                }
[+20] [20] 38 lines
[+20] [+] public InputFormat getInputFormat() {
642
    }
622
    }
643

    
   
623

   
644
    @Override
624
    @Override
645
    public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split) {
625
    public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split) {
646
        in = reader;
626
        in = reader;

    
   
627
        splitIndex = split.getSplitIndex();

    
   
628
        

    
   
629
        if (headerTreatment == Headers.DEFAULT) {

    
   
630
            headerTreatment = Headers.READ_INPUT_HEADER;

    
   
631
        }
647
    }
632
    }
648

    
   
633

   
649
    @Override
634
    @Override
650
    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
635
    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
651
        if (requiredFieldList == null)
636
        if (requiredFieldList == null)
[+20] [20] 14 lines
[+20] public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
666
                if (rf.getIndex()!=-1)
651
                if (rf.getIndex()!=-1)
667
                    mRequiredColumns[rf.getIndex()] = true;
652
                    mRequiredColumns[rf.getIndex()] = true;
668
            }
653
            }
669
            Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
654
            Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
670
            try {
655
            try {
671
                p.setProperty(signature, ObjectSerializer.serialize(mRequiredColumns));
656
                p.setProperty(udfContextSignature, ObjectSerializer.serialize(mRequiredColumns));
672
            } catch (Exception e) {
657
            } catch (Exception e) {
673
                throw new RuntimeException("Cannot serialize mRequiredColumns");
658
                throw new RuntimeException("Cannot serialize mRequiredColumns");
674
            }
659
            }
675
        }
660
        }
676
        return new RequiredFieldResponse(true);
661
        return new RequiredFieldResponse(true);
677
    }
662
    }
678

    
   
663

   
679
    @Override
664
    @Override
680
    public void setUDFContextSignature(String signature) {
665
    public void setUDFContextSignature(String signature) {
681
        this.signature = signature; 
666
        this.udfContextSignature = signature; 
682
    }
667
    }
683

    
   
668

   
684
    @Override
669
    @Override
685
    public List<OperatorSet> getFeatures() {
670
    public List<OperatorSet> getFeatures() {
686
        return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
671
        return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
687
    }
672
    }
688

    
   

   
689
}
673
}
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java
Revision 9bed527 New Change
 
  1. contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java: Loading...
  2. contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java: Loading...