Review Board 1.7.22


SchemaTuple in Pig

Review Request #4651 - Created April 5, 2012 and updated

Jonathan Coveney
PIG-2632
Reviewers
pig
julien
pig
This work builds on Dmitriy's PrimitiveTuple work. The idea is that, knowing the Schema on the frontend, we can code generate Tuples which can be used for fun and profit. In rudimentary tests, the memory efficiency is 2-4x better, and it's ~15% smaller serialized (heavily heavily depends on the data, though). Need to do get/set tests, but assuming that it's on par (or even faster) than Tuple, the memory gain is huge.

Need to clean up the code and add tests.

Right now, it generates a SchemaTuple for every inputSchema and outputSchema given to UDF's. The next step is to make a SchemaBag, where I think the serialization savings will be really huge.

Needs tests and comments, but I want the code to settle a bit.

 
trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
Revision 1309628 New Change
1
/*
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements.  See the NOTICE file
3
 * or more contributor license agreements.  See the NOTICE file
4
 * distributed with this work for additional information
4
 * distributed with this work for additional information
5
 * regarding copyright ownership.  The ASF licenses this file
5
 * regarding copyright ownership.  The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
7
 * "License"); you may not use this file except in compliance
8
 * with the License.  You may obtain a copy of the License at
8
 * with the License.  You may obtain a copy of the License at
9
 *
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
16
 * limitations under the License.
17
 */
17
 */
18

    
   
18

   
19
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
19
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
20

    
   
20

   
21
import java.io.IOException;
21
import java.io.IOException;
22
import java.io.ObjectInputStream;
22
import java.io.ObjectInputStream;
23
import java.lang.reflect.Type;
23
import java.lang.reflect.Type;
24
import java.util.List;
24
import java.util.List;
25
import java.util.Map;
25
import java.util.Map;
26
import java.util.Properties;
26
import java.util.Properties;
27

    
   
27

   
28
import org.apache.pig.Accumulator;
28
import org.apache.pig.Accumulator;
29
import org.apache.pig.Algebraic;
29
import org.apache.pig.Algebraic;
30
import org.apache.pig.EvalFunc;
30
import org.apache.pig.EvalFunc;
31
import org.apache.pig.FuncSpec;
31
import org.apache.pig.FuncSpec;
32
import org.apache.pig.PigException;
32
import org.apache.pig.PigException;
33
import org.apache.pig.ResourceSchema;
33
import org.apache.pig.ResourceSchema;
34
import org.apache.pig.backend.executionengine.ExecException;
34
import org.apache.pig.backend.executionengine.ExecException;
35
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
35
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
36
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
36
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
37
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
37
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
38
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
38
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
39
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor;
39
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor;
40
import org.apache.pig.builtin.MonitoredUDF;
40
import org.apache.pig.builtin.MonitoredUDF;
41
import org.apache.pig.data.DataBag;
41
import org.apache.pig.data.DataBag;
42
import org.apache.pig.data.DataByteArray;
42
import org.apache.pig.data.DataByteArray;
43
import org.apache.pig.data.DataType;
43
import org.apache.pig.data.DataType;

    
   
44
import org.apache.pig.data.SchemaTuple;
44
import org.apache.pig.data.Tuple;
45
import org.apache.pig.data.Tuple;
45
import org.apache.pig.data.TupleFactory;
46
import org.apache.pig.data.TupleFactory;

    
   
47
import org.apache.pig.data.TypeAwareTuple;
46
import org.apache.pig.impl.PigContext;
48
import org.apache.pig.impl.PigContext;
47
import org.apache.pig.impl.logicalLayer.schema.Schema;
49
import org.apache.pig.impl.logicalLayer.schema.Schema;
48
import org.apache.pig.impl.plan.NodeIdGenerator;
50
import org.apache.pig.impl.plan.NodeIdGenerator;
49
import org.apache.pig.impl.plan.OperatorKey;
51
import org.apache.pig.impl.plan.OperatorKey;
50
import org.apache.pig.impl.plan.VisitorException;
52
import org.apache.pig.impl.plan.VisitorException;
51
import org.apache.pig.impl.util.UDFContext;
53
import org.apache.pig.impl.util.UDFContext;
52

    
   
