Review Board 1.7.22


PIG-3215 [piggybank] Add LTSVLoader to load LTSV files

Review Request #9685 - Created Feb. 28, 2013 and updated

Taku Miyakawa
trunk
PIG-3215
Reviewers
pig
pig-git
This is a review board for  https://issues.apache.org/jira/browse/PIG-3215

The patch adds LTSVLoader function and its test class.
ant compile-test
cd contrib/piggybank/java/
ant -Dtestcase=TestLTSVLoader test

Diff revision 1

This is not the most recent revision of the diff. The latest diff is revision 6. See what's changed.

1 2 3 4 5 6
1 2 3 4 5 6

  1. contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/LTSVLoader.java: Loading...
  2. contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLTSVLoader.java: Loading...
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/LTSVLoader.java
New File

    
   
1
/*

    
   
2
 * Licensed to the Apache Software Foundation (ASF) under one

    
   
3
 * or more contributor license agreements.  See the NOTICE file

    
   
4
 * distributed with this work for additional information

    
   
5
 * regarding copyright ownership.  The ASF licenses this file

    
   
6
 * to you under the Apache License, Version 2.0 (the

    
   
7
 * "License"); you may not use this file except in compliance

    
   
8
 * with the License.  You may obtain a copy of the License at

    
   
9
 *

    
   
10
 *     http://www.apache.org/licenses/LICENSE-2.0

    
   
11
 *

    
   
12
 * Unless required by applicable law or agreed to in writing, software

    
   
13
 * distributed under the License is distributed on an "AS IS" BASIS,

    
   
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

    
   
15
 * See the License for the specific language governing permissions and

    
   
16
 * limitations under the License.

    
   
17
 */

    
   
18

   

    
   
19
package org.apache.pig.piggybank.storage;

    
   
20

   

    
   
21
import java.util.List;

    
   
22
import java.util.Map;

    
   
23
import java.util.Set;

    
   
24
import java.util.HashMap;

    
   
25
import java.util.HashSet;

    
   
26
import java.util.Properties;

    
   
27
import java.util.Collections;

    
   
28
import java.io.IOException;

    
   
29

   

    
   
30
import org.apache.commons.logging.Log;

    
   
31
import org.apache.commons.logging.LogFactory;

    
   
32

   

    
   
33
import org.apache.pig.Expression;

    
   
34
import org.apache.pig.FileInputLoadFunc;

    
   
35
import org.apache.pig.LoadCaster;

    
   
36
import org.apache.pig.LoadPushDown;

    
   
37
import org.apache.pig.ResourceStatistics;

    
   
38
import org.apache.pig.ResourceSchema;

    
   
39
import org.apache.pig.PigWarning;

    
   
40
import org.apache.pig.PigException;

    
   
41
import org.apache.pig.LoadMetadata;

    
   
42
import org.apache.pig.bzip2r.Bzip2TextInputFormat;

    
   
43
import org.apache.pig.data.DataByteArray;

    
   
44
import org.apache.pig.data.DataType;

    
   
45
import org.apache.pig.data.Tuple;

    
   
46
import org.apache.pig.data.TupleFactory;

    
   
47
import org.apache.pig.backend.executionengine.ExecException;

    
   
48
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;

    
   
49
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;

    
   
50
import org.apache.pig.impl.logicalLayer.schema.Schema;

    
   
51
import org.apache.pig.impl.logicalLayer.FrontendException;

    
   
52
import org.apache.pig.impl.util.CastUtils;

    
   
53
import org.apache.pig.impl.util.Utils;

    
   
54
import org.apache.pig.impl.util.UDFContext;

    
   
55
import org.apache.hadoop.io.Text;

    
   
56
import org.apache.hadoop.mapreduce.Job;

    
   
57
import org.apache.hadoop.mapreduce.InputFormat;

    
   
58
import org.apache.hadoop.mapreduce.RecordReader;

    
   
59
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    
   
60

   

    
   
