Review Board 1.7.22


SchemaTuple in Pig

Review Request #4651 - Created April 5, 2012 and updated

Jonathan Coveney
PIG-2632
Reviewers
pig
julien
pig
This work builds on Dmitriy's PrimitiveTuple work. The idea is that, knowing the Schema on the frontend, we can code generate Tuples which can be used for fun and profit. In rudimentary tests, the memory efficiency is 2-4x better, and it's ~15% smaller serialized (heavily heavily depends on the data, though). Need to do get/set tests, but assuming that it's on par (or even faster) than Tuple, the memory gain is huge.

Need to clean up the code and add tests.

Right now, it generates a SchemaTuple for every inputSchema and outputSchema given to UDF's. The next step is to make a SchemaBag, where I think the serialization savings will be really huge.

Needs tests and comments, but I want the code to settle a bit.

 
trunk/src/org/apache/pig/data/AppendableSchemaTuple.java
New File

    
   
1
/*

    
   
2
 * Licensed to the Apache Software Foundation (ASF) under one

    
   
3
 * or more contributor license agreements.  See the NOTICE file

    
   
4
 * distributed with this work for additional information

    
   
5
 * regarding copyright ownership.  The ASF licenses this file

    
   
6
 * to you under the Apache License, Version 2.0 (the

    
   
7
 * "License"); you may not use this file except in compliance

    
   
8
 * with the License.  You may obtain a copy of the License at

    
   
9
 *

    
   
10
 *     http://www.apache.org/licenses/LICENSE-2.0

    
   
11
 *

    
   
12
 * Unless required by applicable law or agreed to in writing, software

    
   
13
 * distributed under the License is distributed on an "AS IS" BASIS,

    
   
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

    
   
15
 * See the License for the specific language governing permissions and

    
   
16
 * limitations under the License.

    
   
17
 */

    
   
18
package org.apache.pig.data;

    
   
19

   

    
   
20
import java.io.DataInput;

    
   
21
import java.io.DataOutput;

    
   
22
import java.io.IOException;

    
   
23
import java.util.Iterator;

    
   
24
import java.util.List;

    
   
25

   

    
   
26
import org.apache.pig.backend.executionengine.ExecException;

    
   
27
import org.apache.pig.classification.InterfaceAudience;

    
   
28
import org.apache.pig.classification.InterfaceStability;

    
   
29
import org.apache.pig.data.utils.SedesHelper;

    
   
30

   

    
   
31
@InterfaceAudience.Public

    
   
32
@InterfaceStability.Unstable

    
   
