Review Board 1.7.22


PIG-3215 [piggybank] Add LTSVLoader to load LTSV files

Review Request #9685 - Created Feb. 28, 2013 and updated

Taku Miyakawa
trunk
PIG-3215
Reviewers
pig
pig-git
This is a review board for  https://issues.apache.org/jira/browse/PIG-3215

The patch adds LTSVLoader function and its test class.
ant compile-test
cd contrib/piggybank/java/
ant -Dtestcase=TestLTSVLoader test
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/LTSVLoader.java
New File

    
   
1
/*

    
   
2
 * Licensed to the Apache Software Foundation (ASF) under one

    
   
3
 * or more contributor license agreements.  See the NOTICE file

    
   
4
 * distributed with this work for additional information

    
   
5
 * regarding copyright ownership.  The ASF licenses this file

    
   
6
 * to you under the Apache License, Version 2.0 (the

    
   
7
 * "License"); you may not use this file except in compliance

    
   
8
 * with the License.  You may obtain a copy of the License at

    
   
9
 *

    
   
10
 *     http://www.apache.org/licenses/LICENSE-2.0

    
   
11
 *

    
   
12
 * Unless required by applicable law or agreed to in writing, software

    
   
13
 * distributed under the License is distributed on an "AS IS" BASIS,

    
   
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

    
   
15
 * See the License for the specific language governing permissions and

    
   
16
 * limitations under the License.

    
   
17
 */

    
   
18

   

    
   
19
package org.apache.pig.piggybank.storage;

    
   
20

   

    
   
21
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.pig.Expression;
import org.apache.pig.FileInputLoadFunc;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPushDown;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.CastUtils;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;

    
   
62

   

    
   