54

   
53
public class POUserFunc extends ExpressionOperator {
55
public class POUserFunc extends ExpressionOperator {
54

    
   
56

   
55
    /**
57
    /**
56
     *
58
     *
57
     */
59
     */
58
    private static final long serialVersionUID = 1L;
60
    private static final long serialVersionUID = 1L;
59
    transient EvalFunc func;
61
    transient EvalFunc func;
60
    transient private String[] cacheFiles = null;
62
    transient private String[] cacheFiles = null;
61

    
   
63

   
62
    FuncSpec funcSpec;
64
    FuncSpec funcSpec;
63
    FuncSpec origFSpec;
65
    FuncSpec origFSpec;
64
    public static final byte INITIAL = 0;
66
    public static final byte INITIAL = 0;
65
    public static final byte INTERMEDIATE = 1;
67
    public static final byte INTERMEDIATE = 1;
66
    public static final byte FINAL = 2;
68
    public static final byte FINAL = 2;
67
    private boolean initialized = false;
69
    private boolean initialized = false;
68
    private MonitoredUDFExecutor executor = null;
70
    private MonitoredUDFExecutor executor = null;
69

    
   
71

   
70
    private PhysicalOperator referencedOperator = null;
72
    private PhysicalOperator referencedOperator = null;
71
    private boolean isAccumulationDone;
73
    private boolean isAccumulationDone;
72
    private String signature;
74
    private String signature;
73

    
   
75

   
74
    public PhysicalOperator getReferencedOperator() {
76
    public PhysicalOperator getReferencedOperator() {
75
        return referencedOperator;
77
        return referencedOperator;
76
    }
78
    }
77

    
   
79

   
78
    public void setReferencedOperator(PhysicalOperator referencedOperator) {
80
    public void setReferencedOperator(PhysicalOperator referencedOperator) {
79
        this.referencedOperator = referencedOperator;
81
        this.referencedOperator = referencedOperator;
80
    }
82
    }
81

    
   
83

   
82
    public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
84
    public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
83
        super(k, rp);
85
        super(k, rp);
84
        inputs = inp;
86
        inputs = inp;
85

    
   
87

   
86
    }
88
    }
87

    
   
89

   
88
    public POUserFunc(
90
    public POUserFunc(
89
            OperatorKey k,
91
            OperatorKey k,
90
            int rp,
92
            int rp,
91
            List<PhysicalOperator> inp,
93
            List<PhysicalOperator> inp,
92
            FuncSpec funcSpec) {
94
            FuncSpec funcSpec) {
93
        this(k, rp, inp, funcSpec, null);
95
        this(k, rp, inp, funcSpec, null);
94
    }
96
    }
95

    
   
97

   
96
    public POUserFunc(
98
    public POUserFunc(
97
            OperatorKey k,
99
            OperatorKey k,
98
            int rp,
100
            int rp,
99
            List<PhysicalOperator> inp,
101
            List<PhysicalOperator> inp,
100
            FuncSpec funcSpec,
102
            FuncSpec funcSpec,
101
            EvalFunc func) {
103
            EvalFunc func) {
102
        super(k, rp);
104
        super(k, rp);
103
        super.setInputs(inp);
105
        super.setInputs(inp);
104
        this.funcSpec = funcSpec;
106
        this.funcSpec = funcSpec;
105
        this.origFSpec = funcSpec;
107
        this.origFSpec = funcSpec;
106
        this.func = func;
108
        this.func = func;
107
        instantiateFunc(funcSpec);
109
        instantiateFunc(funcSpec);
108
    }
110
    }
109

    
   
111

   
110
    private void instantiateFunc(FuncSpec fSpec) {
112
    private void instantiateFunc(FuncSpec fSpec) {
111
        this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
113
        this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
112
        this.setSignature(signature);
114
        this.setSignature(signature);
113
        Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
115
        Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
114
    	Schema tmpS=(Schema)props.get("pig.evalfunc.inputschema."+signature);
116
    	Schema tmpS=(Schema)props.get("pig.evalfunc.inputschema."+signature);
115
    	if(tmpS!=null)
117
    	if(tmpS!=null)
116
    		this.func.setInputSchema(tmpS);
118
    		this.func.setInputSchema(tmpS);
117
        if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) {
119
        if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) {
118
            executor = new MonitoredUDFExecutor(func);
120
            executor = new MonitoredUDFExecutor(func);
119
        }
121
        }
120
        //the next couple of initializations do not work as intended for the following reasons
122
        //the next couple of initializations do not work as intended for the following reasons
121
        //the reporter and pigLogger are member variables of PhysicalOperator
123
        //the reporter and pigLogger are member variables of PhysicalOperator