61
/**

    
   
62
 * Loader UDF for <a href="http://ltsv.org/">LTSV</a> files,

    
   
63
 * or Labeled Tab-separated Values files.

    
   
64
 *

    
   
65
 * <h3>About LTSV</h3>

    
   
66
 *

    
   
67
 * <p>

    
   
68
 * LTSV, or Labeled Tab-separated Values format is a format for log files.

    
   
69
 * LTSV is based on TSV.

    
   
70
 * Columns are separated by tab characters,

    
   
71
 * and each of columns includes a label and a value,

    
   
72
 * separated by a ":" character.

    
   
73
 * </p>

    
   
74
 *

    
   
75
 * <p>

    
   
76
 * This is an example LTSV log file.

    
   
77
 * Suppose that columns are separated by tab characters.

    
   
78
 * </p>

    
   
79
 *

    
   
80
 * <pre>

    
   
81
 * host:host1.example.org   req:GET /index.html   ua:Opera/9.80

    
   
82
 * host:host1.example.org   req:GET /favicon.ico  ua:Opera/9.80

    
   
83
 * host:pc.example.com      req:GET /news.html    ua:Mozilla/5.0

    
   
84
 * </pre>

    
   
85
 *

    
   
86
 * <p>You can read about LTSV on <a href="http://ltsv.org/">http://ltsv.org/</a>.</p>

    
   
87
 *

    
   
88
 * <p>

    
   
89
 * You can use the UDF in two ways.

    
   
90
 * First, you can specify labels and get them as Pig fields.

    
   
91
 * Second, you can extract a map of all columns of each LTSV line.

    
   
92
 * </p>

    
   
93
 *

    
   
94
 * <h3>Extract fields from each line</h3>

    
   
95
 *

    
   
96
 * <p>

    
   
97
 * To extract fields from each line,

    
   
98
 * construct a loader function with a schema string.

    
   
99
 * </p>

    
   
100
 *

    
   
101
 * <dl>

    
   
102
 *   <dt><em>Syntax</em></dt>

    
   
103
 *   <dd>org.apache.pig.piggybank.storage.LTSVLoader('&lt;schema-string&gt;')</dd>

    
   
104
 *

    
   
105
 *   <dt><em>Output</em></dt>

    
   
106
 *   <dd>The schema of the output is specified by the argument of the constructor.

    
   
107
 *   The value of a field comes from a column whose label is equal to the field name.

    
   
108
 *   If the corresponding label does not exist in the LTSV line,

    
   
109
 *   the field is set to null.</dd>

    
   
110
 * </dl>

    
   
111
 *

    
   
112
 * <p>

    
   
113
 * This example script loads an LTSV file shown in the previous section, named access.log.

    
   
114
 * </p>

    
   
115
 *

    
   
116
 * <pre>

    
   
117
 * -- Parses the access log and count the number of lines

    
   
118
 * -- for each pair of the host column and the ua column.

    
   
119
 * access = LOAD 'access.log' USING org.apache.pig.piggybank.storage.LTSVLoader('host:chararray, ua:chararray');

    
   
120
 * grouped_access = GROUP access BY (host, ua);

    
   
121
 * count_for_host_ua = FOREACH grouped_access GENERATE group.host, group.ua, COUNT(access);

    
   
122
 *

    
   
123
 * -- Prints out the result.

    
   
124
 * DUMP count_for_host_ua;

    
   
125
 * </pre>

    
   
126
 *

    
   
127
 * <p>The below text will be printed out.</p>

    
   
128
 *

    
   
129
 * <pre>

    
   
130
 * (host1.example.org,Opera/9.80,2)

    
   
131
 * (pc.example.com,Firefox/5.0,1)

    
   
132
 * </pre>

    
   
133
 *

    
   
134
 * <h3>Extract a map from each line</h3>

    
   
135
 *

    
   
136
 * <p>

    
   
137
 * To extract a map from each line,

    
   
138
 * construct a loader function without parameters.

    
   
139
 * </p>

    
   
140
 *

    
   
141
 * <dl>

    
   
142
 *   <dt><em>Syntax</em></dt>

    
   
143
 *   <dd>org.apache.pig.piggybank.storage.LTSVLoader()</dd>

    
   
144
 *

    
   
145
 *   <dt><em>Output</em></dt>

    
   
146
 *   <dd>The schema of the output is (:map[]), or an unary tuple of a map, for each LTSV row.

    
   
147
 *   The key of a map is a label of the LTSV column.

    
   
148
 *   The value of a map comes from characters after ":" in the LTSV column.</dd>

    
   
149
 * </dl>

    
   
150
 *

    
   
151
 * <p>

    
   
152
 * This example script loads an LTSV file shown in the previous section, named access.log.

    
   
153
 * </p>

    
   
154
 *

    
   
155
 * <pre>

    
   
156
 * -- Parses the access log and projects the user agent field.

    
   
157
 * access = LOAD 'access.log' USING org.apache.pig.piggybank.storage.LTSVLoader() AS (m:map[]);

    
   
158
 * user_agent = FOREACH access GENERATE m#'ua' AS ua;

    
   
159
 * -- Prints out the user agent for each access.

    
   
160
 * DUMP user_agent;

    
   
161
 * </pre>

    
   
162
 *

    
   
163
 * <p>The below text will be printed out.</p>

    
   
164
 *

    
   
165
 * <pre>

    
   
166
 * (Opera/9.80)

    
   
167
 * (Opera/9.80)

    
   
168
 * (Firefox/5.0)

    
   
169
 * </pre>

    
   
170
 *

    
   
171
 * <h3>Handling of malformed input</h3>

    
   
172
 *

    
   
173
 * <p>

    
   
174
 * All valid LTSV columns contain ":" to separate a field and a value.

    
   
175
 * If a column does not contain ":", the column is malformed.

    
   
176
 * </p>

    
   
177
 *

    
   
178
 * <p>

    
   
179
 * For each of malformed column, the counter

    
   
180
 * {@link PigWarning#UDF_WARNING_8} will be counted up.

    
   
181
 * This counter is printed out when the Pig job completes.

    
   
182
 * </p>

    
   
183
 *

    
   
184
 * <p>

    
   
185
 * In the mapreduce mode, the counter shall be output like below.

    
   
186
 * </p>

    
   
187
 *

    
   
188
 * <pre>

    
   
189
 * 2013-02-17 12:14:22,070 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning UDF_WARNING_8 10000 time(s).

    
   
190
 * </pre>

    
   
191
 *

    
   
192
 * <p>

    
   
193
 * For the first 100 malformed columns in each task,

    
   
194
 * warning messages are written to the tasklog.

    
   
195
 * Such as:

    
   
196
 * </p>

    
   
197
 *

    
   
198
 * <pre>

    
   
199
 * 2013-02-17 12:13:42,142 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata1" does not contain ":".

    
   
200
 * 2013-02-17 12:13:42,158 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata2" does not contain ":".

    
   
201
 * 2013-02-17 12:13:42,158 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata3" does not contain ":".

    
   
202
 * 2013-02-17 12:13:42,158 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata4" does not contain ":".

    
   
203
 * 2013-02-17 12:13:42,158 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata5" does not contain ":".

    
   
204
 * 2013-02-17 12:13:42,159 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata6" does not contain ":".

    
   
205
 * 2013-02-17 12:13:42,159 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata7" does not contain ":".

    
   
206
 * 2013-02-17 12:13:42,159 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata8" does not contain ":".

    
   
207
 * 2013-02-17 12:13:42,159 WARN org.apache.pig.piggybank.storage.LTSVLoader: MalformedColumn: Column "errordata9" does not contain ":".

    
   
208
 * ...

    
   
209
 * </pre>

    
   
210
 */

    
   
