Review Board 1.7.22


FLUME-2198. Avro Source must throw and disable itself if ipFilterRules contains invalid rules

Review Request #14246 - Created Sept. 20, 2013 and updated

Hari Shreedharan
FLUME-2198
Reviewers
Flume
flume-git
Modified the way rules get configured. Modified tests to support this.
All tests pass

Diff revision 3 (Latest)

1 2 3
1 2 3

  1. flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java: Loading...
flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
Revision f6e4cfe New Change
[20] 56 lines
[+20]
57
import org.jboss.netty.channel.ChannelPipeline;
57
import org.jboss.netty.channel.ChannelPipeline;
58
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
58
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
59
import org.jboss.netty.channel.Channels;
59
import org.jboss.netty.channel.Channels;
60
import org.jboss.netty.handler.codec.compression.ZlibDecoder;
60
import org.jboss.netty.handler.codec.compression.ZlibDecoder;
61
import org.jboss.netty.handler.codec.compression.ZlibEncoder;
61
import org.jboss.netty.handler.codec.compression.ZlibEncoder;

    
   
62
import org.jboss.netty.handler.ipfilter.IpFilterRule;
62
import org.jboss.netty.handler.ipfilter.IpFilterRuleHandler;
63
import org.jboss.netty.handler.ipfilter.IpFilterRuleHandler;
63
import org.jboss.netty.handler.ipfilter.PatternRule;
64
import org.jboss.netty.handler.ipfilter.PatternRule;
64
import org.jboss.netty.handler.ssl.SslHandler;
65
import org.jboss.netty.handler.ssl.SslHandler;
65
import org.slf4j.Logger;
66
import org.slf4j.Logger;
66
import org.slf4j.LoggerFactory;
67
import org.slf4j.LoggerFactory;
[+20] [20] 84 lines
[+20] [+] public class AvroSource extends AbstractSource implements EventDrivenSource,
151
  private SourceCounter sourceCounter;
152
  private SourceCounter sourceCounter;
152

    
   
153

   
153
  private int maxThreads;
154
  private int maxThreads;
154
  private ScheduledExecutorService connectionCountUpdater;
155
  private ScheduledExecutorService connectionCountUpdater;
155

    
   
156

   

    
   
157
  private List<IpFilterRule> rules;

    
   
158

   
156
  @Override
159
  @Override
