Review Board 1.7.22


Changes to add support for streaming_python UDFs.

Review Request #13781 - Created Aug. 23, 2013 and updated

Jeremy Karn
Reviewers
pig
pig-git
Changes for PIG-2417 (https://issues.apache.org/jira/browse/PIG-2417)

 
src/org/apache/pig/scripting/ScriptingIllustrateOutputCapturer.java
New File

    
   
1
package org.apache.pig.scripting;

    
   
2

   

    
   
3
import java.io.BufferedReader;

    
   
4
import java.io.File;

    
   
5
import java.io.FileInputStream;

    
   
6
import java.io.IOException;

    
   
7
import java.io.InputStreamReader;

    
   
8
import java.io.Reader;

    
   
9
import java.util.HashMap;

    
   
10
import java.util.Map;

    
   
11
import java.util.UUID;

    
   
12

   

    
   
13
import org.apache.commons.logging.Log;

    
   
14
import org.apache.commons.logging.LogFactory;

    
   
15
import org.apache.hadoop.conf.Configuration;

    
   
16
import org.apache.pig.ExecType;

    
   
17
import org.apache.pig.impl.util.UDFContext;

    
   
18

   

    
   
19
import com.google.common.base.Charsets;

    
   
20

   

    
   
21
public class ScriptingIllustrateOutputCapturer {

    
   
22
    private static Log log = LogFactory.getLog(ScriptingIllustrateOutputCapturer.class);

    
   
23
    

    
   
24
    private static Map<String, String> outputFileNames = new HashMap<String, String>();

    
   
25
    private static String runId = UUID.randomUUID().toString(); //Unique ID for this run to ensure udf output files aren't corrupted from previous runs.

    
   
26

   

    
   
27
    //Illustrate will set the static flag telling udf to start capturing its output.  It's up to each

    
   
28
    //instance to react to it and set its own flag.

    
   
29
    private static boolean captureOutput = false;

    
   
30
    private boolean instancedCapturingOutput = false;

    
   
31

   

    
   
32
    private ExecType execType;

    
   
33

   

    
   
34
    public ScriptingIllustrateOutputCapturer(ExecType execType) {

    
   
35
        this.execType = execType;

    
   
36
    }

    
   
37

   

    
   
38
    public String getStandardOutputRootWriteLocation() {

    
   
39
        Configuration conf = UDFContext.getUDFContext().getJobConf();

    
   
40
        

    
   
41
        String jobId = conf.get("mapred.job.id");

    
   
42
        String taskId = conf.get("mapred.task.id");

    
   
43
        log.debug("JobId: " + jobId);

    
   
44
        log.debug("TaskId: " + taskId);

    
   
45

   

    
   
46
        String userlogDir = System.getProperty("hadoop.log.dir") + "/userlogs";

    
   
47

   

    
   
48
        if (execType.isLocal()) {

    
   
49
            String logDir = System.getProperty("pig.udf.scripting.log.dir");

    
   
50
            if (logDir == null)

    
   
51
                logDir = ".";

    
   
52
            return logDir + "/" + (taskId == null ? "" : (taskId + "_"));

    
   
53
        } else {

    
   
54
            String taskLogDir = getTaskLogDir(userlogDir, jobId, taskId);

    
   
55
            return taskLogDir + "/";

    
   
56
        }

    
   
57
    }

    
   
58

   

    
   
59
    public String getTaskLogDir(String userlogDir, String jobId, String taskId) {

    
   
60
        String taskLogDir;

    
   
61
        if ( (new File(userlogDir + "/" + jobId).exists()) ) {

    
   
62
            taskLogDir = userlogDir + "/" + jobId + "/" + taskId;

    
   
63
        } else {

    
   
64
            taskLogDir = userlogDir + "/" + taskId;

    
   
65
        }

    
   
66
        return taskLogDir;

    
   
67
    }

    
   
68
    

    
   
69
    public static void startCapturingOutput() {

    
   
70
       ScriptingIllustrateOutputCapturer.captureOutput = true;

    
   
71
    }

    
   
72
    

    
   
73
    public static Map<String, String> getUdfOutput() throws IOException {

    
   
74
        Map<String, String> udfFuncNameToOutput = new HashMap<String,String>();

    
   
75
        for (Map.Entry<String, String> funcToOutputFileName : outputFileNames.entrySet()) {

    
   
76
            StringBuffer udfOutput = new StringBuffer();

    
   
77
            FileInputStream fis = new FileInputStream(funcToOutputFileName.getValue());

    
   
78
            Reader fr = new InputStreamReader(fis, Charsets.UTF_8);

    
   
79
            BufferedReader br = new BufferedReader(fr);

    
   
80
            

    
   
81
            try {

    
   
82
                String line = br.readLine();

    
   
83
                while (line != null) {

    
   
84
                    udfOutput.append("\t" + line + "\n");

    
   
85
                    line = br.readLine();

    
   
86
                }

    
   
87
            } finally {

    
   
88
                br.close();

    
   
89
            }

    
   
90
            udfFuncNameToOutput.put(funcToOutputFileName.getKey(), udfOutput.toString());

    
   
91
        }

    
   
92
        return udfFuncNameToOutput;

    
   
93
    }

    
   
94
    

    
   
95
    public void registerOutputLocation(String functionName, String fileName) {

    
   
96
        outputFileNames.put(functionName, fileName);

    
   
97
    }

    
   
98
    

    
   
99
    public static String getRunId() {

    
   
100
        return runId;

    
   
101
    }

    
   
102
    

    
   
103
    public static boolean isClassCapturingOutput() {

    
   
104
        return ScriptingIllustrateOutputCapturer.captureOutput;

    
   
105
    }

    
   
106
    

    
   
107
    public boolean isInstanceCapturingOutput() {

    
   
108
        return this.instancedCapturingOutput;

    
   
109
    }

    
   
110
    

    
   
111
    public void setInstanceCapturingOutput(boolean instanceCapturingOutput) {

    
   
112
        this.instancedCapturingOutput = instanceCapturingOutput;

    
   
113
    }

    
   
114
}
src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java
New File
 
src/python/streaming/controller.py
New File
 
src/python/streaming/pig_util.py
New File
 
test/unit-tests
Revision d52ad9d New Change
 
test/org/apache/pig/builtin/TestPigToStreamUDF.java
New File
 
test/org/apache/pig/builtin/TestStreamUDFToPig.java
New File
 
test/org/apache/pig/builtin/TestStreamingUDF.java
New File
 
test/org/apache/pig/impl/streaming/TestExecutableManager.java
Revision 6246019 New Change
 
test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java
New File
 
test/org/apache/pig/impl/streaming/TestStreamingUtil.java
New File
 
test/org/apache/pig/test/TestPigStreaming.java
New File
 
test/org/apache/pig/test/TestStreaming.java
Revision 1eac5d2 New Change
 
test/python/streaming/test_controller.py
New File
 
  1. src/org/apache/pig/scripting/ScriptingIllustrateOutputCapturer.java: Loading...
  2. src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java: Loading...
  3. src/python/streaming/controller.py: Loading...
  4. src/python/streaming/pig_util.py: Loading...
  5. test/unit-tests: Loading...
  6. test/org/apache/pig/builtin/TestPigToStreamUDF.java: Loading...
  7. test/org/apache/pig/builtin/TestStreamUDFToPig.java: Loading...
  8. test/org/apache/pig/builtin/TestStreamingUDF.java: Loading...
  9. test/org/apache/pig/impl/streaming/TestExecutableManager.java: Loading...
  10. test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java: Loading...
  11. test/org/apache/pig/impl/streaming/TestStreamingUtil.java: Loading...
  12. test/org/apache/pig/test/TestPigStreaming.java: Loading...
  13. test/org/apache/pig/test/TestStreaming.java: Loading...
  14. test/python/streaming/test_controller.py: Loading...
This diff has been split across 2 pages: < 1 2