211
public class LTSVLoader extends FileInputLoadFunc implements LoadPushDown, LoadMetadata {

    
   
212

   

    
   
213

   

    
   
214
    /** Logger of this class; used for projection info messages and malformed-column warnings. */
    private static final Log LOG = LogFactory.getLog(LTSVLoader.class);


    /**
     * Emitter of tuples, selected by the constructor:
     * a map-producing emitter for the no-argument constructor,
     * or a fields-producing emitter when a schema string is given.
     */
    private final TupleEmitter tupleEmitter;

    
   
220

   

    
   
221

   

    
   
222
    /**
     * Constructs a loader which extracts a tuple including a single map
     * for each column of an LTSV line.
     *
     * <p>The output schema is the unary tuple (:map[]).</p>
     */
    public LTSVLoader() {
        // Every column of a line becomes one label -> value entry of the map.
        this.tupleEmitter = new MapTupleEmitter();
    }

    
   
229

   

    
   
230

   

    
   
231
    /**
     * Constructs a loader which extracts a tuple including specified fields
     * for each column of an LTSV line.
     *
     * @param schemaString
     *     Schema of fields to extract.
     *     Labels of LTSV columns matching a field name are cast and stored
     *     into the corresponding position of the output tuple.
     *
     * @throws IOException
     *     Thrown when an I/O error occurs during construction.
     */
    public LTSVLoader(String schemaString) throws IOException {
        this.tupleEmitter = new FieldsTupleEmitter(schemaString);
    }

    
   
244

   

    
   
245

   

    
   
246
    /** Length of "\t" in UTF-8. */

    
   
247
    private static int TAB_LENGTH = 1;

    
   
248

   

    
   
249

   

    
   
250
    /**

    
   
251
     * Reads an LTSV line and returns a tuple,

    
   
252
     * or returns {@code null} if the loader reaches the end of the block.

    
   
253
     *

    
   
254
     * @return

    
   
255
     *     Tuple corresponding to an LTSV line,

    
   
256
     *     or {@code null} if the loader reaches the end of the block.

    
   
257
     *

    
   
258
     * @throws IOException

    
   
259
     *     If an I/O error occurs while reading the line.

    
   
260
     */

    
   
261
    @Override

    
   
262
    public Tuple getNext() throws IOException {

    
   
263
        Text line = readLine();

    
   
264

   

    
   
265
        boolean hasLine = (line != null);

    
   
266
        if (! hasLine) {

    
   
267
            return null;

    
   
268
        }

    
   
269

   

    
   
270
        assert hasLine;

    
   
271

   

    
   
272
        // Current line: lineBytes[0, lineLength)

    
   
273
        byte[] lineBytes = line.getBytes();

    
   
274
        int lineLength = line.getLength();

    
   
275

   

    
   
276
        // Reads an map entry from each column.

    
   
277
        int startOfColumn = 0;

    
   
278
        while (startOfColumn <= lineLength) {

    
   
279
            int endOfColumn = findUntil((byte) '\t', lineBytes, startOfColumn, lineLength);

    
   
280
            readColumn(lineBytes, startOfColumn, endOfColumn);

    
   
281
            startOfColumn = endOfColumn + TAB_LENGTH;

    
   
282
        }

    
   
283

   

    
   
284
        return this.tupleEmitter.emitTuple();

    
   
285
    }

    
   
286

   

    
   
287

   

    
   
288
    /** Length of ":" in UTF-8. */

    
   
289
    private static int COLON_LENGTH = 1;

    
   
290

   

    
   
291

   

    
   
292
    /**

    
   
293
     * Reads a column to the tuple emitter.

    
   
294
     */

    
   
295
    private void readColumn(byte[] bytes, int start, int end) throws IOException {

    
   
296
        int colon = findUntil((byte) ':', bytes, start, end);

    
   
297
        boolean isLabeled = colon < end;

    
   
298
        if (! isLabeled) {

    
   
299
            warnMalformedColumn(Text.decode(bytes, start, end - start));

    
   
300
            return;

    
   
301
        }

    
   
302

   

    
   
303
        assert isLabeled;

    
   
304

   

    
   
305
        // Label: bytes[start, colon)

    
   
306
        // Colon: bytes[colon, colon + 1)

    
   
307
        // Value: bytes[colon + 1 = startOfValue, end)

    
   
308

   

    
   
309
        String label = Text.decode(bytes, start, colon - start);

    
   
310
        int startOfValue = colon + COLON_LENGTH;

    
   
311
        this.tupleEmitter.addColumn(label, bytes, startOfValue, end);

    
   
312
    }

    
   
313

   

    
   
314

   

    
   
315
    /**
     * Constructs a tuple from columns and emits it.
     *
     * This interface is used to switch the output type between a map and fields.
     */
    private interface TupleEmitter {


