Review Board 1.7.22


Support for batch size in StressSource

Review Request #6949 - Created Sept. 7, 2012 and submitted

Ted Malaska
trunk
FLUME-1536
Reviewers
Flume
flume-git
Adding batchSize to StressSource along with junit to test it.  

Important note: the total number of events getting sent will equals maxTotalEvents, so if maxTotalEvents is not devisable by batchSize then the last batch will contain only the remainder of events needed to reach maxTotalEvents.

 
flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java
Revision 5b73910 New Change
[20] 16 lines
[+20]
17
 * under the License.
17
 * under the License.
18
 */
18
 */
19

    
   
19

   
20
package org.apache.flume.source;
20
package org.apache.flume.source;
21

    
   
21

   

    
   
22
import java.util.ArrayList;
22
import java.util.Arrays;
23
import java.util.Arrays;
23

    
   
24

   
24
import org.apache.flume.ChannelException;
25
import org.apache.flume.ChannelException;
25
import org.apache.flume.Context;
26
import org.apache.flume.Context;
26
import org.apache.flume.CounterGroup;
27
import org.apache.flume.CounterGroup;
[+20] [20] 16 lines
[+20] [+] public class StressSource extends AbstractSource implements
43
  private static final Logger logger = LoggerFactory
44
  private static final Logger logger = LoggerFactory
44
      .getLogger(StressSource.class);
45
      .getLogger(StressSource.class);
45

    
   
46

   
46
  private CounterGroup counterGroup;
47
  private CounterGroup counterGroup;
47
  private byte[] buffer;
48
  private byte[] buffer;
48
  private Event event;

   
49
  private long maxTotalEvents;
49
  private long maxTotalEvents;
50
  private long maxSuccessfulEvents;
50
  private long maxSuccessfulEvents;

    
   
51
  private int batchSize;

    
   
52
  private long lastSent = 0;

    
   
53
  private Event event;

    
   
54
  private ArrayList<Event> eventBatchList;
51

    
   
55

   
52
  public StressSource() {
56
  public StressSource() {
53
    counterGroup = new CounterGroup();
57
    counterGroup = new CounterGroup();
54

    
   
58

   
55
  }
59
  }

    
   
60

   

    
   
61
  /**

    
   
62
   * Read parameters from context

    
   
63
   * <li>-maxTotalEvents = type long that defines the total number of events to be sent

    
   
64
   * <li>-maxSuccessfulEvents = type long that defines the total number of events to be sent

    
   
65
   * <li>-size = type int that defines the number of bytes in each event

    
   
66
   * <li>-batchSize = type int that defines the number of events being sent in one batch

    
   
67
   */
56
  @Override
68
  @Override
57
  public void configure(Context context) {
69
  public void configure(Context context) {
58
    /* Limit on the total number of events. */
70
    /* Limit on the total number of events. */
59
    maxTotalEvents = context.getLong("maxTotalEvents", -1L);
71
    maxTotalEvents = context.getLong("maxTotalEvents", -1L);
60
    /* Limit on the total number of successful events. */
72
    /* Limit on the total number of successful events. */
61
    maxSuccessfulEvents = context.getLong("maxSuccessfulEvents", -1L);
73
    maxSuccessfulEvents = context.getLong("maxSuccessfulEvents", -1L);

    
   
74
    /* Set max events in a batch submission */

    
   
75
    batchSize = context.getInteger("batchSize", 1);
62
    /* Size of events to be generated. */
76
    /* Size of events to be generated. */
63
    int size = context.getInteger("size", 500);
77
    int size = context.getInteger("size", 500);
64
    buffer = new byte[size];
78

   

    
   
79
    prepEventData(size);

    
   
80
  }

    
   
81

   

    
   
82
  private void prepEventData(int bufferSize) {

    
   
83
    buffer = new byte[bufferSize];
65
    Arrays.fill(buffer, Byte.MAX_VALUE);
84
    Arrays.fill(buffer, Byte.MAX_VALUE);

    
   
85

   

    
   
86
    if (batchSize > 1) {

    
   
87
      //Create event objects in case of batch test

    
   
88
      eventBatchList = new ArrayList<Event>();

    
   
89

   

    
   
90
      for (int i = 0; i < batchSize; i++)

    
   
91
      {

    
   
92
        eventBatchList.add(EventBuilder.withBody(buffer));

    
   
93
      }

    
   
94
    } else {

    
   
95
      //Create single event in case of non-batch test
66
    event = EventBuilder.withBody(buffer);
96
      event = EventBuilder.withBody(buffer);
67
  }
97
    }

    
   
98
  }

    
   
99

   
68
  @Override
100
  @Override
69
  public Status process() throws EventDeliveryException {
101
  public Status process() throws EventDeliveryException {

    
   
102
    long totalEventSent = counterGroup.addAndGet("events.total", lastSent);

    
   
103

   
70
    if ((maxTotalEvents >= 0 &&
104
    if ((maxTotalEvents >= 0 &&
71
        counterGroup.incrementAndGet("events.total") > maxTotalEvents) ||
105
        totalEventSent >= maxTotalEvents) ||
72
        (maxSuccessfulEvents >= 0 &&
106
        (maxSuccessfulEvents >= 0 &&
73
        counterGroup.get("events.successful") >= maxSuccessfulEvents)) {
107
        counterGroup.get("events.successful") >= maxSuccessfulEvents)) {
74
      return Status.BACKOFF;
108
      return Status.BACKOFF;
75
    }
109
    }
76
    try {
110
    try {

    
   
111
      lastSent = batchSize;

    
   
112

   

    
   
113
      if (batchSize == 1) {
77
      getChannelProcessor().processEvent(event);
114
        getChannelProcessor().processEvent(event);
78
      counterGroup.incrementAndGet("events.successful");
115
      } else {

    
   
116
        long eventsLeft = maxTotalEvents - totalEventSent;

    
   
117

   

    
   
118
        if (eventsLeft < batchSize) {

    
   
119
          eventBatchList.subList(0, (int)eventsLeft - 1);

    
   
120
          lastSent = eventsLeft;

    
   
121
        }

    
   
122

   

    
   
123
        getChannelProcessor().processEventBatch(eventBatchList);

    
   
124
      }

    
   
125

   

    
   
126
      counterGroup.addAndGet("events.successful", lastSent);
79
    } catch (ChannelException ex) {
127
    } catch (ChannelException ex) {
80
      counterGroup.incrementAndGet("events.failed");
128
      counterGroup.addAndGet("events.failed", lastSent);
81
      return Status.BACKOFF;
129
      return Status.BACKOFF;
82
    }
130
    }
83
    return Status.READY;
131
    return Status.READY;
84
  }
132
  }
85

    
   
133

   
[+20] [20] 18 lines
flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java
Revision 4ec16c7 New Change
 
  1. flume-ng-core/src/main/java/org/apache/flume/source/StressSource.java: Loading...
  2. flume-ng-core/src/test/java/org/apache/flume/source/TestStressSource.java: Loading...