Review Board 1.7.22


PIG-3629 Implement STREAM operator in Tez

Review Request #16309 - Created Dec. 18, 2013 and updated

Alex Bain
tez
PIG-3629
Reviewers
pig
cheolsoo, daijy, mwagner, rohini
pig-git
Implement STREAM operator in Tez - https://issues.apache.org/jira/browse/PIG-3629

In this patch, I do not add resources to pig-misc.jar, I just add them individually. See my discussion post: https://groups.google.com/forum/#!topic/pig-on-tez/8S80GMKhMaU

Basic Changes:
-Run the PhyPlanSetter and EndOfAllInputSetter to set the parent plan and the end-of-all input flags necessary for STREAM, just like in MR Pig.
-Add a map to hold plan-specific extra local resources in TezOperPlan.java. These resources can either come from the user's directory (e.g. SHIP('/home/abain/foo')) or from HDFS (e.g. CACHE('/user/abain/bar') in HDFS).
-Add the new class TezPOStreamVisitor that assembles all the plan-specific local resources that get added in TezOperPlan.java.

Resource Manager Changes:
-TezResourcManager resources were previously a map of java.net.URL -> Path in HDFS. Previously, the URL's were all local files, e.g. file://home/abain/pig-withouthHadoop.jar. However, the CACHE statement requires that resources already present in HDFS be able to be added as local resources. Unfortunately java.net.URL does not support hdfs:// URL's, so I changed this data structure to be a YARN URL instead. I also added methods to the ResourceManager to distinguish whether you are adding a local resource or a resource already present in HDFS.
-CACHE also supports URL's with fragments at the end, which become a "shortcut" to the name, e.g. CACHE(/input/big-data-name.gz#data.gz). I changed the resource manager to look for a fragments and use that as the resource name (if the fragment exist). This results in the symlink to the resource being created with the fragment name, which is what we want.

Race condition:
-I found a race condition that resulted from reusing the Result object in POSimpleTezLoad. There are several possible solutions. After discussing in the newsgroup, we decided to change POSimpleTezLoad for now.
-I also made a small cleanup to PhysicalOperator.java.
Added a unit test to TestTezCompiler.java
Added a new unit test e2e test to tez.conf with session reuse enabled
Ported three other e2e tests from streaming.conf to tez.conf to increase coverage

ant test-tez passes
ant test-e2e-tez passes
Manually tested with a large subset of tests from streaming.conf (the ones using features currently supported by Pig-on-Tez), they pass
src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
Revision 28a110a New Change
[20] 94 lines
[+20] [+] public TezJobControl compile(TezOperPlan tezPlan, String grpName, TezPlanContainer planContainer)
95
    private TezJob getJob(TezOperPlan tezPlan, TezPlanContainer planContainer)
95
    private TezJob getJob(TezOperPlan tezPlan, TezPlanContainer planContainer)
96
            throws JobCreationException {
96
            throws JobCreationException {
97
        try {
97
        try {
98
            Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
98
            Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
99
            localResources.putAll(planContainer.getLocalResources());
99
            localResources.putAll(planContainer.getLocalResources());
100
            localResources.putAll(tezPlan.getLocalExtraResources());
100
            localResources.putAll(tezPlan.getExtraResources());
101
            TezDAG tezDag = buildDAG(tezPlan, localResources);
101
            TezDAG tezDag = buildDAG(tezPlan, localResources);
102
            return new TezJob(tezConf, tezDag, localResources);
102
            return new TezJob(tezConf, tezDag, localResources);
103
        } catch (Exception e) {
103
        } catch (Exception e) {
104
            int errCode = 2017;
104
            int errCode = 2017;
105
            String msg = "Internal error creating job configuration.";
105
            String msg = "Internal error creating job configuration.";
106
            throw new JobCreationException(msg, errCode, PigException.BUG, e);
106
            throw new JobCreationException(msg, errCode, PigException.BUG, e);
107
        }
107
        }
108
    }
108
    }
109
}
109
}
110

    
   
110

   
src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
Revision 3e6ec7b New Change
 
src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
Revision 7342dab New Change
 
src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
Revision e28de47 New Change
 
  1. src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java: Loading...
  2. src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java: Loading...
  3. src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java: Loading...
  4. src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java: Loading...