122
        //when instanitateFunc is invoked at deserialization time, both
124
        //when instanitateFunc is invoked at deserialization time, both
123
        //reporter and pigLogger are null. They are set during map and reduce calls,
125
        //reporter and pigLogger are null. They are set during map and reduce calls,
124
        //making the initializations here basically useless. Look at the processInput
126
        //making the initializations here basically useless. Look at the processInput
125
        //method where these variables are re-initialized. At that point, the PhysicalOperator
127
        //method where these variables are re-initialized. At that point, the PhysicalOperator
126
        //is set up correctly with the reporter and pigLogger references
128
        //is set up correctly with the reporter and pigLogger references
127
        this.func.setReporter(reporter);
129
        this.func.setReporter(reporter);
128
        this.func.setPigLogger(pigLogger);
130
        this.func.setPigLogger(pigLogger);

    
   
131

   

    
   
132
        if (tmpS != null) {

    
   
133
            Schema outputS = func.outputSchema(tmpS);

    
   
134
            TupleFactory tupleFactory = TupleFactory.getInstance();

    
   
135
            Tuple t;

    
   
136
            if (SchemaTuple.isGeneratable(tmpS)) {

    
   
137
                t = tupleFactory.newTupleForSchema(tmpS);

    
   
138
                if (t != null && t instanceof TypeAwareTuple || t instanceof SchemaTuple)

    
   
139
                    inputSchemaForGen = tmpS;

    
   
140
            }

    
   
141
            if (outputS != null && SchemaTuple.isGeneratable(outputS)) {

    
   
142
                t = tupleFactory.newTupleForSchema(outputS);

    
   
143
                if (t != null && t instanceof TypeAwareTuple || t instanceof SchemaTuple)

    
   
144
                    outputSchemaForGen = outputS;

    
   
145
            }

    
   
146
        }

    
   
147
        //THIS IS WHERE I CALL OUTPUTSCHEMA, GENERATE, AND CACHE IT FOR USAGE BELOW

    
   
148
        //NEED TO HAVE A FLAG ON WHETHER THE SCHEMA IS VALID TO USE ie inputIsGeneratable, outputIsGeneratable
129
    }
149
    }
130

    
   
150

   

    
   
151
    private Schema inputSchemaForGen;

    
   
152
    private Schema outputSchemaForGen;

    
   
153

   
131
    @Override
154
    @Override