63
/**

    
   
64
 * Loader UDF for <a href="http://ltsv.org/">LTSV</a> files,

    
   
65
 * or Labeled Tab-separated Values files.

    
   
66
 *

    
   
67
 * <h3>About LTSV</h3>

    
   
68
 *

    
   
69
 * <p>

    
   
70
 * LTSV, or Labeled Tab-separated Values format is a format for log files.

    
   
71
 * LTSV is based on TSV.

    
   
72
 * Columns are separated by tab characters,

    
   
73
 * and each of columns includes a label and a value,

    
   
74
 * separated by a ":" character.

    
   
75
 * </p>

    
   
76
 *

    
   
77
 * <p>

    
   
78
 * This is an example LTSV log file.

    
   
79
 * Suppose that columns are separated by tab characters.

    
   
80
 * </p>

    
   
81
 *

    
   
82
 * <pre>

    
   
83
 * host:host1.example.org   req:GET /index.html   ua:Opera/9.80

    
   
84
 * host:host1.example.org   req:GET /favicon.ico  ua:Opera/9.80

    
   
85
 * host:pc.example.com      req:GET /news.html    ua:Mozilla/5.0

    
   
86
 * </pre>

    
   
87
 *

    
   
88
 * <p>You can read about LTSV on <a href="http://ltsv.org/">http://ltsv.org/</a>.</p>

    
   
89
 *

    
   
90
 * <p>

    
   
91
 * You can use the UDF in two ways.

    
   
92
 * First, you can specify labels and get them as Pig fields.

    
   
93
 * Second, you can extract a map of all columns of each LTSV line.

    
   
94
 * </p>

    
   
95
 *

    
   
96
 * <h3>Extract fields from each line</h3>

    
   
97
 *

    
   
98
 * <p>

    
   
99
 * To extract fields from each line,

    
   
100
 * construct a loader function with a schema string.

    
   
101
 * </p>

    
   
102
 *

    
   
103
 * <dl>

    
   
104
 *   <dt><em>Syntax</em></dt>

    
   
105
 *   <dd>org.apache.pig.piggybank.storage.LTSVLoader('&lt;schema-string&gt;')</dd>

    
   
106
 *

    
   
107
 *   <dt><em>Output</em></dt>

    
   
108
 *   <dd>The schema of the output is specified by the argument of the constructor.

    
   
109
 *   The value of a field comes from a column whose label is equal to the field name.

    
   
110
 *   If the corresponding label does not exist in the LTSV line,

    
   
111
 *   the field is set to null.</dd>

    
   
112
 * </dl>

    
   
113
 *

    
   
114
 * <p>

    
   
115
 * This example script loads an LTSV file shown in the previous section, named access.log.

    
   
116
 * </p>

    
   
117
 *

    
   
118
 * <pre>

    
   
119
 * -- Parses the access log and count the number of lines

    
   
120
 * -- for each pair of the host column and the ua column.

    
   
121
 * access = LOAD 'access.log' USING org.apache.pig.piggybank.storage.LTSVLoader('host:chararray, ua:chararray');

    
   
122
 * grouped_access = GROUP access BY (host, ua);

    
   
123
 * count_for_host_ua = FOREACH grouped_access GENERATE group.host, group.ua, COUNT(access);

    
   
124
 *

    
   
125
 * -- Prints out the result.

    
   
126
 * DUMP count_for_host_ua;

    
   
127
 * </pre>

    
   
128
 *

    
   
129
 * <p>The below text will be printed out.</p>

    
   
130
 *

    
   
131
 * <pre>

    
   
132
 * (host1.example.org,Opera/9.80,2)

    
   
133
 * (pc.example.com,Mozilla/5.0,1)

    
   
134
 * </pre>

    
   
135
 *

    
   
136
 * <h3>Extract a map from each line</h3>

    
   
137
 *

    
   
138
 * <p>

    
   
139
 * To extract a map from each line,

    
   
140
 * construct a loader function without parameters.

    
   
141
 * </p>

    
   
142
 *

    
   
143
 * <dl>

    
   
144
 *   <dt><em>Syntax</em></dt>

    
   
145
 *   <dd>org.apache.pig.piggybank.storage.LTSVLoader()</dd>

    
   
146
 *

    
   
147
 *   <dt><em>Output</em></dt>

    
   
148
 *   <dd>The schema of the output is (:map[]), or an unary tuple of a map, for each LTSV row.

    
   
149
 *   The key of a map is a label of the LTSV column.

    
   
150
 *   The value of a map comes from characters after ":" in the LTSV column.</dd>

    
   
151
 * </dl>

    
   
152
 *

    
   
153
 * <p>

    
   
154
 * This example script loads an LTSV file shown in the previous section, named access.log.

    
   
155
 * </p>

    
   
156
 *

    
   
157
 * <pre>

    
   
158
 * -- Parses the access log and projects the user agent field.

    
   
159
 * access = LOAD 'access.log' USING org.apache.pig.piggybank.storage.LTSVLoader() AS (m);

    
   
160
 * user_agent = FOREACH access GENERATE m#'ua' AS ua;

    
   
161
 * -- Prints out the user agent for each access.

    
   
162
 * DUMP user_agent;

    
   
163
 * </pre>

    
   
164
 *

    
   
165
 * <p>The below text will be printed out.</p>

    
   
166
 *

    
   
167
 * <pre>

    
   
168
 * (Opera/9.80)

    
   
169
 * (Opera/9.80)

    
   
170
 * (Mozilla/5.0)

    
   
171
 * </pre>

    
   
172
 *

    
   
173
 * <h3>Handling of malformed input</h3>

    
   
174
 *

    
   
175
 * <p>

    
   
176
 * All valid LTSV columns contain ":" to separate a field and a value.

    
   
177
 * If a column does not contain ":", the column is malformed.

    
   
178
 * </p>

    
   
179
 *

    
   
180
 * <p>

    
   
181
 * For each of malformed column, the counter

    
   
182
 * {@link PigWarning#UDF_WARNING_8} will be counted up.

    
   
183
 * This counter is printed out when the Pig job completes.

    
   
184
 * </p>

    
   
185
 *

    
   
186
 * <p>

    
   
187
 * In the mapreduce mode, the counter shall be output like below.

    
   
188
 * </p>

    
   
189
 *

    
   
190
 * <pre>

    
   
191
 * 2013-02-17 12:14:22,070 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning UDF_WARNING_8 10000 time(s).

    
   
192
 * </pre>

    
   
193
 *

    
   
194
 * <p>

    
   
195
 * For the first 100 of malformed columns in each task,

    
   
196
 * warning messages are written to the tasklog.

    
   
197
 * Such as:

    
   
198
 * </p>

    
   
199
 *

    
   
200
 * <pre>

    
   
201
 * 2013-02-17 12:13:42,142 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata1" does not contain ":".

    
   
202
 * 2013-02-17 12:13:42,158 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata2" does not contain ":".

    
   
203
 * 2013-02-17 12:13:42,158 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata3" does not contain ":".

    
   
204
 * 2013-02-17 12:13:42,158 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata4" does not contain ":".

    
   
205
 * 2013-02-17 12:13:42,158 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata5" does not contain ":".

    
   
206
 * 2013-02-17 12:13:42,159 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata6" does not contain ":".

    
   
207
 * 2013-02-17 12:13:42,159 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata7" does not contain ":".

    
   
208
 * 2013-02-17 12:13:42,159 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata8" does not contain ":".

    
   
209
 * 2013-02-17 12:13:42,159 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata9" does not contain ":".

    
   
210
 * ...

    
   
211
 * </pre>

    
   
212
 */

    
   
