Review Board 1.7.22


SQOOP-1139: Fix code style

Review Request #12694 - Created July 17, 2013 and updated

Venkat Ranganathan
SQOOP-1139
Reviewers
Sqoop
sqoop-sqoop2
Fixed code style to match rest of the code
Ran tests - All of them passed

Diff revision 2 (Latest)

1 2
1 2

  1. core/src/main/java/org/apache/sqoop/framework/JobManager.java: Loading...
core/src/main/java/org/apache/sqoop/framework/JobManager.java
Revision 2d37020 New Change
[20] 52 lines
[+20] [+] public class JobManager implements Reconfigurable {
53
    */
53
   */
54
   private static JobManager instance;
54
  private static JobManager instance;
55
   /**
55
  /**
56
    * Create default object by default.
56
   * Create default object by default.
57
    *
57
   *
58
    * Every Sqoop server application needs one so this should not be performance issue.
58
   * Every Sqoop server application needs one so this should not be performance

    
   
59
   * issue.
59
    */
60
   */
60
   static {
61
  static {
61
       instance = new JobManager();
62
    instance = new JobManager();
62
   }
63
  }
63

    
   
64

   
[+20] [20] 7 lines
[+20] [+] public static JobManager getInstance() {
71
   }
72
  }
72

    
   
73

   
73
   /**
74
  /**
74
    * Allows to set instance in case that it's need.
75
   * Allows to set instance in case that it's need.
75
    *
76
   *
76
    * This method should not be normally used as the default instance should be sufficient. One target
77
   * This method should not be normally used as the default instance should be
77
    * user use case for this method are unit tests.
78
   * sufficient. One target user use case for this method are unit tests.
78
    *
79
   *
79
    * @param newInstance New instance
80
   * @param newInstance

    
   
81
   *          New instance
80
    */
82
   */
81
   public static void setInstance(JobManager newInstance) {
83
  public static void setInstance(JobManager newInstance) {
82
       instance = newInstance;
84
    instance = newInstance;
83
   }
85
  }
84

    
   
86

   
85
   /**
87
  /**
86
    * Default interval for purging old submissions from repository.
88
   * Default interval for purging old submissions from repository.
87
    */
89
   */
88
   private static final long DEFAULT_PURGE_THRESHOLD = 24*60*60*1000;
90
  private static final long DEFAULT_PURGE_THRESHOLD = 24 * 60 * 60 * 1000;
89

    
   
91

   
90
   /**
92
  /**
91
    * Default sleep interval for purge thread.
93
   * Default sleep interval for purge thread.
92
    */
94
   */
93
   private static final long DEFAULT_PURGE_SLEEP = 24*60*60*1000;
95
  private static final long DEFAULT_PURGE_SLEEP = 24 * 60 * 60 * 1000;
94

    
   
96

   
95
   /**
97
  /**
96
    * Default interval for update thread.
98
   * Default interval for update thread.
97
    */
99
   */
98
   private static final long DEFAULT_UPDATE_SLEEP = 60*5*1000;
100
  private static final long DEFAULT_UPDATE_SLEEP = 60 * 5 * 1000;
99

    
   
101

   
100
   /**
102
  /**
101
    * Configured submission engine instance
103
   * Configured submission engine instance
102
    */
104
   */
103
   private SubmissionEngine submissionEngine;
105
  private SubmissionEngine submissionEngine;