132
    public Result processInput() throws ExecException {
155
    public Result processInput() throws ExecException {
133

    
   
156

   
134
        // Make sure the reporter is set, because it isn't getting carried
157
        // Make sure the reporter is set, because it isn't getting carried
135
        // across in the serialization (don't know why).  I suspect it's as
158
        // across in the serialization (don't know why).  I suspect it's as
136
        // cheap to call the setReporter call everytime as to check whether I
159
        // cheap to call the setReporter call everytime as to check whether I
137
        // have (hopefully java will inline it).
160
        // have (hopefully java will inline it).
138
        if(!initialized) {
161
        if(!initialized) {
139
            func.setReporter(reporter);
162
            func.setReporter(reporter);
140
            func.setPigLogger(pigLogger);
163
            func.setPigLogger(pigLogger);
141
            initialized = true;
164
            initialized = true;
142
        }
165
        }
143

    
   
166

   
144
        Result res = new Result();
167
        Result res = new Result();
145
        Tuple inpValue = null;
168
        Tuple inpValue = null;
146
        if (input == null && (inputs == null || inputs.size()==0)) {
169
        if (input == null && (inputs == null || inputs.size()==0)) {
147
//			log.warn("No inputs found. Signaling End of Processing.");
170
//			log.warn("No inputs found. Signaling End of Processing.");
148
            res.returnStatus = POStatus.STATUS_EOP;
171
            res.returnStatus = POStatus.STATUS_EOP;
149
            return res;
172
            return res;
150
        }
173
        }
151

    
   
174

   
152
        //Should be removed once the model is clear
175
        //Should be removed once the model is clear
153
        if(reporter!=null) {
176
        if(reporter!=null) {
154
            reporter.progress();
177
            reporter.progress();
155
        }
178
        }
156

    
   
179

   
157

    
   
180

   
158
        if(isInputAttached()) {
181
        if(isInputAttached()) {
159
            res.result = input;
182
            res.result = input;
160
            res.returnStatus = POStatus.STATUS_OK;
183
            res.returnStatus = POStatus.STATUS_OK;
161
            detachInput();
184
            detachInput();
162
            return res;
185
            return res;
163
        } else {
186
        } else {

    
   
187
            if (inputSchemaForGen != null)

    
   
188
                res.result = TupleFactory.getInstance().newTupleForSchema(inputSchemaForGen);

    
   
189
            else
164
            res.result = TupleFactory.getInstance().newTuple();
190
            res.result = TupleFactory.getInstance().newTuple();
165

    
   
191

   
166
            Result temp = null;
192
            Result temp = null;

    
   
193

   
167
            for(PhysicalOperator op : inputs) {
194
            for(PhysicalOperator op : inputs) {
168
                temp = op.getNext(getDummy(op.getResultType()), op.getResultType());
195
                temp = op.getNext(getDummy(op.getResultType()), op.getResultType());
169
                if(temp.returnStatus!=POStatus.STATUS_OK) {
196
                if(temp.returnStatus!=POStatus.STATUS_OK) {
170
                    return temp;
197
                    return temp;
171
                }
198
                }
172

    
   
199

   
173
                if(op instanceof POProject &&
200
                if(op instanceof POProject &&
174
                        op.getResultType() == DataType.TUPLE){
201
                        op.getResultType() == DataType.TUPLE){
175
                    POProject projOp = (POProject)op;
202
                    POProject projOp = (POProject)op;
176
                    if(projOp.isProjectToEnd()){
203
                    if(projOp.isProjectToEnd()){
177
                        Tuple trslt = (Tuple) temp.result;
204
                        Tuple trslt = (Tuple) temp.result;
178
                        Tuple rslt = (Tuple) res.result;
205
                        Tuple rslt = (Tuple) res.result;
179
                        for(int i=0;i<trslt.size();i++) {
206
                        for(int i=0;i<trslt.size();i++) {
180
                            rslt.append(trslt.get(i));
207
                            rslt.append(trslt.get(i));
181
                        }
208
                        }
182
                        continue;
209
                        continue;
183
                    }
210
                    }
184
                }
211
                }
185
                ((Tuple)res.result).append(temp.result);
212
                ((Tuple)res.result).append(temp.result);
186
            }
213
            }
187
            res.returnStatus = temp.returnStatus;
214
            res.returnStatus = temp.returnStatus;

    
   
215

   
188
            return res;
216
            return res;
189
        }
217
        }
190
    }
218
    }
191

    
   