        /**
         * Adds a value from bytes[startOfValue, endOfValue), corresponding to the label,
         * if the column is required for the output.
         *
         * @param label
         *     Label of the column.
         *
         * @param bytes
         *     Byte array including the value.
         *
         * @param startOfValue
         *     Index at the start of the value (inclusive).
         *
         * @param endOfValue
         *     Index at the end of the value (exclusive).
         *
         * @throws IOException
         *     If decoding or storing the value fails.
         */
        void addColumn(String label, byte[] bytes, int startOfValue, int endOfValue)
            throws IOException;


        /**
         * Emits a tuple and reinitializes the state of the emitter
         * for the next line.
         */
        Tuple emitTuple();


        /**
         * Returns the schema of tuples.
         */
        ResourceSchema getSchema();


        /**
         * Notifies required fields in the script.
         *
         * <p>Delegation target from {@link LTSVLoader#pushProjection}.</p>
         */
        RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
            throws FrontendException;


    }

    
   
365

   

    
   
366

   

    
   
367
    /** Factory of tuples, shared by both emitter implementations. */
    private final TupleFactory tupleFactory = TupleFactory.getInstance();


    /** Schema of the output of the loader, which is (:map[]). */
    private static final ResourceSchema MAP_SCHEMA;


    static {
        // Builds the constant (:map[]) schema once: a single anonymous MAP field.
        Schema.FieldSchema fieldSchema = new Schema.FieldSchema(null, DataType.MAP);
        Schema schema = new Schema(fieldSchema);
        MAP_SCHEMA = new ResourceSchema(schema);
    }

    
   
380

   

    
   
381

   

    
   
382
    /**
     * Reads columns and emits a tuple with a single map field (:map[]).
     *
     * <p>Supports projection push-down: when the script requires only some map keys,
     * {@link #pushProjection} records them, and {@link #addColumn} drops
     * all other labels.</p>
     */
    private class MapTupleEmitter implements TupleEmitter {