213
public class LTSVLoader extends FileInputLoadFunc implements LoadPushDown, LoadMetadata {

    
   
214

   

    
   
215
    /** Logger of this class. */

    
   
216
    private static final Log LOG = LogFactory.getLog(LTSVLoader.class);

    
   
217

   

    
   
218
    /** Emitter of tuples */

    
   
219
    private final TupleEmitter tupleEmitter;

    
   
220

   

    
   
221
    /** Length of "\t" in UTF-8. */

    
   
222
    private static int TAB_LENGTH = 1;

    
   
223

   

    
   
224
    /** Length of ":" in UTF-8. */

    
   
225
    private static int COLON_LENGTH = 1;

    
   
226

   

    
   
227
    /** Factory of tuples. */

    
   
228
    private final TupleFactory tupleFactory = TupleFactory.getInstance();

    
   
229

   

    
   
230
    /** Schema of the output of the loader, which is (:map[]). */

    
   
231
    private static final ResourceSchema MAP_SCHEMA

    
   
232
        = new ResourceSchema(new Schema(new Schema.FieldSchema(null, DataType.MAP)));

    
   
233

   

    
   
234
    /**

    
   
235
     * An error code of an input error.

    
   
236
     * See https://cwiki.apache.org/confluence/display/PIG/PigErrorHandlingFunctionalSpecification.

    
   
237
     */

    
   
238
    private static final int INPUT_ERROR_CODE = 6018;

    
   
239

   

    
   
240
    /**

    
   
241
     * An error message of an input error.

    
   
242
     * See https://cwiki.apache.org/confluence/display/PIG/PigErrorHandlingFunctionalSpecification.

    
   
243
     */

    
   
244
    private static final String INPUT_ERROR_MESSAGE = "Error while reading input";

    
   
245

   

    
   
246
    /** Underlying record reader of a text file. */

    
   
247
    @SuppressWarnings("rawtypes")

    
   
248
    private RecordReader reader = null;

    
   
249

   

    
   
250
    /** Sequence number of the next log message, which starts from 0. */

    
   
251
    private int warnLogSeqNum = 0;

    
   
252

   

    
   
253
    /** Max count of warning logs which will be output to the task log. */

    
   
254
    private static final int MAX_WARN_LOG_COUNT = 100;

    
   
255

   

    
   
256
    /** Key of a property which contains Set[String] of labels to output. */

    
   
257
    private static final String LABELS_TO_OUTPUT = "LABELS_TO_OUTPUT";

    
   
258

   

    
   
259
    /** Key of a property which contains Set[Integer] of indexes of fields to output in a schema. */

    
   
260
    private static final String INDEXES_TO_OUTPUT_IN_SCHEMA = "INDEXES_TO_OUTPUT_IN_SCHEMA";

    
   
261

   

    
   
262
    /**

    
   
263
     * An error code of an internal error.

    
   
264
     * See https://cwiki.apache.org/confluence/display/PIG/PigErrorHandlingFunctionalSpecification.

    
   
265
     */

    
   
266
    private static final int INTERNAL_ERROR_CODE = 2998;

    
   
267

   

    
   
268
    /** Location of input files. */

    
   
269
    private String loadLocation;

    
   
270

   

    
   
271
    /** Signature of the UDF invocation, used to get UDFContext. */

    
   
272
    private String signature;

    
   
273

   

    
   
274
    /**
     * Constructs a loader which extracts a tuple including a single map
     * for each column of an LTSV line.
     *
     * <p>The output schema is (:map[]); one map entry per LTSV column.</p>
     */
    public LTSVLoader() {
        this.tupleEmitter = new MapTupleEmitter();
    }
    /**
     * Constructs a loader which extracts a tuple including specified fields
     * for each column of an LTSV line.
     *
     * <p>Columns whose labels do not appear in the schema are skipped.</p>
     *
     * @param schemaString
     *     Schema of fields to extract, e.g. {@code "host:chararray, ua:chararray"}.
     *
     * @throws IOException
     *     Thrown when an I/O error occurs during construction
     *     (for example, when the schema string cannot be parsed).
     */
    public LTSVLoader(String schemaString) throws IOException {
        this.tupleEmitter = new FieldsTupleEmitter(schemaString);
    }
    /**

    
   
297
     * Reads an LTSV line and returns a tuple,

    
   
298
     * or returns {@code null} if the loader reaches the end of the block.

    
   
299
     *

    
   
300
     * @return

    
   
301
     *     Tuple corresponding to an LTSV line,

    
   
302
     *     or {@code null} if the loader reaches the end of the block.

    
   
303
     *

    
   
304
     * @throws IOException

    
   
305
     *     If an I/O error occurs while reading the line.

    
   
306
     */

    
   
307
    @Override

    
   
308
    public Tuple getNext() throws IOException {

    
   
309
        Text line = readLine();

    
   
310

   

    
   
311
        if (line == null) {

    
   
312
            return null;

    
   
313
        }

    
   
314

   

    
   
315
        // Current line: lineBytes[0, lineLength)

    
   
316
        byte[] lineBytes = line.getBytes();

    
   
317
        int lineLength = line.getLength();

    
   
318

   

    
   
319
        this.tupleEmitter.startTuple();

    
   
320

   

    
   
321
        // Reads columns.

    
   
322
        int startOfColumn = 0;

    
   
323
        while (startOfColumn <= lineLength) {

    
   
324
            int endOfColumn = findUntil((byte) '\t', lineBytes, startOfColumn, lineLength);

    
   
325
            readColumn(lineBytes, startOfColumn, endOfColumn);

    
   
326
            startOfColumn = endOfColumn + TAB_LENGTH;

    
   
327
        }

    
   
328

   

    
   
329
        return this.tupleEmitter.emitTuple();

    
   
330
    }

    
   
331

   

    
   
332
    /**

    
   
333
     * Reads a column to the tuple emitter.

    
   
334
     */

    
   
335
    private void readColumn(byte[] bytes, int start, int end) throws IOException {

    
   
336
        int colon = findUntil((byte) ':', bytes, start, end);

    
   
337
        boolean isLabeled = colon < end;

    
   
338
        if (! isLabeled) {

    
   
339
            warnMalformedColumn(Text.decode(bytes, start, end - start));

    
   
340
            return;

    
   
341
        }

    
   
342

   

    
   
343
        // Label: bytes[start, colon)

    
   
344
        // Colon: bytes[colon, colon + 1)

    
   
345
        // Value: bytes[colon + 1 = startOfValue, end)

    
   
346

   

    
   
347
        String label = Text.decode(bytes, start, colon - start);

    
   
348
        int startOfValue = colon + COLON_LENGTH;

    
   
349
        this.tupleEmitter.addColumnToTuple(label, bytes, startOfValue, end);

    
   
350
    }

    
   
351

   

    
   
352
    /**

    
   
353
     * Returns the index of the first target in bytes[start, end),

    
   
354
     * or the index of the end.

    
   
355
     */

    
   
356
    private static int findUntil(byte target, byte[] bytes, int start, int end) {

    
   
357
        for (int index = start; index < end; ++ index) {

    
   
358
            if (bytes[index] == target) {

    
   
359
                return index;

    
   
360
            }

    
   
361
        }

    
   
362
        return end;

    
   
363
    }

    
   
364

   

    
   
365
    /** Outputs a warning for a malformed column. */

    
   
366
    private void warnMalformedColumn(String column) {

    
   
367
        String message = String.format("MalformedColumn: Column \"%s\" does not contain \":\".", column);

    
   
368
        warn(message, PigWarning.UDF_WARNING_8);

    
   
369

   

    
   
370
        // Output at most MAX_WARN_LOG_COUNT warning messages.

    
   
371
        if (this.warnLogSeqNum < MAX_WARN_LOG_COUNT) {

    
   
372
            LOG.warn(message);

    
   
373
            ++ this.warnLogSeqNum;

    
   
374
        }

    
   
375
    }

    
   
376

   

    
   
377
    /**

    
   
378
     * Reads a line from the block,

    
   
379
     * or {@code null} if the loader reaches the end of the block.

    
   
380
     */

    
   
381
    private Text readLine() throws IOException {

    
   
382
        try {

    
   
383
            if (! this.reader.nextKeyValue()) {

    
   
384
                return null;

    
   
385
            }

    
   
386
            return (Text) this.reader.getCurrentValue();

    
   
387
        } catch (InterruptedException exception) {

    
   
388
            throw new ExecException(INPUT_ERROR_MESSAGE, INPUT_ERROR_CODE,

    
   
389
                    PigException.REMOTE_ENVIRONMENT, exception);

    
   
390
        }

    
   
391
    }

    
   
392

   

    
   
393
    /**
     * Constructs a tuple from columns and emits it.
     *
     * This interface is used to switch the output type between a map and fields.
     *
     * For each row, these methods are called in order:
     *
     * <ol>
     *   <li>startTuple</li>
     *   <li>addColumnToTuple (once per column of the row)</li>
     *   <li>emitTuple</li>
     * </ol>
     */
    private interface TupleEmitter {