[+20] [20] 41 lines
[+20] public static void setInstance(JobManager newInstance) {
145
   private String notificationBaseUrl;
147
  private String notificationBaseUrl;
146

    
   
148

   
147
   /**
149
  /**
148
    * Set notification base URL.
150
   * Set notification base URL.
149
    *
151
   *
150
    * @param url Base URL
152
   * @param url

    
   
153
   *          Base URL
151
    */
154
   */
152
   public void setNotificationBaseUrl(String url) {
155
  public void setNotificationBaseUrl(String url) {
153
       LOG.debug("Setting notification base URL to " + url);
156
    LOG.debug("Setting notification base URL to " + url);
154
       notificationBaseUrl = url;
157
    notificationBaseUrl = url;
155
   }
158
  }
[+20] [20] 5 lines
[+20] public void setNotificationBaseUrl(String url) {
161
    */
164
   */
162
   public String getNotificationBaseUrl() {
165
  public String getNotificationBaseUrl() {
163
       return notificationBaseUrl;
166
    return notificationBaseUrl;
164
   }
167
  }
165

    
   
168

   
166
   public  synchronized void destroy() {
169
  public synchronized void destroy() {
167
       LOG.trace("Begin submission engine manager destroy");
170
    LOG.trace("Begin submission engine manager destroy");
168

    
   
171

   
169
       running = false;
172
    running = false;
170

    
   
173

   
171
       try {
174
    try {
172
           purgeThread.interrupt();
175
      purgeThread.interrupt();
173
           purgeThread.join();
176
      purgeThread.join();
174
       } catch (InterruptedException e) {
177
    } catch (InterruptedException e) {
175
           //TODO(jarcec): Do I want to wait until it actually finish here?
178
      // TODO(jarcec): Do I want to wait until it actually finish here?
176
           LOG.error("Interrupted joining purgeThread");
179
      LOG.error("Interrupted joining purgeThread");
177
       }
180
    }
178

    
   
181

   
179
       try {
182
    try {
180
           updateThread.interrupt();
183
      updateThread.interrupt();
181
           updateThread.join();
184
      updateThread.join();
182
       } catch (InterruptedException e) {
185
    } catch (InterruptedException e) {
183
           //TODO(jarcec): Do I want to wait until it actually finish here?
186
      // TODO(jarcec): Do I want to wait until it actually finish here?
184
           LOG.error("Interrupted joining updateThread");
187
      LOG.error("Interrupted joining updateThread");
185
       }
188
    }
186

    
   
189

   
187
       if(submissionEngine != null) {
190
    if (submissionEngine != null) {
188
           submissionEngine.destroy();
191
      submissionEngine.destroy();
189
       }
192
    }
190

    
   
193

   
191
       if(executionEngine != null) {
194
    if (executionEngine != null) {
192
           executionEngine.destroy();
195
      executionEngine.destroy();
193
       }
196
    }
194
   }
197
  }
195

    
   
198

   
196

    
   

   
197
   public synchronized void initialize() {
199
  public synchronized void initialize() {
198
       LOG.trace("Begin submission engine manager initialization");
200
    LOG.trace("Begin submission engine manager initialization");
199
       MapContext context = SqoopConfiguration.getInstance().getContext();
201
    MapContext context = SqoopConfiguration.getInstance().getContext();
200

    
   
202

   
201

    
   

   
202
       // Let's load configured submission engine
203
    // Let's load configured submission engine
203
       String submissionEngineClassName =
204
    String submissionEngineClassName =
204
               context.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
205
      context.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
205

    
   
206

   
206
       submissionEngine = (SubmissionEngine) ClassUtils.instantiate(submissionEngineClassName);
207
    submissionEngine = (SubmissionEngine) ClassUtils
207
       if(submissionEngine == null) {
208
      .instantiate(submissionEngineClassName);

    
   
209
    if (submissionEngine == null) {
208
           throw new SqoopException(FrameworkError.FRAMEWORK_0001,
210
      throw new SqoopException(FrameworkError.FRAMEWORK_0001,
209
                   submissionEngineClassName);
211
        submissionEngineClassName);
210
       }
212
    }
211

    
   
213

   
212
       submissionEngine.initialize(context, FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
214
    submissionEngine.initialize(context,

    
   
215
      FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
213

    
   
216

   
214
       // Execution engine
217
    // Execution engine
215
       String executionEngineClassName =
218
    String executionEngineClassName =
216
               context.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
219
      context.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
217

    
   
220

   
218
       executionEngine = (ExecutionEngine) ClassUtils.instantiate(executionEngineClassName);
221
    executionEngine = (ExecutionEngine) ClassUtils
219
       if(executionEngine == null) {
222
      .instantiate(executionEngineClassName);

    
   
223
    if (executionEngine == null) {
220
           throw new SqoopException(FrameworkError.FRAMEWORK_0007,
224
      throw new SqoopException(FrameworkError.FRAMEWORK_0007,
221
                   executionEngineClassName);
225
        executionEngineClassName);
222
       }
226
    }
223

    
   
227

   
224
       // We need to make sure that user has configured compatible combination of
228
    // We need to make sure that user has configured compatible combination of
225
       // submission engine and execution engine
229
    // submission engine and execution engine
226
       if(! submissionEngine.isExecutionEngineSupported(executionEngine.getClass())) {
230
    if (!submissionEngine

    
   
231
      .isExecutionEngineSupported(executionEngine.getClass())) {
227
           throw new SqoopException(FrameworkError.FRAMEWORK_0008);
232
      throw new SqoopException(FrameworkError.FRAMEWORK_0008);
228
       }
233
    }
229

    
   
234

   
230
       executionEngine.initialize(context, FrameworkConstants.PREFIX_EXECUTION_ENGINE_CONFIG);
235
    executionEngine.initialize(context,

    
   
236
      FrameworkConstants.PREFIX_EXECUTION_ENGINE_CONFIG);
231

    
   
237

   
232
       // Set up worker threads
238
    // Set up worker threads
233
       purgeThreshold = context.getLong(
239
    purgeThreshold = context.getLong(
234
               FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
240
      FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
235
               DEFAULT_PURGE_THRESHOLD
241
      DEFAULT_PURGE_THRESHOLD
[+20] [20] 12 lines
[+20] public synchronized void initialize() {
248
       );
254
      );
249

    
   
255

   
250
       updateThread = new UpdateThread();
256
    updateThread = new UpdateThread();
251
       updateThread.start();
257
    updateThread.start();
252

    
   
258

   
253
       SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
259
    SqoopConfiguration.getInstance().getProvider()

    
   
260
      .registerListener(new CoreConfigurationListener(this));
254

    
   
261

   
255
       LOG.info("Submission manager initialized: OK");
262
    LOG.info("Submission manager initialized: OK");
256
   }
263
  }

    
   
264

   
257
   public MSubmission submit(long jobId) {
265
  public MSubmission submit(long jobId) {
258
       Repository repository = RepositoryManager.getInstance().getRepository();
266
    Repository repository = RepositoryManager.getInstance().getRepository();
259

    
   
267

   
260
       MJob job = repository.findJob(jobId);
268
    MJob job = repository.findJob(jobId);
261
       if(job == null) {
269
    if (job == null) {
262
           throw new SqoopException(FrameworkError.FRAMEWORK_0004,
270
      throw new SqoopException(FrameworkError.FRAMEWORK_0004,
263
                   "Unknown job id " + jobId);
271
        "Unknown job id " + jobId);
264
       }
272
    }
265
       MConnection connection = repository.findConnection(job.getConnectionId());
273
    MConnection connection = repository.findConnection(job.getConnectionId());
266
       SqoopConnector connector =
274
    SqoopConnector connector =
[+20] [20] 57 lines
[+20] public MSubmission submit(long jobId) {
324
               break;
332
        break;
325
           case EXPORT:
333
      case EXPORT:
326
               request.setConnectorCallbacks(connector.getExporter());
334
        request.setConnectorCallbacks(connector.getExporter());
327
               break;
335
        break;
328
           default:
336
      default:
329
               throw  new SqoopException(FrameworkError.FRAMEWORK_0005,
337
        throw new SqoopException(FrameworkError.FRAMEWORK_0005,
330
                       "Unsupported job type " + job.getType().name());
338
          "Unsupported job type " + job.getType().name());
331
       }
339
    }
332
       LOG.debug("Using callbacks: " + request.getConnectorCallbacks());
340
    LOG.debug("Using callbacks: " + request.getConnectorCallbacks());
333

    
   
341

   
334
       // Initialize submission from connector perspective
342
    // Initialize submission from connector perspective
335
       CallbackBase baseCallbacks = request.getConnectorCallbacks();
343
    CallbackBase baseCallbacks = request.getConnectorCallbacks();
336

    
   
344

   
337
       Class<? extends Initializer> initializerClass = baseCallbacks.getInitializer();
345
    Class<? extends Initializer> initializerClass = baseCallbacks
338
       Initializer initializer = (Initializer) ClassUtils.instantiate(initializerClass);
346
      .getInitializer();

    
   
347
    Initializer initializer = (Initializer) ClassUtils

    
   
348
      .instantiate(initializerClass);
339

    
   
349

   
340
       if(initializer == null) {
350
    if (initializer == null) {
341
           throw  new SqoopException(FrameworkError.FRAMEWORK_0006,
351
      throw new SqoopException(FrameworkError.FRAMEWORK_0006,
342
                   "Can't create initializer instance: " + initializerClass.getName());
352
        "Can't create initializer instance: " + initializerClass.getName());
343
       }
353
    }
344

    
   
354

   
345
       // Initializer context
355
    // Initializer context
346
       InitializerContext initializerContext = new InitializerContext(request.getConnectorContext());
356
    InitializerContext initializerContext = new InitializerContext(

    
   
357
      request.getConnectorContext());
347

    
   
358

   
348
       // Initialize submission from connector perspective
359
    // Initialize submission from connector perspective
349
       initializer.initialize(initializerContext,
360
    initializer.initialize(initializerContext,
350
               request.getConfigConnectorConnection(),
361
      request.getConfigConnectorConnection(),
351
               request.getConfigConnectorJob());
362
      request.getConfigConnectorJob());
[+20] [20] 17 lines
[+20] public MSubmission submit(long jobId) {
369
               break;
380
        break;
370
           case EXPORT:
381
      case EXPORT:
371
               prepareExportSubmission(request);
382
        prepareExportSubmission(request);
372
               break;
383
        break;
373
           default:
384
      default:
374
               throw  new SqoopException(FrameworkError.FRAMEWORK_0005,
385
        throw new SqoopException(FrameworkError.FRAMEWORK_0005,
375
                       "Unsupported job type " + job.getType().name());
386
          "Unsupported job type " + job.getType().name());
376
       }
387
    }
377

    
   
388

   
378
       // Make sure that this job id is not currently running and submit the job
389
    // Make sure that this job id is not currently running and submit the job
379
       // only if it's not.
390
    // only if it's not.
380
       synchronized (getClass()) {
391
    synchronized (getClass()) {
381
           MSubmission lastSubmission = repository.findSubmissionLastForJob(jobId);
392
      MSubmission lastSubmission = repository.findSubmissionLastForJob(jobId);
382
           if(lastSubmission != null && lastSubmission.getStatus().isRunning()) {
393
      if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
383
               throw new SqoopException(FrameworkError.FRAMEWORK_0002,
394
        throw new SqoopException(FrameworkError.FRAMEWORK_0002,
384
                       "Job with id " + jobId);
395
          "Job with id " + jobId);
385
           }
396
      }
386

    
   
397

   
387
           // TODO(jarcec): We might need to catch all exceptions here to ensure
398
      // TODO(jarcec): We might need to catch all exceptions here to ensure
388
           // that Destroyer will be executed in all cases.
399
      // that Destroyer will be executed in all cases.
389
           boolean submitted = submissionEngine.submit(request);
400
      boolean submitted = submissionEngine.submit(request);
390
           if(!submitted) {
401
      if (!submitted) {
391
               destroySubmission(request);
402
        destroySubmission(request);
392
               summary.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
403
        summary.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
393
           }
404
      }
394

    
   
405

   
395
           repository.createSubmission(summary);
406
      repository.createSubmission(summary);
396
       }
407
    }
397

    
   
408

   
398
       // Return job status most recent
409
    // Return job status most recent
399
       return summary;
410
    return summary;
400
   }
411
  }
401

    
   
412

   
402
   private void prepareImportSubmission(SubmissionRequest request) {
413
  private void prepareImportSubmission(SubmissionRequest request) {
403
       ImportJobConfiguration jobConfiguration = (ImportJobConfiguration) request.getConfigFrameworkJob();
414
    ImportJobConfiguration jobConfiguration = (ImportJobConfiguration) request

    
   
415
      .getConfigFrameworkJob();
404

    
   
416

   
405
       // Initialize the map-reduce part (all sort of required classes, ...)
417
    // Initialize the map-reduce part (all sort of required classes, ...)
406
       request.setOutputDirectory(jobConfiguration.output.outputDirectory);
418
    request.setOutputDirectory(jobConfiguration.output.outputDirectory);
407

    
   
419

   
408
       // We're directly moving configured number of extractors and loaders to
420
    // We're directly moving configured number of extractors and loaders to
[+20] [20] 5 lines
[+20] public MSubmission submit(long jobId) {
414
       // Delegate rest of the job to execution engine
426
    // Delegate rest of the job to execution engine
415
       executionEngine.prepareImportSubmission(request);
427
    executionEngine.prepareImportSubmission(request);
416
   }
428
  }
417

    
   
429

   
418
   private void prepareExportSubmission(SubmissionRequest request) {
430
  private void prepareExportSubmission(SubmissionRequest request) {
419
       ExportJobConfiguration jobConfiguration = (ExportJobConfiguration) request.getConfigFrameworkJob();
431
    ExportJobConfiguration jobConfiguration = (ExportJobConfiguration) request

    
   
432
      .getConfigFrameworkJob();
420

    
   
433

   
421
       // We're directly moving configured number of extractors and loaders to
434
    // We're directly moving configured number of extractors and loaders to
422
       // underlying request object. In the future we might need to throttle this
435
    // underlying request object. In the future we might need to throttle this
423
       // count based on other running jobs to meet our SLAs.
436
    // count based on other running jobs to meet our SLAs.
424
       request.setExtractors(jobConfiguration.throttling.extractors);
437
    request.setExtractors(jobConfiguration.throttling.extractors);
[+20] [20] 11 lines
[+20] public MSubmission submit(long jobId) {
436
       CallbackBase baseCallbacks = request.getConnectorCallbacks();
449
    CallbackBase baseCallbacks = request.getConnectorCallbacks();
437

    
   
450

   
438
       Class<? extends Destroyer> destroyerClass = baseCallbacks.getDestroyer();
451
    Class<? extends Destroyer> destroyerClass = baseCallbacks.getDestroyer();
439
       Destroyer destroyer = (Destroyer) ClassUtils.instantiate(destroyerClass);
452
    Destroyer destroyer = (Destroyer) ClassUtils.instantiate(destroyerClass);
440

    
   
453

   
441
       if(destroyer == null) {
454
    if (destroyer == null) {
442
           throw  new SqoopException(FrameworkError.FRAMEWORK_0006,
455
      throw new SqoopException(FrameworkError.FRAMEWORK_0006,
443
                   "Can't create destroyer instance: " + destroyerClass.getName());
456
        "Can't create destroyer instance: " + destroyerClass.getName());
444
       }
457
    }
445

    
   
458

   
446
       DestroyerContext destroyerContext = new DestroyerContext(request.getConnectorContext(), false, request.getSummary().getConnectorSchema());
459
    DestroyerContext destroyerContext = new DestroyerContext(

    
   
460
      request.getConnectorContext(), false, request.getSummary()

    
   
461
        .getConnectorSchema());
447

    
   
462

   
448
       // Initialize submission from connector perspective
463
    // Initialize submission from connector perspective
449
       destroyer.destroy(destroyerContext, request.getConfigConnectorConnection(), request.getConfigConnectorJob());
464
    destroyer.destroy(destroyerContext, request.getConfigConnectorConnection(),

    
   
465
      request.getConfigConnectorJob());
450
   }
466
  }
451

    
   
467

   
452
   public MSubmission stop(long jobId) {
468
  public MSubmission stop(long jobId) {
453
       Repository repository = RepositoryManager.getInstance().getRepository();
469
    Repository repository = RepositoryManager.getInstance().getRepository();
454
       MSubmission submission = repository.findSubmissionLastForJob(jobId);
470
    MSubmission submission = repository.findSubmissionLastForJob(jobId);
455

    
   
471

   
456
       if(submission == null || !submission.getStatus().isRunning()) {
472
    if (submission == null || !submission.getStatus().isRunning()) {
457
           throw new SqoopException(FrameworkError.FRAMEWORK_0003,
473
      throw new SqoopException(FrameworkError.FRAMEWORK_0003,
458
                   "Job with id " + jobId + " is not running");
474
        "Job with id " + jobId + " is not running");
459
       }
475
    }
460

    
   
476

   
461
       String externalId = submission.getExternalId();
477
    String externalId = submission.getExternalId();
[+20] [20] 8 lines
[+20] public MSubmission stop(long jobId) {
470

    
   
486

   
471
   public MSubmission status(long jobId) {
487
  public MSubmission status(long jobId) {
472
       Repository repository = RepositoryManager.getInstance().getRepository();
488
    Repository repository = RepositoryManager.getInstance().getRepository();
473
       MSubmission submission = repository.findSubmissionLastForJob(jobId);
489
    MSubmission submission = repository.findSubmissionLastForJob(jobId);
474

    
   
490

   
475
       if(submission == null) {
491
    if (submission == null) {
476
           return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
492
      return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
477
       }
493
    }
478

    
   
494

   
479
       // If the submission is in running state, let's update it
495
    // If the submission is in running state, let's update it
480
       if(submission.getStatus().isRunning()) {
496
    if (submission.getStatus().isRunning()) {
481
           update(submission);
497
      update(submission);
482
       }
498
    }
483

    
   
499

   
484
       return submission;
500
    return submission;
485
   }
501
  }
486

    
   
502

   
487
   private void update(MSubmission submission) {
503
  private void update(MSubmission submission) {
488
       double progress  = -1;
504
    double progress = -1;
489
       Counters counters = null;
505
    Counters counters = null;
490
       String externalId = submission.getExternalId();
506
    String externalId = submission.getExternalId();
491
       SubmissionStatus newStatus = submissionEngine.status(externalId);
507
    SubmissionStatus newStatus = submissionEngine.status(externalId);
492
       String externalLink = submissionEngine.externalLink(externalId);
508
    String externalLink = submissionEngine.externalLink(externalId);
493

    
   
509

   
494
       if(newStatus.isRunning()) {
510
    if (newStatus.isRunning()) {
495
           progress = submissionEngine.progress(externalId);
511
      progress = submissionEngine.progress(externalId);
496
       } else {
512
    } else {
497
           counters = submissionEngine.counters(externalId);
513
      counters = submissionEngine.counters(externalId);
498
       }
514
    }
499

    
   
515

   
500
       submission.setStatus(newStatus);
516
    submission.setStatus(newStatus);
501
       submission.setProgress(progress);
517
    submission.setProgress(progress);
502
       submission.setCounters(counters);
518
    submission.setCounters(counters);
503
       submission.setExternalLink(externalLink);
519
    submission.setExternalLink(externalLink);
504
       submission.setLastUpdateDate(new Date());
520
    submission.setLastUpdateDate(new Date());
505

    
   
521

   
506
       RepositoryManager.getInstance().getRepository().updateSubmission(submission);
522
    RepositoryManager.getInstance().getRepository()

    
   
523
      .updateSubmission(submission);
507
   }
524
  }
508

    
   
525

   
509
   @Override
526
  @Override
510
   public synchronized void configurationChanged() {
527
  public synchronized void configurationChanged() {
511
     LOG.info("Begin submission engine manager reconfiguring");
528
    LOG.info("Begin submission engine manager reconfiguring");
512
     MapContext newContext = SqoopConfiguration.getInstance().getContext();
529
    MapContext newContext = SqoopConfiguration.getInstance().getContext();
513
     MapContext oldContext = SqoopConfiguration.getInstance().getOldContext();
530
    MapContext oldContext = SqoopConfiguration.getInstance().getOldContext();
514

    
   
531

   
515
     String newSubmissionEngineClassName = newContext.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
532
    String newSubmissionEngineClassName = newContext

    
   
533
      .getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
516
     if (newSubmissionEngineClassName == null
534
    if (newSubmissionEngineClassName == null
517
         || newSubmissionEngineClassName.trim().length() == 0) {
535
      || newSubmissionEngineClassName.trim().length() == 0) {
518
       throw new SqoopException(FrameworkError.FRAMEWORK_0001,
536
      throw new SqoopException(FrameworkError.FRAMEWORK_0001,
519
           newSubmissionEngineClassName);
537
        newSubmissionEngineClassName);
520
     }
538
    }
521

    
   
539

   
522
     String oldSubmissionEngineClassName = oldContext.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
540
    String oldSubmissionEngineClassName = oldContext

    
   
541
      .getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
523
     if (!newSubmissionEngineClassName.equals(oldSubmissionEngineClassName)) {
542
    if (!newSubmissionEngineClassName.equals(oldSubmissionEngineClassName)) {
524
       LOG.warn("Submission engine cannot be replaced at the runtime. " +
543
      LOG.warn("Submission engine cannot be replaced at the runtime. " +
525
                "You might need to restart the server.");
544
        "You might need to restart the server.");
526
     }
545
    }
527

    
   
546

   
528
     String newExecutionEngineClassName = newContext.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
547
    String newExecutionEngineClassName = newContext

    
   
548
      .getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
529
     if (newExecutionEngineClassName == null
549
    if (newExecutionEngineClassName == null
530
         || newExecutionEngineClassName.trim().length() == 0) {
550
      || newExecutionEngineClassName.trim().length() == 0) {
531
       throw new SqoopException(FrameworkError.FRAMEWORK_0007,
551
      throw new SqoopException(FrameworkError.FRAMEWORK_0007,
532
           newExecutionEngineClassName);
552
        newExecutionEngineClassName);
533
     }
553
    }
534

    
   
554

   
535
     String oldExecutionEngineClassName = oldContext.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
555
    String oldExecutionEngineClassName = oldContext

    
   
556
      .getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
536
     if (!newExecutionEngineClassName.equals(oldExecutionEngineClassName)) {
557
    if (!newExecutionEngineClassName.equals(oldExecutionEngineClassName)) {
537
       LOG.warn("Execution engine cannot be replaced at the runtime. " +
558
      LOG.warn("Execution engine cannot be replaced at the runtime. " +
538
                "You might need to restart the server.");
559
        "You might need to restart the server.");
539
     }
560
    }
540

    
   
561

   
[+20] [20] 23 lines
[+20] [+] private class PurgeThread extends Thread {
564
       }
585
    }
565

    
   
586

   
566
       public void run() {
587
    public void run() {
567
           LOG.info("Starting submission manager purge thread");
588
      LOG.info("Starting submission manager purge thread");
568

    
   
589

   
569
           while(running) {
590
      while (running) {
570
               try {
591
        try {
571
                   LOG.info("Purging old submissions");
592
          LOG.info("Purging old submissions");
572
                   Date threshold = new Date((new Date()).getTime() - purgeThreshold);
593
          Date threshold = new Date((new Date()).getTime() - purgeThreshold);
573
                   RepositoryManager.getInstance().getRepository().purgeSubmissions(threshold);
594
          RepositoryManager.getInstance().getRepository()

    
   
595
            .purgeSubmissions(threshold);
574
                   Thread.sleep(purgeSleep);
596
          Thread.sleep(purgeSleep);
575
               } catch (InterruptedException e) {
597
        } catch (InterruptedException e) {
576
                   LOG.debug("Purge thread interrupted", e);
598
          LOG.debug("Purge thread interrupted", e);
577
               }
599
        }
578
           }
600
      }
[+20] [20] 8 lines
[+20] [+] private class UpdateThread extends Thread {
587
       }
609
    }
588

    
   
610

   
589
       public void run() {
611
    public void run() {
590
           LOG.info("Starting submission manager update thread");
612
      LOG.info("Starting submission manager update thread");
591

    
   
613

   
592
           while(running) {
614
      while (running) {
593
               try {
615
        try {
594
                   LOG.debug("Updating running submissions");
616
          LOG.debug("Updating running submissions");
595

    
   
617

   
596
                   // Let's get all running submissions from repository to check them out
618
          // Let's get all running submissions from repository to check them out
597
                   List<MSubmission> unfinishedSubmissions =
619
          List<MSubmission> unfinishedSubmissions =
598
                           RepositoryManager.getInstance().getRepository().findSubmissionsUnfinished();
620
            RepositoryManager.getInstance().getRepository()

    
   
621
              .findSubmissionsUnfinished();
599

    
   
622

   
600
                   for(MSubmission submission : unfinishedSubmissions) {
623
          for (MSubmission submission : unfinishedSubmissions) {
601
                       update(submission);
624
            update(submission);
602
                   }
625
          }
603

    
   
626

   
604
                   Thread.sleep(updateSleep);
627
          Thread.sleep(updateSleep);
605
               } catch (InterruptedException e) {
628
        } catch (InterruptedException e) {
606
                   LOG.debug("Purge thread interrupted", e);
629
          LOG.debug("Purge thread interrupted", e);
607
               }
630
        }
608
           }
631
      }
609

    
   
632

   
610
           LOG.info("Ending submission manager update thread");
633
      LOG.info("Ending submission manager update thread");
611
       }
634
    }
612
   }
635
  }
613
}
636
}
  1. core/src/main/java/org/apache/sqoop/framework/JobManager.java: Loading...