        /** Contents of the single map field, accumulated for the current line. */
        private Map<Object, Object> map = new HashMap<Object, Object>();


        @Override
        public void addColumn(String label, byte[] bytes, int startOfValue, int endOfValue)
        throws IOException {
            if (shouldOutput(label)) {
                // NOTE(review): assumes DataByteArray(byte[], int, int) takes an
                // inclusive start and exclusive end, matching readColumn's bounds
                // — confirm against the Pig DataByteArray API.
                DataByteArray value = new DataByteArray(bytes, startOfValue, endOfValue);
                this.map.put(label, value);
            }
        }


        @Override
        public Tuple emitTuple() {
            Tuple tuple = tupleFactory.newTuple(map);
            // Starts a fresh map for the next line instead of clearing, so the
            // emitted tuple keeps ownership of the old one.
            this.map = new HashMap<Object, Object>();
            return tuple;
        }


        /**
         * Returns {@code true} if the column should be output,
         * i.e. no projection is set or the label is among the required keys.
         */
        private boolean shouldOutput(String label) {
            boolean outputsEveryColumn = (labelsToOutput() == null);
            return outputsEveryColumn || labelsToOutput().contains(label);
        }


        /** True if {@link labelsToOutput} is initialized. */
        private boolean isProjectionInitialized;


        /**
         * Labels of columns to output, or {@code null} if all columns should be output.
         * This field should be accessed from {@link #labelsToOutput}.
         */
        private Set<String> labelsToOutput;


        /**
         * Returns labels of columns to output,
         * or {@code null} if all columns should be output.
         *
         * <p>Lazily loaded from the properties under {@link #LABELS_TO_OUTPUT}.
         * NOTE(review): getProperties() is defined outside this view; presumably
         * it returns per-UDF properties shared between front end and back end
         * — confirm.</p>
         */
        private Set<String> labelsToOutput() {
            if (! this.isProjectionInitialized) {
                @SuppressWarnings("unchecked")
                Set<String> labels = (Set<String>) getProperties().get(LABELS_TO_OUTPUT);
                this.labelsToOutput = labels;
                LOG.info("Labels to output: " + this.labelsToOutput);
                this.isProjectionInitialized = true;
            }
            return this.labelsToOutput;
        }


        @Override
        public ResourceSchema getSchema() {
            return MAP_SCHEMA;
        }


        @Override
        public RequiredFieldResponse
        pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
            List<RequiredField> fields = requiredFieldList.getFields();
            boolean anyRequirement = (fields != null);
            if (! anyRequirement) {
                // null field list means everything is needed: no projection.
                LOG.info("All the fields are required.");
                return new RequiredFieldResponse(false);
            }

            assert anyRequirement;
            boolean onlyOneField = (fields.size() == 1);
            if (! onlyOneField) {
                if (fields.isEmpty()) {
                    LOG.info("No fields specified as required.");
                    return new RequiredFieldResponse(false);
                }
                // This loader emits unary tuples, so more than one required
                // field indicates a planner bug.
                String message = String.format(
                        "The loader expects at most one field but %d fields are specified."
                        , fields.size());
                throw new FrontendException(message, INTERNAL_ERROR_CODE, PigException.BUG);
            }

            assert onlyOneField;
            RequiredField field = fields.get(0);
            boolean onlyFirstField = (field.getIndex() == 0);
            if (! onlyFirstField) {
                String message = String.format(
                        "The loader produces only 1ary tuples, but the index %d is specified."
                        , field.getIndex());
                throw new FrontendException(message, INTERNAL_ERROR_CODE, PigException.BUG);
            }

