Review Board 1.7.22


PIG-3567 LogicalPlanPrinter throws OOM for large scripts

Review Request #15565 - Created Nov. 15, 2013 and updated

Aniket Mokashi
trunk
PIG-3567
Reviewers
pig
cheolsoo, daijy, julien, rohini
pig
Changed LPPrinter recursion to directly write to stream so that memory footprint is reduced.
Diff'ed explain plan of few existing scripts. No diffs found. I will try to submit a testcase to show that LP print of before after is same (test need not be committed).
trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
Revision 1538708 New Change
[20] 14 lines
[+20]
15
 * See the License for the specific language governing permissions and
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
16
 * limitations under the License.
17
 */
17
 */
18
package org.apache.pig.newplan.logical.optimizer;
18
package org.apache.pig.newplan.logical.optimizer;
19

    
   
19

   
20
import java.io.ByteArrayOutputStream;

   
21
import java.io.IOException;
20
import java.io.IOException;
22
import java.io.PrintStream;
21
import java.io.PrintStream;
23
import java.util.ArrayList;
22
import java.util.ArrayList;
24
import java.util.List;
23
import java.util.List;
25

    
   
24

   
[+20] [20] 19 lines
[+20]
45
 * A visitor mechanism printing out the logical plan.
44
 * A visitor mechanism printing out the logical plan.
46
 */
45
 */
