

SchemaTuple in Pig

Review Request #4651 - Created April 5, 2012

Jonathan Coveney
PIG-2632
Reviewers: pig, julien
This work builds on Dmitriy's PrimitiveTuple work. The idea is that, knowing the Schema on the frontend, we can code-generate Tuples which can be used for fun and profit. In rudimentary tests, the memory efficiency is 2-4x better, and it's ~15% smaller serialized (heavily depends on the data, though). Need to do get/set tests, but assuming that it's on par with (or even faster than) Tuple, the memory gain is huge.

Need to clean up the code and add tests.

Right now, it generates a SchemaTuple for every inputSchema and outputSchema given to UDFs. The next step is to make a SchemaBag, where I think the serialization savings will be really huge.

Needs tests and comments, but I want the code to settle a bit.

 

Diff revision 7

This is not the most recent revision of the diff. The latest diff is revision 10.


  1. trunk/src/docs/src/documentation/content/xdocs/perf.xml
  2. trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
  3. trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
  4. trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
  5. trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
  6. trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java
  7. trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
  8. trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
  9. trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
  10. trunk/src/org/apache/pig/data/BinInterSedes.java
  11. trunk/src/org/apache/pig/data/BinSedesTupleFactory.java
  12. trunk/src/org/apache/pig/data/DataByteArray.java
  13. trunk/src/org/apache/pig/data/TupleFactory.java
  14. trunk/src/org/apache/pig/data/TypeAwareTuple.java
  15. trunk/src/org/apache/pig/impl/PigContext.java
  16. trunk/src/org/apache/pig/impl/io/NullableTuple.java
  17. trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
  18. trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
  19. trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
  20. trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
  21. trunk/test/org/apache/pig/test/TestDataBag.java
  22. trunk/test/org/apache/pig/test/TestSchema.java
trunk/src/docs/src/documentation/content/xdocs/perf.xml
Revision 1351931
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN" "http://forrest.apache.org/dtd/document-v20.dtd">
<document>
  <header>
    <title>Performance and Efficiency</title>
  </header>
  <body>

<!-- ================================================================== -->
<!-- COMBINER -->
<section id="combiner">
<title>Combiner</title>

<p>The Pig combiner is an optimizer that is invoked when the statements in your scripts are arranged in certain ways. The examples below demonstrate when the combiner is used and not used. Whenever possible, make sure the combiner is used as it frequently yields an order of magnitude improvement in performance.</p>

<section>
<title>When the Combiner is Used</title>
<p>The combiner is generally used in the case of non-nested foreach where all projections are either expressions on the group column or expressions on algebraic UDFs (see <a href="#Algebraic-interface">Make Your UDFs Algebraic</a>).</p>

<p>Example:</p>

<source>
A = load 'studenttab10k' as (name, age, gpa);
B = group A by age;
C = foreach B generate ABS(SUM(A.gpa)), COUNT(org.apache.pig.builtin.Distinct(A.name)), (MIN(A.gpa) + MAX(A.gpa))/2, group.age;
explain C;
</source>
<p></p>
<p>In the above example:</p>
<ul>
<li>The GROUP statement can be referred to as a whole or by accessing individual fields (as in the example). </li>
<li>The GROUP statement and its elements can appear anywhere in the projection. </li>
</ul>

<p>In the above example, a variety of expressions can be applied to algebraic functions including:</p>
<ul>
<li>A column transformation function such as ABS can be applied to an algebraic function SUM.</li>
<li>An algebraic function (COUNT) can be applied to another algebraic function (Distinct), but only the inner function is computed using the combiner. </li>
<li>A mathematical expression can be applied to one or more algebraic functions. </li>
</ul>
<p></p>

<p>You can check if the combiner is used for your query by running <a href="test.html#EXPLAIN">EXPLAIN</a> on the FOREACH alias as shown above. You should see the combine section in the MapReduce part of the plan:</p>

<source>
.....
Combine Plan
B: Local Rearrange[tuple]{bytearray}(false) - scope-42
| |
| Project[bytearray][0] - scope-43
|
|---C: New For Each(false,false,false)[bag] - scope-28
| |
| Project[bytearray][0] - scope-29
| |
| POUserFunc(org.apache.pig.builtin.SUM$Intermediate)[tuple] - scope-30
| |
| |---Project[bag][1] - scope-31
| |
| POUserFunc(org.apache.pig.builtin.Distinct$Intermediate)[tuple] - scope-32
| |
| |---Project[bag][2] - scope-33
|
|---POCombinerPackage[tuple]{bytearray} - scope-36--------
.....
</source>

<p>The combiner is also used with a nested foreach as long as the only nested operation used is DISTINCT
(see <a href="basic.html#FOREACH">FOREACH</a> and <a href="basic.html#nestedblock">Example: Nested Block</a>).
</p>

<source>
A = load 'studenttab10k' as (name, age, gpa);
B = group A by age;
C = foreach B { D = distinct (A.name); generate group, COUNT(D);}
</source>
<p></p>
<p>Finally, use of the combiner is influenced by the surrounding environment of the GROUP and FOREACH statements.</p>
</section>

<section>
<title>When the Combiner is Not Used</title>
<p>The combiner is generally not used if there is any operator that comes between the GROUP and FOREACH statements in the execution plan. Even if the statements are next to each other in your script, the optimizer might rearrange them. In this example, the optimizer will push FILTER above FOREACH which will prevent the use of the combiner:</p>
<source>
A = load 'studenttab10k' as (name, age, gpa);
B = group A by age;
C = foreach B generate group, COUNT (A);
D = filter C by group.age &lt;30;
</source>
<p></p>

<p>Please note that the script above can be made more efficient by performing filtering before the GROUP statement:</p>

<source>
A = load 'studenttab10k' as (name, age, gpa);
B = filter A by age &lt;30;
C = group B by age;
D = foreach C generate group, COUNT (B);
</source>
<p></p>

<p><strong>Note:</strong> One exception to the above rule is LIMIT. Starting with Pig 0.9, even if LIMIT comes between GROUP and FOREACH, the combiner will still be used. In this example, the optimizer will push LIMIT above FOREACH but this will not prevent the use of the combiner.</p>

<source>
A = load 'studenttab10k' as (name, age, gpa);
B = group A by age;
C = foreach B generate group, COUNT (A);
D = limit C 20;
</source>
<p></p>

<p>The combiner is also not used in the case where multiple FOREACH statements are associated with the same GROUP:</p>

<source>
A = load 'studenttab10k' as (name, age, gpa);
B = group A by age;
C = foreach B generate group, COUNT (A);
D = foreach B generate group, MIN (A.gpa), MAX(A.gpa);
.....
</source>

<p>Depending on your use case, it might be more efficient (improve performance) to split your script into multiple scripts.</p>
</section>
</section>

<!-- ================================================================== -->
<!-- HASH-BASED AGGREGATION IN MAP TASK-->
<section id="hash-based-aggregation">
<title>Hash-based Aggregation in Map Task</title>

<p>To improve performance, hash-based aggregation will aggregate records in the map task before sending them to the combiner. This optimization reduces the serializing/deserializing costs of the combiner by sending it fewer records.</p>

<p><strong>Turning it On or Off</strong></p>
<p>Hash-based aggregation has been shown to improve the speed of group-by operations by up to 50%. However, since this is a very new feature, it is currently turned OFF by default. To turn it ON, set the property pig.exec.mapPartAgg to true.</p>
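<p>For example, the feature can be switched on for a single script with the set command, or passed as a property on the command line (an illustrative sketch; adjust to however you normally set Pig properties):</p>
<source>
-- in the script
set pig.exec.mapPartAgg true;

-- or on the command line
$ pig -Dpig.exec.mapPartAgg=true myscript.pig
</source>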

<p><strong>Configuring</strong></p>
<p>If the group-by keys don't result in a sufficient reduction in the number of records, performance might be worse with this feature turned ON. To prevent this from happening, the feature turns itself off if the reduction in records sent to the combiner is not more than a configurable threshold. This threshold can be set using the property pig.exec.mapPartAgg.minReduction. It is set to a default value of 10, which means that the number of records that get sent to the combiner should be reduced by a factor of 10 or more.</p>
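<p>For example, to keep the map-side aggregation active even when the reduction is modest, the threshold could be lowered (the value here is purely illustrative):</p>
<source>
set pig.exec.mapPartAgg.minReduction 3;
</source>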

</section>

<!-- ================================================================== -->
<!-- MEMORY MANAGEMENT -->
<section id="memory-management">
<title>Memory Management</title>

<p>Pig allocates a fixed amount of memory to store bags and spills to disk as soon as the memory limit is reached. This is very similar to how Hadoop decides when to spill data accumulated by the combiner.</p>

<p id="memory-bags">The amount of memory allocated to bags is determined by pig.cachedbag.memusage; the default is set to 20% (0.2) of available memory. Note that this memory is shared across all large bags used by the application.</p>

</section>


<!-- ==================================================================== -->
<!-- MULTI-QUERY EXECUTION-->
<section id="multi-query-execution">
<title>Multi-Query Execution</title>
<p>With multi-query execution Pig processes an entire script or a batch of statements at once.</p>

<section>
	<title>Turning it On or Off</title>
	<p>Multi-query execution is turned on by default. 
	To turn it off and revert to Pig's "execute-on-dump/store" behavior, use the "-M" or "-no_multiquery" options. </p>
	<p>To run script "myscript.pig" without the optimization, execute Pig as follows: </p>
<source>
$ pig -M myscript.pig
or
$ pig -no_multiquery myscript.pig
</source>
</section>

<section>
<title>How it Works</title>
<p>Multi-query execution introduces some changes:</p>

<ul>
<li>
<p>For batch mode execution, the entire script is first parsed to determine if intermediate tasks 
can be combined to reduce the overall amount of work that needs to be done; execution starts only after the parsing is completed 
(see the <a href="test.html#EXPLAIN">EXPLAIN</a> operator and the <a href="cmds.html#run">run</a> and <a href="cmds.html#exec">exec</a> commands). </p>
</li>
<li>
<p>Two run scenarios are optimized, as explained below: explicit and implicit splits, and storing intermediate results.</p>
</li>
</ul>

<section id="splits">
	<title>Explicit and Implicit Splits</title>
<p>There might be cases in which you want different processing on separate parts of the same data stream.</p>
<p>Example 1:</p>
<source>
A = LOAD ...
...
SPLIT A' INTO B IF ..., C IF ...
...
STORE B' ...
STORE C' ...
</source>
<p>Example 2:</p>
<source>
A = LOAD ...
...
B = FILTER A' ...
C = FILTER A' ...
...
STORE B' ...
STORE C' ...
</source>
<p>In prior Pig releases, Example 1 will dump A' to disk and then start jobs for B' and C'. 
Example 2 will execute all the dependencies of B' and store it and then execute all the dependencies of C' and store it. 
Both are equivalent, but the performance will be different. </p>
<p>Here's what the multi-query execution does to increase the performance: </p>
	<ul>
		<li><p>For Example 2, adds an implicit split to transform the query to Example 1. 
		This eliminates the processing of A' multiple times.</p></li>
		<li><p>Makes the split non-blocking and allows processing to continue. 
		This helps reduce the amount of data that has to be stored right at the split.  </p></li>
		<li><p>Allows multiple outputs from a job. This way some results can be stored as a side-effect of the main job. 
		This is also necessary to make the previous item work.  </p></li>
		<li><p>Allows multiple split branches to be carried on to the combiner/reducer. 
		This reduces the amount of IO again in the case where multiple branches in the split can benefit from a combiner run. </p></li>
	</ul>
</section>

<section id="data-store-performance">
	<title>Storing Intermediate Results</title>
<p>Sometimes it is necessary to store intermediate results. </p>

<source>
A = LOAD ...
...
STORE A'
...
STORE A''
</source>

<p>If the script doesn't re-load A' for the processing of A'' the steps above A' will be duplicated. 
This is a special case of Example 2 above, so the same steps are recommended. 
With multi-query execution, the script will process A and dump A' as a side-effect.</p>
</section>
</section>


<!-- ++++++++++++++++++++++++++++++++++++++++++ -->
<section id="store-dump">
	<title>Store vs. Dump</title>
	<p>With multi-query execution, you want to use <a href="basic.html#STORE">STORE</a> to save (persist) your results. 
	You do not want to use <a href="test.html#DUMP">DUMP</a> as it will disable multi-query execution and is likely to slow down execution. (If you have included DUMP statements in your scripts for debugging purposes, you should remove them.) </p>

	<p>DUMP Example: In this script, because the DUMP command is interactive, the multi-query execution will be disabled and two separate jobs will be created to execute this script. The first job will execute A > B > DUMP while the second job will execute A > B > C > STORE.</p>

<source>
A = LOAD 'input' AS (x, y, z);
B = FILTER A BY x > 5;
DUMP B;
C = FOREACH B GENERATE y, z;
STORE C INTO 'output';
</source>

	<p>STORE Example: In this script, multi-query optimization will kick in allowing the entire script to be executed as a single job. Two outputs are produced: output1 and output2.</p>

<source>
A = LOAD 'input' AS (x, y, z);
B = FILTER A BY x > 5;
STORE B INTO 'output1';
C = FOREACH B GENERATE y, z;
STORE C INTO 'output2';
</source>

</section>

<!-- ++++++++++++++++++++++++++++++++++++++++++ -->
<section id="error-handling">
	<title>Error Handling</title>
	<p>With multi-query execution Pig processes an entire script or a batch of statements at once. 
	By default Pig tries to run all the jobs that result from that, regardless of whether some jobs fail during execution. 
	To check which jobs have succeeded or failed use one of these options. </p>

	<p>First, Pig logs all successful and failed store commands. Store commands are identified by output path. 
	At the end of execution a summary line indicates success, partial failure or failure of all store commands. </p>

	<p>Second, Pig returns a different code upon completion for these scenarios:</p>
	<ul>
		<li><p>Return code 0: All jobs succeeded</p></li>
		<li><p>Return code 1: <em>Used for retrievable errors</em> </p></li>
		<li><p>Return code 2: All jobs have failed </p></li>
		<li><p>Return code 3: Some jobs have failed  </p></li>
	</ul>
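	<p>A wrapper script can branch on this return code after running Pig, for example (a minimal shell sketch; adapt to your environment):</p>
<source>
$ pig myscript.pig
$ if [ $? -ne 0 ]; then echo "one or more store commands failed"; fi
</source>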
	<p></p>
	<p>In some cases it might be desirable to fail the entire script upon detecting the first failed job. 
	This can be achieved with the "-F" or "-stop_on_failure" command line flag. 
	If used, Pig will stop execution when the first failed job is detected and discontinue further processing. 
	This also means that file commands that come after a failed store in the script will not be executed (this can be used to create "done" files). </p>

	<p>This is how the flag is used: </p>
<source>
$ pig -F myscript.pig
or
$ pig -stop_on_failure myscript.pig
</source>
</section>

<!-- ++++++++++++++++++++++++++++++++++++++++++ -->
<section id="backward-compatibility">
	<title>Backward Compatibility</title>

	<p>Most existing Pig scripts will produce the same result with or without the multi-query execution. 
	There are cases though where this is not true. Path names and schemes are discussed here.</p>

	<p>Any script is parsed in its entirety before it is sent to execution. Since the current directory can change 
	throughout the script, any path used in a LOAD or STORE statement is translated to a fully qualified and absolute path.</p>

	<p>In map-reduce mode, the following script will load from "hdfs://&lt;host&gt;:&lt;port&gt;/data1" and store into "hdfs://&lt;host&gt;:&lt;port&gt;/tmp/out1". </p>
<source>
cd /;
A = LOAD 'data1';
cd tmp;
STORE A INTO 'out1';
</source>

	<p>These expanded paths will be passed to any LoadFunc or Slicer implementation. 
	In some cases this can cause problems, especially when a LoadFunc/Slicer is not used to read from a dfs file or path 
	(for example, loading from an SQL database). </p>

	<p>Solutions are to either: </p>
	<ul>
		<li><p>Specify "-M" or "-no_multiquery" to revert to the old names</p></li>
		<li><p>Specify a custom scheme for the LoadFunc/Slicer </p></li>
	</ul>

	<p>Arguments used in a LOAD statement that have a scheme other than "hdfs" or "file" will not be expanded and will be passed to the LoadFunc/Slicer unchanged.</p>
	<p>In the SQL case, the SQLLoader function is invoked with 'sql://mytable'. </p>

<source>
A = LOAD 'sql://mytable' USING SQLLoader();
</source>
</section>

<section id="Implicit-Dependencies">
	<title>Implicit Dependencies</title>
<p>If a script has dependencies on the execution order outside of what Pig knows about, execution may fail. </p>


<section>
	<title>Example</title>
<p>In this script, MYUDF might try to read from out1, a file that A was just stored into. 
However, Pig does not know that MYUDF depends on the out1 file and might submit the jobs 
producing the out2 and out1 files at the same time.</p>
<source>
...
STORE A INTO 'out1';
B = LOAD 'data2';
C = FOREACH B GENERATE MYUDF($0,'out1');
STORE C INTO 'out2';
</source>

<p>To make the script work (to ensure that the right execution order is enforced) add the exec statement. 
The exec statement will trigger the execution of the statements that produce the out1 file. </p>

<source>
...
STORE A INTO 'out1';
EXEC;
B = LOAD 'data2';
C = FOREACH B GENERATE MYUDF($0,'out1');
STORE C INTO 'out2';
</source>
</section>

<section>
	<title>Example</title>
<p>In this script, the STORE/LOAD operators have different file paths; however, the LOAD operator depends on the STORE operator.</p>
<source>
A = LOAD '/user/xxx/firstinput' USING PigStorage();
B = group ....
C = .... aggregation function
STORE C INTO '/user/vxj/firstinputtempresult/days1';
..
Atab = LOAD '/user/xxx/secondinput' USING PigStorage();
Btab = group ....
Ctab = .... aggregation function
STORE Ctab INTO '/user/vxj/secondinputtempresult/days1';
..
E = LOAD '/user/vxj/firstinputtempresult/' USING PigStorage();
F = group ....
G = .... aggregation function
STORE G INTO '/user/vxj/finalresult1';

Etab = LOAD '/user/vxj/secondinputtempresult/' USING PigStorage();
Ftab = group ....
Gtab = .... aggregation function
STORE Gtab INTO '/user/vxj/finalresult2';
</source>

<p>To make the script work, add the exec statement.  </p>

<source>
A = LOAD '/user/xxx/firstinput' USING PigStorage();
B = group ....
C = .... aggregation function
STORE C INTO '/user/vxj/firstinputtempresult/days1';
..
Atab = LOAD '/user/xxx/secondinput' USING PigStorage();
Btab = group ....
Ctab = .... aggregation function
STORE Ctab INTO '/user/vxj/secondinputtempresult/days1';

EXEC;

E = LOAD '/user/vxj/firstinputtempresult/' USING PigStorage();
F = group ....
G = .... aggregation function
STORE G INTO '/user/vxj/finalresult1';
..
Etab = LOAD '/user/vxj/secondinputtempresult/' USING PigStorage();
Ftab = group ....
Gtab = .... aggregation function
STORE Gtab INTO '/user/vxj/finalresult2';
</source>
</section>
</section>
</section>


<!-- ==================================================================== -->
 <!-- OPTIMIZATION RULES -->
<section id="optimization-rules">
<title>Optimization Rules</title>

<p>Pig supports various optimization rules. By default optimization, and all optimization rules, are turned on. 
To turn off optimization, use:</p>

<source>
pig -optimizer_off [opt_rule | all ]
</source>
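<p>For example, to run a script with one named rule disabled, or with all non-mandatory rules disabled (assuming the rule names shown in the section titles below):</p>
<source>
$ pig -optimizer_off MergeFilter myscript.pig
$ pig -optimizer_off all myscript.pig
</source>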

<p>Note that some rules are mandatory and cannot be turned off.</p>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="FilterLogicExpressionSimplifier">
<title>FilterLogicExpressionSimplifier</title>
<p>This rule simplifies the expression in a filter statement.</p>
<source>
1) Constant pre-calculation 

B = FILTER A BY a0 &gt; 5+7; 
is simplified to 
B = FILTER A BY a0 &gt; 12; 

2) Elimination of negations 

B = FILTER A BY NOT (NOT(a0 &gt; 5) OR a &gt; 10); 
is simplified to 
B = FILTER A BY a0 &gt; 5 AND a &lt;= 10; 

3) Elimination of logically implied expressions in AND 

B = FILTER A BY (a0 &gt; 5 AND a0 &gt; 7); 
is simplified to 
B = FILTER A BY a0 &gt; 7; 

4) Elimination of logically implied expressions in OR 

B = FILTER A BY ((a0 &gt; 5) OR (a0 &gt; 6 AND a1 &gt; 15)); 
is simplified to 
B = FILTER A BY a0 &gt; 5; 

5) Equivalence elimination 

B = FILTER A BY (a0 &gt; 5 AND a0 &gt; 5); 
is simplified to 
B = FILTER A BY a0 &gt; 5; 

6) Elimination of complementary expressions in OR 

B = FILTER A BY (a0 &gt; 5 OR a0 &lt;= 5); 
is simplified to non-filtering 

7) Elimination of naive TRUE expressions 

B = FILTER A BY 1==1; 
is simplified to non-filtering 
</source>
</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="SplitFilter">
<title>SplitFilter</title>
<p>Split filter conditions so that we can push filters more aggressively.</p>
<source>
A = LOAD 'input1' as (a0, a1);
B = LOAD 'input2' as (b0, b1);
C = JOIN A by a0, B by b0;
D = FILTER C BY a1&gt;0 and b1&gt;0;
</source>
<p>Here D will be split into:</p>
<source>
X = FILTER C BY a1&gt;0;
D = FILTER X BY b1&gt;0;
</source>
<p>So "a1&gt;0" and "b1&gt;0" can be pushed up individually.</p>
</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="PushUpFilter">
<title>PushUpFilter</title>
<p>The objective of this rule is to push the FILTER operators up the data flow graph. As a result, the number of records that flow through the pipeline is reduced. </p>
<source>
A = LOAD 'input';
B = GROUP A BY $0;
C = FILTER B BY $0 &lt; 10;
</source>
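<p>Because the filter here is on the group key, the rewritten plan behaves as if the filter had been applied before the GROUP, so far fewer records are grouped. A hand-written sketch of the equivalent script (the optimizer rewrites the plan, not your script):</p>
<source>
A = LOAD 'input';
A1 = FILTER A BY $0 &lt; 10;
B = GROUP A1 BY $0;
</source>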
</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="MergeFilter">
<title>MergeFilter</title>
<p>Merge filter conditions after the PushUpFilter rule to decrease the number of filter statements.</p>
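<p>For example, two consecutive filters over the same input (such as those produced by SplitFilter and PushUpFilter) are conceptually collapsed into one (illustrative sketch):</p>
<source>
-- before
B = FILTER A BY a1 &gt; 0;
C = FILTER B BY a2 &gt; 0;

-- after MergeFilter
C = FILTER A BY a1 &gt; 0 AND a2 &gt; 0;
</source>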
</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="PushDownForEachFlatten">
<title>PushDownForEachFlatten</title>
<p>The objective of this rule is to reduce the number of records that flow through the pipeline by moving FOREACH operators with a FLATTEN down the data flow graph. In the example shown below, it would be more efficient to move the foreach after the join to reduce the cost of the join operation.</p>
<source>
A = LOAD 'input' AS (a, b, c);
B = LOAD 'input2' AS (x, y, z);
C = FOREACH A GENERATE FLATTEN($0), B, C;
D = JOIN C BY $1, B BY $1;
</source>
</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="LimitOptimizer">
<title>LimitOptimizer</title>
<p>The objective of this rule is to push the LIMIT operator up the data flow graph (or down the tree for database folks). In addition, for top-k (ORDER BY followed by a LIMIT) the LIMIT is pushed into the ORDER BY.</p>
<source>
A = LOAD 'input';
B = ORDER A BY $0;
C = LIMIT B 10;
</source>
</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="ColumnMapKeyPrune">
<title>ColumnMapKeyPrune</title>
<p>Prune the loader to only load necessary columns. The performance gain is more significant if the corresponding loader supports column pruning and only loads necessary columns (see LoadPushDown.pushProjection). Otherwise, ColumnMapKeyPrune will insert a ForEach statement right after the loader.</p>
<source>
A = load 'input' as (a0, a1, a2);
B = ORDER A by a0;
C = FOREACH B GENERATE a0, a1;
</source>
<p>a2 is irrelevant in this query, so we can prune it earlier. The loader in this query is PigStorage and it supports column pruning. So we only load a0 and a1 from the input file.</p>
<p>ColumnMapKeyPrune also prunes unused map keys:</p>
<source>
A = load 'input' as (a0:map[]);
B = FOREACH A generate a0#'key1';
</source>
</section>
578

    
   
578

   
579
<!-- +++++++++++++++++++++++++++++++ -->
579
<!-- +++++++++++++++++++++++++++++++ -->
580
<section id="AddForEach">
580
<section id="AddForEach">
581
<title>AddForEach</title>
581
<title>AddForEach</title>
582
<p>Prune unused column as soon as possible. In addition to prune the loader in ColumnMapKeyPrune, we can prune a column as soon as it is not used in the rest of the script</p>
582
<p>Prune unused column as soon as possible. In addition to prune the loader in ColumnMapKeyPrune, we can prune a column as soon as it is not used in the rest of the script</p>
583
<source>
583
<source>
584
-- Original code: 
584
-- Original code: 
585

    
   
585

   
586
A = LOAD 'input' AS (a0, a1, a2); 
586
A = LOAD 'input' AS (a0, a1, a2); 
587
B = ORDER A BY a0;
587
B = ORDER A BY a0;
588
C = FILTER B BY a1&gt;0;
588
C = FILTER B BY a1&gt;0;
589
</source>
589
</source>
590
<p>We can only prune a2 from the loader. However, a0 is never used after "ORDER BY". So we can drop a0 right after "ORDER BY" statement.</p>
590
<p>We can only prune a2 from the loader. However, a0 is never used after "ORDER BY". So we can drop a0 right after "ORDER BY" statement.</p>
591
<source>
591
<source>
592
-- Optimized code: 
592
-- Optimized code: 
593

    
   
593

   
594
A = LOAD 'input' AS (a0, a1, a2); 
594
A = LOAD 'input' AS (a0, a1, a2); 
595
B = ORDER A BY a0;
595
B = ORDER A BY a0;
596
B1 = FOREACH B GENERATE a1;  -- drop a0
596
B1 = FOREACH B GENERATE a1;  -- drop a0
597
C = FILTER B1 BY a1&gt;0;
597
C = FILTER B1 BY a1&gt;0;
598
</source>
598
</source>
599
</section>
599
</section>
600

    
   
600

   
601
<!-- +++++++++++++++++++++++++++++++ -->
601
<!-- +++++++++++++++++++++++++++++++ -->
602
<section id="MergeForEach">
602
<section id="MergeForEach">
603
<title>MergeForEach</title>
603
<title>MergeForEach</title>
604
<p>The objective of this rule is to merge together two feach statements, if these preconditions are met:</p>
604
<p>The objective of this rule is to merge together two feach statements, if these preconditions are met:</p>
605
<ul>
605
<ul>
606
<li>The foreach statements are consecutive.</li>
606
<li>The foreach statements are consecutive.</li>
607
<li>The first foreach statement does not contain flatten.</li>
607
<li>The first foreach statement does not contain flatten.</li>
608
<li>The second foreach is not nested.</li>
608
<li>The second foreach is not nested.</li>
609
</ul>
609
</ul>
610
<source>
610
<source>
611
-- Original code: 
611
-- Original code: 
612

    
   
612

   
613
A = LOAD 'file.txt' AS (a, b, c); 
613
A = LOAD 'file.txt' AS (a, b, c); 
614
B = FOREACH A GENERATE a+b AS u, c-b AS v; 
614
B = FOREACH A GENERATE a+b AS u, c-b AS v; 
615
C = FOREACH B GENERATE $0+5, v; 
615
C = FOREACH B GENERATE $0+5, v; 
616

    
   
616

   
617
-- Optimized code: 
617
-- Optimized code: 
618

    
   
618

   
619
A = LOAD 'file.txt' AS (a, b, c); 
619
A = LOAD 'file.txt' AS (a, b, c); 
620
C = FOREACH A GENERATE a+b+5, c-b;
620
C = FOREACH A GENERATE a+b+5, c-b;
621
</source>
621
</source>
622
</section>
622
</section>
623

    
   
623

   
624
<!-- +++++++++++++++++++++++++++++++ -->
<section id="GroupByConstParallelSetter">
<title>GroupByConstParallelSetter</title>
<p>Force parallel "1" for the "group all" statement. Even if we set parallel to N, only one reducer is used in this case and all of the other reducers would produce empty results.</p>
<source>
A = LOAD 'input';
B = GROUP A all PARALLEL 10;
</source>
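<p>In other words, the optimizer treats the script above as if it had been written with a parallelism of 1; the following is only a sketch of the effect, not extra syntax you need to write:</p>
<source>
A = LOAD 'input';
B = GROUP A all PARALLEL 1;
</source>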
</section>

</section>

<!-- ==================================================================== -->
<!-- PERFORMANCE ENHANCERS-->
<section id="performance-enhancers">
<title>Performance Enhancers</title>

<section>
<title>Use Optimization</title>
<p>Pig supports various <a href="perf.html#Optimization-Rules">optimization rules</a> which are turned on by default. 
Become familiar with these rules.</p>
</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="types">
<title>Use Types</title>

<p>If types are not specified in the load statement, Pig assumes the type of double for numeric computations. 
A lot of the time your data would fit in a much smaller type, such as integer or long. Specifying the actual type will help with 
the speed of arithmetic computation. It has the additional advantage of early error detection. </p>

<source>
--Query 1
A = load 'myfile' as (t, u, v);
B = foreach A generate t + u;

--Query 2
A = load 'myfile' as (t: int, u: int, v);
B = foreach A generate t + u;
</source>

<p>The second query will run more efficiently than the first. In some of our queries we saw a 2x speedup. </p>
</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="projection">
<title>Project Early and Often </title>

<p>Pig does not (yet) determine when a field is no longer needed and drop the field from the row. For example, say you have a query like: </p>

<source>
A = load 'myfile' as (t, u, v);
B = load 'myotherfile' as (x, y, z);
C = join A by t, B by x;
D = group C by u;
E = foreach D generate group, COUNT($1);
</source>

<p>There is no need for v, y, or z to participate in this query. And there is no need to carry both t and x past the join; just one will suffice. Changing the query above to the query below will greatly reduce the amount of data being carried through the map and reduce phases by Pig. </p>

<source>
A = load 'myfile' as (t, u, v);
A1 = foreach A generate t, u;
B = load 'myotherfile' as (x, y, z);
B1 = foreach B generate x;
C = join A1 by t, B1 by x;
C1 = foreach C generate t, u;
D = group C1 by u;
E = foreach D generate group, COUNT($1);
</source>

<p>Depending on your data, this can produce significant time savings. In queries similar to the example shown here we have seen total time drop by 50%.</p>
</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="filter">
<title>Filter Early and Often</title>

<p>As with early projection, in most cases it is beneficial to apply filters as early as possible to reduce the amount of data flowing through the pipeline. </p>

<source>
-- Query 1
A = load 'myfile' as (t, u, v);
B = load 'myotherfile' as (x, y, z);
C = filter A by t == 1;
D = join C by t, B by x;
E = group D by u;
F = foreach E generate group, COUNT($1);

-- Query 2
A = load 'myfile' as (t, u, v);
B = load 'myotherfile' as (x, y, z);
C = join A by t, B by x;
D = group C by u;
E = foreach D generate group, COUNT($1);
F = filter E by C.t == 1;
</source>

<p>The first query is clearly more efficient than the second one because it reduces the amount of data going into the join. </p>

<p>One case where pushing filters up might not be a good idea is if the cost of applying the filter is very high and only a small amount of data is filtered out. </p>

</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="pipeline">
<title>Reduce Your Operator Pipeline</title>

<p>For clarity of your script, you might choose to split your projections into several steps, for instance: </p>

<source>
A = load 'data' as (in: map[]);
-- get key out of the map
B = foreach A generate in#'k1' as k1, in#'k2' as k2;
-- concatenate the keys
C = foreach B generate CONCAT(k1, k2);
.......
</source>
<p>While the example above is easier to read, you might want to consider combining the two foreach statements to improve your query performance: </p>

<source>
A = load 'data' as (in: map[]);
-- concatenate the keys from the map
B = foreach A generate CONCAT(in#'k1', in#'k2');
....
</source>

<p>The same goes for filters. </p>
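<p>For instance, two consecutive filter statements can usually be folded into a single statement with AND; a minimal sketch, where the alias and field names are only illustrative:</p>
<source>
-- Two separate filter steps
B = filter A by t is not null;
C = filter B by u &gt; 0;

-- Combined into one filter
C = filter A by (t is not null) and (u &gt; 0);
</source>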
</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="algebraic-interface">
<title>Make Your UDFs Algebraic</title>

<p>Queries that can take advantage of the combiner generally run much faster (sometimes several times faster) than the versions that don't. The latest code significantly improves combiner usage; however, you need to make sure you do your part. If you have a UDF that works on grouped data and is, by nature, algebraic (meaning its computation can be decomposed into multiple steps), make sure you implement it as such. For details on how to write algebraic UDFs, see <a href="udf.html#algebraic-interface">Algebraic Interface</a>.</p>

<source>
A = load 'data' as (x, y, z);
B = group A by x;
C = foreach B generate group, MyUDF(A);
....
</source>

<p>If <code>MyUDF</code> is algebraic, the query will use the combiner and run much faster. You can run the <code>explain</code> command on your query to make sure that the combiner is used. </p>
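<p>For example, assuming the script above, you could run the following (the alias name <code>C</code> is simply the one used in this sketch):</p>
<source>
explain C;
</source>
<p>and check that the resulting MapReduce plan contains a combine plan for the job that performs the aggregation.</p>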
</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="accumulator-interface">
<title>Use the Accumulator Interface</title>
<p>
If your UDF can't be made Algebraic but is able to deal with getting input in chunks rather than all at once, consider implementing the Accumulator interface to reduce the amount of memory used by your script. If your function <em>is</em> Algebraic and can be used in conjunction with Accumulator functions, you will need to implement the Accumulator interface as well as the Algebraic interface. For more information, see <a href="udf.html#Accumulator-Interface">Accumulator Interface</a>.</p>

<p><strong>Note:</strong> Pig automatically chooses the interface that it expects to provide the best performance: Algebraic &gt; Accumulator &gt; Default. </p>

</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="nulls">
<title>Drop Nulls Before a Join</title>
<p>With the introduction of nulls, join and cogroup semantics were altered to work with nulls. The semantic for cogrouping with nulls is that nulls from a given input are grouped together, but nulls across inputs are not grouped together. This preserves the semantics of grouping (nulls are collected together from a single input to be passed to aggregate functions like COUNT) and the semantics of join (nulls are not joined across inputs). Since flattening an empty bag results in an empty row (and no output), in a standard join the rows with a null key will always be dropped. </p>

<p>This join</p>
<source>
A = load 'myfile' as (t, u, v);
B = load 'myotherfile' as (x, y, z);
C = join A by t, B by x;
</source>

<p>is rewritten by Pig to </p>
<source>
A = load 'myfile' as (t, u, v);
B = load 'myotherfile' as (x, y, z);
C1 = cogroup A by t INNER, B by x INNER;
C = foreach C1 generate flatten(A), flatten(B);
</source>

<p>Since the nulls from A and B won't be collected together, when the nulls are flattened we're guaranteed to have an empty bag, which will result in no output. So the null keys will be dropped. But they will not be dropped until the last possible moment. </p>

<p>If the query is rewritten to </p>
<source>
A = load 'myfile' as (t, u, v);
B = load 'myotherfile' as (x, y, z);
A1 = filter A by t is not null;
B1 = filter B by x is not null;
C = join A1 by t, B1 by x;
</source>

<p>then the nulls will be dropped before the join. Since all null keys go to a single reducer, if your key is null even a small percentage of the time the gain can be significant. In one test where the key was null 7% of the time and the data was spread across 200 reducers, we saw about a 10x speedup in the query by adding the early filters. </p>

</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="join-optimizations">
<title>Take Advantage of Join Optimizations</title>
<p><strong>Regular Join Optimizations</strong></p>
<p>Optimization for regular joins ensures that the last table in the join is not brought into memory but streamed through instead. This optimization reduces the amount of memory used, which means you can avoid spilling the data and also should be able to scale your query to larger data volumes. </p>
<p>To take advantage of this optimization, make sure that the table with the largest number of tuples per key is the last table in your query. 
In some of our tests we saw a 10x performance improvement as the result of this optimization.</p>
<source>
small = load 'small_file' as (t, u, v);
large = load 'large_file' as (x, y, z);
C = join small by t, large by x;
</source>

<p><strong>Specialized Join Optimizations</strong></p>
<p>Optimization can also be achieved using fragment replicate joins, skewed joins, and merge joins. 
For more information see <a href="perf.html#Specialized-Joins">Specialized Joins</a>.</p>

</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="parallel">
<title>Use the Parallel Features</title>

<p>You can set the number of reduce tasks for the MapReduce jobs generated by Pig using two parallel features. 
(The parallel features only affect the number of reduce tasks. Map parallelism is determined by the input file, one map for each HDFS block.)</p>

<p><strong>You Set the Number of Reducers</strong></p>
<p>Use the <a href="cmds.html#set">set default parallel</a> command to set the number of reducers at the script level.</p>

<p>Alternatively, use the PARALLEL clause to set the number of reducers at the operator level. 
(In a script, the value set via the PARALLEL clause will override any value set via "set default parallel.")
You can include the PARALLEL clause with any operator that starts a reduce phase:  
<a href="basic.html#COGROUP">COGROUP</a>, 
<a href="basic.html#CROSS">CROSS</a>, 
<a href="basic.html#DISTINCT">DISTINCT</a>, 
<a href="basic.html#GROUP">GROUP</a>, 
<a href="basic.html#JOIN-inner">JOIN (inner)</a>, 
<a href="basic.html#JOIN-outer">JOIN (outer)</a>, and
<a href="basic.html#ORDER-BY">ORDER BY</a>.
</p>

<p>The number of reducers you need for a particular construct in Pig that forms a MapReduce boundary depends entirely on (1) your data and the number of intermediate keys you are generating in your mappers and (2) the partitioner and distribution of map (combiner) output keys. In the best cases we have seen that a reducer processing about 1 GB of data behaves efficiently.</p>

<p><strong>Let Pig Set the Number of Reducers</strong></p>
<p>If neither "set default parallel" nor the PARALLEL clause is used, Pig sets the number of reducers using a heuristic based on the size of the input data. You can set the values for these properties:</p>
<ul>
	<li>pig.exec.reducers.bytes.per.reducer - Defines the number of input bytes per reducer; the default value is 1000*1000*1000 (1 GB).</li>
	<li>pig.exec.reducers.max - Defines the upper bound on the number of reducers; the default is 999. </li>
</ul>
<p></p>

<p>The formula, shown below, is very simple and will improve over time. The computed value takes all inputs within the script into account and applies the computed value to all the jobs within the Pig script.</p>

<p><code>#reducers = MIN (pig.exec.reducers.max, total input size (in bytes) / bytes per reducer) </code></p>

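<p>As a rough illustration of the formula (using the default property values and a hypothetical 5 GB of total input): #reducers = MIN (999, 5,000,000,000 / 1,000,000,000) = 5, so Pig would request 5 reducers for each job in the script.</p>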
<p><strong>Examples</strong></p>
<p>In this example PARALLEL is used with the GROUP operator. </p>
<source>
A = LOAD 'myfile' AS (t, u, v);
B = GROUP A BY t PARALLEL 18;
...
</source>

<p>In this example all the MapReduce jobs that get launched use 20 reducers.</p>
<source>
SET default_parallel 20;
A = LOAD 'myfile.txt' USING PigStorage() AS (t, u, v);
B = GROUP A BY t;
C = FOREACH B GENERATE group, COUNT(A.t) as mycount;
D = ORDER C BY mycount;
STORE D INTO 'mysortedcount' USING PigStorage();
</source>
</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="limit">
<title>Use the LIMIT Operator</title>
<p>Often you are not interested in the entire output but rather a sample or the top results. In such cases, using LIMIT can yield much better performance, because Pig pushes the limit as early as possible in the plan to minimize the amount of data travelling through the pipeline. </p>
<p>Sample: 
</p>

<source>
A = load 'myfile' as (t, u, v);
B = limit A 500;
</source>

<p>Top results: </p>

<source>
A = load 'myfile' as (t, u, v);
B = order A by t;
C = limit B 500;
</source>

</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="distinct">
<title>Prefer DISTINCT over GROUP BY/GENERATE</title>

<p>To extract unique values from a column in a relation you can use DISTINCT or GROUP BY/GENERATE. DISTINCT is the preferred method; it is faster and more efficient.</p>

<p>Example using GROUP BY - GENERATE:</p>

<source>
A = load 'myfile' as (t, u, v);
B = foreach A generate u;
C = group B by u;
D = foreach C generate group as uniquekey;
dump D; 
</source>

<p>Example using DISTINCT:</p>

<source>
A = load 'myfile' as (t, u, v);
B = foreach A generate u;
C = distinct B;
dump C; 
</source>
</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="compression">
<title>Compress the Results of Intermediate Jobs</title>
<p>If your Pig script generates a sequence of MapReduce jobs, you can compress the output of the intermediate jobs using LZO compression. (Use the <a href="test.html#EXPLAIN">EXPLAIN</a> operator to determine if your script produces multiple MapReduce jobs.)</p>

<p>By doing this, you will save HDFS space used to store the intermediate data used by Pig and potentially improve query execution speed. In general, the more intermediate data that is generated, the more benefits in storage and speed that result.</p>

<p>You can set the value for these properties:</p>
<ul>
	<li>pig.tmpfilecompression - Determines if the temporary files should be compressed or not (set to false by default).</li>
	<li>pig.tmpfilecompression.codec - Specifies which compression codec to use. Currently, Pig accepts "gz" and "lzo" as possible values. However, because LZO is under the GPL license (and disabled by default) you will need to configure your cluster to use the LZO codec to take advantage of this feature. For details, see http://code.google.com/p/hadoop-gpl-compression/wiki/FAQ.</li>
</ul>
<p></p>

<p>On non-trivial queries (ones that ran longer than a couple of minutes) we saw significant improvements both in terms of query latency and space usage. For some queries we saw up to a 96% disk saving and up to a 4x query speedup. Of course, the performance characteristics are very much query and data dependent, and testing needs to be done to determine gains. We did not see any slowdown in the tests we performed, which means that you are at least saving on space while using compression.</p>

<p>With gzip we saw better compression (96-99%) but at the cost of a 4% slowdown. Thus, we don't recommend using gzip. </p>

<p><strong>Example</strong></p>
<source>
-- launch Pig script using lzo compression 

java -cp $PIG_HOME/pig.jar 
-Djava.library.path=&lt;path to the lzo library&gt; 
-Dpig.tmpfilecompression=true 
-Dpig.tmpfilecompression.codec=lzo org.apache.pig.Main  myscript.pig 
</source>
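<p>The same properties can typically also be set from within the script itself using the set command; a sketch, assuming the LZO codec is already installed and configured on your cluster:</p>
<source>
set pig.tmpfilecompression true;
set pig.tmpfilecompression.codec lzo;
</source>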
</section>

<!-- +++++++++++++++++++++++++++++++ -->
<section id="combine-files">
<title>Combine Small Input Files</title>
<p>Processing input (either user input or intermediate input) from multiple small files can be inefficient because a separate map has to be created for each file. Pig can now combine small files so that they are processed as a single map.</p>

<p>You can set the values for these properties (an example follows the list):</p>

<ul>
<li>pig.maxCombinedSplitSize - Specifies the size, in bytes, of data to be processed by a single map. Smaller files are combined until this size is reached. </li>
<li>pig.splitCombination - Turns combine split files on or off (set to "true" by default).</li>
</ul>
<p></p>
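<p>For example, a sketch of setting both properties at the top of a script (the 256 MB value is purely illustrative; a value close to your HDFS block size is a common starting point):</p>
<source>
set pig.splitCombination true;
set pig.maxCombinedSplitSize 268435456;  -- 256 MB per map
</source>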
<p>This feature works with <a href="func.html#PigStorage">PigStorage</a>. However, if you are using a custom loader, please note the following:</p>

<ul>
<li>If your loader implementation makes use of the PigSplit object passed through the prepareToRead method, then you may need to rebuild the loader since the definition of PigSplit has been modified. </li>
<li>The loader must be stateless across the invocations to the prepareToRead method. That is, the method should reset any internal state that is not affected by the RecordReader argument.</li>
<li>If a loader implements IndexableLoadFunc, or implements OrderedLoadFunc and CollectableLoadFunc, its input splits won't be subject to possible combinations.</li>
</ul>
<p></p>
</section>
</section>

<!-- ==================================================================== -->
<!-- SPECIALIZED JOINS-->
  <section id="specialized-joins">
   <title>Specialized Joins</title>
<!-- FRAGMENT REPLICATE JOINS-->

<!-- +++++++++++++++++++++++++++++++ -->
<section id="replicated-joins">
<title>Replicated Joins</title>
<p>Fragment replicate join is a special type of join that works well if one or more relations are small enough to fit into main memory. 
In such cases, Pig can perform a very efficient join because all of the Hadoop work is done on the map side. In this type of join the 
large relation is followed by one or more small relations. The small relations must be small enough to fit into main memory; if they 
don't, the process fails and an error is generated.</p>

<section>
<title>Usage</title>
<p>Perform a replicated join with the USING clause (see <a href="basic.html#JOIN-inner">JOIN (inner)</a> and <a href="basic.html#JOIN-outer">JOIN (outer)</a>).
In this example, a large relation is joined with two smaller relations. Note that the large relation comes first, followed by the smaller relations; 
all of the small relations together must fit into main memory, otherwise an error is generated. </p>
<source>
big = LOAD 'big_data' AS (b1,b2,b3);

tiny = LOAD 'tiny_data' AS (t1,t2,t3);

mini = LOAD 'mini_data' AS (m1,m2,m3);

C = JOIN big BY b1, tiny BY t1, mini BY m1 USING 'replicated';
</source>
</section>

<section>
<title>Conditions</title>
<p>Fragment replicate joins are experimental; we don't have a strong sense of how small the small relation must be to fit 
into memory. In our tests with a simple query that involves just a JOIN, a relation of up to 100 MB can be used if the process overall 
gets 1 GB of memory. Please share your observations and experience with us.</p>
</section>
</section>
<!-- END FRAGMENT REPLICATE JOINS-->

<!-- +++++++++++++++++++++++++++++++ -->
<!-- SKEWED JOINS-->
<section id="skewed-joins">
<title>Skewed Joins</title>

<p>
Parallel joins are vulnerable to the presence of skew in the underlying data. 
If the underlying data is sufficiently skewed, load imbalances will swamp any of the parallelism gains. 
In order to counteract this problem, skewed join computes a histogram of the key space and uses this 
data to allocate reducers for a given key. Skewed join does not place a restriction on the size of the input keys. 
It accomplishes this by splitting the left input on the join predicate and streaming the right input. The left input is 
sampled to create the histogram.
</p>

<p>
Skewed join can be used when the underlying data is sufficiently skewed and you need finer 
control over the allocation of reducers to counteract the skew. It should also be used when the data 
associated with a given key is too large to fit in memory.
</p>

<section>
<title>Usage</title>
<p>Perform a skewed join with the USING clause (see <a href="basic.html#JOIN-inner">JOIN (inner)</a> and <a href="basic.html#JOIN-outer">JOIN (outer)</a>). </p>
<source>
big = LOAD 'big_data' AS (b1,b2,b3);
massive = LOAD 'massive_data' AS (m1,m2,m3);
C = JOIN big BY b1, massive BY m1 USING 'skewed';
</source>
</section>

<section>
<title>Conditions</title>
<p>
Skewed join will only work under these conditions: 
</p>
<ul>
<li>Skewed join works with two-table inner joins. Currently we do not support more than two tables for skewed join. 
Specifying three-way (or more) joins will fail validation. For such joins, we rely on you to break them up into two-way joins.</li>
<li>The pig.skewedjoin.reduce.memusage Java parameter specifies the fraction of heap available for the 
reducer to perform the join. A low fraction forces Pig to use more reducers but increases 
copying cost. We have seen good performance when we set this value 
in the range 0.1 - 0.4. However, note that this is hardly an accurate range. Its value 
depends on the amount of heap available for the operation, the number of columns 
in the input, and the skew. An appropriate value is best obtained by conducting experiments to achieve 
good performance. The default value is 0.5. An example of setting this property appears after this list.</li>
<li>Skewed join does not address (balance) uneven data distribution across reducers. 
However, in most cases, skewed join ensures that the join will finish (however slowly) rather than fail.
</li>
</ul>
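<p>As an illustration of the memory setting described above, a sketch of lowering the fraction from within a script (the 0.3 value is only an example; the property can also be passed on the command line as a -D Java property):</p>
<source>
set pig.skewedjoin.reduce.memusage 0.3;
big = LOAD 'big_data' AS (b1,b2,b3);
massive = LOAD 'massive_data' AS (m1,m2,m3);
C = JOIN big BY b1, massive BY m1 USING 'skewed';
</source>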
</section>
</section><!-- END SKEWED JOINS-->

<!-- +++++++++++++++++++++++++++++++ -->
<!-- MERGE JOIN-->
<section id="merge-joins">
<title>Merge Joins</title>

<p>
Often user data is stored such that both inputs are already sorted on the join key. 
In this case, it is possible to join the data in the map phase of a MapReduce job. 
This provides a significant performance improvement compared to passing all of the data through 
unneeded sort and shuffle phases. 
</p>

<p>
Pig has implemented a merge join algorithm, or sort-merge join. It works on pre-sorted data, and does not
sort the data for you. See Conditions, below, for the restrictions that apply when using this join algorithm.

Pig implements the merge join algorithm by selecting the left input of the join to be the input file for the map phase, 
and the right input of the join to be the side file. It then samples records from the right input to build an
 index that contains, for each sampled record, the key(s), the filename, and the offset into the file where the record 
 begins. This sampling is done in the first MapReduce job. A second MapReduce job is then initiated, 
 with the left input as its input. Each map uses the index to seek to the appropriate record in the right 
 input and begin doing the join. 
</p>

<section>
<title>Usage</title>
<p>Perform a merge join with the USING clause (see <a href="basic.html#JOIN-inner">JOIN (inner)</a> and <a href="basic.html#JOIN-outer">JOIN (outer)</a>). </p>
<source>
D = JOIN A BY a1, B BY b1, C BY c1 USING 'merge';
</source>
</section>

<section>
<title>Conditions</title>
<p><strong>Condition A</strong></p>
<p>Inner merge join (between two tables) will only work under these conditions: </p>
<ul>
<li>Data must come directly from either a Load or an Order statement.</li>
<li>There may be filter statements and foreach statements between the sorted data source and the join statement. The foreach statement should meet the following conditions: 
<ul>
<li>There should be no UDFs in the foreach statement. </li>
<li>The foreach statement should not change the position of the join keys. </li>
<li>There should be no transformation on the join keys which will change the sort order. </li>
</ul>
</li>
<li>Data must be sorted on join keys in ascending (ASC) order on both sides.</li>
<li>If the sort is provided by the loader, rather than by an explicit Order operation, the right-side loader must implement either the {OrderedLoadFunc} interface or the {IndexableLoadFunc} interface.</li>
<li>Type information must be provided for the join key in the schema.</li>
</ul>
<p></p>
<p>The PigStorage loader satisfies all of these conditions.</p>
<p></p>

<p><strong>Condition B</strong></p>
<p>Outer merge join (between two tables) and inner merge join (between three or more tables) will only work under these conditions: </p>
<ul>
<li>No other operations can be done between the load and join statements. </li>
<li>Data must be sorted on join keys in ascending (ASC) order on both sides. </li>
<li>The left-most loader must implement the {CollectableLoader} interface as well as {OrderedLoadFunc}. </li>
<li>All other loaders must implement {IndexableLoadFunc}. </li>
<li>Type information must be provided for the join key in the schema.</li>
</ul>
<p></p>
<p>Pig does not provide a loader that supports outer merge joins. You will need to build your own loader to take advantage of this feature.</p>
</section>
</section>
<!-- END MERGE JOIN -->

<!-- +++++++++++++++++++++++++++++++ -->
1155
<!-- +++++++++++++++++++++++++++++++ -->
1156
<!-- MERGE SPARSE JOIN-->
1156
<!-- MERGE SPARSE JOIN-->
1157
<section id="merge-sparse-joins">
1157
<section id="merge-sparse-joins">
1158
<title>Merge-Sparse Joins</title>
1158
<title>Merge-Sparse Joins</title>
1159
<p>Merge-Sparse join is a specialization of merge join. Merge-sparse join is intended for use when one of the tables is very sparse, meaning you expect only a small number of records to be matched during the join. In tests this join performed well for cases where less than 1% of the data was matched in the join.</p>
1159
<p>Merge-Sparse join is a specialization of merge join. Merge-sparse join is intended for use when one of the tables is very sparse, meaning you expect only a small number of records to be matched during the join. In tests this join performed well for cases where less than 1% of the data was matched in the join.</p>
1160

    
   
1160

   
1161

    
   
1161

   
1162
<section>
1162
<section>
1163
<title>Usage</title>
1163
<title>Usage</title>
1164
<p>Perform a merge-sparse join with the USING clause (see <a href="basic.html#JOIN-inner">JOIN (inner)</a>). </p>
1164
<p>Perform a merge-sparse join with the USING clause (see <a href="basic.html#JOIN-inner">JOIN (inner)</a>). </p>
1165
<source>
1165
<source>
1166
a = load 'sorted_input1' using org.apache.pig.piggybank.storage.IndexedStorage('\t', '0');
1166
a = load 'sorted_input1' using org.apache.pig.piggybank.storage.IndexedStorage('\t', '0');
1167
b = load 'sorted_input2' using org.apache.pig.piggybank.storage.IndexedStorage('\t', '0');
1167
b = load 'sorted_input2' using org.apache.pig.piggybank.storage.IndexedStorage('\t', '0');
1168
c = join a by $0, b by $0 using 'merge-sparse';
1168
c = join a by $0, b by $0 using 'merge-sparse';
1169
store c into 'results';
1169
store c into 'results';
1170
</source>
1170
</source>
1171
</section>
1171
</section>
1172

    
   
1172

   
1173
<section>
1173
<section>
1174
<title>Conditions</title>
1174
<title>Conditions</title>
1175
<p>Merge-sparse join only works for inner joins and is not currently implemented for outer joins.</p>
1175
<p>Merge-sparse join only works for inner joins and is not currently implemented for outer joins.</p>
1176

    
   
1176

   
1177
<p>For inner joins, the preconditions are the same as for merge join with the exception of constrains on the right-side loader. For sparse-merge joins the loader must implement IndexedLoadFunc or the join will fail.</p>
1177
<p>For inner joins, the preconditions are the same as for merge join with the exception of constrains on the right-side loader. For sparse-merge joins the loader must implement IndexedLoadFunc or the join will fail.</p>
<p>Piggybank now contains a load function called org.apache.pig.piggybank.storage.IndexedStorage, which is derived from PigStorage and implements IndexedLoadFunc. This is the only loader included in the standard Pig distribution that can be used for merge-sparse join.</p>
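<p>Before running the merge-sparse join shown above, each input must be sorted on the join key and stored with IndexedStorage so that its index is built. The following is a minimal sketch; the input path is a placeholder, and the constructor arguments ('\t' as the field delimiter, '0' as the column to index) simply mirror the usage example above:</p>
<source>
-- sort on the join key, then store with IndexedStorage so the index
-- files needed by 'merge-sparse' are written alongside the data
raw1 = load 'input1' using PigStorage('\t');
sorted1 = order raw1 by $0;
store sorted1 into 'sorted_input1' using org.apache.pig.piggybank.storage.IndexedStorage('\t', '0');
</source>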
</section>
</section>
<!-- END MERGE-SPARSE JOIN -->
<!-- +++++++++++++++++++++++++++++++ -->
<section id="specialized-joins-performance">
<title>Performance Considerations</title>
<p>Note the following:</p>
<ul>
<li>If one of the data sets is small enough to fit into memory, a Replicated Join is very likely to provide better performance (see the example below).</li>
<li>You will also see better performance if the data in the left table is partitioned evenly across part files (no significant skew and each part file contains at least one full block of data).</li>
</ul>
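<p>For comparison, a replicated join uses the same USING clause syntax. This is only an illustrative sketch with placeholder relation names; note that the smaller relation must be listed last, since the last-listed input is the one loaded into memory:</p>
<source>
big   = load 'big_input' using PigStorage('\t');
small = load 'small_input' using PigStorage('\t');

-- 'replicated' loads the last-listed input into memory on each task
c = join big by $0, small by $0 using 'replicated';
store c into 'replicated_results';
</source>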
</section>
<!-- END SPECIALIZED JOINS-->
</section>
</body>
</document>