219

   
192
    private Result getNext() throws ExecException {
220
    private Result getNext() throws ExecException {
193
        Result result = processInput();
221
        Result result = processInput();
194
        String errMsg = "";
222
        String errMsg = "";
195
        try {
223
        try {
196
            if(result.returnStatus == POStatus.STATUS_OK) {
224
            if(result.returnStatus == POStatus.STATUS_OK) {
197
                if (isAccumulative()) {
225
                if (isAccumulative()) {
198
                    if (isAccumStarted()) {
226
                    if (isAccumStarted()) {
199
                        ((Accumulator)func).accumulate((Tuple)result.result);
227
                        ((Accumulator)func).accumulate((Tuple)result.result);
200
                        result.returnStatus = POStatus.STATUS_BATCH_OK;
228
                        result.returnStatus = POStatus.STATUS_BATCH_OK;
201
                        result.result = null;
229
                        result.result = null;
202
                        isAccumulationDone = false;
230
                        isAccumulationDone = false;
203
                    }else{
231
                    }else{
204
                        if(isAccumulationDone){
232
                        if(isAccumulationDone){
205
                            //PORelationToExprProject does not return STATUS_EOP
233
                            //PORelationToExprProject does not return STATUS_EOP
206
                            // so that udf gets called both when isAccumStarted
234
                            // so that udf gets called both when isAccumStarted
207
                            // is first true and then set to false, even
235
                            // is first true and then set to false, even
208
                            //when the input relation is empty.
236
                            //when the input relation is empty.
209
                            // so the STATUS_EOP has to be sent from POUserFunc, 
237
                            // so the STATUS_EOP has to be sent from POUserFunc, 
210
                            // after the results have been sent.
238
                            // after the results have been sent.
211
                            result.result = null;
239
                            result.result = null;
212
                            result.returnStatus = POStatus.STATUS_EOP;
240
                            result.returnStatus = POStatus.STATUS_EOP;
213
                        }
241
                        }
214
                        else{
242
                        else{
215
                            result.result = ((Accumulator)func).getValue();
243
                            result.result = ((Accumulator)func).getValue();
216
                            result.returnStatus = POStatus.STATUS_OK;
244
                            result.returnStatus = POStatus.STATUS_OK;
217
                            ((Accumulator)func).cleanup();
245
                            ((Accumulator)func).cleanup();
218
                            isAccumulationDone = true;
246
                            isAccumulationDone = true;
219
                        }
247
                        }
220
                    }
248
                    }
221
                } else {
249
                } else {
222
                    if (executor != null) {
250
                    if (executor != null) {
223
                        result.result = executor.monitorExec((Tuple) result.result);
251
                        result.result = executor.monitorExec((Tuple) result.result);
224
                    } else {
252
                    } else {
225
                    result.result = func.exec((Tuple) result.result);
253
                    result.result = func.exec((Tuple) result.result);
226
                    }
254
                    }
227
                }
255
                }
228
                return result;
256
                return result;
229
            }
257
            }
230

    
   
258

   
231
            return result;
259
            return result;
232
        } catch (ExecException ee) {
260
        } catch (ExecException ee) {
233
            throw ee;
261
            throw ee;
234
        } catch (IOException ioe) {
262
        } catch (IOException ioe) {
235
            int errCode = 2078;
263
            int errCode = 2078;
236
            String msg = "Caught error from UDF: " + funcSpec.getClassName();
264
            String msg = "Caught error from UDF: " + funcSpec.getClassName();
237
            String footer = " [" + ioe.getMessage() + "]";
265
            String footer = " [" + ioe.getMessage() + "]";
238

    
   
266

   
239
            if(ioe instanceof PigException) {
267
            if(ioe instanceof PigException) {
240
                int udfErrorCode = ((PigException)ioe).getErrorCode();
268
                int udfErrorCode = ((PigException)ioe).getErrorCode();
241
                if(udfErrorCode != 0) {
269
                if(udfErrorCode != 0) {
242
                    errCode = udfErrorCode;
270
                    errCode = udfErrorCode;
243
                    msg = ((PigException)ioe).getMessage();
271
                    msg = ((PigException)ioe).getMessage();
244
                } else {
272
                } else {
245
                    msg += " [" + ((PigException)ioe).getMessage() + " ]";
273
                    msg += " [" + ((PigException)ioe).getMessage() + " ]";
246
                }
274
                }
247
            } else {
275
            } else {
248
                msg += footer;
276
                msg += footer;
249
            }
277
            }
250

    
   
278

   
251
            throw new ExecException(msg, errCode, PigException.BUG, ioe);
279
            throw new ExecException(msg, errCode, PigException.BUG, ioe);
252
        } catch (IndexOutOfBoundsException ie) {
280
        } catch (IndexOutOfBoundsException ie) {
253
            int errCode = 2078;
281
            int errCode = 2078;
254
            String msg = "Caught error from UDF: " + funcSpec.getClassName() +
282
            String msg = "Caught error from UDF: " + funcSpec.getClassName() +
255
            ", Out of bounds access [" + ie.getMessage() + "]";
283
            ", Out of bounds access [" + ie.getMessage() + "]";
256
            throw new ExecException(msg, errCode, PigException.BUG, ie);
284
            throw new ExecException(msg, errCode, PigException.BUG, ie);
257
        }
285
        }
258
    }
286
    }
259

    
   
287

   
260
    @Override
288
    @Override
261
    public Result getNext(Tuple tIn) throws ExecException {
289
    public Result getNext(Tuple tIn) throws ExecException {
262
        return getNext();
290
        return getNext();
263
    }
291
    }
264

    
   
292

   
265
    @Override
293
    @Override
266
    public Result getNext(DataBag db) throws ExecException {
294
    public Result getNext(DataBag db) throws ExecException {
267
        return getNext();
295
        return getNext();
268
    }
296
    }
269

    
   
297

   
270
    @Override
298
    @Override
271
    public Result getNext(Integer i) throws ExecException {
299
    public Result getNext(Integer i) throws ExecException {
272
        return getNext();
300
        return getNext();
273
    }
301
    }
274

    
   
302

   
275
    @Override
303
    @Override
276
    public Result getNext(Boolean b) throws ExecException {
304
    public Result getNext(Boolean b) throws ExecException {
277

    
   
305

   
278
        return getNext();
306
        return getNext();
279
    }
307
    }
280

    
   
308

   
281
    @Override
309
    @Override