        /**
         * Notifies the start of a tuple (a new row).
         */
        void startTuple();

        /**
         * Adds a value from bytes[startOfValue, endOfValue), corresponding to the label,
         * if the column is required for the output.
         *
         * @param label
         *     Label of the column.
         *
         * @param bytes
         *     Byte array including the value.
         *
         * @param startOfValue
         *     Index at the start of the value (inclusive).
         *
         * @param endOfValue
         *     Index at the end of the value (exclusive).
         */
        void addColumnToTuple(String label, byte[] bytes, int startOfValue, int endOfValue)
            throws IOException;

        /**
         * Emits the tuple built since the last {@link #startTuple}.
         */
        Tuple emitTuple();

        /**
         * Returns the schema of tuples.
         */
        ResourceSchema getSchema();

        /**
         * Notifies required fields in the script.
         *
         * <p>Delegation target from {@link LTSVLoader#pushProjection}.</p>
         */
        RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
            throws FrontendException;
    }
    /**

    
   
452
     * Reads columns and emits a tuple with a single map field (:map[]).

    
   
453
     */

    
   
454
    private class MapTupleEmitter implements TupleEmitter {

    
   
455

   

    
   
456
        /** Contents of the single map field. */

    
   
457
        private Map<String, Object> map;

    
   
458

   

    
   
459
        @Override

    
   
460
        public void startTuple() {

    
   
461
            this.map = new HashMap<String, Object>();

    
   
462
        }

    
   
463

   

    
   
464
        @Override

    
   
465
        public void addColumnToTuple(String label, byte[] bytes, int startOfValue, int endOfValue)

    
   
466
        throws IOException {

    
   
467
            if (shouldOutput(label)) {

    
   
468
                DataByteArray value = new DataByteArray(bytes, startOfValue, endOfValue);

    
   
469
                this.map.put(label, value);

    
   
470
            }

    
   
471
        }

    
   
472

   

    
   
473
        @Override

    
   
474
        public Tuple emitTuple() {

    
   
475
            return tupleFactory.newTuple(map);

    
   
476
        }

    
   
477

   

    
   
478
        /**

    
   
479
         * Returns {@code true} if the column should be output.

    
   
480
         */

    
   
481
        private boolean shouldOutput(String label) {

    
   
482
            boolean outputsEveryColumn = (labelsToOutput() == null);

    
   
483
            return outputsEveryColumn || labelsToOutput().contains(label);

    
   
484
        }

    
   
485

   

    
   
486
        /** True if {@link labelsToOutput} is initialized. */

    
   
487
        private boolean isProjectionInitialized;

    
   
488

   

    
   
489
        /**

    
   
490
         * Labels of columns to output, or {@code null} if all columns should be output.

    
   
491
         * This field should be accessed from {@link #labelsToOutput}.

    
   
492
         */

    
   
493
        private Set<String> labelsToOutput;

    
   
494

   

    
   
495
        /**

    
   
496
         * Returns labels of columns to output,

    
   
497
         * or {@code null} if all columns should be output.

    
   
498
         */

    
   
499
        private Set<String> labelsToOutput() {

    
   
500
            if (! this.isProjectionInitialized) {

    
   
501
                @SuppressWarnings("unchecked")

    
   
502
                Set<String> labels = (Set<String>) getProperties().get(LABELS_TO_OUTPUT);

    
   
503
                this.labelsToOutput = labels;

    
   
504
                LOG.debug("Labels to output: " + this.labelsToOutput);

    
   
505
                this.isProjectionInitialized = true;

    
   
506
            }

    
   
507
            return this.labelsToOutput;

    
   
508
        }

    
   
509

   

    
   
510
        @Override

    
   
511
        public ResourceSchema getSchema() {

    
   
512
            return MAP_SCHEMA;

    
   
513
        }

    
   
514

   

    
   
515
        @Override

    
   
516
        public RequiredFieldResponse

    
   
517
        pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {

    
   
518
            List<RequiredField> fields = requiredFieldList.getFields();

    
   
519
            if (fields == null || fields.isEmpty()) {

    
   
520
                LOG.debug("No fields specified as required.");

    
   
521
                return new RequiredFieldResponse(false);

    
   
522
            }

    
   
523

   

    
   
524
            if (fields.size() != 1) {

    
   
525
                String message = String.format(

    
   
526
                        "The loader expects at most one field but %d fields are specified."

    
   
527
                        , fields.size());

    
   
528
                throw new FrontendException(message, INTERNAL_ERROR_CODE, PigException.BUG);

    
   
529
            }

    
   
530

   

    
   
531
            RequiredField field = fields.get(0);

    
   
532
            if (field.getIndex() != 0) {

    
   
533
                String message = String.format(

    
   
534
                        "The loader produces only 1ary tuples, but the index %d is specified."

    
   
535
                        , field.getIndex());

    
   
536
                throw new FrontendException(message, INTERNAL_ERROR_CODE, PigException.BUG);

    
   
537
            }

    
   
538

   

    
   
539
            List<RequiredField> mapKeys = field.getSubFields();

    
   
540
            if (mapKeys == null) {

    
   
541
                LOG.debug("All the labels are required.");

    
   
542
                return new RequiredFieldResponse(false);

    
   
543
            }

    
   
544

   

    
   
545
            Set<String> labels = new HashSet<String>();

    
   
546
            for (RequiredField mapKey : mapKeys) {

    
   
547
                labels.add(mapKey.getAlias());

    
   
548
            }

    
   
549
            getProperties().put(LABELS_TO_OUTPUT, labels);

    
   
550
            LOG.debug("Labels to output: " + labels);

    
   
551
            return new RequiredFieldResponse(true);

    
   
552
        }

    
   
553
    }

    
   
554

   

    
   
555
    /**

    
   
556
     * Reads columns and emits a tuple with fields specified

    
   
557
     * by the constructor of the load function.

    
   
558
     */

    
   
559
    private class FieldsTupleEmitter implements TupleEmitter {

    
   
560

   

    
   
561
        /*
         * Note that
         * indexes in the input schema (indexInSchema)
         * and indexes in output tuples (indexInTuple)
         * may differ when the loader performs projection.
         *
         * For example:
         *
         * - At the input schema in LTSVLoader('id, name, salary'), the indexes are
         *   id=>0, name=>1, salary=>2.
         *
         * - If "name" and "salary" fields are projected, the indexes in output tuples are
         *   name=>0, salary=>1.
         */

