

PIG-3141 [piggybank] Giving CSVExcelStorage an option to handle header rows

Review Request #9697 - Created March 1, 2013, and updated

Submitter: Jonathan Packer
Branch: trunk
Reviewers: pig
Repository: pig-git
Review board for https://issues.apache.org/jira/browse/PIG-3141

Adds a "header treatment" option to CSVExcelStorage that allows a header row (a first row containing the column names) to be skipped when loading a file, or a header row with the column names to be written when storing. The change should be backwards compatible: all unit tests from the old CSVExcelStorage pass.
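
As a rough illustration (paths, aliases, and field names here are invented for the example; the option strings and the four-argument constructor are the ones this patch adds), a Pig script using the new header treatment might look like:

REGISTER piggybank.jar;

-- Load, skipping the header row of each input file:
people = LOAD 'input/people.csv'
    USING org.apache.pig.piggybank.storage.CSVExcelStorage(
        ',', 'NO_MULTILINE', 'NOCHANGE', 'SKIP_INPUT_HEADER')
    AS (name: chararray, department: chararray, age: int);

-- Store, writing the field names out as a header row:
STORE people INTO 'output/people'
    USING org.apache.pig.piggybank.storage.CSVExcelStorage(
        ',', 'NO_MULTILINE', 'UNIX', 'WRITE_OUTPUT_HEADER');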
Testing done:
cd contrib/piggybank/java
ant -Dtestcase=TestCSVExcelStorage test

Diff revision 2

This is not the most recent revision of the diff. The latest diff is revision 3.

Files changed:
  1. contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java
  2. contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java
Revision 568b3f3 -> New Change

[28 lines not shown]

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.log4j.Logger;

import org.apache.pig.LoadPushDown;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.StoreFuncInterface;

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;

import org.apache.pig.builtin.PigStorage;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;

import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.StorageUtil;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;

import org.apache.pig.parser.ParserException;

/**
 * CSV loading and storing with support for multi-line fields, 
 * and escaping of delimiters and double quotes within fields; 
 * uses CSV conventions of Excel 2007.
 * 
 * Arguments allow for control over:
 *
 * Which field delimiter to use (default = ',')
 * Whether line breaks are allowed inside of fields (YES_MULTILINE = yes, NO_MULTILINE = no, default = no)
 * How line breaks are to be written when storing (UNIX = LF, WINDOWS = CRLF, NOCHANGE = system default, default = system default)
 * What to do with header rows (first line of each file):
 *     On load: READ_INPUT_HEADER = read header rows, SKIP_INPUT_HEADER = do not read header rows, default = read header rows
 *     On store: WRITE_OUTPUT_HEADER = write a header row, SKIP_OUTPUT_HEADER = do not write a header row, default = do not write a header row
 *
 * Usage:
 *
 * STORE x INTO '<destFileName>'
 *         USING org.apache.pig.piggybank.storage.CSVExcelStorage(
 *              [DELIMITER[, 
 *                  {YES_MULTILINE | NO_MULTILINE}[, 
 *                      {UNIX | WINDOWS | NOCHANGE}[, 
 *                          {READ_INPUT_HEADER, SKIP_INPUT_HEADER, WRITE_OUTPUT_HEADER, SKIP_OUTPUT_HEADER}]]]]
 *         );
 * 
 * Linebreak settings are only used during store; during load, no conversion is performed.
 *
 * WARNING: A danger with enabling multiline fields during load is that unbalanced
 *          double quotes will cause slurping up of input until a balancing double
 *          quote is found, or until something breaks. If you are not expecting
 *          newlines within fields it is therefore more robust to use NO_MULTILINE,
 *          which is the default for that reason.
 * 
 * This is Andreas Paepcke's <paepcke@cs.stanford.edu> CSVExcelStorage with a few modifications.
 */

public class CSVExcelStorage extends PigStorage implements StoreFuncInterface, LoadPushDown {

    public static enum Linebreaks { UNIX, WINDOWS, NOCHANGE };
    public static enum Multiline { YES, NO };
    public static enum Headers { DEFAULT, READ_INPUT_HEADER, SKIP_INPUT_HEADER, WRITE_OUTPUT_HEADER, SKIP_OUTPUT_HEADER }

    protected final static byte LINEFEED = '\n';
    protected final static byte NEWLINE = '\r';
    protected final static byte DOUBLE_QUOTE = '"';
    protected final static byte RECORD_DEL = LINEFEED;

    private static final String FIELD_DELIMITER_DEFAULT_STR = ",";
    private static final String MULTILINE_DEFAULT_STR = "NO_MULTILINE";
    private static final String EOL_DEFAULT_STR = "NOCHANGE";
    private static final String HEADER_DEFAULT_STR = "DEFAULT";

    long end = Long.MAX_VALUE;

    private byte fieldDelimiter = ',';
    private Multiline multilineTreatment = Multiline.NO;
    private Linebreaks eolTreatment = Linebreaks.NOCHANGE;
    private Headers headerTreatment = Headers.DEFAULT;

    private ArrayList<Object> mProtoTuple = null;
    private TupleFactory mTupleFactory = TupleFactory.getInstance();
    private String udfContextSignature;
    private String loadLocation;
    private boolean[] mRequiredColumns = null;
    private boolean mRequiredColumnsInitialized = false;

    final Logger logger = Logger.getLogger(getClass().getName());

    @SuppressWarnings("rawtypes")
    protected RecordReader in = null;

    // For replacing LF with CRLF (Unix --> Windows end-of-line convention):
    Pattern loneLFDetectorPattern = Pattern.compile("([^\r])\n", Pattern.DOTALL | Pattern.MULTILINE);
    Matcher loneLFDetector = loneLFDetectorPattern.matcher("");

    // For removing CR (Windows --> Unix):
    Pattern CRLFDetectorPattern = Pattern.compile("\r\n", Pattern.DOTALL | Pattern.MULTILINE);
    Matcher CRLFDetector = CRLFDetectorPattern.matcher("");

    // Pig Storage with COMMA as delimiter:
    TupleFactory tupleMaker = TupleFactory.getInstance();
    private boolean getNextInQuotedField;
    private int getNextFieldID;
    private boolean nextTupleSkipChar;

    // For handling headers
    private boolean loadingFirstRecord = true;
    private boolean storingFirstRecord = true;
    private String header = null;
    private int splitIndex;

    private static final String SCHEMA_SIGNATURE = "pig.csvexcelstorage.schema";
    protected ResourceSchema schema = null;

    /*-----------------------------------------------------
    | Constructors 
    ------------------------*/

    public CSVExcelStorage() {
        super(new String(new byte[] { (byte) ',' }));
        initializeInstance(FIELD_DELIMITER_DEFAULT_STR, MULTILINE_DEFAULT_STR, EOL_DEFAULT_STR, HEADER_DEFAULT_STR);
    }

    public CSVExcelStorage(String delimiter) {
        super(delimiter);
        initializeInstance(delimiter, MULTILINE_DEFAULT_STR, EOL_DEFAULT_STR, HEADER_DEFAULT_STR);
    }

    public CSVExcelStorage(String delimiter, String multilineTreatmentStr) {
        super(delimiter);
        initializeInstance(delimiter, multilineTreatmentStr, EOL_DEFAULT_STR, HEADER_DEFAULT_STR);
    }

    public CSVExcelStorage(String delimiter, String multilineTreatmentStr, String eolTreatmentStr) {
        super(delimiter);
        initializeInstance(delimiter, multilineTreatmentStr, eolTreatmentStr, HEADER_DEFAULT_STR);
    }

    public CSVExcelStorage(String delimiter, String multilineTreatmentStr, String eolTreatmentStr, String headerTreatmentStr) {
        super(delimiter);
        initializeInstance(delimiter, multilineTreatmentStr, eolTreatmentStr, headerTreatmentStr);
    }

    private void initializeInstance(String delimiter, String multilineTreatmentStr, String eolTreatmentStr, String headerTreatmentStr) {
        fieldDelimiter = StorageUtil.parseFieldDel(delimiter);

        multilineTreatment = canonicalizeMultilineTreatmentRequest(multilineTreatmentStr);
        eolTreatment = canonicalizeEOLTreatmentRequest(eolTreatmentStr);
        headerTreatment = canonicalizeHeaderTreatmentRequest(headerTreatmentStr);
    }

    private Multiline canonicalizeMultilineTreatmentRequest(String multilineTreatmentStr) {
        if (multilineTreatmentStr.equalsIgnoreCase("YES_MULTILINE"))
            return Multiline.YES;
        else if (multilineTreatmentStr.equalsIgnoreCase("NO_MULTILINE"))
            return Multiline.NO;

        throw new IllegalArgumentException(
                "Unrecognized multiline treatment argument " + multilineTreatmentStr + ". " +
                "Should be either 'YES_MULTILINE' or 'NO_MULTILINE'");
    }

    private Linebreaks canonicalizeEOLTreatmentRequest(String eolTreatmentStr) {
        if (eolTreatmentStr.equalsIgnoreCase("UNIX"))
            return Linebreaks.UNIX;
        else if (eolTreatmentStr.equalsIgnoreCase("WINDOWS"))
            return Linebreaks.WINDOWS;
        else if (eolTreatmentStr.equalsIgnoreCase("NOCHANGE"))
            return Linebreaks.NOCHANGE;

        throw new IllegalArgumentException(
                "Unrecognized end-of-line treatment argument " + eolTreatmentStr + ". " +
                "Should be one of 'UNIX', 'WINDOWS', or 'NOCHANGE'");
    }

    private Headers canonicalizeHeaderTreatmentRequest(String headerTreatmentStr) {
        if (headerTreatmentStr.equalsIgnoreCase("DEFAULT"))
            return Headers.DEFAULT;
        else if (headerTreatmentStr.equalsIgnoreCase("READ_INPUT_HEADER"))
            return Headers.READ_INPUT_HEADER;
        else if (headerTreatmentStr.equalsIgnoreCase("SKIP_INPUT_HEADER"))
            return Headers.SKIP_INPUT_HEADER;
        else if (headerTreatmentStr.equalsIgnoreCase("WRITE_OUTPUT_HEADER"))
            return Headers.WRITE_OUTPUT_HEADER;
        else if (headerTreatmentStr.equalsIgnoreCase("SKIP_OUTPUT_HEADER"))
            return Headers.SKIP_OUTPUT_HEADER;

        throw new IllegalArgumentException(
            "Unrecognized header treatment argument " + headerTreatmentStr + ". " +
            "Should be one of 'DEFAULT', 'READ_INPUT_HEADER', 'SKIP_INPUT_HEADER', 'WRITE_OUTPUT_HEADER', 'SKIP_OUTPUT_HEADER'");
    }

    // ---------------------------------------- STORAGE -----------------------------

    public void checkSchema(ResourceSchema s) throws IOException {
        // Not actually checking schema
        // Actually, just storing it to use in the backend

        UDFContext udfc = UDFContext.getUDFContext();
        Properties p =
            udfc.getUDFProperties(this.getClass(), new String[]{ udfContextSignature });
        p.setProperty(SCHEMA_SIGNATURE, s.toString());
    }

    public void prepareToWrite(RecordWriter writer) {
        // Get the schema string from the UDFContext object.
        UDFContext udfc = UDFContext.getUDFContext();
        Properties p =
            udfc.getUDFProperties(this.getClass(), new String[]{ udfContextSignature });

        String strSchema = p.getProperty(SCHEMA_SIGNATURE);
        if (strSchema != null) {
            // Parse the schema from the string stored in the properties object.
            try {
                schema = new ResourceSchema(Utils.getSchemaFromString(strSchema));
            } catch (ParserException pex) {}
        }

        if (headerTreatment == Headers.DEFAULT) {
            headerTreatment = Headers.SKIP_OUTPUT_HEADER;
        }

        // PigStorage's prepareToWrite()
        super.prepareToWrite(writer);
    }
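    // Illustration (reviewer's sketch, not part of the patch): the header row
    // that putNext() writes below comes from the schema stashed away by
    // checkSchema() above, so WRITE_OUTPUT_HEADER only takes effect when the
    // stored relation has field names, e.g. from an AS clause. Hypothetical
    // paths and aliases:
    //
    //   people = LOAD 'in/people.csv'
    //       USING org.apache.pig.piggybank.storage.CSVExcelStorage()
    //       AS (name: chararray, age: int);
    //   STORE people INTO 'out/people'
    //       USING org.apache.pig.piggybank.storage.CSVExcelStorage(
    //           ',', 'NO_MULTILINE', 'NOCHANGE', 'WRITE_OUTPUT_HEADER');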

   
    /* (non-Javadoc)
     * @see org.apache.pig.builtin.PigStorage#putNext(org.apache.pig.data.Tuple)
     * 
     * Given a tuple that corresponds to one record, write
     * it out as CSV, converting among Unix/Windows line
     * breaks as requested in the instantiation. Also take
     * care of escaping field delimiters, double quotes,
     * and linebreaks embedded within fields.
     */
    @Override
    public void putNext(Tuple tupleToWrite) throws IOException {
        // If WRITE_OUTPUT_HEADER, store a header record with the names of each field
        if (storingFirstRecord && headerTreatment == Headers.WRITE_OUTPUT_HEADER && schema != null) {
            ArrayList<Object> headerProtoTuple = new ArrayList<Object>();
            ResourceFieldSchema[] fields = schema.getFields();
            for (ResourceFieldSchema field : fields) {
                headerProtoTuple.add(field.getName());
            }
            super.putNext(tupleMaker.newTuple(headerProtoTuple));
        }
        storingFirstRecord = false;

        ArrayList<Object> mProtoTuple = new ArrayList<Object>();
        int embeddedNewlineIndex = -1;
        String fieldStr = null;
        // For good debug messages:
        int fieldCounter = -1;

        // Do the escaping:
        for (Object field : tupleToWrite.getAll()) {
            fieldCounter++;

            // Modified from Pig 0.10 version to address PIG-2470
            if (field == null) {
                mProtoTuple.add("");
                continue;
            }

            fieldStr = field.toString();

            // Embedded double quotes are replaced by two double quotes:
            fieldStr = fieldStr.replaceAll("[\"]", "\"\"");

            // If any field delimiters are in the field, or if we did replace
            // any double quotes with a pair of double quotes above,
            // or if the string includes a newline character (LF:\n:0x0A)
            // and we are to allow newlines in fields,
            // then the entire field must be enclosed in double quotes:
            embeddedNewlineIndex =  fieldStr.indexOf(LINEFEED);

            if ((fieldStr.indexOf(fieldDelimiter) != -1) || 
                (fieldStr.indexOf(DOUBLE_QUOTE) != -1) ||
                (multilineTreatment == Multiline.YES) && (embeddedNewlineIndex != -1))  {
                fieldStr = "\"" + fieldStr + "\"";
            }

            // If requested: replace any Linefeed-only (^J), with LF-Newline (^M^J),
            // This is needed for Excel to recognize a field-internal 
            // new line:

            if ((eolTreatment != Linebreaks.NOCHANGE) && (embeddedNewlineIndex != -1)) {
                [5 lines not shown]
                    CRLFDetector.reset(fieldStr);
                    fieldStr = CRLFDetector.replaceAll("\n");
                }
            }

            mProtoTuple.add(fieldStr);
        }
        // If Windows line breaks are requested, append 
        // a newline (0x0D a.k.a. ^M) to the last field
        // so that the row termination will end up being
        // \r\n, once the superclass' putNext() method
        // is done below:

        if ((eolTreatment == Linebreaks.WINDOWS) && (fieldStr != null))
            mProtoTuple.set(mProtoTuple.size() - 1, fieldStr + "\r"); 

        Tuple resTuple = tupleMaker.newTuple(mProtoTuple);
        super.putNext(resTuple);
    }

    // ---------------------------------------- LOADING  -----------------------------

    /* (non-Javadoc)
     * @see org.apache.pig.builtin.PigStorage#getNext()
     */
    @Override
    public Tuple getNext() throws IOException {
        // If SKIP_INPUT_HEADER and this is the first input split, skip header record
        // We store its value as a string though, so we can compare
        // further records to it. If they are the same (this would 
        // happen if multiple small files each with a header were combined
        // into one split), we know to skip the duplicate header record as well.
        if (loadingFirstRecord && headerTreatment == Headers.SKIP_INPUT_HEADER && splitIndex == 0) {
            try {
                if (!in.nextKeyValue())
                    return null;
                header = ((Text) in.getCurrentValue()).toString();
            } catch (InterruptedException e) {
                int errCode = 6018;
                String errMsg = "Error while reading input";
                throw new ExecException(errMsg, errCode, 
                        PigException.REMOTE_ENVIRONMENT, e);
            }
        }
        loadingFirstRecord = false;

        mProtoTuple = new ArrayList<Object>();

        getNextInQuotedField = false;
        boolean evenQuotesSeen = true;
        boolean sawEmbeddedRecordDelimiter = false;
        byte[] buf = null;

        if (!mRequiredColumnsInitialized) {
            if (udfContextSignature != null) {
                Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
                mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(p.getProperty(udfContextSignature));
            }
            mRequiredColumnsInitialized = true;
        }
        // Note: we cannot factor out the check for nextKeyValue() being null,
        // because that call overwrites buf with the new line, which is
        // bad if we have a field with a newline.

        try {
            int recordLen = 0;
            getNextFieldID = 0;

            while (sawEmbeddedRecordDelimiter || getNextFieldID == 0) {
                Text value = null;
                if (sawEmbeddedRecordDelimiter) {

                    // Deal with pulling more records from the input, because
                    // a double quoted embedded newline was encountered in a field.
                    // Save the length of the record so far, plus one byte for the 
                    // record delimiter (usually newline) that's embedded in the field 
                    // we were working on before falling into this branch:
                    int prevLineLen = recordLen + 1;

                    // Save previous line (the one with the field that has the newline) in a new array.
                    // The last byte will be random; we'll fill in the embedded
                    // record delimiter (usually newline) below:
                    byte[] prevLineSaved = Arrays.copyOf(buf, prevLineLen);
                    prevLineSaved[prevLineLen - 1] = RECORD_DEL;

                    // Read the continuation of the record, unless EOF:
                    if (!in.nextKeyValue()) {
                        return null;
                    }
                    value = (Text) in.getCurrentValue();
                    recordLen = value.getLength();
                    // Grab the continuation's bytes:
                    buf = value.getBytes();

                    // Combine the previous line and the continuation into a new array.
                    // The following copyOf() does half the job: it allocates all the
                    // space, and also copies the previous line into that space:
                    byte[] prevLineAndContinuation = Arrays.copyOf(prevLineSaved, prevLineLen + recordLen);

                    // Now append the continuation. Parms: fromBuf, fromStartPos, toBuf, toStartPos, lengthToCopy:
                    System.arraycopy(buf, 0, prevLineAndContinuation, prevLineLen, recordLen);

                    // We'll work with the combination now:
                    buf = prevLineAndContinuation;

                    // Do the whole record over from the start:
                    mProtoTuple.clear();
                    getNextInQuotedField = false;
                    evenQuotesSeen = true;
                    getNextFieldID = 0;
                    recordLen = prevLineAndContinuation.length;

                } else {
                    // Previous record finished cleanly: start with the next record,
                    // unless EOF:
                    if (!in.nextKeyValue()) {
                        return null;
                    }
                    value = (Text) in.getCurrentValue();

                    // if the line is a duplicate header and 'SKIP_INPUT_HEADER' is set, ignore it
                    // (this might happen if multiple files each with a header are combined into a single split)
                    if (headerTreatment == Headers.SKIP_INPUT_HEADER && value.toString().equals(header)) {
                        if (!in.nextKeyValue())
                            return null;
                        value = (Text) in.getCurrentValue();
                    }

                    buf = value.getBytes();
                    getNextFieldID = 0;
                    recordLen = value.getLength();
                }

                nextTupleSkipChar = false;

                ByteBuffer fieldBuffer = ByteBuffer.allocate(recordLen);

                sawEmbeddedRecordDelimiter = processOneInRecord(evenQuotesSeen,
            [14 lines not shown]
            throw new ExecException(errMsg, errCode, 
                    PigException.REMOTE_ENVIRONMENT, e);
        }

        Tuple t =  mTupleFactory.newTupleNoCopy(mProtoTuple);
        return t;
    }

    /*
     * Service method for getNext().
    [52 lines not shown: private boolean processOneInRecord(boolean evenQuotesSeen, ...]
                        fieldBuffer.put(b);
                        nextTupleSkipChar = true;
                        continue;
                    }
                    evenQuotesSeen = !evenQuotesSeen;

                    // Modified from Pig 0.10 version to fix a bug
                    // where if the last field in a record was quoted, 
                    // the end of the record would not be detected
                    if (!evenQuotesSeen && i == recordLen - 1) {
                        getNextInQuotedField = false;
                    }

                    if (evenQuotesSeen) {
                        fieldBuffer.put(DOUBLE_QUOTE);
                    }
                } else
                    if (!evenQuotesSeen &&
                            (b == fieldDelimiter || b == RECORD_DEL)) {
                        getNextInQuotedField = false;
                        readField(fieldBuffer, getNextFieldID++);
                    } else {
                        fieldBuffer.put(b);
                    }
            } else if (b == DOUBLE_QUOTE) {
                // Does a double quote immediately follow?
                if ((i < recordLen-1) && (buf[i+1] == DOUBLE_QUOTE)) {
                    fieldBuffer.put(b);
                    nextTupleSkipChar = true;
                    continue;
                }
    [38 lines not shown: public InputFormat getInputFormat() { ...]
    }

    @Override
    public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split) {
        in = reader;
        splitIndex = split.getSplitIndex();

        if (headerTreatment == Headers.DEFAULT) {
            headerTreatment = Headers.READ_INPUT_HEADER;
        }
    }

    @Override
    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
        if (requiredFieldList == null)
        [14 lines not shown]
                if (rf.getIndex()!=-1)
                    mRequiredColumns[rf.getIndex()] = true;
            }
            Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
            try {
                p.setProperty(udfContextSignature, ObjectSerializer.serialize(mRequiredColumns));
            } catch (Exception e) {
                throw new RuntimeException("Cannot serialize mRequiredColumns");
            }
        }
        return new RequiredFieldResponse(true);
    }

    @Override
    public void setUDFContextSignature(String signature) {
        this.udfContextSignature = signature; 
    }

    @Override
    public List<OperatorSet> getFeatures() {
        return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
    }

}
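
For comparison, a sketch of the backwards-compatible default path (paths and aliases again invented): when no header argument is given, headerTreatment stays DEFAULT, which prepareToRead() resolves to READ_INPUT_HEADER and prepareToWrite() resolves to SKIP_OUTPUT_HEADER, i.e. the pre-patch behavior.

REGISTER piggybank.jar;

-- Old three-argument form, unchanged by this patch: any header row is
-- loaded as an ordinary record, and no header row is written on store.
legacy = LOAD 'input/legacy.csv'
    USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'NOCHANGE');
STORE legacy INTO 'output/legacy'
    USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX');
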
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java
Revision 9bed527 -> New Change
 