Review Board 1.7.22


SQOOP-994 Sqoop2: upgrade: add calling validation to the upgrade method.

Review Request #12248 - Created July 3, 2013 and submitted

Mengwei Ding
SQOOP-994
Reviewers
Sqoop
abe, hshreedharan, jarcec
sqoop-sqoop2
commit 5772ec00d04595174ad2018dbb3f6af74fd82cba
Author: Mengwei Ding <mengwei.ding@cloudera.com>
Date:   Tue Jul 2 18:48:29 2013 -0700

    SQOOP-994 Sqoop2: upgrade: add calling validation to the upgrade method.

:100644 100644 46cb7e6... 0bedcbb... M	core/src/main/java/org/apache/sqoop/repository/Repository.java
:100644 100644 c616889... 3f3a9e6... M	core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
This implementation is just from my preliminary idea about this jira. I didn't do any test, cause there is a separate jira aiming to do the test for the whole metadata upgrading. And I will do the test in that jira.
core/src/main/java/org/apache/sqoop/repository/Repository.java
Revision 46cb7e6 New Change
[20] 35 lines
[+20]
36
import org.apache.sqoop.model.MJobForms;
36
import org.apache.sqoop.model.MJobForms;
37
import org.apache.sqoop.model.MMapInput;
37
import org.apache.sqoop.model.MMapInput;
38
import org.apache.sqoop.model.MStringInput;
38
import org.apache.sqoop.model.MStringInput;
39
import org.apache.sqoop.model.MSubmission;
39
import org.apache.sqoop.model.MSubmission;
40
import org.apache.sqoop.model.ModelError;
40
import org.apache.sqoop.model.ModelError;

    
   
41
import org.apache.sqoop.validation.Validation;

    
   
42
import org.apache.sqoop.validation.Validator;
41

    
   
43

   
42
import java.util.ArrayList;
44
import java.util.ArrayList;
43
import java.util.Date;
45
import java.util.Date;
44
import java.util.List;
46
import java.util.List;
45

    
   
