Review Board 1.7.22


FLUME-1490: Option to limit number of events sent in Stress source

Review Request #6673 - Created Aug. 17, 2012 and updated

Patrick Wendell
FLUME-1490
Reviewers
Flume
flume-git
This patch implements a count limit, both for overall (successful + failed) events and for successful events. Seems like both could be useful in testing.
Includes unit test and a bit of code clean-up.

Diff revision 1

This is not the most recent revision of the diff. The latest diff is revision 2. See what's changed.

1 2
1 2

  1. flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java: Loading...
flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
Revision 4f7b255 New Change
[20] 29 lines
[+20]
30
import org.apache.flume.conf.Configurable;
30
import org.apache.flume.conf.Configurable;
31
import org.apache.flume.event.EventBuilder;
31
import org.apache.flume.event.EventBuilder;
32
import org.slf4j.Logger;
32
import org.slf4j.Logger;
33
import org.slf4j.LoggerFactory;
33
import org.slf4j.LoggerFactory;
34

    
   
34

   

    
   
35
/**

    
   
36
 * Internal load-generating source implementation. Useful for tests.

    
   
37
 *

    
   
38
 * See {@link StressSource#configure(Context)} for configuration options.

    
   
39
 */
35
public class StressSource extends AbstractSource implements
40
public class StressSource extends AbstractSource implements
36
  Configurable, PollableSource {
41
  Configurable, PollableSource {
37

    
   
42

   
38
  private static final Logger logger = LoggerFactory
43
  private static final Logger logger = LoggerFactory
39
      .getLogger(StressSource.class);
44
      .getLogger(StressSource.class);
40

    
   
45

   
41
  private CounterGroup counterGroup;
46
  private CounterGroup counterGroup;
42
  private byte[] buffer;
47
  private byte[] buffer;
43
  private Event event;
48
  protected Event event; // exposed for package-level tests

    
   
49
  private long maxTotalEvents;

    
   
50
  private long maxSuccessfulEvents;
44

    
   
51

   
45
  public StressSource() {
52
  public StressSource() {
46
    counterGroup = new CounterGroup();
53
    counterGroup = new CounterGroup();
47

    
   
54

   
48
  }
55
  }
49
  @Override
56
  @Override
50
  public void configure(Context context) {
57
  public void configure(Context context) {

    
   
58
    /* Limit on the total number of events. */

    
   
59
    maxTotalEvents = context.getLong("maxTotalEvents", -1L);

    
   
60
    /* Limit on the total number of successful events. */

    
   
61
    maxSuccessfulEvents = context.getLong("maxSuccessfulEvents", -1L);

    
   
62
    /* Size of events to be generated. */
51
    int size = context.getInteger("size", 500);
63
    int size = context.getInteger("size", 500);
52
    buffer = new byte[size];
64
    buffer = new byte[size];
53
    Arrays.fill(buffer, Byte.MAX_VALUE);
65
    Arrays.fill(buffer, Byte.MAX_VALUE);
54
    event = EventBuilder.withBody(buffer);
66
    event = EventBuilder.withBody(buffer);
55
  }
67
  }
56
  @Override
68
  @Override
57
  public Status process() throws EventDeliveryException {
69
  public Status process() throws EventDeliveryException {

    
   
70
    if ((maxTotalEvents >= 0 &&

    
   
71
        counterGroup.incrementAndGet("events.total") > maxTotalEvents) ||

    
   
72
        (maxSuccessfulEvents >= 0 &&

    
   
73
        counterGroup.get("events.successful") >= maxSuccessfulEvents)) {

    
   
74
      return Status.BACKOFF;

    
   
75
    }
58
    try {
76
    try {
59
      getChannelProcessor().processEvent(event);
77
      getChannelProcessor().processEvent(event);
60
      counterGroup.incrementAndGet("events.successful");
78
      counterGroup.incrementAndGet("events.successful");
61
    } catch (ChannelException ex) {
79
    } catch (ChannelException ex) {
62
      counterGroup.incrementAndGet("events.failed");
80
      counterGroup.incrementAndGet("events.failed");
[+20] [20] 24 lines
flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
New File
 
  1. flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java: Loading...