33
public abstract class AppendableSchemaTuple<T extends AppendableSchemaTuple<T>> extends SchemaTuple<T> {

    
   
34
    private static final long serialVersionUID = 1L;

    
   
35

   

    
   
36
    private Tuple appendedFields;

    
   
37

   

    
   
38
    private static final TupleFactory mTupleFactory = TupleFactory.getInstance();

    
   
39

   

    
   
40
    @Override

    
   
41
    public void append(Object val) {

    
   
42
        if (appendedFields == null) {

    
   
43
            appendedFields = mTupleFactory.newTuple();

    
   
44
        }

    
   
45

   

    
   
46
        appendedFields.append(val);

    
   
47
    }

    
   
48

   

    
   
49
    protected int appendedFieldsSize() {

    
   
50
        return appendedFields == null ? 0 : appendedFields.size();

    
   
51
    }

    
   
52

   

    
   
53
    protected boolean isAppendedFieldsNull() {

    
   
54
        return appendedFieldsSize() == 0;

    
   
55
    }

    
   
56

   

    
   
57
    protected Object getAppendedField(int i) throws ExecException {

    
   
58
        return isAppendedFieldNull(i) ? null : appendedFields.get(i);

    
   
59
    }

    
   
60

   

    
   
61
    private boolean isAppendedFieldNull(int i) throws ExecException {

    
   
62
        return isAppendedFieldsNull() || appendedFields.isNull(i);

    
   
63
    }

    
   
64

   

    
   
65
    //protected Tuple getAppend() {

    
   
66
    public Tuple getAppendedFields() {

    
   
67
        return appendedFields;

    
   
68
    }

    
   
69

   

    
   
70
    protected void setAppendedFields(Tuple t) {

    
   
71
        appendedFields = t;

    
   
72
    }

    
   
73

   

    
   
74
    private void resetAppendedFields() {

    
   
75
        appendedFields = null;

    
   
76
    }

    
   
77

   

    
   
78
    private void setAppendedField(int fieldNum, Object val) throws ExecException {

    
   
79
        appendedFields.set(fieldNum, val);

    
   
80
    }

    
   
81

   

    
   
82
    /**

    
   
83
     * This adds the additional overhead of the append Tuple

    
   
84
     */

    
   
85
    @Override

    
   
86
    public long getMemorySize() {

    
   
87
        return SizeUtil.roundToEight(appendedFields.getMemorySize()) + super.getMemorySize();

    
   
88
    }

    
   
89

   

    
   
90

   

    
   
91
    private byte getAppendedFieldType(int i) throws ExecException {

    
   
92
        return appendedFields == null ? DataType.UNKNOWN : appendedFields.getType(i);

    
   
93
    }

    
   
94

   

    
   
95
    protected SchemaTuple<T> set(SchemaTuple<?> t, boolean checkType) throws ExecException {

    
   
96
        resetAppendedFields();

    
   
97
        for (int j = schemaSize(); j < t.size(); j++) {

    
   
98
            append(t.get(j));

    
   
99
        }

    
   
100
        return super.set(t, checkType);

    
   
101
    }

    
   
102

   

    
   
103
    protected SchemaTuple<T> setSpecific(T t) {

    
   
104
        resetAppendedFields();

    
   
105
        setAppendedFields(t.getAppendedFields());

    
   
106
        return super.setSpecific(t);

    
   
107
    }

    
   
108

   

    
   
109
    public SchemaTuple<T> set(List<Object> l) throws ExecException {

    
   
110
        int listSize = l.size();

    
   
111
        int schemaSize = schemaSize();

    
   
112

   

    
   
113
        if (listSize < schemaSize) {

    
   
114
            throw new ExecException("Given list of objects has too few fields ("+l.size()+" vs "+schemaSize()+")");

    
   
115
        }

    
   
116

   

    
   
117
        Iterator<Object> it = l.iterator();

    
   
118

   

    
   
119
        generatedCodeSetIterator(it);

    
   
120

   

    
   
121
        resetAppendedFields();

    
   
122

   

    
   
123
        while (it.hasNext()) {

    
   
124
            append(it.next());

    
   
125
        }

    
   
126

   

    
   
127
        return this;

    
   
128
    }

    
   
129

   

    
   
130
    protected int compareTo(SchemaTuple<?> t, boolean checkType) {

    
   
131
        if (checkType && getClass() == t.getClass()) {

    
   
132
            return compareToSpecific((T)t);

    
   
133
        }

    
   
134
        int i = super.compareTo(t, false);

    
   
135
        if (i != 0) {

    
   
136
            return i;

    
   
137
        }

    
   
138
        if (appendedFieldsSize() > 0) {

    
   
139
            int m = schemaSize();

    
   
140
            for (int k = 0; k < size() - schemaSize(); k++) {

    
   
141
                try {

    
   
142
                    i = DataType.compare(getAppendedField(k), t.get(m++));

    
   
143
                } catch (ExecException e) {

    
   
144
                    throw new RuntimeException("Unable to get append value", e);

    
   
145
                }

    
   
146
                if (i != 0) {

    
   
147
                    return i;

    
   
148
                }

    
   
149
            }

    
   
150
        }

    
   
151
        return 0;

    
   
152
    }

    
   
153

   

    
   
154
    protected int compareToSpecific(T t) {

    
   
155
        int i = compareSize(t);

    
   
156
        if (i != 0) {

    
   
157
            return i;

    
   
158
        }

    
   
159
        i = super.compareToSpecific(t);

    
   
160
        if (i != 0) {

    
   
161
            return i;

    
   
162
        }

    
   
163
        for (int z = 0; z < appendedFieldsSize(); z++) {

    
   
164
            try {

    
   
165
                i = DataType.compare(getAppendedField(z), t.getAppendedField(z));

    
   
166
            } catch (ExecException e) {

    
   
167
                throw new RuntimeException("Unable to get append", e);

    
   
168
            }

    
   
169
            if (i != 0) {

    
   
170
                return i;

    
   
171
            }

    
   
172
        }

    
   
173
        return 0;

    
   
174
    }

    
   
175

   

    
   
176
    public int hashCode() {

    
   
177
        return super.hashCode() + appendedFields.hashCode();

    
   
178
    }

    
   
179

   

    
   
180
    public void set(int fieldNum, Object val) throws ExecException {

    
   
181
        int diff = fieldNum - schemaSize();

    
   
182
        if (diff >= 0 && diff < appendedFieldsSize()) {

    
   
183
            setAppendedField(diff, val);

    
   
184
        } else {

    
   
185
            super.set(fieldNum, val);

    
   
186
        }

    
   
187
    }

    
   
188

   

    
   
189
    @Override

    
   
190
    public Object get(int fieldNum) throws ExecException {

    
   
191
        int diff = fieldNum - schemaSize();

    
   
192
        if (diff >= 0 && diff < appendedFieldsSize()) {

    
   
193
            return getAppendedField(diff);

    
   
194
        } else {

    
   
195
            return super.get(fieldNum);

    
   
196
        }

    
   
197
    }

    
   
198

   

    
   
199
    @Override

    
   
200
    public boolean isNull(int fieldNum) throws ExecException {

    
   
201
        int diff = fieldNum - schemaSize();

    
   
202
        if (diff >= 0 && diff < appendedFieldsSize()) {

    
   
203
            return isAppendedFieldNull(diff);

    
   
204
        } else {

    
   
205
            return super.isNull(fieldNum);

    
   
206
        }

    
   
207
    }

    
   
208

   

    
   
209
    @Override

    
   
210
    public byte getType(int fieldNum) throws ExecException {

    
   
211
        int diff = fieldNum - schemaSize();

    
   
212
        if (diff >= 0 && diff < appendedFieldsSize()) {

    
   
213
            return getAppendedFieldType(diff);

    
   
214
        } else {

    
   
215
            return super.getType(fieldNum);

    
   
216
        }

    
   
217
    }

    
   
218

   

    
   
219
    @Override

    
   
220
    protected void setTypeAwareBase(int fieldNum, Object val, String type) throws ExecException {

    
   
221
        int diff = fieldNum - schemaSize();

    
   
222
        if (diff >= 0 && diff < appendedFieldsSize()) {

    
   
223
            setAppendedField(diff, val);

    
   
224
        } else {

    
   
225
            super.setTypeAwareBase(fieldNum, val, type);

    
   
226
        }

    
   
227
    }

    
   
228

   

    
   
229
    @Override

    
   
230
    protected Object getTypeAwareBase(int fieldNum, String type) throws ExecException {

    
   
231
        int diff = fieldNum - schemaSize();

    
   
232
        if (diff >= 0 && diff < appendedFieldsSize()) {

    
   
233
            return getAppendedField(diff);

    
   
234
        } else {

    
   
235
            return super.getTypeAwareBase(fieldNum, type);

    
   
236
        }

    
   
237
    }

    
   
238

   

    
   
239
    protected void writeElements(DataOutput out) throws IOException {

    
   
240
        boolean[] b = generatedCodeNullsArray();

    
   
241
        SedesHelper.writeBooleanArray(out, b, isAppendedFieldsNull());

    
   
242
        generatedCodeWriteElements(out);

    
   
243
        if (!isAppendedFieldsNull()) {

    
   
244
            SedesHelper.writeGenericTuple(out, getAppendedFields());

    
   
245
        }

    
   
246
    }

    
   
247

   

    
   
248
    public int size() {

    
   
249
        return super.size() + appendedFieldsSize();

    
   
250
    }

    
   
251

   

    
   
252
    @Override

    
   
253
    public void readFields(DataInput in) throws IOException {

    
   
254
        int len = schemaSize() + 1;

    
   
255
        boolean[] b = SedesHelper.readBooleanArray(in, len);

    
   
256
        generatedCodeReadFields(in, b);

    
   
257
        if (!b[len - 1]) {

    
   
258
            setAppendedFields(SedesHelper.readGenericTuple(in, in.readByte()));

    
   
259
        }

    
   
260
    }

    
   
261
}
trunk/src/org/apache/pig/data/FieldIsNullException.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/PBooleanTuple.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/PDoubleTuple.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/PFloatTuple.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/PIntTuple.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/PLongTuple.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/PStringTuple.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/PrimitiveFieldTuple.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/PrimitiveTuple.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/SchemaTuple.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/SchemaTupleBackend.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/SchemaTupleFactory.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/SchemaTupleFrontend.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/utils/BytesHelper.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/utils/MethodHelper.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/utils/SedesHelper.java
Diff Revision 7 Diff Revision 8
 