        /** Schema of tuples, parsed from the constructor argument of the load function. */
        private final ResourceSchema schema;

        /** Tuple to emit; recreated by startTuple for every row. */
        private Tuple tuple;

        /** Caster of values, obtained from the enclosing load function. */
        private final LoadCaster loadCaster = getLoadCaster();

        /** Mapping from labels to indexes in a tuple; lazily built, null until the first row. */
        private Map<String, Integer> labelToIndexInTuple;

        /** Schemas of fields in a tuple, in tuple order; lazily built together with the mapping above. */
        private List<ResourceFieldSchema> fieldSchemasInTuple;

        /** Set of labels which have occurred and been skipped, used to log each label only once. */
        private final Set<String> skippedLabels = new HashSet<String>();
        /**
         * Constructs an emitter which emits tuples of the given schema.
         *
         * @param schemaString
         *     Schema string such as "host:chararray, ua:chararray".
         *
         * @throws IOException
         *     If the schema string cannot be parsed.
         */
        private FieldsTupleEmitter(String schemaString) throws IOException {
            this.schema = new ResourceSchema(Utils.getSchemaFromString(schemaString));
        }
        @Override
        public void startTuple() {
            // The projection mapping must be built before the tuple is sized,
            // because the tuple has one slot per non-projected field.
            initOrderOfFieldsInTuple();
            this.tuple = tupleFactory.newTuple(this.labelToIndexInTuple.size());
        }
        @Override

    
   
609
        public void addColumnToTuple(String label, byte[] bytes, int startOfValue, int endOfValue)

    
   
610
        throws IOException {

    
   
611
            if (! this.labelToIndexInTuple.containsKey(label)) {

    
   
612
                logSkippedLabelAtFirstOccurrence(label);

    
   
613
                return;

    
   
614
            }

    
   
615

   

    
   
616
            int indexInTuple = this.labelToIndexInTuple.get(label);

    
   
617
            ResourceFieldSchema fieldSchema = this.fieldSchemasInTuple.get(indexInTuple);

    
   
618
            int valueLength = endOfValue - startOfValue;

    
   
619
            byte[] valueBytes = new byte[valueLength];

    
   
620
            System.arraycopy(bytes, startOfValue, valueBytes, 0, valueLength);

    
   
621
            Object value = CastUtils.convertToType(this.loadCaster, valueBytes, fieldSchema, fieldSchema.getType());

    
   
622
            this.tuple.set(indexInTuple, value);

    
   
623
        }

    
   
624

   

    
   
625

   

    
   
626
        /**

    
   
627
         * Initializes {@link labelsToOutput} and {@link fieldSchemasInTuple} if required.

    
   
628
         */

    
   
629
        private void initOrderOfFieldsInTuple() {

    
   
630
            if (this.labelToIndexInTuple != null && this.fieldSchemasInTuple != null) {

    
   
631
                return;

    
   
632
            }

    
   
633
            @SuppressWarnings("unchecked")

    
   
634
            Set<Integer> indexesToOutputInSchema = (Set<Integer>) getProperties().get(INDEXES_TO_OUTPUT_IN_SCHEMA);

    
   
635
            Map<String, Integer> labelToIndexInTuple = new HashMap<String, Integer>();

    
   
636
            List<ResourceFieldSchema> fieldSchemasInTuple = new ArrayList<ResourceFieldSchema>();

    
   
637
            for (int indexInSchema = 0; indexInSchema < this.schema.getFields().length; ++ indexInSchema) {

    
   
638
                if (indexesToOutputInSchema == null || indexesToOutputInSchema.contains(indexInSchema)) {

    
   
639
                    ResourceFieldSchema fieldSchema = this.schema.getFields()[indexInSchema];

    
   
640
                    int indexInTuple = fieldSchemasInTuple.size();

    
   
641
                    labelToIndexInTuple.put(fieldSchema.getName(), indexInTuple);

    
   
642
                    fieldSchemasInTuple.add(fieldSchema);

    
   
643
                }

    
   
644
            }

    
   
645
            LOG.debug("Label -> Index: " + labelToIndexInTuple);

    
   
646
            this.labelToIndexInTuple = labelToIndexInTuple;

    
   
647
            this.fieldSchemasInTuple = fieldSchemasInTuple;

    
   
648
        }

    
   
649

   

    
   
650
        /**

    
   
651
         * Outputs the label of a skipped column to the task log at the first occurrence.

    
   
652
         */

    
   
653
        private void logSkippedLabelAtFirstOccurrence(String label) {

    
   
654
            if (LOG.isDebugEnabled() && ! this.skippedLabels.contains(label)) {

    
   
655
                this.skippedLabels.add(label);

    
   
656
                LOG.debug("Skipped label: " + label);

    
   
657
            }

    
   
658
        }

    
   
659

   

    
   
660
        @Override

    
   
661
        public Tuple emitTuple() {

    
   
662
            return this.tuple;

    
   
663
        }

    
   
664

   

    
   
665
        @Override

    
   
666
        public ResourceSchema getSchema() {

    
   
667
            return this.schema;

    
   
668
        }

    
   
669

   

    
   
670
        @Override

    
   
671
        public RequiredFieldResponse

    
   
672
        pushProjection(RequiredFieldList requiredFieldList) {

    
   
673
            List<RequiredField> fields = requiredFieldList.getFields();

    
   
674
            if (fields == null) {

    
   
675
                LOG.debug("No fields specified as required.");

    
   
676
                return new RequiredFieldResponse(false);

    
   
677
            }

    
   
678

   

    
   
679
            Set<Integer> indexesToOutputInSchema = new HashSet<Integer>();

    
   
680
            for (RequiredField field : fields) {

    
   
681
                indexesToOutputInSchema.add(field.getIndex());

    
   
682
            }

    
   
683

   

    
   
684
            getProperties().put(INDEXES_TO_OUTPUT_IN_SCHEMA, indexesToOutputInSchema);

    
   
685
            LOG.debug("Indexes of fields to output in the schema: " + indexesToOutputInSchema);

    
   
686
            return new RequiredFieldResponse(true);

    
   
687
        }

    
   
688
    }

    
   
689

   

    
   
690
    /**

    
   
691
     * Saves the RecordReader.

    
   
692
     *

    
   
693
     * @param reader

    
   
694
     *     RecordReader to read LTSV lines from.

    
   
695
     *

    
   
696
     * @param split

    
   
697
     *     Ignored.

    
   
698
     *

    
   
699
     */

    
   
700
    @Override

    
   
701
    public void

    
   
702
    prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split) {

    
   
703
        this.reader = reader;

    
   
704
    }

    
   