            assert onlyFirstField;
            List<RequiredField> mapKeys = field.getSubFields();
            boolean onlyMapField = (mapKeys != null);
            if (! onlyMapField) {
                // No sub-fields: every label of the map is required.
                LOG.info("All the labels are required.");
                return new RequiredFieldResponse(false);
            }

            assert onlyMapField;
            // Records the required map keys so the back end can filter columns.
            Set<String> labels = new HashSet<String>();
            for (RequiredField mapKey : mapKeys) {
                labels.add(mapKey.getAlias());
            }
            getProperties().put(LABELS_TO_OUTPUT, labels);
            LOG.info("Labels to output: " + labels);
            return new RequiredFieldResponse(true);
        }


    }

    
   
505

   

    
   
506

   

    
   
507
    /**
     * Reads columns and emits a tuple with fields specified
     * by the constructor of the load function.
     *
     * <p>Columns whose label does not match any schema field are ignored;
     * schema fields with no matching column in a line stay null.</p>
     */
    private class FieldsTupleEmitter implements TupleEmitter {


        /** Schema of tuples. */
        private final ResourceSchema schema;


        /** Tuple to emit; replaced with a fresh one after each {@link #emitTuple}. */
        private Tuple tuple;


        /**
         * Caster of values.
         * NOTE(review): getLoadCaster() is defined outside this view and may
         * throw IOException, which is why the constructor declares it — confirm.
         */
        private final LoadCaster loadCaster = getLoadCaster();


        /** Mapping from labels to indexes in a tuple. */
        private final Map<String, Integer> labelToIndex = new HashMap<String, Integer>();


        /**
         * Constructs an emitter with the schema.
         *
         * @param schemaString
         *     Pig schema string naming the fields to extract.
         *
         * @throws IOException
         *     If the schema string cannot be parsed.
         */
        private FieldsTupleEmitter(String schemaString) throws IOException {
            Schema rawSchema = Utils.getSchemaFromString(schemaString);
            this.schema = new ResourceSchema(rawSchema);
            // Indexes each field name so addColumn can locate its tuple slot.
            for (int index = 0; index < schema.getFields().length; ++ index) {
                this.labelToIndex.put(this.schema.getFields()[index].getName(), index);
            }
            this.tuple = tupleFactory.newTuple(this.schema.getFields().length);
        }


        @Override
        public void addColumn(String label, byte[] bytes, int startOfValue, int endOfValue)
        throws IOException {
            if (! this.labelToIndex.containsKey(label)) {
                // Label not in the schema: not requested, skip it.
                return;
            }

            int index = this.labelToIndex.get(label);
            ResourceSchema.ResourceFieldSchema fieldSchema = this.schema.getFields()[index];
            // Copies the value bytes out of the shared line buffer before casting.
            int valueLength = endOfValue - startOfValue;
            byte[] valueBytes = new byte[valueLength];
            System.arraycopy(bytes, startOfValue, valueBytes, 0, valueLength);
            Object value = CastUtils.convertToType(this.loadCaster, valueBytes, fieldSchema, fieldSchema.getType());
            this.tuple.set(index, value);
        }


        @Override
        public Tuple emitTuple() {
            Tuple resultTuple = this.tuple;
            // Allocates the next line's tuple; unmatched fields remain null.
            this.tuple = tupleFactory.newTuple(this.schema.getFields().length);
            return resultTuple;
        }


        @Override
        public ResourceSchema getSchema() {
            return schema;
        }


        @Override
        public RequiredFieldResponse
        pushProjection(RequiredFieldList requiredFieldList) {
            // Field-mode output is fixed by the constructor schema:
            // no projection is performed here.
            return new RequiredFieldResponse(false);
        }


    }

    
   
582

   

    
   
583

   

    
   
584
    /**

    
   
585
     * Returns the index of the first target in bytes[start, end),

    
   
586
     * or the index of the end.

    
   
587
     */

    
   
588
    private static int findUntil(byte target, byte[] bytes, int start, int end) {

    
   
589
        for (int index = start; index < end; ++ index) {

    
   
590
            if (bytes[index] == target) {

    
   
591
                return index;

    
   
592
            }

    
   
593
        }

    
   
594
        return end;

    
   
595
    }

    
   
596

   

    
   
597

   

    
   
598
    /** Sequence number of the next warning log message, which starts from 0. */
    private int warnLogSeqNum = 0;


    /** Max count of warning logs which will be output to the task log (per task). */
    private static final int MAX_WARN_LOG_COUNT = 100;

    
   
604

   

    
   
605

   

    
   
606
    /** Outputs a warning for a malformed column. */

    
   