282
    public Result getNext(DataByteArray ba) throws ExecException {
310
    public Result getNext(DataByteArray ba) throws ExecException {
283

    
   
311

   
284
        return getNext();
312
        return getNext();
285
    }
313
    }
286

    
   
314

   
287
    @Override
315
    @Override
288
    public Result getNext(Double d) throws ExecException {
316
    public Result getNext(Double d) throws ExecException {
289

    
   
317

   
290
        return getNext();
318
        return getNext();
291
    }
319
    }
292

    
   
320

   
293
    @Override
321
    @Override
294
    public Result getNext(Float f) throws ExecException {
322
    public Result getNext(Float f) throws ExecException {
295

    
   
323

   
296
        return getNext();
324
        return getNext();
297
    }
325
    }
298

    
   
326

   
299
    @Override
327
    @Override
300
    public Result getNext(Long l) throws ExecException {
328
    public Result getNext(Long l) throws ExecException {
301

    
   
329

   
302
        return getNext();
330
        return getNext();
303
    }
331
    }
304

    
   
332

   
305
    @Override
333
    @Override
306
    public Result getNext(Map m) throws ExecException {
334
    public Result getNext(Map m) throws ExecException {
307

    
   
335

   
308
        return getNext();
336
        return getNext();
309
    }
337
    }
310

    
   
338

   
311
    @Override
339
    @Override
312
    public Result getNext(String s) throws ExecException {
340
    public Result getNext(String s) throws ExecException {
313

    
   
341

   
314
        return getNext();
342
        return getNext();
315
    }
343
    }
316

    
   
344

   
317
    public void setAlgebraicFunction(byte Function) throws ExecException {
345
    public void setAlgebraicFunction(byte Function) throws ExecException {
318
        // This will only be used by the optimizer for putting correct functions
346
        // This will only be used by the optimizer for putting correct functions
319
        // in the mapper,
347
        // in the mapper,
320
        // combiner and reduce. This helps in maintaining the physical plan as
348
        // combiner and reduce. This helps in maintaining the physical plan as
321
        // is without the
349
        // is without the
322
        // optimiser having to replace any operators.
350
        // optimiser having to replace any operators.
323
        // You wouldn't be able to make two calls to this function on the same
351
        // You wouldn't be able to make two calls to this function on the same
324
        // algebraic EvalFunc as
352
        // algebraic EvalFunc as
325
        // func is being changed.
353
        // func is being changed.
326
        switch (Function) {
354
        switch (Function) {
327
        case INITIAL:
355
        case INITIAL:
328
            funcSpec = new FuncSpec(getInitial());
356
            funcSpec = new FuncSpec(getInitial());
329
            break;
357
            break;
330
        case INTERMEDIATE:
358
        case INTERMEDIATE:
331
            funcSpec = new FuncSpec(getIntermed());
359
            funcSpec = new FuncSpec(getIntermed());
332
            break;
360
            break;
333
        case FINAL:
361
        case FINAL:
334
            funcSpec = new FuncSpec(getFinal());
362
            funcSpec = new FuncSpec(getFinal());
335
            break;
363
            break;
336
        }
364
        }
337
        funcSpec.setCtorArgs(origFSpec.getCtorArgs());
365
        funcSpec.setCtorArgs(origFSpec.getCtorArgs());
338
        instantiateFunc(funcSpec);
366
        instantiateFunc(funcSpec);
339
        setResultType(DataType.findType(((EvalFunc<?>) func).getReturnType()));
367
        setResultType(DataType.findType(((EvalFunc<?>) func).getReturnType()));
340
    }
368
    }
341

    
   
369

   
342
    public String getInitial() throws ExecException {
370
    public String getInitial() throws ExecException {
343
        instantiateFunc(origFSpec);
371
        instantiateFunc(origFSpec);
344
        if (func instanceof Algebraic) {
372
        if (func instanceof Algebraic) {
345
            return ((Algebraic) func).getInitial();
373
            return ((Algebraic) func).getInitial();
346
        } else {
374
        } else {
347
            int errCode = 2072;
375
            int errCode = 2072;
348
            String msg = "Attempt to run a non-algebraic function"
376
            String msg = "Attempt to run a non-algebraic function"
349
                + " as an algebraic function";
377
                + " as an algebraic function";
350
            throw new ExecException(msg, errCode, PigException.BUG);
378
            throw new ExecException(msg, errCode, PigException.BUG);
351
        }
379
        }
352
    }
380
    }
353

    
   