trunk/src/org/apache/pig/data/utils/StructuresHelper.java
Diff Revision 7 Diff Revision 8
 
trunk/test/org/apache/pig/data/TestSchemaTuple.java
Diff Revision 7 Diff Revision 8
 
trunk/test/org/apache/pig/data/utils/TestMethodHelper.java
Diff Revision 7 Diff Revision 8
 
trunk/test/org/apache/pig/test/TestPrimitiveFieldTuple.java
Diff Revision 7 Diff Revision 8
 
trunk/test/org/apache/pig/test/TestPrimitiveTuple.java
Diff Revision 7 Diff Revision 8
 
  1. trunk/src/org/apache/pig/data/AppendableSchemaTuple.java: Loading...
  2. trunk/src/org/apache/pig/data/FieldIsNullException.java: Loading...
  3. trunk/src/org/apache/pig/data/PBooleanTuple.java: Loading...
  4. trunk/src/org/apache/pig/data/PDoubleTuple.java: Loading...
  5. trunk/src/org/apache/pig/data/PFloatTuple.java: Loading...
  6. trunk/src/org/apache/pig/data/PIntTuple.java: Loading...
  7. trunk/src/org/apache/pig/data/PLongTuple.java: Loading...
  8. trunk/src/org/apache/pig/data/PStringTuple.java: Loading...
  9. trunk/src/org/apache/pig/data/PrimitiveFieldTuple.java: Loading...
  10. trunk/src/org/apache/pig/data/PrimitiveTuple.java: Loading...
  11. trunk/src/org/apache/pig/data/SchemaTuple.java: Loading...
  12. trunk/src/org/apache/pig/data/SchemaTupleBackend.java: Loading...
  13. trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java: Loading...
  14. trunk/src/org/apache/pig/data/SchemaTupleFactory.java: Loading...
  15. trunk/src/org/apache/pig/data/SchemaTupleFrontend.java: Loading...
  16. trunk/src/org/apache/pig/data/utils/BytesHelper.java: Loading...
  17. trunk/src/org/apache/pig/data/utils/MethodHelper.java: Loading...
  18. trunk/src/org/apache/pig/data/utils/SedesHelper.java: Loading...
  19. trunk/src/org/apache/pig/data/utils/StructuresHelper.java: Loading...
  20. trunk/test/org/apache/pig/data/TestSchemaTuple.java: Loading...
  21. trunk/test/org/apache/pig/data/utils/TestMethodHelper.java: Loading...
  22. trunk/test/org/apache/pig/test/TestPrimitiveFieldTuple.java: Loading...
  23. trunk/test/org/apache/pig/test/TestPrimitiveTuple.java: Loading...