47
public class LogicalPlanPrinter extends PlanVisitor {
46
public class LogicalPlanPrinter extends PlanVisitor {
48

    
   
47

   
49
    private PrintStream mStream = null;
48
    private PrintStream mStream = null;
50
    private String TAB1 = "    ";
49
    private byte[] TAB1 = "    ".getBytes();
51
    private String TABMore = "|   ";
50
    private byte[] TABMore = "|   ".getBytes();
52
    private String LSep = "|\n|---";
51
    private byte[] Bar = "|\n".getBytes();
53
    private String USep = "|   |\n|   ";
52
    private byte[] LSep = "|---".getBytes();

    
   
53
    private byte[] USep = "|   |\n".getBytes();
54
    static public String SEPERATE = "\t";
54
    static public String SEPERATE = "\t";
55

    
   
55

   

    
   
56
    protected ArrayList<byte[]> tabs;

    
   
57
    protected boolean reverse = false;

    
   
58

   
56
    /**
59
    /**
57
     * @param ps PrintStream to output plan information to
60
     * @param ps PrintStream to output plan information to
58
     * @param plan Logical plan to print
61
     * @param plan Logical plan to print
59
     */
62
     */
60
    public LogicalPlanPrinter(OperatorPlan plan, PrintStream ps) throws FrontendException {
63
    public LogicalPlanPrinter(OperatorPlan plan, PrintStream ps) throws FrontendException {

    
   
64
        this(plan, ps, new ArrayList<byte[]>());

    
   
65
    }

    
   
66

   

    
   
67
    private LogicalPlanPrinter(OperatorPlan plan, PrintStream ps, ArrayList<byte[]> tabs) throws FrontendException {
61
        super(plan, null);
68
        super(plan, null);
62
        mStream = ps;
69
        mStream = ps;

    
   
70
        this.tabs = tabs;

    
   
71
        if (plan instanceof LogicalPlan) {

    
   
72
            reverse = false;

    
   
73
        }

    
   
74
        else {

    
   
75
            reverse = true;

    
   
76
        }
63
    }
77
    }
64

    
   
78

   
65
    @Override
79
    @Override
66
    public void visit() throws FrontendException {
80
    public void visit() throws FrontendException {
67
        try {
81
        try {
68
            if (plan instanceof LogicalPlan) {
82
            depthFirstLP();
69
                mStream.write(depthFirstLP().getBytes());

   
70
            }

   
71
            else {

   
72
                mStream.write(reverseDepthFirstLP().getBytes());

   
73
            }

   
74
        } catch (IOException e) {
83
        } catch (IOException e) {
75
            throw new FrontendException(e);
84
            throw new FrontendException(e);
76
        }
85
        }
77
    }
86
    }
78

    
   
87

   
79
    protected String depthFirstLP() throws FrontendException, IOException {
88
    protected void depthFirstLP() throws FrontendException, IOException {
80
        StringBuilder sb = new StringBuilder();
89
        List<Operator> leaves;
81
        List<Operator> leaves = plan.getSinks();
90
        if(reverse) {

    
   
91
            leaves = plan.getSources();

    
   
92
        } else {

    
   
93
            leaves = plan.getSinks();

    
   
94
        }
82
        for (Operator leaf : leaves) {
95
        for (Operator leaf : leaves) {
83
            sb.append(depthFirst(leaf));
96
            writeWithTabs((leaf.toString()+"\n").getBytes());
84
            sb.append("\n");
97
            depthFirst(leaf);

    
   
98
        }
85
        }
99
    }
86
        return sb.toString();
100

   

    
   
101
    private void writeWithTabs(byte[] data) throws IOException {

    
   
102
        for(byte[] tab : tabs) {

    
   
103
            mStream.write(tab);

    
   
104
        }

    
   
105
        mStream.write(data);
87
    }
106
    }
88
    
107

   
89
    private String depthFirst(Operator node) throws FrontendException, IOException {
108
    private void depthFirst(Operator node) throws FrontendException, IOException {
90
        String nodeString = printNode(node);
109
        printNodePlan(node);
91
        
110
        List<Operator> operators;
92
        List<Operator> originalPredecessors =  plan.getPredecessors(node);
111

   
93
        if (originalPredecessors == null)
112
        if(reverse) {
94
            return nodeString;
113
            operators = plan.getSuccessors(node);
95
        
114
        } else {
96
        StringBuffer sb = new StringBuffer(nodeString);
115
            operators =  plan.getPredecessors(node);
97
        List<Operator> predecessors =  new ArrayList<Operator>(originalPredecessors);
116
        }
98
        
117
        if (operators == null)

    
   
118
            return;

    
   
119

   

    
   
120
        List<Operator> predecessors =  new ArrayList<Operator>(operators);

    
   
121

   
99
        int i = 0;
122
        int i = 0;
100
        for (Operator pred : predecessors) {
123
        for (Operator pred : predecessors) {
101
            i++;
124
            i++;
102
            String DFStr = depthFirst(pred);
125
            writeWithTabs(Bar);
103
            if (DFStr != null) {
126
            writeWithTabs(LSep);
104
                sb.append(LSep);
127
            mStream.write((pred.toString()+"\n").getBytes());
105
                if (i < predecessors.size())
128
            if (i < predecessors.size()) {
106
                    sb.append(shiftStringByTabs(DFStr, 2));
129
                tabs.add(TABMore);
107
                else
130
            } else {
108
                    sb.append(shiftStringByTabs(DFStr, 1));
131
                tabs.add(TAB1);
109
            }
132
            }
110
        }
133
            depthFirst(pred);
111
        return sb.toString();
134
            tabs.remove(tabs.size() - 1);
112
    }
135
        }
113
    
136
    }
114
    protected String reverseDepthFirstLP() throws FrontendException, IOException {
137

   
115
        StringBuilder sb = new StringBuilder();
138
    private void printPlan(OperatorPlan lp) throws VisitorException, IOException {
116
        List<Operator> roots = plan.getSources();
139
        writeWithTabs(USep);
117
        for (Operator root : roots) {
140
        tabs.add(TABMore);
118
            sb.append(reverseDepthFirst(root));

   
119
            sb.append("\n");

   
120
        }

   
121
        return sb.toString();

   
122
    }

   
123
    

   
124
    private String reverseDepthFirst(Operator node) throws FrontendException, IOException {

   
125
        String nodeString = printNode(node);

   
126
        

   
127
        List<Operator> originalSuccessors =  plan.getSuccessors(node);

   
128
        if (originalSuccessors == null)

   
129
            return nodeString;

   
130
        

   
131
        StringBuffer sb = new StringBuffer(nodeString);

   
132
        List<Operator> successors =  new ArrayList<Operator>(originalSuccessors);

   
133
        

   
134
        int i = 0;

   
135
        for (Operator succ : successors) {

   
136
            i++;

   
137
            String DFStr = reverseDepthFirst(succ);

   
138
            if (DFStr != null) {

   
139
                sb.append(LSep);

   
140
                if (i < successors.size())

   
141
                    sb.append(shiftStringByTabs(DFStr, 2));

   
142
                else

   
143
                    sb.append(shiftStringByTabs(DFStr, 1));

   
144
            }

   
145
        }

   
146
        return sb.toString();

   
147
    }

   
148
    

   
149
    private String planString(OperatorPlan lp) throws VisitorException, IOException {

   
150
        StringBuilder sb = new StringBuilder();

   
151
        ByteArrayOutputStream baos = new ByteArrayOutputStream();

   
152
        PrintStream ps = new PrintStream(baos);

   
153
        if(lp!=null) {
141
        if(lp!=null) {
154
            LogicalPlanPrinter printer = new LogicalPlanPrinter(lp, ps);
142
            LogicalPlanPrinter printer = new LogicalPlanPrinter(lp, mStream, tabs);
155
            printer.visit();
143
            printer.visit();
156
        }
144
        }
157
        else
145
        tabs.remove(tabs.size() - 1);
158
            return "";
146
    }
159
        sb.append(USep);
147

   
160
        sb.append(shiftStringByTabs(baos.toString(), 2));
148
    private void printNodePlan(Operator node) throws FrontendException, IOException {
161
        return sb.toString();

   
162
    }

   
163
    

   
164
    private String printNode(Operator node) throws FrontendException, IOException {

   
165
        StringBuilder sb = new StringBuilder(node.toString()+"\n");

   
166
        

   
167
        if(node instanceof LOFilter){
149
        if(node instanceof LOFilter){
168
            sb.append(planString(((LOFilter)node).getFilterPlan()));
150
            printPlan(((LOFilter)node).getFilterPlan());
169
        }
151
        }
170
        else if(node instanceof LOLimit){
152
        else if(node instanceof LOLimit){
171
            sb.append(planString(((LOLimit)node).getLimitPlan()));
153
            printPlan(((LOLimit)node).getLimitPlan());
172
        }
154
        }
173
        else if(node instanceof LOForEach){
155
        else if(node instanceof LOForEach){
174
            sb.append(planString(((LOForEach)node).getInnerPlan()));        
156
            printPlan(((LOForEach)node).getInnerPlan());        
175
        }
157
        }
176
        else if(node instanceof LOCogroup){
158
        else if(node instanceof LOCogroup){
177
            MultiMap<Integer, LogicalExpressionPlan> plans = ((LOCogroup)node).getExpressionPlans();
159
            MultiMap<Integer, LogicalExpressionPlan> plans = ((LOCogroup)node).getExpressionPlans();
178
            for (int i : plans.keySet()) {
160
            for (int i : plans.keySet()) {
179
                // Visit the associated plans
161
                // Visit the associated plans
180
                for (OperatorPlan plan : plans.get(i)) {
162
                for (OperatorPlan plan : plans.get(i)) {
181
                    sb.append(planString(plan));
163
                    printPlan(plan);
182
                }
164
                }
183
            }
165
            }
184
        }
166
        }
185
        else if(node instanceof LOJoin){
167
        else if(node instanceof LOJoin){
186
            MultiMap<Integer, LogicalExpressionPlan> plans = ((LOJoin)node).getExpressionPlans();
168
            MultiMap<Integer, LogicalExpressionPlan> plans = ((LOJoin)node).getExpressionPlans();
187
            for (int i: plans.keySet()) {
169
            for (int i: plans.keySet()) {
188
                // Visit the associated plans
170
                // Visit the associated plans
189
                for (OperatorPlan plan : plans.get(i)) {
171
                for (OperatorPlan plan : plans.get(i)) {
190
                    sb.append(planString(plan));
172
                    printPlan(plan);
191
                }
173
                }
192
            }
174
            }
193
        }
175
        }
194
        else if(node instanceof LORank){
176
        else if(node instanceof LORank){
195
            // Visit fields for rank
177
            // Visit fields for rank
196
            for (OperatorPlan plan : ((LORank)node).getRankColPlans())
178
            for (OperatorPlan plan : ((LORank)node).getRankColPlans())
197
                sb.append(planString(plan));
179
                printPlan(plan);
198
        }
180
        }
199
        else if(node instanceof LOSort){
181
        else if(node instanceof LOSort){
200
            for (OperatorPlan plan : ((LOSort)node).getSortColPlans())
182
            for (OperatorPlan plan : ((LOSort)node).getSortColPlans())
201
                sb.append(planString(plan));
183
                printPlan(plan);
202
        }
184
        }
203
        else if(node instanceof LOSplitOutput){
185
        else if(node instanceof LOSplitOutput){
204
            sb.append(planString(((LOSplitOutput)node).getFilterPlan()));
186
            printPlan(((LOSplitOutput)node).getFilterPlan());
205
        }
187
        }
206
        else if(node instanceof LOGenerate){
188
        else if(node instanceof LOGenerate){
207
            for (OperatorPlan plan : ((LOGenerate)node).getOutputPlans()) {
189
            for (OperatorPlan plan : ((LOGenerate)node).getOutputPlans()) {
208
                sb.append(planString(plan));
190
                printPlan(plan);
209
            }

   
210
        }

   
211
        return sb.toString();

   
212
    }
191
            }
213

    
   

   
214
    private String shiftStringByTabs(String DFStr, int TabType) {

   
215
        StringBuilder sb = new StringBuilder();

   
216
        String[] spl = DFStr.split("\n");

   
217

    
   

   
218
        String tab = (TabType == 1) ? TAB1 : TABMore;

   
219

    
   

   
220
        sb.append(spl[0] + "\n");

   
221
        for (int i = 1; i < spl.length; i++) {

   
222
            sb.append(tab);

   
223
            sb.append(spl[i]);

   
224
            sb.append("\n");

   
225
        }
192
        }
226
        return sb.toString();

   
227
    }
193
    }
228
}
194
}
229

    
   
195

   
230
        
196

   
  1. trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java: Loading...