705

   

    
   
706
    /**

    
   
707
     * Extracts information about which labels are required in the script.

    
   
708
     *

    
   
709
     * @param requiredFieldList

    
   
710
     *     {@inheritDoc}.

    
   
711
     *

    
   
712
     * @return

    
   
713
     *     {@code new RequiredFieldResponse(true)} if projection will be performed,

    
   
714
     *     or {@code new RequiredFieldResponse(false)} otherwise.

    
   
715
     *

    
   
716
     * @throws FrontendException

    
   
717
     *     When an unexpected internal error occurs.

    
   
718
     */

    
   
719
    @Override

    
   
720
    public RequiredFieldResponse

    
   
721
    pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {

    
   
722
        return this.tupleEmitter.pushProjection(requiredFieldList);

    
   
723
    }

    
   
724

   

    
   
725
    /**

    
   
726
     * <p>This UDF supports

    
   
727
     * {@linkplain org.apache.pig.LoadPushDown.OperatorSet#PROJECTION projection push-down}.</p>

    
   
728
     *

    
   
729
     * @return

    
   
730
     *     Singleton list of

    
   
731
     *     {@link org.apache.pig.LoadPushDown.OperatorSet#PROJECTION}.

    
   
732
     */

    
   
733
    @Override

    
   
734
    public List<OperatorSet> getFeatures() {

    
   
735
        return Collections.singletonList(OperatorSet.PROJECTION);

    
   
736
    }

    
   
