Review Board 1.7.22


SchemaTuple in Pig

Review Request #4651 - Created April 5, 2012 and updated

Jonathan Coveney
PIG-2632
Reviewers
pig
julien
pig
This work builds on Dmitriy's PrimitiveTuple work. The idea is that, knowing the Schema on the frontend, we can code generate Tuples which can be used for fun and profit. In rudimentary tests, the memory efficiency is 2-4x better, and it's ~15% smaller serialized (heavily heavily depends on the data, though). Need to do get/set tests, but assuming that it's on par (or even faster) than Tuple, the memory gain is huge.

Need to clean up the code and add tests.

Right now, it generates a SchemaTuple for every inputSchema and outputSchema given to UDF's. The next step is to make a SchemaBag, where I think the serialization savings will be really huge.

Needs tests and comments, but I want the code to settle a bit.

 
trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
Revision 1309628 New Change
[20] 38 lines
[+20]
39
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor;
39
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor;
40
import org.apache.pig.builtin.MonitoredUDF;
40
import org.apache.pig.builtin.MonitoredUDF;
41
import org.apache.pig.data.DataBag;
41
import org.apache.pig.data.DataBag;
42
import org.apache.pig.data.DataByteArray;
42
import org.apache.pig.data.DataByteArray;
43
import org.apache.pig.data.DataType;
43
import org.apache.pig.data.DataType;

    
   
44
import org.apache.pig.data.SchemaTuple;
44
import org.apache.pig.data.Tuple;
45
import org.apache.pig.data.Tuple;
45
import org.apache.pig.data.TupleFactory;
46
import org.apache.pig.data.TupleFactory;

    
   
47
import org.apache.pig.data.TypeAwareTuple;
46
import org.apache.pig.impl.PigContext;
48
import org.apache.pig.impl.PigContext;
47
import org.apache.pig.impl.logicalLayer.schema.Schema;
49
import org.apache.pig.impl.logicalLayer.schema.Schema;
48
import org.apache.pig.impl.plan.NodeIdGenerator;
50
import org.apache.pig.impl.plan.NodeIdGenerator;
49
import org.apache.pig.impl.plan.OperatorKey;
51
import org.apache.pig.impl.plan.OperatorKey;
50
import org.apache.pig.impl.plan.VisitorException;
52
import org.apache.pig.impl.plan.VisitorException;
[+20] [20] 73 lines
[+20] [+] private void instantiateFunc(FuncSpec fSpec) {
124
        //making the initializations here basically useless. Look at the processInput
126
        //making the initializations here basically useless. Look at the processInput
125
        //method where these variables are re-initialized. At that point, the PhysicalOperator
127
        //method where these variables are re-initialized. At that point, the PhysicalOperator
126
        //is set up correctly with the reporter and pigLogger references
128
        //is set up correctly with the reporter and pigLogger references
127
        this.func.setReporter(reporter);
129
        this.func.setReporter(reporter);
128
        this.func.setPigLogger(pigLogger);
130
        this.func.setPigLogger(pigLogger);

    
   
131

   

    
   
132
        if (tmpS != null) {

    
   
133
            Schema outputS = func.outputSchema(tmpS);

    
   
134
            TupleFactory tupleFactory = TupleFactory.getInstance();

    
   
135
            Tuple t;

    
   
136
            if (SchemaTuple.isGeneratable(tmpS)) {

    
   
137
                t = tupleFactory.newTupleForSchema(tmpS);

    
   
138
                if (t != null && t instanceof TypeAwareTuple || t instanceof SchemaTuple)

    
   
139
                    inputSchemaForGen = tmpS;

    
   
140
            }

    
   
141
            if (outputS != null && SchemaTuple.isGeneratable(outputS)) {

    
   
142
                t = tupleFactory.newTupleForSchema(outputS);

    
   
143
                if (t != null && t instanceof TypeAwareTuple || t instanceof SchemaTuple)

    
   
144
                    outputSchemaForGen = outputS;

    
   
145
            }

    
   
146
        }

    
   
147
        //THIS IS WHERE I CALL OUTPUTSCHEMA, GENERATE, AND CACHE IT FOR USAGE BELOW

    
   
148
        //NEED TO HAVE A FLAG ON WHETHER THE SCHEMA IS VALID TO USE ie inputIsGeneratable, outputIsGeneratable
129
    }
