Review Board 1.7.22


Support for batch size in StressSource

Review Request #6949 - Created Sept. 7, 2012 and submitted

Ted Malaska
trunk
FLUME-1536
Reviewers
Flume
flume-git
Adding batchSize to StressSource along with junit to test it.  

Important note: the total number of events getting sent will equals maxTotalEvents, so if maxTotalEvents is not devisable by batchSize then the last batch will contain only the remainder of events needed to reach maxTotalEvents.

 

Diff revision 1

This is not the most recent revision of the diff. The latest diff is revision 3. See what's changed.

1 2 3
1 2 3

  1. flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java: Loading...
flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
Revision 5b73910 New Change
[20] 16 lines
[+20]
17
 * under the License.
17
 * under the License.
18
 */
18
 */
19

    
   
19

   
20
package org.apache.flume.source;
20
package org.apache.flume.source;
21

    
   
21

   

    
   
22
import java.util.ArrayList;
22
import java.util.Arrays;
23
import java.util.Arrays;
23

    
   
24

   
24
import org.apache.flume.ChannelException;
25
import org.apache.flume.ChannelException;
25
import org.apache.flume.Context;
26
import org.apache.flume.Context;
26
import org.apache.flume.CounterGroup;
27
import org.apache.flume.CounterGroup;
[+20] [20] 14 lines
[+20]
41
  Configurable, PollableSource {
42
  Configurable, PollableSource {
42

    
   
43

   
43
  private static final Logger logger = LoggerFactory
44
  private static final Logger logger = LoggerFactory
44
      .getLogger(StressSource.class);
45
      .getLogger(StressSource.class);
45

    
   
46

   
46
  private CounterGroup counterGroup;
47
  CounterGroup counterGroup;
47
  private byte[] buffer;
48
  private byte[] buffer;
48
  private Event event;

   
49
  private long maxTotalEvents;
49
  private long maxTotalEvents;
50
  private long maxSuccessfulEvents;
50
  private long maxSuccessfulEvents;

    
   
51
  private int batchSize;

    
   
52
  private long lastSent = 0;

    
   
53
  Event event;

    
   
54
  ArrayList<Event> eventBatchList;
51

    
   
55

   
52
  public StressSource() {
56
  public StressSource() {
53
    counterGroup = new CounterGroup();
57
    counterGroup = new CounterGroup();
54

    
   
58

   
55
  }
59
  }

    
   
60

   

    
   
61
  /**

    
   
62
   * Read parameters from context

    
   
63
   * <li>-maxTotalEvents = type long that defines the total number of events to be sent

    
   
64
   * <li>-maxSuccessfulEvents = type long that defines the total number of events to be sent

    
   
65
   * <li>-size = type int that defines the number of bytes in each event

    
   
66
   * <li>-batchSize = type int that defines the number of events being sent in one batch

    
   
67
   */
56
  @Override
68
  @Override
57
  public void configure(Context context) {
69
  public void configure(Context context) {
58
    /* Limit on the total number of events. */
70
    /* Limit on the total number of events. */
59
    maxTotalEvents = context.getLong("maxTotalEvents", -1L);
71
    maxTotalEvents = context.getLong("maxTotalEvents", -1L);
60
    /* Limit on the total number of successful events. */
72
    /* Limit on the total number of successful events. */
61
    maxSuccessfulEvents = context.getLong("maxSuccessfulEvents", -1L);
73
    maxSuccessfulEvents = context.getLong("maxSuccessfulEvents", -1L);

    
   
74
    /* Set max events in a batch submission */

    
   
75
    batchSize = context.getInteger("batchSize", 1);
62
    /* Size of events to be generated. */
76
    /* Size of events to be generated. */
63
    int size = context.getInteger("size", 500);
77
    int size = context.getInteger("size", 500);
64
    buffer = new byte[size];
78

   

    
   
79
    prepEventData(size);

    
   
80
  }

    
   
81

   

    
   
82
  private void prepEventData(int bufferSize) {

    
   
83
    buffer = new byte[bufferSize];

    
   
84

   

    
   
85
    if (batchSize > 1) {

    
   
86
      //Create event objects in case of batch test

    
   
87
      eventBatchList = new ArrayList<Event>();

    
   
88

   

    
   
89
      for (int i = 0; i < batchSize; i++)

    
   
90
      {

    
   
91
        Arrays.fill(buffer, (byte)(Math.random() * Byte.MAX_VALUE));

    
   
92
        eventBatchList.add(EventBuilder.withBody(buffer));

    
   
93
      }

    
   
94
    } else {

    
   
95
      //Create single event in case of non-batch test
65
    Arrays.fill(buffer, Byte.MAX_VALUE);
96
      Arrays.fill(buffer, Byte.MAX_VALUE);
66
    event = EventBuilder.withBody(buffer);
97
      event = EventBuilder.withBody(buffer);
67
  }
98
    }

    
   
99
  }

    
   
100

   
68
  @Override
101
  @Override
69
  public Status process() throws EventDeliveryException {
102
  public Status process() throws EventDeliveryException {

    
   
103
    long totalEventSent = counterGroup.addAndGet("events.total", lastSent);

    
   
104

   
70
    if ((maxTotalEvents >= 0 &&
105
    if ((maxTotalEvents >= 0 &&
71
        counterGroup.incrementAndGet("events.total") > maxTotalEvents) ||
106
        totalEventSent >= maxTotalEvents) ||
72
        (maxSuccessfulEvents >= 0 &&
107
        (maxSuccessfulEvents >= 0 &&
73
        counterGroup.get("events.successful") >= maxSuccessfulEvents)) {
108
        counterGroup.get("events.successful") >= maxSuccessfulEvents)) {
74
      return Status.BACKOFF;
109
      return Status.BACKOFF;
75
    }
110
    }
76
    try {
111
    try {

    
   
112
      lastSent = batchSize;

    
   
113

   

    
   
114
      if (batchSize == 1) {
77
      getChannelProcessor().processEvent(event);
115
        getChannelProcessor().processEvent(event);
78
      counterGroup.incrementAndGet("events.successful");
116
      } else {

    
   
117
        long eventsLeft = maxTotalEvents - totalEventSent;

    
   
118

   

    
   
119
        if (eventsLeft < batchSize) {

    
   
120
          eventBatchList.subList(0, (int)eventsLeft - 1);

    
   
121
          lastSent = eventsLeft;

    
   
122
        }

    
   
123

   

    
   
124
        getChannelProcessor().processEventBatch(eventBatchList);

    
   
125
      }

    
   
126

   

    
   
127
      counterGroup.addAndGet("events.successful", lastSent);
79
    } catch (ChannelException ex) {
128
    } catch (ChannelException ex) {
80
      counterGroup.incrementAndGet("events.failed");
129
      counterGroup.addAndGet("events.failed", lastSent);
81
      return Status.BACKOFF;
130
      return Status.BACKOFF;
82
    }
131
    }
83
    return Status.READY;
132
    return Status.READY;
84
  }
133
  }
85

    
   
134

   
[+20] [20] 18 lines
flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
Revision 4ec16c7 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java: Loading...