737

   

    
   
738
    /**

    
   
739
     * Configures the underlying input format

    
   
740
     * to read lines from {@code location}.

    
   
741
     *

    
   
742
     * @param location

    
   
743
     *     Location of LTSV file(s).

    
   
744
     *

    
   
745
     * @param job

    
   
746
     *     Job.

    
   
747
     *

    
   
748
     * @throws IOException

    
   
749
     *     If an I/O error occurs while configuring the job.

    
   
750
     */

    
   
751
    @Override

    
   
752
    public void setLocation(String location, Job job) throws IOException {

    
   
753
        this.loadLocation = location;

    
   
754
        FileInputFormat.setInputPaths(job, location);

    
   
755
    }

    
   
756

   

    
   
757
    /**

    
   
758
     * Makes an instance of an input format from which LTSV lines are read.

    
   
759
     *

    
   
760
     * @return

    
   
761
     *     Instance of {@link PigTextInputFormat}.

    
   
762
     */

    
   
763
    @Override

    
   
764
    public InputFormat getInputFormat() {

    
   
765
        if (loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {

    
   
766
            // Bzip2TextInputFormat can split bzipped files.

    
   
767
            LOG.debug("Uses Bzip2TextInputFormat");

    
   
768
            return new Bzip2TextInputFormat();

    
   
769
        } else {

    
   
770
            LOG.debug("Uses PigTextInputFormat");

    
   
771
            return new PigTextInputFormat();

    
   
772
        }

    
   
773
    }

    
   
774

   

    
   
775
    /**

    
   
776
     * Saves the signature of the current invocation of the UDF.

    
   
777
     *

    
   
778
     * @param signature

    
   
779
     *     Signature of the current invocation of the UDF.

    
   
780
     */

    
   
781
    @Override

    
   
782
    public void setUDFContextSignature(String signature) {

    
   
783
        this.signature = signature;

    
   
784
    }

    
   
785

   

    
   
786
    /**

    
   
787
     * Returns the properties related with the current invocation of the UDF.

    
   
788
     */

    
   
789
    private Properties getProperties() {

    
   
790
        return UDFContext.getUDFContext().getUDFProperties(

    
   
791
                getClass(), new String[] { this.signature });

    
   
792
    }

    
   
793

   

    
   
794
    // Methods for LoadMetadata

    
   
795

   

    
   
796
    /**

    
   
797
     * Does nothing,

    
   
798
     * because the loader does not know about partitioning.

    
   
799
     *

    
   
800
     * @param partitionFilter

    
   
801
     *     Ignored.

    
   
802
     */

    
   
803
    @Override

    
   
804
    public void setPartitionFilter(Expression partitionFilter) {

    
   
805
    }

    
   
806

   

    
   
807
    /**

    
   
808
     * Returns {@code null},

    
   
809
     * because the loader does not know about partitioning.

    
   
810
     *

    
   
811
     * @param location

    
   
812
     *     Ignored.

    
   
813
     *

    
   
814
     * @param job

    
   
815
     *     Ignored.

    
   
816
     *

    
   
817
     * @return

    
   
818
     *     null.

    
   
819
     */

    
   
820
    @Override

    
   
821
    public String[] getPartitionKeys(String location, Job job) {

    
   
822
        return null;

    
   
823
    }

    
   
824

   

    
   
825
    /**

    
   
826
     * Returns {@code null}, because no statistics are available.

    
   
827
     *

    
   
828
     * @param location

    
   
829
     *     Ignored.

    
   
830
     *

    
   
831
     * @param job

    
   
832
     *     Ignored.

    
   
833
     *

    
   
834
     * @return

    
   
835
     *     null.

    
   
836
     */

    
   
837
    @Override

    
   
838
    public ResourceStatistics getStatistics(String location, Job job) {

    
   
839
        return null;

    
   
840
    }

    
   
841

   

    
   
842
    /**

    
   
843
     * Returns the schema of the output of the loader.

    
   
844
     *

    
   
845
     * @param location

    
   
846
     *     Ignored, because the schema is constant.

    
   
847
     *

    
   
848
     * @param job

    
   
849
     *     Ignored, because the schema is constant.

    
   
850
     *

    
   
851
     * @return

    
   
852
     *     Schema of the output of the loader.

    
   
853
     */

    
   
854
    @Override

    
   
855
    public ResourceSchema getSchema(String location, Job job) {

    
   
856
        return this.tupleEmitter.getSchema();

    
   
857
    }

    
   
858
}

    
   
859

   

    
   
860
// vim: et sw=4 sts=4
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLTSVLoader.java
New File
 
  1. contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/LTSVLoader.java: Loading...
  2. contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLTSVLoader.java: Loading...