Review Board 1.7.22


SQOOP-1184 Sqoop2: Improve error messages during metadata upgrade

Review Request #13795 - Created Aug. 24, 2013 and submitted

Jarek Cecho
SQOOP-1184
Reviewers
Sqoop
sqoop-sqoop2
I've added code that will iterate over the Validation object and log all validation errors and warnings. I've also removed the Lists of invalid objects to improve the memory footprint of the upgrade procedure.
Metadata upgrade is currently not covered by automatic tests, so this patch do not include them. Nevertheless I've verified the logging on a real cluster. We should add unit tests in separate JIRA.

Diff revision 1 (Latest)

  1. core/src/main/java/org/apache/sqoop/repository/Repository.java: Loading...
core/src/main/java/org/apache/sqoop/repository/Repository.java
Revision a7ccf10d948ae75e3ed2f5c1c19d1fec2d205aae New Change
[20] 28 lines
[+20]
29
import org.apache.sqoop.model.MConnector;
29
import org.apache.sqoop.model.MConnector;
30
import org.apache.sqoop.model.MForm;
30
import org.apache.sqoop.model.MForm;
31
import org.apache.sqoop.model.MFramework;
31
import org.apache.sqoop.model.MFramework;
32
import org.apache.sqoop.model.MJob;
32
import org.apache.sqoop.model.MJob;
33
import org.apache.sqoop.model.MJobForms;
33
import org.apache.sqoop.model.MJobForms;

    
   
34
import org.apache.sqoop.model.MPersistableEntity;
34
import org.apache.sqoop.model.MSubmission;
35
import org.apache.sqoop.model.MSubmission;
35
import org.apache.sqoop.utils.ClassUtils;
36
import org.apache.sqoop.utils.ClassUtils;
36
import org.apache.sqoop.validation.Validation;
37
import org.apache.sqoop.validation.Validation;
37
import org.apache.sqoop.validation.Validator;
38
import org.apache.sqoop.validation.Validator;
38

    
   
39

   
39
import java.util.ArrayList;
40
import java.util.ArrayList;
40
import java.util.Date;
41
import java.util.Date;
41
import java.util.List;
42
import java.util.List;

    
   
43
import java.util.Map;
42

    
   
44

   
43

    
   