381

   
354
    public String getIntermed() throws ExecException {
382
    public String getIntermed() throws ExecException {
355
        instantiateFunc(origFSpec);
383
        instantiateFunc(origFSpec);
356
        if (func instanceof Algebraic) {
384
        if (func instanceof Algebraic) {
357
            return ((Algebraic) func).getIntermed();
385
            return ((Algebraic) func).getIntermed();
358
        } else {
386
        } else {
359
            int errCode = 2072;
387
            int errCode = 2072;
360
            String msg = "Attempt to run a non-algebraic function"
388
            String msg = "Attempt to run a non-algebraic function"
361
                + " as an algebraic function";
389
                + " as an algebraic function";
362
            throw new ExecException(msg, errCode, PigException.BUG);
390
            throw new ExecException(msg, errCode, PigException.BUG);
363
        }
391
        }
364
    }
392
    }
365

    
   
393

   
366
    public String getFinal() throws ExecException {
394
    public String getFinal() throws ExecException {
367
        instantiateFunc(origFSpec);
395
        instantiateFunc(origFSpec);
368
        if (func instanceof Algebraic) {
396
        if (func instanceof Algebraic) {
369
            return ((Algebraic) func).getFinal();
397
            return ((Algebraic) func).getFinal();
370
        } else {
398
        } else {
371
            int errCode = 2072;
399
            int errCode = 2072;
372
            String msg = "Attempt to run a non-algebraic function"
400
            String msg = "Attempt to run a non-algebraic function"
373
                + " as an algebraic function";
401
                + " as an algebraic function";
374
            throw new ExecException(msg, errCode, PigException.BUG);
402
            throw new ExecException(msg, errCode, PigException.BUG);
375
        }
403
        }
376
    }
404
    }
377

    
   
405

   
378
    public Type getReturnType() {
406
    public Type getReturnType() {
379
        return func.getReturnType();
407
        return func.getReturnType();
380
    }
408
    }
381

    
   
409

   
382
    public void finish() {
410
    public void finish() {
383
        func.finish();
411
        func.finish();
384
        if (executor != null) {
412
        if (executor != null) {
385
            executor.terminate();
413
            executor.terminate();
386
        }
414
        }
387
    }
415
    }
388

    
   
416

   
389
    public Schema outputSchema(Schema input) {
417
    public Schema outputSchema(Schema input) {
390
        return func.outputSchema(input);
418
        return func.outputSchema(input);
391
    }
419
    }
392

    
   
420

   
393
    public Boolean isAsynchronous() {
421
    public Boolean isAsynchronous() {
394
        return func.isAsynchronous();
422
        return func.isAsynchronous();
395
    }
423
    }
396

    
   
424

   
397
    @Override
425
    @Override
398
    public String name() {
426
    public String name() {
399
        return "POUserFunc" + "(" + func.getClass().getName() + ")" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
427
        return "POUserFunc" + "(" + func.getClass().getName() + ")" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
400
    }
428
    }
401

    
   
429

   
402
    @Override
430
    @Override
403
    public boolean supportsMultipleInputs() {
431
    public boolean supportsMultipleInputs() {
404

    
   
432

   
405
        return true;
433
        return true;
406
    }
434
    }
407

    
   
435

   
408
    @Override
436
    @Override
409
    public boolean supportsMultipleOutputs() {
437
    public boolean supportsMultipleOutputs() {
410

    
   
438

   
411
        return false;
439
        return false;
412
    }
440
    }
413

    
   
441

   
414
    @Override
442
    @Override
415
    public void visit(PhyPlanVisitor v) throws VisitorException {
443
    public void visit(PhyPlanVisitor v) throws VisitorException {
416

    
   
444

   
417
        v.visitUserFunc(this);
445
        v.visitUserFunc(this);
418
    }
446
    }
419

    
   
447

   
420
    public FuncSpec getFuncSpec() {
448
    public FuncSpec getFuncSpec() {
421
        return funcSpec;
449
        return funcSpec;
422
    }
450
    }
423

    
   
451

   
424
    public String[] getCacheFiles() {
452
    public String[] getCacheFiles() {
425
        return cacheFiles;
453
        return cacheFiles;
426
    }
454
    }
427

    
   
455

   
428
    public void setCacheFiles(String[] cf) {
456
    public void setCacheFiles(String[] cf) {
429
        cacheFiles = cf;
457
        cacheFiles = cf;
430
    }
458
    }
431

    
   