157
  public void configure(Context context) {
160
  public void configure(Context context) {
158
    Configurables.ensureRequiredNonNull(context, PORT_KEY, BIND_KEY);
161
    Configurables.ensureRequiredNonNull(context, PORT_KEY, BIND_KEY);
159

    
   
162

   
160
    port = context.getInteger(PORT_KEY);
163
    port = context.getInteger(PORT_KEY);
[+20] [20] 28 lines
[+20] public void configure(Context context) {
189

    
   
192

   
190
    enableIpFilter = context.getBoolean(IP_FILTER_KEY, false);
193
    enableIpFilter = context.getBoolean(IP_FILTER_KEY, false);
191
    if (enableIpFilter) {
194
    if (enableIpFilter) {
192
      patternRuleConfigDefinition = context.getString(IP_FILTER_RULES_KEY);
195
      patternRuleConfigDefinition = context.getString(IP_FILTER_RULES_KEY);
193
      if (patternRuleConfigDefinition == null ||
196
      if (patternRuleConfigDefinition == null ||
194
        patternRuleConfigDefinition.isEmpty()) {
197
        patternRuleConfigDefinition.trim().isEmpty()) {
195
        throw new FlumeException(
198
        throw new FlumeException(
196
          "ipFilter is configured with true but ipFilterRules is not defined:" +
199
          "ipFilter is configured with true but ipFilterRules is not defined:" +
197
            " ");
200
            " ");
198
      }
201
      }
Moved from 454

    
   
202
      String[] patternRuleDefinitions = patternRuleConfigDefinition.split(
Moved from 455

    
   
203
        ",");

    
   
204
      rules = new ArrayList<IpFilterRule>(patternRuleDefinitions.length);

    
   
205
      for (String patternRuleDefinition : patternRuleDefinitions) {

    
   
206
        rules.add(generateRule(patternRuleDefinition));

    
   
207
      }
199
    }
208
    }
200

    
   
209

   
201
    if (sourceCounter == null) {
210
    if (sourceCounter == null) {
202
      sourceCounter = new SourceCounter(getName());
211
      sourceCounter = new SourceCounter(getName());
203
    }
212
    }
[+20] [20] 163 lines
[+20] [+] public Status appendBatch(List<AvroFlumeEvent> events) {
367
    sourceCounter.addToEventAcceptedCount(events.size());
376
    sourceCounter.addToEventAcceptedCount(events.size());
368

    
   
377

   
369
    return Status.OK;
378
    return Status.OK;
370
  }
379
  }
371

    
   
380

   

    
   
381
  private PatternRule generateRule(
Moved from 479

    
   
382
    String patternRuleDefinition) throws FlumeException {
Moved from 480

    
   
383
    patternRuleDefinition = patternRuleDefinition.trim();

    
   
384
    //first validate the format
Moved from 483

    
   
385
    int firstColonIndex = patternRuleDefinition.indexOf(":");
Moved from 484

    
   
386
    if (firstColonIndex == -1) {

    
   
387
      throw new FlumeException(
Moved from 486

    
   
388
        "Invalid ipFilter patternRule '" + patternRuleDefinition +
Moved from 487

    
   
389
          "' should look like <'allow'  or 'deny'>:<'ip' or " +
Moved from 488

    
   
390
          "'name'>:<pattern>");

    
   
391
    } else {

    
   
392
      String ruleAccessFlag = patternRuleDefinition.substring(0,

    
   
393
        firstColonIndex);

    
   
394
      int secondColonIndex = patternRuleDefinition.indexOf(":",

    
   
395
        firstColonIndex + 1);

    
   
396
      if ((!ruleAccessFlag.equals("allow") &&

    
   
397
        !ruleAccessFlag.equals("deny")) || secondColonIndex == -1) {

    
   
398
        throw new FlumeException(
Moved from 486

    
   
399
          "Invalid ipFilter patternRule '" + patternRuleDefinition +
Moved from 487

    
   
400
            "' should look like <'allow'  or 'deny'>:<'ip' or " +
Moved from 488

    
   
401
            "'name'>:<pattern>");

    
   
402
      }

    
   
403

   
Moved from 505

    
   
404
      String patternTypeFlag = patternRuleDefinition.substring(
Moved from 506

    
   
405
        firstColonIndex + 1, secondColonIndex);
Moved from 507

    
   
406
      if ((!patternTypeFlag.equals("ip") &&
Moved from 508

    
   
407
        !patternTypeFlag.equals("name"))) {

    
   
408
        throw new FlumeException(
Moved from 486

    
   
409
          "Invalid ipFilter patternRule '" + patternRuleDefinition +
Moved from 487

    
   
410
            "' should look like <'allow'  or 'deny'>:<'ip' or " +
Moved from 488

    
   
411
            "'name'>:<pattern>");

    
   
412
      }

    
   
413

   

    
   
414
      boolean isAllow = ruleAccessFlag.equals("allow");

    
   
415
      String patternRuleString = (patternTypeFlag.equals("ip") ? "i" : "n")

    
   
416
        + ":" + patternRuleDefinition.substring(secondColonIndex + 1);

    
   
417
      logger.info("Adding ipFilter PatternRule: "

    
   
418
        + (isAllow ? "Allow" : "deny") + " " + patternRuleString);
Moved from 523

    
   
419
      return new PatternRule(isAllow, patternRuleString);
Moved from 524

    
   
420
    }
Moved from 525

    
   
421
  }

    
   
422

   
372
  /**
423
  /**
373
   * Factory of SSL-enabled server worker channel pipelines
424
   * Factory of SSL-enabled server worker channel pipelines
374
   * Copied from Avro's org.apache.avro.ipc.TestNettyServerWithSSL test
425
   * Copied from Avro's org.apache.avro.ipc.TestNettyServerWithSSL test
375
   */
426
   */
376
  private static class AdvancedChannelPipelineFactory
427
  private class AdvancedChannelPipelineFactory
377
      implements ChannelPipelineFactory {
428
      implements ChannelPipelineFactory {
378

    
   
429

   
379
    private boolean enableCompression;
430
    private boolean enableCompression;
380
    private boolean enableSsl;
431
    private boolean enableSsl;
381
    private String keystore;
432
    private String keystore;
[+20] [20] 64 lines
[+20] [+] public ChannelPipeline getPipeline() throws Exception {
446
      if (enableIpFilter) {
497
      if (enableIpFilter) {
447

    
   
498

   
448
        logger.info("Setting up ipFilter with the following rule definition: " +
499
        logger.info("Setting up ipFilter with the following rule definition: " +
449
          patternRuleConfigDefinition);
500
          patternRuleConfigDefinition);
450
        IpFilterRuleHandler ipFilterHandler = new IpFilterRuleHandler();
501
        IpFilterRuleHandler ipFilterHandler = new IpFilterRuleHandler();
451

    
   
502
        ipFilterHandler.addAll(rules);
452
        if (patternRuleConfigDefinition != null &&

   
453
          !patternRuleConfigDefinition.isEmpty()) {

   
454
          String[] patternRuleDefinitions = patternRuleConfigDefinition.split(
Moved to 202

   
455
            ",");
Moved to 203

   
456
          for (String patternRuleDefinition : patternRuleDefinitions) {

   
457

    
   

   
458
            PatternRule patternRule

   
459
              = PatternRuleBuilder.withConfigRuleDefinition(

   
460
              patternRuleDefinition);

   
461

    
   

   
462
            if (patternRule != null) {

   
463
              ipFilterHandler.add(patternRule);

   
464
            }

   
465
          }

   
466
        }

   
467

    
   

   
468
        logger.info(
503
        logger.info(
469
          "Adding ipFilter with " + ipFilterHandler.size() + " rules");
504
          "Adding ipFilter with " + ipFilterHandler.size() + " rules");
470

    
   
505

   
471
        pipeline.addFirst("ipFilter", ipFilterHandler);
506
        pipeline.addFirst("ipFilter", ipFilterHandler);
472
      }
507
      }
473

    
   
508

   
474
      return pipeline;
509
      return pipeline;
475
    }
510
    }
476

    
   

   
477
    public static class PatternRuleBuilder {

   
478
      public static PatternRule withConfigRuleDefinition(

   
479
        String patternRuleDefinition) throws FlumeException {
Moved to 382

   
480
        patternRuleDefinition = patternRuleDefinition.trim();
Moved to 383

   
481
        //first validation the format

   
482

    
   

   
483
        int firstColonIndex = patternRuleDefinition.indexOf(":");
Moved to 385

   
484
        if (firstColonIndex == -1) {
Moved to 386

   
485
          logger.error(

   
486
            "Invalid ipFilter patternRule '" + patternRuleDefinition +
Moved to 409

   
487
              "' should look like <'allow'  or 'deny'>:<'ip' or " +
Moved to 410

   
488
              "'name'>:<pattern>");
Moved to 411

   
489
          return null;

   
490
        } else {

   
491

    
   

   
492
          String ruleAccessFlag = patternRuleDefinition.substring(0,

   
493
            firstColonIndex);

   
494
          int secondColonIndex = patternRuleDefinition.indexOf(":",

   
495
            firstColonIndex + 1);

   
496
          if ((!ruleAccessFlag.equals("allow") &&

   
497
            !ruleAccessFlag.equals("deny")) || secondColonIndex == -1) {

   
498
            logger.error(

   
499
              "Invalid ipFilter patternRule '" + patternRuleDefinition +

   
500
                "' should look like <'allow'  or 'deny'>:<'ip' or " +

   
501
                "'name'>:<pattern>");

   
502
            return null;

   
503
          }

   
504

    
   

   
505
          String patternTypeFlag = patternRuleDefinition.substring(
Moved to 404

   
506
            firstColonIndex + 1, secondColonIndex);
Moved to 405

   
507
          if ((!patternTypeFlag.equals("ip") &&
Moved to 406

   
508
            !patternTypeFlag.equals("name"))) {
Moved to 407

   
509
            logger.error(

   
510
              "Invalid ipFilter patternRule '" + patternRuleDefinition +

   
511
                "' should look like <'allow'  or 'deny'>:<'ip' or " +

   
512
                "'name'>:<pattern>");

   
513
            return null;

   
514
          }

   
515

    
   

   
516
          boolean isAllow = ruleAccessFlag.equals("allow");

   
517
          String patternRuleString =

   
518
            (patternTypeFlag.equals("ip") ? "i" : "n") + ":" +

   
519
              patternRuleDefinition.substring(secondColonIndex + 1);

   
520
          logger.info("Adding ipFilter PatternRule: "

   
521
            + (isAllow ? "Allow" : "deny") +

   
522
            " " + patternRuleString);

   
523
          return new PatternRule(isAllow, patternRuleString);
Moved to 419

   
524
        }
Moved to 420

   
525
      }
Moved to 421

   
526
    }

   
527

    
   

   
528
  }
511
  }
529
}
512
}
flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
Revision e208fff New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java: Loading...