149
    }
130

    
   
150

   

    
   
151
    private Schema inputSchemaForGen;

    
   
152
    private Schema outputSchemaForGen;

    
   
153

   
131
    @Override
154
    @Override
132
    public Result processInput() throws ExecException {
155
    public Result processInput() throws ExecException {
133

    
   
156

   
134
        // Make sure the reporter is set, because it isn't getting carried
157
        // Make sure the reporter is set, because it isn't getting carried
135
        // across in the serialization (don't know why).  I suspect it's as
158
        // across in the serialization (don't know why).  I suspect it's as
[+20] [20] 23 lines
[+20] public Result processInput() throws ExecException {
159
            res.result = input;
182
            res.result = input;
160
            res.returnStatus = POStatus.STATUS_OK;
183
            res.returnStatus = POStatus.STATUS_OK;
161
            detachInput();
184
            detachInput();
162
            return res;
185
            return res;
163
        } else {
186
        } else {

    
   
187
            if (inputSchemaForGen != null)

    
   
188
                res.result = TupleFactory.getInstance().newTupleForSchema(inputSchemaForGen);

    
   
189
            else
164
            res.result = TupleFactory.getInstance().newTuple();
190
            res.result = TupleFactory.getInstance().newTuple();
165

    
   
191

   
166
            Result temp = null;
192
            Result temp = null;

    
   
193

   
167
            for(PhysicalOperator op : inputs) {
194
            for(PhysicalOperator op : inputs) {
168
                temp = op.getNext(getDummy(op.getResultType()), op.getResultType());
195
                temp = op.getNext(getDummy(op.getResultType()), op.getResultType());
169
                if(temp.returnStatus!=POStatus.STATUS_OK) {
196
                if(temp.returnStatus!=POStatus.STATUS_OK) {
170
                    return temp;
197
                    return temp;
171
                }
198
                }
[+20] [20] 11 lines
[+20] public Result processInput() throws ExecException {
183
                    }
210
                    }
184
                }
211
                }
185
                ((Tuple)res.result).append(temp.result);
212
                ((Tuple)res.result).append(temp.result);
186
            }
213
            }
187
            res.returnStatus = temp.returnStatus;
214
            res.returnStatus = temp.returnStatus;

    
   
215

   
188
            return res;
216
            return res;
189
        }
217
        }
190
    }
218
    }
191

    
   
219

   
192
    private Result getNext() throws ExecException {
220
    private Result getNext() throws ExecException {
[+20] [20] 297 lines
trunk/src/org/apache/pig/data/BinInterSedes.java
Revision 1309628 New Change
 
trunk/src/org/apache/pig/data/PrimitiveTuple.java
Revision 1309628 New Change
 
trunk/src/org/apache/pig/data/SchemaTuple.java
New File
 
trunk/src/org/apache/pig/data/TupleFactory.java
Revision 1309628 New Change
 
trunk/src/org/apache/pig/data/TypeAwareTuple.java
Revision 1309628 New Change
 
trunk/src/org/apache/pig/impl/PigContext.java
Revision 1309628 New Change
 
trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
Revision 1309628 New Change
 
  1. trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java: Loading...
  2. trunk/src/org/apache/pig/data/BinInterSedes.java: Loading...
  3. trunk/src/org/apache/pig/data/PrimitiveTuple.java: Loading...
  4. trunk/src/org/apache/pig/data/SchemaTuple.java: Loading...
  5. trunk/src/org/apache/pig/data/TupleFactory.java: Loading...
  6. trunk/src/org/apache/pig/data/TypeAwareTuple.java: Loading...
  7. trunk/src/org/apache/pig/impl/PigContext.java: Loading...
  8. trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java: Loading...