Review Board 1.7.22


PIG-3096 Make PigUnit thread safe

Review Request #8631 - Created Dec. 17, 2012 and submitted

Cheolsoo Park
PIG-3096
Reviewers
pig
sms
pig-git
Currently, PigUnit is not thread-safe because Cluster and PigServer are declared as static fields. Converting them to ThreadLocal allows PigUnit to run in a multi-threaded environment.
ant test -Dtestcase=TestPigTest

I also tested it by running multiple PigUnit cases in parallel with tempus-fugit (http://tempusfugitlibrary.org/documentation/junit/parallel/) on a real cluster.

Diff revision 1 (Latest)

  1. test/org/apache/pig/pigunit/PigTest.java: Loading...
test/org/apache/pig/pigunit/PigTest.java
Revision 50a5c79 New Change
[20] 31 lines
[+20]
32
import org.apache.log4j.Logger;
32
import org.apache.log4j.Logger;
33
import org.apache.pig.ExecType;
33
import org.apache.pig.ExecType;
34
import org.apache.pig.backend.executionengine.ExecException;
34
import org.apache.pig.backend.executionengine.ExecException;
35
import org.apache.pig.data.DataType;
35
import org.apache.pig.data.DataType;
36
import org.apache.pig.data.Tuple;
36
import org.apache.pig.data.Tuple;

    
   
37
import org.apache.pig.impl.io.FileLocalizer;
37
import org.apache.pig.impl.logicalLayer.schema.Schema;
38
import org.apache.pig.impl.logicalLayer.schema.Schema;
38
import org.apache.pig.pigunit.pig.PigServer;
39
import org.apache.pig.pigunit.pig.PigServer;
39
import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
40
import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
40
import org.apache.pig.tools.parameters.ParseException;
41
import org.apache.pig.tools.parameters.ParseException;
41

    
   
42

   
[+20] [20] 13 lines
[+20] [+] public class PigTest {
55
  /** The list of file arguments of the script. */
56
  /** The list of file arguments of the script. */
56
  private final String[] argFiles;
57
  private final String[] argFiles;
57
  /** The list of aliases to override in the script. */
58
  /** The list of aliases to override in the script. */
58
  private final Map<String, String> aliasOverrides;
59
  private final Map<String, String> aliasOverrides;
59

    
   
60

   
60
  private static PigServer pig;
61
  private static ThreadLocal<PigServer> pig = new ThreadLocal<PigServer>();
61
  private static Cluster cluster;
62
  private static ThreadLocal<Cluster> cluster = new ThreadLocal<Cluster>();

    
   
63

   
62
  private static final Logger LOG = Logger.getLogger(PigTest.class);
64
  private static final Logger LOG = Logger.getLogger(PigTest.class);
63
  private static final String EXEC_CLUSTER = "pigunit.exectype.cluster";
65
  private static final String EXEC_CLUSTER = "pigunit.exectype.cluster";
64

    
   
66

   
65
  /**
67
  /**
66
   * Initializes the Pig test.
68
   * Initializes the Pig test.
[+20] [20] 38 lines
[+20] public class PigTest {
105
  }
107
  }
106

    
   
108

   
107
  public PigTest(String scriptPath, String[] args, PigServer pig, Cluster cluster)
109
  public PigTest(String scriptPath, String[] args, PigServer pig, Cluster cluster)
108
      throws IOException {
110
      throws IOException {
109
    this(args, null, readFile(scriptPath));
111
    this(args, null, readFile(scriptPath));
110
    PigTest.pig = pig;
112
    PigTest.pig.set(pig);
111
    PigTest.cluster = cluster;
113
    PigTest.cluster.set(cluster);
112
  }
114
  }
113

    
   
115

   
114
  /**
116
  /**
115
   * Connects and starts if needed the PigServer.
117
   * Connects and starts if needed the PigServer.
116
   *
118
   *
117
   * @return The cluster where input files can be copied.
119
   * @return Reference to the Cluster in ThreadLocal.
118
   * @throws ExecException If the PigServer can't be started.
120
   * @throws ExecException If the PigServer can't be started.
119
   */
121
   */
120
  public static Cluster getCluster() throws ExecException {
122
  public static Cluster getCluster() throws ExecException {
121
    if (cluster == null) {
123
    if (cluster.get() == null) {
122
      if (System.getProperties().containsKey(EXEC_CLUSTER)) {
124
      if (System.getProperties().containsKey(EXEC_CLUSTER)) {
123
        LOG.info("Using cluster mode");
125
        LOG.info("Using cluster mode");
124
        pig = new PigServer(ExecType.MAPREDUCE);
126
        pig.set(new PigServer(ExecType.MAPREDUCE));
125
      } else {
127
      } else {
126
        LOG.info("Using default local mode");
128
        LOG.info("Using default local mode");
127
        pig = new PigServer(ExecType.LOCAL);
129
        pig.set(new PigServer(ExecType.LOCAL));

    
   
130
      }

    
   
131

   

    
   
132
      cluster.set(new Cluster(pig.get().getPigContext()));
128
      }
133
    }
129

    
   
134

   
130
      cluster = new Cluster(pig.getPigContext());
135
    return cluster.get();
131
    }
136
  }
132

    
   
137

   
133
    return cluster;
138
  /**

    
   
139
   * Return the PigServer.

    
   
140
   *

    
   
141
   * @return Reference to the PigServer in ThreadLocal.

    
   
142
   */

    
   
143
  public static PigServer getPigServer() {

    
   
144
    return pig.get();
134
  }
145
  }
135

    
   
146

   
136
  /**
147
  /**
137
   * Registers a pig scripts with its variables substituted.
148
   * Registers a pig scripts with its variables substituted.
138
   *
149
   *
139
   * @throws IOException If a temp file containing the pig script could not be created.
150
   * @throws IOException If a temp file containing the pig script could not be created.
140
   * @throws ParseException The pig script could not have all its variables substituted.
151
   * @throws ParseException The pig script could not have all its variables substituted.
141
   */
152
   */
142
  protected void registerScript() throws IOException, ParseException {
153
  protected void registerScript() throws IOException, ParseException {
143
    PigTest.getCluster();
154
    getCluster();
144

    
   
155

   
145
    BufferedReader pigIStream = new BufferedReader(new StringReader(this.originalTextPigScript));
156
    BufferedReader pigIStream = new BufferedReader(new StringReader(this.originalTextPigScript));
146
    StringWriter pigOStream = new StringWriter();
157
    StringWriter pigOStream = new StringWriter();
147

    
   
158

   
148
    ParameterSubstitutionPreprocessor ps = new ParameterSubstitutionPreprocessor(50);
159
    ParameterSubstitutionPreprocessor ps = new ParameterSubstitutionPreprocessor(50);
[+20] [20] 6 lines
[+20] public class PigTest { public static PigServer getPigServer() {
155
    PrintWriter pw = new PrintWriter(f);
166
    PrintWriter pw = new PrintWriter(f);
156
    pw.println(substitutedPig);
167
    pw.println(substitutedPig);
157
    pw.close();
168
    pw.close();
158

    
   
169

   
159
    String pigSubstitutedFile = f.getCanonicalPath();
170
    String pigSubstitutedFile = f.getCanonicalPath();
160
    pig.registerScript(pigSubstitutedFile, aliasOverrides);
171
    getPigServer().registerScript(pigSubstitutedFile, aliasOverrides);
161
  }
172
  }
162

    
   
173

   
163
  /**
174
  /**
164
   * Executes the Pig script with its current overrides.
175
   * Executes the Pig script with its current overrides.
165
   *
176
   *
[+20] [20] 12 lines
[+20] [+] public void runScript() throws IOException, ParseException {
178
   * @throws ParseException If the Pig script could not be parsed.
189
   * @throws ParseException If the Pig script could not be parsed.
179
   * @throws IOException If the Pig script could not be executed correctly.
190
   * @throws IOException If the Pig script could not be executed correctly.
180
   */
191
   */
181
  public Iterator<Tuple> getAlias(String alias) throws IOException, ParseException {
192
  public Iterator<Tuple> getAlias(String alias) throws IOException, ParseException {
182
    registerScript();
193
    registerScript();
183
    return pig.openIterator(alias);
194
    return getPigServer().openIterator(alias);
184
  }
195
  }
185

    
   
196

   
186
  /**
197
  /**
187
   * Gets an iterator on the content of the latest STORE alias of the script.
198
   * Gets an iterator on the content of the latest STORE alias of the script.
188
   *
199
   *
[+20] [20] 60 lines
[+20] [+] public void assertOutput(String alias, File expected) throws IOException, ParseException {
249
  public void assertOutput(String aliasInput, String[] input, String alias, String[] expected)
260
  public void assertOutput(String aliasInput, String[] input, String alias, String[] expected)
250
      throws IOException, ParseException {
261
      throws IOException, ParseException {
251
    registerScript();
262
    registerScript();
252

    
   
263

   
253
    StringBuilder sb = new StringBuilder();
264
    StringBuilder sb = new StringBuilder();
254
    Schema.stringifySchema(sb, pig.dumpSchema(aliasInput), DataType.TUPLE) ;
265
    Schema.stringifySchema(sb, getPigServer().dumpSchema(aliasInput), DataType.TUPLE) ;
255

    
   
266

   
256
    final String destination = "pigunit-input-overriden.txt";
267
    final String destination = FileLocalizer.getTemporaryPath(getPigServer().getPigContext()).toString();
257
    cluster.copyFromLocalFile(input, destination, true);
268
    getCluster().copyFromLocalFile(input, destination, true);
258
    override(aliasInput,
269
    override(aliasInput,
259
        String.format("%s = LOAD '%s' AS %s;", aliasInput, destination, sb.toString()));
270
        String.format("%s = LOAD '%s' AS %s;", aliasInput, destination, sb.toString()));
260

    
   
271

   
261
    assertOutput(alias, expected);
272
    assertOutput(alias, expected);
262
  }
273
  }
[+20] [20] 21 lines
  1. test/org/apache/pig/pigunit/PigTest.java: Loading...