45

   
44
/**
46
/**
45
 * Defines the contract of a Repository used by Sqoop. A Repository allows
47
 * Defines the contract of a Repository used by Sqoop. A Repository allows
46
 * Sqoop to store metadata, statistics and other state relevant to Sqoop
48
 * Sqoop to store metadata, statistics and other state relevant to Sqoop
[+20] [20] 352 lines
[+20] [+] public final void upgradeConnector(MConnector oldConnector, MConnector newConnector) {
399
        ConnectorManager.getInstance().getConnector(newConnector
401
        ConnectorManager.getInstance().getConnector(newConnector
400
          .getUniqueName());
402
          .getUniqueName());
401

    
   
403

   
402
      Validator validator = connector.getValidator();
404
      Validator validator = connector.getValidator();
403

    
   
405

   
404
      // lists to buffer invalid connections and jobs
406
      boolean upgradeSuccessful = true;
405
      List<MConnection> invalidConnections = new ArrayList<MConnection>();

   
406
      List<MJob> invalidJobs = new ArrayList<MJob>();

   
407

    
   
407

   
408
      MetadataUpgrader upgrader = connector.getMetadataUpgrader();
408
      MetadataUpgrader upgrader = connector.getMetadataUpgrader();
409
      List<MConnection> connections = findConnectionsForConnector(
409
      List<MConnection> connections = findConnectionsForConnector(
410
        connectorID);
410
        connectorID);
411
      List<MJob> jobs = findJobsForConnector(connectorID);
411
      List<MJob> jobs = findJobsForConnector(connectorID);
[+20] [20] 20 lines
[+20] public final void upgradeConnector(MConnector oldConnector, MConnector newConnector) {
432

    
   
432

   
433
        Validation validation = validator.validateConnection(newConfigurationObject);
433
        Validation validation = validator.validateConnection(newConfigurationObject);
434
        if (validation.getStatus().canProceed()) {
434
        if (validation.getStatus().canProceed()) {
435
          updateConnection(newConnection, tx);
435
          updateConnection(newConnection, tx);
436
        } else {
436
        } else {
437
          invalidConnections.add(newConnection);
437
          logInvalidModelObject("connection", newConnection, validation);

    
   
438
          upgradeSuccessful = false;
438
        }
439
        }
439
      }
440
      }
440
      for (MJob job : jobs) {
441
      for (MJob job : jobs) {
441
        // Make a new copy of the forms from the connector,
442
        // Make a new copy of the forms from the connector,
442
        // else the values will get set in the forms in the connector for
443
        // else the values will get set in the forms in the connector for
[+20] [20] 11 lines
[+20] public final void upgradeConnector(MConnector oldConnector, MConnector newConnector) {
454

    
   
455

   
455
        Validation validation = validator.validateJob(newJob.getType(), newConfigurationObject);
456
        Validation validation = validator.validateJob(newJob.getType(), newConfigurationObject);
456
        if (validation.getStatus().canProceed()) {
457
        if (validation.getStatus().canProceed()) {
457
          updateJob(newJob, tx);
458
          updateJob(newJob, tx);
458
        } else {
459
        } else {
459
          invalidJobs.add(newJob);
460
          logInvalidModelObject("job", newJob, validation);

    
   
461
          upgradeSuccessful = false;
460
        }
462
        }
461
      }
463
      }
462

    
   
464

   
463
      if (invalidConnections.size() == 0 && invalidJobs.size() == 0) {
465
      if (upgradeSuccessful) {
464
        tx.commit();
466
        tx.commit();
465
      } else {
467
      } else {
466
        String msg = "Metadata upgrade for connector failed because of invalid Connections or Jobs.\n";
468
        throw new SqoopException(RepositoryError.JDBCREPO_0027);
467

    
   

   
468
        if (invalidConnections.size() > 0) {

   
469
          msg += "Connections: ";

   
470
          for (MConnection connection : invalidConnections) {

   
471
            msg += connection.getPersistenceId() + ", ";

   
472
          }

   
473
          msg += "\n";

   
474
        }

   
475

    
   

   
476
        if (invalidJobs.size() > 0) {

   
477
          msg += "Jobs: ";

   
478
          for (MJob job : invalidJobs) {

   
479
            msg += job.getPersistenceId() + ", ";

   
480
          }

   
481
          msg += "\n";

   
482
        }

   
483

    
   

   
484
        throw new SqoopException(RepositoryError.JDBCREPO_0027, msg);

   
485
      }
469
      }
486
    } catch (SqoopException ex) {
470
    } catch (SqoopException ex) {
487
      if(tx != null) {
471
      if(tx != null) {
488
        tx.rollback();
472
        tx.rollback();
489
      }
473
      }
[+20] [20] 20 lines
[+20] [+] public final void upgradeFramework(MFramework framework) {
510
      List<MConnection> connections = findConnections();
494
      List<MConnection> connections = findConnections();
511
      List<MJob> jobs = findJobs();
495
      List<MJob> jobs = findJobs();
512

    
   
496

   
513
      Validator validator = FrameworkManager.getInstance().getValidator();
497
      Validator validator = FrameworkManager.getInstance().getValidator();
514

    
   
498

   
515
      // lists to buffer invalid connections and jobs
499
      boolean upgradeSuccessful = true;
516
      List<MConnection> invalidConnections = new ArrayList<MConnection>();

   
517
      List<MJob> invalidJobs = new ArrayList<MJob>();

   
518

    
   
500

   
519
      // -- BEGIN TXN --
501
      // -- BEGIN TXN --
520
      tx = getTransaction();
502
      tx = getTransaction();
521
      tx.begin();
503
      tx.begin();
522
      deleteConnectionsAndJobs(connections, jobs, tx);
504
      deleteConnectionsAndJobs(connections, jobs, tx);
[+20] [20] 16 lines
[+20] public final void upgradeFramework(MFramework framework) {
539

    
   
521

   
540
        Validation validation = validator.validateConnection(newConfigurationObject);
522
        Validation validation = validator.validateConnection(newConfigurationObject);
541
        if (validation.getStatus().canProceed()) {
523
        if (validation.getStatus().canProceed()) {
542
          updateConnection(newConnection, tx);
524
          updateConnection(newConnection, tx);
543
        } else {
525
        } else {
544
          invalidConnections.add(newConnection);
526
          logInvalidModelObject("connection", newConnection, validation);

    
   
527
          upgradeSuccessful = false;
545
        }
528
        }
546
      }
529
      }
547
      for (MJob job : jobs) {
530
      for (MJob job : jobs) {
548
        // Make a new copy of the forms from the framework,
531
        // Make a new copy of the forms from the framework,
549
        // else the values will get set in the forms in the connector for
532
        // else the values will get set in the forms in the connector for
[+20] [20] 11 lines
[+20] public final void upgradeFramework(MFramework framework) {
561

    
   
544

   
562
        Validation validation = validator.validateJob(newJob.getType(), newConfigurationObject);
545
        Validation validation = validator.validateJob(newJob.getType(), newConfigurationObject);
563
        if (validation.getStatus().canProceed()) {
546
        if (validation.getStatus().canProceed()) {
564
          updateJob(newJob, tx);
547
          updateJob(newJob, tx);
565
        } else {
548
        } else {
566
          invalidJobs.add(newJob);
549
          logInvalidModelObject("job", newJob, validation);

    
   
550
          upgradeSuccessful = false;
567
        }
551
        }
568
      }
552
      }
569

    
   
553

   
570
      if (invalidConnections.size() == 0 && invalidJobs.size() == 0) {
554
      if (upgradeSuccessful) {
571
        tx.commit();
555
        tx.commit();
572
      } else {
556
      } else {
573
        String msg = "Metadata upgrade for job failed because of invalid Connections or Jobs.\n";
557
        throw new SqoopException(RepositoryError.JDBCREPO_0027);
574

    
   

   
575
        if (invalidConnections.size() > 0) {

   
576
          msg += "Connections: ";

   
577
          for (MConnection connection : invalidConnections) {

   
578
            msg += connection.getPersistenceId() + ", ";

   
579
          }

   
580
          msg += "\n";

   
581
        }

   
582

    
   

   
583
        if (invalidJobs.size() > 0) {

   
584
          msg += "Jobs: ";

   
585
          for (MJob job : invalidJobs) {

   
586
            msg += job.getPersistenceId() + ", ";

   
587
          }

   
588
          msg += "\n";

   
589
        }

   
590

    
   

   
591
        throw new SqoopException(RepositoryError.JDBCREPO_0027, msg);

   
592
      }
558
      }
593
    } catch (SqoopException ex) {
559
    } catch (SqoopException ex) {
594
      if(tx != null) {
560
      if(tx != null) {
595
        tx.rollback();
561
        tx.rollback();
596
      }
562
      }
[+20] [20] 8 lines
[+20] public final void upgradeFramework(MFramework framework) {
605
        tx.close();
571
        tx.close();
606
      }
572
      }
607
      LOG.info("Framework metadata upgrade finished");
573
      LOG.info("Framework metadata upgrade finished");
608
    }
574
    }
609
  }
575
  }

    
   
576

   

    
   
577
  private void logInvalidModelObject(String objectType, MPersistableEntity entity, Validation validation) {

    
   
578
    LOG.error("Upgrader created invalid " + objectType + " with id" + entity.getPersistenceId());

    
   
579

   

    
   
580
    for(Map.Entry<Validation.FormInput, Validation.Message> entry : validation.getMessages().entrySet()) {

    
   
581
      LOG.error("\t" + entry.getKey() + ": " + entry.getValue());

    
   
582
    }

    
   
583
  }
610
}
584
}
  1. core/src/main/java/org/apache/sqoop/repository/Repository.java: Loading...