47

   
[+20] [20] 326 lines
[+20] [+] public final void upgradeConnector(MConnector oldConnector, MConnector newConnector) {
372
     * 4. Delete the inputs for all of the jobs and connections (in that order)
374
     * 4. Delete the inputs for all of the jobs and connections (in that order)
373
     * 5. Remove all inputs and forms associated with the connector, and
375
     * 5. Remove all inputs and forms associated with the connector, and
374
     *    register the new forms and inputs.
376
     *    register the new forms and inputs.
375
     * 6. Create new connections and jobs with connector part being the ones
377
     * 6. Create new connections and jobs with connector part being the ones
376
     *    returned by the upgrader.
378
     *    returned by the upgrader.
377
     * 7. Insert the connection inputs followed by job inputs (using
379
     * 7. Validate new connections and jobs with connector's validator

    
   
380
     * 8. If any invalid connections or jobs detected, throw an exception

    
   
381
     *    and stop the bootup of Sqoop server

    
   
382
     * 9. Otherwise, Insert the connection inputs followed by job inputs (using
378
     *    updateJob and updateConnection)
383
     *    updateJob and updateConnection)
379
     */
384
     */
380
    RepositoryTransaction tx = null;
385
    RepositoryTransaction tx = null;
381
    try {
386
    try {
382
      SqoopConnector connector =
387
      SqoopConnector connector =
383
        ConnectorManager.getInstance().getConnector(newConnector
388
        ConnectorManager.getInstance().getConnector(newConnector
384
          .getUniqueName());
389
          .getUniqueName());

    
   
390

   

    
   
391
      Validator validator = connector.getValidator();

    
   
392

   

    
   
393
      // lists to buffer invalid connections and jobs

    
   
394
      List<MConnection> invalidConnections = new ArrayList<MConnection>();

    
   
395
      List<MJob> invalidJobs = new ArrayList<MJob>();

    
   
396

   
385
      MetadataUpgrader upgrader = connector.getMetadataUpgrader();
397
      MetadataUpgrader upgrader = connector.getMetadataUpgrader();
386
      List<MConnection> connections = findConnectionsForConnector(
398
      List<MConnection> connections = findConnectionsForConnector(
387
        connectorID);
399
        connectorID);
388
      List<MJob> jobs = findJobsForConnector(connectorID);
400
      List<MJob> jobs = findJobsForConnector(connectorID);
389
      // -- BEGIN TXN --
401
      // -- BEGIN TXN --
[+20] [20] 10 lines
[+20] public final void upgradeConnector(MConnector oldConnector, MConnector newConnector) {
400
        MConnectionForms newConnectionForms = new MConnectionForms(forms);
412
        MConnectionForms newConnectionForms = new MConnectionForms(forms);
401
        upgrader.upgrade(connection.getConnectorPart(), newConnectionForms);
413
        upgrader.upgrade(connection.getConnectorPart(), newConnectionForms);
402
        MConnection newConnection = new MConnection(connectorID,
414
        MConnection newConnection = new MConnection(connectorID,
403
          newConnectionForms, connection.getFrameworkPart());
415
          newConnectionForms, connection.getFrameworkPart());
404
        newConnection.setPersistenceId(connectionID);
416
        newConnection.setPersistenceId(connectionID);

    
   
417

   

    
   
418
        Validation validation = validator.validateConnection(newConnection);

    
   
419
        if (validation.getStatus().canProceed()) {
405
        updateConnection(newConnection, tx);
420
          updateConnection(newConnection, tx);

    
   
421
        } else {

    
   
422
          invalidConnections.add(newConnection);

    
   
423
        }
406
      }
424
      }
407
      for (MJob job : jobs) {
425
      for (MJob job : jobs) {
408
        // Make a new copy of the forms from the connector,
426
        // Make a new copy of the forms from the connector,
409
        // else the values will get set in the forms in the connector for
427
        // else the values will get set in the forms in the connector for
410
        // each connection.
428
        // each connection.
411
        List<MForm> forms = newConnector.getJobForms(job.getType()).clone(false).getForms();
429
        List<MForm> forms = newConnector.getJobForms(job.getType()).clone(false).getForms();
412
        MJobForms newJobForms = new MJobForms(job.getType(), forms);
430
        MJobForms newJobForms = new MJobForms(job.getType(), forms);
413
        upgrader.upgrade(job.getConnectorPart(), newJobForms);
431
        upgrader.upgrade(job.getConnectorPart(), newJobForms);
414
        MJob newJob = new MJob(connectorID, job.getConnectionId(),
432
        MJob newJob = new MJob(connectorID, job.getConnectionId(),
415
          job.getType(), newJobForms, job.getFrameworkPart());
433
          job.getType(), newJobForms, job.getFrameworkPart());
416
        newJob.setPersistenceId(job.getPersistenceId());
434
        newJob.setPersistenceId(job.getPersistenceId());

    
   
435

   

    
   
436
        Validation validation = validator.validateJob(newJob.getType(), newJob);

    
   
437
        if (validation.getStatus().canProceed()) {
417
        updateJob(newJob, tx);
438
          updateJob(newJob, tx);

    
   
439
        } else {

    
   
440
          invalidJobs.add(newJob);
418
      }
441
        }

    
   
442
      }

    
   
443

   

    
   
444
      if (invalidConnections.size() == 0 && invalidJobs.size() == 0) {
419
      tx.commit();
445
        tx.commit();

    
   
446
      } else {

    
   
447
        String msg = "Metadata upgrade for connector failed because of invalid Connections or Jobs.\n";

    
   
448

   

    
   
449
        if (invalidConnections.size() > 0) {

    
   
450
          msg += "Connections: ";

    
   
451
          for (MConnection connection : invalidConnections) {

    
   
452
            msg += connection.getPersistenceId() + ", ";

    
   
453
          }

    
   
454
          msg += "\n";

    
   
455
        }

    
   
456

   

    
   
457
        if (invalidJobs.size() > 0) {

    
   
458
          msg += "Jobs: ";

    
   
459
          for (MJob job : invalidJobs) {

    
   
460
            msg += job.getPersistenceId() + ", ";

    
   
461
          }

    
   
462
          msg += "\n";

    
   
463
        }

    
   
464

   

    
   
465
        throw new SqoopException(RepositoryError.JDBCREPO_0027, msg);

    
   
466
      }
420
    } catch (Exception ex) {
467
    } catch (Exception ex) {
421
      if(tx != null) {
468
      if(tx != null) {
422
        tx.rollback();
469
        tx.rollback();
423
      }
470
      }
424
      throw new SqoopException(RepositoryError.JDBCREPO_0000, ex);
471
      throw new SqoopException(RepositoryError.JDBCREPO_0000, ex);
[+20] [20] 12 lines
[+20] [+] public final void upgradeFramework(MFramework framework) {
437
      MetadataUpgrader upgrader = FrameworkManager.getInstance()
484
      MetadataUpgrader upgrader = FrameworkManager.getInstance()
438
        .getMetadataUpgrader();
485
        .getMetadataUpgrader();
439
      List<MConnection> connections = findConnections();
486
      List<MConnection> connections = findConnections();
440
      List<MJob> jobs = findJobs();
487
      List<MJob> jobs = findJobs();
441

    
   
488

   

    
   
489
      Validator validator = FrameworkManager.getInstance().getValidator();

    
   
490

   

    
   
491
      // lists to buffer invalid connections and jobs

    
   
492
      List<MConnection> invalidConnections = new ArrayList<MConnection>();

    
   
493
      List<MJob> invalidJobs = new ArrayList<MJob>();

    
   
494

   
442
      // -- BEGIN TXN --
495
      // -- BEGIN TXN --
443
      tx = getTransaction();
496
      tx = getTransaction();
444
      tx.begin();
497
      tx.begin();
445
      deleteConnectionsAndJobs(connections, jobs, tx);
498
      deleteConnectionsAndJobs(connections, jobs, tx);
446
      updateFramework(framework, tx);
499
      updateFramework(framework, tx);
[+20] [20] 6 lines
[+20] public final void upgradeFramework(MFramework framework) {
453
        MConnectionForms newConnectionForms = new MConnectionForms(forms);
506
        MConnectionForms newConnectionForms = new MConnectionForms(forms);
454
        upgrader.upgrade(connection.getFrameworkPart(), newConnectionForms);
507
        upgrader.upgrade(connection.getFrameworkPart(), newConnectionForms);
455
        MConnection newConnection = new MConnection(connection.getConnectorId(),
508
        MConnection newConnection = new MConnection(connection.getConnectorId(),
456
          connection.getConnectorPart(), newConnectionForms);
509
          connection.getConnectorPart(), newConnectionForms);
457
        newConnection.setPersistenceId(connectionID);
510
        newConnection.setPersistenceId(connectionID);

    
   
511

   

    
   
512
        Validation validation = validator.validateConnection(newConnection);

    
   
513
        if (validation.getStatus().canProceed()) {
458
        updateConnection(newConnection, tx);
514
          updateConnection(newConnection, tx);

    
   
515
        } else {

    
   
516
          invalidConnections.add(newConnection);

    
   
517
        }
459
      }
518
      }
460
      for (MJob job : jobs) {
519
      for (MJob job : jobs) {
461
        // Make a new copy of the forms from the framework,
520
        // Make a new copy of the forms from the framework,
462
        // else the values will get set in the forms in the connector for
521
        // else the values will get set in the forms in the connector for
463
        // each connection.
522
        // each connection.
464
        List<MForm> forms = framework.getJobForms(job.getType()).clone(false).getForms();
523
        List<MForm> forms = framework.getJobForms(job.getType()).clone(false).getForms();
465
        MJobForms newJobForms = new MJobForms(job.getType(), forms);
524
        MJobForms newJobForms = new MJobForms(job.getType(), forms);
466
        upgrader.upgrade(job.getFrameworkPart(), newJobForms);
525
        upgrader.upgrade(job.getFrameworkPart(), newJobForms);
467
        MJob newJob = new MJob(job.getConnectorId(), job.getConnectionId(),
526
        MJob newJob = new MJob(job.getConnectorId(), job.getConnectionId(),
468
          job.getType(), job.getConnectorPart(), newJobForms);
527
          job.getType(), job.getConnectorPart(), newJobForms);
469
        newJob.setPersistenceId(job.getPersistenceId());
528
        newJob.setPersistenceId(job.getPersistenceId());

    
   
529

   

    
   
530
        Validation validation = validator.validateJob(newJob.getType(), newJob);

    
   
531
        if (validation.getStatus().canProceed()) {
470
        updateJob(newJob, tx);
532
          updateJob(newJob, tx);

    
   
533
        } else {

    
   
534
          invalidJobs.add(newJob);

    
   
535
        }
471
      }
536
      }

    
   
537

   

    
   
538
      if (invalidConnections.size() == 0 && invalidJobs.size() == 0) {
472
      tx.commit();
539
        tx.commit();

    
   
540
      } else {

    
   
541
        String msg = "Metadata upgrade for job failed because of invalid Connections or Jobs.\n";

    
   
542

   

    
   
543
        if (invalidConnections.size() > 0) {

    
   
544
          msg += "Connections: ";

    
   
545
          for (MConnection connection : invalidConnections) {

    
   
546
            msg += connection.getPersistenceId() + ", ";

    
   
547
          }

    
   
548
          msg += "\n";

    
   
549
        }

    
   
550

   

    
   
551
        if (invalidJobs.size() > 0) {

    
   
552
          msg += "Jobs: ";

    
   
553
          for (MJob job : invalidJobs) {

    
   
554
            msg += job.getPersistenceId() + ", ";

    
   
555
          }

    
   
556
          msg += "\n";

    
   
557
        }

    
   
558

   

    
   
559
        throw new SqoopException(RepositoryError.JDBCREPO_0027, msg);

    
   
560
      }
473
    } catch (Exception ex) {
561
    } catch (Exception ex) {
474
      if(tx != null) {
562
      if(tx != null) {
475
        tx.rollback();
563
        tx.rollback();
476
      }
564
      }
477
      throw new SqoopException(RepositoryError.JDBCREPO_0000, ex);
565
      throw new SqoopException(RepositoryError.JDBCREPO_0000, ex);
478
    } finally {
566
    } finally {
479
      if(tx != null) {
567
      if(tx != null) {
480
        tx.close();
568
        tx.close();
481
      }
569
      }
482
      LOG.info("Framework metadata upgrade finished");
570
      LOG.info("Framework metadata upgrade finished");
483
    }
571
    }
484
  }
572
  }
485
}
573
}
core/src/main/java/org/apache/sqoop/repository/RepositoryError.java
Revision c616889 New Change
 
  1. core/src/main/java/org/apache/sqoop/repository/Repository.java: Loading...
  2. core/src/main/java/org/apache/sqoop/repository/RepositoryError.java: Loading...