459

   
432
    public boolean combinable() {
460
    public boolean combinable() {
433
        return (func instanceof Algebraic);
461
        return (func instanceof Algebraic);
434
    }
462
    }
435

    
   
463

   
436
    @Override
464
    @Override
437
    public POUserFunc clone() throws CloneNotSupportedException {
465
    public POUserFunc clone() throws CloneNotSupportedException {
438
        // Inputs will be patched up later by PhysicalPlan.clone()
466
        // Inputs will be patched up later by PhysicalPlan.clone()
439
        POUserFunc clone = new POUserFunc(new OperatorKey(mKey.scope,
467
        POUserFunc clone = new POUserFunc(new OperatorKey(mKey.scope,
440
            NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
468
            NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
441
            requestedParallelism, null, funcSpec.clone());
469
            requestedParallelism, null, funcSpec.clone());
442
        clone.setResultType(resultType);
470
        clone.setResultType(resultType);
443
        clone.signature = signature;
471
        clone.signature = signature;
444
        return clone;
472
        return clone;
445
    }
473
    }
446

    
   
474

   
447
    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException{
475
    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException{
448
        is.defaultReadObject();
476
        is.defaultReadObject();
449
        instantiateFunc(funcSpec);
477
        instantiateFunc(funcSpec);
450
    }
478
    }
451

    
   
479

   
452
    /**
480
    /**
453
     * Get child expression of this expression
481
     * Get child expression of this expression
454
     */
482
     */
455
    @Override
483
    @Override
456
    public List<ExpressionOperator> getChildExpressions() {
484
    public List<ExpressionOperator> getChildExpressions() {
457
        return null;
485
        return null;
458
    }
486
    }
459

    
   
487

   
460
    @SuppressWarnings("unchecked")
488
    @SuppressWarnings("unchecked")
461
    @Override
489
    @Override
462
    public void setAccumStart() {
490
    public void setAccumStart() {
463
        if (isAccumulative() && !isAccumStarted()) {
491
        if (isAccumulative() && !isAccumStarted()) {
464
            super.setAccumStart();
492
            super.setAccumStart();
465
            ((Accumulator)func).cleanup();
493
            ((Accumulator)func).cleanup();
466
        }
494
        }
467
    }
495
    }
468

    
   
496

   
469
    @Override
497
    @Override
470
    public void setResultType(byte resultType) {
498
    public void setResultType(byte resultType) {
471
        this.resultType = resultType;
499
        this.resultType = resultType;
472
    }
500
    }
473
    
501
    
474
    @Override
502
    @Override
475
    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
503
    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
476
        return (Tuple) out;
504
        return (Tuple) out;
477
    }
505
    }
478
    
506
    
479
    public EvalFunc getFunc() {
507
    public EvalFunc getFunc() {
480
        return func;
508
        return func;
481
    }
509
    }
482
    
510
    
483
    public void setSignature(String signature) {
511
    public void setSignature(String signature) {
484
        this.signature = signature;
512
        this.signature = signature;
485
        if (this.func!=null) {
513
        if (this.func!=null) {
486
            this.func.setUDFContextSignature(signature);
514
            this.func.setUDFContextSignature(signature);
487
        }
515
        }
488
    }
516
    }
489
}
517
}
trunk/src/org/apache/pig/data/BinInterSedes.java
Revision 1309628 New Change
 
trunk/src/org/apache/pig/data/PrimitiveTuple.java
Revision 1309628 New Change
 
trunk/src/org/apache/pig/data/SchemaTuple.java
New File
 
trunk/src/org/apache/pig/data/TupleFactory.java
Revision 1309628 New Change
 
trunk/src/org/apache/pig/data/TypeAwareTuple.java
Revision 1309628 New Change
 
trunk/src/org/apache/pig/impl/PigContext.java
Revision 1309628 New Change
 
trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
Revision 1309628 New Change
 
  1. trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java: Loading...
  2. trunk/src/org/apache/pig/data/BinInterSedes.java: Loading...
  3. trunk/src/org/apache/pig/data/PrimitiveTuple.java: Loading...
  4. trunk/src/org/apache/pig/data/SchemaTuple.java: Loading...
  5. trunk/src/org/apache/pig/data/TupleFactory.java: Loading...
  6. trunk/src/org/apache/pig/data/TypeAwareTuple.java: Loading...
  7. trunk/src/org/apache/pig/impl/PigContext.java: Loading...
  8. trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java: Loading...