607
    private void warnMalformedColumn(String column) {

    
   
608
        String message = String.format("MalformedColumn: Column \"%s\" does not contain \":\".", column);

    
   
609
        warn(message, PigWarning.UDF_WARNING_8);

    
   
610

   

    
   
611
        // Output at most MAX_WARN_LOG_COUNT warning messages.

    
   
612
        if (this.warnLogSeqNum < MAX_WARN_LOG_COUNT) {

    
   
613
            LOG.warn(message);

    
   
614
            ++ this.warnLogSeqNum;

    
   
615
        }

    
   
616
    }

    
   
617

   

    
   
618

   

    
   
619
    /**
     * An error code of an input error.
     * See https://cwiki.apache.org/confluence/display/PIG/PigErrorHandlingFunctionalSpecification.
     */
    private static final int INPUT_ERROR_CODE = 6018;


    /**
     * An error message of an input error.
     * See https://cwiki.apache.org/confluence/display/PIG/PigErrorHandlingFunctionalSpecification.
     */
    private static final String INPUT_ERROR_MESSAGE = "Error while reading input";

    
   
631

   

    
   
632

   

    
   
633
    /** Underlying record reader of a text file; set by {@link #prepareToRead}. */
    @SuppressWarnings("rawtypes")
    private RecordReader reader = null;

    
   
636

   

    
   
637

   

    
   
638
    /**

    
   
639
     * Reads a line from the block,

    
   
640
     * or {@code null} if the loader reaches the end of the block.

    
   
641
     */

    
   
642
    private Text readLine() throws IOException {

    
   
643
        try {

    
   
644
            boolean hasLine = this.reader.nextKeyValue();

    
   
645
            if (! hasLine) {

    
   
646
                return null;

    
   
647
            }

    
   
648

   

    
   
649
            assert hasLine;

    
   
650
            return (Text) this.reader.getCurrentValue();

    
   
651
        } catch (InterruptedException exception) {

    
   
652
            throw new ExecException(INPUT_ERROR_MESSAGE, INPUT_ERROR_CODE, 

    
   
653
                    PigException.REMOTE_ENVIRONMENT, exception);

    
   
654
        }

    
   
655
    }

    
   
656

   

    
   
657

   

    
   
658
    /**
     * Saves the RecordReader for use by {@link #readLine}.
     *
     * @param reader
     *     RecordReader to read LTSV lines from.
     *
     * @param split
     *     Ignored.
     *
     */
    @Override
    public void
    prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split) {
        this.reader = reader;
    }

    
   
673

   

    
   
674

   

    
   
675
    /**
     * Key of a UDF property which contains the Set[String] of labels to output.
     * Stored in the properties returned by {@link #getProperties}.
     */
    private static final String LABELS_TO_OUTPUT = "LABELS_TO_OUTPUT";

    
   
677

   

    
   
678

   

    
   
679
    /**
     * Error code of an unexpected internal error.
     * See https://cwiki.apache.org/confluence/display/PIG/PigErrorHandlingFunctionalSpecification.
     */
    private static final int INTERNAL_ERROR_CODE = 2998;

    
   
684

   

    
   
685

   

    
   
686
    /**
     * Extracts information about which labels are required in the script.
     * The work is delegated to the tuple emitter.
     *
     * @param requiredFieldList
     *     {@inheritDoc}.
     *
     * @return
     *     {@code new RequiredFieldResponse(true)} if projection will be performed,
     *     or {@code new RequiredFieldResponse(false)} otherwise.
     *
     * @throws FrontendException
     *     When an unexpected internal error occurs.
     */
    @Override
    public RequiredFieldResponse
    pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
        return this.tupleEmitter.pushProjection(requiredFieldList);
    }

    
   
704

   

    
   
705

   

    
   
706
    /**

    
   
707
     * <p>This UDF supports

    
   
708
     * {@linkplain org.apache.pig.LoadPushDown.OperatorSet#PROJECTION projection push-down}.</p>

    
   
709
     *

    
   
710
     * @return

    
   
711
     *     Singleton list of

    
   
712
     *     {@link org.apache.pig.LoadPushDown.OperatorSet#PROJECTION}.

    
   
713
     */

    
   
714
    @Override

    
   
715
    public List<OperatorSet> getFeatures() {

    
   
716
        return Collections.singletonList(OperatorSet.PROJECTION);

    
   
717
    }

    
   
718

   

    
   
719

   

    
   
720
    /**
     * Location of input files.
     * Saved by {@link #setLocation} and consulted by {@link #getInputFormat}.
     */
    private String loadLocation;

    
   
722

   

    
   
723

   

    
   
724
    /**

    
   
725
     * Configures the underlying input format

    
   
726
     * to read lines from {@code location}.

    
   
727
     *

    
   
728
     * @param location

    
   
729
     *     Location of LTSV file(s).

    
   
730
     *

    
   
731
     * @param job

    
   
732
     *     Job.

    
   
733
     *

    
   
734
     * @throws IOException

    
   
735
     *     If an I/O error occurs while configuring the job.

    
   
736
     */

    
   
737
    @Override

    
   
738
    public void setLocation(String location, Job job) throws IOException {

    
   
739
        this.loadLocation = location;

    
   
740
        FileInputFormat.setInputPaths(job, location);        

    
   
741
    }

    
   
742

   

    
   
743

   

    
   
744
    /**

    
   
745
     * Makes an instance of an input format from which LTSV lines are read.

    
   
746
     *

    
   
747
     * @return

    
   
748
     *     Instance of {@link PigTextInputFormat}.

    
   
749
     */

    
   
750
    @Override

    
   
751
    public InputFormat getInputFormat() {

    
   
752
        if (loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {

    
   
753
            // Bzip2TextInputFormat can split bzipped files.

    
   
754
            LOG.info("Uses Bzip2TextInputFormat");

    
   
755
            return new Bzip2TextInputFormat();

    
   
756
        } else {

    
   
757
            LOG.info("Uses PigTextInputFormat");

    
   
758
            return new PigTextInputFormat();

    
   
759
        }

    
   
760
    }

    
   
761

   

    
   
762

   

    
   
763
    /**
     * Signature of the UDF invocation, used to get UDFContext.
     * Saved by {@link #setUDFContextSignature} and read by {@link #getProperties}.
     */
    private String signature;

    
   
765

   

    
   
766

   

    
   
767
    /**
     * Saves the signature of the current invocation of the UDF,
     * so that {@link #getProperties} can retrieve the matching UDF properties.
     *
     * @param signature
     *     Signature of the current invocation of the UDF.
     */
    @Override
    public void setUDFContextSignature(String signature) {
        this.signature = signature;
    }

    
   
777

   

    
   
778

   

    
   
779
    /**

    
   
780
     * Returns the properties related with the current invocation of the UDF.

    
   
781
     */

    
   
782
    private Properties getProperties() {

    
   
783
        return UDFContext.getUDFContext().getUDFProperties(

    
   
784
                getClass(), new String[] { this.signature });

    
   
785
    }

    
   
786

   

    
   
787

   

    
   
788
    // Methods for LoadMetadata

    /**
     * Does nothing,
     * because the loader does not know about partitioning.
     *
     * @param partitionFilter
     *     Ignored.
     */
    @Override
    public void setPartitionFilter(Expression partitionFilter) {
    }

    
   
801

   

    
   
802

   

    
   
803
    /**
     * Returns {@code null},
     * because the loader does not know about partitioning.
     *
     * @param location
     *     Ignored.
     *
     * @param job
     *     Ignored.
     *
     * @return
     *     null, meaning "no partition keys".
     */
    @Override
    public String[] getPartitionKeys(String location, Job job) {
        return null;
    }

    
   
820

   

    
   
821

   

    
   
822
    /**
     * Returns {@code null}, because no statistics are available.
     *
     * @param location
     *     Ignored.
     *
     * @param job
     *     Ignored.
     *
     * @return
     *     null, meaning "no statistics".
     */
    @Override
    public ResourceStatistics getStatistics(String location, Job job) {
        return null;
    }

    
   
838

   

    
   
839

   

    
   
840
    /**
     * Returns the schema of the output of the loader,
     * obtained from the tuple emitter.
     *
     * @param location
     *     Ignored, because the schema is constant.
     *
     * @param job
     *     Ignored, because the schema is constant.
     *
     * @return
     *     Schema of the output of the loader.
     */
    @Override
    public ResourceSchema getSchema(String location, Job job) {
        return this.tupleEmitter.getSchema();
    }

    
   
856

   

    
   
857

   

    
   
858
}

    
   
859

   

    
   
860
// vim: et sw=4 sts=4
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLTSVLoader.java
New File
 
  1. contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/LTSVLoader.java: Loading...
  2. contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLTSVLoader.java: Loading...